diff --git a/src/kernel/drivers/MqttDriver.hpp b/src/kernel/drivers/MqttDriver.hpp index 951562b4..93d2b9df 100644 --- a/src/kernel/drivers/MqttDriver.hpp +++ b/src/kernel/drivers/MqttDriver.hpp @@ -224,32 +224,28 @@ class MqttDriver { Log.infoln("MQTT: Queuing topic '%s'%s (qos = %d): %s", topic.c_str(), (retain == Retention::Retain ? " (retain)" : ""), qos, serializedJson.c_str()); #endif - TaskHandle_t waitingTask = timeout == ticks::zero() ? nullptr : xTaskGetCurrentTaskHandle(); - bool offered = publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, json, retain, qos, waitingTask); - if (!offered) { - return PublishStatus::QueueFull; - } - if (waitingTask == nullptr) { - return PublishStatus::Pending; - } - return waitFor(offered, timeout); + return executeAndAwait(timeout, [&](TaskHandle_t waitingTask) { + return publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, json, retain, qos, waitingTask); + }); } PublishStatus clear(const String& topic, Retention retain, QoS qos, ticks timeout = ticks::zero()) { Log.traceln("MQTT: Clearing topic '%s'", topic.c_str()); + return executeAndAwait(timeout, [&](TaskHandle_t waitingTask) { + return publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, "", retain, qos, waitingTask); + }); + } + + PublishStatus executeAndAwait(ticks timeout, std::function enqueue) { TaskHandle_t waitingTask = timeout == ticks::zero() ? nullptr : xTaskGetCurrentTaskHandle(); - bool offered = publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, "", retain, qos, waitingTask); + bool offered = enqueue(waitingTask); if (!offered) { return PublishStatus::QueueFull; } if (waitingTask == nullptr) { return PublishStatus::Pending; } - return waitFor(offered, timeout); - } - - PublishStatus waitFor(bool offered, ticks timeout) { uint32_t status = ulTaskNotifyTake(pdTRUE, timeout.count()); switch (status) { case 0: