Skip to content

Commit

Permalink
Add routing broadcast and update open queue check
Browse files Browse the repository at this point in the history
  • Loading branch information
emelialei88 committed Dec 26, 2024
1 parent c41fb5a commit 035193e
Show file tree
Hide file tree
Showing 26 changed files with 339 additions and 58 deletions.
13 changes: 9 additions & 4 deletions src/groups/mqb/mqba/mqba_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,13 @@ Application::Application(bdlmt::EventScheduler* scheduler,

// Print banner
BALL_LOG_INFO
<< "Starting" << "\n ____ __ __ ___ _ _"
<< "Starting"
<< "\n ____ __ __ ___ _ _"
<< "\n | __ )| \\/ |/ _ \\| |__ _ __ ___ | | _____ _ __"
<< "\n | _ \\| |\\/| | | | | '_ \\| '__/ _ \\| |/ / _ \\ '__|"
<< "\n | |_) | | | | |_| | |_) | | | (_) | < __/ |"
<< "\n |____/|_| |_|\\__\\_\\_.__/|_| \\___/|_|\\_\\___|_|" << "\n"
<< "\n |____/|_| |_|\\__\\_\\_.__/|_| \\___/|_|\\_\\___|_|"
<< "\n"
<< "\n Instance..............: " << brkrCfg.brokerInstanceName()
<< "\n Version...............: " << brkrCfg.brokerVersion()
<< "\n Build Type............: " << MQBA_STRINGIFY(BMQ_BUILD_TYPE)
Expand Down Expand Up @@ -657,6 +659,9 @@ Application::getRelevantCluster(bsl::ostream& errorDescription,
else if (domains.isReconfigureValue()) {
domainName = domains.reconfigure().domain();
}
else if (domains.isRemoveValue()) {
domainName = domains.remove().domain();
}
else {
errorDescription << "Cannot extract cluster for that command";
return NULL; // RETURN
Expand Down Expand Up @@ -822,8 +827,8 @@ int Application::processCommand(const bslstl::StringRef& source,
mqbcmd::Command command;
bsl::string parseError;
if (const int rc = mqbcmd::ParseUtil::parse(&command, &parseError, cmd)) {
os << "Unable to decode command " << "(rc: " << rc << ", error: '"
<< parseError << "')";
os << "Unable to decode command "
<< "(rc: " << rc << ", error: '" << parseError << "')";
return rc + 10 * rc_PARSE_ERROR; // RETURN
}

Expand Down
4 changes: 4 additions & 0 deletions src/groups/mqb/mqba/mqba_commandrouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,10 @@ void CommandRouter::setCommandRoutingMode()
d_routingModeMp.load(new (*allocator) ClusterWideRoutingMode());
// DOMAINS RECONFIGURE <domain>
}
else if (domains.isRemoveValue()) {
d_routingModeMp.load(new (*allocator) ClusterWideRoutingMode());
// DOMAINS REMOVE <domain> [finalize]
}
}
else if (commandChoice.isClustersValue()) {
const mqbcmd::ClustersCommand& clusters = commandChoice.clusters();
Expand Down
39 changes: 22 additions & 17 deletions src/groups/mqb/mqba/mqba_domainmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -719,13 +719,11 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
BALL_LOG_INFO << "[First pass] DOMAINS REMOVE '" << name
<< "' called!!!";

// TODO: broadcast to all other nodes

// 1. Reject if there's any opened queue
if (domainSp->hasActiveQueue()) {
// 1. Reject if there's any opened or opening queue
if (!domainSp->tryRemove()) {
bmqu::MemOutStream os;
os << "Trying to remove the domain '" << name
<< "' while there are queues open";
<< "' while there are queues opened or opening";
result->makeError().message() = os.str();
return -1; // RETURN
}
Expand Down Expand Up @@ -777,34 +775,40 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,

BALL_LOG_INFO << "BEFORE PURGE";

// Purge -> Primary
// TODO: GC -> leader

// 5. Purge inactive queues
// remove virtual storage; add a record in journal file
mqbcmd::DomainCommand domainCommand;
domainCommand.makePurge();
mqbcmd::DomainResult domainResult;
mqbcmd::ClusterResult clusterResult;

mqbcmd::DomainResult domainResult;
rc = domainSp->processCommand(&domainResult, domainCommand);
domainSp->cluster()->purgeQueueOnDomain(&clusterResult, name);

if (domainResult.isErrorValue()) {
result->makeError(domainResult.error());
return rc; // RETURN
}
else if (domainResult.isSuccessValue()) {
result->makeSuccess(domainResult.success());
if (clusterResult.isErrorValue()) {
result->makeError(clusterResult.error());
return rc; // RETURN
}

BSLS_ASSERT_SAFE(clusterResult.isStorageResultValue());
BSLS_ASSERT_SAFE(
clusterResult.storageResult().isPurgedQueuesValue());

mqbcmd::PurgedQueues& purgedQueues =
domainResult.makePurgedQueues();
purgedQueues.queues() =
clusterResult.storageResult().purgedQueues().queues();
result->makeDomainResult(domainResult);

// 6. Force GC queues
// remove Queue from domain; remove storage from partition
// CLUSTERS CLUSTER <name> FORCE_GC_QUEUES
// TODO: Do we want to have add another type of result?
mqbcmd::ClusterResult clusterForceGCResult;
rc = cluster->gcQueueOnDomain(&clusterForceGCResult, name);
if (clusterForceGCResult.isErrorValue()) {
result->makeError(clusterForceGCResult.error());
return -1; // RETURN
}
// register and unresgiter queue

// 7. Mark DOMAIN REMOVED to accecpt the second pass
domainSp->removeDomainCompleted();
Expand All @@ -818,6 +822,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
result->makeSuccess();
return 0; // RETURN
}

return 0;
}

Expand Down
28 changes: 24 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3685,21 +3685,41 @@ void Cluster::gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result,
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this));

// 'true' implies immediate
if (const int rc = d_clusterOrchestrator.queueHelper().gcExpiredQueues(
true,
domainName)) {
const int rc =
d_clusterOrchestrator.queueHelper().gcExpiredQueues(true, domainName);
if (rc == -1 || rc == -3) {
// TBD: We allow the node to not be an active primary for *any*
// partition; this has to be changed once we allow leader != primary
BALL_LOG_ERROR << "Failed to execute force GC queues command (rc: "
<< rc << ")";
result->makeError().message() = "Failed to execute command (rc: " +
bsl::to_string(rc) + ")";
}
else {
// Otherwise the command succeeded.
BALL_LOG_INFO << "SUCCESS in Cluster::gcQueueOnDomainDispatched";
result->makeSuccess();
}
}

void Cluster::purgeQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
{
// exected by *ANY* thread

mqbcmd::StorageResult storageResult;

dispatcher()->execute(
bdlf::BindUtil::bind(&mqbi::StorageManager::purgeQueueOnDomain,
d_storageManager_mp.get(),
&storageResult,
domainName),
this);

dispatcher()->synchronize(this);

result->makeStorageResult(storageResult);
}

void Cluster::printClusterStateSummary(bsl::ostream& out,
int level,
int spacesPerLevel) const
Expand Down
5 changes: 5 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,11 @@ class Cluster : public mqbi::Cluster,
/// Load the cluster state to the specified `out` object.
void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE;

/// Purge queues in this cluster on a given domain.
void
purgeQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;

/// Force GC queues in this cluster on a given domain.
int gcQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;
Expand Down
9 changes: 9 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,15 @@ void ClusterProxy::loadClusterStatus(mqbcmd::ClusterResult* out)
loadQueuesInfo(&clusterProxyStatus.queuesInfo());
}

void ClusterProxy::purgeQueueOnDomain(
mqbcmd::ClusterResult* result,
BSLS_ANNOTATION_UNUSED const bsl::string& domainName)
{
bmqu::MemOutStream os;
os << "MockCluster::gcQueueOnDomain not implemented!";
result->makeError().message() = os.str();
}

int ClusterProxy::gcQueueOnDomain(
mqbcmd::ClusterResult* result,
BSLS_ANNOTATION_UNUSED const bsl::string& domainName)
Expand Down
5 changes: 5 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,11 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
/// Load the cluster state in the specified `out` object.
void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE;

/// Purge queues in this cluster on a given domain.
void
purgeQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;

/// Force GC queues in this cluster on a given domain.
int gcQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;
Expand Down
3 changes: 2 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6153,6 +6153,7 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate,
rc_SUCCESS = 0,
rc_CLUSTER_IS_STOPPING = -1,
rc_SELF_IS_NOT_PRIMARY = -2,
rc_SELF_IS_NOT_LEADER = -3,
};

if (d_cluster_p->isStopping()) {
Expand Down Expand Up @@ -6326,7 +6327,7 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate,
d_primaryNotLeaderAlarmRaised = true;
}

return rc_SUCCESS; // RETURN
return rc_SELF_IS_NOT_LEADER; // RETURN
}

for (size_t i = 0; i < queuesToGc.size(); ++i) {
Expand Down
67 changes: 39 additions & 28 deletions src/groups/mqb/mqbblp/mqbblp_domain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,30 +525,35 @@ void Domain::openQueue(
// PRECONDITIONS
BSLS_ASSERT_SAFE(uri.asString() == handleParameters.uri());

if (d_state != e_STARTED) {
// Reject this open-queue request with a soft failure status.
{
bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex); // LOCK

bmqp_ctrlmsg::Status status;
if (d_state != e_STARTED) {
// Reject this open-queue request with a soft failure status.

if (d_state == e_REMOVING || d_state == e_REMOVED) {
status.category() = bmqp_ctrlmsg::StatusCategory::E_REFUSED;
status.code() = mqbi::ClusterErrorCode::e_UNKNOWN;
status.message() = k_DOMAIN_IS_REMOVING_OR_REMOVED;
}
else {
status.category() = bmqp_ctrlmsg::StatusCategory::E_REFUSED;
status.code() = mqbi::ClusterErrorCode::e_STOPPING;
status.message() = k_NODE_IS_STOPPING;
bmqp_ctrlmsg::Status status;

if (d_state == e_REMOVING || d_state == e_REMOVED) {
status.category() = bmqp_ctrlmsg::StatusCategory::E_REFUSED;
status.code() = mqbi::ClusterErrorCode::e_UNKNOWN;
status.message() = k_DOMAIN_IS_REMOVING_OR_REMOVED;
}
else {
status.category() = bmqp_ctrlmsg::StatusCategory::E_REFUSED;
status.code() = mqbi::ClusterErrorCode::e_STOPPING;
status.message() = k_NODE_IS_STOPPING;
}

callback(status,
static_cast<mqbi::QueueHandle*>(0),
bmqp_ctrlmsg::OpenQueueResponse(),
mqbi::Cluster::OpenQueueConfirmationCookie());
return; // RETURN
}

callback(status,
static_cast<mqbi::QueueHandle*>(0),
bmqp_ctrlmsg::OpenQueueResponse(),
mqbi::Cluster::OpenQueueConfirmationCookie());
return; // RETURN
++d_pendingRequests;
}

++d_pendingRequests;
d_cluster_sp->openQueue(
uri,
this,
Expand Down Expand Up @@ -983,23 +988,29 @@ void Domain::loadRoutingConfiguration(
}
}

bool Domain::hasActiveQueue() const
bool Domain::tryRemove() const
{
// Queues are created before handles, so if d_queue is empty,
// there's shouldn't be any active handle
if (d_queues.empty()) {
bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex); // LOCK

if (d_pendingRequests != 0) {
return false;
}

// If there's queue in this domain, check to see if there's
// any handle to it
for (QueueMapCIter it = d_queues.begin(); it != d_queues.end(); ++it) {
if (it->second->hasActiveHandle()) {
return true;
// If there's queue in this domain, check to see if there's any active
// handle to it
if (!d_queues.empty()) {
for (QueueMapCIter it = d_queues.begin(); it != d_queues.end(); ++it) {
// check like this or call Queue::hasActiveHandle()?
const bmqp_ctrlmsg::QueueHandleParameters& handleParamerers =
it->second->handleParameters();
if (handleParamerers.readCount() != 0 ||
handleParamerers.writeCount() != 0) {
return false;
}
}
}

return false;
return true;
}

} // close package namespace
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_domain.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain,

/// Check the state of the queues in this domain, return true if
/// there's queue with valid queue handles.
bool hasActiveQueue() const BSLS_KEYWORD_OVERRIDE;
bool tryRemove() const BSLS_KEYWORD_OVERRIDE;
};

// ============================================================================
Expand Down
18 changes: 18 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2110,6 +2110,24 @@ void StorageManager::gcUnrecognizedDomainQueues()
d_unrecognizedDomains);
}

int StorageManager::purgeQueueOnDomain(mqbcmd::StorageResult* result,
const bsl::string& domainName)
{
// executed by cluster *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(
d_dispatcher_p->inDispatcherThread(&d_clusterData_p->cluster()));

mqbc::StorageUtil::purgeQueueOnDomain(result,
domainName,
&d_fileStores,
&d_storages,
&d_storagesLock);

return 0;
}

// ACCESSORS
bool StorageManager::isStorageEmpty(const bmqt::Uri& uri,
int partitionId) const
Expand Down
5 changes: 5 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_storagemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,11 @@ class StorageManager BSLS_KEYWORD_FINAL : public mqbi::StorageManager {
/// GC the queues from unrecognized domains, if any.
void gcUnrecognizedDomainQueues() BSLS_KEYWORD_OVERRIDE;

/// Purge the queues on a given domain.
int
purgeQueueOnDomain(mqbcmd::StorageResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;

// ACCESSORS

/// Return the processor handle in charge of the specified
Expand Down
18 changes: 18 additions & 0 deletions src/groups/mqb/mqbc/mqbc_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4457,6 +4457,24 @@ void StorageManager::gcUnrecognizedDomainQueues()
d_unrecognizedDomains);
}

int StorageManager::purgeQueueOnDomain(mqbcmd::StorageResult* result,
const bsl::string& domainName)
{
// executed by cluster *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(
d_dispatcher_p->inDispatcherThread(&d_clusterData_p->cluster()));

StorageUtil::purgeQueueOnDomain(result,
domainName,
&d_fileStores,
&d_storages,
&d_storagesLock);

return 0;
}

mqbs::FileStore& StorageManager::fileStore(int partitionId)
{
// PRECONDITIONS
Expand Down
Loading

0 comments on commit 035193e

Please sign in to comment.