Skip to content

Commit

Permalink
Perf[MQB]: use in-place callbacks in Dispatcher
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <[email protected]>
  • Loading branch information
678098 committed Dec 11, 2024
1 parent 81f9fe2 commit 7b8f8b6
Show file tree
Hide file tree
Showing 22 changed files with 898 additions and 241 deletions.
4 changes: 2 additions & 2 deletions src/groups/mqb/mqba/mqba_adminsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,8 @@ void AdminSession::onDispatcherEvent(const mqbi::DispatcherEvent& event)
const mqbi::DispatcherCallbackEvent* realEvent =
event.asCallbackEvent();

BSLS_ASSERT_SAFE(realEvent->callback());
realEvent->callback()(dispatcherClientData().processorHandle());
BSLS_ASSERT_SAFE(realEvent->callback().hasCallback());
realEvent->callback()();
} break;

case mqbi::DispatcherEventType::e_ACK:
Expand Down
8 changes: 4 additions & 4 deletions src/groups/mqb/mqba/mqba_clientsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ void ClientSession::tearDownImpl(bslmt::Semaphore* semaphore,
// with the 'tearDownAllQueuesDone' finalize callback having the 'handle'
// bound to it (so that the session is not yet destroyed).
dispatcher()->execute(
mqbi::Dispatcher::ProcessorFunctor(), // empty
mqbi::Dispatcher::VoidFunctor(), // empty
mqbi::DispatcherClientType::e_QUEUE,
bdlf::BindUtil::bind(&ClientSession::tearDownAllQueuesDone,
this,
Expand Down Expand Up @@ -1270,7 +1270,7 @@ void ClientSession::processDisconnectAllQueues(
// type, refer to top level documention for explanation (paragraph about
// the bmqu::SharedResource).
dispatcher()->execute(
mqbi::Dispatcher::ProcessorFunctor(), // empty
mqbi::Dispatcher::VoidFunctor(), // empty
mqbi::DispatcherClientType::e_QUEUE,
bdlf::BindUtil::bind(
bmqu::WeakMemFnUtil::weakMemFn(
Expand Down Expand Up @@ -3069,9 +3069,9 @@ void ClientSession::onDispatcherEvent(const mqbi::DispatcherEvent& event)
const mqbi::DispatcherCallbackEvent* realEvent =
event.asCallbackEvent();

BSLS_ASSERT_SAFE(realEvent->callback());
BSLS_ASSERT_SAFE(realEvent->callback().hasCallback());
flush(); // Flush any pending messages to guarantee ordering of events
realEvent->callback()(dispatcherClientData().processorHandle());
realEvent->callback()();
} break;
case mqbi::DispatcherEventType::e_CONTROL_MSG: {
BSLS_ASSERT_OPT(false &&
Expand Down
87 changes: 53 additions & 34 deletions src/groups/mqb/mqba/mqba_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ void Dispatcher_Executor::post(const bsl::function<void()>& f) const

event->object()
.setType(mqbi::DispatcherEventType::e_DISPATCHER)
.setCallback(mqbi::Dispatcher::voidToProcessorFunctor(f));
.callback()
.setCallback(f);

// submit the event
int rc = d_processorPool_p->enqueueEvent(event, d_processorHandle);
Expand Down Expand Up @@ -183,8 +184,9 @@ void Dispatcher_ClientExecutor::post(const bsl::function<void()>& f) const

event->object()
.setType(mqbi::DispatcherEventType::e_CALLBACK)
.setCallback(mqbi::Dispatcher::voidToProcessorFunctor(f))
.setDestination(const_cast<mqbi::DispatcherClient*>(d_client_p));
.setDestination(const_cast<mqbi::DispatcherClient*>(d_client_p))
.callback()
.setCallback(f);

// submit the event
int rc = processorPool()->enqueueEvent(event, processorHandle());
Expand Down Expand Up @@ -232,6 +234,39 @@ Dispatcher::DispatcherContext::DispatcherContext(
// NOTHING
}

// ------------------------------------
// class Dispatcher::OnNewClientFunctor
// ------------------------------------

Dispatcher::OnNewClientFunctor::OnNewClientFunctor(
Dispatcher* owner_p,
mqbi::DispatcherClientType::Enum type,
int processorId)
: d_owner_p(owner_p)
, d_type(type)
, d_processorId(processorId)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_owner_p);
}

// ACCESSORS
void Dispatcher::OnNewClientFunctor::operator()() const
{
// executed by the *DISPATCHER* thread

// Resize the 'd_flushList' vector for that specified 'processorId', if
// needed, to ensure it has enough space to hold all clients associated to
// that processorId.
DispatcherContext& context = *(d_owner_p->d_contexts[d_type]);

int count = context.d_loadBalancer.clientsCountForProcessor(d_processorId);
if (static_cast<int>(context.d_flushList[d_processorId].capacity()) <
count) {
context.d_flushList[d_processorId].reserve(count);
}
}

// ----------------
// class Dispatcher
// ----------------
Expand Down Expand Up @@ -372,10 +407,10 @@ void Dispatcher::queueEventCb(mqbi::DispatcherClientType::Enum type,
// whole purpose of the 'e_DISPATCHER' event type.
flushClients(type, processorId);

if (realEvent->callback()) {
if (realEvent->callback().hasCallback()) {
// A callback may not have been set if all we wanted was to
// execute the 'finalizeCallback' of the event.
realEvent->callback()(processorId);
realEvent->callback()();
}
}
else {
Expand Down Expand Up @@ -404,7 +439,7 @@ void Dispatcher::queueEventCb(mqbi::DispatcherClientType::Enum type,
const mqbi::DispatcherDispatcherEvent* realEvent =
event->object().asDispatcherEvent();

if (realEvent->finalizeCallback()) {
if (realEvent->finalizeCallback().hasCallback()) {
BALL_LOG_TRACE << "Calling finalizeCallback on queue "
<< processorId << " of " << type
<< " dispatcher: " << event->object();
Expand All @@ -430,23 +465,6 @@ void Dispatcher::flushClients(mqbi::DispatcherClientType::Enum type,
context.d_flushList[processorId].clear();
}

void Dispatcher::onNewClient(mqbi::DispatcherClientType::Enum type,
int processorId)
{
// executed by the *DISPATCHER* thread

// Resize the 'd_flushList' vector for that specified 'processorId', if
// needed, to ensure it has enough space to hold all clients associated to
// that processorId.
DispatcherContext& context = *(d_contexts[type]);

int count = context.d_loadBalancer.clientsCountForProcessor(processorId);
if (static_cast<int>(context.d_flushList[processorId].capacity()) <
count) {
context.d_flushList[processorId].reserve(count);
}
}

Dispatcher::Dispatcher(const mqbcfg::DispatcherConfig& config,
bdlmt::EventScheduler* scheduler,
bslma::Allocator* allocator)
Expand Down Expand Up @@ -595,12 +613,13 @@ Dispatcher::registerClient(mqbi::DispatcherClient* client,
&context.d_processorPool_mp->getUnmanagedEvent()->object();
(*event)
.setType(mqbi::DispatcherEventType::e_DISPATCHER)
.setCallback(
bdlf::BindUtil::bind(&Dispatcher::onNewClient,
this,
type,
bdlf::PlaceHolders::_1)) // processor
.setDestination(client); // not needed
.setDestination(client); // TODO: not needed?

// Build callback functor in-place.
// The destructor for functor is called in `reset`.
new (event->callback().place<OnNewClientFunctor>())
OnNewClientFunctor(this, type, processor);

context.d_processorPool_mp->enqueueEvent(event, processor);
return processor; // RETURN
} // break;
Expand Down Expand Up @@ -650,8 +669,8 @@ void Dispatcher::unregisterClient(mqbi::DispatcherClient* client)
mqbi::Dispatcher::k_INVALID_PROCESSOR_HANDLE);
}

void Dispatcher::execute(const mqbi::Dispatcher::ProcessorFunctor& functor,
mqbi::DispatcherClientType::Enum type,
void Dispatcher::execute(const mqbi::Dispatcher::VoidFunctor& functor,
mqbi::DispatcherClientType::Enum type,
const mqbi::Dispatcher::VoidFunctor& doneCallback)
{
// PRECONDITIONS
Expand Down Expand Up @@ -691,9 +710,9 @@ void Dispatcher::execute(const mqbi::Dispatcher::ProcessorFunctor& functor,
if (processorPool[i] != 0) {
mqbi::DispatcherEvent* qEvent =
&processorPool[i]->getUnmanagedEvent()->object();
qEvent->setType(mqbi::DispatcherEventType::e_DISPATCHER)
.setCallback(functor)
.setFinalizeCallback(doneCallback);
qEvent->setType(mqbi::DispatcherEventType::e_DISPATCHER);
qEvent->callback().setCallback(functor);
qEvent->finalizeCallback().setCallback(doneCallback);
processorPool[i]->enqueueEventOnAllQueues(qEvent);
}
}
Expand Down
42 changes: 31 additions & 11 deletions src/groups/mqb/mqba/mqba_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,31 @@ class Dispatcher BSLS_CPP11_FINAL : public mqbi::Dispatcher {

typedef bsl::vector<mqbi::DispatcherClient*> DispatcherClientPtrVector;

/// The purpose is to avoid memory allocation by bdlf::BindUtil::bind
/// when dispatching CONFIRM from Cluster to Queue.
class OnNewClientFunctor : public mqbi::CallbackFunctor {
private:
// PRIVATE DATA
Dispatcher* d_owner_p;
mqbi::DispatcherClientType::Enum d_type;
int d_processorId;

public:
// CREATORS
/// This functor is invoked when a new client with the specified `type`
/// is registered to the dispatcher, from the thread associated to that
/// new client that is mapped to the specified `processorId`. The
/// specified `owner_p` holds pointer to the parent Dispatcher object.
OnNewClientFunctor(Dispatcher* owner_p,
mqbi::DispatcherClientType::Enum type,
int processorId);

// ACCESSORS
/// Updated the data associated with the new client from the
/// appropriate thread, using fields stored in this functor.
void operator()() const BSLS_KEYWORD_OVERRIDE;
};

/// Context for a dispatcher, with threads and pools
struct DispatcherContext {
private:
Expand Down Expand Up @@ -332,11 +357,6 @@ class Dispatcher BSLS_CPP11_FINAL : public mqbi::Dispatcher {
/// `processorId`.
void flushClients(mqbi::DispatcherClientType::Enum type, int processorId);

/// This method is invoked when a new client of the specified `type` is
/// registered to the dispatcher, from the thread associated to that new
/// client that is mapped to the specified `processorId`.
void onNewClient(mqbi::DispatcherClientType::Enum type, int processorId);

public:
// TRAITS
BSLMF_NESTED_TRAIT_DECLARATION(Dispatcher, bslma::UsesBslmaAllocator)
Expand Down Expand Up @@ -416,9 +436,9 @@ class Dispatcher BSLS_CPP11_FINAL : public mqbi::Dispatcher {
/// clients of the specified `type`, and invoke the optionally specified
/// `doneCallback` (if any) when all the relevant processors are done
/// executing the `functor`.
void execute(const mqbi::Dispatcher::ProcessorFunctor& functor,
mqbi::DispatcherClientType::Enum type,
const mqbi::Dispatcher::VoidFunctor& doneCallback =
void execute(const mqbi::Dispatcher::VoidFunctor& functor,
mqbi::DispatcherClientType::Enum type,
const mqbi::Dispatcher::VoidFunctor& doneCallback =
mqbi::Dispatcher::VoidFunctor()) BSLS_KEYWORD_OVERRIDE;

void execute(const mqbi::Dispatcher::VoidFunctor& functor,
Expand Down Expand Up @@ -569,8 +589,7 @@ inline void Dispatcher::execute(const mqbi::Dispatcher::VoidFunctor& functor,

mqbi::DispatcherEvent* event = getEvent(client);

(*event).setType(type).setCallback(
mqbi::Dispatcher::voidToProcessorFunctor(functor));
(*event).setType(type).callback().setCallback(functor);

dispatchEvent(event, client);
}
Expand All @@ -585,7 +604,8 @@ inline void Dispatcher::execute(const mqbi::Dispatcher::VoidFunctor& functor,

(*event)
.setType(mqbi::DispatcherEventType::e_DISPATCHER)
.setCallback(mqbi::Dispatcher::voidToProcessorFunctor(functor));
.callback()
.setCallback(functor);

dispatchEvent(event, client.clientType(), client.processorHandle());
}
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3397,8 +3397,8 @@ void Cluster::onDispatcherEvent(const mqbi::DispatcherEvent& event)
case mqbi::DispatcherEventType::e_CALLBACK: {
const mqbi::DispatcherCallbackEvent& realEvent =
*event.asCallbackEvent();
BSLS_ASSERT_SAFE(realEvent.callback());
realEvent.callback()(dispatcherClientData().processorHandle());
BSLS_ASSERT_SAFE(realEvent.callback().hasCallback());
realEvent.callback()();
} break; // BREAK
case mqbi::DispatcherEventType::e_PUT: {
const mqbi::DispatcherPutEvent& realEvent = *event.asPutEvent();
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1384,8 +1384,8 @@ void ClusterProxy::onDispatcherEvent(const mqbi::DispatcherEvent& event)
case mqbi::DispatcherEventType::e_CALLBACK: {
const mqbi::DispatcherCallbackEvent* realEvent =
event.asCallbackEvent();
BSLS_ASSERT_SAFE(realEvent->callback());
realEvent->callback()(dispatcherClientData().processorHandle());
BSLS_ASSERT_SAFE(realEvent->callback().hasCallback());
realEvent->callback()();
} break;
case mqbi::DispatcherEventType::e_PUSH: {
onPushEvent(*(event.asPushEvent()));
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5893,7 +5893,7 @@ void ClusterQueueHelper::checkUnconfirmedV2Dispatched(
// Synchronize with all Queue Dispatcher threads
bslmt::Latch latch(1);
d_cluster_p->dispatcher()->execute(
mqbi::Dispatcher::ProcessorFunctor(), // empty
mqbi::Dispatcher::VoidFunctor(), // empty
mqbi::DispatcherClientType::e_QUEUE,
bdlf::BindUtil::bind(&bslmt::Latch::arrive, &latch));

Expand Down
5 changes: 2 additions & 3 deletions src/groups/mqb/mqbblp/mqbblp_localqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,8 @@ void LocalQueue::onDispatcherEvent(const mqbi::DispatcherEvent& event)
case mqbi::DispatcherEventType::e_CALLBACK: {
const mqbi::DispatcherCallbackEvent* realEvent =
event.asCallbackEvent();
BSLS_ASSERT_SAFE(realEvent->callback());
realEvent->callback()(
d_state_p->queue()->dispatcherClientData().processorHandle());
BSLS_ASSERT_SAFE(realEvent->callback().hasCallback());
realEvent->callback()();
} break; // BREAK
case mqbi::DispatcherEventType::e_ACK: {
BALL_LOG_INFO << "Skipping dispatcher event [" << event << "] "
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -606,12 +606,12 @@ void QueueEngineUtil_ReleaseHandleProctor::invokeCallback()
// represents 'mqbblp::ClusterNodeSession').

d_queueState_p->queue()->dispatcher()->execute(
mqbi::Dispatcher::ProcessorFunctor(),
mqbi::Dispatcher::VoidFunctor(),
mqbi::DispatcherClientType::e_SESSION,
bdlf::BindUtil::bind(&queueHandleHolderDummy, d_handleSp));

d_queueState_p->queue()->dispatcher()->execute(
mqbi::Dispatcher::ProcessorFunctor(),
mqbi::Dispatcher::VoidFunctor(),
mqbi::DispatcherClientType::e_CLUSTER,
bdlf::BindUtil::bind(&queueHandleHolderDummy, d_handleSp));
}
Expand Down
10 changes: 8 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -766,9 +766,15 @@ void QueueHandle::confirmMessage(const bmqt::MessageGUID& msgGUID,
// A more generic approach would be to maintain a queue of CONFIRMs per
// queue (outside of the dispatcher) and process it separately (on idle?).

QueueHandle::ConfirmFunctor f(this, msgGUID, downstreamSubQueueId);
mqbi::DispatcherEvent* queueEvent = d_queue_sp->dispatcher()->getEvent(
mqbi::DispatcherClientType::e_QUEUE);

d_queue_sp->dispatcher()->execute(f, d_queue_sp.get());
(*queueEvent).setType(mqbi::DispatcherEventType::e_CALLBACK);

new (queueEvent->callback().place<QueueHandle::ConfirmFunctor>())
QueueHandle::ConfirmFunctor(this, msgGUID, downstreamSubQueueId);

d_queue_sp->dispatcher()->dispatchEvent(queueEvent, d_queue_sp.get());
}

void QueueHandle::rejectMessage(const bmqt::MessageGUID& msgGUID,
Expand Down
6 changes: 3 additions & 3 deletions src/groups/mqb/mqbblp/mqbblp_queuehandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class QueueHandle : public mqbi::QueueHandle {

/// The purpose is to avoid memory allocation by bdlf::BindUtil::bind
/// when dispatching CONFIRM from Cluster to Queue.
class ConfirmFunctor {
class ConfirmFunctor : public mqbi::CallbackFunctor {
private:
// PRIVATE DATA
QueueHandle* d_owner_p;
Expand All @@ -144,7 +144,7 @@ class QueueHandle : public mqbi::QueueHandle {
bmqt::MessageGUID guid,
unsigned int downstreamSubQueueId);

void operator()();
void operator()() const BSLS_KEYWORD_OVERRIDE;
};

public:
Expand Down Expand Up @@ -682,7 +682,7 @@ inline QueueHandle::ConfirmFunctor::ConfirmFunctor(
// NOTHING
}

inline void QueueHandle::ConfirmFunctor::operator()()
inline void QueueHandle::ConfirmFunctor::operator()() const
{
d_owner_p->confirmMessageDispatched(d_guid, d_downstreamSubQueueId);
}
Expand Down
5 changes: 2 additions & 3 deletions src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -774,9 +774,8 @@ void RemoteQueue::onDispatcherEvent(const mqbi::DispatcherEvent& event)
case mqbi::DispatcherEventType::e_CALLBACK: {
const mqbi::DispatcherCallbackEvent* realEvent =
event.asCallbackEvent();
BSLS_ASSERT_SAFE(realEvent->callback());
realEvent->callback()(
d_state_p->queue()->dispatcherClientData().processorHandle());
BSLS_ASSERT_SAFE(realEvent->callback().hasCallback());
realEvent->callback()();
} break;
case mqbi::DispatcherEventType::e_PUSH: {
const mqbi::DispatcherPushEvent* realEvent = event.asPushEvent();
Expand Down
Loading

0 comments on commit 7b8f8b6

Please sign in to comment.