Skip to content

Commit

Permalink
Add support for 'isFSMWorkflow' flag and fix FSM workflow (#161)
Browse files Browse the repository at this point in the history
* Add support for 'isFSMWorkflow' flag and fix FSM workflow

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* FSM workflow: Fix compile errors

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* typedef DomainQueueMessagesCountMaps: mqbs::StorageUtil -> mqbc

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* PR#161: Address comments

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* FSM workflow: Adopt PR#174 'fix: mix of strong and weak consistencies'

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* blp::Cluster: Add back missing control message processing types

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* test_leader_node_delay.py: Fix log grepping post-FSM

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* run-tests: Also run integration tests in FSM mode

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* mqbc::ClusterDataIdentity: Simplify the code

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* blp::ClusterProxy, mqbmock::Cluster: Use description from ClusterData

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* mqbc::ClusterData: Incorporate mqbcfg::ClusterProxyDefinition

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* FSM workflow: Adopt PR#190 'Application subscriptions'

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* PR#161: Address comments

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

---------

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>
  • Loading branch information
kaikulimu authored Mar 26, 2024
1 parent 8d133d6 commit 1de0b4e
Show file tree
Hide file tree
Showing 45 changed files with 4,588 additions and 2,929 deletions.
33 changes: 31 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,19 @@ void Cluster::startDispatched(bsl::ostream* errorDescription, int* rc)
&d_clusterData,
d_state,
d_clusterData.domainFactory(),
&d_clusterOrchestrator.queueHelper(),
dispatcher(),
k_PARTITION_FSM_WATCHDOG_TIMEOUT_DURATION,
bdlf::BindUtil::bind(&Cluster::onRecoveryStatus,
this,
bdlf::PlaceHolders::_1, // status
bsl::vector<unsigned int>(),
statRecorder),
bdlf::BindUtil::bind(
&ClusterOrchestrator::onPartitionPrimaryStatus,
&d_clusterOrchestrator,
bdlf::PlaceHolders::_1, // partitionId
bdlf::PlaceHolders::_2, // status
bdlf::PlaceHolders::_3), // primary leaseId
storageManagerAllocator))
: static_cast<mqbi::StorageManager*>(
new (*storageManagerAllocator) mqbblp::StorageManager(
Expand Down Expand Up @@ -2503,6 +2508,7 @@ Cluster::Cluster(const bslstl::StringRef& name,
bufferFactory,
blobSpPool,
clusterConfig,
mqbcfg::ClusterProxyDefinition(allocator),
netCluster,
this,
domainFactory,
Expand Down Expand Up @@ -3148,6 +3154,23 @@ void Cluster::processClusterControlMessage(
source),
this);
} break; // BREAK
case MsgChoice::SELECTION_ID_CLUSTER_STATE_F_S_M_MESSAGE: {
dispatcher()->execute(
bdlf::BindUtil::bind(
&ClusterOrchestrator::processClusterStateFSMMessage,
&d_clusterOrchestrator,
message,
source),
this);
} break; // BREAK
case MsgChoice::SELECTION_ID_PARTITION_MESSAGE: {
dispatcher()->execute(
bdlf::BindUtil::bind(&ClusterOrchestrator::processPartitionMessage,
&d_clusterOrchestrator,
message,
source),
this);
} break; // BREAK
case MsgChoice::SELECTION_ID_UNDEFINED:
default: {
MWCTSK_ALARMLOG_ALARM("CLUSTER")
Expand Down Expand Up @@ -3605,7 +3628,13 @@ Cluster::sendRequest(const Cluster::RequestManagerType::RequestSp& request,
mqbnet::ClusterNode* target,
bsls::TimeInterval timeout)
{
// executed by the cluster *DISPATCHER* thread
// executed by the cluster *DISPATCHER* thread or the *QUEUE DISPATCHER*
// thread
//
// It is safe to invoke this function from non-cluster threads because
// `d_clusterData.electorInfo().leaderNodeId()` is only modified upon
// elector transition, while `d_clusterData.requestManager().sendRequest`
// is guarded by a mutex on its own.

// PRECONDITIONS
BSLS_ASSERT_SAFE(target != 0 ||
Expand Down
172 changes: 158 additions & 14 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <mqbc_incoreclusterstateledger.h>
#include <mqbcmd_messages.h>
#include <mqbi_queueengine.h>
#include <mqbi_storagemanager.h>
#include <mqbnet_cluster.h>

// MWC
Expand All @@ -40,6 +41,7 @@
#include <bsl_cstddef.h> // size_t
#include <bsl_string.h>
#include <bsl_vector.h>
#include <bsls_annotation.h>
#include <bsls_assert.h>
#include <bsls_timeinterval.h>

Expand Down Expand Up @@ -526,7 +528,7 @@ void ClusterOrchestrator::timerCbDispatched()
continue; // CONTINUE
}

d_clusterData_p->cluster()->dispatcher()->execute(
dispatcher()->execute(
bdlf::BindUtil::bind(&ClusterOrchestrator::onQueueActivityTimer,
this,
timer,
Expand Down Expand Up @@ -892,6 +894,116 @@ void ClusterOrchestrator::processStopRequest(
processNodeStoppingNotification(ns, &request);
}

void ClusterOrchestrator::processClusterStateFSMMessage(
const bmqp_ctrlmsg::ControlMessage& message,
mqbnet::ClusterNode* source)
{
// executed by the cluster *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p));
BSLS_ASSERT_SAFE(d_clusterConfig.clusterAttributes().isCSLModeEnabled() &&
d_clusterConfig.clusterAttributes().isFSMWorkflow());
BSLS_ASSERT(message.choice().isClusterMessageValue());
BSLS_ASSERT(message.choice()
.clusterMessage()
.choice()
.isClusterStateFSMMessageValue());

typedef bmqp_ctrlmsg::ClusterStateFSMMessageChoice MsgChoice; // shortcut
switch (message.choice()
.clusterMessage()
.choice()
.clusterStateFSMMessage()
.choice()
.selectionId()) {
case MsgChoice::SELECTION_ID_FOLLOWER_L_S_N_REQUEST: {
d_stateManager_mp->processFollowerLSNRequest(message, source);
} break; // BREAK
case MsgChoice::SELECTION_ID_REGISTRATION_REQUEST: {
d_stateManager_mp->processRegistrationRequest(message, source);
} break; // BREAK
case MsgChoice::SELECTION_ID_FOLLOWER_CLUSTER_STATE_REQUEST: {
d_stateManager_mp->processFollowerClusterStateRequest(message, source);
} break; // BREAK
case MsgChoice::SELECTION_ID_FOLLOWER_L_S_N_RESPONSE:
BSLS_ANNOTATION_FALLTHROUGH;
case MsgChoice::SELECTION_ID_REGISTRATION_RESPONSE:
BSLS_ANNOTATION_FALLTHROUGH;
case MsgChoice::SELECTION_ID_FOLLOWER_CLUSTER_STATE_RESPONSE: {
BSLS_ASSERT_SAFE(!message.rId().isNull());

d_cluster_p->processResponse(message);
} break; // BREAK
case MsgChoice::SELECTION_ID_UNDEFINED: BSLS_ANNOTATION_FALLTHROUGH;
default: {
MWCTSK_ALARMLOG_ALARM("CLUSTER")
<< d_clusterData_p->identity().description()
<< ": unexpected clusterMessage:" << message
<< MWCTSK_ALARMLOG_END;
} break; // BREAK
}
}

void ClusterOrchestrator::processPartitionMessage(
const bmqp_ctrlmsg::ControlMessage& message,
mqbnet::ClusterNode* source)
{
// executed by the cluster *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p));
BSLS_ASSERT_SAFE(d_clusterConfig.clusterAttributes().isCSLModeEnabled() &&
d_clusterConfig.clusterAttributes().isFSMWorkflow());
BSLS_ASSERT(message.choice().isClusterMessageValue());
BSLS_ASSERT(
message.choice().clusterMessage().choice().isPartitionMessageValue());

if (bmqp_ctrlmsg::NodeStatus::E_STOPPING ==
d_clusterData_p->membership().selfNodeStatus()) {
BALL_LOG_INFO << d_clusterData_p->identity().description()
<< ": Not processing partition message : " << message
<< " from " << source->nodeDescription()
<< " since self is stopping";

return; // RETURN
}

typedef bmqp_ctrlmsg::PartitionMessageChoice MsgChoice; // shortcut
switch (message.choice()
.clusterMessage()
.choice()
.partitionMessage()
.choice()
.selectionId()) {
case MsgChoice::SELECTION_ID_REPLICA_STATE_REQUEST: {
d_storageManager_p->processReplicaStateRequest(message, source);
} break; // BREAK
case MsgChoice::SELECTION_ID_PRIMARY_STATE_REQUEST: {
d_storageManager_p->processPrimaryStateRequest(message, source);
} break; // BREAK
case MsgChoice::SELECTION_ID_REPLICA_DATA_REQUEST: {
d_storageManager_p->processReplicaDataRequest(message, source);
} break; // BREAK
case MsgChoice::SELECTION_ID_REPLICA_STATE_RESPONSE:
BSLS_ANNOTATION_FALLTHROUGH;
case MsgChoice::SELECTION_ID_PRIMARY_STATE_RESPONSE:
BSLS_ANNOTATION_FALLTHROUGH;
case MsgChoice::SELECTION_ID_REPLICA_DATA_RESPONSE: {
BSLS_ASSERT_SAFE(!message.rId().isNull());

d_cluster_p->processResponse(message);
} break; // BREAK
case MsgChoice::SELECTION_ID_UNDEFINED: BSLS_ANNOTATION_FALLTHROUGH;
default: {
MWCTSK_ALARMLOG_ALARM("CLUSTER")
<< d_clusterData_p->identity().description()
<< ": unexpected clusterMessage:" << message
<< MWCTSK_ALARMLOG_END;
} break; // BREAK
}
}

void ClusterOrchestrator::processNodeStoppingNotification(
mqbc::ClusterNodeSession* ns,
const bmqp_ctrlmsg::ControlMessage* request)
Expand Down Expand Up @@ -1006,6 +1118,11 @@ void ClusterOrchestrator::processNodeStatusAdvisory(
source,
partitions);
}
else if (d_clusterConfig.clusterAttributes().isFSMWorkflow() &&
source->nodeId() ==
d_clusterData_p->electorInfo().leaderNodeId()) {
d_queueHelper.onLeaderAvailable();
}

// For each partition for which self is primary, notify the storageMgr
// about the status of a peer node. Self may end up issuing a primary
Expand Down Expand Up @@ -1174,6 +1291,16 @@ void ClusterOrchestrator::processElectorEvent(const bmqp::Event& event,
return; // RETURN
}

if (d_clusterData_p->membership().selfNodeStatus() ==
bmqp_ctrlmsg::NodeStatus::E_STOPPING) {
// No need to process the event since self is stopping.
BALL_LOG_INFO << d_cluster_p->description()
<< ": Not processing elector event from node "
<< source->nodeDescription()
<< " since self is stopping.";
return; // RETURN
}

// Enqueue elector events in the dispatcher thread as well. Note that its
// important that elector events are processed in the dispatcher thread
// too, otherwise, depending upon thread scheduling, a new node may get
Expand Down Expand Up @@ -1488,18 +1615,19 @@ void ClusterOrchestrator::processPrimaryStatusAdvisory(
.choice()
.isPrimaryStatusAdvisoryValue());

const bmqp_ctrlmsg::PrimaryStatusAdvisory& primaryAdv =
message.choice().clusterMessage().choice().primaryStatusAdvisory();

if (d_clusterData_p->membership().selfNodeStatus() ==
bmqp_ctrlmsg::NodeStatus::E_STOPPING) {
// No need to process the advisory since self is stopping.
BALL_LOG_INFO << d_cluster_p->description()
<< ": Not processing primary status advisory since"
BALL_LOG_INFO << d_cluster_p->description() << " Partition ["
<< primaryAdv.partitionId() << "]: "
<< "Not processing primary status advisory since"
<< " self is stopping.";
return; // RETURN
}

const bmqp_ctrlmsg::PrimaryStatusAdvisory& primaryAdv =
message.choice().clusterMessage().choice().primaryStatusAdvisory();

if (0 > primaryAdv.partitionId() ||
static_cast<size_t>(primaryAdv.partitionId()) >=
clusterState()->partitions().size()) {
Expand All @@ -1524,13 +1652,30 @@ void ClusterOrchestrator::processPrimaryStatusAdvisory(
d_clusterData_p->membership().getClusterNodeSession(source);
BSLS_ASSERT_SAFE(ns);

if (!d_stateManager_mp->isFirstLeaderAdvisory()) {
if (d_clusterConfig.clusterAttributes().isFSMWorkflow()) {
if (pinfo.primaryNode() != source ||
pinfo.primaryLeaseId() != primaryAdv.primaryLeaseId()) {
BALL_LOG_WARN << d_clusterData_p->identity().description()
<< ": Partition [" << primaryAdv.partitionId()
<< "]: received primary status advisory: "
<< primaryAdv
<< " from: " << source->nodeDescription()
<< ", but self perceived primary and its leaseId are"
<< ": ["
<< (pinfo.primaryNode()
? pinfo.primaryNode()->nodeDescription()
: "** null **")
<< ", " << pinfo.primaryLeaseId() << "].";
return; // RETURN
}
}
else if (!d_stateManager_mp->isFirstLeaderAdvisory()) {
// Self node has heard from the leader at least once. Perform
// additional validations.

if (pinfo.primaryNode() != source) {
BALL_LOG_ERROR << d_clusterData_p->identity().description()
<< ": PartitionId [" << primaryAdv.partitionId()
<< ": Partition [" << primaryAdv.partitionId()
<< "]: received primary status advisory: "
<< primaryAdv
<< " from: " << source->nodeDescription()
Expand All @@ -1543,7 +1688,7 @@ void ClusterOrchestrator::processPrimaryStatusAdvisory(

if (pinfo.primaryLeaseId() != primaryAdv.primaryLeaseId()) {
BALL_LOG_ERROR << d_clusterData_p->identity().description()
<< ": PartitionId [" << primaryAdv.partitionId()
<< ": Partition [" << primaryAdv.partitionId()
<< "]: received primary status advisory: "
<< primaryAdv << " from perceived primary: "
<< source->nodeDescription()
Expand All @@ -1553,9 +1698,8 @@ void ClusterOrchestrator::processPrimaryStatusAdvisory(
}
}
else {
// TODO_CSL Address this case as part of CSL node startup sequence
// changes. Also remove `ClusterStateManager::setPrimary()` when this
// code is removed.
// TODO Remove `mqbi::ClusterStateManager::setPrimary()` when this code
// is removed.

// Self node has not heard from the leader until now. We update the
// ClusterState as well as forward the advisory to StorageMgr, but log
Expand All @@ -1581,7 +1725,7 @@ void ClusterOrchestrator::processPrimaryStatusAdvisory(
// 'StorageMgr::processPrimaryStatusAdvisoryDispatched' as well.

BALL_LOG_WARN << d_clusterData_p->identity().description()
<< " PartitionId [" << primaryAdv.partitionId()
<< " Partition [" << primaryAdv.partitionId()
<< "]: received primary status advisory: " << primaryAdv
<< ", from: " << source->nodeDescription()
<< ", before receiving leader advisory. Current "
Expand Down Expand Up @@ -1611,7 +1755,7 @@ void ClusterOrchestrator::processPrimaryStatusAdvisory(
}
}

// In either case, update primary status and inform the storageMgr.
// In any case, update primary status and inform the storageMgr.

// TBD: may need to review the order of invoking these routines.

Expand Down
17 changes: 17 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,23 @@ class ClusterOrchestrator {
void processStopRequest(const bmqp_ctrlmsg::ControlMessage& request,
mqbnet::ClusterNode* source);

// Process the specified cluster state FSM 'message' from the specified
// 'source'.
//
// THREAD: This method is invoked in the associated cluster's
// dispatcher thread.
void
processClusterStateFSMMessage(const bmqp_ctrlmsg::ControlMessage& message,
mqbnet::ClusterNode* source);

// Process the specified partition FSM 'message' from the specified
// 'source'.
//
// THREAD: This method is invoked in the associated cluster's
// dispatcher thread.
void processPartitionMessage(const bmqp_ctrlmsg::ControlMessage& message,
mqbnet::ClusterNode* source);

/// Invoked by `mqbblp::Cluster` when recovery has succeeded.
///
/// TBD: this is mostly temporary.
Expand Down
7 changes: 3 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ void ClusterProxy::initiateShutdownDispatched(const VoidFunctor& callback)
SessionSpVec sessions;
bsls::TimeInterval shutdownTimeout;
shutdownTimeout.addMilliseconds(
d_clusterProxyConfig.queueOperations().shutdownTimeoutMs());
clusterProxyConfig()->queueOperations().shutdownTimeoutMs());

for (mqbnet::TransportManagerIterator sessIt(
d_clusterData.transportManager());
Expand Down Expand Up @@ -1020,13 +1020,12 @@ ClusterProxy::ClusterProxy(
: d_allocator_p(allocator)
, d_isStarted(false)
, d_isStopping(false)
, d_description("ClusterProxy (" + name + ")", d_allocator_p)
, d_clusterProxyConfig(clusterProxyConfig)
, d_clusterData(name,
scheduler,
bufferFactory,
blobSpPool,
mqbcfg::ClusterDefinition(allocator),
clusterProxyConfig,
netCluster,
this,
0,
Expand All @@ -1038,7 +1037,7 @@ ClusterProxy::ClusterProxy(
0, // Partition count. Proxy has no notion of partition.
allocator)
, d_activeNodeManager(d_clusterData.membership().netCluster()->nodes(),
d_description,
description(),
mqbcfg::BrokerConfig::get().hostDataCenter())
, d_queueHelper(&d_clusterData, &d_state, 0, allocator)
, d_nodeStatsMap(allocator)
Expand Down
Loading

0 comments on commit 1de0b4e

Please sign in to comment.