Skip to content

Commit

Permalink
Improved how no messages in date range is handled.
Browse files Browse the repository at this point in the history
  • Loading branch information
dajtxx committed Jul 21, 2024
2 parents a311491 + 1f22a16 commit dee3b95
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 65 deletions.
90 changes: 44 additions & 46 deletions src/python/api/client/DAO.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ def get_physical_devices(query_args=None) -> List[PhysicalDevice]:
devs.append(d)

rows = cursor.fetchmany()

return devs
except Exception as err:
raise err if isinstance(err, DAOException) else DAOException('get_physical_devices failed.', err)
Expand Down Expand Up @@ -532,9 +532,9 @@ def _get_logical_device(conn, uid: int) -> LogicalDevice:
with conn.cursor() as cursor:
"""
See: https://dba.stackexchange.com/questions/27732/set-names-to-attributes-when-creating-json-with-row-to-json
for an explanation of how to name the fields in a row_to_json call.
Example query tested in psql:
select uid, name, (select row_to_json(_) from (select ST_Y(location) as lat, ST_X(location) as long) as _) as location from logical_devices;
"""
Expand Down Expand Up @@ -778,14 +778,14 @@ def delete_mapping(mapping: PhysicalToLogicalMapping) -> None:
raise err if isinstance(err, DAOException) else DAOException('delete_mapping failed.', err)
finally:
if conn is not None:
free_conn(conn)
free_conn(conn)


def toggle_device_mapping(is_active: bool, pd: Optional[Union[PhysicalDevice, int]] = None, ld: Optional[Union[LogicalDevice, int]] = None) -> None:
"""
Change the is_active column in the database
"""

conn = None
try:
with _get_connection() as conn:
Expand All @@ -805,7 +805,7 @@ def _toggle_device_mapping(conn, is_active, pd: Optional[Union[PhysicalDevice, i

if pd is not None and ld is not None:
raise ValueError('Both pd and ld were provided, only give one.')

p_uid = None
if pd is not None:
p_uid = pd.uid if isinstance(pd, PhysicalDevice) else pd
Expand Down Expand Up @@ -927,7 +927,7 @@ def get_physical_device_mappings(pd: Union[PhysicalDevice, int]) -> List[Physica
with _get_connection() as conn, conn.cursor() as cursor:
p_uid = pd.uid if isinstance(pd, PhysicalDevice) else pd
cursor.execute('select physical_uid, logical_uid, start_time, end_time, is_active from physical_logical_map where physical_uid = %s order by start_time desc', (p_uid, ))

for p_uid, l_uid, start_time, end_time, is_active in cursor:
pd = _get_physical_device(conn, p_uid)
ld = _get_logical_device(conn, l_uid)
Expand Down Expand Up @@ -1003,7 +1003,7 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime
if end is None:
end = datetime.now(timezone.utc)
if count is None or count > 65536:
count = 65536
count = 65536 * 2
if count < 1:
count = 1

Expand All @@ -1029,9 +1029,22 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime
if not isinstance(end, datetime):
raise TypeError

column_names = ['ts']
"""
select date_trunc('second', ts at time zone 'UTC') as ts_utc,
date_trunc('second', received_at at time zone 'UTC') as recvd_utc,
json_msg
from physical_timeseries
where logical_uid = 413
and ts
> '1970-01-01T00:00:00Z'
and ts <= NOW()
order by ts desc
limit 65535;
"""

column_names = ["date_trunc('second', ts at time zone 'UTC') as ts_utc"]
if include_received_at:
column_names.append('received_at')
column_names.append("date_trunc('second', received_at at time zone 'UTC') as received_at_utc")

if not only_timestamp:
column_names.append('json_msg')
Expand All @@ -1043,7 +1056,7 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime
# message.
with _get_connection() as conn, conn.cursor() as cursor:
qry = f"""
select ts {column_names} from physical_timeseries
select {column_names} from physical_timeseries
where {uid_col_name} = %s
and ts > %s
and ts <= %s
Expand All @@ -1053,38 +1066,23 @@ def get_physical_timeseries_message(start: datetime | None = None, end: datetime

args = (uid, start, end, count)
cursor.execute(qry, args)
return [_msg_tuple_to_obj(*row) for row in cursor.fetchall()]
return [_msg_tuple_to_obj(cursor.description, row) for row in cursor.fetchall()]
except Exception as err:
raise DAOException('get_physical_timeseries_message failed.', err)
finally:
if conn is not None:
free_conn(conn)


def _msg_tuple_to_obj(ts: datetime, arg2: datetime | dict | None = None, arg3: datetime | dict | None = None) -> dict:
if arg2 is None and arg3 is None:
return {BrokerConstants.TIMESTAMP_KEY: ts.isoformat()}

if arg2 is not None and arg3 is None:
if isinstance(arg2, datetime):
return {BrokerConstants.TIMESTAMP_KEY: ts, 'received_at': arg2}
else:
return arg2

def _msg_tuple_to_obj(cursor_description, values) -> dict:
msg_dict = {}
if arg3 is not None:
if isinstance(arg3, datetime):
msg_dict: dict = arg2
if isinstance(arg3, datetime):
msg_dict['received_at'] = arg3.isoformat()
else:
msg_dict['received_at'] = arg3
else:
msg_dict: dict = arg3
if isinstance(arg2, datetime):
msg_dict['received_at'] = arg2.isoformat()
else:
msg_dict['received_at'] = arg2

for k, v in zip(cursor_description, values):
if isinstance(v, datetime):
# This function expects naive datetime instances to represent the time in UTC.
msg_dict[k.name] = v.astimezone(timezone.utc).isoformat()
elif isinstance(v, dict):
msg_dict.update(v)

return msg_dict

Expand Down Expand Up @@ -1112,7 +1110,7 @@ def user_add(uname: str, passwd: str, disabled: bool) -> None:
#Generate salted password
salt=os.urandom(64).hex()
pass_hash=hashlib.scrypt(password=passwd.encode(), salt=salt.encode(), n=2**14, r=8, p=1, maxmem=0, dklen=64).hex()

#Auth token to be used on other endpoints
auth_token=os.urandom(64).hex()
conn = None
Expand All @@ -1134,7 +1132,7 @@ def user_rm(uname: str) -> None:
try:
with _get_connection() as conn, conn.cursor() as cursor:
cursor.execute("delete from users where username=%s", (uname,))

except Exception as err:
raise err if isinstance(err, DAOException) else DAOException('user_ls failed.', err)
finally:
Expand Down Expand Up @@ -1202,14 +1200,14 @@ def user_get_token(username, password) -> str | None:
result = cursor.fetchone()
if result is None:
return None

db_salt, db_password, auth_token=result
input_pw_hash=hashlib.scrypt(password=password.encode(), salt=db_salt.encode(), n=2**14, r=8, p=1, maxmem=0, dklen=64).hex()

if input_pw_hash != db_password:
#Incorrect password supplied
return None

return auth_token

except Exception as err:
Expand Down Expand Up @@ -1239,7 +1237,7 @@ def token_is_valid(user_token) -> bool:


def token_refresh(uname) -> None:

# Auth token to be used on other endpoints.
auth_token = os.urandom(64).hex()

Expand Down Expand Up @@ -1277,10 +1275,10 @@ def user_change_password_and_token(new_passwd: str, prev_token: str) -> str:
#Generate salted password
salt = os.urandom(64).hex()
pass_hash = hashlib.scrypt(password=new_passwd.encode(), salt=salt.encode(), n=2**14, r=8, p=1, maxmem=0, dklen=64).hex()

#Auth token to be used on other endpoints
auth_token = os.urandom(64).hex()

try:
with _get_connection() as conn, conn.cursor() as cursor:
cursor.execute("update users set salt=%s, password=%s, auth_token=%s where auth_token=%s", (salt, pass_hash, auth_token, prev_token))
Expand All @@ -1298,11 +1296,11 @@ def user_change_password_and_token(new_passwd: str, prev_token: str) -> str:


def token_disable(uname) -> None:

try:
with _get_connection() as conn, conn.cursor() as cursor:
cursor.execute("update users set valid='F' where username=%s", (uname,))

except Exception as err:
raise err if isinstance(err, DAOException) else DAOException('disable_token failed.', err)
finally:
Expand All @@ -1314,10 +1312,10 @@ def token_enable(uname)-> None:
try:
with _get_connection() as conn, conn.cursor() as cursor:
cursor.execute("update users set valid='T' where username=%s", (uname,))

except Exception as err:
raise err if isinstance(err, DAOException) else DAOException('disable_token failed.', err)
finally:
if conn is not None:
free_conn(conn)

9 changes: 5 additions & 4 deletions src/python/restapi/RestAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,17 +313,17 @@ async def toggle_device_mapping(is_active:bool, puid: int = None, luid:int = Non
"""
if puid != None and luid != None:
raise HTTPException(status_code=400, detail="Both physical and logical uid were provided. Only give one")

if puid == None and luid == None:
raise HTTPException(status_code=400, detail="A uid must be provided")

current_mapping = dao.get_current_device_mapping(pd=puid, ld=luid)
if current_mapping == None:
raise HTTPException(status_code=404, detail="Device with uid provided could not be found")

try:
dao.toggle_device_mapping(is_active=is_active, pd=puid, ld=luid);

except dao.DAOException as err:
raise HTTPException(status_code=500, detail=err.msg)

Expand Down Expand Up @@ -535,6 +535,7 @@ async def get_physical_timeseries(
#logging.info(f'read {len(msgs)} messages')
return msgs
except dao.DAOException as err:
logging.exception(err)
raise HTTPException(status_code=500, detail=err.msg)


Expand Down Expand Up @@ -577,7 +578,7 @@ async def change_password(password:str, request:Request) -> str:

@app.middleware("http")
async def check_auth_header(request: Request, call_next):

try:
if not request.url.path in ['/docs', '/openapi.json', '/broker/api/token']:
if not 'Authorization' in request.headers:
Expand Down
26 changes: 21 additions & 5 deletions src/www/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,29 +627,45 @@ def DownloadData():
if start_ts is not None and len(start_ts) > 7:
start = datetime.fromisoformat(start_ts).replace(tzinfo=ZoneInfo(user_timezone))
if end_ts is not None and len(end_ts) > 7:
start = datetime.fromisoformat(start_ts).replace(tzinfo=ZoneInfo(user_timezone))
end = datetime.fromisoformat(end_ts).replace(tzinfo=ZoneInfo(user_timezone))

if start is not None:
logging.info(f'start = {start}')

if end is not None:
logging.info(f'end = {end}')

# The web page accepts an end-date with no time attached. Assuming the user wants the messages from
# the end day, and the DB query does a < on the end date, add 1 day to the end date to take it to
# midnight of the selected end date.
end = end + timedelta(days=1)
logging.info(f'adjusted end = {end}')

msgs = get_messages(token, l_uid, start, end)
logging.info(msgs)
if len(msgs) < 1:
return 'Success', 200
return 'No messages.', 204

dataset = []
for msg in msgs:
item = {'l_uid': l_uid, 'ts': msg['timestamp'], 'received_at': msg['received_at']}
item = {'l_uid': l_uid, 'ts': msg['timestamp'], 'received_at': msg['received_at_utc']}
for obj in msg['timeseries']:
item[obj['name']] = obj['value']
dataset.append(item)

df = pd.DataFrame(dataset)
df['ts'] = pd.to_datetime(df['ts'])
df['received_at'] = pd.to_datetime(df['received_at'])
df['ts_local'] = df['ts'].dt.tz_convert(user_timezone)
df.set_index(['l_uid', 'ts'], inplace=True)
df.sort_index(level=0, sort_remaining=True, inplace=True, ascending=True)

buffer = io.BytesIO()
df.to_csv(buffer, encoding='UTF-8')
buffer.seek(0)

return send_file(buffer, as_attachment=True, download_name=f'{logical_dev.name}.csv')
sanitised_dev_name = re.sub(r'[^a-zA-Z0-9_-]', '', logical_dev.name)

return send_file(buffer, as_attachment=True, download_name=f'{sanitised_dev_name}.csv')


except requests.exceptions.HTTPError as e:
Expand Down
Loading

0 comments on commit dee3b95

Please sign in to comment.