Skip to content

Commit

Permalink
Issue 82 (#83) (#85)
Browse files Browse the repository at this point in the history
Implements issue #82
  • Loading branch information
dajtxx authored Jun 21, 2024
1 parent 6cb9993 commit 456fb97
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 36 deletions.
43 changes: 39 additions & 4 deletions src/python/api/client/DAO.py
Original file line number Diff line number Diff line change
Expand Up @@ -995,7 +995,7 @@ def insert_physical_timeseries_message(msg: Dict[str, Any]) -> None:
free_conn(conn)


def get_physical_timeseries_message(start: datetime | None = None, end: datetime | None = None, count: int | None = None, only_timestamp: bool = False, p_uid: int = None, l_uid: int = None) -> List[Dict]:
def get_physical_timeseries_message(start: datetime | None = None, end: datetime | None = None, count: int | None = None, only_timestamp: bool = False, include_received_at: bool = False, p_uid: int = None, l_uid: int = None) -> List[Dict]:
conn = None

if start is None:
Expand Down Expand Up @@ -1029,14 +1029,21 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime
if not isinstance(end, datetime):
raise TypeError

column_name = 'ts' if only_timestamp else 'json_msg'
column_names = ['ts']
if include_received_at:
column_names.append('received_at')

if not only_timestamp:
column_names.append('json_msg')

column_names = ', '.join(column_names)

try:
# Order messages by descending timestamp. If a caller asks for one message, they probably want the latest
# message.
with _get_connection() as conn, conn.cursor() as cursor:
qry = f"""
select {column_name} from physical_timeseries
select ts {column_names} from physical_timeseries
where {uid_col_name} = %s
and ts > %s
and ts <= %s
Expand All @@ -1046,14 +1053,42 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime

args = (uid, start, end, count)
cursor.execute(qry, args)
return [row[0] for row in cursor.fetchall()]
return [_msg_tuple_to_obj(*row) for row in cursor.fetchall()]
except Exception as err:
raise DAOException('get_physical_timeseries_message failed.', err)
finally:
if conn is not None:
free_conn(conn)


def _msg_tuple_to_obj(ts: datetime, arg2: datetime | dict | None = None, arg3: datetime | dict | None = None) -> dict:
if arg2 is None and arg3 is None:
return {BrokerConstants.TIMESTAMP_KEY: ts.isoformat()}

if arg2 is not None and arg3 is None:
if isinstance(arg2, datetime):
return {BrokerConstants.TIMESTAMP_KEY: ts, 'received_at': arg2}
else:
return arg2

msg_dict = {}
if arg3 is not None:
if isinstance(arg3, datetime):
msg_dict: dict = arg2
if isinstance(arg3, datetime):
msg_dict['received_at'] = arg3.isoformat()
else:
msg_dict['received_at'] = arg3
else:
msg_dict: dict = arg3
if isinstance(arg2, datetime):
msg_dict['received_at'] = arg2.isoformat()
else:
msg_dict['received_at'] = arg2

return msg_dict


def add_raw_text_message(source_name: str, ts: datetime, correlation_uuid: str, msg, uid: int=None):
conn = None
try:
Expand Down
12 changes: 6 additions & 6 deletions src/python/restapi/RestAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ async def end_mapping_of_logical_uid(uid: int) -> None:
MESSAGE RELATED
--------------------------------------------------------------------------"""

@router.get("/messages", tags=['Messages'])
@router.get("/messages", tags=['Messages'], dependencies=[Depends(token_auth_scheme)])
async def get_physical_timeseries(
request: Request,
p_uid: int | None = None,
Expand All @@ -469,7 +469,8 @@ async def get_physical_timeseries(
last: str = None,
start: datetime.datetime = None,
end: datetime.datetime = None,
only_timestamp: bool = False):
include_received_at: bool = False,
only_timestamp: bool = False) -> List[Dict]:
"""
Get the physical_timeseries entries described by the physical device uid and the parameters.
Expand Down Expand Up @@ -505,8 +506,7 @@ async def get_physical_timeseries(
except:
raise HTTPException(status_code=422, detail={"detail": [{"loc": ["query", "last"], "msg": "the first part of last must be an integer"}]})

unit = last[-1]

unit = str(last[-1]).lower()
if unit == 'h':
diff = datetime.timedelta(hours=i)
elif unit == 'd':
Expand All @@ -524,9 +524,9 @@ async def get_physical_timeseries(

msgs = None
if p_uid is not None:
msgs = dao.get_physical_timeseries_message(start, end, count, only_timestamp, p_uid=p_uid)
msgs = dao.get_physical_timeseries_message(start, end, count, only_timestamp, include_received_at, p_uid=p_uid)
elif l_uid is not None:
msgs = dao.get_physical_timeseries_message(start, end, count, only_timestamp, l_uid=l_uid)
msgs = dao.get_physical_timeseries_message(start, end, count, only_timestamp, include_received_at, l_uid=l_uid)

if msgs is None:
raise HTTPException(status_code=404, detail="Failed to retrieve messages")
Expand Down
6 changes: 5 additions & 1 deletion src/www/app/utils/api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import json, logging, os
import json, os
from typing import List
import requests
from datetime import datetime, timezone
Expand Down Expand Up @@ -254,6 +254,10 @@ def create_logical_device(physical_device: PhysicalDevice, token: str) ->str:
"location": physical_device.location,
}

# This is to work around a problem that turned up after the change to PostGIS.
if logicalJson['location'] is not None and (logicalJson['location'].lat is None or logicalJson['location'].long is None):
logicalJson.pop('location')

response = requests.post(f'{end_point}/broker/api/logical/devices/', json=logicalJson, headers=headers)
response.raise_for_status()

Expand Down
18 changes: 9 additions & 9 deletions test/python/test_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from pdmodels.Models import PhysicalDevice, PhysicalToLogicalMapping, Location, LogicalDevice
from typing import Tuple
import os
import pprint as pp

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)s: %(message)s', datefmt='%Y-%m-%dT%H:%M:%S%z')
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -644,15 +643,16 @@ def test_get_physical_timeseries_messages(self):
dao.insert_physical_timeseries_message(msg)
msg_ts.append(dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY]))

msgs = dao.get_physical_timeseries_message(None, None, 1, only_timestamp=True, p_uid=new_pdev.uid)
msgs = dao.get_physical_timeseries_message(None, None, 1, only_timestamp=False, include_received_at=False, p_uid=new_pdev.uid)
self.assertEqual(len(msgs), 1)
self.assertEqual(msgs[0], msg_ts[0])
self.assertEqual(dateutil.parser.isoparse(msgs[0][BrokerConstants.TIMESTAMP_KEY]), msg_ts[0])

msgs = dao.get_physical_timeseries_message(None, None, None, only_timestamp=True, p_uid=new_pdev.uid)

self.assertEqual(len(msgs), len(msg_list))
for i, ts in enumerate(msgs):
self.assertEqual(ts, msg_ts[i])
for i, msg in enumerate(msgs):
print(msg)
self.assertEqual(dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY]), msg_ts[i])

_, new_ldev = self._create_default_logical_device()
mapping = PhysicalToLogicalMapping(pd=new_pdev, ld=new_ldev, start_time=_now())
Expand All @@ -679,18 +679,18 @@ def test_get_physical_timeseries_messages(self):
# batch should be returned.
msgs = dao.get_physical_timeseries_message(only_timestamp=True, l_uid=new_ldev.uid)
self.assertEqual(len(msgs), 1)
self.assertEqual(msgs[0], msg_ts[-1])
self.assertEqual(dateutil.parser.isoparse(msgs[0][BrokerConstants.TIMESTAMP_KEY]), msg_ts[-1])

# This will return all the messages because 'end' has been set past the latest message timestamp.
msgs = dao.get_physical_timeseries_message(end=now + datetime.timedelta(days=1), only_timestamp=True, l_uid=new_ldev.uid)
self.assertEqual(len(msgs), len(msg_list))
for i, ts in enumerate(msgs):
self.assertEqual(ts, msg_ts[i])
for i, msg in enumerate(msgs):
self.assertEqual(dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY]), msg_ts[i])

# Should return only the latest message.
msgs = dao.get_physical_timeseries_message(end=now + datetime.timedelta(days=1), only_timestamp=True, count=1, l_uid=new_ldev.uid)
self.assertEqual(len(msgs), 1)
self.assertEqual(msgs[0], msg_ts[0])
self.assertEqual(dateutil.parser.isoparse(msgs[0][BrokerConstants.TIMESTAMP_KEY]), msg_ts[0])

self.assertRaises(ValueError, dao.get_physical_timeseries_message)
self.assertRaises(TypeError, dao.get_physical_timeseries_message, p_uid='x')
Expand Down
25 changes: 16 additions & 9 deletions test/python/test_get_physical_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pytest
from fastapi.testclient import TestClient

import BrokerConstants
import test_utils as tu
import api.client.DAO as dao
from pdmodels.Models import PhysicalDevice
Expand Down Expand Up @@ -43,7 +44,7 @@ def create_msgs():
with open('/tmp/msgs.json', 'w') as f:
for i in range(0, max_msgs + 1):
timestamps.append(ts)
msg = {'ts': ts.isoformat(), 'i': i}
msg = {BrokerConstants.TIMESTAMP_KEY: ts.isoformat(), 'i': i}
msgs.append(msg)
s = f'{pd.uid}\t{ts.isoformat()}\t{json.dumps(msg)}'
print(s, file=f)
Expand Down Expand Up @@ -76,24 +77,28 @@ def test_no_params(test_client):
# Confirm the default count parameter value is correct, so 65536 of the 65537 messages are returned.
response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid})
assert response.status_code == 200
assert response.json() == msgs[:-1]
for a, b in zip(response.json(), msgs[:-1]):
assert a == b


def test_no_params_ts(test_client):
    """With only_timestamp set, each returned message's ts must match the inserted timestamps."""
    resp = test_client.get('/broker/api/messages', params={'p_uid': pd.uid, 'only_timestamp': 1})
    assert resp.status_code == 200

    for received, expected in zip(resp.json(), timestamps):
        if received is None:
            break

        assert dateutil.parser.isoparse(received[BrokerConstants.TIMESTAMP_KEY]) == expected


def test_count(test_client):
    """Confirm the count parameter limits the result to the requested number of messages."""
    response = test_client.get('/broker/api/messages/', params={'p_uid': pd.uid, 'count': 50})
    assert response.status_code == 200

    body = response.json()
    # zip() alone would silently pass on a short response, so check the
    # length explicitly before comparing element-wise.
    assert len(body) == 50
    for actual, expected in zip(body, msgs[:50]):
        assert actual == expected


def test_count_ts(test_client):
Expand All @@ -103,7 +108,7 @@ def test_count_ts(test_client):
if a is None:
break

assert dateutil.parser.isoparse(a) == b
assert dateutil.parser.isoparse(a[BrokerConstants.TIMESTAMP_KEY]) == b


def test_start_after_end(test_client):
Expand All @@ -121,7 +126,7 @@ def test_start_gives_gt(test_client):
# Confirm start time parameter gets the next message greater than, not greater than equal to.
response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'start': timestamps[1]})
assert response.status_code == 200
assert response.json() == [msgs[0]]
assert response.json()[0] == msgs[0]


def test_invalid_count(test_client):
Expand All @@ -144,17 +149,19 @@ def test_end(test_client):

response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'end': timestamps[-1]})
assert response.status_code == 200
assert response.json() == [msgs[-1]]
assert response.json()[0] == msgs[-1]

response = test_client.get(f'/broker/api/messages', params={'p_uid': pd.uid, 'end': timestamps[-9]})
assert response.status_code == 200
assert response.json() == msgs[-9:]
for a, b in zip(response.json(), msgs[-9:]):
assert a == b


def test_start_end(test_client):
    """Messages strictly after start and up to end are returned, newest first."""
    query = {'p_uid': pd.uid, 'start': timestamps[9], 'end': timestamps[5]}
    resp = test_client.get('/broker/api/messages/', params=query)
    assert resp.status_code == 200

    for actual, expected in zip(resp.json(), msgs[5:9]):
        assert actual == expected


def test_invalid_start_end(test_client):
Expand Down
13 changes: 6 additions & 7 deletions test/python/test_restapi.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import base64
import copy, datetime, logging, time, unittest, uuid
from typing_extensions import assert_type
import re
import copy
import logging
import os
import time
import unittest
from typing import Tuple

import api.client.DAO as dao
import requests

import api.client.DAO as dao
from pdmodels.Models import DeviceNote, PhysicalDevice, PhysicalToLogicalMapping, Location, LogicalDevice
from typing import Tuple

import os
from test_utils import now

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)s: %(message)s', datefmt='%Y-%m-%dT%H:%M:%S%z')
Expand Down

0 comments on commit 456fb97

Please sign in to comment.