From a2e726be18dbe0e9d82c12ec428d08300c238b1d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?L=C3=B3r=C3=A1nt=20Pint=C3=A9r?= <lorant.pinter@gmail.com>
Date: Sun, 4 Feb 2024 22:31:55 +0100
Subject: [PATCH] Return queue full

---
 src/kernel/drivers/MqttDriver.hpp | 24 ++++++++++++++++--------
 1 file changed, 16 insertions(+), 8 deletions(-)

diff --git a/src/kernel/drivers/MqttDriver.hpp b/src/kernel/drivers/MqttDriver.hpp
index 1f39b6b2..951562b4 100644
--- a/src/kernel/drivers/MqttDriver.hpp
+++ b/src/kernel/drivers/MqttDriver.hpp
@@ -37,8 +37,9 @@ class MqttDriver {
     enum class PublishStatus {
         TimeOut = 0,
         Success = 1,
-        Pending = 2,
-        Failed = 3
+        Failed = 2,
+        Pending = 3,
+        QueueFull = 4
     };
 
     typedef std::function<void(const JsonObject&, JsonObject&)> CommandHandler;
@@ -82,6 +83,7 @@ class MqttDriver {
         bool registerCommand(const String& name, size_t responseSize, CommandHandler handler) {
             String suffix = "commands/" + name;
             return subscribe(suffix, QoS::ExactlyOnce, [this, name, suffix, responseSize, handler](const String&, const JsonObject& request) {
+                // TODO Do exponential backoff when clear cannot be finished
                 // Clear topic and wait for it to be cleared
                 auto clearStatus = mqtt.clear(fullTopic(suffix), Retention::Retain, QoS::ExactlyOnce, std::chrono::seconds { 5 });
                 if (clearStatus != PublishStatus::Success) {
@@ -224,10 +226,13 @@ class MqttDriver {
 #endif
         TaskHandle_t waitingTask = timeout == ticks::zero() ? nullptr : xTaskGetCurrentTaskHandle();
         bool offered = publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, json, retain, qos, waitingTask);
-        if (offered && waitingTask != nullptr) {
-            return waitFor(offered, timeout);
+        if (!offered) {
+            return PublishStatus::QueueFull;
         }
-        return offered ? PublishStatus::Pending : PublishStatus::Failed;
+        if (waitingTask == nullptr) {
+            return PublishStatus::Pending;
+        }
+        return waitFor(offered, timeout);
     }
 
     PublishStatus clear(const String& topic, Retention retain, QoS qos, ticks timeout = ticks::zero()) {
@@ -235,10 +240,13 @@ class MqttDriver {
             topic.c_str());
         TaskHandle_t waitingTask = timeout == ticks::zero() ? nullptr : xTaskGetCurrentTaskHandle();
         bool offered = publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, "", retain, qos, waitingTask);
-        if (offered && waitingTask != nullptr) {
-            return waitFor(offered, timeout);
+        if (!offered) {
+            return PublishStatus::QueueFull;
+        }
+        if (waitingTask == nullptr) {
+            return PublishStatus::Pending;
         }
-        return offered ? PublishStatus::Pending : PublishStatus::Failed;
+        return waitFor(offered, timeout);
     }
 
     PublishStatus waitFor(bool offered, ticks timeout) {