Skip to content

Commit

Permalink
WIP: implementation + tests for ChunkedJournal
Browse files Browse the repository at this point in the history
  • Loading branch information
bastidest committed Nov 12, 2023
1 parent fc835e1 commit 8e38182
Show file tree
Hide file tree
Showing 11 changed files with 506 additions and 143 deletions.
16 changes: 12 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,22 @@ add_executable(jess src/main.cpp
src/MainFrame.hpp
src/SdLine.hpp
src/SdCursor.hpp
src/ChunkedJournal.hpp)
target_link_libraries(jess ncurses systemd)
src/ChunkedJournal.hpp
src/CSeekableStream.hpp
src/SdCursor.cpp
src/SdSeqid.hpp)
target_link_libraries(jess PUBLIC ncurses systemd)

if (BUILD_TESTING)
find_package(doctest REQUIRED)
include(doctest) # for doctest_discover_tests
add_executable(tests src/SdCursor.hpp test/SdCursor_test.cpp)
add_executable(tests
src/SdCursor.hpp
src/SdCursor.cpp
test/SdCursor_test.cpp
test/ChunkedJournal_test.cpp
test/main.cpp)
target_include_directories(tests PUBLIC src)
target_link_libraries(tests PRIVATE doctest::doctest)
target_link_libraries(tests PUBLIC ncurses systemd PRIVATE doctest::doctest)
doctest_discover_tests(tests)
endif ()
36 changes: 36 additions & 0 deletions src/CSeekableStream.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once

#include "SdLine.hpp"
#include <concepts>
#include <cstddef>

namespace jess
{

/// Compile-time contract for a line-oriented stream that can be repositioned.
///
/// A type T models SeekableStream when it is default-initializable and provides:
///   - seekToBof() / seekToEof()                      -> void
///   - seekLinesForward(n) / seekLinesBackward(n)     -> void, n being a size_t
///   - next()                                         -> bool
///   - getLine()                                      -> SdLine
///   - getSeqid()                                     -> SdSeqid
///
/// The default-initializable constraint is expressed as a real constraint
/// (not as a bare expression inside the requires-body): a bare
/// `std::is_default_constructible_v<T>;` is only checked for well-formedness
/// and is therefore satisfied by every T, regardless of the trait's value.
template<typename T>
concept SeekableStream = std::default_initializable<T> && requires( T a, size_t uNumLines ) {
    {
        a.seekToBof()
    } -> std::same_as<void>;
    {
        a.seekToEof()
    } -> std::same_as<void>;
    {
        a.seekLinesForward( uNumLines )
    } -> std::same_as<void>;
    {
        a.seekLinesBackward( uNumLines )
    } -> std::same_as<void>;
    {
        a.next()
    } -> std::same_as<bool>;
    {
        a.getLine()
    } -> std::same_as<SdLine>;
    {
        a.getSeqid()
    } -> std::same_as<SdSeqid>;
};

}// namespace jess
199 changes: 133 additions & 66 deletions src/ChunkedJournal.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "CSeekableStream.hpp"
#include "SdCursor.hpp"
#include "SdJournal.hpp"
#include "SdLine.hpp"
Expand All @@ -9,66 +10,98 @@
#include <span>
#include <vector>

namespace jess {
namespace jess
{

static constexpr auto uChunkSize = 1024;
enum class Contiguity {
CONTIGUOUS,
NON_CONTIGUOUS,
OVERLAPPING,
};

class ChunkedJournal {
enum class Contiguity {
CONTIGUOUS,
NON_CONTIGUOUS,
OVERLAPPING,
};
enum class InsertPosition {
BEFORE,
AFTER,
};

enum class InsertPosition {
BEFORE,
AFTER,
};
struct Chunk {
std::map<SdSeqnumId, SdSeqnum> lowestIdsInChunk{};
std::map<SdSeqnumId, SdSeqnum> highestIdsInChunk{};
std::vector<SdLine> lines;
Contiguity contiguityBeginning{ Contiguity::NON_CONTIGUOUS };
Contiguity contiguityEnd{ Contiguity::NON_CONTIGUOUS };
};

struct Chunk {
// we might need multiple cursors depending on the sequence id
SdCursor beginning;
SdCursor end;
std::vector<SdLine> lines;
Contiguity contiguityBeginning{Contiguity::NON_CONTIGUOUS};
Contiguity contiguityEnd{Contiguity::NON_CONTIGUOUS};
template<SeekableStream TJournal>
class ChunkedJournal
{
size_t m_uChunkSize;
size_t m_uPreloadLines;
TJournal m_journal{};
std::list<Chunk> m_chunks{};
decltype( m_chunks.begin() ) m_pCurrentChunk{ m_chunks.begin() };
size_t m_uLineOffsetInChunk{ 0 };

std::partial_ordering operator<=>(const Chunk &other) const { return beginning <=> other.beginning; }
};
public:
explicit ChunkedJournal( const size_t uChunkSize, const size_t uPreloadLines )
: m_uChunkSize( uChunkSize )
, m_uPreloadLines( uPreloadLines )
{
}

SdJournal m_journal{};
std::list<Chunk> m_chunks{};
decltype(m_chunks.begin()) m_pCurrentChunk{m_chunks.begin()};
size_t m_uLineOffsetInChunk{0};
const auto& getChunks() const { return m_chunks; }

auto createChunkFromCurrentPosition(const SdCursor &cursor, InsertPosition insertPos, Contiguity contiguity)
-> decltype(m_chunks.begin()) {
Chunk newChunk{cursor, cursor, {}};
private:
auto createChunkAtCurrentPosition( const InsertPosition insertPos, const Contiguity contiguity ) -> decltype( m_chunks.begin() )
{
Chunk newChunk{ {}, {}, {} };

newChunk.lines.reserve(uChunkSize);
newChunk.lines.reserve( m_uChunkSize );

for (size_t i = 0; i < uChunkSize; ++i) {
newChunk.lines.push_back(m_journal.getLine());
if (!m_journal.next()) {
for ( size_t i = 0; i < m_uChunkSize; ++i )
{
newChunk.lines.push_back( m_journal.getLine() );
const SdSeqnumId key = newChunk.lines.back().seqid().seqnumId;
const SdSeqnum value = newChunk.lines.back().seqid().seqnum;

// if we have not seen the seqnumid before, add the first seqnum we encountered
if ( !newChunk.lowestIdsInChunk.contains( key ) )
{
newChunk.lowestIdsInChunk[key] = value;
newChunk.highestIdsInChunk[key] = value;
}

// if we know the seqnumid already, update the last known seqnum
if ( newChunk.highestIdsInChunk.contains( key ) )
{
newChunk.highestIdsInChunk[key] = value;
}

if ( !m_journal.next() )
{
break;
}
}

newChunk.end = m_journal.getCursor();

auto insertIt = m_pCurrentChunk;
if (insertPos == InsertPosition::AFTER) {
std::advance(insertIt, 1);
if ( insertPos == InsertPosition::AFTER )
{
std::advance( insertIt, 1 );
}
auto newChunkIt = m_chunks.insert(insertIt, std::move(newChunk));
auto newChunkIt = m_chunks.insert( insertIt, std::move( newChunk ) );

if (insertPos == InsertPosition::BEFORE) {
if ( insertPos == InsertPosition::BEFORE )
{
newChunkIt->contiguityEnd = contiguity;
if(m_pCurrentChunk != m_chunks.end()) {
if ( m_pCurrentChunk != m_chunks.end() )
{
m_pCurrentChunk->contiguityBeginning = contiguity;
}
} else if (insertPos == InsertPosition::AFTER) {
if(m_pCurrentChunk != m_chunks.end()) {
}
else if ( insertPos == InsertPosition::AFTER )
{
if ( m_pCurrentChunk != m_chunks.end() )
{
m_pCurrentChunk->contiguityEnd = contiguity;
}
newChunkIt->contiguityBeginning = contiguity;
Expand All @@ -77,50 +110,84 @@ class ChunkedJournal {
return newChunkIt;
}

public:
void seekToBof() {
m_journal.seekToBof();
m_journal.next();
auto cursorBof = m_journal.getCursor();
/// Collects iterators to all cached chunks that may contain the given sequence id.
///
/// A chunk qualifies when its highestIdsInChunk map has an entry for
/// seqid.seqnumId whose value is not below seqid.seqnum, and its
/// lowestIdsInChunk map either has no entry for that id or has one whose
/// value is not above seqid.seqnum.
///
/// @param seqid sequence id (seqnumId + seqnum) to look up
/// @return iterators into m_chunks, in list order; empty when nothing matches
[[nodiscard]] std::vector<decltype( m_chunks.begin() )> getChunksBySeqid( SdSeqid seqid )
{
    std::vector<decltype( m_chunks.begin() )> matches{};
    for ( auto it = m_chunks.begin(); it != m_chunks.end(); ++it )
    {
        // upper bound: the chunk must know this seqnumId and reach at least up to seqnum
        const auto upperIt = it->highestIdsInChunk.find( seqid.seqnumId );
        if ( upperIt == it->highestIdsInChunk.end() || !( seqid.seqnum <= upperIt->second ) )
        {
            continue;
        }

        // lower bound: only rejects when an entry exists and seqnum lies below it
        const auto lowerIt = it->lowestIdsInChunk.find( seqid.seqnumId );
        if ( lowerIt != it->lowestIdsInChunk.end() && !( seqid.seqnum >= lowerIt->second ) )
        {
            continue;
        }

        matches.push_back( it );
    }
    return matches;
}

if (m_chunks.empty() || m_chunks.front().beginning == cursorBof) {
m_pCurrentChunk = createChunkFromCurrentPosition(cursorBof, InsertPosition::BEFORE, Contiguity::NON_CONTIGUOUS);
/// Makes m_pCurrentChunk refer to the chunk for the journal's current sequence id.
///
/// If getChunksBySeqid() finds one or more cached chunks for the id reported by
/// m_journal.getSeqid(), the last match is selected. Otherwise a new chunk is
/// created via createChunkAtCurrentPosition() using the given arguments.
///
/// @param insertPos  forwarded to createChunkAtCurrentPosition() when no cached chunk matches
/// @param contiguity forwarded to createChunkAtCurrentPosition() when no cached chunk matches
void loadChunkAtCurrentPosition( const InsertPosition insertPos, const Contiguity contiguity )
{
    const auto cachedMatches = getChunksBySeqid( m_journal.getSeqid() );
    if ( cachedMatches.empty() )
    {
        // nothing cached for this position yet
        m_pCurrentChunk = createChunkAtCurrentPosition( insertPos, contiguity );
        return;
    }
    m_pCurrentChunk = cachedMatches.back();
}

public:
void seekToBof()
{
m_journal.seekToBof();
m_journal.next();
loadChunkAtCurrentPosition( InsertPosition::BEFORE, Contiguity::CONTIGUOUS );
m_uLineOffsetInChunk = 0;
}

void seekLines(int64_t uNumLines) {
assert(m_pCurrentChunk != m_chunks.end());
void seekLines( const int64_t uNumLines )
{
assert( m_pCurrentChunk != m_chunks.end() );

if (uNumLines < 0 && static_cast<size_t>(std::abs(uNumLines)) > m_uLineOffsetInChunk) {
if ( uNumLines < 0 && static_cast<size_t>( std::abs( uNumLines ) ) > m_uLineOffsetInChunk )
{
m_uLineOffsetInChunk = 0;
// todo: load previous chunk
return;
}
if (m_uLineOffsetInChunk + uNumLines > m_pCurrentChunk->lines.size()) {
int64_t uLinesAfterEndOfChunk =
uNumLines - static_cast<int64_t>(m_pCurrentChunk->lines.size() - m_uLineOffsetInChunk);
size_t uNumChunksToSkip = uLinesAfterEndOfChunk / uChunkSize;
m_uLineOffsetInChunk = uLinesAfterEndOfChunk - uNumChunksToSkip * uChunkSize;
m_journal.seekForward(uNumChunksToSkip * uChunkSize);
m_journal.next();
auto cursorBeginningOfChunk = m_journal.getCursor();
Contiguity contiguity = uNumChunksToSkip == 0 ? Contiguity::CONTIGUOUS : Contiguity::NON_CONTIGUOUS;
m_pCurrentChunk = createChunkFromCurrentPosition(cursorBeginningOfChunk, InsertPosition::AFTER, contiguity);
if ( m_uLineOffsetInChunk + uNumLines >= m_pCurrentChunk->lines.size() )
{
const int64_t uNewOffsetRelativeToEndOfCurrentChunk = uNumLines - static_cast<int64_t>( m_pCurrentChunk->lines.size() - m_uLineOffsetInChunk );
const size_t uNumChunksToSkip = uNewOffsetRelativeToEndOfCurrentChunk / m_uChunkSize;
m_uLineOffsetInChunk = uNewOffsetRelativeToEndOfCurrentChunk - uNumChunksToSkip * m_uChunkSize;

if ( const size_t uLinesToSeek = uNumChunksToSkip * m_uChunkSize; uLinesToSeek > 0)
{
m_journal.seekLinesForward( uLinesToSeek );
m_journal.next();
}

const Contiguity contiguity = uNumChunksToSkip == 0 ? Contiguity::CONTIGUOUS : Contiguity::NON_CONTIGUOUS;
loadChunkAtCurrentPosition( InsertPosition::AFTER, contiguity );
return;
}

m_uLineOffsetInChunk += uNumLines;
}

std::span<SdLine> getLines(size_t uNumLines) {
if (m_pCurrentChunk == m_chunks.end()) {
std::span<SdLine> getLines( size_t uNumLines )
{
if ( m_pCurrentChunk == m_chunks.end() )
{
return {};
}
std::span ret{m_pCurrentChunk->lines};
return ret.subspan(m_uLineOffsetInChunk, std::min(ret.size() - m_uLineOffsetInChunk, uNumLines));
const std::span ret{ m_pCurrentChunk->lines };
return ret.subspan( m_uLineOffsetInChunk, std::min( ret.size() - m_uLineOffsetInChunk, uNumLines ) );
}
};

} // namespace jess
}// namespace jess
21 changes: 21 additions & 0 deletions src/SdCursor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#include "SdCursor.hpp"


/// Writes a 16-byte id as 32 zero-padded hexadecimal digits.
///
/// The stream's format flags and fill character are saved on entry and
/// restored before returning, so neither the hex base nor the '0' fill leaks
/// into the caller's later output. The hex base is set here explicitly
/// instead of relying on the caller having selected it beforehand.
///
/// @param os   destination stream
/// @param data bytes to print, first element first
/// @return os
std::ostream &jess::operator<<(std::ostream &os, const std::array<uint8_t, 16> &data) {
    const std::ios_base::fmtflags savedFlags = os.flags();
    const char savedFill = os.fill('0');

    os << std::hex;
    for (const uint8_t byte : data) {
        // widen first: a uint8_t would be streamed as a character
        // the field width resets after every insertion, so set it per byte
        os << std::setw(2) << static_cast<uint32_t>(byte);
    }

    os.fill(savedFill);
    os.flags(savedFlags);
    return os;
}

/// Writes a cursor as semicolon-separated "key=value" fields in hexadecimal:
/// s=<seqnumId>;i=<seqnum>;b=<bootId>;m=<timeMonotonic>;t=<timeRealtime>;x=<someXor>
///
/// The stream's format flags are saved on entry and restored before returning,
/// so the hex base selected here does not affect what the caller prints next.
///
/// @param os   destination stream
/// @param that cursor to print
/// @return os
std::ostream &jess::operator<<(std::ostream &os, const jess::SdCursor &that) {
    const std::ios_base::fmtflags savedFlags = os.flags();

    os << std::hex;
    os << "s=" << that.seqid.seqnumId.value << ";";
    os << "i=" << that.seqid.seqnum.value << ";";
    os << "b=" << that.bootId << ";";
    os << "m=" << that.timeMonotonic << ";";
    os << "t=" << that.timeRealtime << ";";
    os << "x=" << that.someXor;

    os.flags(savedFlags);
    return os;
}
Loading

0 comments on commit 8e38182

Please sign in to comment.