Skip to content

Commit

Permalink
Add the second pass and tests
Browse files Browse the repository at this point in the history
Remove the domain object in the second pass

Signed-off-by: Emelia Lei <[email protected]>
  • Loading branch information
emelialei88 committed Jan 7, 2025
1 parent f273b7d commit 3f686e0
Show file tree
Hide file tree
Showing 9 changed files with 328 additions and 85 deletions.
89 changes: 63 additions & 26 deletions src/groups/mqb/mqba/mqba_domainmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ namespace BloombergLP {
namespace mqba {

namespace {
const int k_MAX_WAIT_SECONDS_AT_SHUTDOWN = 40;
const int k_MAX_WAIT_SECONDS_AT_SHUTDOWN = 40;
const int k_MAX_WAIT_SECONDS_AT_DOMAIN_REMOVE = 5;

/// This function is a callback passed to domain manager for
/// synchronization. The specified 'status' is a return status
Expand Down Expand Up @@ -705,19 +706,16 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
else if (command.isRemoveValue()) {
const bsl::string& name = command.remove().domain();

DomainSp domainSp;

if (0 != locateOrCreateDomain(&domainSp, name)) {
bmqu::MemOutStream os;
os << "Domain '" << name << "' doesn't exist";
result->makeError().message() = os.str();
return -1; // RETURN
}

// First pass
if (command.remove().finalize().isNull()) {
BALL_LOG_INFO << "[First pass] DOMAINS REMOVE '" << name
<< "' called!!!";
DomainSp domainSp;

if (0 != locateOrCreateDomain(&domainSp, name)) {
bmqu::MemOutStream os;
os << "Domain '" << name << "' doesn't exist";
result->makeError().message() = os.str();
return -1; // RETURN
}

// 1. Reject if there's any opened or opening queue
if (!domainSp->tryRemove()) {
Expand All @@ -731,8 +729,6 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
// 2. Mark DOMAIN PREREMOVE to block openQueue requests
domainSp->removeDomainReset();

BALL_LOG_INFO << "BEFORE PURGE";

// 3. Purge inactive queues
// remove virtual storage; add a record in journal file
mqbcmd::DomainResult domainResult;
Expand All @@ -756,8 +752,6 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
clusterResult.storageResult().purgedQueues().queues();
result->makeDomainResult(domainResult);

BALL_LOG_INFO << "BEFORE GC";

// 4. Force GC queues
// unregister Queue from domain;
// remove queue storage from partition
Expand All @@ -770,8 +764,6 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,

// 5. Mark DOMAIN REMOVED to accecpt the second pass

BALL_LOG_INFO << "BEFORE WAIT FOR TEARDOWN";

bmqu::SharedResource<DomainManager> self(this);
bslmt::Latch latch(1, bsls::SystemClockType::e_MONOTONIC);

Expand All @@ -782,26 +774,50 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
&latch));

bsls::TimeInterval timeout =
bmqsys::Time::nowMonotonicClock().addSeconds(5);
bmqsys::Time::nowMonotonicClock().addSeconds(
k_MAX_WAIT_SECONDS_AT_DOMAIN_REMOVE);

rc = latch.timedWait(timeout);
if (0 != rc) {
BALL_LOG_ERROR << "DOMAINS REMOVE fail to finish in " << 5
BALL_LOG_ERROR << "DOMAINS REMOVE fail to finish in "
<< k_MAX_WAIT_SECONDS_AT_DOMAIN_REMOVE
<< " seconds. rc: " << rc << ".";
return rc;
}

BALL_LOG_INFO << "BEFORE CLEAN DOMAINRESOLVER CACHE";

// 6. Mark domain for delete in domainResolver
// 6. Clear cache in domainResolver and configProvider
d_domainResolver_mp->clearCache(name);
d_configProvider_p->clearCache(name);

// 7. Mark DOMAINS REMOVE command first round as complete
domainSp->removeDomainComplete();
}
// Second pass
else {
// TODO: remove the domain object
DomainSp domainSp;

int rc = locateDomain(&domainSp, name);
if (0 != rc) {
bmqu::MemOutStream os;
os << "Domain '" << name << "' doesn't exist";
result->makeError().message() = os.str();
return rc; // RETURN
}

if (!domainSp->isRemoveComplete()) {
BALL_LOG_ERROR
<< "First pass of DOMAINS REMOVE is not completed.";
return -1; // RETURN
}

rc = removeDomain(name);
if (0 != rc) {
bmqu::MemOutStream os;
os << "Domain '" << name << "' doesn't exist";
result->makeError().message() = os.str();
return rc; // RETURN
}

BALL_LOG_INFO << "[Second pass] DOMAINS REMOVE '" << name
<< "' finalize called!!!";
result->makeSuccess();
return 0; // RETURN
}
Expand All @@ -815,6 +831,27 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
return -1;
}

int DomainManager::removeDomain(const bsl::string& domainName)
{
enum RcEnum {
// Value for the various RC error categories
rc_SUCCESS = 0,
rc_DOMAIN_NOT_FOUND = -1
};

bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex); // mutex LOCKED

DomainSpMap::const_iterator it = d_domains.find(domainName);

if (it == d_domains.end()) {
return rc_DOMAIN_NOT_FOUND; // RETURN
}

d_domains.erase(domainName);

return rc_SUCCESS;
}

void DomainManager::qualifyDomain(
const bslstl::StringRef& name,
const mqbi::DomainFactory::QualifiedDomainCb& callback)
Expand Down
3 changes: 3 additions & 0 deletions src/groups/mqb/mqba/mqba_domainmanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,9 @@ class DomainManager BSLS_CPP11_FINAL : public mqbi::DomainFactory {
int processCommand(mqbcmd::DomainsResult* result,
const mqbcmd::DomainsCommand& command);

/// Remove the `domainSp` from DomainSpMap.
int removeDomain(const bsl::string& domainName);

// MANIPULATORS
// (virtual: mqbi::DomainFactory)

Expand Down
19 changes: 16 additions & 3 deletions src/groups/mqb/mqbblp/mqbblp_domain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,8 @@ Domain::Domain(const bsl::string& name,

Domain::~Domain()
{
BSLS_ASSERT_SAFE((e_STOPPING == d_state || e_STOPPED == d_state) &&
BSLS_ASSERT_SAFE((e_STOPPING == d_state || e_STOPPED == d_state ||
e_POSTREMOVE == d_state) &&
"'teardown' must be called before the destructor");
}

Expand Down Expand Up @@ -564,8 +565,8 @@ void Domain::openQueue(

bmqp_ctrlmsg::Status status;

if (d_state == e_PREREMOVE || d_state == e_REMOVING ||
d_state == e_REMOVED) {
if (d_state == e_REMOVING || d_state == e_REMOVED ||
d_state == e_PREREMOVE || d_state == e_POSTREMOVE) {
status.category() = bmqp_ctrlmsg::StatusCategory::E_REFUSED;
status.code() = mqbi::ClusterErrorCode::e_UNKNOWN;
status.message() = k_DOMAIN_IS_REMOVING_OR_REMOVED;
Expand Down Expand Up @@ -927,6 +928,13 @@ void Domain::removeDomainReset()
d_teardownRemoveCb = nullptr;
}

void Domain::removeDomainComplete()
{
bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex); // LOCK

d_state = e_POSTREMOVE;
}

// ACCESSORS
int Domain::lookupQueue(bsl::shared_ptr<mqbi::Queue>* out,
const bmqt::Uri& uri) const
Expand Down Expand Up @@ -1047,5 +1055,10 @@ bool Domain::tryRemove() const
return true;
}

bool Domain::isRemoveComplete() const
{
return d_state == e_POSTREMOVE;
}

} // close package namespace
} // close enterprise namespace
27 changes: 20 additions & 7 deletions src/groups/mqb/mqbblp/mqbblp_domain.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,17 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain,
typedef AppInfos::const_iterator AppInfosCIter;

enum DomainState {
e_STARTED = 0,
e_STOPPING = 1,
e_STOPPED = 2,
e_PREREMOVE = 3,
e_REMOVING = 4,
e_REMOVED = 5
e_STARTED = 0,
e_STOPPING = 1,
e_STOPPED = 2,
// Used for teardownRemove function
e_REMOVING = 3,
e_REMOVED = 4,
// Used as flags to indicate
// the start and finish of
// the first round for DOMAINS REMOVE
e_PREREMOVE = 5,
e_POSTREMOVE = 6,
};

private:
Expand Down Expand Up @@ -332,9 +337,13 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain,
processCommand(mqbcmd::DomainResult* result,
const mqbcmd::DomainCommand& command) BSLS_KEYWORD_OVERRIDE;

/// Mark the state of domain to be REMOVING
/// Mark the state of domain to be PREREMOVE
void removeDomainReset() BSLS_KEYWORD_OVERRIDE;

/// Mark the state of domain to be POSTREMOVE,
/// indicating the first round of DOMAINS REMOVE is completed
void removeDomainComplete() BSLS_KEYWORD_OVERRIDE;

// ACCESSORS

/// Load into the specified `out` the queue corresponding to the
Expand Down Expand Up @@ -374,6 +383,10 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain,
/// Check the state of the queues in this domain, return false if there's
/// queues opened or opening.
bool tryRemove() const BSLS_KEYWORD_OVERRIDE;

/// Check the state of the domain, return true if the first round
/// of DOMAINS REMOVE is completed
bool isRemoveComplete() const BSLS_KEYWORD_OVERRIDE;
};

// ============================================================================
Expand Down
3 changes: 3 additions & 0 deletions src/groups/mqb/mqbc/mqbc_clusterstate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,9 @@ bool ClusterState::unassignQueue(const bmqt::Uri& uri)
}

domIt->second->queuesInfo().erase(cit);
if (domIt->second->queuesInfo().empty()) {
domIt->second->setDomain(nullptr);
}

// POSTCONDITIONS
//
Expand Down
10 changes: 9 additions & 1 deletion src/groups/mqb/mqbi/mqbi_domain.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,13 @@ class Domain {
virtual int processCommand(mqbcmd::DomainResult* result,
const mqbcmd::DomainCommand& command) = 0;

/// Mark the state of domain to be REMOVING
/// Mark the state of domain to be PREREMOVE
virtual void removeDomainReset() = 0;

/// Mark the state of domain to be POSTREMOVE,
/// indicating the first round of DOMAINS REMOVE is completed
virtual void removeDomainComplete() = 0;

// ACCESSORS

/// Load into the specified `out` the queue corresponding to the
Expand Down Expand Up @@ -227,6 +231,10 @@ class Domain {
/// Check the state of the queues in this domain, return false if there's
/// queues opened or opening.
virtual bool tryRemove() const = 0;

/// Check the state of the domain, return true if the first round
/// of DOMAINS REMOVE is completed
virtual bool isRemoveComplete() const = 0;
};

// ===================
Expand Down
12 changes: 12 additions & 0 deletions src/groups/mqb/mqbmock/mqbmock_domain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ void Domain::removeDomainReset()
BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!");
}

void Domain::removeDomainComplete()
{
BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!");
}

int Domain::lookupQueue(bsl::shared_ptr<mqbi::Queue>* out,
const bmqt::Uri& uri) const
{
Expand Down Expand Up @@ -233,6 +238,13 @@ void Domain::loadRoutingConfiguration(

bool Domain::tryRemove() const
{
BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!");
return true;
}

bool Domain::isRemoveComplete() const
{
BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!");
return true;
}

Expand Down
10 changes: 9 additions & 1 deletion src/groups/mqb/mqbmock/mqbmock_domain.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,13 @@ class Domain : public mqbi::Domain {
processCommand(mqbcmd::DomainResult* result,
const mqbcmd::DomainCommand& command) BSLS_KEYWORD_OVERRIDE;

/// Mark the state of domain to be REMOVING
/// Mark the state of domain to be PREREMOVE
void removeDomainReset() BSLS_KEYWORD_OVERRIDE;

/// Mark the state of domain to be POSTREMOVE,
/// indicating the first round of DOMAINS REMOVE is completed
void removeDomainComplete() BSLS_KEYWORD_OVERRIDE;

/// Load into the specified `out`, if `out` is not 0, the queue
/// corresponding to the specified `uri`, if found. Return 0 on success,
/// or a non-zero return code otherwise.
Expand Down Expand Up @@ -251,6 +255,10 @@ class Domain : public mqbi::Domain {
/// Check the state of the queues in this domain, return false if there's
/// queues opened or opening.
bool tryRemove() const BSLS_KEYWORD_OVERRIDE;

/// Check the state of the domain, return true if the first round
/// of DOMAINS REMOVE is completed
bool isRemoveComplete() const BSLS_KEYWORD_OVERRIDE;
};

// ===================
Expand Down
Loading

0 comments on commit 3f686e0

Please sign in to comment.