From 035193e3effb2d961a7f98815fe83adcad0ee680 Mon Sep 17 00:00:00 2001 From: Emelia Lei Date: Thu, 26 Dec 2024 14:47:20 -0500 Subject: [PATCH] Add routing broadcast and update open queue check --- src/groups/mqb/mqba/mqba_application.cpp | 13 ++-- src/groups/mqb/mqba/mqba_commandrouter.cpp | 4 ++ src/groups/mqb/mqba/mqba_domainmanager.cpp | 39 ++++++----- src/groups/mqb/mqbblp/mqbblp_cluster.cpp | 28 ++++++-- src/groups/mqb/mqbblp/mqbblp_cluster.h | 5 ++ src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp | 9 +++ src/groups/mqb/mqbblp/mqbblp_clusterproxy.h | 5 ++ .../mqb/mqbblp/mqbblp_clusterqueuehelper.cpp | 3 +- src/groups/mqb/mqbblp/mqbblp_domain.cpp | 67 ++++++++++-------- src/groups/mqb/mqbblp/mqbblp_domain.h | 2 +- .../mqb/mqbblp/mqbblp_storagemanager.cpp | 18 +++++ src/groups/mqb/mqbblp/mqbblp_storagemanager.h | 5 ++ src/groups/mqb/mqbc/mqbc_storagemanager.cpp | 18 +++++ src/groups/mqb/mqbc/mqbc_storagemanager.h | 5 ++ src/groups/mqb/mqbc/mqbc_storageutil.cpp | 68 +++++++++++++++++++ src/groups/mqb/mqbc/mqbc_storageutil.h | 17 +++++ src/groups/mqb/mqbi/mqbi_cluster.h | 4 ++ src/groups/mqb/mqbi/mqbi_domain.h | 2 +- src/groups/mqb/mqbi/mqbi_storagemanager.h | 4 ++ src/groups/mqb/mqbmock/mqbmock_cluster.cpp | 9 +++ src/groups/mqb/mqbmock/mqbmock_cluster.h | 5 ++ src/groups/mqb/mqbmock/mqbmock_domain.cpp | 2 +- src/groups/mqb/mqbmock/mqbmock_domain.h | 2 +- .../mqb/mqbmock/mqbmock_storagemanager.cpp | 7 ++ .../mqb/mqbmock/mqbmock_storagemanager.h | 5 ++ src/integration-tests/test_domain_remove.py | 51 ++++++++++++++ 26 files changed, 339 insertions(+), 58 deletions(-) diff --git a/src/groups/mqb/mqba/mqba_application.cpp b/src/groups/mqb/mqba/mqba_application.cpp index dee58e76ef..b18f3a218c 100644 --- a/src/groups/mqb/mqba/mqba_application.cpp +++ b/src/groups/mqb/mqba/mqba_application.cpp @@ -199,11 +199,13 @@ Application::Application(bdlmt::EventScheduler* scheduler, // Print banner BALL_LOG_INFO - << "Starting" << "\n ____ __ __ ___ _ _" + << "Starting" + << "\n 
____ __ __ ___ _ _" << "\n | __ )| \\/ |/ _ \\| |__ _ __ ___ | | _____ _ __" << "\n | _ \\| |\\/| | | | | '_ \\| '__/ _ \\| |/ / _ \\ '__|" << "\n | |_) | | | | |_| | |_) | | | (_) | < __/ |" - << "\n |____/|_| |_|\\__\\_\\_.__/|_| \\___/|_|\\_\\___|_|" << "\n" + << "\n |____/|_| |_|\\__\\_\\_.__/|_| \\___/|_|\\_\\___|_|" + << "\n" << "\n Instance..............: " << brkrCfg.brokerInstanceName() << "\n Version...............: " << brkrCfg.brokerVersion() << "\n Build Type............: " << MQBA_STRINGIFY(BMQ_BUILD_TYPE) @@ -657,6 +659,9 @@ Application::getRelevantCluster(bsl::ostream& errorDescription, else if (domains.isReconfigureValue()) { domainName = domains.reconfigure().domain(); } + else if (domains.isRemoveValue()) { + domainName = domains.remove().domain(); + } else { errorDescription << "Cannot extract cluster for that command"; return NULL; // RETURN @@ -822,8 +827,8 @@ int Application::processCommand(const bslstl::StringRef& source, mqbcmd::Command command; bsl::string parseError; if (const int rc = mqbcmd::ParseUtil::parse(&command, &parseError, cmd)) { - os << "Unable to decode command " << "(rc: " << rc << ", error: '" - << parseError << "')"; + os << "Unable to decode command " + << "(rc: " << rc << ", error: '" << parseError << "')"; return rc + 10 * rc_PARSE_ERROR; // RETURN } diff --git a/src/groups/mqb/mqba/mqba_commandrouter.cpp b/src/groups/mqb/mqba/mqba_commandrouter.cpp index 48fa28eb11..750c5419ef 100644 --- a/src/groups/mqb/mqba/mqba_commandrouter.cpp +++ b/src/groups/mqb/mqba/mqba_commandrouter.cpp @@ -314,6 +314,10 @@ void CommandRouter::setCommandRoutingMode() d_routingModeMp.load(new (*allocator) ClusterWideRoutingMode()); // DOMAINS RECONFIGURE } + else if (domains.isRemoveValue()) { + d_routingModeMp.load(new (*allocator) ClusterWideRoutingMode()); + // DOMAINS REMOVE [finalize] + } } else if (commandChoice.isClustersValue()) { const mqbcmd::ClustersCommand& clusters = commandChoice.clusters(); diff --git 
a/src/groups/mqb/mqba/mqba_domainmanager.cpp b/src/groups/mqb/mqba/mqba_domainmanager.cpp index c8a311a7de..7f022fcdb5 100644 --- a/src/groups/mqb/mqba/mqba_domainmanager.cpp +++ b/src/groups/mqb/mqba/mqba_domainmanager.cpp @@ -719,13 +719,11 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, BALL_LOG_INFO << "[First pass] DOMAINS REMOVE '" << name << "' called!!!"; - // TODO: broadcast to all other nodes - - // 1. Reject if there's any opened queue - if (domainSp->hasActiveQueue()) { + // 1. Reject if there's any opened or opening queue + if (!domainSp->tryRemove()) { bmqu::MemOutStream os; os << "Trying to remove the domain '" << name - << "' while there are queues open"; + << "' while there are queues opened or opening"; result->makeError().message() = os.str(); return -1; // RETURN } @@ -777,34 +775,40 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, BALL_LOG_INFO << "BEFORE PURGE"; + // Purge -> Primary + // TODO: GC -> leader + // 5. Purge inactive queues // remove virtual storage; add a record in journal file - mqbcmd::DomainCommand domainCommand; - domainCommand.makePurge(); + mqbcmd::DomainResult domainResult; + mqbcmd::ClusterResult clusterResult; - mqbcmd::DomainResult domainResult; - rc = domainSp->processCommand(&domainResult, domainCommand); + domainSp->cluster()->purgeQueueOnDomain(&clusterResult, name); - if (domainResult.isErrorValue()) { - result->makeError(domainResult.error()); - return rc; // RETURN - } - else if (domainResult.isSuccessValue()) { - result->makeSuccess(domainResult.success()); + if (clusterResult.isErrorValue()) { + result->makeError(clusterResult.error()); return rc; // RETURN } + + BSLS_ASSERT_SAFE(clusterResult.isStorageResultValue()); + BSLS_ASSERT_SAFE( + clusterResult.storageResult().isPurgedQueuesValue()); + + mqbcmd::PurgedQueues& purgedQueues = + domainResult.makePurgedQueues(); + purgedQueues.queues() = + clusterResult.storageResult().purgedQueues().queues(); 
result->makeDomainResult(domainResult); // 6. Force GC queues // remove Queue from domain; remove storage from partition - // CLUSTERS CLUSTER FORCE_GC_QUEUES - // TODO: Do we want to have add another type of result? mqbcmd::ClusterResult clusterForceGCResult; rc = cluster->gcQueueOnDomain(&clusterForceGCResult, name); if (clusterForceGCResult.isErrorValue()) { result->makeError(clusterForceGCResult.error()); return -1; // RETURN } + // register and unresgiter queue // 7. Mark DOMAIN REMOVED to accecpt the second pass domainSp->removeDomainCompleted(); @@ -818,6 +822,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, result->makeSuccess(); return 0; // RETURN } + return 0; } diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp index ef247ca9e7..b90a7ef9fe 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp @@ -3685,9 +3685,11 @@ void Cluster::gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result, BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); // 'true' implies immediate - if (const int rc = d_clusterOrchestrator.queueHelper().gcExpiredQueues( - true, - domainName)) { + const int rc = + d_clusterOrchestrator.queueHelper().gcExpiredQueues(true, domainName); + if (rc == -1 || rc == -3) { + // TBD: We allow the node to not be an active primary for *any* + // partition; this has to be changed once we allow leader != primary BALL_LOG_ERROR << "Failed to execute force GC queues command (rc: " << rc << ")"; result->makeError().message() = "Failed to execute command (rc: " + @@ -3695,11 +3697,29 @@ void Cluster::gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result, } else { // Otherwise the command succeeded. 
- BALL_LOG_INFO << "SUCCESS in Cluster::gcQueueOnDomainDispatched"; result->makeSuccess(); } } +void Cluster::purgeQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) +{ + // executed by *ANY* thread + + mqbcmd::StorageResult storageResult; + + dispatcher()->execute( + bdlf::BindUtil::bind(&mqbi::StorageManager::purgeQueueOnDomain, + d_storageManager_mp.get(), + &storageResult, + domainName), + this); + + dispatcher()->synchronize(this); + + result->makeStorageResult(storageResult); +} + void Cluster::printClusterStateSummary(bsl::ostream& out, int level, int spacesPerLevel) const diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.h b/src/groups/mqb/mqbblp/mqbblp_cluster.h index 57168c7e77..534c8eb78c 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.h +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.h @@ -673,6 +673,11 @@ class Cluster : public mqbi::Cluster, /// Load the cluster state to the specified `out` object. void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE; + /// Purge queues in this cluster on a given domain. + void + purgeQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + /// Force GC queues in this cluster on a given domain.
int gcQueueOnDomain(mqbcmd::ClusterResult* result, const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp index 2a0568fa81..43bfdd6b0f 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp @@ -1338,6 +1338,15 @@ void ClusterProxy::loadClusterStatus(mqbcmd::ClusterResult* out) loadQueuesInfo(&clusterProxyStatus.queuesInfo()); } +void ClusterProxy::purgeQueueOnDomain( + mqbcmd::ClusterResult* result, + BSLS_ANNOTATION_UNUSED const bsl::string& domainName) +{ + bmqu::MemOutStream os; + os << "ClusterProxy::purgeQueueOnDomain not implemented!"; + result->makeError().message() = os.str(); +} + int ClusterProxy::gcQueueOnDomain( mqbcmd::ClusterResult* result, BSLS_ANNOTATION_UNUSED const bsl::string& domainName) diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h index 8e0483074e..096b591301 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h @@ -539,6 +539,11 @@ class ClusterProxy : public mqbc::ClusterStateObserver, /// Load the cluster state in the specified `out` object. void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE; + /// Purge queues in this cluster on a given domain. + void + purgeQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + /// Force GC queues in this cluster on a given domain.
int gcQueueOnDomain(mqbcmd::ClusterResult* result, const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index daa1aafa6d..2524813fd2 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -6153,6 +6153,7 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate, rc_SUCCESS = 0, rc_CLUSTER_IS_STOPPING = -1, rc_SELF_IS_NOT_PRIMARY = -2, + rc_SELF_IS_NOT_LEADER = -3, }; if (d_cluster_p->isStopping()) { @@ -6326,7 +6327,7 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate, d_primaryNotLeaderAlarmRaised = true; } - return rc_SUCCESS; // RETURN + return rc_SELF_IS_NOT_LEADER; // RETURN } for (size_t i = 0; i < queuesToGc.size(); ++i) { diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.cpp b/src/groups/mqb/mqbblp/mqbblp_domain.cpp index 3c394f9600..57e5f02c93 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_domain.cpp @@ -525,30 +525,35 @@ void Domain::openQueue( // PRECONDITIONS BSLS_ASSERT_SAFE(uri.asString() == handleParameters.uri()); - if (d_state != e_STARTED) { - // Reject this open-queue request with a soft failure status. + { + bslmt::LockGuard guard(&d_mutex); // LOCK - bmqp_ctrlmsg::Status status; + if (d_state != e_STARTED) { + // Reject this open-queue request with a soft failure status. 
- if (d_state == e_REMOVING || d_state == e_REMOVED) { - status.category() = bmqp_ctrlmsg::StatusCategory::E_REFUSED; - status.code() = mqbi::ClusterErrorCode::e_UNKNOWN; - status.message() = k_DOMAIN_IS_REMOVING_OR_REMOVED; - } - else { - status.category() = bmqp_ctrlmsg::StatusCategory::E_REFUSED; - status.code() = mqbi::ClusterErrorCode::e_STOPPING; - status.message() = k_NODE_IS_STOPPING; + bmqp_ctrlmsg::Status status; + + if (d_state == e_REMOVING || d_state == e_REMOVED) { + status.category() = bmqp_ctrlmsg::StatusCategory::E_REFUSED; + status.code() = mqbi::ClusterErrorCode::e_UNKNOWN; + status.message() = k_DOMAIN_IS_REMOVING_OR_REMOVED; + } + else { + status.category() = bmqp_ctrlmsg::StatusCategory::E_REFUSED; + status.code() = mqbi::ClusterErrorCode::e_STOPPING; + status.message() = k_NODE_IS_STOPPING; + } + + callback(status, + static_cast(0), + bmqp_ctrlmsg::OpenQueueResponse(), + mqbi::Cluster::OpenQueueConfirmationCookie()); + return; // RETURN } - callback(status, - static_cast(0), - bmqp_ctrlmsg::OpenQueueResponse(), - mqbi::Cluster::OpenQueueConfirmationCookie()); - return; // RETURN + ++d_pendingRequests; } - ++d_pendingRequests; d_cluster_sp->openQueue( uri, this, @@ -983,23 +988,29 @@ void Domain::loadRoutingConfiguration( } } -bool Domain::hasActiveQueue() const +bool Domain::tryRemove() const { - // Queues are created before handles, so if d_queue is empty, - // there's shouldn't be any active handle - if (d_queues.empty()) { + bslmt::LockGuard guard(&d_mutex); // LOCK + + if (d_pendingRequests != 0) { return false; } - // If there's queue in this domain, check to see if there's - // any handle to it - for (QueueMapCIter it = d_queues.begin(); it != d_queues.end(); ++it) { - if (it->second->hasActiveHandle()) { - return true; + // If there's queue in this domain, check to see if there's any active + // handle to it + if (!d_queues.empty()) { + for (QueueMapCIter it = d_queues.begin(); it != d_queues.end(); ++it) { + // check like this or call 
Queue::hasActiveHandle()? + const bmqp_ctrlmsg::QueueHandleParameters& handleParams = + it->second->handleParameters(); + if (handleParams.readCount() != 0 || + handleParams.writeCount() != 0) { + return false; + } } } - return false; + return true; } } // close package namespace diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.h b/src/groups/mqb/mqbblp/mqbblp_domain.h index 0bd822e96e..959292bcdd 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.h +++ b/src/groups/mqb/mqbblp/mqbblp_domain.h @@ -364,7 +364,7 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, - /// Check the state of the queues in this domain, return true if - /// there's queue with valid queue handles. - bool hasActiveQueue() const BSLS_KEYWORD_OVERRIDE; + /// Return true if this domain can be removed, i.e. it has no pending + /// open-queue request and no queue with an active handle. + bool tryRemove() const BSLS_KEYWORD_OVERRIDE; }; // ============================================================================ diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp index 29a66d27bb..07f19730a2 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp @@ -2110,6 +2110,24 @@ void StorageManager::gcUnrecognizedDomainQueues() d_unrecognizedDomains); } +int StorageManager::purgeQueueOnDomain(mqbcmd::StorageResult* result, + const bsl::string& domainName) +{ + // executed by cluster *DISPATCHER* thread + + // PRECONDITIONS + BSLS_ASSERT_SAFE( + d_dispatcher_p->inDispatcherThread(&d_clusterData_p->cluster())); + + mqbc::StorageUtil::purgeQueueOnDomain(result, + domainName, + &d_fileStores, + &d_storages, + &d_storagesLock); + + return 0; +} + // ACCESSORS bool StorageManager::isStorageEmpty(const bmqt::Uri& uri, int partitionId) const diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.h b/src/groups/mqb/mqbblp/mqbblp_storagemanager.h index d1c2b7b595..e6fab0df9d 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.h +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.h @@ -671,6 +671,11 @@ class
StorageManager BSLS_KEYWORD_FINAL : public mqbi::StorageManager { /// GC the queues from unrecognized domains, if any. void gcUnrecognizedDomainQueues() BSLS_KEYWORD_OVERRIDE; + /// Purge the queues on a given domain. + int + purgeQueueOnDomain(mqbcmd::StorageResult* result, + const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + // ACCESSORS /// Return the processor handle in charge of the specified diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp index c97387cfc1..54d405e920 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp @@ -4457,6 +4457,24 @@ void StorageManager::gcUnrecognizedDomainQueues() d_unrecognizedDomains); } +int StorageManager::purgeQueueOnDomain(mqbcmd::StorageResult* result, + const bsl::string& domainName) +{ + // executed by cluster *DISPATCHER* thread + + // PRECONDITIONS + BSLS_ASSERT_SAFE( + d_dispatcher_p->inDispatcherThread(&d_clusterData_p->cluster())); + + StorageUtil::purgeQueueOnDomain(result, + domainName, + &d_fileStores, + &d_storages, + &d_storagesLock); + + return 0; +} + mqbs::FileStore& StorageManager::fileStore(int partitionId) { // PRECONDITIONS diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.h b/src/groups/mqb/mqbc/mqbc_storagemanager.h index ca991c6205..a3c62a49b1 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.h +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.h @@ -999,6 +999,11 @@ class StorageManager BSLS_KEYWORD_FINAL /// GC the queues from unrecognized domains, if any. void gcUnrecognizedDomainQueues() BSLS_KEYWORD_OVERRIDE; + /// Purge the queues on a given domain. + int + purgeQueueOnDomain(mqbcmd::StorageResult* result, + const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + /// Return partition corresponding to the specified `partitionId`. The /// behavior is undefined if `partitionId` does not represent a valid /// partition id. 
Note, this modifiable reference to partition is only diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.cpp b/src/groups/mqb/mqbc/mqbc_storageutil.cpp index 649ec946a9..995753800b 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_storageutil.cpp @@ -600,6 +600,37 @@ void StorageUtil::executeForEachPartitions(const PerPartitionFunctor& job, latch.wait(); } +void StorageUtil::executeForValidPartitions(const PerPartitionFunctor& job, + const FileStores& fileStores) +{ + // executed by cluster *DISPATCHER* thread + + bsl::vector validPartitionIds; + validPartitionIds.reserve(fileStores.size()); + + for (unsigned int i = 0; i < fileStores.size(); ++i) { + FileStoreSp fileStore = fileStores[i]; + if (fileStore->primaryNode() && fileStore->primaryNode()->nodeId() == + fileStore->config().nodeId()) { + validPartitionIds.push_back(i); + } + } + + bslmt::Latch latch(validPartitionIds.size()); + + BALL_LOG_INFO << "StorageUtil::executeForValidPartitions for " + << validPartitionIds.size() << " partitions!"; + + for (unsigned int i = 0; i < validPartitionIds.size(); ++i) { + int partitionId = validPartitionIds[i]; + fileStores[partitionId]->execute( + bdlf::BindUtil::bind(job, partitionId, &latch)); + } + + // Wait + latch.wait(); +} + int StorageUtil::processReplicationCommand( mqbcmd::ReplicationResult* replicationResult, int* replicationFactor, @@ -3793,5 +3824,42 @@ void StorageUtil::forceIssueAdvisoryAndSyncPt(mqbc::ClusterData* clusterData, } } +void StorageUtil::purgeQueueOnDomain(mqbcmd::StorageResult* result, + const bsl::string& domainName, + FileStores* fileStores, + StorageSpMapVec* storageMapVec, + bslmt::Mutex* storagesLock) +{ + bsl::vector > purgedQueuesVec; + purgedQueuesVec.resize(fileStores->size()); + + // To purge a domain, we have to purge queues in each partition + // where the current node is the primary + // from the correct thread.
This is achieved by parallel launch + // of `purgeDomainDispatched` across all valid FileStore's threads. + // We need to wait here, using `latch`, until the command completes + // in all valid threads. + executeForValidPartitions( + bdlf::BindUtil::bind(&purgeDomainDispatched, + &purgedQueuesVec, + bdlf::PlaceHolders::_2, // latch + bdlf::PlaceHolders::_1, // partitionId + storageMapVec, + storagesLock, + fileStores, + domainName), + *fileStores); + + mqbcmd::PurgedQueues& purgedQueues = result->makePurgedQueues(); + for (size_t i = 0; i < purgedQueuesVec.size(); ++i) { + const bsl::vector& purgedQs = + purgedQueuesVec[i]; + + purgedQueues.queues().insert(purgedQueues.queues().begin(), + purgedQs.begin(), + purgedQs.end()); + } +} + } // close package namespace } // close enterprise namespace diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.h b/src/groups/mqb/mqbc/mqbc_storageutil.h index dd4f801e01..7c29a120dc 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.h +++ b/src/groups/mqb/mqbc/mqbc_storageutil.h @@ -343,6 +343,16 @@ struct StorageUtil { static void executeForEachPartitions(const PerPartitionFunctor& job, const FileStores& fileStores); + /// For each partition which has the current node as the primary, + /// Execute the specified `job` in the specified `fileStores`. + /// Each partition will receive its partitionId and a latch + /// along with the `job`. Each valid partition *must* call + /// `latch->arrive()` after it has finished executing the `job`. + /// + /// THREAD: Executed by the cluster-dispatcher thread. + static void executeForValidPartitions(const PerPartitionFunctor& job, + const FileStores& fileStores); + /// Process the specified `command`, and load the result to the /// specified `replicationResult`. 
The command might modify the /// specified `replicationFactor` and the corresponding value in each @@ -773,6 +783,13 @@ struct StorageUtil { mqbs::FileStore* fs, mqbnet::ClusterNode* destination, const PartitionInfo& pinfo); + + /// Purge the queues on a given domain. + static void purgeQueueOnDomain(mqbcmd::StorageResult* result, + const bsl::string& domainName, + FileStores* fileStores, + StorageSpMapVec* storageMapVec, + bslmt::Mutex* storagesLock); }; template <> diff --git a/src/groups/mqb/mqbi/mqbi_cluster.h b/src/groups/mqb/mqbi/mqbi_cluster.h index 7904aeacc9..894642502f 100644 --- a/src/groups/mqb/mqbi/mqbi_cluster.h +++ b/src/groups/mqb/mqbi/mqbi_cluster.h @@ -371,6 +371,10 @@ class Cluster : public DispatcherClient { /// Load the cluster state to the specified `out` object. virtual void loadClusterStatus(mqbcmd::ClusterResult* out) = 0; + /// Purge queues in this cluster on a given domain. + virtual void purgeQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) = 0; + /// Force GC queues in this cluster on a given domain. virtual int gcQueueOnDomain(mqbcmd::ClusterResult* result, const bsl::string& domainName) = 0; diff --git a/src/groups/mqb/mqbi/mqbi_domain.h b/src/groups/mqb/mqbi/mqbi_domain.h index 183724f325..9e21e89f17 100644 --- a/src/groups/mqb/mqbi/mqbi_domain.h +++ b/src/groups/mqb/mqbi/mqbi_domain.h @@ -221,7 +221,7 @@ class Domain { - /// Check the state of the queues in this domain, return true if - /// there's queue with valid queue handles. - virtual bool hasActiveQueue() const = 0; + /// Return true if this domain can be removed, i.e. it has no pending + /// open-queue request and no queue with an active handle. + virtual bool tryRemove() const = 0; }; // =================== diff --git a/src/groups/mqb/mqbi/mqbi_storagemanager.h b/src/groups/mqb/mqbi/mqbi_storagemanager.h index 58591e9d07..d29acd099b 100644 --- a/src/groups/mqb/mqbi/mqbi_storagemanager.h +++ b/src/groups/mqb/mqbi/mqbi_storagemanager.h @@ -404,6 +404,10 @@ class StorageManager { /// GC the queues from unrecognized domains, if any.
virtual void gcUnrecognizedDomainQueues() = 0; + /// Purge the queues on a given domain. + virtual int purgeQueueOnDomain(mqbcmd::StorageResult* result, + const bsl::string& domainName) = 0; + // ACCESSORS /// Return the processor handle in charge of the specified diff --git a/src/groups/mqb/mqbmock/mqbmock_cluster.cpp b/src/groups/mqb/mqbmock/mqbmock_cluster.cpp index 14c63e8e23..82ffc442d4 100644 --- a/src/groups/mqb/mqbmock/mqbmock_cluster.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_cluster.cpp @@ -489,6 +489,15 @@ void Cluster::loadClusterStatus(mqbcmd::ClusterResult* out) out->makeClusterStatus(); } +void Cluster::purgeQueueOnDomain( + mqbcmd::ClusterResult* result, + BSLS_ANNOTATION_UNUSED const bsl::string& domainName) +{ + bmqu::MemOutStream os; + os << "MockCluster::purgeQueueOnDomain not implemented!"; + result->makeError().message() = os.str(); +} + int Cluster::gcQueueOnDomain( mqbcmd::ClusterResult* result, BSLS_ANNOTATION_UNUSED const bsl::string& domainName) diff --git a/src/groups/mqb/mqbmock/mqbmock_cluster.h b/src/groups/mqb/mqbmock/mqbmock_cluster.h index 89c35d2830..eb36702e56 100644 --- a/src/groups/mqb/mqbmock/mqbmock_cluster.h +++ b/src/groups/mqb/mqbmock/mqbmock_cluster.h @@ -402,6 +402,11 @@ class Cluster : public mqbi::Cluster { /// Load the cluster state to the specified `out` object. void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE; + /// Purge queues in this cluster on a given domain. + void + purgeQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + /// Force GC queues in this cluster on a given domain.
int gcQueueOnDomain(mqbcmd::ClusterResult* result, const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbmock/mqbmock_domain.cpp b/src/groups/mqb/mqbmock/mqbmock_domain.cpp index 699e03b947..6f4f5b38d5 100644 --- a/src/groups/mqb/mqbmock/mqbmock_domain.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_domain.cpp @@ -230,7 +230,7 @@ void Domain::loadRoutingConfiguration( // NOTHING } -bool Domain::hasActiveQueue() const +bool Domain::tryRemove() const { return true; } diff --git a/src/groups/mqb/mqbmock/mqbmock_domain.h b/src/groups/mqb/mqbmock/mqbmock_domain.h index 9d52f210fc..68c5e11c7c 100644 --- a/src/groups/mqb/mqbmock/mqbmock_domain.h +++ b/src/groups/mqb/mqbmock/mqbmock_domain.h @@ -251,7 +251,7 @@ class Domain : public mqbi::Domain { - /// Check the state of the current domain, return true if it's - /// active and accepts incoming connections. - bool hasActiveQueue() const BSLS_KEYWORD_OVERRIDE; + /// Mock implementation of `tryRemove`: unconditionally return true, + /// without inspecting any queue or pending request. + bool tryRemove() const BSLS_KEYWORD_OVERRIDE; }; // =================== diff --git a/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp b/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp index 236cf17b3e..1d69acbea2 100644 --- a/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp @@ -276,6 +276,13 @@ void StorageManager::gcUnrecognizedDomainQueues() // NOTHING } +int StorageManager::purgeQueueOnDomain( + BSLS_ANNOTATION_UNUSED mqbcmd::StorageResult* result, + BSLS_ANNOTATION_UNUSED const bsl::string& domainName) +{ + return 0; +} + // ACCESSORS mqbi::Dispatcher::ProcessorHandle StorageManager::processorForPartition( BSLS_ANNOTATION_UNUSED int partitionId) const diff --git a/src/groups/mqb/mqbmock/mqbmock_storagemanager.h b/src/groups/mqb/mqbmock/mqbmock_storagemanager.h index a1e5c08a8c..bbcdb33f70 100644 --- a/src/groups/mqb/mqbmock/mqbmock_storagemanager.h +++ b/src/groups/mqb/mqbmock/mqbmock_storagemanager.h @@ -269,6 +269,11 @@ class StorageManager BSLS_KEYWORD_FINAL : public
mqbi::StorageManager { /// GC the queues from unrecognized domains, if any. void gcUnrecognizedDomainQueues() BSLS_KEYWORD_OVERRIDE; + /// Purge the queues on a given domain. + int + purgeQueueOnDomain(mqbcmd::StorageResult* result, + const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + // ACCESSORS /// Return the processor handle in charge of the specified diff --git a/src/integration-tests/test_domain_remove.py b/src/integration-tests/test_domain_remove.py index 11c89d5f11..7be240ef8a 100644 --- a/src/integration-tests/test_domain_remove.py +++ b/src/integration-tests/test_domain_remove.py @@ -275,3 +275,54 @@ def test_remove_domain_on_disk_not_in_cache(cluster: Cluster): admin.connect(leader.config.host, int(leader.config.port)) res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_BROADCAST}") assert "Trying to remove a nonexistent domain" not in res + + +def test_send_to_replicas(multi_node: Cluster): + proxies = multi_node.proxy_cycle() + proxy = next(proxies) + + queue1 = f"bmq://{tc.DOMAIN_PRIORITY}/q1" + queue2 = f"bmq://{tc.DOMAIN_PRIORITY}/q2" + + # producer and consumer open the queue, + # post and confirm messages and both close + producer = proxy.create_client("producer") + producer.open(queue1, flags=["write"], succeed=True) + + consumer = proxy.create_client("consumer") + consumer.open(queue1, flags=["read"], succeed=True) + + producer.post( + queue1, + [f"msg{i}" for i in range(3)], + succeed=True, + wait_ack=True, + ) + consumer.confirm(queue1, "*", succeed=True) + producer.close(queue1, succeed=True) + consumer.close(queue1, succeed=True) + + # producer open another queue, should be on a different partition + producer.open(queue2, flags=["write"], succeed=True) + producer.post( + queue2, + [f"msg{i}" for i in range(3)], + succeed=True, + wait_ack=True, + ) + producer.close(queue2, succeed=True) + + leader = multi_node.last_known_leader + member = multi_node.nodes(exclude=leader)[0] + + # send remove domain admin command to a non-leader node + # command should go
through since all queues have been closed + admin = AdminClient() + admin.connect(member.config.host, int(member.config.port)) + multi_node._logger.info("========= Before Send Admin Command =======") + # res = admin.send_admin(f"CLUSTERS CLUSTER {multi_node.name} STORAGE SUMMARY") + + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") + multi_node._logger.info("=========================") + multi_node._logger.info(res) + assert "while there are queues open" not in res