From d0c12fe860c5b22c5764b737efb7e915a1354810 Mon Sep 17 00:00:00 2001 From: Vitaly Dzhitenov Date: Thu, 26 Oct 2023 11:12:11 -0400 Subject: [PATCH] Use BrokerSession::d_nextInternalSubscriptionId --- src/groups/bmq/bmqimp/bmqimp_brokersession.cpp | 6 +++--- src/groups/bmq/bmqimp/bmqimp_brokersession.h | 3 +++ src/groups/bmq/bmqimp/bmqimp_event.h | 2 +- src/groups/bmq/bmqimp/bmqimp_messagedumper.cpp | 11 ++++++----- src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp | 4 ++-- src/groups/bmq/bmqimp/bmqimp_queuemanager.h | 8 ++++---- src/groups/bmq/bmqp/bmqp_optionsview.h | 2 +- src/groups/bmq/bmqp/bmqp_protocol.h | 1 + src/groups/bmq/bmqt/bmqt_subscription.cpp | 2 +- src/groups/bmq/bmqt/bmqt_subscription.h | 6 +++++- 10 files changed, 27 insertions(+), 18 deletions(-) diff --git a/src/groups/bmq/bmqimp/bmqimp_brokersession.cpp b/src/groups/bmq/bmqimp/bmqimp_brokersession.cpp index 3e906bc2f4..dfffd84bd3 100644 --- a/src/groups/bmq/bmqimp/bmqimp_brokersession.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_brokersession.cpp @@ -5245,9 +5245,8 @@ BrokerSession::createConfigureQueueContext(const bsl::shared_ptr& queue, bmqp_ctrlmsg::Subscription subscription(d_allocator_p); - static bsls::AtomicUint s_nextId(0); - - unsigned int internalSubscriptionId = s_nextId.add(1); + const unsigned int internalSubscriptionId = + ++d_nextInternalSubscriptionId; subscription.sId() = internalSubscriptionId; // Using unique id instead of 'SubscriptionHandle::id()' @@ -5636,6 +5635,7 @@ BrokerSession::BrokerSession( , d_messageExpirationTimeoutHandle() , d_nextRequestGroupId(k_NON_BUFFERED_REQUEST_GROUP_ID) , d_queueRetransmissionTimeoutMap(allocator) +, d_nextInternalSubscriptionId(bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID) { // PRECONDITIONS BSLS_ASSERT_SAFE(d_scheduler_p->clockType() == diff --git a/src/groups/bmq/bmqimp/bmqimp_brokersession.h b/src/groups/bmq/bmqimp/bmqimp_brokersession.h index a2d25921fd..a833a78998 100644 --- a/src/groups/bmq/bmqimp/bmqimp_brokersession.h +++ b/src/groups/bmq/bmqimp/bmqimp_brokersession.h @@ -852,6 +852,9 @@ class BrokerSession BSLS_CPP11_FINAL { // retransmission timeout provided by // the broker + unsigned int d_nextInternalSubscriptionId; + // Assists generating unique ids for Configure requests. + private: // NOT IMPLEMENTED BrokerSession(const BrokerSession&); diff --git a/src/groups/bmq/bmqimp/bmqimp_event.h b/src/groups/bmq/bmqimp/bmqimp_event.h index 9e9d39658b..a0f6e23dc2 100644 --- a/src/groups/bmq/bmqimp/bmqimp_event.h +++ b/src/groups/bmq/bmqimp/bmqimp_event.h @@ -501,7 +501,7 @@ class Event { /// underlying raw event is of type ACK, PUT or PUSH. void addCorrelationId(const bmqt::CorrelationId& correlationId, unsigned int subscriptionHandleId = - bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID); + bmqt::SubscriptionHandle::k_INVALID_HANDLE_ID); /// Insert the specified `queue` to the queues and the specified /// `corrId` to the list of correlationIds associated with this event. diff --git a/src/groups/bmq/bmqimp/bmqimp_messagedumper.cpp b/src/groups/bmq/bmqimp/bmqimp_messagedumper.cpp index 65a83be8d6..d68b3cdf38 100644 --- a/src/groups/bmq/bmqimp/bmqimp_messagedumper.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_messagedumper.cpp @@ -341,15 +341,16 @@ void MessageDumper::dumpPushEvent(bsl::ostream& out, const bmqp::Event& event) unsigned int subscriptionId; bmqp::RdaInfo rdaInfo; bmqt::CorrelationId correlationId; - unsigned int subscriptionHandle; + unsigned int subscriptionHandleId; iter.extractQueueInfo(&qId, &subscriptionId, &rdaInfo); QueueManager::QueueSp queue = - d_queueManager_p->lookupQueueBySubscriptionId(&correlationId, - &subscriptionHandle, - qId, - subscriptionId); + d_queueManager_p->lookupQueueBySubscriptionId( + &correlationId, + &subscriptionHandleId, + qId, + subscriptionId); BSLS_ASSERT_SAFE(queue); out << "PUSH Message #" << ++msgNum << ": " diff --git a/src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp b/src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp index 0aea8d0bd6..13d2ea87cb 100644 --- a/src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp @@ -290,13 +290,13 @@ void QueueManager::resetState() const QueueManager::QueueSp QueueManager::observePushEvent(bmqt::CorrelationId* correlationId, - unsigned int* subscriptionHandle, + unsigned int* subscriptionHandleId, const bmqp::EventUtilQueueInfo& info) { // Update stats const QueueSp queue = lookupQueueBySubscriptionIdLocked( correlationId, - subscriptionHandle, + subscriptionHandleId, info.d_header.queueId(), info.d_subscriptionId); diff --git a/src/groups/bmq/bmqimp/bmqimp_queuemanager.h b/src/groups/bmq/bmqimp/bmqimp_queuemanager.h index 4eb13cf4f1..9c515518fc 100644 --- a/src/groups/bmq/bmqimp/bmqimp_queuemanager.h +++ b/src/groups/bmq/bmqimp/bmqimp_queuemanager.h @@ -311,7 +311,7 @@ class QueueManager { const bmqp::PushMessageIterator& iterator); const QueueSp observePushEvent(bmqt::CorrelationId* correlationId, - unsigned int* subscriptionHandle, + unsigned int* subscriptionHandleId, const bmqp::EventUtilQueueInfo& info); /// Update stats for the queue(s) corresponding to the messages pointed @@ -352,7 +352,7 @@ class QueueManager { /// `correlationId`, and return a shared pointer to the Queue object (if /// found), or an empty shared pointer (if not found). QueueSp lookupQueueBySubscriptionId(bmqt::CorrelationId* correlationId, - unsigned int* subscriptionHandle, + unsigned int* subscriptionHandleId, int queueId, unsigned int subscriptionId) const; @@ -438,14 +438,14 @@ QueueManager::lookupQueue(const bmqp::QueueId& queueId) const inline QueueManager::QueueSp QueueManager::lookupQueueBySubscriptionId( bmqt::CorrelationId* correlationId, - unsigned int* subscriptionHandle, + unsigned int* subscriptionHandleId, int queueId, unsigned int internalSubscriptionId) const { bsls::SpinLockGuard guard(&d_queuesLock); // d_queuesLock LOCKED return lookupQueueBySubscriptionIdLocked(correlationId, - subscriptionHandle, + subscriptionHandleId, queueId, internalSubscriptionId); } diff --git a/src/groups/bmq/bmqp/bmqp_optionsview.h b/src/groups/bmq/bmqp/bmqp_optionsview.h index cfc7cb5517..d54ae3da0b 100644 --- a/src/groups/bmq/bmqp/bmqp_optionsview.h +++ b/src/groups/bmq/bmqp/bmqp_optionsview.h @@ -347,7 +347,7 @@ inline OptionsView::Iterator::Iterator(const OptionsView* optionsView, const unsigned int offset) : d_optionsView_p(optionsView) , d_offset(offset) -, d_value(static_cast(d_offset)) +, d_value(static_cast(d_offset)) { } diff --git a/src/groups/bmq/bmqp/bmqp_protocol.h b/src/groups/bmq/bmqp/bmqp_protocol.h index 27a8f11c17..ed7d6b76ba 100644 --- a/src/groups/bmq/bmqp/bmqp_protocol.h +++ b/src/groups/bmq/bmqp/bmqp_protocol.h @@ -411,6 +411,7 @@ struct Protocol { // RemainingDeliveryAttempts counter value. static const unsigned int k_DEFAULT_SUBSCRIPTION_ID = 0; + // Internal unique id in Configure request // CLASS METHODS diff --git a/src/groups/bmq/bmqt/bmqt_subscription.cpp b/src/groups/bmq/bmqt/bmqt_subscription.cpp index 4d65217468..a9a504798b 100644 --- a/src/groups/bmq/bmqt/bmqt_subscription.cpp +++ b/src/groups/bmq/bmqt/bmqt_subscription.cpp @@ -43,7 +43,7 @@ const int Subscription::k_DEFAULT_CONSUMER_PRIORITY = 0; unsigned int SubscriptionHandle::nextId() { - static bsls::AtomicUint s_id = 0; + static bsls::AtomicUint s_id = k_INVALID_HANDLE_ID; return ++s_id; } diff --git a/src/groups/bmq/bmqt/bmqt_subscription.h b/src/groups/bmq/bmqt/bmqt_subscription.h index df850a6d52..d865ea0775 100644 --- a/src/groups/bmq/bmqt/bmqt_subscription.h +++ b/src/groups/bmq/bmqt/bmqt_subscription.h @@ -64,6 +64,10 @@ class SubscriptionHandle { friend class bmqa::MessageImpl; friend class bmqa::MessageIterator; + public: + static const unsigned int k_INVALID_HANDLE_ID = 0; + // Initial (invalid) value for 'bmqt::SubscriptionHandle::d_id' + private: // PRIVATE DATA unsigned int d_id; @@ -281,7 +285,7 @@ bsl::ostream& operator<<(bsl::ostream& stream, const Subscription& rhs); // ---------------------- inline SubscriptionHandle::SubscriptionHandle() -: d_id(0) +: d_id(k_INVALID_HANDLE_ID) , d_correlationId() { // NOTHING