diff --git a/compose/.env b/compose/.env index f7a6d3f2..617fde97 100755 --- a/compose/.env +++ b/compose/.env @@ -47,3 +47,5 @@ TZ=Australia/Sydney # Used by the Intersect DataBolt delivery service. DATABOLT_SHARED_DIR=/some/where/raw_data + +AXISTECH_TOKEN= diff --git a/compose/docker-compose.yml b/compose/docker-compose.yml index 617fe282..5510f7b8 100644 --- a/compose/docker-compose.yml +++ b/compose/docker-compose.yml @@ -2,8 +2,12 @@ version: '3.1' services: db: - image: postgres:14.2 - restart: "no" + image: postgis/postgis:14-3.4 + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped env_file: - .env volumes: @@ -18,7 +22,11 @@ services: mq: hostname: "mq" image: rabbitmq:3.9-management - restart: "no" + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped env_file: - .env volumes: @@ -30,7 +38,11 @@ services: restapi: image: broker/python-base - restart: "no" + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped env_file: - .env depends_on: @@ -38,20 +50,21 @@ services: volumes: - ../src/python:/home/broker/python working_dir: "/home/broker/python/restapi" - entrypoint: [ "/home/broker/.local/bin/uvicorn", "--host", "0.0.0.0", "--port", "5687", "RestAPI:app", "--reload" ] - healthcheck: - test: curl --fail -I http://localhost:5687/openapi.json || exit 1 - interval: 1m - start_period: 20s + entrypoint: [ "/home/broker/.local/bin/uvicorn", "--proxy-headers", "--host", "0.0.0.0", "--port", "5687", "RestAPI:app" ] website: image: broker/mgmt-app + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped env_file: - .env environment: - PYTHONPATH=/app:/iota depends_on: - restapi: + db: condition: "service_healthy" volumes: - ../src/www:/app @@ -59,7 +72,11 @@ services: ttn_webhook: image: broker/python-base - restart: "no" + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped env_file: - .env profiles: @@ -74,7 +91,11 @@ services: ttn_processor: image: broker/python-base - restart: "no" + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped env_file: - .env profiles: @@ -93,7 +114,11 @@ services: ttn_decoder: image: broker/ttn_decoder - restart: "no" + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped env_file: - .env profiles: @@ -106,7 +131,11 @@ services: ydoc: image: broker/python-base - restart: "no" + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped env_file: - .env profiles: @@ -123,11 +152,14 @@ services: wombat: image: broker/python-base - restart: "no" + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped env_file: - .env - profiles: - - wombat + profiles: ["wombat", "frred"] depends_on: db: condition: "service_healthy" @@ -140,7 +172,11 @@ services: lm: image: broker/python-base - restart: "no" + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped env_file: - .env depends_on: @@ -155,6 +191,10 @@ services: delivery: image: broker/python-base + logging: + driver: local + options: + max-file: "3" restart: unless-stopped env_file: - .env @@ -172,6 +212,10 @@ services: pollers: image: broker/python-base + logging: + driver: local + options: + max-file: "3" restart: unless-stopped env_file: - .env @@ -189,6 +233,10 @@ services: frred: image: broker/python-base + logging: + driver: local + options: + max-file: "3" restart: unless-stopped env_file: - .env @@ -201,10 +249,26 @@ services: condition: "service_healthy" volumes: - ../src/python:/home/broker/python - - ${DATABOLT_SHARED_DIR}/raw_data:/raw_data + - ${DATABOLT_SHARED_DIR}/nectar_raw_data:/raw_data working_dir: "/home/broker/python" entrypoint: [ "python", "-m", "delivery.FRRED" ] + + axistech: + image: broker/python-base + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped + env_file: + - .env + profiles: + - frred + depends_on: + - db + entrypoint: [ "python", "-m", "pollers.axistech" ] + timescaledb: build: ../timescale # Point to the directory containing the custom Dockerfile hostname: "tsdb" @@ -239,5 +303,3 @@ services: - ../src/python:/home/broker/python working_dir: "/home/broker/python" entrypoint: [ "python", "-m", "timescale.TS_LTSReader" ] - - diff --git a/compose/production/prod.yml b/compose/production/prod.yml index 8cd43d47..c31d1eb4 100644 --- a/compose/production/prod.yml +++ b/compose/production/prod.yml @@ -13,7 +13,7 @@ services: # nginx will accept a connection on 1883, forward it to # 127.0.0.1:1884, which docker will forward to port 1883 # within the container. - - "127.0.0.1:1884:1883" + - "0.0.0.0:1883:1883" - "127.0.0.1:15672:15672" - "127.0.0.1:15692:15692" volumes: diff --git a/db/init.d/init_db.sql b/db/init.d/init_db.sql index 4a7cd54b..0df6d030 100755 --- a/db/init.d/init_db.sql +++ b/db/init.d/init_db.sql @@ -1,3 +1,6 @@ +CREATE EXTENSION postgis; +CREATE EXTENSION pgcrypto; + create table if not exists sources ( source_name text primary key not null ); @@ -6,7 +9,7 @@ create table if not exists physical_devices ( uid integer generated always as identity primary key, source_name text not null references sources, name text not null, - location point, + location geometry('POINT', 4283), last_seen timestamptz, -- Store only top level key value pairs here; it is used -- for quickly finding a device using information carried @@ -72,7 +75,7 @@ create table if not exists device_blobs ( create table if not exists logical_devices ( uid integer generated always as identity primary key, name text not null, - location point, + location geometry('POINT', 4283), last_seen timestamptz, properties jsonb not null default '{}' ); @@ -90,7 +93,7 @@ create table if not exists physical_logical_map ( primary key(physical_uid, logical_uid, start_time) ); -create table if not exists users( +create table if not exists users ( uid integer generated always as identity primary key, username text not null unique, salt text not null, @@ -100,6 +103,15 @@ create table if not exists users( read_only boolean default True not null ); +create table if not exists version ( + version integer not null +); + +create index if not exists pd_src_id_idx on physical_devices using GIN (source_ids); + +insert into sources values ('ttn'), ('greenbrain'), ('wombat'), ('ydoc'), ('ict_eagleio'); +insert into version values (2); + create table if not exists data_name_map( input_name text not null primary key, std_name text not null @@ -448,8 +460,3 @@ insert into word_list values ('UP'), ('VAPOUR'), ('WIND'); - --- Enable the PostGIS extensions --- CREATE EXTENSION postgis; --- CREATE EXTENSION postgis_raster; --- CREATE EXTENSION postgis_sfcgal; diff --git a/db/upgrade/002.sql b/db/upgrade/002.sql new file mode 100644 index 00000000..a6920962 --- /dev/null +++ b/db/upgrade/002.sql @@ -0,0 +1,17 @@ +CREATE EXTENSION postgis; +CREATE EXTENSION pgcrypto; + +SELECT AddGeometryColumn('logical_devices','geom',4283,'POINT',2); +SELECT AddGeometryColumn('physical_devices','geom',4283,'POINT',2); + +UPDATE logical_devices SET geom = ST_MakePoint(location[1], location[0]) WHERE location IS NOT NULL; +UPDATE physical_devices SET geom = ST_MakePoint(location[1], location[0]) WHERE location IS NOT NULL; + +ALTER TABLE logical_devices DROP COLUMN location; +ALTER TABLE physical_devices DROP COLUMN location; + +ALTER TABLE logical_devices RENAME COLUMN geom TO location; +ALTER TABLE physical_devices RENAME COLUMN geom TO location; + +TRUNCATE version; +insert into version values (2); diff --git a/dc.sh b/dc.sh index 6a57f2ab..d3c0c9c3 100755 --- a/dc.sh +++ b/dc.sh @@ -12,4 +12,4 @@ if [ "$RUN_MODE" != test ]; then fi fi -exec docker compose --profile ttn --profile ydoc --profile wombat --profile ubidots --profile pollers --profile frred -p $RUN_MODE -f ../docker-compose.yml -f ./$RUN_MODE.yml $* +exec docker-compose --profile wombat --profile ubidots --profile frred -p $RUN_MODE -f ../docker-compose.yml -f ./$RUN_MODE.yml $* diff --git a/images/restapi/requirements.txt b/images/restapi/requirements.txt index 65e46478..9d580371 100644 --- a/images/restapi/requirements.txt +++ b/images/restapi/requirements.txt @@ -6,29 +6,38 @@ charset-normalizer==2.1.0 click==8.1.3 dnspython==2.2.1 email-validator==1.2.1 +exceptiongroup==1.1.1 fastapi==0.98.0 h11==0.13.0 httpcore==0.17.2 httptools==0.4.0 httpx==0.24.1 idna==3.3 +iniconfig==2.0.0 itsdangerous==2.1.2 Jinja2==3.1.2 MarkupSafe==2.1.1 +numpy==1.26.4 orjson==3.7.7 +packaging==23.1 +pandas==2.2.1 pika==1.3.0 +pluggy==1.2.0 psycopg2-binary==2.9.3 pydantic==1.9.1 pytest==7.4.0 python-dateutil==2.8.2 python-dotenv==0.20.0 python-multipart==0.0.5 +pytz==2024.1 PyYAML==6.0 requests==2.28.1 six==1.16.0 sniffio==1.2.0 starlette==0.27.0 +tomli==2.0.1 typing_extensions==4.3.0 +tzdata==2024.1 ujson==5.4.0 urllib3==1.26.10 uvicorn==0.17.6 diff --git a/run.sh b/run.sh index f31ac05b..295f48af 100755 --- a/run.sh +++ b/run.sh @@ -14,7 +14,7 @@ fi cd $BROKER_ROOT cd compose/$MODE -./dc.sh stop +./dc.sh down cd $BROKER_ROOT docker build -t broker/python-base -f images/restapi/Dockerfile . docker build -t broker/ttn_decoder -f images/ttn_decoder/Dockerfile . diff --git a/src/python/BrokerConstants.py b/src/python/BrokerConstants.py index 62f2a0a4..ccf74c26 100644 --- a/src/python/BrokerConstants.py +++ b/src/python/BrokerConstants.py @@ -13,6 +13,7 @@ WOMBAT = 'wombat' YDOC = 'ydoc' ICT_EAGLEIO = 'ict_eagleio' +AXISTECH = 'axistech' CREATION_CORRELATION_ID_KEY = 'creation_correlation_id' SENSOR_GROUP_ID_KEY = 'sensor_group_id' diff --git a/src/python/api/client/DAO.py b/src/python/api/client/DAO.py index 7cefea0d..320c3039 100644 --- a/src/python/api/client/DAO.py +++ b/src/python/api/client/DAO.py @@ -1,39 +1,38 @@ from contextlib import contextmanager from datetime import datetime, timezone -from email.utils import parseaddr -import logging, re, warnings -from turtle import back -from unittest import result -import backoff +import logging, warnings import dateutil.parser import psycopg2 from psycopg2 import pool from typing import List, Tuple # Import Tuple import psycopg2.errors -from psycopg2.extensions import register_adapter, AsIs +from psycopg2.extensions import AsIs from psycopg2.extras import Json, register_uuid from typing import Any, Dict, List, Optional, Union import hashlib -import base64 import os import BrokerConstants -from pdmodels.Models import DeviceNote, Location, LogicalDevice, PhysicalDevice, PhysicalToLogicalMapping, User +from pdmodels.Models import BaseDevice, DeviceNote, LogicalDevice, PhysicalDevice, PhysicalToLogicalMapping, User logging.captureWarnings(True) + class DAOException(Exception): def __init__(self, msg: str = None, wrapped: Exception = None): self.msg: str = msg self.wrapped: Exception = wrapped + # This is raised in update methods when the entity to be updated does not exist. class DAODeviceNotFound(DAOException): pass + class DAOUserNotFound(DAOException): pass + # This is raised if Postgres raises a unique constraint exception. It is useful # for the REST API to know this was the problem rather than some general database # problem, and allows calling code to not have to examine the Postgres exception @@ -42,29 +41,14 @@ class DAOUniqeConstraintException(DAOException): pass -def adapt_location(location: Location): - """ - Transform a Location instance into a value usable in an SQL statement. - """ - return AsIs(f"('{location.lat},{location.long}')") - - -def cast_point(value, cur): - """ - Transform a value from an SQL result into a Location instance. - """ - if value is None: - return None - - if isinstance(value, str): - m = re.fullmatch(r'\(([+-]?\d+\.?\d*),([+-]?\d+\.?\d*)\)', value) - if m is not None: - return Location(lat=float(m.group(1)),long=m.group(2)) - - raise psycopg2.InterfaceError(f'Bad point representation: {value}') - - conn_pool = None +_physical_device_select_all_cols = """ +select uid, source_name, name, (select row_to_json(_) from (select ST_Y(location) as lat, ST_X(location) as long) as _) as location, last_seen, source_ids, properties from physical_devices +""" + +_logical_device_select_all_cols = """ +select uid, name, (select row_to_json(_) from (select ST_Y(location) as lat, ST_X(location) as long) as _) as location, last_seen, properties from logical_devices +""" def stop() -> None: @@ -95,7 +79,7 @@ def _get_connection(): if conn_pool is None: logging.info('Creating connection pool, registering type converters.') conn_pool = pool.ThreadedConnectionPool(1, 5) - _register_type_adapters() + register_uuid() conn = conn_pool.getconn() logging.debug(f'Taking conn {conn}') @@ -120,25 +104,6 @@ def free_conn(conn) -> None: conn_pool.putconn(conn) -def _register_type_adapters(): - """ - Register adapter functions to handle Location <-> SQL Point data types. - """ - # Apapter to convert from a Location instance to a quoted SQL string. - register_adapter(Location, adapt_location) - - # Adapter to convert from an SQL result to a Location instance. - with _get_connection() as conn, conn.cursor() as cursor: - conn.autocommit = True - cursor.execute("SELECT NULL::point") - point_oid = cursor.description[0][1] - POINT = psycopg2.extensions.new_type((point_oid,), "POINT", cast_point) - psycopg2.extensions.register_type(POINT) - - free_conn(conn) - register_uuid() - - def _dict_from_row(result_metadata, row) -> Dict[str, Any]: obj = {} for i, col_def in enumerate(result_metadata): @@ -147,14 +112,28 @@ def _dict_from_row(result_metadata, row) -> Dict[str, Any]: return obj +def _device_to_query_params(device: BaseDevice) -> dict: + dev_fields = {} + for k, v in vars(device).items(): + match k: + case 'properties' | 'source_ids': + dev_fields[k] = Json(v) + case 'location': + if device.location is None or device.location.lat is None or device.location.long is None: + dev_fields[k] = None + else: + dev_fields[k] = AsIs(f"ST_GeomFromText('POINT({device.location.long} {device.location.lat})')") + case _: + dev_fields[k] = v + + return dev_fields + + """ Physical device source CRUD methods - -create table if not exists sources ( - source_name text primary key not null -); """ -@backoff.on_exception(backoff.expo, DAOException, max_time=30) + + def get_all_physical_sources() -> List[PhysicalDevice]: conn = None try: @@ -173,24 +152,45 @@ def get_all_physical_sources() -> List[PhysicalDevice]: free_conn(conn) +def add_physical_source(name: str) -> None: + """ + Add a new physical device source if it does not already exist. + """ + conn = None + try: + with _get_connection() as conn, conn.cursor() as cursor: + conn.autocommit = False + cursor.execute('select source_name from sources where source_name = %s', (name, )) + if cursor.rowcount == 0: + cursor.execute("insert into sources values (%s)", (name, )) + cursor.execute('select source_name from sources where source_name = %s', (name, )) + if cursor.rowcount == 1: + logging.info(f'Added physical device source {name}') + conn.commit() + else: + logging.error(f'Failed to add physical device source {name}') + conn.rollback() + except Exception as err: + conn.rollback() + raise err if isinstance(err, DAOException) else DAOException('add_physical_source failed.', err) + finally: + if conn is not None: + free_conn(conn) + + """ Physical device CRUD methods """ -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def create_physical_device(device: PhysicalDevice) -> PhysicalDevice: conn = None try: - dev_fields = {} - for k, v in vars(device).items(): - dev_fields[k] = v if k not in ('source_ids', 'properties') else Json(v) - + dev_fields = _device_to_query_params(device) with _get_connection() as conn, conn.cursor() as cursor: #logging.info(cursor.mogrify("insert into physical_devices (source_name, name, location, last_seen, source_ids, properties) values (%(source_name)s, %(name)s, %(location)s, %(last_seen)s, %(source_ids)s, %(properties)s) returning uid", dev_fields)) cursor.execute("insert into physical_devices (source_name, name, location, last_seen, source_ids, properties) values (%(source_name)s, %(name)s, %(location)s, %(last_seen)s, %(source_ids)s, %(properties)s) returning uid", dev_fields) uid = cursor.fetchone()[0] dev = _get_physical_device(conn, uid) - - return dev + return dev except Exception as err: raise err if isinstance(err, DAOException) else DAOException('create_physical_device failed.', err) finally: @@ -212,7 +212,7 @@ def _get_physical_device(conn, uid: int) -> PhysicalDevice: """ dev = None with conn.cursor() as cursor: - sql = 'select uid, source_name, name, location, last_seen, source_ids, properties from physical_devices where uid = %s' + sql = f'{_physical_device_select_all_cols} where uid = %s' cursor.execute(sql, (uid, )) row = cursor.fetchone() if row is not None: @@ -222,15 +222,12 @@ def _get_physical_device(conn, uid: int) -> PhysicalDevice: return dev -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_physical_device(uid: int) -> PhysicalDevice: conn = None try: - dev = None with _get_connection() as conn: dev = _get_physical_device(conn, uid) - - return dev + return dev except Exception as err: raise err if isinstance(err, DAOException) else DAOException('get_physical_device failed.', err) finally: @@ -238,13 +235,12 @@ def get_physical_device(uid: int) -> PhysicalDevice: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_pyhsical_devices_using_source_ids(source_name: str, source_ids: Dict[str, str]) -> List[PhysicalDevice]: conn = None try: devs = [] with _get_connection() as conn, conn.cursor() as cursor: - sql = 'select uid, source_name, name, location, last_seen, source_ids, properties from physical_devices where source_name = %s and source_ids @> %s order by uid asc' + sql = f'{_physical_device_select_all_cols} where source_name = %s and source_ids @> %s order by uid asc' args = (source_name, Json(source_ids)) #logging.info(cursor.mogrify(sql, args)) cursor.execute(sql, args) @@ -260,13 +256,12 @@ def get_pyhsical_devices_using_source_ids(source_name: str, source_ids: Dict[str free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_all_physical_devices() -> List[PhysicalDevice]: conn = None try: devs = [] with _get_connection() as conn, conn.cursor() as cursor: - sql = 'select uid, source_name, name, location, last_seen, source_ids, properties from physical_devices order by uid asc' + sql = f'{_physical_device_select_all_cols} order by uid asc' cursor.execute(sql) cursor.arraysize = 200 rows = cursor.fetchmany() @@ -285,13 +280,12 @@ def get_all_physical_devices() -> List[PhysicalDevice]: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_physical_devices_from_source(source_name: str) -> List[PhysicalDevice]: conn = None try: devs = [] with _get_connection() as conn, conn.cursor() as cursor: - sql = 'select uid, source_name, name, location, last_seen, source_ids, properties from physical_devices where source_name = %s order by uid asc' + sql = f'{_physical_device_select_all_cols} where source_name = %s order by uid asc' cursor.execute(sql, (source_name, )) cursor.arraysize = 200 rows = cursor.fetchmany() @@ -310,13 +304,15 @@ def get_physical_devices_from_source(source_name: str) -> List[PhysicalDevice]: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) -def get_physical_devices(query_args = {}) -> List[PhysicalDevice]: +def get_physical_devices(query_args=None) -> List[PhysicalDevice]: + if query_args is None: + query_args = {} + conn = None try: devs = [] with _get_connection() as conn, conn.cursor() as cursor: - sql = 'select uid, source_name, name, location, last_seen, source_ids, properties from physical_devices' + sql = _physical_device_select_all_cols args = {} add_where = True @@ -369,7 +365,6 @@ def get_physical_devices(query_args = {}) -> List[PhysicalDevice]: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def update_physical_device(device: PhysicalDevice) -> PhysicalDevice: conn = None try: @@ -385,7 +380,18 @@ def update_physical_device(device: PhysicalDevice) -> PhysicalDevice: for name, val in vars(device).items(): if val != current_values[name]: update_col_names.append(f'{name} = %s') - update_col_values.append(val if name not in ('source_ids', 'properties') else Json(val)) + match name: + case 'location': + if val is not None: + val = AsIs(f"ST_PointFromText('POINT({val.long} {val.lat})')") + + update_col_values.append(val) + + case 'properties' | 'source_ids': + update_col_values.append(Json(val)) + + case _: + update_col_values.append(val) logging.debug(update_col_names) logging.debug(update_col_values) @@ -416,7 +422,6 @@ def update_physical_device(device: PhysicalDevice) -> PhysicalDevice: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def delete_physical_device(uid: int) -> PhysicalDevice: conn = None try: @@ -434,7 +439,6 @@ def delete_physical_device(uid: int) -> PhysicalDevice: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def create_physical_device_note(uid: int, note: str) -> None: conn = None try: @@ -449,7 +453,6 @@ def create_physical_device_note(uid: int, note: str) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_physical_device_notes(uid: int) -> List[DeviceNote]: conn = None try: @@ -467,7 +470,6 @@ def get_physical_device_notes(uid: int) -> List[DeviceNote]: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def update_physical_device_note(note: DeviceNote) -> None: conn = None try: @@ -481,7 +483,6 @@ def update_physical_device_note(note: DeviceNote) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def delete_physical_device_note(uid: int) -> None: conn = None try: @@ -500,21 +501,15 @@ def delete_physical_device_note(uid: int) -> None: Logical Device CRUD methods """ -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def create_logical_device(device: LogicalDevice) -> LogicalDevice: conn = None try: - dev = None - dev_fields = {} - for k, v in vars(device).items(): - dev_fields[k] = v if k != 'properties' else Json(v) - + dev_fields = _device_to_query_params(device) with _get_connection() as conn, conn.cursor() as cursor: cursor.execute("insert into logical_devices (name, location, last_seen, properties) values (%(name)s, %(location)s, %(last_seen)s, %(properties)s) returning uid", dev_fields) uid = cursor.fetchone()[0] dev = _get_logical_device(conn, uid) - - return dev + return dev except Exception as err: raise err if isinstance(err, DAOException) else DAOException('create_logical_device failed.', err) finally: @@ -536,7 +531,15 @@ def _get_logical_device(conn, uid: int) -> LogicalDevice: """ dev = None with conn.cursor() as cursor: - sql = 'select uid, name, location, last_seen, properties from logical_devices where uid = %s' + """ + See: https://dba.stackexchange.com/questions/27732/set-names-to-attributes-when-creating-json-with-row-to-json + + for an explanation of how to name the fields in a row_to_json call. + + Example query tested in psql: + select uid, name, (select row_to_json(_) from (select ST_Y(location) as lat, ST_X(location) as long) as _) as location from logical_devices; + """ + sql = f'{_logical_device_select_all_cols} where uid = %s' cursor.execute(sql, (uid, )) row = cursor.fetchone() if row is not None: @@ -546,7 +549,6 @@ def _get_logical_device(conn, uid: int) -> LogicalDevice: return dev -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_logical_device(uid: int) -> LogicalDevice: conn = None try: @@ -560,15 +562,12 @@ def get_logical_device(uid: int) -> LogicalDevice: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_logical_devices(query_args = {}) -> List[LogicalDevice]: conn = None try: devs = [] with _get_connection() as conn, conn.cursor() as cursor: - #conn.autocommit = True - - sql = 'select uid, name, location, last_seen, properties from logical_devices' + sql = _logical_device_select_all_cols args = {} add_where = True @@ -593,12 +592,11 @@ def get_logical_devices(query_args = {}) -> List[LogicalDevice]: sql = sql + ' order by uid asc' #logging.info(cursor.mogrify(sql, args)) - + logging.info(cursor.mogrify(sql, args)) cursor.execute(sql, args) cursor.arraysize = 200 rows = cursor.fetchmany() while len(rows) > 0: - #logging.info(f'processing {len(rows)} rows.') for r in rows: d = LogicalDevice.parse_obj(_dict_from_row(cursor.description, r)) devs.append(d) @@ -613,7 +611,6 @@ def get_logical_devices(query_args = {}) -> List[LogicalDevice]: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def update_logical_device(device: LogicalDevice) -> LogicalDevice: conn = None try: @@ -629,7 +626,18 @@ def update_logical_device(device: LogicalDevice) -> LogicalDevice: for name, val in vars(device).items(): if val != current_values[name]: update_col_names.append(f'{name} = %s') - update_col_values.append(val if name != 'properties' else Json(val)) + match name: + case 'location': + if val is not None: + val = AsIs(f"ST_PointFromText('POINT({val.long} {val.lat})')") + + update_col_values.append(val) + + case 'properties': + update_col_values.append(Json(val)) + + case _: + update_col_values.append(val) if len(update_col_names) < 1: return device @@ -653,7 +661,6 @@ def update_logical_device(device: LogicalDevice) -> LogicalDevice: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def delete_logical_device(uid: int) -> LogicalDevice: conn = None try: @@ -682,7 +689,6 @@ def delete_logical_device(uid: int) -> LogicalDevice: ); """ -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def insert_mapping(mapping: PhysicalToLogicalMapping) -> None: """ Insert a device mapping. @@ -692,7 +698,7 @@ def insert_mapping(mapping: PhysicalToLogicalMapping) -> None: with _get_connection() as conn, conn.cursor() as cursor: current_mapping = _get_current_device_mapping(conn, pd=mapping.pd.uid) if current_mapping is not None: - raise DAOUniqeConstraintException(f'insert_mapping failed: physical device {current_mapping.pd.uid} / "{current_mapping.pd.name}" is already mapped to logical device {current_mapping.ld.uid} / "{current_mapping.ld.name}"') + raise ValueError(f'insert_mapping failed: physical device {current_mapping.pd.uid} / "{current_mapping.pd.name}" is already mapped to logical device {current_mapping.ld.uid} / "{current_mapping.ld.name}"') current_mapping = _get_current_device_mapping(conn, ld=mapping.ld.uid) if current_mapping is not None: @@ -704,26 +710,33 @@ def insert_mapping(mapping: PhysicalToLogicalMapping) -> None: except psycopg2.errors.UniqueViolation as err: raise DAOUniqeConstraintException(f'Mapping already exists: {mapping.pd.uid} {mapping.pd.name} -> {mapping.ld.uid} {mapping.ld.name}, starting at {mapping.start_time}.', err) except Exception as err: - raise err if isinstance(err, DAOException) else DAOException('insert_mapping failed.', err) + match err: + case DAOException() | ValueError(): + raise err + case _: + raise DAOException('insert_mapping failed.', err) finally: if conn is not None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def end_mapping(pd: Optional[Union[PhysicalDevice, int]] = None, ld: Optional[Union[LogicalDevice, int]] = None) -> None: conn = None try: with _get_connection() as conn: _end_mapping(conn, pd, ld) except Exception as err: - raise err if isinstance(err, DAOException) else DAOException('end_mapping failed.', err) + match err: + case DAOException() | ValueError(): + raise err + case _: + raise DAOException('end_mapping failed.', err) + finally: if conn is not None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def _end_mapping(conn, pd: Optional[Union[PhysicalDevice, int]] = None, ld: Optional[Union[LogicalDevice, int]] = None) -> None: with conn.cursor() as cursor: mapping: PhysicalToLogicalMapping = _get_current_device_mapping(conn, pd, ld) @@ -731,10 +744,10 @@ def _end_mapping(conn, pd: Optional[Union[PhysicalDevice, int]] = None, ld: Opti return if pd is None and ld is None: - raise DAOException('A PhysicalDevice or a LogicalDevice (or an uid for one of them) must be supplied to end a mapping.') + raise ValueError('A PhysicalDevice or a LogicalDevice (or an uid for one of them) must be supplied to end a mapping.') if pd is not None and ld is not None: - raise DAOException('Both pd and ld were provided, only give one when ending a mapping.') + raise ValueError('Both pd and ld were provided, only give one when ending a mapping.') p_uid = None if pd is not None: @@ -753,7 +766,6 @@ def _end_mapping(conn, pd: Optional[Union[PhysicalDevice, int]] = None, ld: Opti logging.warning(f'No mapping was updated during end_mapping for {pd.uid} {pd.name} -> {ld.uid} {ld.name}') -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def delete_mapping(mapping: PhysicalToLogicalMapping) -> None: """ Remove an entry from the physical_logical_map table. Advised to use end_mapping instead. This method should only be used when permanently deleting a physical or logical device from the database. @@ -770,7 +782,6 @@ def delete_mapping(mapping: PhysicalToLogicalMapping) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def toggle_device_mapping(is_active: bool, pd: Optional[Union[PhysicalDevice, int]] = None, ld: Optional[Union[LogicalDevice, int]] = None) -> None: """ Change the is_active column in the database @@ -791,10 +802,10 @@ def toggle_device_mapping(is_active: bool, pd: Optional[Union[PhysicalDevice, in def _toggle_device_mapping(conn, is_active, pd: Optional[Union[PhysicalDevice, int]] = None, ld: Optional[Union[LogicalDevice, int]] = None): if pd is None and ld is None: - raise DAOException('A PhysicalDevice or a LogicalDevice (or an uid for one of them) must be supplied to find a mapping.') + raise ValueError('A PhysicalDevice or a LogicalDevice (or an uid for one of them) must be supplied to find a mapping.') if pd is not None and ld is not None: - raise DAOException('Both pd and ld were provided, only give one.') + raise ValueError('Both pd and ld were provided, only give one.') p_uid = None if pd is not None: @@ -812,17 +823,19 @@ def _toggle_device_mapping(conn, is_active, pd: Optional[Union[PhysicalDevice, i return None -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_current_device_mapping(pd: Optional[Union[PhysicalDevice, int]] = None, ld: Optional[Union[LogicalDevice, int]] = None, only_current_mapping: bool = True) -> Optional[PhysicalToLogicalMapping]: conn = None try: - mapping = None with _get_connection() as conn: mapping = _get_current_device_mapping(conn, pd, ld, only_current_mapping) - - return mapping + return mapping except Exception as err: - raise err if isinstance(err, DAOException) else DAOException('get_current_device_mapping failed.', err) + match err: + case DAOException() | ValueError(): + raise err + case _: + raise DAOException('insert_mapping failed.', err) + finally: if conn is not None: free_conn(conn) @@ -832,10 +845,10 @@ def _get_current_device_mapping(conn, pd: Optional[Union[PhysicalDevice, int]] = mappings = None if pd is None and ld is None: - raise DAOException('A PhysicalDevice or a LogicalDevice (or an uid for one of them) must be supplied to find a mapping.') + raise ValueError('A PhysicalDevice or a LogicalDevice (or an uid for one of them) must be supplied to find a mapping.') if pd is not None and ld is not None: - raise DAOException('Both pd and ld were provided, only give one.') + raise ValueError('Both pd and ld were provided, only give one.') p_uid = None if pd is not None: @@ -869,13 +882,12 @@ def _get_current_device_mapping(conn, pd: Optional[Union[PhysicalDevice, int]] = return None -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_unmapped_physical_devices() -> List[PhysicalDevice]: conn = None try: devs = [] with _get_connection() as conn, conn.cursor() as cursor: - cursor.execute('select * from physical_devices where uid not in (select physical_uid from physical_logical_map where end_time is null) order by uid asc') + cursor.execute(f'{_physical_device_select_all_cols} where uid not in (select physical_uid from physical_logical_map where end_time is null) order by uid asc') for r in cursor: dfr = _dict_from_row(cursor.description, r) devs.append(PhysicalDevice.parse_obj(dfr)) @@ -888,7 +900,6 @@ def get_unmapped_physical_devices() -> List[PhysicalDevice]: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_logical_device_mappings(ld: Union[LogicalDevice, int]) -> List[PhysicalToLogicalMapping]: conn = None try: @@ -910,7 +921,6 @@ def get_logical_device_mappings(ld: Union[LogicalDevice, int]) -> List[PhysicalT free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_physical_device_mappings(pd: Union[PhysicalDevice, int]) -> List[PhysicalToLogicalMapping]: conn = None try: @@ -932,7 +942,6 @@ def get_physical_device_mappings(pd: Union[PhysicalDevice, int]) -> List[Physica if conn is not None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_all_current_mappings(return_uids: bool = True) -> List[PhysicalToLogicalMapping]: conn = None try: @@ -957,7 +966,6 @@ def get_all_current_mappings(return_uids: bool = True) -> List[PhysicalToLogical free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def add_raw_json_message(source_name: str, ts: datetime, correlation_uuid: str, msg, uid: int=None): conn = None try: @@ -972,7 +980,6 @@ def add_raw_json_message(source_name: str, ts: datetime, correlation_uuid: str, free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def insert_physical_timeseries_message(msg: Dict[str, Any]) -> None: conn = None p_uid = msg[BrokerConstants.PHYSICAL_DEVICE_UID_KEY] @@ -989,8 +996,7 @@ def insert_physical_timeseries_message(msg: Dict[str, Any]) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) -def get_physical_timeseries_message(p_uid: int, start: datetime, end: datetime, count: int, only_timestamp: bool) -> List[Dict]: +def get_physical_timeseries_message(start: datetime | None = None, end: datetime | None = None, count: int | None = None, only_timestamp: bool = False, include_received_at: bool = False, p_uid: int = None, l_uid: int = None) -> List[Dict]: conn = None if start is None: @@ -1002,22 +1008,53 @@ def get_physical_timeseries_message(p_uid: int, start: datetime, end: datetime, if count < 1: count = 1 - column_name = 'ts' if only_timestamp else 'json_msg' + if p_uid is None and l_uid is None: + raise ValueError('p_uid or l_uid must be supplied.') + + if p_uid is not None and l_uid is not None: + raise ValueError('Both p_uid and l_uid were provided, only give one.') + + if p_uid is not None: + uid_col_name = 'physical_uid' + uid = p_uid + else: + uid_col_name = 'logical_uid' + uid = l_uid + + if not isinstance(uid, int): + raise TypeError + + if not isinstance(start, datetime): + raise TypeError + + if not isinstance(end, datetime): + raise TypeError + + column_names = ['ts'] + if include_received_at: + column_names.append('received_at') + + if not only_timestamp: + column_names.append('json_msg') + + column_names = ', '.join(column_names) try: + # Order messages by descending timestamp. If a caller asks for one message, they probably want the latest + # message. with _get_connection() as conn, conn.cursor() as cursor: qry = f""" - select {column_name} from physical_timeseries - where physical_uid = %s + select ts {column_names} from physical_timeseries + where {uid_col_name} = %s and ts > %s and ts <= %s - order by ts asc + order by ts desc limit %s """ - args = (p_uid, start, end, count) + args = (uid, start, end, count) cursor.execute(qry, args) - return [row[0] for row in cursor.fetchall()] + return [_msg_tuple_to_obj(*row) for row in cursor.fetchall()] except Exception as err: raise DAOException('get_physical_timeseries_message failed.', err) finally: @@ -1025,7 +1062,34 @@ def get_physical_timeseries_message(p_uid: int, start: datetime, end: datetime, free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) +def _msg_tuple_to_obj(ts: datetime, arg2: datetime | dict | None = None, arg3: datetime | dict | None = None) -> dict: + if arg2 is None and arg3 is None: + return {BrokerConstants.TIMESTAMP_KEY: ts.isoformat()} + + if arg2 is not None and arg3 is None: + if isinstance(arg2, datetime): + return {BrokerConstants.TIMESTAMP_KEY: ts, 'received_at': arg2} + else: + return arg2 + + msg_dict = {} + if arg3 is not None: + if isinstance(arg3, datetime): + msg_dict: dict = arg2 + if isinstance(arg3, datetime): + msg_dict['received_at'] = arg3.isoformat() + else: + msg_dict['received_at'] = arg3 + else: + msg_dict: dict = arg3 + if isinstance(arg2, datetime): + msg_dict['received_at'] = arg2.isoformat() + else: + msg_dict['received_at'] = arg2 + + return msg_dict + + def add_raw_text_message(source_name: str, ts: datetime, correlation_uuid: str, msg, uid: int=None): conn = None try: @@ -1044,7 +1108,6 @@ def add_raw_text_message(source_name: str, ts: datetime, correlation_uuid: str, """ User and authentication CRUD methods """ -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def user_add(uname: str, passwd: str, disabled: bool) -> None: #Generate salted password @@ -1067,7 +1130,6 @@ def user_add(uname: str, passwd: str, disabled: bool) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def user_rm(uname: str) -> None: conn = None try: @@ -1081,7 +1143,6 @@ def user_rm(uname: str) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def user_set_read_only(uname: str, read_only: bool) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1093,11 +1154,10 @@ def user_set_read_only(uname: str, read_only: bool) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_user(uid = None, username = None, auth_token = None) -> User: conn = None if uid is None and username is None and auth_token is None: - raise DAOException('get_user requires at least one parameter') + raise ValueError('get_user requires at least one parameter') else: try: user = None @@ -1120,12 +1180,11 @@ def get_user(uid = None, username = None, auth_token = None) -> User: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def user_ls() -> List: try: with _get_connection() as conn, conn.cursor() as cursor: cursor.execute("select username from users order by uid") - results=cursor.fetchall() + results = cursor.fetchall() return [i[0] for i in results] except Exception as err: @@ -1135,15 +1194,13 @@ def user_ls() -> List: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def user_get_token(username, password) -> str | None: conn = None try: with _get_connection() as conn, conn.cursor() as cursor: cursor.execute("select salt, password, auth_token from users where username=%s",(username,)) - result=cursor.fetchone() - + result = cursor.fetchone() if result is None: return None @@ -1163,7 +1220,6 @@ def user_get_token(username, password) -> str | None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def token_is_valid(user_token) -> bool: ''' Check if token is in database and is valid @@ -1171,7 +1227,7 @@ def token_is_valid(user_token) -> bool: try: with _get_connection() as conn, conn.cursor() as cursor: cursor.execute("select uid from users where auth_token=%s and valid='True'", (user_token,)) - result=cursor.fetchone() + result = cursor.fetchone() if result is None: return False @@ -1183,11 +1239,10 @@ def token_is_valid(user_token) -> bool: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) -def token_refresh(uname)-> None: +def token_refresh(uname) -> None: # Auth token to be used on other endpoints. - auth_token=os.urandom(64).hex() + auth_token = os.urandom(64).hex() try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1201,10 +1256,9 @@ def token_refresh(uname)-> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def user_change_password(username: str, new_passwd: str) -> None: - salt=os.urandom(64).hex() - pass_hash=hashlib.scrypt(password=new_passwd.encode(), salt=salt.encode(), n=2**14, r=8, p=1, maxmem=0, dklen=64).hex() + salt = os.urandom(64).hex() + pass_hash = hashlib.scrypt(password=new_passwd.encode(), salt=salt.encode(), n=2**14, r=8, p=1, maxmem=0, dklen=64).hex() try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1217,17 +1271,16 @@ def user_change_password(username: str, new_passwd: str) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def user_change_password_and_token(new_passwd: str, prev_token: str) -> str: """ Changes user's password and auth token, returns users new auth token upon success """ #Generate salted password - salt=os.urandom(64).hex() - pass_hash=hashlib.scrypt(password=new_passwd.encode(), salt=salt.encode(), n=2**14, r=8, p=1, maxmem=0, dklen=64).hex() + salt = os.urandom(64).hex() + pass_hash = hashlib.scrypt(password=new_passwd.encode(), salt=salt.encode(), n=2**14, r=8, p=1, maxmem=0, dklen=64).hex() #Auth token to be used on other endpoints - auth_token=os.urandom(64).hex() + auth_token = os.urandom(64).hex() try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1245,8 +1298,7 @@ def user_change_password_and_token(new_passwd: str, prev_token: str) -> str: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) -def token_disable(uname)->None: +def token_disable(uname) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1259,7 +1311,6 @@ def token_disable(uname)->None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def token_enable(uname)-> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1271,7 +1322,6 @@ def token_enable(uname)-> None: if conn is not None: free_conn(conn) - """ DATA_NAME_MAP : links incoming data names to a standardised version, so that timeseries data can be more coherent """ @@ -1295,7 +1345,6 @@ def _get_std_name(conn, input_name: str) -> str: return std_name -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_std_name(input_name: str) -> str: """ CASE INSENSITIVE @@ -1311,7 +1360,6 @@ def get_std_name(input_name: str) -> str: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def update_name_map(input_name: str, std_name:str) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1324,7 +1372,6 @@ def update_name_map(input_name: str, std_name:str) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def add_name_map(input_name: str, std_name:str) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1337,7 +1384,6 @@ def add_name_map(input_name: str, std_name:str) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def remove_name_map(input_name: str) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1349,7 +1395,6 @@ def remove_name_map(input_name: str) -> None: if conn is not None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def list_name_map() -> List[Tuple[str, str]]: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1362,7 +1407,6 @@ def list_name_map() -> List[Tuple[str, str]]: if conn is not None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def add_word_list(full_word: str) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1374,7 +1418,6 @@ def add_word_list(full_word: str) -> None: if conn is not None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def remove_word_list(full_word: str) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1405,7 +1448,6 @@ def _get_type_map(conn): return row return type_map -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_type_map(): """ CASE INSENSITIVE @@ -1422,7 +1464,6 @@ def get_type_map(): -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def list_word_list() -> List[str]: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1436,7 +1477,6 @@ def list_word_list() -> List[str]: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def update_type_map(input_name: str, std_name:str) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1449,7 +1489,6 @@ def update_type_map(input_name: str, std_name:str) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def add_type_map(input_name: str, std_name:str) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1477,7 +1516,6 @@ def _get_word_list(conn): return word_list -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_word_list(): """ CASE INSENSITIVE @@ -1493,7 +1531,6 @@ def get_word_list(): free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def remove_type_map(input_name: str) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1506,7 +1543,6 @@ def remove_type_map(input_name: str) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def list_type_map() -> List[Tuple[str, str]]: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1533,7 +1569,6 @@ def _get_hash_table(conn): return hash_table -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_hash_table(): """ CASE INSENSITIVE @@ -1547,3 +1582,4 @@ def get_hash_table(): finally: if conn is not None: free_conn(conn) + diff --git a/src/python/broker-cli.py b/src/python/broker-cli.py index c1a9dbf6..97d13c55 100755 --- a/src/python/broker-cli.py +++ b/src/python/broker-cli.py @@ -66,6 +66,7 @@ def str_to_dict(val) -> Dict: ## List logical devices ld_ls_parser = ld_sub_parsers.add_parser('ls', help='list logical devices') +ld_ls_parser.add_argument('--properties', action='store_true', help='Include the properties field in the output', dest='include_props', required=False) ld_ls_parser.add_argument('--plain', action='store_true', help='Plain output, not JSON', dest='plain') ## Create logical devices @@ -345,11 +346,19 @@ def main() -> None: elif args.cmd1 == 'ld': if args.cmd2 == 'ls': devs = dao.get_logical_devices() + + if args.include_props: + tmp_list = list(map(lambda dev: dev.dict(), devs)) + else: + tmp_list = list(map(lambda dev: dev.dict(exclude={'properties'}), devs)) + print(pretty_print_json(tmp_list)) + tmp_list = list(map(lambda dev: dev.dict(exclude={'properties'}), devs)) if not args.plain: print(pretty_print_json(tmp_list)) else: plain_pd_list(devs) + elif args.cmd2 == 'create': dev = LogicalDevice.parse_obj(dict_from_file_or_string()) print(dao.create_logical_device(dev)) diff --git a/src/python/delivery/FRRED.py b/src/python/delivery/FRRED.py index 40b168db..aed79c7b 100644 --- a/src/python/delivery/FRRED.py +++ b/src/python/delivery/FRRED.py @@ -147,7 +147,7 @@ def on_message(channel, method, properties, body): _channel.basic_ack(delivery_tag) return - if BrokerConstants.WOMBAT != pd.source_name: + if pd.source_name not in [BrokerConstants.WOMBAT, BrokerConstants.AXISTECH]: _channel.basic_ack(delivery_tag) return diff --git a/src/python/logical_mapper/LogicalMapper.py b/src/python/logical_mapper/LogicalMapper.py index 9a4c159e..0c98b28a 100644 --- a/src/python/logical_mapper/LogicalMapper.py +++ b/src/python/logical_mapper/LogicalMapper.py @@ -18,6 +18,9 @@ """ import asyncio, json, logging, signal +import datetime + +import dateutil.parser import BrokerConstants from pika.exchange_type import ExchangeType @@ -25,10 +28,12 @@ import api.client.DAO as dao import util.LoggingUtil as lu -rx_channel = None -tx_channel = None -mq_client = None -finish = False +_rx_channel = None +_tx_channel = None +_mq_client = None +_finish = False + +_max_delta = datetime.timedelta(hours=-1) def sigterm_handler(sig_no, stack_frame) -> None: @@ -36,12 +41,12 @@ def sigterm_handler(sig_no, stack_frame) -> None: Handle SIGTERM from docker by closing the mq and db connections and setting a flag to tell the main loop to exit. """ - global finish, mq_client + global _finish, _mq_client - logging.info(f'{signal.strsignal(sig_no)}, setting finish to True') - finish = True + logging.info(f'{signal.strsignal(sig_no)}, setting _finish to True') + _finish = True dao.stop() - mq_client.stop() + _mq_client.stop() async def main(): @@ -53,25 +58,25 @@ async def main(): It would be good to find a better way to do nothing than the current loop. """ - global mq_client, rx_channel, tx_channel, finish + global _mq_client, _rx_channel, _tx_channel, _finish logging.info('===============================================================') logging.info(' STARTING LOGICAL MAPPER') logging.info('===============================================================') - rx_channel = mq.RxChannel(exchange_name=BrokerConstants.PHYSICAL_TIMESERIES_EXCHANGE_NAME, exchange_type=ExchangeType.fanout, queue_name='lm_physical_timeseries', on_message=on_message) - tx_channel = mq.TxChannel(exchange_name=BrokerConstants.LOGICAL_TIMESERIES_EXCHANGE_NAME, exchange_type=ExchangeType.fanout) - mq_client = mq.RabbitMQConnection(channels=[rx_channel, tx_channel]) + _rx_channel = mq.RxChannel(exchange_name=BrokerConstants.PHYSICAL_TIMESERIES_EXCHANGE_NAME, exchange_type=ExchangeType.fanout, queue_name='lm_physical_timeseries', on_message=on_message) + _tx_channel = mq.TxChannel(exchange_name=BrokerConstants.LOGICAL_TIMESERIES_EXCHANGE_NAME, exchange_type=ExchangeType.fanout) + _mq_client = mq.RabbitMQConnection(channels=[_rx_channel, _tx_channel]) - asyncio.create_task(mq_client.connect()) + asyncio.create_task(_mq_client.connect()) - while not (rx_channel.is_open and tx_channel.is_open): + while not (_rx_channel.is_open and _tx_channel.is_open): await asyncio.sleep(0) - while not finish: + while not _finish: await asyncio.sleep(2) - while not mq_client.stopped: + while not _mq_client.stopped: await asyncio.sleep(1) @@ -80,14 +85,14 @@ def on_message(channel, method, properties, body): This function is called when a message arrives from RabbitMQ. """ - global rx_channel, tx_channel, finish + global _rx_channel, _tx_channel, _finish delivery_tag = method.delivery_tag - # If the finish flag is set, reject the message so RabbitMQ will re-queue it + # If the _finish flag is set, reject the message so RabbitMQ will re-queue it # and return early. - if finish: - rx_channel._channel.basic_reject(delivery_tag) + if _finish: + _rx_channel._channel.basic_reject(delivery_tag) return try: @@ -99,7 +104,7 @@ def on_message(channel, method, properties, body): lu.cid_logger.error(f'Physical device not found, cannot continue. Dropping message.', extra=msg) # Ack the message, even though we cannot process it. We don't want it redelivered. # We can change this to a Nack if that would provide extra context somewhere. - rx_channel._channel.basic_ack(delivery_tag) + _rx_channel._channel.basic_ack(delivery_tag) return lu.cid_logger.info(f'Accepted message from {pd.name}', extra=msg) @@ -116,7 +121,7 @@ def on_message(channel, method, properties, body): # Ack the message, even though we cannot process it. We don't want it redelivered. # We can change this to a Nack if that would provide extra context somewhere. - rx_channel._channel.basic_ack(delivery_tag) + _rx_channel._channel.basic_ack(delivery_tag) return msg[BrokerConstants.LOGICAL_DEVICE_UID_KEY] = mapping.ld.uid @@ -124,19 +129,39 @@ def on_message(channel, method, properties, body): dao.insert_physical_timeseries_message(msg) ld = mapping.ld - ld.last_seen = msg[BrokerConstants.TIMESTAMP_KEY] + + # Determine if the message has a future timestamp. + ts_str: datetime.datetime = msg[BrokerConstants.TIMESTAMP_KEY] + ts = dateutil.parser.isoparse(ts_str) + utc_now = datetime.datetime.now(datetime.timezone.utc) + ts_delta = utc_now - ts + + # Drop messages with a timestamp more than 1 hour in the future. + if ts_delta < _max_delta: + lu.cid_logger.warning(f'Message with future timestamp. Dropping message.', extra=msg) + # Ack the message, even though we cannot process it. We don't want it redelivered. + # We can change this to a Nack if that would provide extra context somewhere. + _rx_channel._channel.basic_ack(delivery_tag) + return + + if ts > utc_now: + # If the timestamp is a bit in the future then make the last seen time 'now'. + ld.last_seen = utc_now + else: + ld.last_seen = ts + lu.cid_logger.info(f'Timestamp from message for LD last seen update: {ld.last_seen}', extra=msg) ld.properties[BrokerConstants.LAST_MSG] = msg dao.update_logical_device(ld) - tx_channel.publish_message('logical_timeseries', msg) + _tx_channel.publish_message('logical_timeseries', msg) # This tells RabbitMQ the message is handled and can be deleted from the queue. - rx_channel._channel.basic_ack(delivery_tag) + _rx_channel._channel.basic_ack(delivery_tag) except BaseException as e: logging.exception('Error while processing message') - rx_channel._channel.basic_ack(delivery_tag) + _rx_channel._channel.basic_ack(delivery_tag) if __name__ == '__main__': diff --git a/src/python/pdmodels/Models.py b/src/python/pdmodels/Models.py index c3afc72b..a69dd7f1 100644 --- a/src/python/pdmodels/Models.py +++ b/src/python/pdmodels/Models.py @@ -4,8 +4,8 @@ class Location(BaseModel): - lat: float - long: float + lat: Optional[float] + long: Optional[float] @staticmethod def from_ttn_device(ttn_dev: Dict): @@ -22,24 +22,25 @@ def from_ttn_device(ttn_dev: Dict): # Allowing extra attributes in this class to make life easier for the webapp - it can pass extra info # to the templates in the device object rather than passing in lists of mappings etc. -class PhysicalDevice(BaseModel, extra=Extra.allow): +class BaseDevice(BaseModel, extra=Extra.allow): uid: Optional[int] - source_name: str name: str location: Optional[Location] last_seen: Optional[datetime] - source_ids: Dict = {} properties: Dict = {} # Allowing extra attributes in this class to make life easier for the webapp - it can pass extra info # to the templates in the device object rather than passing in lists of mappings etc. -class LogicalDevice(BaseModel, extra=Extra.allow): - uid: Optional[int] - name: str - location: Optional[Location] - last_seen: Optional[datetime] - properties = {} +class PhysicalDevice(BaseDevice, extra=Extra.allow): + source_name: str + source_ids: Dict = {} + + +# Allowing extra attributes in this class to make life easier for the webapp - it can pass extra info +# to the templates in the device object rather than passing in lists of mappings etc. +class LogicalDevice(BaseDevice, extra=Extra.allow): + pass class PhysicalToLogicalMapping(BaseModel): diff --git a/src/python/pollers/axistech.py b/src/python/pollers/axistech.py new file mode 100644 index 00000000..56857218 --- /dev/null +++ b/src/python/pollers/axistech.py @@ -0,0 +1,286 @@ +import datetime as dt +import json +import logging +import os +import time +import uuid +from typing import Dict, Optional + +import dateutil.tz as dtz +import dateutil.parser as dup +import pandas as pd +import pika +import pika.adapters.blocking_connection as pab +import pika.channel +import pika.spec +import pprint +import requests +from pika.exchange_type import ExchangeType + +import BrokerConstants +import api.client.DAO as dao +import util.LoggingUtil as lu +from pdmodels.Models import PhysicalDevice + +_user = os.environ['RABBITMQ_DEFAULT_USER'] +_passwd = os.environ['RABBITMQ_DEFAULT_PASS'] +_host = os.environ['RABBITMQ_HOST'] +_port = os.environ['RABBITMQ_PORT'] + +_amqp_url_str = f'amqp://{_user}:{_passwd}@{_host}:{_port}/%2F' + +_connection: pab.BlockingConnection = None +_channel: pab.BlockingChannel = None + +_api_token = os.getenv('AXISTECH_TOKEN') + +_recent_msg_times: Dict[str, dt.datetime] = {} +""" +Holds the most recent message timestamp for each AxisTech device. Keyed by device code. +""" + + +_sydney_tz = dtz.gettz('Australia/Sydney') + + +def local_time_str(ts: dt.datetime) -> str: + """ + Return an AE[S|D]T string representation of ts, eg '16/01/2024 23:11' + """ + return ts.astimezone(_sydney_tz).strftime('%d/%m/%Y %H:%M') + + +def z_ts(ts: dt.datetime) -> str: + """ + AxisTech will only accept start and end timestamps with a time component in the form YYYY-MM-DDThh:mm:ssZ, + so this function takes a datetime object and returns it formatted as described, by converting to UTC if + necessary and then replacing the +00:00 tz suffix with Z. + """ + return ts.astimezone(dt.timezone.utc).isoformat(timespec='seconds').replace('+00:00', 'Z') + + +def make_msg(row: pd.Series) -> Dict: + """ + Transform a row from the DataFrame with the AxisTech data into a row with an IoTa format message. + """ + serial_no, ts = row.name + values = dict(zip(row.index.values, row.values)) + correlation_id = str(uuid.uuid4()) + str_timestamp = ts.isoformat(timespec='seconds') + if str_timestamp.endswith('+00:00'): + str_timestamp = str_timestamp.replace('+00:00', 'Z') + + source_ids = {'serial_no': serial_no, 'sdi-12': [f'813AXSTECH AWS 000{serial_no}']} + msg = {BrokerConstants.TIMESTAMP_KEY: str_timestamp, 'source_ids': source_ids, + BrokerConstants.TIMESERIES_KEY: [], BrokerConstants.CORRELATION_ID_KEY: correlation_id} + + for name, value in values.items(): + msg['timeseries'].append({'name': name, 'value': value}) + + return msg + + +def process_msg(msg: Dict) -> None: + """ + Send a message onto the rest of IoTa. + """ + global _connection, _channel + + ts = dup.parse(msg[BrokerConstants.TIMESTAMP_KEY]) + serial_no = msg["source_ids"]["serial_no"] + source_ids = msg['source_ids'] + + dao.add_raw_json_message(BrokerConstants.AXISTECH, ts, msg[BrokerConstants.CORRELATION_ID_KEY], msg) + + pds = dao.get_pyhsical_devices_using_source_ids(BrokerConstants.AXISTECH, source_ids) + if len(pds) < 1: + lu.cid_logger.info(f'Physical device not found for device {serial_no}, creating a new one.', extra=msg) + + props = {BrokerConstants.CREATION_CORRELATION_ID_KEY: msg[BrokerConstants.CORRELATION_ID_KEY], + BrokerConstants.LAST_MSG: json.dumps(msg)} + + pdev = PhysicalDevice(source_name=BrokerConstants.AXISTECH, name=serial_no, location=None, + source_ids=source_ids, properties=props) + pdev = dao.create_physical_device(pdev) + else: + lu.cid_logger.info(f'Accepted message from {serial_no}, updating last seen time to {ts}.', extra=msg) + pdev = pds[0] + pdev.properties[BrokerConstants.LAST_MSG] = json.dumps(msg) + + msg[BrokerConstants.PHYSICAL_DEVICE_UID_KEY] = pdev.uid + lu.cid_logger.info(f'Posting msg: {msg}', extra=msg) + _channel.basic_publish(BrokerConstants.PHYSICAL_TIMESERIES_EXCHANGE_NAME, 'physical_timeseries', json.dumps(msg).encode('UTF-8')) + _connection.process_data_events(0) + + # Update last seen here so if the publish fails and the process restarts, the message will be reprocessed because + # it is less than the device's last_seen time. + pdev.last_seen = ts + dao.update_physical_device(pdev) + + +def get_messages(start: dt.datetime, end: dt.datetime) -> Optional[pd.DataFrame]: + global _recent_msg_times + + drop_cols = ['wind_dir_var_avg', 'uv_index_avg'] + """ + Columns in the AxisTech message that have no equivalent in the SCMN ATM-41 messages, so these get dropped. + """ + + atm41_col_names = ['8_Precipitation', '8_AirTemperature', '8_WindSpeed', '8_WindSpeed_max', '8_RH', '8_AirPressure', + '8_DeltaT', '8_DewPoint', '8_Solar', '8_WindDirection', '8_WindSpeed_min'] + """ + The variable names to use to make the AxisTech message look like an SCMN ATM-41 message. + """ + + try: + url = f'https://data.exchange.axisstream.co/?token={_api_token}&startDate={z_ts(start)}&endDate={z_ts(end)}' + r = requests.get(url) + r.raise_for_status() + data = r.json() + + if 'bb5d4f86-6eaa-494d-abcc-8f2e9b66b214' not in data['data']: + logging.warning('Did not find expected UUID in data object.') + logging.warning(pprint.pformat(data)) + return None + + frames = [] + counter = 0 + for info in data['data']['bb5d4f86-6eaa-494d-abcc-8f2e9b66b214']['weather']: + code = info['code'] + ts = dup.parse(info['time']) + if code not in _recent_msg_times or ts > _recent_msg_times[code]: + _recent_msg_times[code] = ts + frame = pd.DataFrame(info, index=[counter]) + frames.append(frame) + counter += 1 + + if counter < 1: + return None + + df = pd.concat(frames, axis=0) + df['rainfall'] = df['rainfall'].astype(float) + df['humidity_avg'] = df['humidity_avg'].astype(float) + df['temperature_avg'] = df['temperature_avg'].astype(float) + df['wind_speed_avg'] = df['wind_speed_avg'].astype(float) + df['wind_speed_max'] = df['wind_speed_max'].astype(float) + df['atmos_pressure_avg'] = df['atmos_pressure_avg'].astype(float) + df['deltat_avg'] = df['deltat_avg'].astype(float) + df['dewpoint_avg'] = df['dewpoint_avg'].astype(float) + df['solar_rad_avg'] = df['solar_rad_avg'].astype(float) + df['uv_index_avg'] = df['uv_index_avg'].astype(float) + df['wind_dir_deg_avg'] = df['wind_dir_deg_avg'].astype(float) + df['wind_speed_min'] = df['wind_speed_min'].astype(float) + df['time'] = pd.to_datetime(df['time']) + + # Use a MultiIndex to make grouping by code easy later on. + df.set_index(['code', 'time'], inplace=True) + df.index = df.index.sort_values() + + # Apply column header changes + df.drop(drop_cols, inplace=True, axis=1) + df.columns = atm41_col_names + + return df + + except BaseException as e: + logging.exception(e) + + return None + + +def poll() -> None: + # The reason for such a large window time is that the AxisTech API is slow to provide new messages + # and seems to lag hours behind. If we poll every hour and don't ask for too big a window, it should not + # place too high a load on their servers. + # + # If we only ever polled for say the last hour, we'd rarely if ever get any messages. + end_ts = dt.datetime.now(dt.timezone.utc) + start_ts = end_ts - dt.timedelta(days=5) + + # Find the earliest 'most recent' message time. If one can be found there is no point asking for + # messages from before then because they've already been seen. One hole in this logic would be + # if a new device is added to AxisTech, it's first messages may be missed. + some_ts = None + for code, ts in _recent_msg_times.items(): + if some_ts is None or ts < some_ts: + some_ts = ts + + # If a message has been seen more recently than the default start_ts value, only ask for messages since the + # timestamp of the received messages. This risks missing messages from a code that are older than the default + # start of the window if the code has not sent a message in longer than that, but the alternative is to risk + # the window growing indefinitely if a device goes offline. + if some_ts is not None and some_ts > start_ts: + logging.info(f'Adjusting start_ts, was {local_time_str(start_ts)}, will be {local_time_str(some_ts)}') + start_ts = some_ts + + logging.info(f'Polling for message between {z_ts(start_ts)} and {z_ts(end_ts)}, [{local_time_str(start_ts)} to {local_time_str(end_ts)}]') + msgs_df = get_messages(start_ts, end_ts) + if msgs_df is None: + logging.info('No new messages') + return + + # Group the dataframe rows by device code. + code_groups = msgs_df.groupby(level=0) + + logging.info('New messages') + # For each device code subset of the dataframe, apply the function to create the messages. The function is given + # a pd.Series that contains all the info for one row. + for code, code_df in code_groups: + code_df.apply(make_msg, axis=1).apply(process_msg) + + logging.info(f'Latest message times: {_recent_msg_times}') + + +def main() -> None: + global _connection, _channel + + logging.info('===============================================================') + logging.info(' STARTING AXISTECH POLLER') + logging.info('===============================================================') + + dao.add_physical_source(BrokerConstants.AXISTECH) + + # Initialise the most recent message timestamp cache. This is used to control the time window + # used in the AxisTech API calls. + for pdev in dao.get_physical_devices_from_source(BrokerConstants.AXISTECH): + _recent_msg_times[pdev.source_ids['serial_no']] = pdev.last_seen + + try: + logging.info('Opening connection') + conn_attempts = 0 + backoff = 10 + while _connection is None: + try: + _connection = pika.BlockingConnection(pika.URLParameters(_amqp_url_str)) + except: + conn_attempts += 1 + logging.warning(f'Connection to RabbitMQ attempt {conn_attempts} failed.') + + if conn_attempts % 5 == 0 and backoff < 60: + backoff += 10 + + time.sleep(backoff) + + logging.info('Opening channel') + _channel = _connection.channel() + _channel.basic_qos(prefetch_count=1) + logging.info('Declaring exchange') + _channel.exchange_declare(exchange=BrokerConstants.PHYSICAL_TIMESERIES_EXCHANGE_NAME, + exchange_type=ExchangeType.fanout, durable=True) + + sleep_time = 1800 # seconds + while True: + poll() + _connection.process_data_events(sleep_time) + + except KeyboardInterrupt: + logging.info('Stopping') + dao.stop() + + if _connection is not None: + _connection.close() + + +if __name__ == '__main__': + main() + diff --git a/src/python/restapi/RestAPI.py b/src/python/restapi/RestAPI.py index 2bdada76..206b9f39 100644 --- a/src/python/restapi/RestAPI.py +++ b/src/python/restapi/RestAPI.py @@ -252,6 +252,7 @@ async def update_logical_device(device: LogicalDevice) -> LogicalDevice: except dao.DAODeviceNotFound as daonf: raise HTTPException(status_code=404, detail=daonf.msg) except dao.DAOException as err: + logging.exception(err) raise HTTPException(status_code=500, detail=err.msg) @@ -290,6 +291,8 @@ async def insert_mapping(mapping: PhysicalToLogicalMapping) -> None: raise HTTPException(status_code=400, detail=err.msg) except dao.DAOException as err: raise HTTPException(status_code=500, detail=err.msg) + except ValueError as err: + raise HTTPException(status_code=400, detail=str(err)) @router.get("/mappings/current/", tags=['device mapping'], response_model=List[PhysicalToLogicalMapping], dependencies=[Depends(token_auth_scheme)]) @@ -459,21 +462,24 @@ async def end_mapping_of_logical_uid(uid: int) -> None: MESSAGE RELATED --------------------------------------------------------------------------""" -@router.get("/physical/messages/{uid}", tags=['messages']) +@router.get("/messages", tags=['Messages'], dependencies=[Depends(token_auth_scheme)]) async def get_physical_timeseries( request: Request, - uid: int, + p_uid: int | None = None, + l_uid: int | None = None, count: Annotated[int | None, Query(gt=0, le=65536)] = None, last: str = None, start: datetime.datetime = None, end: datetime.datetime = None, - only_timestamp: bool = False): + include_received_at: bool = False, + only_timestamp: bool = False) -> List[Dict]: """ Get the physical_timeseries entries described by the physical device uid and the parameters. Args: request: The HTTP request object. - uid: The unique identifier of the physical device. + p_uid: The unique identifier of a physical device. Mutually exclusive with l_uid. + l_uid: The unique identifier of a logical device. Mutually exclusive with p_uid. count: The maximum number of entries to return. last: Return messages from the last nx interval where n is a number and x is 'h'ours, 'd'ays, 'w'eeks, 'm'onths, 'y'ears. start: The start date and time of the time range. @@ -487,7 +493,6 @@ async def get_physical_timeseries( HTTPException: If an error occurs. """ try: - #logging.info(f'start: {start.isoformat() if start is not None else start}, end: {end.isoformat() if end is not None else end}, count: {count}, only_timestamp: {only_timestamp}') if end is not None: if start is not None and start >= end: raise HTTPException(status_code=422, detail={"detail": [{"loc": ["query", "start"], "msg": "ensure start value is less than end"}]}) @@ -503,8 +508,7 @@ async def get_physical_timeseries( except: raise HTTPException(status_code=422, detail={"detail": [{"loc": ["query", "last"], "msg": "the first part of last must be an integer"}]}) - unit = last[-1] - + unit = str(last[-1]).lower() if unit == 'h': diff = datetime.timedelta(hours=i) elif unit == 'd': @@ -520,7 +524,12 @@ async def get_physical_timeseries( start = end - diff - msgs = dao.get_physical_timeseries_message(uid, start, end, count, only_timestamp) + msgs = None + if p_uid is not None: + msgs = dao.get_physical_timeseries_message(start, end, count, only_timestamp, include_received_at, p_uid=p_uid) + elif l_uid is not None: + msgs = dao.get_physical_timeseries_message(start, end, count, only_timestamp, include_received_at, l_uid=l_uid) + if msgs is None: raise HTTPException(status_code=404, detail="Failed to retrieve messages") @@ -530,6 +539,7 @@ async def get_physical_timeseries( except dao.DAOException as err: raise HTTPException(status_code=500, detail=err.msg) + """-------------------------------------------------------------------------- USER AUTHENTICATION --------------------------------------------------------------------------""" diff --git a/src/python/util/Extract.py b/src/python/util/Extract.py new file mode 100644 index 00000000..b3a2a7ed --- /dev/null +++ b/src/python/util/Extract.py @@ -0,0 +1,103 @@ +import argparse as ap +import datetime as dt +import logging +import os +from typing import Any, List, Optional +import pandas as pd +import psycopg2 as pg + +import LoggingUtil + +def get_data_for_logical_device(l_uid: int, start_ts: Optional[dt.datetime] = None, end_ts: Optional[dt.datetime] = None) -> pd.DataFrame: + qry = """ + SELECT logical_uid, physical_uid, ts, received_at, ts_delta, json_msg->'timeseries' AS ts_array FROM physical_timeseries + WHERE logical_uid = %s + """ + + qry_args: List[Any] = [l_uid] + + if start_ts is not None: + qry += ' AND ts >= %s' + qry_args.append(start_ts) + + if end_ts is not None: + qry += ' AND ts < %s' + qry_args.append(end_ts) + + dataset = [] + with pg.connect() as conn, conn.cursor() as curs: + curs.execute('SELECT name from logical_devices where uid = %s', (l_uid, )) + if curs.rowcount != 1: # If > 1, how? + logging.error(f'Did not find a logical device with id {l_uid}') + exit(1) + + dev_name: str = str(curs.fetchone()[0]) + dev_name = dev_name.replace(' ', '_') + logging.info(f'Fetching data for {l_uid} / {dev_name}') + + logging.debug(qry) + logging.debug(curs.mogrify(qry, qry_args)) + curs.execute(qry, qry_args) + if curs.rowcount < 1: + logging.info(f'No data for {l_uid} / {dev_name}') + exit(0) + + while True: + rows = curs.fetchmany(size=2000) + print(f'fetched {len(rows)} rows') + if len(rows) < 1: + break + + for row in rows: + dset_item = {'l_uid': row[0], 'p_uid': row[1], 'ts': row[2], 'received_at': row[3], 'ts_delta': row[4]} + for ts_obj in row[5]: + dset_item[ts_obj['name']] = ts_obj['value'] + dataset.append(dset_item) + + df = pd.DataFrame(dataset) + df.set_index(['l_uid', 'ts'], inplace=True) + df.sort_index(level=0, sort_remaining=True, inplace=True, ascending=True) + df.to_csv(f'{l_uid}_{dev_name}.csv') + return df + + +_default_host = 'localhost' +_default_port = '5432' +_default_dbname = 'broker' # This is an IoTa utility, so use the IoTa database name by default. +_default_user = 'postgres' + +parser = ap.ArgumentParser(description='Extract data from the IoTa database') +parser.add_argument('-H', dest='host', help='host to connect to, default = localhost') +parser.add_argument('-p', dest='port', help='port number to connect to, default = 5432') +parser.add_argument('-d', dest='dbname', help='database name to connect to, default = broker') +parser.add_argument('-U', dest='user', help='User name to connect as, default = postgres') +parser.add_argument('-P', dest='password', help='password to connect with, prefer to set PGPASSWORD env var') +parser.add_argument('-l', dest='l_uid', type=int, help='logical device id') +parser.add_argument('-s', dest='start_time', type=dt.datetime.fromisoformat, help='earliest timestamp in ISO-8601 format (>=)') +parser.add_argument('-e', dest='end_time', type=dt.datetime.fromisoformat, help='latest timestamp in ISO-8601 format (<)') + +args = parser.parse_args() + +# Give precendence to command line args, fall back to env var value if it is set, +# finally, fall back to a default value. +_host = os.getenv('PGHOST', _default_host) if args.host is None else args.host +if _host is not None: + os.environ['PGHOST'] = _host + +_port = os.getenv('PGPORT', _default_port) if args.port is None else args.port +if _port is not None: + os.environ['PGPORT'] = _port + +_dbname = os.getenv('PGDATABASE', _default_dbname)if args.dbname is None else args.dbname +if _dbname is not None: + os.environ['PGDATABASE'] = _dbname + +_user = os.getenv('PGUSER', _default_user) if args.user is None else args.user +if _user is not None: + os.environ['PGUSER'] = _user + +_password = os.getenv('PGPASSWORD') if args.password is None else args.password +if _password is not None: + os.environ['PGPASSWORD'] = _password + +get_data_for_logical_device(args.l_uid, args.start_time, args.end_time) diff --git a/src/www/app/main.py b/src/www/app/main.py index 6b2085df..a9ae00ae 100644 --- a/src/www/app/main.py +++ b/src/www/app/main.py @@ -1,9 +1,13 @@ +import atexit +import logging +import time from sys import stderr from typing import Tuple from requests.auth import parse_dict_header from flask import Flask, render_template, request, make_response, redirect, url_for, session, send_from_directory, jsonify + import folium -import paho.mqtt.publish as publish +import paho.mqtt.client as mqtt import os from datetime import timedelta, datetime, timezone import re @@ -13,6 +17,26 @@ from werkzeug.middleware.dispatcher import DispatcherMiddleware from werkzeug.wrappers import Response +from pdmodels.Models import PhysicalDevice, LogicalDevice, PhysicalToLogicalMapping, Location + +from logging.config import dictConfig + +dictConfig({ + 'version': 1, + 'formatters': {'default': { + 'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s', + }}, + 'handlers': {'wsgi': { + 'class': 'logging.StreamHandler', + 'stream': 'ext://flask.logging.wsgi_errors_stream', + 'formatter': 'default' + }}, + 'root': { + 'level': 'INFO', + 'handlers': ['wsgi'] + } +}) + app = Flask(__name__, static_url_path='/static') _location_re = re.compile(r'([+-]?\d+\.?\d*)\s*,\s*([+-]?\d+\.?\d*)') @@ -64,6 +88,63 @@ def time_since(date: datetime) -> str: return f'{seconds} seconds ago' +#------------- +# MQTT section +#------------- + +_mqtt_client = None +_wombat_config_msgs = {} + +def mqtt_on_connect(client, userdata, flags, rc): + global _mqtt_client + app.logger.info('MQTT connected') + _mqtt_client.subscribe(f'wombat/+') + +def mqtt_on_message(client, userdata, msg): + tp = msg.topic.split('/') + if len(tp) == 2: + sn = tp[1] + if len(msg.payload) > 0: + script = str(msg.payload, encoding='UTF-8') + _wombat_config_msgs[sn] = script + else: + _wombat_config_msgs[sn] = None + + +def _send_mqtt_msg_via_sn(serial_nos: List[str], msg: str | None) -> None: + """ + Publish a config script to a list of Wombats. + + Params: + serial_nos: A list of Wombat serial numbers. + msg: The config script to send, use None to clear the script. + """ + if _mqtt_client.is_connected(): + for sn in serial_nos: + _mqtt_client.publish(f'wombat/{sn}', msg, 1, True) + + +def _send_mqtt_msg_via_uids(p_uids: str | List[int], msg: str | None) -> None: + """ + Publish a config script to a list of Wombats. + + Params: + p_uids: A list of integer physical device ids or a string of the form "1,2,3". + msg: The config script to send, use None to clear the script. + """ + if _mqtt_client.is_connected(): + if isinstance(p_uids, str): + p_uids = list(map(lambda i: int(i), p_uids.split(','))) + + serial_nos = [] + for p_uid in p_uids: + pd = get_physical_device(p_uid, session.get('token')) + if pd is not None and pd.source_ids.get('serial_no', None) is not None: + serial_nos.append(pd.source_ids['serial_no']) + + _send_mqtt_msg_via_sn(serial_nos, msg) + + """ Session cookie config @@ -81,7 +162,7 @@ def check_user_logged_in(): if not session.get('token'): if request.path != '/login' and request.path != '/static/main.css': # Stores the url user tried to go to in session so when they log in, we take them back to it - session['original_url'] = request.url + session['original_url'] = request.url return redirect(url_for('login'), code=302) @@ -101,10 +182,10 @@ def login(): user_token = get_user_token(username=username, password=password) session['user'] = username session['token'] = user_token - + if 'original_url' in session: return redirect(session.pop('original_url')) - + return redirect(url_for('index')) return render_template("login.html") @@ -143,37 +224,47 @@ def account(): return render_template('account.html') -@app.route('/get_wombat_logs', methods=['GET']) +@app.route('/wombats/config/logs', methods=['GET']) def get_wombat_logs(): - p_uids = list(map(lambda i: int(i), request.args['uids'].split(','))) - app.logger.info(f'get_wombat_logs for p_uids: {p_uids}') - msgs = [] - - for p_uid in p_uids: - pd = get_physical_device(p_uid, session.get('token')) - if pd is not None: - app.logger.info(f'Wombat serial no: {pd.source_ids["serial_no"]}') - msgs.append((f'wombat/{pd.source_ids["serial_no"]}', 'ftp login\nupload log.txt\nftp logout\n', 1, True)) - - # NOTE! Assuming default port of 1883. - publish.multiple(msgs, hostname=_mqtt_host, auth={'username': _mqtt_user, 'password': _mqtt_pass}) + _send_mqtt_msg_via_uids(request.args['uids'], 'ftp login\nftp upload log.txt\nftp logout\n') + return "OK" + + +@app.route('/wombats/config/data', methods=['GET']) +def get_wombat_data(): + _send_mqtt_msg_via_uids(request.args['uids'], 'ftp login\nftp upload data.json\nftp logout\n') return "OK" -@app.route('/wombat_ota', methods=['GET']) +@app.route('/wombats/config/ota', methods=['GET']) def wombat_ota(): - p_uids = list(map(lambda i: int(i), request.args['uids'].split(','))) - app.logger.info(f'wombat_ota for p_uids: {p_uids}') - msgs = [] - - for p_uid in p_uids: - pd = get_physical_device(p_uid, session.get('token')) - if pd is not None: - app.logger.info(f'Wombat serial no: {pd.source_ids["serial_no"]}') - msgs.append((f'wombat/{pd.source_ids["serial_no"]}', 'config ota 1\nconfig reboot\n', 1, True)) - - # NOTE! Assuming default port of 1883. - publish.multiple(msgs, hostname=_mqtt_host, auth={'username': _mqtt_user, 'password': _mqtt_pass}) + _send_mqtt_msg_via_uids(request.args['uids'], 'config ota 1\nconfig reboot\n') + return "OK" + + +@app.route('/wombats/config/clear') +def clear_wombat_config_script(): + """ + Clear the config scripts for the Wombats identified by the sn request parameter. + + If the id request parameter is "uid" then sn must contain a list of phyiscal device ids. If id is "sn" + then sn must contain a list of Wombat serial numbers. + """ + sn = request.args['sn'] + if sn is not None: + id_type = request.args.get('id', 'id') + app.logger.info(f'Clearing config script for {sn}, id={id_type}') + + # Publish an empty retained message to clear the config script message from the topic. + if id_type == 'uid': + ids = list(map(lambda i: int(i), sn.split(','))) + app.logger.info(ids) + _send_mqtt_msg_via_uids(ids, None) + else: + ids = sn.split(',') + app.logger.info(ids) + _send_mqtt_msg_via_sn(ids, None) + return "OK" @@ -192,8 +283,10 @@ def wombats(): mappings = get_current_mappings(session.get('token')) for dev in physical_devices: + sn = dev.source_ids["serial_no"] ccid = dev.source_ids.get('ccid', None) fw_version: str = dev.source_ids.get('firmware', None) + config_script: str | None = _wombat_config_msgs.get(dev.source_ids["serial_no"], None) if ccid is not None: setattr(dev, 'ccid', ccid) @@ -201,6 +294,11 @@ def wombats(): if fw_version is not None: setattr(dev, 'fw', fw_version) + if config_script is not None: + setattr(dev, 'script', config_script) + + setattr(dev, 'sn', sn) + setattr(dev, 'ts_sort', dev.last_seen.timestamp()) dev.last_seen = time_since(dev.last_seen) @@ -387,7 +485,7 @@ def show_map(): # folium.Marker([-31.956194913619864, 115.85911692112582], popup="Mt. Hood Meadows", tooltip='click me').add_to(center_map) data: List[LogicalDevice] = get_logical_devices(session.get('token'), include_properties=True) for dev in data: - if dev.location is not None: + if dev.location is not None and dev.location.lat is not None and dev.location.long is not None: color = 'blue' if dev.last_seen is None: @@ -431,7 +529,7 @@ def CreateMapping(): except requests.exceptions.HTTPError as e: if e.response.status_code == 403: return f"You do not have sufficient permissions to make this change", e.response.status_code - + return f"HTTP request with RestAPI failed with error {e.response.status_code}", e.response.status_code @@ -445,7 +543,7 @@ def CreateNote(noteText, uid): except requests.exceptions.HTTPError as e: if e.response.status_code == 403: return f"You do not have sufficient permissions to make this change", e.response.status_code - + return f"HTTP request with RestAPI failed with error {e.response.status_code}", e.response.status_code @@ -457,7 +555,7 @@ def DeleteNote(noteUID): except requests.exceptions.HTTPError as e: if e.response.status_code == 403: return f"You do not have sufficient permissions to make this change", e.response.status_code - + return f"HTTP request with RestAPI failed with error {e.response.status_code}", e.response.status_code @@ -470,7 +568,7 @@ def EditNote(noteText, uid): except requests.exceptions.HTTPError as e: if e.response.status_code == 403: return f"You do not have sufficient permissions to make this change", e.response.status_code - + return f"HTTP request with RestAPI failed with error {e.response.status_code}", e.response.status_code @@ -492,23 +590,23 @@ def UpdatePhysicalDevice(): update_physical_device(uid, new_name, location, token) return 'Success', 200 - + except requests.exceptions.HTTPError as e: if e.response.status_code == 403: return f"You do not have sufficient permissions to make this change", e.response.status_code - + return f"HTTP request with RestAPI failed with error {e.response.status_code}", e.response.status_code -@app.route('/update-mappings', methods=['GET']) +@app.route('/update-mappings', methods=['PATCH']) def UpdateMappings(): try: - insert_device_mapping(request.args['physicalDevice_mapping'], request.args['logicalDevice_mapping'], session.get('token')) + insert_device_mapping(request.form.get('physicalDevice_mapping'), request.form.get('logicalDevice_mapping'), session.get('token')) return 'Success', 200 except requests.exceptions.HTTPError as e: if e.response.status_code == 403: return f"You do not have sufficient permissions to make this change", e.response.status_code - + return f"HTTP request with RestAPI failed with error {e.response.status_code}", e.response.status_code @@ -536,7 +634,7 @@ def ToggleDeviceMapping(): is_active = request.args['is_active'] toggle_device_mapping(uid=uid, dev_type=dev_type, is_active=is_active, token=session.get('token')) - + return 'Success', 200 @@ -560,9 +658,10 @@ def UpdateLogicalDevice(): return 'Success', 200 except requests.exceptions.HTTPError as e: + logging.exception(e) if e.response.status_code == 403: return f"You do not have sufficient permissions to make this change", e.response.status_code - + return f"HTTP request with RestAPI failed with error {e.response.status_code}", e.response.status_code @@ -583,7 +682,7 @@ def format_time_stamp(unformatted_time: datetime) -> str: def format_location_string(location: Location) -> str: formatted_location = '' - if location is not None: + if location is not None and location.lat is not None and location.long is not None: formatted_location = f'{location.lat:.5f}, {location.long:.5f}' return formatted_location @@ -637,6 +736,18 @@ def generate_link(data): return link +def exit_handler(): + global _mqtt_client + + app.logger.info('Stopping') + _mqtt_client.disconnect() + + while _mqtt_client.is_connected(): + time.sleep(0.5) + + _mqtt_client.loop_stop() + app.logger.info('Done') + """ parse the timeseries data received by api into what is expected by graph """ @@ -719,5 +830,19 @@ def parse_ts_table_data(raw_data): return result + if __name__ == '__main__': + app.logger.info('Starting') + _mqtt_client = mqtt.Client() + _mqtt_client.username_pw_set(_mqtt_user, _mqtt_pass) + _mqtt_client.on_connect = mqtt_on_connect + _mqtt_client.on_message = mqtt_on_message + + app.logger.info('Connecting to MQTT broker') + _mqtt_client.connect_async(_mqtt_host) + app.logger.info('Starting MQTT thread') + _mqtt_client.loop_start() + + atexit.register(exit_handler) + app.run(port='5000', host='0.0.0.0') diff --git a/src/www/app/templates/base.html b/src/www/app/templates/base.html index 889ec761..c72c248e 100644 --- a/src/www/app/templates/base.html +++ b/src/www/app/templates/base.html @@ -14,7 +14,7 @@
    -
  • Fetch logs
  • -
  • FW OTA
  • +
  • Fetch logs
  • +
  • Fetch data
  • +
  • FW OTA
  • +
  • Clear Scripts
@@ -22,6 +103,7 @@ Current mapping Ubidots + @@ -58,6 +140,11 @@ class="material-icons">open_in_new {% endif %} + + {% if physicalDevice.script is defined %} + install_desktop + {% endif %} + {% endfor %} diff --git a/src/www/app/utils/api.py b/src/www/app/utils/api.py index 4a04f112..b3c97524 100644 --- a/src/www/app/utils/api.py +++ b/src/www/app/utils/api.py @@ -1,4 +1,4 @@ -import json +import json, os from typing import List import sys from typing import List @@ -9,7 +9,7 @@ from pdmodels.Models import PhysicalDevice, LogicalDevice, PhysicalToLogicalMapping, DeviceNote, Location -end_point = 'http://restapi:5687' +end_point = os.getenv('IOTA_API_URL', 'http://restapi:5687') def get_sources(token: str) -> List[str]: """ @@ -256,6 +256,10 @@ def create_logical_device(physical_device: PhysicalDevice, token: str) ->str: "location": physical_device.location, } + # This is to work around a problem that turned up after the change to PostGIS. + if logicalJson['location'] is not None and (logicalJson['location'].lat is None or logicalJson['location'].long is None): + logicalJson.pop('location') + response = requests.post(f'{end_point}/broker/api/logical/devices/', json=logicalJson, headers=headers) response.raise_for_status() @@ -361,8 +365,8 @@ def change_user_password(password: str, token: str) -> str: Params: password: str - User's new password token: str - User's authentication token - - reutrn: + + return: token: str - User's new authentication token """ headers = {"Authorization": f"Bearer {token}"} diff --git a/test/python/test_dao.py b/test/python/test_dao.py index 2844e496..1fa29f77 100644 --- a/test/python/test_dao.py +++ b/test/python/test_dao.py @@ -10,6 +10,17 @@ logger = logging.getLogger(__name__) logging.captureWarnings(True) + +def _create_test_user() -> str: + test_uname=os.urandom(4).hex() + dao.user_add(uname=test_uname, passwd='password', disabled=False) + return test_uname + + +def _now() -> datetime.datetime: + return datetime.datetime.now(tz=datetime.timezone.utc) + + class TestDAO(unittest.TestCase): def setUp(self): @@ -22,7 +33,8 @@ def setUp(self): truncate physical_logical_map cascade; truncate device_notes cascade; truncate physical_timeseries cascade; - truncate raw_messages cascade''') + truncate raw_messages cascade; + delete from sources where source_name = 'axistech';''') finally: dao.free_conn(conn) @@ -30,11 +42,25 @@ def test_get_all_physical_sources(self): sources = dao.get_all_physical_sources() self.assertEqual(sources, ['greenbrain', 'ict_eagleio', 'ttn', 'wombat', 'ydoc']) - def now(self): - return datetime.datetime.now(tz=datetime.timezone.utc) + def test_add_physical_source(self): + sources = dao.get_all_physical_sources() + self.assertFalse(BrokerConstants.AXISTECH in sources) + dao.add_physical_source(BrokerConstants.AXISTECH) + sources = dao.get_all_physical_sources() + self.assertTrue(BrokerConstants.AXISTECH in sources) + + # Do it again to ensure it doesn't crash, and there is only one instance of the string. + dao.add_physical_source(BrokerConstants.AXISTECH) + sources = dao.get_all_physical_sources() + i = 0 + for s in sources: + if s == BrokerConstants.AXISTECH: + i += 1 + + self.assertEqual(1, i) def _create_physical_device(self, dev: PhysicalDevice = None) -> Tuple[PhysicalDevice, PhysicalDevice]: - last_seen = self.now() + last_seen = _now() if dev is None: dev = PhysicalDevice(source_name='ttn', name='Test Device', location=Location(lat=3, long=-31), last_seen=last_seen, source_ids={'appId': 'x', 'devId': 'y'}, @@ -42,11 +68,6 @@ def _create_physical_device(self, dev: PhysicalDevice = None) -> Tuple[PhysicalD return (dev, dao.create_physical_device(dev)) - def _create_test_user(self) -> str: - test_uname=os.urandom(4).hex() - dao.user_add(uname=test_uname, passwd='password', disabled=False) - return test_uname - def test_create_physical_device(self): dev, new_dev = self._create_physical_device() @@ -203,7 +224,7 @@ def test_delete_physical_device_note(self): def _create_default_logical_device(self, dev=None) -> Tuple[LogicalDevice, LogicalDevice]: if dev is None: - last_seen = self.now() + last_seen = _now() dev = LogicalDevice(name='Test Device', location=Location(lat=3, long=-31), last_seen=last_seen, properties={'appId': 'x', 'devId': 'y', 'other': 'z'}) @@ -257,28 +278,28 @@ def test_delete_logical_device(self): def test_insert_mapping(self): pdev, new_pdev = self._create_physical_device() ldev, new_ldev = self._create_default_logical_device() - mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now()) # This should work. dao.insert_mapping(mapping) # This should fail due to duplicate start time. - self.assertRaises(dao.DAOException, dao.insert_mapping, mapping) + self.assertRaises(ValueError, dao.insert_mapping, mapping) # This should fail due to the physical device is still mapped to something. time.sleep(0.001) - mapping.start_time=self.now() - self.assertRaises(dao.DAOException, dao.insert_mapping, mapping) + mapping.start_time= _now() + self.assertRaises(ValueError, dao.insert_mapping, mapping) # Unmap the physical device so the next test doesn't fail due to the device being mapped. dao.end_mapping(pd=new_pdev) # The insert_mapping operation should succeed because the timestamp is different from above. - mapping.start_time=self.now() + mapping.start_time= _now() dao.insert_mapping(mapping) pdx = copy.deepcopy(new_pdev) pdx.uid = -1 - mapping = PhysicalToLogicalMapping(pd=pdx, ld=new_ldev, start_time=self.now()) + mapping = PhysicalToLogicalMapping(pd=pdx, ld=new_ldev, start_time=_now()) # This should fail due to invalid physical uid. self.assertRaises(dao.DAODeviceNotFound, dao.insert_mapping, mapping) @@ -286,7 +307,7 @@ def test_insert_mapping(self): dao.end_mapping(pd=new_pdev) ldx = copy.deepcopy(new_ldev) ldx.uid = -1 - mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=ldx, start_time=self.now()) + mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=ldx, start_time=_now()) # This should fail due to invalid logical uid. self.assertRaises(dao.DAODeviceNotFound, dao.insert_mapping, mapping) @@ -302,14 +323,14 @@ def test_get_current_device_mapping(self): self.assertIsNone(dao.get_current_device_mapping(ld=new_ldev)) # Confirm pd or ld must be given. - self.assertRaises(dao.DAOException, dao.get_current_device_mapping) + self.assertRaises(ValueError, dao.get_current_device_mapping) # Confirm only pd or ld can be given. - self.assertRaises(dao.DAOException, dao.get_current_device_mapping, -1, -1) - self.assertRaises(dao.DAOException, dao.get_current_device_mapping, new_pdev, new_ldev) + self.assertRaises(ValueError, dao.get_current_device_mapping, -1, -1) + self.assertRaises(ValueError, dao.get_current_device_mapping, new_pdev, new_ldev) # confirm a physical device can be mapped to a logical device. - mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping1) self.assertEqual(mapping1, dao.get_current_device_mapping(pd=new_pdev.uid)) @@ -322,7 +343,7 @@ def test_get_current_device_mapping(self): time.sleep(0.001) pdev2, new_pdev2 = self._create_physical_device() - mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=self.now()) + mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping2) # This test must fail if multiple mappings are found, because there should not be @@ -341,7 +362,7 @@ def test_get_latest_device_mapping(self): ldev, new_ldev = self._create_default_logical_device() # confirm getting the latest mapping returns a current mapping - mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping1) self.assertEqual(mapping1, dao.get_current_device_mapping(pd=new_pdev.uid, only_current_mapping=True)) @@ -367,7 +388,7 @@ def test_get_latest_device_mapping(self): self.assertTrue(self.compare_mappings_ignore_end_time(mapping1, mapping2)) time.sleep(0.1) - mapping1.start_time = self.now() + mapping1.start_time = _now() dao.insert_mapping(mapping1) # with a new mapping with no end time, both calls should again return the same thing. @@ -381,7 +402,14 @@ def test_end_mapping(self): pdev, new_pdev = self._create_physical_device() ldev, new_ldev = self._create_default_logical_device() - mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + # Confirm pd or ld must be given. + self.assertRaises(ValueError, dao.end_mapping) + + # Confirm only pd or ld can be given. + self.assertRaises(ValueError, dao.end_mapping, -1, -1) + self.assertRaises(ValueError, dao.end_mapping, new_pdev, new_ldev) + + mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping1) mappings = dao.get_unmapped_physical_devices() @@ -394,7 +422,7 @@ def test_end_mapping(self): pdev2 = copy.deepcopy(pdev) pdev2.name = 'D2' pdev2, new_pdev2 = self._create_physical_device(dev=pdev2) - mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=self.now()) + mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping2) mappings = dao.get_unmapped_physical_devices() @@ -406,13 +434,13 @@ def test_end_mapping(self): # Avoid a unique key constraint due to identical timestamps. time.sleep(0.001) - mapping1.start_time = self.now() + mapping1.start_time = _now() dao.insert_mapping(mapping1) dao.end_mapping(pd=new_pdev) mappings = dao.get_unmapped_physical_devices() self.assertEqual(len(mappings), 2) - mapping2.start_time = self.now() + mapping2.start_time = _now() dao.insert_mapping(mapping2) mappings = dao.get_unmapped_physical_devices() self.assertEqual(len(mappings), 1) @@ -427,19 +455,19 @@ def compare_mappings_ignore_end_time(self, m1: PhysicalToLogicalMapping, m2: Phy def test_get_mappings(self): pdev, new_pdev = self._create_physical_device() ldev, new_ldev = self._create_default_logical_device() - mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping1) pdev2 = copy.deepcopy(pdev) pdev2.name = 'D2' pdev2, new_pdev2 = self._create_physical_device(dev=pdev2) time.sleep(0.1) - mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=self.now()) + mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping2) # Avoid a unique key constraint due to identical timestamps. time.sleep(0.1) - mapping3 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping3 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping3) mappings = dao.get_logical_device_mappings(new_ldev) @@ -453,7 +481,7 @@ def test_get_all_logical_device_mappings(self): pdev, new_pdev = self._create_physical_device() ldev, new_ldev = self._create_default_logical_device() - mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping1) time.sleep(0.1) dao.end_mapping(ld=new_ldev.uid) @@ -463,7 +491,7 @@ def test_get_all_logical_device_mappings(self): pdev2.name = 'D2' pdev2, new_pdev2 = self._create_physical_device(dev=pdev2) time.sleep(0.1) - mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=self.now()) + mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping2) time.sleep(0.1) dao.end_mapping(ld=new_ldev.uid) @@ -473,7 +501,7 @@ def test_get_all_logical_device_mappings(self): pdev3.name = 'D3' pdev3, new_pdev3 = self._create_physical_device(dev=pdev3) time.sleep(0.1) - mapping3 = PhysicalToLogicalMapping(pd=new_pdev3, ld=new_ldev, start_time=self.now()) + mapping3 = PhysicalToLogicalMapping(pd=new_pdev3, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping3) mappings = dao.get_logical_device_mappings(ld=new_ldev.uid) @@ -487,7 +515,7 @@ def test_get_unmapped_devices(self): ldev, new_ldev = self._create_default_logical_device() # confirm a physical device can be mapped to a logical device. - mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping1) pdev2 = copy.deepcopy(pdev) @@ -510,15 +538,15 @@ def test_get_unmapped_devices(self): self.assertTrue(new_pdev3 in unmapped_devs) self.assertTrue(new_pdev4 in unmapped_devs) - mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=self.now()) + mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping2) ldev2 = copy.deepcopy(ldev) ldev2.name = 'L2' - ldev2.last_seen = self.now() + ldev2.last_seen = _now() ldev2, new_ldev2 = self._create_default_logical_device(dev=ldev2) - mapping3 = PhysicalToLogicalMapping(pd=new_pdev4, ld=new_ldev2, start_time=self.now()) + mapping3 = PhysicalToLogicalMapping(pd=new_pdev4, ld=new_ldev2, start_time=_now()) dao.insert_mapping(mapping3) unmapped_devs = dao.get_unmapped_physical_devices() self.assertEqual(len(unmapped_devs), 2) @@ -527,10 +555,10 @@ def test_get_unmapped_devices(self): ldev3 = copy.deepcopy(ldev) ldev3.name = 'L3' - ldev3.last_seen = self.now() + ldev3.last_seen = _now() ldev3, new_ldev3 = self._create_default_logical_device(dev=ldev3) - mapping4 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev3, start_time=self.now()) + mapping4 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev3, start_time=_now()) dao.insert_mapping(mapping4) unmapped_devs = dao.get_unmapped_physical_devices() self.assertEqual(len(unmapped_devs), 1) @@ -539,12 +567,12 @@ def test_get_unmapped_devices(self): def test_add_raw_json_message(self): uuid1 = uuid.uuid4() - obj1 = {'a':1, 'b':'2', 'c':True, 'd':False} - dao.add_raw_json_message('ttn', self.now(), uuid1, obj1) + obj1 = {'a': 1, 'b': '2', 'c': True, 'd': False} + dao.add_raw_json_message('ttn', _now(), uuid1, obj1) uuid2 = uuid.uuid4() - obj2 = {'a':1, 'b':'2', 'c':False, 'd':True} - dao.add_raw_json_message('ttn', self.now(), uuid2, obj2, 1) + obj2 = {'a': 1, 'b': '2', 'c': False, 'd': True} + dao.add_raw_json_message('ttn', _now(), uuid2, obj2, 1) with dao._get_connection() as conn, conn.cursor() as cursor: cursor.execute('select physical_uid, correlation_id, json_msg from raw_messages order by uid asc') @@ -563,23 +591,24 @@ def test_add_raw_json_message(self): # Confirm the DAO raises a warning when trying to add a message with a # duplicate UUID, but doesn't throw an exception. with self.assertWarns(UserWarning): - dao.add_raw_json_message('ttn', self.now(), uuid1, obj1) + dao.add_raw_json_message('ttn', _now(), uuid1, obj1) def test_insert_physical_timeseries_message(self): dev, new_dev = self._create_physical_device() msg = { - "p_uid":new_dev.uid, - "timestamp":"2023-02-20T07:57:52Z", - "timeseries":[ + "p_uid": new_dev.uid, + "timestamp": "2023-02-20T07:57:52Z", + "timeseries": [ { - "name":"airTemperature", - "value":35.1 + "name": "x", + "value": 35.1 } ], - "broker_correlation_id":"3d7762f6-bcc6-44d4-82ba-49b07e61e601" + "broker_correlation_id": "3d7762f6-bcc6-44d4-82ba-49b07e61e601" } + msg_ts = dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY]) dao.insert_physical_timeseries_message(msg) @@ -590,15 +619,95 @@ def test_insert_physical_timeseries_message(self): self.assertEqual(ts, msg_ts) self.assertEqual(msg, retrieved_msg) + def test_get_physical_timeseries_messages(self): + _, new_pdev = self._create_physical_device() + + # Basic smoke test - no messages, no results. + msgs = dao.get_physical_timeseries_message(None, None, 1, only_timestamp=True, p_uid=new_pdev.uid) + self.assertSequenceEqual(msgs, []) + + msgs = dao.get_physical_timeseries_message(None, None, 1, only_timestamp=True, l_uid=20) + self.assertSequenceEqual(msgs, []) + + msg_list = [ + {BrokerConstants.PHYSICAL_DEVICE_UID_KEY: new_pdev.uid, BrokerConstants.TIMESTAMP_KEY: "2023-02-20T01:00+11:00"}, + {BrokerConstants.PHYSICAL_DEVICE_UID_KEY: new_pdev.uid, BrokerConstants.TIMESTAMP_KEY: "2023-02-20T00:30+11:00"}, + {BrokerConstants.PHYSICAL_DEVICE_UID_KEY: new_pdev.uid, BrokerConstants.TIMESTAMP_KEY: "2023-02-20T00:00+11:00"}, + {BrokerConstants.PHYSICAL_DEVICE_UID_KEY: new_pdev.uid, BrokerConstants.TIMESTAMP_KEY: "2023-02-19T23:30+11:00"}, + {BrokerConstants.PHYSICAL_DEVICE_UID_KEY: new_pdev.uid, BrokerConstants.TIMESTAMP_KEY: "2023-02-19T23:00+11:00"}, + {BrokerConstants.PHYSICAL_DEVICE_UID_KEY: new_pdev.uid, BrokerConstants.TIMESTAMP_KEY: "2023-02-19T22:30+11:00"} + ] + + msg_ts = [] + for msg in msg_list: + dao.insert_physical_timeseries_message(msg) + msg_ts.append(dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY])) + + msgs = dao.get_physical_timeseries_message(None, None, 1, only_timestamp=False, include_received_at=False, p_uid=new_pdev.uid) + self.assertEqual(len(msgs), 1) + self.assertEqual(dateutil.parser.isoparse(msgs[0][BrokerConstants.TIMESTAMP_KEY]), msg_ts[0]) + + msgs = dao.get_physical_timeseries_message(None, None, None, only_timestamp=True, p_uid=new_pdev.uid) + + self.assertEqual(len(msgs), len(msg_list)) + for i, msg in enumerate(msgs): + print(msg) + self.assertEqual(dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY]), msg_ts[i]) + + _, new_ldev = self._create_default_logical_device() + mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now()) + dao.insert_mapping(mapping) + + # Ensure 1 message will be returned from the DAO when no start or end is given. + now = _now() - datetime.timedelta(minutes=1) + td_30_mins = datetime.timedelta(minutes=30) + + for i, msg in enumerate(msg_list): + msg[BrokerConstants.TIMESTAMP_KEY] = (now + (i * td_30_mins)).isoformat() + msg[BrokerConstants.LOGICAL_DEVICE_UID_KEY] = new_ldev.uid + + msg_list.sort(key=lambda m: m[BrokerConstants.TIMESTAMP_KEY], reverse=True) + + msg_ts.clear() + for msg in msg_list: + dao.insert_physical_timeseries_message(msg) + msg_ts.append(dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY])) + + # This will return a single message because 'end' is None, meaning the DAO will set the + # end timestamp to 'now'. This batch of messages has timestamps from 1 minute ago to a + # couple of hours in the future, so only the message with the earliest timestamp in the + # batch should be returned. + msgs = dao.get_physical_timeseries_message(only_timestamp=True, l_uid=new_ldev.uid) + self.assertEqual(len(msgs), 1) + self.assertEqual(dateutil.parser.isoparse(msgs[0][BrokerConstants.TIMESTAMP_KEY]), msg_ts[-1]) + + # This will return all the messages because 'end' has been set past the latest message timestamp. + msgs = dao.get_physical_timeseries_message(end=now + datetime.timedelta(days=1), only_timestamp=True, l_uid=new_ldev.uid) + self.assertEqual(len(msgs), len(msg_list)) + for i, msg in enumerate(msgs): + self.assertEqual(dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY]), msg_ts[i]) + + # Should return only the latest message. + msgs = dao.get_physical_timeseries_message(end=now + datetime.timedelta(days=1), only_timestamp=True, count=1, l_uid=new_ldev.uid) + self.assertEqual(len(msgs), 1) + self.assertEqual(dateutil.parser.isoparse(msgs[0][BrokerConstants.TIMESTAMP_KEY]), msg_ts[0]) + + self.assertRaises(ValueError, dao.get_physical_timeseries_message) + self.assertRaises(TypeError, dao.get_physical_timeseries_message, p_uid='x') + self.assertRaises(TypeError, dao.get_physical_timeseries_message, l_uid='x') + self.assertRaises(TypeError, dao.get_physical_timeseries_message, start='x', l_uid=1) + self.assertRaises(TypeError, dao.get_physical_timeseries_message, end='x', l_uid=1) + self.assertRaises(TypeError, dao.get_physical_timeseries_message, count='x', l_uid=1) + def test_add_raw_text_message(self): uuid1 = uuid.uuid4() msg1 = 'This is a text message.' - dao.add_raw_text_message('greenbrain', self.now(), uuid1, msg1) + dao.add_raw_text_message('greenbrain', _now(), uuid1, msg1) uuid2 = uuid.uuid4() msg2 = 'This is a text message 2.' - dao.add_raw_text_message('greenbrain', self.now(), uuid2, msg2, 2) + dao.add_raw_text_message('greenbrain', _now(), uuid2, msg2, 2) with dao._get_connection() as conn, conn.cursor() as cursor: cursor.execute('select physical_uid, correlation_id, text_msg from raw_messages') self.assertEqual(2, cursor.rowcount) @@ -616,21 +725,21 @@ def test_add_raw_text_message(self): # Confirm the DAO raises a warning when trying to add a message with a # duplicate UUID, but doesn't throw an exception. with self.assertWarns(UserWarning): - dao.add_raw_text_message('ttn', self.now(), uuid1, msg1) + dao.add_raw_text_message('ttn', _now(), uuid1, msg1) def test_user_add(self): - uname=self._create_test_user() - users=dao.user_ls() + uname = _create_test_user() + users = dao.user_ls() self.assertEqual(uname, users[-1]) def test_user_rm(self): - uname=self._create_test_user() + uname = _create_test_user() dao.user_rm(uname) self.assertFalse(uname in dao.user_ls()) def test_user_set_read_only(self): - uname=self._create_test_user() + uname= _create_test_user() dao.user_set_read_only(uname, False) user_token=dao.user_get_token(username=uname, password='password') user=dao.get_user(auth_token=user_token) @@ -638,39 +747,39 @@ def test_user_set_read_only(self): def test_add_non_unique_user(self): #Check that two users with the same username cannot be created - uname=self._create_test_user() + uname = _create_test_user() self.assertRaises(dao.DAOUniqeConstraintException, dao.user_add, uname, 'password', False) def test_get_user_token(self): - uname=self._create_test_user() + uname = _create_test_user() self.assertIsNotNone(dao.user_get_token(username=uname, password='password')) self.assertIsNone(dao.user_get_token(username=uname, password='x')) def test_user_token_refresh(self): - uname=self._create_test_user() - token1=dao.user_get_token(username=uname, password='password') + uname = _create_test_user() + token1 = dao.user_get_token(username=uname, password='password') dao.token_refresh(uname=uname) - token2=dao.user_get_token(username=uname, password='password') + token2 = dao.user_get_token(username=uname, password='password') self.assertNotEqual(token1, token2) def test_user_token_disable(self): - uname=self._create_test_user() - user_token=dao.user_get_token(username=uname, password='password') + uname = _create_test_user() + user_token = dao.user_get_token(username=uname, password='password') dao.token_disable(uname) self.assertFalse(dao.token_is_valid(user_token)) def test_user_token_enable(self): - uname=self._create_test_user() - user_token=dao.user_get_token(username=uname, password='password') + uname = _create_test_user() + user_token = dao.user_get_token(username=uname, password='password') dao.token_disable(uname) dao.token_enable(uname) self.assertTrue(dao.token_is_valid(user_token)) def test_user_change_password(self): - uname=self._create_test_user() + uname = _create_test_user() dao.user_change_password(uname, 'nuiscyeriygsreiuliu') self.assertIsNotNone(dao.user_get_token(username=uname, password='nuiscyeriygsreiuliu')) diff --git a/test/python/test_get_physical_messages.py b/test/python/test_get_physical_messages.py index 6f76e2ab..8e54ccfd 100644 --- a/test/python/test_get_physical_messages.py +++ b/test/python/test_get_physical_messages.py @@ -6,12 +6,16 @@ import pytest from fastapi.testclient import TestClient +import BrokerConstants import test_utils as tu import api.client.DAO as dao from pdmodels.Models import PhysicalDevice from restapi.RestAPI import app -ts: dt.datetime = tu.now() - dt.timedelta(days=683.0) +ts: dt.datetime = dateutil.parser.isoparse('2024-05-10T23:00:00Z') +latest_ts = ts +earliest_ts = ts + interval = dt.timedelta(minutes=15.0) timestamps = [] @@ -31,7 +35,7 @@ def create_user(): @pytest.fixture(scope='module') def create_msgs(): - global interval, msgs, pd, timestamps, ts + global interval, msgs, pd, timestamps, ts, earliest_ts logging.info('Generating messages') @@ -40,11 +44,12 @@ def create_msgs(): with open('/tmp/msgs.json', 'w') as f: for i in range(0, max_msgs + 1): timestamps.append(ts) - msg = {'ts': ts.isoformat(), 'i': i} + msg = {BrokerConstants.TIMESTAMP_KEY: ts.isoformat(), 'i': i} msgs.append(msg) s = f'{pd.uid}\t{ts.isoformat()}\t{json.dumps(msg)}' print(s, file=f) - ts = ts + interval + ts = ts - interval + earliest_ts = ts with open('/tmp/msgs.json', 'r') as f: with dao.get_connection() as conn, conn.cursor() as cursor: @@ -64,107 +69,109 @@ def test_client(create_user, create_msgs): def test_no_params_no_msgs(test_client): - no_msg_pd: PhysicalDevice = dao.create_physical_device(PhysicalDevice(source_name='wombat', name='dummy', source_ids={'x': 1})) - response = test_client.get(f'/broker/api/physical/messages/{no_msg_pd.uid}') - assert response.status_code == 200 - assert response.json() == [] + response = test_client.get(f'/broker/api/messages/') + assert response.status_code == 404 def test_no_params(test_client): # Confirm the default count parameter value is correct, so 65536 of the 65537 messages are returned. - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}') + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid}) assert response.status_code == 200 - assert response.json() == msgs[:-1] + for a, b in zip(response.json(), msgs[:-1]): + assert a == b def test_no_params_ts(test_client): - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'only_timestamp': 1}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'only_timestamp': 1}) assert response.status_code == 200 + for a, b in zip(response.json(), timestamps): if a is None: break - assert dateutil.parser.isoparse(a) == b + assert dateutil.parser.isoparse(a[BrokerConstants.TIMESTAMP_KEY]) == b def test_count(test_client): # Confirm the count parameter works. - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'count': 50}) + response = test_client.get(f'/broker/api/messages/', params={'p_uid': pd.uid, 'count': 50}) assert response.status_code == 200 - assert response.json() == msgs[:50] + #assert response.json() == msgs[:50] + for a, b in zip(response.json(), msgs[:50]): + assert a == b def test_count_ts(test_client): - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'count': 50, 'only_timestamp': 1}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'count': 50, 'only_timestamp': 1}) assert response.status_code == 200 for a, b in zip(response.json(), timestamps): if a is None: break - assert dateutil.parser.isoparse(a) == b + assert dateutil.parser.isoparse(a[BrokerConstants.TIMESTAMP_KEY]) == b def test_start_after_end(test_client): - start_ts = ts + interval - # Confirm no messages returned when a start timestamp at or after the last message is used. - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'start': start_ts}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'start': latest_ts}) assert response.status_code == 200 assert response.json() == [] - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'start': timestamps[max_msgs]}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'start': timestamps[0]}) assert response.status_code == 200 assert response.json() == [] def test_start_gives_gt(test_client): # Confirm start time parameter gets the next message greater than, not greater than equal to. - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'start': timestamps[max_msgs - 1]}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'start': timestamps[1]}) assert response.status_code == 200 - assert response.json() == [msgs[max_msgs]] + assert response.json()[0] == msgs[0] def test_invalid_count(test_client): # Check the invalid count values. - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'count': -1}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'count': -1}) assert response.status_code == 422 - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'count': 0}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'count': 0}) assert response.status_code == 422 - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'count': 65537}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'count': 65537}) assert response.status_code == 422 def test_end(test_client): # Test the end parameter - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'end': timestamps[0] - interval}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'end': earliest_ts}) assert response.status_code == 200 assert response.json() == [] - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'end': timestamps[0]}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'end': timestamps[-1]}) assert response.status_code == 200 - assert response.json() == [msgs[0]] + assert response.json()[0] == msgs[-1] - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'end': timestamps[9]}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'end': timestamps[-9]}) assert response.status_code == 200 - assert response.json() == msgs[:10] + for a, b in zip(response.json(), msgs[-9:]): + assert a == b def test_start_end(test_client): - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'start': timestamps[5], 'end': timestamps[9]}) + response = test_client.get(f'/broker/api/messages/', params={'p_uid': pd.uid, 'start': timestamps[9], 'end': timestamps[5]}) assert response.status_code == 200 - assert response.json() == msgs[6:10] + for a, b in zip(response.json(), msgs[5:9]): + assert a == b def test_invalid_start_end(test_client): - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'start': timestamps[5], 'end': timestamps[5]}) + response = test_client.get(f'/broker/api/messages/', params={'p_uid': pd.uid, 'start': timestamps[5], 'end': timestamps[5]}) assert response.status_code == 422 - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'start': timestamps[5], 'end': timestamps[4]}) + response = test_client.get(f'/broker/api/messages/', params={'p_uid': pd.uid, 'start': timestamps[4], 'end': timestamps[5]}) assert response.status_code == 422 def test_invalid_count_end(test_client): - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'count': 1, 'end': timestamps[4]}) + response = test_client.get(f'/broker/api/messages/', params={'p_uid': pd.uid, 'count': 1, 'end': timestamps[4]}) assert response.status_code == 422 diff --git a/test/python/test_restapi.py b/test/python/test_restapi.py index 2ad80647..594b3652 100644 --- a/test/python/test_restapi.py +++ b/test/python/test_restapi.py @@ -1,16 +1,16 @@ import base64 -import copy, datetime, logging, time, unittest, uuid -from typing_extensions import assert_type -import re +import copy +import logging +import os +import time +import unittest from typing import Tuple -import api.client.DAO as dao import requests +import api.client.DAO as dao from pdmodels.Models import DeviceNote, PhysicalDevice, PhysicalToLogicalMapping, Location, LogicalDevice -from typing import Tuple - -import os +from test_utils import now logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)s: %(message)s', datefmt='%Y-%m-%dT%H:%M:%S%z') logger = logging.getLogger(__name__) @@ -62,13 +62,10 @@ def setUp(self): self._admin_token = dao.user_get_token(self._admin_username, 'password') self._ADMIN_HEADERS['Authorization'] = f'Bearer {self._admin_token}' - def now(self): - return datetime.datetime.now(tz=datetime.timezone.utc) - def _create_physical_device(self, expected_code=201, req_header=_HEADERS, dev=None) -> Tuple[ PhysicalDevice, PhysicalDevice]: if dev is None: - last_seen = self.now() + last_seen = now() dev = PhysicalDevice(source_name='ttn', name='Test Device', location=Location(lat=3, long=-31), last_seen=last_seen, source_ids={'appId': 'x', 'devId': 'y'}, @@ -292,10 +289,12 @@ def test_get_device_notes(self): LOGICAL DEVICES --------------------------------------------------------------------------""" - def _create_default_logical_device(self, expected_code=201, req_header=_HEADERS, dev=None) -> Tuple[ - LogicalDevice, LogicalDevice]: + def _create_default_logical_device(self, expected_code=201, req_header=None, dev=None) -> Tuple[LogicalDevice, LogicalDevice]: + if req_header is None: + req_header = self._HEADERS + if dev is None: - last_seen = self.now() + last_seen = now() dev = LogicalDevice(name='Test Device', location=Location(lat=3, long=-31), last_seen=last_seen, properties={'appId': 'x', 'devId': 'y', 'other': 'z'}) @@ -416,7 +415,7 @@ def test_delete_logical_device(self): def test_insert_mapping(self): pdev, new_pdev = self._create_physical_device(req_header=self._ADMIN_HEADERS) ldev, new_ldev = self._create_default_logical_device(req_header=self._ADMIN_HEADERS) - mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=now()) # This should work. url = f'{_BASE}/mappings/' @@ -430,7 +429,7 @@ def test_insert_mapping(self): # This should fail because the physical device has a current mapping. time.sleep(0.001) - mapping.start_time = self.now() + mapping.start_time = now() payload = mapping.json() r = requests.post(url, headers=self._ADMIN_HEADERS, data=payload) self.assertEqual(r.status_code, 400) @@ -438,14 +437,14 @@ def test_insert_mapping(self): # End the current mapping and create a new one. This should work and # simulates 'pausing' a physical device while working on it. requests.patch(f'{url}physical/end/{mapping.pd.uid}', headers=self._ADMIN_HEADERS) - mapping.start_time = self.now() + mapping.start_time = now() payload = mapping.json() r = requests.post(url, headers=self._ADMIN_HEADERS, data=payload) self.assertEqual(r.status_code, 201) pdx = copy.deepcopy(new_pdev) pdx.uid = -1 - mapping = PhysicalToLogicalMapping(pd=pdx, ld=new_ldev, start_time=self.now()) + mapping = PhysicalToLogicalMapping(pd=pdx, ld=new_ldev, start_time=now()) # This should fail due to invalid physical uid. payload = mapping.json() r = requests.post(url, headers=self._ADMIN_HEADERS, data=payload) @@ -453,7 +452,7 @@ def test_insert_mapping(self): ldx = copy.deepcopy(new_ldev) ldx.uid = -1 - mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=ldx, start_time=self.now()) + mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=ldx, start_time=now()) # End the current mapping for the phyiscal device so the RESTAPI doesn't # return status 400. @@ -467,7 +466,7 @@ def test_insert_mapping(self): def test_get_mapping_from_physical(self): pdev, new_pdev = self._create_physical_device(req_header=self._ADMIN_HEADERS) ldev, new_ldev = self._create_default_logical_device(req_header=self._ADMIN_HEADERS) - mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=now()) url = f'{_BASE}/mappings/' payload = mapping.json() @@ -491,7 +490,7 @@ def test_get_mapping_from_physical(self): # Confirm the latest mapping is returned. time.sleep(0.001) mapping2 = copy.deepcopy(mapping) - mapping2.start_time = self.now() + mapping2.start_time = now() self.assertNotEqual(mapping, mapping2) payload = mapping2.json() r = requests.post(url, headers=self._ADMIN_HEADERS, data=payload) @@ -505,7 +504,7 @@ def test_get_mapping_from_physical(self): def test_get_mapping_from_logical(self): pdev, new_pdev = self._create_physical_device(req_header=self._ADMIN_HEADERS) ldev, new_ldev = self._create_default_logical_device(req_header=self._ADMIN_HEADERS) - mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=now()) url = f'{_BASE}/mappings/' payload = mapping.json() @@ -529,7 +528,7 @@ def test_get_mapping_from_logical(self): # Confirm the latest mapping is returned. time.sleep(0.001) mapping2 = copy.deepcopy(mapping) - mapping2.start_time = self.now() + mapping2.start_time = now() self.assertNotEqual(mapping, mapping2) payload = mapping2.json() r = requests.post(url, headers=self._ADMIN_HEADERS, data=payload) @@ -546,7 +545,7 @@ def compare_mappings_ignore_end_time(self, m1: PhysicalToLogicalMapping, m2: Phy def test_get_latest_mapping_from_physical(self): pdev, new_pdev = self._create_physical_device(req_header=self._ADMIN_HEADERS) ldev, new_ldev = self._create_default_logical_device(req_header=self._ADMIN_HEADERS) - mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=now()) url = f'{_BASE}/mappings/' @@ -590,7 +589,7 @@ def test_get_latest_mapping_from_physical(self): def test_get_latest_mapping_from_logical(self): pdev, new_pdev = self._create_physical_device(req_header=self._ADMIN_HEADERS) ldev, new_ldev = self._create_default_logical_device(req_header=self._ADMIN_HEADERS) - mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=now()) url = f'{_BASE}/mappings/' @@ -639,7 +638,7 @@ def test_get_all_logical_device_mappings(self): # Using the DAO to create the test data, the REST API methods to do this are # tested elsewhere. - mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=now()) dao.insert_mapping(mapping1) time.sleep(0.1) dao.end_mapping(ld=new_ldev.uid) @@ -649,7 +648,7 @@ def test_get_all_logical_device_mappings(self): pdev2.name = 'D2' pdev2, new_pdev2 = self._create_physical_device(dev=pdev2, req_header=self._ADMIN_HEADERS) time.sleep(0.1) - mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=self.now()) + mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=now()) dao.insert_mapping(mapping2) time.sleep(0.1) dao.end_mapping(ld=new_ldev.uid) @@ -659,7 +658,7 @@ def test_get_all_logical_device_mappings(self): pdev3.name = 'D3' pdev3, new_pdev3 = self._create_physical_device(dev=pdev3, req_header=self._ADMIN_HEADERS) time.sleep(0.1) - mapping3 = PhysicalToLogicalMapping(pd=new_pdev3, ld=new_ldev, start_time=self.now()) + mapping3 = PhysicalToLogicalMapping(pd=new_pdev3, ld=new_ldev, start_time=now()) dao.insert_mapping(mapping3) url = f'{_BASE}/mappings/logical/all/{new_ldev.uid}'