Add support for 'isFSMWorkflow' flag and fix FSM workflow #161

Merged: 13 commits, Mar 26, 2024
33 changes: 31 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
@@ -154,14 +154,19 @@ void Cluster::startDispatched(bsl::ostream* errorDescription, int* rc)
&d_clusterData,
d_state,
d_clusterData.domainFactory(),
&d_clusterOrchestrator.queueHelper(),
dispatcher(),
k_PARTITION_FSM_WATCHDOG_TIMEOUT_DURATION,
bdlf::BindUtil::bind(&Cluster::onRecoveryStatus,
this,
bdlf::PlaceHolders::_1, // status
bsl::vector<unsigned int>(),
statRecorder),
bdlf::BindUtil::bind(
&ClusterOrchestrator::onPartitionPrimaryStatus,
&d_clusterOrchestrator,
bdlf::PlaceHolders::_1, // partitionId
bdlf::PlaceHolders::_2, // status
bdlf::PlaceHolders::_3), // primary leaseId
storageManagerAllocator))
: static_cast<mqbi::StorageManager*>(
new (*storageManagerAllocator) mqbblp::StorageManager(
@@ -2503,6 +2508,7 @@ Cluster::Cluster(const bslstl::StringRef& name,
bufferFactory,
blobSpPool,
clusterConfig,
mqbcfg::ClusterProxyDefinition(allocator),
netCluster,
this,
domainFactory,
@@ -3148,6 +3154,23 @@ void Cluster::processClusterControlMessage(
source),
this);
} break; // BREAK
case MsgChoice::SELECTION_ID_CLUSTER_STATE_F_S_M_MESSAGE: {
dispatcher()->execute(
bdlf::BindUtil::bind(
&ClusterOrchestrator::processClusterStateFSMMessage,
&d_clusterOrchestrator,
message,
source),
this);
} break; // BREAK
case MsgChoice::SELECTION_ID_PARTITION_MESSAGE: {
dispatcher()->execute(
bdlf::BindUtil::bind(&ClusterOrchestrator::processPartitionMessage,
&d_clusterOrchestrator,
message,
source),
this);
} break; // BREAK
case MsgChoice::SELECTION_ID_UNDEFINED:
default: {
MWCTSK_ALARMLOG_ALARM("CLUSTER")
@@ -3605,7 +3628,13 @@ Cluster::sendRequest(const Cluster::RequestManagerType::RequestSp& request,
mqbnet::ClusterNode* target,
bsls::TimeInterval timeout)
{
// executed by the cluster *DISPATCHER* thread
Collaborator:

Is that not in the Dispatcher thread anymore? If so, we need to comment on the thread safety.

Collaborator (Author):

Yes, it can now be invoked in the queue dispatcher thread by mqbc::StorageManager:

bmqt::GenericResult::Enum status = d_clusterData_p->cluster()->sendRequest(

Collaborator:

ElectorInfo::setElectorInfo is "executed by the cluster DISPATCHER thread", so we need to add a warning about thread safety. We consider the access to d_leaderNode_p (and everything else here) thread-safe.

Collaborator (Author):

Added a comment about thread safety. Importantly, Cluster::sendRequest() has been invoked in queue dispatcher threads since before FSM, for example at RecoveryManager::onPartitionSyncStateQueryResponseDispatched().
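To make the thread-safety argument above concrete, here is a minimal, self-contained sketch of the pattern under discussion. It uses standard C++ threads and hypothetical names (RequestManager, leaderNodeId) rather than the actual BDE/mqb types, so it illustrates the invariant the new comment describes rather than the broker's implementation: the request manager serializes itself with its own mutex, and the leader node id is only written during an elector transition, so concurrent calls from the cluster dispatcher thread and a queue dispatcher thread are both acceptable.

#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for 'd_clusterData.requestManager()': every call to
// 'sendRequest()' serializes on the manager's own mutex, so it may safely be
// entered from any dispatcher thread.
class RequestManager {
  private:
    std::mutex d_mutex;          // guards all request bookkeeping
    int        d_nextRequestId;  // last request id handed out

  public:
    RequestManager()
    : d_nextRequestId(0)
    {
    }

    int sendRequest(int targetNodeId)
    {
        std::lock_guard<std::mutex> guard(d_mutex);
        ++d_nextRequestId;
        std::cout << "request " << d_nextRequestId << " -> node "
                  << targetNodeId << "\n";
        return d_nextRequestId;
    }
};

int main()
{
    RequestManager requestManager;

    // Stand-in for 'electorInfo().leaderNodeId()': in the broker it is only
    // modified upon elector transition; in this sketch it is simply
    // immutable, so unsynchronized reads are fine.
    const int leaderNodeId = 1;

    // Simulate the cluster dispatcher thread and a queue dispatcher thread
    // both sending a request to the perceived leader.
    std::vector<std::thread> dispatcherThreads;
    for (int i = 0; i < 2; ++i) {
        dispatcherThreads.emplace_back([&requestManager, leaderNodeId]() {
            requestManager.sendRequest(leaderNodeId);
        });
    }
    for (std::thread& t : dispatcherThreads) {
        t.join();
    }

    return 0;
}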

// executed by the cluster *DISPATCHER* thread or the *QUEUE DISPATCHER*
// thread
//
// It is safe to invoke this function from non-cluster threads because
// `d_clusterData.electorInfo().leaderNodeId()` is only modified upon
// elector transition, while `d_clusterData.requestManager().sendRequest`
// is guarded by a mutex on its own.

// PRECONDITIONS
BSLS_ASSERT_SAFE(target != 0 ||
172 changes: 158 additions & 14 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
@@ -26,6 +26,7 @@
#include <mqbc_incoreclusterstateledger.h>
#include <mqbcmd_messages.h>
#include <mqbi_queueengine.h>
#include <mqbi_storagemanager.h>
#include <mqbnet_cluster.h>

// MWC
@@ -40,6 +41,7 @@
#include <bsl_cstddef.h> // size_t
#include <bsl_string.h>
#include <bsl_vector.h>
#include <bsls_annotation.h>
#include <bsls_assert.h>
#include <bsls_timeinterval.h>

@@ -526,7 +528,7 @@ void ClusterOrchestrator::timerCbDispatched()
continue; // CONTINUE
}

d_clusterData_p->cluster()->dispatcher()->execute(
dispatcher()->execute(
bdlf::BindUtil::bind(&ClusterOrchestrator::onQueueActivityTimer,
this,
timer,
@@ -892,6 +894,116 @@ void ClusterOrchestrator::processStopRequest(
processNodeStoppingNotification(ns, &request);
}

void ClusterOrchestrator::processClusterStateFSMMessage(
const bmqp_ctrlmsg::ControlMessage& message,
mqbnet::ClusterNode* source)
{
// executed by the cluster *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p));
BSLS_ASSERT_SAFE(d_clusterConfig.clusterAttributes().isCSLModeEnabled() &&
d_clusterConfig.clusterAttributes().isFSMWorkflow());
BSLS_ASSERT(message.choice().isClusterMessageValue());
BSLS_ASSERT(message.choice()
.clusterMessage()
.choice()
.isClusterStateFSMMessageValue());

typedef bmqp_ctrlmsg::ClusterStateFSMMessageChoice MsgChoice; // shortcut
switch (message.choice()
.clusterMessage()
.choice()
.clusterStateFSMMessage()
.choice()
.selectionId()) {
case MsgChoice::SELECTION_ID_FOLLOWER_L_S_N_REQUEST: {
d_stateManager_mp->processFollowerLSNRequest(message, source);
} break; // BREAK
case MsgChoice::SELECTION_ID_REGISTRATION_REQUEST: {
d_stateManager_mp->processRegistrationRequest(message, source);
} break; // BREAK
case MsgChoice::SELECTION_ID_FOLLOWER_CLUSTER_STATE_REQUEST: {
d_stateManager_mp->processFollowerClusterStateRequest(message, source);
} break; // BREAK
case MsgChoice::SELECTION_ID_FOLLOWER_L_S_N_RESPONSE:
BSLS_ANNOTATION_FALLTHROUGH;
case MsgChoice::SELECTION_ID_REGISTRATION_RESPONSE:
BSLS_ANNOTATION_FALLTHROUGH;
case MsgChoice::SELECTION_ID_FOLLOWER_CLUSTER_STATE_RESPONSE: {
BSLS_ASSERT_SAFE(!message.rId().isNull());

d_cluster_p->processResponse(message);
} break; // BREAK
case MsgChoice::SELECTION_ID_UNDEFINED: BSLS_ANNOTATION_FALLTHROUGH;
default: {
MWCTSK_ALARMLOG_ALARM("CLUSTER")
<< d_clusterData_p->identity().description()
<< ": unexpected clusterMessage:" << message
<< MWCTSK_ALARMLOG_END;
} break; // BREAK
}
}

void ClusterOrchestrator::processPartitionMessage(
const bmqp_ctrlmsg::ControlMessage& message,
mqbnet::ClusterNode* source)
{
// executed by the cluster *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p));
BSLS_ASSERT_SAFE(d_clusterConfig.clusterAttributes().isCSLModeEnabled() &&
d_clusterConfig.clusterAttributes().isFSMWorkflow());
BSLS_ASSERT(message.choice().isClusterMessageValue());
BSLS_ASSERT(
message.choice().clusterMessage().choice().isPartitionMessageValue());

if (bmqp_ctrlmsg::NodeStatus::E_STOPPING ==
d_clusterData_p->membership().selfNodeStatus()) {
BALL_LOG_INFO << d_clusterData_p->identity().description()
<< ": Not processing partition message : " << message
<< " from " << source->nodeDescription()
<< " since self is stopping";

return; // RETURN
}

typedef bmqp_ctrlmsg::PartitionMessageChoice MsgChoice; // shortcut
switch (message.choice()
.clusterMessage()
.choice()
.partitionMessage()
.choice()
.selectionId()) {
case MsgChoice::SELECTION_ID_REPLICA_STATE_REQUEST: {
d_storageManager_p->processReplicaStateRequest(message, source);
} break; // BREAK
case MsgChoice::SELECTION_ID_PRIMARY_STATE_REQUEST: {
d_storageManager_p->processPrimaryStateRequest(message, source);
} break; // BREAK
case MsgChoice::SELECTION_ID_REPLICA_DATA_REQUEST: {
d_storageManager_p->processReplicaDataRequest(message, source);
} break; // BREAK
case MsgChoice::SELECTION_ID_REPLICA_STATE_RESPONSE:
BSLS_ANNOTATION_FALLTHROUGH;
case MsgChoice::SELECTION_ID_PRIMARY_STATE_RESPONSE:
BSLS_ANNOTATION_FALLTHROUGH;
case MsgChoice::SELECTION_ID_REPLICA_DATA_RESPONSE: {
BSLS_ASSERT_SAFE(!message.rId().isNull());

d_cluster_p->processResponse(message);
} break; // BREAK
case MsgChoice::SELECTION_ID_UNDEFINED: BSLS_ANNOTATION_FALLTHROUGH;
default: {
MWCTSK_ALARMLOG_ALARM("CLUSTER")
<< d_clusterData_p->identity().description()
<< ": unexpected clusterMessage:" << message
<< MWCTSK_ALARMLOG_END;
} break; // BREAK
}
}

void ClusterOrchestrator::processNodeStoppingNotification(
mqbc::ClusterNodeSession* ns,
const bmqp_ctrlmsg::ControlMessage* request)
@@ -1006,6 +1118,11 @@ void ClusterOrchestrator::processNodeStatusAdvisory(
source,
partitions);
}
else if (d_clusterConfig.clusterAttributes().isFSMWorkflow() &&
source->nodeId() ==
d_clusterData_p->electorInfo().leaderNodeId()) {
d_queueHelper.onLeaderAvailable();
}

// For each partition for which self is primary, notify the storageMgr
// about the status of a peer node. Self may end up issuing a primary
@@ -1174,6 +1291,16 @@ void ClusterOrchestrator::processElectorEvent(const bmqp::Event& event,
return; // RETURN
}

if (d_clusterData_p->membership().selfNodeStatus() ==
bmqp_ctrlmsg::NodeStatus::E_STOPPING) {
// No need to process the event since self is stopping.
BALL_LOG_INFO << d_cluster_p->description()
<< ": Not processing elector event from node "
<< source->nodeDescription()
<< " since self is stopping.";
return; // RETURN
}

// Enqueue elector events in the dispatcher thread as well. Note that it's
// important that elector events are processed in the dispatcher thread
// too, otherwise, depending upon thread scheduling, a new node may get
@@ -1488,18 +1615,19 @@ void ClusterOrchestrator::processPrimaryStatusAdvisory(
.choice()
.isPrimaryStatusAdvisoryValue());

const bmqp_ctrlmsg::PrimaryStatusAdvisory& primaryAdv =
message.choice().clusterMessage().choice().primaryStatusAdvisory();

if (d_clusterData_p->membership().selfNodeStatus() ==
bmqp_ctrlmsg::NodeStatus::E_STOPPING) {
// No need to process the advisory since self is stopping.
BALL_LOG_INFO << d_cluster_p->description()
<< ": Not processing primary status advisory since"
BALL_LOG_INFO << d_cluster_p->description() << " Partition ["
<< primaryAdv.partitionId() << "]: "
<< "Not processing primary status advisory since"
<< " self is stopping.";
return; // RETURN
}

const bmqp_ctrlmsg::PrimaryStatusAdvisory& primaryAdv =
message.choice().clusterMessage().choice().primaryStatusAdvisory();

if (0 > primaryAdv.partitionId() ||
static_cast<size_t>(primaryAdv.partitionId()) >=
clusterState()->partitions().size()) {
@@ -1524,13 +1652,30 @@
d_clusterData_p->membership().getClusterNodeSession(source);
BSLS_ASSERT_SAFE(ns);

if (!d_stateManager_mp->isFirstLeaderAdvisory()) {
if (d_clusterConfig.clusterAttributes().isFSMWorkflow()) {
if (pinfo.primaryNode() != source ||
pinfo.primaryLeaseId() != primaryAdv.primaryLeaseId()) {
BALL_LOG_WARN << d_clusterData_p->identity().description()
<< ": Partition [" << primaryAdv.partitionId()
<< "]: received primary status advisory: "
<< primaryAdv
<< " from: " << source->nodeDescription()
<< ", but self perceived primary and its leaseId are"
<< ": ["
<< (pinfo.primaryNode()
? pinfo.primaryNode()->nodeDescription()
: "** null **")
<< ", " << pinfo.primaryLeaseId() << "].";
return; // RETURN
}
}
else if (!d_stateManager_mp->isFirstLeaderAdvisory()) {
// Self node has heard from the leader at least once. Perform
// additional validations.

if (pinfo.primaryNode() != source) {
BALL_LOG_ERROR << d_clusterData_p->identity().description()
<< ": PartitionId [" << primaryAdv.partitionId()
<< ": Partition [" << primaryAdv.partitionId()
<< "]: received primary status advisory: "
<< primaryAdv
<< " from: " << source->nodeDescription()
@@ -1543,7 +1688,7 @@

if (pinfo.primaryLeaseId() != primaryAdv.primaryLeaseId()) {
BALL_LOG_ERROR << d_clusterData_p->identity().description()
<< ": PartitionId [" << primaryAdv.partitionId()
<< ": Partition [" << primaryAdv.partitionId()
<< "]: received primary status advisory: "
<< primaryAdv << " from perceived primary: "
<< source->nodeDescription()
Expand All @@ -1553,9 +1698,8 @@ void ClusterOrchestrator::processPrimaryStatusAdvisory(
}
}
else {
// TODO_CSL Address this case as part of CSL node startup sequence
// changes. Also remove `ClusterStateManager::setPrimary()` when this
// code is removed.
// TODO Remove `mqbi::ClusterStateManager::setPrimary()` when this code
// is removed.

// Self node has not heard from the leader until now. We update the
// ClusterState as well as forward the advisory to StorageMgr, but log
Expand All @@ -1581,7 +1725,7 @@ void ClusterOrchestrator::processPrimaryStatusAdvisory(
// 'StorageMgr::processPrimaryStatusAdvisoryDispatched' as well.

BALL_LOG_WARN << d_clusterData_p->identity().description()
<< " PartitionId [" << primaryAdv.partitionId()
<< " Partition [" << primaryAdv.partitionId()
<< "]: received primary status advisory: " << primaryAdv
<< ", from: " << source->nodeDescription()
<< ", before receiving leader advisory. Current "
Expand Down Expand Up @@ -1611,7 +1755,7 @@ void ClusterOrchestrator::processPrimaryStatusAdvisory(
}
}

// In either case, update primary status and inform the storageMgr.
// In any case, update primary status and inform the storageMgr.

// TBD: may need to review the order of invoking these routines.

17 changes: 17 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.h
@@ -459,6 +459,23 @@ class ClusterOrchestrator {
void processStopRequest(const bmqp_ctrlmsg::ControlMessage& request,
mqbnet::ClusterNode* source);

// Process the specified cluster state FSM 'message' from the specified
// 'source'.
//
// THREAD: This method is invoked in the associated cluster's
// dispatcher thread.
void
processClusterStateFSMMessage(const bmqp_ctrlmsg::ControlMessage& message,
mqbnet::ClusterNode* source);

// Process the specified partition FSM 'message' from the specified
// 'source'.
//
// THREAD: This method is invoked in the associated cluster's
// dispatcher thread.
void processPartitionMessage(const bmqp_ctrlmsg::ControlMessage& message,
mqbnet::ClusterNode* source);

/// Invoked by `mqbblp::Cluster` when recovery has succeeded.
///
/// TBD: this is mostly temporary.
7 changes: 3 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
@@ -182,7 +182,7 @@ void ClusterProxy::initiateShutdownDispatched(const VoidFunctor& callback)
SessionSpVec sessions;
bsls::TimeInterval shutdownTimeout;
shutdownTimeout.addMilliseconds(
d_clusterProxyConfig.queueOperations().shutdownTimeoutMs());
clusterProxyConfig()->queueOperations().shutdownTimeoutMs());

for (mqbnet::TransportManagerIterator sessIt(
d_clusterData.transportManager());
@@ -1020,13 +1020,12 @@ ClusterProxy::ClusterProxy(
: d_allocator_p(allocator)
, d_isStarted(false)
, d_isStopping(false)
, d_description("ClusterProxy (" + name + ")", d_allocator_p)
, d_clusterProxyConfig(clusterProxyConfig)
, d_clusterData(name,
scheduler,
bufferFactory,
blobSpPool,
mqbcfg::ClusterDefinition(allocator),
clusterProxyConfig,
netCluster,
this,
0,
@@ -1038,7 +1037,7 @@
0, // Partition count. Proxy has no notion of partition.
allocator)
, d_activeNodeManager(d_clusterData.membership().netCluster()->nodes(),
d_description,
description(),
mqbcfg::BrokerConfig::get().hostDataCenter())
, d_queueHelper(&d_clusterData, &d_state, 0, allocator)
, d_nodeStatsMap(allocator)