Skip to content

Commit

Permalink
Fix [MQB]: mqbc::StorageMgr: Transition to available only when all pr…
Browse files Browse the repository at this point in the history
…imary active (#416)

* mqbc::StorageMgr: Ban 'processPrimaryStatusAdvisory' in non-FSM mode

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* mqbc::StorageMgr: Transition to available only when all primary active

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* mqbc::StorageMgr: clang-format

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* mqbc::StorageMgr: Healing replica buffers primary status advisories

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* mqbs::FileStore: Rename setPrimary -> setActivePrimary

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* mqbc::StorageMgr: Comment about check if all partitions available

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

---------

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>
  • Loading branch information
kaikulimu authored Oct 10, 2024
1 parent 0c88d8c commit 060cd9f
Show file tree
Hide file tree
Showing 15 changed files with 305 additions and 67 deletions.
46 changes: 34 additions & 12 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1655,17 +1655,32 @@ void ClusterOrchestrator::processPrimaryStatusAdvisory(
if (d_clusterConfig.clusterAttributes().isFSMWorkflow()) {
if (pinfo.primaryNode() != source ||
pinfo.primaryLeaseId() != primaryAdv.primaryLeaseId()) {
BALL_LOG_WARN << d_clusterData_p->identity().description()
<< ": Partition [" << primaryAdv.partitionId()
<< "]: received primary status advisory: "
<< primaryAdv
<< " from: " << source->nodeDescription()
<< ", but self perceived primary and its leaseId are"
<< ": ["
<< (pinfo.primaryNode()
? pinfo.primaryNode()->nodeDescription()
: "** null **")
<< ", " << pinfo.primaryLeaseId() << "].";
BALL_LOG_WARN_BLOCK
{
BALL_LOG_OUTPUT_STREAM
<< d_clusterData_p->identity().description()
<< ": Partition [" << primaryAdv.partitionId()
<< "]: received primary status advisory: " << primaryAdv
<< " from: " << source->nodeDescription()
<< ", but self perceived primary and its leaseId are"
<< ": ["
<< (pinfo.primaryNode()
? pinfo.primaryNode()->nodeDescription()
: "** null **")
<< ", " << pinfo.primaryLeaseId() << "].";
if (pinfo.primaryNode()) {
BALL_LOG_OUTPUT_STREAM << " Ignoring advisory.";
}
else {
BALL_LOG_OUTPUT_STREAM << " Since we have not received any"
<< " information regarding the true"
<< " primary, this advisory could "
<< "be from the true one. Will"
<< " buffer the advisory for now.";
d_storageManager_p->bufferPrimaryStatusAdvisory(primaryAdv,
source);
}
}
return; // RETURN
}
}
Expand Down Expand Up @@ -1759,11 +1774,18 @@ void ClusterOrchestrator::processPrimaryStatusAdvisory(

// TBD: may need to review the order of invoking these routines.

BALL_LOG_INFO << d_clusterData_p->identity().description()
<< " PartitionId [" << primaryAdv.partitionId()
<< "]: received primary status advisory: " << primaryAdv
<< ", from: " << source->nodeDescription();

BSLS_ASSERT_SAFE(ns->isPrimaryForPartition(primaryAdv.partitionId()));
d_stateManager_mp->setPrimaryStatus(primaryAdv.partitionId(),
primaryAdv.status());

d_storageManager_p->processPrimaryStatusAdvisory(primaryAdv, source);
if (!d_clusterConfig.clusterAttributes().isFSMWorkflow()) {
d_storageManager_p->processPrimaryStatusAdvisory(primaryAdv, source);
}
}

void ClusterOrchestrator::processStateNotification(
Expand Down
9 changes: 9 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2025,6 +2025,15 @@ void StorageManager::processReceiptEvent(const bmqp::Event& event,
source));
}

void StorageManager::bufferPrimaryStatusAdvisory(
BSLS_ANNOTATION_UNUSED const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory,
BSLS_ANNOTATION_UNUSED mqbnet::ClusterNode* source)
{
// executed by *ANY* thread

BSLS_ASSERT_OPT(false && "This method should only be invoked in FSM mode");
}

void StorageManager::processPrimaryStatusAdvisory(
const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory,
mqbnet::ClusterNode* source)
Expand Down
5 changes: 5 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_storagemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,11 @@ class StorageManager : public mqbi::StorageManager {
processReceiptEvent(const bmqp::Event& event,
mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE;

/// Executed by any thread.
virtual void bufferPrimaryStatusAdvisory(
const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory,
mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE;

/// Executed by any thread.
virtual void processPrimaryStatusAdvisory(
const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory,
Expand Down
25 changes: 16 additions & 9 deletions src/groups/mqb/mqbc/mqbc_partitionstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,9 @@ class PartitionStateTableActions {

virtual void do_processBufferedLiveData(const ARGS& args) = 0;

virtual void
do_processBufferedPrimaryStatusAdvisories(const ARGS& args) = 0;

virtual void do_processLiveData(const ARGS& args) = 0;

virtual void do_processPut(const ARGS& args) = 0;
Expand Down Expand Up @@ -367,7 +370,8 @@ class PartitionStateTableActions {
void do_cleanupSeqnums_resetReceiveDataCtx_reapplyDetectSelfReplica(
const ARGS& args);

void do_replicaDataResponsePull_processBufferedLiveData_stopWatchDog(
void
do_replicaDataResponsePull_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog(
const ARGS& args);

void
Expand All @@ -382,7 +386,7 @@ class PartitionStateTableActions {
const ARGS& args);

void
do_replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_stopWatchDog(
do_replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog(
const ARGS& args);

void
Expand Down Expand Up @@ -579,10 +583,11 @@ class PartitionStateTable
REPLICA_DATA_RQST_PULL,
closeRecoveryFileSet_openStorage_startSendDataChunks,
REPLICA_HEALING);
PST_CFG(REPLICA_HEALING,
DONE_SENDING_DATA_CHUNKS,
replicaDataResponsePull_processBufferedLiveData_stopWatchDog,
REPLICA_HEALED);
PST_CFG(
REPLICA_HEALING,
DONE_SENDING_DATA_CHUNKS,
replicaDataResponsePull_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog,
REPLICA_HEALED);
PST_CFG(
REPLICA_HEALING,
ERROR_SENDING_DATA_CHUNKS,
Expand All @@ -603,7 +608,7 @@ class PartitionStateTable
PST_CFG(
REPLICA_HEALING,
DONE_RECEIVING_DATA_CHUNKS,
replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_stopWatchDog,
replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog,
REPLICA_HEALED);
PST_CFG(
REPLICA_HEALING,
Expand Down Expand Up @@ -939,11 +944,12 @@ void PartitionStateTableActions<ARGS>::

template <typename ARGS>
void PartitionStateTableActions<ARGS>::
do_replicaDataResponsePull_processBufferedLiveData_stopWatchDog(
do_replicaDataResponsePull_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog(
const ARGS& args)
{
do_replicaDataResponsePull(args);
do_processBufferedLiveData(args);
do_processBufferedPrimaryStatusAdvisories(args);
do_stopWatchDog(args);
}

Expand Down Expand Up @@ -980,14 +986,15 @@ void PartitionStateTableActions<ARGS>::

template <typename ARGS>
void PartitionStateTableActions<ARGS>::
do_replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_stopWatchDog(
do_replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog(
const ARGS& args)
{
do_replicaDataResponsePush(args);
do_resetReceiveDataCtx(args);
do_closeRecoveryFileSet(args);
do_openStorage(args);
do_processBufferedLiveData(args);
do_processBufferedPrimaryStatusAdvisories(args);
do_stopWatchDog(args);
}

Expand Down
Loading

0 comments on commit 060cd9f

Please sign in to comment.