Skip to content

Commit

Permalink
Extract common code
Browse files Browse the repository at this point in the history
  • Loading branch information
lptr committed Feb 4, 2024
1 parent a2e726b commit ac55865
Showing 1 changed file with 10 additions and 14 deletions.
24 changes: 10 additions & 14 deletions src/kernel/drivers/MqttDriver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,32 +224,28 @@ class MqttDriver {
Log.infoln("MQTT: Queuing topic '%s'%s (qos = %d): %s",
topic.c_str(), (retain == Retention::Retain ? " (retain)" : ""), qos, serializedJson.c_str());
#endif
TaskHandle_t waitingTask = timeout == ticks::zero() ? nullptr : xTaskGetCurrentTaskHandle();
bool offered = publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, json, retain, qos, waitingTask);
if (!offered) {
return PublishStatus::QueueFull;
}
if (waitingTask == nullptr) {
return PublishStatus::Pending;
}
return waitFor(offered, timeout);
return executeAndAwait(timeout, [&](TaskHandle_t waitingTask) {
return publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, json, retain, qos, waitingTask);
});
}

PublishStatus clear(const String& topic, Retention retain, QoS qos, ticks timeout = ticks::zero()) {
Log.traceln("MQTT: Clearing topic '%s'",
topic.c_str());
return executeAndAwait(timeout, [&](TaskHandle_t waitingTask) {
return publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, "", retain, qos, waitingTask);
});
}

PublishStatus executeAndAwait(ticks timeout, std::function<bool(TaskHandle_t)> enqueue) {
TaskHandle_t waitingTask = timeout == ticks::zero() ? nullptr : xTaskGetCurrentTaskHandle();
bool offered = publishQueue.offerIn(MQTT_QUEUE_TIMEOUT, topic, "", retain, qos, waitingTask);
bool offered = enqueue(waitingTask);
if (!offered) {
return PublishStatus::QueueFull;
}
if (waitingTask == nullptr) {
return PublishStatus::Pending;
}
return waitFor(offered, timeout);
}

PublishStatus waitFor(bool offered, ticks timeout) {
uint32_t status = ulTaskNotifyTake(pdTRUE, timeout.count());
switch (status) {
case 0:
Expand Down

0 comments on commit ac55865

Please sign in to comment.