Skip to content

Commit

Permalink
refactor: Getting rid of (un)assigningCb (#507)
Browse files Browse the repository at this point in the history
* Getting rid of (un)assigningCb

Signed-off-by: dorjesinpo <[email protected]>

* Getting rid of (un)assigningCb

Signed-off-by: dorjesinpo <[email protected]>

* fix: handle pending assignment case

Signed-off-by: dorjesinpo <[email protected]>

* Addressing review; fixing bad merge

Signed-off-by: dorjesinpo <[email protected]>

---------

Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo authored Dec 10, 2024
1 parent 81f9fe2 commit acf74d3
Show file tree
Hide file tree
Showing 14 changed files with 442 additions and 740 deletions.
343 changes: 86 additions & 257 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp

Large diffs are not rendered by default.

55 changes: 11 additions & 44 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL
/// Reset the `id`, `partitionId`, `key` and `queue` members of this
/// object. Note that `uri` is left untouched because it is an
/// invariant member of a given instance of such a QueueInfo object.
void reset();
void resetButKeepPending();
};

struct StopContext {
Expand Down Expand Up @@ -523,29 +523,6 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL
QueueAssignmentResult::Enum
assignQueue(const QueueContextSp& queueContext);

/// Called when the specified `uri` is in the process of being assigned.
/// If the specified `processingPendingRequests` is true, we will
/// process pending requests on this machine.
///
/// THREAD: This method is invoked in the associated cluster's
/// dispatcher thread.
void onQueueAssigning(const bmqt::Uri& uri,
bool processingPendingRequests);

/// Called when the queue with the specified `queueInfo` is being
/// unassigned. Load into the specified `hasInFlightRequests` whether
/// there are still in-flight requests for the queue. Return true on
/// success, or false on failure.
///
/// THREAD: This method is invoked in the associated cluster's
/// dispatcher thread.
///
/// TODO_CSL: This is the current workflow which we should be able to
/// remove after the new workflow via
/// ClusterQueueHelper::onQueueUnassigned() is stable.
bool onQueueUnassigning(bool* hasInFlightRequests,
const bmqp_ctrlmsg::QueueInfo& queueInfo);

/// Send a queueAssignment request to the leader, requesting assignment
/// of the queue with the specified `uri`. This method is called only
/// on a non leader node of a cluster member, for a cluster having a
Expand Down Expand Up @@ -978,15 +955,17 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL
///
/// THREAD: This method is invoked in the associated cluster's
/// dispatcher thread.
void onQueueAssigned(const mqbc::ClusterStateQueueInfo& info)
void
onQueueAssigned(const bsl::shared_ptr<mqbc::ClusterStateQueueInfo>& info)
BSLS_KEYWORD_OVERRIDE;

/// Callback invoked when a queue with the specified `info` gets
/// unassigned from the cluster.
///
/// THREAD: This method is invoked in the associated cluster's
/// dispatcher thread.
void onQueueUnassigned(const mqbc::ClusterStateQueueInfo& info)
void
onQueueUnassigned(const bsl::shared_ptr<mqbc::ClusterStateQueueInfo>& info)
BSLS_KEYWORD_OVERRIDE;

/// Callback invoked when a queue with the specified `uri` belonging to
Expand Down Expand Up @@ -1109,13 +1088,6 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL
/// failure.
int gcExpiredQueues(bool immediate = false);

ClusterQueueHelper& setOnQueueAssignedCb(const OnQueueAssignedCb& value);

/// Set the corresponding member to the specified `value` and return a
/// reference offering modifiable access to this object.
ClusterQueueHelper&
setOnQueueUnassignedCb(const OnQueueUnassignedCb& value);

/// Start executing multi-step processing of StopRequest or CLOSING node
/// advisory received from the specified `clusterNode`. In the case of
/// StopRequest the specified `request` references the request; in the
Expand All @@ -1135,8 +1107,8 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL
mqbc::ClusterNodeSession* ns,
const VoidFunctor& callback = VoidFunctor());

/// Called upon leader becoming available.
void onLeaderAvailable();
// Called upon leader becoming available.

// ACCESSORS

Expand Down Expand Up @@ -1266,21 +1238,16 @@ ClusterQueueHelper::isQueueAssigned(const QueueContext& queueContext) const
bmqp::QueueId::k_UNASSIGNED_QUEUE_ID; // RETURN
}

DomainStatesCIter domCit = d_clusterState_p->domainStates().find(
queueContext.uri().qualifiedDomain());
if (domCit == d_clusterState_p->domainStates().cend()) {
return false; // RETURN
}

UriToQueueInfoMapCIter qCit = domCit->second->queuesInfo().find(
mqbc::ClusterStateQueueInfo* assigned = d_clusterState_p->getAssigned(
queueContext.uri());
if (qCit == domCit->second->queuesInfo().cend()) {

if (assigned == 0) {
return false; // RETURN
}

BSLS_ASSERT_SAFE(qCit->second->partitionId() !=
BSLS_ASSERT_SAFE(assigned->partitionId() !=
mqbs::DataStore::k_INVALID_PARTITION_ID &&
!qCit->second->key().isNull());
!assigned->key().isNull());
return true;
}

Expand Down
Loading

0 comments on commit acf74d3

Please sign in to comment.