Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf[MQB]: callback construction in a fixed buffer #481

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/groups/mqb/mqba/mqba_adminsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,8 @@ void AdminSession::onDispatcherEvent(const mqbi::DispatcherEvent& event)
const mqbi::DispatcherCallbackEvent* realEvent =
event.asCallbackEvent();

BSLS_ASSERT_SAFE(realEvent->callback());
realEvent->callback()(dispatcherClientData().processorHandle());
BSLS_ASSERT_SAFE(realEvent->callback().hasCallback());
realEvent->callback()();
} break;

case mqbi::DispatcherEventType::e_ACK:
Expand Down
8 changes: 4 additions & 4 deletions src/groups/mqb/mqba/mqba_clientsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ void ClientSession::tearDownImpl(bslmt::Semaphore* semaphore,
// with the 'tearDownAllQueuesDone' finalize callback having the 'handle'
// bound to it (so that the session is not yet destroyed).
dispatcher()->execute(
mqbi::Dispatcher::ProcessorFunctor(), // empty
mqbi::Dispatcher::VoidFunctor(), // empty
mqbi::DispatcherClientType::e_QUEUE,
bdlf::BindUtil::bind(&ClientSession::tearDownAllQueuesDone,
this,
Expand Down Expand Up @@ -1270,7 +1270,7 @@ void ClientSession::processDisconnectAllQueues(
// type, refer to top level documention for explanation (paragraph about
// the bmqu::SharedResource).
dispatcher()->execute(
mqbi::Dispatcher::ProcessorFunctor(), // empty
mqbi::Dispatcher::VoidFunctor(), // empty
mqbi::DispatcherClientType::e_QUEUE,
bdlf::BindUtil::bind(
bmqu::WeakMemFnUtil::weakMemFn(
Expand Down Expand Up @@ -3069,9 +3069,9 @@ void ClientSession::onDispatcherEvent(const mqbi::DispatcherEvent& event)
const mqbi::DispatcherCallbackEvent* realEvent =
event.asCallbackEvent();

BSLS_ASSERT_SAFE(realEvent->callback());
BSLS_ASSERT_SAFE(realEvent->callback().hasCallback());
flush(); // Flush any pending messages to guarantee ordering of events
realEvent->callback()(dispatcherClientData().processorHandle());
realEvent->callback()();
} break;
case mqbi::DispatcherEventType::e_CONTROL_MSG: {
BSLS_ASSERT_OPT(false &&
Expand Down
87 changes: 53 additions & 34 deletions src/groups/mqb/mqba/mqba_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ void Dispatcher_Executor::post(const bsl::function<void()>& f) const

event->object()
.setType(mqbi::DispatcherEventType::e_DISPATCHER)
.setCallback(mqbi::Dispatcher::voidToProcessorFunctor(f));
.callback()
.setCallback(f);

// submit the event
int rc = d_processorPool_p->enqueueEvent(event, d_processorHandle);
Expand Down Expand Up @@ -183,8 +184,9 @@ void Dispatcher_ClientExecutor::post(const bsl::function<void()>& f) const

event->object()
.setType(mqbi::DispatcherEventType::e_CALLBACK)
.setCallback(mqbi::Dispatcher::voidToProcessorFunctor(f))
.setDestination(const_cast<mqbi::DispatcherClient*>(d_client_p));
.setDestination(const_cast<mqbi::DispatcherClient*>(d_client_p))
.callback()
.setCallback(f);

// submit the event
int rc = processorPool()->enqueueEvent(event, processorHandle());
Expand Down Expand Up @@ -232,6 +234,39 @@ Dispatcher::DispatcherContext::DispatcherContext(
// NOTHING
}

// ------------------------------------
// class Dispatcher::OnNewClientFunctor
// ------------------------------------

Dispatcher::OnNewClientFunctor::OnNewClientFunctor(
Dispatcher* owner_p,
mqbi::DispatcherClientType::Enum type,
int processorId)
: d_owner_p(owner_p)
, d_type(type)
, d_processorId(processorId)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_owner_p);
}

// ACCESSORS
void Dispatcher::OnNewClientFunctor::operator()() const
{
// executed by the *DISPATCHER* thread

// Resize the 'd_flushList' vector for that specified 'processorId', if
// needed, to ensure it has enough space to hold all clients associated to
// that processorId.
DispatcherContext& context = *(d_owner_p->d_contexts[d_type]);

int count = context.d_loadBalancer.clientsCountForProcessor(d_processorId);
if (static_cast<int>(context.d_flushList[d_processorId].capacity()) <
count) {
context.d_flushList[d_processorId].reserve(count);
}
}

// ----------------
// class Dispatcher
// ----------------
Expand Down Expand Up @@ -372,10 +407,10 @@ void Dispatcher::queueEventCb(mqbi::DispatcherClientType::Enum type,
// whole purpose of the 'e_DISPATCHER' event type.
flushClients(type, processorId);

if (realEvent->callback()) {
if (realEvent->callback().hasCallback()) {
// A callback may not have been set if all we wanted was to
// execute the 'finalizeCallback' of the event.
realEvent->callback()(processorId);
realEvent->callback()();
}
}
else {
Expand Down Expand Up @@ -404,7 +439,7 @@ void Dispatcher::queueEventCb(mqbi::DispatcherClientType::Enum type,
const mqbi::DispatcherDispatcherEvent* realEvent =
event->object().asDispatcherEvent();

if (realEvent->finalizeCallback()) {
if (realEvent->finalizeCallback().hasCallback()) {
BALL_LOG_TRACE << "Calling finalizeCallback on queue "
<< processorId << " of " << type
<< " dispatcher: " << event->object();
Expand All @@ -430,23 +465,6 @@ void Dispatcher::flushClients(mqbi::DispatcherClientType::Enum type,
context.d_flushList[processorId].clear();
}

void Dispatcher::onNewClient(mqbi::DispatcherClientType::Enum type,
int processorId)
{
// executed by the *DISPATCHER* thread

// Resize the 'd_flushList' vector for that specified 'processorId', if
// needed, to ensure it has enough space to hold all clients associated to
// that processorId.
DispatcherContext& context = *(d_contexts[type]);

int count = context.d_loadBalancer.clientsCountForProcessor(processorId);
if (static_cast<int>(context.d_flushList[processorId].capacity()) <
count) {
context.d_flushList[processorId].reserve(count);
}
}

Dispatcher::Dispatcher(const mqbcfg::DispatcherConfig& config,
bdlmt::EventScheduler* scheduler,
bslma::Allocator* allocator)
Expand Down Expand Up @@ -595,12 +613,13 @@ Dispatcher::registerClient(mqbi::DispatcherClient* client,
&context.d_processorPool_mp->getUnmanagedEvent()->object();
(*event)
.setType(mqbi::DispatcherEventType::e_DISPATCHER)
.setCallback(
bdlf::BindUtil::bind(&Dispatcher::onNewClient,
this,
type,
bdlf::PlaceHolders::_1)) // processor
.setDestination(client); // not needed
.setDestination(client); // TODO: not needed?

// Build callback functor in-place.
// The destructor for functor is called in `reset`.
new (event->callback().place<OnNewClientFunctor>())
OnNewClientFunctor(this, type, processor);

context.d_processorPool_mp->enqueueEvent(event, processor);
return processor; // RETURN
} // break;
Expand Down Expand Up @@ -650,8 +669,8 @@ void Dispatcher::unregisterClient(mqbi::DispatcherClient* client)
mqbi::Dispatcher::k_INVALID_PROCESSOR_HANDLE);
}

void Dispatcher::execute(const mqbi::Dispatcher::ProcessorFunctor& functor,
mqbi::DispatcherClientType::Enum type,
void Dispatcher::execute(const mqbi::Dispatcher::VoidFunctor& functor,
mqbi::DispatcherClientType::Enum type,
const mqbi::Dispatcher::VoidFunctor& doneCallback)
{
// PRECONDITIONS
Expand Down Expand Up @@ -691,9 +710,9 @@ void Dispatcher::execute(const mqbi::Dispatcher::ProcessorFunctor& functor,
if (processorPool[i] != 0) {
mqbi::DispatcherEvent* qEvent =
&processorPool[i]->getUnmanagedEvent()->object();
qEvent->setType(mqbi::DispatcherEventType::e_DISPATCHER)
.setCallback(functor)
.setFinalizeCallback(doneCallback);
qEvent->setType(mqbi::DispatcherEventType::e_DISPATCHER);
qEvent->callback().setCallback(functor);
qEvent->finalizeCallback().setCallback(doneCallback);
processorPool[i]->enqueueEventOnAllQueues(qEvent);
}
}
Expand Down
42 changes: 31 additions & 11 deletions src/groups/mqb/mqba/mqba_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,31 @@ class Dispatcher BSLS_CPP11_FINAL : public mqbi::Dispatcher {

typedef bsl::vector<mqbi::DispatcherClient*> DispatcherClientPtrVector;

/// The purpose is to avoid memory allocation by bdlf::BindUtil::bind
/// when dispatching CONFIRM from Cluster to Queue.
class OnNewClientFunctor : public mqbi::CallbackFunctor {
private:
// PRIVATE DATA
Dispatcher* d_owner_p;
mqbi::DispatcherClientType::Enum d_type;
int d_processorId;

public:
// CREATORS
/// This functor is invoked when a new client with the specified `type`
/// is registered to the dispatcher, from the thread associated to that
/// new client that is mapped to the specified `processorId`. The
/// specified `owner_p` holds pointer to the parent Dispatcher object.
OnNewClientFunctor(Dispatcher* owner_p,
mqbi::DispatcherClientType::Enum type,
int processorId);

// ACCESSORS
/// Updated the data associated with the new client from the
/// appropriate thread, using fields stored in this functor.
void operator()() const BSLS_KEYWORD_OVERRIDE;
};

/// Context for a dispatcher, with threads and pools
struct DispatcherContext {
private:
Expand Down Expand Up @@ -332,11 +357,6 @@ class Dispatcher BSLS_CPP11_FINAL : public mqbi::Dispatcher {
/// `processorId`.
void flushClients(mqbi::DispatcherClientType::Enum type, int processorId);

/// This method is invoked when a new client of the specified `type` is
/// registered to the dispatcher, from the thread associated to that new
/// client that is mapped to the specified `processorId`.
void onNewClient(mqbi::DispatcherClientType::Enum type, int processorId);

public:
// TRAITS
BSLMF_NESTED_TRAIT_DECLARATION(Dispatcher, bslma::UsesBslmaAllocator)
Expand Down Expand Up @@ -416,9 +436,9 @@ class Dispatcher BSLS_CPP11_FINAL : public mqbi::Dispatcher {
/// clients of the specified `type`, and invoke the optionally specified
/// `doneCallback` (if any) when all the relevant processors are done
/// executing the `functor`.
void execute(const mqbi::Dispatcher::ProcessorFunctor& functor,
mqbi::DispatcherClientType::Enum type,
const mqbi::Dispatcher::VoidFunctor& doneCallback =
void execute(const mqbi::Dispatcher::VoidFunctor& functor,
mqbi::DispatcherClientType::Enum type,
const mqbi::Dispatcher::VoidFunctor& doneCallback =
mqbi::Dispatcher::VoidFunctor()) BSLS_KEYWORD_OVERRIDE;

void execute(const mqbi::Dispatcher::VoidFunctor& functor,
Expand Down Expand Up @@ -569,8 +589,7 @@ inline void Dispatcher::execute(const mqbi::Dispatcher::VoidFunctor& functor,

mqbi::DispatcherEvent* event = getEvent(client);

(*event).setType(type).setCallback(
mqbi::Dispatcher::voidToProcessorFunctor(functor));
(*event).setType(type).callback().setCallback(functor);

dispatchEvent(event, client);
}
Expand All @@ -585,7 +604,8 @@ inline void Dispatcher::execute(const mqbi::Dispatcher::VoidFunctor& functor,

(*event)
.setType(mqbi::DispatcherEventType::e_DISPATCHER)
.setCallback(mqbi::Dispatcher::voidToProcessorFunctor(functor));
.callback()
.setCallback(functor);

dispatchEvent(event, client.clientType(), client.processorHandle());
}
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3397,8 +3397,8 @@ void Cluster::onDispatcherEvent(const mqbi::DispatcherEvent& event)
case mqbi::DispatcherEventType::e_CALLBACK: {
const mqbi::DispatcherCallbackEvent& realEvent =
*event.asCallbackEvent();
BSLS_ASSERT_SAFE(realEvent.callback());
realEvent.callback()(dispatcherClientData().processorHandle());
BSLS_ASSERT_SAFE(realEvent.callback().hasCallback());
realEvent.callback()();
} break; // BREAK
case mqbi::DispatcherEventType::e_PUT: {
const mqbi::DispatcherPutEvent& realEvent = *event.asPutEvent();
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1384,8 +1384,8 @@ void ClusterProxy::onDispatcherEvent(const mqbi::DispatcherEvent& event)
case mqbi::DispatcherEventType::e_CALLBACK: {
const mqbi::DispatcherCallbackEvent* realEvent =
event.asCallbackEvent();
BSLS_ASSERT_SAFE(realEvent->callback());
realEvent->callback()(dispatcherClientData().processorHandle());
BSLS_ASSERT_SAFE(realEvent->callback().hasCallback());
realEvent->callback()();
} break;
case mqbi::DispatcherEventType::e_PUSH: {
onPushEvent(*(event.asPushEvent()));
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5893,7 +5893,7 @@ void ClusterQueueHelper::checkUnconfirmedV2Dispatched(
// Synchronize with all Queue Dispatcher threads
bslmt::Latch latch(1);
d_cluster_p->dispatcher()->execute(
mqbi::Dispatcher::ProcessorFunctor(), // empty
mqbi::Dispatcher::VoidFunctor(), // empty
mqbi::DispatcherClientType::e_QUEUE,
bdlf::BindUtil::bind(&bslmt::Latch::arrive, &latch));

Expand Down
5 changes: 2 additions & 3 deletions src/groups/mqb/mqbblp/mqbblp_localqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,8 @@ void LocalQueue::onDispatcherEvent(const mqbi::DispatcherEvent& event)
case mqbi::DispatcherEventType::e_CALLBACK: {
const mqbi::DispatcherCallbackEvent* realEvent =
event.asCallbackEvent();
BSLS_ASSERT_SAFE(realEvent->callback());
realEvent->callback()(
d_state_p->queue()->dispatcherClientData().processorHandle());
BSLS_ASSERT_SAFE(realEvent->callback().hasCallback());
realEvent->callback()();
} break; // BREAK
case mqbi::DispatcherEventType::e_ACK: {
BALL_LOG_INFO << "Skipping dispatcher event [" << event << "] "
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -606,12 +606,12 @@ void QueueEngineUtil_ReleaseHandleProctor::invokeCallback()
// represents 'mqbblp::ClusterNodeSession').

d_queueState_p->queue()->dispatcher()->execute(
mqbi::Dispatcher::ProcessorFunctor(),
mqbi::Dispatcher::VoidFunctor(),
mqbi::DispatcherClientType::e_SESSION,
bdlf::BindUtil::bind(&queueHandleHolderDummy, d_handleSp));

d_queueState_p->queue()->dispatcher()->execute(
mqbi::Dispatcher::ProcessorFunctor(),
mqbi::Dispatcher::VoidFunctor(),
mqbi::DispatcherClientType::e_CLUSTER,
bdlf::BindUtil::bind(&queueHandleHolderDummy, d_handleSp));
}
Expand Down
10 changes: 8 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -766,9 +766,15 @@ void QueueHandle::confirmMessage(const bmqt::MessageGUID& msgGUID,
// A more generic approach would be to maintain a queue of CONFIRMs per
// queue (outside of the dispatcher) and process it separately (on idle?).

QueueHandle::ConfirmFunctor f(this, msgGUID, downstreamSubQueueId);
mqbi::DispatcherEvent* queueEvent = d_queue_sp->dispatcher()->getEvent(
mqbi::DispatcherClientType::e_QUEUE);

d_queue_sp->dispatcher()->execute(f, d_queue_sp.get());
(*queueEvent).setType(mqbi::DispatcherEventType::e_CALLBACK);

new (queueEvent->callback().place<QueueHandle::ConfirmFunctor>())
QueueHandle::ConfirmFunctor(this, msgGUID, downstreamSubQueueId);

d_queue_sp->dispatcher()->dispatchEvent(queueEvent, d_queue_sp.get());
}

void QueueHandle::rejectMessage(const bmqt::MessageGUID& msgGUID,
Expand Down
6 changes: 3 additions & 3 deletions src/groups/mqb/mqbblp/mqbblp_queuehandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class QueueHandle : public mqbi::QueueHandle {

/// The purpose is to avoid memory allocation by bdlf::BindUtil::bind
/// when dispatching CONFIRM from Cluster to Queue.
class ConfirmFunctor {
class ConfirmFunctor : public mqbi::CallbackFunctor {
private:
// PRIVATE DATA
QueueHandle* d_owner_p;
Expand All @@ -144,7 +144,7 @@ class QueueHandle : public mqbi::QueueHandle {
bmqt::MessageGUID guid,
unsigned int downstreamSubQueueId);

void operator()();
void operator()() const BSLS_KEYWORD_OVERRIDE;
};

public:
Expand Down Expand Up @@ -682,7 +682,7 @@ inline QueueHandle::ConfirmFunctor::ConfirmFunctor(
// NOTHING
}

inline void QueueHandle::ConfirmFunctor::operator()()
inline void QueueHandle::ConfirmFunctor::operator()() const
{
d_owner_p->confirmMessageDispatched(d_guid, d_downstreamSubQueueId);
}
Expand Down
5 changes: 2 additions & 3 deletions src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -774,9 +774,8 @@ void RemoteQueue::onDispatcherEvent(const mqbi::DispatcherEvent& event)
case mqbi::DispatcherEventType::e_CALLBACK: {
const mqbi::DispatcherCallbackEvent* realEvent =
event.asCallbackEvent();
BSLS_ASSERT_SAFE(realEvent->callback());
realEvent->callback()(
d_state_p->queue()->dispatcherClientData().processorHandle());
BSLS_ASSERT_SAFE(realEvent->callback().hasCallback());
realEvent->callback()();
} break;
case mqbi::DispatcherEventType::e_PUSH: {
const mqbi::DispatcherPushEvent* realEvent = event.asPushEvent();
Expand Down
Loading