Skip to content

Commit

Permalink
[CrateDB] Add support for the data export subsystem
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Jun 20, 2023
1 parent d23f617 commit ce1fbbb
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 6 deletions.
34 changes: 34 additions & 0 deletions etc/test/cratedb.ini
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,18 @@
; ######################################


; ==========================================
; General settings
; ==========================================

; http server
[kotori]

; TODO: Refactor to [http] section, or, even better, into the channel configuration.
http_listen = localhost
http_port = 24643


; =====================
; Connectivity settings
; =====================
Expand Down Expand Up @@ -39,3 +51,25 @@ type = application
realm = mqttkit-2
mqtt_topics = mqttkit-2/#
application = kotori.daq.application.mqttkit:mqttkit_application

[mqttkit-2.http-api-generic]
enable = true

type = application
application = kotori.io.protocol.forwarder:boot

realm = mqttkit-2
source = http:/api/mqttkit-2/{address:.*}/{slot:(data|event)} [POST]
target = mqtt:/mqttkit-2/{address}/{slot}.json

[mqttkit-2.cratedb-data-export]
enable = true

type = application
application = kotori.io.protocol.forwarder:boot

realm = mqttkit-2
source = http:/api/{realm:mqttkit-2}/{network:.*}/{gateway:.*}/{node:.*}/{slot:(data|event)}.{suffix} [GET]
target = cratedb:/{database}?measurement={measurement}
transform = kotori.daq.strategy.wan:WanBusStrategy.topology_to_storage,
kotori.io.protocol.cratedb:QueryTransformer.transform
2 changes: 1 addition & 1 deletion etc/test/main.ini
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ include =
; http server
[kotori]

; TODO: Refactor to [http] section.
; TODO: Refactor to [http] section, or, even better, into the channel configuration.
http_listen = localhost
http_port = 24642

Expand Down
81 changes: 79 additions & 2 deletions kotori/daq/storage/cratedb.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
# -*- coding: utf-8 -*-
# (c) 2023 Andreas Motl <[email protected]>
import calendar
import functools
import json
from collections import OrderedDict
from decimal import Decimal
from copy import deepcopy
from datetime import datetime, date
from datetime import datetime, date, timezone

import crate.client.http
import pytz
import requests
from crate import client
from crate.client.converter import DefaultTypeConverter
from crate.client.exceptions import ProgrammingError
from funcy import project
from munch import Munch
from twisted.logger import Logger

from kotori.daq.storage.util import format_chunk

log = Logger()


class CrateDBAdapter(object):
class CrateDBAdapter:
"""
Kotori database backend adapter for CrateDB.
Expand Down Expand Up @@ -86,6 +90,79 @@ def create_table(self, tablename):
cursor.execute(sql_ddl)
cursor.close()

def query(self, expression: str, tdata: Munch = None):
"""
Query CrateDB and respond with results in suitable shape.
Make sure to synchronize data by using `REFRESH TABLE ...` before running
the actual `SELECT` statement. This is applicable in test case scenarios.
Response format::
[
{
"time": ...,
"tags": {"city": "berlin", "location": "balcony"},
"fields": {"temperature": 42.42, "humidity": 84.84},
},
...
]
TODO: Unify with `kotori.test.util:CrateDBWrapper.query`.
"""

log.info(f"Database query: {expression}")

tdata = tdata or {}

# Before reading data from CrateDB, synchronize it.
# Currently, it is mostly needed to satisfy synchronization constraints when running the test suite.
# However, users also may expect to see data "immediately". On the other hand, in order to satisfy
# different needs, this should be made configurable per realm, channel and/or request.
# TODO: Maybe just _optionally_ synchronize with the database when reading data.
if tdata:
refresh_sql = f"REFRESH TABLE {self.get_tablename(tdata)}"
self.execute(refresh_sql)

def dict_from_row(columns, row):
"""
https://stackoverflow.com/questions/3300464/how-can-i-get-dict-from-sqlite-query
https://stackoverflow.com/questions/4147707/python-mysqldb-sqlite-result-as-dictionary
"""
return dict(zip(columns, row))

def record_from_dict(item):
record = OrderedDict()
record.update({"time": item["time"]})
record.update(item["tags"])
record.update(item["fields"])
return record

# Query database, with convenience data type converters. Assume timestamps to be in UTC.
cursor = self.db_client.cursor(converter=DefaultTypeConverter(), time_zone=timezone.utc)
cursor.execute(expression)
data_raw = cursor.fetchall()

# Provide fully-qualified records to downstream components, including column names.
column_names = [column_info[0] for column_info in cursor.description]
data_tags_fields = map(functools.partial(dict_from_row, column_names), data_raw)

# Bring results into classic "records" shape.
data_records = map(record_from_dict, data_tags_fields)

cursor.close()
return data_records

def execute(self, expression: str):
"""
Execute a database query, using a cursor, and return its results.
"""
cursor = self.db_client.cursor()
cursor.execute(expression)
result = cursor._result
cursor.close()
return result

def write(self, meta, data):
"""
Format ingress data chunk and store it into database table.
Expand Down
5 changes: 5 additions & 0 deletions kotori/io/export/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ def query(self):
settings=self.settings.influxdb,
database=bucket.tdata.database,
)
elif "cratedb" in self.settings:
from kotori.daq.storage.cratedb import CrateDBAdapter
database = CrateDBAdapter(
settings=self.settings.cratedb,
)
else:
log.warn("No time-series database configured")
return
Expand Down
40 changes: 40 additions & 0 deletions kotori/io/protocol/cratedb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
# (c) 2023 Andreas Motl <[email protected]>
from twisted.logger import Logger

from kotori.io.protocol.util import compute_daterange

log = Logger()


class QueryTransformer:

@classmethod
def transform(cls, data):
"""
Compute CrateDB query expression from data in transformation dictionary.
Also compute date range from query parameters "from" and "to".
"""

log.info(f"Querying database: {data}")

# The PyInfluxQL query generator is versatile enough to be used for all SQL databases.
from pyinfluxql import Query

# TODO: Use ".date_range" API method
time_begin, time_end = compute_daterange(data.get('from'), data.get('to'))

# TODO: Add querying by tags.
tags = {}
# tags = CrateDBAdapter.get_tags(data)

table = f"{data.database}.{data.measurement}"
expression = Query('*').from_(table).where(time__gte=time_begin, time__lte=time_end, **tags)

result = {
'expression': str(expression),
'time_begin': time_begin,
'time_end': time_end,
}

return result
6 changes: 3 additions & 3 deletions kotori/io/protocol/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ def setupService(self):
except Exception as ex:
log.failure("Connecting to MQTT broker failed: {ex}", ex=last_error_and_traceback())

elif self.scheme == 'influxdb':
# InfluxDB has no subsystem service, it's just an adapter
# CrateDB and InfluxDB are not subsystem services, they are just adapters.
elif self.scheme in ['cratedb', 'influxdb']:
pass

else:
Expand All @@ -87,7 +87,7 @@ def emit(self, uri, bucket):
# TODO: Use threads.deferToThread here?
return self.downstream.publish(topic, payload)

elif self.scheme == 'influxdb':
elif self.scheme in ['cratedb', 'influxdb']:

# InfluxDB query wrapper using expression derived from transformation data
dfq = DataFrameQuery(settings=self.settings, bucket=bucket)
Expand Down
1 change: 1 addition & 0 deletions test/settings/mqttkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class TestSettings:
cratedb_measurement_events = 'foo_bar_events'
mqtt2_topic_json = 'mqttkit-2/itest/foo/bar/data.json'
grafana2_dashboards = ['mqttkit-2-itest', 'mqttkit-2-itest3']
io_channel_path = '/mqttkit-2/itest/foo/bar/data'

# InfluxDB settings.
influx_database = 'mqttkit_1_itest'
Expand Down
19 changes: 19 additions & 0 deletions test/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,25 @@
ts_to = '2020-03-10T23:59:59.000Z'


@pytest_twisted.inlineCallbacks
@pytest.mark.http
@pytest.mark.export
@pytest.mark.cratedb
def test_export_cratedb_general(machinery_cratedb, reset_cratedb):
"""
Submit single reading in JSON format to HTTP API and proof
it can be retrieved back from the HTTP API in different formats.
This uses CrateDB as timeseries database.
"""

channel_path = settings.io_channel_path
http_submit = functools.partial(http_json_sensor, port=24643)
http_fetch = functools.partial(http_get_data, port=24643)

yield verify_export_general(channel_path, http_submit, http_fetch)


@pytest_twisted.inlineCallbacks
@pytest.mark.http
@pytest.mark.export
Expand Down
5 changes: 5 additions & 0 deletions test/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ def query(self):
},
...
]
TODO: Refactor to / unify with `kotori.daq.storage.cratedb:CrateDBAdapter.query`.
TODO: Add dict-based record retrieval as a row factory to `crate` library.
"""
logger.info('CrateDB: Querying database')
db_table = self.get_tablename()
Expand All @@ -280,6 +283,8 @@ def query(self):
def execute(self, expression):
"""
Actually execute the database query, using a cursor.
TODO: Use `kotori.daq.storage.cratedb:CrateDBAdapter.execute`.
"""
cursor = self.client.cursor()
cursor.execute(expression)
Expand Down

0 comments on commit ce1fbbb

Please sign in to comment.