diff --git a/src/python/api/client/DAO.py b/src/python/api/client/DAO.py index a37ce32..28b330a 100644 --- a/src/python/api/client/DAO.py +++ b/src/python/api/client/DAO.py @@ -995,7 +995,7 @@ def insert_physical_timeseries_message(msg: Dict[str, Any]) -> None: free_conn(conn) -def get_physical_timeseries_message(start: datetime | None = None, end: datetime | None = None, count: int | None = None, only_timestamp: bool = False, p_uid: int = None, l_uid: int = None) -> List[Dict]: +def get_physical_timeseries_message(start: datetime | None = None, end: datetime | None = None, count: int | None = None, only_timestamp: bool = False, include_received_at: bool = False, p_uid: int = None, l_uid: int = None) -> List[Dict]: conn = None if start is None: @@ -1029,14 +1029,21 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime if not isinstance(end, datetime): raise TypeError - column_name = 'ts' if only_timestamp else 'json_msg' + column_names = ['ts'] + if include_received_at: + column_names.append('received_at') + + if not only_timestamp: + column_names.append('json_msg') + + column_names = ', '.join(column_names) try: # Order messages by descending timestamp. If a caller asks for one message, they probably want the latest # message. with _get_connection() as conn, conn.cursor() as cursor: qry = f""" - select {column_name} from physical_timeseries + select ts {column_names} from physical_timeseries where {uid_col_name} = %s and ts > %s and ts <= %s @@ -1046,7 +1053,7 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime args = (uid, start, end, count) cursor.execute(qry, args) - return [row[0] for row in cursor.fetchall()] + return [_msg_tuple_to_obj(*row) for row in cursor.fetchall()] except Exception as err: raise DAOException('get_physical_timeseries_message failed.', err) finally: @@ -1054,6 +1061,34 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime free_conn(conn) +def _msg_tuple_to_obj(ts: datetime, arg2: datetime | dict | None = None, arg3: datetime | dict | None = None) -> dict: + if arg2 is None and arg3 is None: + return {BrokerConstants.TIMESTAMP_KEY: ts.isoformat()} + + if arg2 is not None and arg3 is None: + if isinstance(arg2, datetime): + return {BrokerConstants.TIMESTAMP_KEY: ts, 'received_at': arg2} + else: + return arg2 + + msg_dict = {} + if arg3 is not None: + if isinstance(arg3, datetime): + msg_dict: dict = arg2 + if isinstance(arg3, datetime): + msg_dict['received_at'] = arg3.isoformat() + else: + msg_dict['received_at'] = arg3 + else: + msg_dict: dict = arg3 + if isinstance(arg2, datetime): + msg_dict['received_at'] = arg2.isoformat() + else: + msg_dict['received_at'] = arg2 + + return msg_dict + + def add_raw_text_message(source_name: str, ts: datetime, correlation_uuid: str, msg, uid: int=None): conn = None try: diff --git a/src/python/restapi/RestAPI.py b/src/python/restapi/RestAPI.py index 387363e..76b31e8 100644 --- a/src/python/restapi/RestAPI.py +++ b/src/python/restapi/RestAPI.py @@ -460,7 +460,7 @@ async def end_mapping_of_logical_uid(uid: int) -> None: MESSAGE RELATED --------------------------------------------------------------------------""" -@router.get("/messages", tags=['Messages']) +@router.get("/messages", tags=['Messages'], dependencies=[Depends(token_auth_scheme)]) async def get_physical_timeseries( request: Request, p_uid: int | None = None, @@ -469,7 +469,8 @@ async def get_physical_timeseries( last: str = None, start: datetime.datetime = None, end: datetime.datetime = None, - only_timestamp: bool = False): + include_received_at: bool = False, + only_timestamp: bool = False) -> List[Dict]: """ Get the physical_timeseries entries described by the physical device uid and the parameters. @@ -505,8 +506,7 @@ async def get_physical_timeseries( except: raise HTTPException(status_code=422, detail={"detail": [{"loc": ["query", "last"], "msg": "the first part of last must be an integer"}]}) - unit = last[-1] - + unit = str(last[-1]).lower() if unit == 'h': diff = datetime.timedelta(hours=i) elif unit == 'd': @@ -524,9 +524,9 @@ async def get_physical_timeseries( msgs = None if p_uid is not None: - msgs = dao.get_physical_timeseries_message(start, end, count, only_timestamp, p_uid=p_uid) + msgs = dao.get_physical_timeseries_message(start, end, count, only_timestamp, include_received_at, p_uid=p_uid) elif l_uid is not None: - msgs = dao.get_physical_timeseries_message(start, end, count, only_timestamp, l_uid=l_uid) + msgs = dao.get_physical_timeseries_message(start, end, count, only_timestamp, include_received_at, l_uid=l_uid) if msgs is None: raise HTTPException(status_code=404, detail="Failed to retrieve messages") diff --git a/src/www/app/utils/api.py b/src/www/app/utils/api.py index 36bac10..9cfc983 100644 --- a/src/www/app/utils/api.py +++ b/src/www/app/utils/api.py @@ -1,4 +1,4 @@ -import json, logging, os +import json, os from typing import List import requests from datetime import datetime, timezone @@ -254,6 +254,10 @@ def create_logical_device(physical_device: PhysicalDevice, token: str) ->str: "location": physical_device.location, } + # This is to work around a problem that turned up after the change to PostGIS. + if logicalJson['location'] is not None and (logicalJson['location'].lat is None or logicalJson['location'].long is None): + logicalJson.pop('location') + response = requests.post(f'{end_point}/broker/api/logical/devices/', json=logicalJson, headers=headers) response.raise_for_status() diff --git a/test/python/test_dao.py b/test/python/test_dao.py index 8689ba3..1fa29f7 100644 --- a/test/python/test_dao.py +++ b/test/python/test_dao.py @@ -5,7 +5,6 @@ from pdmodels.Models import PhysicalDevice, PhysicalToLogicalMapping, Location, LogicalDevice from typing import Tuple import os -import pprint as pp logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)s: %(message)s', datefmt='%Y-%m-%dT%H:%M:%S%z') logger = logging.getLogger(__name__) @@ -644,15 +643,16 @@ def test_get_physical_timeseries_messages(self): dao.insert_physical_timeseries_message(msg) msg_ts.append(dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY])) - msgs = dao.get_physical_timeseries_message(None, None, 1, only_timestamp=True, p_uid=new_pdev.uid) + msgs = dao.get_physical_timeseries_message(None, None, 1, only_timestamp=False, include_received_at=False, p_uid=new_pdev.uid) self.assertEqual(len(msgs), 1) - self.assertEqual(msgs[0], msg_ts[0]) + self.assertEqual(dateutil.parser.isoparse(msgs[0][BrokerConstants.TIMESTAMP_KEY]), msg_ts[0]) msgs = dao.get_physical_timeseries_message(None, None, None, only_timestamp=True, p_uid=new_pdev.uid) self.assertEqual(len(msgs), len(msg_list)) - for i, ts in enumerate(msgs): - self.assertEqual(ts, msg_ts[i]) + for i, msg in enumerate(msgs): + print(msg) + self.assertEqual(dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY]), msg_ts[i]) _, new_ldev = self._create_default_logical_device() mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now()) @@ -679,18 +679,18 @@ def test_get_physical_timeseries_messages(self): # batch should be returned. msgs = dao.get_physical_timeseries_message(only_timestamp=True, l_uid=new_ldev.uid) self.assertEqual(len(msgs), 1) - self.assertEqual(msgs[0], msg_ts[-1]) + self.assertEqual(dateutil.parser.isoparse(msgs[0][BrokerConstants.TIMESTAMP_KEY]), msg_ts[-1]) # This will return all the messages because 'end' has been set past the latest message timestamp. msgs = dao.get_physical_timeseries_message(end=now + datetime.timedelta(days=1), only_timestamp=True, l_uid=new_ldev.uid) self.assertEqual(len(msgs), len(msg_list)) - for i, ts in enumerate(msgs): - self.assertEqual(ts, msg_ts[i]) + for i, msg in enumerate(msgs): + self.assertEqual(dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY]), msg_ts[i]) # Should return only the latest message. msgs = dao.get_physical_timeseries_message(end=now + datetime.timedelta(days=1), only_timestamp=True, count=1, l_uid=new_ldev.uid) self.assertEqual(len(msgs), 1) - self.assertEqual(msgs[0], msg_ts[0]) + self.assertEqual(dateutil.parser.isoparse(msgs[0][BrokerConstants.TIMESTAMP_KEY]), msg_ts[0]) self.assertRaises(ValueError, dao.get_physical_timeseries_message) self.assertRaises(TypeError, dao.get_physical_timeseries_message, p_uid='x') diff --git a/test/python/test_get_physical_messages.py b/test/python/test_get_physical_messages.py index a30eccb..8e54ccf 100644 --- a/test/python/test_get_physical_messages.py +++ b/test/python/test_get_physical_messages.py @@ -6,6 +6,7 @@ import pytest from fastapi.testclient import TestClient +import BrokerConstants import test_utils as tu import api.client.DAO as dao from pdmodels.Models import PhysicalDevice @@ -43,7 +44,7 @@ def create_msgs(): with open('/tmp/msgs.json', 'w') as f: for i in range(0, max_msgs + 1): timestamps.append(ts) - msg = {'ts': ts.isoformat(), 'i': i} + msg = {BrokerConstants.TIMESTAMP_KEY: ts.isoformat(), 'i': i} msgs.append(msg) s = f'{pd.uid}\t{ts.isoformat()}\t{json.dumps(msg)}' print(s, file=f) @@ -76,24 +77,28 @@ def test_no_params(test_client): # Confirm the default count parameter value is correct, so 65536 of the 65537 messages are returned. response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid}) assert response.status_code == 200 - assert response.json() == msgs[:-1] + for a, b in zip(response.json(), msgs[:-1]): + assert a == b def test_no_params_ts(test_client): response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'only_timestamp': 1}) assert response.status_code == 200 + for a, b in zip(response.json(), timestamps): if a is None: break - assert dateutil.parser.isoparse(a) == b + assert dateutil.parser.isoparse(a[BrokerConstants.TIMESTAMP_KEY]) == b def test_count(test_client): # Confirm the count parameter works. response = test_client.get(f'/broker/api/messages/', params={'p_uid': pd.uid, 'count': 50}) assert response.status_code == 200 - assert response.json() == msgs[:50] + #assert response.json() == msgs[:50] + for a, b in zip(response.json(), msgs[:50]): + assert a == b def test_count_ts(test_client): @@ -103,7 +108,7 @@ def test_count_ts(test_client): if a is None: break - assert dateutil.parser.isoparse(a) == b + assert dateutil.parser.isoparse(a[BrokerConstants.TIMESTAMP_KEY]) == b def test_start_after_end(test_client): @@ -121,7 +126,7 @@ def test_start_gives_gt(test_client): # Confirm start time parameter gets the next message greater than, not greater than equal to. response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'start': timestamps[1]}) assert response.status_code == 200 - assert response.json() == [msgs[0]] + assert response.json()[0] == msgs[0] def test_invalid_count(test_client): @@ -144,17 +149,19 @@ def test_end(test_client): response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'end': timestamps[-1]}) assert response.status_code == 200 - assert response.json() == [msgs[-1]] + assert response.json()[0] == msgs[-1] response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'end': timestamps[-9]}) assert response.status_code == 200 - assert response.json() == msgs[-9:] + for a, b in zip(response.json(), msgs[-9:]): + assert a == b def test_start_end(test_client): response = test_client.get(f'/broker/api/messages/', params={'p_uid': pd.uid, 'start': timestamps[9], 'end': timestamps[5]}) assert response.status_code == 200 - assert response.json() == msgs[5:9] + for a, b in zip(response.json(), msgs[5:9]): + assert a == b def test_invalid_start_end(test_client): diff --git a/test/python/test_restapi.py b/test/python/test_restapi.py index 1b3fba1..594b365 100644 --- a/test/python/test_restapi.py +++ b/test/python/test_restapi.py @@ -1,16 +1,15 @@ import base64 -import copy, datetime, logging, time, unittest, uuid -from typing_extensions import assert_type -import re +import copy +import logging +import os +import time +import unittest from typing import Tuple -import api.client.DAO as dao import requests +import api.client.DAO as dao from pdmodels.Models import DeviceNote, PhysicalDevice, PhysicalToLogicalMapping, Location, LogicalDevice -from typing import Tuple - -import os from test_utils import now logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)s: %(message)s', datefmt='%Y-%m-%dT%H:%M:%S%z')