From 59cf9a914fef0799257ad0d5e1d6464fd1392f90 Mon Sep 17 00:00:00 2001 From: Nicolas Aguirre Date: Thu, 14 Apr 2022 20:19:41 +0000 Subject: [PATCH] Accept subscribtions on wildcard topics --- src/bin/calaos_server/IO/Mqtt/MqttCtrl.cpp | 174 ++++++++++++++++++++- 1 file changed, 167 insertions(+), 7 deletions(-) diff --git a/src/bin/calaos_server/IO/Mqtt/MqttCtrl.cpp b/src/bin/calaos_server/IO/Mqtt/MqttCtrl.cpp index 48eb3f44..6377d488 100644 --- a/src/bin/calaos_server/IO/Mqtt/MqttCtrl.cpp +++ b/src/bin/calaos_server/IO/Mqtt/MqttCtrl.cpp @@ -65,22 +65,24 @@ MqttCtrl::MqttCtrl(const Params ¶ms) // Set or replace the message messages[p["topic"]] = p["payload"]; - for(auto cb : subscribeCb[p["topic"]]) + for (auto& it: subscribeCb) { + if (topicMatchesSubscription(it.first, p["topic"])) { - cb(p["payload"]); + auto cb = it.second[0]; + cb(p["topic"], p["payload"]); + } } - json_decref(jroot); - }); - process->startProcess(exe, "mqtt", arg); + json_decref(jroot); }); + process->startProcess(exe, "mqtt", arg); } MqttCtrl::~MqttCtrl() { } -void MqttCtrl::subscribeTopic(const string topic, sigc::slot callback) +void MqttCtrl::subscribeTopic(const string topic, sigc::slot callback) { // subscribeCb contains a map of list of callbacks, register this callback to the key relative of this topic cDebugDom("mqtt") << "Topic : " << topic; @@ -325,5 +327,163 @@ void MqttCtrl::commonDoc(IODoc *ioDoc) ioDoc->paramAdd("path", _("The path where to found the value in the mqtt payload. If payload if JSON, informations will be extracted depending on the path. for example weather[0]/description, try to read the description value of the 1 element of the array of the weather object. if payload is somple json, just try to use the key of the value you want to read, for example : {\"temperature\":14.23} use \"temperature\" as path\n"), IODoc::TYPE_STRING, true); - //user, password, keepalive + // user, password, keepalive +} + +/* Does a topic match a subscription? */ +/* Does a topic match a subscription? */ +bool MqttCtrl::topicMatchesSubscription(string s, string t) +{ + + const char *sub = s.c_str(); + const char *topic = t.c_str(); + size_t spos; + bool result = false; + + if (!sub || !topic || sub[0] == 0 || topic[0] == 0) + { + return result; + } + + if ((sub[0] == '$' && topic[0] != '$') || (topic[0] == '$' && sub[0] != '$')) + { + + return result; + } + + spos = 0; + + while (sub[0] != 0) + { + if (topic[0] == '+' || topic[0] == '#') + { + return result; + } + if (sub[0] != topic[0] || topic[0] == 0) + { /* Check for wildcard matches */ + if (sub[0] == '+') + { + /* Check for bad "+foo" or "a/+foo" subscription */ + if (spos > 0 && sub[-1] != '/') + { + return result; + } + /* Check for bad "foo+" or "foo+/a" subscription */ + if (sub[1] != 0 && sub[1] != '/') + { + return result; + } + spos++; + sub++; + while (topic[0] != 0 && topic[0] != '/') + { + if (topic[0] == '+' || topic[0] == '#') + { + return result; + } + topic++; + } + if (topic[0] == 0 && sub[0] == 0) + { + result = true; + return result; + } + } + else if (sub[0] == '#') + { + /* Check for bad "foo#" subscription */ + if (spos > 0 && sub[-1] != '/') + { + return result; + } + /* Check for # not the final character of the sub, e.g. "#foo" */ + if (sub[1] != 0) + { + return result; + } + else + { + while (topic[0] != 0) + { + if (topic[0] == '+' || topic[0] == '#') + { + return result; + } + topic++; + } + result = true; + return result; + } + } + else + { + /* Check for e.g. foo/bar matching foo/+/# */ + if (topic[0] == 0 && spos > 0 && sub[-1] == '+' && sub[0] == '/' && sub[1] == '#') + { + result = true; + return result; + } + + /* There is no match at this point, but is the sub invalid? */ + while (sub[0] != 0) + { + if (sub[0] == '#' && sub[1] != 0) + { + return result; + } + spos++; + sub++; + } + + /* Valid input, but no match */ + return result; + } + } + else + { + /* sub[spos] == topic[tpos] */ + if (topic[1] == 0) + { + /* Check for e.g. foo matching foo/# */ + if (sub[1] == '/' && sub[2] == '#' && sub[3] == 0) + { + result = true; + return result; + } + } + spos++; + sub++; + topic++; + if (sub[0] == 0 && topic[0] == 0) + { + result = true; + return result; + } + else if (topic[0] == 0 && sub[0] == '+' && sub[1] == 0) + { + if (spos > 0 && sub[-1] != '/') + { + return result; + } + spos++; + sub++; + result = true; + return result; + } + } + } + if ((topic[0] != 0 || sub[0] != 0)) + { + result = false; + } + while (topic[0] != 0) + { + if (topic[0] == '+' || topic[0] == '#') + { + return result; + } + topic++; + } + + return result; }