Skip to content

Commit

Permalink
Adding [THROTTLED] prefix
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed Apr 25, 2024
1 parent ccc1615 commit add724d
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 19 deletions.
8 changes: 6 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ const bsls::Types::Int64 k_NS_PER_MESSAGE =
bdlt::TimeUnitRatio::k_NANOSECONDS_PER_MINUTE / k_MAX_INSTANT_MESSAGES;
// Time interval between messages logged with throttling.

#define BMQ_LOGTHROTTLE_INFO() \
BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE) \
<< "[THROTTLED] "

/// This function is a simple wrapper around the specified `callback`, to
/// ensure that the specified `refCount` is decremented after it gets
/// invoked with the specified `status`, `queue` and `confirmationCookie`.
Expand Down Expand Up @@ -1696,7 +1700,7 @@ void ClusterQueueHelper::onConfigureQueueResponse(

if (d_cluster_p->isStopping()) {
// Self is stopping. Drop the response.
BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE)
BMQ_LOGTHROTTLE_INFO()
<< d_cluster_p->description()
<< ": Dropping (re)configureQueue response [reason: 'stopping'"
<< ", request: " << requestContext->request()
Expand Down Expand Up @@ -3303,7 +3307,7 @@ void ClusterQueueHelper::sendCloseQueueRequest(
// will be advertised upstream. So just like above, we indicate
// success via 'callback'.

BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE)
BMQ_LOGTHROTTLE_INFO()
<< d_cluster_p->description()
<< ": Failed to send close-queue request: " << request->request()
<< ", for queue [" << handleParameters.uri() << "] to "
Expand Down
18 changes: 11 additions & 7 deletions src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ const bsls::Types::Int64 k_NS_PER_MESSAGE =
bdlt::TimeUnitRatio::k_NANOSECONDS_PER_SECOND;
// Time interval between messages logged with throttling.

#define BMQ_LOGTHROTTLE_INFO() \
BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE) \
<< "[THROTTLED] "

typedef bsl::function<void()> CompletionCallback;

/// Utility function used in `mwcu::OperationChain` as the operation
Expand Down Expand Up @@ -613,11 +617,11 @@ void QueueHandle::registerSubscription(unsigned int downstreamSubId,
const bmqp_ctrlmsg::ConsumerInfo& ci,
unsigned int upstreamId)
{
BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE)
<< "QueueHandle [" << this
<< "] registering Subscription [id = " << downstreamId
<< ", downstreamSubQueueId = " << downstreamSubId
<< ", upstreamId = " << upstreamId << "]";
BMQ_LOGTHROTTLE_INFO() << "QueueHandle [" << this
<< "] registering Subscription [id = "
<< downstreamId
<< ", downstreamSubQueueId = " << downstreamSubId
<< ", upstreamId = " << upstreamId << "]";

const bsl::shared_ptr<Downstream>& subStream = downstream(downstreamSubId);

Expand Down Expand Up @@ -719,7 +723,7 @@ bool QueueHandle::unregisterSubStream(
itSubscription != d_subscriptions.end();) {
const SubscriptionSp& subscription = itSubscription->second;
if (subscription->d_downstreamSubQueueId == downstreamSubQueueId) {
BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE)
BMQ_LOGTHROTTLE_INFO()
<< "Queue '" << d_queue_sp->description() << "' handle "
<< this << " removing Subscription "
<< itSubscription->first;
Expand Down Expand Up @@ -878,7 +882,7 @@ void QueueHandle::deliverMessage(

// Increasing resource usage ('update()' above) made us hit our
// maxUnconfirmed limit.
BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE)
BMQ_LOGTHROTTLE_INFO()
<< "Queue '" << d_queue_sp->description()
<< "' with subscription [" << subscriptions[i] << "]"
<< " of client '"
Expand Down
27 changes: 17 additions & 10 deletions src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ const bsls::Types::Int64 k_NS_PER_MESSAGE =
bdlt::TimeUnitRatio::k_NANOSECONDS_PER_MINUTE / k_MAX_INSTANT_MESSAGES;
// Time interval between messages logged with throttling.

#define BMQ_LOGTHROTTLE_INFO() \
BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE) \
<< "[THROTTLED] "

// ====================
// class LimitedPrinter
// ====================
Expand Down Expand Up @@ -282,14 +286,15 @@ void RelayQueueEngine::onHandleConfiguredDispatched(
BALL_LOGTHROTTLE_INFO_BLOCK(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE)
{
if (bmqp_ctrlmsg::StatusCategory::E_SUCCESS == status.category()) {
BALL_LOG_INFO << "Received success 'configure-stream' response"
<< " for handle [" << handle << "] for queue ["
<< d_queueState_p->uri() << "], for parameters "
<< downStreamParameters;
BALL_LOG_INFO
<< "[THROTTLED] Received success 'configure-stream' response"
<< " for handle [" << handle << "] for queue ["
<< d_queueState_p->uri() << "], for parameters "
<< downStreamParameters;
}
else {
BALL_LOG_WARN
<< "#QUEUE_CONFIGURE_FAILURE "
<< "[THROTTLED] #QUEUE_CONFIGURE_FAILURE "
<< "Received failed 'configure-stream' response for handle '"
<< handle->client() << ":" << handle->id() << "' for queue '"
<< d_queueState_p->uri() << "', for parameters "
Expand All @@ -299,7 +304,8 @@ void RelayQueueEngine::onHandleConfiguredDispatched(
mqbcmd::RoundRobinRouter outrr(d_allocator_p);
context->d_routing_sp->loadInternals(&outrr);

BALL_LOG_OUTPUT_STREAM << "For queue [" << d_queueState_p->uri()
BALL_LOG_OUTPUT_STREAM << "[THROTTLED] For queue ["
<< d_queueState_p->uri()
<< "] new routing will be "
<< LimitedPrinter(outrr, 2048, d_allocator_p);
}
Expand Down Expand Up @@ -332,7 +338,8 @@ void RelayQueueEngine::onHandleConfiguredDispatched(
mqbcmd::QueueEngine outqe(d_allocator_p);
loadInternals(&outqe);

BALL_LOG_OUTPUT_STREAM << "For queue [" << d_queueState_p->uri()
BALL_LOG_OUTPUT_STREAM << "[THROTTLED] For queue ["
<< d_queueState_p->uri()
<< "], the engine config is "
<< LimitedPrinter(outqe, 2048, d_allocator_p);
}
Expand Down Expand Up @@ -687,7 +694,7 @@ void RelayQueueEngine::configureApp(
&previousParameters,
upstreamSubQueueId);

BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE)
BMQ_LOGTHROTTLE_INFO()
<< "For queue '" << d_queueState_p->uri()
<< "', about to rebuild upstream state [current stream parameters: "
<< previousParameters << "]";
Expand All @@ -710,7 +717,7 @@ void RelayQueueEngine::configureApp(
// Last advertised stream parameters for this queue are same as the
// newly advertised ones. No need to send any notification upstream.

BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE)
BMQ_LOGTHROTTLE_INFO()
<< "For queue [" << d_queueState_p->uri()
<< "], last advertised stream parameter by the queue"
<< " were same as newly advertised ones: " << previousParameters
Expand Down Expand Up @@ -790,7 +797,7 @@ void RelayQueueEngine::rebuildUpstreamState(Routers::AppContext* context,

d_queueState_p->setUpstreamParameters(upstreamParams, upstreamSubQueueId);

BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE)
BMQ_LOGTHROTTLE_INFO()
<< "For queue '" << d_queueState_p->uri()
<< "', rebuilt upstream parameters [new upstream parameters: "
<< upstreamParams << "]";
Expand Down

0 comments on commit add724d

Please sign in to comment.