Skip to content

Commit

Permalink
Print local and primary records (#520)
Browse files Browse the repository at this point in the history
Signed-off-by: Anton Pryakhin <[email protected]>
  • Loading branch information
waldgange authored Nov 25, 2024
1 parent 2f93931 commit 9b7f26f
Showing 1 changed file with 73 additions and 4 deletions.
77 changes: 73 additions & 4 deletions src/groups/mqb/mqbs/mqbs_filestore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,70 @@ bool compareByByte(const bsl::pair<mqbu::StorageKey, MessageByteCounter>& lhs,
return lhs.second.second > rhs.second.second;
}

void printRecord(bsl::ostream& stream, const MemoryBlock& mBlock)
{
OffsetPtr<const RecordHeader> recHeader(mBlock, 0);

if (mqbs::RecordType::e_MESSAGE == recHeader->type()) {
OffsetPtr<const MessageRecord> mesRec(mBlock, 0);
mesRec->print(stream);
}
else if (mqbs::RecordType::e_CONFIRM == recHeader->type()) {
OffsetPtr<const ConfirmRecord> confRec(mBlock, 0);
confRec->print(stream);
}
else if (mqbs::RecordType::e_DELETION == recHeader->type()) {
OffsetPtr<const DeletionRecord> delRec(mBlock, 0);
delRec->print(stream);
}
else if (mqbs::RecordType::e_JOURNAL_OP == recHeader->type()) {
OffsetPtr<const JournalOpRecord> jOpRec(mBlock, 0);
jOpRec->print(stream);
}
else {
BSLS_ASSERT_SAFE(mqbs::RecordType::e_QUEUE_OP == recHeader->type());
OffsetPtr<const QueueOpRecord> qOpRec(mBlock, 0);
qOpRec->print(stream);
}
}

void printLastJournalRecord(bsl::ostream& stream,
const MappedFileDescriptor& journal,
const bsls::Types::Uint64& journalPos)
{
if (journalPos >= FileStoreProtocol::k_JOURNAL_RECORD_SIZE) {
// Create a 'MemoryBlock' for the last record in local journal
MemoryBlock selfBlock(journal.block().base() + journalPos -
FileStoreProtocol::k_JOURNAL_RECORD_SIZE,
FileStoreProtocol::k_JOURNAL_RECORD_SIZE);
stream << "Last record in self journal: \n";
printRecord(stream, selfBlock);
stream << "\n";
}
else {
stream << "There are no records in self journal yet\n";
}
}

void printNextEventRecord(bsl::ostream& stream,
const bsl::shared_ptr<bdlbb::Blob>& event,
const bmqu::BlobPosition& recordPosition,
bslma::Allocator* allocator)
{
char* p = static_cast<char*>(
allocator->allocate(FileStoreProtocol::k_JOURNAL_RECORD_SIZE));
MemoryBlock primaryBlock(p, FileStoreProtocol::k_JOURNAL_RECORD_SIZE);
bmqu::BlobUtil::copyToRawBufferFromIndex(
primaryBlock.base(),
*event,
recordPosition.buffer(),
recordPosition.byte(),
FileStoreProtocol::k_JOURNAL_RECORD_SIZE);
stream << "Next record in primary journal: \n";
printRecord(stream, primaryBlock);
allocator->deallocate(p);
}

} // close unnamed namespace

// -------------------------------------
Expand Down Expand Up @@ -4252,8 +4316,8 @@ int FileStore::writeQueueCreationRecord(
return rc_INVALID_QUEUE_RECORD; // RETURN
}

bmqt::Uri quri;
AppInfos appIdKeyPairs;
bmqt::Uri quri;
AppInfos appIdKeyPairs;
if (!d_isFSMWorkflow) {
// Check qlist offset in the replicated journal record sent by the
// primary vs qlist offset maintained by self. A mismatch means that
Expand Down Expand Up @@ -4404,14 +4468,19 @@ int FileStore::writeJournalRecord(const bmqp::StorageHeader& header,
if (journalPos != primaryJournalOffset) {
// Primary's and self views of the journal have diverged.

bsl::ostringstream os;
printLastJournalRecord(os, journal, journalPos);
printNextEventRecord(os, event, recordPosition, d_allocator_p);

BMQTSK_ALARMLOG_ALARM("REPLICATION")
<< partitionDesc() << "Received journal record of type ["
<< messageType
<< "] with journal offset mismatch. Primary's journal offset: "
<< primaryJournalOffset << ", self journal offset: " << journalPos
<< ", msg sequence number (" << recHeader.primaryLeaseId() << ", "
<< recHeader.sequenceNumber() << "). Ignoring this message."
<< BMQTSK_ALARMLOG_END;
<< recHeader.sequenceNumber() << "). Ignoring this message.\n"
<< os.str() << BMQTSK_ALARMLOG_END;

return rc_JOURNAL_OUT_OF_SYNC; // RETURN
}

Expand Down

0 comments on commit 9b7f26f

Please sign in to comment.