Skip to content

Commit

Permalink
Wait ACKs for all messages posted by bmqtool
Browse files Browse the repository at this point in the history
Instead of waiting for Ctrl+C in auto-posting mode, bmqtool will
be counting ACKs and terminate after all posted messages are
acknowledged.

Signed-off-by: Stanislav Yuzvinsky <[email protected]>
  • Loading branch information
syuzvinsky committed Mar 29, 2024
1 parent 9f34985 commit 5594640
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 10 deletions.
23 changes: 13 additions & 10 deletions src/applications/bmqtool/m_bmqtool_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,12 @@ void Application::onMessageEvent(const bmqa::MessageEvent& event)

// Write to log file
d_fileLogger.writeAckMessage(message);

if (d_numExpectedAcks != 0 &&
d_numExpectedAcks == ++d_numPostedAcknowledged) {
BALL_LOG_INFO << "All posted messages have been acknowledged";
d_shutdownSemaphore_p->post();
}
}
else {
// Message is a push message
Expand Down Expand Up @@ -1045,15 +1051,6 @@ void Application::producerThread()
turnstile.waitTurn();
}
}

// Finished posting messages in auto mode?
// If shutDownGrace is set, signal to the main thread to exit.
if (d_parameters_p->mode() == ParametersMode::e_AUTO &&
d_parameters_p->shutdownGrace() != 0) {
// We do not need to sleep the grace period, since it is done
// by the main thread, in the stop() function.
d_shutdownSemaphore_p->post();
}
}

// CLASS METHODS
Expand Down Expand Up @@ -1131,6 +1128,8 @@ Application::Application(Parameters* parameters,
, d_latencies(allocator)
, d_autoReadInProgress(false)
, d_autoReadActivity(false)
, d_numExpectedAcks(0)
, d_numPostedAcknowledged(0)
{
// NOTHING
}
Expand Down Expand Up @@ -1194,8 +1193,12 @@ int Application::run()
d_shutdownSemaphore_p->post();
}
else {
// Start the thread
if (bmqt::QueueFlagsUtil::isWriter(d_parameters_p->queueFlags())) {
d_numExpectedAcks =
d_parameters_p->eventsCount() * d_parameters_p->eventSize();
d_numPostedAcknowledged = 0;

// Start the thread
rc = bslmt::ThreadUtil::create(
&d_runningThread,
bdlf::MemFnUtil::memFn(&Application::producerThread, this));
Expand Down
13 changes: 13 additions & 0 deletions src/applications/bmqtool/m_bmqtool_application.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,19 @@ class Application : public bmqa::SessionEventHandler {
// message was seen during the current
// grace period.

int d_numExpectedAcks;
// Auto-produce mode only. The total number of messages
// the tool will send. After posting is finished
// the tool will be waiting for this number of ACK
// messages, after which the shutdown semaphore will
// be posted.

int d_numPostedAcknowledged;
// Auto-produce mode only. The number of acknowledged
// messages. When the value of this field becomes equal
// to d_numExpectedAcks, the shutdown semaphore will be
// posted.

// PRIVATE MANIPULATORS
// (virtual: bmqa::SessionEventHandler)

Expand Down

0 comments on commit 5594640

Please sign in to comment.