Skip to content

Commit

Permalink
Feat[MQB]: dispatcher stats
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <[email protected]>
  • Loading branch information
678098 committed Oct 11, 2024
1 parent 01693b0 commit 8713de2
Show file tree
Hide file tree
Showing 10 changed files with 438 additions and 58 deletions.
1 change: 1 addition & 0 deletions src/groups/mqb/mqba/mqba_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ int Application::start(bsl::ostream& errorDescription)
// Start dispatcher
d_dispatcher_mp.load(new (*d_allocator_p) Dispatcher(
mqbcfg::BrokerConfig::get().dispatcherConfig(),
d_statController_mp->dispatcherStatContext(),
d_scheduler_p,
d_allocators.get("Dispatcher")),
d_allocator_p);
Expand Down
22 changes: 21 additions & 1 deletion src/groups/mqb/mqba/mqba_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,12 @@ int Dispatcher::startContext(bsl::ostream& errorDescription,
DispatcherContext(config, d_allocator_p),
d_allocator_p);

context->d_statContext_mp =
mqbstat::DispatcherStatsUtil::initializeClientStatContext(
d_statContext_p,
mqbi::DispatcherClientType::toAscii(type),
d_allocator_p);

// Create and start the threadPool
context->d_threadPool_mp.load(
new (*d_allocator_p)
Expand Down Expand Up @@ -360,6 +366,8 @@ void Dispatcher::queueEventCb(mqbi::DispatcherClientType::Enum type,
case ProcessorPool::Event::MWCC_USER: {
BALL_LOG_TRACE << "Dispatching Event to queue " << processorId
<< " of " << type << " dispatcher: " << event->object();
DispatcherContext& dispatcherContext = *(d_contexts[type]);

if (event->object().type() ==
mqbi::DispatcherEventType::e_DISPATCHER) {
const mqbi::DispatcherDispatcherEvent* realEvent =
Expand All @@ -380,7 +388,6 @@ void Dispatcher::queueEventCb(mqbi::DispatcherClientType::Enum type,
}
}
else {
DispatcherContext& dispatcherContext = *(d_contexts[type]);
event->object().destination()->onDispatcherEvent(event->object());
if (!event->object()
.destination()
Expand All @@ -394,6 +401,11 @@ void Dispatcher::queueEventCb(mqbi::DispatcherClientType::Enum type,
.setAddedToFlushList(true);
}
}

dispatcherContext.d_statContext_mp->adjustValue(
mqbstat::DispatcherStats::Stat::e_DONE_START +
event->object().type(),
1);
} break;
case ProcessorPool::Event::MWCC_QUEUE_EMPTY: {
flushClients(type, processorId);
Expand Down Expand Up @@ -449,11 +461,13 @@ void Dispatcher::onNewClient(mqbi::DispatcherClientType::Enum type,
}

Dispatcher::Dispatcher(const mqbcfg::DispatcherConfig& config,
mwcst::StatContext* statContext,
bdlmt::EventScheduler* scheduler,
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_isStarted(false)
, d_config(config)
, d_statContext_p(statContext)
, d_scheduler_p(scheduler)
, d_contexts(allocator)
{
Expand Down Expand Up @@ -582,6 +596,9 @@ Dispatcher::registerClient(mqbi::DispatcherClient* client,
.setClientType(type)
.setProcessorHandle(processor);

context.d_statContext_mp->adjustValue(
mqbstat::DispatcherStats::Stat::e_CLIENT_COUNT,
1);
BALL_LOG_DEBUG << "Registered a new client to the dispatcher "
<< "[Client: " << client->description()
<< ", type: " << type << ", processor: " << processor
Expand Down Expand Up @@ -628,6 +645,9 @@ void Dispatcher::unregisterClient(mqbi::DispatcherClient* client)
case mqbi::DispatcherClientType::e_QUEUE:
case mqbi::DispatcherClientType::e_CLUSTER: {
d_contexts[type]->d_loadBalancer.removeClient(client);
d_contexts[type]->d_statContext_mp->adjustValue(
mqbstat::DispatcherStats::Stat::e_CLIENT_COUNT,
-1);
} break;
case mqbi::DispatcherClientType::e_UNDEFINED:
case mqbi::DispatcherClientType::e_ALL:
Expand Down
23 changes: 17 additions & 6 deletions src/groups/mqb/mqba/mqba_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@

#include <mqbcfg_messages.h>
#include <mqbi_dispatcher.h>
#include <mqbstat_dispatcherstats.h>
#include <mqbu_loadbalancer.h>

// MWC
#include <mwcc_multiqueuethreadpool.h>
#include <mwcex_executor.h>
#include <mwcst_statcontext.h>

// BDE
#include <ball_log.h>
Expand Down Expand Up @@ -245,6 +247,9 @@ class Dispatcher BSLS_CPP11_FINAL : public mqbi::Dispatcher {
// for distributing clients
// across processors

/// Stat context for this client type
bslma::ManagedPtr<mwcst::StatContext> d_statContext_mp;

bsl::vector<DispatcherClientPtrVector> d_flushList;
// Vector of vector of
// pointers to
Expand Down Expand Up @@ -277,21 +282,23 @@ class Dispatcher BSLS_CPP11_FINAL : public mqbi::Dispatcher {

private:
// DATA
/// Allocator to use
bslma::Allocator* d_allocator_p;
// Allocator to use

/// True if this component is started
bool d_isStarted;
// True if this component is started

/// Configuration for the dispatcher
mqbcfg::DispatcherConfig d_config;
// Configuration for the dispatcher

/// Top-level stat context for all dispatcher client types
mwcst::StatContext* d_statContext_p;

/// Event scheduler to use
bdlmt::EventScheduler* d_scheduler_p;
// Event scheduler to use

/// The various contexts, one for each ClientType
bsl::vector<DispatcherContextSp> d_contexts;
// The various context, one for each
// ClientType

// FRIENDS
friend class Dispatcher_ClientExecutor;
Expand Down Expand Up @@ -348,6 +355,7 @@ class Dispatcher BSLS_CPP11_FINAL : public mqbi::Dispatcher {
/// All memory allocation will be performed using the specified
/// `allocator`.
Dispatcher(const mqbcfg::DispatcherConfig& config,
mwcst::StatContext* statContext,
bdlmt::EventScheduler* scheduler,
bslma::Allocator* allocator);

Expand Down Expand Up @@ -549,6 +557,9 @@ inline void Dispatcher::dispatchEvent(mqbi::DispatcherEvent* event,
case mqbi::DispatcherClientType::e_QUEUE:
case mqbi::DispatcherClientType::e_CLUSTER: {
d_contexts[type]->d_processorPool_mp->enqueueEvent(event, handle);
d_contexts[type]->d_statContext_mp->adjustValue(
mqbstat::DispatcherStats::Stat::e_ENQ_START + event->type(),
1);
} break;
case mqbi::DispatcherClientType::e_UNDEFINED:
case mqbi::DispatcherClientType::e_ALL:
Expand Down
15 changes: 14 additions & 1 deletion src/groups/mqb/mqba/mqba_dispatcher.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
// MQB
#include <mqbcfg_messages.h>
#include <mqbmock_dispatcher.h>
#include <mqbstat_dispatcherstats.h>

// MWC
#include <mwcex_bindutil.h>
Expand Down Expand Up @@ -118,8 +119,15 @@ static void test1_breathingTest()
s_allocator_p);
eventScheduler.start();

bsl::shared_ptr<mwcst::StatContext> statContext_sp(
mqbstat::DispatcherStatsUtil::initializeStatContext(10,
s_allocator_p));

{
mqba::Dispatcher obj(dispatcherConfig, &eventScheduler, s_allocator_p);
mqba::Dispatcher obj(dispatcherConfig,
statContext_sp.get(),
&eventScheduler,
s_allocator_p);
}

eventScheduler.stop();
Expand Down Expand Up @@ -211,7 +219,12 @@ static void test3_executorsSupport()
dispatcherConfig.clusters().processorConfig().queueSizeHighWatermark() =
100;

bsl::shared_ptr<mwcst::StatContext> statContext_sp(
mqbstat::DispatcherStatsUtil::initializeStatContext(10,
s_allocator_p));

mqba::Dispatcher dispatcher(dispatcherConfig,
statContext_sp.get(),
&eventScheduler,
s_allocator_p);

Expand Down
101 changes: 101 additions & 0 deletions src/groups/mqb/mqbstat/mqbstat_dispatcherstats.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2018-2023 Bloomberg Finance L.P.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// mqbstat_dispatcherstats.cpp -*-C++-*-
#include <mqbstat_dispatcherstats.h>

#include <mqbscm_version.h>
// BMQ
#include <bmqt_uri.h>

// MQB
#include <mqbi_cluster.h>
#include <mqbi_domain.h>

// MWC
#include <mwcst_statcontext.h>
#include <mwcst_statutil.h>
#include <mwcst_statvalue.h>

// BDE
#include <bdld_datummapbuilder.h>
#include <bdld_manageddatum.h>
#include <bdlma_localsequentialallocator.h>
#include <bsls_assert.h>

namespace BloombergLP {
namespace mqbstat {

// ---------------------
// class DomainStatsUtil
// ---------------------

bsl::shared_ptr<mwcst::StatContext>
DispatcherStatsUtil::initializeStatContext(int historySize,
bslma::Allocator* allocator)
{
bdlma::LocalSequentialAllocator<2048> localAllocator(allocator);

mwcst::StatContextConfiguration config("dispatcher", &localAllocator);
config.defaultHistorySize(historySize)
.statValueAllocator(allocator)
.storeExpiredSubcontextValues(true);

return bsl::shared_ptr<mwcst::StatContext>(
new (*allocator) mwcst::StatContext(config, allocator),
allocator);
}

bslma::ManagedPtr<mwcst::StatContext>
DispatcherStatsUtil::initializeClientStatContext(mwcst::StatContext* parent,
const bslstl::StringRef& name,
bslma::Allocator* allocator)
{
bdlma::LocalSequentialAllocator<2048> localAllocator(allocator);

mwcst::StatContextConfiguration statConfig(name, &localAllocator);
statConfig.isTable(true)
.value("enq_undefined")
.value("enq_dispatcher")
.value("enq_callback")
.value("enq_control_msg")
.value("enq_confirm")
.value("enq_reject")
.value("enq_push")
.value("enq_put")
.value("enq_ack")
.value("enq_cluster_state")
.value("enq_storage")
.value("enq_recovery")
.value("enq_replication_receipt")
.value("done_undefined")
.value("done_dispatcher")
.value("done_callback")
.value("done_control_msg")
.value("done_confirm")
.value("done_reject")
.value("done_push")
.value("done_put")
.value("done_ack")
.value("done_cluster_state")
.value("done_storage")
.value("done_recovery")
.value("done_replication_receipt")
.value("nb_client");
return parent->addSubcontext(statConfig);
}

} // close package namespace
} // close enterprise namespace
Loading

0 comments on commit 8713de2

Please sign in to comment.