From bd5cec4e6259bbb4035bd38b25c6373f3d715cc5 Mon Sep 17 00:00:00 2001 From: Emelia Lei Date: Fri, 3 Jan 2025 17:13:52 -0500 Subject: [PATCH] Add the second pass and tests Remove the domain object in the second pass Signed-off-by: Emelia Lei --- src/groups/mqb/mqba/mqba_domainmanager.cpp | 89 +++++--- src/groups/mqb/mqba/mqba_domainmanager.h | 3 + src/groups/mqb/mqbblp/mqbblp_domain.cpp | 19 +- src/groups/mqb/mqbblp/mqbblp_domain.h | 27 ++- src/groups/mqb/mqbc/mqbc_clusterstate.cpp | 3 + src/groups/mqb/mqbi/mqbi_domain.h | 10 +- src/groups/mqb/mqbmock/mqbmock_domain.cpp | 12 + src/groups/mqb/mqbmock/mqbmock_domain.h | 10 +- src/integration-tests/test_domain_remove.py | 240 ++++++++++++++++---- 9 files changed, 328 insertions(+), 85 deletions(-) diff --git a/src/groups/mqb/mqba/mqba_domainmanager.cpp b/src/groups/mqb/mqba/mqba_domainmanager.cpp index f6c764d8e..36062971b 100644 --- a/src/groups/mqb/mqba/mqba_domainmanager.cpp +++ b/src/groups/mqb/mqba/mqba_domainmanager.cpp @@ -59,7 +59,8 @@ namespace BloombergLP { namespace mqba { namespace { -const int k_MAX_WAIT_SECONDS_AT_SHUTDOWN = 40; +const int k_MAX_WAIT_SECONDS_AT_SHUTDOWN = 40; +const int k_MAX_WAIT_SECONDS_AT_DOMAIN_REMOVE = 5; /// This function is a callback passed to domain manager for /// synchronization. The specified 'status' is a return status @@ -705,19 +706,16 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, else if (command.isRemoveValue()) { const bsl::string& name = command.remove().domain(); - DomainSp domainSp; - - if (0 != locateOrCreateDomain(&domainSp, name)) { - bmqu::MemOutStream os; - os << "Domain '" << name << "' doesn't exist"; - result->makeError().message() = os.str(); - return -1; // RETURN - } - // First pass if (command.remove().finalize().isNull()) { - BALL_LOG_INFO << "[First pass] DOMAINS REMOVE '" << name - << "' called!!!"; + DomainSp domainSp; + + if (0 != locateOrCreateDomain(&domainSp, name)) { + bmqu::MemOutStream os; + os << "Domain '" << name << "' doesn't exist"; + result->makeError().message() = os.str(); + return -1; // RETURN + } // 1. Reject if there's any opened or opening queue if (!domainSp->tryRemove()) { @@ -731,8 +729,6 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, // 2. Mark DOMAIN PREREMOVE to block openQueue requests domainSp->removeDomainReset(); - BALL_LOG_INFO << "BEFORE PURGE"; - // 3. Purge inactive queues // remove virtual storage; add a record in journal file mqbcmd::DomainResult domainResult; @@ -756,8 +752,6 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, clusterResult.storageResult().purgedQueues().queues(); result->makeDomainResult(domainResult); - BALL_LOG_INFO << "BEFORE GC"; - // 4. Force GC queues // unregister Queue from domain; // remove queue storage from partition @@ -770,8 +764,6 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, // 5. 
Mark DOMAIN REMOVED to accecpt the second pass - BALL_LOG_INFO << "BEFORE WAIT FOR TEARDOWN"; - bmqu::SharedResource self(this); bslmt::Latch latch(1, bsls::SystemClockType::e_MONOTONIC); @@ -782,26 +774,50 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, &latch)); bsls::TimeInterval timeout = - bmqsys::Time::nowMonotonicClock().addSeconds(5); + bmqsys::Time::nowMonotonicClock().addSeconds( + k_MAX_WAIT_SECONDS_AT_DOMAIN_REMOVE); rc = latch.timedWait(timeout); if (0 != rc) { - BALL_LOG_ERROR << "DOMAINS REMOVE fail to finish in " << 5 + BALL_LOG_ERROR << "DOMAINS REMOVE fail to finish in " + << k_MAX_WAIT_SECONDS_AT_DOMAIN_REMOVE << " seconds. rc: " << rc << "."; return rc; } - BALL_LOG_INFO << "BEFORE CLEAN DOMAINRESOLVER CACHE"; - - // 6. Mark domain for delete in domainResolver + // 6. Clear cache in domainResolver and configProvider d_domainResolver_mp->clearCache(name); + d_configProvider_p->clearCache(name); + + // 7. Mark DOMAINS REMOVE command first round as complete + domainSp->removeDomainComplete(); } // Second pass else { - // TODO: remove the domain object + DomainSp domainSp; + + int rc = locateDomain(&domainSp, name); + if (0 != rc) { + bmqu::MemOutStream os; + os << "Domain '" << name << "' doesn't exist"; + result->makeError().message() = os.str(); + return rc; // RETURN + } + + if (!domainSp->isRemoveComplete()) { + BALL_LOG_ERROR + << "First pass of DOMAINS REMOVE is not completed."; + return -1; // RETURN + } + + rc = removeDomain(name); + if (0 != rc) { + bmqu::MemOutStream os; + os << "Domain '" << name << "' doesn't exist"; + result->makeError().message() = os.str(); + return rc; // RETURN + } - BALL_LOG_INFO << "[Second pass] DOMAINS REMOVE '" << name - << "' finalize called!!!"; result->makeSuccess(); return 0; // RETURN } @@ -815,6 +831,27 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, return -1; } +int DomainManager::removeDomain(const bsl::string& domainName) +{ + enum RcEnum { + // Value for the various RC error categories + rc_SUCCESS = 0, + rc_DOMAIN_NOT_FOUND = -1 + }; + + bslmt::LockGuard guard(&d_mutex); // mutex LOCKED + + DomainSpMap::const_iterator it = d_domains.find(domainName); + + if (it == d_domains.end()) { + return rc_DOMAIN_NOT_FOUND; // RETURN + } + + d_domains.erase(domainName); + + return rc_SUCCESS; +} + void DomainManager::qualifyDomain( const bslstl::StringRef& name, const mqbi::DomainFactory::QualifiedDomainCb& callback) diff --git a/src/groups/mqb/mqba/mqba_domainmanager.h b/src/groups/mqb/mqba/mqba_domainmanager.h index a48044a82..18e26ffd1 100644 --- a/src/groups/mqb/mqba/mqba_domainmanager.h +++ b/src/groups/mqb/mqba/mqba_domainmanager.h @@ -343,6 +343,9 @@ class DomainManager BSLS_CPP11_FINAL : public mqbi::DomainFactory { int processCommand(mqbcmd::DomainsResult* result, const mqbcmd::DomainsCommand& command); + /// Remove the `domainSp` from DomainSpMap. 
+ int removeDomain(const bsl::string& domainName); + // MANIPULATORS // (virtual: mqbi::DomainFactory) diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.cpp b/src/groups/mqb/mqbblp/mqbblp_domain.cpp index 8fc836000..7f91b64b6 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_domain.cpp @@ -375,7 +375,8 @@ Domain::Domain(const bsl::string& name, Domain::~Domain() { - BSLS_ASSERT_SAFE((e_STOPPING == d_state || e_STOPPED == d_state) && + BSLS_ASSERT_SAFE((e_STOPPING == d_state || e_STOPPED == d_state || + e_POSTREMOVE == d_state) && "'teardown' must be called before the destructor"); } @@ -564,8 +565,8 @@ void Domain::openQueue( bmqp_ctrlmsg::Status status; - if (d_state == e_PREREMOVE || d_state == e_REMOVING || - d_state == e_REMOVED) { + if (d_state == e_REMOVING || d_state == e_REMOVED || + d_state == e_PREREMOVE || d_state == e_POSTREMOVE) { status.category() = bmqp_ctrlmsg::StatusCategory::E_REFUSED; status.code() = mqbi::ClusterErrorCode::e_UNKNOWN; status.message() = k_DOMAIN_IS_REMOVING_OR_REMOVED; @@ -927,6 +928,13 @@ void Domain::removeDomainReset() d_teardownRemoveCb = nullptr; } +void Domain::removeDomainComplete() +{ + bslmt::LockGuard guard(&d_mutex); // LOCK + + d_state = e_POSTREMOVE; +} + // ACCESSORS int Domain::lookupQueue(bsl::shared_ptr* out, const bmqt::Uri& uri) const @@ -1047,5 +1055,10 @@ bool Domain::tryRemove() const return true; } +bool Domain::isRemoveComplete() const +{ + return d_state == e_POSTREMOVE; +} + } // close package namespace } // close enterprise namespace diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.h b/src/groups/mqb/mqbblp/mqbblp_domain.h index b408286f3..093e5b5e1 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.h +++ b/src/groups/mqb/mqbblp/mqbblp_domain.h @@ -109,12 +109,17 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, typedef AppInfos::const_iterator AppInfosCIter; enum DomainState { - e_STARTED = 0, - e_STOPPING = 1, - e_STOPPED = 2, - e_PREREMOVE = 3, - e_REMOVING = 4, - e_REMOVED = 5 + e_STARTED = 0, + e_STOPPING = 1, + e_STOPPED = 2, + // Used for teardownRemove function + e_REMOVING = 3, + e_REMOVED = 4, + // Used as flags to indicate + // the start and finish of + // the first round for DOMAINS REMOVE + e_PREREMOVE = 5, + e_POSTREMOVE = 6, }; private: @@ -332,9 +337,13 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, processCommand(mqbcmd::DomainResult* result, const mqbcmd::DomainCommand& command) BSLS_KEYWORD_OVERRIDE; - /// Mark the state of domain to be REMOVING + /// Mark the state of domain to be PREREMOVE void removeDomainReset() BSLS_KEYWORD_OVERRIDE; + /// Mark the state of domain to be POSTREMOVE, + /// indicating the first round of DOMAINS REMOVE is completed + void removeDomainComplete() BSLS_KEYWORD_OVERRIDE; + // ACCESSORS /// Load into the specified `out` the queue corresponding to the @@ -374,6 +383,10 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, /// Check the state of the queues in this domain, return false if there's /// queues opened or opening. 
bool tryRemove() const BSLS_KEYWORD_OVERRIDE; + + /// Check the state of the domain, return true if the first round + /// of DOMAINS REMOVE is completed + bool isRemoveComplete() const BSLS_KEYWORD_OVERRIDE; }; // ============================================================================ diff --git a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp index 88d5a7b5e..e2763fc29 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp @@ -421,6 +421,9 @@ bool ClusterState::unassignQueue(const bmqt::Uri& uri) } domIt->second->queuesInfo().erase(cit); + if (domIt->second->queuesInfo().empty()) { + domIt->second->setDomain(nullptr); + } // POSTCONDITIONS // diff --git a/src/groups/mqb/mqbi/mqbi_domain.h b/src/groups/mqb/mqbi/mqbi_domain.h index 96bf615ad..a80c22de9 100644 --- a/src/groups/mqb/mqbi/mqbi_domain.h +++ b/src/groups/mqb/mqbi/mqbi_domain.h @@ -185,9 +185,13 @@ class Domain { virtual int processCommand(mqbcmd::DomainResult* result, const mqbcmd::DomainCommand& command) = 0; - /// Mark the state of domain to be REMOVING + /// Mark the state of domain to be PREREMOVE virtual void removeDomainReset() = 0; + /// Mark the state of domain to be POSTREMOVE, + /// indicating the first round of DOMAINS REMOVE is completed + virtual void removeDomainComplete() = 0; + // ACCESSORS /// Load into the specified `out` the queue corresponding to the @@ -227,6 +231,10 @@ class Domain { /// Check the state of the queues in this domain, return false if there's /// queues opened or opening. virtual bool tryRemove() const = 0; + + /// Check the state of the domain, return true if the first round + /// of DOMAINS REMOVE is completed + virtual bool isRemoveComplete() const = 0; }; // =================== diff --git a/src/groups/mqb/mqbmock/mqbmock_domain.cpp b/src/groups/mqb/mqbmock/mqbmock_domain.cpp index 35c8a2459..c67ddcd18 100644 --- a/src/groups/mqb/mqbmock/mqbmock_domain.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_domain.cpp @@ -160,6 +160,11 @@ void Domain::removeDomainReset() BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!"); } +void Domain::removeDomainComplete() +{ + BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!"); +} + int Domain::lookupQueue(bsl::shared_ptr* out, const bmqt::Uri& uri) const { @@ -233,6 +238,13 @@ void Domain::loadRoutingConfiguration( bool Domain::tryRemove() const { + BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!"); + return true; +} + +bool Domain::isRemoveComplete() const +{ + BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!"); return true; } diff --git a/src/groups/mqb/mqbmock/mqbmock_domain.h b/src/groups/mqb/mqbmock/mqbmock_domain.h index 8b26cdf02..7f4fe059c 100644 --- a/src/groups/mqb/mqbmock/mqbmock_domain.h +++ b/src/groups/mqb/mqbmock/mqbmock_domain.h @@ -211,9 +211,13 @@ class Domain : public mqbi::Domain { processCommand(mqbcmd::DomainResult* result, const mqbcmd::DomainCommand& command) BSLS_KEYWORD_OVERRIDE; - /// Mark the state of domain to be REMOVING + /// Mark the state of domain to be PREREMOVE void removeDomainReset() BSLS_KEYWORD_OVERRIDE; + /// Mark the state of domain to be POSTREMOVE, + /// indicating the first round of DOMAINS REMOVE is completed + void removeDomainComplete() BSLS_KEYWORD_OVERRIDE; + /// Load into the specified `out`, if `out` is not 0, the queue /// corresponding to the specified `uri`, if found. Return 0 on success, /// or a non-zero return code otherwise. 
@@ -251,6 +255,10 @@ class Domain : public mqbi::Domain { /// Check the state of the queues in this domain, return false if there's /// queues opened or opening. bool tryRemove() const BSLS_KEYWORD_OVERRIDE; + + /// Check the state of the domain, return true if the first round + /// of DOMAINS REMOVE is completed + bool isRemoveComplete() const BSLS_KEYWORD_OVERRIDE; }; // =================== diff --git a/src/integration-tests/test_domain_remove.py b/src/integration-tests/test_domain_remove.py index 8d6a2d652..300da6ff9 100644 --- a/src/integration-tests/test_domain_remove.py +++ b/src/integration-tests/test_domain_remove.py @@ -31,38 +31,72 @@ import time -def test_remove_domain_with_queue_close(cluster: Cluster): +def write_messages(proxy, uri, n_msgs=5, do_confirm=True): + """ + producer send a message, client confirm, then both close connection + """ + producer = proxy.create_client("producer") + producer.open(uri, flags=["write"], succeed=True) + + consumer = proxy.create_client("consumer") + consumer.open(uri, flags=["read"], succeed=True) + + producer.post(uri, [f"msg{i}" for i in range(n_msgs)], succeed=True, wait_ack=True) + + if do_confirm: + consumer.confirm(uri, "*", succeed=True) + + producer.close(uri, succeed=True) + consumer.close(uri, succeed=True) + + +def test_remove_domain_with_queue_closed(cluster: Cluster): + """ + send DOMAINS REMOVE command after both queue closed + command should succeed + """ proxies = cluster.proxy_cycle() proxy = next(proxies) # producer and consumer open the queue, # post and confirm messages and both close - producer = proxy.create_client("producer") - producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) + write_messages(proxy, tc.URI_PRIORITY) - consumer = proxy.create_client("consumer") - consumer.open(tc.URI_PRIORITY, flags=["read"], succeed=True) + # send remove domain admin command + admin = AdminClient() + leader = cluster.last_known_leader + admin.connect(leader.config.host, int(leader.config.port)) + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") + assert "Purged 0 message(s)" in res - producer.post( - tc.URI_PRIORITY, - [f"msg{i}" for i in range(3)], - succeed=True, - wait_ack=True, - ) - consumer.confirm(tc.URI_PRIORITY, "*", succeed=True) - producer.close(tc.URI_PRIORITY, succeed=True) - consumer.close(tc.URI_PRIORITY, succeed=True) + +def test_remove_domain_with_queue_open(cluster: Cluster): + """ + send DOMAINS REMOVE command with a queue still open + command should fail + """ + proxies = cluster.proxy_cycle() + proxy = next(proxies) + + uri = tc.URI_PRIORITY + producer = proxy.create_client("producer") + producer.open(uri, flags=["write"], succeed=True) + producer.post(uri, [f"msg{i}" for i in range(5)], succeed=True, wait_ack=True) # send remove domain admin command - # command couldn't go through since there's a queue open admin = AdminClient() leader = cluster.last_known_leader admin.connect(leader.config.host, int(leader.config.port)) res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") - assert "while there are queues open" not in res + assert "while there are queues open" in res def test_remove_domain_when_cluster_unhealthy(multi_node: Cluster): + """ + send DOMAINS REMOVE command when the cluster is not healthy + the command fails with a routing error + resend the command and it should succeed + """ proxies = multi_node.proxy_cycle() proxy = next(proxies) @@ -71,22 +105,7 @@ def test_remove_domain_when_cluster_unhealthy(multi_node: Cluster): replicas = multi_node.nodes(exclude=leader) 
member = replicas[0] - def write_messages(uri): - # producer send a message, client confirm, then both close connection - producer = proxy.create_client("producer") - producer.open(uri, flags=["write"], succeed=True) - - consumer = proxy.create_client("consumer") - consumer.open(uri, flags=["read"], succeed=True) - - producer.post(uri, [f"msg{i}" for i in range(5)], succeed=True, wait_ack=True) - - consumer.confirm(uri, "+1", succeed=True) - - producer.close(uri, succeed=True) - consumer.close(uri, succeed=True) - - write_messages(tc.URI_PRIORITY) + write_messages(proxy, tc.URI_PRIORITY, n_msgs=5, do_confirm=False) # set quorum to make it impossible to select a leader for node in multi_node.nodes(): @@ -100,7 +119,7 @@ def write_messages(uri): # send remove domain admin command # command couldn't go through since state is unhealthy admin = AdminClient() - admin.connect(member.config.host, int(member.config.port)) + admin.connect(member.config.host, int(member.config.port)) # member = east2 res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") assert "Error occurred routing command to this node" in res assert res.split("\n").count("No queue purged.") == 3 @@ -110,18 +129,24 @@ def write_messages(uri): # wait until the cluster become healthy again leader.start() leader.wait() - replicas[1].set_quorum(1) - leader.wait_status(wait_leader=True, wait_ready=False) + replicas[1].set_quorum(1) # Quorum set to 1 from 99 + leader.wait_status(wait_leader=True, wait_ready=False) # new leader = west1 # send DOMAINS REMOVE admin command again + multi_node._logger.info("BEFORE SENDING ADMIN COMMAND AGAIN") res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") - assert "Purged 4 message(s) for a total of 16 B from 1 queue(s):" in res + assert "Purged 5 message(s) for a total of 20 B from 1 queue(s):" in res assert res.split("\n").count("No queue purged.") == 3 def test_remove_different_domain(cluster: Cluster): + """ + send DOMAINS REMOVE command to remove a different domain + the original one should be intact + """ proxies = cluster.proxy_cycle() + # open queue in PRIORITY domain but remove PRIORITY_SC # producer produces messages and then closes connection producer = next(proxies).create_client("producer") producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) @@ -132,10 +157,8 @@ def test_remove_different_domain(cluster: Cluster): succeed=True, wait_ack=True, ) - producer.close(tc.URI_PRIORITY) - # send remove domain admin command - # for a different domain + # send DOMAINS REMOVE admin command to a different domain admin = AdminClient() leader = cluster.last_known_leader admin.connect(leader.config.host, int(leader.config.port)) @@ -143,7 +166,16 @@ def test_remove_different_domain(cluster: Cluster): res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY_SC}") assert "No queue purged." in res + # post message to the untouched domain + producer.post( + tc.URI_PRIORITY, + [f"msg{i}" for i in range(5)], + succeed=True, + wait_ack=True, + ) + # do the same things for a different pair reversely + # open queue in FANOUT_SC domain but remove FANOUT producer.open(tc.URI_FANOUT_SC, flags=["write"], succeed=True) producer.post( @@ -152,34 +184,46 @@ def test_remove_different_domain(cluster: Cluster): succeed=True, wait_ack=True, ) - producer.close(tc.URI_FANOUT_SC) res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_FANOUT}") assert "No queue purged." 
in res + # post message to the unremoved domain + producer.post( + tc.URI_FANOUT_SC, + [f"msg{i}" for i in range(5)], + succeed=True, + wait_ack=True, + ) + def test_open_queue_after_remove_domain(cluster: Cluster): + """ + try to open a queue after the first round of DOMAINS REMOVE command + and it should fail since we started remove but not fully finished yet + """ proxies = cluster.proxy_cycle() next(proxies) # eastp proxy = next(proxies) # westp + uri = tc.URI_PRIORITY # producer produces messages and consumer confirms # then both close connections producer = proxy.create_client("producer") - producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) + producer.open(uri, flags=["write"], succeed=True) consumer = proxy.create_client("consumer") - consumer.open(tc.URI_PRIORITY, flags=["read"], succeed=True) + consumer.open(uri, flags=["read"], succeed=True) producer.post( - tc.URI_PRIORITY, + uri, [f"msg{i}" for i in range(3)], succeed=True, wait_ack=True, ) - consumer.confirm(tc.URI_PRIORITY, "*", succeed=True) - producer.close(tc.URI_PRIORITY, succeed=True) - consumer.close(tc.URI_PRIORITY, succeed=True) + consumer.confirm(uri, "*", succeed=True) + producer.close(uri, succeed=True) + consumer.close(uri, succeed=True) # send remove domain admin command admin = AdminClient() @@ -188,10 +232,14 @@ def test_open_queue_after_remove_domain(cluster: Cluster): admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") # open queues on the removed domain should fail - assert producer.open(tc.URI_PRIORITY, flags=["write"], block=True) < 0 + assert producer.open(uri, flags=["write"], block=True) != Client.e_SUCCESS def test_remove_domain_with_queue_open(cluster: Cluster): + """ + issue DOMAINS REMOVE command when both producer and consumer close connections, + both open, or one of them has the connection open + """ proxies = cluster.proxy_cycle() proxy = next(proxies) @@ -272,6 +320,9 @@ def test_remove_domain_with_unconfirmed_message(cluster: Cluster): def test_remove_domain_not_on_disk(cluster: Cluster): + """ + issue DOMAINS REMOVE command when the domain is not on disk + """ admin = AdminClient() leader = cluster.last_known_leader admin.connect(leader.config.host, int(leader.config.port)) @@ -281,14 +332,22 @@ def test_remove_domain_not_on_disk(cluster: Cluster): def test_remove_domain_on_disk_not_in_cache(cluster: Cluster): + """ + issue DOMAINS REMOVE command when the domain is not on disk + """ admin = AdminClient() leader = cluster.last_known_leader admin.connect(leader.config.host, int(leader.config.port)) res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_BROADCAST}") assert "Trying to remove a nonexistent domain" not in res + assert "No queue purged." 
in res def test_send_to_replicas(multi_node: Cluster): + """ + send DOMAINS REMOVE admin command to replica instead of primary + replica will boardcast to all the nodes including the primary + """ proxies = multi_node.proxy_cycle() proxy = next(proxies) @@ -333,3 +392,90 @@ def test_send_to_replicas(multi_node: Cluster): res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") assert "Purged" in res + + +def test_second_round(cluster: Cluster): + """ + issue DOMAINS REMOVE command, and later finalize the command + a queue and the removed domain can be opened after finalizing + and when the domain exists on the disk + """ + proxies = cluster.proxy_cycle() + proxy = next(proxies) + + admin = AdminClient() + leader = cluster.last_known_leader + admin.connect(leader.config.host, int(leader.config.port)) + uri = tc.URI_PRIORITY + + def remove_from_disk_and_add_back(): + """ + remove the domain config file from the fisk, + send the finalize DOMAINS REMOVE command + check a producer can't open a queue under that domain + add the domain config file back to the disk + check now a producer can open a queue under that domain + """ + # remove domain config file + domain_config = cluster.config.domains[tc.DOMAIN_PRIORITY] + + for node in cluster.configurator.brokers.values(): + del node.domains[tc.DOMAIN_PRIORITY] + cluster.deploy_domains() + + # second round + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY} FINALIZE") + assert "SUCCESS" in res + + # producer can't open a queue since the domain config file doesn't exist + producer = proxy.create_client("producer") + assert producer.open(tc.URI_PRIORITY, flags=["write"], block=True) < 0 + + # add back the domain config file + for node in cluster.configurator.brokers.values(): + node.domains[tc.DOMAIN_PRIORITY] = domain_config + cluster.deploy_domains() + + # now the queue can be opened + assert producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) == 0 + + producer.close(uri=tc.URI_PRIORITY, succeed=True) + + # put -> confirm -> admin command -> remove_from_disk_and_add_back + write_messages(proxy, uri) + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") + assert "Purged 0 message(s) for a total of 0 B from 1 queue(s)" in res + remove_from_disk_and_add_back() + + # put -> no confirm -> admin command -> remove_from_disk_and_add_back + producer = proxy.create_client("producer") + producer.open(uri, flags=["write"], succeed=True) + producer.post(uri, [f"msg{i}" for i in range(3)], succeed=True, wait_ack=True) + producer.close(uri=uri, succeed=True) + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") + assert "Purged 3 message(s) for a total of 12 B from 1 queue(s)" in res + remove_from_disk_and_add_back() + + +def test_purge_then_remove(cluster: Cluster): + """ + purge queue then remove + """ + proxies = cluster.proxy_cycle() + proxy = next(proxies) + uri = tc.URI_PRIORITY + + admin = AdminClient() + leader = cluster.last_known_leader + admin.connect(leader.config.host, int(leader.config.port)) + + producer = proxy.create_client("producer") + producer.open(uri, flags=["write"], succeed=True) + producer.post(uri, [f"msg{i}" for i in range(5)], succeed=True, wait_ack=True) + producer.close(uri=uri, succeed=True) + + res = admin.send_admin(f"DOMAINS DOMAIN {tc.DOMAIN_PRIORITY} PURGE") + assert f"Purged 5 message(s)" in res + + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") + assert "Purged 0 message(s) for a total of 0 B from 1 queue(s)" in res
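
Note for reviewers: the short sketch below strings together the two-pass DOMAINS REMOVE flow that the new integration tests (in particular test_second_round and test_open_queue_after_remove_domain) exercise, so the intended operator sequence is visible in one place. It is illustrative only, not part of the patch: the helper name remove_domain_two_pass and its parameters are made up here, while admin.send_admin, the FINALIZE keyword, the "while there are queues open" message, cluster.configurator.brokers and cluster.deploy_domains() all come from the tests above.

    # Sketch of the two-pass DOMAINS REMOVE flow, assuming an already-connected
    # AdminClient (`admin`) and the same cluster fixture used by the tests above.
    # The function name and signature are illustrative, not broker API.
    def remove_domain_two_pass(admin, cluster, domain):
        # First pass: the broker refuses if any queue is still open; otherwise it
        # marks the domain PREREMOVE, purges inactive queues, force-GCs them and
        # waits for teardown before clearing the resolver/config caches.
        res = admin.send_admin(f"DOMAINS REMOVE {domain}")
        if "while there are queues open" in res:
            return res  # nothing was removed; close the open queues and retry

        # Between the passes the broker rejects openQueue for this domain
        # (see test_open_queue_after_remove_domain).

        # Operator step outside the broker: drop the domain config from disk so
        # the domain can no longer be resolved (mirrors the
        # remove_from_disk_and_add_back helper in test_second_round).
        for broker in cluster.configurator.brokers.values():
            del broker.domains[domain]
        cluster.deploy_domains()

        # Second pass: only accepted once the first pass completed; this removes
        # the in-memory Domain object from the DomainManager.
        return admin.send_admin(f"DOMAINS REMOVE {domain} FINALIZE")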