diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index a2a913675d..f8014f68cd 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -100,6 +100,10 @@ const bsls::Types::Int64 k_NS_PER_MESSAGE = bdlt::TimeUnitRatio::k_NANOSECONDS_PER_MINUTE / k_MAX_INSTANT_MESSAGES; // Time interval between messages logged with throttling. +#define BMQ_LOGTHROTTLE_INFO() \ + BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE) \ + << "[THROTTLED] " + /// This function is a simple wrapper around the specified `callback`, to /// ensure that the specified `refCount` is decremented after it gets /// invoked with the specified `status`, `queue` and `confirmationCookie`. @@ -1696,7 +1700,7 @@ void ClusterQueueHelper::onConfigureQueueResponse( if (d_cluster_p->isStopping()) { // Self is stopping. Drop the response. - BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE) + BMQ_LOGTHROTTLE_INFO() << d_cluster_p->description() << ": Dropping (re)configureQueue response [reason: 'stopping'" << ", request: " << requestContext->request() @@ -3303,7 +3307,7 @@ void ClusterQueueHelper::sendCloseQueueRequest( // will be advertised upstream. So just like above, we indicate // success via 'callback'. - BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE) + BMQ_LOGTHROTTLE_INFO() << d_cluster_p->description() << ": Failed to send close-queue request: " << request->request() << ", for queue [" << handleParameters.uri() << "] to " diff --git a/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp b/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp index a7c15a80a7..05a35b6f07 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp @@ -69,6 +69,10 @@ const bsls::Types::Int64 k_NS_PER_MESSAGE = bdlt::TimeUnitRatio::k_NANOSECONDS_PER_SECOND; // Time interval between messages logged with throttling. +#define BMQ_LOGTHROTTLE_INFO() \ + BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE) \ + << "[THROTTLED] " + typedef bsl::function CompletionCallback; /// Utility function used in `mwcu::OperationChain` as the operation @@ -613,11 +617,11 @@ void QueueHandle::registerSubscription(unsigned int downstreamSubId, const bmqp_ctrlmsg::ConsumerInfo& ci, unsigned int upstreamId) { - BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE) - << "QueueHandle [" << this - << "] registering Subscription [id = " << downstreamId - << ", downstreamSubQueueId = " << downstreamSubId - << ", upstreamId = " << upstreamId << "]"; + BMQ_LOGTHROTTLE_INFO() << "QueueHandle [" << this + << "] registering Subscription [id = " + << downstreamId + << ", downstreamSubQueueId = " << downstreamSubId + << ", upstreamId = " << upstreamId << "]"; const bsl::shared_ptr& subStream = downstream(downstreamSubId); @@ -719,7 +723,7 @@ bool QueueHandle::unregisterSubStream( itSubscription != d_subscriptions.end();) { const SubscriptionSp& subscription = itSubscription->second; if (subscription->d_downstreamSubQueueId == downstreamSubQueueId) { - BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE) + BMQ_LOGTHROTTLE_INFO() << "Queue '" << d_queue_sp->description() << "' handle " << this << " removing Subscription " << itSubscription->first; @@ -878,7 +882,7 @@ void QueueHandle::deliverMessage( // Increasing resource usage ('update()' above) made us hit our // maxUnconfirmed limit. - BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE) + BMQ_LOGTHROTTLE_INFO() << "Queue '" << d_queue_sp->description() << "' with subscription [" << subscriptions[i] << "]" << " of client '" diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp index 665217ff9c..f60642caa0 100644 --- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp @@ -74,6 +74,10 @@ const bsls::Types::Int64 k_NS_PER_MESSAGE = bdlt::TimeUnitRatio::k_NANOSECONDS_PER_MINUTE / k_MAX_INSTANT_MESSAGES; // Time interval between messages logged with throttling. +#define BMQ_LOGTHROTTLE_INFO() \ + BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE) \ + << "[THROTTLED] " + // ==================== // class LimitedPrinter // ==================== @@ -282,14 +286,15 @@ void RelayQueueEngine::onHandleConfiguredDispatched( BALL_LOGTHROTTLE_INFO_BLOCK(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE) { if (bmqp_ctrlmsg::StatusCategory::E_SUCCESS == status.category()) { - BALL_LOG_INFO << "Received success 'configure-stream' response" - << " for handle [" << handle << "] for queue [" - << d_queueState_p->uri() << "], for parameters " - << downStreamParameters; + BALL_LOG_INFO + << "[THROTTLED] Received success 'configure-stream' response" + << " for handle [" << handle << "] for queue [" + << d_queueState_p->uri() << "], for parameters " + << downStreamParameters; } else { BALL_LOG_WARN - << "#QUEUE_CONFIGURE_FAILURE " + << "[THROTTLED] #QUEUE_CONFIGURE_FAILURE " << "Received failed 'configure-stream' response for handle '" << handle->client() << ":" << handle->id() << "' for queue '" << d_queueState_p->uri() << "', for parameters " @@ -299,7 +304,8 @@ void RelayQueueEngine::onHandleConfiguredDispatched( mqbcmd::RoundRobinRouter outrr(d_allocator_p); context->d_routing_sp->loadInternals(&outrr); - BALL_LOG_OUTPUT_STREAM << "For queue [" << d_queueState_p->uri() + BALL_LOG_OUTPUT_STREAM << "[THROTTLED] For queue [" + << d_queueState_p->uri() << "] new routing will be " << LimitedPrinter(outrr, 2048, d_allocator_p); } @@ -332,7 +338,8 @@ void RelayQueueEngine::onHandleConfiguredDispatched( mqbcmd::QueueEngine outqe(d_allocator_p); loadInternals(&outqe); - BALL_LOG_OUTPUT_STREAM << "For queue [" << d_queueState_p->uri() + BALL_LOG_OUTPUT_STREAM << "[THROTTLED] For queue [" + << d_queueState_p->uri() << "], the engine config is " << LimitedPrinter(outqe, 2048, d_allocator_p); } @@ -687,7 +694,7 @@ void RelayQueueEngine::configureApp( &previousParameters, upstreamSubQueueId); - BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE) + BMQ_LOGTHROTTLE_INFO() << "For queue '" << d_queueState_p->uri() << "', about to rebuild upstream state [current stream parameters: " << previousParameters << "]"; @@ -710,7 +717,7 @@ void RelayQueueEngine::configureApp( // Last advertised stream parameters for this queue are same as the // newly advertised ones. No need to send any notification upstream. - BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE) + BMQ_LOGTHROTTLE_INFO() << "For queue [" << d_queueState_p->uri() << "], last advertised stream parameter by the queue" << " were same as newly advertised ones: " << previousParameters @@ -790,7 +797,7 @@ void RelayQueueEngine::rebuildUpstreamState(Routers::AppContext* context, d_queueState_p->setUpstreamParameters(upstreamParams, upstreamSubQueueId); - BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE) + BMQ_LOGTHROTTLE_INFO() << "For queue '" << d_queueState_p->uri() << "', rebuilt upstream parameters [new upstream parameters: " << upstreamParams << "]";