diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index 01dc25116..fa9b77562 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -906,41 +906,48 @@ void ClusterQueueHelper::processPendingContexts( void ClusterQueueHelper::assignUpstreamSubqueueId(OpenQueueContext* context) { - // If needed, generate upstream subQueueId - if (context->d_upstreamSubQueueId != - bmqp::QueueId::k_UNASSIGNED_SUBQUEUE_ID) { - return; // RETURN - } - QueueLiveState& info = context->d_queueContext_p->d_liveQInfo; const bsl::string appId = bmqp::QueueUtil::extractAppId( context->d_handleParameters); StreamsMap::const_iterator it = info.d_subQueueIds.findByAppIdSafe(appId); - unsigned int upstreamSubId; - if (it == info.d_subQueueIds.end()) { - bdlb::NullableValue subQueueIdInfo = - context->d_handleParameters.subIdInfo(); + // If needed, generate upstream subQueueId + if (context->d_upstreamSubQueueId == + bmqp::QueueId::k_UNASSIGNED_SUBQUEUE_ID) { + unsigned int upstreamSubId; + + if (it == info.d_subQueueIds.end()) { + bdlb::NullableValue subQueueIdInfo = + context->d_handleParameters.subIdInfo(); - if (appId == bmqp::ProtocolUtil::k_DEFAULT_APP_ID) { - upstreamSubId = bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID; + if (appId == bmqp::ProtocolUtil::k_DEFAULT_APP_ID) { + upstreamSubId = bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID; + } + else { + upstreamSubId = getNextSubQueueId(context); + subQueueIdInfo.value().subId() = upstreamSubId; + } + info.d_subQueueIds.insert( + appId, + upstreamSubId, + SubQueueContext(context->d_queueContext_p->uri(), + subQueueIdInfo, + d_allocator_p)); } else { - upstreamSubId = getNextSubQueueId(context); - subQueueIdInfo.value().subId() = upstreamSubId; + upstreamSubId = it->subId(); } + + context->d_upstreamSubQueueId = upstreamSubId; + } + else if (it == info.d_subQueueIds.end()) { info.d_subQueueIds.insert( appId, - upstreamSubId, + context->d_upstreamSubQueueId, SubQueueContext(context->d_queueContext_p->uri(), - subQueueIdInfo, + context->d_handleParameters.subIdInfo(), d_allocator_p)); } - else { - upstreamSubId = it->subId(); - } - - context->d_upstreamSubQueueId = upstreamSubId; } void ClusterQueueHelper::processOpenQueueRequest(