Skip to content

Commit

Permalink
Fix: use common throttle for QueueHandle subscriptions log
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <[email protected]>
  • Loading branch information
678098 committed Nov 7, 2023
1 parent 39368e2 commit 41c473c
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 20 deletions.
33 changes: 16 additions & 17 deletions src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <mwcu_printutil.h>

// BDE
#include <ball_logthrottle.h>
#include <bdlb_print.h>
#include <bdlf_bind.h>
#include <bdlt_timeunitratio.h>
Expand Down Expand Up @@ -537,11 +538,6 @@ QueueHandle::QueueHandle(
5 * bdlt::TimeUnitRatio::k_NS_PER_S);
// One maximum log per 5 seconds

d_throttledSubscriptionInfo.initialize(
10,
5 * bdlt::TimeUnitRatio::k_NS_PER_S);
// Ten per 5 seconds

setHandleParameters(handleParameters);
}

Expand Down Expand Up @@ -610,12 +606,14 @@ void QueueHandle::registerSubscription(unsigned int downstreamSubId,
const bmqp_ctrlmsg::ConsumerInfo& ci,
unsigned int upstreamId)
{
if (d_throttledSubscriptionInfo.requestPermission()) {
BALL_LOG_INFO << "QueueHandle [" << this
<< "] registering Subscription [id = " << downstreamId
<< ", downstreamSubQueueId = " << downstreamSubId
<< ", upstreamId = " << upstreamId << "]";
}
static const int k_MAX_INSTANT_MESSAGES = 10;
static const bsls::Types::Int64 k_NS_PER_MESSAGE =
bdlt::TimeUnitRatio::k_NANOSECONDS_PER_SECOND;
BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE)
<< "QueueHandle [" << this
<< "] registering Subscription [id = " << downstreamId
<< ", downstreamSubQueueId = " << downstreamSubId
<< ", upstreamId = " << upstreamId << "]";

const bsl::shared_ptr<Downstream>& subStream = downstream(downstreamSubId);

Expand Down Expand Up @@ -717,12 +715,13 @@ bool QueueHandle::unregisterSubStream(
itSubscription != d_subscriptions.end();) {
const SubscriptionSp& subscription = itSubscription->second;
if (subscription->d_downstreamSubQueueId == downstreamSubQueueId) {
if (d_throttledSubscriptionInfo.requestPermission()) {
BALL_LOG_INFO << "Queue '" << d_queue_sp->description()
<< "' handle " << this
<< " removing Subscription "
<< itSubscription->first;
}
static const int k_MAX_INSTANT_MESSAGES = 10;
static const bsls::Types::Int64 k_NS_PER_MESSAGE =
bdlt::TimeUnitRatio::k_NANOSECONDS_PER_SECOND;
BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE)
<< "Queue '" << d_queue_sp->description() << "' handle "
<< this << " removing Subscription "
<< itSubscription->first;
itSubscription = d_subscriptions.erase(itSubscription);
}
else {
Expand Down
4 changes: 1 addition & 3 deletions src/groups/mqb/mqbblp/mqbblp_queuehandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,6 @@ class QueueHandle : public mqbi::QueueHandle {

bdlmt::Throttle d_throttledDroppedPutMessages;

bdlmt::Throttle d_throttledSubscriptionInfo;

mwcu::OperationChain d_deconfigureChain;
// Mechanism to serialize execution of
// the substream deconfigure callbacks
Expand Down Expand Up @@ -665,7 +663,7 @@ inline QueueHandle::Downstream::Downstream(const bsl::string& appId,
bslma::Allocator* allocator_p)
: d_appId(appId)
, d_upstreamSubQueueId(upstreamSubQueueId)
, d_data(new (*allocator_p)
, d_data(new(*allocator_p)
mqbi::QueueHandle::UnconfirmedMessageInfoMap(allocator_p),
allocator_p)
{
Expand Down

0 comments on commit 41c473c

Please sign in to comment.