From 281b9bc3cceeea10615764dbf568463f8ed9a48a Mon Sep 17 00:00:00 2001
From: David Taylor
Date: Tue, 16 Jul 2024 14:38:42 +1000
Subject: [PATCH] Bring master into Timescale branch. (#87)

* Changes for SCMN deployment.
* Added Wombat config script info to webapp.
* Wombat page changes - fixed the log upload script, added the data upload command.
* Fixed update mapping functions.
* New tab when opening Map.
* Added the axistech poller, changed logging in docker compose.
* Adding the axistech module.
* Changed the axistech message format.
* Changed handleSubmit to check whether the event parameter was supplied.
* Logical mapper drops messages whose timestamp is too far into the future (a sketch of this check follows this list). The REST API messages endpoint can now take a physical or logical device id.
* Fixed pytest unit tests for the /message REST API endpoint.
* The AxisTech API only accepts a specific timezone suffix, so changed to use that.
* Added the ability to include logical device properties in the ld ls command via the --properties flag.
* DB init script now creates the version table so DB v1 backups restore cleanly.
* Added the ability to include logical device properties in the ld ls command via the --properties flag.
  DAO & RestAPI updated to handle the PostGIS geometry location column.
  Removed backoff annotations from the DAO; they have not been useful.
  Changed errors raised by the DAO to be ValueErrors when parameter validation fails.
* Restoring lost changes to files.
* Restoring lost upgrade script.
* Webapp changes to handle the geometry location columns.
* Removed unused variable.
* Fixed webapp map code to handle location lat or long being None; changed docker compose service restart and logging policies.
* Issue 82 (#83)
  Implements issue #82
* More PostGIS and axistech fixes from SCMN (#86)
  * Issue 82 (#83) (#85)
    Implements issue #82
  * Added extra error logging to the axistech receiver.
  ---------
  Co-authored-by: dajtxx
* Added Extract.py to read timeseries data from the IoTa database.
* Fix typo in axistech dependency.
* Fix the typo again.
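The future-timestamp rule mentioned above is implemented in the LogicalMapper.py hunk further down in this patch. As a minimal, self-contained sketch of that check (the one-hour threshold comes from the patch; the helper name is made up for illustration):

    import datetime

    _max_delta = datetime.timedelta(hours=-1)

    def is_too_far_in_future(ts: datetime.datetime) -> bool:
        # A message is dropped when its timestamp is more than an hour ahead of the current UTC time.
        utc_now = datetime.datetime.now(datetime.timezone.utc)
        return (utc_now - ts) < _max_delta

    # Example: a timestamp two hours ahead of now would be dropped.
    print(is_too_far_in_future(datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=2)))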
--------- Co-authored-by: dpi-group Co-authored-by: Mal Kul Co-authored-by: mal-22 <105465227+mal-22@users.noreply.github.com> Co-authored-by: mal-22 Co-authored-by: dajtxx --- compose/.env | 2 + compose/docker-compose.yml | 104 ++++- compose/production/prod.yml | 2 +- db/init.d/init_db.sql | 23 +- db/upgrade/002.sql | 17 + dc.sh | 2 +- images/restapi/requirements.txt | 9 + run.sh | 2 +- src/python/BrokerConstants.py | 1 + src/python/api/client/DAO.py | 392 ++++++++++-------- src/python/broker-cli.py | 9 + src/python/delivery/FRRED.py | 2 +- src/python/logical_mapper/LogicalMapper.py | 77 ++-- src/python/pdmodels/Models.py | 23 +- src/python/pollers/axistech.py | 286 +++++++++++++ src/python/restapi/RestAPI.py | 26 +- src/python/util/Extract.py | 103 +++++ src/www/app/main.py | 211 ++++++++-- src/www/app/templates/base.html | 59 +-- src/www/app/templates/header.html | 2 +- .../app/templates/logical_device_form.html | 5 +- .../app/templates/physical_device_form.html | 9 +- src/www/app/templates/wombats.html | 91 +++- src/www/app/utils/api.py | 12 +- test/python/test_dao.py | 245 ++++++++--- test/python/test_get_physical_messages.py | 77 ++-- test/python/test_restapi.py | 55 ++- 27 files changed, 1364 insertions(+), 482 deletions(-) create mode 100644 db/upgrade/002.sql create mode 100644 src/python/pollers/axistech.py create mode 100644 src/python/util/Extract.py diff --git a/compose/.env b/compose/.env index f7a6d3f2..617fde97 100755 --- a/compose/.env +++ b/compose/.env @@ -47,3 +47,5 @@ TZ=Australia/Sydney # Used by the Intersect DataBolt delivery service. DATABOLT_SHARED_DIR=/some/where/raw_data + +AXISTECH_TOKEN= diff --git a/compose/docker-compose.yml b/compose/docker-compose.yml index 617fe282..5510f7b8 100644 --- a/compose/docker-compose.yml +++ b/compose/docker-compose.yml @@ -2,8 +2,12 @@ version: '3.1' services: db: - image: postgres:14.2 - restart: "no" + image: postgis/postgis:14-3.4 + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped env_file: - .env volumes: @@ -18,7 +22,11 @@ services: mq: hostname: "mq" image: rabbitmq:3.9-management - restart: "no" + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped env_file: - .env volumes: @@ -30,7 +38,11 @@ services: restapi: image: broker/python-base - restart: "no" + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped env_file: - .env depends_on: @@ -38,20 +50,21 @@ services: volumes: - ../src/python:/home/broker/python working_dir: "/home/broker/python/restapi" - entrypoint: [ "/home/broker/.local/bin/uvicorn", "--host", "0.0.0.0", "--port", "5687", "RestAPI:app", "--reload" ] - healthcheck: - test: curl --fail -I http://localhost:5687/openapi.json || exit 1 - interval: 1m - start_period: 20s + entrypoint: [ "/home/broker/.local/bin/uvicorn", "--proxy-headers", "--host", "0.0.0.0", "--port", "5687", "RestAPI:app" ] website: image: broker/mgmt-app + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped env_file: - .env environment: - PYTHONPATH=/app:/iota depends_on: - restapi: + db: condition: "service_healthy" volumes: - ../src/www:/app @@ -59,7 +72,11 @@ services: ttn_webhook: image: broker/python-base - restart: "no" + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped env_file: - .env profiles: @@ -74,7 +91,11 @@ services: ttn_processor: image: broker/python-base - restart: "no" + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped env_file: - .env profiles: @@ -93,7 
+114,11 @@ services: ttn_decoder: image: broker/ttn_decoder - restart: "no" + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped env_file: - .env profiles: @@ -106,7 +131,11 @@ services: ydoc: image: broker/python-base - restart: "no" + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped env_file: - .env profiles: @@ -123,11 +152,14 @@ services: wombat: image: broker/python-base - restart: "no" + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped env_file: - .env - profiles: - - wombat + profiles: ["wombat", "frred"] depends_on: db: condition: "service_healthy" @@ -140,7 +172,11 @@ services: lm: image: broker/python-base - restart: "no" + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped env_file: - .env depends_on: @@ -155,6 +191,10 @@ services: delivery: image: broker/python-base + logging: + driver: local + options: + max-file: "3" restart: unless-stopped env_file: - .env @@ -172,6 +212,10 @@ services: pollers: image: broker/python-base + logging: + driver: local + options: + max-file: "3" restart: unless-stopped env_file: - .env @@ -189,6 +233,10 @@ services: frred: image: broker/python-base + logging: + driver: local + options: + max-file: "3" restart: unless-stopped env_file: - .env @@ -201,10 +249,26 @@ services: condition: "service_healthy" volumes: - ../src/python:/home/broker/python - - ${DATABOLT_SHARED_DIR}/raw_data:/raw_data + - ${DATABOLT_SHARED_DIR}/nectar_raw_data:/raw_data working_dir: "/home/broker/python" entrypoint: [ "python", "-m", "delivery.FRRED" ] + + axistech: + image: broker/python-base + logging: + driver: local + options: + max-file: "3" + restart: unless-stopped + env_file: + - .env + profiles: + - frred + depends_on: + - db + entrypoint: [ "python", "-m", "pollers.axistech" ] + timescaledb: build: ../timescale # Point to the directory containing the custom Dockerfile hostname: "tsdb" @@ -239,5 +303,3 @@ services: - ../src/python:/home/broker/python working_dir: "/home/broker/python" entrypoint: [ "python", "-m", "timescale.TS_LTSReader" ] - - diff --git a/compose/production/prod.yml b/compose/production/prod.yml index 8cd43d47..c31d1eb4 100644 --- a/compose/production/prod.yml +++ b/compose/production/prod.yml @@ -13,7 +13,7 @@ services: # nginx will accept a connection on 1883, forward it to # 127.0.0.1:1884, which docker will forward to port 1883 # within the container. 
- - "127.0.0.1:1884:1883" + - "0.0.0.0:1883:1883" - "127.0.0.1:15672:15672" - "127.0.0.1:15692:15692" volumes: diff --git a/db/init.d/init_db.sql b/db/init.d/init_db.sql index 4a7cd54b..0df6d030 100755 --- a/db/init.d/init_db.sql +++ b/db/init.d/init_db.sql @@ -1,3 +1,6 @@ +CREATE EXTENSION postgis; +CREATE EXTENSION pgcrypto; + create table if not exists sources ( source_name text primary key not null ); @@ -6,7 +9,7 @@ create table if not exists physical_devices ( uid integer generated always as identity primary key, source_name text not null references sources, name text not null, - location point, + location geometry('POINT', 4283), last_seen timestamptz, -- Store only top level key value pairs here; it is used -- for quickly finding a device using information carried @@ -72,7 +75,7 @@ create table if not exists device_blobs ( create table if not exists logical_devices ( uid integer generated always as identity primary key, name text not null, - location point, + location geometry('POINT', 4283), last_seen timestamptz, properties jsonb not null default '{}' ); @@ -90,7 +93,7 @@ create table if not exists physical_logical_map ( primary key(physical_uid, logical_uid, start_time) ); -create table if not exists users( +create table if not exists users ( uid integer generated always as identity primary key, username text not null unique, salt text not null, @@ -100,6 +103,15 @@ create table if not exists users( read_only boolean default True not null ); +create table if not exists version ( + version integer not null +); + +create index if not exists pd_src_id_idx on physical_devices using GIN (source_ids); + +insert into sources values ('ttn'), ('greenbrain'), ('wombat'), ('ydoc'), ('ict_eagleio'); +insert into version values (2); + create table if not exists data_name_map( input_name text not null primary key, std_name text not null @@ -448,8 +460,3 @@ insert into word_list values ('UP'), ('VAPOUR'), ('WIND'); - --- Enable the PostGIS extensions --- CREATE EXTENSION postgis; --- CREATE EXTENSION postgis_raster; --- CREATE EXTENSION postgis_sfcgal; diff --git a/db/upgrade/002.sql b/db/upgrade/002.sql new file mode 100644 index 00000000..a6920962 --- /dev/null +++ b/db/upgrade/002.sql @@ -0,0 +1,17 @@ +CREATE EXTENSION postgis; +CREATE EXTENSION pgcrypto; + +SELECT AddGeometryColumn('logical_devices','geom',4283,'POINT',2); +SELECT AddGeometryColumn('physical_devices','geom',4283,'POINT',2); + +UPDATE logical_devices SET geom = ST_MakePoint(location[1], location[0]) WHERE location IS NOT NULL; +UPDATE physical_devices SET geom = ST_MakePoint(location[1], location[0]) WHERE location IS NOT NULL; + +ALTER TABLE logical_devices DROP COLUMN location; +ALTER TABLE physical_devices DROP COLUMN location; + +ALTER TABLE logical_devices RENAME COLUMN geom TO location; +ALTER TABLE physical_devices RENAME COLUMN geom TO location; + +TRUNCATE version; +insert into version values (2); diff --git a/dc.sh b/dc.sh index 6a57f2ab..d3c0c9c3 100755 --- a/dc.sh +++ b/dc.sh @@ -12,4 +12,4 @@ if [ "$RUN_MODE" != test ]; then fi fi -exec docker compose --profile ttn --profile ydoc --profile wombat --profile ubidots --profile pollers --profile frred -p $RUN_MODE -f ../docker-compose.yml -f ./$RUN_MODE.yml $* +exec docker-compose --profile wombat --profile ubidots --profile frred -p $RUN_MODE -f ../docker-compose.yml -f ./$RUN_MODE.yml $* diff --git a/images/restapi/requirements.txt b/images/restapi/requirements.txt index 65e46478..9d580371 100644 --- a/images/restapi/requirements.txt +++ 
b/images/restapi/requirements.txt @@ -6,29 +6,38 @@ charset-normalizer==2.1.0 click==8.1.3 dnspython==2.2.1 email-validator==1.2.1 +exceptiongroup==1.1.1 fastapi==0.98.0 h11==0.13.0 httpcore==0.17.2 httptools==0.4.0 httpx==0.24.1 idna==3.3 +iniconfig==2.0.0 itsdangerous==2.1.2 Jinja2==3.1.2 MarkupSafe==2.1.1 +numpy==1.26.4 orjson==3.7.7 +packaging==23.1 +pandas==2.2.1 pika==1.3.0 +pluggy==1.2.0 psycopg2-binary==2.9.3 pydantic==1.9.1 pytest==7.4.0 python-dateutil==2.8.2 python-dotenv==0.20.0 python-multipart==0.0.5 +pytz==2024.1 PyYAML==6.0 requests==2.28.1 six==1.16.0 sniffio==1.2.0 starlette==0.27.0 +tomli==2.0.1 typing_extensions==4.3.0 +tzdata==2024.1 ujson==5.4.0 urllib3==1.26.10 uvicorn==0.17.6 diff --git a/run.sh b/run.sh index f31ac05b..295f48af 100755 --- a/run.sh +++ b/run.sh @@ -14,7 +14,7 @@ fi cd $BROKER_ROOT cd compose/$MODE -./dc.sh stop +./dc.sh down cd $BROKER_ROOT docker build -t broker/python-base -f images/restapi/Dockerfile . docker build -t broker/ttn_decoder -f images/ttn_decoder/Dockerfile . diff --git a/src/python/BrokerConstants.py b/src/python/BrokerConstants.py index 62f2a0a4..ccf74c26 100644 --- a/src/python/BrokerConstants.py +++ b/src/python/BrokerConstants.py @@ -13,6 +13,7 @@ WOMBAT = 'wombat' YDOC = 'ydoc' ICT_EAGLEIO = 'ict_eagleio' +AXISTECH = 'axistech' CREATION_CORRELATION_ID_KEY = 'creation_correlation_id' SENSOR_GROUP_ID_KEY = 'sensor_group_id' diff --git a/src/python/api/client/DAO.py b/src/python/api/client/DAO.py index 7cefea0d..320c3039 100644 --- a/src/python/api/client/DAO.py +++ b/src/python/api/client/DAO.py @@ -1,39 +1,38 @@ from contextlib import contextmanager from datetime import datetime, timezone -from email.utils import parseaddr -import logging, re, warnings -from turtle import back -from unittest import result -import backoff +import logging, warnings import dateutil.parser import psycopg2 from psycopg2 import pool from typing import List, Tuple # Import Tuple import psycopg2.errors -from psycopg2.extensions import register_adapter, AsIs +from psycopg2.extensions import AsIs from psycopg2.extras import Json, register_uuid from typing import Any, Dict, List, Optional, Union import hashlib -import base64 import os import BrokerConstants -from pdmodels.Models import DeviceNote, Location, LogicalDevice, PhysicalDevice, PhysicalToLogicalMapping, User +from pdmodels.Models import BaseDevice, DeviceNote, LogicalDevice, PhysicalDevice, PhysicalToLogicalMapping, User logging.captureWarnings(True) + class DAOException(Exception): def __init__(self, msg: str = None, wrapped: Exception = None): self.msg: str = msg self.wrapped: Exception = wrapped + # This is raised in update methods when the entity to be updated does not exist. class DAODeviceNotFound(DAOException): pass + class DAOUserNotFound(DAOException): pass + # This is raised if Postgres raises a unique constraint exception. It is useful # for the REST API to know this was the problem rather than some general database # problem, and allows calling code to not have to examine the Postgres exception @@ -42,29 +41,14 @@ class DAOUniqeConstraintException(DAOException): pass -def adapt_location(location: Location): - """ - Transform a Location instance into a value usable in an SQL statement. - """ - return AsIs(f"('{location.lat},{location.long}')") - - -def cast_point(value, cur): - """ - Transform a value from an SQL result into a Location instance. 
- """ - if value is None: - return None - - if isinstance(value, str): - m = re.fullmatch(r'\(([+-]?\d+\.?\d*),([+-]?\d+\.?\d*)\)', value) - if m is not None: - return Location(lat=float(m.group(1)),long=m.group(2)) - - raise psycopg2.InterfaceError(f'Bad point representation: {value}') - - conn_pool = None +_physical_device_select_all_cols = """ +select uid, source_name, name, (select row_to_json(_) from (select ST_Y(location) as lat, ST_X(location) as long) as _) as location, last_seen, source_ids, properties from physical_devices +""" + +_logical_device_select_all_cols = """ +select uid, name, (select row_to_json(_) from (select ST_Y(location) as lat, ST_X(location) as long) as _) as location, last_seen, properties from logical_devices +""" def stop() -> None: @@ -95,7 +79,7 @@ def _get_connection(): if conn_pool is None: logging.info('Creating connection pool, registering type converters.') conn_pool = pool.ThreadedConnectionPool(1, 5) - _register_type_adapters() + register_uuid() conn = conn_pool.getconn() logging.debug(f'Taking conn {conn}') @@ -120,25 +104,6 @@ def free_conn(conn) -> None: conn_pool.putconn(conn) -def _register_type_adapters(): - """ - Register adapter functions to handle Location <-> SQL Point data types. - """ - # Apapter to convert from a Location instance to a quoted SQL string. - register_adapter(Location, adapt_location) - - # Adapter to convert from an SQL result to a Location instance. - with _get_connection() as conn, conn.cursor() as cursor: - conn.autocommit = True - cursor.execute("SELECT NULL::point") - point_oid = cursor.description[0][1] - POINT = psycopg2.extensions.new_type((point_oid,), "POINT", cast_point) - psycopg2.extensions.register_type(POINT) - - free_conn(conn) - register_uuid() - - def _dict_from_row(result_metadata, row) -> Dict[str, Any]: obj = {} for i, col_def in enumerate(result_metadata): @@ -147,14 +112,28 @@ def _dict_from_row(result_metadata, row) -> Dict[str, Any]: return obj +def _device_to_query_params(device: BaseDevice) -> dict: + dev_fields = {} + for k, v in vars(device).items(): + match k: + case 'properties' | 'source_ids': + dev_fields[k] = Json(v) + case 'location': + if device.location is None or device.location.lat is None or device.location.long is None: + dev_fields[k] = None + else: + dev_fields[k] = AsIs(f"ST_GeomFromText('POINT({device.location.long} {device.location.lat})')") + case _: + dev_fields[k] = v + + return dev_fields + + """ Physical device source CRUD methods - -create table if not exists sources ( - source_name text primary key not null -); """ -@backoff.on_exception(backoff.expo, DAOException, max_time=30) + + def get_all_physical_sources() -> List[PhysicalDevice]: conn = None try: @@ -173,24 +152,45 @@ def get_all_physical_sources() -> List[PhysicalDevice]: free_conn(conn) +def add_physical_source(name: str) -> None: + """ + Add a new physical device source if it does not already exist. 
+ """ + conn = None + try: + with _get_connection() as conn, conn.cursor() as cursor: + conn.autocommit = False + cursor.execute('select source_name from sources where source_name = %s', (name, )) + if cursor.rowcount == 0: + cursor.execute("insert into sources values (%s)", (name, )) + cursor.execute('select source_name from sources where source_name = %s', (name, )) + if cursor.rowcount == 1: + logging.info(f'Added physical device source {name}') + conn.commit() + else: + logging.error(f'Failed to add physical device source {name}') + conn.rollback() + except Exception as err: + conn.rollback() + raise err if isinstance(err, DAOException) else DAOException('add_physical_source failed.', err) + finally: + if conn is not None: + free_conn(conn) + + """ Physical device CRUD methods """ -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def create_physical_device(device: PhysicalDevice) -> PhysicalDevice: conn = None try: - dev_fields = {} - for k, v in vars(device).items(): - dev_fields[k] = v if k not in ('source_ids', 'properties') else Json(v) - + dev_fields = _device_to_query_params(device) with _get_connection() as conn, conn.cursor() as cursor: #logging.info(cursor.mogrify("insert into physical_devices (source_name, name, location, last_seen, source_ids, properties) values (%(source_name)s, %(name)s, %(location)s, %(last_seen)s, %(source_ids)s, %(properties)s) returning uid", dev_fields)) cursor.execute("insert into physical_devices (source_name, name, location, last_seen, source_ids, properties) values (%(source_name)s, %(name)s, %(location)s, %(last_seen)s, %(source_ids)s, %(properties)s) returning uid", dev_fields) uid = cursor.fetchone()[0] dev = _get_physical_device(conn, uid) - - return dev + return dev except Exception as err: raise err if isinstance(err, DAOException) else DAOException('create_physical_device failed.', err) finally: @@ -212,7 +212,7 @@ def _get_physical_device(conn, uid: int) -> PhysicalDevice: """ dev = None with conn.cursor() as cursor: - sql = 'select uid, source_name, name, location, last_seen, source_ids, properties from physical_devices where uid = %s' + sql = f'{_physical_device_select_all_cols} where uid = %s' cursor.execute(sql, (uid, )) row = cursor.fetchone() if row is not None: @@ -222,15 +222,12 @@ def _get_physical_device(conn, uid: int) -> PhysicalDevice: return dev -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_physical_device(uid: int) -> PhysicalDevice: conn = None try: - dev = None with _get_connection() as conn: dev = _get_physical_device(conn, uid) - - return dev + return dev except Exception as err: raise err if isinstance(err, DAOException) else DAOException('get_physical_device failed.', err) finally: @@ -238,13 +235,12 @@ def get_physical_device(uid: int) -> PhysicalDevice: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_pyhsical_devices_using_source_ids(source_name: str, source_ids: Dict[str, str]) -> List[PhysicalDevice]: conn = None try: devs = [] with _get_connection() as conn, conn.cursor() as cursor: - sql = 'select uid, source_name, name, location, last_seen, source_ids, properties from physical_devices where source_name = %s and source_ids @> %s order by uid asc' + sql = f'{_physical_device_select_all_cols} where source_name = %s and source_ids @> %s order by uid asc' args = (source_name, Json(source_ids)) #logging.info(cursor.mogrify(sql, args)) cursor.execute(sql, args) @@ -260,13 +256,12 @@ def get_pyhsical_devices_using_source_ids(source_name: 
str, source_ids: Dict[str free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_all_physical_devices() -> List[PhysicalDevice]: conn = None try: devs = [] with _get_connection() as conn, conn.cursor() as cursor: - sql = 'select uid, source_name, name, location, last_seen, source_ids, properties from physical_devices order by uid asc' + sql = f'{_physical_device_select_all_cols} order by uid asc' cursor.execute(sql) cursor.arraysize = 200 rows = cursor.fetchmany() @@ -285,13 +280,12 @@ def get_all_physical_devices() -> List[PhysicalDevice]: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_physical_devices_from_source(source_name: str) -> List[PhysicalDevice]: conn = None try: devs = [] with _get_connection() as conn, conn.cursor() as cursor: - sql = 'select uid, source_name, name, location, last_seen, source_ids, properties from physical_devices where source_name = %s order by uid asc' + sql = f'{_physical_device_select_all_cols} where source_name = %s order by uid asc' cursor.execute(sql, (source_name, )) cursor.arraysize = 200 rows = cursor.fetchmany() @@ -310,13 +304,15 @@ def get_physical_devices_from_source(source_name: str) -> List[PhysicalDevice]: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) -def get_physical_devices(query_args = {}) -> List[PhysicalDevice]: +def get_physical_devices(query_args=None) -> List[PhysicalDevice]: + if query_args is None: + query_args = {} + conn = None try: devs = [] with _get_connection() as conn, conn.cursor() as cursor: - sql = 'select uid, source_name, name, location, last_seen, source_ids, properties from physical_devices' + sql = _physical_device_select_all_cols args = {} add_where = True @@ -369,7 +365,6 @@ def get_physical_devices(query_args = {}) -> List[PhysicalDevice]: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def update_physical_device(device: PhysicalDevice) -> PhysicalDevice: conn = None try: @@ -385,7 +380,18 @@ def update_physical_device(device: PhysicalDevice) -> PhysicalDevice: for name, val in vars(device).items(): if val != current_values[name]: update_col_names.append(f'{name} = %s') - update_col_values.append(val if name not in ('source_ids', 'properties') else Json(val)) + match name: + case 'location': + if val is not None: + val = AsIs(f"ST_PointFromText('POINT({val.long} {val.lat})')") + + update_col_values.append(val) + + case 'properties' | 'source_ids': + update_col_values.append(Json(val)) + + case _: + update_col_values.append(val) logging.debug(update_col_names) logging.debug(update_col_values) @@ -416,7 +422,6 @@ def update_physical_device(device: PhysicalDevice) -> PhysicalDevice: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def delete_physical_device(uid: int) -> PhysicalDevice: conn = None try: @@ -434,7 +439,6 @@ def delete_physical_device(uid: int) -> PhysicalDevice: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def create_physical_device_note(uid: int, note: str) -> None: conn = None try: @@ -449,7 +453,6 @@ def create_physical_device_note(uid: int, note: str) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_physical_device_notes(uid: int) -> List[DeviceNote]: conn = None try: @@ -467,7 +470,6 @@ def get_physical_device_notes(uid: int) -> List[DeviceNote]: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def 
update_physical_device_note(note: DeviceNote) -> None: conn = None try: @@ -481,7 +483,6 @@ def update_physical_device_note(note: DeviceNote) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def delete_physical_device_note(uid: int) -> None: conn = None try: @@ -500,21 +501,15 @@ def delete_physical_device_note(uid: int) -> None: Logical Device CRUD methods """ -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def create_logical_device(device: LogicalDevice) -> LogicalDevice: conn = None try: - dev = None - dev_fields = {} - for k, v in vars(device).items(): - dev_fields[k] = v if k != 'properties' else Json(v) - + dev_fields = _device_to_query_params(device) with _get_connection() as conn, conn.cursor() as cursor: cursor.execute("insert into logical_devices (name, location, last_seen, properties) values (%(name)s, %(location)s, %(last_seen)s, %(properties)s) returning uid", dev_fields) uid = cursor.fetchone()[0] dev = _get_logical_device(conn, uid) - - return dev + return dev except Exception as err: raise err if isinstance(err, DAOException) else DAOException('create_logical_device failed.', err) finally: @@ -536,7 +531,15 @@ def _get_logical_device(conn, uid: int) -> LogicalDevice: """ dev = None with conn.cursor() as cursor: - sql = 'select uid, name, location, last_seen, properties from logical_devices where uid = %s' + """ + See: https://dba.stackexchange.com/questions/27732/set-names-to-attributes-when-creating-json-with-row-to-json + + for an explanation of how to name the fields in a row_to_json call. + + Example query tested in psql: + select uid, name, (select row_to_json(_) from (select ST_Y(location) as lat, ST_X(location) as long) as _) as location from logical_devices; + """ + sql = f'{_logical_device_select_all_cols} where uid = %s' cursor.execute(sql, (uid, )) row = cursor.fetchone() if row is not None: @@ -546,7 +549,6 @@ def _get_logical_device(conn, uid: int) -> LogicalDevice: return dev -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_logical_device(uid: int) -> LogicalDevice: conn = None try: @@ -560,15 +562,12 @@ def get_logical_device(uid: int) -> LogicalDevice: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_logical_devices(query_args = {}) -> List[LogicalDevice]: conn = None try: devs = [] with _get_connection() as conn, conn.cursor() as cursor: - #conn.autocommit = True - - sql = 'select uid, name, location, last_seen, properties from logical_devices' + sql = _logical_device_select_all_cols args = {} add_where = True @@ -593,12 +592,11 @@ def get_logical_devices(query_args = {}) -> List[LogicalDevice]: sql = sql + ' order by uid asc' #logging.info(cursor.mogrify(sql, args)) - + logging.info(cursor.mogrify(sql, args)) cursor.execute(sql, args) cursor.arraysize = 200 rows = cursor.fetchmany() while len(rows) > 0: - #logging.info(f'processing {len(rows)} rows.') for r in rows: d = LogicalDevice.parse_obj(_dict_from_row(cursor.description, r)) devs.append(d) @@ -613,7 +611,6 @@ def get_logical_devices(query_args = {}) -> List[LogicalDevice]: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def update_logical_device(device: LogicalDevice) -> LogicalDevice: conn = None try: @@ -629,7 +626,18 @@ def update_logical_device(device: LogicalDevice) -> LogicalDevice: for name, val in vars(device).items(): if val != current_values[name]: update_col_names.append(f'{name} = %s') - update_col_values.append(val if name != 'properties' 
else Json(val)) + match name: + case 'location': + if val is not None: + val = AsIs(f"ST_PointFromText('POINT({val.long} {val.lat})')") + + update_col_values.append(val) + + case 'properties': + update_col_values.append(Json(val)) + + case _: + update_col_values.append(val) if len(update_col_names) < 1: return device @@ -653,7 +661,6 @@ def update_logical_device(device: LogicalDevice) -> LogicalDevice: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def delete_logical_device(uid: int) -> LogicalDevice: conn = None try: @@ -682,7 +689,6 @@ def delete_logical_device(uid: int) -> LogicalDevice: ); """ -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def insert_mapping(mapping: PhysicalToLogicalMapping) -> None: """ Insert a device mapping. @@ -692,7 +698,7 @@ def insert_mapping(mapping: PhysicalToLogicalMapping) -> None: with _get_connection() as conn, conn.cursor() as cursor: current_mapping = _get_current_device_mapping(conn, pd=mapping.pd.uid) if current_mapping is not None: - raise DAOUniqeConstraintException(f'insert_mapping failed: physical device {current_mapping.pd.uid} / "{current_mapping.pd.name}" is already mapped to logical device {current_mapping.ld.uid} / "{current_mapping.ld.name}"') + raise ValueError(f'insert_mapping failed: physical device {current_mapping.pd.uid} / "{current_mapping.pd.name}" is already mapped to logical device {current_mapping.ld.uid} / "{current_mapping.ld.name}"') current_mapping = _get_current_device_mapping(conn, ld=mapping.ld.uid) if current_mapping is not None: @@ -704,26 +710,33 @@ def insert_mapping(mapping: PhysicalToLogicalMapping) -> None: except psycopg2.errors.UniqueViolation as err: raise DAOUniqeConstraintException(f'Mapping already exists: {mapping.pd.uid} {mapping.pd.name} -> {mapping.ld.uid} {mapping.ld.name}, starting at {mapping.start_time}.', err) except Exception as err: - raise err if isinstance(err, DAOException) else DAOException('insert_mapping failed.', err) + match err: + case DAOException() | ValueError(): + raise err + case _: + raise DAOException('insert_mapping failed.', err) finally: if conn is not None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def end_mapping(pd: Optional[Union[PhysicalDevice, int]] = None, ld: Optional[Union[LogicalDevice, int]] = None) -> None: conn = None try: with _get_connection() as conn: _end_mapping(conn, pd, ld) except Exception as err: - raise err if isinstance(err, DAOException) else DAOException('end_mapping failed.', err) + match err: + case DAOException() | ValueError(): + raise err + case _: + raise DAOException('end_mapping failed.', err) + finally: if conn is not None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def _end_mapping(conn, pd: Optional[Union[PhysicalDevice, int]] = None, ld: Optional[Union[LogicalDevice, int]] = None) -> None: with conn.cursor() as cursor: mapping: PhysicalToLogicalMapping = _get_current_device_mapping(conn, pd, ld) @@ -731,10 +744,10 @@ def _end_mapping(conn, pd: Optional[Union[PhysicalDevice, int]] = None, ld: Opti return if pd is None and ld is None: - raise DAOException('A PhysicalDevice or a LogicalDevice (or an uid for one of them) must be supplied to end a mapping.') + raise ValueError('A PhysicalDevice or a LogicalDevice (or an uid for one of them) must be supplied to end a mapping.') if pd is not None and ld is not None: - raise DAOException('Both pd and ld were provided, only give one when ending a mapping.') + raise 
ValueError('Both pd and ld were provided, only give one when ending a mapping.') p_uid = None if pd is not None: @@ -753,7 +766,6 @@ def _end_mapping(conn, pd: Optional[Union[PhysicalDevice, int]] = None, ld: Opti logging.warning(f'No mapping was updated during end_mapping for {pd.uid} {pd.name} -> {ld.uid} {ld.name}') -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def delete_mapping(mapping: PhysicalToLogicalMapping) -> None: """ Remove an entry from the physical_logical_map table. Advised to use end_mapping instead. This method should only be used when permanently deleting a physical or logical device from the database. @@ -770,7 +782,6 @@ def delete_mapping(mapping: PhysicalToLogicalMapping) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def toggle_device_mapping(is_active: bool, pd: Optional[Union[PhysicalDevice, int]] = None, ld: Optional[Union[LogicalDevice, int]] = None) -> None: """ Change the is_active column in the database @@ -791,10 +802,10 @@ def toggle_device_mapping(is_active: bool, pd: Optional[Union[PhysicalDevice, in def _toggle_device_mapping(conn, is_active, pd: Optional[Union[PhysicalDevice, int]] = None, ld: Optional[Union[LogicalDevice, int]] = None): if pd is None and ld is None: - raise DAOException('A PhysicalDevice or a LogicalDevice (or an uid for one of them) must be supplied to find a mapping.') + raise ValueError('A PhysicalDevice or a LogicalDevice (or an uid for one of them) must be supplied to find a mapping.') if pd is not None and ld is not None: - raise DAOException('Both pd and ld were provided, only give one.') + raise ValueError('Both pd and ld were provided, only give one.') p_uid = None if pd is not None: @@ -812,17 +823,19 @@ def _toggle_device_mapping(conn, is_active, pd: Optional[Union[PhysicalDevice, i return None -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_current_device_mapping(pd: Optional[Union[PhysicalDevice, int]] = None, ld: Optional[Union[LogicalDevice, int]] = None, only_current_mapping: bool = True) -> Optional[PhysicalToLogicalMapping]: conn = None try: - mapping = None with _get_connection() as conn: mapping = _get_current_device_mapping(conn, pd, ld, only_current_mapping) - - return mapping + return mapping except Exception as err: - raise err if isinstance(err, DAOException) else DAOException('get_current_device_mapping failed.', err) + match err: + case DAOException() | ValueError(): + raise err + case _: + raise DAOException('insert_mapping failed.', err) + finally: if conn is not None: free_conn(conn) @@ -832,10 +845,10 @@ def _get_current_device_mapping(conn, pd: Optional[Union[PhysicalDevice, int]] = mappings = None if pd is None and ld is None: - raise DAOException('A PhysicalDevice or a LogicalDevice (or an uid for one of them) must be supplied to find a mapping.') + raise ValueError('A PhysicalDevice or a LogicalDevice (or an uid for one of them) must be supplied to find a mapping.') if pd is not None and ld is not None: - raise DAOException('Both pd and ld were provided, only give one.') + raise ValueError('Both pd and ld were provided, only give one.') p_uid = None if pd is not None: @@ -869,13 +882,12 @@ def _get_current_device_mapping(conn, pd: Optional[Union[PhysicalDevice, int]] = return None -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_unmapped_physical_devices() -> List[PhysicalDevice]: conn = None try: devs = [] with _get_connection() as conn, conn.cursor() as cursor: - cursor.execute('select * from 
physical_devices where uid not in (select physical_uid from physical_logical_map where end_time is null) order by uid asc') + cursor.execute(f'{_physical_device_select_all_cols} where uid not in (select physical_uid from physical_logical_map where end_time is null) order by uid asc') for r in cursor: dfr = _dict_from_row(cursor.description, r) devs.append(PhysicalDevice.parse_obj(dfr)) @@ -888,7 +900,6 @@ def get_unmapped_physical_devices() -> List[PhysicalDevice]: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_logical_device_mappings(ld: Union[LogicalDevice, int]) -> List[PhysicalToLogicalMapping]: conn = None try: @@ -910,7 +921,6 @@ def get_logical_device_mappings(ld: Union[LogicalDevice, int]) -> List[PhysicalT free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_physical_device_mappings(pd: Union[PhysicalDevice, int]) -> List[PhysicalToLogicalMapping]: conn = None try: @@ -932,7 +942,6 @@ def get_physical_device_mappings(pd: Union[PhysicalDevice, int]) -> List[Physica if conn is not None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_all_current_mappings(return_uids: bool = True) -> List[PhysicalToLogicalMapping]: conn = None try: @@ -957,7 +966,6 @@ def get_all_current_mappings(return_uids: bool = True) -> List[PhysicalToLogical free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def add_raw_json_message(source_name: str, ts: datetime, correlation_uuid: str, msg, uid: int=None): conn = None try: @@ -972,7 +980,6 @@ def add_raw_json_message(source_name: str, ts: datetime, correlation_uuid: str, free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def insert_physical_timeseries_message(msg: Dict[str, Any]) -> None: conn = None p_uid = msg[BrokerConstants.PHYSICAL_DEVICE_UID_KEY] @@ -989,8 +996,7 @@ def insert_physical_timeseries_message(msg: Dict[str, Any]) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) -def get_physical_timeseries_message(p_uid: int, start: datetime, end: datetime, count: int, only_timestamp: bool) -> List[Dict]: +def get_physical_timeseries_message(start: datetime | None = None, end: datetime | None = None, count: int | None = None, only_timestamp: bool = False, include_received_at: bool = False, p_uid: int = None, l_uid: int = None) -> List[Dict]: conn = None if start is None: @@ -1002,22 +1008,53 @@ def get_physical_timeseries_message(p_uid: int, start: datetime, end: datetime, if count < 1: count = 1 - column_name = 'ts' if only_timestamp else 'json_msg' + if p_uid is None and l_uid is None: + raise ValueError('p_uid or l_uid must be supplied.') + + if p_uid is not None and l_uid is not None: + raise ValueError('Both p_uid and l_uid were provided, only give one.') + + if p_uid is not None: + uid_col_name = 'physical_uid' + uid = p_uid + else: + uid_col_name = 'logical_uid' + uid = l_uid + + if not isinstance(uid, int): + raise TypeError + + if not isinstance(start, datetime): + raise TypeError + + if not isinstance(end, datetime): + raise TypeError + + column_names = ['ts'] + if include_received_at: + column_names.append('received_at') + + if not only_timestamp: + column_names.append('json_msg') + + column_names = ', '.join(column_names) try: + # Order messages by descending timestamp. If a caller asks for one message, they probably want the latest + # message. 
with _get_connection() as conn, conn.cursor() as cursor: qry = f""" - select {column_name} from physical_timeseries - where physical_uid = %s + select ts {column_names} from physical_timeseries + where {uid_col_name} = %s and ts > %s and ts <= %s - order by ts asc + order by ts desc limit %s """ - args = (p_uid, start, end, count) + args = (uid, start, end, count) cursor.execute(qry, args) - return [row[0] for row in cursor.fetchall()] + return [_msg_tuple_to_obj(*row) for row in cursor.fetchall()] except Exception as err: raise DAOException('get_physical_timeseries_message failed.', err) finally: @@ -1025,7 +1062,34 @@ def get_physical_timeseries_message(p_uid: int, start: datetime, end: datetime, free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) +def _msg_tuple_to_obj(ts: datetime, arg2: datetime | dict | None = None, arg3: datetime | dict | None = None) -> dict: + if arg2 is None and arg3 is None: + return {BrokerConstants.TIMESTAMP_KEY: ts.isoformat()} + + if arg2 is not None and arg3 is None: + if isinstance(arg2, datetime): + return {BrokerConstants.TIMESTAMP_KEY: ts, 'received_at': arg2} + else: + return arg2 + + msg_dict = {} + if arg3 is not None: + if isinstance(arg3, datetime): + msg_dict: dict = arg2 + if isinstance(arg3, datetime): + msg_dict['received_at'] = arg3.isoformat() + else: + msg_dict['received_at'] = arg3 + else: + msg_dict: dict = arg3 + if isinstance(arg2, datetime): + msg_dict['received_at'] = arg2.isoformat() + else: + msg_dict['received_at'] = arg2 + + return msg_dict + + def add_raw_text_message(source_name: str, ts: datetime, correlation_uuid: str, msg, uid: int=None): conn = None try: @@ -1044,7 +1108,6 @@ def add_raw_text_message(source_name: str, ts: datetime, correlation_uuid: str, """ User and authentication CRUD methods """ -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def user_add(uname: str, passwd: str, disabled: bool) -> None: #Generate salted password @@ -1067,7 +1130,6 @@ def user_add(uname: str, passwd: str, disabled: bool) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def user_rm(uname: str) -> None: conn = None try: @@ -1081,7 +1143,6 @@ def user_rm(uname: str) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def user_set_read_only(uname: str, read_only: bool) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1093,11 +1154,10 @@ def user_set_read_only(uname: str, read_only: bool) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_user(uid = None, username = None, auth_token = None) -> User: conn = None if uid is None and username is None and auth_token is None: - raise DAOException('get_user requires at least one parameter') + raise ValueError('get_user requires at least one parameter') else: try: user = None @@ -1120,12 +1180,11 @@ def get_user(uid = None, username = None, auth_token = None) -> User: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def user_ls() -> List: try: with _get_connection() as conn, conn.cursor() as cursor: cursor.execute("select username from users order by uid") - results=cursor.fetchall() + results = cursor.fetchall() return [i[0] for i in results] except Exception as err: @@ -1135,15 +1194,13 @@ def user_ls() -> List: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def user_get_token(username, password) -> str | None: conn = None try: with 
_get_connection() as conn, conn.cursor() as cursor: cursor.execute("select salt, password, auth_token from users where username=%s",(username,)) - result=cursor.fetchone() - + result = cursor.fetchone() if result is None: return None @@ -1163,7 +1220,6 @@ def user_get_token(username, password) -> str | None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def token_is_valid(user_token) -> bool: ''' Check if token is in database and is valid @@ -1171,7 +1227,7 @@ def token_is_valid(user_token) -> bool: try: with _get_connection() as conn, conn.cursor() as cursor: cursor.execute("select uid from users where auth_token=%s and valid='True'", (user_token,)) - result=cursor.fetchone() + result = cursor.fetchone() if result is None: return False @@ -1183,11 +1239,10 @@ def token_is_valid(user_token) -> bool: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) -def token_refresh(uname)-> None: +def token_refresh(uname) -> None: # Auth token to be used on other endpoints. - auth_token=os.urandom(64).hex() + auth_token = os.urandom(64).hex() try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1201,10 +1256,9 @@ def token_refresh(uname)-> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def user_change_password(username: str, new_passwd: str) -> None: - salt=os.urandom(64).hex() - pass_hash=hashlib.scrypt(password=new_passwd.encode(), salt=salt.encode(), n=2**14, r=8, p=1, maxmem=0, dklen=64).hex() + salt = os.urandom(64).hex() + pass_hash = hashlib.scrypt(password=new_passwd.encode(), salt=salt.encode(), n=2**14, r=8, p=1, maxmem=0, dklen=64).hex() try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1217,17 +1271,16 @@ def user_change_password(username: str, new_passwd: str) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def user_change_password_and_token(new_passwd: str, prev_token: str) -> str: """ Changes user's password and auth token, returns users new auth token upon success """ #Generate salted password - salt=os.urandom(64).hex() - pass_hash=hashlib.scrypt(password=new_passwd.encode(), salt=salt.encode(), n=2**14, r=8, p=1, maxmem=0, dklen=64).hex() + salt = os.urandom(64).hex() + pass_hash = hashlib.scrypt(password=new_passwd.encode(), salt=salt.encode(), n=2**14, r=8, p=1, maxmem=0, dklen=64).hex() #Auth token to be used on other endpoints - auth_token=os.urandom(64).hex() + auth_token = os.urandom(64).hex() try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1245,8 +1298,7 @@ def user_change_password_and_token(new_passwd: str, prev_token: str) -> str: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) -def token_disable(uname)->None: +def token_disable(uname) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1259,7 +1311,6 @@ def token_disable(uname)->None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def token_enable(uname)-> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1271,7 +1322,6 @@ def token_enable(uname)-> None: if conn is not None: free_conn(conn) - """ DATA_NAME_MAP : links incoming data names to a standardised version, so that timeseries data can be more coherent """ @@ -1295,7 +1345,6 @@ def _get_std_name(conn, input_name: str) -> str: return std_name -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_std_name(input_name: str) -> str: """ CASE INSENSITIVE @@ 
-1311,7 +1360,6 @@ def get_std_name(input_name: str) -> str: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def update_name_map(input_name: str, std_name:str) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1324,7 +1372,6 @@ def update_name_map(input_name: str, std_name:str) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def add_name_map(input_name: str, std_name:str) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1337,7 +1384,6 @@ def add_name_map(input_name: str, std_name:str) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def remove_name_map(input_name: str) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1349,7 +1395,6 @@ def remove_name_map(input_name: str) -> None: if conn is not None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def list_name_map() -> List[Tuple[str, str]]: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1362,7 +1407,6 @@ def list_name_map() -> List[Tuple[str, str]]: if conn is not None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def add_word_list(full_word: str) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1374,7 +1418,6 @@ def add_word_list(full_word: str) -> None: if conn is not None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def remove_word_list(full_word: str) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1405,7 +1448,6 @@ def _get_type_map(conn): return row return type_map -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_type_map(): """ CASE INSENSITIVE @@ -1422,7 +1464,6 @@ def get_type_map(): -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def list_word_list() -> List[str]: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1436,7 +1477,6 @@ def list_word_list() -> List[str]: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def update_type_map(input_name: str, std_name:str) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1449,7 +1489,6 @@ def update_type_map(input_name: str, std_name:str) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def add_type_map(input_name: str, std_name:str) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1477,7 +1516,6 @@ def _get_word_list(conn): return word_list -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_word_list(): """ CASE INSENSITIVE @@ -1493,7 +1531,6 @@ def get_word_list(): free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def remove_type_map(input_name: str) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1506,7 +1543,6 @@ def remove_type_map(input_name: str) -> None: free_conn(conn) -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def list_type_map() -> List[Tuple[str, str]]: try: with _get_connection() as conn, conn.cursor() as cursor: @@ -1533,7 +1569,6 @@ def _get_hash_table(conn): return hash_table -@backoff.on_exception(backoff.expo, DAOException, max_time=30) def get_hash_table(): """ CASE INSENSITIVE @@ -1547,3 +1582,4 @@ def get_hash_table(): finally: if conn is not None: free_conn(conn) + diff --git a/src/python/broker-cli.py b/src/python/broker-cli.py index 
c1a9dbf6..97d13c55 100755 --- a/src/python/broker-cli.py +++ b/src/python/broker-cli.py @@ -66,6 +66,7 @@ def str_to_dict(val) -> Dict: ## List logical devices ld_ls_parser = ld_sub_parsers.add_parser('ls', help='list logical devices') +ld_ls_parser.add_argument('--properties', action='store_true', help='Include the properties field in the output', dest='include_props', required=False) ld_ls_parser.add_argument('--plain', action='store_true', help='Plain output, not JSON', dest='plain') ## Create logical devices @@ -345,11 +346,19 @@ def main() -> None: elif args.cmd1 == 'ld': if args.cmd2 == 'ls': devs = dao.get_logical_devices() + + if args.include_props: + tmp_list = list(map(lambda dev: dev.dict(), devs)) + else: + tmp_list = list(map(lambda dev: dev.dict(exclude={'properties'}), devs)) + print(pretty_print_json(tmp_list)) + tmp_list = list(map(lambda dev: dev.dict(exclude={'properties'}), devs)) if not args.plain: print(pretty_print_json(tmp_list)) else: plain_pd_list(devs) + elif args.cmd2 == 'create': dev = LogicalDevice.parse_obj(dict_from_file_or_string()) print(dao.create_logical_device(dev)) diff --git a/src/python/delivery/FRRED.py b/src/python/delivery/FRRED.py index 40b168db..aed79c7b 100644 --- a/src/python/delivery/FRRED.py +++ b/src/python/delivery/FRRED.py @@ -147,7 +147,7 @@ def on_message(channel, method, properties, body): _channel.basic_ack(delivery_tag) return - if BrokerConstants.WOMBAT != pd.source_name: + if pd.source_name not in [BrokerConstants.WOMBAT, BrokerConstants.AXISTECH]: _channel.basic_ack(delivery_tag) return diff --git a/src/python/logical_mapper/LogicalMapper.py b/src/python/logical_mapper/LogicalMapper.py index 9a4c159e..0c98b28a 100644 --- a/src/python/logical_mapper/LogicalMapper.py +++ b/src/python/logical_mapper/LogicalMapper.py @@ -18,6 +18,9 @@ """ import asyncio, json, logging, signal +import datetime + +import dateutil.parser import BrokerConstants from pika.exchange_type import ExchangeType @@ -25,10 +28,12 @@ import api.client.DAO as dao import util.LoggingUtil as lu -rx_channel = None -tx_channel = None -mq_client = None -finish = False +_rx_channel = None +_tx_channel = None +_mq_client = None +_finish = False + +_max_delta = datetime.timedelta(hours=-1) def sigterm_handler(sig_no, stack_frame) -> None: @@ -36,12 +41,12 @@ def sigterm_handler(sig_no, stack_frame) -> None: Handle SIGTERM from docker by closing the mq and db connections and setting a flag to tell the main loop to exit. """ - global finish, mq_client + global _finish, _mq_client - logging.info(f'{signal.strsignal(sig_no)}, setting finish to True') - finish = True + logging.info(f'{signal.strsignal(sig_no)}, setting _finish to True') + _finish = True dao.stop() - mq_client.stop() + _mq_client.stop() async def main(): @@ -53,25 +58,25 @@ async def main(): It would be good to find a better way to do nothing than the current loop. 
""" - global mq_client, rx_channel, tx_channel, finish + global _mq_client, _rx_channel, _tx_channel, _finish logging.info('===============================================================') logging.info(' STARTING LOGICAL MAPPER') logging.info('===============================================================') - rx_channel = mq.RxChannel(exchange_name=BrokerConstants.PHYSICAL_TIMESERIES_EXCHANGE_NAME, exchange_type=ExchangeType.fanout, queue_name='lm_physical_timeseries', on_message=on_message) - tx_channel = mq.TxChannel(exchange_name=BrokerConstants.LOGICAL_TIMESERIES_EXCHANGE_NAME, exchange_type=ExchangeType.fanout) - mq_client = mq.RabbitMQConnection(channels=[rx_channel, tx_channel]) + _rx_channel = mq.RxChannel(exchange_name=BrokerConstants.PHYSICAL_TIMESERIES_EXCHANGE_NAME, exchange_type=ExchangeType.fanout, queue_name='lm_physical_timeseries', on_message=on_message) + _tx_channel = mq.TxChannel(exchange_name=BrokerConstants.LOGICAL_TIMESERIES_EXCHANGE_NAME, exchange_type=ExchangeType.fanout) + _mq_client = mq.RabbitMQConnection(channels=[_rx_channel, _tx_channel]) - asyncio.create_task(mq_client.connect()) + asyncio.create_task(_mq_client.connect()) - while not (rx_channel.is_open and tx_channel.is_open): + while not (_rx_channel.is_open and _tx_channel.is_open): await asyncio.sleep(0) - while not finish: + while not _finish: await asyncio.sleep(2) - while not mq_client.stopped: + while not _mq_client.stopped: await asyncio.sleep(1) @@ -80,14 +85,14 @@ def on_message(channel, method, properties, body): This function is called when a message arrives from RabbitMQ. """ - global rx_channel, tx_channel, finish + global _rx_channel, _tx_channel, _finish delivery_tag = method.delivery_tag - # If the finish flag is set, reject the message so RabbitMQ will re-queue it + # If the _finish flag is set, reject the message so RabbitMQ will re-queue it # and return early. - if finish: - rx_channel._channel.basic_reject(delivery_tag) + if _finish: + _rx_channel._channel.basic_reject(delivery_tag) return try: @@ -99,7 +104,7 @@ def on_message(channel, method, properties, body): lu.cid_logger.error(f'Physical device not found, cannot continue. Dropping message.', extra=msg) # Ack the message, even though we cannot process it. We don't want it redelivered. # We can change this to a Nack if that would provide extra context somewhere. - rx_channel._channel.basic_ack(delivery_tag) + _rx_channel._channel.basic_ack(delivery_tag) return lu.cid_logger.info(f'Accepted message from {pd.name}', extra=msg) @@ -116,7 +121,7 @@ def on_message(channel, method, properties, body): # Ack the message, even though we cannot process it. We don't want it redelivered. # We can change this to a Nack if that would provide extra context somewhere. - rx_channel._channel.basic_ack(delivery_tag) + _rx_channel._channel.basic_ack(delivery_tag) return msg[BrokerConstants.LOGICAL_DEVICE_UID_KEY] = mapping.ld.uid @@ -124,19 +129,39 @@ def on_message(channel, method, properties, body): dao.insert_physical_timeseries_message(msg) ld = mapping.ld - ld.last_seen = msg[BrokerConstants.TIMESTAMP_KEY] + + # Determine if the message has a future timestamp. + ts_str: datetime.datetime = msg[BrokerConstants.TIMESTAMP_KEY] + ts = dateutil.parser.isoparse(ts_str) + utc_now = datetime.datetime.now(datetime.timezone.utc) + ts_delta = utc_now - ts + + # Drop messages with a timestamp more than 1 hour in the future. + if ts_delta < _max_delta: + lu.cid_logger.warning(f'Message with future timestamp. 
Dropping message.', extra=msg) + # Ack the message, even though we cannot process it. We don't want it redelivered. + # We can change this to a Nack if that would provide extra context somewhere. + _rx_channel._channel.basic_ack(delivery_tag) + return + + if ts > utc_now: + # If the timestamp is a bit in the future then make the last seen time 'now'. + ld.last_seen = utc_now + else: + ld.last_seen = ts + lu.cid_logger.info(f'Timestamp from message for LD last seen update: {ld.last_seen}', extra=msg) ld.properties[BrokerConstants.LAST_MSG] = msg dao.update_logical_device(ld) - tx_channel.publish_message('logical_timeseries', msg) + _tx_channel.publish_message('logical_timeseries', msg) # This tells RabbitMQ the message is handled and can be deleted from the queue. - rx_channel._channel.basic_ack(delivery_tag) + _rx_channel._channel.basic_ack(delivery_tag) except BaseException as e: logging.exception('Error while processing message') - rx_channel._channel.basic_ack(delivery_tag) + _rx_channel._channel.basic_ack(delivery_tag) if __name__ == '__main__': diff --git a/src/python/pdmodels/Models.py b/src/python/pdmodels/Models.py index c3afc72b..a69dd7f1 100644 --- a/src/python/pdmodels/Models.py +++ b/src/python/pdmodels/Models.py @@ -4,8 +4,8 @@ class Location(BaseModel): - lat: float - long: float + lat: Optional[float] + long: Optional[float] @staticmethod def from_ttn_device(ttn_dev: Dict): @@ -22,24 +22,25 @@ def from_ttn_device(ttn_dev: Dict): # Allowing extra attributes in this class to make life easier for the webapp - it can pass extra info # to the templates in the device object rather than passing in lists of mappings etc. -class PhysicalDevice(BaseModel, extra=Extra.allow): +class BaseDevice(BaseModel, extra=Extra.allow): uid: Optional[int] - source_name: str name: str location: Optional[Location] last_seen: Optional[datetime] - source_ids: Dict = {} properties: Dict = {} # Allowing extra attributes in this class to make life easier for the webapp - it can pass extra info # to the templates in the device object rather than passing in lists of mappings etc. -class LogicalDevice(BaseModel, extra=Extra.allow): - uid: Optional[int] - name: str - location: Optional[Location] - last_seen: Optional[datetime] - properties = {} +class PhysicalDevice(BaseDevice, extra=Extra.allow): + source_name: str + source_ids: Dict = {} + + +# Allowing extra attributes in this class to make life easier for the webapp - it can pass extra info +# to the templates in the device object rather than passing in lists of mappings etc. 
+class LogicalDevice(BaseDevice, extra=Extra.allow): + pass class PhysicalToLogicalMapping(BaseModel): diff --git a/src/python/pollers/axistech.py b/src/python/pollers/axistech.py new file mode 100644 index 00000000..56857218 --- /dev/null +++ b/src/python/pollers/axistech.py @@ -0,0 +1,286 @@ +import datetime as dt +import json +import logging +import os +import time +import uuid +from typing import Dict, Optional + +import dateutil.tz as dtz +import dateutil.parser as dup +import pandas as pd +import pika +import pika.adapters.blocking_connection as pab +import pika.channel +import pika.spec +import pprint +import requests +from pika.exchange_type import ExchangeType + +import BrokerConstants +import api.client.DAO as dao +import util.LoggingUtil as lu +from pdmodels.Models import PhysicalDevice + +_user = os.environ['RABBITMQ_DEFAULT_USER'] +_passwd = os.environ['RABBITMQ_DEFAULT_PASS'] +_host = os.environ['RABBITMQ_HOST'] +_port = os.environ['RABBITMQ_PORT'] + +_amqp_url_str = f'amqp://{_user}:{_passwd}@{_host}:{_port}/%2F' + +_connection: pab.BlockingConnection = None +_channel: pab.BlockingChannel = None + +_api_token = os.getenv('AXISTECH_TOKEN') + +_recent_msg_times: Dict[str, dt.datetime] = {} +""" +Holds the most recent message timestamp for each AxisTech device. Keyed by device code. +""" + + +_sydney_tz = dtz.gettz('Australia/Sydney') + + +def local_time_str(ts: dt.datetime) -> str: + """ + Return an AE[S|D]T string representation of ts, eg '16/01/2024 23:11' + """ + return ts.astimezone(_sydney_tz).strftime('%d/%m/%Y %H:%M') + + +def z_ts(ts: dt.datetime) -> str: + """ + AxisTech will only accept start and end timestamps with a time component in the form YYYY-MM-DDThh:mm:ssZ, + so this function takes a datetime object and returns it formatted as described, by converting to UTC if + necessary and then replacing the +00:00 tz suffix with Z. + """ + return ts.astimezone(dt.timezone.utc).isoformat(timespec='seconds').replace('+00:00', 'Z') + + +def make_msg(row: pd.Series) -> Dict: + """ + Transform a row from the DataFrame with the AxisTech data into a row with an IoTa format message. + """ + serial_no, ts = row.name + values = dict(zip(row.index.values, row.values)) + correlation_id = str(uuid.uuid4()) + str_timestamp = ts.isoformat(timespec='seconds') + if str_timestamp.endswith('+00:00'): + str_timestamp = str_timestamp.replace('+00:00', 'Z') + + source_ids = {'serial_no': serial_no, 'sdi-12': [f'813AXSTECH AWS 000{serial_no}']} + msg = {BrokerConstants.TIMESTAMP_KEY: str_timestamp, 'source_ids': source_ids, + BrokerConstants.TIMESERIES_KEY: [], BrokerConstants.CORRELATION_ID_KEY: correlation_id} + + for name, value in values.items(): + msg['timeseries'].append({'name': name, 'value': value}) + + return msg + + +def process_msg(msg: Dict) -> None: + """ + Send a message onto the rest of IoTa. 
+ """ + global _connection, _channel + + ts = dup.parse(msg[BrokerConstants.TIMESTAMP_KEY]) + serial_no = msg["source_ids"]["serial_no"] + source_ids = msg['source_ids'] + + dao.add_raw_json_message(BrokerConstants.AXISTECH, ts, msg[BrokerConstants.CORRELATION_ID_KEY], msg) + + pds = dao.get_pyhsical_devices_using_source_ids(BrokerConstants.AXISTECH, source_ids) + if len(pds) < 1: + lu.cid_logger.info(f'Physical device not found for device {serial_no}, creating a new one.', extra=msg) + + props = {BrokerConstants.CREATION_CORRELATION_ID_KEY: msg[BrokerConstants.CORRELATION_ID_KEY], + BrokerConstants.LAST_MSG: json.dumps(msg)} + + pdev = PhysicalDevice(source_name=BrokerConstants.AXISTECH, name=serial_no, location=None, + source_ids=source_ids, properties=props) + pdev = dao.create_physical_device(pdev) + else: + lu.cid_logger.info(f'Accepted message from {serial_no}, updating last seen time to {ts}.', extra=msg) + pdev = pds[0] + pdev.properties[BrokerConstants.LAST_MSG] = json.dumps(msg) + + msg[BrokerConstants.PHYSICAL_DEVICE_UID_KEY] = pdev.uid + lu.cid_logger.info(f'Posting msg: {msg}', extra=msg) + _channel.basic_publish(BrokerConstants.PHYSICAL_TIMESERIES_EXCHANGE_NAME, 'physical_timeseries', json.dumps(msg).encode('UTF-8')) + _connection.process_data_events(0) + + # Update last_seen only after the message has been published so that, if the publish fails and the process restarts, + # the message is fetched and processed again because its timestamp is still newer than the device's last_seen time. + pdev.last_seen = ts + dao.update_physical_device(pdev) + + +def get_messages(start: dt.datetime, end: dt.datetime) -> Optional[pd.DataFrame]: + global _recent_msg_times + + drop_cols = ['wind_dir_var_avg', 'uv_index_avg'] + """ + Columns in the AxisTech message that have no equivalent in the SCMN ATM-41 messages, so these get dropped. + """ + + atm41_col_names = ['8_Precipitation', '8_AirTemperature', '8_WindSpeed', '8_WindSpeed_max', '8_RH', '8_AirPressure', + '8_DeltaT', '8_DewPoint', '8_Solar', '8_WindDirection', '8_WindSpeed_min'] + """ + The variable names to use to make the AxisTech message look like an SCMN ATM-41 message. 
+ """ + + try: + url = f'https://data.exchange.axisstream.co/?token={_api_token}&startDate={z_ts(start)}&endDate={z_ts(end)}' + r = requests.get(url) + r.raise_for_status() + data = r.json() + + if 'bb5d4f86-6eaa-494d-abcc-8f2e9b66b214' not in data['data']: + logging.warning('Did not find expected UUID in data object.') + logging.warning(pprint.pformat(data)) + return None + + frames = [] + counter = 0 + for info in data['data']['bb5d4f86-6eaa-494d-abcc-8f2e9b66b214']['weather']: + code = info['code'] + ts = dup.parse(info['time']) + if code not in _recent_msg_times or ts > _recent_msg_times[code]: + _recent_msg_times[code] = ts + frame = pd.DataFrame(info, index=[counter]) + frames.append(frame) + counter += 1 + + if counter < 1: + return None + + df = pd.concat(frames, axis=0) + df['rainfall'] = df['rainfall'].astype(float) + df['humidity_avg'] = df['humidity_avg'].astype(float) + df['temperature_avg'] = df['temperature_avg'].astype(float) + df['wind_speed_avg'] = df['wind_speed_avg'].astype(float) + df['wind_speed_max'] = df['wind_speed_max'].astype(float) + df['atmos_pressure_avg'] = df['atmos_pressure_avg'].astype(float) + df['deltat_avg'] = df['deltat_avg'].astype(float) + df['dewpoint_avg'] = df['dewpoint_avg'].astype(float) + df['solar_rad_avg'] = df['solar_rad_avg'].astype(float) + df['uv_index_avg'] = df['uv_index_avg'].astype(float) + df['wind_dir_deg_avg'] = df['wind_dir_deg_avg'].astype(float) + df['wind_speed_min'] = df['wind_speed_min'].astype(float) + df['time'] = pd.to_datetime(df['time']) + + # Use a MultiIndex to make grouping by code easy later on. + df.set_index(['code', 'time'], inplace=True) + df.index = df.index.sort_values() + + # Apply column header changes + df.drop(drop_cols, inplace=True, axis=1) + df.columns = atm41_col_names + + return df + + except BaseException as e: + logging.exception(e) + + return None + + +def poll() -> None: + # The reason for such a large window time is that the AxisTech API is slow to provide new messages + # and seems to lag hours behind. If we poll every hour and don't ask for too big a window, it should not + # place too high a load on their servers. + # + # If we only ever polled for say the last hour, we'd rarely if ever get any messages. + end_ts = dt.datetime.now(dt.timezone.utc) + start_ts = end_ts - dt.timedelta(days=5) + + # Find the earliest 'most recent' message time. If one can be found there is no point asking for + # messages from before then because they've already been seen. One hole in this logic would be + # if a new device is added to AxisTech, it's first messages may be missed. + some_ts = None + for code, ts in _recent_msg_times.items(): + if some_ts is None or ts < some_ts: + some_ts = ts + + # If a message has been seen more recently than the default start_ts value, only ask for messages since the + # timestamp of the received messages. This risks missing messages from a code that are older than the default + # start of the window if the code has not sent a message in longer than that, but the alternative is to risk + # the window growing indefinitely if a device goes offline. 
+ if some_ts is not None and some_ts > start_ts: + logging.info(f'Adjusting start_ts, was {local_time_str(start_ts)}, will be {local_time_str(some_ts)}') + start_ts = some_ts + + logging.info(f'Polling for message between {z_ts(start_ts)} and {z_ts(end_ts)}, [{local_time_str(start_ts)} to {local_time_str(end_ts)}]') + msgs_df = get_messages(start_ts, end_ts) + if msgs_df is None: + logging.info('No new messages') + return + + # Group the dataframe rows by device code. + code_groups = msgs_df.groupby(level=0) + + logging.info('New messages') + # For each device code subset of the dataframe, apply the function to create the messages. The function is given + # a pd.Series that contains all the info for one row. + for code, code_df in code_groups: + code_df.apply(make_msg, axis=1).apply(process_msg) + + logging.info(f'Latest message times: {_recent_msg_times}') + + +def main() -> None: + global _connection, _channel + + logging.info('===============================================================') + logging.info(' STARTING AXISTECH POLLER') + logging.info('===============================================================') + + dao.add_physical_source(BrokerConstants.AXISTECH) + + # Initialise the most recent message timestamp cache. This is used to control the time window + # used in the AxisTech API calls. + for pdev in dao.get_physical_devices_from_source(BrokerConstants.AXISTECH): + _recent_msg_times[pdev.source_ids['serial_no']] = pdev.last_seen + + try: + logging.info('Opening connection') + conn_attempts = 0 + backoff = 10 + while _connection is None: + try: + _connection = pika.BlockingConnection(pika.URLParameters(_amqp_url_str)) + except: + conn_attempts += 1 + logging.warning(f'Connection to RabbitMQ attempt {conn_attempts} failed.') + + if conn_attempts % 5 == 0 and backoff < 60: + backoff += 10 + + time.sleep(backoff) + + logging.info('Opening channel') + _channel = _connection.channel() + _channel.basic_qos(prefetch_count=1) + logging.info('Declaring exchange') + _channel.exchange_declare(exchange=BrokerConstants.PHYSICAL_TIMESERIES_EXCHANGE_NAME, + exchange_type=ExchangeType.fanout, durable=True) + + sleep_time = 1800 # seconds + while True: + poll() + _connection.process_data_events(sleep_time) + + except KeyboardInterrupt: + logging.info('Stopping') + dao.stop() + + if _connection is not None: + _connection.close() + + +if __name__ == '__main__': + main() + diff --git a/src/python/restapi/RestAPI.py b/src/python/restapi/RestAPI.py index 2bdada76..206b9f39 100644 --- a/src/python/restapi/RestAPI.py +++ b/src/python/restapi/RestAPI.py @@ -252,6 +252,7 @@ async def update_logical_device(device: LogicalDevice) -> LogicalDevice: except dao.DAODeviceNotFound as daonf: raise HTTPException(status_code=404, detail=daonf.msg) except dao.DAOException as err: + logging.exception(err) raise HTTPException(status_code=500, detail=err.msg) @@ -290,6 +291,8 @@ async def insert_mapping(mapping: PhysicalToLogicalMapping) -> None: raise HTTPException(status_code=400, detail=err.msg) except dao.DAOException as err: raise HTTPException(status_code=500, detail=err.msg) + except ValueError as err: + raise HTTPException(status_code=400, detail=str(err)) @router.get("/mappings/current/", tags=['device mapping'], response_model=List[PhysicalToLogicalMapping], dependencies=[Depends(token_auth_scheme)]) @@ -459,21 +462,24 @@ async def end_mapping_of_logical_uid(uid: int) -> None: MESSAGE RELATED --------------------------------------------------------------------------""" 
-@router.get("/physical/messages/{uid}", tags=['messages']) +@router.get("/messages", tags=['Messages'], dependencies=[Depends(token_auth_scheme)]) async def get_physical_timeseries( request: Request, - uid: int, + p_uid: int | None = None, + l_uid: int | None = None, count: Annotated[int | None, Query(gt=0, le=65536)] = None, last: str = None, start: datetime.datetime = None, end: datetime.datetime = None, - only_timestamp: bool = False): + include_received_at: bool = False, + only_timestamp: bool = False) -> List[Dict]: """ Get the physical_timeseries entries described by the physical device uid and the parameters. Args: request: The HTTP request object. - uid: The unique identifier of the physical device. + p_uid: The unique identifier of a physical device. Mutually exclusive with l_uid. + l_uid: The unique identifier of a logical device. Mutually exclusive with p_uid. count: The maximum number of entries to return. last: Return messages from the last nx interval where n is a number and x is 'h'ours, 'd'ays, 'w'eeks, 'm'onths, 'y'ears. start: The start date and time of the time range. @@ -487,7 +493,6 @@ async def get_physical_timeseries( HTTPException: If an error occurs. """ try: - #logging.info(f'start: {start.isoformat() if start is not None else start}, end: {end.isoformat() if end is not None else end}, count: {count}, only_timestamp: {only_timestamp}') if end is not None: if start is not None and start >= end: raise HTTPException(status_code=422, detail={"detail": [{"loc": ["query", "start"], "msg": "ensure start value is less than end"}]}) @@ -503,8 +508,7 @@ async def get_physical_timeseries( except: raise HTTPException(status_code=422, detail={"detail": [{"loc": ["query", "last"], "msg": "the first part of last must be an integer"}]}) - unit = last[-1] - + unit = str(last[-1]).lower() if unit == 'h': diff = datetime.timedelta(hours=i) elif unit == 'd': @@ -520,7 +524,12 @@ async def get_physical_timeseries( start = end - diff - msgs = dao.get_physical_timeseries_message(uid, start, end, count, only_timestamp) + msgs = None + if p_uid is not None: + msgs = dao.get_physical_timeseries_message(start, end, count, only_timestamp, include_received_at, p_uid=p_uid) + elif l_uid is not None: + msgs = dao.get_physical_timeseries_message(start, end, count, only_timestamp, include_received_at, l_uid=l_uid) + if msgs is None: raise HTTPException(status_code=404, detail="Failed to retrieve messages") @@ -530,6 +539,7 @@ async def get_physical_timeseries( except dao.DAOException as err: raise HTTPException(status_code=500, detail=err.msg) + """-------------------------------------------------------------------------- USER AUTHENTICATION --------------------------------------------------------------------------""" diff --git a/src/python/util/Extract.py b/src/python/util/Extract.py new file mode 100644 index 00000000..b3a2a7ed --- /dev/null +++ b/src/python/util/Extract.py @@ -0,0 +1,103 @@ +import argparse as ap +import datetime as dt +import logging +import os +from typing import Any, List, Optional +import pandas as pd +import psycopg2 as pg + +import LoggingUtil + +def get_data_for_logical_device(l_uid: int, start_ts: Optional[dt.datetime] = None, end_ts: Optional[dt.datetime] = None) -> pd.DataFrame: + qry = """ + SELECT logical_uid, physical_uid, ts, received_at, ts_delta, json_msg->'timeseries' AS ts_array FROM physical_timeseries + WHERE logical_uid = %s + """ + + qry_args: List[Any] = [l_uid] + + if start_ts is not None: + qry += ' AND ts >= %s' + qry_args.append(start_ts) + + if 
end_ts is not None: + qry += ' AND ts < %s' + qry_args.append(end_ts) + + dataset = [] + with pg.connect() as conn, conn.cursor() as curs: + curs.execute('SELECT name from logical_devices where uid = %s', (l_uid, )) + if curs.rowcount != 1: # If > 1, how? + logging.error(f'Did not find a logical device with id {l_uid}') + exit(1) + + dev_name: str = str(curs.fetchone()[0]) + dev_name = dev_name.replace(' ', '_') + logging.info(f'Fetching data for {l_uid} / {dev_name}') + + logging.debug(qry) + logging.debug(curs.mogrify(qry, qry_args)) + curs.execute(qry, qry_args) + if curs.rowcount < 1: + logging.info(f'No data for {l_uid} / {dev_name}') + exit(0) + + while True: + rows = curs.fetchmany(size=2000) + print(f'fetched {len(rows)} rows') + if len(rows) < 1: + break + + for row in rows: + dset_item = {'l_uid': row[0], 'p_uid': row[1], 'ts': row[2], 'received_at': row[3], 'ts_delta': row[4]} + for ts_obj in row[5]: + dset_item[ts_obj['name']] = ts_obj['value'] + dataset.append(dset_item) + + df = pd.DataFrame(dataset) + df.set_index(['l_uid', 'ts'], inplace=True) + df.sort_index(level=0, sort_remaining=True, inplace=True, ascending=True) + df.to_csv(f'{l_uid}_{dev_name}.csv') + return df + + +_default_host = 'localhost' +_default_port = '5432' +_default_dbname = 'broker' # This is an IoTa utility, so use the IoTa database name by default. +_default_user = 'postgres' + +parser = ap.ArgumentParser(description='Extract data from the IoTa database') +parser.add_argument('-H', dest='host', help='host to connect to, default = localhost') +parser.add_argument('-p', dest='port', help='port number to connect to, default = 5432') +parser.add_argument('-d', dest='dbname', help='database name to connect to, default = broker') +parser.add_argument('-U', dest='user', help='User name to connect as, default = postgres') +parser.add_argument('-P', dest='password', help='password to connect with, prefer to set PGPASSWORD env var') +parser.add_argument('-l', dest='l_uid', type=int, help='logical device id') +parser.add_argument('-s', dest='start_time', type=dt.datetime.fromisoformat, help='earliest timestamp in ISO-8601 format (>=)') +parser.add_argument('-e', dest='end_time', type=dt.datetime.fromisoformat, help='latest timestamp in ISO-8601 format (<)') + +args = parser.parse_args() + +# Give precedence to command line args, fall back to the env var value if it is set, +# finally, fall back to a default value. 
+_host = os.getenv('PGHOST', _default_host) if args.host is None else args.host +if _host is not None: + os.environ['PGHOST'] = _host + +_port = os.getenv('PGPORT', _default_port) if args.port is None else args.port +if _port is not None: + os.environ['PGPORT'] = _port + +_dbname = os.getenv('PGDATABASE', _default_dbname)if args.dbname is None else args.dbname +if _dbname is not None: + os.environ['PGDATABASE'] = _dbname + +_user = os.getenv('PGUSER', _default_user) if args.user is None else args.user +if _user is not None: + os.environ['PGUSER'] = _user + +_password = os.getenv('PGPASSWORD') if args.password is None else args.password +if _password is not None: + os.environ['PGPASSWORD'] = _password + +get_data_for_logical_device(args.l_uid, args.start_time, args.end_time) diff --git a/src/www/app/main.py b/src/www/app/main.py index 6b2085df..a9ae00ae 100644 --- a/src/www/app/main.py +++ b/src/www/app/main.py @@ -1,9 +1,13 @@ +import atexit +import logging +import time from sys import stderr from typing import Tuple from requests.auth import parse_dict_header from flask import Flask, render_template, request, make_response, redirect, url_for, session, send_from_directory, jsonify + import folium -import paho.mqtt.publish as publish +import paho.mqtt.client as mqtt import os from datetime import timedelta, datetime, timezone import re @@ -13,6 +17,26 @@ from werkzeug.middleware.dispatcher import DispatcherMiddleware from werkzeug.wrappers import Response +from pdmodels.Models import PhysicalDevice, LogicalDevice, PhysicalToLogicalMapping, Location + +from logging.config import dictConfig + +dictConfig({ + 'version': 1, + 'formatters': {'default': { + 'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s', + }}, + 'handlers': {'wsgi': { + 'class': 'logging.StreamHandler', + 'stream': 'ext://flask.logging.wsgi_errors_stream', + 'formatter': 'default' + }}, + 'root': { + 'level': 'INFO', + 'handlers': ['wsgi'] + } +}) + app = Flask(__name__, static_url_path='/static') _location_re = re.compile(r'([+-]?\d+\.?\d*)\s*,\s*([+-]?\d+\.?\d*)') @@ -64,6 +88,63 @@ def time_since(date: datetime) -> str: return f'{seconds} seconds ago' +#------------- +# MQTT section +#------------- + +_mqtt_client = None +_wombat_config_msgs = {} + +def mqtt_on_connect(client, userdata, flags, rc): + global _mqtt_client + app.logger.info('MQTT connected') + _mqtt_client.subscribe(f'wombat/+') + +def mqtt_on_message(client, userdata, msg): + tp = msg.topic.split('/') + if len(tp) == 2: + sn = tp[1] + if len(msg.payload) > 0: + script = str(msg.payload, encoding='UTF-8') + _wombat_config_msgs[sn] = script + else: + _wombat_config_msgs[sn] = None + + +def _send_mqtt_msg_via_sn(serial_nos: List[str], msg: str | None) -> None: + """ + Publish a config script to a list of Wombats. + + Params: + serial_nos: A list of Wombat serial numbers. + msg: The config script to send, use None to clear the script. + """ + if _mqtt_client.is_connected(): + for sn in serial_nos: + _mqtt_client.publish(f'wombat/{sn}', msg, 1, True) + + +def _send_mqtt_msg_via_uids(p_uids: str | List[int], msg: str | None) -> None: + """ + Publish a config script to a list of Wombats. + + Params: + p_uids: A list of integer physical device ids or a string of the form "1,2,3". + msg: The config script to send, use None to clear the script. 
+ """ + if _mqtt_client.is_connected(): + if isinstance(p_uids, str): + p_uids = list(map(lambda i: int(i), p_uids.split(','))) + + serial_nos = [] + for p_uid in p_uids: + pd = get_physical_device(p_uid, session.get('token')) + if pd is not None and pd.source_ids.get('serial_no', None) is not None: + serial_nos.append(pd.source_ids['serial_no']) + + _send_mqtt_msg_via_sn(serial_nos, msg) + + """ Session cookie config @@ -81,7 +162,7 @@ def check_user_logged_in(): if not session.get('token'): if request.path != '/login' and request.path != '/static/main.css': # Stores the url user tried to go to in session so when they log in, we take them back to it - session['original_url'] = request.url + session['original_url'] = request.url return redirect(url_for('login'), code=302) @@ -101,10 +182,10 @@ def login(): user_token = get_user_token(username=username, password=password) session['user'] = username session['token'] = user_token - + if 'original_url' in session: return redirect(session.pop('original_url')) - + return redirect(url_for('index')) return render_template("login.html") @@ -143,37 +224,47 @@ def account(): return render_template('account.html') -@app.route('/get_wombat_logs', methods=['GET']) +@app.route('/wombats/config/logs', methods=['GET']) def get_wombat_logs(): - p_uids = list(map(lambda i: int(i), request.args['uids'].split(','))) - app.logger.info(f'get_wombat_logs for p_uids: {p_uids}') - msgs = [] - - for p_uid in p_uids: - pd = get_physical_device(p_uid, session.get('token')) - if pd is not None: - app.logger.info(f'Wombat serial no: {pd.source_ids["serial_no"]}') - msgs.append((f'wombat/{pd.source_ids["serial_no"]}', 'ftp login\nupload log.txt\nftp logout\n', 1, True)) - - # NOTE! Assuming default port of 1883. - publish.multiple(msgs, hostname=_mqtt_host, auth={'username': _mqtt_user, 'password': _mqtt_pass}) + _send_mqtt_msg_via_uids(request.args['uids'], 'ftp login\nftp upload log.txt\nftp logout\n') + return "OK" + + +@app.route('/wombats/config/data', methods=['GET']) +def get_wombat_data(): + _send_mqtt_msg_via_uids(request.args['uids'], 'ftp login\nftp upload data.json\nftp logout\n') return "OK" -@app.route('/wombat_ota', methods=['GET']) +@app.route('/wombats/config/ota', methods=['GET']) def wombat_ota(): - p_uids = list(map(lambda i: int(i), request.args['uids'].split(','))) - app.logger.info(f'wombat_ota for p_uids: {p_uids}') - msgs = [] - - for p_uid in p_uids: - pd = get_physical_device(p_uid, session.get('token')) - if pd is not None: - app.logger.info(f'Wombat serial no: {pd.source_ids["serial_no"]}') - msgs.append((f'wombat/{pd.source_ids["serial_no"]}', 'config ota 1\nconfig reboot\n', 1, True)) - - # NOTE! Assuming default port of 1883. - publish.multiple(msgs, hostname=_mqtt_host, auth={'username': _mqtt_user, 'password': _mqtt_pass}) + _send_mqtt_msg_via_uids(request.args['uids'], 'config ota 1\nconfig reboot\n') + return "OK" + + +@app.route('/wombats/config/clear') +def clear_wombat_config_script(): + """ + Clear the config scripts for the Wombats identified by the sn request parameter. + + If the id request parameter is "uid" then sn must contain a list of physical device ids. If id is "sn" + then sn must contain a list of Wombat serial numbers. + """ + sn = request.args['sn'] + if sn is not None: + id_type = request.args.get('id', 'id') + app.logger.info(f'Clearing config script for {sn}, id={id_type}') + + # Publish an empty retained message to clear the config script message from the topic. 
+ if id_type == 'uid': + ids = list(map(lambda i: int(i), sn.split(','))) + app.logger.info(ids) + _send_mqtt_msg_via_uids(ids, None) + else: + ids = sn.split(',') + app.logger.info(ids) + _send_mqtt_msg_via_sn(ids, None) + return "OK" @@ -192,8 +283,10 @@ def wombats(): mappings = get_current_mappings(session.get('token')) for dev in physical_devices: + sn = dev.source_ids["serial_no"] ccid = dev.source_ids.get('ccid', None) fw_version: str = dev.source_ids.get('firmware', None) + config_script: str | None = _wombat_config_msgs.get(dev.source_ids["serial_no"], None) if ccid is not None: setattr(dev, 'ccid', ccid) @@ -201,6 +294,11 @@ def wombats(): if fw_version is not None: setattr(dev, 'fw', fw_version) + if config_script is not None: + setattr(dev, 'script', config_script) + + setattr(dev, 'sn', sn) + setattr(dev, 'ts_sort', dev.last_seen.timestamp()) dev.last_seen = time_since(dev.last_seen) @@ -387,7 +485,7 @@ def show_map(): # folium.Marker([-31.956194913619864, 115.85911692112582], popup="Mt. Hood Meadows", tooltip='click me').add_to(center_map) data: List[LogicalDevice] = get_logical_devices(session.get('token'), include_properties=True) for dev in data: - if dev.location is not None: + if dev.location is not None and dev.location.lat is not None and dev.location.long is not None: color = 'blue' if dev.last_seen is None: @@ -431,7 +529,7 @@ def CreateMapping(): except requests.exceptions.HTTPError as e: if e.response.status_code == 403: return f"You do not have sufficient permissions to make this change", e.response.status_code - + return f"HTTP request with RestAPI failed with error {e.response.status_code}", e.response.status_code @@ -445,7 +543,7 @@ def CreateNote(noteText, uid): except requests.exceptions.HTTPError as e: if e.response.status_code == 403: return f"You do not have sufficient permissions to make this change", e.response.status_code - + return f"HTTP request with RestAPI failed with error {e.response.status_code}", e.response.status_code @@ -457,7 +555,7 @@ def DeleteNote(noteUID): except requests.exceptions.HTTPError as e: if e.response.status_code == 403: return f"You do not have sufficient permissions to make this change", e.response.status_code - + return f"HTTP request with RestAPI failed with error {e.response.status_code}", e.response.status_code @@ -470,7 +568,7 @@ def EditNote(noteText, uid): except requests.exceptions.HTTPError as e: if e.response.status_code == 403: return f"You do not have sufficient permissions to make this change", e.response.status_code - + return f"HTTP request with RestAPI failed with error {e.response.status_code}", e.response.status_code @@ -492,23 +590,23 @@ def UpdatePhysicalDevice(): update_physical_device(uid, new_name, location, token) return 'Success', 200 - + except requests.exceptions.HTTPError as e: if e.response.status_code == 403: return f"You do not have sufficient permissions to make this change", e.response.status_code - + return f"HTTP request with RestAPI failed with error {e.response.status_code}", e.response.status_code -@app.route('/update-mappings', methods=['GET']) +@app.route('/update-mappings', methods=['PATCH']) def UpdateMappings(): try: - insert_device_mapping(request.args['physicalDevice_mapping'], request.args['logicalDevice_mapping'], session.get('token')) + insert_device_mapping(request.form.get('physicalDevice_mapping'), request.form.get('logicalDevice_mapping'), session.get('token')) return 'Success', 200 except requests.exceptions.HTTPError as e: if e.response.status_code == 403: return f"You do 
not have sufficient permissions to make this change", e.response.status_code - + return f"HTTP request with RestAPI failed with error {e.response.status_code}", e.response.status_code @@ -536,7 +634,7 @@ def ToggleDeviceMapping(): is_active = request.args['is_active'] toggle_device_mapping(uid=uid, dev_type=dev_type, is_active=is_active, token=session.get('token')) - + return 'Success', 200 @@ -560,9 +658,10 @@ def UpdateLogicalDevice(): return 'Success', 200 except requests.exceptions.HTTPError as e: + logging.exception(e) if e.response.status_code == 403: return f"You do not have sufficient permissions to make this change", e.response.status_code - + return f"HTTP request with RestAPI failed with error {e.response.status_code}", e.response.status_code @@ -583,7 +682,7 @@ def format_time_stamp(unformatted_time: datetime) -> str: def format_location_string(location: Location) -> str: formatted_location = '' - if location is not None: + if location is not None and location.lat is not None and location.long is not None: formatted_location = f'{location.lat:.5f}, {location.long:.5f}' return formatted_location @@ -637,6 +736,18 @@ def generate_link(data): return link +def exit_handler(): + global _mqtt_client + + app.logger.info('Stopping') + _mqtt_client.disconnect() + + while _mqtt_client.is_connected(): + time.sleep(0.5) + + _mqtt_client.loop_stop() + app.logger.info('Done') + """ parse the timeseries data received by api into what is expected by graph """ @@ -719,5 +830,19 @@ def parse_ts_table_data(raw_data): return result + if __name__ == '__main__': + app.logger.info('Starting') + _mqtt_client = mqtt.Client() + _mqtt_client.username_pw_set(_mqtt_user, _mqtt_pass) + _mqtt_client.on_connect = mqtt_on_connect + _mqtt_client.on_message = mqtt_on_message + + app.logger.info('Connecting to MQTT broker') + _mqtt_client.connect_async(_mqtt_host) + app.logger.info('Starting MQTT thread') + _mqtt_client.loop_start() + + atexit.register(exit_handler) + app.run(port='5000', host='0.0.0.0') diff --git a/src/www/app/templates/base.html b/src/www/app/templates/base.html index 889ec761..c72c248e 100644 --- a/src/www/app/templates/base.html +++ b/src/www/app/templates/base.html @@ -14,7 +14,7 @@
[Template hunk markup lost in rendering; only the link text survives. The old 'Fetch logs' and 'FW OTA' menu entries were removed, and 'Fetch logs', 'Fetch data', 'FW OTA' and 'Clear Scripts' entries were added, presumably pointing at the new /wombats/config/ routes added in main.py.]
@@ -22,6 +103,7 @@ Current mapping Ubidots + @@ -58,6 +140,11 @@ class="material-icons">open_in_new {% endif %} + + {% if physicalDevice.script is defined %} + install_desktop + {% endif %} + {% endfor %} diff --git a/src/www/app/utils/api.py b/src/www/app/utils/api.py index 4a04f112..b3c97524 100644 --- a/src/www/app/utils/api.py +++ b/src/www/app/utils/api.py @@ -1,4 +1,4 @@ -import json +import json, os from typing import List import sys from typing import List @@ -9,7 +9,7 @@ from pdmodels.Models import PhysicalDevice, LogicalDevice, PhysicalToLogicalMapping, DeviceNote, Location -end_point = 'http://restapi:5687' +end_point = os.getenv('IOTA_API_URL', 'http://restapi:5687') def get_sources(token: str) -> List[str]: """ @@ -256,6 +256,10 @@ def create_logical_device(physical_device: PhysicalDevice, token: str) ->str: "location": physical_device.location, } + # This is to work around a problem that turned up after the change to PostGIS. + if logicalJson['location'] is not None and (logicalJson['location'].lat is None or logicalJson['location'].long is None): + logicalJson.pop('location') + response = requests.post(f'{end_point}/broker/api/logical/devices/', json=logicalJson, headers=headers) response.raise_for_status() @@ -361,8 +365,8 @@ def change_user_password(password: str, token: str) -> str: Params: password: str - User's new password token: str - User's authentication token - - reutrn: + + return: token: str - User's new authentication token """ headers = {"Authorization": f"Bearer {token}"} diff --git a/test/python/test_dao.py b/test/python/test_dao.py index 2844e496..1fa29f77 100644 --- a/test/python/test_dao.py +++ b/test/python/test_dao.py @@ -10,6 +10,17 @@ logger = logging.getLogger(__name__) logging.captureWarnings(True) + +def _create_test_user() -> str: + test_uname=os.urandom(4).hex() + dao.user_add(uname=test_uname, passwd='password', disabled=False) + return test_uname + + +def _now() -> datetime.datetime: + return datetime.datetime.now(tz=datetime.timezone.utc) + + class TestDAO(unittest.TestCase): def setUp(self): @@ -22,7 +33,8 @@ def setUp(self): truncate physical_logical_map cascade; truncate device_notes cascade; truncate physical_timeseries cascade; - truncate raw_messages cascade''') + truncate raw_messages cascade; + delete from sources where source_name = 'axistech';''') finally: dao.free_conn(conn) @@ -30,11 +42,25 @@ def test_get_all_physical_sources(self): sources = dao.get_all_physical_sources() self.assertEqual(sources, ['greenbrain', 'ict_eagleio', 'ttn', 'wombat', 'ydoc']) - def now(self): - return datetime.datetime.now(tz=datetime.timezone.utc) + def test_add_physical_source(self): + sources = dao.get_all_physical_sources() + self.assertFalse(BrokerConstants.AXISTECH in sources) + dao.add_physical_source(BrokerConstants.AXISTECH) + sources = dao.get_all_physical_sources() + self.assertTrue(BrokerConstants.AXISTECH in sources) + + # Do it again to ensure it doesn't crash, and there is only one instance of the string. 
+ dao.add_physical_source(BrokerConstants.AXISTECH) + sources = dao.get_all_physical_sources() + i = 0 + for s in sources: + if s == BrokerConstants.AXISTECH: + i += 1 + + self.assertEqual(1, i) def _create_physical_device(self, dev: PhysicalDevice = None) -> Tuple[PhysicalDevice, PhysicalDevice]: - last_seen = self.now() + last_seen = _now() if dev is None: dev = PhysicalDevice(source_name='ttn', name='Test Device', location=Location(lat=3, long=-31), last_seen=last_seen, source_ids={'appId': 'x', 'devId': 'y'}, @@ -42,11 +68,6 @@ def _create_physical_device(self, dev: PhysicalDevice = None) -> Tuple[PhysicalD return (dev, dao.create_physical_device(dev)) - def _create_test_user(self) -> str: - test_uname=os.urandom(4).hex() - dao.user_add(uname=test_uname, passwd='password', disabled=False) - return test_uname - def test_create_physical_device(self): dev, new_dev = self._create_physical_device() @@ -203,7 +224,7 @@ def test_delete_physical_device_note(self): def _create_default_logical_device(self, dev=None) -> Tuple[LogicalDevice, LogicalDevice]: if dev is None: - last_seen = self.now() + last_seen = _now() dev = LogicalDevice(name='Test Device', location=Location(lat=3, long=-31), last_seen=last_seen, properties={'appId': 'x', 'devId': 'y', 'other': 'z'}) @@ -257,28 +278,28 @@ def test_delete_logical_device(self): def test_insert_mapping(self): pdev, new_pdev = self._create_physical_device() ldev, new_ldev = self._create_default_logical_device() - mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now()) # This should work. dao.insert_mapping(mapping) # This should fail due to duplicate start time. - self.assertRaises(dao.DAOException, dao.insert_mapping, mapping) + self.assertRaises(ValueError, dao.insert_mapping, mapping) # This should fail due to the physical device is still mapped to something. time.sleep(0.001) - mapping.start_time=self.now() - self.assertRaises(dao.DAOException, dao.insert_mapping, mapping) + mapping.start_time= _now() + self.assertRaises(ValueError, dao.insert_mapping, mapping) # Unmap the physical device so the next test doesn't fail due to the device being mapped. dao.end_mapping(pd=new_pdev) # The insert_mapping operation should succeed because the timestamp is different from above. - mapping.start_time=self.now() + mapping.start_time= _now() dao.insert_mapping(mapping) pdx = copy.deepcopy(new_pdev) pdx.uid = -1 - mapping = PhysicalToLogicalMapping(pd=pdx, ld=new_ldev, start_time=self.now()) + mapping = PhysicalToLogicalMapping(pd=pdx, ld=new_ldev, start_time=_now()) # This should fail due to invalid physical uid. self.assertRaises(dao.DAODeviceNotFound, dao.insert_mapping, mapping) @@ -286,7 +307,7 @@ def test_insert_mapping(self): dao.end_mapping(pd=new_pdev) ldx = copy.deepcopy(new_ldev) ldx.uid = -1 - mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=ldx, start_time=self.now()) + mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=ldx, start_time=_now()) # This should fail due to invalid logical uid. self.assertRaises(dao.DAODeviceNotFound, dao.insert_mapping, mapping) @@ -302,14 +323,14 @@ def test_get_current_device_mapping(self): self.assertIsNone(dao.get_current_device_mapping(ld=new_ldev)) # Confirm pd or ld must be given. - self.assertRaises(dao.DAOException, dao.get_current_device_mapping) + self.assertRaises(ValueError, dao.get_current_device_mapping) # Confirm only pd or ld can be given. 
- self.assertRaises(dao.DAOException, dao.get_current_device_mapping, -1, -1) - self.assertRaises(dao.DAOException, dao.get_current_device_mapping, new_pdev, new_ldev) + self.assertRaises(ValueError, dao.get_current_device_mapping, -1, -1) + self.assertRaises(ValueError, dao.get_current_device_mapping, new_pdev, new_ldev) # confirm a physical device can be mapped to a logical device. - mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping1) self.assertEqual(mapping1, dao.get_current_device_mapping(pd=new_pdev.uid)) @@ -322,7 +343,7 @@ def test_get_current_device_mapping(self): time.sleep(0.001) pdev2, new_pdev2 = self._create_physical_device() - mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=self.now()) + mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping2) # This test must fail if multiple mappings are found, because there should not be @@ -341,7 +362,7 @@ def test_get_latest_device_mapping(self): ldev, new_ldev = self._create_default_logical_device() # confirm getting the latest mapping returns a current mapping - mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping1) self.assertEqual(mapping1, dao.get_current_device_mapping(pd=new_pdev.uid, only_current_mapping=True)) @@ -367,7 +388,7 @@ def test_get_latest_device_mapping(self): self.assertTrue(self.compare_mappings_ignore_end_time(mapping1, mapping2)) time.sleep(0.1) - mapping1.start_time = self.now() + mapping1.start_time = _now() dao.insert_mapping(mapping1) # with a new mapping with no end time, both calls should again return the same thing. @@ -381,7 +402,14 @@ def test_end_mapping(self): pdev, new_pdev = self._create_physical_device() ldev, new_ldev = self._create_default_logical_device() - mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + # Confirm pd or ld must be given. + self.assertRaises(ValueError, dao.end_mapping) + + # Confirm only pd or ld can be given. + self.assertRaises(ValueError, dao.end_mapping, -1, -1) + self.assertRaises(ValueError, dao.end_mapping, new_pdev, new_ldev) + + mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping1) mappings = dao.get_unmapped_physical_devices() @@ -394,7 +422,7 @@ def test_end_mapping(self): pdev2 = copy.deepcopy(pdev) pdev2.name = 'D2' pdev2, new_pdev2 = self._create_physical_device(dev=pdev2) - mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=self.now()) + mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping2) mappings = dao.get_unmapped_physical_devices() @@ -406,13 +434,13 @@ def test_end_mapping(self): # Avoid a unique key constraint due to identical timestamps. 
time.sleep(0.001) - mapping1.start_time = self.now() + mapping1.start_time = _now() dao.insert_mapping(mapping1) dao.end_mapping(pd=new_pdev) mappings = dao.get_unmapped_physical_devices() self.assertEqual(len(mappings), 2) - mapping2.start_time = self.now() + mapping2.start_time = _now() dao.insert_mapping(mapping2) mappings = dao.get_unmapped_physical_devices() self.assertEqual(len(mappings), 1) @@ -427,19 +455,19 @@ def compare_mappings_ignore_end_time(self, m1: PhysicalToLogicalMapping, m2: Phy def test_get_mappings(self): pdev, new_pdev = self._create_physical_device() ldev, new_ldev = self._create_default_logical_device() - mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping1) pdev2 = copy.deepcopy(pdev) pdev2.name = 'D2' pdev2, new_pdev2 = self._create_physical_device(dev=pdev2) time.sleep(0.1) - mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=self.now()) + mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping2) # Avoid a unique key constraint due to identical timestamps. time.sleep(0.1) - mapping3 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping3 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping3) mappings = dao.get_logical_device_mappings(new_ldev) @@ -453,7 +481,7 @@ def test_get_all_logical_device_mappings(self): pdev, new_pdev = self._create_physical_device() ldev, new_ldev = self._create_default_logical_device() - mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping1) time.sleep(0.1) dao.end_mapping(ld=new_ldev.uid) @@ -463,7 +491,7 @@ def test_get_all_logical_device_mappings(self): pdev2.name = 'D2' pdev2, new_pdev2 = self._create_physical_device(dev=pdev2) time.sleep(0.1) - mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=self.now()) + mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping2) time.sleep(0.1) dao.end_mapping(ld=new_ldev.uid) @@ -473,7 +501,7 @@ def test_get_all_logical_device_mappings(self): pdev3.name = 'D3' pdev3, new_pdev3 = self._create_physical_device(dev=pdev3) time.sleep(0.1) - mapping3 = PhysicalToLogicalMapping(pd=new_pdev3, ld=new_ldev, start_time=self.now()) + mapping3 = PhysicalToLogicalMapping(pd=new_pdev3, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping3) mappings = dao.get_logical_device_mappings(ld=new_ldev.uid) @@ -487,7 +515,7 @@ def test_get_unmapped_devices(self): ldev, new_ldev = self._create_default_logical_device() # confirm a physical device can be mapped to a logical device. 
- mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping1) pdev2 = copy.deepcopy(pdev) @@ -510,15 +538,15 @@ def test_get_unmapped_devices(self): self.assertTrue(new_pdev3 in unmapped_devs) self.assertTrue(new_pdev4 in unmapped_devs) - mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=self.now()) + mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=_now()) dao.insert_mapping(mapping2) ldev2 = copy.deepcopy(ldev) ldev2.name = 'L2' - ldev2.last_seen = self.now() + ldev2.last_seen = _now() ldev2, new_ldev2 = self._create_default_logical_device(dev=ldev2) - mapping3 = PhysicalToLogicalMapping(pd=new_pdev4, ld=new_ldev2, start_time=self.now()) + mapping3 = PhysicalToLogicalMapping(pd=new_pdev4, ld=new_ldev2, start_time=_now()) dao.insert_mapping(mapping3) unmapped_devs = dao.get_unmapped_physical_devices() self.assertEqual(len(unmapped_devs), 2) @@ -527,10 +555,10 @@ def test_get_unmapped_devices(self): ldev3 = copy.deepcopy(ldev) ldev3.name = 'L3' - ldev3.last_seen = self.now() + ldev3.last_seen = _now() ldev3, new_ldev3 = self._create_default_logical_device(dev=ldev3) - mapping4 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev3, start_time=self.now()) + mapping4 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev3, start_time=_now()) dao.insert_mapping(mapping4) unmapped_devs = dao.get_unmapped_physical_devices() self.assertEqual(len(unmapped_devs), 1) @@ -539,12 +567,12 @@ def test_get_unmapped_devices(self): def test_add_raw_json_message(self): uuid1 = uuid.uuid4() - obj1 = {'a':1, 'b':'2', 'c':True, 'd':False} - dao.add_raw_json_message('ttn', self.now(), uuid1, obj1) + obj1 = {'a': 1, 'b': '2', 'c': True, 'd': False} + dao.add_raw_json_message('ttn', _now(), uuid1, obj1) uuid2 = uuid.uuid4() - obj2 = {'a':1, 'b':'2', 'c':False, 'd':True} - dao.add_raw_json_message('ttn', self.now(), uuid2, obj2, 1) + obj2 = {'a': 1, 'b': '2', 'c': False, 'd': True} + dao.add_raw_json_message('ttn', _now(), uuid2, obj2, 1) with dao._get_connection() as conn, conn.cursor() as cursor: cursor.execute('select physical_uid, correlation_id, json_msg from raw_messages order by uid asc') @@ -563,23 +591,24 @@ def test_add_raw_json_message(self): # Confirm the DAO raises a warning when trying to add a message with a # duplicate UUID, but doesn't throw an exception. with self.assertWarns(UserWarning): - dao.add_raw_json_message('ttn', self.now(), uuid1, obj1) + dao.add_raw_json_message('ttn', _now(), uuid1, obj1) def test_insert_physical_timeseries_message(self): dev, new_dev = self._create_physical_device() msg = { - "p_uid":new_dev.uid, - "timestamp":"2023-02-20T07:57:52Z", - "timeseries":[ + "p_uid": new_dev.uid, + "timestamp": "2023-02-20T07:57:52Z", + "timeseries": [ { - "name":"airTemperature", - "value":35.1 + "name": "x", + "value": 35.1 } ], - "broker_correlation_id":"3d7762f6-bcc6-44d4-82ba-49b07e61e601" + "broker_correlation_id": "3d7762f6-bcc6-44d4-82ba-49b07e61e601" } + msg_ts = dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY]) dao.insert_physical_timeseries_message(msg) @@ -590,15 +619,95 @@ def test_insert_physical_timeseries_message(self): self.assertEqual(ts, msg_ts) self.assertEqual(msg, retrieved_msg) + def test_get_physical_timeseries_messages(self): + _, new_pdev = self._create_physical_device() + + # Basic smoke test - no messages, no results. 
+ msgs = dao.get_physical_timeseries_message(None, None, 1, only_timestamp=True, p_uid=new_pdev.uid) + self.assertSequenceEqual(msgs, []) + + msgs = dao.get_physical_timeseries_message(None, None, 1, only_timestamp=True, l_uid=20) + self.assertSequenceEqual(msgs, []) + + msg_list = [ + {BrokerConstants.PHYSICAL_DEVICE_UID_KEY: new_pdev.uid, BrokerConstants.TIMESTAMP_KEY: "2023-02-20T01:00+11:00"}, + {BrokerConstants.PHYSICAL_DEVICE_UID_KEY: new_pdev.uid, BrokerConstants.TIMESTAMP_KEY: "2023-02-20T00:30+11:00"}, + {BrokerConstants.PHYSICAL_DEVICE_UID_KEY: new_pdev.uid, BrokerConstants.TIMESTAMP_KEY: "2023-02-20T00:00+11:00"}, + {BrokerConstants.PHYSICAL_DEVICE_UID_KEY: new_pdev.uid, BrokerConstants.TIMESTAMP_KEY: "2023-02-19T23:30+11:00"}, + {BrokerConstants.PHYSICAL_DEVICE_UID_KEY: new_pdev.uid, BrokerConstants.TIMESTAMP_KEY: "2023-02-19T23:00+11:00"}, + {BrokerConstants.PHYSICAL_DEVICE_UID_KEY: new_pdev.uid, BrokerConstants.TIMESTAMP_KEY: "2023-02-19T22:30+11:00"} + ] + + msg_ts = [] + for msg in msg_list: + dao.insert_physical_timeseries_message(msg) + msg_ts.append(dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY])) + + msgs = dao.get_physical_timeseries_message(None, None, 1, only_timestamp=False, include_received_at=False, p_uid=new_pdev.uid) + self.assertEqual(len(msgs), 1) + self.assertEqual(dateutil.parser.isoparse(msgs[0][BrokerConstants.TIMESTAMP_KEY]), msg_ts[0]) + + msgs = dao.get_physical_timeseries_message(None, None, None, only_timestamp=True, p_uid=new_pdev.uid) + + self.assertEqual(len(msgs), len(msg_list)) + for i, msg in enumerate(msgs): + print(msg) + self.assertEqual(dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY]), msg_ts[i]) + + _, new_ldev = self._create_default_logical_device() + mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now()) + dao.insert_mapping(mapping) + + # Ensure 1 message will be returned from the DAO when no start or end is given. + now = _now() - datetime.timedelta(minutes=1) + td_30_mins = datetime.timedelta(minutes=30) + + for i, msg in enumerate(msg_list): + msg[BrokerConstants.TIMESTAMP_KEY] = (now + (i * td_30_mins)).isoformat() + msg[BrokerConstants.LOGICAL_DEVICE_UID_KEY] = new_ldev.uid + + msg_list.sort(key=lambda m: m[BrokerConstants.TIMESTAMP_KEY], reverse=True) + + msg_ts.clear() + for msg in msg_list: + dao.insert_physical_timeseries_message(msg) + msg_ts.append(dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY])) + + # This will return a single message because 'end' is None, meaning the DAO will set the + # end timestamp to 'now'. This batch of messages has timestamps from 1 minute ago to a + # couple of hours in the future, so only the message with the earliest timestamp in the + # batch should be returned. + msgs = dao.get_physical_timeseries_message(only_timestamp=True, l_uid=new_ldev.uid) + self.assertEqual(len(msgs), 1) + self.assertEqual(dateutil.parser.isoparse(msgs[0][BrokerConstants.TIMESTAMP_KEY]), msg_ts[-1]) + + # This will return all the messages because 'end' has been set past the latest message timestamp. + msgs = dao.get_physical_timeseries_message(end=now + datetime.timedelta(days=1), only_timestamp=True, l_uid=new_ldev.uid) + self.assertEqual(len(msgs), len(msg_list)) + for i, msg in enumerate(msgs): + self.assertEqual(dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY]), msg_ts[i]) + + # Should return only the latest message. 
+ msgs = dao.get_physical_timeseries_message(end=now + datetime.timedelta(days=1), only_timestamp=True, count=1, l_uid=new_ldev.uid) + self.assertEqual(len(msgs), 1) + self.assertEqual(dateutil.parser.isoparse(msgs[0][BrokerConstants.TIMESTAMP_KEY]), msg_ts[0]) + + self.assertRaises(ValueError, dao.get_physical_timeseries_message) + self.assertRaises(TypeError, dao.get_physical_timeseries_message, p_uid='x') + self.assertRaises(TypeError, dao.get_physical_timeseries_message, l_uid='x') + self.assertRaises(TypeError, dao.get_physical_timeseries_message, start='x', l_uid=1) + self.assertRaises(TypeError, dao.get_physical_timeseries_message, end='x', l_uid=1) + self.assertRaises(TypeError, dao.get_physical_timeseries_message, count='x', l_uid=1) + def test_add_raw_text_message(self): uuid1 = uuid.uuid4() msg1 = 'This is a text message.' - dao.add_raw_text_message('greenbrain', self.now(), uuid1, msg1) + dao.add_raw_text_message('greenbrain', _now(), uuid1, msg1) uuid2 = uuid.uuid4() msg2 = 'This is a text message 2.' - dao.add_raw_text_message('greenbrain', self.now(), uuid2, msg2, 2) + dao.add_raw_text_message('greenbrain', _now(), uuid2, msg2, 2) with dao._get_connection() as conn, conn.cursor() as cursor: cursor.execute('select physical_uid, correlation_id, text_msg from raw_messages') self.assertEqual(2, cursor.rowcount) @@ -616,21 +725,21 @@ def test_add_raw_text_message(self): # Confirm the DAO raises a warning when trying to add a message with a # duplicate UUID, but doesn't throw an exception. with self.assertWarns(UserWarning): - dao.add_raw_text_message('ttn', self.now(), uuid1, msg1) + dao.add_raw_text_message('ttn', _now(), uuid1, msg1) def test_user_add(self): - uname=self._create_test_user() - users=dao.user_ls() + uname = _create_test_user() + users = dao.user_ls() self.assertEqual(uname, users[-1]) def test_user_rm(self): - uname=self._create_test_user() + uname = _create_test_user() dao.user_rm(uname) self.assertFalse(uname in dao.user_ls()) def test_user_set_read_only(self): - uname=self._create_test_user() + uname= _create_test_user() dao.user_set_read_only(uname, False) user_token=dao.user_get_token(username=uname, password='password') user=dao.get_user(auth_token=user_token) @@ -638,39 +747,39 @@ def test_user_set_read_only(self): def test_add_non_unique_user(self): #Check that two users with the same username cannot be created - uname=self._create_test_user() + uname = _create_test_user() self.assertRaises(dao.DAOUniqeConstraintException, dao.user_add, uname, 'password', False) def test_get_user_token(self): - uname=self._create_test_user() + uname = _create_test_user() self.assertIsNotNone(dao.user_get_token(username=uname, password='password')) self.assertIsNone(dao.user_get_token(username=uname, password='x')) def test_user_token_refresh(self): - uname=self._create_test_user() - token1=dao.user_get_token(username=uname, password='password') + uname = _create_test_user() + token1 = dao.user_get_token(username=uname, password='password') dao.token_refresh(uname=uname) - token2=dao.user_get_token(username=uname, password='password') + token2 = dao.user_get_token(username=uname, password='password') self.assertNotEqual(token1, token2) def test_user_token_disable(self): - uname=self._create_test_user() - user_token=dao.user_get_token(username=uname, password='password') + uname = _create_test_user() + user_token = dao.user_get_token(username=uname, password='password') dao.token_disable(uname) self.assertFalse(dao.token_is_valid(user_token)) def 
test_user_token_enable(self): - uname=self._create_test_user() - user_token=dao.user_get_token(username=uname, password='password') + uname = _create_test_user() + user_token = dao.user_get_token(username=uname, password='password') dao.token_disable(uname) dao.token_enable(uname) self.assertTrue(dao.token_is_valid(user_token)) def test_user_change_password(self): - uname=self._create_test_user() + uname = _create_test_user() dao.user_change_password(uname, 'nuiscyeriygsreiuliu') self.assertIsNotNone(dao.user_get_token(username=uname, password='nuiscyeriygsreiuliu')) diff --git a/test/python/test_get_physical_messages.py b/test/python/test_get_physical_messages.py index 6f76e2ab..8e54ccfd 100644 --- a/test/python/test_get_physical_messages.py +++ b/test/python/test_get_physical_messages.py @@ -6,12 +6,16 @@ import pytest from fastapi.testclient import TestClient +import BrokerConstants import test_utils as tu import api.client.DAO as dao from pdmodels.Models import PhysicalDevice from restapi.RestAPI import app -ts: dt.datetime = tu.now() - dt.timedelta(days=683.0) +ts: dt.datetime = dateutil.parser.isoparse('2024-05-10T23:00:00Z') +latest_ts = ts +earliest_ts = ts + interval = dt.timedelta(minutes=15.0) timestamps = [] @@ -31,7 +35,7 @@ def create_user(): @pytest.fixture(scope='module') def create_msgs(): - global interval, msgs, pd, timestamps, ts + global interval, msgs, pd, timestamps, ts, earliest_ts logging.info('Generating messages') @@ -40,11 +44,12 @@ def create_msgs(): with open('/tmp/msgs.json', 'w') as f: for i in range(0, max_msgs + 1): timestamps.append(ts) - msg = {'ts': ts.isoformat(), 'i': i} + msg = {BrokerConstants.TIMESTAMP_KEY: ts.isoformat(), 'i': i} msgs.append(msg) s = f'{pd.uid}\t{ts.isoformat()}\t{json.dumps(msg)}' print(s, file=f) - ts = ts + interval + ts = ts - interval + earliest_ts = ts with open('/tmp/msgs.json', 'r') as f: with dao.get_connection() as conn, conn.cursor() as cursor: @@ -64,107 +69,109 @@ def test_client(create_user, create_msgs): def test_no_params_no_msgs(test_client): - no_msg_pd: PhysicalDevice = dao.create_physical_device(PhysicalDevice(source_name='wombat', name='dummy', source_ids={'x': 1})) - response = test_client.get(f'/broker/api/physical/messages/{no_msg_pd.uid}') - assert response.status_code == 200 - assert response.json() == [] + response = test_client.get(f'/broker/api/messages/') + assert response.status_code == 404 def test_no_params(test_client): # Confirm the default count parameter value is correct, so 65536 of the 65537 messages are returned. - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}') + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid}) assert response.status_code == 200 - assert response.json() == msgs[:-1] + for a, b in zip(response.json(), msgs[:-1]): + assert a == b def test_no_params_ts(test_client): - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'only_timestamp': 1}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'only_timestamp': 1}) assert response.status_code == 200 + for a, b in zip(response.json(), timestamps): if a is None: break - assert dateutil.parser.isoparse(a) == b + assert dateutil.parser.isoparse(a[BrokerConstants.TIMESTAMP_KEY]) == b def test_count(test_client): # Confirm the count parameter works. 
- response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'count': 50}) + response = test_client.get(f'/broker/api/messages/', params={'p_uid': pd.uid, 'count': 50}) assert response.status_code == 200 - assert response.json() == msgs[:50] + #assert response.json() == msgs[:50] + for a, b in zip(response.json(), msgs[:50]): + assert a == b def test_count_ts(test_client): - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'count': 50, 'only_timestamp': 1}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'count': 50, 'only_timestamp': 1}) assert response.status_code == 200 for a, b in zip(response.json(), timestamps): if a is None: break - assert dateutil.parser.isoparse(a) == b + assert dateutil.parser.isoparse(a[BrokerConstants.TIMESTAMP_KEY]) == b def test_start_after_end(test_client): - start_ts = ts + interval - # Confirm no messages returned when a start timestamp at or after the last message is used. - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'start': start_ts}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'start': latest_ts}) assert response.status_code == 200 assert response.json() == [] - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'start': timestamps[max_msgs]}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'start': timestamps[0]}) assert response.status_code == 200 assert response.json() == [] def test_start_gives_gt(test_client): # Confirm start time parameter gets the next message greater than, not greater than equal to. - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'start': timestamps[max_msgs - 1]}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'start': timestamps[1]}) assert response.status_code == 200 - assert response.json() == [msgs[max_msgs]] + assert response.json()[0] == msgs[0] def test_invalid_count(test_client): # Check the invalid count values. 
- response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'count': -1}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'count': -1}) assert response.status_code == 422 - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'count': 0}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'count': 0}) assert response.status_code == 422 - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'count': 65537}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'count': 65537}) assert response.status_code == 422 def test_end(test_client): # Test the end parameter - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'end': timestamps[0] - interval}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'end': earliest_ts}) assert response.status_code == 200 assert response.json() == [] - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'end': timestamps[0]}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'end': timestamps[-1]}) assert response.status_code == 200 - assert response.json() == [msgs[0]] + assert response.json()[0] == msgs[-1] - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'end': timestamps[9]}) + response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'end': timestamps[-9]}) assert response.status_code == 200 - assert response.json() == msgs[:10] + for a, b in zip(response.json(), msgs[-9:]): + assert a == b def test_start_end(test_client): - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'start': timestamps[5], 'end': timestamps[9]}) + response = test_client.get(f'/broker/api/messages/', params={'p_uid': pd.uid, 'start': timestamps[9], 'end': timestamps[5]}) assert response.status_code == 200 - assert response.json() == msgs[6:10] + for a, b in zip(response.json(), msgs[5:9]): + assert a == b def test_invalid_start_end(test_client): - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'start': timestamps[5], 'end': timestamps[5]}) + response = test_client.get(f'/broker/api/messages/', params={'p_uid': pd.uid, 'start': timestamps[5], 'end': timestamps[5]}) assert response.status_code == 422 - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'start': timestamps[5], 'end': timestamps[4]}) + response = test_client.get(f'/broker/api/messages/', params={'p_uid': pd.uid, 'start': timestamps[4], 'end': timestamps[5]}) assert response.status_code == 422 def test_invalid_count_end(test_client): - response = test_client.get(f'/broker/api/physical/messages/{pd.uid}', params={'count': 1, 'end': timestamps[4]}) + response = test_client.get(f'/broker/api/messages/', params={'p_uid': pd.uid, 'count': 1, 'end': timestamps[4]}) assert response.status_code == 422 diff --git a/test/python/test_restapi.py b/test/python/test_restapi.py index 2ad80647..594b3652 100644 --- a/test/python/test_restapi.py +++ b/test/python/test_restapi.py @@ -1,16 +1,16 @@ import base64 -import copy, datetime, logging, time, unittest, uuid -from typing_extensions import assert_type -import re +import copy +import logging +import os +import time +import unittest from typing import Tuple -import api.client.DAO as dao import requests +import api.client.DAO as dao from pdmodels.Models import DeviceNote, PhysicalDevice, 
PhysicalToLogicalMapping, Location, LogicalDevice -from typing import Tuple - -import os +from test_utils import now logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)s: %(message)s', datefmt='%Y-%m-%dT%H:%M:%S%z') logger = logging.getLogger(__name__) @@ -62,13 +62,10 @@ def setUp(self): self._admin_token = dao.user_get_token(self._admin_username, 'password') self._ADMIN_HEADERS['Authorization'] = f'Bearer {self._admin_token}' - def now(self): - return datetime.datetime.now(tz=datetime.timezone.utc) - def _create_physical_device(self, expected_code=201, req_header=_HEADERS, dev=None) -> Tuple[ PhysicalDevice, PhysicalDevice]: if dev is None: - last_seen = self.now() + last_seen = now() dev = PhysicalDevice(source_name='ttn', name='Test Device', location=Location(lat=3, long=-31), last_seen=last_seen, source_ids={'appId': 'x', 'devId': 'y'}, @@ -292,10 +289,12 @@ def test_get_device_notes(self): LOGICAL DEVICES --------------------------------------------------------------------------""" - def _create_default_logical_device(self, expected_code=201, req_header=_HEADERS, dev=None) -> Tuple[ - LogicalDevice, LogicalDevice]: + def _create_default_logical_device(self, expected_code=201, req_header=None, dev=None) -> Tuple[LogicalDevice, LogicalDevice]: + if req_header is None: + req_header = self._HEADERS + if dev is None: - last_seen = self.now() + last_seen = now() dev = LogicalDevice(name='Test Device', location=Location(lat=3, long=-31), last_seen=last_seen, properties={'appId': 'x', 'devId': 'y', 'other': 'z'}) @@ -416,7 +415,7 @@ def test_delete_logical_device(self): def test_insert_mapping(self): pdev, new_pdev = self._create_physical_device(req_header=self._ADMIN_HEADERS) ldev, new_ldev = self._create_default_logical_device(req_header=self._ADMIN_HEADERS) - mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=now()) # This should work. url = f'{_BASE}/mappings/' @@ -430,7 +429,7 @@ def test_insert_mapping(self): # This should fail because the physical device has a current mapping. time.sleep(0.001) - mapping.start_time = self.now() + mapping.start_time = now() payload = mapping.json() r = requests.post(url, headers=self._ADMIN_HEADERS, data=payload) self.assertEqual(r.status_code, 400) @@ -438,14 +437,14 @@ def test_insert_mapping(self): # End the current mapping and create a new one. This should work and # simulates 'pausing' a physical device while working on it. requests.patch(f'{url}physical/end/{mapping.pd.uid}', headers=self._ADMIN_HEADERS) - mapping.start_time = self.now() + mapping.start_time = now() payload = mapping.json() r = requests.post(url, headers=self._ADMIN_HEADERS, data=payload) self.assertEqual(r.status_code, 201) pdx = copy.deepcopy(new_pdev) pdx.uid = -1 - mapping = PhysicalToLogicalMapping(pd=pdx, ld=new_ldev, start_time=self.now()) + mapping = PhysicalToLogicalMapping(pd=pdx, ld=new_ldev, start_time=now()) # This should fail due to invalid physical uid. payload = mapping.json() r = requests.post(url, headers=self._ADMIN_HEADERS, data=payload) @@ -453,7 +452,7 @@ def test_insert_mapping(self): ldx = copy.deepcopy(new_ldev) ldx.uid = -1 - mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=ldx, start_time=self.now()) + mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=ldx, start_time=now()) # End the current mapping for the phyiscal device so the RESTAPI doesn't # return status 400. 
@@ -467,7 +466,7 @@ def test_insert_mapping(self): def test_get_mapping_from_physical(self): pdev, new_pdev = self._create_physical_device(req_header=self._ADMIN_HEADERS) ldev, new_ldev = self._create_default_logical_device(req_header=self._ADMIN_HEADERS) - mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=now()) url = f'{_BASE}/mappings/' payload = mapping.json() @@ -491,7 +490,7 @@ def test_get_mapping_from_physical(self): # Confirm the latest mapping is returned. time.sleep(0.001) mapping2 = copy.deepcopy(mapping) - mapping2.start_time = self.now() + mapping2.start_time = now() self.assertNotEqual(mapping, mapping2) payload = mapping2.json() r = requests.post(url, headers=self._ADMIN_HEADERS, data=payload) @@ -505,7 +504,7 @@ def test_get_mapping_from_physical(self): def test_get_mapping_from_logical(self): pdev, new_pdev = self._create_physical_device(req_header=self._ADMIN_HEADERS) ldev, new_ldev = self._create_default_logical_device(req_header=self._ADMIN_HEADERS) - mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=now()) url = f'{_BASE}/mappings/' payload = mapping.json() @@ -529,7 +528,7 @@ def test_get_mapping_from_logical(self): # Confirm the latest mapping is returned. time.sleep(0.001) mapping2 = copy.deepcopy(mapping) - mapping2.start_time = self.now() + mapping2.start_time = now() self.assertNotEqual(mapping, mapping2) payload = mapping2.json() r = requests.post(url, headers=self._ADMIN_HEADERS, data=payload) @@ -546,7 +545,7 @@ def compare_mappings_ignore_end_time(self, m1: PhysicalToLogicalMapping, m2: Phy def test_get_latest_mapping_from_physical(self): pdev, new_pdev = self._create_physical_device(req_header=self._ADMIN_HEADERS) ldev, new_ldev = self._create_default_logical_device(req_header=self._ADMIN_HEADERS) - mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=now()) url = f'{_BASE}/mappings/' @@ -590,7 +589,7 @@ def test_get_latest_mapping_from_physical(self): def test_get_latest_mapping_from_logical(self): pdev, new_pdev = self._create_physical_device(req_header=self._ADMIN_HEADERS) ldev, new_ldev = self._create_default_logical_device(req_header=self._ADMIN_HEADERS) - mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=now()) url = f'{_BASE}/mappings/' @@ -639,7 +638,7 @@ def test_get_all_logical_device_mappings(self): # Using the DAO to create the test data, the REST API methods to do this are # tested elsewhere. 
- mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=self.now()) + mapping1 = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=now()) dao.insert_mapping(mapping1) time.sleep(0.1) dao.end_mapping(ld=new_ldev.uid) @@ -649,7 +648,7 @@ def test_get_all_logical_device_mappings(self): pdev2.name = 'D2' pdev2, new_pdev2 = self._create_physical_device(dev=pdev2, req_header=self._ADMIN_HEADERS) time.sleep(0.1) - mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=self.now()) + mapping2 = PhysicalToLogicalMapping(pd=new_pdev2, ld=new_ldev, start_time=now()) dao.insert_mapping(mapping2) time.sleep(0.1) dao.end_mapping(ld=new_ldev.uid) @@ -659,7 +658,7 @@ def test_get_all_logical_device_mappings(self): pdev3.name = 'D3' pdev3, new_pdev3 = self._create_physical_device(dev=pdev3, req_header=self._ADMIN_HEADERS) time.sleep(0.1) - mapping3 = PhysicalToLogicalMapping(pd=new_pdev3, ld=new_ldev, start_time=self.now()) + mapping3 = PhysicalToLogicalMapping(pd=new_pdev3, ld=new_ldev, start_time=now()) dao.insert_mapping(mapping3) url = f'{_BASE}/mappings/logical/all/{new_ldev.uid}'
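
The tests above exercise the reworked messages endpoint: the physical device id is now passed as a p_uid query parameter on /broker/api/messages rather than appearing in the path, with count, start, end and only_timestamp as optional filters. A minimal query sketch against a running broker follows; the host, port, token and device id are placeholders, and the parameter semantics (count limited to 1..65536, start exclusive, end inclusive, 422 for invalid combinations) are inferred from the test assertions rather than stated by the API itself.

    import requests

    BASE = 'http://localhost:5687/broker/api'      # assumed local deployment
    HEADERS = {'Authorization': 'Bearer <token>'}   # placeholder token

    # Latest 50 messages for physical device 1; count must stay within 1..65536.
    r = requests.get(f'{BASE}/messages', headers=HEADERS,
                     params={'p_uid': 1, 'count': 50})
    r.raise_for_status()
    messages = r.json()

    # Timestamps only, within a window; the tests above suggest start is
    # exclusive and end is inclusive.
    r = requests.get(f'{BASE}/messages', headers=HEADERS,
                     params={'p_uid': 1, 'only_timestamp': 1,
                             'start': '2024-05-09T23:00:00Z',
                             'end': '2024-05-10T23:00:00Z'})
    timestamps = r.json()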
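
The test_restapi.py changes also swap the TestCase-local now() method for a shared helper imported from test_utils. The removed method shows the intended behaviour, so an equivalent module-level helper would look roughly like the sketch below (illustrative only; the real test_utils.now may differ in detail):

    import datetime

    def now() -> datetime.datetime:
        # Timezone-aware UTC timestamp, mirroring the removed TestCase.now() method.
        return datetime.datetime.now(tz=datetime.timezone.utc)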