diff --git a/src/applications/bmqtool/m_bmqtool_inpututil.cpp b/src/applications/bmqtool/m_bmqtool_inpututil.cpp index b870b9324..e11691080 100644 --- a/src/applications/bmqtool/m_bmqtool_inpututil.cpp +++ b/src/applications/bmqtool/m_bmqtool_inpututil.cpp @@ -173,6 +173,8 @@ void InputUtil::verifyProperties( bsl::unordered_set pairs; + pairs.insert("pairs_"); + while (it.hasNext()) { bsl::string name = it.name(); @@ -201,7 +203,7 @@ void InputUtil::verifyProperties( break; } case bmqt::PropertyType::e_SHORT: { - BSLS_ASSERT_SAFE(it.getAsShort()() == + BSLS_ASSERT_SAFE(it.getAsShort() == in.getPropertyAsShort(name)); break; } diff --git a/src/groups/bmq/bmqimp/bmqimp_brokersession.cpp b/src/groups/bmq/bmqimp/bmqimp_brokersession.cpp index 77b34fed5..dfffd84bd 100644 --- a/src/groups/bmq/bmqimp/bmqimp_brokersession.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_brokersession.cpp @@ -3531,14 +3531,20 @@ void BrokerSession::processPushEvent(const bmqp::Event& event) citer != sIds.end(); ++citer) { bmqt::CorrelationId correlationId; + unsigned int subscriptionHandleId; const QueueManager::QueueSp queue = - d_queueManager.observePushEvent(&correlationId, *citer); + d_queueManager.observePushEvent(&correlationId, + &subscriptionHandleId, + *citer); BSLS_ASSERT(queue); queueEvent->insertQueue(citer->d_subscriptionId, queue); - queueEvent->addCorrelationId(correlationId, - citer->d_subscriptionId); + // Use 'subscriptionHandle' instead of the internal + // 'citer->d_subscriptionId' so that + // 'bmqimp::Event::subscriptionId()' returns 'subscriptionHandle' + + queueEvent->addCorrelationId(correlationId, subscriptionHandleId); } // Update event bytes @@ -5239,7 +5245,12 @@ BrokerSession::createConfigureQueueContext(const bsl::shared_ptr& queue, bmqp_ctrlmsg::Subscription subscription(d_allocator_p); - subscription.sId() = cit->first.id(); + const unsigned int internalSubscriptionId = + ++d_nextInternalSubscriptionId; + + subscription.sId() = internalSubscriptionId; + // Using unique id instead of 'SubscriptionHandle::id()' + subscription.consumers().emplace_back(ci); bmqp_ctrlmsg::ExpressionVersion::Value version; @@ -5260,9 +5271,9 @@ BrokerSession::createConfigureQueueContext(const bsl::shared_ptr& queue, subscription.expression().text() = from.expression().text(); streamParams.subscriptions().emplace_back(subscription); - d_queueManager.registerSubscription(queue, - cit->first.id(), - cit->first.correlationId()); + queue->registerInternalSubscriptionId(internalSubscriptionId, + cit->first.id(), + cit->first.correlationId()); } return context; // RETURN } @@ -5298,9 +5309,9 @@ BrokerSession::createConfigureQueueContext(const bsl::shared_ptr& queue, streamParams.consumerPriority() = options.consumerPriority(); streamParams.consumerPriorityCount() = 1; - d_queueManager.registerSubscription(queue, - queue->subQueueId(), - bmqt::CorrelationId()); + queue->registerInternalSubscriptionId(queue->subQueueId(), + queue->subQueueId(), + bmqt::CorrelationId()); } return context; @@ -5624,6 +5635,7 @@ BrokerSession::BrokerSession( , d_messageExpirationTimeoutHandle() , d_nextRequestGroupId(k_NON_BUFFERED_REQUEST_GROUP_ID) , d_queueRetransmissionTimeoutMap(allocator) +, d_nextInternalSubscriptionId(bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID) { // PRECONDITIONS BSLS_ASSERT_SAFE(d_scheduler_p->clockType() == @@ -6123,6 +6135,7 @@ void BrokerSession::onConfigureQueueResponse( res == bmqt::GenericResult::e_NOT_CONNECTED || res == bmqt::GenericResult::e_NOT_SUPPORTED); + (void)res; BALL_LOG_INFO << "Ignore cancelled request: " << context->request(); return; // RETURN diff --git a/src/groups/bmq/bmqimp/bmqimp_brokersession.h b/src/groups/bmq/bmqimp/bmqimp_brokersession.h index a2d25921f..a833a7899 100644 --- a/src/groups/bmq/bmqimp/bmqimp_brokersession.h +++ b/src/groups/bmq/bmqimp/bmqimp_brokersession.h @@ -852,6 +852,9 @@ class BrokerSession BSLS_CPP11_FINAL { // retransmission timeout provided by // the broker + unsigned int d_nextInternalSubscriptionId; + // Assists generating unique ids for Configure requests. + private: // NOT IMPLEMENTED BrokerSession(const BrokerSession&); diff --git a/src/groups/bmq/bmqimp/bmqimp_event.h b/src/groups/bmq/bmqimp/bmqimp_event.h index 6cb28c799..a0f6e23dc 100644 --- a/src/groups/bmq/bmqimp/bmqimp_event.h +++ b/src/groups/bmq/bmqimp/bmqimp_event.h @@ -471,7 +471,7 @@ class Event { /// undefined unless 0 <= 'position' < numCorrrelationIds(), and event's /// type() is MESSAGEEVENT, 'messageEventMode()' is READ and the /// underlying raw event is of type PUSH. - const unsigned int subscriptionId(int position) const; + unsigned int subscriptionId(int position) const; // MANIPULATORS @@ -500,8 +500,8 @@ class Event { /// event's type() is MESSAGEEVENT, 'messageEventMode()' is READ and the /// underlying raw event is of type ACK, PUT or PUSH. void addCorrelationId(const bmqt::CorrelationId& correlationId, - unsigned int subscriptionId = - bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID); + unsigned int subscriptionHandleId = + bmqt::SubscriptionHandle::k_INVALID_HANDLE_ID); /// Insert the specified `queue` to the queues and the specified /// `corrId` to the list of correlationIds associated with this event. @@ -746,7 +746,7 @@ inline const bmqt::CorrelationId& Event::correlationId(int position) const return d_correlationIds[position].first; } -inline const unsigned int Event::subscriptionId(int position) const +inline unsigned int Event::subscriptionId(int position) const { // PRECONDITIONS BSLS_ASSERT_SAFE(type() == EventType::e_MESSAGE); @@ -798,7 +798,7 @@ inline bmqp::PutEventBuilder* Event::putEventBuilder() } inline void Event::addCorrelationId(const bmqt::CorrelationId& correlationId, - unsigned int subscriptionId) + unsigned int subscriptionHandleId) { // TODO: when ACK event is created locally we have to fill d_correlationIds // before the raw ACK 'bmqp::Event' is created and may be used to @@ -810,7 +810,8 @@ inline void Event::addCorrelationId(const bmqt::CorrelationId& correlationId, // BSLS_ASSERT_SAFE(messageEventMode() == MessageEventMode::e_READ); // BSLS_ASSERT_SAFE(d_rawEvent.isAckEvent()); - d_correlationIds.push_back(bsl::make_pair(correlationId, subscriptionId)); + d_correlationIds.push_back( + bsl::make_pair(correlationId, subscriptionHandleId)); } } // close package namespace diff --git a/src/groups/bmq/bmqimp/bmqimp_messagedumper.cpp b/src/groups/bmq/bmqimp/bmqimp_messagedumper.cpp index f20146baa..d68b3cdf3 100644 --- a/src/groups/bmq/bmqimp/bmqimp_messagedumper.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_messagedumper.cpp @@ -341,13 +341,16 @@ void MessageDumper::dumpPushEvent(bsl::ostream& out, const bmqp::Event& event) unsigned int subscriptionId; bmqp::RdaInfo rdaInfo; bmqt::CorrelationId correlationId; + unsigned int subscriptionHandleId; iter.extractQueueInfo(&qId, &subscriptionId, &rdaInfo); QueueManager::QueueSp queue = - d_queueManager_p->lookupQueueBySubscriptionId(&correlationId, - qId, - subscriptionId); + d_queueManager_p->lookupQueueBySubscriptionId( + &correlationId, + &subscriptionHandleId, + qId, + subscriptionId); BSLS_ASSERT_SAFE(queue); out << "PUSH Message #" << ++msgNum << ": " diff --git a/src/groups/bmq/bmqimp/bmqimp_messagedumper.t.cpp b/src/groups/bmq/bmqimp/bmqimp_messagedumper.t.cpp index 859d8c5e8..52c6c05da 100644 --- a/src/groups/bmq/bmqimp/bmqimp_messagedumper.t.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_messagedumper.t.cpp @@ -550,7 +550,9 @@ void Tester::registerSubscription(const bslstl::StringRef& uri, BSLS_ASSERT_SAFE(queue); - d_queueManager.registerSubscription(queue, subscriptionId, correlationId); + queue->registerInternalSubscriptionId(subscriptionId, + subscriptionId, + correlationId); } void Tester::updateSubscriptions(const bslstl::StringRef& uri, diff --git a/src/groups/bmq/bmqimp/bmqimp_queue.cpp b/src/groups/bmq/bmqimp/bmqimp_queue.cpp index 09a737259..629f5ca37 100644 --- a/src/groups/bmq/bmqimp/bmqimp_queue.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_queue.cpp @@ -325,6 +325,7 @@ Queue::Queue(bslma::Allocator* allocator) , d_schemaLearner(allocator) , d_schemaLearnerContext(d_schemaLearner.createContext()) , d_config(allocator) +, d_registeredInternalSubscriptionIds(allocator) { d_handleParameters.uri() = ""; d_handleParameters.flags() = bmqt::QueueFlagsUtil::empty(); diff --git a/src/groups/bmq/bmqimp/bmqimp_queue.h b/src/groups/bmq/bmqimp/bmqimp_queue.h index 839637220..659f1cba4 100644 --- a/src/groups/bmq/bmqimp/bmqimp_queue.h +++ b/src/groups/bmq/bmqimp/bmqimp_queue.h @@ -172,6 +172,11 @@ struct QueueStatsUtil { /// Representation of a Queue (properties, stats, state, ...) class Queue { + public: + // PUBLIC TYPES + typedef bsl::pair SubscriptionHandle; + // Not using private 'bmqt::SubscriptionHandle' ctor + private: // DATA bslma::Allocator* d_allocator_p; @@ -256,6 +261,13 @@ class Queue { bmqp_ctrlmsg::StreamParameters d_config; + bsl::unordered_map + d_registeredInternalSubscriptionIds; + // This keeps SubscriptionHandle (id and CorrelationId) for Configure + // response processing. + // Supporting multiple concurrent Configure requests. + // TODO: This should go into ConfigureRequest context. + private: // NOT IMPLEMENTED @@ -325,6 +337,20 @@ class Queue { /// reinitialize the state before a new start). void clearStatContext(); + void + registerInternalSubscriptionId(unsigned int internalSubscriptionId, + unsigned int subscriptionHandleId, + const bmqt::CorrelationId& correlationId); + // Keep the specified 'subscriptionHandleId' and 'correlationId' + // associated with the specified 'internalSubscriptionId' between + // Configure request and Configure response (until + // 'extractSubscriptionHandle'). + + SubscriptionHandle + extractSubscriptionHandle(unsigned int internalSubscriptionId); + // Lookup, copy, erase, and return the copy of what was registered + // by 'registerInternalSubscriptionId'. + // ACCESSORS /// Return true if this Queue object has a SubQueueId having the default @@ -528,6 +554,33 @@ inline Queue& Queue::setConfig(const bmqp_ctrlmsg::StreamParameters& value) return *this; } +inline void +Queue::registerInternalSubscriptionId(unsigned int internalSubscriptionId, + unsigned int subscriptionHandleId, + const bmqt::CorrelationId& correlationId) +{ + d_registeredInternalSubscriptionIds.emplace( + internalSubscriptionId, + SubscriptionHandle(subscriptionHandleId, correlationId)); +} + +inline Queue::SubscriptionHandle +Queue::extractSubscriptionHandle(unsigned int internalSubscriptionId) +{ + bsl::unordered_map::const_iterator cit = + d_registeredInternalSubscriptionIds.find(internalSubscriptionId); + + if (cit == d_registeredInternalSubscriptionIds.end()) { + return {internalSubscriptionId, bmqt::CorrelationId()}; // RETURN + } + + SubscriptionHandle result(cit->second); + + d_registeredInternalSubscriptionIds.erase(cit); + + return result; +} + // ACCESSORS inline QueueState::Enum Queue::state() const { diff --git a/src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp b/src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp index dfa99d7c8..13d2ea87c 100644 --- a/src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp @@ -67,13 +67,14 @@ QueueManager::lookupQueueLocked(const bmqp::QueueId& queueId) const QueueManager::QueueSp QueueManager::lookupQueueBySubscriptionIdLocked( bmqt::CorrelationId* correlationId, + unsigned int* subscriptionHandleId, int qId, - unsigned int sid) const + unsigned int internalSubscriptionId) const { // PRECONDITIONS // d_queuesLock locked - if (sid == bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID) { + if (internalSubscriptionId == bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID) { // Look up by 'bmqp::QueueId' const QueueSp& result = lookupQueueLocked(bmqp::QueueId(qId)); @@ -84,22 +85,21 @@ QueueManager::QueueSp QueueManager::lookupQueueBySubscriptionIdLocked( return result; } // lookup by 'subscriptionId' - SubscriptionId id(qId, sid); + SubscriptionId id(qId, internalSubscriptionId); QueuesBySubscriptions::const_iterator cit = d_queuesBySubscriptionIds.find( id); + if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY( cit == d_queuesBySubscriptionIds.end())) { BSLS_PERFORMANCEHINT_UNLIKELY_HINT; return QueueSp(); // RETURN } - if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(!cit->second.d_isCommited)) { - BSLS_PERFORMANCEHINT_UNLIKELY_HINT; - return QueueSp(); // RETURN - } + BSLS_ASSERT_SAFE(cit->second.d_queue); - *correlationId = cit->second.d_correlatonId; + *subscriptionHandleId = cit->second.d_subscriptionHandle.first; + *correlationId = cit->second.d_subscriptionHandle.second; return cit->second.d_queue; } @@ -226,17 +226,6 @@ QueueManager::QueueSp QueueManager::removeQueue(const Queue* queue) (void)numErasedUris; // Compiler happiness } - bmqt::QueueOptions::SubscriptionsSnapshot snapshot(d_allocator_p); - queueSp->options().loadSubscriptions(&snapshot); - - for (bmqt::QueueOptions::SubscriptionsSnapshot::const_iterator cit = - snapshot.begin(); - cit != snapshot.end(); - ++cit) { - SubscriptionId id(queue->id(), cit->first.id()); - d_queuesBySubscriptionIds.erase(id); - } - return queueSp; } @@ -300,12 +289,14 @@ void QueueManager::resetState() } const QueueManager::QueueSp -QueueManager::observePushEvent(bmqt::CorrelationId* correlationId, +QueueManager::observePushEvent(bmqt::CorrelationId* correlationId, + unsigned int* subscriptionHandleId, const bmqp::EventUtilQueueInfo& info) { // Update stats const QueueSp queue = lookupQueueBySubscriptionIdLocked( correlationId, + subscriptionHandleId, info.d_header.queueId(), info.d_subscriptionId); @@ -504,54 +495,30 @@ void QueueManager::resetSubStreamCount(const bsl::string& canonicalUri) uriIter->second.d_subStreamCount = 0; } -void QueueManager::registerSubscription( - const bsl::shared_ptr& queue, - unsigned int subscriptionId, - const bmqt::CorrelationId& correlationId) -{ - BSLS_ASSERT_SAFE(queue); - - d_queuesBySubscriptionIds.insert( - bsl::make_pair(SubscriptionId(queue->id(), subscriptionId), - QueueBySubscription(queue, correlationId))); -} - void QueueManager::updateSubscriptions( const bsl::shared_ptr& queue, const bmqp_ctrlmsg::StreamParameters& config) { + BSLS_ASSERT_SAFE(queue); + const bmqp_ctrlmsg::StreamParameters& previous = queue->config(); for (size_t i = 0; i < previous.subscriptions().size(); ++i) { - SubscriptionId id(queue->id(), previous.subscriptions()[i].sId()); + unsigned int internalSubscriptionId = + previous.subscriptions()[i].sId(); - QueuesBySubscriptions::iterator it = d_queuesBySubscriptionIds.find( - id); - if (it != d_queuesBySubscriptionIds.end()) { - it->second.d_isCommited = false; - } - // For backward compatibility, allow missing 'registerSubscription' - // in which case Subscription Correlation Id will be empty. + SubscriptionId id(queue->id(), internalSubscriptionId); + + d_queuesBySubscriptionIds.erase(id); } + for (size_t i = 0; i < config.subscriptions().size(); ++i) { - SubscriptionId id(queue->id(), config.subscriptions()[i].sId()); - QueuesBySubscriptions::iterator it = d_queuesBySubscriptionIds.find( - id); - if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY( - it == d_queuesBySubscriptionIds.end())) { - // For backward compatibility, allow missing 'registerSubscription' - // in which case Subscription Correlation Id will be empty. - - it = d_queuesBySubscriptionIds - .insert(bsl::make_pair( - id, - QueueBySubscription(queue, bmqt::CorrelationId()))) - .first; - } + unsigned int internalSubscriptionId = config.subscriptions()[i].sId(); - it->second.d_isCommited = true; + d_queuesBySubscriptionIds.insert(bsl::make_pair( + SubscriptionId(queue->id(), internalSubscriptionId), + QueueBySubscription(internalSubscriptionId, queue))); } - // 'QueueManager::removeQueue' cleans 'd_queuesBySubscriptionIds' } // ACCESSORS diff --git a/src/groups/bmq/bmqimp/bmqimp_queuemanager.h b/src/groups/bmq/bmqimp/bmqimp_queuemanager.h index 83dcf356f..9c515518f 100644 --- a/src/groups/bmq/bmqimp/bmqimp_queuemanager.h +++ b/src/groups/bmq/bmqimp/bmqimp_queuemanager.h @@ -151,27 +151,18 @@ class QueueManager { public: // TYPES - typedef bsl::shared_ptr QueueSp; + typedef bsl::shared_ptr QueueSp; + typedef bsl::pair SubscriptionUandle; struct QueueBySubscription { const QueueSp d_queue; - bool d_isCommited; - // The Subscription correlationId is - // specified at the configure request - // time while the response 'commits' - // the Subscription, availing it for - // the lookup by correlationId in a - // PUSH message. Rather than keeping - // two separate maps, this bool is a - // state of Subscription - 'true' if - // Subscription request gets response. - - const bmqt::CorrelationId d_correlatonId; + + const SubscriptionUandle d_subscriptionHandle; // The Subscription correlationId as // specified in the configure request. - QueueBySubscription(const bsl::shared_ptr& queue, - const bmqt::CorrelationId& correlatonId); + QueueBySubscription(unsigned int internalSubscriptionId, + const bsl::shared_ptr& queue); }; /// Subscription id -> {Queue, Subscription correlationId} @@ -256,8 +247,9 @@ class QueueManager { /// specified `correlationId`. QueueSp lookupQueueBySubscriptionIdLocked(bmqt::CorrelationId* correlationId, - int qid, - unsigned int sid) const; + unsigned int* subscriptionHandleId, + int qid, + unsigned int innerSubscriptionId) const; public: // TRAITS @@ -319,6 +311,7 @@ class QueueManager { const bmqp::PushMessageIterator& iterator); const QueueSp observePushEvent(bmqt::CorrelationId* correlationId, + unsigned int* subscriptionHandleId, const bmqp::EventUtilQueueInfo& info); /// Update stats for the queue(s) corresponding to the messages pointed @@ -347,10 +340,6 @@ class QueueManager { /// object. void resetSubStreamCount(const bsl::string& canonicalUri); - void registerSubscription(const bsl::shared_ptr& queue, - unsigned int subscriptionId, - const bmqt::CorrelationId& correlationId); - void updateSubscriptions(const bsl::shared_ptr& queue, const bmqp_ctrlmsg::StreamParameters& config); @@ -363,8 +352,9 @@ class QueueManager { /// `correlationId`, and return a shared pointer to the Queue object (if /// found), or an empty shared pointer (if not found). QueueSp lookupQueueBySubscriptionId(bmqt::CorrelationId* correlationId, - int queueId, - unsigned int subscriptionId) const; + unsigned int* subscriptionHandleId, + int queueId, + unsigned int subscriptionId) const; // TBD: Temporary method to enable refactoring of 'BrokerSession'. // Specifically, reopen logic in 'BrokerSession::processPacket'. @@ -391,11 +381,11 @@ class QueueManager { // ---------------------------------------- inline QueueManager::QueueBySubscription::QueueBySubscription( - const bsl::shared_ptr& queue, - const bmqt::CorrelationId& correlatonId) + unsigned int internalSubscriptionId, + const bsl::shared_ptr& queue) : d_queue(queue) -, d_isCommited(false) -, d_correlatonId(correlatonId) +, d_subscriptionHandle( + queue->extractSubscriptionHandle(internalSubscriptionId)) { // NOTHING } @@ -446,16 +436,18 @@ QueueManager::lookupQueue(const bmqp::QueueId& queueId) const return lookupQueueLocked(queueId); } -inline QueueManager::QueueSp -QueueManager::lookupQueueBySubscriptionId(bmqt::CorrelationId* correlationId, - int queueId, - unsigned int subscriptionId) const +inline QueueManager::QueueSp QueueManager::lookupQueueBySubscriptionId( + bmqt::CorrelationId* correlationId, + unsigned int* subscriptionHandleId, + int queueId, + unsigned int internalSubscriptionId) const { bsls::SpinLockGuard guard(&d_queuesLock); // d_queuesLock LOCKED return lookupQueueBySubscriptionIdLocked(correlationId, + subscriptionHandleId, queueId, - subscriptionId); + internalSubscriptionId); } } // close package namespace diff --git a/src/groups/bmq/bmqp/bmqp_optionsview.h b/src/groups/bmq/bmqp/bmqp_optionsview.h index cfc7cb551..d54ae3da0 100644 --- a/src/groups/bmq/bmqp/bmqp_optionsview.h +++ b/src/groups/bmq/bmqp/bmqp_optionsview.h @@ -347,7 +347,7 @@ inline OptionsView::Iterator::Iterator(const OptionsView* optionsView, const unsigned int offset) : d_optionsView_p(optionsView) , d_offset(offset) -, d_value(static_cast(d_offset)) +, d_value(static_cast(d_offset)) { } diff --git a/src/groups/bmq/bmqp/bmqp_protocol.h b/src/groups/bmq/bmqp/bmqp_protocol.h index 27a8f11c1..ed7d6b76b 100644 --- a/src/groups/bmq/bmqp/bmqp_protocol.h +++ b/src/groups/bmq/bmqp/bmqp_protocol.h @@ -411,6 +411,7 @@ struct Protocol { // RemainingDeliveryAttempts counter value. static const unsigned int k_DEFAULT_SUBSCRIPTION_ID = 0; + // Internal unique id in Configure request // CLASS METHODS diff --git a/src/groups/bmq/bmqt/bmqt_subscription.cpp b/src/groups/bmq/bmqt/bmqt_subscription.cpp index 4d6521746..a9a504798 100644 --- a/src/groups/bmq/bmqt/bmqt_subscription.cpp +++ b/src/groups/bmq/bmqt/bmqt_subscription.cpp @@ -43,7 +43,7 @@ const int Subscription::k_DEFAULT_CONSUMER_PRIORITY = 0; unsigned int SubscriptionHandle::nextId() { - static bsls::AtomicUint s_id = 0; + static bsls::AtomicUint s_id = k_INVALID_HANDLE_ID; return ++s_id; } diff --git a/src/groups/bmq/bmqt/bmqt_subscription.h b/src/groups/bmq/bmqt/bmqt_subscription.h index df850a6d5..d865ea077 100644 --- a/src/groups/bmq/bmqt/bmqt_subscription.h +++ b/src/groups/bmq/bmqt/bmqt_subscription.h @@ -64,6 +64,10 @@ class SubscriptionHandle { friend class bmqa::MessageImpl; friend class bmqa::MessageIterator; + public: + static const unsigned int k_INVALID_HANDLE_ID = 0; + // Initial (invalid) value for 'bmqt::SubscriptionHandle::d_id' + private: // PRIVATE DATA unsigned int d_id; @@ -281,7 +285,7 @@ bsl::ostream& operator<<(bsl::ostream& stream, const Subscription& rhs); // ---------------------- inline SubscriptionHandle::SubscriptionHandle() -: d_id(0) +: d_id(k_INVALID_HANDLE_ID) , d_correlationId() { // NOTHING diff --git a/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp b/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp index ae65db17d..b5d269dcf 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp @@ -535,10 +535,12 @@ QueueHandle::QueueHandle( d_throttledDroppedPutMessages.initialize( 1, 5 * bdlt::TimeUnitRatio::k_NS_PER_S); + // One maximum log per 5 seconds + d_throttledSubscriptionInfo.initialize( - 1, + 10, 5 * bdlt::TimeUnitRatio::k_NS_PER_S); - // One maximum log per 5 seconds + // Ten per 5 seconds setHandleParameters(handleParameters); }