diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index 609df8b01..b457cf04b 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -214,7 +214,7 @@ ClusterQueueHelper::QueueLiveState::QueueLiveState( } // MANIPULATORS -void ClusterQueueHelper::QueueLiveState::reset() +void ClusterQueueHelper::QueueLiveState::resetButKeepPending() { // NOTE: Do not reset d_pending and d_inFlight, and some other data. @@ -386,9 +386,7 @@ ClusterQueueHelper::assignQueue(const QueueContextSp& queueContext) // PRECONDITIONS BSLS_ASSERT_SAFE( d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); - BSLS_ASSERT_SAFE(!isQueueAssigned(*(queueContext.get())) || - ((d_cluster_p->isCSLModeEnabled() && - queueContext->d_stateQInfo_sp->pendingUnassignment()))); + BSLS_ASSERT_SAFE(!isQueueAssigned(*queueContext)); if (d_cluster_p->isRemote()) { // Assigning a queue in a remote, is simply giving it a new queueId. @@ -420,146 +418,6 @@ ClusterQueueHelper::assignQueue(const QueueContextSp& queueContext) return QueueAssignmentResult::k_ASSIGNMENT_OK; } -void ClusterQueueHelper::onQueueAssigning(const bmqt::Uri& uri, - bool processingPendingRequests) -{ - // executed by the cluster *DISPATCHER* thread - - // PRECONDITIONS - BSLS_ASSERT_SAFE( - d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); - BSLS_ASSERT_SAFE(!d_cluster_p->isRemote()); - - QueueContextSp queueContext; - QueueContextMapIter queueContextIt = d_queues.find(uri); - if (queueContextIt == d_queues.end()) { - // Queue unknown, create a new one - queueContext.reset(new (*d_allocator_p) - QueueContext(uri, d_allocator_p), - d_allocator_p); - - d_queues[uri] = queueContext; - } - else { - queueContext = queueContextIt->second; - - if (d_cluster_p->isCSLModeEnabled()) { - queueContext->d_liveQInfo.d_queueExpirationTimestampMs = 0; - } - } - - if (!d_cluster_p->isCSLModeEnabled()) { - queueContext->d_stateQInfo_sp = d_clusterState_p->domainStates() - .at(uri.qualifiedDomain()) - ->queuesInfo() - .at(uri); - - // Process the pending requests on this machine, if any. - if (processingPendingRequests) { - onQueueContextAssigned(queueContext); - } - } -} - -bool ClusterQueueHelper::onQueueUnassigning( - bool* hasInFlightRequests, - const bmqp_ctrlmsg::QueueInfo& queueInfo) -{ - // executed by the cluster *DISPATCHER* thread - - // PRECONDITIONS - BSLS_ASSERT_SAFE( - d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); - BSLS_ASSERT_SAFE(!d_cluster_p->isCSLModeEnabled()); - BSLS_ASSERT_SAFE(!d_cluster_p->isRemote()); - BSLS_ASSERT_SAFE(hasInFlightRequests); - - bmqt::Uri uri(queueInfo.uri()); - mqbu::StorageKey key(mqbu::StorageKey::BinaryRepresentation(), - queueInfo.key().data()); - - QueueContextMapIter queueContextIt = d_queues.find(uri); - if (queueContextIt == d_queues.end()) { - // We don't know about that uri .. nothing to do, but error because - // it should not happen. - // - // NOTE: it may happen if the node is starting, hasn't yet - // synchronized its cluster state but receives an - // unassignment advisory from a primary. - BALL_LOG_ERROR << d_cluster_p->description() - << " Ignoring queueUnAssignementAdvisory for unknown " - << "queue " << queueInfo; - - BSLS_ASSERT_SAFE(0 == d_clusterState_p->queueKeys().count(key)); - // Since queue uri is unknown to self node, queue key should be - // unknown too. - - return false; // RETURN - } - - QueueContext* queueContext = queueContextIt->second.get(); - if (0 != queueContext->d_liveQInfo.d_numQueueHandles) { - // This could occur if destruction of a handle at self node is - // delayed (note that we enqueue handleSp to various threads when - // it is removed from a queue) until after a queue unassignment - // advisory is received. - - BALL_LOG_WARN << d_cluster_p->description() - << " Received queue-unassignment advisory for [" << uri - << "] but num handle count is [" - << queueContext->d_liveQInfo.d_numQueueHandles << "]."; - } - - if (queueContext->d_liveQInfo.d_inFlight != 0 || - !queueContext->d_liveQInfo.d_pending.empty()) { - // If we have in flight requests, we can't delete the QueueInfo - // references; so we simply reset it's members. This can occur in - // this scenario: - // 1) Self node (replica) receives a close-queue request and forwards - // it to primary. - // 2) Primary receives close-queue request and decides to unmap the - // queue and broadcast queue-unassignment advisory. - // 3) Before self can receive queue-unassignment advisory from the - // primary, it receives an open-queue request for the same queue. - // 4) Self bumps up queue's in-flight/pending count, and sends - // request to the primary. - // 5) Self receives queue-unassignment advisory from the primary. - - // The pending/inFlight request received in (4) will eventually get - // processed, or rejected (the old primary will reject it) and - // reprocessed from the beginning with the assignment step. - - BALL_LOG_INFO << d_cluster_p->description() - << " While processing queueUnAssignmentAdvisory: " - << queueInfo << ", resetting queue info of '" << uri - << "', key: " << queueContext->key() - << " [in-flight contexts: " - << queueContext->d_liveQInfo.d_inFlight - << ", pending contexts: " - << queueContext->d_liveQInfo.d_pending.size() << "]"; - - d_queuesById.erase(queueContext->d_liveQInfo.d_id); - queueContext->d_liveQInfo.reset(); - - *hasInFlightRequests = true; - } - else { - // Nothing is pending, it is safe to delete all references. - - BALL_LOG_INFO << d_cluster_p->description() - << " All references to queue " << uri << " with key '" - << queueContext->key() - << "' removed. Queue was mapped to Partition [" - << queueInfo.partitionId() << "]."; - - removeQueueRaw(queueContextIt); - - *hasInFlightRequests = false; - } - - return true; -} - void ClusterQueueHelper::requestQueueAssignment(const bmqt::Uri& uri) { // executed by the cluster *DISPATCHER* thread @@ -4069,7 +3927,7 @@ void ClusterQueueHelper::onClusterLeader( } void ClusterQueueHelper::onQueueAssigned( - const mqbc::ClusterStateQueueInfo& info) + const bsl::shared_ptr& info) { // executed by the cluster *DISPATCHER* thread @@ -4077,11 +3935,7 @@ void ClusterQueueHelper::onQueueAssigned( BSLS_ASSERT_SAFE( d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); BSLS_ASSERT_SAFE(!d_cluster_p->isRemote()); - - if (!d_cluster_p->isCSLModeEnabled()) { - // REVISIT - return; // RETURN - } + BSLS_ASSERT_SAFE(info); const mqbnet::ClusterNode* leaderNode = d_clusterData_p->electorInfo().leaderNode(); @@ -4090,7 +3944,8 @@ void ClusterQueueHelper::onQueueAssigned( : "** UNKNOWN**"; QueueContextSp queueContext; - QueueContextMapIter queueContextIt = d_queues.find(info.uri()); + QueueContextMapIter queueContextIt = d_queues.find(info->uri()); + if (queueContextIt != d_queues.end()) { // We already have a queueContext created for that queue queueContext = queueContextIt->second; @@ -4101,10 +3956,10 @@ void ClusterQueueHelper::onQueueAssigned( // partitionId/queueKey mismatch. And d_queueKeys must also // contain the key. BSLS_ASSERT_SAFE( - (queueContext->partitionId() == info.partitionId()) && - (queueContext->key() == info.key())); + (queueContext->partitionId() == info->partitionId()) && + (queueContext->key() == info->key())); BSLS_ASSERT_SAFE(1 == - d_clusterState_p->queueKeys().count(info.key())); + d_clusterState_p->queueKeys().count(info->key())); BSLS_ASSERT_SAFE( !queueContext->d_stateQInfo_sp->pendingUnassignment()); @@ -4113,7 +3968,7 @@ void ClusterQueueHelper::onQueueAssigned( return; // RETURN } else { - if (1 == d_clusterState_p->queueKeys().count(info.key())) { + if (1 == d_clusterState_p->queueKeys().count(info->key())) { // Self node's queue context is unaware of the assigned queue, // but queueKey specified in the advisory is present in the // 'queueKeys' data structure. @@ -4122,8 +3977,8 @@ void ClusterQueueHelper::onQueueAssigned( << d_cluster_p->description() << ": attempting to apply queue assignment for a known but" << " unassigned queue, but queueKey is not unique. " - << "QueueKey [" << info.key() << "], URI [" << info.uri() - << "], Partition [" << info.partitionId() + << "QueueKey [" << info->key() << "], URI [" << info->uri() + << "], Partition [" << info->partitionId() << "]. Current leader is: '" << leaderDescription << "'. Ignoring this entry in the advisory." << BMQTSK_ALARMLOG_END; @@ -4131,20 +3986,16 @@ void ClusterQueueHelper::onQueueAssigned( } // Update queue's mapping etc. - mqbc::ClusterState::QueueKeysInsertRc insertRc = - d_clusterState_p->queueKeys().insert(info.key()); - if (insertRc.second) { - d_clusterState_p->domainStates() - .at(info.uri().qualifiedDomain()) - ->adjustQueueCount(1); - } + BSLA_MAYBE_UNUSED mqbc::ClusterState::QueueKeysInsertRc insertRc = + d_clusterState_p->queueKeys().insert(info->key()); + BSLS_ASSERT_SAFE(insertRc.second); } } else { // First time hearing about this queue. Update 'queueKeys' and // ensure that queue key is unique. mqbc::ClusterState::QueueKeysInsertRc insertRc = - d_clusterState_p->queueKeys().insert(info.key()); + d_clusterState_p->queueKeys().insert(info->key()); if (false == insertRc.second) { // QueueKey is not unique. @@ -4152,29 +4003,26 @@ void ClusterQueueHelper::onQueueAssigned( BMQTSK_ALARMLOG_ALARM("CLUSTER_STATE") << d_cluster_p->description() << ": attempting to apply queue assignment for an unknown " - << "queue [" << info.uri() << "] assigned to Partition [" - << info.partitionId() << "], but queueKey [" << info.key() + << "queue [" << info->uri() << "] assigned to Partition [" + << info->partitionId() << "], but queueKey [" << info->key() << "] is not unique. Current leader is: '" << leaderDescription << "'. Ignoring this assignment." << BMQTSK_ALARMLOG_END; return; // RETURN } - d_clusterState_p->domainStates() - .at(info.uri().qualifiedDomain()) - ->adjustQueueCount(1); - // Create the queueContext. queueContext.reset(new (*d_allocator_p) - QueueContext(info.uri(), d_allocator_p), + QueueContext(info->uri(), d_allocator_p), d_allocator_p); - d_queues[info.uri()] = queueContext; + d_queues[info->uri()] = queueContext; } + mqbc::ClusterState::DomainState& domainState = + *d_clusterState_p->domainStates().at(info->uri().qualifiedDomain()); + + domainState.adjustQueueCount(1); - queueContext->d_stateQInfo_sp = d_clusterState_p->domainStates() - .at(info.uri().qualifiedDomain()) - ->queuesInfo() - .at(info.uri()); + queueContext->d_stateQInfo_sp = info; // Queue assignment from the leader is honored per the info updated // above @@ -4184,31 +4032,33 @@ void ClusterQueueHelper::onQueueAssigned( // Note: In non-CSL mode, the queue creation callback is instead invoked at // replica nodes when they receive a queue creation record from the primary // in the partition stream. - if (!d_clusterState_p->isSelfPrimary(info.partitionId())) { - // This is a replica node - - // Note: It's possible that the queue has already been registered in - // the StorageMgr if it was a queue found during storage recovery. - // Therefore, we will allow for duplicate registration which will - // simply result in a no-op. - d_storageManager_p->registerQueueReplica( - info.partitionId(), - info.uri(), - info.key(), - d_clusterState_p->domainStates() - .at(info.uri().qualifiedDomain()) - ->domain(), - true); // allowDuplicate - - d_storageManager_p->updateQueueReplica( - info.partitionId(), - info.uri(), - info.key(), - info.appInfos(), - d_clusterState_p->domainStates() - .at(info.uri().qualifiedDomain()) - ->domain(), - true); // allowDuplicate + if (d_cluster_p->isCSLModeEnabled()) { + if (!d_clusterState_p->isSelfPrimary(info->partitionId())) { + // This is a replica node + + // Note: It's possible that the queue has already been registered + // in the StorageMgr if it was a queue found during storage + // recovery. Therefore, we will allow for duplicate registration + // which will simply result in a no-op. + d_storageManager_p->registerQueueReplica( + info->partitionId(), + info->uri(), + info->key(), + d_clusterState_p->domainStates() + .at(info->uri().qualifiedDomain()) + ->domain(), + true); // allowDuplicate + + d_storageManager_p->updateQueueReplica( + info->partitionId(), + info->uri(), + info->key(), + info->appInfos(), + d_clusterState_p->domainStates() + .at(info->uri().qualifiedDomain()) + ->domain(), + true); // allowDuplicate + } } // NOTE: Even if it is not needed to invoke 'onQueueContextAssigned' in the @@ -4218,7 +4068,7 @@ void ClusterQueueHelper::onQueueAssigned( } void ClusterQueueHelper::onQueueUnassigned( - const mqbc::ClusterStateQueueInfo& info) + const bsl::shared_ptr& info) { // executed by the cluster *DISPATCHER* thread @@ -4226,15 +4076,12 @@ void ClusterQueueHelper::onQueueUnassigned( BSLS_ASSERT_SAFE( d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); BSLS_ASSERT_SAFE(!d_cluster_p->isRemote()); - - if (!d_cluster_p->isCSLModeEnabled()) { - return; // RETURN - } + BSLS_ASSERT_SAFE(info); const bsl::string& leaderDesc = d_clusterData_p->electorInfo().leaderNode()->nodeDescription(); - const QueueContextMapIter queueContextIt = d_queues.find(info.uri()); + const QueueContextMapIter queueContextIt = d_queues.find(info->uri()); if (queueContextIt == d_queues.end()) { // We don't know about that uri .. nothing to do, but error because // it should not happen. @@ -4246,7 +4093,8 @@ void ClusterQueueHelper::onQueueUnassigned( << ": Ignoring queue unassignment from leader " << leaderDesc << ", for unknown queue: " << info; - BSLS_ASSERT_SAFE(0 == d_clusterState_p->queueKeys().count(info.key())); + BSLS_ASSERT_SAFE(0 == + d_clusterState_p->queueKeys().count(info->key())); // Since queue uri is unknown to self node, queue key should be // unknown too. @@ -4255,7 +4103,11 @@ void ClusterQueueHelper::onQueueUnassigned( const QueueContextSp& queueContextSp = queueContextIt->second; QueueLiveState& qinfo = queueContextSp->d_liveQInfo; - if (!isQueueAssigned(*queueContextSp)) { + + mqbc::ClusterStateQueueInfo* assigned = + d_clusterState_p->getAssignedOrUnassigning(queueContextSp->uri()); + + if (assigned == 0) { // Queue is known but not assigned. Error because it should not occur. // Note that it may occur if self node is starting, received an // open-queue request for this queue (and thus, populated 'd_queues' @@ -4268,8 +4120,8 @@ void ClusterQueueHelper::onQueueUnassigned( << " because self node sees queue as unassigned."; return; // RETURN } - BSLS_ASSERT_SAFE(queueContextSp->partitionId() == info.partitionId() && - queueContextSp->key() == info.key()); + BSLS_ASSERT_SAFE(queueContextSp->partitionId() == info->partitionId() && + queueContextSp->key() == info->key()); if (0 != qinfo.d_numQueueHandles) { // This could occur if destruction of a handle at self node is delayed @@ -4283,14 +4135,14 @@ void ClusterQueueHelper::onQueueUnassigned( << qinfo.d_numQueueHandles << "]."; } - if (d_clusterState_p->isSelfPrimary(info.partitionId())) { - // We already ensured there are no pending contexts for this queue, + if (d_clusterState_p->isSelfPrimary(info->partitionId())) { + // openQueue while queue unassigning cancels the unassigning // so we can safely delete it from the various maps. removeQueueRaw(queueContextIt); // Unregister the queue/storage from the partition, which will end up // issuing a QueueDeletion record. Note that this method is async. - d_storageManager_p->unregisterQueue(info.uri(), info.partitionId()); + d_storageManager_p->unregisterQueue(info->uri(), info->partitionId()); } else { // This is a replica node. @@ -4324,14 +4176,14 @@ void ClusterQueueHelper::onQueueUnassigned( if (queueContextSp->d_liveQInfo.d_queue_sp) { d_clusterState_p->updatePartitionNumActiveQueues( - info.partitionId(), + info->partitionId(), -1); } d_queuesById.erase(qinfo.d_id); - qinfo.reset(); + qinfo.resetButKeepPending(); + // CQH will recreate 'queueContextSp->d_liveQInfo.d_queue_sp' upon + // 'onOpenQueueResponse' - // We do this in CSL mode only, such that isQueueAssigned() will - // return false. queueContextSp->d_stateQInfo_sp.reset(); } else { @@ -4346,15 +4198,18 @@ void ClusterQueueHelper::onQueueUnassigned( // Note: In non-CSL mode, the queue deletion callback is instead // invoked at nodes when they receive a queue deletion record from the // primary in the partition stream. - d_storageManager_p->unregisterQueueReplica(info.partitionId(), - info.uri(), - info.key(), - mqbu::StorageKey()); + + if (d_cluster_p->isCSLModeEnabled()) { + d_storageManager_p->unregisterQueueReplica(info->partitionId(), + info->uri(), + info->key(), + mqbu::StorageKey()); + } } - d_clusterState_p->queueKeys().erase(info.key()); + d_clusterState_p->queueKeys().erase(info->key()); d_clusterState_p->domainStates() - .at(info.uri().qualifiedDomain()) + .at(info->uri().qualifiedDomain()) ->adjustQueueCount(-1); BALL_LOG_INFO << d_cluster_p->description() @@ -4506,16 +4361,6 @@ ClusterQueueHelper::ClusterQueueHelper( // response processed first for the closeQueue) if (d_clusterStateManager_p) { - d_clusterStateManager_p->setQueueAssigningCb(bdlf::BindUtil::bind( - &ClusterQueueHelper::onQueueAssigning, - this, - bdlf::PlaceHolders::_1, // uri - bdlf::PlaceHolders::_2)); // processingPendingRequests - d_clusterStateManager_p->setQueueUnassigningCb(bdlf::BindUtil::bind( - &ClusterQueueHelper::onQueueUnassigning, - this, - bdlf::PlaceHolders::_1, // hasInFlightRequests - bdlf::PlaceHolders::_2)); // queueInfo d_clusterStateManager_p->setAfterPartitionPrimaryAssignmentCb( bdlf::BindUtil::bind( &ClusterQueueHelper::afterPartitionPrimaryAssignment, @@ -4742,6 +4587,11 @@ void ClusterQueueHelper::openQueue( // assign the queue, which is a no-op in case there is no leader. if (!isQueueAssigned(*(queueContextIt->second))) { + // In CSL, unassignment is async. + // Since QueueUnassignmentAdvisory can contain multiple queues, + // canceling pending Advisory is not an option. + // Instead, initiate new QueueAssignemntAdvisory which must + // take effect after old QueueUnassignemntAdvisory. assignQueue(queueContextIt->second); } } @@ -6337,23 +6187,7 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate) << "], queueKey [" << keyCopy << "] assigned to " << "Partition [" << pid << "] as it has expired."; - mqbc::ClusterUtil::setPendingUnassignment(d_clusterState_p, - uriCopy, - true); - - if (!d_cluster_p->isCSLModeEnabled()) { - // We already ensured there are no pending contexts for this queue, - // so we can safely delete it from the various maps. - removeQueueRaw(qit); - - d_clusterState_p->queueKeys().erase(keyCopy); - - d_clusterState_p->domainStates() - .at(uriCopy.qualifiedDomain()) - ->adjustQueueCount(-1); - - d_clusterState_p->unassignQueue(uriCopy); - } + mqbc::ClusterUtil::setPendingUnassignment(d_clusterState_p, uriCopy); // Populate 'queueUnassignedAdvisory' bdlma::LocalSequentialAllocator<1024> localAlloc(d_allocator_p); @@ -6377,11 +6211,6 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate) if (!d_cluster_p->isCSLModeEnabled()) { // Broadcast 'queueUnassignedAdvisory' to all followers d_clusterData_p->messageTransmitter().broadcastMessage(controlMsg); - - // Unregister the queue/storage from the partition, which will end - // up issuing a QueueDeletion record. Note that this method is - // async. - d_storageManager_p->unregisterQueue(uriCopy, pid); } } diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h index f698d0e2b..82e966108 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h @@ -285,7 +285,7 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL /// Reset the `id`, `partitionId`, `key` and `queue` members of this /// object. Note that `uri` is left untouched because it is an /// invariant member of a given instance of such a QueueInfo object. - void reset(); + void resetButKeepPending(); }; struct StopContext { @@ -523,29 +523,6 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL QueueAssignmentResult::Enum assignQueue(const QueueContextSp& queueContext); - /// Called when the specified `uri` is in the process of being assigned. - /// If the specified `processingPendingRequests` is true, we will - /// process pending requests on this machine. - /// - /// THREAD: This method is invoked in the associated cluster's - /// dispatcher thread. - void onQueueAssigning(const bmqt::Uri& uri, - bool processingPendingRequests); - - /// Called when the queue with the specified `queueInfo` is being - /// unassigned. Load into the specified `hasInFlightRequests` whether - /// there are still in-flight requests for the queue. Return true on - /// success, or false on failure. - /// - /// THREAD: This method is invoked in the associated cluster's - /// dispatcher thread. - /// - /// TODO_CSL: This is the current workflow which we should be able to - /// remove after the new workflow via - /// ClusterQueueHelper::onQueueUnassigned() is stable. - bool onQueueUnassigning(bool* hasInFlightRequests, - const bmqp_ctrlmsg::QueueInfo& queueInfo); - /// Send a queueAssignment request to the leader, requesting assignment /// of the queue with the specified `uri`. This method is called only /// on a non leader node of a cluster member, for a cluster having a @@ -978,7 +955,8 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL /// /// THREAD: This method is invoked in the associated cluster's /// dispatcher thread. - void onQueueAssigned(const mqbc::ClusterStateQueueInfo& info) + void + onQueueAssigned(const bsl::shared_ptr& info) BSLS_KEYWORD_OVERRIDE; /// Callback invoked when a queue with the specified `info` gets @@ -986,7 +964,8 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL /// /// THREAD: This method is invoked in the associated cluster's /// dispatcher thread. - void onQueueUnassigned(const mqbc::ClusterStateQueueInfo& info) + void + onQueueUnassigned(const bsl::shared_ptr& info) BSLS_KEYWORD_OVERRIDE; /// Callback invoked when a queue with the specified `uri` belonging to @@ -1109,13 +1088,6 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL /// failure. int gcExpiredQueues(bool immediate = false); - ClusterQueueHelper& setOnQueueAssignedCb(const OnQueueAssignedCb& value); - - /// Set the corresponding member to the specified `value` and return a - /// reference offering modifiable access to this object. - ClusterQueueHelper& - setOnQueueUnassignedCb(const OnQueueUnassignedCb& value); - /// Start executing multi-step processing of StopRequest or CLOSING node /// advisory received from the specified `clusterNode`. In the case of /// StopRequest the specified `request` references the request; in the @@ -1135,8 +1107,8 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL mqbc::ClusterNodeSession* ns, const VoidFunctor& callback = VoidFunctor()); + /// Called upon leader becoming available. void onLeaderAvailable(); - // Called upon leader becoming available. // ACCESSORS @@ -1266,21 +1238,16 @@ ClusterQueueHelper::isQueueAssigned(const QueueContext& queueContext) const bmqp::QueueId::k_UNASSIGNED_QUEUE_ID; // RETURN } - DomainStatesCIter domCit = d_clusterState_p->domainStates().find( - queueContext.uri().qualifiedDomain()); - if (domCit == d_clusterState_p->domainStates().cend()) { - return false; // RETURN - } - - UriToQueueInfoMapCIter qCit = domCit->second->queuesInfo().find( + mqbc::ClusterStateQueueInfo* assigned = d_clusterState_p->getAssigned( queueContext.uri()); - if (qCit == domCit->second->queuesInfo().cend()) { + + if (assigned == 0) { return false; // RETURN } - BSLS_ASSERT_SAFE(qCit->second->partitionId() != + BSLS_ASSERT_SAFE(assigned->partitionId() != mqbs::DataStore::k_INVALID_PARTITION_ID && - !qCit->second->key().isNull()); + !assigned->key().isNull()); return true; } diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp index d1137b0e5..16fdfcc0b 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp @@ -1167,7 +1167,6 @@ ClusterStateManager::assignQueue(const bmqt::Uri& uri, d_clusterStateLedger_mp.get(), d_cluster_p, uri, - d_queueAssigningCb, d_allocator_p, status); } @@ -1189,7 +1188,6 @@ void ClusterStateManager::registerQueueInfo(const bmqt::Uri& uri, partitionId, queueKey, appIdInfos, - d_queueAssigningCb, forceUpdate); } @@ -1213,6 +1211,25 @@ void ClusterStateManager::unassignQueue( << ": Failed to apply queue unassignment advisory: " << advisory << ", rc: " << rc; } + else { + // In non-CSL mode this is the shortcut to call Primary CQH instead of + // waiting for the quorum of acks in the ledger. + for (bsl::vector::const_iterator cit = + advisory.queues().begin(); + cit != advisory.queues().end(); + ++cit) { + const bmqp_ctrlmsg::QueueInfo& queueInfo = *cit; + + if (d_state_p->unassignQueue(queueInfo.uri())) { + BALL_LOG_INFO << d_clusterData_p->identity().description() + << ": Queue unassigned: " << queueInfo; + } + else { + BALL_LOG_INFO << d_clusterData_p->identity().description() + << ": Failed to unassign Queue: " << queueInfo; + } + } + } } void ClusterStateManager::sendClusterState( @@ -1462,7 +1479,6 @@ void ClusterStateManager::processQueueAssignmentRequest( d_cluster_p, request, requester, - d_queueAssigningCb, d_allocator_p); } @@ -1585,149 +1601,99 @@ void ClusterStateManager::processQueueAssignmentAdvisory( mqbu::StorageKey::BinaryRepresentation(), queueInfo.key().data()); - bool queueAlreadyAssigned = false; - const DomainStatesCIter domCit = d_state_p->domainStates().find( - uri.qualifiedDomain()); - if (domCit != d_state_p->domainStates().cend()) { - UriToQueueInfoMapCIter qcit = domCit->second->queuesInfo().find( - uri); - if (qcit != domCit->second->queuesInfo().cend()) { - // Queue is assigned. Verify that the key and partition match - // with what we already have. - queueAlreadyAssigned = true; - - if (qcit->second->partitionId() != queueInfo.partitionId() || - (qcit->second->key() != queueKey)) { - if (!delayed) { - // Leader is telling self node to map a queue to new - // partition or have a new key (basically, its a new - // incarnation of the queue). This could occur when a - // queue is being opened-closed-opened in very quick - // succession. Old instance of the queue is deleted by - // the primary, primary broadcasts queue-unasssignment - // advisory, leader broadcasts queue-assignment - // advisory for the new instance of the queue, but self - // node receives those 2 broadcasts out of order - // (leader's advisory followed by primary's advisory). - // In this case, its beneficial to force-update self's - // view of the queue with what the leader is - // advertising (with an error). When self receives - // queue-unassignment advisory from the primary for the - // old instance of the queue, it will log an error and - // ignore it. - - BALL_LOG_ERROR - << d_cluster_p->description() << ": " - << "received queueAssignmentAdvisory from leader '" - << source->nodeDescription() << "' for a known and" - << " assigned queue with different " - << "partitionId/key: [received: " << queueInfo - << ", knownPartitionId: " - << qcit->second->partitionId() - << ", knownQueueKey: " << qcit->second->key() - << "]"; - } - else { - // There is partitionId/queueKey mismatch and this is a - // delayed (aka, buffered) advisory. This is a valid - // scenario. Here's how: Node starts up, initiates - // storage sync with the primary While recovery is - // underway, a queue, which is active, is deleted and - // unassigned by the primary. Further, same queue is - // opened again, which means leader may assign it to a - // different partition, and will definitely assign it a - // different queue key, and will issue a queue - // assignment advisory. But self will buffer it. When - // recovery is complete, self's storage manager will - // apply all recovered queues (including the previous - // incarnation of this queue) to self's cluster state - // (via 'ClusterStateManager::registerQueueInfo'), and - // thus, populate 'd_queues', and this is how we will - // end up here. So instead of alarming/asserting, we - // simply log at warn, and overwrite current state with - // the buffered (this) advisory and move on. - - BALL_LOG_WARN - << d_cluster_p->description() - << ": overwriting current known queue state " - << "with the buffered advisory for queue [" - << qcit->second->uri() - << "]. Current assigned Partition [" - << qcit->second->partitionId() - << "], current queueKey [" << qcit->second->key() - << "], new Partition [" << queueInfo.partitionId() - << "], new queueKey [" << queueKey << "]."; - } - - // Remove existing state, mapping, etc. - - d_state_p->queueKeys().erase(qcit->second->key()); - - mqbc::ClusterState::QueueKeysInsertRc irc = - d_state_p->queueKeys().insert(queueKey); - if (false == irc.second) { - // QueueKey provided by the leader is not unique. This - // is bad, as thing means that 2 different queue URIs - // have queue key. Unfortunately we can't retrieve the - // URI of the 'other' queue. - - BMQTSK_ALARMLOG_ALARM("CLUSTER_STATE") - << d_cluster_p->description() - << ": queueKey clash while applying" - << (delayed ? " buffered " : " ") - << "queue assignment advisory: " << queueInfo - << ". QueueKey [" << queueKey - << "]. Ignoring this entry in the advisory msg." - << BMQTSK_ALARMLOG_END; - continue; // CONTINUE - } - - // no need to update d_state_p->domainStates() entry - // , queue was already known and registered - AppInfos appIdInfos(d_allocator_p); - - mqbc::ClusterUtil::parseQueueInfo(&appIdInfos, - queueInfo, - d_allocator_p); - - BSLA_MAYBE_UNUSED const bool rc = d_state_p->assignQueue( - uri, - queueKey, - queueInfo.partitionId(), - appIdInfos); - BSLS_ASSERT_SAFE(rc == false); + mqbc::ClusterStateQueueInfo* assigned = d_state_p->getAssigned(uri); + // Only Replica can `processQueueAssignmentAdvisory`. Therefore, the + // state cannot be `k_UNASSIGNING` + + if (assigned) { + // Queue is assigned. Verify that the key and partition match + // with what we already have. + + if (assigned->partitionId() != queueInfo.partitionId() || + (assigned->key() != queueKey)) { + if (!delayed) { + // Leader is telling self node to map a queue to new + // partition or have a new key (basically, its a new + // incarnation of the queue). This could occur when a + // queue is being opened-closed-opened in very quick + // succession. Old instance of the queue is deleted by + // the primary, primary broadcasts queue-unasssignment + // advisory, leader broadcasts queue-assignment + // advisory for the new instance of the queue, but self + // node receives those 2 broadcasts out of order + // (leader's advisory followed by primary's advisory). + // In this case, its beneficial to force-update self's + // view of the queue with what the leader is + // advertising (with an error). When self receives + // queue-unassignment advisory from the primary for the + // old instance of the queue, it will log an error and + // ignore it. + + BALL_LOG_ERROR + << d_cluster_p->description() << ": " + << "received queueAssignmentAdvisory from leader '" + << source->nodeDescription() << "' for a known and" + << " assigned queue with different " + << "partitionId/key: [received: " << queueInfo + << ", knownPartitionId: " << assigned->partitionId() + << ", knownQueueKey: " << assigned->key() << "]"; } else { - // Queue is assigned, and there is no partitionId/queueKey - // mismatch. So this assert should not fire. - BSLS_ASSERT_SAFE(1 == - d_state_p->queueKeys().count(queueKey)); + // There is partitionId/queueKey mismatch and this is a + // delayed (aka, buffered) advisory. This is a valid + // scenario. Here's how: Node starts up, initiates + // storage sync with the primary While recovery is + // underway, a queue, which is active, is deleted and + // unassigned by the primary. Further, same queue is + // opened again, which means leader may assign it to a + // different partition, and will definitely assign it a + // different queue key, and will issue a queue + // assignment advisory. But self will buffer it. When + // recovery is complete, self's storage manager will + // apply all recovered queues (including the previous + // incarnation of this queue) to self's cluster state + // (via 'ClusterStateManager::registerQueueInfo'), and + // thus, populate 'd_queues', and this is how we will + // end up here. So instead of alarming/asserting, we + // simply log at warn, and overwrite current state with + // the buffered (this) advisory and move on. + + BALL_LOG_WARN + << d_cluster_p->description() + << ": overwriting current known queue state " + << "with the buffered advisory for queue [" + << assigned->uri() << "]. Current assigned Partition [" + << assigned->partitionId() << "], current queueKey [" + << assigned->key() << "], new Partition [" + << queueInfo.partitionId() << "], new queueKey [" + << queueKey << "]."; } - } - } - if (!queueAlreadyAssigned) { - // Since self node doesn't see the queue as assigned, the - // queueKey specified in the advisory must not occur in - // 'queueKeys' data structure. - mqbc::ClusterState::QueueKeysInsertRc insertRc = - d_state_p->queueKeys().insert(queueKey); - - if (false == insertRc.second) { - // QueueKey is not unique. - - BMQTSK_ALARMLOG_ALARM("CLUSTER_STATE") - << d_cluster_p->description() << ": attemping to apply" - << (delayed ? " buffered " : " ") - << " queueAssignmentAdvisory from leader [" - << source->nodeDescription() << "] for an unknown queue [" - << uri << "] assigned to Partition [" - << queueInfo.partitionId() << "], but queueKey [" - << queueKey << "] is not unique. Ignoring this entry in " - << "the advisory." << BMQTSK_ALARMLOG_END; - continue; // CONTINUE - } + // Remove existing state, mapping, etc. + d_state_p->queueKeys().erase(assigned->key()); + // no need to update d_state_p->domainStates() entry + // , queue was already known and registered + AppInfos appIdInfos(d_allocator_p); + + mqbc::ClusterUtil::parseQueueInfo(&appIdInfos, + queueInfo, + d_allocator_p); + + BSLA_MAYBE_UNUSED const bool rc = d_state_p->assignQueue( + uri, + queueKey, + queueInfo.partitionId(), + appIdInfos); + BSLS_ASSERT_SAFE(rc == false); + } + else { + // Queue is assigned, and there is no partitionId/queueKey + // mismatch. So this assert should not fire. + BSLS_ASSERT_SAFE(1 == d_state_p->queueKeys().count(queueKey)); + } + } + else { AppInfos appIdInfos(d_allocator_p); mqbc::ClusterUtil::parseQueueInfo(&appIdInfos, @@ -1738,16 +1704,10 @@ void ClusterStateManager::processQueueAssignmentAdvisory( queueKey, queueInfo.partitionId(), appIdInfos); - - d_state_p->domainStates() - .at(uri.qualifiedDomain()) - ->adjustQueueCount(1); } BALL_LOG_INFO << d_cluster_p->description() << ": Queue assigned: " << queueInfo; - - d_queueAssigningCb(uri, true); // processingPendingRequests } } @@ -1895,19 +1855,11 @@ void ClusterStateManager::processQueueUnAssignmentAdvisory( mqbu::StorageKey key(mqbu::StorageKey::BinaryRepresentation(), queueInfo.key().data()); - bool hasQueue = true; - const DomainStatesCIter domCit = d_state_p->domainStates().find( - uri.qualifiedDomain()); - if (domCit == d_state_p->domainStates().cend()) { - hasQueue = false; - } - const UriToQueueInfoMapCIter qcit = domCit->second->queuesInfo().find( - uri); - if (qcit == domCit->second->queuesInfo().cend()) { - hasQueue = false; - } + mqbc::ClusterStateQueueInfo* assigned = d_state_p->getAssigned(uri); + // Only Replica can `processQueueAssignmentAdvisory`. Therefore, the + // state cannot be `k_UNASSIGNING` - if (!hasQueue) { + if (assigned == 0) { // Queue is not assigned. Error because it should not occur. BALL_LOG_ERROR << d_cluster_p->description() @@ -1922,8 +1874,8 @@ void ClusterStateManager::processQueueUnAssignmentAdvisory( // Self node sees queue as assigned. Validate that the key/partition // from the unassignment match the internal state. - if ((qcit->second->partitionId() != advisory.partitionId()) || - (qcit->second->key() != key)) { + if ((assigned->partitionId() != advisory.partitionId()) || + (assigned->key() != key)) { // This can occur if a queue is deleted by the primary and created // immediately by the client. Primary broadcasts queue // unassignment advisory upon deleting old instance of the queue, @@ -1945,30 +1897,11 @@ void ClusterStateManager::processQueueUnAssignmentAdvisory( << advisory.partitionId() << ", advisoryKey: " << key << ", internalPartitionId: " - << qcit->second->partitionId() - << ", internalKey: " << qcit->second->key() << "]"; + << assigned->partitionId() + << ", internalKey: " << assigned->key() << "]"; continue; // CONTINUE } - - bool hasInFlightRequests = false; - if (d_queueUnassigningCb(&hasInFlightRequests, queueInfo)) { - const mqbu::StorageKey queueKey = qcit->second->key(); - if (hasInFlightRequests) { - d_state_p->updatePartitionQueueMapped(queueInfo.partitionId(), - -1); - } - else { - d_state_p->unassignQueue(uri); - } - - d_state_p->queueKeys().erase(queueKey); - d_state_p->domainStates() - .at(uri.qualifiedDomain()) - ->adjustQueueCount(-1); - - BALL_LOG_INFO << d_cluster_p->description() - << ": Unassigned queue: " << queueInfo; - } + d_state_p->unassignQueue(uri); } } diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h index 8a1fbc85b..c745ef8b6 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h @@ -163,10 +163,6 @@ class ClusterStateManager BSLS_KEYWORD_FINAL // A queue advisory can be either an // assignment or an un-assignment msg. - QueueAssigningCb d_queueAssigningCb; - - QueueUnassigningCb d_queueUnassigningCb; - AfterPartitionPrimaryAssignmentCb d_afterPartitionPrimaryAssignmentCb; private: @@ -302,15 +298,6 @@ class ClusterStateManager BSLS_KEYWORD_FINAL /// THREAD: This method is invoked in the associated cluster's /// dispatcher thread. void setStorageManager(mqbi::StorageManager* value) BSLS_KEYWORD_OVERRIDE; - - /// Set the queue assigning callback to the specified `value`. - void - setQueueAssigningCb(const QueueAssigningCb& value) BSLS_KEYWORD_OVERRIDE; - - void setQueueUnassigningCb(const QueueUnassigningCb& value) - BSLS_KEYWORD_OVERRIDE; - // Set the queue unassigning callback to the specified 'value'. - void setAfterPartitionPrimaryAssignmentCb( const AfterPartitionPrimaryAssignmentCb& value) BSLS_KEYWORD_OVERRIDE; // Set the after partition primary assignment callback to the specified @@ -639,18 +626,6 @@ inline void ClusterStateManager::setStorageManager(mqbi::StorageManager* value) d_storageManager_p = value; } -inline void -ClusterStateManager::setQueueAssigningCb(const QueueAssigningCb& value) -{ - d_queueAssigningCb = value; -} - -inline void -ClusterStateManager::setQueueUnassigningCb(const QueueUnassigningCb& value) -{ - d_queueUnassigningCb = value; -} - inline void ClusterStateManager::setAfterPartitionPrimaryAssignmentCb( const AfterPartitionPrimaryAssignmentCb& value) { diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.cpp b/src/groups/mqb/mqbblp/mqbblp_domain.cpp index 1f1928ffd..3fd90fb12 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_domain.cpp @@ -276,13 +276,15 @@ void Domain::updateAuthorizedAppIds(const AppInfos& addedAppIds, } } -void Domain::onQueueAssigned(const mqbc::ClusterStateQueueInfo& info) +void Domain::onQueueAssigned( + const bsl::shared_ptr& info) { // executed by the associated CLUSTER's DISPATCHER thread // PRECONDITIONS BSLS_ASSERT_SAFE( d_cluster_sp->dispatcher()->inDispatcherThread(d_cluster_sp.get())); + BSLS_ASSERT_SAFE(info); if (!d_cluster_sp->isCSLModeEnabled()) { return; // RETURN @@ -292,7 +294,7 @@ void Domain::onQueueAssigned(const mqbc::ClusterStateQueueInfo& info) return; // RETURN } - if (info.uri().domain() != d_name) { + if (info->uri().domain() != d_name) { // Note: This method will fire on all domains which belong to the // cluster having the queue assignment, but we examine the domain // name from the 'uri' to guarantee that only one domain is @@ -301,7 +303,7 @@ void Domain::onQueueAssigned(const mqbc::ClusterStateQueueInfo& info) return; // RETURN } - updateAuthorizedAppIds(info.appInfos()); + updateAuthorizedAppIds(info->appInfos()); } void Domain::onQueueUpdated(const bmqt::Uri& uri, diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.h b/src/groups/mqb/mqbblp/mqbblp_domain.h index dc1b76004..4fabd0b45 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.h +++ b/src/groups/mqb/mqbblp/mqbblp_domain.h @@ -211,7 +211,8 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, /// /// THREAD: This method is invoked in the associated cluster's /// dispatcher thread. - void onQueueAssigned(const mqbc::ClusterStateQueueInfo& info) + void + onQueueAssigned(const bsl::shared_ptr& info) BSLS_KEYWORD_OVERRIDE; /// Callback invoked when a queue with the specified `uri` belonging to diff --git a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp index 3f2ab1ed5..b0615f910 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp @@ -73,13 +73,13 @@ void ClusterStateObserver::onPartitionPrimaryAssignment( } void ClusterStateObserver::onQueueAssigned( - BSLS_ANNOTATION_UNUSED const ClusterStateQueueInfo& info) + BSLS_ANNOTATION_UNUSED const bsl::shared_ptr& info) { // NOTHING } void ClusterStateObserver::onQueueUnassigned( - BSLS_ANNOTATION_UNUSED const ClusterStateQueueInfo& info) + BSLS_ANNOTATION_UNUSED const bsl::shared_ptr& info) { // NOTHING } @@ -322,42 +322,49 @@ bool ClusterState::assignQueue(const bmqt::Uri& uri, // PRECONDITIONS BSLS_ASSERT_SAFE(cluster()->dispatcher()->inDispatcherThread(cluster())); - bool isNewAssignment = true; - const DomainStatesIter domIt = d_domainStates.find(uri.qualifiedDomain()); + bool isNewAssignment = true; + DomainStatesIter domIt = domainStates().find(uri.qualifiedDomain()); + UriToQueueInfoMapIter queueIt; - if (domIt == d_domainStates.end()) { - d_domainStates[uri.qualifiedDomain()].createInplace(d_allocator_p, - d_allocator_p); - d_domainStates.at(uri.qualifiedDomain()) - ->queuesInfo()[uri] - .createInplace(d_allocator_p, - uri, - key, - partitionId, - appIdInfos, - d_allocator_p); + if (domIt == domainStates().end()) { + ClusterState::DomainStateSp domainState; + domainState.createInplace(d_allocator_p, d_allocator_p); + domIt = + domainStates().emplace(uri.qualifiedDomain(), domainState).first; + + queueIt = domIt->second->queuesInfo().end(); } else { - const UriToQueueInfoMapIter iter = domIt->second->queuesInfo().find( - uri); - if (iter == domIt->second->queuesInfo().end()) { - domIt->second->queuesInfo()[uri].createInplace(d_allocator_p, - uri, - key, - partitionId, - appIdInfos, - d_allocator_p); - } - else { + queueIt = domIt->second->queuesInfo().find(uri); + } + + if (queueIt == domIt->second->queuesInfo().end()) { + QueueInfoSp queueInfo; + + queueInfo.createInplace(d_allocator_p, + uri, + key, + partitionId, + appIdInfos, + d_allocator_p); + + queueIt = domIt->second->queuesInfo().emplace(uri, queueInfo).first; + } + else { + if (queueIt->second->state() == ClusterStateQueueInfo::k_ASSIGNED) { + // See 'ClusterStateManager::processQueueAssignmentAdvisory' which + // insists on re-assigning isNewAssignment = false; - updatePartitionQueueMapped(iter->second->partitionId(), -1); - iter->second->setKey(key).setPartitionId(partitionId); - iter->second->appInfos() = appIdInfos; - iter->second->setPendingUnassignment(false); + updatePartitionQueueMapped(queueIt->second->partitionId(), -1); } + queueIt->second->setKey(key).setPartitionId(partitionId); + queueIt->second->appInfos() = appIdInfos; } + // Set the queue as assigned + queueIt->second->setState(ClusterStateQueueInfo::k_ASSIGNED); + updatePartitionQueueMapped(partitionId, 1); bmqu::Printer printer(&appIdInfos); @@ -369,11 +376,7 @@ bool ClusterState::assignQueue(const bmqt::Uri& uri, for (ObserversSetIter it = d_observers.begin(); it != d_observers.end(); ++it) { - (*it)->onQueueAssigned(ClusterStateQueueInfo(uri, - key, - partitionId, - appIdInfos, - d_allocator_p)); + (*it)->onQueueAssigned(queueIt->second); } // POSTCONDITIONS @@ -413,7 +416,7 @@ bool ClusterState::unassignQueue(const bmqt::Uri& uri) for (ObserversSetIter it = d_observers.begin(); it != d_observers.end(); ++it) { - (*it)->onQueueUnassigned(*cit->second); + (*it)->onQueueUnassigned(cit->second); } domIt->second->queuesInfo().erase(cit); diff --git a/src/groups/mqb/mqbc/mqbc_clusterstate.h b/src/groups/mqb/mqbc/mqbc_clusterstate.h index e1c4a928c..52bdeb054 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstate.h +++ b/src/groups/mqb/mqbc/mqbc_clusterstate.h @@ -165,6 +165,18 @@ class ClusterStateQueueInfo { typedef mqbi::ClusterStateManager::AppInfos AppInfos; typedef mqbi::ClusterStateManager::AppInfosCIter AppInfosCIter; + enum State { + // State of Assignment. In CSL, assignment and unassignment are async, + // hence the need for k_ASSIGNING/k_UNASSIGNING + // Assigning following unassigning is also supported. + // On Replica, the only possible state is k_ASSIGNED. + + k_NONE, + k_ASSIGNING, + k_ASSIGNED, + k_UNASSIGNING + }; + private: // DATA bmqt::Uri d_uri; @@ -183,9 +195,9 @@ class ClusterStateQueueInfo { // // TBD: Should also be added to mqbconfm::Domain - bool d_pendingUnassignment; + State d_state; // Flag indicating whether this queue is in the process of - // being unassigned. + // being assigned / unassigned. private: // NOT IMPLEMENTED @@ -219,7 +231,7 @@ class ClusterStateQueueInfo { /// Set the corresponding member to the specified `value` and return a /// reference offering modifiable access to this object. - ClusterStateQueueInfo& setPendingUnassignment(bool value); + void setState(State value); /// Get a modifiable reference to this object's appIdInfos. AppInfos& appInfos(); @@ -236,7 +248,8 @@ class ClusterStateQueueInfo { const AppInfos& appInfos() const; /// Return the value of the corresponding member of this object. - bool pendingUnassignment() const; + State state() const; + bool pendingUnassignment() const; /// Format this object to the specified output `stream` at the (absolute /// value of) the optionally specified indentation `level` and return a @@ -302,14 +315,16 @@ class ClusterStateObserver { /// /// THREAD: This method is invoked in the associated cluster's /// dispatcher thread. - virtual void onQueueAssigned(const ClusterStateQueueInfo& info); + virtual void + onQueueAssigned(const bsl::shared_ptr& info); /// Callback invoked when a queue with the specified `info` gets /// unassigned from the cluster. /// /// THREAD: This method is invoked in the associated cluster's /// dispatcher thread. - virtual void onQueueUnassigned(const ClusterStateQueueInfo& info); + virtual void + onQueueUnassigned(const bsl::shared_ptr& info); /// Callback invoked when a queue with the specified `uri` belonging to /// the specified `domain` is updated with the optionally specified @@ -645,6 +660,20 @@ class ClusterState { /// validation can be performed. The bahavior is undefined unless /// `partitionId >= 0` and `partitionId < partitionsCount`. const ClusterStatePartitionInfo& partition(int partitionId) const; + + /// Return `ClusterStateQueueInfo` for the specified `uri` or `0` if it + /// does not exist. + ClusterStateQueueInfo* getQueueInfo(const bmqt::Uri& uri) const; + + /// Return `ClusterStateQueueInfo` for the specified `uri` if it exists and + /// is in the `k_ASSIGNED` state or `0` otherwise. + ClusterStateQueueInfo* getAssigned(const bmqt::Uri& uri) const; + + /// Return `ClusterStateQueueInfo` for the specified `uri` if it exists and + /// is in either the `k_ASSIGNED` or `k_UNASSIGNING` state. Return `0` + /// otherwise. + ClusterStateQueueInfo* + getAssignedOrUnassigning(const bmqt::Uri& uri) const; }; // ============================================================================ @@ -767,7 +796,7 @@ inline ClusterStateQueueInfo::ClusterStateQueueInfo( , d_key() , d_partitionId(mqbs::DataStore::k_INVALID_PARTITION_ID) , d_appInfos(allocator) -, d_pendingUnassignment(false) +, d_state(k_NONE) { // NOTHING } @@ -782,7 +811,7 @@ inline ClusterStateQueueInfo::ClusterStateQueueInfo( , d_key(key) , d_partitionId(partitionId) , d_appInfos(appIdInfos, allocator) -, d_pendingUnassignment(false) +, d_state(k_NONE) { // NOTHING } @@ -801,11 +830,23 @@ inline ClusterStateQueueInfo& ClusterStateQueueInfo::setPartitionId(int value) return *this; } -inline ClusterStateQueueInfo& -ClusterStateQueueInfo::setPendingUnassignment(bool value) +inline void ClusterStateQueueInfo::setState(ClusterStateQueueInfo::State value) { - d_pendingUnassignment = value; - return *this; + // k_NONE + // | | + // ClusterUtil::assignQueue | | + // | V + // | k_ASSIGNING <---+ + // | | | + // ClusterState::assignQueue | | | + // V V | + // k_ASSIGNED | + // | | + // | | ClusterState::assignQueue + // V | + // k_UNASSIGNING + + d_state = value; } inline ClusterStateQueueInfo::AppInfos& ClusterStateQueueInfo::appInfos() @@ -845,9 +886,14 @@ ClusterStateQueueInfo::appInfos() const return d_appInfos; } +inline ClusterStateQueueInfo::State ClusterStateQueueInfo::state() const +{ + return d_state; +} + inline bool ClusterStateQueueInfo::pendingUnassignment() const { - return d_pendingUnassignment; + return d_state == k_UNASSIGNING; } // ------------------ @@ -1022,6 +1068,46 @@ ClusterState::partition(int partitionId) const return d_partitionsInfo[partitionId]; } +inline ClusterStateQueueInfo* +ClusterState::getQueueInfo(const bmqt::Uri& uri) const +{ + const DomainStatesCIter domCit = domainStates().find( + uri.qualifiedDomain()); + if (domCit == domainStates().cend()) { + return 0; + } + + UriToQueueInfoMapCIter qcit = domCit->second->queuesInfo().find(uri); + if (qcit == domCit->second->queuesInfo().cend()) { + return 0; + } + + return qcit->second.get(); +} + +inline ClusterStateQueueInfo* +ClusterState::getAssigned(const bmqt::Uri& uri) const +{ + ClusterStateQueueInfo* queue = getQueueInfo(uri); + + return queue ? queue->state() == ClusterStateQueueInfo::k_ASSIGNED ? queue + : 0 + : 0; +} + +inline ClusterStateQueueInfo* +ClusterState::getAssignedOrUnassigning(const bmqt::Uri& uri) const +{ + ClusterStateQueueInfo* queue = getQueueInfo(uri); + + return queue + ? queue->state() == ClusterStateQueueInfo::k_ASSIGNED || + queue->state() == ClusterStateQueueInfo::k_UNASSIGNING + ? queue + : 0 + : 0; +} + // -------------------------------- // struct ClusterState::DomainState // -------------------------------- diff --git a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp index 0316e7418..78d0e9933 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp @@ -67,7 +67,6 @@ ClusterStateManager::ClusterStateManager( // TODO Add cluster config to determine Eventual vs Strong , d_clusterStateLedger_mp(clusterStateLedger) , d_storageManager_p(0) -, d_queueAssigningCb() , d_afterPartitionPrimaryAssignmentCb() { // PRECONDITIONS @@ -1501,7 +1500,6 @@ ClusterStateManager::assignQueue(const bmqt::Uri& uri, d_clusterStateLedger_mp.get(), d_cluster_p, uri, - d_queueAssigningCb, d_allocator_p, status); } @@ -1523,7 +1521,6 @@ void ClusterStateManager::registerQueueInfo(const bmqt::Uri& uri, partitionId, queueKey, appIdInfos, - d_queueAssigningCb, forceUpdate); } @@ -1810,7 +1807,6 @@ void ClusterStateManager::processQueueAssignmentRequest( d_cluster_p, request, requester, - d_queueAssigningCb, d_allocator_p); } diff --git a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h index 70be8517b..1977dd4fb 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h +++ b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h @@ -167,8 +167,6 @@ class ClusterStateManager BSLS_KEYWORD_FINAL mqbi::StorageManager* d_storageManager_p; - QueueAssigningCb d_queueAssigningCb; - AfterPartitionPrimaryAssignmentCb d_afterPartitionPrimaryAssignmentCb; private: @@ -395,15 +393,6 @@ class ClusterStateManager BSLS_KEYWORD_FINAL /// THREAD: This method is invoked in the associated cluster's /// dispatcher thread. void setStorageManager(mqbi::StorageManager* value) BSLS_KEYWORD_OVERRIDE; - - /// Set the queue assigning callback to the specified `value`. - void - setQueueAssigningCb(const QueueAssigningCb& value) BSLS_KEYWORD_OVERRIDE; - - void setQueueUnassigningCb(const QueueUnassigningCb& value) - BSLS_KEYWORD_OVERRIDE; - // Set the queue unassigning callback to the specified 'value'. - /// Set the after partition primary assignment callback to the specified /// `value`. void setAfterPartitionPrimaryAssignmentCb( @@ -746,21 +735,6 @@ inline void ClusterStateManager::setStorageManager(mqbi::StorageManager* value) d_storageManager_p = value; } -inline void -ClusterStateManager::setQueueAssigningCb(const QueueAssigningCb& value) -{ - d_queueAssigningCb = value; -} - -inline void ClusterStateManager::setQueueUnassigningCb( - BSLS_ANNOTATION_UNUSED const QueueUnassigningCb& value) -{ - // Note that QueueUnassigningCb is only ever used in non-CSL mode, so we - // can ignore it. - // - // NOTHING -} - inline void ClusterStateManager::setAfterPartitionPrimaryAssignmentCb( const AfterPartitionPrimaryAssignmentCb& value) { diff --git a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp index 63172674e..02a37a3c4 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp @@ -354,8 +354,7 @@ void createDomainCb(const bmqp_ctrlmsg::Status& status, // ------------------ void ClusterUtil::setPendingUnassignment(ClusterState* clusterState, - const bmqt::Uri& uri, - bool pendingUnassignment) + const bmqt::Uri& uri) { // PRECONDITIONS BSLS_ASSERT_SAFE(clusterState); @@ -366,7 +365,7 @@ void ClusterUtil::setPendingUnassignment(ClusterState* clusterState, if (iter != clusterState->domainStates().cend()) { UriToQueueInfoMapIter qiter = iter->second->queuesInfo().find(uri); if (qiter != iter->second->queuesInfo().cend()) { - qiter->second->setPendingUnassignment(pendingUnassignment); + qiter->second->setState(ClusterStateQueueInfo::k_UNASSIGNING); } } } @@ -671,7 +670,6 @@ void ClusterUtil::processQueueAssignmentRequest( const mqbi::Cluster* cluster, const bmqp_ctrlmsg::ControlMessage& request, mqbnet::ClusterNode* requester, - const QueueAssigningCb& queueAssigningCb, bslma::Allocator* allocator) { // executed by the cluster *DISPATCHER* thread @@ -746,25 +744,11 @@ void ClusterUtil::processQueueAssignmentRequest( status.code() = 0; status.message() = ""; - const DomainStatesCIter cit = clusterState->domainStates().find( - uri.qualifiedDomain()); - if (cit != clusterState->domainStates().cend()) { - UriToQueueInfoMapCIter qcit = cit->second->queuesInfo().find(uri); - if (qcit != cit->second->queuesInfo().cend() && - !(cluster->isCSLModeEnabled() && - qcit->second->pendingUnassignment())) { - // Queue is already assigned - clusterData->messageTransmitter().sendMessage(response, requester); - return; // RETURN - } - } - assignQueue(clusterState, clusterData, ledger, cluster, uri, - queueAssigningCb, allocator, &status); @@ -846,14 +830,13 @@ void ClusterUtil::populateQueueUnassignedAdvisory( } ClusterUtil::QueueAssignmentResult::Enum -ClusterUtil::assignQueue(ClusterState* clusterState, - ClusterData* clusterData, - ClusterStateLedger* ledger, - const mqbi::Cluster* cluster, - const bmqt::Uri& uri, - const QueueAssigningCb& queueAssigningCb, - bslma::Allocator* allocator, - bmqp_ctrlmsg::Status* status) +ClusterUtil::assignQueue(ClusterState* clusterState, + ClusterData* clusterData, + ClusterStateLedger* ledger, + const mqbi::Cluster* cluster, + const bmqt::Uri& uri, + bslma::Allocator* allocator, + bmqp_ctrlmsg::Status* status) { // executed by the cluster *DISPATCHER* thread @@ -874,6 +857,7 @@ ClusterUtil::assignQueue(ClusterState* clusterState, const bmqp_ctrlmsg::NodeStatus::Value nodeStatus = clusterData->membership().selfNodeStatus(); + if (!cluster->isFSMWorkflow() && bmqp_ctrlmsg::NodeStatus::E_AVAILABLE != nodeStatus) { BALL_LOG_WARN << cluster->description() @@ -890,15 +874,45 @@ ClusterUtil::assignQueue(ClusterState* clusterState, k_ASSIGNMENT_WHILE_UNAVAILABLE; // RETURN } - DomainStatesIter domIt = clusterState->domainStates().find( - uri.qualifiedDomain()); - if (domIt == clusterState->domainStates().end()) { - // REVISIT: This is also done in 'ClusterState::assignQueue' + ClusterStateQueueInfo::State previousState = ClusterStateQueueInfo::k_NONE; + + ClusterState::DomainStates& domainStates = clusterState->domainStates(); + DomainStatesIter domIt = domainStates.find(uri.qualifiedDomain()); + + UriToQueueInfoMapIter queueIt; + + if (domIt == domainStates.end()) { + ClusterState::DomainStateSp domainState; + domainState.createInplace(allocator, allocator); + domIt = domainStates.emplace(uri.qualifiedDomain(), domainState).first; + + queueIt = domIt->second->queuesInfo().end(); + } + else { + queueIt = domIt->second->queuesInfo().find(uri); + } + + if (queueIt == domIt->second->queuesInfo().end()) { + QueueInfoSp queueInfo; + + queueInfo.createInplace(allocator, uri, allocator); + + queueIt = domIt->second->queuesInfo().emplace(uri, queueInfo).first; + } + else { + previousState = queueIt->second->state(); + } + + if (previousState == ClusterStateQueueInfo::k_ASSIGNING) { + BALL_LOG_INFO << cluster->description() << "queueAssignment of '" + << uri << "' is already pending."; + return QueueAssignmentResult::k_ASSIGNMENT_OK; + } - clusterState->domainStates()[uri.qualifiedDomain()].createInplace( - allocator, - allocator); - domIt = clusterState->domainStates().find(uri.qualifiedDomain()); + if (previousState == ClusterStateQueueInfo::k_ASSIGNED) { + BALL_LOG_INFO << cluster->description() << "queueAssignment of '" + << uri << "' is already done."; + return QueueAssignmentResult::k_ASSIGNMENT_OK; } if (domIt->second->domain() == 0) { @@ -972,18 +986,13 @@ ClusterUtil::assignQueue(ClusterState* clusterState, } } - // Set the queue as no longer pending unassignment - const DomainStatesCIter citDomainState = clusterState->domainStates().find( - uri.qualifiedDomain()); - if (citDomainState != clusterState->domainStates().cend()) { - UriToQueueInfoMapCIter qcit = - citDomainState->second->queuesInfo().find(uri); - if (qcit != citDomainState->second->queuesInfo().cend()) { - BSLS_ASSERT_SAFE(cluster->isCSLModeEnabled() && - qcit->second->pendingUnassignment()); - qcit->second->setPendingUnassignment(false); - } - } + // Set the queue as assigning (no longer pending unassignment) + queueIt->second->setState(ClusterStateQueueInfo::k_ASSIGNING); + + BALL_LOG_INFO << "Cluster [" << cluster->description() + << "]: Transition: " << previousState << " -> " + << ClusterStateQueueInfo::k_ASSIGNING << " for [" << uri + << "]."; // Populate 'queueAssignmentAdvisory' bdlma::LocalSequentialAllocator<1024> localAllocator(allocator); @@ -1001,12 +1010,11 @@ ClusterUtil::assignQueue(ClusterState* clusterState, clusterData, uri, domIt->second->domain()); - if (cluster->isCSLModeEnabled()) { - // In CSL mode, we delay the insertion to queueKeys until - // 'onQueueAssigned' observer callback. - clusterState->queueKeys().erase(key); - } + // 'ClusterQueueHelper::onQueueAssigned' (the 'onQueueAssigned' observer + // callback) will insert the key to 'ClusterState::queueKeys'. + + clusterState->queueKeys().erase(key); // Apply 'queueAssignmentAdvisory' to CSL BALL_LOG_INFO << clusterData->identity().description() @@ -1041,8 +1049,6 @@ ClusterUtil::assignQueue(ClusterState* clusterState, appInfos); BSLS_ASSERT_SAFE(assignRc); - domIt->second->adjustQueueCount(1); - BALL_LOG_INFO << cluster->description() << ": Queue assigned: " << queueAdvisory; @@ -1050,8 +1056,6 @@ ClusterUtil::assignQueue(ClusterState* clusterState, clusterData->messageTransmitter().broadcastMessage(controlMsg); } - queueAssigningCb(uri, true); // processingPendingRequests - return QueueAssignmentResult::k_ASSIGNMENT_OK; } @@ -1061,7 +1065,6 @@ void ClusterUtil::registerQueueInfo(ClusterState* clusterState, int partitionId, const mqbu::StorageKey& queueKey, const AppInfos& appInfos, - const QueueAssigningCb& queueAssigningCb, bool forceUpdate) { // executed by the cluster *DISPATCHER* thread @@ -1140,25 +1143,6 @@ void ClusterUtil::registerQueueInfo(ClusterState* clusterState, << "[uri: " << uri << ", queueKey: " << queueKey << ", partitionId: " << partitionId; - if (!cluster->isCSLModeEnabled()) { - ClusterState::QueueKeysInsertRc insertRc = - clusterState->queueKeys().insert(queueKey); - if (false == insertRc.second) { - BMQTSK_ALARMLOG_ALARM("CLUSTER_STATE") - << cluster->description() - << ": re-registering a known queue with a stale view, " - << "but queueKey is not unique. " << "QueueKey [" - << queueKey << "], URI [" << uri << "], Partition [" - << partitionId << "], AppInfos [" << storageAppInfos - << "]." << BMQTSK_ALARMLOG_END; - return; // RETURN - } - - clusterState->domainStates() - .at(uri.qualifiedDomain()) - ->adjustQueueCount(1); - } - BSLS_ASSERT_SAFE(1 == clusterState->queueKeys().count(queueKey)); return; // RETURN @@ -1174,27 +1158,6 @@ void ClusterUtil::registerQueueInfo(ClusterState* clusterState, << ", queueKey: " << queueKey << ", partitionId: " << partitionId << ", appInfos: " << printer << "]"; - - if (!cluster->isCSLModeEnabled()) { - ClusterState::QueueKeysInsertRc insertRc = - clusterState->queueKeys().insert(queueKey); - if (false == insertRc.second) { - // Duplicate queue key. - BMQTSK_ALARMLOG_ALARM("CLUSTER_STATE") - << cluster->description() - << ": registering a queue for an unknown queue, but " - << "queueKey is not unique. QueueKey [" << queueKey - << "], URI [" << uri << "], Partition [" << partitionId << "]." - << BMQTSK_ALARMLOG_END; - return; // RETURN - } - - clusterState->domainStates() - .at(uri.qualifiedDomain()) - ->adjustQueueCount(1); - } - - queueAssigningCb(uri, false); // processingPendingRequests } void ClusterUtil::populateAppInfos( diff --git a/src/groups/mqb/mqbc/mqbc_clusterutil.h b/src/groups/mqb/mqbc/mqbc_clusterutil.h index 171416a17..a1c5f6b88 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterutil.h +++ b/src/groups/mqb/mqbc/mqbc_clusterutil.h @@ -104,8 +104,6 @@ struct ClusterUtil { typedef mqbi::ClusterStateManager::QueueAssignmentResult QueueAssignmentResult; - typedef mqbi::ClusterStateManager::QueueAssigningCb QueueAssigningCb; - /// Map of NodeSession -> number of new partitions to assign to it typedef bsl::unordered_map NumNewPartitionsMap; @@ -135,11 +133,10 @@ struct ClusterUtil { /// Return true if the specified `spoPair` is valid, false otherwise. static bool isValid(const bmqp_ctrlmsg::SyncPointOffsetPair& spoPair); - /// Set the specified `uri` to have the specified `pendingUnassignment` - /// status in the specified `clusterState`. + /// Set the specified `uri` to have the `k_UNASSIGNING` state in the + /// specified `clusterState`. static void setPendingUnassignment(ClusterState* clusterState, - const bmqt::Uri& uri, - bool pendingUnassignment); + const bmqt::Uri& uri); /// Load into the specified `message` the message encoded in the /// specified `eventBlob` using the specified `allocator`. @@ -191,8 +188,8 @@ struct ClusterUtil { /// Process the queue assignment in the specified `request`, received /// from the specified `requester`, using the specified `clusterState`, - /// `clusterData`, `ledger`, `cluster`, `queueAssigningCb` and - /// `allocator`. Return the queue assignment result. + /// `clusterData`, `ledger`, `cluster`, and `allocator`. Return the queue + /// assignment result. /// /// THREAD: This method is invoked in the associated cluster's /// dispatcher thread. @@ -202,9 +199,8 @@ struct ClusterUtil { ClusterStateLedger* ledger, const mqbi::Cluster* cluster, const bmqp_ctrlmsg::ControlMessage& request, - mqbnet::ClusterNode* requester, - const QueueAssigningCb& queueAssigningCb, - bslma::Allocator* allocator); + mqbnet::ClusterNode* requester, + bslma::Allocator* allocator); /// Populate the specified `advisory` with information describing a /// queue assignment of the specified `uri` living in the specified @@ -238,26 +234,24 @@ struct ClusterUtil { /// adviosry to the specified `ledger`. Return a value indicating /// whether the assignment was successful or was definitively rejected, /// and populate the optionally specified `status` with a human readable - /// error code and string in case of failure. Also invoke the specified - /// `queueAssigningCb` on success. Use the specified `allocator` for - /// memory allocations. This method is called only on the leader node. + /// error code and string in case of failure. Use the specified + /// `allocator` for memory allocations. This method is called only on the + /// leader node. /// /// THREAD: This method is invoked in the associated cluster's /// dispatcher thread. static QueueAssignmentResult::Enum - assignQueue(ClusterState* clusterState, - ClusterData* clusterData, - ClusterStateLedger* ledger, - const mqbi::Cluster* cluster, - const bmqt::Uri& uri, - const QueueAssigningCb& queueAssigningCb, - bslma::Allocator* allocator, - bmqp_ctrlmsg::Status* status = 0); + assignQueue(ClusterState* clusterState, + ClusterData* clusterData, + ClusterStateLedger* ledger, + const mqbi::Cluster* cluster, + const bmqt::Uri& uri, + bslma::Allocator* allocator, + bmqp_ctrlmsg::Status* status = 0); /// Register a queue info for the queue with the specified `uri`, /// `partitionId`, `queueKey` and the optionally specified `appIdInfos` - /// to the specified `clusterState` of the specified `cluster`. Also - /// invoke the specified `queueAssigningCb` on success. If no + /// to the specified `clusterState` of the specified `cluster`. If no /// `appIdInfos` is specified, use the appId infos from the domain /// config instead. If the specified `forceUpdate` flag is true, update /// queue info even if it is valid but different from the specified @@ -271,7 +265,6 @@ struct ClusterUtil { int partitionId, const mqbu::StorageKey& queueKey, const AppInfos& appIdInfos, - const QueueAssigningCb& queueAssigningCb, bool forceUpdate); /// Generate appKeys based on the appIds in the specified `domainConfig` diff --git a/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp b/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp index 16ea3acbd..7d32ae985 100644 --- a/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp +++ b/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp @@ -330,12 +330,14 @@ int IncoreClusterStateLedger::onLogRolloverCb(const mqbu::StorageKey& oldLogId, ++citer) { const mqbc::ClusterState::QueueInfoSp& infoSp = citer->second; - bmqp_ctrlmsg::QueueInfo queueInfo; - infoSp->key().loadBinary(&queueInfo.key()); - queueInfo.uri() = infoSp->uri().asString(); - queueInfo.partitionId() = infoSp->partitionId(); + if (infoSp->state() == ClusterStateQueueInfo::k_ASSIGNED) { + bmqp_ctrlmsg::QueueInfo queueInfo; + infoSp->key().loadBinary(&queueInfo.key()); + queueInfo.uri() = infoSp->uri().asString(); + queueInfo.partitionId() = infoSp->partitionId(); - leaderAdvisory.queues().push_back(queueInfo); + leaderAdvisory.queues().push_back(queueInfo); + } } } diff --git a/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h b/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h index fa754df9c..d916424cc 100644 --- a/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h +++ b/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h @@ -74,22 +74,6 @@ class ClusterStateManager { public: // TYPES - /// Signature of a callback invoked when the specified `uri` is in the - /// process of being assigned. If the specified - /// `processingPendingRequests` is true, we will process pending - /// requests on this machine. - typedef bsl::function - QueueAssigningCb; - - /// Signature of a callback invoked when the queue with the specified - /// `queueInfo` is being unassigned. Load into the specified - /// `hasInFlightRequests` whether there are still in-flight requests for - /// the queue. Return true on success, or false on failure. - typedef bsl::function - QueueUnassigningCb; - /// Signature of a callback invoked after the specified `partitionId` /// gets assigned to the specified `primary` with the specified `status`. /// Note that null is a valid value for the `primary`, and it implies @@ -155,12 +139,6 @@ class ClusterStateManager { /// dispatcher thread. virtual void setStorageManager(StorageManager* value) = 0; - /// Set the queue assigning callback to the specified `value`. - virtual void setQueueAssigningCb(const QueueAssigningCb& value) = 0; - - /// Set the queue unassigning callback to the specified `value`. - virtual void setQueueUnassigningCb(const QueueUnassigningCb& value) = 0; - /// Set the after partition primary assignment callback to the specified /// `value`. virtual void setAfterPartitionPrimaryAssignmentCb(