From c13babb9fd458bf7123107a032895e542ebfacbf Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Fri, 9 Jun 2023 22:30:40 +0200 Subject: [PATCH 1/7] [CrateDB] Add basic data acquisition support for CrateDB --- .env | 5 + .github/workflows/tests.yml | 9 +- doc/source/development/tests.rst | 3 + docker-compose.yml | 13 +++ etc/test/cratedb.ini | 41 +++++++ etc/test/main.ini | 1 + kotori/daq/services/mig.py | 16 ++- kotori/daq/storage/cratedb.py | 195 +++++++++++++++++++++++++++++++ pytest.ini | 1 + setup.py | 2 + test/conftest.py | 4 +- test/settings/mqttkit.py | 13 ++- test/test_daq_mqtt.py | 34 +++++- test/test_daq_timestamp.py | 33 +++++- test/util.py | 139 ++++++++++++++++++++++ 15 files changed, 499 insertions(+), 10 deletions(-) create mode 100644 etc/test/cratedb.ini create mode 100644 kotori/daq/storage/cratedb.py diff --git a/.env b/.env index 5f9b9f3a..c40e3044 100644 --- a/.env +++ b/.env @@ -6,6 +6,11 @@ MOSQUITTO_VERSION=2.0 MOSQUITTO_MQTT_PORT=1883 MOSQUITTO_WS_PORT=9001 +# CrateDB +CRATEDB_VERSION=latest +CRATEDB_HTTP_PORT=4200 +CRATEDB_POSTGRESQL_PORT=5432 + # InfluxDB INFLUXDB_VERSION=1.8 INFLUXDB_HTTP_PORT=8086 diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 516ff2b7..10e21c58 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -24,12 +24,19 @@ jobs: os: [ ubuntu-20.04 ] # , macos-latest, windows-latest ] python-version: [ "3.7", "3.8", "3.9", "3.10", "3.11" ] mosquitto-version: [ "2.0" ] + cratedb-version: [ "5.3" ] influxdb-version: [ "1.8" ] grafana-version: [ "7.5.17", "8.5.27", "9.5.7", "10.0.3" ] # https://docs.github.com/en/free-pro-team@latest/actions/guides/about-service-containers services: + cratedb: + image: crate:${{ matrix.cratedb-version }} + ports: + - 4200:4200 + - 5432:5432 + influxdb: image: influxdb:${{ matrix.influxdb-version }} ports: @@ -52,7 +59,7 @@ jobs: OS: ${{ matrix.os }} PYTHON: ${{ matrix.python-version }} - name: Python ${{ matrix.python-version }}, Grafana ${{ matrix.grafana-version }}, Mosquitto ${{ matrix.mosquitto-version }}, InfluxDB ${{ matrix.influxdb-version }} + name: Py ${{ matrix.python-version }}, Grafana ${{ matrix.grafana-version }}, Mosquitto ${{ matrix.mosquitto-version }}, InfluxDB ${{ matrix.influxdb-version }}, CrateDB ${{ matrix.cratedb-version }} steps: - name: Acquire sources diff --git a/doc/source/development/tests.rst b/doc/source/development/tests.rst index 91ee777c..977ca8e2 100644 --- a/doc/source/development/tests.rst +++ b/doc/source/development/tests.rst @@ -63,6 +63,9 @@ Run specific tests with maximum verbosity:: # Run tests marked with "tasmota", "homie" or "airrohr". pytest test ${PYTEST_OPTIONS} -m 'tasmota or homie or airrohr' + # Run tests with CrateDB as database backend. 
+ pytest test ${PYTEST_OPTIONS} -m cratedb + To see available markers, type:: pytest --markers diff --git a/docker-compose.yml b/docker-compose.yml index 76441e66..9b6bf600 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,6 +16,19 @@ services: - ${PATH_VAR_LIB}/mosquitto:/mosquitto/data - ${PATH_VAR_LOG}/mosquitto:/mosquitto/log + cratedb: + image: crate:${CRATEDB_VERSION} + ports: + - "${CRATEDB_HTTP_PORT}:${CRATEDB_HTTP_PORT}" + - "${CRATEDB_POSTGRESQL_PORT}:${CRATEDB_POSTGRESQL_PORT}" + environment: + CRATE_HEAP_SIZE: 2g + + command: ["crate", + "-Cdiscovery.type=single-node", + "-Ccluster.routing.allocation.disk.threshold_enabled=false", + ] + # https://github.com/robcowart/docker_compose_cookbook/blob/master/STACKS/influx_oss/docker-compose.yml#L21 influxdb: image: influxdb:${INFLUXDB_VERSION} diff --git a/etc/test/cratedb.ini b/etc/test/cratedb.ini new file mode 100644 index 00000000..ee3356c1 --- /dev/null +++ b/etc/test/cratedb.ini @@ -0,0 +1,41 @@ +; ###################################### +; Kotori test configuration with CrateDB +; ###################################### + + +; ===================== +; Connectivity settings +; ===================== + +; MQTT bus adapter +[mqtt] +host = localhost +#port = 1883 +username = kotori +password = kotori + +; Storage adapter +[cratedb] +; host = localhost +; port = 4200 +; username = crate +; password = + +; User interface +[grafana] +host = localhost +#port = 3000 +username = admin +password = admin + + +; ================ +; Channel settings +; ================ + +[mqttkit-2] +enable = true +type = application +realm = mqttkit-2 +mqtt_topics = mqttkit-2/# +application = kotori.daq.application.mqttkit:mqttkit_application diff --git a/etc/test/main.ini b/etc/test/main.ini index 4fc3b68e..a7838247 100644 --- a/etc/test/main.ini +++ b/etc/test/main.ini @@ -20,6 +20,7 @@ http_listen = localhost http_port = 24642 ; TODO: Implement backend database selection. +; use_database = cratedb ; use_database = influxdb ; mqtt bus adapter diff --git a/kotori/daq/services/mig.py b/kotori/daq/services/mig.py index 91e121dc..34df8eb9 100644 --- a/kotori/daq/services/mig.py +++ b/kotori/daq/services/mig.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- -# (c) 2015-2021 Andreas Motl +# (c) 2015-2023 Andreas Motl +import os import time import json @@ -17,6 +18,7 @@ from kotori.daq.decoder.schema import MessageType, TopicMatchers from kotori.daq.services import MultiServiceMixin from kotori.daq.intercom.mqtt import MqttAdapter +from kotori.daq.storage.cratedb import CrateDBAdapter from kotori.daq.storage.influx import InfluxDBAdapter from kotori.util.configuration import read_list from kotori.util.thimble import Thimble @@ -79,7 +81,14 @@ def setupService(self): self.registerService(self.mqtt_service) - self.influx = InfluxDBAdapter(settings = self.settings.influxdb) + # TODO: Support multiple databases at the same time. 
+        log.info("Creating database adapter")
+        self.database = None
+        if "influxdb" in self.settings:
+            self.database = InfluxDBAdapter(settings=self.settings.influxdb)
+        elif "cratedb" in self.settings:
+            self.database = CrateDBAdapter(settings=self.settings.cratedb)
+        else:
+            log.warn("No time-series database configured")
 
         # Perform MQTT message processing using a different thread pool
         self.threadpool = ThreadPool()
@@ -311,7 +320,8 @@ def store_message(self, storage, data):
         :param storage: The storage location object
         :param data: The data ready for storing
         """
-        self.influx.write(storage, data)
+        if self.database is not None:
+            self.database.write(storage, data)
 
     def mqtt_process_error(self, failure, topic, payload):
         """
diff --git a/kotori/daq/storage/cratedb.py b/kotori/daq/storage/cratedb.py
new file mode 100644
index 00000000..81440836
--- /dev/null
+++ b/kotori/daq/storage/cratedb.py
@@ -0,0 +1,195 @@
+# -*- coding: utf-8 -*-
+# (c) 2023 Andreas Motl
+import calendar
+import json
+from decimal import Decimal
+from copy import deepcopy
+from datetime import datetime, date
+
+import crate.client.http
+import pytz
+import requests
+from crate import client
+from crate.client.exceptions import ProgrammingError
+from funcy import project
+from twisted.logger import Logger
+
+from kotori.daq.storage.util import format_chunk
+
+log = Logger()
+
+
+class CrateDBAdapter(object):
+    """
+    Kotori database backend adapter for CrateDB.
+
+    CrateDB is a distributed and scalable SQL database for storing and analyzing massive
+    amounts of data in near real-time, even with complex queries. It is PostgreSQL-compatible,
+    and based on Lucene.
+
+    https://github.com/crate/crate
+    """
+
+    def __init__(self, settings=None, database=None):
+        """
+        Carry over connectivity parameters.
+
+        TODO: Verify with CrateDB Cloud.
+        """
+
+        settings = deepcopy(settings) or {}
+        settings.setdefault("host", "localhost")
+        settings.setdefault("port", "4200")
+        settings.setdefault("username", "crate")
+        settings.setdefault("password", "")
+        settings.setdefault("database", database)
+
+        # TODO: Bring back pool size configuration.
+        # settings.setdefault('pool_size', 10)
+
+        settings["port"] = int(settings["port"])
+
+        # FIXME: This is bad style. Well, but it is currently
+        # inherited from ~10 year old code, so c'est la vie.
+        self.__dict__.update(**settings)
+
+        # Bookkeeping for all databases having been touched already
+        self.databases_written_once = set()
+
+        self.host_uri = "{host}:{port}".format(**self.__dict__)
+
+        # TODO: Bring back pool size configuration.
+        # log.info('Storage target is {uri}, pool size is {pool_size}', uri=self.host_uri, pool_size=self.pool_size)
+        log.info("Storage target is {uri}", uri=self.host_uri)
+        self.db_client = client.connect(
+            self.host_uri, username=self.username, password=self.password, pool_size=20,
+        )
+
+    def get_tablename(self, meta):
+        """
+        Get table name for SensorWAN channel.
+        """
+        return f"{meta.database}.{meta.measurement}"
+
+    def create_table(self, tablename):
+        """
+        Create database table for SensorWAN channel.
+ """ + log.info(f"Creating table: {tablename}") + sql_ddl = f""" +CREATE TABLE IF NOT EXISTS {tablename} ( + time TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL, + tags OBJECT(DYNAMIC), + fields OBJECT(DYNAMIC), + year TIMESTAMP GENERATED ALWAYS AS DATE_TRUNC('year', time) +) PARTITIONED BY (year); + """.strip() + cursor = self.db_client.cursor() + cursor.execute(sql_ddl) + cursor.close() + + def write(self, meta, data): + """ + Format ingress data chunk and store it into database table. + + TODO: This dearly needs efficiency improvements. Currently, there is no + batching, just single records/inserts. That yields bad performance. + """ + + meta_copy = deepcopy(dict(meta)) + data_copy = deepcopy(data) + + try: + chunk = format_chunk(meta, data) + + except Exception as ex: + log.failure( + "Could not format chunk (ex={ex_name}: {ex}): data={data}, meta={meta}", + ex_name=ex.__class__.__name__, + ex=ex, + meta=meta_copy, + data=data_copy, + ) + raise + + try: + success = self.write_chunk(meta, chunk) + return success + + except requests.exceptions.ConnectionError as ex: + log.failure( + "Problem connecting to CrateDB at {uri}: {ex}", uri=self.host_uri, ex=ex + ) + raise + + except ProgrammingError as ex: + if "SchemaUnknownException" in ex.message: + db_table = self.get_tablename(meta) + self.create_table(db_table) + + # Attempt second write + success = self.write_chunk(meta, chunk) + return success + + else: + raise + + def write_chunk(self, meta, chunk): + """ + Run the SQL `INSERT` operation. + """ + db_table = self.get_tablename(meta) + cursor = self.db_client.cursor() + + # With or without timestamp. + if "time" in chunk: + cursor.execute( + f"INSERT INTO {db_table} (time, tags, fields) VALUES (?, ?, ?)", + (chunk["time"], chunk["tags"], chunk["fields"]), + ) + else: + cursor.execute( + f"INSERT INTO {db_table} (tags, fields) VALUES (?, ?)", + (chunk["tags"], chunk["fields"]), + ) + success = True + self.databases_written_once.add(meta.database) + cursor.close() + if success: + log.debug("Storage success: {chunk}", chunk=chunk) + else: + log.error("Storage failed: {chunk}", chunk=chunk) + return success + + @staticmethod + def get_tags(data): + """ + Derive tags from topology information. + + TODO: Verify if this is used at all. + """ + return project(data, ["gateway", "node"]) + + +class TimezoneAwareCrateJsonEncoder(json.JSONEncoder): + epoch_aware = datetime(1970, 1, 1, tzinfo=pytz.UTC) + epoch_naive = datetime(1970, 1, 1) + + def default(self, o): + if isinstance(o, Decimal): + return str(o) + if isinstance(o, datetime): + if o.tzinfo: + delta = o - self.epoch_aware + else: + delta = o - self.epoch_naive + return int(delta.microseconds / 1000.0 + + (delta.seconds + delta.days * 24 * 3600) * 1000.0) + if isinstance(o, date): + return calendar.timegm(o.timetuple()) * 1000 + return json.JSONEncoder.default(self, o) + + +# Monkey patch. +# TODO: Submit upstream. +crate.client.http.CrateJsonEncoder = TimezoneAwareCrateJsonEncoder diff --git a/pytest.ini b/pytest.ini index b21cb283..450eb3b3 100644 --- a/pytest.ini +++ b/pytest.ini @@ -29,6 +29,7 @@ markers = http: Tests using HTTP. export: Tests for exporting data. mqtt: Tests only doing MQTT. + cratedb: Tests specific to CrateDB. influxdb: Tests specific to InfluxDB. grafana: Tests interacting with Grafana. mongodb: Tests using MongoDB. 
diff --git a/setup.py b/setup.py index e7416744..6f4b0640 100644 --- a/setup.py +++ b/setup.py @@ -67,6 +67,8 @@ extras = { 'daq': [ + 'crash<1', + 'crate[sqlalchemy]<1', 'influxdb>=5.3.0,<6', 'pytz>=2020.1', 'requests>=2.12.4,<3', diff --git a/test/conftest.py b/test/conftest.py index eee3a5ff..52ec53b4 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -11,7 +11,7 @@ from kotori import KotoriBootloader from test.util import boot_kotori, sleep -from test.settings.mqttkit import influx_sensors, influx_events, grafana, device_influx_sensors +from test.settings.mqttkit import cratedb_sensors, influx_sensors, influx_events, grafana, device_influx_sensors logger = logging.getLogger(__name__) @@ -48,6 +48,8 @@ def machinery(): machinery = create_machinery('./etc/test/main.ini') +machinery_cratedb = create_machinery('./etc/test/cratedb.ini') +reset_cratedb = cratedb_sensors.make_reset_measurement() create_influxdb = influx_sensors.make_create_db() reset_influxdb = influx_sensors.make_reset_measurement() reset_grafana = grafana.make_reset() diff --git a/test/settings/mqttkit.py b/test/settings/mqttkit.py index c3094657..7ebc0f2d 100644 --- a/test/settings/mqttkit.py +++ b/test/settings/mqttkit.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- -# (c) 2020 Andreas Motl +# (c) 2020-2023 Andreas Motl -from test.util import InfluxWrapper, GrafanaWrapper +from test.util import CrateDBWrapper, InfluxWrapper, GrafanaWrapper PROCESS_DELAY_MQTT = 0.3 PROCESS_DELAY_HTTP = 0.3 @@ -9,6 +9,13 @@ class TestSettings: + # CrateDB settings. + cratedb_database = 'mqttkit_2_itest' + cratedb_databases = ['mqttkit_2_itest', 'mqttkit_2_itest3'] + cratedb_measurement_sensors = 'foo_bar_sensors' + cratedb_measurement_events = 'foo_bar_events' + mqtt2_topic_json = 'mqttkit-2/itest/foo/bar/data.json' + # InfluxDB settings. influx_database = 'mqttkit_1_itest' influx_databases = ['mqttkit_1_itest', 'mqttkit_1_itest3'] @@ -47,6 +54,8 @@ class TestSettings: settings = TestSettings +cratedb_sensors = CrateDBWrapper(database=settings.cratedb_database, measurement=settings.cratedb_measurement_sensors) +cratedb_events = CrateDBWrapper(database=settings.cratedb_database, measurement=settings.cratedb_measurement_events) influx_sensors = InfluxWrapper(database=settings.influx_database, measurement=settings.influx_measurement_sensors) influx_events = InfluxWrapper(database=settings.influx_database, measurement=settings.influx_measurement_events) grafana = GrafanaWrapper(settings=settings) diff --git a/test/test_daq_mqtt.py b/test/test_daq_mqtt.py index 0c73b103..55e006b5 100644 --- a/test/test_daq_mqtt.py +++ b/test/test_daq_mqtt.py @@ -1,13 +1,14 @@ # -*- coding: utf-8 -*- # (c) 2020-2021 Andreas Motl import logging +import os import re import pytest import pytest_twisted from twisted.internet import threads -from test.settings.mqttkit import settings, influx_sensors, PROCESS_DELAY_MQTT, device_influx_sensors +from test.settings.mqttkit import settings, cratedb_sensors, influx_sensors, PROCESS_DELAY_MQTT, device_influx_sensors from test.util import mqtt_json_sensor, sleep, mqtt_sensor logger = logging.getLogger(__name__) @@ -42,6 +43,37 @@ def test_mqtt_to_influxdb_json_single(machinery, create_influxdb, reset_influxdb yield record +@pytest_twisted.inlineCallbacks +@pytest.mark.mqtt +@pytest.mark.cratedb +def test_mqtt_to_cratedb_json_single(monkeypatch, machinery_cratedb, reset_cratedb): + """ + Publish single reading in JSON format to MQTT broker + and proof it is stored in the CrateDB database. 
+
+    Addressing: Classic WAN path
+    Example: mqttkit-2/network/gateway/node
+    Database: CrateDB
+    """
+
+    # Submit a single measurement, without timestamp.
+    data = {
+        'temperature': 42.84,
+        'humidity': 83.1,
+    }
+    yield threads.deferToThread(mqtt_json_sensor, settings.mqtt2_topic_json, data)
+
+    # Wait for some time to process the message.
+    yield sleep(PROCESS_DELAY_MQTT)
+    yield sleep(PROCESS_DELAY_MQTT)
+
+    # Proof that data arrived in CrateDB.
+    record = cratedb_sensors.get_first_record()
+    del record['time']
+    assert record == {u'humidity': 83.1, u'temperature': 42.84}
+    yield record
+
+
 @pytest_twisted.inlineCallbacks
 @pytest.mark.mqtt
 @pytest.mark.legacy
diff --git a/test/test_daq_timestamp.py b/test/test_daq_timestamp.py
index c451611a..39e6fa5f 100644
--- a/test/test_daq_timestamp.py
+++ b/test/test_daq_timestamp.py
@@ -1,11 +1,11 @@
 # -*- coding: utf-8 -*-
-# (c) 2020-2021 Andreas Motl
+# (c) 2020-2023 Andreas Motl
 import logging
 
 import pytest
 import pytest_twisted
 
-from test.settings.mqttkit import settings, influx_sensors, PROCESS_DELAY_MQTT
+from test.settings.mqttkit import settings, cratedb_sensors, influx_sensors, PROCESS_DELAY_MQTT
 from test.util import mqtt_json_sensor, sleep
 
 logger = logging.getLogger(__name__)
@@ -38,6 +38,35 @@ def test_timestamp_rfc3339_influxdb(machinery, create_influxdb, reset_influxdb):
     yield record
 
 
+@pytest_twisted.inlineCallbacks
+@pytest.mark.cratedb
+def test_timestamp_rfc3339_cratedb(machinery_cratedb, reset_cratedb):
+    """
+    Publish single reading in JSON format to MQTT broker, using a timestamp in
+    RFC3339 format. Proof that the timestamp is processed and stored correctly.
+
+    Here, CrateDB is used as database backend.
+    """
+
+    # Submit a single measurement, with timestamp.
+    data = {
+        'temperature': 42.84,
+        'humidity': 83.1,
+        'timestamp': '2020-03-10 03:38:37.937059000+01:00'
+    }
+    yield mqtt_json_sensor(settings.mqtt2_topic_json, data)
+
+    # Wait for some time to process the message.
+    yield sleep(PROCESS_DELAY_MQTT)
+    yield sleep(PROCESS_DELAY_MQTT)
+
+    # Proof that data arrived in CrateDB.
+    record = cratedb_sensors.get_first_record()
+    # TODO: Original value for `time` is `2020-03-10T02:38:37.937059Z`.
+    assert record == {'time': 1583807917937, 'humidity': 83.1, 'temperature': 42.84}
+    yield record
+
+
 @pytest_twisted.inlineCallbacks
 def test_timestamp_seconds_integer(machinery, create_influxdb, reset_influxdb):
     """
diff --git a/test/util.py b/test/util.py
index feaee391..c3128a02 100644
--- a/test/util.py
+++ b/test/util.py
@@ -7,9 +7,12 @@
 import string
 import sys
 import typing as t
+from collections import OrderedDict
 
 import pytest
 import requests
+from crate import client as cratedb_client
+from crate.client.exceptions import ProgrammingError
 from influxdb import InfluxDBClient
 from influxdb.exceptions import InfluxDBClientError
 from pyinfluxql import Query
@@ -192,6 +195,142 @@ def reset_measurement():
 
     return reset_measurement
 
+
+class CrateDBWrapper:
+    """
+    Utilities for testing with CrateDB.
+
+    These helper functions are mostly used for test layer setup/teardown purposes.
+    """
+
+    def __init__(self, database, measurement):
+        self.database = database
+        self.measurement = measurement
+        self.client = self.client_factory()
+        self.create = None
+        self.reset = None
+
+    @staticmethod
+    def client_factory():
+        """
+        Database client adapter factory.
+        """
+        # FIXME: Connectivity parameters are hardcoded.
+        # TODO: Get configuration parameters from .ini file or from runtime application.
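+        # The values below assume a stock single-node CrateDB instance listening on
+        # localhost:4200, with the default `crate` superuser.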
+ return cratedb_client.connect( + 'localhost:4200', + username="crate", + pool_size=20, + # TODO: Does configuring `timeout` actually work? + timeout=2, + ) + + def get_tablename(self): + """ + Provide table name per SensorWAN specification. + """ + return f"{self.database}.{self.measurement}" + + def query(self): + """ + Query CrateDB and respond with results in suitable shape. + + Make sure to synchronize data by using `REFRESH TABLE ...` before running + the actual `SELECT` statement. This is applicable in test case scenarios. + + Response format:: + + [ + { + "time": ..., + "tags": {"city": "berlin", "location": "balcony"}, + "fields": {"temperature": 42.42, "humidity": 84.84}, + }, + ... + ] + """ + logger.info('CrateDB: Querying database') + db_table = self.get_tablename() + self.execute(f"REFRESH TABLE {db_table};") + result = self.execute(f"SELECT * FROM {db_table};") + cols = result["cols"] + rows = result["rows"] + records = [] + for row in rows: + # Build a merged record from `tags` and `fields`. + item = dict(zip(cols, row)) + record = OrderedDict() + record.update({"time": item["time"]}) + record.update(item["tags"]) + record.update(item["fields"]) + records.append(record) + return records + + def execute(self, expression): + """ + Actually execute the database query, using a cursor. + """ + cursor = self.client.cursor() + cursor.execute(expression) + result = cursor._result + cursor.close() + return result + + def get_record(self, index=None): + """ + Convenience method for getting specific records. + """ + records = self.query() + + # Check number of records. + assert len(records) >= 1, "No data in database: len(result) = {}".format(len(records)) + + # Pick and return requested record. + return records[index] + + def get_first_record(self): + """ + Convenience method for getting the first record. + """ + return self.get_record(index=0) + + def drop_table(self, tablename: str): + """ + Drop the table on test suite teardown. + """ + sql_ddl = f"DROP TABLE {tablename}" + self.execute(sql_ddl) + + def make_create_db(self): + """ + Support fixture for test suite setup: Creates the `database` entity. + + Attention: Creating a database is effectively a no-op with CrateDB, so + this is only here for symmetry reasons. + """ + @pytest.fixture(scope="package") + def create(): + # logger.info('CrateDB: Creating database') + # self.client.drop_database(self.database) + # self.client.create_database(self.database) + pass + return create + + def make_reset_measurement(self): + """ + Support fixture for test suite setup: Make sure to start without existing tables. + """ + + @pytest.fixture(scope="function") + def reset_measurement(): + logger.info('CrateDB: Resetting database') + # Clear out the database table. 
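+            # Ignore missing-table errors (SchemaUnknownException), so the fixture
+            # also works when starting from a blank database.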
+ try: + self.drop_table(self.get_tablename()) + except ProgrammingError as ex: + if "SchemaUnknownException" not in ex.message: + raise + return reset_measurement From 4be8ea04d60c5f4be36065e98c379d23068f999a Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sun, 11 Jun 2023 01:11:47 +0200 Subject: [PATCH 2/7] [CrateDB] Add support for Grafana instant dashboards --- kotori/daq/graphing/grafana/dashboard.py | 25 ++++++++-- kotori/daq/graphing/grafana/manager.py | 47 +++++++++++++++---- .../resources/grafana-target-cratedb.json | 10 ++++ ...get.json => grafana-target-influxdb1.json} | 0 kotori/daq/model.py | 8 ++++ test/conftest.py | 4 +- test/settings/mqttkit.py | 1 + test/test_daq_grafana.py | 44 +++++++++++++++-- test/util.py | 25 +++++++--- 9 files changed, 140 insertions(+), 24 deletions(-) create mode 100644 kotori/daq/graphing/grafana/resources/grafana-target-cratedb.json rename kotori/daq/graphing/grafana/resources/{grafana-target.json => grafana-target-influxdb1.json} (100%) create mode 100644 kotori/daq/model.py diff --git a/kotori/daq/graphing/grafana/dashboard.py b/kotori/daq/graphing/grafana/dashboard.py index 1df9702b..240f9bd6 100644 --- a/kotori/daq/graphing/grafana/dashboard.py +++ b/kotori/daq/graphing/grafana/dashboard.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# (c) 2015-2021 Andreas Motl, +# (c) 2015-2023 Andreas Motl, import os import json @@ -10,6 +10,8 @@ from twisted.logger import Logger from pyramid.settings import asbool +from kotori.daq.model import TimeseriesDatabaseType + log = Logger() @attr.s @@ -17,6 +19,8 @@ class GrafanaDashboardModel(object): name = attr.ib() title = attr.ib() datasource = attr.ib() + database_type: TimeseriesDatabaseType = attr.ib() + database_name = attr.ib() measurement_sensors = attr.ib() measurement_events = attr.ib() uid = attr.ib(default=None) @@ -49,6 +53,7 @@ def make(self, data=None): # Wrap everything into convenience object dashboard = GrafanaDashboard( channel=self.channel, + model=self.model, uid=dashboard_uid, title=dashboard_title, datasource=datasource, @@ -302,8 +307,9 @@ def use_field(field_name: str): class GrafanaDashboard(object): - def __init__(self, channel=None, uid=None, title='default', datasource='default', folder_id=None, dashboard_data=None): + def __init__(self, channel=None, model=None, uid=None, title='default', datasource='default', folder_id=None, dashboard_data=None): self.channel = channel or Munch() + self.model: GrafanaDashboardModel = model self.dashboard_uid = uid self.dashboard_title = title self.datasource = datasource @@ -351,11 +357,17 @@ def __init__(self, channel=None, uid=None, title='default', datasource='default' if panel_ids: self.panel_id = max(panel_ids) + if self.model.database_type is TimeseriesDatabaseType.CRATEDB: + target_template = 'grafana-target-cratedb.json' + elif self.model.database_type is TimeseriesDatabaseType.INFLUXDB1: + target_template = 'grafana-target-influxdb1.json' + else: + raise ValueError(f"Unknown database type: {self.model.database_type}") self.tpl_dashboard = self.get_template('grafana-dashboard.json') self.tpl_annotation = self.get_template('grafana-annotation.json') self.tpl_panel = self.get_template('grafana-panel.json') - self.tpl_target = self.get_template('grafana-target.json') + self.tpl_target = self.get_template(target_template) def get_template(self, filename): filename = os.path.join('resources', filename) @@ -450,8 +462,15 @@ def build_panel(self, panel, measurement): log.failure(u'Failed building valid JSON for Grafana panel. 
data={data}, json={json}', data=data_panel, json=panel_json)
 
+    def get_tablename(self):
+        """
+        Produce the fully-qualified table name, like `<database>.<measurement>`, or `<schema>.<table>
`. + """ + return f"{self.model.database_name}.{self.model.measurement_sensors}" + def get_target(self, panel, measurement, fieldname): data_target = { + 'table': self.get_tablename(), 'measurement': measurement, 'name': fieldname, 'alias': fieldname, diff --git a/kotori/daq/graphing/grafana/manager.py b/kotori/daq/graphing/grafana/manager.py index 561b63a6..acaf5256 100644 --- a/kotori/daq/graphing/grafana/manager.py +++ b/kotori/daq/graphing/grafana/manager.py @@ -1,9 +1,10 @@ # -*- coding: utf-8 -*- -# (c) 2015-2021 Andreas Motl, +# (c) 2015-2023 Andreas Motl, import arrow from twisted.logger import Logger from twisted.application.service import MultiService +from kotori.daq.model import TimeseriesDatabaseType from kotori.daq.services import MultiServiceMixin from kotori.daq.graphing.grafana.api import GrafanaApi from kotori.daq.graphing.grafana.dashboard import GrafanaDashboardBuilder, GrafanaDashboardModel @@ -25,6 +26,13 @@ def __init__(self, settings=None, channel=None): # Shortcut to global settings self.config = settings + if "cratedb" in self.config: + self.dbtype = TimeseriesDatabaseType.CRATEDB + elif "influxdb" in self.config: + self.dbtype = TimeseriesDatabaseType.INFLUXDB1 + else: + raise ValueError("Timeseries database type not defined") + if not 'port' in self.config['grafana']: self.config['grafana']['port'] = '3000' @@ -76,15 +84,34 @@ def create_datasource(self, storage_location): datasource_name = storage_location.database - self.grafana_api.create_datasource(datasource_name, { - "type": "influxdb", - "url": "http://{host}:{port}/".format( - host=self.config['influxdb']['host'], - port=int(self.config['influxdb'].get('port', '8086'))), - "database": storage_location.database, - "user": self.config['influxdb']['username'], - "password": self.config['influxdb']['password'], + if self.dbtype is TimeseriesDatabaseType.CRATEDB: + db_config = self.config['cratedb'] + self.grafana_api.create_datasource(datasource_name, { + "type": "postgres", + "url": "{host}:{port}".format( + host=db_config.get('host', 'localhost'), + port=int(db_config.get('port', '5432'))), + "database": storage_location.database, + "user": db_config.get('username', 'crate'), + "password": db_config.get('password'), + "jsonData": { + "sslmode": "disable", + "postgresVersion": 1400, + }, + }) + elif self.dbtype is TimeseriesDatabaseType.INFLUXDB1: + self.grafana_api.create_datasource(datasource_name, { + "type": "influxdb", + "url": "http://{host}:{port}/".format( + host=self.config['influxdb']['host'], + port=int(self.config['influxdb'].get('port', '8086'))), + "database": storage_location.database, + "user": self.config['influxdb']['username'], + "password": self.config['influxdb']['password'], }) + else: + log.warn("No time-series database enabled, skipping Grafana provisioning") + return datasource_name @@ -145,6 +172,8 @@ def provision(self, storage_location, data, topology=None): name=dashboard_identity.name, title=dashboard_identity.title, datasource=datasource_name, + database_type=self.dbtype, + database_name=storage_location.database, measurement_sensors=storage_location.measurement, measurement_events=storage_location.measurement_events ) diff --git a/kotori/daq/graphing/grafana/resources/grafana-target-cratedb.json b/kotori/daq/graphing/grafana/resources/grafana-target-cratedb.json new file mode 100644 index 00000000..0ecda7e7 --- /dev/null +++ b/kotori/daq/graphing/grafana/resources/grafana-target-cratedb.json @@ -0,0 +1,10 @@ +{ + "alias": "{{ alias }}", + "format": "table", + "resultFormat": 
"time_series", + "tags": {{ tags }}, + "groupByTags": [], + "measurement": "{{ measurement }}", + "rawQuery": true, + "rawSql": "SELECT time, fields['{{ name }}'] AS {{ alias }} FROM {{ table }} WHERE $__timeFilter(time)" +} diff --git a/kotori/daq/graphing/grafana/resources/grafana-target.json b/kotori/daq/graphing/grafana/resources/grafana-target-influxdb1.json similarity index 100% rename from kotori/daq/graphing/grafana/resources/grafana-target.json rename to kotori/daq/graphing/grafana/resources/grafana-target-influxdb1.json diff --git a/kotori/daq/model.py b/kotori/daq/model.py new file mode 100644 index 00000000..10cf4903 --- /dev/null +++ b/kotori/daq/model.py @@ -0,0 +1,8 @@ +# -*- coding: utf-8 -*- +# (c) 2023 Andreas Motl, +from enum import Enum + + +class TimeseriesDatabaseType(Enum): + CRATEDB = "cratedb" + INFLUXDB1 = "influxdb" diff --git a/test/conftest.py b/test/conftest.py index 52ec53b4..13bcd684 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -10,6 +10,7 @@ import pytest_twisted from kotori import KotoriBootloader +from kotori.daq.model import TimeseriesDatabaseType from test.util import boot_kotori, sleep from test.settings.mqttkit import cratedb_sensors, influx_sensors, influx_events, grafana, device_influx_sensors @@ -52,7 +53,8 @@ def machinery(): reset_cratedb = cratedb_sensors.make_reset_measurement() create_influxdb = influx_sensors.make_create_db() reset_influxdb = influx_sensors.make_reset_measurement() -reset_grafana = grafana.make_reset() +reset_grafana = grafana.make_reset(dbtype=TimeseriesDatabaseType.INFLUXDB1) +reset_grafana_cratedb = grafana.make_reset(dbtype=TimeseriesDatabaseType.CRATEDB) reset_influxdb_events = influx_events.make_reset_measurement() device_create_influxdb = device_influx_sensors.make_create_db() diff --git a/test/settings/mqttkit.py b/test/settings/mqttkit.py index 7ebc0f2d..19843ade 100644 --- a/test/settings/mqttkit.py +++ b/test/settings/mqttkit.py @@ -15,6 +15,7 @@ class TestSettings: cratedb_measurement_sensors = 'foo_bar_sensors' cratedb_measurement_events = 'foo_bar_events' mqtt2_topic_json = 'mqttkit-2/itest/foo/bar/data.json' + grafana2_dashboards = ['mqttkit-2-itest', 'mqttkit-2-itest3'] # InfluxDB settings. influx_database = 'mqttkit_1_itest' diff --git a/test/test_daq_grafana.py b/test/test_daq_grafana.py index 86a919d5..a2cfe58f 100644 --- a/test/test_daq_grafana.py +++ b/test/test_daq_grafana.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# (c) 2020-2021 Andreas Motl +# (c) 2020-2023 Andreas Motl import logging import pytest @@ -13,10 +13,11 @@ @pytest_twisted.inlineCallbacks @pytest.mark.grafana -def test_mqtt_to_grafana_single(machinery, create_influxdb, reset_influxdb, reset_grafana): +@pytest.mark.influxdb +def test_mqtt_influxdb_grafana_single(machinery, create_influxdb, reset_influxdb, reset_grafana): """ - Publish single reading in JSON format to MQTT broker and proof - that a corresponding datasource and a dashboard was created in Grafana. + Publish single reading in JSON format to MQTT broker and proof that a + corresponding InfluxDB datasource and a dashboard was created in Grafana. """ # Submit a single measurement, without timestamp. 
@@ -45,6 +46,41 @@ def test_mqtt_to_grafana_single(machinery, create_influxdb, reset_influxdb, rese assert 'temperature' in target['query'] or 'humidity' in target['query'] +@pytest_twisted.inlineCallbacks +@pytest.mark.grafana +@pytest.mark.cratedb +def test_mqtt_cratedb_grafana_single(machinery_cratedb, reset_cratedb, reset_grafana_cratedb): + """ + Publish single reading in JSON format to MQTT broker and proof that a + corresponding CrateDB datasource and a dashboard was created in Grafana. + """ + + # Submit a single measurement, without timestamp. + data = { + 'temperature': 42.84, + 'humidity': 83.1, + } + yield mqtt_json_sensor(settings.mqtt2_topic_json, data) + + # Wait for some time to process the message. + yield sleep(PROCESS_DELAY_MQTT) + yield sleep(PROCESS_DELAY_MQTT) + yield sleep(PROCESS_DELAY_MQTT) + + # Proof that Grafana is well provisioned. + logger.info('Grafana: Checking datasource') + assert settings.cratedb_database in grafana.get_datasource_names() + + logger.info('Grafana: Retrieving dashboard') + dashboard_name = settings.grafana2_dashboards[0] + dashboard = grafana.get_dashboard_by_name(dashboard_name) + + logger.info('Grafana: Checking dashboard layout') + target = dashboard['rows'][0]['panels'][0]['targets'][0] + assert target['measurement'] == settings.cratedb_measurement_sensors + assert target['rawSql'] == "SELECT time, fields['humidity'] AS humidity FROM mqttkit_2_itest.foo_bar_sensors WHERE $__timeFilter(time)" + + @pytest_twisted.inlineCallbacks @pytest.mark.grafana def test_mqtt_to_grafana_update_panel(machinery, create_influxdb, reset_influxdb, reset_grafana): diff --git a/test/util.py b/test/util.py index c3128a02..f80a2d2b 100644 --- a/test/util.py +++ b/test/util.py @@ -22,6 +22,7 @@ import kotori from kotori.daq.graphing.grafana.manager import GrafanaManager +from kotori.daq.model import TimeseriesDatabaseType logger = logging.getLogger(__name__) @@ -90,10 +91,19 @@ def get_field_names(self, dashboard_name, panel_index): field_names = sorted(map(lambda x: x["fields"][0]["name"], panels[panel_index]['targets'])) return field_names - def make_reset(self): + def make_reset(self, dbtype: TimeseriesDatabaseType = TimeseriesDatabaseType.INFLUXDB1): + + if dbtype is TimeseriesDatabaseType.CRATEDB: + database = self.settings.cratedb_database + databases = getattr(self.settings, "cratedb_databases", []) + dashboards = self.settings.grafana2_dashboards + elif dbtype is TimeseriesDatabaseType.INFLUXDB1: + database = self.settings.influx_database + databases = getattr(self.settings, "influx_databases", []) + dashboards = self.settings.grafana_dashboards @pytest.fixture(scope="function") - def reset_grafana(machinery): + def resetfun(machinery, machinery_cratedb): """ Fixture to delete the Grafana datasource and dashboard. 
""" @@ -103,13 +113,12 @@ def reset_grafana(machinery): for datasource in self.client.datasources.get(): datasource_name = datasource['name'] logger.info(f"Attempt to delete datasource {datasource_name}") - if datasource_name == self.settings.influx_database or \ - datasource_name in getattr(self.settings, "influx_databases", []): + if datasource_name == database or datasource_name in databases: datasource_id = datasource['id'] self.client.datasources[datasource_id].delete() logger.info(f"Successfully deleted datasource {datasource_name}") - for dashboard_name in self.settings.grafana_dashboards: + for dashboard_name in dashboards: logger.info(f"Attempt to delete dashboard {dashboard_name}") try: dashboard = self.get_dashboard_by_name(dashboard_name) @@ -121,14 +130,16 @@ def reset_grafana(machinery): raise # Find all `GrafanaManager` service instances and invoke `KeyCache.reset()` on them. - if machinery: + for machinery in [machinery, machinery_cratedb]: + if machinery is None: + continue for app in machinery.applications: for service in app.services: for subservice in service.services: if isinstance(subservice, GrafanaManager): subservice.keycache.reset() - return reset_grafana + return resetfun class InfluxWrapper: From 3853f09584d859afada557c571e349d650c31839 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sun, 18 Jun 2023 00:07:36 +0200 Subject: [PATCH 3/7] [CrateDB] Improve standard SQL statement for Grafana instant dashboards Apply time bucketing over interval obtained from Grafana's date range picker. Emulate `GROUP BY DATE_BIN()` by using Grafana's `$__timeGroupAlias` macro for casting `$__interval` values, until CrateDB's `DATE_BIN()` function understands Grafana's native interval values. --- .../graphing/grafana/resources/grafana-target-cratedb.json | 2 +- test/test_daq_grafana.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/kotori/daq/graphing/grafana/resources/grafana-target-cratedb.json b/kotori/daq/graphing/grafana/resources/grafana-target-cratedb.json index 0ecda7e7..cc5abdf7 100644 --- a/kotori/daq/graphing/grafana/resources/grafana-target-cratedb.json +++ b/kotori/daq/graphing/grafana/resources/grafana-target-cratedb.json @@ -6,5 +6,5 @@ "groupByTags": [], "measurement": "{{ measurement }}", "rawQuery": true, - "rawSql": "SELECT time, fields['{{ name }}'] AS {{ alias }} FROM {{ table }} WHERE $__timeFilter(time)" + "rawSql": "SELECT $__timeGroupAlias(time, $__interval), MEAN(fields['{{ name }}']) AS {{ alias }} FROM {{ table }} WHERE $__timeFilter(time) GROUP BY time ORDER BY time" } diff --git a/test/test_daq_grafana.py b/test/test_daq_grafana.py index a2cfe58f..5d31c04c 100644 --- a/test/test_daq_grafana.py +++ b/test/test_daq_grafana.py @@ -78,7 +78,9 @@ def test_mqtt_cratedb_grafana_single(machinery_cratedb, reset_cratedb, reset_gra logger.info('Grafana: Checking dashboard layout') target = dashboard['rows'][0]['panels'][0]['targets'][0] assert target['measurement'] == settings.cratedb_measurement_sensors - assert target['rawSql'] == "SELECT time, fields['humidity'] AS humidity FROM mqttkit_2_itest.foo_bar_sensors WHERE $__timeFilter(time)" + assert target['rawSql'] == \ + "SELECT $__timeGroupAlias(time, $__interval), MEAN(fields['humidity']) AS humidity " \ + "FROM mqttkit_2_itest.foo_bar_sensors WHERE $__timeFilter(time) GROUP BY time ORDER BY time" @pytest_twisted.inlineCallbacks From 20418da97d2644425d8b5afada6f90362108068f Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sun, 18 Jun 2023 17:30:49 +0200 Subject: [PATCH 4/7] [CrateDB] 
Add support for the data export subsystem --- etc/test/cratedb.ini | 34 +++++++++++++++ etc/test/main.ini | 2 +- kotori/daq/storage/cratedb.py | 81 ++++++++++++++++++++++++++++++++++- kotori/io/export/database.py | 5 +++ kotori/io/protocol/cratedb.py | 40 +++++++++++++++++ kotori/io/protocol/target.py | 6 +-- test/settings/mqttkit.py | 1 + test/test_export.py | 19 ++++++++ test/util.py | 5 +++ 9 files changed, 187 insertions(+), 6 deletions(-) create mode 100644 kotori/io/protocol/cratedb.py diff --git a/etc/test/cratedb.ini b/etc/test/cratedb.ini index ee3356c1..b0bbc714 100644 --- a/etc/test/cratedb.ini +++ b/etc/test/cratedb.ini @@ -3,6 +3,18 @@ ; ###################################### +; ========================================== +; General settings +; ========================================== + +; http server +[kotori] + +; TODO: Refactor to [http] section, or, even better, into the channel configuration. +http_listen = localhost +http_port = 24643 + + ; ===================== ; Connectivity settings ; ===================== @@ -39,3 +51,25 @@ type = application realm = mqttkit-2 mqtt_topics = mqttkit-2/# application = kotori.daq.application.mqttkit:mqttkit_application + +[mqttkit-2.http-api-generic] +enable = true + +type = application +application = kotori.io.protocol.forwarder:boot + +realm = mqttkit-2 +source = http:/api/mqttkit-2/{address:.*}/{slot:(data|event)} [POST] +target = mqtt:/mqttkit-2/{address}/{slot}.json + +[mqttkit-2.cratedb-data-export] +enable = true + +type = application +application = kotori.io.protocol.forwarder:boot + +realm = mqttkit-2 +source = http:/api/{realm:mqttkit-2}/{network:.*}/{gateway:.*}/{node:.*}/{slot:(data|event)}.{suffix} [GET] +target = cratedb:/{database}?measurement={measurement} +transform = kotori.daq.strategy.wan:WanBusStrategy.topology_to_storage, + kotori.io.protocol.cratedb:QueryTransformer.transform diff --git a/etc/test/main.ini b/etc/test/main.ini index a7838247..0b00892b 100644 --- a/etc/test/main.ini +++ b/etc/test/main.ini @@ -15,7 +15,7 @@ include = ; http server [kotori] -; TODO: Refactor to [http] section. +; TODO: Refactor to [http] section, or, even better, into the channel configuration. http_listen = localhost http_port = 24642 diff --git a/kotori/daq/storage/cratedb.py b/kotori/daq/storage/cratedb.py index 81440836..07e508fc 100644 --- a/kotori/daq/storage/cratedb.py +++ b/kotori/daq/storage/cratedb.py @@ -1,17 +1,21 @@ # -*- coding: utf-8 -*- # (c) 2023 Andreas Motl import calendar +import functools import json +from collections import OrderedDict from decimal import Decimal from copy import deepcopy -from datetime import datetime, date +from datetime import datetime, date, timezone import crate.client.http import pytz import requests from crate import client +from crate.client.converter import DefaultTypeConverter from crate.client.exceptions import ProgrammingError from funcy import project +from munch import Munch from twisted.logger import Logger from kotori.daq.storage.util import format_chunk @@ -19,7 +23,7 @@ log = Logger() -class CrateDBAdapter(object): +class CrateDBAdapter: """ Kotori database backend adapter for CrateDB. @@ -88,6 +92,79 @@ def create_table(self, tablename): cursor.execute(sql_ddl) cursor.close() + def query(self, expression: str, tdata: Munch = None): + """ + Query CrateDB and respond with results in suitable shape. + + Make sure to synchronize data by using `REFRESH TABLE ...` before running + the actual `SELECT` statement. This is applicable in test case scenarios. 
+ + Response format:: + + [ + { + "time": ..., + "tags": {"city": "berlin", "location": "balcony"}, + "fields": {"temperature": 42.42, "humidity": 84.84}, + }, + ... + ] + + TODO: Unify with `kotori.test.util:CrateDBWrapper.query`. + """ + + log.info(f"Database query: {expression}") + + tdata = tdata or {} + + # Before reading data from CrateDB, synchronize it. + # Currently, it is mostly needed to satisfy synchronization constraints when running the test suite. + # However, users also may expect to see data "immediately". On the other hand, in order to satisfy + # different needs, this should be made configurable per realm, channel and/or request. + # TODO: Maybe just _optionally_ synchronize with the database when reading data. + if tdata: + refresh_sql = f"REFRESH TABLE {self.get_tablename(tdata)}" + self.execute(refresh_sql) + + def dict_from_row(columns, row): + """ + https://stackoverflow.com/questions/3300464/how-can-i-get-dict-from-sqlite-query + https://stackoverflow.com/questions/4147707/python-mysqldb-sqlite-result-as-dictionary + """ + return dict(zip(columns, row)) + + def record_from_dict(item): + record = OrderedDict() + record.update({"time": item["time"]}) + record.update(item["tags"]) + record.update(item["fields"]) + return record + + # Query database, with convenience data type converters. Assume timestamps to be in UTC. + cursor = self.db_client.cursor(converter=DefaultTypeConverter(), time_zone=timezone.utc) + cursor.execute(expression) + data_raw = cursor.fetchall() + + # Provide fully-qualified records to downstream components, including column names. + column_names = [column_info[0] for column_info in cursor.description] + data_tags_fields = map(functools.partial(dict_from_row, column_names), data_raw) + + # Bring results into classic "records" shape. + data_records = map(record_from_dict, data_tags_fields) + + cursor.close() + return data_records + + def execute(self, expression: str): + """ + Execute a database query, using a cursor, and return its results. + """ + cursor = self.db_client.cursor() + cursor.execute(expression) + result = cursor._result + cursor.close() + return result + def write(self, meta, data): """ Format ingress data chunk and store it into database table. diff --git a/kotori/io/export/database.py b/kotori/io/export/database.py index dea6ff83..4bda1747 100644 --- a/kotori/io/export/database.py +++ b/kotori/io/export/database.py @@ -31,6 +31,11 @@ def query(self): settings=self.settings.influxdb, database=bucket.tdata.database, ) + elif "cratedb" in self.settings: + from kotori.daq.storage.cratedb import CrateDBAdapter + database = CrateDBAdapter( + settings=self.settings.cratedb, + ) else: log.warn("No time-series database configured") return diff --git a/kotori/io/protocol/cratedb.py b/kotori/io/protocol/cratedb.py new file mode 100644 index 00000000..55cde8bc --- /dev/null +++ b/kotori/io/protocol/cratedb.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- +# (c) 2023 Andreas Motl +from twisted.logger import Logger + +from kotori.io.protocol.util import compute_daterange + +log = Logger() + + +class QueryTransformer: + + @classmethod + def transform(cls, data): + """ + Compute CrateDB query expression from data in transformation dictionary. + Also compute date range from query parameters "from" and "to". + """ + + log.info(f"Querying database: {data}") + + # The PyInfluxQL query generator is versatile enough to be used for all SQL databases. 
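+        # An expression generated for CrateDB might look like this, with table name
+        # and date range being examples:
+        #
+        #   SELECT * FROM mqttkit_2_itest.foo_bar_sensors
+        #   WHERE time >= '2020-03-10T00:00:00.000Z' AND time <= '2020-03-10T23:59:59.000Z'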
+ from pyinfluxql import Query + + # TODO: Use ".date_range" API method + time_begin, time_end = compute_daterange(data.get('from'), data.get('to')) + + # TODO: Add querying by tags. + tags = {} + # tags = CrateDBAdapter.get_tags(data) + + table = f"{data.database}.{data.measurement}" + expression = Query('*').from_(table).where(time__gte=time_begin, time__lte=time_end, **tags) + + result = { + 'expression': str(expression), + 'time_begin': time_begin, + 'time_end': time_end, + } + + return result diff --git a/kotori/io/protocol/target.py b/kotori/io/protocol/target.py index 3c18dd16..db977a2c 100644 --- a/kotori/io/protocol/target.py +++ b/kotori/io/protocol/target.py @@ -61,8 +61,8 @@ def setupService(self): except Exception as ex: log.failure("Connecting to MQTT broker failed: {ex}", ex=last_error_and_traceback()) - elif self.scheme == 'influxdb': - # InfluxDB has no subsystem service, it's just an adapter + # CrateDB and InfluxDB are not subsystem services, they are just adapters. + elif self.scheme in ['cratedb', 'influxdb']: pass else: @@ -87,7 +87,7 @@ def emit(self, uri, bucket): # TODO: Use threads.deferToThread here? return self.downstream.publish(topic, payload) - elif self.scheme == 'influxdb': + elif self.scheme in ['cratedb', 'influxdb']: # InfluxDB query wrapper using expression derived from transformation data dfq = DataFrameQuery(settings=self.settings, bucket=bucket) diff --git a/test/settings/mqttkit.py b/test/settings/mqttkit.py index 19843ade..41b28653 100644 --- a/test/settings/mqttkit.py +++ b/test/settings/mqttkit.py @@ -16,6 +16,7 @@ class TestSettings: cratedb_measurement_events = 'foo_bar_events' mqtt2_topic_json = 'mqttkit-2/itest/foo/bar/data.json' grafana2_dashboards = ['mqttkit-2-itest', 'mqttkit-2-itest3'] + io_channel_path = '/mqttkit-2/itest/foo/bar/data' # InfluxDB settings. influx_database = 'mqttkit_1_itest' diff --git a/test/test_export.py b/test/test_export.py index c4a89b2a..e9101499 100644 --- a/test/test_export.py +++ b/test/test_export.py @@ -23,6 +23,25 @@ ts_to = '2020-03-10T23:59:59.000Z' +@pytest_twisted.inlineCallbacks +@pytest.mark.http +@pytest.mark.export +@pytest.mark.cratedb +def test_export_cratedb_general(machinery_cratedb, reset_cratedb): + """ + Submit single reading in JSON format to HTTP API and proof + it can be retrieved back from the HTTP API in different formats. + + This uses CrateDB as timeseries database. + """ + + channel_path = settings.io_channel_path + http_submit = functools.partial(http_json_sensor, port=24643) + http_fetch = functools.partial(http_get_data, port=24643) + + yield verify_export_general(channel_path, http_submit, http_fetch) + + @pytest_twisted.inlineCallbacks @pytest.mark.http @pytest.mark.export diff --git a/test/util.py b/test/util.py index f80a2d2b..76b70a63 100644 --- a/test/util.py +++ b/test/util.py @@ -259,6 +259,9 @@ def query(self): }, ... ] + + TODO: Refactor to / unify with `kotori.daq.storage.cratedb:CrateDBAdapter.query`. + TODO: Add dict-based record retrieval as a row factory to `crate` library. """ logger.info('CrateDB: Querying database') db_table = self.get_tablename() @@ -280,6 +283,8 @@ def query(self): def execute(self, expression): """ Actually execute the database query, using a cursor. + + TODO: Use `kotori.daq.storage.cratedb:CrateDBAdapter.execute`. 
""" cursor = self.client.cursor() cursor.execute(expression) From fb5d55ff5ebc88ef98b59652ee2d8118116f982c Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 20 Jun 2023 14:15:45 +0200 Subject: [PATCH 5/7] [CrateDB] Add documentation across the board --- CHANGES.rst | 1 + CONTRIBUTORS.rst | 19 +- README.rst | 7 +- doc/source/_meta.rst | 2 +- doc/source/_resources.rst | 5 + doc/source/about/scenarios.rst | 4 +- doc/source/about/technologies.rst | 12 +- doc/source/database/cratedb.rst | 542 ++++++++++++++++++ doc/source/database/index.md | 30 + .../development/releasing/foundation.rst | 20 +- doc/source/development/releasing/index.rst | 2 +- doc/source/development/systemd.rst | 4 +- doc/source/development/tests.rst | 6 +- .../handbook/acquisition/error-signalling.rst | 28 +- doc/source/handbook/index.rst | 1 + doc/source/handbook/usage/cratedb.rst | 36 ++ doc/source/integration/tts-ttn.rst | 1 + doc/source/setup/docker.rst | 34 +- doc/source/setup/index.rst | 4 +- doc/source/setup/linux-arch.rst | 1 + doc/source/setup/linux-debian.rst | 30 +- doc/source/setup/macos.rst | 18 +- doc/source/setup/sandbox.rst | 4 +- doc/source/setup/security.rst | 13 + etc/docker/docker-cratedb.ini | 56 ++ etc/test/cratedb.ini | 6 +- 26 files changed, 832 insertions(+), 54 deletions(-) create mode 100644 doc/source/database/cratedb.rst create mode 100644 doc/source/handbook/usage/cratedb.rst create mode 100644 etc/docker/docker-cratedb.ini diff --git a/CHANGES.rst b/CHANGES.rst index 450b6e45..4f1799e1 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -27,6 +27,7 @@ in progress - [docs] Refactor "decoders" section to "integrations", and improve index/overview page - [export] Improve export capabilities by adding parameters ``sort``, ``direction``, ``limit``, and ``scalar``. Thanks, @ClemensGruber. +- [CrateDB] Add support for data acquisition and data export .. _kotori-0.27.0: diff --git a/CONTRIBUTORS.rst b/CONTRIBUTORS.rst index aabf5472..89095966 100644 --- a/CONTRIBUTORS.rst +++ b/CONTRIBUTORS.rst @@ -14,39 +14,52 @@ contributed to Kotori in one way or another. * Anne-Julia Scheuermann * Carolin Johannsen * Clemens Gruber +* Georg Traar * Georges Gagliano * Géraldine Farcy-Merlo +* Hernan Cianfagna * Jan Hoffmann * Jens Schmelkus * Karsten Harazim * Lorenzo Pons +* Marija Selakovic * Markus Euskirchen * Matthias Büchner * Matthias Mehldau * Michael Haberler +* Niklas Schmidtmer * Richard Pobering * Roh * Rui Alexandre Teixeira Sousa Pinto * Sebastian Donner * Sebastian Henneke +* Sebastian Utz * Smilie * Thorsten Kluß +* Walter Behmann * Weef -Thanks a bunch! +Thanks a stack. **************** Acknowledgements **************** + Kotori would not have been possible without all the amazing people -working on the infrastructure the software is leveraging. +working on the infrastructure the software is leveraging and building +upon. - Glyph Lefkowitz et al. for conceiving, building and maintaining the Twisted network programming framework. +- Dr. Andy Stanford-Clark and Arlen Nipper for inventing the MQTT + messaging protocol. - Roger Light et al. for conceiving, building and maintaining the Mosquitto MQTT broker. - Torkel Ödegaard and his team for creating and maintaining Grafana. +- Bernd Dorn, Bernd Rössl, Jodok Batlogg, Bernhard Kuzel, Mathias + Fußenegger, Sebastian Utz, et al., for conceiving and maintaining + CrateDB. - Paul Dix and his team for creating and maintaining InfluxDB. - The PostgreSQL and MongoDB developers for creating and maintaining their database systems. 
@@ -59,4 +72,4 @@ working on the infrastructure the software is leveraging. to put everything together. Thank you so much for providing such great infrastructure -components and resources to the community! You know who you are. +components and resources to the community. You know who you are. diff --git a/README.rst b/README.rst index 810294eb..1e7a9d08 100644 --- a/README.rst +++ b/README.rst @@ -76,6 +76,10 @@ Kotori :target: https://github.com/grafana/grafana :alt: Supported Grafana versions + .. image:: https://img.shields.io/badge/CrateDB-4.2%20--%205.x-blue.svg + :target: https://github.com/crate/crate + :alt: Supported CrateDB versions + .. image:: https://img.shields.io/badge/InfluxDB-1.6%2C%201.7%2C%201.8-blue.svg :target: https://github.com/influxdata/influxdb :alt: Supported InfluxDB versions @@ -105,7 +109,7 @@ Details Kotori takes the role of the `data historian`_ component within a `SCADA`_ / `MDE`_ system, exclusively built upon industry-grade `free and open-source software`_ -like Grafana_, Mosquitto_, or InfluxDB_. It is written in Python_, +like Grafana_, Mosquitto_, CrateDB_, or InfluxDB_. It is written in Python_, and uses the Twisted_ networking library. The best way to find out what you can do with Kotori, is by looking at @@ -222,6 +226,7 @@ The project is licensed under the terms of the GNU AGPL license, see `LICENSE`_. .. _Autobahn: https://crossbar.io/autobahn/ .. _contributors: https://getkotori.org/docs/project/contributors.html +.. _CrateDB: https://github.com/crate/crate .. _Create an issue: https://github.com/daq-tools/kotori/issues/new .. _data historian: https://en.wikipedia.org/wiki/Operational_historian .. _development sandbox: https://getkotori.org/docs/setup/sandbox.html diff --git a/doc/source/_meta.rst b/doc/source/_meta.rst index 515723cd..f1f81541 100644 --- a/doc/source/_meta.rst +++ b/doc/source/_meta.rst @@ -16,5 +16,5 @@ :keywords lang=es: adquisición de datos, gráficas, conjunto de herramientas, telemetría :keywords lang=fr: acquisition de données, graphiques, boîte à outils, télémétrie :keywords: m2m, iot, mqtt, wamp, http, rest, udp, binary struct, - Kotori, Grafana, InfluxDB, Mosquitto, MongoDB, XMPP, SMTP + Kotori, Grafana, CrateDB, InfluxDB, Mosquitto, MongoDB, XMPP, SMTP diff --git a/doc/source/_resources.rst b/doc/source/_resources.rst index 0931f922..3d6305ac 100644 --- a/doc/source/_resources.rst +++ b/doc/source/_resources.rst @@ -170,6 +170,8 @@ .. _Raumfahrtagentur: http://www.raumfahrtagentur.org/ .. NEW +.. _CrateDB: https://github.com/crate/crate +.. _CrateDB reference documentation: https://crate.io/docs/crate/reference/ .. _curl: https://en.wikipedia.org/wiki/CURL .. _Flux data scripting language: https://docs.influxdata.com/flux/ .. _Funky v3: https://harizanov.com/product/funky-v3/ @@ -178,12 +180,15 @@ .. _InfluxDB line protocol: https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/ .. _InfluxDB storage engine: https://docs.influxdata.com/influxdb/v2.7/reference/internals/storage-engine/ .. _LoRaWAN: https://en.wikipedia.org/wiki/LoRa#LoRaWAN +.. _Lucene: https://lucene.apache.org/ .. _MongoDB manual: https://www.mongodb.com/docs/manual/ .. _MongoDB Wire Protocol: https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol/ .. _OpenXC: https://openxcplatform.com/ .. _OpenXC for Python: http://python.openxcplatform.com/ .. _multi-tenant: https://en.wikipedia.org/wiki/Multitenancy +.. _near real-time: https://en.wikipedia.org/wiki/Near_real-time#Near_real-time .. 
_NodeUSB: https://web.archive.org/web/20210621192219/http://www.nodeusb.com/ +.. _PostgreSQL wire protocol: https://www.postgresql.org/docs/16/protocol.html .. _trunking: https://en.wikipedia.org/wiki/Trunking .. _webhook: https://en.wikipedia.org/wiki/Webhook .. _webhooks: https://en.wikipedia.org/wiki/Webhook diff --git a/doc/source/about/scenarios.rst b/doc/source/about/scenarios.rst index edcfa346..8eeb20c6 100644 --- a/doc/source/about/scenarios.rst +++ b/doc/source/about/scenarios.rst @@ -85,8 +85,8 @@ We are standing on the shoulders of giants: - Leverage the open infrastructure based on Twisted_ - an event-driven networking engine - to implement custom software components. - Listen and talk M2M_ using the *MQ Telemetry Transport* connectivity protocol and software bus (MQTT_). -- Store data points into InfluxDB_, a leading open source time series database suitable - for realtime analytics and sensor data storage. +- Store data points into CrateDB_, InfluxDB_, or other open source time series databases + suitable for realtime analytics and sensor data storage. - Automate dashboard management in the context of data arriving on different data channels using Grafana_, an open source, feature rich metrics dashboard and graph editor. - Make Browser clients first-class citizens of the underpinning software bus framework diff --git a/doc/source/about/technologies.rst b/doc/source/about/technologies.rst index 62ed8abe..19e4f62b 100644 --- a/doc/source/about/technologies.rst +++ b/doc/source/about/technologies.rst @@ -26,14 +26,16 @@ Infrastructure components - Kotori_, a data acquisition, graphing and telemetry toolkit - Grafana_, a graph and dashboard builder for visualizing time series metrics -- InfluxDB_, a time-series database +- CrateDB_, a time-series database ¹ +- InfluxDB_, a time-series database ¹ - Mosquitto_, an MQTT message broker -- MongoDB_, a document store (optional) ¹² +- MongoDB_, a document store (optional) ²³ -| ¹ MongoDB is only required when doing CSV data acquisition, so it is completely +| ¹ Kotori can either use CrateDB or InfluxDB as timeseries database. +| ² MongoDB is only required when doing CSV data acquisition, so it is completely | optional for regular operations of Kotori. -| ² As MongoDB - strictly speaking - stopped being free software recently (2018/2019), -| it will probably be phased out gradually and replaced by PostgreSQL_. +| ³ As MongoDB - strictly speaking - stopped being free software recently (2018/2019), +| it will probably be phased out gradually and replaced by PostgreSQL_ or CrateDB_. Supported protocols diff --git a/doc/source/database/cratedb.rst b/doc/source/database/cratedb.rst new file mode 100644 index 00000000..517c0f4f --- /dev/null +++ b/doc/source/database/cratedb.rst @@ -0,0 +1,542 @@ +.. include:: ../_resources.rst + +.. _database-cratedb: + +####### +CrateDB +####### + + +***** +About +***** + +`CrateDB`_ is a distributed and scalable SQL database for storing and analyzing massive +amounts of data in near real-time, even with complex queries. It is PostgreSQL-compatible, +and based on `Lucene`_. + + +******* +Details +******* + +This section summarizes CrateDB's data model and query interface. + +Data model +========== + +As a time-series/document/OLAP/RDBMS database with an SQL interface, CrateDB stores records +into tables. Tables are grouped into schemas, which is equivalent to the concept of hosting +multiple databases on the same server instance. 
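+
+For example, Kotori maps a SensorWAN channel to a table within a schema, so a table
+``foo_bar_sensors`` within a schema ``mqttkit_1_itest`` is addressed as
+``mqttkit_1_itest.foo_bar_sensors``. A condensed sketch of the corresponding DDL and
+DQL, with schema, table, and field names being examples::
+
+    CREATE TABLE IF NOT EXISTS mqttkit_1_itest.foo_bar_sensors (
+        time TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,
+        tags OBJECT(DYNAMIC),
+        fields OBJECT(DYNAMIC)
+    );
+
+    SELECT time, fields['temperature'] AS temperature
+    FROM mqttkit_1_itest.foo_bar_sensors
+    ORDER BY time DESC LIMIT 10;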
+
+The schema of tables/records can be freely defined using a classic SQL DDL statement,
+leveraging CrateDB's multi-modal data types. Tables can also be queried using classic,
+standards-compliant SQL DQL statements.
+
+.. figure:: https://github.com/daq-tools/kotori/assets/453543/fb469738-9d3e-4258-b546-1f5cd9bac261
+   :width: 640
+   :alt: A screenshot example of an SQL query submitted to CrateDB.
+
+Beyond the record-based scheme of classic RDBMS systems, CrateDB also allows you to store
+and retrieve nested data, by providing the `container types`_ ``ARRAY`` and ``OBJECT``,
+effectively offering document-oriented capabilities similar to CouchDB or MongoDB.
+
+On disk, CrateDB stores data in Lucene indexes. By default, all fields are indexed,
+nested or not, but indexing can be turned off selectively.
+
+Query interface
+===============
+
+Languages
+---------
+CrateDB supports SQL as its query language. Please inspect the :ref:`cratedb-query-examples`,
+as well as the corresponding upstream documentation about how to `insert data`_ and
+`query data`_.
+
+Protocols
+---------
+CrateDB clients communicate with servers using either HTTP, or the `PostgreSQL
+wire protocol`_, version 3.
+
+
+************
+Key features
+************
+
+This section enumerates the key features of CrateDB, as advertised in its documentation.
+
+At a glance
+===========
+
+- Use standard SQL via the PostgreSQL wire protocol or an HTTP API.
+- Dynamic table schemas and queryable objects provide document-oriented features in
+  addition to the relational features of SQL.
+- Support for time-series data, realtime full-text search, geospatial data types and
+  search capabilities.
+- Horizontally scalable, highly available, and fault-tolerant clusters that run very
+  well in virtualized and containerized environments.
+- Extremely fast distributed query execution.
+- Auto-partitioning, auto-sharding, and auto-replication.
+- Self-healing and auto-rebalancing.
+- User-defined functions (UDFs) can be used to extend the functionality of CrateDB.
+
+Storage layer
+=============
+
+Lucene
+------
+
+The CrateDB storage layer is based on Lucene. This section enumerates some concepts of
+Lucene, and the article `Indexing and Storage in CrateDB`_ goes into more detail by
+exploring its internal workings.
+
+Lucene offers scalable and high-performance indexing, which enables efficient search and
+aggregations over documents, as well as rapid updates to existing documents. Solr and
+Elasticsearch build upon the same technology.
+
+- **Documents**
+
+  A single record in Lucene is called a "document", a unit of information for search
+  and indexing that contains a set of fields, where each field has a name and value. A Lucene
+  index can store an arbitrary number of documents, with an arbitrary number of different fields.
+
+- **Append-only segments**
+
+  A Lucene index is composed of one or more sub-indexes. A sub-index is called a segment;
+  it is immutable, and built from a set of documents. When new documents are added to the
+  existing index, they are added to a new segment, while previous segments are never
+  modified. If the number of segments becomes too large, the system may decide to merge
+  some segments and discard the superseded ones. This way, adding a new document does not
+  require rebuilding the whole index structure.
+
+- **Column store**
+
+  For text values, in addition to storing the row data as-is (and indexing each value by
+  default), each value term is stored in a `column-based store`_ by default, which offers
+  performance improvements for global aggregations and groupings, and enables efficient
+  ordering, because the data for one column is packed in one place.
+
+  In CrateDB, the column store is enabled by default and can be disabled only for text
+  fields, not for other primitive types (see the combined DDL sketch below). Furthermore,
+  CrateDB does not support storing values for container and geospatial types in the
+  column store.
+
+Data structures
+---------------
+
+This section enumerates the three main Lucene data structures that are used within
+CrateDB: inverted indexes for text values, BKD trees for numeric values, and DocValues.
+
+- **Inverted index**
+
+  The Lucene indexing strategy for text fields relies on a data structure called an inverted
+  index, which is defined as a "data structure storing a mapping from content, such as
+  words and numbers, to its location in the database file, document or set of documents".
+
+  Depending on the configuration of a column, the index can be plain (default) or full-text.
+  An index of type "plain" indexes the content of one or more fields without analyzing and
+  tokenizing their values into terms. To create a "full-text" index, the field value is
+  first analyzed and, based on the configured analyzer, split into smaller units, such as
+  individual words. A full-text index is then created for each text unit separately.
+
+  The inverted index enables very efficient search over textual data.
+
+- **BKD tree**
+
+  To optimize numeric range queries, Lucene uses an implementation of the Block KD (BKD)
+  tree data structure. The BKD tree index structure is suitable for indexing large
+  multi-dimensional point data sets. It is an I/O-efficient dynamic data structure based
+  on the KD tree. Unlike its predecessors, the BKD tree maintains its high space
+  utilization and excellent query and update performance regardless of the number of
+  updates performed on it.
+
+  Numeric range queries based on BKD trees can efficiently search numerical fields,
+  including fields defined as ``TIMESTAMP`` types, supporting performant date range
+  queries.
+
+- **DocValues**
+
+  Because Lucene's inverted index implementation is not optimal for finding field values
+  by a given document identifier, nor for performing column-oriented retrieval of data,
+  the DocValues data structure is used for those purposes instead.
+
+  DocValues is a column-based data storage structure built at document index time. It
+  stores all field values that are not analyzed as strings in a compact column, making
+  it more efficient for sorting and aggregations.
+
+Clustering
+==========
+
+Overview
+--------
+
+CrateDB splits tables into shards and replicas, meaning that tables are divided and
+distributed across the nodes of a cluster. Each shard in CrateDB is a Lucene index
+broken into segments and stored on the filesystem.
+
+CrateDB has been designed with clustering capabilities from the very beginning. The
+clustering subsystem, which effectively and efficiently distributes data amongst multiple
+storage nodes, is originally based on prior art from Elasticsearch, in turn building
+upon both quorum-based consensus algorithms and primary-backup approaches.
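+
+The following DDL sketch ties the storage-layer and clustering concepts together. It is
+a minimal, hedged example: the ``sensor_events`` table, its columns, and the shard and
+replica figures are hypothetical, chosen for illustration only.
+
+.. code-block:: sql
+
+    CREATE TABLE sensor_events (
+        time TIMESTAMP WITH TIME ZONE,
+        -- Full-text index: values are analyzed and tokenized before indexing.
+        message TEXT INDEX USING FULLTEXT WITH (analyzer = 'english'),
+        -- Keep the raw payload unindexed, and exclude it from the column store.
+        payload TEXT INDEX OFF STORAGE WITH (columnstore = false)
+    )
+    -- Distribute the table across the cluster: six shards, one replica each.
+    CLUSTERED INTO 6 SHARDS
+    WITH (number_of_replicas = 1);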
+
+Benefits
+--------
+
+Database clusters are effective for storing and retrieving large amounts of data,
+in the range of billions of records and terabytes of volume.
+
+On data retrieval, CrateDB's distributed query execution engine parallelizes query
+workloads across the whole cluster.
+
+By distributing data across multiple machines, and properly configuring replication
+parameters, you are also adding redundancy to your data, so it is protected against
+data loss resulting from fatal failures of individual storage nodes.
+
+Those concepts provide features similar to RAID drives, for the purposes of data
+redundancy, performance improvements, or both.
+
+Early distributed systems and databases needed manual operations, for example to
+initiate node fail-over procedures. With CrateDB, the corresponding steps around
+partitioning, sharding, replication, and rebalancing are performed unattended and
+automatically, effectively providing cluster self-healing capabilities.
+
+Complex queries
+===============
+
+By using the SQL query language, CrateDB provides an advanced query execution layer,
+unlocking complex querying capabilities like date range filtering, sub-selects,
+aggregations, JOINs, UNIONs, and CTEs, all within the same SQL statement.
+
+Query API
+=========
+
+- CrateDB provides an `HTTP endpoint`_ that can be used to submit SQL queries.
+  As such, any HTTP client, like curl or Postman, can be used to communicate with CrateDB.
+
+- The standards-based `PostgreSQL wire protocol`_ interface unlocks compatibility
+  with a wide range of client applications which can talk to PostgreSQL servers.
+
+Ecosystem
+=========
+
+CrateDB offers a wide range of connectivity options. In general, any PostgreSQL-compatible
+driver or framework can be used to connect to CrateDB.
+
+- `Overview of CrateDB drivers and adapters`_
+- `Overview of CrateDB integration tutorials`_
+
+User interface
+==============
+
+CrateDB offers both a graphical and a command-line user interface. In general,
+any PostgreSQL-compatible applications and systems can be used to connect to CrateDB.
+
+- CrateDB Admin, a graphical, web-based user interface, is built into CrateDB.
+- ``crash`` is a command-line terminal program, similar to ``psql``, but
+  with a bit more convenience.
+
+
+.. _cratedb-query-examples:
+
+**************
+Query examples
+**************
+
+This section demonstrates a few query examples from CrateDB's documentation.
+
+Typical queries
+===============
+
+.. code-block:: sql
+
+    -- Aggregations with date range filtering and
+    -- time bucketing using specified intervals.
+    SELECT
+        DATE_BIN('5 min'::INTERVAL, time::TIMESTAMPTZ, 0) AS time,
+        MEAN(fields['humidity']) AS humidity
+    FROM
+        readings
+    WHERE
+        time > NOW() - '1 hour'::INTERVAL
+    GROUP BY
+        time
+    ORDER BY
+        time;
+
+.. code-block:: sql
+
+    -- An SQL DDL statement defining a custom schema for holding sensor data.
+    CREATE TABLE iot_data (
+        ts TIMESTAMP WITH TIME ZONE,
+        sensor_data OBJECT (DYNAMIC) AS (
+            temperature FLOAT,
+            humidity FLOAT,
+            location OBJECT (DYNAMIC) AS (
+                latitude DOUBLE PRECISION, longitude DOUBLE PRECISION
+            )
+        )
+    );
+
+.. code-block:: sql
+
+    -- Inserting data into the table defined above.
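+    -- Note: values for OBJECT columns may be provided as JSON strings;
+    -- CrateDB converts them into typed object values on insert.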
+    INSERT INTO iot_data (ts, sensor_data) VALUES
+        -- Vienna
+        ('2022-01-01T01:00:00', '{"temperature": 20.3, "humidity": 50.5, "location": {"latitude": 48.2082, "longitude": 16.3738}}'),
+        -- Stockholm
+        ('2022-01-01T02:00:00', '{"temperature": 18.0, "humidity": 55.2, "location": {"latitude": 59.3293, "longitude": 18.0686}}'),
+        -- Tokyo
+        ('2022-01-01T03:00:00', '{"temperature": 24.5, "humidity": 60.8, "location": {"latitude": 35.6895, "longitude": 139.6917}}'),
+        -- Sydney
+        ('2022-01-01T04:00:00', '{"temperature": 25.7, "humidity": 65.0, "location": {"latitude": -33.8688, "longitude": 151.2093}}');
+
+.. code-block:: sql
+
+    -- Create a user-defined function to calculate the distance between two coordinates.
+    CREATE FUNCTION haversine_distance(
+        lat1 DOUBLE PRECISION, lon1 DOUBLE PRECISION,
+        lat2 DOUBLE PRECISION, lon2 DOUBLE PRECISION
+    ) RETURNS DOUBLE PRECISION LANGUAGE JAVASCRIPT AS '...';
+
+    -- Use the user-defined function with nested data.
+    SELECT
+        ts,
+        haversine_distance(
+            sensor_data['location']['latitude'],
+            sensor_data['location']['longitude'],
+            40.7128, -74.0060
+        ) AS distance
+    FROM
+        iot_data
+    ORDER BY
+        distance;
+
+
+Advanced queries
+================
+
+Time-series data
+----------------
+
+.. code-block:: sql
+
+    /**
+     * Based on device data, this query returns the average
+     * battery level per hour for each `device_id`.
+     **/
+
+    WITH avg_metrics AS (
+        SELECT device_id,
+               DATE_BIN('1 hour'::INTERVAL, time, 0) AS period,
+               AVG(battery_level) AS avg_battery_level
+        FROM devices.readings
+        GROUP BY 1, 2
+        ORDER BY 1, 2
+    )
+    SELECT period,
+           t.device_id,
+           manufacturer,
+           avg_battery_level
+    FROM avg_metrics t, devices.info i
+    WHERE t.device_id = i.device_id
+      AND model = 'mustang'
+    LIMIT 10;
+
+
+IoT & sensor data
+-----------------
+
+.. code-block:: sql
+
+    /**
+     * Based on data acquisition from power metering devices, this query
+     * returns the voltage corresponding to the maximum global active power
+     * for each `meter_id`.
+     **/
+
+    SELECT meter_id,
+           MAX_BY("Voltage", "Global_active_power") AS voltage_max_global_power
+    FROM iot.power_consumption
+    GROUP BY 1
+    LIMIT 10;
+
+Geospatial tracking
+-------------------
+
+.. code-block:: sql
+
+    /**
+     * Based on the location of the International Space Station,
+     * this query returns the 10 closest capital cities from
+     * the last known position.
+     **/
+
+    SELECT city AS "City Name",
+           country AS "Country",
+           DISTANCE(i.position, c.location)::LONG / 1000 AS "Distance [km]"
+    FROM demo.iss i
+    CROSS JOIN demo.world_cities c
+    WHERE capital = 'primary'
+      AND ts = (SELECT MAX(ts) FROM demo.iss)
+    ORDER BY 3 ASC
+    LIMIT 10;
+
+Log analysis
+------------
+
+.. code-block:: sql
+
+    /**
+     * Based on system event logs, this query applies a filter for specific
+     * messages using a full-text index, and calculates:
+     * - the number of entries per minute
+     * - the average scoring ratio for each matched row
+     **/
+
+    SELECT DATE_TRUNC('minute', receivedat) AS event_time,
+           COUNT(*) AS entries,
+           AVG(_score) AS avg_score
+    FROM "syslog"."systemevents"
+    WHERE MATCH(message, 'authentication failure')
+          USING most_fields WITH (analyzer = 'whitespace')
+      AND MATCH(syslogtag, 'sshd')
+    GROUP BY 1
+    ORDER BY 1 DESC
+    LIMIT 10;
+
+Tracking analytics
+------------------
+
+This complex query executes in under 200 milliseconds on two tables containing
+6 million records (``pageview``) and 35,000 records (``user_session``), respectively.
+
+.. code-block:: sql
+
+    /**
+     * An analytics query about user visits and metrics.
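+     * It compares a one-month period with the preceding one, computing the
+     * same visitor metrics for both date ranges and combining them with UNION.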
+     * This SQL DQL statement uses date range filtering,
+     * sub-selects, aggregations, JOINs, UNIONs, and CTEs.
+     **/
+
+    WITH sessions AS (
+        SELECT
+            user_id,
+            session_id
+        FROM
+            af_dev.user_session
+        WHERE
+            user_session.domain = 'domain.com'
+            AND user_session.hostname = 'www.domain.com'
+            AND user_session.event_time BETWEEN '2022-12-05' AND '2023-01-05'
+            AND user_session.device_type IS NOT NULL
+    ),
+    pageviews AS (
+        SELECT
+            pageview.totaltime,
+            pageview.user_id,
+            pageview.event_time
+        FROM
+            af_dev.pageview
+        WHERE
+            pageview.event_time BETWEEN '2022-12-05' AND '2023-01-05'
+            AND pageview.domain = 'domain.com'
+            AND pageview.host = 'www.domain.com'
+    ),
+    visits AS (
+        SELECT
+            MAX(totaltime) AS sess_len,
+            COUNT(session_id) AS sess_count,
+            COUNT(DISTINCT sessions.user_id) AS visitors,
+            MIN(event_time) AS event_time
+        FROM
+            pageviews
+            JOIN sessions ON pageviews.user_id = sessions.user_id
+        GROUP BY
+            pageviews.user_id,
+            sessions.session_id
+    ),
+    psessions AS (
+        SELECT
+            user_id,
+            session_id
+        FROM
+            af_dev.user_session
+        WHERE
+            user_session.domain = 'domain.com'
+            AND user_session.hostname = 'www.domain.com'
+            AND user_session.event_time BETWEEN '2022-11-05' AND '2022-12-05'
+            AND user_session.device_type IS NOT NULL
+    ),
+    ppageviews AS (
+        SELECT
+            pageview.totaltime,
+            pageview.user_id,
+            pageview.event_time
+        FROM
+            af_dev.pageview
+        WHERE
+            pageview.event_time BETWEEN '2022-11-05' AND '2022-12-05'
+            AND pageview.domain = 'domain.com'
+            AND pageview.host = 'www.domain.com'
+    ),
+    pvisits AS (
+        SELECT
+            MAX(totaltime) AS sess_len,
+            COUNT(session_id) AS sess_count,
+            COUNT(DISTINCT psessions.user_id) AS visitors,
+            MIN(event_time) AS event_time
+        FROM
+            ppageviews
+            JOIN psessions ON ppageviews.user_id = psessions.user_id
+        GROUP BY
+            ppageviews.user_id,
+            psessions.session_id
+    )
+    SELECT
+        MIN(event_time) AS event_date,
+        SUM(visitors) AS tot_vis,
+        SUM(visitors) FILTER (WHERE sess_count = 1) AS new_vis,
+        SUM(visitors) FILTER (WHERE sess_count > 1) AS ret_vis,
+        AVG(sess_len) AS tot_avg,
+        AVG(sess_len) FILTER (WHERE sess_count = 1) AS new_avg,
+        AVG(sess_len) FILTER (WHERE sess_count > 1) AS ret_avg
+    FROM visits
+    UNION
+    SELECT
+        MIN(event_time) AS event_date,
+        SUM(visitors) AS tot_vis,
+        SUM(visitors) FILTER (WHERE sess_count = 1) AS new_vis,
+        SUM(visitors) FILTER (WHERE sess_count > 1) AS ret_vis,
+        AVG(sess_len) AS tot_avg,
+        AVG(sess_len) FILTER (WHERE sess_count = 1) AS new_avg,
+        AVG(sess_len) FILTER (WHERE sess_count > 1) AS ret_avg
+    FROM pvisits;
+
+
+*****
+Usage
+*****
+
+Purpose
+=======
+
+Kotori uses CrateDB to store **timeseries data** of data acquisition channels.
+
+Documentation
+=============
+
+See :ref:`cratedb-handbook` and the `CrateDB reference documentation`_.
+
+Compatibility
+=============
+
+Kotori supports data acquisition and export with CrateDB 4.2 and higher.
+
+.. _column-based store: https://crate.io/docs/crate/reference/en/latest/general/ddl/storage.html
+.. _container types: https://crate.io/docs/crate/reference/en/latest/general/ddl/data-types.html#container-types
+.. _HTTP endpoint: https://crate.io/docs/crate/reference/en/latest/interfaces/http.html
+.. _Indexing and Storage in CrateDB: https://crate.io/blog/indexing-and-storage-in-cratedb
+.. _insert data: https://crate.io/docs/crate/reference/en/latest/general/dml.html#inserting-data
+.. _Overview of CrateDB drivers and adapters: https://community.crate.io/t/overview-of-cratedb-drivers-and-adapters/1464
+.. _Overview of CrateDB integration tutorials: https://community.crate.io/t/overview-of-cratedb-integration-tutorials/1015
+.. _query data: https://crate.io/docs/crate/reference/en/latest/general/dql/
diff --git a/doc/source/database/index.md b/doc/source/database/index.md
index b3d15aa9..3d9ca9b6 100644
--- a/doc/source/database/index.md
+++ b/doc/source/database/index.md
@@ -14,6 +14,7 @@ with Kotori. Adding more adapters is possible.
 :maxdepth: 1
 :hidden:
 
+cratedb
 influxdb
 mongodb
 ```
@@ -24,6 +25,35 @@ mongodb
 :padding: 0
 
 
+:::::{grid-item-card}
+::::{grid} 2
+:margin: 0
+:padding: 0
+
+:::{grid-item}
+:columns: 8
+#### [](#database-cratedb)
+
+CrateDB is a distributed and scalable SQL database for storing and analyzing massive
+amounts of data in near real-time, even with complex queries. It is PostgreSQL-compatible
+and based on Lucene.
+
+
+Categories: timeseries-database, multi-modal database
+
+:::
+:::{grid-item}
+:columns: 4
+{bdg-primary-line}`eth` {bdg-primary-line}`wifi` {bdg-primary-line}`http` {bdg-primary-line}`postgresql`
+
+{bdg-success-line}`sql` {bdg-success-line}`blob`
+
+{bdg-secondary-line}`amd64` {bdg-secondary-line}`arm64`
+:::
+::::
+:::::
+
+
 :::::{grid-item-card}
 ::::{grid} 2
 :margin: 0
 :padding: 0
diff --git a/doc/source/development/releasing/foundation.rst b/doc/source/development/releasing/foundation.rst
index 8ce7102a..b51010e3 100644
--- a/doc/source/development/releasing/foundation.rst
+++ b/doc/source/development/releasing/foundation.rst
@@ -34,6 +34,24 @@ Download packages
     scp mosquitto*.deb workbench@packages.example.org:/srv/packages/organizations/elmyra/foss/aptly/public/incoming
 
 
+*******
+CrateDB
+*******
+
+- https://github.com/crate/crate
+- https://crate.io/docs/crate/tutorials/
+
+Download packages
+=================
+::
+
+    # Download amd64 packages
+    wget https://cdn.crate.io/downloads/apt/stable/pool/main/c/crate/crate_5.3.2-1~buster_amd64.deb
+    wget https://cdn.crate.io/downloads/apt/stable/pool/main/c/crate/crate_5.3.2-1~bullseye_amd64.deb
+
+    # Upload to "incoming" directory
+    scp crate_*.deb workbench@packages.example.org:/srv/packages/organizations/elmyra/foss/aptly/public/incoming
+
+
 ********
 InfluxDB
 ********
@@ -100,7 +118,7 @@ Publish packages
 
     # Add packages to repository
     aptly repo add -config=$APTLY_CONFIG -remove-files=true $APTLY_REPOSITORY \
-        $PACKAGES_INCOMING/influxdb*.deb $PACKAGES_INCOMING/grafana*.deb $PACKAGES_INCOMING/*mosquitto*.deb
+        $PACKAGES_INCOMING/crate*.deb $PACKAGES_INCOMING/influxdb*.deb $PACKAGES_INCOMING/grafana*.deb $PACKAGES_INCOMING/*mosquitto*.deb
 
     # Publish repository
     aptly publish update -config=$APTLY_CONFIG -gpg-key=2543A838 -passphrase=esp $APTLY_DISTRIBUTION
diff --git a/doc/source/development/releasing/index.rst b/doc/source/development/releasing/index.rst
index 00fdc425..266acc9f 100644
--- a/doc/source/development/releasing/index.rst
+++ b/doc/source/development/releasing/index.rst
@@ -4,7 +4,7 @@ Releasing and packaging
 
 This section describes how to cut a new release,
 how to build and publish Kotori packages and how to add foundation packages
-like InfluxDB and Grafana to the same package repositories.
+like CrateDB, InfluxDB, and Grafana to the same package repositories.
 
 .. toctree::
   :maxdepth: 1
diff --git a/doc/source/development/systemd.rst b/doc/source/development/systemd.rst
index 2cfaad28..b28a93f6 100644
--- a/doc/source/development/systemd.rst
+++ b/doc/source/development/systemd.rst
@@ -169,12 +169,12 @@ systemd troubleshooting
 
 ::
 
-    systemctl status influxdb mosquitto grafana-server kotori
+    systemctl status crate influxdb mosquitto grafana-server kotori
 
 ::
 
-    systemctl list-unit-files kotori* influx* grafana* mosquitto*
+    systemctl list-unit-files kotori* crate* influx* grafana* mosquitto*
 
 ::
diff --git a/doc/source/development/tests.rst b/doc/source/development/tests.rst
index 977ca8e2..282976e4 100644
--- a/doc/source/development/tests.rst
+++ b/doc/source/development/tests.rst
@@ -11,7 +11,7 @@ The tests are mostly full integration tests. They are testing the whole
 system and the interactions between the subsystems.
 
 Messages will get published to the MQTT bus by shelling out to ``mosquitto_pub``.
-After that, InfluxDB will be checked to contain the right data and Grafana will
+After that, the database will be checked to contain the right data and Grafana will
 be checked to be accurately provisioned.
 
 While the shell-out can well be optimized for efficiency, it is also pleasant
@@ -32,12 +32,12 @@ Install some needed packages::
 Foundation services
 ===================
 
-The test suite will assume running instances of Mosquitto, InfluxDB, MongoDB
+The test suite will assume running instances of Mosquitto, CrateDB, InfluxDB, MongoDB
 and Grafana and fire up an in-process instance of Kotori to complement these.
 Please have a look at :ref:`setup-docker` in order to get the complementing
 services up and running in a quick and ad hoc manner.
 
-Run Mosquitto, InfluxDB, MongoDB and Grafana as Docker containers::
+Run Mosquitto, CrateDB, InfluxDB, MongoDB and Grafana as Docker containers::
 
     make start-foundation-services
diff --git a/doc/source/handbook/acquisition/error-signalling.rst b/doc/source/handbook/acquisition/error-signalling.rst
index 3df6357b..bc37186c 100644
--- a/doc/source/handbook/acquisition/error-signalling.rst
+++ b/doc/source/handbook/acquisition/error-signalling.rst
@@ -48,13 +48,24 @@ the system will respond with a MQTT "response" on the corresponding topic with s
 
         "description": "Error processing MQTT message \"{\"value\": 42.42\" from topic \"mqttkit-1/testdrive/area-42/node-1/data.json\"."
     }
 
-Database error
-==============
-When sending a payload with an existing field already seeded in a different data type::
+Database errors
+===============
+When sending a payload with an existing field already materialized using a different data type::
 
    echo '{"value": "invalid"}' | mosquitto_pub -h kotori.example.org -t mqttkit-1/testdrive/area-42/node-1/data.json -l
 
-the system will respond with::
+the system will respond with an error message specific to the configured database backend.
+
+CrateDB::
+
+    mqttkit-1/testdrive/area-42/node-1/error.json {
+        "type": "",
+        "message": "SQLParseException[Cannot cast object element `humidity` with value `invalid` to type `double precision`]",
+        "description": "Error processing MQTT message \"b'{\"temperature\": 42.84, \"humidity\": \"invalid\"}'\" from topic \"mqttkit-2/foo/bar/1/data.json\".",
+        "timestamp": "2023-06-20T12:00:08+00:00"
+    }
+
+InfluxDB::
 
     mqttkit-1/testdrive/area-42/node-1/error.json {
         "message": "400: {\"error\":\"field type conflict: input field \\\"value\\\" on measurement \\\"area_42_node_1_sensors\\\" is type string, already exists as type float dropped=1\"}\n",
@@ -73,11 +84,4 @@ and then sending an invalid payload like::
 
    echo '2017-05-01 22:39:09,invalid' | http POST http://kotori.example.org/api/mqttkit-1/testdrive/area-42/node-1/data Content-Type:text/csv
 
-The system will also respond over MQTT with::
-
-    mqttkit-1/testdrive/area-42/node-1/error.json {
-        "message": "400: {\"error\":\"field type conflict: input field \\\"value\\\" on measurement \\\"area_42_node_1_sensors\\\" is type string, already exists as type float dropped=1\"}\n",
-        "type": "",
-        "description": "Error processing MQTT message \"{\"time\": \"2017-05-01 22:40:09\", \"value\": \"invalid\"}\" from topic \"mqttkit-1/testdrive/area-42/node-1/data.json\"."
-    }
-
+The system will also respond over MQTT with a corresponding error message, as outlined above.
diff --git a/doc/source/handbook/index.rst b/doc/source/handbook/index.rst
index e729020d..77ac46ba 100644
--- a/doc/source/handbook/index.rst
+++ b/doc/source/handbook/index.rst
@@ -49,4 +49,5 @@ Usage
 
    usage/kotori
    usage/grafana
+   usage/cratedb
    usage/influxdb
diff --git a/doc/source/handbook/usage/cratedb.rst b/doc/source/handbook/usage/cratedb.rst
new file mode 100644
index 00000000..af38e976
--- /dev/null
+++ b/doc/source/handbook/usage/cratedb.rst
@@ -0,0 +1,36 @@
+.. include:: ../../_resources.rst
+
+.. _cratedb-handbook:
+
+################
+CrateDB handbook
+################
+
+.. todo:: Content on this page may need enrichment.
+
+This section of the documentation will give you a short and concise summary of
+how to operate and use CrateDB.
+
+Until we have more to report here, please refer to the upstream documentation.
+
+- `Installing CrateDB`_
+- `Using CrateDB`_
+- `CrateDB reference documentation`_
+- `Overview of CrateDB drivers and adapters`_
+- `Overview of CrateDB integration tutorials`_
+
+The easiest way to run CrateDB on your workstation for evaluation purposes is
+to use Podman or Docker.
+
+::
+
+    docker run --rm -it \
+        --publish=4200:4200 --publish=5432:5432 \
+        --env=CRATE_HEAP_SIZE=4g \
+        crate:5.2 -Cdiscovery.type=single-node
+
+.. _CrateDB reference documentation: https://crate.io/docs/crate/reference/
+.. _Installing CrateDB: https://crate.io/docs/crate/tutorials/
+.. _Overview of CrateDB drivers and adapters: https://community.crate.io/t/overview-of-cratedb-drivers-and-adapters/1464
+.. _Overview of CrateDB integration tutorials: https://community.crate.io/t/overview-of-cratedb-integration-tutorials/1015
+.. _Using CrateDB: https://crate.io/docs/crate/howtos/
diff --git a/doc/source/integration/tts-ttn.rst b/doc/source/integration/tts-ttn.rst
index 426d9d4a..f406164f 100644
--- a/doc/source/integration/tts-ttn.rst
+++ b/doc/source/integration/tts-ttn.rst
@@ -45,6 +45,7 @@ Overview
 
     subgraph backend
         direction LR
+        Kotori --> CrateDB
         Kotori --> InfluxDB
         Kotori --> Grafana
     end
diff --git a/doc/source/setup/docker.rst b/doc/source/setup/docker.rst
index 2e5aaf17..cebcbd64 100644
--- a/doc/source/setup/docker.rst
+++ b/doc/source/setup/docker.rst
@@ -26,7 +26,7 @@ its configuration at :ref:`getting-started`.
 Introduction
 ************
 
-This section outlines how to conveniently run Mosquitto, InfluxDB,
+This section outlines how to conveniently run Mosquitto, CrateDB, InfluxDB,
 MongoDB, Grafana and Kotori using Docker.
 
 The repository provides two files ``docker-compose.yml`` and ``.env``. They
@@ -48,7 +48,7 @@ Those images are published to Docker Hub.
 
    Please note that this Docker Compose configuration is primarily suited for
    evaluation and development purposes. As it either disables authentication
-   or uses insecure authentication credentials for Mosquitto, InfluxDB,
+   or uses insecure authentication credentials for Mosquitto, CrateDB, InfluxDB,
    and Grafana, it is not prepared for production setups.
 
 
@@ -56,7 +56,7 @@ Those images are published to Docker Hub.
 Prerequisites
 *************
 
-This will give you Mosquitto, InfluxDB, MongoDB, Grafana, an improved
+This will give you Mosquitto, CrateDB, InfluxDB, MongoDB, Grafana, an improved
 Grafana map panel plugin, and a command alias for invoking Kotori.
 
 In order to invoke the auxiliary services, run::
@@ -94,6 +94,34 @@ Testdrive
 This is a basic test walkthrough, to check if data is correctly routed
 from the telemetry message bus to the database.
 
+CrateDB
+=======
+
+This example uses CrateDB as its timeseries database.
+
+Invoke Kotori::
+
+    kotori --config /etc/kotori/docker/docker-cratedb.ini
+
+Publish a single reading using MQTT::
+
+    export CHANNEL_TOPIC=sensorwan-cratedb/foo/bar/1/data.json
+    docker run \
+        --network kotori_default \
+        -it --rm eclipse-mosquitto \
+        mosquitto_pub -d -h mosquitto -t $CHANNEL_TOPIC -m '{"temperature": 42.84, "humidity": 83.1}'
+
+Check if the reading has been stored in CrateDB::
+
+    docker run \
+        --network kotori_default \
+        -it --rm crate \
+        crash --hosts cratedb -c 'SELECT * FROM sensorwan_cratedb_foo.bar_1_sensors;'
+
+Go to Grafana and visit the newly created dashboard::
+
+    open "http://localhost:3000/?orgId=1&search=open&query=sensorwan-cratedb"
+
 InfluxDB
 ========
diff --git a/doc/source/setup/index.rst b/doc/source/setup/index.rst
index ec3f1f4a..adcfabb2 100644
--- a/doc/source/setup/index.rst
+++ b/doc/source/setup/index.rst
@@ -16,7 +16,7 @@ from Docker Hub, from the Python Package Index (PyPI) or from the Git
 repository.
 
 For running Kotori in a full configuration, you will need some other
-infrastructure services like Mosquitto_, InfluxDB_, Grafana_ and optionally
+infrastructure services like Mosquitto_, CrateDB_, InfluxDB_, Grafana_ and optionally
 MongoDB_.
 
 Have fun and enjoy your data acquisition!
@@ -62,7 +62,7 @@ Details
 
 
 ¹
 When choosing to install from the Debian package repository, you will also be
-able to receive appropriate Debian packages for Mosquitto, InfluxDB and Grafana
+able to receive appropriate Debian packages for Mosquitto, CrateDB, InfluxDB, and Grafana
 through the `DaqZilla package repository `_.
 This makes it easy to setup the complete DAQ system from a single package source.
diff --git a/doc/source/setup/linux-arch.rst b/doc/source/setup/linux-arch.rst
index 93cf7ffa..ef3c870f 100644
--- a/doc/source/setup/linux-arch.rst
+++ b/doc/source/setup/linux-arch.rst
@@ -59,6 +59,7 @@ MongoDB is provided through the Arch User Repository (AUR)::
     cd mongodb-tools-bin
     makepkg -si
 
+.. todo:: Add a walkthrough about installing CrateDB on Arch Linux.
 
 Kotori
 ======
diff --git a/doc/source/setup/linux-debian.rst b/doc/source/setup/linux-debian.rst
index 60021b74..d104d8b0 100644
--- a/doc/source/setup/linux-debian.rst
+++ b/doc/source/setup/linux-debian.rst
@@ -102,11 +102,11 @@ Install Kotori together with all recommended and suggested packages::
 
     apt install --install-recommends kotori
 
-InfluxDB and Grafana are not always enabled and started automatically,
+CrateDB, InfluxDB, and Grafana are not always enabled and started automatically,
 so ensure they are running by invoking::
 
-    systemctl enable mosquitto influxdb grafana-server
-    systemctl start mosquitto influxdb grafana-server
+    systemctl enable mosquitto crate influxdb grafana-server
+    systemctl start mosquitto crate influxdb grafana-server
 
 Notes for ARM machines
 ======================
@@ -117,13 +117,21 @@ there is a more lightweight package with fewer dependencies called
 
     apt install --install-recommends kotori-standard
 
-*********
-Operation
-*********
+**********
+Operations
+**********
 
-Watching the system
-===================
-These are the log files at a glance where system messages might appear::
+Watching the system logs
+========================
+
+Being able to investigate problems is crucial. The first step is to inspect the
+corresponding log files. At a glance, these are the log files where system messages
+may appear, depending on how the corresponding services have been installed and started::
+
+    tail -F /var/log/kotori/*.log /var/log/grafana/*.log /var/log/crate/*.log /var/log/influxdb/*.log /var/log/mosquitto/*.log
+
+On modern Linux systems, log messages may be routed to systemd's journal daemon. In order
+to inspect them, invoke, for example::
+
+    journalctl -u kotori -u grafana -u crate -u influxdb -u mosquitto
 
-    tail -F /var/log/kotori/*.log /var/log/grafana/*.log /var/log/influxdb/*.log /var/log/mosquitto/*.log
-    journalctl -f -u influxdb
diff --git a/doc/source/setup/macos.rst b/doc/source/setup/macos.rst
index b6cc9299..1bb0edc7 100644
--- a/doc/source/setup/macos.rst
+++ b/doc/source/setup/macos.rst
@@ -34,6 +34,9 @@ on macOS through Homebrew or by using Docker.
 Native
 ******
 
+Package-based installation
+==========================
+
 Setup packages::
 
     brew install mosquitto influxdb grafana mongodb/brew/mongodb-community
@@ -73,10 +76,21 @@ Testdrive::
     export PATH="~/Library/Python/3.9/bin:$PATH"
     kotori --version
 
+Manual installation
+===================
+
+`CrateDB`_ does not offer packages for macOS yet. In order to install it, select and download
+a tarball archive from the `CrateDB for macOS`_ downloads folder, and follow the `CrateDB
+tarball setup`_ instructions for extracting the archive and running CrateDB.
+
 ******
 Docker
 ******
 
-Please have a look at :ref:`setup-docker` in order to run Mosquitto_, InfluxDB_,
-MongoDB_, Grafana_ and Kotori by using Docker.
+Please have a look at :ref:`setup-docker` in order to run Mosquitto_, CrateDB_, InfluxDB_,
+MongoDB_, Grafana_, and Kotori by using Docker.
+
+
+.. _CrateDB for macOS: https://cdn.crate.io/downloads/releases/cratedb/x64_mac/
+.. _CrateDB tarball setup: https://crate.io/docs/crate/tutorials/en/latest/basic/index.html#ad-hoc-unix-macos
diff --git a/doc/source/setup/sandbox.rst b/doc/source/setup/sandbox.rst
index 98987a81..3f235b56 100644
--- a/doc/source/setup/sandbox.rst
+++ b/doc/source/setup/sandbox.rst
@@ -31,13 +31,13 @@ Install some needed packages::
 Foundation services
 ===================
 
-You will need InfluxDB_, Grafana_, Mosquitto_ and optionally MongoDB_.
+You will need Grafana_, Mosquitto_, either CrateDB_ or InfluxDB_, and optionally MongoDB_.
 For installing them on your workstation, you might want to have a look
 at the :ref:`setup-docker`. When running Linux, you can also install the
 infrastructure on your local workstation natively like :ref:`setup-debian`.
 
-The most easy way is to run Mosquitto, InfluxDB, MongoDB and Grafana as Docker
+The easiest way is to run Mosquitto, CrateDB, InfluxDB, MongoDB, and Grafana as Docker
 containers::
 
     make start-foundation-services
diff --git a/doc/source/setup/security.rst b/doc/source/setup/security.rst
index b88f0556..314539e9 100644
--- a/doc/source/setup/security.rst
+++ b/doc/source/setup/security.rst
@@ -6,6 +6,11 @@ Secure the installation
 Close ports
 ===========
 
+CrateDB
+-------
+
+.. todo:: Outline how to protect CrateDB's public listening ports.
+
 InfluxDB
 --------
 
@@ -39,6 +44,14 @@ Grafana
 Enable authentication
 =====================
 
+CrateDB
+-------
+
+.. todo::
+
+    Outline how to enable authentication with CrateDB, and how to amend the default
+    credentials.
+
 InfluxDB
 --------
 Purpose: Enable auth-only access to InfluxDB.
diff --git a/etc/docker/docker-cratedb.ini b/etc/docker/docker-cratedb.ini
new file mode 100644
index 00000000..e09a6bcb
--- /dev/null
+++ b/etc/docker/docker-cratedb.ini
@@ -0,0 +1,56 @@
+; ####################################################
+; Kotori configuration with Docker Compose and CrateDB
+; ####################################################
+
+; Services need to be invoked using Docker Compose.
+; docker-compose up
+
+
+; ================
+; General settings
+; ================
+
+; http server
+[kotori]
+
+; TODO: Refactor to [http] section, or, even better, into the channel configuration.
+http_listen = localhost
+http_port = 24642
+
+
+; =====================
+; Connectivity settings
+; =====================
+
+; MQTT bus adapter
+[mqtt]
+host = mosquitto
+#port = 1883
+username = kotori
+password = kotori
+
+; Storage adapter
+[cratedb]
+host = cratedb
+; port = 4200
+; username = crate
+; password =
+
+; User interface
+[grafana]
+host = grafana
+#port = 3000
+username = admin
+password = admin
+
+
+; ================
+; Channel settings
+; ================
+
+[sensorwan-cratedb]
+enable = true
+type = application
+realm = sensorwan-cratedb
+mqtt_topics = sensorwan-cratedb/#
+application = kotori.daq.application.mqttkit:mqttkit_application
diff --git a/etc/test/cratedb.ini b/etc/test/cratedb.ini
index b0bbc714..60343ed1 100644
--- a/etc/test/cratedb.ini
+++ b/etc/test/cratedb.ini
@@ -3,9 +3,9 @@
 ; ######################################
 
 
-; ==========================================
-; General settings
-; ==========================================
+; ================
+; General settings
+; ================
 
 ; http server
 [kotori]
From f17d1945864c453d62a117a540876be03c5972d0 Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Fri, 23 Jun 2023 00:26:32 +0200
Subject: [PATCH 6/7] fixup! [CrateDB] Add support for Grafana instant dashboards

---
 test/test_daq_grafana.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/test/test_daq_grafana.py b/test/test_daq_grafana.py
index 5d31c04c..30f85586 100644
--- a/test/test_daq_grafana.py
+++ b/test/test_daq_grafana.py
@@ -66,6 +66,7 @@ def test_mqtt_cratedb_grafana_single(machinery_cratedb, reset_cratedb, reset_gra
     yield sleep(PROCESS_DELAY_MQTT)
     yield sleep(PROCESS_DELAY_MQTT)
     yield sleep(PROCESS_DELAY_MQTT)
+    yield sleep(PROCESS_DELAY_MQTT)
 
     # Proof that Grafana is well provisioned.
     logger.info('Grafana: Checking datasource')
From a6d66d55525290626d370d6fbfd08cdeca29be0b Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Thu, 13 Jun 2024 20:52:58 +0200
Subject: [PATCH 7/7] fixup! [CrateDB] Add basic data acquisition support for CrateDB

---
 setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index 6f4b0640..f41b74e2 100644
--- a/setup.py
+++ b/setup.py
@@ -68,7 +68,7 @@
 extras = {
     'daq': [
         'crash<1',
-        'crate[sqlalchemy]<1',
+        'sqlalchemy-cratedb>=0.37,<1',
         'influxdb>=5.3.0,<6',
         'pytz>=2020.1',
         'requests>=2.12.4,<3',