From 44a1be41bb74fbe2e9c715bdd68e517241913708 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 15 Feb 2022 00:16:38 +0100 Subject: [PATCH 1/2] [TTN] Add experimental TTS/TTN HTTP Webhook forwarder --- etc/examples/forwarders/http-api-tts-ttn.ini | 82 ++++++++++++++++++++ etc/test/main.ini | 3 +- kotori/daq/strategy/tts_ttn.py | 70 +++++++++++++++++ test/settings/mqttkit.py | 1 + test/test_tts_ttn.py | 32 +++++++- 5 files changed, 184 insertions(+), 4 deletions(-) create mode 100644 etc/examples/forwarders/http-api-tts-ttn.ini create mode 100644 kotori/daq/strategy/tts_ttn.py diff --git a/etc/examples/forwarders/http-api-tts-ttn.ini b/etc/examples/forwarders/http-api-tts-ttn.ini new file mode 100644 index 00000000..ed89584a --- /dev/null +++ b/etc/examples/forwarders/http-api-tts-ttn.ini @@ -0,0 +1,82 @@ +; ------------------------------------------ +; Family: Protocol forwarder +; About: Versatile protocol forwarder components for bridging the gap between +; different data sinks, bus systems and serialization formats. +; ------------------------------------------ +; Name: TTS/TTN-to-MQTT protocol forwarder +; About: Suitable for data acquisition via TTS/TTN Webhook HTTP POST requests. +; Channel: Transport: HTTP, MQTT; Format: JSON +; See also: https://getkotori.org/docs/handbook/forwarders/tts-ttn.html +; https://getkotori.org/docs/handbook/acquisition/protocol/http.html +; https://getkotori.org/docs/handbook/decoders/ +; ------------------------------------------ +; Description: +; +; - Listen to HTTP POST or PUT requests +; - Receive payloads formatted as JSON or in urlencoded format +; - Decode payloads using specific decoders +; - Forward payloads to the MQTT bus +; +; Manual: Please specify forwarding source and target parameters in URI format. +; The MQTT topic path is derived from the HTTP URI path by interpolating +; the appropriate part of the context URI. +; +; Example: In the example below, given the "address" part of the resource URI +; is "testdrive/area-42/node-1", data sent to the full URIs +; +; /api/mqttkit-1/ttn/{devID} +; /api/mqttkit-1/ttn/{devID}/uplinks +; /api/mqttkit-1/ttn/{devID}/join-accept +; /api/mqttkit-1/ttn/{devID}/downlink-ack +; +; with, e.g.:: +; +; devID=mqttkit-1/testdrive/area-42/node-1 +; +; will be republished to the MQTT topics +; +; mqttkit-1/testdrive/area-42/node-1/data.json +; mqttkit-1/testdrive/area-42/node-1/events.json +; +; ------------------------------------------ + + +[mqttkit-1.http-api-tts-ttn.composite] +enable = false +type = application +realm = mqttkit-1 +mqtt_topics = mqttkit-1/# +app_factory = kotori.daq.application.composite:boot +services = kotori.daq.services.mig:MqttInfluxGrafanaService +graphing = kotori.daq.graphing.grafana:GrafanaManager +strategy = kotori.daq.strategy.tts_ttn:TheThingsWanBusStrategy + + + +[mqttkit-1.http-api-tts-ttn] +enable = true + +type = application +application = kotori.io.protocol.forwarder:boot +#app_factory = kotori.daq.application.composite:boot + +realm = mqttkit-1 +source = http:/api/{realm:mqttkit-1}/ttn/{device_id:.*}/{slot:(uplinks|join-accept|downlink-ack)} [POST] +target = mqtt:/{realm}/{address}/ttn/{slot}.json +transform = kotori.daq.strategy.tts_ttn:TheThingsWanBusStrategy.topic_to_topology + + +; ------------------------------------------------ +; Notes about "source" and "target" parameters +; ------------------------------------------------ + +; Note that the "netloc" part (i.e. for specifying hostname/port) +; in directives "source" and "target" are omitted from these uris. +; +; Kotori will only listen to the default HTTP port and forward +; payloads to the default MQTT broker. Both are specified in the +; main configuration file, usually "/etc/kotori/kotori.ini". +; +; However, this might change in the future to enable spinning +; up HTTP listeners on arbitrary ports at runtime and to allow +; publishing messages to different MQTT brokers. diff --git a/etc/test/main.ini b/etc/test/main.ini index ba7fb361..0e72e90e 100644 --- a/etc/test/main.ini +++ b/etc/test/main.ini @@ -5,7 +5,8 @@ include = etc/examples/mqttkit.ini, etc/examples/forwarders/http-api-generic.ini, - etc/examples/forwarders/http-api-export.ini + etc/examples/forwarders/http-api-export.ini, + etc/examples/forwarders/http-api-tts-ttn.ini ; ========================================== diff --git a/kotori/daq/strategy/tts_ttn.py b/kotori/daq/strategy/tts_ttn.py new file mode 100644 index 00000000..288f16bf --- /dev/null +++ b/kotori/daq/strategy/tts_ttn.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# (c) 2022 Andreas Motl, +import re + +from munch import Munch + +from kotori.daq.strategy.wan import WanBusStrategy +from kotori.util.common import SmartBunch + + +class TheThingsWanBusStrategy(WanBusStrategy): + + # Regular expression pattern for decoding MQTT topic address segments. + #pattern = r'^(?P.+?)/ttn/(?P.+?)(?:/(?P.+?))?$' + #matcher = re.compile(pattern) + + @classmethod + def topic_to_topology(cls, topic): + """ + Decode MQTT topic segments implementing the »quadruple hierarchy strategy«. + + The topology hierarchy is directly specified by the MQTT topic and is + made up of a minimum of four identifiers describing the core structure:: + + realm / network / gateway / node + + The topology identifiers are specified as: + + - "realm" is the designated root realm. You should prefix the topic name + with this label when opting in for all features of the telemetry platform. + For other purposes, feel free to publish to any MQTT topic you like. + + - "network" is your personal realm. Choose anything you like or use an + `Online GUID Generator `_ to gain + maximum uniqueness. + + - "gateway" is your gateway identifier. Choose anything you like. + This does not have to be very unique, so you might use labels + having the names of sites. While you are the owner of this + namespace hierarchy, remember these labels might be visible on + the collaborative ether, though. You might want to assign nicknames + to your sites to not identify their location. + + - "node" is the node identifier. Choose anything you like. This usually + gets transmitted from an embedded device node. + """ + + print("########## TOPIC:", topic) + + # Munch({'realm': 'mqttkit-1', 'device_id': 'itest-foo-bar', 'slot': 'uplinks'}) + assert isinstance(topic, Munch) + assert topic.realm + assert topic.device_id + assert topic.slot + + # {'realm': 'mqttkit-1', 'network': 'itest', 'gateway': 'foo', 'node': 'bar', 'slot': 'data.json'} + address = SmartBunch( + realm=topic.realm, + + ) + + + # Decode the topic. + m = cls.matcher.match(topic) + if m: + address = SmartBunch(m.groupdict()) + else: + address = {} + + return address diff --git a/test/settings/mqttkit.py b/test/settings/mqttkit.py index 70c328d5..e26bc70f 100644 --- a/test/settings/mqttkit.py +++ b/test/settings/mqttkit.py @@ -34,6 +34,7 @@ class TestSettings: channel2_path_data = '/mqttkit-1/itest/foo/bar2/data' channel_path_event = '/mqttkit-1/itest/foo/bar/event' channel_path_airrohr = '/mqttkit-1/itest/foo/bar/custom/airrohr' + channel_path_ttn = '/mqttkit-1/ttn' settings = TestSettings diff --git a/test/test_tts_ttn.py b/test/test_tts_ttn.py index e97bc776..e56178fe 100644 --- a/test/test_tts_ttn.py +++ b/test/test_tts_ttn.py @@ -66,9 +66,7 @@ def make_testcases(): @pytest.mark.tts @pytest.mark.ttn @pytest.mark.parametrize("testcase", make_testcases()) -def test_tts_ttn_http_json_full( - testcase, machinery_basic, create_influxdb, reset_influxdb -): +def test_tts_ttn_http_json_decoder(testcase, machinery_basic, create_influxdb, reset_influxdb): """ Submit single reading in TTS/TTN webhook JSON format to HTTP API, and verify it was correctly stored in the InfluxDB database. @@ -95,3 +93,31 @@ def test_tts_ttn_http_json_full( # Verify the records looks like expected. assert record == data_out yield record + + +@pytest_twisted.inlineCallbacks +@pytest.mark.http +@pytest.mark.tts +@pytest.mark.ttn +@pytest.mark.amo +def test_tts_ttn_http_json_forwarder(machinery, create_influxdb, reset_influxdb): + """ + Accept all requests to the `/api/ttn` URL suffix in TTS/TTN webhook JSON format + and proof it is stored in the InfluxDB database. + """ + + from test.settings.mqttkit import settings as mqttkit_settings + + # Submit a single measurement, without timestamp. + baseurl = mqttkit_settings.channel_path_ttn + device_id = "itest-foo-bar" + yield threads.deferToThread(http_json_sensor, f"{baseurl}/{device_id}/uplinks", data_in) + + # Wait for some time to process the message. + yield sleep(PROCESS_DELAY_MQTT) + yield sleep(PROCESS_DELAY_MQTT) + + # Proof that data arrived in InfluxDB properly. + record = influx_sensors.get_first_record() + assert record == data_out + yield record From b13ed5babf0445bf76180b77f0c7da4538a3969a Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 15 Feb 2022 00:17:29 +0100 Subject: [PATCH 2/2] [TTN] Add debugging statements --- CHANGES.rst | 2 ++ kotori/daq/decoder/__init__.py | 1 + kotori/daq/services/mig.py | 1 + kotori/daq/strategy/wan.py | 2 ++ kotori/io/protocol/forwarder.py | 3 +++ 5 files changed, 9 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index 018482bb..f630f0db 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -12,6 +12,8 @@ in progress mean iterator type: *query.stringInterruptIterator`` or ``InfluxDB Error: not executed``. - Add TTS (The Things Stack) / TTN (The Things Network) decoder +- Add experimental TTS/TTN HTTP Webhook forwarder + ``/api/mqttkit-1/ttn/itest-foo-bar/uplinks`` .. _kotori-0.27.0: diff --git a/kotori/daq/decoder/__init__.py b/kotori/daq/decoder/__init__.py index 653570c5..03a64926 100644 --- a/kotori/daq/decoder/__init__.py +++ b/kotori/daq/decoder/__init__.py @@ -43,6 +43,7 @@ def probe(self, payload: str = None): return True # TTS/TTN: The Things Stack / The Things Network + print("self.topology:", self.topology) if self.topology.slot.endswith('data.json') \ and payload is not None \ and "uplink_message" in payload \ diff --git a/kotori/daq/services/mig.py b/kotori/daq/services/mig.py index 4e2a0b17..33892ab9 100644 --- a/kotori/daq/services/mig.py +++ b/kotori/daq/services/mig.py @@ -183,6 +183,7 @@ def process_message(self, topic, payload, **kwargs): def decode_message(self, topic, payload): # Compute topology information from channel topic. + print("========== decode_message >>> topic_to_topology:", topic) topology = self.strategy.topic_to_topology(topic) log.debug(u'Topology address: {topology}', topology=dict(topology)) diff --git a/kotori/daq/strategy/wan.py b/kotori/daq/strategy/wan.py index 662f16d3..85650393 100644 --- a/kotori/daq/strategy/wan.py +++ b/kotori/daq/strategy/wan.py @@ -42,6 +42,8 @@ def topic_to_topology(self, topic): gets transmitted from an embedded device node. """ + print("########## TOPIC:", topic) + # Decode the topic. m = self.matcher.match(topic) if m: diff --git a/kotori/io/protocol/forwarder.py b/kotori/io/protocol/forwarder.py index 1b693ba3..7b5f093d 100644 --- a/kotori/io/protocol/forwarder.py +++ b/kotori/io/protocol/forwarder.py @@ -115,12 +115,14 @@ def forward(self, bucket): Receive data bucket from source, run through transformation machinery and emit to target. """ + print("bucket:", bucket) # 1. Map/transform topology address information if 'transform' in self.channel: for entrypoint in read_list(self.channel.transform): try: transformer = KotoriBootloader.load_entrypoint(entrypoint) + print("transformer:", transformer) bucket.tdata.update(transformer(bucket.tdata)) except ImportError as ex: log.error('ImportError "{message}" when loading entrypoint "{entrypoint}"', @@ -128,6 +130,7 @@ def forward(self, bucket): # MQTT doesn't prefer leading forward slashes with topic names, let's get rid of them target_uri_tpl = self.target_uri.path.lstrip('/') + print("target_uri_tpl:", target_uri_tpl) # Compute target bus topic from url matches target_uri = target_uri_tpl.format(**bucket.tdata)