Skip to content

Commit

Permalink
Merge pull request #84 from kivancsikert/mqtt/clean-up-reset-command
Browse files Browse the repository at this point in the history
Wait for network to be up before attempting to connect to MQTT
  • Loading branch information
lptr authored Feb 2, 2024
2 parents 92bda5d + abb9cd1 commit ff30be4
Showing 1 changed file with 63 additions and 42 deletions.
105 changes: 63 additions & 42 deletions src/kernel/drivers/MqttDriver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class MqttDriver {
}

bool publish(const String& suffix, const JsonDocument& json, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce) {
return mqtt.publish(fullTopic(suffix), json, retain, qos);
return mqtt.publish(fullTopic(suffix), json, retain, qos, nullptr, ticks::zero());
}

bool publish(const String& suffix, std::function<void(JsonObject&)> populate, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce, int size = MQTT_BUFFER_SIZE) {
Expand All @@ -60,7 +60,7 @@ class MqttDriver {
}

bool clear(const String& suffix, Retention retain = Retention::NoRetain, QoS qos = QoS::AtMostOnce) {
return mqtt.clear(fullTopic(suffix), retain, qos);
return mqtt.clear(fullTopic(suffix), retain, qos, nullptr, ticks::zero());
}

bool subscribe(const String& suffix, SubscriptionHandler handler) {
Expand All @@ -74,8 +74,9 @@ class MqttDriver {
bool registerCommand(const String& name, size_t responseSize, CommandHandler handler) {
String suffix = "commands/" + name;
return subscribe(suffix, QoS::ExactlyOnce, [this, name, suffix, responseSize, handler](const String&, const JsonObject& request) {
// Clear topic
clear(suffix, Retention::Retain, QoS::ExactlyOnce);
// Clear topic and wait for it to be cleared
mqtt.clear(fullTopic(suffix), Retention::Retain, QoS::ExactlyOnce, xTaskGetCurrentTaskHandle(), ticks::max());

DynamicJsonDocument responseDoc(responseSize);
auto response = responseDoc.to<JsonObject>();
handler(request, response);
Expand Down Expand Up @@ -115,26 +116,25 @@ class MqttDriver {
const String payload;
const Retention retain;
const QoS qos;
const TaskHandle_t waitingTask;

Message()
: topic("")
, payload("")
, retain(Retention::NoRetain)
, qos(QoS::AtMostOnce) {
}
static const uint32_t PUBLISH_SUCCESS = 1;
static const uint32_t PUBLISH_FAILED = 2;

Message(const String& topic, const String& payload, Retention retention, QoS qos)
Message(const String& topic, const String& payload, Retention retention, QoS qos, TaskHandle_t waitingTask)
: topic(topic)
, payload(payload)
, retain(retention)
, qos(qos) {
, qos(qos)
, waitingTask(waitingTask) {
}

Message(const String& topic, const JsonDocument& payload, Retention retention, QoS qos)
Message(const String& topic, const JsonDocument& payload, Retention retention, QoS qos, TaskHandle_t waitingTask)
: topic(topic)
, payload(serializeJsonToString(payload))
, retain(retention)
, qos(qos) {
, qos(qos)
, waitingTask(waitingTask) {
}

private:
Expand Down Expand Up @@ -185,12 +185,17 @@ class MqttDriver {
, instanceName(instanceName)
, clientId(getClientId(config.clientId.get(), instanceName))
, mqttReady(mqttReady) {
Task::run("mqtt", 8192, 1, [this](Task& task) {
Task::run("mqtt:init", 4096, [this](Task& task) {
setup();
while (true) {
Task::loop("mqtt:outgoing", 4096, [this](Task& task) {
auto delay = loopAndDelay();
task.delay(delay);
}
});
Task::loop("mqtt:incoming", 4096, [this](Task& task) {
incomingQueue.take([&](const Message& message) {
processIncomingMessage(message);
});
});
});
}

Expand All @@ -199,20 +204,33 @@ class MqttDriver {
}

private:
bool publish(const String& topic, const JsonDocument& json, Retention retain, QoS qos) {
bool publish(const String& topic, const JsonDocument& json, Retention retain, QoS qos, TaskHandle_t waitingTask, ticks timeout) {
#ifdef DUMP_MQTT
String serializedJson;
serializeJsonPretty(json, serializedJson);
Log.infoln("MQTT: Queuing topic '%s'%s (qos = %d): %s",
topic.c_str(), (retain == Retention::Retain ? " (retain)" : ""), qos, serializedJson.c_str());
#endif
return publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, json, retain, qos);
bool offered = publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, json, retain, qos, waitingTask);
if (offered && waitingTask != nullptr) {
return waitFor(offered, timeout);
}
return offered;
}

bool clear(const String& topic, Retention retain, QoS qos) {
bool clear(const String& topic, Retention retain, QoS qos, TaskHandle_t waitingTask, ticks timeout) {
Log.traceln("MQTT: Clearing topic '%s'",
topic.c_str());
return publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, "", retain, qos);
bool offered = publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, "", retain, qos, waitingTask);
if (offered && waitingTask != nullptr) {
return waitFor(offered, timeout);
}
return offered;
}

bool waitFor(bool offered, ticks timeout) {
uint32_t status = ulTaskNotifyTake(pdTRUE, timeout.count());
return status == Message::PUBLISH_SUCCESS;
}

/**
Expand All @@ -226,6 +244,8 @@ class MqttDriver {
}

void setup() {
networkReady.awaitSet();

if (config.host.get().length() > 0) {
mqttServer.hostname = config.host.get();
mqttServer.port = config.port.get();
Expand All @@ -241,7 +261,7 @@ class MqttDriver {
Log.infoln("MQTT: Received '%s' (size: %d): %s",
topic.c_str(), payload.length(), payload.c_str());
#endif
incomingQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, payload, Retention::NoRetain, QoS::ExactlyOnce);
incomingQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, payload, Retention::NoRetain, QoS::ExactlyOnce, nullptr);
});

if (mqttServer.ip == IPAddress()) {
Expand Down Expand Up @@ -279,7 +299,6 @@ class MqttDriver {

processPublishQueue();
processSubscriptionQueue();
procesIncomingQueue();

return MQTT_LOOP_INTERVAL;
}
Expand All @@ -298,6 +317,10 @@ class MqttDriver {
Log.errorln("MQTT: Error publishing to '%s', error = %d",
message.topic.c_str(), mqttClient.lastError());
}
if (message.waitingTask != nullptr) {
uint32_t status = success ? Message::PUBLISH_SUCCESS : Message::PUBLISH_FAILED;
xTaskNotify(message.waitingTask, status, eSetValueWithOverwrite);
}
});
}

Expand All @@ -309,30 +332,28 @@ class MqttDriver {
});
}

void procesIncomingQueue() {
incomingQueue.drain([&](const Message& message) {
const String& topic = message.topic;
const String& payload = message.payload;
void processIncomingMessage(const Message& message) {
const String& topic = message.topic;
const String& payload = message.payload;

if (payload.isEmpty()) {
if (payload.isEmpty()) {
#ifdef DUMP_MQTT
Log.verboseln("MQTT: Ignoring empty payload");
Log.verboseln("MQTT: Ignoring empty payload");
#endif
return;
}
return;
}

Log.traceln("MQTT: Received message: '%s'", topic.c_str());
for (auto subscription : subscriptions) {
if (subscription.topic == topic) {
DynamicJsonDocument json(docSizeFor(payload));
deserializeJson(json, payload);
subscription.handle(topic, json.as<JsonObject>());
return;
}
Log.traceln("MQTT: Received message: '%s'", topic.c_str());
for (auto subscription : subscriptions) {
if (subscription.topic == topic) {
DynamicJsonDocument json(docSizeFor(payload));
deserializeJson(json, payload);
subscription.handle(topic, json.as<JsonObject>());
return;
}
Log.warningln("MQTT: No handler for topic '%s'",
topic.c_str());
});
}
Log.warningln("MQTT: No handler for topic '%s'",
topic.c_str());
}

// Actually subscribe to the given topic
Expand Down

0 comments on commit ff30be4

Please sign in to comment.