From 61e1b87239b70cc462517d60eff145936746d747 Mon Sep 17 00:00:00 2001 From: Evgeny Malygin Date: Thu, 31 Oct 2024 01:48:09 +0000 Subject: [PATCH] Perf[MQB]: use in-place callbacks in Dispatcher Signed-off-by: Evgeny Malygin --- src/groups/mqb/mqba/mqba_adminsession.cpp | 4 +- src/groups/mqb/mqba/mqba_clientsession.cpp | 8 +- src/groups/mqb/mqba/mqba_dispatcher.cpp | 87 ++++---- src/groups/mqb/mqba/mqba_dispatcher.h | 42 +++- src/groups/mqb/mqbblp/mqbblp_cluster.cpp | 4 +- src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp | 4 +- .../mqb/mqbblp/mqbblp_clusterqueuehelper.cpp | 2 +- src/groups/mqb/mqbblp/mqbblp_localqueue.cpp | 5 +- .../mqb/mqbblp/mqbblp_queueengineutil.cpp | 4 +- src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp | 10 +- src/groups/mqb/mqbblp/mqbblp_queuehandle.h | 6 +- src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp | 5 +- .../mqb/mqbblp/mqbblp_storagemanager.cpp | 9 +- src/groups/mqb/mqbc/mqbc_storagemanager.cpp | 4 - src/groups/mqb/mqbc/mqbc_storageutil.cpp | 100 +++++---- src/groups/mqb/mqbc/mqbc_storageutil.h | 97 ++++----- src/groups/mqb/mqbi/mqbi_dispatcher.cpp | 71 ++++++- src/groups/mqb/mqbi/mqbi_dispatcher.h | 190 +++++++++++++----- src/groups/mqb/mqbi/mqbi_dispatcher.t.cpp | 185 +++++++++++++++++ src/groups/mqb/mqbmock/mqbmock_dispatcher.cpp | 8 +- src/groups/mqb/mqbmock/mqbmock_dispatcher.h | 6 +- src/groups/mqb/mqbs/mqbs_filestore.cpp | 4 +- 22 files changed, 604 insertions(+), 251 deletions(-) create mode 100644 src/groups/mqb/mqbi/mqbi_dispatcher.t.cpp diff --git a/src/groups/mqb/mqba/mqba_adminsession.cpp b/src/groups/mqb/mqba/mqba_adminsession.cpp index 2bf5caea2..3a5fd07db 100644 --- a/src/groups/mqb/mqba/mqba_adminsession.cpp +++ b/src/groups/mqb/mqba/mqba_adminsession.cpp @@ -461,8 +461,8 @@ void AdminSession::onDispatcherEvent(const mqbi::DispatcherEvent& event) const mqbi::DispatcherCallbackEvent* realEvent = event.asCallbackEvent(); - BSLS_ASSERT_SAFE(realEvent->callback()); - realEvent->callback()(dispatcherClientData().processorHandle()); + BSLS_ASSERT_SAFE(realEvent->callback().hasCallback()); + realEvent->callback()(); } break; case mqbi::DispatcherEventType::e_ACK: diff --git a/src/groups/mqb/mqba/mqba_clientsession.cpp b/src/groups/mqb/mqba/mqba_clientsession.cpp index 0d1d5674c..6b6cb8ee8 100644 --- a/src/groups/mqb/mqba/mqba_clientsession.cpp +++ b/src/groups/mqb/mqba/mqba_clientsession.cpp @@ -692,7 +692,7 @@ void ClientSession::tearDownImpl(bslmt::Semaphore* semaphore, // with the 'tearDownAllQueuesDone' finalize callback having the 'handle' // bound to it (so that the session is not yet destroyed). dispatcher()->execute( - mqbi::Dispatcher::ProcessorFunctor(), // empty + mqbi::Dispatcher::VoidFunctor(), // empty mqbi::DispatcherClientType::e_QUEUE, bdlf::BindUtil::bind(&ClientSession::tearDownAllQueuesDone, this, @@ -1214,7 +1214,7 @@ void ClientSession::processDisconnectAllQueues( // type, refer to top level documention for explanation (paragraph about // the bmqu::SharedResource). dispatcher()->execute( - mqbi::Dispatcher::ProcessorFunctor(), // empty + mqbi::Dispatcher::VoidFunctor(), // empty mqbi::DispatcherClientType::e_QUEUE, bdlf::BindUtil::bind( bmqu::WeakMemFnUtil::weakMemFn( @@ -3013,9 +3013,9 @@ void ClientSession::onDispatcherEvent(const mqbi::DispatcherEvent& event) const mqbi::DispatcherCallbackEvent* realEvent = event.asCallbackEvent(); - BSLS_ASSERT_SAFE(realEvent->callback()); + BSLS_ASSERT_SAFE(realEvent->callback().hasCallback()); flush(); // Flush any pending messages to guarantee ordering of events - realEvent->callback()(dispatcherClientData().processorHandle()); + realEvent->callback()(); } break; case mqbi::DispatcherEventType::e_CONTROL_MSG: { BSLS_ASSERT_OPT(false && diff --git a/src/groups/mqb/mqba/mqba_dispatcher.cpp b/src/groups/mqb/mqba/mqba_dispatcher.cpp index 4032047ff..83e24486f 100644 --- a/src/groups/mqb/mqba/mqba_dispatcher.cpp +++ b/src/groups/mqb/mqba/mqba_dispatcher.cpp @@ -91,7 +91,8 @@ void Dispatcher_Executor::post(const bsl::function& f) const event->object() .setType(mqbi::DispatcherEventType::e_DISPATCHER) - .setCallback(mqbi::Dispatcher::voidToProcessorFunctor(f)); + .callback() + .setCallback(f); // submit the event int rc = d_processorPool_p->enqueueEvent(event, d_processorHandle); @@ -183,8 +184,9 @@ void Dispatcher_ClientExecutor::post(const bsl::function& f) const event->object() .setType(mqbi::DispatcherEventType::e_CALLBACK) - .setCallback(mqbi::Dispatcher::voidToProcessorFunctor(f)) - .setDestination(const_cast(d_client_p)); + .setDestination(const_cast(d_client_p)) + .callback() + .setCallback(f); // submit the event int rc = processorPool()->enqueueEvent(event, processorHandle()); @@ -232,6 +234,39 @@ Dispatcher::DispatcherContext::DispatcherContext( // NOTHING } +// ------------------------------------ +// class Dispatcher::OnNewClientFunctor +// ------------------------------------ + +Dispatcher::OnNewClientFunctor::OnNewClientFunctor( + Dispatcher* owner_p, + mqbi::DispatcherClientType::Enum type, + int processorId) +: d_owner_p(owner_p) +, d_type(type) +, d_processorId(processorId) +{ + // PRECONDITIONS + BSLS_ASSERT_SAFE(d_owner_p); +} + +// ACCESSORS +void Dispatcher::OnNewClientFunctor::operator()() const +{ + // executed by the *DISPATCHER* thread + + // Resize the 'd_flushList' vector for that specified 'processorId', if + // needed, to ensure it has enough space to hold all clients associated to + // that processorId. + DispatcherContext& context = *(d_owner_p->d_contexts[d_type]); + + int count = context.d_loadBalancer.clientsCountForProcessor(d_processorId); + if (static_cast(context.d_flushList[d_processorId].capacity()) < + count) { + context.d_flushList[d_processorId].reserve(count); + } +} + // ---------------- // class Dispatcher // ---------------- @@ -372,10 +407,10 @@ void Dispatcher::queueEventCb(mqbi::DispatcherClientType::Enum type, // whole purpose of the 'e_DISPATCHER' event type. flushClients(type, processorId); - if (realEvent->callback()) { + if (realEvent->callback().hasCallback()) { // A callback may not have been set if all we wanted was to // execute the 'finalizeCallback' of the event. - realEvent->callback()(processorId); + realEvent->callback()(); } } else { @@ -404,7 +439,7 @@ void Dispatcher::queueEventCb(mqbi::DispatcherClientType::Enum type, const mqbi::DispatcherDispatcherEvent* realEvent = event->object().asDispatcherEvent(); - if (realEvent->finalizeCallback()) { + if (realEvent->finalizeCallback().hasCallback()) { BALL_LOG_TRACE << "Calling finalizeCallback on queue " << processorId << " of " << type << " dispatcher: " << event->object(); @@ -430,23 +465,6 @@ void Dispatcher::flushClients(mqbi::DispatcherClientType::Enum type, context.d_flushList[processorId].clear(); } -void Dispatcher::onNewClient(mqbi::DispatcherClientType::Enum type, - int processorId) -{ - // executed by the *DISPATCHER* thread - - // Resize the 'd_flushList' vector for that specified 'processorId', if - // needed, to ensure it has enough space to hold all clients associated to - // that processorId. - DispatcherContext& context = *(d_contexts[type]); - - int count = context.d_loadBalancer.clientsCountForProcessor(processorId); - if (static_cast(context.d_flushList[processorId].capacity()) < - count) { - context.d_flushList[processorId].reserve(count); - } -} - Dispatcher::Dispatcher(const mqbcfg::DispatcherConfig& config, bdlmt::EventScheduler* scheduler, bslma::Allocator* allocator) @@ -595,12 +613,13 @@ Dispatcher::registerClient(mqbi::DispatcherClient* client, &context.d_processorPool_mp->getUnmanagedEvent()->object(); (*event) .setType(mqbi::DispatcherEventType::e_DISPATCHER) - .setCallback( - bdlf::BindUtil::bind(&Dispatcher::onNewClient, - this, - type, - bdlf::PlaceHolders::_1)) // processor - .setDestination(client); // not needed + .setDestination(client); // TODO: not needed? + + // Build callback functor in-place. + // The destructor for functor is called in `reset`. + new (event->callback().place()) + OnNewClientFunctor(this, type, processor); + context.d_processorPool_mp->enqueueEvent(event, processor); return processor; // RETURN } // break; @@ -650,8 +669,8 @@ void Dispatcher::unregisterClient(mqbi::DispatcherClient* client) mqbi::Dispatcher::k_INVALID_PROCESSOR_HANDLE); } -void Dispatcher::execute(const mqbi::Dispatcher::ProcessorFunctor& functor, - mqbi::DispatcherClientType::Enum type, +void Dispatcher::execute(const mqbi::Dispatcher::VoidFunctor& functor, + mqbi::DispatcherClientType::Enum type, const mqbi::Dispatcher::VoidFunctor& doneCallback) { // PRECONDITIONS @@ -691,9 +710,9 @@ void Dispatcher::execute(const mqbi::Dispatcher::ProcessorFunctor& functor, if (processorPool[i] != 0) { mqbi::DispatcherEvent* qEvent = &processorPool[i]->getUnmanagedEvent()->object(); - qEvent->setType(mqbi::DispatcherEventType::e_DISPATCHER) - .setCallback(functor) - .setFinalizeCallback(doneCallback); + qEvent->setType(mqbi::DispatcherEventType::e_DISPATCHER); + qEvent->callback().setCallback(functor); + qEvent->finalizeCallback().setCallback(doneCallback); processorPool[i]->enqueueEventOnAllQueues(qEvent); } } diff --git a/src/groups/mqb/mqba/mqba_dispatcher.h b/src/groups/mqb/mqba/mqba_dispatcher.h index 23861cc85..280f1b6b0 100644 --- a/src/groups/mqb/mqba/mqba_dispatcher.h +++ b/src/groups/mqb/mqba/mqba_dispatcher.h @@ -221,6 +221,31 @@ class Dispatcher BSLS_CPP11_FINAL : public mqbi::Dispatcher { typedef bsl::vector DispatcherClientPtrVector; + /// The purpose is to avoid memory allocation by bdlf::BindUtil::bind + /// when dispatching CONFIRM from Cluster to Queue. + class OnNewClientFunctor : public mqbi::CallbackFunctor { + private: + // PRIVATE DATA + Dispatcher* d_owner_p; + mqbi::DispatcherClientType::Enum d_type; + int d_processorId; + + public: + // CREATORS + /// This functor is invoked when a new client with the specified `type` + /// is registered to the dispatcher, from the thread associated to that + /// new client that is mapped to the specified `processorId`. The + /// specified `owner_p` holds pointer to the parent Dispatcher object. + OnNewClientFunctor(Dispatcher* owner_p, + mqbi::DispatcherClientType::Enum type, + int processorId); + + // ACCESSORS + /// Updated the data associated with the new client from the + /// appropriate thread, using fields stored in this functor. + void operator()() const BSLS_KEYWORD_OVERRIDE; + }; + /// Context for a dispatcher, with threads and pools struct DispatcherContext { private: @@ -332,11 +357,6 @@ class Dispatcher BSLS_CPP11_FINAL : public mqbi::Dispatcher { /// `processorId`. void flushClients(mqbi::DispatcherClientType::Enum type, int processorId); - /// This method is invoked when a new client of the specified `type` is - /// registered to the dispatcher, from the thread associated to that new - /// client that is mapped to the specified `processorId`. - void onNewClient(mqbi::DispatcherClientType::Enum type, int processorId); - public: // TRAITS BSLMF_NESTED_TRAIT_DECLARATION(Dispatcher, bslma::UsesBslmaAllocator) @@ -416,9 +436,9 @@ class Dispatcher BSLS_CPP11_FINAL : public mqbi::Dispatcher { /// clients of the specified `type`, and invoke the optionally specified /// `doneCallback` (if any) when all the relevant processors are done /// executing the `functor`. - void execute(const mqbi::Dispatcher::ProcessorFunctor& functor, - mqbi::DispatcherClientType::Enum type, - const mqbi::Dispatcher::VoidFunctor& doneCallback = + void execute(const mqbi::Dispatcher::VoidFunctor& functor, + mqbi::DispatcherClientType::Enum type, + const mqbi::Dispatcher::VoidFunctor& doneCallback = mqbi::Dispatcher::VoidFunctor()) BSLS_KEYWORD_OVERRIDE; void execute(const mqbi::Dispatcher::VoidFunctor& functor, @@ -569,8 +589,7 @@ inline void Dispatcher::execute(const mqbi::Dispatcher::VoidFunctor& functor, mqbi::DispatcherEvent* event = getEvent(client); - (*event).setType(type).setCallback( - mqbi::Dispatcher::voidToProcessorFunctor(functor)); + (*event).setType(type).callback().setCallback(functor); dispatchEvent(event, client); } @@ -585,7 +604,8 @@ inline void Dispatcher::execute(const mqbi::Dispatcher::VoidFunctor& functor, (*event) .setType(mqbi::DispatcherEventType::e_DISPATCHER) - .setCallback(mqbi::Dispatcher::voidToProcessorFunctor(functor)); + .callback() + .setCallback(functor); dispatchEvent(event, client.clientType(), client.processorHandle()); } diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp index f1f5d8d91..797d126ea 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp @@ -3396,8 +3396,8 @@ void Cluster::onDispatcherEvent(const mqbi::DispatcherEvent& event) case mqbi::DispatcherEventType::e_CALLBACK: { const mqbi::DispatcherCallbackEvent& realEvent = *event.asCallbackEvent(); - BSLS_ASSERT_SAFE(realEvent.callback()); - realEvent.callback()(dispatcherClientData().processorHandle()); + BSLS_ASSERT_SAFE(realEvent.callback().hasCallback()); + realEvent.callback()(); } break; // BREAK case mqbi::DispatcherEventType::e_PUT: { const mqbi::DispatcherPutEvent& realEvent = *event.asPutEvent(); diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp index 94cca4401..3c017d005 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp @@ -1384,8 +1384,8 @@ void ClusterProxy::onDispatcherEvent(const mqbi::DispatcherEvent& event) case mqbi::DispatcherEventType::e_CALLBACK: { const mqbi::DispatcherCallbackEvent* realEvent = event.asCallbackEvent(); - BSLS_ASSERT_SAFE(realEvent->callback()); - realEvent->callback()(dispatcherClientData().processorHandle()); + BSLS_ASSERT_SAFE(realEvent->callback().hasCallback()); + realEvent->callback()(); } break; case mqbi::DispatcherEventType::e_PUSH: { onPushEvent(*(event.asPushEvent())); diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index 8822876f4..a317918a8 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -5901,7 +5901,7 @@ void ClusterQueueHelper::checkUnconfirmedV2Dispatched( // Synchronize with all Queue Dispatcher threads bslmt::Latch latch(1); d_cluster_p->dispatcher()->execute( - mqbi::Dispatcher::ProcessorFunctor(), // empty + mqbi::Dispatcher::VoidFunctor(), // empty mqbi::DispatcherClientType::e_QUEUE, bdlf::BindUtil::bind(&bslmt::Latch::arrive, &latch)); diff --git a/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp b/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp index c300dc02d..59f18b163 100644 --- a/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp @@ -334,9 +334,8 @@ void LocalQueue::onDispatcherEvent(const mqbi::DispatcherEvent& event) case mqbi::DispatcherEventType::e_CALLBACK: { const mqbi::DispatcherCallbackEvent* realEvent = event.asCallbackEvent(); - BSLS_ASSERT_SAFE(realEvent->callback()); - realEvent->callback()( - d_state_p->queue()->dispatcherClientData().processorHandle()); + BSLS_ASSERT_SAFE(realEvent->callback().hasCallback()); + realEvent->callback()(); } break; // BREAK case mqbi::DispatcherEventType::e_ACK: { BALL_LOG_INFO << "Skipping dispatcher event [" << event << "] " diff --git a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp index 92c7876ff..96987cb8e 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp @@ -606,12 +606,12 @@ void QueueEngineUtil_ReleaseHandleProctor::invokeCallback() // represents 'mqbblp::ClusterNodeSession'). d_queueState_p->queue()->dispatcher()->execute( - mqbi::Dispatcher::ProcessorFunctor(), + mqbi::Dispatcher::VoidFunctor(), mqbi::DispatcherClientType::e_SESSION, bdlf::BindUtil::bind(&queueHandleHolderDummy, d_handleSp)); d_queueState_p->queue()->dispatcher()->execute( - mqbi::Dispatcher::ProcessorFunctor(), + mqbi::Dispatcher::VoidFunctor(), mqbi::DispatcherClientType::e_CLUSTER, bdlf::BindUtil::bind(&queueHandleHolderDummy, d_handleSp)); } diff --git a/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp b/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp index 42ffca0c7..844c7c8c8 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp @@ -766,9 +766,15 @@ void QueueHandle::confirmMessage(const bmqt::MessageGUID& msgGUID, // A more generic approach would be to maintain a queue of CONFIRMs per // queue (outside of the dispatcher) and process it separately (on idle?). - QueueHandle::ConfirmFunctor f(this, msgGUID, downstreamSubQueueId); + mqbi::DispatcherEvent* queueEvent = d_queue_sp->dispatcher()->getEvent( + mqbi::DispatcherClientType::e_QUEUE); - d_queue_sp->dispatcher()->execute(f, d_queue_sp.get()); + (*queueEvent).setType(mqbi::DispatcherEventType::e_CALLBACK); + + new (queueEvent->callback().place()) + QueueHandle::ConfirmFunctor(this, msgGUID, downstreamSubQueueId); + + d_queue_sp->dispatcher()->dispatchEvent(queueEvent, d_queue_sp.get()); } void QueueHandle::rejectMessage(const bmqt::MessageGUID& msgGUID, diff --git a/src/groups/mqb/mqbblp/mqbblp_queuehandle.h b/src/groups/mqb/mqbblp/mqbblp_queuehandle.h index 78b75d4e8..240533e61 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queuehandle.h +++ b/src/groups/mqb/mqbblp/mqbblp_queuehandle.h @@ -131,7 +131,7 @@ class QueueHandle : public mqbi::QueueHandle { /// The purpose is to avoid memory allocation by bdlf::BindUtil::bind /// when dispatching CONFIRM from Cluster to Queue. - class ConfirmFunctor { + class ConfirmFunctor : public mqbi::CallbackFunctor { private: // PRIVATE DATA QueueHandle* d_owner_p; @@ -144,7 +144,7 @@ class QueueHandle : public mqbi::QueueHandle { bmqt::MessageGUID guid, unsigned int downstreamSubQueueId); - void operator()(); + void operator()() const BSLS_KEYWORD_OVERRIDE; }; public: @@ -682,7 +682,7 @@ inline QueueHandle::ConfirmFunctor::ConfirmFunctor( // NOTHING } -inline void QueueHandle::ConfirmFunctor::operator()() +inline void QueueHandle::ConfirmFunctor::operator()() const { d_owner_p->confirmMessageDispatched(d_guid, d_downstreamSubQueueId); } diff --git a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp index bf411902a..a1c27e4c1 100644 --- a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp @@ -750,9 +750,8 @@ void RemoteQueue::onDispatcherEvent(const mqbi::DispatcherEvent& event) case mqbi::DispatcherEventType::e_CALLBACK: { const mqbi::DispatcherCallbackEvent* realEvent = event.asCallbackEvent(); - BSLS_ASSERT_SAFE(realEvent->callback()); - realEvent->callback()( - d_state_p->queue()->dispatcherClientData().processorHandle()); + BSLS_ASSERT_SAFE(realEvent->callback().hasCallback()); + realEvent->callback()(); } break; case mqbi::DispatcherEventType::e_PUSH: { const mqbi::DispatcherPushEvent* realEvent = event.asPushEvent(); diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp index e319f48f3..3b8bf0105 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp @@ -1069,7 +1069,6 @@ void StorageManager::registerQueue(const bmqt::Uri& uri, &d_appKeysVec[partitionId], &d_appKeysLock, &d_allocators, - processorForPartition(partitionId), uri, queueKey, d_clusterData_p->identity().description(), @@ -1092,9 +1091,9 @@ void StorageManager::unregisterQueue(const bmqt::Uri& uri, int partitionId) (*queueEvent) .setType(mqbi::DispatcherEventType::e_DISPATCHER) + .callback() .setCallback( bdlf::BindUtil::bind(&mqbc::StorageUtil::unregisterQueueDispatched, - bdlf::PlaceHolders::_1, // processor d_fileStores[partitionId].get(), &d_storages[partitionId], &d_storagesLock, @@ -1161,6 +1160,7 @@ void StorageManager::registerQueueReplica(int partitionId, (*queueEvent) .setType(mqbi::DispatcherEventType::e_DISPATCHER) + .callback() .setCallback(bdlf::BindUtil::bind( &mqbc::StorageUtil::registerQueueReplicaDispatched, static_cast(0), @@ -1205,6 +1205,7 @@ void StorageManager::unregisterQueueReplica(int partitionId, (*queueEvent) .setType(mqbi::DispatcherEventType::e_DISPATCHER) + .callback() .setCallback(bdlf::BindUtil::bind( &mqbc::StorageUtil::unregisterQueueReplicaDispatched, static_cast(0), @@ -1251,6 +1252,7 @@ void StorageManager::updateQueueReplica(int partitionId, (*queueEvent) .setType(mqbi::DispatcherEventType::e_DISPATCHER) + .callback() .setCallback(bdlf::BindUtil::bind( &mqbc::StorageUtil::updateQueueReplicaDispatched, static_cast(0), @@ -1303,11 +1305,11 @@ void StorageManager::setQueue(mqbi::Queue* queue, (*queueEvent) .setType(mqbi::DispatcherEventType::e_DISPATCHER) + .callback() .setCallback( bdlf::BindUtil::bind(&mqbc::StorageUtil::setQueueDispatched, &d_storages[partitionId], &d_storagesLock, - bdlf::PlaceHolders::_1, // processor d_clusterData_p->identity().description(), partitionId, uri, @@ -1330,7 +1332,6 @@ void StorageManager::setQueueRaw(mqbi::Queue* queue, mqbc::StorageUtil::setQueueDispatched( &d_storages[partitionId], &d_storagesLock, - processorForPartition(partitionId), d_clusterData_p->identity().description(), partitionId, uri, diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp index d472aa809..53fe00ec4 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp @@ -3681,7 +3681,6 @@ void StorageManager::registerQueue( &d_appKeysVec[partitionId], &d_appKeysLock, &d_allocators, - processorForPartition(partitionId), uri, queueKey, d_clusterData_p->identity().description(), @@ -3706,7 +3705,6 @@ void StorageManager::unregisterQueue(const bmqt::Uri& uri, int partitionId) .setType(mqbi::DispatcherEventType::e_DISPATCHER) .setCallback( bdlf::BindUtil::bind(&StorageUtil::unregisterQueueDispatched, - bdlf::PlaceHolders::_1, // processor d_fileStores[partitionId].get(), &d_storages[partitionId], &d_storagesLock, @@ -3893,7 +3891,6 @@ void StorageManager::setQueue(mqbi::Queue* queue, bdlf::BindUtil::bind(&StorageUtil::setQueueDispatched, &d_storages[partitionId], &d_storagesLock, - bdlf::PlaceHolders::_1, // processor d_clusterData_p->identity().description(), partitionId, uri, @@ -3914,7 +3911,6 @@ void StorageManager::setQueueRaw(mqbi::Queue* queue, StorageUtil::setQueueDispatched(&d_storages[partitionId], &d_storagesLock, - processorForPartition(partitionId), d_clusterData_p->identity().description(), partitionId, uri, diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.cpp b/src/groups/mqb/mqbc/mqbc_storageutil.cpp index 3bae89492..81c386b32 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_storageutil.cpp @@ -118,12 +118,11 @@ bool StorageUtil::loadUpdatedAppInfos(AppInfos* addedAppInfos, } void StorageUtil::registerQueueDispatched( - BSLS_ANNOTATION_UNUSED const mqbi::Dispatcher::ProcessorHandle& processor, - mqbs::FileStore* fs, - mqbs::ReplicatedStorage* storage, - const bsl::string& clusterDescription, - int partitionId, - const AppInfos& appIdKeyPairs) + mqbs::FileStore* fs, + mqbs::ReplicatedStorage* storage, + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& appIdKeyPairs) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' @@ -178,18 +177,17 @@ void StorageUtil::registerQueueDispatched( } void StorageUtil::updateQueuePrimaryDispatched( - BSLS_ANNOTATION_UNUSED const mqbi::Dispatcher::ProcessorHandle& processor, - mqbs::ReplicatedStorage* storage, - bslmt::Mutex* storagesLock, - mqbs::FileStore* fs, - AppKeys* appKeys, - bslmt::Mutex* appKeysLock, - const bsl::string& clusterDescription, - int partitionId, - const AppInfos& addedIdKeyPairs, - const AppInfos& removedIdKeyPairs, - bool isFanout, - bool isCSLMode) + mqbs::ReplicatedStorage* storage, + bslmt::Mutex* storagesLock, + mqbs::FileStore* fs, + AppKeys* appKeys, + bslmt::Mutex* appKeysLock, + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs, + bool isFanout, + bool isCSLMode) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' @@ -2251,22 +2249,20 @@ mqbu::StorageKey StorageUtil::generateAppKey(AppKeys* appKeys, return appKey; } -void StorageUtil::registerQueue( - const mqbi::Cluster* cluster, - mqbi::Dispatcher* dispatcher, - StorageSpMap* storageMap, - bslmt::Mutex* storagesLock, - mqbs::FileStore* fs, - AppKeys* appKeys, - bslmt::Mutex* appKeysLock, - bmqma::CountingAllocatorStore* allocators, - const mqbi::Dispatcher::ProcessorHandle& processor, - const bmqt::Uri& uri, - const mqbu::StorageKey& queueKey, - const bsl::string& clusterDescription, - int partitionId, - const AppInfos& appIdKeyPairs, - mqbi::Domain* domain) +void StorageUtil::registerQueue(const mqbi::Cluster* cluster, + mqbi::Dispatcher* dispatcher, + StorageSpMap* storageMap, + bslmt::Mutex* storagesLock, + mqbs::FileStore* fs, + AppKeys* appKeys, + bslmt::Mutex* appKeysLock, + bmqma::CountingAllocatorStore* allocators, + const bmqt::Uri& uri, + const mqbu::StorageKey& queueKey, + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& appIdKeyPairs, + mqbi::Domain* domain) { // executed by the *CLUSTER DISPATCHER* thread @@ -2285,6 +2281,8 @@ void StorageUtil::registerQueue( cluster->clusterConfig()->partitionConfig().numPartitions()); BSLS_ASSERT_SAFE(domain); + const int processor = fs->processorId(); + // StorageMgr is either aware of the queue (the 'uri') or it isn't. If it // is already aware, either this queue was registered earlier or it was // seen during recovery (if it's a file-backed storage), which means that @@ -2384,7 +2382,6 @@ void StorageUtil::registerQueue( .setType(mqbi::DispatcherEventType::e_DISPATCHER) .setCallback(bdlf::BindUtil::bind( updateQueuePrimaryDispatched, - bdlf::PlaceHolders::_1, // processor storageSp.get(), storagesLock, fs, @@ -2497,7 +2494,6 @@ void StorageUtil::registerQueue( (*queueEvent) .setType(mqbi::DispatcherEventType::e_DISPATCHER) .setCallback(bdlf::BindUtil::bind(®isterQueueDispatched, - bdlf::PlaceHolders::_1, // processor fs, storageSp.get(), clusterDescription, @@ -2507,15 +2503,13 @@ void StorageUtil::registerQueue( fs->dispatchEvent(queueEvent); } -void StorageUtil::unregisterQueueDispatched( - BSLS_ANNOTATION_UNUSED const mqbi::Dispatcher::ProcessorHandle& processor, - mqbs::FileStore* fs, - StorageSpMap* storageMap, - bslmt::Mutex* storagesLock, - const ClusterData* clusterData, - int partitionId, - const PartitionInfo& pinfo, - const bmqt::Uri& uri) +void StorageUtil::unregisterQueueDispatched(mqbs::FileStore* fs, + StorageSpMap* storageMap, + bslmt::Mutex* storagesLock, + const ClusterData* clusterData, + int partitionId, + const PartitionInfo& pinfo, + const bmqt::Uri& uri) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' @@ -2888,7 +2882,7 @@ void StorageUtil::unregisterQueueReplicaDispatched( const mqbu::StorageKey& appKey, bool isCSLMode) { - // executed by *QUEUE_DISPATCHER* thread with the specified 'processorId' + // executed by *QUEUE_DISPATCHER* thread associated with `partitionId` // PRECONDITIONS BSLS_ASSERT_SAFE(fs); @@ -3126,14 +3120,12 @@ void StorageUtil::updateQueueReplicaDispatched( } } -void StorageUtil::setQueueDispatched( - StorageSpMap* storageMap, - bslmt::Mutex* storagesLock, - BSLS_ANNOTATION_UNUSED const mqbi::Dispatcher::ProcessorHandle& processor, - const bsl::string& clusterDescription, - int partitionId, - const bmqt::Uri& uri, - mqbi::Queue* queue) +void StorageUtil::setQueueDispatched(StorageSpMap* storageMap, + bslmt::Mutex* storagesLock, + const bsl::string& clusterDescription, + int partitionId, + const bmqt::Uri& uri, + mqbi::Queue* queue) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.h b/src/groups/mqb/mqbc/mqbc_storageutil.h index 3f6e6cfe4..5d016761c 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.h +++ b/src/groups/mqb/mqbc/mqbc_storageutil.h @@ -197,28 +197,25 @@ struct StorageUtil { const AppInfos& newAppInfos); /// THREAD: Executed by the Queue's dispatcher thread. - static void - registerQueueDispatched(const mqbi::Dispatcher::ProcessorHandle& processor, - mqbs::FileStore* fs, - mqbs::ReplicatedStorage* storage, - const bsl::string& clusterDescription, - int partitionId, - const AppInfos& appIdKeyPairs); + static void registerQueueDispatched(mqbs::FileStore* fs, + mqbs::ReplicatedStorage* storage, + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& appIdKeyPairs); /// THREAD: This method is called from the Queue's dispatcher thread. - static void updateQueuePrimaryDispatched( - const mqbi::Dispatcher::ProcessorHandle& processor, - mqbs::ReplicatedStorage* storage, - bslmt::Mutex* storagesLock, - mqbs::FileStore* fs, - AppKeys* appKeys, - bslmt::Mutex* appKeysLock, - const bsl::string& clusterDescription, - int partitionId, - const AppInfos& addedIdKeyPairs, - const AppInfos& removedIdKeyPairs, - bool isFanout, - bool isCSLMode); + static void + updateQueuePrimaryDispatched(mqbs::ReplicatedStorage* storage, + bslmt::Mutex* storagesLock, + mqbs::FileStore* fs, + AppKeys* appKeys, + bslmt::Mutex* appKeysLock, + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs, + bool isFanout, + bool isCSLMode); /// StorageManager's storages lock must be locked before calling this /// method. @@ -620,33 +617,29 @@ struct StorageUtil { /// associated queue storage created. /// /// THREAD: Executed by the Client's dispatcher thread. - static void - registerQueue(const mqbi::Cluster* cluster, - mqbi::Dispatcher* dispatcher, - StorageSpMap* storageMap, - bslmt::Mutex* storagesLock, - mqbs::FileStore* fs, - AppKeys* appKeys, - bslmt::Mutex* appKeysLock, - bmqma::CountingAllocatorStore* allocators, - const mqbi::Dispatcher::ProcessorHandle& processor, - const bmqt::Uri& uri, - const mqbu::StorageKey& queueKey, - const bsl::string& clusterDescription, - int partitionId, - const AppInfos& appIdKeyPairs, - mqbi::Domain* domain); + static void registerQueue(const mqbi::Cluster* cluster, + mqbi::Dispatcher* dispatcher, + StorageSpMap* storageMap, + bslmt::Mutex* storagesLock, + mqbs::FileStore* fs, + AppKeys* appKeys, + bslmt::Mutex* appKeysLock, + bmqma::CountingAllocatorStore* allocators, + const bmqt::Uri& uri, + const mqbu::StorageKey& queueKey, + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& appIdKeyPairs, + mqbi::Domain* domain); /// THREAD: Executed by the Queue's dispatcher thread. - static void unregisterQueueDispatched( - const mqbi::Dispatcher::ProcessorHandle& processor, - mqbs::FileStore* fs, - StorageSpMap* storageMap, - bslmt::Mutex* storagesLock, - const ClusterData* clusterData, - int partitionId, - const PartitionInfo& pinfo, - const bmqt::Uri& uri); + static void unregisterQueueDispatched(mqbs::FileStore* fs, + StorageSpMap* storageMap, + bslmt::Mutex* storagesLock, + const ClusterData* clusterData, + int partitionId, + const PartitionInfo& pinfo, + const bmqt::Uri& uri); /// Configure the fanout queue having specified `uri` and `queueKey`, /// assigned to the specified `partitionId` to have the specified @@ -716,14 +709,12 @@ struct StorageUtil { /// Executed by queue-dispatcher thread with the specified /// `processorId`. - static void - setQueueDispatched(StorageSpMap* storageMap, - bslmt::Mutex* storagesLock, - const mqbi::Dispatcher::ProcessorHandle& processor, - const bsl::string& clusterDescription, - int partitionId, - const bmqt::Uri& uri, - mqbi::Queue* queue); + static void setQueueDispatched(StorageSpMap* storageMap, + bslmt::Mutex* storagesLock, + const bsl::string& clusterDescription, + int partitionId, + const bmqt::Uri& uri, + mqbi::Queue* queue); static int makeStorage(bsl::ostream& errorDescription, bslma::ManagedPtr* out, diff --git a/src/groups/mqb/mqbi/mqbi_dispatcher.cpp b/src/groups/mqb/mqbi/mqbi_dispatcher.cpp index 3f8ba170d..ba606d8c5 100644 --- a/src/groups/mqb/mqbi/mqbi_dispatcher.cpp +++ b/src/groups/mqb/mqbi/mqbi_dispatcher.cpp @@ -32,6 +32,47 @@ namespace BloombergLP { namespace mqbi { +namespace { + +class VoidCallback : public mqbi::CallbackFunctor { + private: + // PRIVATE DATA + mqbi::Dispatcher::VoidFunctor d_callback; + + public: + // CREATORS + VoidCallback(const mqbi::Dispatcher::VoidFunctor& callback) + : d_callback(callback) + { + // NOTHING + } + + VoidCallback(bslmf::MovableRef callback) + : d_callback(bslmf::MovableRefUtil::move(callback)) + { + // NOTHING + } + + // ACCESSORS + void operator()() const BSLS_KEYWORD_OVERRIDE + { + if (d_callback) { + d_callback(); + } + } +}; + +} // close unnamed namespace + +// ---------------------- +// struct CallbackFunctor +// ---------------------- + +CallbackFunctor::~CallbackFunctor() +{ + // NOTHING +} + // --------------------------- // struct DispatcherClientType // --------------------------- @@ -168,19 +209,32 @@ bool DispatcherEventType::fromAscii(DispatcherEventType::Enum* out, // class Dispatcher // ---------------- -// CLASS METHODS -Dispatcher::ProcessorFunctor -Dispatcher::voidToProcessorFunctor(const Dispatcher::VoidFunctor& functor) -{ - return bdlf::BindUtil::bind(functor); -} - // CREATORS Dispatcher::~Dispatcher() { // NOTHING } +// --------------------- +// class InPlaceCallback +// --------------------- + +void InPlaceCallback::setCallback(const Dispatcher::VoidFunctor& callback) +{ + // Preconditions for placement are checked in `place`. + // Destructor is called by `reset` of the holding DispatcherEvent. + new (place()) VoidCallback(callback); +} + +void InPlaceCallback::setCallback( + bslmf::MovableRef callback) +{ + // Preconditions for placement are checked in `place`. + // Destructor is called by `reset` of the holding DispatcherEvent. + new (place()) + VoidCallback(bslmf::MovableRefUtil::move(callback)); +} + // ------------------------------- // class DispatcherDispatcherEvent // ------------------------------- @@ -327,7 +381,8 @@ bsl::ostream& DispatcherEvent::print(bsl::ostream& stream, } break; case DispatcherEventType::e_DISPATCHER: { printer.printAttribute("hasFinalizeCallback", - (finalizeCallback() ? "yes" : "no")); + (finalizeCallback().hasCallback() ? "yes" + : "no")); } break; case DispatcherEventType::e_CALLBACK: { // Nothing more to print diff --git a/src/groups/mqb/mqbi/mqbi_dispatcher.h b/src/groups/mqb/mqbi/mqbi_dispatcher.h index f09ea521c..8bda27257 100644 --- a/src/groups/mqb/mqbi/mqbi_dispatcher.h +++ b/src/groups/mqb/mqbi/mqbi_dispatcher.h @@ -328,21 +328,11 @@ class Dispatcher { /// Signature of a `void` functor method. typedef bsl::function VoidFunctor; - /// Signature of a functor method with one parameter, the processor - /// handle on which it is being executed. - typedef bsl::function ProcessorFunctor; - // PUBLIC CLASS DATA /// Value of an invalid processor handle. static const ProcessorHandle k_INVALID_PROCESSOR_HANDLE = -1; - // CLASS METHODS - - /// Convenient utility to convert the specified `functor` from a - /// `VoidFunctor` into a `ProcessorFunctor` type. - static ProcessorFunctor voidToProcessorFunctor(const VoidFunctor& functor); - public: // CREATORS @@ -425,7 +415,7 @@ class Dispatcher { /// clients of the specified `type`, and invoke the specified /// `doneCallback` (if any) when all the relevant processors are done /// executing the `functor`. - virtual void execute(const ProcessorFunctor& functor, + virtual void execute(const VoidFunctor& functor, DispatcherClientType::Enum type, const VoidFunctor& doneCallback = VoidFunctor()) = 0; @@ -480,6 +470,93 @@ class Dispatcher { clientExecutor(const mqbi::DispatcherClient* client) const = 0; }; +// =============== +// CallbackFunctor +// =============== + +struct CallbackFunctor { + virtual ~CallbackFunctor(); + + virtual void operator()() const = 0; +}; + +// =============== +// InPlaceCallback +// =============== + +struct InPlaceCallback BSLS_KEYWORD_FINAL { + public: + // PUBLIC CLASS DATA + /// The maximum size of the callback stored in the internal buffer. + /// Note that in order to place a callback functor in this in-place storage + /// its size needs to be no more than this constant. This check is ensured + /// on construction. + static const size_t k_CALLBACK_BUFFER_SIZE = 80; + + private: + // DATA + /// The internal buffer possibly storing a callback. + char d_callbackBuffer[k_CALLBACK_BUFFER_SIZE]; + + /// The flag indicating if `d_callbackBuffer` contains a valid callback + /// object right now. + bool d_hasCallback; + + char d_reserved[7]; + + public: + // CREATORS + inline InPlaceCallback() + : d_hasCallback(false) + { + // NOTHING + } + + inline ~InPlaceCallback() { reset(); } + + // MANIPULATORS + inline void reset() + { + if (d_hasCallback) { + // Want to avoid memsetting the internal buffer to 0, + // destructor call and set flag is enough + reinterpret_cast(d_callbackBuffer) + ->~CallbackFunctor(); + d_hasCallback = false; + } + } + + template + inline char* place() + { + // PRECONDITIONS + /// TODO: use static_assert to check sizeof and CALLBACK_TYPE + /// inheritance from the base CallbackFunctor type. + BSLS_ASSERT_SAFE(!d_hasCallback); + BSLS_ASSERT_SAFE(sizeof(CALLBACK_TYPE) <= k_CALLBACK_BUFFER_SIZE); + BSLS_ASSERT_SAFE(0 == static_cast( + reinterpret_cast(0))); + d_hasCallback = true; + return d_callbackBuffer; + } + + void setCallback(const Dispatcher::VoidFunctor& callback); + + void setCallback(bslmf::MovableRef callback); + + // ACCESSORS + + inline bool hasCallback() const { return d_hasCallback; } + + inline void operator()() const + { + // PRECONDITIONS + BSLS_ASSERT_SAFE(d_hasCallback); + + (*reinterpret_cast(d_callbackBuffer))(); + } +}; + // =============================== // class DispatcherDispatcherEvent // =============================== @@ -496,11 +573,11 @@ class DispatcherDispatcherEvent { /// Return a reference not offering modifiable access to the callback /// associated to this event. - virtual const Dispatcher::ProcessorFunctor& callback() const = 0; + virtual const InPlaceCallback& callback() const = 0; /// Return a reference not offering modifiable access to the finalize /// callback, if any, associated to this event. - virtual const Dispatcher::VoidFunctor& finalizeCallback() const = 0; + virtual const InPlaceCallback& finalizeCallback() const = 0; }; // ============================= @@ -520,7 +597,7 @@ class DispatcherCallbackEvent { /// Return a reference not offering modifiable access to the callback /// associated to this event. - virtual const Dispatcher::ProcessorFunctor& callback() const = 0; + virtual const InPlaceCallback& callback() const = 0; }; // =================================== @@ -934,9 +1011,6 @@ class DispatcherEvent : public DispatcherDispatcherEvent, // DispatcherEvent view interfaces // for more specific information. - Dispatcher::ProcessorFunctor d_callback; - // Callback embedded in this event. - mqbnet::ClusterNode* d_clusterNode_p; // 'ClusterNode' associated to this // event. @@ -950,15 +1024,6 @@ class DispatcherEvent : public DispatcherDispatcherEvent, bmqp_ctrlmsg::ControlMessage d_controlMessage; // ControlMessage in this event.. - Dispatcher::VoidFunctor d_finalizeCallback; - // Callback embedded in this event. - // This callback is called when the - // 'Dispatcher::execute' method is - // used to enqueue an event to - // multiple processors, and will be - // called when the last processor - // finished processing it. - bmqt::MessageGUID d_guid; // GUID of the message in this event. @@ -1005,6 +1070,15 @@ class DispatcherEvent : public DispatcherDispatcherEvent, bsl::shared_ptr d_state; + /// In-place storage for the callback in this event. + InPlaceCallback d_callback; + + /// Callback embedded in this event. This callback is called when the + /// 'Dispatcher::execute' method is used to enqueue an event to multiple + /// processors, and will be called when the last processor finished + /// processing it. + InPlaceCallback d_finalizeCallback; + public: // TRAITS BSLMF_NESTED_TRAIT_DECLARATION(DispatcherEvent, bslma::UsesBslmaAllocator) @@ -1031,14 +1105,13 @@ class DispatcherEvent : public DispatcherDispatcherEvent, const bmqp::AckMessage& ackMessage() const BSLS_KEYWORD_OVERRIDE; const bsl::shared_ptr& blob() const BSLS_KEYWORD_OVERRIDE; const bsl::shared_ptr& options() const BSLS_KEYWORD_OVERRIDE; - const Dispatcher::ProcessorFunctor& callback() const BSLS_KEYWORD_OVERRIDE; + const InPlaceCallback& callback() const BSLS_KEYWORD_OVERRIDE; + const InPlaceCallback& finalizeCallback() const BSLS_KEYWORD_OVERRIDE; mqbnet::ClusterNode* clusterNode() const BSLS_KEYWORD_OVERRIDE; const bmqp::ConfirmMessage& confirmMessage() const BSLS_KEYWORD_OVERRIDE; const bmqp::RejectMessage& rejectMessage() const BSLS_KEYWORD_OVERRIDE; const bmqp_ctrlmsg::ControlMessage& - controlMessage() const BSLS_KEYWORD_OVERRIDE; - const Dispatcher::VoidFunctor& - finalizeCallback() const BSLS_KEYWORD_OVERRIDE; + controlMessage() const BSLS_KEYWORD_OVERRIDE; const bmqt::MessageGUID& guid() const BSLS_KEYWORD_OVERRIDE; bool isRelay() const BSLS_KEYWORD_OVERRIDE; int partitionId() const BSLS_KEYWORD_OVERRIDE; @@ -1062,13 +1135,20 @@ class DispatcherEvent : public DispatcherDispatcherEvent, public: // MANIPULATORS + InPlaceCallback& callback(); + InPlaceCallback& finalizeCallback(); + + DispatcherEvent& + setCallback(bslmf::MovableRef value); + DispatcherEvent& + setFinalizeCallback(bslmf::MovableRef value); + DispatcherEvent& setType(DispatcherEventType::Enum value); DispatcherEvent& setSource(DispatcherClient* value); DispatcherEvent& setDestination(DispatcherClient* value); DispatcherEvent& setAckMessage(const bmqp::AckMessage& value); DispatcherEvent& setBlob(const bsl::shared_ptr& value); DispatcherEvent& setOptions(const bsl::shared_ptr& value); - DispatcherEvent& setCallback(const Dispatcher::ProcessorFunctor& value); DispatcherEvent& setClusterNode(mqbnet::ClusterNode* value); DispatcherEvent& setConfirmMessage(const bmqp::ConfirmMessage& value); DispatcherEvent& setRejectMessage(const bmqp::RejectMessage& value); @@ -1292,12 +1372,10 @@ inline DispatcherEvent::DispatcherEvent(bslma::Allocator* allocator) , d_ackMessage() , d_blob_sp(0, allocator) , d_options_sp(0, allocator) -, d_callback(bsl::allocator_arg, allocator) , d_clusterNode_p(0) , d_confirmMessage() , d_rejectMessage() , d_controlMessage(allocator) -, d_finalizeCallback(bsl::allocator_arg, allocator) , d_guid(bmqt::MessageGUID()) , d_isRelay(false) , d_partitionId(-1) @@ -1310,6 +1388,8 @@ inline DispatcherEvent::DispatcherEvent(bslma::Allocator* allocator) , d_compressionAlgorithmType(bmqt::CompressionAlgorithmType::e_NONE) , d_isOutOfOrder(false) , d_genCount(0) +, d_callback() +, d_finalizeCallback() { // NOTHING } @@ -1329,11 +1409,26 @@ inline const bsl::shared_ptr& DispatcherEvent::options() const return d_options_sp; } -inline const Dispatcher::ProcessorFunctor& DispatcherEvent::callback() const +inline const InPlaceCallback& DispatcherEvent::callback() const +{ + return d_callback; +} + +inline InPlaceCallback& DispatcherEvent::callback() { return d_callback; } +inline const InPlaceCallback& DispatcherEvent::finalizeCallback() const +{ + return d_finalizeCallback; +} + +inline InPlaceCallback& DispatcherEvent::finalizeCallback() +{ + return d_finalizeCallback; +} + inline mqbnet::ClusterNode* DispatcherEvent::clusterNode() const { return d_clusterNode_p; @@ -1355,11 +1450,6 @@ DispatcherEvent::controlMessage() const return d_controlMessage; } -inline const Dispatcher::VoidFunctor& DispatcherEvent::finalizeCallback() const -{ - return d_finalizeCallback; -} - inline const bmqt::MessageGUID& DispatcherEvent::guid() const { return d_guid; @@ -1470,9 +1560,16 @@ DispatcherEvent::setOptions(const bsl::shared_ptr& value) } inline DispatcherEvent& -DispatcherEvent::setCallback(const Dispatcher::ProcessorFunctor& value) +DispatcherEvent::setCallback(bslmf::MovableRef value) { - d_callback = value; + d_callback.setCallback(value); + return *this; +} + +inline DispatcherEvent& DispatcherEvent::setFinalizeCallback( + bslmf::MovableRef value) +{ + d_finalizeCallback.setCallback(value); return *this; } @@ -1504,13 +1601,6 @@ DispatcherEvent::setControlMessage(const bmqp_ctrlmsg::ControlMessage& value) return *this; } -inline DispatcherEvent& -DispatcherEvent::setFinalizeCallback(const Dispatcher::VoidFunctor& value) -{ - d_finalizeCallback = value; - return *this; -} - inline DispatcherEvent& DispatcherEvent::setGuid(const bmqt::MessageGUID& value) { @@ -1605,11 +1695,11 @@ inline void DispatcherEvent::reset() d_ackMessage = bmqp::AckMessage(); d_blob_sp.reset(); d_options_sp.reset(); - d_callback = bsl::nullptr_t(); + d_callback.reset(); + d_finalizeCallback.reset(); d_clusterNode_p = 0; d_confirmMessage = bmqp::ConfirmMessage(); d_rejectMessage = bmqp::RejectMessage(); - d_finalizeCallback = bsl::nullptr_t(); d_guid = bmqt::MessageGUID(); d_isRelay = false; d_putHeader = bmqp::PutHeader(); diff --git a/src/groups/mqb/mqbi/mqbi_dispatcher.t.cpp b/src/groups/mqb/mqbi/mqbi_dispatcher.t.cpp new file mode 100644 index 000000000..dc3c10a90 --- /dev/null +++ b/src/groups/mqb/mqbi/mqbi_dispatcher.t.cpp @@ -0,0 +1,185 @@ +// Copyright 2024 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// mqbi_dispatcher.t.cpp -*-C++-*- +#include + +// BMQ +#include + +// BDE +#include +#include +#include +#include +#include +#include +#include + +// TEST DRIVER +#include + +// CONVENIENCE +using namespace BloombergLP; +using namespace bsl; + +// ============================================================================ +// TEST HELPERS UTILITY +// ---------------------------------------------------------------------------- +namespace { + +struct Test { + int ptr; + int a; + bool b; +}; + +} // close unnamed namespace + +// ============================================================================ +// TESTS +// ---------------------------------------------------------------------------- +static void test1_InPlaceCallback() +{ + bmqtst::TestHelper::printTestName("IN PLACE CALLBACK"); + + mqbi::InPlaceCallback callback; + ASSERT(!callback.hasCallback()); + + callback.setCallback(mqbi::Dispatcher::VoidFunctor()); + ASSERT(callback.hasCallback()); + callback(); +} + +static void testN1_dispatcherEventPeformance() +// ------------------------------------------------------------------------ +// DISPATCHER EVENT PERFORMANCE +// +// Concerns: +// +// Plan: +// +// Testing: +// ------------------------------------------------------------------------ +{ + bmqtst::TestHelper::printTestName("DISPATCHER EVENT PERFORMANCE"); + + bsl::cout << "sizeof(mqbi::DispatcherEvent): " + << sizeof(mqbi::DispatcherEvent) << bsl::endl; + bsl::cout << " sizeof(mqbi::DispatcherDispatcherEvent): " + << sizeof(mqbi::DispatcherDispatcherEvent) << bsl::endl; + bsl::cout << " sizeof(mqbi::DispatcherCallbackEvent): " + << sizeof(mqbi::DispatcherCallbackEvent) << bsl::endl; + bsl::cout << " sizeof(mqbi::DispatcherControlMessageEvent): " + << sizeof(mqbi::DispatcherControlMessageEvent) << bsl::endl; + bsl::cout << " sizeof(mqbi::DispatcherConfirmEvent): " + << sizeof(mqbi::DispatcherConfirmEvent) << bsl::endl; + bsl::cout << " sizeof(mqbi::DispatcherRejectEvent): " + << sizeof(mqbi::DispatcherRejectEvent) << bsl::endl; + bsl::cout << " sizeof(mqbi::DispatcherPushEvent): " + << sizeof(mqbi::DispatcherPushEvent) << bsl::endl; + bsl::cout << " sizeof(mqbi::DispatcherPutEvent): " + << sizeof(mqbi::DispatcherPutEvent) << bsl::endl; + bsl::cout << " sizeof(mqbi::DispatcherAckEvent): " + << sizeof(mqbi::DispatcherAckEvent) << bsl::endl; + bsl::cout << " sizeof(mqbi::DispatcherClusterStateEvent): " + << sizeof(mqbi::DispatcherClusterStateEvent) << bsl::endl; + bsl::cout << " sizeof(mqbi::DispatcherStorageEvent): " + << sizeof(mqbi::DispatcherStorageEvent) << bsl::endl; + bsl::cout << " sizeof(mqbi::DispatcherRecoveryEvent): " + << sizeof(mqbi::DispatcherRecoveryEvent) << bsl::endl; + bsl::cout << " sizeof(mqbi::DispatcherReceiptEvent): " + << sizeof(mqbi::DispatcherReceiptEvent) << bsl::endl; + + bsl::cout << " sizeof(bsl::function): " + << sizeof(bsl::function) << bsl::endl; + bsl::cout << " sizeof(bsl::function): " + << sizeof(bsl::function) << bsl::endl; + bsl::cout << " sizeof(Test): " << sizeof(Test) << bsl::endl; + + const size_t k_ITERS_NUM = 100000000; + + const bmqp::PutHeader header; + const bsl::shared_ptr blob; + const bsl::shared_ptr state; + const bmqt::MessageGUID guid; + const bmqp::MessagePropertiesInfo info; + const bmqp::Protocol::SubQueueInfosArray subQueueInfos; + const bsl::string msgGroupId; + const bmqp::ConfirmMessage confirm; + + mqbi::DispatcherEvent event(s_allocator_p); + + { + const bsls::Types::Int64 begin = bsls::TimeUtil::getTimer(); + for (size_t i = 0; i < k_ITERS_NUM; i++) { + event.reset(); + } + const bsls::Types::Int64 end = bsls::TimeUtil::getTimer(); + + bsl::cout << "mqbi::DispatcherEvent::reset():" << bsl::endl; + bsl::cout << " total: " + << bmqu::PrintUtil::prettyTimeInterval(end - begin) << " (" + << k_ITERS_NUM << " iterations)" << bsl::endl; + bsl::cout << " per call: " + << bmqu::PrintUtil::prettyTimeInterval((end - begin) / + k_ITERS_NUM) + << bsl::endl + << bsl::endl; + } + + { + const bsls::Types::Int64 begin = bsls::TimeUtil::getTimer(); + for (size_t i = 0; i < k_ITERS_NUM; i++) { + mqbi::DispatcherEvent event(s_allocator_p); + } + const bsls::Types::Int64 end = bsls::TimeUtil::getTimer(); + + bsl::cout << "mqbi::DispatcherEvent::DispatcherEvent():" << bsl::endl; + bsl::cout << " total: " + << bmqu::PrintUtil::prettyTimeInterval(end - begin) << " (" + << k_ITERS_NUM << " iterations)" << bsl::endl; + bsl::cout << " per call: " + << bmqu::PrintUtil::prettyTimeInterval((end - begin) / + k_ITERS_NUM) + << bsl::endl + << bsl::endl; + } +} + +// ============================================================================ +// MAIN PROGRAM +// ---------------------------------------------------------------------------- + +int main(int argc, char* argv[]) +{ + // To be called only once per process instantiation. + bsls::TimeUtil::initialize(); + + TEST_PROLOG(bmqtst::TestHelper::e_DEFAULT); + + switch (_testCase) { + case 0: + case 1: test1_InPlaceCallback(); break; + case -1: testN1_dispatcherEventPeformance(); break; + default: { + bsl::cerr << "WARNING: CASE '" << _testCase << "' NOT FOUND." + << bsl::endl; + s_testStatus = -1; + } break; + } + + TEST_EPILOG(bmqtst::TestHelper::e_CHECK_DEF_GBL_ALLOC); +} diff --git a/src/groups/mqb/mqbmock/mqbmock_dispatcher.cpp b/src/groups/mqb/mqbmock/mqbmock_dispatcher.cpp index 4738d1135..aa4b8fe11 100644 --- a/src/groups/mqb/mqbmock/mqbmock_dispatcher.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_dispatcher.cpp @@ -108,13 +108,12 @@ void Dispatcher::execute( } void Dispatcher::execute( - const mqbi::Dispatcher::ProcessorFunctor& functor, + const mqbi::Dispatcher::VoidFunctor& functor, BSLS_ANNOTATION_UNUSED mqbi::DispatcherClientType::Enum type, const mqbi::Dispatcher::VoidFunctor& doneCallback) { if (functor) { - const ProcessorHandle dummy = Dispatcher::k_INVALID_PROCESSOR_HANDLE; - functor(dummy); + functor(); } if (doneCallback) { @@ -243,7 +242,8 @@ mqbi::DispatcherClientData& DispatcherClient::dispatcherClientData() void DispatcherClient::onDispatcherEvent(const mqbi::DispatcherEvent& event) { if (event.type() == mqbi::DispatcherEventType::e_CALLBACK) { - event.asCallbackEvent()->callback()(0); + BSLS_ASSERT_SAFE(event.asCallbackEvent()->callback().hasCallback()); + event.asCallbackEvent()->callback()(); } } diff --git a/src/groups/mqb/mqbmock/mqbmock_dispatcher.h b/src/groups/mqb/mqbmock/mqbmock_dispatcher.h index da208f680..e4aa0dc62 100644 --- a/src/groups/mqb/mqbmock/mqbmock_dispatcher.h +++ b/src/groups/mqb/mqbmock/mqbmock_dispatcher.h @@ -176,9 +176,9 @@ class Dispatcher : public mqbi::Dispatcher { /// clients of the specified `type`, and invoke the specified /// `doneCallback` (if any) when all the relevant processors are done /// executing the `functor`. - void execute(const mqbi::Dispatcher::ProcessorFunctor& functor, - mqbi::DispatcherClientType::Enum type, - const mqbi::Dispatcher::VoidFunctor& doneCallback) + void execute(const mqbi::Dispatcher::VoidFunctor& functor, + mqbi::DispatcherClientType::Enum type, + const mqbi::Dispatcher::VoidFunctor& doneCallback) BSLS_KEYWORD_OVERRIDE; void synchronize(mqbi::DispatcherClient* client) BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbs/mqbs_filestore.cpp b/src/groups/mqb/mqbs/mqbs_filestore.cpp index 0b59ae95a..b95883e17 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.cpp @@ -7177,8 +7177,8 @@ void FileStore::onDispatcherEvent(const mqbi::DispatcherEvent& event) case mqbi::DispatcherEventType::e_CALLBACK: { const mqbi::DispatcherCallbackEvent* realEvent = event.asCallbackEvent(); - BSLS_ASSERT_SAFE(realEvent->callback()); - realEvent->callback()(dispatcherClientData().processorHandle()); + BSLS_ASSERT_SAFE(realEvent->callback().hasCallback()); + realEvent->callback()(); } break; // BREAK case mqbi::DispatcherEventType::e_UNDEFINED: case mqbi::DispatcherEventType::e_PUT: