Skip to content

Commit

Permalink
Queries to check if consumer thread is closed/done.
Browse files Browse the repository at this point in the history
  • Loading branch information
fpagliughi committed Jul 13, 2024
1 parent 3b857a5 commit 02b589e
Show file tree
Hide file tree
Showing 7 changed files with 334 additions and 67 deletions.
8 changes: 3 additions & 5 deletions examples/async_consume.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,15 @@ int main(int argc, char* argv[])
while (true) {
auto evt = cli.consume_event();

if (const auto* p = std::get_if<mqtt::const_message_ptr>(&evt)) {
if (const auto* p = evt.get_message_if()) {
auto& msg = *p;
if (msg)
cout << msg->get_topic() << ": " << msg->to_string() << endl;
}
else if (std::holds_alternative<mqtt::connected_event>(evt))
else if (evt.is_connected())
cout << "\n*** Connected ***" << endl;
else if (std::holds_alternative<mqtt::connection_lost_event>(evt))
else if (evt.is_connection_lost())
cout << "*** Connection Lost ***" << endl;
else
cout << "???" << endl;
}
}
catch (const mqtt::exception& exc) {
Expand Down
69 changes: 41 additions & 28 deletions examples/async_consume_v5.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ int main(int argc, char* argv[])
{
auto serverURI = (argc > 1) ? string{argv[1]} : DFLT_SERVER_URI;

mqtt::async_client cli(serverURI, CLIENT_ID);
auto cli = std::make_shared<mqtt::async_client>(serverURI, CLIENT_ID);

auto connOpts = mqtt::connect_options_builder::v5()
.clean_start(false)
Expand All @@ -62,12 +62,12 @@ int main(int argc, char* argv[])
try {
// Start consumer before connecting to make sure to not miss messages

cli.start_consuming();
cli->start_consuming();

// Connect to the server

cout << "Connecting to the MQTT server..." << flush;
auto tok = cli.connect(connOpts);
auto tok = cli->connect(connOpts);

// Getting the connect response will block waiting for the
// connection to complete.
Expand All @@ -84,46 +84,59 @@ int main(int argc, char* argv[])
// subscriptions.
if (!rsp.is_session_present()) {
cout << "\n Session not present on broker. Subscribing..." << flush;
cli.subscribe(TOPIC, QOS)->wait();
cli->subscribe(TOPIC, QOS)->wait();
}

cout << "\n OK" << endl;

// We'll signal the consumer to exit from another thread.
// (just to show that we can)
thread([cli] {
this_thread::sleep_for(10s);
cout << "\nClosing the consumer." << endl;
cli->stop_consuming();
}).detach();

// Consume messages
// This just exits if the client is disconnected.
// (See some other examples for auto or manual reconnect)
//
// This just exits if the consumer is closed or the client is
// disconnected. (See some other examples for auto or manual
// reconnect)

cout << "\nWaiting for messages on topic: '" << TOPIC << "'" << endl;

while (true) {
auto evt = cli.consume_event();

if (const auto* p = std::get_if<mqtt::const_message_ptr>(&evt)) {
auto& msg = *p;
if (msg)
cout << msg->get_topic() << ": " << msg->to_string() << endl;
}
else if (std::holds_alternative<mqtt::connected_event>(evt)) {
cout << "\n*** Connected ***" << endl;
}
else if (std::holds_alternative<mqtt::connection_lost_event>(evt)) {
cout << "*** Connection Lost ***" << endl;
break;
}
else if (const auto* p = std::get_if<mqtt::disconnected_event>(&evt)) {
cout << "*** Disconnected. Reason [0x" << hex << int{p->reasonCode}
<< "]: " << p->reasonCode << " ***" << endl;
break;
try {
while (true) {
auto evt = cli->consume_event();

if (const auto* p = evt.get_message_if()) {
auto& msg = *p;
if (msg)
cout << msg->get_topic() << ": " << msg->to_string() << endl;
}
else if (evt.is_connected()) {
cout << "\n*** Connected ***" << endl;
}
else if (evt.is_connection_lost()) {
cout << "*** Connection Lost ***" << endl;
break;
}
else if (const auto* p = evt.get_disconnected_if()) {
cout << "*** Disconnected. Reason [0x" << hex << int{p->reasonCode}
<< "]: " << p->reasonCode << " ***" << endl;
break;
}
}
}
catch (mqtt::queue_closed&) {
}

// If we're here, the client was almost certainly disconnected.
// But we check, just to make sure.

if (cli.is_connected()) {
if (cli->is_connected()) {
cout << "\nShutting down and disconnecting from the MQTT server..." << flush;
cli.stop_consuming();
cli.disconnect()->wait();
cli->disconnect()->wait();
cout << "OK" << endl;
}
}
Expand Down
68 changes: 51 additions & 17 deletions include/mqtt/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class async_client : public virtual iasync_client
/** Smart/shared pointer for an object of this class */
using ptr_t = std::shared_ptr<async_client>;
/** Type for a thread-safe queue to consume events synchronously */
using consumer_queue_type = std::unique_ptr<thread_queue<event_type>>;
using consumer_queue_type = std::unique_ptr<thread_queue<event>>;

/** Handler type for registering an individual message callback */
using message_handler = std::function<void(const_message_ptr)>;
Expand Down Expand Up @@ -753,10 +753,32 @@ class async_client : public virtual iasync_client
void start_consuming() override;
/**
* Stop consuming messages.
* This shuts down the internal callback and discards any unread
* messages.
* This shuts down the internal callback and closes the internal
* consumer queue. Any remaining messages and events can be read until
* the queue is emptied, but nothing further will be added to it.
* This will also wake up any thread waiting on the queue.
*/
void stop_consuming() override;
/**
* Determines if the consumer queue has been closed.
* Once closed, any events in the queue can still be read, but no new
* events can be added to it.
* @return @true if the consumer queue has been closed, @false
* otherwise.
*/
bool consumer_closed() noexcept override {
return !que_ || que_->closed();
}
/**
* Determines if the consumer queue is "done" (closed and empty).
* Once the queue is done, no more events can be added or removed fom
* the queue.
* @return @true if the consumer queue is closed and empty, @false
* otherwise.
*/
bool consumer_done() noexcept override {
return !que_ || que_->done();
}
/**
* Read the next message from the queue.
* This blocks until a new message arrives.
Expand All @@ -781,18 +803,21 @@ class async_client : public virtual iasync_client
bool try_consume_message_for(
const_message_ptr* msg, const std::chrono::duration<Rep, Period>& relTime
) {
event_type evt;
if (!que_)
throw mqtt::exception(-1, "Consumer not started");

event evt;

while (true) {
if (!que_->try_get_for(&evt, relTime))
return false;

if (const auto* pval = std::get_if<const_message_ptr>(&evt)) {
if (const auto* pval = evt.get_message_if()) {
*msg = std::move(*pval);
break;
}

if (!std::holds_alternative<connected_event>(evt)) {
if (evt.is_any_disconnect()) {
*msg = const_message_ptr{};
break;
}
Expand Down Expand Up @@ -824,18 +849,21 @@ class async_client : public virtual iasync_client
bool try_consume_message_until(
const_message_ptr* msg, const std::chrono::time_point<Clock, Duration>& absTime
) {
event_type evt;
if (!que_)
throw mqtt::exception(-1, "Consumer not started");

event evt;

while (true) {
if (!que_->try_get_until(&evt, absTime))
return false;

if (const auto* pval = std::get_if<const_message_ptr>(&evt)) {
if (const auto* pval = evt.get_message_if()) {
*msg = std::move(*pval);
break;
}

if (!std::holds_alternative<connected_event>(evt)) {
if (!evt.is_any_disconnect()) {
*msg = const_message_ptr{};
break;
}
Expand All @@ -862,14 +890,14 @@ class async_client : public virtual iasync_client
* This blocks until a new message arrives.
* @return The message and topic.
*/
event_type consume_event() override { return que_->get(); }
event consume_event() override { return que_->get(); }
/**
* Try to read the next message from the queue without blocking.
* @param evt Pointer to the value to receive the event
* @return @em true if an event was read, @em false if no
* event was available.
*/
bool try_consume_event(event_type* evt) override { return que_->try_get(evt); }
bool try_consume_event(event* evt) override { return que_->try_get(evt); }
/**
* Waits a limited time for a message to arrive.
* @param evt Pointer to the value to receive the event.
Expand All @@ -879,8 +907,11 @@ class async_client : public virtual iasync_client
*/
template <typename Rep, class Period>
bool try_consume_event_for(
event_type* evt, const std::chrono::duration<Rep, Period>& relTime
event* evt, const std::chrono::duration<Rep, Period>& relTime
) {
if (!que_)
throw mqtt::exception(-1, "Consumer not started");

return que_->try_get_for(evt, relTime);
}
/**
Expand All @@ -890,8 +921,8 @@ class async_client : public virtual iasync_client
* timeout.
*/
template <typename Rep, class Period>
event_type try_consume_event_for(const std::chrono::duration<Rep, Period>& relTime) {
event_type evt;
event try_consume_event_for(const std::chrono::duration<Rep, Period>& relTime) {
event evt;
que_->try_get_for(&evt, relTime);
return evt;
}
Expand All @@ -904,8 +935,11 @@ class async_client : public virtual iasync_client
*/
template <class Clock, class Duration>
bool try_consume_event_until(
event_type* evt, const std::chrono::time_point<Clock, Duration>& absTime
event* evt, const std::chrono::time_point<Clock, Duration>& absTime
) {
if (!que_)
throw mqtt::exception(-1, "Consumer not started");

return que_->try_get_until(evt, absTime);
}
/**
Expand All @@ -915,9 +949,9 @@ class async_client : public virtual iasync_client
* timeout.
*/
template <class Clock, class Duration>
event_type try_consume_event_until(const std::chrono::time_point<Clock, Duration>& absTime
event try_consume_event_until(const std::chrono::time_point<Clock, Duration>& absTime
) {
event_type evt;
event evt;
que_->try_get_until(&evt, absTime);
return evt;
}
Expand Down
Loading

0 comments on commit 02b589e

Please sign in to comment.