Skip to content

Commit

Permalink
Accept subscribtions on wildcard topics
Browse files Browse the repository at this point in the history
  • Loading branch information
naguirre committed Apr 14, 2022
1 parent 186fcd2 commit 59cf9a9
Showing 1 changed file with 167 additions and 7 deletions.
174 changes: 167 additions & 7 deletions src/bin/calaos_server/IO/Mqtt/MqttCtrl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,24 @@ MqttCtrl::MqttCtrl(const Params &params)

// Set or replace the message
messages[p["topic"]] = p["payload"];
for(auto cb : subscribeCb[p["topic"]])
for (auto& it: subscribeCb) {
if (topicMatchesSubscription(it.first, p["topic"]))
{
cb(p["payload"]);
auto cb = it.second[0];
cb(p["topic"], p["payload"]);
}
}
json_decref(jroot);
});

process->startProcess(exe, "mqtt", arg);
json_decref(jroot); });

process->startProcess(exe, "mqtt", arg);
}

MqttCtrl::~MqttCtrl()
{
}

void MqttCtrl::subscribeTopic(const string topic, sigc::slot<void, string> callback)
void MqttCtrl::subscribeTopic(const string topic, sigc::slot<void, string, string> callback)
{
// subscribeCb contains a map of list of callbacks, register this callback to the key relative of this topic
cDebugDom("mqtt") << "Topic : " << topic;
Expand Down Expand Up @@ -325,5 +327,163 @@ void MqttCtrl::commonDoc(IODoc *ioDoc)

ioDoc->paramAdd("path", _("The path where to found the value in the mqtt payload. If payload if JSON, informations will be extracted depending on the path. for example weather[0]/description, try to read the description value of the 1 element of the array of the weather object. if payload is somple json, just try to use the key of the value you want to read, for example : {\"temperature\":14.23} use \"temperature\" as path\n"), IODoc::TYPE_STRING, true);

//user, password, keepalive
// user, password, keepalive
}

/* Does a topic match a subscription? */
/* Does a topic match a subscription? */
bool MqttCtrl::topicMatchesSubscription(string s, string t)
{

const char *sub = s.c_str();
const char *topic = t.c_str();
size_t spos;
bool result = false;

if (!sub || !topic || sub[0] == 0 || topic[0] == 0)
{
return result;
}

if ((sub[0] == '$' && topic[0] != '$') || (topic[0] == '$' && sub[0] != '$'))
{

return result;
}

spos = 0;

while (sub[0] != 0)
{
if (topic[0] == '+' || topic[0] == '#')
{
return result;
}
if (sub[0] != topic[0] || topic[0] == 0)
{ /* Check for wildcard matches */
if (sub[0] == '+')
{
/* Check for bad "+foo" or "a/+foo" subscription */
if (spos > 0 && sub[-1] != '/')
{
return result;
}
/* Check for bad "foo+" or "foo+/a" subscription */
if (sub[1] != 0 && sub[1] != '/')
{
return result;
}
spos++;
sub++;
while (topic[0] != 0 && topic[0] != '/')
{
if (topic[0] == '+' || topic[0] == '#')
{
return result;
}
topic++;
}
if (topic[0] == 0 && sub[0] == 0)
{
result = true;
return result;
}
}
else if (sub[0] == '#')
{
/* Check for bad "foo#" subscription */
if (spos > 0 && sub[-1] != '/')
{
return result;
}
/* Check for # not the final character of the sub, e.g. "#foo" */
if (sub[1] != 0)
{
return result;
}
else
{
while (topic[0] != 0)
{
if (topic[0] == '+' || topic[0] == '#')
{
return result;
}
topic++;
}
result = true;
return result;
}
}
else
{
/* Check for e.g. foo/bar matching foo/+/# */
if (topic[0] == 0 && spos > 0 && sub[-1] == '+' && sub[0] == '/' && sub[1] == '#')
{
result = true;
return result;
}

/* There is no match at this point, but is the sub invalid? */
while (sub[0] != 0)
{
if (sub[0] == '#' && sub[1] != 0)
{
return result;
}
spos++;
sub++;
}

/* Valid input, but no match */
return result;
}
}
else
{
/* sub[spos] == topic[tpos] */
if (topic[1] == 0)
{
/* Check for e.g. foo matching foo/# */
if (sub[1] == '/' && sub[2] == '#' && sub[3] == 0)
{
result = true;
return result;
}
}
spos++;
sub++;
topic++;
if (sub[0] == 0 && topic[0] == 0)
{
result = true;
return result;
}
else if (topic[0] == 0 && sub[0] == '+' && sub[1] == 0)
{
if (spos > 0 && sub[-1] != '/')
{
return result;
}
spos++;
sub++;
result = true;
return result;
}
}
}
if ((topic[0] != 0 || sub[0] != 0))
{
result = false;
}
while (topic[0] != 0)
{
if (topic[0] == '+' || topic[0] == '#')
{
return result;
}
topic++;
}

return result;
}

0 comments on commit 59cf9a9

Please sign in to comment.