Skip to content

Commit

Permalink
mqbc::ClusterUtil: Revert to not populate appId infos in non-CSL mode (
Browse files Browse the repository at this point in the history
…#408)

* mqbc::ClusterUtil: Revert to not populate appId infos in non-CSL mode

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* test_restart.py: temporarily disable non-FSM to FSM tests

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

---------

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>
  • Loading branch information
kaikulimu authored Aug 26, 2024
1 parent d526712 commit 71a9660
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 36 deletions.
12 changes: 8 additions & 4 deletions src/groups/mqb/mqbc/mqbc_clusterutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,8 @@ void ClusterUtil::populateQueueAssignmentAdvisory(
ClusterState* clusterState,
ClusterData* clusterData,
const bmqt::Uri& uri,
const mqbi::Domain* domain)
const mqbi::Domain* domain,
bool isCSLMode)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(advisory);
Expand All @@ -809,8 +810,10 @@ void ClusterUtil::populateQueueAssignmentAdvisory(
uri.asString());
key->loadBinary(&queueInfo.key());

// Generate appIds and appKeys
populateAppIdInfos(&queueInfo.appIds(), domain->config().mode());
if (isCSLMode) {
// Generate appIds and appKeys
populateAppIdInfos(&queueInfo.appIds(), domain->config().mode());
}

BALL_LOG_INFO << clusterData->identity().description()
<< ": Populated QueueAssignmentAdvisory: " << *advisory;
Expand Down Expand Up @@ -1004,7 +1007,8 @@ ClusterUtil::assignQueue(ClusterState* clusterState,
clusterState,
clusterData,
uri,
domIt->second->domain());
domIt->second->domain(),
cluster->isCSLModeEnabled());
if (cluster->isCSLModeEnabled()) {
// In CSL mode, we delay the insertion to queueKeys until
// 'onQueueAssigned' observer callback.
Expand Down
6 changes: 4 additions & 2 deletions src/groups/mqb/mqbc/mqbc_clusterutil.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,16 @@ struct ClusterUtil {
/// Populate the specified `advisory` with information describing a
/// queue assignment of the specified `uri` living in the specified
/// `domain`, using the specified `clusterState`, `clusterData`. Load into
/// the specified `key` the unique queue key generated.
/// the specified `key` the unique queue key generated. AppIds info will
/// not be populated if the specified `isCSLMode` is true.
static void populateQueueAssignmentAdvisory(
bmqp_ctrlmsg::QueueAssignmentAdvisory* advisory,
mqbu::StorageKey* key,
ClusterState* clusterState,
ClusterData* clusterData,
const bmqt::Uri& uri,
const mqbi::Domain* domain);
const mqbi::Domain* domain,
bool isCSLMode);

/// Populate the specified `advisory` with information describing a
/// queue unassignment of the specified `uri` having the specified `key`
Expand Down
64 changes: 34 additions & 30 deletions src/integration-tests/test_restart.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,33 +143,37 @@ def test_restart_from_non_FSM_to_FSM(cluster: Cluster):

cluster.stop_nodes()

# Reconfigure the cluster from non-FSM to FSM mode
for broker in cluster.configurator.brokers.values():
my_clusters = broker.clusters.my_clusters
if len(my_clusters) > 0:
my_clusters[0].cluster_attributes.is_cslmode_enabled = True
my_clusters[0].cluster_attributes.is_fsmworkflow = True
cluster.deploy_domains()

cluster.start_nodes(wait_leader=True, wait_ready=True)
# For a standard cluster, states have already been restored as part of
# leader re-election.
if cluster.is_single_node:
producer.wait_state_restored()

producer.post(tc.URI_PRIORITY, payload=["msg2"], wait_ack=True, succeed=True)
producer.post(tc.URI_FANOUT, payload=["fanout_msg2"], wait_ack=True, succeed=True)

# Consumer for priority queue
consumer = next(proxies).create_client("consumer")
consumer.open(tc.URI_PRIORITY, flags=["read"], succeed=True)
consumer.wait_push_event()
assert wait_until(lambda: len(consumer.list(tc.URI_PRIORITY, block=True)) == 2, 2)

# Consumer for fanout queue
consumer_fanout = next(proxies).create_client("consumer_fanout")
consumer_fanout.open(tc.URI_FANOUT_FOO, flags=["read"], succeed=True)
consumer_fanout.wait_push_event()
assert wait_until(
lambda: len(consumer_fanout.list(tc.URI_FANOUT_FOO, block=True)) == 2, 2
)
# TODO: Disable reconfiguring the cluster for now, as we are resolving a
# compatibility issue as described at
# https://github.com/bloomberg/blazingmq/pull/408

# # Reconfigure the cluster from non-FSM to FSM mode
# for broker in cluster.configurator.brokers.values():
# my_clusters = broker.clusters.my_clusters
# if len(my_clusters) > 0:
# my_clusters[0].cluster_attributes.is_cslmode_enabled = True
# my_clusters[0].cluster_attributes.is_fsmworkflow = True
# cluster.deploy_domains()

# cluster.start_nodes(wait_leader=True, wait_ready=True)
# # For a standard cluster, states have already been restored as part of
# # leader re-election.
# if cluster.is_single_node:
# producer.wait_state_restored()

# producer.post(tc.URI_PRIORITY, payload=["msg2"], wait_ack=True, succeed=True)
# producer.post(tc.URI_FANOUT, payload=["fanout_msg2"], wait_ack=True, succeed=True)

# # Consumer for priority queue
# consumer = next(proxies).create_client("consumer")
# consumer.open(tc.URI_PRIORITY, flags=["read"], succeed=True)
# consumer.wait_push_event()
# assert wait_until(lambda: len(consumer.list(tc.URI_PRIORITY, block=True)) == 2, 2)

# # Consumer for fanout queue
# consumer_fanout = next(proxies).create_client("consumer_fanout")
# consumer_fanout.open(tc.URI_FANOUT_FOO, flags=["read"], succeed=True)
# consumer_fanout.wait_push_event()
# assert wait_until(
# lambda: len(consumer_fanout.list(tc.URI_FANOUT_FOO, block=True)) == 2, 2
# )

0 comments on commit 71a9660

Please sign in to comment.