Skip to content

Commit

Permalink
Add broadcast routing for DOMAINS REMOVE command
Browse files Browse the repository at this point in the history
Signed-off-by: Emelia Lei <[email protected]>
  • Loading branch information
emelialei88 committed Jan 7, 2025
1 parent 0aee31f commit e116884
Show file tree
Hide file tree
Showing 26 changed files with 489 additions and 178 deletions.
3 changes: 3 additions & 0 deletions src/groups/mqb/mqba/mqba_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,9 @@ Application::getRelevantCluster(bsl::ostream& errorDescription,
else if (domains.isReconfigureValue()) {
domainName = domains.reconfigure().domain();
}
else if (domains.isRemoveValue()) {
domainName = domains.remove().domain();
}
else {
errorDescription << "Cannot extract cluster for that command";
return NULL; // RETURN
Expand Down
4 changes: 4 additions & 0 deletions src/groups/mqb/mqba/mqba_commandrouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ void CommandRouter::setCommandRoutingMode()
d_routingModeMp.load(new (*allocator) ClusterWideRoutingMode());
// DOMAINS RECONFIGURE <domain>
}
else if (domains.isRemoveValue()) {
d_routingModeMp.load(new (*allocator) ClusterWideRoutingMode());
// DOMAINS REMOVE <domain> [finalize]
}
}
else if (commandChoice.isClustersValue()) {
const mqbcmd::ClustersCommand& clusters = commandChoice.clusters();
Expand Down
124 changes: 56 additions & 68 deletions src/groups/mqb/mqba/mqba_domainmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,

if (0 != locateOrCreateDomain(&domainSp, name)) {
bmqu::MemOutStream os;
os << "Trying to remove a nonexistent domain '" << name << "'";
os << "Domain '" << name << "' doesn't exist";
result->makeError().message() = os.str();
return -1; // RETURN
}
Expand All @@ -719,95 +719,82 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
BALL_LOG_INFO << "[First pass] DOMAINS REMOVE '" << name
<< "' called!!!";

// TODO: broadcast to all other nodes

// 1. Reject if there's any opened queue
if (domainSp->hasActiveQueue()) {
// 1. Reject if there's any opened or opening queue
if (!domainSp->tryRemove()) {
bmqu::MemOutStream os;
os << "Trying to remove the domain '" << name
<< "' while there are queues open";
<< "' while there are queues opened or opening";
result->makeError().message() = os.str();
return -1; // RETURN
}

BALL_LOG_INFO << "BEFORE CHEKCING CLUSTER STATUS";

// 2. Reject if the state of cluster is not healthy
// Notice that the bad state can happen anywhere down the road,
// so this check is not enough to prevent a partial execution.
// It's inevitable, so we can only make sure the code doesn't
// break if we run this command again
// TODO: ask if this is necessary???
mqbi::Cluster* cluster = domainSp->cluster();
mqbcmd::ClusterResult clusterStatusResult;
mqbcmd::ClusterCommand clusterStatusCommand;
clusterStatusCommand.makeStatus();

int rc = cluster->processCommand(&clusterStatusResult,
clusterStatusCommand);
if (clusterStatusResult.isErrorValue()) {
result->makeError(clusterStatusResult.error());
return rc; // RETURN
}
// 2. Mark DOMAIN PREREMOVE to block openQueue requests
domainSp->removeDomainReset();

BALL_LOG_INFO << "BEFORE PURGE";

BALL_LOG_INFO << "AFTER CHEKCING CLUSTER STATUS";
BALL_LOG_INFO << clusterStatusResult.clusterStatus();
// 3. Purge inactive queues
// remove virtual storage; add a record in journal file
mqbcmd::DomainResult domainResult;
mqbcmd::ClusterResult clusterResult;
mqbi::Cluster* cluster = domainSp->cluster();

BSLS_ASSERT_SAFE(clusterStatusResult.isClusterStatusValue());
cluster->purgeQueueOnDomain(&clusterResult, name);

if (!clusterStatusResult.clusterStatus().isHealthy()) {
bmqu::MemOutStream os;
os << "Domain '" << name << "' in cluster '" << name
<< "' is not healthy";
result->makeError().message() = os.str();
if (clusterResult.isErrorValue()) {
result->makeError(clusterResult.error());
return -1; // RETURN
}

BALL_LOG_INFO << "BEFORE SETTING 'DELETED' FLAG";
BSLS_ASSERT_SAFE(clusterResult.isStorageResultValue());
BSLS_ASSERT_SAFE(
clusterResult.storageResult().isPurgedQueuesValue());

// 3. Mark DOMAIN REMOVING to Block all incoming openQueue requests
// TODO: this idea requires a lot of getters and setters exposed in
// mqbi - better idea?
domainSp->removeDomainStart();
mqbcmd::PurgedQueues& purgedQueues =
domainResult.makePurgedQueues();
purgedQueues.queues() =
clusterResult.storageResult().purgedQueues().queues();
result->makeDomainResult(domainResult);

BALL_LOG_INFO << "BEFORE CLEAN DOMAINRESOLVER CACHE";
BALL_LOG_INFO << "BEFORE GC";

// 4. Mark domain for delete in domainResolver
d_domainResolver_mp->clearCache(name);
// 4. Force GC queues
// unregister Queue from domain;
// remove queue storage from partition
mqbcmd::ClusterResult clusterForceGCResult;
int rc = cluster->gcQueueOnDomain(&clusterForceGCResult, name);
if (clusterForceGCResult.isErrorValue()) {
result->makeError(clusterForceGCResult.error());
return -1; // RETURN
}

BALL_LOG_INFO << "BEFORE PURGE";
// 5. Mark DOMAIN REMOVED to accept the second pass

// 5. Purge inactive queues
// remove virtual storage; add a record in journal file
mqbcmd::DomainCommand domainCommand;
domainCommand.makePurge();
BALL_LOG_INFO << "BEFORE WAIT FOR TEARDOWN";

mqbcmd::DomainResult domainResult;
rc = domainSp->processCommand(&domainResult, domainCommand);
bmqu::SharedResource<DomainManager> self(this);
bslmt::Latch latch(1, bsls::SystemClockType::e_MONOTONIC);

if (domainResult.isErrorValue()) {
result->makeError(domainResult.error());
return rc; // RETURN
}
else if (domainResult.isSuccessValue()) {
result->makeSuccess(domainResult.success());
return rc; // RETURN
}
result->makeDomainResult(domainResult);
domainSp->teardownRemove(bdlf::BindUtil::bind(
bmqu::WeakMemFnUtil::weakMemFn(&DomainManager::onDomainClosed,
self.acquireWeak()),
bdlf::PlaceHolders::_1, // Domain Name
&latch));

// 6. Force GC queues
// remove Queue from domain; remove storage from partition
// CLUSTERS CLUSTER <name> FORCE_GC_QUEUES
// TODO: Do we want to have add another type of result?
mqbcmd::ClusterResult clusterForceGCResult;
rc = cluster->gcQueueOnDomain(&clusterForceGCResult, name);
if (clusterForceGCResult.isErrorValue()) {
result->makeError(clusterForceGCResult.error());
return -1; // RETURN
bsls::TimeInterval timeout =
bmqsys::Time::nowMonotonicClock().addSeconds(5);

rc = latch.timedWait(timeout);
if (0 != rc) {
BALL_LOG_ERROR << "DOMAINS REMOVE fail to finish in " << 5
<< " seconds. rc: " << rc << ".";
return rc;
}

// 7. Mark DOMAIN REMOVED to accecpt the second pass
domainSp->removeDomainCompleted();
BALL_LOG_INFO << "BEFORE CLEAN DOMAINRESOLVER CACHE";

// 6. Mark domain for delete in domainResolver
d_domainResolver_mp->clearCache(name);
}
// Second pass
else {
Expand All @@ -818,6 +805,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
result->makeSuccess();
return 0; // RETURN
}

return 0;
}

Expand Down
41 changes: 29 additions & 12 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,12 +441,10 @@ void Cluster::sendAck(bmqt::AckResult::Enum status,
d_throttledFailedAckMessages,
BALL_LOG_INFO << description() << ": failed Ack "
<< "[status: " << status << ", source: '" << source
<< "'"
<< ", correlationId: " << correlationId
<< "'" << ", correlationId: " << correlationId
<< ", GUID: " << messageGUID << ", queue: '"
<< (found ? uri : "** null **") << "' "
<< "(id: " << queueId << ")] "
<< "to node "
<< "(id: " << queueId << ")] " << "to node "
<< nodeSession->clusterNode()->nodeDescription(););
}

Expand All @@ -455,8 +453,8 @@ void Cluster::sendAck(bmqt::AckResult::Enum status,
<< "[status: " << status << ", source: '" << source << "'"
<< ", correlationId: " << correlationId
<< ", GUID: " << messageGUID << ", queue: '" << uri
<< "' (id: " << queueId << ")] to "
<< "node " << nodeSession->clusterNode()->nodeDescription();
<< "' (id: " << queueId << ")] to " << "node "
<< nodeSession->clusterNode()->nodeDescription();

// Update stats for the queue (or subStream of the queue)
// TBD: We should collect all invalid stats (i.e. stats for queues that
Expand Down Expand Up @@ -494,8 +492,7 @@ void Cluster::sendAck(bmqt::AckResult::Enum status,
d_throttledDroppedAckMessages,
BALL_LOG_ERROR << description() << ": dropping ACK message "
<< "[status: " << status << ", source: '" << source
<< "'"
<< ", correlationId: " << correlationId
<< "'" << ", correlationId: " << correlationId
<< ", GUID: " << messageGUID
<< ", queueId: " << queueId << "] to node "
<< nodeSession->clusterNode()->nodeDescription()
Expand Down Expand Up @@ -3694,21 +3691,41 @@ void Cluster::gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result,
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this));

// 'true' implies immediate
if (const int rc = d_clusterOrchestrator.queueHelper().gcExpiredQueues(
true,
domainName)) {
const int rc =
d_clusterOrchestrator.queueHelper().gcExpiredQueues(true, domainName);
if (rc == -1 || rc == -3) {
// TBD: We allow the node to not be an active primary for *any*
// partition; this has to be changed once we allow leader != primary
BALL_LOG_ERROR << "Failed to execute force GC queues command (rc: "
<< rc << ")";
result->makeError().message() = "Failed to execute command (rc: " +
bsl::to_string(rc) + ")";
}
else {
// Otherwise the command succeeded.
BALL_LOG_INFO << "SUCCESS in Cluster::gcQueueOnDomainDispatched";
result->makeSuccess();
}
}

void Cluster::purgeQueueOnDomain(mqbcmd::ClusterResult* result,
                                 const bsl::string&     domainName)
{
    // executed by *ANY* thread

    // Purge the queues of the specified 'domainName' through the storage
    // manager and load the outcome into the specified 'result' as a
    // storage result.

    // Stack-local destination that the storage manager fills in.
    mqbcmd::StorageResult purgeOutcome;

    // Hand the purge over to the dispatcher on behalf of this cluster
    // instead of calling the storage manager from the caller's thread.
    dispatcher()->execute(
        bdlf::BindUtil::bind(&mqbi::StorageManager::purgeQueueOnDomain,
                             d_storageManager_mp.get(),
                             &purgeOutcome,
                             domainName),
        this);

    // NOTE(review): presumably 'synchronize' blocks until the functor
    // enqueued above has run, which is what makes binding the address of
    // the local 'purgeOutcome' safe -- confirm against the dispatcher
    // contract.
    dispatcher()->synchronize(this);

    // Publish whatever the storage manager reported to the caller.
    result->makeStorageResult(purgeOutcome);
}

void Cluster::printClusterStateSummary(bsl::ostream& out,
int level,
int spacesPerLevel) const
Expand Down
5 changes: 5 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,11 @@ class Cluster : public mqbi::Cluster,
/// Load the cluster state to the specified `out` object.
void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE;

/// Purge queues in this cluster on a given domain.
void
purgeQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;

/// Force GC queues in this cluster on a given domain.
int gcQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;
Expand Down
23 changes: 16 additions & 7 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ void ClusterProxy::processEvent(const bmqp::Event& event,
<< "Received unexpected event: " << event;
BSLS_ASSERT_SAFE(false && "Unexpected event received");
return; // RETURN
} // break;
} // break;
default: {
BALL_LOG_ERROR << "#UNEXPECTED_EVENT " << description()
<< "Received unknown event: " << event;
Expand Down Expand Up @@ -1342,6 +1342,15 @@ void ClusterProxy::loadClusterStatus(mqbcmd::ClusterResult* out)
loadQueuesInfo(&clusterProxyStatus.queuesInfo());
}

void ClusterProxy::purgeQueueOnDomain(
    mqbcmd::ClusterResult*                    result,
    BSLS_ANNOTATION_UNUSED const bsl::string& domainName)
{
    // Purging queues on a domain is not implemented by the cluster proxy:
    // always load an error into the specified 'result'.  The specified
    // 'domainName' is not used.

    // Name this class and method in the message so that whoever reads the
    // command's error output is pointed at the right place.
    result->makeError().message() =
        "ClusterProxy::purgeQueueOnDomain not implemented!";
}

int ClusterProxy::gcQueueOnDomain(
mqbcmd::ClusterResult* result,
BSLS_ANNOTATION_UNUSED const bsl::string& domainName)
Expand Down Expand Up @@ -1419,31 +1428,31 @@ void ClusterProxy::onDispatcherEvent(const mqbi::DispatcherEvent& event)
BSLS_ASSERT_OPT(false &&
"'DISPATCHER' type dispatcher event unexpected");
return; // RETURN
} // break;
} // break;
case mqbi::DispatcherEventType::e_CLUSTER_STATE: {
BSLS_ASSERT_OPT(false &&
"'CLUSTER_STATE' type dispatcher event unexpected");
return; // RETURN
} // break;
} // break;
case mqbi::DispatcherEventType::e_STORAGE: {
BSLS_ASSERT_OPT(false && "'STORAGE' type dispatcher event unexpected");
return; // RETURN
} // break;
} // break;
case mqbi::DispatcherEventType::e_RECOVERY: {
BSLS_ASSERT_OPT(false &&
"'RECOVERY' type dispatcher event unexpected");
return; // RETURN
} // break;
} // break;
case mqbi::DispatcherEventType::e_UNDEFINED: {
BSLS_ASSERT_OPT(false &&
"'UNDEFINED' type dispatcher event unexpected");
return; // RETURN
} // break;
} // break;
case mqbi::DispatcherEventType::e_REPLICATION_RECEIPT: {
BSLS_ASSERT_OPT(
false && "'REPLICATION_RECEIPT' type dispatcher event unexpected");
return; // RETURN
} // break;
} // break;
default: {
BALL_LOG_ERROR << "#UNEXPECTED_EVENT " << description()
<< ": received unexpected dispatcher event: " << event;
Expand Down
5 changes: 5 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,11 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
/// Load the cluster state in the specified `out` object.
void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE;

/// Purge queues in this cluster on a given domain.
void
purgeQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;

/// Force GC queues in this cluster on a given domain.
int gcQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;
Expand Down
3 changes: 2 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6005,6 +6005,7 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate,
rc_SUCCESS = 0,
rc_CLUSTER_IS_STOPPING = -1,
rc_SELF_IS_NOT_PRIMARY = -2,
rc_SELF_IS_NOT_LEADER = -3,
};

if (d_cluster_p->isStopping()) {
Expand Down Expand Up @@ -6178,7 +6179,7 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate,
d_primaryNotLeaderAlarmRaised = true;
}

return rc_SUCCESS; // RETURN
return rc_SELF_IS_NOT_LEADER; // RETURN
}

for (size_t i = 0; i < queuesToGc.size(); ++i) {
Expand Down
Loading

0 comments on commit e116884

Please sign in to comment.