From e1168840d8b111aab3060faa5d7d4653e71e62da Mon Sep 17 00:00:00 2001 From: Emelia Lei Date: Thu, 26 Dec 2024 14:47:20 -0500 Subject: [PATCH] Add broadcast routing for DOMAINS REMOVE command Signed-off-by: Emelia Lei --- src/groups/mqb/mqba/mqba_application.cpp | 3 + src/groups/mqb/mqba/mqba_commandrouter.cpp | 4 + src/groups/mqb/mqba/mqba_domainmanager.cpp | 124 ++++++++---------- src/groups/mqb/mqbblp/mqbblp_cluster.cpp | 41 ++++-- src/groups/mqb/mqbblp/mqbblp_cluster.h | 5 + src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp | 23 +++- src/groups/mqb/mqbblp/mqbblp_clusterproxy.h | 5 + .../mqb/mqbblp/mqbblp_clusterqueuehelper.cpp | 3 +- src/groups/mqb/mqbblp/mqbblp_domain.cpp | 117 +++++++++++------ src/groups/mqb/mqbblp/mqbblp_domain.h | 32 +++-- .../mqb/mqbblp/mqbblp_storagemanager.cpp | 18 +++ src/groups/mqb/mqbblp/mqbblp_storagemanager.h | 5 + src/groups/mqb/mqbc/mqbc_storagemanager.cpp | 18 +++ src/groups/mqb/mqbc/mqbc_storagemanager.h | 5 + src/groups/mqb/mqbc/mqbc_storageutil.cpp | 68 ++++++++++ src/groups/mqb/mqbc/mqbc_storageutil.h | 17 +++ src/groups/mqb/mqbi/mqbi_cluster.h | 4 + src/groups/mqb/mqbi/mqbi_domain.h | 19 ++- src/groups/mqb/mqbi/mqbi_storagemanager.h | 4 + src/groups/mqb/mqbmock/mqbmock_cluster.cpp | 9 ++ src/groups/mqb/mqbmock/mqbmock_cluster.h | 5 + src/groups/mqb/mqbmock/mqbmock_domain.cpp | 15 ++- src/groups/mqb/mqbmock/mqbmock_domain.h | 13 +- .../mqb/mqbmock/mqbmock_storagemanager.cpp | 7 + .../mqb/mqbmock/mqbmock_storagemanager.h | 5 + src/integration-tests/test_domain_remove.py | 98 +++++++++++--- 26 files changed, 489 insertions(+), 178 deletions(-) diff --git a/src/groups/mqb/mqba/mqba_application.cpp b/src/groups/mqb/mqba/mqba_application.cpp index dee58e76e..d93d2acec 100644 --- a/src/groups/mqb/mqba/mqba_application.cpp +++ b/src/groups/mqb/mqba/mqba_application.cpp @@ -657,6 +657,9 @@ Application::getRelevantCluster(bsl::ostream& errorDescription, else if (domains.isReconfigureValue()) { domainName = domains.reconfigure().domain(); 
} + else if (domains.isRemoveValue()) { + domainName = domains.remove().domain(); + } else { errorDescription << "Cannot extract cluster for that command"; return NULL; // RETURN diff --git a/src/groups/mqb/mqba/mqba_commandrouter.cpp b/src/groups/mqb/mqba/mqba_commandrouter.cpp index b69d7ef02..3dc3e9210 100644 --- a/src/groups/mqb/mqba/mqba_commandrouter.cpp +++ b/src/groups/mqb/mqba/mqba_commandrouter.cpp @@ -315,6 +315,10 @@ void CommandRouter::setCommandRoutingMode() d_routingModeMp.load(new (*allocator) ClusterWideRoutingMode()); // DOMAINS RECONFIGURE } + else if (domains.isRemoveValue()) { + d_routingModeMp.load(new (*allocator) ClusterWideRoutingMode()); + // DOMAINS REMOVE [finalize] + } } else if (commandChoice.isClustersValue()) { const mqbcmd::ClustersCommand& clusters = commandChoice.clusters(); diff --git a/src/groups/mqb/mqba/mqba_domainmanager.cpp b/src/groups/mqb/mqba/mqba_domainmanager.cpp index c8a311a7d..f6c764d8e 100644 --- a/src/groups/mqb/mqba/mqba_domainmanager.cpp +++ b/src/groups/mqb/mqba/mqba_domainmanager.cpp @@ -709,7 +709,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, if (0 != locateOrCreateDomain(&domainSp, name)) { bmqu::MemOutStream os; - os << "Trying to remove a nonexistent domain '" << name << "'"; + os << "Domain '" << name << "' doesn't exist"; result->makeError().message() = os.str(); return -1; // RETURN } @@ -719,95 +719,82 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, BALL_LOG_INFO << "[First pass] DOMAINS REMOVE '" << name << "' called!!!"; - // TODO: broadcast to all other nodes - - // 1. Reject if there's any opened queue - if (domainSp->hasActiveQueue()) { + // 1. 
Reject if there's any opened or opening queue + if (!domainSp->tryRemove()) { bmqu::MemOutStream os; os << "Trying to remove the domain '" << name - << "' while there are queues open"; + << "' while there are queues opened or opening"; result->makeError().message() = os.str(); return -1; // RETURN } - BALL_LOG_INFO << "BEFORE CHEKCING CLUSTER STATUS"; - - // 2. Reject if the state of cluster is not healthy - // Notice that the bad state can happen anywhere down the road, - // so this check is not enough to prevent a partial execution. - // It's inevitable, so we can only make sure the code doesn't - // break if we run this command again - // TODO: ask if this is necessary??? - mqbi::Cluster* cluster = domainSp->cluster(); - mqbcmd::ClusterResult clusterStatusResult; - mqbcmd::ClusterCommand clusterStatusCommand; - clusterStatusCommand.makeStatus(); - - int rc = cluster->processCommand(&clusterStatusResult, - clusterStatusCommand); - if (clusterStatusResult.isErrorValue()) { - result->makeError(clusterStatusResult.error()); - return rc; // RETURN - } + // 2. Mark DOMAIN PREREMOVE to block openQueue requests + domainSp->removeDomainReset(); + + BALL_LOG_INFO << "BEFORE PURGE"; - BALL_LOG_INFO << "AFTER CHEKCING CLUSTER STATUS"; - BALL_LOG_INFO << clusterStatusResult.clusterStatus(); + // 3. 
Purge inactive queues + // remove virtual storage; add a record in journal file + mqbcmd::DomainResult domainResult; + mqbcmd::ClusterResult clusterResult; + mqbi::Cluster* cluster = domainSp->cluster(); - BSLS_ASSERT_SAFE(clusterStatusResult.isClusterStatusValue()); + cluster->purgeQueueOnDomain(&clusterResult, name); - if (!clusterStatusResult.clusterStatus().isHealthy()) { - bmqu::MemOutStream os; - os << "Domain '" << name << "' in cluster '" << name - << "' is not healthy"; - result->makeError().message() = os.str(); + if (clusterResult.isErrorValue()) { + result->makeError(clusterResult.error()); return -1; // RETURN } - BALL_LOG_INFO << "BEFORE SETTING 'DELETED' FLAG"; + BSLS_ASSERT_SAFE(clusterResult.isStorageResultValue()); + BSLS_ASSERT_SAFE( + clusterResult.storageResult().isPurgedQueuesValue()); - // 3. Mark DOMAIN REMOVING to Block all incoming openQueue requests - // TODO: this idea requires a lot of getters and setters exposed in - // mqbi - better idea? - domainSp->removeDomainStart(); + mqbcmd::PurgedQueues& purgedQueues = + domainResult.makePurgedQueues(); + purgedQueues.queues() = + clusterResult.storageResult().purgedQueues().queues(); + result->makeDomainResult(domainResult); - BALL_LOG_INFO << "BEFORE CLEAN DOMAINRESOLVER CACHE"; + BALL_LOG_INFO << "BEFORE GC"; - // 4. Mark domain for delete in domainResolver - d_domainResolver_mp->clearCache(name); + // 4. Force GC queues + // unregister Queue from domain; + // remove queue storage from partition + mqbcmd::ClusterResult clusterForceGCResult; + int rc = cluster->gcQueueOnDomain(&clusterForceGCResult, name); + if (clusterForceGCResult.isErrorValue()) { + result->makeError(clusterForceGCResult.error()); + return -1; // RETURN + } - BALL_LOG_INFO << "BEFORE PURGE"; + // 5. Mark DOMAIN REMOVED to accept the second pass - // 5. 
Purge inactive queues - // remove virtual storage; add a record in journal file - mqbcmd::DomainCommand domainCommand; - domainCommand.makePurge(); + BALL_LOG_INFO << "BEFORE WAIT FOR TEARDOWN"; - mqbcmd::DomainResult domainResult; - rc = domainSp->processCommand(&domainResult, domainCommand); + bmqu::SharedResource self(this); + bslmt::Latch latch(1, bsls::SystemClockType::e_MONOTONIC); - if (domainResult.isErrorValue()) { - result->makeError(domainResult.error()); - return rc; // RETURN - } - else if (domainResult.isSuccessValue()) { - result->makeSuccess(domainResult.success()); - return rc; // RETURN - } - result->makeDomainResult(domainResult); + domainSp->teardownRemove(bdlf::BindUtil::bind( + bmqu::WeakMemFnUtil::weakMemFn(&DomainManager::onDomainClosed, + self.acquireWeak()), + bdlf::PlaceHolders::_1, // Domain Name + &latch)); - // 6. Force GC queues - // remove Queue from domain; remove storage from partition - // CLUSTERS CLUSTER FORCE_GC_QUEUES - // TODO: Do we want to have add another type of result? - mqbcmd::ClusterResult clusterForceGCResult; - rc = cluster->gcQueueOnDomain(&clusterForceGCResult, name); - if (clusterForceGCResult.isErrorValue()) { - result->makeError(clusterForceGCResult.error()); - return -1; // RETURN + bsls::TimeInterval timeout = + bmqsys::Time::nowMonotonicClock().addSeconds(5); + + rc = latch.timedWait(timeout); + if (0 != rc) { + BALL_LOG_ERROR << "DOMAINS REMOVE fail to finish in " << 5 + << " seconds. rc: " << rc << "."; + return rc; } - // 7. Mark DOMAIN REMOVED to accecpt the second pass - domainSp->removeDomainCompleted(); + BALL_LOG_INFO << "BEFORE CLEAN DOMAINRESOLVER CACHE"; + + // 6. 
Mark domain for delete in domainResolver + d_domainResolver_mp->clearCache(name); } // Second pass else { @@ -818,6 +805,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, result->makeSuccess(); return 0; // RETURN } + return 0; } diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp index 1ab0deecb..96c5bd6d0 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp @@ -441,12 +441,10 @@ void Cluster::sendAck(bmqt::AckResult::Enum status, d_throttledFailedAckMessages, BALL_LOG_INFO << description() << ": failed Ack " << "[status: " << status << ", source: '" << source - << "'" - << ", correlationId: " << correlationId + << "'" << ", correlationId: " << correlationId << ", GUID: " << messageGUID << ", queue: '" << (found ? uri : "** null **") << "' " - << "(id: " << queueId << ")] " - << "to node " + << "(id: " << queueId << ")] " << "to node " << nodeSession->clusterNode()->nodeDescription();); } @@ -455,8 +453,8 @@ void Cluster::sendAck(bmqt::AckResult::Enum status, << "[status: " << status << ", source: '" << source << "'" << ", correlationId: " << correlationId << ", GUID: " << messageGUID << ", queue: '" << uri - << "' (id: " << queueId << ")] to " - << "node " << nodeSession->clusterNode()->nodeDescription(); + << "' (id: " << queueId << ")] to " << "node " + << nodeSession->clusterNode()->nodeDescription(); // Update stats for the queue (or subStream of the queue) // TBD: We should collect all invalid stats (i.e. 
stats for queues that @@ -494,8 +492,7 @@ void Cluster::sendAck(bmqt::AckResult::Enum status, d_throttledDroppedAckMessages, BALL_LOG_ERROR << description() << ": dropping ACK message " << "[status: " << status << ", source: '" << source - << "'" - << ", correlationId: " << correlationId + << "'" << ", correlationId: " << correlationId << ", GUID: " << messageGUID << ", queueId: " << queueId << "] to node " << nodeSession->clusterNode()->nodeDescription() @@ -3694,9 +3691,11 @@ void Cluster::gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result, BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); // 'true' implies immediate - if (const int rc = d_clusterOrchestrator.queueHelper().gcExpiredQueues( - true, - domainName)) { + const int rc = + d_clusterOrchestrator.queueHelper().gcExpiredQueues(true, domainName); + if (rc == -1 || rc == -3) { + // TBD: We allow the node to not be an active primary for *any* + // partition; this has to be changed once we allow leader != primary BALL_LOG_ERROR << "Failed to execute force GC queues command (rc: " << rc << ")"; result->makeError().message() = "Failed to execute command (rc: " + @@ -3704,11 +3703,29 @@ void Cluster::gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result, } else { // Otherwise the command succeeded. 
- BALL_LOG_INFO << "SUCCESS in Cluster::gcQueueOnDomainDispatched"; result->makeSuccess(); } } +void Cluster::purgeQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) +{ + // executed by *ANY* thread + + mqbcmd::StorageResult storageResult; + + dispatcher()->execute( + bdlf::BindUtil::bind(&mqbi::StorageManager::purgeQueueOnDomain, + d_storageManager_mp.get(), + &storageResult, + domainName), + this); + + dispatcher()->synchronize(this); + + result->makeStorageResult(storageResult); +} + void Cluster::printClusterStateSummary(bsl::ostream& out, int level, int spacesPerLevel) const diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.h b/src/groups/mqb/mqbblp/mqbblp_cluster.h index 57168c7e7..534c8eb78 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.h +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.h @@ -673,6 +673,11 @@ class Cluster : public mqbi::Cluster, /// Load the cluster state to the specified `out` object. void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE; + /// Purge queues in this cluster on a given domain. + void + purgeQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + /// Force GC queues in this cluster on a given domain. 
int gcQueueOnDomain(mqbcmd::ClusterResult* result, const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp index 080bf8de5..25d608fd6 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp @@ -784,7 +784,7 @@ void ClusterProxy::processEvent(const bmqp::Event& event, << "Received unexpected event: " << event; BSLS_ASSERT_SAFE(false && "Unexpected event received"); return; // RETURN - } // break; + } // break; default: { BALL_LOG_ERROR << "#UNEXPECTED_EVENT " << description() << "Received unknown event: " << event; @@ -1342,6 +1342,15 @@ void ClusterProxy::loadClusterStatus(mqbcmd::ClusterResult* out) loadQueuesInfo(&clusterProxyStatus.queuesInfo()); } +void ClusterProxy::purgeQueueOnDomain( + mqbcmd::ClusterResult* result, + BSLS_ANNOTATION_UNUSED const bsl::string& domainName) +{ + bmqu::MemOutStream os; + os << "MockCluster::gcQueueOnDomain not implemented!"; + result->makeError().message() = os.str(); +} + int ClusterProxy::gcQueueOnDomain( mqbcmd::ClusterResult* result, BSLS_ANNOTATION_UNUSED const bsl::string& domainName) @@ -1419,31 +1428,31 @@ void ClusterProxy::onDispatcherEvent(const mqbi::DispatcherEvent& event) BSLS_ASSERT_OPT(false && "'DISPATCHER' type dispatcher event unexpected"); return; // RETURN - } // break; + } // break; case mqbi::DispatcherEventType::e_CLUSTER_STATE: { BSLS_ASSERT_OPT(false && "'CLUSTER_STATE' type dispatcher event unexpected"); return; // RETURN - } // break; + } // break; case mqbi::DispatcherEventType::e_STORAGE: { BSLS_ASSERT_OPT(false && "'STORAGE' type dispatcher event unexpected"); return; // RETURN - } // break; + } // break; case mqbi::DispatcherEventType::e_RECOVERY: { BSLS_ASSERT_OPT(false && "'RECOVERY' type dispatcher event unexpected"); return; // RETURN - } // break; + } // break; case mqbi::DispatcherEventType::e_UNDEFINED: { BSLS_ASSERT_OPT(false && 
"'UNDEFINED' type dispatcher event unexpected"); return; // RETURN - } // break; + } // break; case mqbi::DispatcherEventType::e_REPLICATION_RECEIPT: { BSLS_ASSERT_OPT( false && "'REPLICATION_RECEIPT' type dispatcher event unexpected"); return; // RETURN - } // break; + } // break; default: { BALL_LOG_ERROR << "#UNEXPECTED_EVENT " << description() << ": received unexpected dispatcher event: " << event; diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h index 8e0483074..096b59130 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h @@ -539,6 +539,11 @@ class ClusterProxy : public mqbc::ClusterStateObserver, /// Load the cluster state in the specified `out` object. void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE; + /// Purge queues in this cluster on a given domain. + void + purgeQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + /// Force GC queues in this cluster on a given domain. 
int gcQueueOnDomain(mqbcmd::ClusterResult* result, const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index f9a817972..05949eca9 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -6005,6 +6005,7 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate, rc_SUCCESS = 0, rc_CLUSTER_IS_STOPPING = -1, rc_SELF_IS_NOT_PRIMARY = -2, + rc_SELF_IS_NOT_LEADER = -3, }; if (d_cluster_p->isStopping()) { @@ -6178,7 +6179,7 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate, d_primaryNotLeaderAlarmRaised = true; } - return rc_SUCCESS; // RETURN + return rc_SELF_IS_NOT_LEADER; // RETURN } for (size_t i = 0; i < queuesToGc.size(); ++i) { diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.cpp b/src/groups/mqb/mqbblp/mqbblp_domain.cpp index 701cc04fd..8fc836000 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_domain.cpp @@ -358,6 +358,7 @@ Domain::Domain(const bsl::string& name, , d_queues(allocator) , d_pendingRequests(0) , d_teardownCb() +, d_teardownRemoveCb() , d_mutex() { if (d_cluster_sp->isRemote()) { @@ -515,6 +516,34 @@ void Domain::teardown(const mqbi::Domain::TeardownCb& teardownCb) } } +void Domain::teardownRemove(const TeardownCb& teardownCb) +{ + BSLS_ASSERT_SAFE(d_state != e_REMOVING && d_state != e_REMOVED); + BSLS_ASSERT_SAFE(!d_teardownRemoveCb); + BSLS_ASSERT_SAFE(teardownCb); + + bslmt::LockGuard guard(&d_mutex); // d_mutex LOCKED + + BALL_LOG_INFO << "Removing domain '" << d_name << "' having " + << d_queues.size() << " registered queues."; + + d_teardownRemoveCb = teardownCb; + d_state = e_REMOVING; + + d_cluster_sp->unregisterStateObserver(this); + + if (d_queues.empty()) { + d_teardownRemoveCb(d_name); + d_state = e_REMOVED; + return; // RETURN + } + + for (QueueMap::iterator it = d_queues.begin(); it != d_queues.end(); + 
++it) { + it->second->close(); + } +} + void Domain::openQueue( const bmqt::Uri& uri, const bsl::shared_ptr& clientContext, @@ -527,30 +556,36 @@ void Domain::openQueue( // PRECONDITIONS BSLS_ASSERT_SAFE(uri.asString() == handleParameters.uri()); - if (d_state != e_STARTED) { - // Reject this open-queue request with a soft failure status. + { + bslmt::LockGuard guard(&d_mutex); // LOCK - bmqp_ctrlmsg::Status status; + if (d_state != e_STARTED) { + // Reject this open-queue request with a soft failure status. - if (d_state == e_REMOVING || d_state == e_REMOVED) { - status.category() = bmqp_ctrlmsg::StatusCategory::E_REFUSED; - status.code() = mqbi::ClusterErrorCode::e_UNKNOWN; - status.message() = k_DOMAIN_IS_REMOVING_OR_REMOVED; - } - else { - status.category() = bmqp_ctrlmsg::StatusCategory::E_REFUSED; - status.code() = mqbi::ClusterErrorCode::e_STOPPING; - status.message() = k_NODE_IS_STOPPING; + bmqp_ctrlmsg::Status status; + + if (d_state == e_PREREMOVE || d_state == e_REMOVING || + d_state == e_REMOVED) { + status.category() = bmqp_ctrlmsg::StatusCategory::E_REFUSED; + status.code() = mqbi::ClusterErrorCode::e_UNKNOWN; + status.message() = k_DOMAIN_IS_REMOVING_OR_REMOVED; + } + else { + status.category() = bmqp_ctrlmsg::StatusCategory::E_REFUSED; + status.code() = mqbi::ClusterErrorCode::e_STOPPING; + status.message() = k_NODE_IS_STOPPING; + } + + callback(status, + static_cast(0), + bmqp_ctrlmsg::OpenQueueResponse(), + mqbi::Cluster::OpenQueueConfirmationCookie()); + return; // RETURN } - callback(status, - static_cast(0), - bmqp_ctrlmsg::OpenQueueResponse(), - mqbi::Cluster::OpenQueueConfirmationCookie()); - return; // RETURN + ++d_pendingRequests; } - ++d_pendingRequests; d_cluster_sp->openQueue( uri, this, @@ -703,6 +738,14 @@ void Domain::unregisterQueue(mqbi::Queue* queue) d_state = e_STOPPED; } } + else if (d_state == e_REMOVING) { + BSLS_ASSERT_SAFE(d_teardownRemoveCb); + + if (d_queues.empty()) { + d_teardownRemoveCb(d_name); + d_state = e_REMOVED; + 
} + } } int Domain::processCommand(mqbcmd::DomainResult* result, @@ -876,18 +919,12 @@ int Domain::processCommand(mqbcmd::DomainResult* result, return -1; } -void Domain::removeDomainStart() -{ - bslmt::LockGuard guard(&d_mutex); // LOCK - - d_state = e_REMOVING; -} - -void Domain::removeDomainCompleted() +void Domain::removeDomainReset() { bslmt::LockGuard guard(&d_mutex); // LOCK - d_state = e_REMOVED; + d_state = e_PREREMOVE; + d_teardownRemoveCb = nullptr; } // ACCESSORS @@ -985,23 +1022,29 @@ void Domain::loadRoutingConfiguration( } } -bool Domain::hasActiveQueue() const +bool Domain::tryRemove() const { - // Queues are created before handles, so if d_queue is empty, - // there's shouldn't be any active handle - if (d_queues.empty()) { + bslmt::LockGuard guard(&d_mutex); // LOCK + + if (d_pendingRequests != 0) { return false; } - // If there's queue in this domain, check to see if there's - // any handle to it - for (QueueMapCIter it = d_queues.begin(); it != d_queues.end(); ++it) { - if (it->second->hasActiveHandle()) { - return true; + // If there's queue in this domain, check to see if there's any active + // handle to it + if (!d_queues.empty()) { + for (QueueMapCIter it = d_queues.begin(); it != d_queues.end(); ++it) { + // Looks like in RootQueueEngine::releaseHandle, queueHandle is + // removed and r/w counts reset (in `proctor.releaseHandle`) before + // substreams are unregistered; should we check substream? 
+ // handle->subStreamInfos().size() == 0 + if (it->second->hasActiveHandle()) { + return false; + } } } - return false; + return true; } } // close package namespace diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.h b/src/groups/mqb/mqbblp/mqbblp_domain.h index 83cff6358..b408286f3 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.h +++ b/src/groups/mqb/mqbblp/mqbblp_domain.h @@ -109,11 +109,12 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, typedef AppInfos::const_iterator AppInfosCIter; enum DomainState { - e_STARTED = 0, - e_STOPPING = 1, - e_STOPPED = 2, - e_REMOVING = 3, - e_REMOVED = 4 + e_STARTED = 0, + e_STOPPING = 1, + e_STOPPED = 2, + e_PREREMOVE = 3, + e_REMOVING = 4, + e_REMOVED = 5 }; private: @@ -179,6 +180,8 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, // non-null only if 'd_state == // DomainStats::e_STOPPING'. + mqbi::Domain::TeardownCb d_teardownRemoveCb; + mutable bslmt::Mutex d_mutex; // Mutex for protecting the queues // map @@ -284,6 +287,14 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, void teardown(const mqbi::Domain::TeardownCb& teardownCb) BSLS_KEYWORD_OVERRIDE; + /// Teardown this `Domain` instance and invoke the specified + /// `teardownCb` callback when done. This method is called during + /// DOMAIN REMOVE command to offer Domain an opportunity to + /// sync, serialize its queues in a graceful manner. Note: the domain is + /// in charge of all the queues it owns, and hence must stop them if needs + /// be. + void teardownRemove(const TeardownCb& teardownCb) BSLS_KEYWORD_OVERRIDE; + /// Create/Open with the specified `handleParameters` the queue having /// the specified `uri` for the requester client represented with the /// specified `clientContext`. 
Invoke the specified `callback` with the @@ -322,10 +333,7 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, const mqbcmd::DomainCommand& command) BSLS_KEYWORD_OVERRIDE; /// Mark the state of domain to be REMOVING - void removeDomainStart() BSLS_KEYWORD_OVERRIDE; - - /// Mark the state of domain to be REMOVED - void removeDomainCompleted() BSLS_KEYWORD_OVERRIDE; + void removeDomainReset() BSLS_KEYWORD_OVERRIDE; // ACCESSORS @@ -363,9 +371,9 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, void loadRoutingConfiguration(bmqp_ctrlmsg::RoutingConfiguration* config) const BSLS_KEYWORD_OVERRIDE; - /// Check the state of the queues in this domain, return true if - /// there's queue with valid queue handles. - bool hasActiveQueue() const BSLS_KEYWORD_OVERRIDE; + /// Check the state of the queues in this domain, return false if there's + /// queues opened or opening. + bool tryRemove() const BSLS_KEYWORD_OVERRIDE; }; // ============================================================================ diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp index 44b3c4356..2f8572cfe 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp @@ -2144,6 +2144,24 @@ void StorageManager::gcUnrecognizedDomainQueues() d_unrecognizedDomains); } +int StorageManager::purgeQueueOnDomain(mqbcmd::StorageResult* result, + const bsl::string& domainName) +{ + // executed by cluster *DISPATCHER* thread + + // PRECONDITIONS + BSLS_ASSERT_SAFE( + d_dispatcher_p->inDispatcherThread(&d_clusterData_p->cluster())); + + mqbc::StorageUtil::purgeQueueOnDomain(result, + domainName, + &d_fileStores, + &d_storages, + &d_storagesLock); + + return 0; +} + // ACCESSORS bool StorageManager::isStorageEmpty(const bmqt::Uri& uri, int partitionId) const diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.h b/src/groups/mqb/mqbblp/mqbblp_storagemanager.h index d1c2b7b59..e6fab0df9 
100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.h +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.h @@ -671,6 +671,11 @@ class StorageManager BSLS_KEYWORD_FINAL : public mqbi::StorageManager { /// GC the queues from unrecognized domains, if any. void gcUnrecognizedDomainQueues() BSLS_KEYWORD_OVERRIDE; + /// Purge the queues on a given domain. + int + purgeQueueOnDomain(mqbcmd::StorageResult* result, + const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + // ACCESSORS /// Return the processor handle in charge of the specified diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp index 718eec5b1..3f7547cd3 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp @@ -4498,6 +4498,24 @@ void StorageManager::gcUnrecognizedDomainQueues() d_unrecognizedDomains); } +int StorageManager::purgeQueueOnDomain(mqbcmd::StorageResult* result, + const bsl::string& domainName) +{ + // executed by cluster *DISPATCHER* thread + + // PRECONDITIONS + BSLS_ASSERT_SAFE( + d_dispatcher_p->inDispatcherThread(&d_clusterData_p->cluster())); + + StorageUtil::purgeQueueOnDomain(result, + domainName, + &d_fileStores, + &d_storages, + &d_storagesLock); + + return 0; +} + mqbs::FileStore& StorageManager::fileStore(int partitionId) { // PRECONDITIONS diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.h b/src/groups/mqb/mqbc/mqbc_storagemanager.h index e210f1135..c530ff6ec 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.h +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.h @@ -999,6 +999,11 @@ class StorageManager BSLS_KEYWORD_FINAL /// GC the queues from unrecognized domains, if any. void gcUnrecognizedDomainQueues() BSLS_KEYWORD_OVERRIDE; + /// Purge the queues on a given domain. + int + purgeQueueOnDomain(mqbcmd::StorageResult* result, + const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + /// Return partition corresponding to the specified `partitionId`. 
The /// behavior is undefined if `partitionId` does not represent a valid /// partition id. Note, this modifiable reference to partition is only diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.cpp b/src/groups/mqb/mqbc/mqbc_storageutil.cpp index 7cca27ca0..78b5980b5 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_storageutil.cpp @@ -699,6 +699,37 @@ void StorageUtil::executeForEachPartitions(const PerPartitionFunctor& job, latch.wait(); } +void StorageUtil::executeForValidPartitions(const PerPartitionFunctor& job, + const FileStores& fileStores) +{ + // executed by cluster *DISPATCHER* thread + + bsl::vector validPartitionIds; + validPartitionIds.reserve(fileStores.size()); + + for (unsigned int i = 0; i < fileStores.size(); ++i) { + FileStoreSp fileStore = fileStores[i]; + if (fileStore->primaryNode() && fileStore->primaryNode()->nodeId() == + fileStore->config().nodeId()) { + validPartitionIds.push_back(i); + } + } + + bslmt::Latch latch(validPartitionIds.size()); + + BALL_LOG_INFO << "StorageUtil::executeForValidPartitions for " + << fileStores.size() << " partitions!"; + + for (unsigned int i = 0; i < validPartitionIds.size(); ++i) { + int partitionId = validPartitionIds[i]; + fileStores[partitionId]->execute( + bdlf::BindUtil::bind(job, partitionId, &latch)); + } + + // Wait + latch.wait(); +} + int StorageUtil::processReplicationCommand( mqbcmd::ReplicationResult* replicationResult, int* replicationFactor, @@ -3884,5 +3915,42 @@ void StorageUtil::forceIssueAdvisoryAndSyncPt(mqbc::ClusterData* clusterData, } } +void StorageUtil::purgeQueueOnDomain(mqbcmd::StorageResult* result, + const bsl::string& domainName, + FileStores* fileStores, + StorageSpMapVec* storageMapVec, + bslmt::Mutex* storagesLock) +{ + bsl::vector > purgedQueuesVec; + purgedQueuesVec.resize(fileStores->size()); + + // To purge a domain, we have to purge queues in each partition + // where the current node is the primary + // from the correct thread. 
This is achieved by parallel launch + // of `purgeDomainDispatched` across all valid FileStore's threads. + // We need to wait here, using `latch`, until the command completes + // in all valid threads. + executeForValidPartitions( + bdlf::BindUtil::bind(&purgeDomainDispatched, + &purgedQueuesVec, + bdlf::PlaceHolders::_2, // latch + bdlf::PlaceHolders::_1, // partitionId + storageMapVec, + storagesLock, + fileStores, + domainName), + *fileStores); + + mqbcmd::PurgedQueues& purgedQueues = result->makePurgedQueues(); + for (size_t i = 0; i < purgedQueuesVec.size(); ++i) { + const bsl::vector& purgedQs = + purgedQueuesVec[i]; + + purgedQueues.queues().insert(purgedQueues.queues().begin(), + purgedQs.begin(), + purgedQs.end()); + } +} + } // close package namespace } // close enterprise namespace diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.h b/src/groups/mqb/mqbc/mqbc_storageutil.h index 993bdf2d2..105846dc5 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.h +++ b/src/groups/mqb/mqbc/mqbc_storageutil.h @@ -351,6 +351,16 @@ struct StorageUtil { static void executeForEachPartitions(const PerPartitionFunctor& job, const FileStores& fileStores); + /// For each partition which has the current node as the primary, + /// Execute the specified `job` in the specified `fileStores`. + /// Each partition will receive its partitionId and a latch + /// along with the `job`. Each valid partition *must* call + /// `latch->arrive()` after it has finished executing the `job`. + /// + /// THREAD: Executed by the cluster-dispatcher thread. + static void executeForValidPartitions(const PerPartitionFunctor& job, + const FileStores& fileStores); + /// Process the specified `command`, and load the result to the /// specified `replicationResult`. 
The command might modify the /// specified `replicationFactor` and the corresponding value in each @@ -792,6 +802,13 @@ struct StorageUtil { mqbs::FileStore* fs, mqbnet::ClusterNode* destination, const PartitionInfo& pinfo); + + /// Purge the queues on a given domain. + static void purgeQueueOnDomain(mqbcmd::StorageResult* result, + const bsl::string& domainName, + FileStores* fileStores, + StorageSpMapVec* storageMapVec, + bslmt::Mutex* storagesLock); }; template <> diff --git a/src/groups/mqb/mqbi/mqbi_cluster.h b/src/groups/mqb/mqbi/mqbi_cluster.h index 7904aeacc..894642502 100644 --- a/src/groups/mqb/mqbi/mqbi_cluster.h +++ b/src/groups/mqb/mqbi/mqbi_cluster.h @@ -371,6 +371,10 @@ class Cluster : public DispatcherClient { /// Load the cluster state to the specified `out` object. virtual void loadClusterStatus(mqbcmd::ClusterResult* out) = 0; + /// Purge queues in this cluster on a given domain. + virtual void purgeQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) = 0; + /// Force GC queues in this cluster on a given domain. virtual int gcQueueOnDomain(mqbcmd::ClusterResult* result, const bsl::string& domainName) = 0; diff --git a/src/groups/mqb/mqbi/mqbi_domain.h b/src/groups/mqb/mqbi/mqbi_domain.h index 183724f32..96bf615ad 100644 --- a/src/groups/mqb/mqbi/mqbi_domain.h +++ b/src/groups/mqb/mqbi/mqbi_domain.h @@ -142,6 +142,14 @@ class Domain { /// be. virtual void teardown(const TeardownCb& teardownCb) = 0; + /// Teardown this `Domain` instance and invoke the specified + /// `teardownCb` callback when done. This method is called during + /// DOMAIN REMOVE command to offer Domain an opportunity to + /// sync, serialize its queues in a graceful manner. Note: the domain is + /// in charge of all the queues it owns, and hence must stop them if needs + /// be. 
+ virtual void teardownRemove(const TeardownCb& teardownCb) = 0; + /// Create/Open with the specified `handleParameters` the queue having /// the specified `uri` for the requester client represented with the /// specified `clientContext`. Invoke the specified `callback` with the @@ -178,10 +186,7 @@ class Domain { const mqbcmd::DomainCommand& command) = 0; /// Mark the state of domain to be REMOVING - virtual void removeDomainStart() = 0; - - /// Mark the state of domain to be REMOVED - virtual void removeDomainCompleted() = 0; + virtual void removeDomainReset() = 0; // ACCESSORS @@ -219,9 +224,9 @@ class Domain { virtual void loadRoutingConfiguration( bmqp_ctrlmsg::RoutingConfiguration* config) const = 0; - /// Check the state of the queues in this domain, return true if - /// there's queue with valid queue handles. - virtual bool hasActiveQueue() const = 0; + /// Check the state of the queues in this domain, return false if there's + /// queues opened or opening. + virtual bool tryRemove() const = 0; }; // =================== diff --git a/src/groups/mqb/mqbi/mqbi_storagemanager.h b/src/groups/mqb/mqbi/mqbi_storagemanager.h index 1c492202a..8a142d81f 100644 --- a/src/groups/mqb/mqbi/mqbi_storagemanager.h +++ b/src/groups/mqb/mqbi/mqbi_storagemanager.h @@ -403,6 +403,10 @@ class StorageManager { /// GC the queues from unrecognized domains, if any. virtual void gcUnrecognizedDomainQueues() = 0; + /// Purge the queues on a given domain. 
+ virtual int purgeQueueOnDomain(mqbcmd::StorageResult* result, + const bsl::string& domainName) = 0; + // ACCESSORS /// Return the processor handle in charge of the specified diff --git a/src/groups/mqb/mqbmock/mqbmock_cluster.cpp b/src/groups/mqb/mqbmock/mqbmock_cluster.cpp index 14c63e8e2..82ffc442d 100644 --- a/src/groups/mqb/mqbmock/mqbmock_cluster.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_cluster.cpp @@ -489,6 +489,15 @@ void Cluster::loadClusterStatus(mqbcmd::ClusterResult* out) out->makeClusterStatus(); } +void Cluster::purgeQueueOnDomain( + mqbcmd::ClusterResult* result, + BSLS_ANNOTATION_UNUSED const bsl::string& domainName) +{ + bmqu::MemOutStream os; + os << "MockCluster::gcQueueOnDomain not implemented!"; + result->makeError().message() = os.str(); +} + int Cluster::gcQueueOnDomain( mqbcmd::ClusterResult* result, BSLS_ANNOTATION_UNUSED const bsl::string& domainName) diff --git a/src/groups/mqb/mqbmock/mqbmock_cluster.h b/src/groups/mqb/mqbmock/mqbmock_cluster.h index 89c35d283..eb36702e5 100644 --- a/src/groups/mqb/mqbmock/mqbmock_cluster.h +++ b/src/groups/mqb/mqbmock/mqbmock_cluster.h @@ -402,6 +402,11 @@ class Cluster : public mqbi::Cluster { /// Load the cluster state to the specified `out` object. void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE; + /// Purge queues in this cluster on a given domain. + void + purgeQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + /// Force GC queues in this cluster on a given domain. 
int gcQueueOnDomain(mqbcmd::ClusterResult* result, const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbmock/mqbmock_domain.cpp b/src/groups/mqb/mqbmock/mqbmock_domain.cpp index 699e03b94..35c8a2459 100644 --- a/src/groups/mqb/mqbmock/mqbmock_domain.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_domain.cpp @@ -82,6 +82,12 @@ void Domain::teardown( // NOTHING } +void Domain::teardownRemove( + BSLS_ANNOTATION_UNUSED const mqbi::Domain::TeardownCb& teardownCb) +{ + // NOTHING +} + void Domain::openQueue( BSLS_ANNOTATION_UNUSED const bmqt::Uri& uri, BSLS_ANNOTATION_UNUSED const @@ -149,12 +155,7 @@ int Domain::processCommand(mqbcmd::DomainResult* result, return -1; } -void Domain::removeDomainStart() -{ - BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!"); -} - -void Domain::removeDomainCompleted() +void Domain::removeDomainReset() { BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!"); } @@ -230,7 +231,7 @@ void Domain::loadRoutingConfiguration( // NOTHING } -bool Domain::hasActiveQueue() const +bool Domain::tryRemove() const { return true; } diff --git a/src/groups/mqb/mqbmock/mqbmock_domain.h b/src/groups/mqb/mqbmock/mqbmock_domain.h index 9d52f210f..8b26cdf02 100644 --- a/src/groups/mqb/mqbmock/mqbmock_domain.h +++ b/src/groups/mqb/mqbmock/mqbmock_domain.h @@ -165,6 +165,8 @@ class Domain : public mqbi::Domain { /// Do some logging. void teardown(const Domain::TeardownCb& teardownCb) BSLS_KEYWORD_OVERRIDE; + void teardownRemove(const TeardownCb& teardownCb) BSLS_KEYWORD_OVERRIDE; + /// Create/Open with the specified `handleParameters` the queue having /// the specified `uri` for the requester client represented with the /// specified `clientContext`. 
Invoke the specified `callback` with the @@ -210,10 +212,7 @@ class Domain : public mqbi::Domain { const mqbcmd::DomainCommand& command) BSLS_KEYWORD_OVERRIDE; /// Mark the state of domain to be REMOVING - void removeDomainStart() BSLS_KEYWORD_OVERRIDE; - - /// Mark the state of domain to be REMOVED - void removeDomainCompleted() BSLS_KEYWORD_OVERRIDE; + void removeDomainReset() BSLS_KEYWORD_OVERRIDE; /// Load into the specified `out`, if `out` is not 0, the queue /// corresponding to the specified `uri`, if found. Return 0 on success, @@ -249,9 +248,9 @@ class Domain : public mqbi::Domain { void loadRoutingConfiguration(bmqp_ctrlmsg::RoutingConfiguration* config) const BSLS_KEYWORD_OVERRIDE; - /// Check the state of the current domain, return true if it's - /// active and accepts incoming connections. - bool hasActiveQueue() const BSLS_KEYWORD_OVERRIDE; + /// Check the state of the queues in this domain, return false if there + /// are queues opened or opening. + bool tryRemove() const BSLS_KEYWORD_OVERRIDE; }; // =================== diff --git a/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp b/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp index cf4ff91c2..7a1f556c6 100644 --- a/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp @@ -276,6 +276,13 @@ void StorageManager::gcUnrecognizedDomainQueues() // NOTHING } +int StorageManager::purgeQueueOnDomain( + BSLS_ANNOTATION_UNUSED mqbcmd::StorageResult* result, + BSLS_ANNOTATION_UNUSED const bsl::string& domainName) +{ + return 0; +} + // ACCESSORS mqbi::Dispatcher::ProcessorHandle StorageManager::processorForPartition( BSLS_ANNOTATION_UNUSED int partitionId) const diff --git a/src/groups/mqb/mqbmock/mqbmock_storagemanager.h b/src/groups/mqb/mqbmock/mqbmock_storagemanager.h index a1e5c08a8..bbcdb33f7 100644 --- a/src/groups/mqb/mqbmock/mqbmock_storagemanager.h +++ b/src/groups/mqb/mqbmock/mqbmock_storagemanager.h @@ -269,6 +269,11 @@ class StorageManager 
BSLS_KEYWORD_FINAL : public mqbi::StorageManager { /// GC the queues from unrecognized domains, if any. void gcUnrecognizedDomainQueues() BSLS_KEYWORD_OVERRIDE; + /// Purge the queues on a given domain. + int + purgeQueueOnDomain(mqbcmd::StorageResult* result, + const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + // ACCESSORS /// Return the processor handle in charge of the specified diff --git a/src/integration-tests/test_domain_remove.py b/src/integration-tests/test_domain_remove.py index 11c89d5f1..8d6a2d652 100644 --- a/src/integration-tests/test_domain_remove.py +++ b/src/integration-tests/test_domain_remove.py @@ -59,8 +59,6 @@ def test_remove_domain_with_queue_close(cluster: Cluster): leader = cluster.last_known_leader admin.connect(leader.config.host, int(leader.config.port)) res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") - cluster._logger.info("=========================") - cluster._logger.info(res) assert "while there are queues open" not in res @@ -73,40 +71,52 @@ def test_remove_domain_when_cluster_unhealthy(multi_node: Cluster): replicas = multi_node.nodes(exclude=leader) member = replicas[0] - # producer send a message, client confirm, then both close connection - producer = proxy.create_client("producer") - producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) + def write_messages(uri): + # producer send a message, client confirm, then both close connection + producer = proxy.create_client("producer") + producer.open(uri, flags=["write"], succeed=True) - consumer = proxy.create_client("consumer") - consumer.open(tc.URI_PRIORITY, flags=["read"], succeed=True) + consumer = proxy.create_client("consumer") + consumer.open(uri, flags=["read"], succeed=True) - producer.post( - tc.URI_PRIORITY, [f"msg{i}" for i in range(5)], succeed=True, wait_ack=True - ) + producer.post(uri, [f"msg{i}" for i in range(5)], succeed=True, wait_ack=True) - consumer.confirm(tc.URI_PRIORITY, "+1", succeed=True) + consumer.confirm(uri, "+1", 
succeed=True) - producer.close(tc.URI_PRIORITY, succeed=True) - consumer.close(tc.URI_PRIORITY, succeed=True) + producer.close(uri, succeed=True) + consumer.close(uri, succeed=True) + + write_messages(tc.URI_PRIORITY) # set quorum to make it impossible to select a leader for node in multi_node.nodes(): node.set_quorum(99, succeed=True) - # klll the leader to make the cluster unhealthy + # kill the leader to make the cluster unhealthy leader.check_exit_code = False leader.kill() leader.wait() - # wait for the state to catch up - time.sleep(11) - # send remove domain admin command # command couldn't go through since state is unhealthy admin = AdminClient() admin.connect(member.config.host, int(member.config.port)) res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") - assert "is not healthy" in res + assert "Error occurred routing command to this node" in res + assert res.split("\n").count("No queue purged.") == 3 + + # restart the previous leader node + # set quorum to make a member become the leader + # wait until the cluster becomes healthy again + leader.start() + leader.wait() + replicas[1].set_quorum(1) + leader.wait_status(wait_leader=True, wait_ready=False) + + # send DOMAINS REMOVE admin command again + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") + assert "Purged 4 message(s) for a total of 16 B from 1 queue(s):" in res + assert res.split("\n").count("No queue purged.") == 3 def test_remove_different_domain(cluster: Cluster): @@ -265,8 +275,9 @@ def test_remove_domain_not_on_disk(cluster: Cluster): admin = AdminClient() leader = cluster.last_known_leader admin.connect(leader.config.host, int(leader.config.port)) - res = admin.send_admin("DOMAINS REMOVE domain.foo") - assert "Trying to remove a nonexistent domain" in res + domain_name = "domain.foo" + res = admin.send_admin(f"DOMAINS REMOVE {domain_name}") + assert f"Domain '{domain_name}' doesn't exist" in res def test_remove_domain_on_disk_not_in_cache(cluster: Cluster): @@ 
-275,3 +286,50 @@ def test_remove_domain_on_disk_not_in_cache(cluster: Cluster): admin.connect(leader.config.host, int(leader.config.port)) res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_BROADCAST}") assert "Trying to remove a nonexistent domain" not in res + + +def test_send_to_replicas(multi_node: Cluster): + proxies = multi_node.proxy_cycle() + proxy = next(proxies) + + queue1 = f"bmq://{tc.DOMAIN_PRIORITY}/q1" + queue2 = f"bmq://{tc.DOMAIN_PRIORITY}/q2" + + # producer and consumer open the queue, + # post and confirm messages and both close + producer = proxy.create_client("producer") + producer.open(queue1, flags=["write"], succeed=True) + + consumer = proxy.create_client("consumer") + consumer.open(queue1, flags=["read"], succeed=True) + + producer.post( + queue1, + [f"msg{i}" for i in range(3)], + succeed=True, + wait_ack=True, + ) + consumer.confirm(queue1, "*", succeed=True) + producer.close(queue1, succeed=True) + consumer.close(queue1, succeed=True) + + # producer opens another queue, should be on a different partition + producer.open(queue2, flags=["write"], succeed=True) + producer.post( + queue2, + [f"msg{i}" for i in range(3)], + succeed=True, + wait_ack=True, + ) + producer.close(queue2, succeed=True) + + leader = multi_node.last_known_leader + member = multi_node.nodes(exclude=leader)[0] + + # send remove domain admin command to a non-leader node + # command should go through and report purged queues + admin = AdminClient() + admin.connect(member.config.host, int(member.config.port)) + + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") + assert "Purged" in res