Skip to content

Commit

Permalink
add non-adjacent chunk seeking, fix MockStream to match journald beha…
Browse files Browse the repository at this point in the history
…vior
  • Loading branch information
bastidest committed Nov 12, 2023
1 parent 3a0e678 commit 5a312c1
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 16 deletions.
3 changes: 3 additions & 0 deletions src/CSeekableStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ concept SeekableStream = requires( T a ) {
{
a.next()
} -> std::same_as<bool>;
{
a.previous()
} -> std::same_as<bool>;
{
a.getLine()
} -> std::same_as<SdLine>;
Expand Down
46 changes: 41 additions & 5 deletions src/ChunkedJournal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,26 @@ class ChunkedJournal
const auto& getChunks() const { return m_chunks; }

private:
decltype( m_pCurrentChunk ) findChunkInsertionPosition( const Chunk& chunk )
{
const auto firstId = chunk.lines.front().seqid();
auto lastValidId = m_chunks.begin();
for(auto it = m_chunks.begin(); it != m_chunks.end(); ++it)
{
if (auto seq = it->highestIdsInChunk.find( firstId.seqnumId ); seq != it->highestIdsInChunk.end())
{
if(firstId.seqnum > seq->second)
{
lastValidId = it;
} else
{
return lastValidId;
}
}
}
return m_chunks.end();
}

auto createChunkAtCurrentPosition( const Adjacency adjacency ) -> decltype( m_chunks.begin() )
{
Chunk newChunk{ {}, {}, {} };
Expand Down Expand Up @@ -84,11 +104,28 @@ class ChunkedJournal
}
}

auto insertIt = m_pCurrentChunk;
if ( adjacency == Adjacency::AFTER_CURRENT )
decltype( m_pCurrentChunk ) insertIt;

switch ( adjacency )
{
std::advance( insertIt, 1 );
case Adjacency::NON_ADJACENT: {
insertIt = findChunkInsertionPosition( newChunk );
break;
}
case Adjacency::BEFORE_CURRENT: {
insertIt = m_pCurrentChunk;
break;
}
case Adjacency::AFTER_CURRENT: {
insertIt = m_pCurrentChunk;
std::advance( insertIt, 1 );
break;
}
default: {
throw std::logic_error{ "unexpected adjacency state" };
}
}

auto newChunkIt = m_chunks.insert( insertIt, std::move( newChunk ) );

if ( adjacency == Adjacency::BEFORE_CURRENT )
Expand Down Expand Up @@ -149,7 +186,7 @@ class ChunkedJournal
void seekToEof()
{
m_journal.seekToEof();
m_journal.next();
m_journal.previous();
loadChunkAtCurrentPosition( Adjacency::NON_ADJACENT );
m_uLineOffsetInChunk = m_uChunkSize - 1;
}
Expand All @@ -173,7 +210,6 @@ class ChunkedJournal
if ( const size_t uLinesToSeek = uNumChunksToSkip * m_uChunkSize; uLinesToSeek > 0 )
{
m_journal.seekLinesForward( uLinesToSeek );
m_journal.next();
}

const Adjacency adjacency = uNumChunksToSkip == 0 ? Adjacency::AFTER_CURRENT : Adjacency::NON_ADJACENT;
Expand Down
2 changes: 2 additions & 0 deletions src/SdJournal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class SdJournal

bool next() { return sd_journal_next( handle.get() ) > 0; }

bool previous() { return sd_journal_previous( handle.get() ) > 0; }

std::string_view getFieldString( std::string_view sFieldName )
{
const void* ptr = nullptr;
Expand Down
68 changes: 57 additions & 11 deletions test/ChunkedJournal_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ using namespace std::string_view_literals;

template<size_t uStreamLength>
struct MockStream {
size_t pos{};
int64_t pos{};
std::optional<jess::SdLine> currentLine{};

void seekToBof() { pos = 0; }
void seekToBof() { pos = -1; }

void seekToEof() { pos = uStreamLength - 1; }
void seekToEof() { pos = uStreamLength; }

void seekLinesForward( const size_t uNumLines ) { pos = std::min( pos + uNumLines, uStreamLength - 1 ); }
void seekLinesForward( const size_t uNumLines )
{
pos = std::min( pos + uNumLines, uStreamLength - 1 );
loadCurrentLine();
}

void seekLinesBackward( const size_t uNumLines )
{
Expand All @@ -24,23 +28,48 @@ struct MockStream {
{
pos = pos - uNumLines;
}
loadCurrentLine();
}

/// read the data the the current position and advance the position
bool next()
{
currentLine = jess::SdLine{
{ { std::array<uint8_t, 16>{} }, { pos } },
std::string{ "line " } + std::to_string( pos ),
std::chrono::system_clock::time_point{ std::chrono::seconds{ pos } },
};
pos += 1;
return pos <= uStreamLength;
assert( pos >= 0 );

if ( pos < uStreamLength )
{
loadCurrentLine();
}

return pos < uStreamLength;
}

bool previous()
{
pos -= 1;
assert( pos < uStreamLength );

if ( pos >= 0 )
{
loadCurrentLine();
}

return pos >= 0;
}

[[nodiscard]] jess::SdSeqid getSeqid() const { return currentLine->seqid(); }

jess::SdLine getLine() { return currentLine.value(); }

private:
void loadCurrentLine()
{
currentLine = jess::SdLine{
{ { std::array<uint8_t, 16>{} }, { static_cast<size_t>( pos ) } },
std::string{ "line " } + std::to_string( pos ),
std::chrono::system_clock::time_point{ std::chrono::seconds{ pos } },
};
}
};

void checkSequence( const jess::Chunk& chunk, const size_t uLength, const size_t uFirstIndex )
Expand Down Expand Up @@ -95,6 +124,23 @@ TEST_CASE( "ChunkedJournal(1) load BOF" )
CHECK( secondChunk.contiguityBeginning == jess::Contiguity::CONTIGUOUS );
CHECK( secondChunk.contiguityEnd == jess::Contiguity::NON_CONTIGUOUS );
}

SUBCASE( "advance two lines" )
{
sut.seekLines( 2 );
const std::list<jess::Chunk>& chunks2 = sut.getChunks();
REQUIRE( chunks2.size() == 2 );
REQUIRE_MESSAGE( &chunks.front() == &firstChunk, "the first chunk shall remain at the first position" );

checkSequence( firstChunk, 1, 0 );
CHECK( firstChunk.contiguityBeginning == jess::Contiguity::NON_CONTIGUOUS );
CHECK( firstChunk.contiguityEnd == jess::Contiguity::NON_CONTIGUOUS );

const jess::Chunk& secondChunk = chunks2.back();
checkSequence( secondChunk, 1, 2 );
CHECK( secondChunk.contiguityBeginning == jess::Contiguity::NON_CONTIGUOUS );
CHECK( secondChunk.contiguityEnd == jess::Contiguity::NON_CONTIGUOUS );
}
}

TEST_CASE( "ChunkedJournal(2) load BOF" )
Expand Down

0 comments on commit 5a312c1

Please sign in to comment.