From a8be514e0e53f4c79c15200b7b619424026658aa Mon Sep 17 00:00:00 2001 From: David Taylor Date: Thu, 20 Jun 2024 14:13:41 +1000 Subject: [PATCH 01/10] WIP: Adding received_at field to REST API & DAO get messages function. --- src/python/api/client/DAO.py | 31 ++++++++++++++++++++--- test/python/test_dao.py | 18 ++++++------- test/python/test_get_physical_messages.py | 22 ++++++++++------ test/python/test_restapi.py | 13 +++++----- 4 files changed, 56 insertions(+), 28 deletions(-) diff --git a/src/python/api/client/DAO.py b/src/python/api/client/DAO.py index a37ce32..408b525 100644 --- a/src/python/api/client/DAO.py +++ b/src/python/api/client/DAO.py @@ -995,7 +995,7 @@ def insert_physical_timeseries_message(msg: Dict[str, Any]) -> None: free_conn(conn) -def get_physical_timeseries_message(start: datetime | None = None, end: datetime | None = None, count: int | None = None, only_timestamp: bool = False, p_uid: int = None, l_uid: int = None) -> List[Dict]: +def get_physical_timeseries_message(start: datetime | None = None, end: datetime | None = None, count: int | None = None, only_timestamp: bool = False, include_received_at: bool = False, p_uid: int = None, l_uid: int = None) -> List[Dict]: conn = None if start is None: @@ -1029,14 +1029,21 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime if not isinstance(end, datetime): raise TypeError - column_name = 'ts' if only_timestamp else 'json_msg' + column_names = ['ts'] + if include_received_at: + column_names.append('received_at') + + if not only_timestamp: + column_names.append('json_msg') + + column_names = ', '.join(column_names) try: # Order messages by descending timestamp. If a caller asks for one message, they probably want the latest # message. with _get_connection() as conn, conn.cursor() as cursor: qry = f""" - select {column_name} from physical_timeseries + select ts {column_names} from physical_timeseries where {uid_col_name} = %s and ts > %s and ts <= %s @@ -1046,7 +1053,7 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime args = (uid, start, end, count) cursor.execute(qry, args) - return [row[0] for row in cursor.fetchall()] + return [_msg_tuple_to_obj(*row) for row in cursor.fetchall()] except Exception as err: raise DAOException('get_physical_timeseries_message failed.', err) finally: @@ -1054,6 +1061,22 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime free_conn(conn) +def _msg_tuple_to_obj(ts: datetime, arg2: datetime | dict | None = None, arg3: datetime | dict | None = None) -> dict: + msg_dict = {'ts': ts} + if arg2 is not None: + if isinstance(arg2, datetime): + msg_dict['received_at'] = arg2 + else: + msg_dict['msg'] = arg2 + if arg3 is not None: + if isinstance(arg3, datetime): + msg_dict['received_at'] = arg3 + else: + msg_dict['msg'] = arg3 + + return msg_dict + + def add_raw_text_message(source_name: str, ts: datetime, correlation_uuid: str, msg, uid: int=None): conn = None try: diff --git a/test/python/test_dao.py b/test/python/test_dao.py index 8689ba3..0d65b27 100644 --- a/test/python/test_dao.py +++ b/test/python/test_dao.py @@ -5,7 +5,6 @@ from pdmodels.Models import PhysicalDevice, PhysicalToLogicalMapping, Location, LogicalDevice from typing import Tuple import os -import pprint as pp logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)s: %(message)s', datefmt='%Y-%m-%dT%H:%M:%S%z') logger = logging.getLogger(__name__) @@ -644,15 +643,16 @@ def test_get_physical_timeseries_messages(self): dao.insert_physical_timeseries_message(msg) msg_ts.append(dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY])) - msgs = dao.get_physical_timeseries_message(None, None, 1, only_timestamp=True, p_uid=new_pdev.uid) + msgs = dao.get_physical_timeseries_message(None, None, 1, only_timestamp=False, include_received_at=False, p_uid=new_pdev.uid) + print(msgs) self.assertEqual(len(msgs), 1) - self.assertEqual(msgs[0], msg_ts[0]) + self.assertEqual(msgs[0]['ts'], msg_ts[0]) msgs = dao.get_physical_timeseries_message(None, None, None, only_timestamp=True, p_uid=new_pdev.uid) self.assertEqual(len(msgs), len(msg_list)) - for i, ts in enumerate(msgs): - self.assertEqual(ts, msg_ts[i]) + for i, msg in enumerate(msgs): + self.assertEqual(msg['ts'], msg_ts[i]) _, new_ldev = self._create_default_logical_device() mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now()) @@ -679,18 +679,18 @@ def test_get_physical_timeseries_messages(self): # batch should be returned. msgs = dao.get_physical_timeseries_message(only_timestamp=True, l_uid=new_ldev.uid) self.assertEqual(len(msgs), 1) - self.assertEqual(msgs[0], msg_ts[-1]) + self.assertEqual(msgs[0]['ts'], msg_ts[-1]) # This will return all the messages because 'end' has been set past the latest message timestamp. msgs = dao.get_physical_timeseries_message(end=now + datetime.timedelta(days=1), only_timestamp=True, l_uid=new_ldev.uid) self.assertEqual(len(msgs), len(msg_list)) - for i, ts in enumerate(msgs): - self.assertEqual(ts, msg_ts[i]) + for i, msg in enumerate(msgs): + self.assertEqual(msg['ts'], msg_ts[i]) # Should return only the latest message. msgs = dao.get_physical_timeseries_message(end=now + datetime.timedelta(days=1), only_timestamp=True, count=1, l_uid=new_ldev.uid) self.assertEqual(len(msgs), 1) - self.assertEqual(msgs[0], msg_ts[0]) + self.assertEqual(msgs[0]['ts'], msg_ts[0]) self.assertRaises(ValueError, dao.get_physical_timeseries_message) self.assertRaises(TypeError, dao.get_physical_timeseries_message, p_uid='x') diff --git a/test/python/test_get_physical_messages.py b/test/python/test_get_physical_messages.py index a30eccb..a08aac2 100644 --- a/test/python/test_get_physical_messages.py +++ b/test/python/test_get_physical_messages.py @@ -76,24 +76,28 @@ def test_no_params(test_client): # Confirm the default count parameter value is correct, so 65536 of the 65537 messages are returned. response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid}) assert response.status_code == 200 - assert response.json() == msgs[:-1] + for a, b in zip(response.json(), msgs[:-1]): + assert a['msg'] == b def test_no_params_ts(test_client): response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'only_timestamp': 1}) assert response.status_code == 200 + for a, b in zip(response.json(), timestamps): if a is None: break - assert dateutil.parser.isoparse(a) == b + assert dateutil.parser.isoparse(a['ts']) == b def test_count(test_client): # Confirm the count parameter works. response = test_client.get(f'/broker/api/messages/', params={'p_uid': pd.uid, 'count': 50}) assert response.status_code == 200 - assert response.json() == msgs[:50] + #assert response.json() == msgs[:50] + for a, b in zip(response.json(), msgs[:50]): + assert a['msg'] == b def test_count_ts(test_client): @@ -103,7 +107,7 @@ def test_count_ts(test_client): if a is None: break - assert dateutil.parser.isoparse(a) == b + assert dateutil.parser.isoparse(a['ts']) == b def test_start_after_end(test_client): @@ -121,7 +125,7 @@ def test_start_gives_gt(test_client): # Confirm start time parameter gets the next message greater than, not greater than equal to. response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'start': timestamps[1]}) assert response.status_code == 200 - assert response.json() == [msgs[0]] + assert response.json()[0]['msg'] == msgs[0] def test_invalid_count(test_client): @@ -144,17 +148,19 @@ def test_end(test_client): response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'end': timestamps[-1]}) assert response.status_code == 200 - assert response.json() == [msgs[-1]] + assert response.json()[0]['msg'] == msgs[-1] response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'end': timestamps[-9]}) assert response.status_code == 200 - assert response.json() == msgs[-9:] + for a, b in zip(response.json(), msgs[-9:]): + assert a['msg'] == b def test_start_end(test_client): response = test_client.get(f'/broker/api/messages/', params={'p_uid': pd.uid, 'start': timestamps[9], 'end': timestamps[5]}) assert response.status_code == 200 - assert response.json() == msgs[5:9] + for a, b in zip(response.json(), msgs[5:9]): + assert a['msg'] == b def test_invalid_start_end(test_client): diff --git a/test/python/test_restapi.py b/test/python/test_restapi.py index 1b3fba1..594b365 100644 --- a/test/python/test_restapi.py +++ b/test/python/test_restapi.py @@ -1,16 +1,15 @@ import base64 -import copy, datetime, logging, time, unittest, uuid -from typing_extensions import assert_type -import re +import copy +import logging +import os +import time +import unittest from typing import Tuple -import api.client.DAO as dao import requests +import api.client.DAO as dao from pdmodels.Models import DeviceNote, PhysicalDevice, PhysicalToLogicalMapping, Location, LogicalDevice -from typing import Tuple - -import os from test_utils import now logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)s: %(message)s', datefmt='%Y-%m-%dT%H:%M:%S%z') From 08520bee79dcc08d957b54aaef6b46e6b0fe9058 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Fri, 21 Jun 2024 09:38:21 +1000 Subject: [PATCH 02/10] REST API & DAO updates for this feature. --- src/python/api/client/DAO.py | 17 ++++++++++++----- src/python/restapi/RestAPI.py | 12 ++++++------ 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/src/python/api/client/DAO.py b/src/python/api/client/DAO.py index 408b525..9e9a4fa 100644 --- a/src/python/api/client/DAO.py +++ b/src/python/api/client/DAO.py @@ -1062,17 +1062,24 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime def _msg_tuple_to_obj(ts: datetime, arg2: datetime | dict | None = None, arg3: datetime | dict | None = None) -> dict: - msg_dict = {'ts': ts} - if arg2 is not None: + print(ts, arg2, arg3) + if arg2 is None and arg3 is None: + return {BrokerConstants.TIMESTAMP_KEY: ts} + + if arg2 is not None and arg3 is None: if isinstance(arg2, datetime): - msg_dict['received_at'] = arg2 + return {BrokerConstants.TIMESTAMP_KEY: ts, 'received_at': arg2} else: - msg_dict['msg'] = arg2 + return arg2 + + msg_dict = {} if arg3 is not None: if isinstance(arg3, datetime): + msg_dict: dict = arg2 msg_dict['received_at'] = arg3 else: - msg_dict['msg'] = arg3 + msg_dict: dict = arg3 + msg_dict['received_at'] = arg2 return msg_dict diff --git a/src/python/restapi/RestAPI.py b/src/python/restapi/RestAPI.py index 387363e..76b31e8 100644 --- a/src/python/restapi/RestAPI.py +++ b/src/python/restapi/RestAPI.py @@ -460,7 +460,7 @@ async def end_mapping_of_logical_uid(uid: int) -> None: MESSAGE RELATED --------------------------------------------------------------------------""" -@router.get("/messages", tags=['Messages']) +@router.get("/messages", tags=['Messages'], dependencies=[Depends(token_auth_scheme)]) async def get_physical_timeseries( request: Request, p_uid: int | None = None, @@ -469,7 +469,8 @@ async def get_physical_timeseries( last: str = None, start: datetime.datetime = None, end: datetime.datetime = None, - only_timestamp: bool = False): + include_received_at: bool = False, + only_timestamp: bool = False) -> List[Dict]: """ Get the physical_timeseries entries described by the physical device uid and the parameters. @@ -505,8 +506,7 @@ async def get_physical_timeseries( except: raise HTTPException(status_code=422, detail={"detail": [{"loc": ["query", "last"], "msg": "the first part of last must be an integer"}]}) - unit = last[-1] - + unit = str(last[-1]).lower() if unit == 'h': diff = datetime.timedelta(hours=i) elif unit == 'd': @@ -524,9 +524,9 @@ async def get_physical_timeseries( msgs = None if p_uid is not None: - msgs = dao.get_physical_timeseries_message(start, end, count, only_timestamp, p_uid=p_uid) + msgs = dao.get_physical_timeseries_message(start, end, count, only_timestamp, include_received_at, p_uid=p_uid) elif l_uid is not None: - msgs = dao.get_physical_timeseries_message(start, end, count, only_timestamp, l_uid=l_uid) + msgs = dao.get_physical_timeseries_message(start, end, count, only_timestamp, include_received_at, l_uid=l_uid) if msgs is None: raise HTTPException(status_code=404, detail="Failed to retrieve messages") From c0ab05b84e4b1c28bf2426db6ddd03eef7380e98 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Fri, 21 Jun 2024 11:19:44 +1000 Subject: [PATCH 03/10] Webapp changes to handle the geometry location columns, unit test fixes. --- src/python/api/client/DAO.py | 4 ++-- src/www/app/utils/api.py | 6 +++++- test/python/test_dao.py | 12 ++++++------ test/python/test_get_physical_messages.py | 19 ++++++++++--------- 4 files changed, 23 insertions(+), 18 deletions(-) diff --git a/src/python/api/client/DAO.py b/src/python/api/client/DAO.py index 9e9a4fa..841ab98 100644 --- a/src/python/api/client/DAO.py +++ b/src/python/api/client/DAO.py @@ -1062,9 +1062,9 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime def _msg_tuple_to_obj(ts: datetime, arg2: datetime | dict | None = None, arg3: datetime | dict | None = None) -> dict: - print(ts, arg2, arg3) + #print(ts, arg2, arg3) if arg2 is None and arg3 is None: - return {BrokerConstants.TIMESTAMP_KEY: ts} + return {BrokerConstants.TIMESTAMP_KEY: ts.isoformat()} if arg2 is not None and arg3 is None: if isinstance(arg2, datetime): diff --git a/src/www/app/utils/api.py b/src/www/app/utils/api.py index 36bac10..9cfc983 100644 --- a/src/www/app/utils/api.py +++ b/src/www/app/utils/api.py @@ -1,4 +1,4 @@ -import json, logging, os +import json, os from typing import List import requests from datetime import datetime, timezone @@ -254,6 +254,10 @@ def create_logical_device(physical_device: PhysicalDevice, token: str) ->str: "location": physical_device.location, } + # This is to work around a problem that turned up after the change to PostGIS. + if logicalJson['location'] is not None and (logicalJson['location'].lat is None or logicalJson['location'].long is None): + logicalJson.pop('location') + response = requests.post(f'{end_point}/broker/api/logical/devices/', json=logicalJson, headers=headers) response.raise_for_status() diff --git a/test/python/test_dao.py b/test/python/test_dao.py index 0d65b27..1fa29f7 100644 --- a/test/python/test_dao.py +++ b/test/python/test_dao.py @@ -644,15 +644,15 @@ def test_get_physical_timeseries_messages(self): msg_ts.append(dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY])) msgs = dao.get_physical_timeseries_message(None, None, 1, only_timestamp=False, include_received_at=False, p_uid=new_pdev.uid) - print(msgs) self.assertEqual(len(msgs), 1) - self.assertEqual(msgs[0]['ts'], msg_ts[0]) + self.assertEqual(dateutil.parser.isoparse(msgs[0][BrokerConstants.TIMESTAMP_KEY]), msg_ts[0]) msgs = dao.get_physical_timeseries_message(None, None, None, only_timestamp=True, p_uid=new_pdev.uid) self.assertEqual(len(msgs), len(msg_list)) for i, msg in enumerate(msgs): - self.assertEqual(msg['ts'], msg_ts[i]) + print(msg) + self.assertEqual(dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY]), msg_ts[i]) _, new_ldev = self._create_default_logical_device() mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now()) @@ -679,18 +679,18 @@ def test_get_physical_timeseries_messages(self): # batch should be returned. msgs = dao.get_physical_timeseries_message(only_timestamp=True, l_uid=new_ldev.uid) self.assertEqual(len(msgs), 1) - self.assertEqual(msgs[0]['ts'], msg_ts[-1]) + self.assertEqual(dateutil.parser.isoparse(msgs[0][BrokerConstants.TIMESTAMP_KEY]), msg_ts[-1]) # This will return all the messages because 'end' has been set past the latest message timestamp. msgs = dao.get_physical_timeseries_message(end=now + datetime.timedelta(days=1), only_timestamp=True, l_uid=new_ldev.uid) self.assertEqual(len(msgs), len(msg_list)) for i, msg in enumerate(msgs): - self.assertEqual(msg['ts'], msg_ts[i]) + self.assertEqual(dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY]), msg_ts[i]) # Should return only the latest message. msgs = dao.get_physical_timeseries_message(end=now + datetime.timedelta(days=1), only_timestamp=True, count=1, l_uid=new_ldev.uid) self.assertEqual(len(msgs), 1) - self.assertEqual(msgs[0]['ts'], msg_ts[0]) + self.assertEqual(dateutil.parser.isoparse(msgs[0][BrokerConstants.TIMESTAMP_KEY]), msg_ts[0]) self.assertRaises(ValueError, dao.get_physical_timeseries_message) self.assertRaises(TypeError, dao.get_physical_timeseries_message, p_uid='x') diff --git a/test/python/test_get_physical_messages.py b/test/python/test_get_physical_messages.py index a08aac2..8e54ccf 100644 --- a/test/python/test_get_physical_messages.py +++ b/test/python/test_get_physical_messages.py @@ -6,6 +6,7 @@ import pytest from fastapi.testclient import TestClient +import BrokerConstants import test_utils as tu import api.client.DAO as dao from pdmodels.Models import PhysicalDevice @@ -43,7 +44,7 @@ def create_msgs(): with open('/tmp/msgs.json', 'w') as f: for i in range(0, max_msgs + 1): timestamps.append(ts) - msg = {'ts': ts.isoformat(), 'i': i} + msg = {BrokerConstants.TIMESTAMP_KEY: ts.isoformat(), 'i': i} msgs.append(msg) s = f'{pd.uid}\t{ts.isoformat()}\t{json.dumps(msg)}' print(s, file=f) @@ -77,7 +78,7 @@ def test_no_params(test_client): response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid}) assert response.status_code == 200 for a, b in zip(response.json(), msgs[:-1]): - assert a['msg'] == b + assert a == b def test_no_params_ts(test_client): @@ -88,7 +89,7 @@ def test_no_params_ts(test_client): if a is None: break - assert dateutil.parser.isoparse(a['ts']) == b + assert dateutil.parser.isoparse(a[BrokerConstants.TIMESTAMP_KEY]) == b def test_count(test_client): @@ -97,7 +98,7 @@ def test_count(test_client): assert response.status_code == 200 #assert response.json() == msgs[:50] for a, b in zip(response.json(), msgs[:50]): - assert a['msg'] == b + assert a == b def test_count_ts(test_client): @@ -107,7 +108,7 @@ def test_count_ts(test_client): if a is None: break - assert dateutil.parser.isoparse(a['ts']) == b + assert dateutil.parser.isoparse(a[BrokerConstants.TIMESTAMP_KEY]) == b def test_start_after_end(test_client): @@ -125,7 +126,7 @@ def test_start_gives_gt(test_client): # Confirm start time parameter gets the next message greater than, not greater than equal to. response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'start': timestamps[1]}) assert response.status_code == 200 - assert response.json()[0]['msg'] == msgs[0] + assert response.json()[0] == msgs[0] def test_invalid_count(test_client): @@ -148,19 +149,19 @@ def test_end(test_client): response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'end': timestamps[-1]}) assert response.status_code == 200 - assert response.json()[0]['msg'] == msgs[-1] + assert response.json()[0] == msgs[-1] response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'end': timestamps[-9]}) assert response.status_code == 200 for a, b in zip(response.json(), msgs[-9:]): - assert a['msg'] == b + assert a == b def test_start_end(test_client): response = test_client.get(f'/broker/api/messages/', params={'p_uid': pd.uid, 'start': timestamps[9], 'end': timestamps[5]}) assert response.status_code == 200 for a, b in zip(response.json(), msgs[5:9]): - assert a['msg'] == b + assert a == b def test_invalid_start_end(test_client): From 953af7d2e3edb80de7ca65b3719e7ce7aa65229d Mon Sep 17 00:00:00 2001 From: David Taylor Date: Fri, 21 Jun 2024 11:38:20 +1000 Subject: [PATCH 04/10] Normalising the DAO get messages function to return timestamps in the message as a string. This means callers to the DAO and the REST API both get ISO-8601 format timestamp strings. --- src/python/api/client/DAO.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/python/api/client/DAO.py b/src/python/api/client/DAO.py index 841ab98..28b330a 100644 --- a/src/python/api/client/DAO.py +++ b/src/python/api/client/DAO.py @@ -1062,7 +1062,6 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime def _msg_tuple_to_obj(ts: datetime, arg2: datetime | dict | None = None, arg3: datetime | dict | None = None) -> dict: - #print(ts, arg2, arg3) if arg2 is None and arg3 is None: return {BrokerConstants.TIMESTAMP_KEY: ts.isoformat()} @@ -1076,10 +1075,16 @@ def _msg_tuple_to_obj(ts: datetime, arg2: datetime | dict | None = None, arg3: d if arg3 is not None: if isinstance(arg3, datetime): msg_dict: dict = arg2 - msg_dict['received_at'] = arg3 + if isinstance(arg3, datetime): + msg_dict['received_at'] = arg3.isoformat() + else: + msg_dict['received_at'] = arg3 else: msg_dict: dict = arg3 - msg_dict['received_at'] = arg2 + if isinstance(arg2, datetime): + msg_dict['received_at'] = arg2.isoformat() + else: + msg_dict['received_at'] = arg2 return msg_dict From 0f840c8219df432ebf88efd4e34dd64e621ca3c5 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Wed, 17 Jul 2024 08:51:52 +1000 Subject: [PATCH 05/10] WIP: First attempt at download data for a single logical device. --- src/www/app/main.py | 65 +++++++- src/www/app/templates/base.html | 8 + .../app/templates/logical_device_form.html | 149 ++++++++++++++++++ src/www/app/utils/api.py | 17 +- src/www/requirements.txt | Bin 1142 -> 1142 bytes 5 files changed, 237 insertions(+), 2 deletions(-) diff --git a/src/www/app/main.py b/src/www/app/main.py index 83a4758..699adbf 100644 --- a/src/www/app/main.py +++ b/src/www/app/main.py @@ -1,9 +1,13 @@ import atexit +import io import logging +import pandas as pd import time from typing import Tuple +import uuid +from zoneinfo import ZoneInfo -from flask import Flask, render_template, request, redirect, url_for, session, send_from_directory +from flask import Flask, render_template, request, redirect, url_for, session, send_from_directory, send_file import folium import paho.mqtt.client as mqtt @@ -599,6 +603,62 @@ def UpdateMappings(): return f"HTTP request with RestAPI failed with error {e.response.status_code}", e.response.status_code +@app.route('/download-data', methods=['POST']) +def DownloadData(): + try: + user_timezone = request.cookies.get('timezone') + logging.info(f'tz = {user_timezone}') + + l_uid = int(request.form.get('l_uid')) + start_ts = request.form.get('start_ts') + end_ts = request.form.get('end_ts') + + token = session.get('token') + + logical_dev = get_logical_device(l_uid, token) + + logging.info(request.form) + logging.info(f'start_ts = {start_ts}') + logging.info(f'end_ts = {end_ts}') + logging.info(f'l_uid = {l_uid}') + + start = None + end = None + if start_ts is not None and len(start_ts) > 7: + start = datetime.fromisoformat(start_ts).replace(tzinfo=ZoneInfo(user_timezone)) + if end_ts is not None and len(end_ts) > 7: + start = datetime.fromisoformat(start_ts).replace(tzinfo=ZoneInfo(user_timezone)) + + msgs = get_messages(token, l_uid, start, end) + logging.info(msgs) + if len(msgs) < 1: + return 'Success', 200 + + dataset = [] + for msg in msgs: + item = {'l_uid': l_uid, 'ts': msg['timestamp'], 'received_at': msg['received_at']} + for obj in msg['timeseries']: + item[obj['name']] = obj['value'] + dataset.append(item) + + df = pd.DataFrame(dataset) + df.set_index(['l_uid', 'ts'], inplace=True) + df.sort_index(level=0, sort_remaining=True, inplace=True, ascending=True) + + buffer = io.BytesIO() + df.to_csv(buffer, encoding='UTF-8') + buffer.seek(0) + + return send_file(buffer, as_attachment=True, download_name=f'{logical_dev.name}.csv') + + + except requests.exceptions.HTTPError as e: + if e.response.status_code == 403: + return f"You do not have sufficient permissions to make this change", e.response.status_code + + return f"HTTP request with RestAPI failed with error {e.response.status_code}", e.response.status_code + + @app.route('/end-ld-mapping', methods=['GET']) def EndLogicalDeviceMapping(): uid = request.args['uid'] @@ -721,4 +781,7 @@ def exit_handler(): atexit.register(exit_handler) + #app.jinja_env.auto_reload = True + #app.config['TEMPLATES_AUTO_RELOAD'] = True + #app.run(port='5000', host='0.0.0.0', debug=True) app.run(port='5000', host='0.0.0.0') diff --git a/src/www/app/templates/base.html b/src/www/app/templates/base.html index c72c248..ca1c498 100644 --- a/src/www/app/templates/base.html +++ b/src/www/app/templates/base.html @@ -8,14 +8,22 @@ + + + + + + + +
+

Export data

+
+
+
+ + + + + + + + +
FromTo
+
+ +
+ + +
+
+
+ +
    +
  • Export Data
  • Save
  • Update Mapping
  • End Mapping
  • diff --git a/src/www/app/utils/api.py b/src/www/app/utils/api.py index 9cfc983..ad87ffe 100644 --- a/src/www/app/utils/api.py +++ b/src/www/app/utils/api.py @@ -1,5 +1,6 @@ +import logging import json, os -from typing import List +from typing import Any, List, Optional import requests from datetime import datetime, timezone import base64 @@ -190,6 +191,20 @@ def end_logical_mapping(uid: str, token: str): requests.patch(url, headers=headers) +def get_messages(token: str, l_uid: int, start_ts: Optional[datetime] = None, end_ts: Optional[datetime] = None) -> List[Any]: + headers = {"Authorization": f"Bearer {token}"} + params = {"l_uid": l_uid, "include_received_at": True} + if start_ts is not None: + params['start'] = start_ts + if end_ts is not None: + params['end'] = start_ts + + response = requests.get(f'{end_point}/broker/api/messages', headers=headers, params=params) + logging.info(response) + response.raise_for_status() + return response.json() + + def end_physical_mapping(uid: str, token: str): """ End device mapping from a physical device (if any). If there was a mapping, the logical device also has no mapping after this call. diff --git a/src/www/requirements.txt b/src/www/requirements.txt index e5779aa69d7d493160817dbe8649e888256131db..b25e4633b1fd6bee9e90e63d2d04b0d5bb923317 100644 GIT binary patch delta 22 bcmeyy@r`4{Dke@N20aE4Hr%|K=`bSzQ%MGY delta 22 ccmeyy@r`4{Dke@t20aE7AU58-ndvYi08?8Af&c&j From 3c712f08dd8971516e2b408d82dd2054f943b732 Mon Sep 17 00:00:00 2001 From: dajtxx Date: Wed, 17 Jul 2024 13:44:53 +1000 Subject: [PATCH 06/10] Fixed copy/paste bugs, more explicit TZ handling, no longer writes csvs. --- src/python/api/client/DAO.py | 90 +++++++++++++++++------------------ src/python/restapi/RestAPI.py | 9 ++-- src/www/app/main.py | 27 ++++++++--- 3 files changed, 70 insertions(+), 56 deletions(-) diff --git a/src/python/api/client/DAO.py b/src/python/api/client/DAO.py index 28b330a..564837a 100644 --- a/src/python/api/client/DAO.py +++ b/src/python/api/client/DAO.py @@ -355,7 +355,7 @@ def get_physical_devices(query_args=None) -> List[PhysicalDevice]: devs.append(d) rows = cursor.fetchmany() - + return devs except Exception as err: raise err if isinstance(err, DAOException) else DAOException('get_physical_devices failed.', err) @@ -532,9 +532,9 @@ def _get_logical_device(conn, uid: int) -> LogicalDevice: with conn.cursor() as cursor: """ See: https://dba.stackexchange.com/questions/27732/set-names-to-attributes-when-creating-json-with-row-to-json - + for an explanation of how to name the fields in a row_to_json call. - + Example query tested in psql: select uid, name, (select row_to_json(_) from (select ST_Y(location) as lat, ST_X(location) as long) as _) as location from logical_devices; """ @@ -778,14 +778,14 @@ def delete_mapping(mapping: PhysicalToLogicalMapping) -> None: raise err if isinstance(err, DAOException) else DAOException('delete_mapping failed.', err) finally: if conn is not None: - free_conn(conn) + free_conn(conn) def toggle_device_mapping(is_active: bool, pd: Optional[Union[PhysicalDevice, int]] = None, ld: Optional[Union[LogicalDevice, int]] = None) -> None: """ Change the is_active column in the database """ - + conn = None try: with _get_connection() as conn: @@ -805,7 +805,7 @@ def _toggle_device_mapping(conn, is_active, pd: Optional[Union[PhysicalDevice, i if pd is not None and ld is not None: raise ValueError('Both pd and ld were provided, only give one.') - + p_uid = None if pd is not None: p_uid = pd.uid if isinstance(pd, PhysicalDevice) else pd @@ -927,7 +927,7 @@ def get_physical_device_mappings(pd: Union[PhysicalDevice, int]) -> List[Physica with _get_connection() as conn, conn.cursor() as cursor: p_uid = pd.uid if isinstance(pd, PhysicalDevice) else pd cursor.execute('select physical_uid, logical_uid, start_time, end_time, is_active from physical_logical_map where physical_uid = %s order by start_time desc', (p_uid, )) - + for p_uid, l_uid, start_time, end_time, is_active in cursor: pd = _get_physical_device(conn, p_uid) ld = _get_logical_device(conn, l_uid) @@ -1003,7 +1003,7 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime if end is None: end = datetime.now(timezone.utc) if count is None or count > 65536: - count = 65536 + count = 65536 * 2 if count < 1: count = 1 @@ -1029,9 +1029,22 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime if not isinstance(end, datetime): raise TypeError - column_names = ['ts'] + """ + select date_trunc('second', ts at time zone 'UTC') as ts_utc, + date_trunc('second', received_at at time zone 'UTC') as recvd_utc, + json_msg + from physical_timeseries + where logical_uid = 413 + and ts + > '1970-01-01T00:00:00Z' + and ts <= NOW() + order by ts desc + limit 65535; + """ + + column_names = ["date_trunc('second', ts at time zone 'UTC') as ts_utc"] if include_received_at: - column_names.append('received_at') + column_names.append("date_trunc('second', received_at at time zone 'UTC') as received_at_utc") if not only_timestamp: column_names.append('json_msg') @@ -1043,7 +1056,7 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime # message. with _get_connection() as conn, conn.cursor() as cursor: qry = f""" - select ts {column_names} from physical_timeseries + select {column_names} from physical_timeseries where {uid_col_name} = %s and ts > %s and ts <= %s @@ -1053,7 +1066,7 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime args = (uid, start, end, count) cursor.execute(qry, args) - return [_msg_tuple_to_obj(*row) for row in cursor.fetchall()] + return [_msg_tuple_to_obj(cursor.description, row) for row in cursor.fetchall()] except Exception as err: raise DAOException('get_physical_timeseries_message failed.', err) finally: @@ -1061,30 +1074,15 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime free_conn(conn) -def _msg_tuple_to_obj(ts: datetime, arg2: datetime | dict | None = None, arg3: datetime | dict | None = None) -> dict: - if arg2 is None and arg3 is None: - return {BrokerConstants.TIMESTAMP_KEY: ts.isoformat()} - - if arg2 is not None and arg3 is None: - if isinstance(arg2, datetime): - return {BrokerConstants.TIMESTAMP_KEY: ts, 'received_at': arg2} - else: - return arg2 - +def _msg_tuple_to_obj(cursor_description, values) -> dict: msg_dict = {} - if arg3 is not None: - if isinstance(arg3, datetime): - msg_dict: dict = arg2 - if isinstance(arg3, datetime): - msg_dict['received_at'] = arg3.isoformat() - else: - msg_dict['received_at'] = arg3 - else: - msg_dict: dict = arg3 - if isinstance(arg2, datetime): - msg_dict['received_at'] = arg2.isoformat() - else: - msg_dict['received_at'] = arg2 + + for k, v in zip(cursor_description, values): + if isinstance(v, datetime): + # This function expects naive datetime instances to represent the time in UTC. + msg_dict[k.name] = v.astimezone(timezone.utc).isoformat() + elif isinstance(v, dict): + msg_dict.update(v) return msg_dict @@ -1112,7 +1110,7 @@ def user_add(uname: str, passwd: str, disabled: bool) -> None: #Generate salted password salt=os.urandom(64).hex() pass_hash=hashlib.scrypt(password=passwd.encode(), salt=salt.encode(), n=2**14, r=8, p=1, maxmem=0, dklen=64).hex() - + #Auth token to be used on other endpoints auth_token=os.urandom(64).hex() conn = None @@ -1134,7 +1132,7 @@ def user_rm(uname: str) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: cursor.execute("delete from users where username=%s", (uname,)) - + except Exception as err: raise err if isinstance(err, DAOException) else DAOException('user_ls failed.', err) finally: @@ -1202,14 +1200,14 @@ def user_get_token(username, password) -> str | None: result = cursor.fetchone() if result is None: return None - + db_salt, db_password, auth_token=result input_pw_hash=hashlib.scrypt(password=password.encode(), salt=db_salt.encode(), n=2**14, r=8, p=1, maxmem=0, dklen=64).hex() if input_pw_hash != db_password: #Incorrect password supplied return None - + return auth_token except Exception as err: @@ -1239,7 +1237,7 @@ def token_is_valid(user_token) -> bool: def token_refresh(uname) -> None: - + # Auth token to be used on other endpoints. auth_token = os.urandom(64).hex() @@ -1277,10 +1275,10 @@ def user_change_password_and_token(new_passwd: str, prev_token: str) -> str: #Generate salted password salt = os.urandom(64).hex() pass_hash = hashlib.scrypt(password=new_passwd.encode(), salt=salt.encode(), n=2**14, r=8, p=1, maxmem=0, dklen=64).hex() - + #Auth token to be used on other endpoints auth_token = os.urandom(64).hex() - + try: with _get_connection() as conn, conn.cursor() as cursor: cursor.execute("update users set salt=%s, password=%s, auth_token=%s where auth_token=%s", (salt, pass_hash, auth_token, prev_token)) @@ -1298,11 +1296,11 @@ def user_change_password_and_token(new_passwd: str, prev_token: str) -> str: def token_disable(uname) -> None: - + try: with _get_connection() as conn, conn.cursor() as cursor: cursor.execute("update users set valid='F' where username=%s", (uname,)) - + except Exception as err: raise err if isinstance(err, DAOException) else DAOException('disable_token failed.', err) finally: @@ -1314,10 +1312,10 @@ def token_enable(uname)-> None: try: with _get_connection() as conn, conn.cursor() as cursor: cursor.execute("update users set valid='T' where username=%s", (uname,)) - + except Exception as err: raise err if isinstance(err, DAOException) else DAOException('disable_token failed.', err) finally: if conn is not None: free_conn(conn) - + diff --git a/src/python/restapi/RestAPI.py b/src/python/restapi/RestAPI.py index 76b31e8..c228497 100644 --- a/src/python/restapi/RestAPI.py +++ b/src/python/restapi/RestAPI.py @@ -313,17 +313,17 @@ async def toggle_device_mapping(is_active:bool, puid: int = None, luid:int = Non """ if puid != None and luid != None: raise HTTPException(status_code=400, detail="Both physical and logical uid were provided. Only give one") - + if puid == None and luid == None: raise HTTPException(status_code=400, detail="A uid must be provided") - + current_mapping = dao.get_current_device_mapping(pd=puid, ld=luid) if current_mapping == None: raise HTTPException(status_code=404, detail="Device with uid provided could not be found") try: dao.toggle_device_mapping(is_active=is_active, pd=puid, ld=luid); - + except dao.DAOException as err: raise HTTPException(status_code=500, detail=err.msg) @@ -535,6 +535,7 @@ async def get_physical_timeseries( #logging.info(f'read {len(msgs)} messages') return msgs except dao.DAOException as err: + logging.exception(err) raise HTTPException(status_code=500, detail=err.msg) @@ -577,7 +578,7 @@ async def change_password(password:str, request:Request) -> str: @app.middleware("http") async def check_auth_header(request: Request, call_next): - + try: if not request.url.path in ['/docs', '/openapi.json', '/broker/api/token']: if not 'Authorization' in request.headers: diff --git a/src/www/app/main.py b/src/www/app/main.py index 699adbf..92d0b8c 100644 --- a/src/www/app/main.py +++ b/src/www/app/main.py @@ -627,7 +627,19 @@ def DownloadData(): if start_ts is not None and len(start_ts) > 7: start = datetime.fromisoformat(start_ts).replace(tzinfo=ZoneInfo(user_timezone)) if end_ts is not None and len(end_ts) > 7: - start = datetime.fromisoformat(start_ts).replace(tzinfo=ZoneInfo(user_timezone)) + end = datetime.fromisoformat(end_ts).replace(tzinfo=ZoneInfo(user_timezone)) + + if start is not None: + logging.info(f'start = {start}') + + if end is not None: + logging.info(f'end = {end}') + + # The web page accepts an end-date with no time attached. Assuming the user wants the messages from + # the end day, and the DB query does a < on the end date, add 1 day to the end date to take it to + # midnight of the selected end date. + end = end + timedelta(days=1) + logging.info(f'adjusted end = {end}') msgs = get_messages(token, l_uid, start, end) logging.info(msgs) @@ -636,12 +648,15 @@ def DownloadData(): dataset = [] for msg in msgs: - item = {'l_uid': l_uid, 'ts': msg['timestamp'], 'received_at': msg['received_at']} + item = {'l_uid': l_uid, 'ts': msg['timestamp'], 'received_at': msg['received_at_utc']} for obj in msg['timeseries']: item[obj['name']] = obj['value'] dataset.append(item) df = pd.DataFrame(dataset) + df['ts'] = pd.to_datetime(df['ts']) + df['received_at'] = pd.to_datetime(df['received_at']) + df['ts_local'] = df['ts'].dt.tz_convert(user_timezone) df.set_index(['l_uid', 'ts'], inplace=True) df.sort_index(level=0, sort_remaining=True, inplace=True, ascending=True) @@ -781,7 +796,7 @@ def exit_handler(): atexit.register(exit_handler) - #app.jinja_env.auto_reload = True - #app.config['TEMPLATES_AUTO_RELOAD'] = True - #app.run(port='5000', host='0.0.0.0', debug=True) - app.run(port='5000', host='0.0.0.0') + app.jinja_env.auto_reload = True + app.config['TEMPLATES_AUTO_RELOAD'] = True + app.run(port='5000', host='0.0.0.0', debug=True) + #app.run(port='5000', host='0.0.0.0') From ad745dbab3ce82a4392da10889b92cd1d1ea96d5 Mon Sep 17 00:00:00 2001 From: dajtxx Date: Wed, 17 Jul 2024 14:07:19 +1000 Subject: [PATCH 07/10] Another copy/paste error, removed some info logs. --- src/www/app/main.py | 1 - src/www/app/utils/api.py | 6 ++++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/www/app/main.py b/src/www/app/main.py index 857d9a1..5b2a49f 100644 --- a/src/www/app/main.py +++ b/src/www/app/main.py @@ -642,7 +642,6 @@ def DownloadData(): logging.info(f'adjusted end = {end}') msgs = get_messages(token, l_uid, start, end) - logging.info(msgs) if len(msgs) < 1: return 'Success', 200 diff --git a/src/www/app/utils/api.py b/src/www/app/utils/api.py index ad87ffe..e4de240 100644 --- a/src/www/app/utils/api.py +++ b/src/www/app/utils/api.py @@ -197,10 +197,12 @@ def get_messages(token: str, l_uid: int, start_ts: Optional[datetime] = None, en if start_ts is not None: params['start'] = start_ts if end_ts is not None: - params['end'] = start_ts + params['end'] = end_ts response = requests.get(f'{end_point}/broker/api/messages', headers=headers, params=params) - logging.info(response) + if response.status_code != 200: + logging.error(response.json()) + response.raise_for_status() return response.json() From c5ad15cb3b33d0e99c605b8068de43659ca56801 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Wed, 17 Jul 2024 14:11:50 +1000 Subject: [PATCH 08/10] Issue 88 WIP to master. (#90) * WIP: Adding received_at field to REST API & DAO get messages function. * REST API & DAO updates for this feature. * Webapp changes to handle the geometry location columns, unit test fixes. * Normalising the DAO get messages function to return timestamps in the message as a string. This means callers to the DAO and the REST API both get ISO-8601 format timestamp strings. * WIP: First attempt at download data for a single logical device. * Fixed copy/paste bugs, more explicit TZ handling, no longer writes csvs. * Another copy/paste error, removed some info logs. --------- Co-authored-by: dajtxx --- src/python/api/client/DAO.py | 90 +++++++++++++++++------------------ src/python/restapi/RestAPI.py | 9 ++-- src/www/app/main.py | 20 ++++++-- src/www/app/utils/api.py | 6 ++- 4 files changed, 70 insertions(+), 55 deletions(-) diff --git a/src/python/api/client/DAO.py b/src/python/api/client/DAO.py index 28b330a..564837a 100644 --- a/src/python/api/client/DAO.py +++ b/src/python/api/client/DAO.py @@ -355,7 +355,7 @@ def get_physical_devices(query_args=None) -> List[PhysicalDevice]: devs.append(d) rows = cursor.fetchmany() - + return devs except Exception as err: raise err if isinstance(err, DAOException) else DAOException('get_physical_devices failed.', err) @@ -532,9 +532,9 @@ def _get_logical_device(conn, uid: int) -> LogicalDevice: with conn.cursor() as cursor: """ See: https://dba.stackexchange.com/questions/27732/set-names-to-attributes-when-creating-json-with-row-to-json - + for an explanation of how to name the fields in a row_to_json call. - + Example query tested in psql: select uid, name, (select row_to_json(_) from (select ST_Y(location) as lat, ST_X(location) as long) as _) as location from logical_devices; """ @@ -778,14 +778,14 @@ def delete_mapping(mapping: PhysicalToLogicalMapping) -> None: raise err if isinstance(err, DAOException) else DAOException('delete_mapping failed.', err) finally: if conn is not None: - free_conn(conn) + free_conn(conn) def toggle_device_mapping(is_active: bool, pd: Optional[Union[PhysicalDevice, int]] = None, ld: Optional[Union[LogicalDevice, int]] = None) -> None: """ Change the is_active column in the database """ - + conn = None try: with _get_connection() as conn: @@ -805,7 +805,7 @@ def _toggle_device_mapping(conn, is_active, pd: Optional[Union[PhysicalDevice, i if pd is not None and ld is not None: raise ValueError('Both pd and ld were provided, only give one.') - + p_uid = None if pd is not None: p_uid = pd.uid if isinstance(pd, PhysicalDevice) else pd @@ -927,7 +927,7 @@ def get_physical_device_mappings(pd: Union[PhysicalDevice, int]) -> List[Physica with _get_connection() as conn, conn.cursor() as cursor: p_uid = pd.uid if isinstance(pd, PhysicalDevice) else pd cursor.execute('select physical_uid, logical_uid, start_time, end_time, is_active from physical_logical_map where physical_uid = %s order by start_time desc', (p_uid, )) - + for p_uid, l_uid, start_time, end_time, is_active in cursor: pd = _get_physical_device(conn, p_uid) ld = _get_logical_device(conn, l_uid) @@ -1003,7 +1003,7 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime if end is None: end = datetime.now(timezone.utc) if count is None or count > 65536: - count = 65536 + count = 65536 * 2 if count < 1: count = 1 @@ -1029,9 +1029,22 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime if not isinstance(end, datetime): raise TypeError - column_names = ['ts'] + """ + select date_trunc('second', ts at time zone 'UTC') as ts_utc, + date_trunc('second', received_at at time zone 'UTC') as recvd_utc, + json_msg + from physical_timeseries + where logical_uid = 413 + and ts + > '1970-01-01T00:00:00Z' + and ts <= NOW() + order by ts desc + limit 65535; + """ + + column_names = ["date_trunc('second', ts at time zone 'UTC') as ts_utc"] if include_received_at: - column_names.append('received_at') + column_names.append("date_trunc('second', received_at at time zone 'UTC') as received_at_utc") if not only_timestamp: column_names.append('json_msg') @@ -1043,7 +1056,7 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime # message. with _get_connection() as conn, conn.cursor() as cursor: qry = f""" - select ts {column_names} from physical_timeseries + select {column_names} from physical_timeseries where {uid_col_name} = %s and ts > %s and ts <= %s @@ -1053,7 +1066,7 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime args = (uid, start, end, count) cursor.execute(qry, args) - return [_msg_tuple_to_obj(*row) for row in cursor.fetchall()] + return [_msg_tuple_to_obj(cursor.description, row) for row in cursor.fetchall()] except Exception as err: raise DAOException('get_physical_timeseries_message failed.', err) finally: @@ -1061,30 +1074,15 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime free_conn(conn) -def _msg_tuple_to_obj(ts: datetime, arg2: datetime | dict | None = None, arg3: datetime | dict | None = None) -> dict: - if arg2 is None and arg3 is None: - return {BrokerConstants.TIMESTAMP_KEY: ts.isoformat()} - - if arg2 is not None and arg3 is None: - if isinstance(arg2, datetime): - return {BrokerConstants.TIMESTAMP_KEY: ts, 'received_at': arg2} - else: - return arg2 - +def _msg_tuple_to_obj(cursor_description, values) -> dict: msg_dict = {} - if arg3 is not None: - if isinstance(arg3, datetime): - msg_dict: dict = arg2 - if isinstance(arg3, datetime): - msg_dict['received_at'] = arg3.isoformat() - else: - msg_dict['received_at'] = arg3 - else: - msg_dict: dict = arg3 - if isinstance(arg2, datetime): - msg_dict['received_at'] = arg2.isoformat() - else: - msg_dict['received_at'] = arg2 + + for k, v in zip(cursor_description, values): + if isinstance(v, datetime): + # This function expects naive datetime instances to represent the time in UTC. + msg_dict[k.name] = v.astimezone(timezone.utc).isoformat() + elif isinstance(v, dict): + msg_dict.update(v) return msg_dict @@ -1112,7 +1110,7 @@ def user_add(uname: str, passwd: str, disabled: bool) -> None: #Generate salted password salt=os.urandom(64).hex() pass_hash=hashlib.scrypt(password=passwd.encode(), salt=salt.encode(), n=2**14, r=8, p=1, maxmem=0, dklen=64).hex() - + #Auth token to be used on other endpoints auth_token=os.urandom(64).hex() conn = None @@ -1134,7 +1132,7 @@ def user_rm(uname: str) -> None: try: with _get_connection() as conn, conn.cursor() as cursor: cursor.execute("delete from users where username=%s", (uname,)) - + except Exception as err: raise err if isinstance(err, DAOException) else DAOException('user_ls failed.', err) finally: @@ -1202,14 +1200,14 @@ def user_get_token(username, password) -> str | None: result = cursor.fetchone() if result is None: return None - + db_salt, db_password, auth_token=result input_pw_hash=hashlib.scrypt(password=password.encode(), salt=db_salt.encode(), n=2**14, r=8, p=1, maxmem=0, dklen=64).hex() if input_pw_hash != db_password: #Incorrect password supplied return None - + return auth_token except Exception as err: @@ -1239,7 +1237,7 @@ def token_is_valid(user_token) -> bool: def token_refresh(uname) -> None: - + # Auth token to be used on other endpoints. auth_token = os.urandom(64).hex() @@ -1277,10 +1275,10 @@ def user_change_password_and_token(new_passwd: str, prev_token: str) -> str: #Generate salted password salt = os.urandom(64).hex() pass_hash = hashlib.scrypt(password=new_passwd.encode(), salt=salt.encode(), n=2**14, r=8, p=1, maxmem=0, dklen=64).hex() - + #Auth token to be used on other endpoints auth_token = os.urandom(64).hex() - + try: with _get_connection() as conn, conn.cursor() as cursor: cursor.execute("update users set salt=%s, password=%s, auth_token=%s where auth_token=%s", (salt, pass_hash, auth_token, prev_token)) @@ -1298,11 +1296,11 @@ def user_change_password_and_token(new_passwd: str, prev_token: str) -> str: def token_disable(uname) -> None: - + try: with _get_connection() as conn, conn.cursor() as cursor: cursor.execute("update users set valid='F' where username=%s", (uname,)) - + except Exception as err: raise err if isinstance(err, DAOException) else DAOException('disable_token failed.', err) finally: @@ -1314,10 +1312,10 @@ def token_enable(uname)-> None: try: with _get_connection() as conn, conn.cursor() as cursor: cursor.execute("update users set valid='T' where username=%s", (uname,)) - + except Exception as err: raise err if isinstance(err, DAOException) else DAOException('disable_token failed.', err) finally: if conn is not None: free_conn(conn) - + diff --git a/src/python/restapi/RestAPI.py b/src/python/restapi/RestAPI.py index 76b31e8..c228497 100644 --- a/src/python/restapi/RestAPI.py +++ b/src/python/restapi/RestAPI.py @@ -313,17 +313,17 @@ async def toggle_device_mapping(is_active:bool, puid: int = None, luid:int = Non """ if puid != None and luid != None: raise HTTPException(status_code=400, detail="Both physical and logical uid were provided. Only give one") - + if puid == None and luid == None: raise HTTPException(status_code=400, detail="A uid must be provided") - + current_mapping = dao.get_current_device_mapping(pd=puid, ld=luid) if current_mapping == None: raise HTTPException(status_code=404, detail="Device with uid provided could not be found") try: dao.toggle_device_mapping(is_active=is_active, pd=puid, ld=luid); - + except dao.DAOException as err: raise HTTPException(status_code=500, detail=err.msg) @@ -535,6 +535,7 @@ async def get_physical_timeseries( #logging.info(f'read {len(msgs)} messages') return msgs except dao.DAOException as err: + logging.exception(err) raise HTTPException(status_code=500, detail=err.msg) @@ -577,7 +578,7 @@ async def change_password(password:str, request:Request) -> str: @app.middleware("http") async def check_auth_header(request: Request, call_next): - + try: if not request.url.path in ['/docs', '/openapi.json', '/broker/api/token']: if not 'Authorization' in request.headers: diff --git a/src/www/app/main.py b/src/www/app/main.py index 699adbf..5b2a49f 100644 --- a/src/www/app/main.py +++ b/src/www/app/main.py @@ -627,21 +627,35 @@ def DownloadData(): if start_ts is not None and len(start_ts) > 7: start = datetime.fromisoformat(start_ts).replace(tzinfo=ZoneInfo(user_timezone)) if end_ts is not None and len(end_ts) > 7: - start = datetime.fromisoformat(start_ts).replace(tzinfo=ZoneInfo(user_timezone)) + end = datetime.fromisoformat(end_ts).replace(tzinfo=ZoneInfo(user_timezone)) + + if start is not None: + logging.info(f'start = {start}') + + if end is not None: + logging.info(f'end = {end}') + + # The web page accepts an end-date with no time attached. Assuming the user wants the messages from + # the end day, and the DB query does a < on the end date, add 1 day to the end date to take it to + # midnight of the selected end date. + end = end + timedelta(days=1) + logging.info(f'adjusted end = {end}') msgs = get_messages(token, l_uid, start, end) - logging.info(msgs) if len(msgs) < 1: return 'Success', 200 dataset = [] for msg in msgs: - item = {'l_uid': l_uid, 'ts': msg['timestamp'], 'received_at': msg['received_at']} + item = {'l_uid': l_uid, 'ts': msg['timestamp'], 'received_at': msg['received_at_utc']} for obj in msg['timeseries']: item[obj['name']] = obj['value'] dataset.append(item) df = pd.DataFrame(dataset) + df['ts'] = pd.to_datetime(df['ts']) + df['received_at'] = pd.to_datetime(df['received_at']) + df['ts_local'] = df['ts'].dt.tz_convert(user_timezone) df.set_index(['l_uid', 'ts'], inplace=True) df.sort_index(level=0, sort_remaining=True, inplace=True, ascending=True) diff --git a/src/www/app/utils/api.py b/src/www/app/utils/api.py index ad87ffe..e4de240 100644 --- a/src/www/app/utils/api.py +++ b/src/www/app/utils/api.py @@ -197,10 +197,12 @@ def get_messages(token: str, l_uid: int, start_ts: Optional[datetime] = None, en if start_ts is not None: params['start'] = start_ts if end_ts is not None: - params['end'] = start_ts + params['end'] = end_ts response = requests.get(f'{end_point}/broker/api/messages', headers=headers, params=params) - logging.info(response) + if response.status_code != 200: + logging.error(response.json()) + response.raise_for_status() return response.json() From ce828791cb33340c5fc3dc2a937239e9bb7f4c0b Mon Sep 17 00:00:00 2001 From: dajtxx Date: Wed, 17 Jul 2024 15:58:13 +1000 Subject: [PATCH 09/10] Added a spinner to show the data download is happening. --- .../app/templates/logical_device_form.html | 127 +++++++++++++----- 1 file changed, 90 insertions(+), 37 deletions(-) diff --git a/src/www/app/templates/logical_device_form.html b/src/www/app/templates/logical_device_form.html index 5676784..36fcbc6 100644 --- a/src/www/app/templates/logical_device_form.html +++ b/src/www/app/templates/logical_device_form.html @@ -13,7 +13,34 @@ right: 16px; width: 100%; text-align: right; - } + } + + #loading-spinner { + position: fixed; + top: 0; + left: 0; + width: 100%; + height: 100%; + background-color: rgba(0, 0, 0, 0.5); /* semi-transparent background */ + z-index: 9999; /* high z-index to ensure it's on top */ + display: flex; + justify-content: center; + align-items: center; + } + + .spinner { + border: 4px solid #f3f3f3; + border-top: 4px solid #3498db; + border-radius: 50%; + width: 40px; + height: 40px; + animation: spin 1s linear infinite; + } + + @keyframes spin { + 0% { transform: rotate(0deg); } + 100% { transform: rotate(360deg); } + } + + +

    Export data

    @@ -157,8 +210,8 @@

    Export data

  • Update Mapping
  • End Mapping
  • {% if deviceMappings | length > 0 %} -
  • {{ "Pause Mapping" if deviceMappings[0].is_active == True else "Resume Mapping" }}
  • - {% endif %} +
  • {{ "Pause Mapping" if deviceMappings[0].is_active == True else "Resume Mapping" }}
  • + {% endif %}
@@ -262,4 +315,4 @@

Mapping

-{% endblock %} \ No newline at end of file +{% endblock %} From 1f22a161b10c4b6845cca647bfb3ead2383c4b7c Mon Sep 17 00:00:00 2001 From: David Taylor Date: Thu, 18 Jul 2024 13:00:41 +1000 Subject: [PATCH 10/10] Sanitising device names in data export code. --- src/www/app/main.py | 4 +++- src/www/app/templates/logical_device_form.html | 5 ++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/www/app/main.py b/src/www/app/main.py index 5b2a49f..5c9e7c9 100644 --- a/src/www/app/main.py +++ b/src/www/app/main.py @@ -663,7 +663,9 @@ def DownloadData(): df.to_csv(buffer, encoding='UTF-8') buffer.seek(0) - return send_file(buffer, as_attachment=True, download_name=f'{logical_dev.name}.csv') + sanitised_dev_name = re.sub(r'[^a-zA-Z0-9_-]', '', logical_dev.name) + + return send_file(buffer, as_attachment=True, download_name=f'{sanitised_dev_name}.csv') except requests.exceptions.HTTPError as e: diff --git a/src/www/app/templates/logical_device_form.html b/src/www/app/templates/logical_device_form.html index 36fcbc6..58c9e12 100644 --- a/src/www/app/templates/logical_device_form.html +++ b/src/www/app/templates/logical_device_form.html @@ -127,8 +127,7 @@ dialog.addEventListener("close", doFetch); }); -function exportData(l_uid, name) { - console.log("exportData " + l_uid + " " + name); +function exportData(l_uid) { dialog.returnValue = "Cancel"; dialog.showModal(); @@ -205,7 +204,7 @@

Export data

    -
  • Export Data
  • +
  • Export Data
  • Save
  • Update Mapping
  • End Mapping