From ce1fbbbe6d6ddf7461273017ce5101c33e3c63b6 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sun, 18 Jun 2023 17:30:49 +0200 Subject: [PATCH] [CrateDB] Add support for the data export subsystem --- etc/test/cratedb.ini | 34 +++++++++++++++ etc/test/main.ini | 2 +- kotori/daq/storage/cratedb.py | 81 ++++++++++++++++++++++++++++++++++- kotori/io/export/database.py | 5 +++ kotori/io/protocol/cratedb.py | 40 +++++++++++++++++ kotori/io/protocol/target.py | 6 +-- test/settings/mqttkit.py | 1 + test/test_export.py | 19 ++++++++ test/util.py | 5 +++ 9 files changed, 187 insertions(+), 6 deletions(-) create mode 100644 kotori/io/protocol/cratedb.py diff --git a/etc/test/cratedb.ini b/etc/test/cratedb.ini index ee3356c..b0bbc71 100644 --- a/etc/test/cratedb.ini +++ b/etc/test/cratedb.ini @@ -3,6 +3,18 @@ ; ###################################### +; ========================================== +; General settings +; ========================================== + +; http server +[kotori] + +; TODO: Refactor to [http] section, or, even better, into the channel configuration. +http_listen = localhost +http_port = 24643 + + ; ===================== ; Connectivity settings ; ===================== @@ -39,3 +51,25 @@ type = application realm = mqttkit-2 mqtt_topics = mqttkit-2/# application = kotori.daq.application.mqttkit:mqttkit_application + +[mqttkit-2.http-api-generic] +enable = true + +type = application +application = kotori.io.protocol.forwarder:boot + +realm = mqttkit-2 +source = http:/api/mqttkit-2/{address:.*}/{slot:(data|event)} [POST] +target = mqtt:/mqttkit-2/{address}/{slot}.json + +[mqttkit-2.cratedb-data-export] +enable = true + +type = application +application = kotori.io.protocol.forwarder:boot + +realm = mqttkit-2 +source = http:/api/{realm:mqttkit-2}/{network:.*}/{gateway:.*}/{node:.*}/{slot:(data|event)}.{suffix} [GET] +target = cratedb:/{database}?measurement={measurement} +transform = kotori.daq.strategy.wan:WanBusStrategy.topology_to_storage, + kotori.io.protocol.cratedb:QueryTransformer.transform diff --git a/etc/test/main.ini b/etc/test/main.ini index a783824..0b00892 100644 --- a/etc/test/main.ini +++ b/etc/test/main.ini @@ -15,7 +15,7 @@ include = ; http server [kotori] -; TODO: Refactor to [http] section. +; TODO: Refactor to [http] section, or, even better, into the channel configuration. http_listen = localhost http_port = 24642 diff --git a/kotori/daq/storage/cratedb.py b/kotori/daq/storage/cratedb.py index b6b24b4..295d98a 100644 --- a/kotori/daq/storage/cratedb.py +++ b/kotori/daq/storage/cratedb.py @@ -1,17 +1,21 @@ # -*- coding: utf-8 -*- # (c) 2023 Andreas Motl import calendar +import functools import json +from collections import OrderedDict from decimal import Decimal from copy import deepcopy -from datetime import datetime, date +from datetime import datetime, date, timezone import crate.client.http import pytz import requests from crate import client +from crate.client.converter import DefaultTypeConverter from crate.client.exceptions import ProgrammingError from funcy import project +from munch import Munch from twisted.logger import Logger from kotori.daq.storage.util import format_chunk @@ -19,7 +23,7 @@ log = Logger() -class CrateDBAdapter(object): +class CrateDBAdapter: """ Kotori database backend adapter for CrateDB. @@ -86,6 +90,79 @@ def create_table(self, tablename): cursor.execute(sql_ddl) cursor.close() + def query(self, expression: str, tdata: Munch = None): + """ + Query CrateDB and respond with results in suitable shape. + + Make sure to synchronize data by using `REFRESH TABLE ...` before running + the actual `SELECT` statement. This is applicable in test case scenarios. + + Response format:: + + [ + { + "time": ..., + "tags": {"city": "berlin", "location": "balcony"}, + "fields": {"temperature": 42.42, "humidity": 84.84}, + }, + ... + ] + + TODO: Unify with `kotori.test.util:CrateDBWrapper.query`. + """ + + log.info(f"Database query: {expression}") + + tdata = tdata or {} + + # Before reading data from CrateDB, synchronize it. + # Currently, it is mostly needed to satisfy synchronization constraints when running the test suite. + # However, users also may expect to see data "immediately". On the other hand, in order to satisfy + # different needs, this should be made configurable per realm, channel and/or request. + # TODO: Maybe just _optionally_ synchronize with the database when reading data. + if tdata: + refresh_sql = f"REFRESH TABLE {self.get_tablename(tdata)}" + self.execute(refresh_sql) + + def dict_from_row(columns, row): + """ + https://stackoverflow.com/questions/3300464/how-can-i-get-dict-from-sqlite-query + https://stackoverflow.com/questions/4147707/python-mysqldb-sqlite-result-as-dictionary + """ + return dict(zip(columns, row)) + + def record_from_dict(item): + record = OrderedDict() + record.update({"time": item["time"]}) + record.update(item["tags"]) + record.update(item["fields"]) + return record + + # Query database, with convenience data type converters. Assume timestamps to be in UTC. + cursor = self.db_client.cursor(converter=DefaultTypeConverter(), time_zone=timezone.utc) + cursor.execute(expression) + data_raw = cursor.fetchall() + + # Provide fully-qualified records to downstream components, including column names. + column_names = [column_info[0] for column_info in cursor.description] + data_tags_fields = map(functools.partial(dict_from_row, column_names), data_raw) + + # Bring results into classic "records" shape. + data_records = map(record_from_dict, data_tags_fields) + + cursor.close() + return data_records + + def execute(self, expression: str): + """ + Execute a database query, using a cursor, and return its results. + """ + cursor = self.db_client.cursor() + cursor.execute(expression) + result = cursor._result + cursor.close() + return result + def write(self, meta, data): """ Format ingress data chunk and store it into database table. diff --git a/kotori/io/export/database.py b/kotori/io/export/database.py index dea6ff8..4bda174 100644 --- a/kotori/io/export/database.py +++ b/kotori/io/export/database.py @@ -31,6 +31,11 @@ def query(self): settings=self.settings.influxdb, database=bucket.tdata.database, ) + elif "cratedb" in self.settings: + from kotori.daq.storage.cratedb import CrateDBAdapter + database = CrateDBAdapter( + settings=self.settings.cratedb, + ) else: log.warn("No time-series database configured") return diff --git a/kotori/io/protocol/cratedb.py b/kotori/io/protocol/cratedb.py new file mode 100644 index 0000000..55cde8b --- /dev/null +++ b/kotori/io/protocol/cratedb.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- +# (c) 2023 Andreas Motl +from twisted.logger import Logger + +from kotori.io.protocol.util import compute_daterange + +log = Logger() + + +class QueryTransformer: + + @classmethod + def transform(cls, data): + """ + Compute CrateDB query expression from data in transformation dictionary. + Also compute date range from query parameters "from" and "to". + """ + + log.info(f"Querying database: {data}") + + # The PyInfluxQL query generator is versatile enough to be used for all SQL databases. + from pyinfluxql import Query + + # TODO: Use ".date_range" API method + time_begin, time_end = compute_daterange(data.get('from'), data.get('to')) + + # TODO: Add querying by tags. + tags = {} + # tags = CrateDBAdapter.get_tags(data) + + table = f"{data.database}.{data.measurement}" + expression = Query('*').from_(table).where(time__gte=time_begin, time__lte=time_end, **tags) + + result = { + 'expression': str(expression), + 'time_begin': time_begin, + 'time_end': time_end, + } + + return result diff --git a/kotori/io/protocol/target.py b/kotori/io/protocol/target.py index 56f3abd..baf303a 100644 --- a/kotori/io/protocol/target.py +++ b/kotori/io/protocol/target.py @@ -61,8 +61,8 @@ def setupService(self): except Exception as ex: log.failure("Connecting to MQTT broker failed: {ex}", ex=last_error_and_traceback()) - elif self.scheme == 'influxdb': - # InfluxDB has no subsystem service, it's just an adapter + # CrateDB and InfluxDB are not subsystem services, they are just adapters. + elif self.scheme in ['cratedb', 'influxdb']: pass else: @@ -87,7 +87,7 @@ def emit(self, uri, bucket): # TODO: Use threads.deferToThread here? return self.downstream.publish(topic, payload) - elif self.scheme == 'influxdb': + elif self.scheme in ['cratedb', 'influxdb']: # InfluxDB query wrapper using expression derived from transformation data dfq = DataFrameQuery(settings=self.settings, bucket=bucket) diff --git a/test/settings/mqttkit.py b/test/settings/mqttkit.py index 19843ad..41b2865 100644 --- a/test/settings/mqttkit.py +++ b/test/settings/mqttkit.py @@ -16,6 +16,7 @@ class TestSettings: cratedb_measurement_events = 'foo_bar_events' mqtt2_topic_json = 'mqttkit-2/itest/foo/bar/data.json' grafana2_dashboards = ['mqttkit-2-itest', 'mqttkit-2-itest3'] + io_channel_path = '/mqttkit-2/itest/foo/bar/data' # InfluxDB settings. influx_database = 'mqttkit_1_itest' diff --git a/test/test_export.py b/test/test_export.py index 519b632..212b8bd 100644 --- a/test/test_export.py +++ b/test/test_export.py @@ -23,6 +23,25 @@ ts_to = '2020-03-10T23:59:59.000Z' +@pytest_twisted.inlineCallbacks +@pytest.mark.http +@pytest.mark.export +@pytest.mark.cratedb +def test_export_cratedb_general(machinery_cratedb, reset_cratedb): + """ + Submit single reading in JSON format to HTTP API and proof + it can be retrieved back from the HTTP API in different formats. + + This uses CrateDB as timeseries database. + """ + + channel_path = settings.io_channel_path + http_submit = functools.partial(http_json_sensor, port=24643) + http_fetch = functools.partial(http_get_data, port=24643) + + yield verify_export_general(channel_path, http_submit, http_fetch) + + @pytest_twisted.inlineCallbacks @pytest.mark.http @pytest.mark.export diff --git a/test/util.py b/test/util.py index 3514f40..5cf9336 100644 --- a/test/util.py +++ b/test/util.py @@ -259,6 +259,9 @@ def query(self): }, ... ] + + TODO: Refactor to / unify with `kotori.daq.storage.cratedb:CrateDBAdapter.query`. + TODO: Add dict-based record retrieval as a row factory to `crate` library. """ logger.info('CrateDB: Querying database') db_table = self.get_tablename() @@ -280,6 +283,8 @@ def query(self): def execute(self, expression): """ Actually execute the database query, using a cursor. + + TODO: Use `kotori.daq.storage.cratedb:CrateDBAdapter.execute`. """ cursor = self.client.cursor() cursor.execute(expression)