Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DA] Add per-device addressing (DA) to WAN topic/channel decoding strategy #136

Merged
merged 4 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ in progress
- [TTN] Add TTS (The Things Stack) / TTN (The Things Network) decoder.
Thanks, @thiasB and @u-l-m-i.
- [TTN] Decode metadata from full TTN payload. Thanks, @thiasB.
- [DA] Add per-device addressing and topic decoding strategies. Thanks,
@thiasB and @ClemensGruber. Examples:

- ``mqttkit-1/device/123e4567-e89b-12d3-a456-426614174000``
- ``mqttkit-1/channel/network-gateway-node``


.. _kotori-0.27.0:

Expand Down
44 changes: 42 additions & 2 deletions doc/source/handbook/configuration/mqttkit.rst
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,17 @@ attribute of the main application settings section.
.. literalinclude:: ../../_static/content/etc/examples/mqttkit.ini
:language: ini
:linenos:
:lines: 10-27
:emphasize-lines: 4,11-18
:lines: 10-30
:emphasize-lines: 4,14-21


**********
Addressing
**********

Wide channel
============

To successfully publish data to the platform, you should get familiar with the MQTTKit addressing scheme.
We call it the »quadruple hierarchy strategy« and it is reflected on the mqtt bus topic topology.

Expand Down Expand Up @@ -89,6 +93,36 @@ The topology identifiers are specified as:
In the following examples, this topology address will be encoded into the variable ``CHANNEL``.


Direct channel
==============

When using the :ref:`hiveeyes-arduino:sensorwan-direct-addressing` scheme of
:ref:`hiveeyes-arduino:sensorwan`, it is possible to detour from the "wide" addressing scheme,
and submit data "directly" to a channel address like ``mqttkit-1/channel/<network>-<gateway>-<node>``
instead.

In order to restrict access to that addressing flavour to specific networks/owners only,
you can use the ``direct_channel_allowed_networks`` configuration setting, where you can
enumerate network/owner path components, which are allowed to submit data on their
corresponding channel groups.

.. literalinclude:: ../../_static/content/etc/examples/mqttkit.ini
:language: ini
:linenos:
:lines: 20-21

For all others, access will be rejected by raising an ``ChannelAccessDenied`` exception.


Direct device
=============

The :ref:`hiveeyes-arduino:sensorwan-direct-addressing` scheme also allows you to address
channels by device identifiers only, also detouring from the "wide" addressing scheme.

| An example for a corresponding channel address, identifying devices by `UUID`_, would be
| ``mqttkit-1/device/123e4567-e89b-12d3-a456-426614174000``.


************
Sending data
Expand All @@ -107,6 +141,12 @@ will be encoded into the variable ``CHANNEL``.
# Publish telemetry data to MQTT topic.
echo "$DATA" | mosquitto_pub -t $CHANNEL/data.json -l

When using the "direct channel" addressing scheme, those invocations would address
the same channel as in the previous example::

CHANNEL=mqttkit-1/channel/testdrive-foobar-42
echo "$DATA" | mosquitto_pub -t $CHANNEL/data.json -l


**************
Receiving data
Expand Down
3 changes: 3 additions & 0 deletions etc/examples/mqttkit.ini
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ application = kotori.daq.application.mqttkit:mqttkit_application
# How often to log metrics
metrics_logger_interval = 60

# Restrict SensorWAN direct addressing to specified networks/owners.
direct_channel_allowed_networks = itest, testdrive

# [mqttkit-1:mqtt]
# ; Configure individual MQTT broker for this application.
# ; The option group prefix `mqttkit-1` reflects the value of
Expand Down
2 changes: 1 addition & 1 deletion kotori/daq/application/mqttkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self, name=None, application_settings=None, global_settings=None):
service = MqttInfluxGrafanaService(
channel = self.channel,
# Data processing strategy and graphing components
strategy=WanBusStrategy(),
strategy=WanBusStrategy(channel_settings=self.channel),
graphing=GrafanaManager(settings=global_settings, channel=self.channel)
)

Expand Down
6 changes: 6 additions & 0 deletions kotori/daq/strategy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
# -*- coding: utf-8 -*-
# (c) 2015-2021 Andreas Motl, <[email protected]>
from munch import Munch

from kotori.daq.decoder import MessageType


class StrategyBase:

def __init__(self, channel_settings=None):
channel_settings = channel_settings or Munch()
self.channel_settings = channel_settings

@staticmethod
def sanitize_db_identifier(value):
"""
Expand Down
81 changes: 74 additions & 7 deletions kotori/daq/strategy/wan.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
# -*- coding: utf-8 -*-
# (c) 2015-2021 Andreas Motl, <[email protected]>
# (c) 2015-2023 Andreas Motl, <[email protected]>
import re

from kotori.daq.exception import ChannelAccessDenied
from kotori.daq.strategy import StrategyBase
from kotori.util.common import SmartMunch
from kotori.util.configuration import read_list


class WanBusStrategy(StrategyBase):

# Regular expression pattern for decoding MQTT topic address segments.
pattern = r'^(?P<realm>.+?)/(?P<network>.+?)/(?P<gateway>.+?)/(?P<node>.+?)(?:/(?P<slot>.+?))?$'
matcher = re.compile(pattern)
wide_channel_matcher = re.compile(r'^(?P<realm>.+?)/(?P<network>.+?)/(?P<gateway>.+?)/(?P<node>.+?)(?:/(?P<slot>.+?))?$')
direct_channel_matcher = re.compile(r'^(?P<realm>.+?)/channel/(?P<channel>.+?)/(?:(?P<slot>.+?))$')
direct_device_matcher = re.compile(r'^(?P<realm>.+?)/device/(?P<device>.+?)/(?:(?P<slot>.+?))$')

def topic_to_topology(self, topic):
"""
Decode MQTT topic segments implementing the »quadruple hierarchy strategy«.
Decode MQTT topic segments implementing the »quadruple hierarchy/topology strategy«.

The topology hierarchy is directly specified by the MQTT topic and is
made up of a minimum of four identifiers describing the core structure::
Expand All @@ -40,14 +43,78 @@ def topic_to_topology(self, topic):

- "node" is the node identifier. Choose anything you like. This usually
gets transmitted from an embedded device node.

Other than decoding the classic WAN path topic style, like

mqttkit-1/network/gateway/node

the decoder now also knows how to handle per-device addressing schemes, like

mqttkit-1/device/123e4567-e89b-12d3-a456-426614174000

Also, it has another special topic decoding scheme, by decomposing a dashed
identifier, and transforming it into the quadruple hierarchy.

mqttkit-1/channel/network-gateway-node

"""

# Decode the topic.
m = self.matcher.match(topic)
address = None

# Try to match the per-device pattern.
m = self.direct_device_matcher.match(topic)
if m:
address = SmartMunch(m.groupdict())
else:
address = {}
if "device" in address:
address.network = "devices"
address.gateway = "default"
address.node = address.device
del address.device

# Try to match the per-device pattern with dashed topology encoding for topics.
if address is None:

# Decode permission setting from channel configuration object.
direct_channel_allowed_networks = None
if "direct_channel_allowed_networks" in self.channel_settings:
direct_channel_allowed_networks = read_list(self.channel_settings.direct_channel_allowed_networks)

m = self.direct_channel_matcher.match(topic)
if m:
address = SmartMunch(m.groupdict())
if "channel" in address:
segments = address.channel.split("-")

# Compensate "too few segments": Fill up with "default" at the front.
missing_segments = 3 - len(segments)
segments = ["default"] * missing_segments + segments

# When the first topic path fragment is equal to the realm, we assume a
# slight misconfiguration, and ignore the first decoded segment completely.
if segments[0] == address.realm:
segments = segments[1:]

# Compensate "too many segments": Merge trailing segments.
if len(segments) > 3:
segments = segments[:2] + ["-".join(segments[2:])]

# Destructure three components / segments.
address.network, address.gateway, address.node = segments

# Do not propagate the `channel` slot. It either has been
# dissolved, or it was propagated into the `node` slot.
del address.channel

# Evaluate permissions.
if direct_channel_allowed_networks and address.network not in direct_channel_allowed_networks:
raise ChannelAccessDenied(f"Rejected access to SensorWAN network: {address.network}")

# Try to match the classic path-based WAN topic encoding scheme.
if address is None:
m = self.wide_channel_matcher.match(topic)
if m:
address = SmartMunch(m.groupdict())

return address

Expand Down
2 changes: 1 addition & 1 deletion kotori/vendor/hiveeyes/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ def hiveeyes_boot(settings, debug=False):
HiveeyesGenericGrafanaManager(settings=settings, channel=channel),
HiveeyesBeehiveGrafanaManager(settings=settings, channel=channel),
],
strategy = WanBusStrategy()
strategy = WanBusStrategy(channel_settings=channel)
)

rootService.registerService(service)
Expand Down
2 changes: 2 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,5 @@ markers =
ttn: Tests for TTS/TTN adapter.
hiveeyes: Tests for vendor hiveeyes.
legacy: Tests for legacy endpoints and such.
device: Device-based addressing.
strategy: Transformation strategies.
5 changes: 4 additions & 1 deletion test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from kotori import KotoriBootloader
from test.util import boot_kotori, sleep
from test.settings.mqttkit import influx_sensors, influx_events, grafana
from test.settings.mqttkit import influx_sensors, influx_events, grafana, device_influx_sensors

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -53,4 +53,7 @@ def machinery():
reset_grafana = grafana.make_reset()
reset_influxdb_events = influx_events.make_reset_measurement()

device_create_influxdb = device_influx_sensors.make_create_db()
device_reset_influxdb = device_influx_sensors.make_reset_measurement()

machinery_basic = create_machinery('./etc/test/basic.ini')
11 changes: 11 additions & 0 deletions test/settings/mqttkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,20 @@ class TestSettings:
channel_path_event = '/mqttkit-1/itest/foo/bar/event'
channel_path_airrohr = '/mqttkit-1/itest/foo/bar/custom/airrohr'

# Per-device entrypoints.
direct_influx_database = 'mqttkit_1_devices'
direct_influx_measurement_sensors = 'default_123e4567_e89b_12d3_a456_426614174000_sensors'
direct_mqtt_topic_device = 'mqttkit-1/device/123e4567-e89b-12d3-a456-426614174000/data.json'
direct_mqtt_topic_channel = 'mqttkit-1/channel/itest-foo-bar/data.json'
direct_mqtt_topic_channel_denied = 'mqttkit-1/channel/another-foo-bar/data.json'
direct_http_path_device = '/mqttkit-1/device/123e4567-e89b-12d3-a456-426614174000/data'
direct_http_path_channel = '/mqttkit-1/channel/itest-foo-bar/data'


settings = TestSettings

influx_sensors = InfluxWrapper(database=settings.influx_database, measurement=settings.influx_measurement_sensors)
influx_events = InfluxWrapper(database=settings.influx_database, measurement=settings.influx_measurement_events)
grafana = GrafanaWrapper(settings=settings)

device_influx_sensors = InfluxWrapper(database=settings.direct_influx_database, measurement=settings.direct_influx_measurement_sensors)
70 changes: 69 additions & 1 deletion test/test_daq_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import requests
from twisted.internet import threads

from test.settings.mqttkit import settings, influx_sensors, PROCESS_DELAY_HTTP
from test.settings.mqttkit import settings, influx_sensors, PROCESS_DELAY_HTTP, device_influx_sensors
from test.util import http_json_sensor, http_form_sensor, http_csv_sensor, sleep, http_raw

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -45,6 +45,74 @@ def test_http_json_valid(machinery, create_influxdb, reset_influxdb):
yield record


@pytest_twisted.inlineCallbacks
@pytest.mark.http
@pytest.mark.device
def test_http_json_wan_device(machinery, device_create_influxdb, device_reset_influxdb):
"""
Run HTTP data acquisition with per-device addressing.

Addressing: SensorWAN direct-device
Example: mqttkit-1/device/123e4567-e89b-12d3-a456-426614174000
"""

# Submit a single measurement, without timestamp.
data = {
'temperature': 25.26,
'humidity': 51.8,
}
deferred = threads.deferToThread(http_json_sensor, settings.direct_http_path_device, data)
yield deferred

# Check response.
response = deferred.result
assert response.status_code == 200
assert response.content == json.dumps([{"type": "info", "message": "Received #1 readings"}], indent=4).encode("utf-8")

# Wait for some time to process the message.
yield sleep(PROCESS_DELAY_HTTP)

# Proof that data arrived in InfluxDB.
record = device_influx_sensors.get_first_record()
del record['time']
assert record == {u'temperature': 25.26, u'humidity': 51.8}
yield record


@pytest_twisted.inlineCallbacks
@pytest.mark.http
@pytest.mark.device
def test_http_json_wan_channel(machinery, create_influxdb, reset_influxdb):
"""
Run MQTT data acquisition with per-device dashed-topo addressing.

Addressing: SensorWAN direct-channel, with dashed topology decoding
Example: mqttkit-1/channel/network-gateway-node
"""

# Submit a single measurement, without timestamp.
data = {
'temperature': 25.26,
'humidity': 51.8,
}
deferred = threads.deferToThread(http_json_sensor, settings.direct_http_path_channel, data)
yield deferred

# Check response.
response = deferred.result
assert response.status_code == 200
assert response.content == json.dumps([{"type": "info", "message": "Received #1 readings"}], indent=4).encode("utf-8")

# Wait for some time to process the message.
yield sleep(PROCESS_DELAY_HTTP)

# Proof that data arrived in InfluxDB.
record = influx_sensors.get_first_record()
del record['time']
assert record == {u'temperature': 25.26, u'humidity': 51.8}
yield record


@pytest_twisted.inlineCallbacks
@pytest.mark.http
def test_http_json_invalid(machinery):
Expand Down
Loading