From dd3f6dc277165df17cbe9160e8e6104b63581b21 Mon Sep 17 00:00:00 2001 From: Yuan Jing Vincent Yan Date: Wed, 11 Dec 2024 18:00:42 +0800 Subject: [PATCH] mqbc::ClusterUtil: Validate appIds in CSL Signed-off-by: Yuan Jing Vincent Yan --- src/groups/mqb/mqbc/mqbc_clusterstate.cpp | 3 +- src/groups/mqb/mqbc/mqbc_clusterstate.h | 39 ++- src/groups/mqb/mqbc/mqbc_clusterutil.cpp | 292 ++++++++++------------ 3 files changed, 167 insertions(+), 167 deletions(-) diff --git a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp index 3d47083e0..a0aae26e3 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp @@ -47,6 +47,7 @@ bsl::ostream& ClusterStateQueueInfo::print(bsl::ostream& stream, printer.printAttribute("queueKey", key()); printer.printAttribute("partitionId", partitionId()); printer.printAttribute("appIdInfos", appInfos()); + printer.printAttribute("stateOfAssignment", state()); printer.end(); return stream; @@ -94,7 +95,7 @@ void ClusterStateObserver::onQueueUpdated( } void ClusterStateObserver::onPartitionOrphanThreshold( - BSLS_ANNOTATION_UNUSED size_t partitiondId) + BSLS_ANNOTATION_UNUSED size_t partitionId) { // NOTHING } diff --git a/src/groups/mqb/mqbc/mqbc_clusterstate.h b/src/groups/mqb/mqbc/mqbc_clusterstate.h index 52bdeb054..a6aaf967e 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstate.h +++ b/src/groups/mqb/mqbc/mqbc_clusterstate.h @@ -155,9 +155,6 @@ class ClusterStatePartitionInfo { /// `bmqp_ctrlmsg::QueueInfo`. Doing vice versa will not be possible /// because we don't want to edit generated file. Perhaps we can place the /// converter routine in `ClusterUtil`. -/// -/// TBD: When should AppIds and AppKeys come from? Should the leader/primary -/// generate them, or should we hardcode them in the domain config? class ClusterStateQueueInfo { public: // TYPES @@ -171,10 +168,10 @@ class ClusterStateQueueInfo { // Assigning following unassigning is also supported. // On Replica, the only possible state is k_ASSIGNED. - k_NONE, - k_ASSIGNING, - k_ASSIGNED, - k_UNASSIGNING + k_NONE = 0, + k_ASSIGNING = -1, + k_ASSIGNED = -2, + k_UNASSIGNING = -3 }; private: @@ -272,6 +269,18 @@ class ClusterStateQueueInfo { bsl::ostream& operator<<(bsl::ostream& stream, const ClusterStateQueueInfo& rhs); +/// Return `true` if the specified `rhs` object contains the value of the +/// same type as contained in the specified `lhs` object and the value +/// itself is the same in both objects, return false otherwise. +bool operator==(const ClusterStateQueueInfo& lhs, + const ClusterStateQueueInfo& rhs); + +/// Return `false` if the specified `rhs` object contains the value of the +/// same type as contained in the specified `lhs` object and the value +/// itself is the same in both objects, return `true` otherwise. +bool operator!=(const ClusterStateQueueInfo& lhs, + const ClusterStateQueueInfo& rhs); + // ========================== // class ClusterStateObserver // ========================== @@ -343,7 +352,7 @@ class ClusterStateObserver { /// /// THREAD: This method is invoked in the associated cluster's /// dispatcher thread. - virtual void onPartitionOrphanThreshold(size_t partitiondId); + virtual void onPartitionOrphanThreshold(size_t partitionId); /// Callback invoked when the specified `node` has been unavailable /// above a certain threshold amount of time. @@ -1163,6 +1172,20 @@ inline bsl::ostream& mqbc::operator<<(bsl::ostream& stream, return rhs.print(stream, 0, -1); } +inline bool mqbc::operator==(const ClusterStateQueueInfo& lhs, + const ClusterStateQueueInfo& rhs) +{ + return lhs.uri() == rhs.uri() && lhs.key() == rhs.key() && + lhs.partitionId() == rhs.partitionId() && + lhs.appInfos() == rhs.appInfos() && lhs.state() == rhs.state(); +} + +inline bool mqbc::operator!=(const ClusterStateQueueInfo& lhs, + const ClusterStateQueueInfo& rhs) +{ + return !(lhs == rhs); +} + } // close enterprise namespace #endif diff --git a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp index 61ad786e1..14f8825c7 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp @@ -313,28 +313,6 @@ void getNextPrimarys(NumNewPartitionsMap* numNewPartitions, } } -void printQueues(bsl::ostream& out, const ClusterState& state) -{ - out << '\n' - << "-------------------------" << '\n' - << "QUEUES IN CLUSTER STATE :" << '\n' - << "-------------------------"; - for (ClusterState::DomainStatesCIter domCit = - state.domainStates().cbegin(); - domCit != state.domainStates().cend(); - ++domCit) { - for (ClusterState::UriToQueueInfoMapCIter citer = - domCit->second->queuesInfo().cbegin(); - citer != domCit->second->queuesInfo().cend(); - ++citer) { - const bsl::shared_ptr& info = citer->second; - bdlb::Print::newlineAndIndent(out, 1); - out << "[key: " << info->key() << ", uri: " << info->uri() - << ", partitionId: " << info->partitionId() << "]"; - } - } -} - /// If the specified `status` is SUCCESS, load the specified `domain` into /// the specified `domainState`. void createDomainCb(const bmqp_ctrlmsg::Status& status, @@ -1683,30 +1661,27 @@ void ClusterUtil::apply(mqbc::ClusterState* clusterState, } } -int ClusterUtil::validateState(bsl::ostream& errorDescription, - const mqbc::ClusterState& state, - const mqbc::ClusterState& reference) +int ClusterUtil::validateState(bsl::ostream& errorDescription, + const ClusterState& state, + const ClusterState& reference) { // PRECONDITIONS BSLS_ASSERT_SAFE(state.partitions().size() == reference.partitions().size()); - bool seenIncorrectPartitionInfo = false; - bool seenIncorrectQueueInfo = false; - bool seenMissingQueue = false; - bool seenExtraQueue = false; - bmqu::MemOutStream out; const int level = 0; - // Check incorrect partition information + // Validate partition information + bsl::vector incorrectPartitions; for (size_t pid = 0; pid < state.partitions().size(); ++pid) { - const mqbc::ClusterStatePartitionInfo& stateInfo = - state.partitions()[pid]; - const mqbc::ClusterStatePartitionInfo& referenceInfo = + const ClusterStatePartitionInfo& stateInfo = state.partitions()[pid]; + BSLS_ASSERT_SAFE(stateInfo.partitionId() == pid); + + const ClusterStatePartitionInfo& referenceInfo = reference.partitions()[pid]; - if (stateInfo.partitionId() != referenceInfo.partitionId() || - stateInfo.primaryLeaseId() != referenceInfo.primaryLeaseId()) { + BSLS_ASSERT_SAFE(referenceInfo.partitionId() == pid); + if (stateInfo.primaryLeaseId() != referenceInfo.primaryLeaseId()) { // Partition information mismatch. Note that we don't compare // primaryNodeIds here because 'state' is initialized with cluster // state ledger's contents and will likely have a valid @@ -1716,28 +1691,49 @@ int ClusterUtil::validateState(bsl::ostream& errorDescription, // primaryNodeId, specially if a primary has not yet been assigned // in the startup sequence. If if a primary has been assigned, its // nodeId may be different one. - if (!seenIncorrectPartitionInfo) { - bdlb::Print::newlineAndIndent(out, level); - out << "---------------------------"; - bdlb::Print::newlineAndIndent(out, level); - out << "Incorrect Partition Infos :"; - bdlb::Print::newlineAndIndent(out, level); - out << "---------------------------"; - seenIncorrectPartitionInfo = true; - } + incorrectPartitions.push_back(stateInfo); + } + } + + if (!incorrectPartitions.empty()) { + bdlb::Print::newlineAndIndent(out, level); + out << "---------------------------"; + bdlb::Print::newlineAndIndent(out, level); + out << "Incorrect Partition Infos :"; + bdlb::Print::newlineAndIndent(out, level); + out << "---------------------------"; + for (bsl::vector::const_iterator citer = + incorrectPartitions.cbegin(); + citer != incorrectPartitions.cend(); + ++citer) { bdlb::Print::newlineAndIndent(out, level + 1); - out << "[partitionId: " << stateInfo.partitionId() - << ", primaryLeaseId: " << stateInfo.primaryLeaseId() - << ", primaryNodeId: " << stateInfo.primaryNodeId() << "]" - << " (Correct: " - << "[partitionId: " << referenceInfo.partitionId() - << ", primaryLeaseId: " << referenceInfo.primaryLeaseId() - << ", primaryNodeId: " << referenceInfo.primaryNodeId() << "]" - << ")"; + out << "Partition [" << citer->partitionId() + << "]: " << " primaryLeaseId: " << citer->primaryLeaseId() + << ", primaryNodeId: " << citer->primaryNodeId(); + } + + bdlb::Print::newlineAndIndent(out, level); + out << "--------------------------------"; + bdlb::Print::newlineAndIndent(out, level); + out << "Partition Infos In Cluster State:"; + bdlb::Print::newlineAndIndent(out, level); + out << "--------------------------------"; + for (size_t pid = 0; pid < state.partitions().size(); ++pid) { + const ClusterStatePartitionInfo& referenceInfo = + reference.partitions()[pid]; + BSLS_ASSERT_SAFE(referenceInfo.partitionId() == pid); + bdlb::Print::newlineAndIndent(out, level + 1); + out << "Partition [" << pid << "]: " << " primaryLeaseId: " + << referenceInfo.primaryLeaseId() + << ", primaryNodeId: " << referenceInfo.primaryNodeId(); } } - // Check incorrect queue information + // Check incorrect or extra queues + bsl::vector, + bsl::shared_ptr > > + incorrectQueues; + bsl::vector > extraQueues; for (ClusterState::DomainStatesCIter domCit = state.domainStates().begin(); domCit != state.domainStates().end(); ++domCit) { @@ -1746,7 +1742,13 @@ int ClusterUtil::validateState(bsl::ostream& errorDescription, ClusterState::DomainStatesCIter refDomCit = reference.domainStates().find(domainName); if (refDomCit == reference.domainStates().cend()) { - // Domain not found in both states + // Entire domain is extra + for (ClusterState::UriToQueueInfoMapCIter citer = + domCit->second->queuesInfo().cbegin(); + citer != domCit->second->queuesInfo().cend(); + ++citer) { + extraQueues.push_back(citer->second); + } continue; // CONTINUE } @@ -1759,37 +1761,55 @@ int ClusterUtil::validateState(bsl::ostream& errorDescription, ClusterState::UriToQueueInfoMapCIter refCiter = refDomCit->second->queuesInfo().find(uri); if (refCiter == refDomCit->second->queuesInfo().cend()) { - // Queue not found in both states - continue; // CONTINUE + // Extra queue + extraQueues.push_back(citer->second); } - - const bsl::shared_ptr& referenceInfo = - refCiter->second; - const bsl::shared_ptr& info = citer->second; - if (info->uri() == referenceInfo->uri() && - info->key() == referenceInfo->key() && - info->partitionId() == referenceInfo->partitionId()) { - continue; // CONTINUE + else { + const bsl::shared_ptr& info = + citer->second; + const bsl::shared_ptr& referenceInfo = + refCiter->second; + if (*info != *referenceInfo) { + // Incorrect queue information + incorrectQueues.push_back( + bsl::make_pair(info, referenceInfo)); + } } + } + } - if (!seenIncorrectQueueInfo) { - bdlb::Print::newlineAndIndent(out, level); - out << "-----------------------------"; - bdlb::Print::newlineAndIndent(out, level); - out << "Incorrect Queue Information :"; - bdlb::Print::newlineAndIndent(out, level); - out << "-----------------------------"; - seenIncorrectQueueInfo = true; - } + if (!incorrectQueues.empty()) { + bdlb::Print::newlineAndIndent(out, level); + out << "-----------------"; + bdlb::Print::newlineAndIndent(out, level); + out << "Incorrect Queues:"; + bdlb::Print::newlineAndIndent(out, level); + out << "-----------------"; + for (bsl::vector, + bsl::shared_ptr > >:: + const_iterator citer = incorrectQueues.cbegin(); + citer != incorrectQueues.cend(); + ++citer) { + bdlb::Print::newlineAndIndent(out, level + 1); + out << citer->first; + bdlb::Print::newlineAndIndent(out, level + 1); + out << "(correct queue info) " << citer->second; + } + } + if (!extraQueues.empty()) { + bdlb::Print::newlineAndIndent(out, level); + out << "--------------"; + bdlb::Print::newlineAndIndent(out, level); + out << "Extra queues :"; + bdlb::Print::newlineAndIndent(out, level); + out << "--------------"; + for (bsl::vector >:: + const_iterator citer = extraQueues.cbegin(); + citer != extraQueues.cend(); + ++citer) { bdlb::Print::newlineAndIndent(out, level + 1); - out << "[key: " << info->key() << ", uri: " << info->uri() - << ", partitionId: " << info->partitionId() << "]" - << " (Correct: " - << "[key: " << referenceInfo->key() - << ", uri: " << referenceInfo->uri() - << ", partitionId: " << referenceInfo->partitionId() << "]" - << ")"; + out << *citer; } } @@ -1804,7 +1824,7 @@ int ClusterUtil::validateState(bsl::ostream& errorDescription, ClusterState::DomainStatesCIter domCit = state.domainStates().find( domainName); if (domCit == state.domainStates().cend()) { - // Entire domain of queues is not found + // Entire domain is missing for (ClusterState::UriToQueueInfoMapCIter refCiter = refDomCit->second->queuesInfo().cbegin(); refCiter != refDomCit->second->queuesInfo().cend(); @@ -1823,16 +1843,13 @@ int ClusterUtil::validateState(bsl::ostream& errorDescription, ClusterState::UriToQueueInfoMapCIter citer = domCit->second->queuesInfo().find(uri); - if (citer != domCit->second->queuesInfo().cend()) { - // Queue is found in both states - continue; // CONTINUE + if (citer == domCit->second->queuesInfo().cend()) { + // Missing queue + missingQueues.push_back(refCiter->second); } - - missingQueues.push_back(refCiter->second); } } - // Queue is missing from the cluster state if (!missingQueues.empty()) { bdlb::Print::newlineAndIndent(out, level); out << "----------------"; @@ -1840,78 +1857,40 @@ int ClusterUtil::validateState(bsl::ostream& errorDescription, out << "Missing queues :"; bdlb::Print::newlineAndIndent(out, level); out << "----------------"; - seenMissingQueue = true; - } - - for (bsl::vector >::const_iterator - citer = missingQueues.cbegin(); - citer != missingQueues.cend(); - ++citer) { - bdlb::Print::newlineAndIndent(out, level + 1); - out << "[key: " << (*citer)->key() << ", uri: " << (*citer)->uri() - << ", partitionId: " << (*citer)->partitionId() << "]"; + for (bsl::vector >:: + const_iterator citer = missingQueues.cbegin(); + citer != missingQueues.cend(); + ++citer) { + bdlb::Print::newlineAndIndent(out, level + 1); + out << *citer; + } } - // Check extra queues - bsl::vector > extraQueues; - for (ClusterState::DomainStatesCIter domCit = state.domainStates().begin(); - domCit != state.domainStates().end(); - ++domCit) { - const bsl::string& domainName = domCit->first; - - ClusterState::DomainStatesCIter refDomCit = - reference.domainStates().find(domainName); - if (refDomCit == reference.domainStates().cend()) { - // Entire domain of queues is not found + const bool queueInfoMismatch = !incorrectQueues.empty() || + !extraQueues.empty() || + !missingQueues.empty(); + if (queueInfoMismatch) { + bdlb::Print::newlineAndIndent(out, level); + out << "-------------------------"; + bdlb::Print::newlineAndIndent(out, level); + out << "QUEUES IN CLUSTER STATE :"; + bdlb::Print::newlineAndIndent(out, level); + out << "-------------------------"; + for (ClusterState::DomainStatesCIter domCit = + reference.domainStates().cbegin(); + domCit != reference.domainStates().cend(); + ++domCit) { for (ClusterState::UriToQueueInfoMapCIter citer = domCit->second->queuesInfo().cbegin(); citer != domCit->second->queuesInfo().cend(); ++citer) { - extraQueues.push_back(citer->second); + bdlb::Print::newlineAndIndent(out, level + 1); + out << citer->second; } - - continue; // CONTINUE } - - for (ClusterState::UriToQueueInfoMapCIter citer = - domCit->second->queuesInfo().cbegin(); - citer != domCit->second->queuesInfo().cend(); - ++citer) { - const bmqt::Uri& uri = citer->first; - - ClusterState::UriToQueueInfoMapCIter refCiter = - refDomCit->second->queuesInfo().find(uri); - if (refCiter != refDomCit->second->queuesInfo().cend()) { - // Queue is found in both states - continue; // CONTINUE - } - - extraQueues.push_back(citer->second); - } - } - - // Extra queue in the cluster state - if (!extraQueues.empty()) { - bdlb::Print::newlineAndIndent(out, level); - out << "--------------"; - bdlb::Print::newlineAndIndent(out, level); - out << "Extra queues :"; - bdlb::Print::newlineAndIndent(out, level); - out << "--------------"; - seenExtraQueue = true; } - for (bsl::vector >::const_iterator - citer = extraQueues.cbegin(); - citer != extraQueues.cend(); - ++citer) { - bdlb::Print::newlineAndIndent(out, level + 1); - out << "[key: " << (*citer)->key() << ", uri: " << (*citer)->uri() - << ", partitionId: " << (*citer)->partitionId() << "]"; - } - - if (seenIncorrectPartitionInfo || seenIncorrectQueueInfo || - seenMissingQueue || seenExtraQueue) { + if (!incorrectPartitions.empty() || queueInfoMismatch) { // Inconsistency detected errorDescription << out.str(); return -1; // RETURN @@ -1958,14 +1937,11 @@ void ClusterUtil::validateClusterStateLedger(mqbi::Cluster* cluster, bmqu::MemOutStream errorDescription; rc = validateState(errorDescription, tempState, clusterState); if (rc != 0) { - BALL_LOG_WARN_BLOCK - { - BALL_LOG_OUTPUT_STREAM << clusterData.identity().description() - << ": Cluster state ledger's contents are" - << " different from the cluster state: " - << errorDescription.str(); - printQueues(BALL_LOG_OUTPUT_STREAM, clusterState); - } + BMQTSK_ALARMLOG_ALARM("CLUSTER") + << clusterData.identity().description() + << ": Cluster state ledger's contents are" + << " different from the cluster state: " << errorDescription.str() + << BMQTSK_ALARMLOG_END; } }