Skip to content

Commit

Permalink
re-insert queue tracking if needed
Browse files Browse the repository at this point in the history
Signed-off-by: Vitaly Dzhitenov <[email protected]>
  • Loading branch information
dorjesinpo committed Nov 17, 2023
1 parent cd34cf5 commit cef0617
Showing 1 changed file with 29 additions and 20 deletions.
49 changes: 29 additions & 20 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -906,41 +906,50 @@ void ClusterQueueHelper::processPendingContexts(

void ClusterQueueHelper::assignUpstreamSubqueueId(OpenQueueContext* context)
{
// If needed, generate upstream subQueueId
if (context->d_upstreamSubQueueId !=
bmqp::QueueId::k_UNASSIGNED_SUBQUEUE_ID) {
return; // RETURN
}


QueueLiveState& info = context->d_queueContext_p->d_liveQInfo;
const bsl::string appId = bmqp::QueueUtil::extractAppId(
context->d_handleParameters);
StreamsMap::const_iterator it = info.d_subQueueIds.findByAppIdSafe(appId);
unsigned int upstreamSubId;

if (it == info.d_subQueueIds.end()) {
bdlb::NullableValue<bmqp_ctrlmsg::SubQueueIdInfo> subQueueIdInfo =
context->d_handleParameters.subIdInfo();

if (appId == bmqp::ProtocolUtil::k_DEFAULT_APP_ID) {
upstreamSubId = bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID;
// If needed, generate upstream subQueueId
if (context->d_upstreamSubQueueId ==
bmqp::QueueId::k_UNASSIGNED_SUBQUEUE_ID) {
unsigned int upstreamSubId;

if (it == info.d_subQueueIds.end()) {
bdlb::NullableValue<bmqp_ctrlmsg::SubQueueIdInfo> subQueueIdInfo =
context->d_handleParameters.subIdInfo();

if (appId == bmqp::ProtocolUtil::k_DEFAULT_APP_ID) {
upstreamSubId = bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID;
}
else {
upstreamSubId = getNextSubQueueId(context);
subQueueIdInfo.value().subId() = upstreamSubId;
}
info.d_subQueueIds.insert(
appId,
upstreamSubId,
SubQueueContext(context->d_queueContext_p->uri(),
subQueueIdInfo,
d_allocator_p));
}
else {
upstreamSubId = getNextSubQueueId(context);
subQueueIdInfo.value().subId() = upstreamSubId;
upstreamSubId = it->subId();
}

context->d_upstreamSubQueueId = upstreamSubId;
} else if (it == info.d_subQueueIds.end()) {
info.d_subQueueIds.insert(
appId,
upstreamSubId,
context->d_upstreamSubQueueId,
SubQueueContext(context->d_queueContext_p->uri(),
subQueueIdInfo,
context->d_handleParameters.subIdInfo(),
d_allocator_p));
}
else {
upstreamSubId = it->subId();
}

context->d_upstreamSubQueueId = upstreamSubId;
}

void ClusterQueueHelper::processOpenQueueRequest(
Expand Down

0 comments on commit cef0617

Please sign in to comment.