diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 2a571a67..a9145ad2 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -76,11 +76,3 @@ jobs: with: files: firmware-*/* repo-token: ${{ secrets.GITHUB_TOKEN }} - - ready-to-merge: - needs: build - if: ${{ github.ref != 'refs/heads/main' }} - runs-on: ubuntu-latest - steps: - - name: Dummy - run: echo "Everything is green across the board on ${{ github.ref }}" diff --git a/src/devices/Peripheral.hpp b/src/devices/Peripheral.hpp index 35e5a3ae..b5e64fc0 100644 --- a/src/devices/Peripheral.hpp +++ b/src/devices/Peripheral.hpp @@ -47,7 +47,6 @@ class PeripheralBase Log.verboseln("No telemetry to publish for peripheral: %s", name.c_str()); return; } - // TODO Add device ID mqttRoot->publish("telemetry", telemetryDoc); } diff --git a/src/kernel/drivers/MqttDriver.hpp b/src/kernel/drivers/MqttDriver.hpp index d27c3a35..93d2b9df 100644 --- a/src/kernel/drivers/MqttDriver.hpp +++ b/src/kernel/drivers/MqttDriver.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -33,6 +34,14 @@ class MqttDriver { ExactlyOnce = 2 }; + enum class PublishStatus { + TimeOut = 0, + Success = 1, + Failed = 2, + Pending = 3, + QueueFull = 4 + }; + typedef std::function CommandHandler; typedef std::function SubscriptionHandler; @@ -48,19 +57,19 @@ class MqttDriver { return make_shared(mqtt, rootTopic + "/" + suffix); } - bool publish(const String& suffix, const JsonDocument& json, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce) { - return mqtt.publish(fullTopic(suffix), json, retain, qos, nullptr, ticks::zero()); + PublishStatus publish(const String& suffix, const JsonDocument& json, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce, ticks timeout = ticks::zero()) { + return mqtt.publish(fullTopic(suffix), json, retain, qos, timeout); } - bool publish(const String& suffix, std::function populate, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce, int size = MQTT_BUFFER_SIZE) { + PublishStatus publish(const String& suffix, std::function populate, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce, ticks timeout = ticks::zero(), int size = MQTT_BUFFER_SIZE) { DynamicJsonDocument doc(size); JsonObject root = doc.to(); populate(root); - return publish(suffix, doc, retain, qos); + return publish(suffix, doc, retain, qos, timeout); } - bool clear(const String& suffix, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce) { - return mqtt.clear(fullTopic(suffix), retain, qos, nullptr, ticks::zero()); + PublishStatus clear(const String& suffix, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce, ticks timeout = ticks::zero()) { + return mqtt.clear(fullTopic(suffix), retain, qos, timeout); } bool subscribe(const String& suffix, SubscriptionHandler handler) { @@ -74,8 +83,12 @@ class MqttDriver { bool registerCommand(const String& name, size_t responseSize, CommandHandler handler) { String suffix = "commands/" + name; return subscribe(suffix, QoS::ExactlyOnce, [this, name, suffix, responseSize, handler](const String&, const JsonObject& request) { + // TODO Do exponential backoff when clear cannot be finished // Clear topic and wait for it to be cleared - mqtt.clear(fullTopic(suffix), Retention::Retain, QoS::ExactlyOnce, xTaskGetCurrentTaskHandle(), ticks::max()); + auto clearStatus = mqtt.clear(fullTopic(suffix), Retention::Retain, QoS::ExactlyOnce, std::chrono::seconds { 5 }); + if (clearStatus != PublishStatus::Success) { + Log.errorln("MQTT: Failed to clear retained command topic '%s', status: %d", suffix.c_str(), clearStatus); + } DynamicJsonDocument responseDoc(responseSize); auto response = responseDoc.to(); @@ -204,33 +217,45 @@ class MqttDriver { } private: - bool publish(const String& topic, const JsonDocument& json, Retention retain, QoS qos, TaskHandle_t waitingTask, ticks timeout) { + PublishStatus publish(const String& topic, const JsonDocument& json, Retention retain, QoS qos, ticks timeout = ticks::zero()) { #ifdef DUMP_MQTT String serializedJson; serializeJsonPretty(json, serializedJson); Log.infoln("MQTT: Queuing topic '%s'%s (qos = %d): %s", topic.c_str(), (retain == Retention::Retain ? " (retain)" : ""), qos, serializedJson.c_str()); #endif - bool offered = publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, json, retain, qos, waitingTask); - if (offered && waitingTask != nullptr) { - return waitFor(offered, timeout); - } - return offered; + return executeAndAwait(timeout, [&](TaskHandle_t waitingTask) { + return publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, json, retain, qos, waitingTask); + }); } - bool clear(const String& topic, Retention retain, QoS qos, TaskHandle_t waitingTask, ticks timeout) { + PublishStatus clear(const String& topic, Retention retain, QoS qos, ticks timeout = ticks::zero()) { Log.traceln("MQTT: Clearing topic '%s'", topic.c_str()); - bool offered = publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, "", retain, qos, waitingTask); - if (offered && waitingTask != nullptr) { - return waitFor(offered, timeout); - } - return offered; + return executeAndAwait(timeout, [&](TaskHandle_t waitingTask) { + return publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, "", retain, qos, waitingTask); + }); } - bool waitFor(bool offered, ticks timeout) { + PublishStatus executeAndAwait(ticks timeout, std::function enqueue) { + TaskHandle_t waitingTask = timeout == ticks::zero() ? nullptr : xTaskGetCurrentTaskHandle(); + bool offered = enqueue(waitingTask); + if (!offered) { + return PublishStatus::QueueFull; + } + if (waitingTask == nullptr) { + return PublishStatus::Pending; + } uint32_t status = ulTaskNotifyTake(pdTRUE, timeout.count()); - return status == Message::PUBLISH_SUCCESS; + switch (status) { + case 0: + return PublishStatus::TimeOut; + case Message::PUBLISH_SUCCESS: + return PublishStatus::Success; + case Message::PUBLISH_FAILED: + default: + return PublishStatus::Failed; + } } /**