Skip to content

Commit

Permalink
Merge pull request #89 from kivancsikert/mqtt/do-not-fail-on-hanging-…
Browse files Browse the repository at this point in the history
…clear

Fix hanging MQTT incoming queue
  • Loading branch information
lptr authored Feb 4, 2024
2 parents 3b1848d + ac55865 commit d151ee6
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 30 deletions.
8 changes: 0 additions & 8 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,3 @@ jobs:
with:
files: firmware-*/*
repo-token: ${{ secrets.GITHUB_TOKEN }}

ready-to-merge:
needs: build
if: ${{ github.ref != 'refs/heads/main' }}
runs-on: ubuntu-latest
steps:
- name: Dummy
run: echo "Everything is green across the board on ${{ github.ref }}"
1 change: 0 additions & 1 deletion src/devices/Peripheral.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class PeripheralBase
Log.verboseln("No telemetry to publish for peripheral: %s", name.c_str());
return;
}
// TODO Add device ID
mqttRoot->publish("telemetry", telemetryDoc);
}

Expand Down
67 changes: 46 additions & 21 deletions src/kernel/drivers/MqttDriver.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <chrono>
#include <list>
#include <memory>

Expand Down Expand Up @@ -33,6 +34,14 @@ class MqttDriver {
ExactlyOnce = 2
};

enum class PublishStatus {
TimeOut = 0,
Success = 1,
Failed = 2,
Pending = 3,
QueueFull = 4
};

typedef std::function<void(const JsonObject&, JsonObject&)> CommandHandler;

typedef std::function<void(const String&, const JsonObject&)> SubscriptionHandler;
Expand All @@ -48,19 +57,19 @@ class MqttDriver {
return make_shared<MqttRoot>(mqtt, rootTopic + "/" + suffix);
}

bool publish(const String& suffix, const JsonDocument& json, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce) {
return mqtt.publish(fullTopic(suffix), json, retain, qos, nullptr, ticks::zero());
PublishStatus publish(const String& suffix, const JsonDocument& json, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce, ticks timeout = ticks::zero()) {
return mqtt.publish(fullTopic(suffix), json, retain, qos, timeout);
}

bool publish(const String& suffix, std::function<void(JsonObject&)> populate, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce, int size = MQTT_BUFFER_SIZE) {
PublishStatus publish(const String& suffix, std::function<void(JsonObject&)> populate, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce, ticks timeout = ticks::zero(), int size = MQTT_BUFFER_SIZE) {
DynamicJsonDocument doc(size);
JsonObject root = doc.to<JsonObject>();
populate(root);
return publish(suffix, doc, retain, qos);
return publish(suffix, doc, retain, qos, timeout);
}

bool clear(const String& suffix, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce) {
return mqtt.clear(fullTopic(suffix), retain, qos, nullptr, ticks::zero());
PublishStatus clear(const String& suffix, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce, ticks timeout = ticks::zero()) {
return mqtt.clear(fullTopic(suffix), retain, qos, timeout);
}

bool subscribe(const String& suffix, SubscriptionHandler handler) {
Expand All @@ -74,8 +83,12 @@ class MqttDriver {
bool registerCommand(const String& name, size_t responseSize, CommandHandler handler) {
String suffix = "commands/" + name;
return subscribe(suffix, QoS::ExactlyOnce, [this, name, suffix, responseSize, handler](const String&, const JsonObject& request) {
// TODO Do exponential backoff when clear cannot be finished
// Clear topic and wait for it to be cleared
mqtt.clear(fullTopic(suffix), Retention::Retain, QoS::ExactlyOnce, xTaskGetCurrentTaskHandle(), ticks::max());
auto clearStatus = mqtt.clear(fullTopic(suffix), Retention::Retain, QoS::ExactlyOnce, std::chrono::seconds { 5 });
if (clearStatus != PublishStatus::Success) {
Log.errorln("MQTT: Failed to clear retained command topic '%s', status: %d", suffix.c_str(), clearStatus);
}

DynamicJsonDocument responseDoc(responseSize);
auto response = responseDoc.to<JsonObject>();
Expand Down Expand Up @@ -204,33 +217,45 @@ class MqttDriver {
}

private:
bool publish(const String& topic, const JsonDocument& json, Retention retain, QoS qos, TaskHandle_t waitingTask, ticks timeout) {
PublishStatus publish(const String& topic, const JsonDocument& json, Retention retain, QoS qos, ticks timeout = ticks::zero()) {
#ifdef DUMP_MQTT
String serializedJson;
serializeJsonPretty(json, serializedJson);
Log.infoln("MQTT: Queuing topic '%s'%s (qos = %d): %s",
topic.c_str(), (retain == Retention::Retain ? " (retain)" : ""), qos, serializedJson.c_str());
#endif
bool offered = publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, json, retain, qos, waitingTask);
if (offered && waitingTask != nullptr) {
return waitFor(offered, timeout);
}
return offered;
return executeAndAwait(timeout, [&](TaskHandle_t waitingTask) {
return publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, json, retain, qos, waitingTask);
});
}

bool clear(const String& topic, Retention retain, QoS qos, TaskHandle_t waitingTask, ticks timeout) {
PublishStatus clear(const String& topic, Retention retain, QoS qos, ticks timeout = ticks::zero()) {
Log.traceln("MQTT: Clearing topic '%s'",
topic.c_str());
bool offered = publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, "", retain, qos, waitingTask);
if (offered && waitingTask != nullptr) {
return waitFor(offered, timeout);
}
return offered;
return executeAndAwait(timeout, [&](TaskHandle_t waitingTask) {
return publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, "", retain, qos, waitingTask);
});
}

bool waitFor(bool offered, ticks timeout) {
PublishStatus executeAndAwait(ticks timeout, std::function<bool(TaskHandle_t)> enqueue) {
TaskHandle_t waitingTask = timeout == ticks::zero() ? nullptr : xTaskGetCurrentTaskHandle();
bool offered = enqueue(waitingTask);
if (!offered) {
return PublishStatus::QueueFull;
}
if (waitingTask == nullptr) {
return PublishStatus::Pending;
}
uint32_t status = ulTaskNotifyTake(pdTRUE, timeout.count());
return status == Message::PUBLISH_SUCCESS;
switch (status) {
case 0:
return PublishStatus::TimeOut;
case Message::PUBLISH_SUCCESS:
return PublishStatus::Success;
case Message::PUBLISH_FAILED:
default:
return PublishStatus::Failed;
}
}

/**
Expand Down

0 comments on commit d151ee6

Please sign in to comment.