From fcf6ab57a82f9691ecafeb362e3aaba759caaff1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=B3r=C3=A1nt=20Pint=C3=A9r?= Date: Fri, 2 Feb 2024 10:53:18 +0100 Subject: [PATCH 1/3] Wait for network to be up before attempting to connect to MQTT --- src/kernel/drivers/MqttDriver.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/kernel/drivers/MqttDriver.hpp b/src/kernel/drivers/MqttDriver.hpp index 678d5e41..f18558b4 100644 --- a/src/kernel/drivers/MqttDriver.hpp +++ b/src/kernel/drivers/MqttDriver.hpp @@ -226,6 +226,8 @@ class MqttDriver { } void setup() { + networkReady.awaitSet(); + if (config.host.get().length() > 0) { mqttServer.hostname = config.host.get(); mqttServer.port = config.port.get(); From 2fb0a358441bc5926814e13115069a2e939b1d08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=B3r=C3=A1nt=20Pint=C3=A9r?= Date: Fri, 2 Feb 2024 10:55:17 +0100 Subject: [PATCH 2/3] Separate publishing of outgoing and incoming MQTT And wait for clear after receiving command. --- src/kernel/drivers/MqttDriver.hpp | 99 ++++++++++++++++++------------- 1 file changed, 59 insertions(+), 40 deletions(-) diff --git a/src/kernel/drivers/MqttDriver.hpp b/src/kernel/drivers/MqttDriver.hpp index f18558b4..8fa04530 100644 --- a/src/kernel/drivers/MqttDriver.hpp +++ b/src/kernel/drivers/MqttDriver.hpp @@ -49,7 +49,7 @@ class MqttDriver { } bool publish(const String& suffix, const JsonDocument& json, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce) { - return mqtt.publish(fullTopic(suffix), json, retain, qos); + return mqtt.publish(fullTopic(suffix), json, retain, qos, nullptr, ticks::zero()); } bool publish(const String& suffix, std::function populate, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce, int size = MQTT_BUFFER_SIZE) { @@ -60,7 +60,7 @@ class MqttDriver { } bool clear(const String& suffix, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce) { - return mqtt.clear(fullTopic(suffix), retain, qos); + return mqtt.clear(fullTopic(suffix), retain, qos, nullptr, ticks::zero()); } bool subscribe(const String& suffix, SubscriptionHandler handler) { @@ -74,8 +74,9 @@ class MqttDriver { bool registerCommand(const String& name, size_t responseSize, CommandHandler handler) { String suffix = "commands/" + name; return subscribe(suffix, QoS::ExactlyOnce, [this, name, suffix, responseSize, handler](const String&, const JsonObject& request) { - // Clear topic - clear(suffix, Retention::Retain, QoS::ExactlyOnce); + // Clear topic and wait for it to be cleared + mqtt.clear(fullTopic(suffix), Retention::Retain, QoS::ExactlyOnce, xTaskGetCurrentTaskHandle(), ticks::max()); + DynamicJsonDocument responseDoc(responseSize); auto response = responseDoc.to(); handler(request, response); @@ -115,26 +116,25 @@ class MqttDriver { const String payload; const Retention retain; const QoS qos; + const TaskHandle_t waitingTask; - Message() - : topic("") - , payload("") - , retain(Retention::NoRetain) - , qos(QoS::AtMostOnce) { - } + static const uint32_t PUBLISH_SUCCESS = 1; + static const uint32_t PUBLISH_FAILED = 2; - Message(const String& topic, const String& payload, Retention retention, QoS qos) + Message(const String& topic, const String& payload, Retention retention, QoS qos, TaskHandle_t waitingTask) : topic(topic) , payload(payload) , retain(retention) - , qos(qos) { + , qos(qos) + , waitingTask(waitingTask) { } - Message(const String& topic, const JsonDocument& payload, Retention retention, QoS qos) + Message(const String& topic, const JsonDocument& payload, Retention retention, QoS qos, TaskHandle_t waitingTask) : topic(topic) , payload(serializeJsonToString(payload)) , retain(retention) - , qos(qos) { + , qos(qos) + , waitingTask(waitingTask) { } private: @@ -185,13 +185,18 @@ class MqttDriver { , instanceName(instanceName) , clientId(getClientId(config.clientId.get(), instanceName)) , mqttReady(mqttReady) { - Task::run("mqtt", 8192, 1, [this](Task& task) { + Task::run("mqtt:outgoing", 4096, 1, [this](Task& task) { setup(); while (true) { auto delay = loopAndDelay(); task.delay(delay); } }); + Task::loop("mqtt:incoming", 4096, 1, [this](Task& task) { + incomingQueue.take([&](const Message& message) { + processIncomingMessage(message); + }); + }); } shared_ptr forRoot(const String& topic) { @@ -199,20 +204,33 @@ class MqttDriver { } private: - bool publish(const String& topic, const JsonDocument& json, Retention retain, QoS qos) { + bool publish(const String& topic, const JsonDocument& json, Retention retain, QoS qos, TaskHandle_t waitingTask, ticks timeout) { #ifdef DUMP_MQTT String serializedJson; serializeJsonPretty(json, serializedJson); Log.infoln("MQTT: Queuing topic '%s'%s (qos = %d): %s", topic.c_str(), (retain == Retention::Retain ? " (retain)" : ""), qos, serializedJson.c_str()); #endif - return publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, json, retain, qos); + bool offered = publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, json, retain, qos, waitingTask); + if (offered && waitingTask != nullptr) { + return waitFor(offered, timeout); + } + return offered; } - bool clear(const String& topic, Retention retain, QoS qos) { + bool clear(const String& topic, Retention retain, QoS qos, TaskHandle_t waitingTask, ticks timeout) { Log.traceln("MQTT: Clearing topic '%s'", topic.c_str()); - return publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, "", retain, qos); + bool offered = publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, "", retain, qos, waitingTask); + if (offered && waitingTask != nullptr) { + return waitFor(offered, timeout); + } + return offered; + } + + bool waitFor(bool offered, ticks timeout) { + uint32_t status = ulTaskNotifyTake(pdTRUE, timeout.count()); + return status == Message::PUBLISH_SUCCESS; } /** @@ -243,7 +261,7 @@ class MqttDriver { Log.infoln("MQTT: Received '%s' (size: %d): %s", topic.c_str(), payload.length(), payload.c_str()); #endif - incomingQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, payload, Retention::NoRetain, QoS::ExactlyOnce); + incomingQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, payload, Retention::NoRetain, QoS::ExactlyOnce, nullptr); }); if (mqttServer.ip == IPAddress()) { @@ -281,7 +299,6 @@ class MqttDriver { processPublishQueue(); processSubscriptionQueue(); - procesIncomingQueue(); return MQTT_LOOP_INTERVAL; } @@ -300,6 +317,10 @@ class MqttDriver { Log.errorln("MQTT: Error publishing to '%s', error = %d", message.topic.c_str(), mqttClient.lastError()); } + if (message.waitingTask != nullptr) { + uint32_t status = success ? Message::PUBLISH_SUCCESS : Message::PUBLISH_FAILED; + xTaskNotify(message.waitingTask, status, eSetValueWithOverwrite); + } }); } @@ -311,30 +332,28 @@ class MqttDriver { }); } - void procesIncomingQueue() { - incomingQueue.drain([&](const Message& message) { - const String& topic = message.topic; - const String& payload = message.payload; + void processIncomingMessage(const Message& message) { + const String& topic = message.topic; + const String& payload = message.payload; - if (payload.isEmpty()) { + if (payload.isEmpty()) { #ifdef DUMP_MQTT - Log.verboseln("MQTT: Ignoring empty payload"); + Log.verboseln("MQTT: Ignoring empty payload"); #endif - return; - } + return; + } - Log.traceln("MQTT: Received message: '%s'", topic.c_str()); - for (auto subscription : subscriptions) { - if (subscription.topic == topic) { - DynamicJsonDocument json(docSizeFor(payload)); - deserializeJson(json, payload); - subscription.handle(topic, json.as()); - return; - } + Log.traceln("MQTT: Received message: '%s'", topic.c_str()); + for (auto subscription : subscriptions) { + if (subscription.topic == topic) { + DynamicJsonDocument json(docSizeFor(payload)); + deserializeJson(json, payload); + subscription.handle(topic, json.as()); + return; } - Log.warningln("MQTT: No handler for topic '%s'", - topic.c_str()); - }); + } + Log.warningln("MQTT: No handler for topic '%s'", + topic.c_str()); } // Actually subscribe to the given topic From abb9cd1098cfcc6aa05afab7e12ed35ef854a04a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=B3r=C3=A1nt=20Pint=C3=A9r?= Date: Fri, 2 Feb 2024 10:59:54 +0100 Subject: [PATCH 3/3] Separate MQTT init from processing --- src/kernel/drivers/MqttDriver.hpp | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/kernel/drivers/MqttDriver.hpp b/src/kernel/drivers/MqttDriver.hpp index 8fa04530..726132cf 100644 --- a/src/kernel/drivers/MqttDriver.hpp +++ b/src/kernel/drivers/MqttDriver.hpp @@ -185,16 +185,16 @@ class MqttDriver { , instanceName(instanceName) , clientId(getClientId(config.clientId.get(), instanceName)) , mqttReady(mqttReady) { - Task::run("mqtt:outgoing", 4096, 1, [this](Task& task) { + Task::run("mqtt:init", 4096, [this](Task& task) { setup(); - while (true) { + Task::loop("mqtt:outgoing", 4096, [this](Task& task) { auto delay = loopAndDelay(); task.delay(delay); - } - }); - Task::loop("mqtt:incoming", 4096, 1, [this](Task& task) { - incomingQueue.take([&](const Message& message) { - processIncomingMessage(message); + }); + Task::loop("mqtt:incoming", 4096, [this](Task& task) { + incomingQueue.take([&](const Message& message) { + processIncomingMessage(message); + }); }); }); }