Skip to content

Commit

Permalink
Return queue full
Browse files Browse the repository at this point in the history
  • Loading branch information
lptr committed Feb 4, 2024
1 parent a06dbbb commit a2e726b
Showing 1 changed file with 16 additions and 8 deletions.
24 changes: 16 additions & 8 deletions src/kernel/drivers/MqttDriver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ class MqttDriver {
enum class PublishStatus {
TimeOut = 0,
Success = 1,
Pending = 2,
Failed = 3
Failed = 2,
Pending = 3,
QueueFull = 4
};

typedef std::function<void(const JsonObject&, JsonObject&)> CommandHandler;
Expand Down Expand Up @@ -82,6 +83,7 @@ class MqttDriver {
bool registerCommand(const String& name, size_t responseSize, CommandHandler handler) {
String suffix = "commands/" + name;
return subscribe(suffix, QoS::ExactlyOnce, [this, name, suffix, responseSize, handler](const String&, const JsonObject& request) {
// TODO Do exponential backoff when clear cannot be finished
// Clear topic and wait for it to be cleared
auto clearStatus = mqtt.clear(fullTopic(suffix), Retention::Retain, QoS::ExactlyOnce, std::chrono::seconds { 5 });
if (clearStatus != PublishStatus::Success) {
Expand Down Expand Up @@ -224,21 +226,27 @@ class MqttDriver {
#endif
TaskHandle_t waitingTask = timeout == ticks::zero() ? nullptr : xTaskGetCurrentTaskHandle();
bool offered = publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, json, retain, qos, waitingTask);
if (offered && waitingTask != nullptr) {
return waitFor(offered, timeout);
if (!offered) {
return PublishStatus::QueueFull;
}
return offered ? PublishStatus::Pending : PublishStatus::Failed;
if (waitingTask == nullptr) {
return PublishStatus::Pending;
}
return waitFor(offered, timeout);
}

PublishStatus clear(const String& topic, Retention retain, QoS qos, ticks timeout = ticks::zero()) {
Log.traceln("MQTT: Clearing topic '%s'",
topic.c_str());
TaskHandle_t waitingTask = timeout == ticks::zero() ? nullptr : xTaskGetCurrentTaskHandle();
bool offered = publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, "", retain, qos, waitingTask);
if (offered && waitingTask != nullptr) {
return waitFor(offered, timeout);
if (!offered) {
return PublishStatus::QueueFull;
}
if (waitingTask == nullptr) {
return PublishStatus::Pending;
}
return offered ? PublishStatus::Pending : PublishStatus::Failed;
return waitFor(offered, timeout);
}

PublishStatus waitFor(bool offered, ticks timeout) {
Expand Down

0 comments on commit a2e726b

Please sign in to comment.