From 9b7f26fadfc69d2aa9a2c8a9925e23314bf4ca86 Mon Sep 17 00:00:00 2001 From: Anton Pryakhin Date: Tue, 26 Nov 2024 00:44:55 +0200 Subject: [PATCH] Print local and primary records (#520) Signed-off-by: Anton Pryakhin --- src/groups/mqb/mqbs/mqbs_filestore.cpp | 77 ++++++++++++++++++++++++-- 1 file changed, 73 insertions(+), 4 deletions(-) diff --git a/src/groups/mqb/mqbs/mqbs_filestore.cpp b/src/groups/mqb/mqbs/mqbs_filestore.cpp index a5e4678ed..87b90bf78 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.cpp @@ -210,6 +210,70 @@ bool compareByByte(const bsl::pair& lhs, return lhs.second.second > rhs.second.second; } +void printRecord(bsl::ostream& stream, const MemoryBlock& mBlock) +{ + OffsetPtr recHeader(mBlock, 0); + + if (mqbs::RecordType::e_MESSAGE == recHeader->type()) { + OffsetPtr mesRec(mBlock, 0); + mesRec->print(stream); + } + else if (mqbs::RecordType::e_CONFIRM == recHeader->type()) { + OffsetPtr confRec(mBlock, 0); + confRec->print(stream); + } + else if (mqbs::RecordType::e_DELETION == recHeader->type()) { + OffsetPtr delRec(mBlock, 0); + delRec->print(stream); + } + else if (mqbs::RecordType::e_JOURNAL_OP == recHeader->type()) { + OffsetPtr jOpRec(mBlock, 0); + jOpRec->print(stream); + } + else { + BSLS_ASSERT_SAFE(mqbs::RecordType::e_QUEUE_OP == recHeader->type()); + OffsetPtr qOpRec(mBlock, 0); + qOpRec->print(stream); + } +} + +void printLastJournalRecord(bsl::ostream& stream, + const MappedFileDescriptor& journal, + const bsls::Types::Uint64& journalPos) +{ + if (journalPos >= FileStoreProtocol::k_JOURNAL_RECORD_SIZE) { + // Create a 'MemoryBlock' for the last record in local journal + MemoryBlock selfBlock(journal.block().base() + journalPos - + FileStoreProtocol::k_JOURNAL_RECORD_SIZE, + FileStoreProtocol::k_JOURNAL_RECORD_SIZE); + stream << "Last record in self journal: \n"; + printRecord(stream, selfBlock); + stream << "\n"; + } + else { + stream << "There are no records in self journal yet\n"; + } +} + +void printNextEventRecord(bsl::ostream& stream, + const bsl::shared_ptr& event, + const bmqu::BlobPosition& recordPosition, + bslma::Allocator* allocator) +{ + char* p = static_cast( + allocator->allocate(FileStoreProtocol::k_JOURNAL_RECORD_SIZE)); + MemoryBlock primaryBlock(p, FileStoreProtocol::k_JOURNAL_RECORD_SIZE); + bmqu::BlobUtil::copyToRawBufferFromIndex( + primaryBlock.base(), + *event, + recordPosition.buffer(), + recordPosition.byte(), + FileStoreProtocol::k_JOURNAL_RECORD_SIZE); + stream << "Next record in primary journal: \n"; + printRecord(stream, primaryBlock); + allocator->deallocate(p); +} + } // close unnamed namespace // ------------------------------------- @@ -4252,8 +4316,8 @@ int FileStore::writeQueueCreationRecord( return rc_INVALID_QUEUE_RECORD; // RETURN } - bmqt::Uri quri; - AppInfos appIdKeyPairs; + bmqt::Uri quri; + AppInfos appIdKeyPairs; if (!d_isFSMWorkflow) { // Check qlist offset in the replicated journal record sent by the // primary vs qlist offset maintained by self. A mismatch means that @@ -4404,14 +4468,19 @@ int FileStore::writeJournalRecord(const bmqp::StorageHeader& header, if (journalPos != primaryJournalOffset) { // Primary's and self views of the journal have diverged. + bsl::ostringstream os; + printLastJournalRecord(os, journal, journalPos); + printNextEventRecord(os, event, recordPosition, d_allocator_p); + BMQTSK_ALARMLOG_ALARM("REPLICATION") << partitionDesc() << "Received journal record of type [" << messageType << "] with journal offset mismatch. Primary's journal offset: " << primaryJournalOffset << ", self journal offset: " << journalPos << ", msg sequence number (" << recHeader.primaryLeaseId() << ", " - << recHeader.sequenceNumber() << "). Ignoring this message." - << BMQTSK_ALARMLOG_END; + << recHeader.sequenceNumber() << "). Ignoring this message.\n" + << os.str() << BMQTSK_ALARMLOG_END; + return rc_JOURNAL_OUT_OF_SYNC; // RETURN }