diff --git a/src/kernel/drivers/MqttDriver.hpp b/src/kernel/drivers/MqttDriver.hpp index 1f39b6b2..951562b4 100644 --- a/src/kernel/drivers/MqttDriver.hpp +++ b/src/kernel/drivers/MqttDriver.hpp @@ -37,8 +37,9 @@ class MqttDriver { enum class PublishStatus { TimeOut = 0, Success = 1, - Pending = 2, - Failed = 3 + Failed = 2, + Pending = 3, + QueueFull = 4 }; typedef std::function CommandHandler; @@ -82,6 +83,7 @@ class MqttDriver { bool registerCommand(const String& name, size_t responseSize, CommandHandler handler) { String suffix = "commands/" + name; return subscribe(suffix, QoS::ExactlyOnce, [this, name, suffix, responseSize, handler](const String&, const JsonObject& request) { + // TODO Do exponential backoff when clear cannot be finished // Clear topic and wait for it to be cleared auto clearStatus = mqtt.clear(fullTopic(suffix), Retention::Retain, QoS::ExactlyOnce, std::chrono::seconds { 5 }); if (clearStatus != PublishStatus::Success) { @@ -224,10 +226,13 @@ class MqttDriver { #endif TaskHandle_t waitingTask = timeout == ticks::zero() ? nullptr : xTaskGetCurrentTaskHandle(); bool offered = publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, json, retain, qos, waitingTask); - if (offered && waitingTask != nullptr) { - return waitFor(offered, timeout); + if (!offered) { + return PublishStatus::QueueFull; } - return offered ? PublishStatus::Pending : PublishStatus::Failed; + if (waitingTask == nullptr) { + return PublishStatus::Pending; + } + return waitFor(offered, timeout); } PublishStatus clear(const String& topic, Retention retain, QoS qos, ticks timeout = ticks::zero()) { @@ -235,10 +240,13 @@ class MqttDriver { topic.c_str()); TaskHandle_t waitingTask = timeout == ticks::zero() ? nullptr : xTaskGetCurrentTaskHandle(); bool offered = publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, "", retain, qos, waitingTask); - if (offered && waitingTask != nullptr) { - return waitFor(offered, timeout); + if (!offered) { + return PublishStatus::QueueFull; + } + if (waitingTask == nullptr) { + return PublishStatus::Pending; } - return offered ? PublishStatus::Pending : PublishStatus::Failed; + return waitFor(offered, timeout); } PublishStatus waitFor(bool offered, ticks timeout) {