From 35c1c38ea0261847ebe0ed008b7270d4f86084b1 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Thu, 1 Aug 2024 09:54:25 +1000 Subject: [PATCH 1/2] Issue 93 (#97) This commit vastly simplifies the code of individual delivery services, while adding extra protection against service failure. The BaseWriter class accepts messages from the logical_timeseries RabbitMQ exchange and writes them to a service-specific database table. A thread reads from the table and invokes the service-specific on_message method to deliver the message to the destination platform. on_message may return ok, fail, or retry. The service-specific code is responsible for deciding how many times a message should be retried before being marked as a fail. The Ubidots client code was updated to include the broker correlation id in its log messages, and to add a 0.3s delay before calls that add data; the lack of that delay is probably what caused some of the error responses we're seeing. --- src/python/api/client/DAO.py | 121 +++++++- src/python/api/client/Ubidots.py | 21 +- src/python/broker-cli.py | 13 +- src/python/delivery/BaseWriter.py | 220 ++++++++++++++ src/python/delivery/FRRED.py | 192 +++--------- src/python/delivery/LTSLogger.py | 30 ++ src/python/delivery/LTSReader.py | 99 ------ src/python/delivery/UbidotsWriter.py | 433 +++++++++++---------------- 8 files changed, 589 insertions(+), 540 deletions(-) create mode 100644 src/python/delivery/BaseWriter.py create mode 100644 src/python/delivery/LTSLogger.py delete mode 100644 src/python/delivery/LTSReader.py diff --git a/src/python/api/client/DAO.py b/src/python/api/client/DAO.py index 564837a4..cc108600 100644 --- a/src/python/api/client/DAO.py +++ b/src/python/api/client/DAO.py @@ -7,15 +7,17 @@ import psycopg2.errors from psycopg2.extensions import AsIs from psycopg2.extras import Json, register_uuid -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Tuple, Union import hashlib import os import BrokerConstants from pdmodels.Models import BaseDevice, DeviceNote, LogicalDevice, PhysicalDevice, PhysicalToLogicalMapping, User +from threading import Lock logging.captureWarnings(True) +_lock = Lock() class DAOException(Exception): def __init__(self, msg: str = None, wrapped: Exception = None): @@ -50,10 +52,19 @@ class DAOUniqeConstraintException(DAOException): """ +_stopped = False + def stop() -> None: + global _stopped logging.info('Closing connection pool.') - if conn_pool is not None: - conn_pool.closeall() + _lock.acquire() + try: + if not _stopped: + _stopped = True + if conn_pool is not None: + conn_pool.closeall() + finally: + _lock.release() @contextmanager @@ -73,8 +84,8 @@ def _get_connection(): # This throws an exception if the db hostname cannot be resolved, or # the database is not accepting connections. try: - # Try lazy initialisation the connection pool and Location/point - # converter to give the db as much time as possible to start. + # Try lazy initialisation of the connection pool to give the db as + # much time as possible to start. 
if conn_pool is None: logging.info('Creating connection pool, registering type converters.') conn_pool = pool.ThreadedConnectionPool(1, 5) @@ -792,7 +803,7 @@ def toggle_device_mapping(is_active: bool, pd: Optional[Union[PhysicalDevice, in _toggle_device_mapping(conn, is_active, pd, ld) return except Exception as err: - raise err if isinstance(err, DAOException) else DAOException('pause_current_device_mapping failed.', err) + raise err if isinstance(err, DAOException) else DAOException('toggle_device_mapping failed.', err) finally: if conn is not None: free_conn(conn) @@ -1319,3 +1330,101 @@ def token_enable(uname)-> None: if conn is not None: free_conn(conn) + +# =========================================================================== +# Delivery thread related functions +# =========================================================================== + +def _get_delivery_table_id(name: str) -> str: + #return sql.Identifier(f'{name}_delivery_q') + return f'{name}_delivery_q' + +def create_delivery_table(name: str) -> None: + logging.info(f'Creating message delivery table for service {name}') + + try: + qry = f"""create table if not exists {_get_delivery_table_id(name)} ( + uid integer generated always as identity primary key, + json_msg jsonb not null, + retry_count integer not null default 0)""" + + with _get_connection() as conn, conn.cursor() as cursor: + cursor.execute(qry) + + except Exception as err: + raise err if isinstance(err, DAOException) else DAOException('create_delivery_table failed.', err) + finally: + if conn is not None: + conn.commit() + free_conn(conn) + +def get_delivery_msg_count(name: str) -> int: + try: + with _get_connection() as conn, conn.cursor() as cursor: + cursor.execute(f'select count(uid) from {_get_delivery_table_id(name)}') + return cursor.fetchone()[0] + + except Exception as err: + raise err if isinstance(err, DAOException) else DAOException('get_delivery_msg_count failed.', err) + finally: + if conn is not None: + conn.commit() + free_conn(conn) + +def get_delivery_msg_batch(name: str, from_uid: int = 0, batch_size: int = 10) -> List[Tuple[int, list[dict[Any]]]]: + try: + with _get_connection() as conn, conn.cursor() as cursor: + # Using order by asc in case time series databases need values inserted in timestamp order. 
+ cursor.execute(f'select uid, json_msg, retry_count from {_get_delivery_table_id(name)} where uid > %s order by uid asc limit %s', (from_uid, batch_size)) + if cursor.rowcount < 1: + return 0, [] + + return cursor.fetchall() + + except Exception as err: + raise err if isinstance(err, DAOException) else DAOException('get_delivery_msg_batch failed.', err) + finally: + if conn is not None: + conn.commit() + free_conn(conn) + +def add_delivery_msg(name: str, msg: dict[Any]) -> None: + try: + with _get_connection() as conn, conn.cursor() as cursor: + cursor.execute(f'insert into {_get_delivery_table_id(name)} (json_msg) values (%s)', (Json(msg), )) + + except Exception as err: + raise err if isinstance(err, DAOException) else DAOException('add_delivery_msg failed.', err) + finally: + if conn is not None: + conn.commit() + free_conn(conn) + +def remove_delivery_msg(name: str, uid: int) -> None: + try: + with _get_connection() as conn, conn.cursor() as cursor: + cursor.execute(f'delete from {_get_delivery_table_id(name)} where uid = %s', (uid, )) + + except Exception as err: + raise err if isinstance(err, DAOException) else DAOException('remove_delivery_msg failed.', err) + finally: + if conn is not None: + conn.commit() + free_conn(conn) + +def retry_delivery_msg(name: str, uid: int) -> None: + try: + with _get_connection() as conn, conn.cursor() as cursor: + cursor.execute(f'select retry_count from {_get_delivery_table_id(name)} where uid = %s', (uid, )) + if cursor.rowcount < 1: + return + + retry_count = cursor.fetchone()[0] + 1 + cursor.execute(f'update {_get_delivery_table_id(name)} set retry_count = %s where uid = %s', (retry_count, uid)) + + except Exception as err: + raise err if isinstance(err, DAOException) else DAOException('retry_delivery_msg failed.', err) + finally: + if conn is not None: + conn.commit() + free_conn(conn) diff --git a/src/python/api/client/Ubidots.py b/src/python/api/client/Ubidots.py index 3e5c19e8..592a7c98 100644 --- a/src/python/api/client/Ubidots.py +++ b/src/python/api/client/Ubidots.py @@ -4,6 +4,8 @@ from pdmodels.Models import Location, LogicalDevice +import util.LoggingUtil as lu + BASE_1_6 = "https://industrial.api.ubidots.com.au/api/v1.6" BASE_2_0 = "https://industrial.api.ubidots.com.au/api/v2.0" @@ -100,25 +102,25 @@ def get_all_devices() -> List[LogicalDevice]: if response_obj['next'] is None: break - + page += 1 return devices -def get_device(label: str) -> LogicalDevice: +def get_device(label: str, logging_ctx: dict) -> LogicalDevice: url = f'{BASE_2_0}/devices/~{label}' time.sleep(0.3) r = requests.get(url, headers=headers) if r.status_code != 200: - logging.warn(f'devices/~{label} received response: {r.status_code}: {r.reason}') + lu.cid_logger.error(f'devices/~{label} received response: {r.status_code}: {r.reason}', extra=logging_ctx) return None response_obj = json.loads(r.content) return _dict_to_logical_device(response_obj) -def post_device_data(label: str, body) -> None: +def post_device_data(label: str, body: dict, logging_ctx: dict) -> bool: """ Post timeseries data to an Ubidots device. 
@@ -130,22 +132,25 @@ def post_device_data(label: str, body) -> None: 'temperature': {'value': 37.17, 'timestamp': 1643934748392} } """ + time.sleep(0.3) url = f'{BASE_1_6}/devices/{label}' hdrs = headers hdrs['Content-Type'] = 'application/json' body_str = json.dumps(body) r = requests.post(url, headers=hdrs, data=body_str) if r.status_code != 200: - logging.info(f'POST {url}: {r.status_code}: {r.reason}') - logging.info(body_str) + lu.cid_logger.info(f'POST {url}: {r.status_code}: {r.reason}', extra=logging_ctx) + return False + + return True -def update_device(label: str, patch_obj) -> None: +def update_device(label: str, patch_obj: dict, logging_ctx: dict) -> None: url = f'{BASE_2_0}/devices/~{label}' time.sleep(0.3) response = requests.patch(url, headers=headers, json=patch_obj) if response.status_code != 200: - logging.warning(f'PATCH response: {response.status_code}: {response.reason}') + lu.cid_logger.error(f'PATCH response: {response.status_code}: {response.reason}', extra=logging_ctx) def main(): diff --git a/src/python/broker-cli.py b/src/python/broker-cli.py index 5d4c9529..780b96f5 100755 --- a/src/python/broker-cli.py +++ b/src/python/broker-cli.py @@ -19,6 +19,7 @@ def str_to_logical_device(val) -> LogicalDevice: def str_to_dict(val) -> Dict: + print(val, type(val)) return json.loads(val) @@ -207,7 +208,7 @@ def plain_pd_list(devs: List[PhysicalDevice]): def dict_from_file_or_string() -> dict: - if (hasattr(args, 'pd') or hasattr(args, 'ld')) and (hasattr(args, 'in_filename') and args.in_filename is not None): + if ((hasattr(args, 'pd') and args.pd is not None) or (hasattr(args, 'ld') and args.ld is not None)) and (hasattr(args, 'in_filename') and args.in_filename is not None): raise RuntimeError('error: --json and --file are mutually exclusive.') json_obj = None @@ -283,7 +284,7 @@ def main() -> None: dev = PhysicalDevice.parse_obj(dev) print(pretty_print_json(dao.update_physical_device(dev))) - + elif args.cmd2 == 'rm': # Delete all physical_logical mappings to avoid foreign key violation mappings = dao.get_physical_device_mappings(pd=args.p_uid) @@ -373,9 +374,9 @@ def main() -> None: current_mapping = dao.get_current_device_mapping(pd=args.p_uid, ld=args.l_uid) if current_mapping is None: raise RuntimeError("No current mapping for the uid given") - + dao.toggle_device_mapping(args.enable, args.p_uid, args.l_uid) - + elif args.cmd1 == 'users': if args.cmd2 == 'add': dao.user_add(uname=args.uname, passwd=args.passwd, disabled=args.disabled) @@ -391,13 +392,13 @@ def main() -> None: elif args.enable == True: dao.token_enable(uname=args.uname) - + if args.refresh == True: dao.token_refresh(uname=args.uname) elif args.cmd2 == 'chng': dao.user_change_password(args.uname, args.passwd) - + elif args.cmd2 == 'ls': print(dao.user_ls()) diff --git a/src/python/delivery/BaseWriter.py b/src/python/delivery/BaseWriter.py new file mode 100644 index 00000000..6e859725 --- /dev/null +++ b/src/python/delivery/BaseWriter.py @@ -0,0 +1,220 @@ +from threading import Thread, Event +from typing import Any + +from pdmodels.Models import LogicalDevice, PhysicalDevice + +import json, logging, os, signal, time + +import BrokerConstants +import pika, pika.channel, pika.spec +from pika.exchange_type import ExchangeType + +import api.client.DAO as dao +import util.LoggingUtil as lu + +_user = os.environ['RABBITMQ_DEFAULT_USER'] +_passwd = os.environ['RABBITMQ_DEFAULT_PASS'] +_host = os.environ['RABBITMQ_HOST'] +_port = os.environ['RABBITMQ_PORT'] + +_amqp_url_str = 
f'amqp://{_user}:{_passwd}@{_host}:{_port}/%2F' + + +class BaseWriter: + MSG_OK = 0 + MSG_RETRY = 1 + MSG_FAIL = 2 + + def __init__(self, name) -> None: + self.name: str = name + self.connection = None + self.channel = None + self.keep_running = True + + # The Event is used to signal the delivery thread that a new message has arrived or + # that it should stop. + self.evt = Event() + + signal.signal(signal.SIGTERM, self.sigterm_handler) + + def run(self) -> None: + """ + This method runs the blocking AMQP loop, waiting for messages from upstream + and writing them to the backing table. A separate thread reads them from + the backing table and attempts to deliver them. + """ + logging.info('===============================================================') + logging.info(f' STARTING {self.name.upper()} WRITER') + logging.info('===============================================================') + + delivery_thread = None + try: + dao.create_delivery_table(self.name) + + self.evt.clear() + delivery_thread = Thread(target=self.delivery_thread_proc, name='delivery_thread') + delivery_thread.start() + except dao.DAOException as err: + logging.exception('Failed to find or create service table') + exit(1) + + while self.keep_running: + try: + logging.info('Opening connection') + self.connection = None + self.connection = pika.BlockingConnection(pika.URLParameters(_amqp_url_str)) + + logging.info('Opening channel') + self.channel = self.connection.channel() + self.channel.basic_qos(prefetch_count=1) + logging.info('Declaring exchange') + self.channel.exchange_declare( + exchange=BrokerConstants.LOGICAL_TIMESERIES_EXCHANGE_NAME, + exchange_type=ExchangeType.fanout, + durable=True) + logging.info('Declaring queue') + self.channel.queue_declare(queue=f'{self.name}_logical_msg_queue', durable=True) + self.channel.queue_bind(f'{self.name}_logical_msg_queue', BrokerConstants.LOGICAL_TIMESERIES_EXCHANGE_NAME, 'logical_timeseries') + + logging.info('Waiting for messages.') + # This loops until _channel.cancel is called in the signal handler. + for method, properties, body in self.channel.consume(f'{self.name}_logical_msg_queue'): + delivery_tag = method.delivery_tag + + # If the finish flag is set, reject the message so RabbitMQ will re-queue it + # and return early. + if not self.keep_running: + logging.info(f'NACK delivery tag {delivery_tag}, keep_running is False') + self.channel.basic_reject(delivery_tag) + continue # This will break from loop without running all the logic within the loop below here. + + msg = json.loads(body) + lu.cid_logger.info('Adding message to delivery table', extra=msg) + dao.add_delivery_msg(self.name, msg) + self.channel.basic_ack(delivery_tag) + self.evt.set() + + except pika.exceptions.ConnectionClosedByBroker: + logging.info('Connection closed by server.') + break + + except pika.exceptions.AMQPChannelError as err: + logging.exception(err) + break + + except pika.exceptions.AMQPConnectionError as err: + logging.exception(err) + logging.warning('Connection was closed, retrying after a pause.') + time.sleep(10) + continue + + # Tell the delivery thread to stop if the main thread got an error. 
+ self.keep_running = False + self.evt.set() + if self.connection is not None: + logging.info('Closing connection') + self.connection.close() + + logging.info('Waiting for delivery thread') + delivery_thread.join() + + def delivery_thread_proc(self) -> None: + """ + This method runs in a separate thread and reads messages from the backing table, + calling the on_message handler for each one. It also removes messages from the + backing table when on_message returns MSG_OK or MSG_FAIL, and maintains the retry_count + attribute on MSG_RETRY returns. + """ + logging.info('Delivery thread started') + while self.keep_running: + self.evt.wait() + self.evt.clear() + if not self.keep_running: + break + + count = dao.get_delivery_msg_count(self.name) + if count < 1: + continue + + logging.info(f'Processing {count} messages') + msg_rows = dao.get_delivery_msg_batch(self.name) + for msg_uid, msg, retry_count in msg_rows: + lu.cid_logger.info(f'msg from table {msg_uid}, {retry_count}', extra=msg) + if not self.keep_running: + break + + p_uid = msg[BrokerConstants.PHYSICAL_DEVICE_UID_KEY] + l_uid = msg[BrokerConstants.LOGICAL_DEVICE_UID_KEY] + lu.cid_logger.info(f'Accepted message from physical / logical device ids {p_uid} / {l_uid}', extra=msg) + + pd = dao.get_physical_device(p_uid) + if pd is None: + lu.cid_logger.error(f'Could not find physical device, dropping message: {msg}', extra=msg) + dao.remove_delivery_msg(self.name, msg_uid) + continue + + ld = dao.get_logical_device(l_uid) + if ld is None: + lu.cid_logger.error(f'Could not find logical device, dropping message: {msg}', extra=msg) + dao.remove_delivery_msg(self.name, msg_uid) + continue + + lu.cid_logger.info(f'{pd.name} / {ld.name}', extra=msg) + + rc = self.on_message(pd, ld, msg, retry_count) + if rc == BaseWriter.MSG_OK: + lu.cid_logger.info('Message processed ok.', extra=msg) + dao.remove_delivery_msg(self.name, msg_uid) + elif rc == BaseWriter.MSG_RETRY: + # This is where the message should be published to a different exchange, + # private to the delivery service in question, so it can be retried later + # but not stuck at the head of the queue and immediately redelivered to + # here, possibly causing an endless loop. + lu.cid_logger.warning('Message processing failed, retrying message.', extra=msg) + dao.retry_delivery_msg(self.name, msg_uid) + elif rc == BaseWriter.MSG_FAIL: + lu.cid_logger.error('Message processing failed, dropping message.', extra=msg) + dao.remove_delivery_msg(self.name, msg_uid) + else: + lu.cid_logger.error(f'Invalid message processing return value: {rc}', extra=msg) + + dao.stop() + logging.info('Delivery thread stopped.') + + def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any], retry_count: int) -> int: + """ + Subclasses must override this method and perform all transformation and delivery during + its execution. + + Implementations must decide what constitutes a temporary failure that can be retried, how + many times it is reasonable to retry, and when a message is undeliverable. + + pd: The PhysicalDevice that sent the message. + ld: The LogicalDevice to deliver the message to. + msg: The message content, in IoTa format. + retry_count: How many times this method has returned MSG_RETRY so far. + + return BaseWriter.MSG_OK to signal the message has been delivered and can be removed from the backing table. + BaseWriter.MSG_RETRY to signal a temporary delivery failure which is retryable. + BaseWriter.MSG_FAIL to signal the message cannot be delivered, or has failed too many times. 
+ """ + lu.cid_logger.info(f'{pd.name} / {ld.name} / {retry_count}: {msg}', extra=msg) + return BaseWriter.MSG_OK + + def sigterm_handler(self, sig_no, stack_frame) -> None: + """ + Handle SIGTERM from docker by closing the mq and db connections and setting a + flag to tell the main loop to exit. + """ + logging.info(f'{signal.strsignal(sig_no)}, setting keep_running to False') + self.keep_running = False + dao.stop() + + # This breaks the endless loop in main. + self.channel.cancel() + + +if __name__ == '__main__': + # Does not return until SIGTERM is received. + deliverer = BaseWriter('test') + deliverer.run() + + logging.info('Exiting') diff --git a/src/python/delivery/FRRED.py b/src/python/delivery/FRRED.py index aed79c7b..b1bb5ba5 100644 --- a/src/python/delivery/FRRED.py +++ b/src/python/delivery/FRRED.py @@ -14,174 +14,61 @@ batch has been successfully processed. """ -import dateutil.parser +from typing import Any -import json, logging, math, os, signal, sys, time, uuid +import json, logging, os, sys, time import BrokerConstants -import pika, pika.channel, pika.spec -from pika.exchange_type import ExchangeType - -import api.client.Ubidots as ubidots - -import api.client.DAO as dao import util.LoggingUtil as lu -_user = os.environ['RABBITMQ_DEFAULT_USER'] -_passwd = os.environ['RABBITMQ_DEFAULT_PASS'] -_host = os.environ['RABBITMQ_HOST'] -_port = os.environ['RABBITMQ_PORT'] - -_amqp_url_str = f'amqp://{_user}:{_passwd}@{_host}:{_port}/%2F' - -_channel = None +from delivery.BaseWriter import BaseWriter +from pdmodels.Models import LogicalDevice, PhysicalDevice -_finish = False - -_q_name = 'databolt_delivery' _raw_data_name = '/raw_data' -def sigterm_handler(sig_no, stack_frame) -> None: - """ - Handle SIGTERM from docker by closing the mq and db connections and setting a - flag to tell the main loop to exit. - """ - global _finish, _channel - - logging.info(f'{signal.strsignal(sig_no)}, setting finish to True') - _finish = True - dao.stop() - - # This breaks the endless loop in main. - _channel.cancel() +class DataboltWriter(BaseWriter): + def __init__(self) -> None: + super().__init__('databolt') + def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any], retry_count: int) -> int: + try: + # Only messages from Wombat or Axistech nodes should be processed for SCMN. + if pd.source_name not in [BrokerConstants.WOMBAT, BrokerConstants.AXISTECH]: + lu.cid_logger.info(f'Rejecting message from source {pd.source_name}', extra=msg) + return DataboltWriter.MSG_OK -def main(): - """ - Initiate the connection to RabbitMQ and then idle until asked to stop. - - Because the messages from RabbitMQ arrive via async processing this function - has nothing to do after starting connection. - - It would be good to find a better way to do nothing than the current loop. - """ - global _channel, _finish, _q_name + # May as well use the message context id for the DataBolt directory and file name. + if not os.path.isdir(_raw_data_name): + lu.cid_logger.error(f'DataBolt {_raw_data_name} directory not found. 
This should be a mounted volume shared with the DataBolt container.', extra=msg) + sys.exit(1) - logging.info('===============================================================') - logging.info(' STARTING FRRED DATABOLT WRITER') - logging.info('===============================================================') + msg_uuid = msg[BrokerConstants.CORRELATION_ID_KEY] - logging.info('Opening connection') - connection = None - conn_attempts = 0 - backoff = 10 - while connection is None: - try: - connection = pika.BlockingConnection(pika.URLParameters(_amqp_url_str)) - except: - conn_attempts += 1 - logging.warning(f'Connection to RabbitMQ attempt {conn_attempts} failed.') - - if conn_attempts % 5 == 0 and backoff < 60: - backoff += 10 - - time.sleep(backoff) - - logging.info('Opening channel') - _channel = connection.channel() - _channel.basic_qos(prefetch_count=1) - logging.info('Declaring exchange') - _channel.exchange_declare( - exchange=BrokerConstants.LOGICAL_TIMESERIES_EXCHANGE_NAME, - exchange_type=ExchangeType.fanout, - durable=True) - logging.info('Declaring queue') - _channel.queue_declare(queue=_q_name, durable=True) - logging.info('Binding queue to exchange') - _channel.queue_bind(_q_name, BrokerConstants.LOGICAL_TIMESERIES_EXCHANGE_NAME, 'logical_timeseries') - - # This loops until _channel.cancel is called in the signal handler. - for method, properties, body in _channel.consume(_q_name): - on_message(_channel, method, properties, body) - - logging.info('Closing connection') - connection.close() - - -def on_message(channel, method, properties, body): - """ - This function is called when a message arrives from RabbitMQ. - """ - - global _channel, _finish - - delivery_tag = method.delivery_tag - - # If the finish flag is set, reject the message so RabbitMQ will re-queue it - # and return early. - if _finish: - lu.cid_logger.info(f'NACK delivery tag {delivery_tag}, _finish is True', extra=msg) - _channel.basic_reject(delivery_tag) - return - - try: - # Parse the message just to confirm it is valid JSON. - msg = json.loads(body) - p_uid = msg[BrokerConstants.PHYSICAL_DEVICE_UID_KEY] - l_uid = msg[BrokerConstants.LOGICAL_DEVICE_UID_KEY] - - # TODO: Look into routing keys for deciding which messages to pass on to Intersect. - - # Only messages from Wombat nodes should be processed. This is a bit coarse-grained - # because Wombats could conceivably be used for other projects, but that is very - # unlikely now. To deterimine if the message is from a Wombat, look up the physical - # device and check the source_name. - pd = dao.get_physical_device(p_uid) - if pd is None: - lu.cid_logger.error(f'Physical device not found, cannot continue. Dropping message.', extra=msg) - # Ack the message, even though we cannot process it. We don't want it redelivered. - # We can change this to a Nack if that would provide extra context somewhere. - _channel.basic_ack(delivery_tag) - return - - if pd.source_name not in [BrokerConstants.WOMBAT, BrokerConstants.AXISTECH]: - _channel.basic_ack(delivery_tag) - return - - lu.cid_logger.info(f'Physical device: {pd.name}', extra=msg) - - # May as well use the message context id for the DataBolt directory and file name. - if not os.path.isdir(_raw_data_name): - logging.error(f'DataBolt {_raw_data_name} directory not found. 
This should be a mounted volume shared with the DataBolt container.') - sys.exit(1) - - msg_uuid = msg[BrokerConstants.CORRELATION_ID_KEY] - - old_umask = os.umask(0) - try: - lu.cid_logger.info(json.dumps(msg), extra=msg) - os.mkdir(f'{_raw_data_name}/{msg_uuid}') - with open(f'{_raw_data_name}/{msg_uuid}/{msg_uuid}.json', 'w') as f: - # The body argument is bytes, not a string. Using json.dump is a - # simple way to get a string written to the file. - json.dump(msg, f) + # Set a permissive umask to try and avoid problems with user-based file permissions between + # different containers and the host system. + old_umask = os.umask(0) + try: + os.mkdir(f'{_raw_data_name}/{msg_uuid}') + with open(f'{_raw_data_name}/{msg_uuid}/{msg_uuid}.json', 'w') as f: + # The body argument is bytes, not a string. Using json.dump is a + # simple way to get a string written to the file. + json.dump(msg, f) - except: - logging.exception('Failed to write message to DataBolt directory.') - _channel.basic_ack(delivery_tag) - sys.exit(1) + except: + lu.cid_logger.exception('Failed to write message to DataBolt directory.', extra=msg) + return DataboltWriter.MSG_FAIL - os.umask(old_umask) + # Put the old umask back in case some other file operations are done in the base class. + os.umask(old_umask) - # This tells RabbitMQ the message is handled and can be deleted from the queue. - _channel.basic_ack(delivery_tag) + return DataboltWriter.MSG_OK - except BaseException: - logging.exception('Error while processing message.') - _channel.basic_reject(delivery_tag, requeue=False) + except BaseException: + lu.cid_logger.exception('Error while processing message.', extra=msg) + return DataboltWriter.MSG_FAIL if __name__ == '__main__': @@ -189,10 +76,5 @@ def on_message(channel, method, properties, body): logging.error(f'DataBolt {_raw_data_name} directory not found. This should be a mounted volume shared with the DataBolt container.') sys.exit(1) - # Docker sends SIGTERM to tell the process the container is stopping so set - # a handler to catch the signal and initiate an orderly shutdown. - signal.signal(signal.SIGTERM, sigterm_handler) - - # Does not return until SIGTERM is received. - main() + DataboltWriter().run() logging.info('Exiting.') diff --git a/src/python/delivery/LTSLogger.py b/src/python/delivery/LTSLogger.py new file mode 100644 index 00000000..61445225 --- /dev/null +++ b/src/python/delivery/LTSLogger.py @@ -0,0 +1,30 @@ +""" +A test delivery service. Simply logs the messages it receives. +""" + +from typing import Any + +import logging + +import util.LoggingUtil as lu + +from delivery.BaseWriter import BaseWriter +from pdmodels.Models import LogicalDevice, PhysicalDevice + +class LTSLogger(BaseWriter): + def __init__(self) -> None: + super().__init__('ltslogger') + + def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any], retry_count: int) -> int: + try: + lu.cid_logger.info(f'{msg}, retry_count = {retry_count}', extra=msg) + return LTSLogger.MSG_OK + + except BaseException: + logging.exception('Error while processing message.') + return LTSLogger.MSG_FAIL + + +if __name__ == '__main__': + LTSLogger().run() + logging.info('Exiting.') diff --git a/src/python/delivery/LTSReader.py b/src/python/delivery/LTSReader.py deleted file mode 100644 index 7a78c451..00000000 --- a/src/python/delivery/LTSReader.py +++ /dev/null @@ -1,99 +0,0 @@ -""" -This program receives logical device timeseries messages and logs -them as a test of having multiple queues attached to the logical_timeseries exchange. 
- -It can be used as a template for any program that wants to read from the logical -timeseries exchange. To do that, change the queue name to something unique. -""" - -import asyncio, json, logging, signal - -from pika.exchange_type import ExchangeType -import api.client.RabbitMQ as mq -import BrokerConstants -import util.LoggingUtil as lu - -rx_channel = None -mq_client = None -finish = False - - -def sigterm_handler(sig_no, stack_frame) -> None: - """ - Handle SIGTERM from docker by closing the mq and db connections and setting a - flag to tell the main loop to exit. - """ - global finish, mq_client - - logging.info(f'Caught signal {signal.strsignal(sig_no)}, setting finish to True') - finish = True - mq_client.stop() - - -async def main(): - """ - Initiate the connection to RabbitMQ and then idle until asked to stop. - - Because the messages from RabbitMQ arrive via async processing this function - has nothing to do after starting connection. - """ - global mq_client, rx_channel, finish - - logging.info('===============================================================') - logging.info(' STARTING LOGICAL_TIMESERIES TEST READER') - logging.info('===============================================================') - - # The routing key is ignored by fanout exchanges so it does not need to be a constant. - # Change the queue name. This code should change to use a server generated queue name. - rx_channel = mq.RxChannel(BrokerConstants.LOGICAL_TIMESERIES_EXCHANGE_NAME, exchange_type=ExchangeType.fanout, queue_name='ltsreader_logical_msg_queue', on_message=on_message, routing_key='logical_timeseries') - mq_client = mq.RabbitMQConnection(channels=[rx_channel]) - asyncio.create_task(mq_client.connect()) - - while not rx_channel.is_open: - await asyncio.sleep(0) - - while not finish: - await asyncio.sleep(2) - - while not mq_client.stopped: - await asyncio.sleep(1) - - -def on_message(channel, method, properties, body): - """ - This function is called when a message arrives from RabbitMQ. - """ - - global rx_channel, finish - - delivery_tag = method.delivery_tag - - # If the finish flag is set, reject the message so RabbitMQ will re-queue it - # and return early. - if finish: - rx_channel._channel.basic_reject(delivery_tag) - return - - msg = json.loads(body) - lu.cid_logger.info(f'Accepted message {msg}', extra=msg) - - # - # Message processing goes here - # - - - # This tells RabbitMQ the message is handled and can be deleted from the queue. - rx_channel._channel.basic_ack(delivery_tag) - - -if __name__ == '__main__': - # Docker sends SIGTERM to tell the process the container is stopping so set - # a handler to catch the signal and initiate an orderly shutdown. - signal.signal(signal.SIGTERM, sigterm_handler) - - # Ctrl-C sends SIGINT, handle it the same way. - signal.signal(signal.SIGINT, sigterm_handler) - - # Does not return until SIGTERM is received. - asyncio.run(main()) - logging.info('Exiting.') diff --git a/src/python/delivery/UbidotsWriter.py b/src/python/delivery/UbidotsWriter.py index 138dbc1b..9e0378f9 100644 --- a/src/python/delivery/UbidotsWriter.py +++ b/src/python/delivery/UbidotsWriter.py @@ -3,289 +3,190 @@ on to Ubidots. 
""" +import time +from typing import Any import dateutil.parser -import json, logging, math, os, signal, time, uuid +import logging, math, uuid import BrokerConstants -import pika, pika.channel, pika.spec -from pika.exchange_type import ExchangeType - import api.client.Ubidots as ubidots import api.client.DAO as dao +from delivery.BaseWriter import BaseWriter +from pdmodels.Models import LogicalDevice, PhysicalDevice import util.LoggingUtil as lu -_user = os.environ['RABBITMQ_DEFAULT_USER'] -_passwd = os.environ['RABBITMQ_DEFAULT_PASS'] -_host = os.environ['RABBITMQ_HOST'] -_port = os.environ['RABBITMQ_PORT'] - -_amqp_url_str = f'amqp://{_user}:{_passwd}@{_host}:{_port}/%2F' - -_channel = None - -_finish = False - -def sigterm_handler(sig_no, stack_frame) -> None: - """ - Handle SIGTERM from docker by closing the mq and db connections and setting a - flag to tell the main loop to exit. - """ - global _finish, _channel - - logging.info(f'{signal.strsignal(sig_no)}, setting finish to True') - _finish = True - dao.stop() - - # This breaks the endless loop in main. - _channel.cancel() +class UbidotsWriter(BaseWriter): + def __init__(self) -> None: + super().__init__('ubidots') + + def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any], retry_count: int) -> int: + """ + This function is called when a message arrives from RabbitMQ. + + logical_timeseries has (not sure if physical dev uid is useful, discuss): + { + "physical_uid": 27, + "logical_uid": 16, + "timestamp": "2022-02-04T00:32:28.392595503Z", + "timeseries": [ + {"name": "battery", "value": 3.5}, + {"name": "humidity", "value": 95.11}, + {"name": "temperature", "value": 4.87} + ] + } + + This needs to be transformed to: + + { + 'battery': {'value': 3.6, 'timestamp': 1643934748392}, + 'humidity': {'value': 37.17, 'timestamp': 1643934748392}, + 'temperature': {'value': 37.17, 'timestamp': 1643934748392} + } + + So get the logical device from the db via the id in the message, and convert the iso-8601 timestamp to an epoch-style timestamp. + """ - -def main(): - """ - Initiate the connection to RabbitMQ and then idle until asked to stop. - - Because the messages from RabbitMQ arrive via async processing this function - has nothing to do after starting connection. - - It would be good to find a better way to do nothing than the current loop. 
- """ - global _channel, _finish - - logging.info('===============================================================') - logging.info(' STARTING UBIDOTS WRITER') - logging.info('===============================================================') - - logging.info('Opening connection') - connection = None - conn_attempts = 0 - backoff = 10 - while connection is None: try: - connection = pika.BlockingConnection(pika.URLParameters(_amqp_url_str)) - except: - conn_attempts += 1 - logging.warning(f'Connection to RabbitMQ attempt {conn_attempts} failed.') - - if conn_attempts % 5 == 0 and backoff < 60: - backoff += 10 - - time.sleep(backoff) - - logging.info('Opening channel') - _channel = connection.channel() - _channel.basic_qos(prefetch_count=1) - logging.info('Declaring exchange') - _channel.exchange_declare( - exchange=BrokerConstants.LOGICAL_TIMESERIES_EXCHANGE_NAME, - exchange_type=ExchangeType.fanout, - durable=True) - logging.info('Declaring queue') - _channel.queue_declare(queue='ubidots_logical_msg_queue', durable=True) - _channel.queue_bind('ubidots_logical_msg_queue', BrokerConstants.LOGICAL_TIMESERIES_EXCHANGE_NAME, 'logical_timeseries') - - # This loops until _channel.cancel is called in the signal handler. - for method, properties, body in _channel.consume('ubidots_logical_msg_queue'): - on_message(_channel, method, properties, body) - - logging.info('Closing connection') - connection.close() - - -def on_message(channel, method, properties, body): - """ - This function is called when a message arrives from RabbitMQ. - - logical_timeseries has (not sure if physical dev uid is useful, discuss): - { - "physical_uid": 27, - "logical_uid": 16, - "timestamp": "2022-02-04T00:32:28.392595503Z", - "timeseries": [ - {"name": "battery", "value": 3.5}, - {"name": "humidity", "value": 95.11}, - {"name": "temperature", "value": 4.87} - ] - } - - This needs to be transformed to: - - { - 'battery': {'value': 3.6, 'timestamp': 1643934748392}, - 'humidity': {'value': 37.17, 'timestamp': 1643934748392}, - 'temperature': {'value': 37.17, 'timestamp': 1643934748392} - } - - So get the logical device from the db via the id in the message, and convert the iso-8601 timestamp to an epoch-style timestamp. - """ - - global _channel, _finish - - delivery_tag = method.delivery_tag - - # If the finish flag is set, reject the message so RabbitMQ will re-queue it - # and return early. - if _finish: - lu.cid_logger.info(f'NACK delivery tag {delivery_tag}, _finish is True', extra=msg) - _channel.basic_reject(delivery_tag) - return - - try: - msg = json.loads(body) - l_uid = msg[BrokerConstants.LOGICAL_DEVICE_UID_KEY] - lu.cid_logger.info(f'Accepted message from logical device id {l_uid}', extra=msg) - - ld = dao.get_logical_device(l_uid) - if ld is None: - lu.cid_logger.error(f'Could not find logical device, dropping message: {body}', extra=msg) - _channel.basic_reject(delivery_tag, requeue=False) - return - - ts = 0.0 - # TODO: Find or create a class to hide all the Python datetime processing. - try: - ts_float = dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY]).timestamp() - # datetime.timestamp() returns a float where the ms are to the right of the - # decimal point. This should get us an integer value in ms. 
- ts = math.floor(ts_float * 1000) - except: - lu.cid_logger.error(f'Failed to parse timestamp from message: {msg[BrokerConstants.TIMESTAMP_KEY]}', extra=msg) - _channel.basic_reject(delivery_tag, requeue=False) - return - - ubidots_payload = {} - for v in msg[BrokerConstants.TIMESERIES_KEY]: - dot_ts = ts + ts = 0.0 + try: + ts_float = dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY]).timestamp() + # datetime.timestamp() returns a float where the ms are to the right of the + # decimal point. This should get us an integer value in ms. + ts = math.floor(ts_float * 1000) + except: + lu.cid_logger.error(f'Failed to parse timestamp from message: {msg[BrokerConstants.TIMESTAMP_KEY]}', extra=msg) + return self.MSG_FAIL + + ubidots_payload = {} + for v in msg[BrokerConstants.TIMESERIES_KEY]: + dot_ts = ts + + # Override the default message timestamp if one of the dot entries has its + # own timestamp. + if BrokerConstants.TIMESTAMP_KEY in v: + try: + dot_ts_float = dateutil.parser.isoparse(v[BrokerConstants.TIMESTAMP_KEY]).timestamp() + dot_ts = math.floor(dot_ts_float * 1000) + except: + # dot_ts has already been set to msg timestamp above as a default value. + pass - # Override the default message timestamp if one of the dot entries has its - # own timestamp. - if BrokerConstants.TIMESTAMP_KEY in v: try: - dot_ts_float = dateutil.parser.isoparse(v[BrokerConstants.TIMESTAMP_KEY]).timestamp() - dot_ts = math.floor(dot_ts_float * 1000) - except: - # dot_ts has already been set to msg timestamp above as a default value. - pass - - try: - value = float(v['value']) - ubidots_payload[v['name']] = { - 'value': value, - 'timestamp': dot_ts, - 'context': { - BrokerConstants.CORRELATION_ID_KEY: msg[BrokerConstants.CORRELATION_ID_KEY] + value = float(v['value']) + ubidots_payload[v['name']] = { + 'value': value, + 'timestamp': dot_ts, + 'context': { + BrokerConstants.CORRELATION_ID_KEY: msg[BrokerConstants.CORRELATION_ID_KEY] + } } - } - except ValueError: - # Ubidots will not accept values that are not floats, so skip this value. - pass - - # - # TODO: Add some way to abstract the source-specific details of creating the Ubidots device. - # Anywhere this code has something like 'if pd.source_name...' it should be handled better. - # - # One idea is that once the broker is live (and if Ubidots supports this) we can stop the - # logical mapper for a short while and do a bulk relabel of all Ubidots devices to some - # scheme that does not require source-specific information, change this code, and restart - # the logical mapper. - # - new_device = False - if not 'ubidots' in ld.properties or not 'label' in ld.properties['ubidots']: - # If the Ubidots label is not in the logical device properties, the device - # may no exist in Ubidots yet so we must remember to read the Ubidots - # device back after writing the timeseries data so the device info can be - # stored in the logical device properties. - lu.cid_logger.info('No Ubidots label found in logical device.', extra=msg) - pd = dao.get_physical_device(msg[BrokerConstants.PHYSICAL_DEVICE_UID_KEY]) - if pd is None: - lu.cid_logger.error(f'Could not find physical device, dropping message: {body}', extra=msg) - _channel.basic_reject(delivery_tag, requeue=False) - return - - new_device = True - ld.properties['ubidots'] = {} - # TODO: Remove the device source specific code here and always use a random - # UUID for the Ubidots label. 
This cannot be done until the current TTN ubifunction - # is switched off because the broker must be able to determine the same device label - # used by the ubifunction when it creates Ubidots devices. - if pd.source_name == BrokerConstants.TTN: - ld.properties['ubidots']['label'] = pd.source_ids['dev_eui'] - lu.cid_logger.info(f'Using physical device eui for label: {ld.properties["ubidots"]["label"]}', extra=msg) - elif pd.source_name == BrokerConstants.GREENBRAIN: - lu.cid_logger.info('Using system-station-sensor-group ids as label', extra=msg) - system_id = pd.source_ids['system_id'] - station_id = pd.source_ids['station_id'] - sensor_group_id = pd.source_ids['sensor_group_id'] - ubi_label = f'{system_id}-{station_id}-{sensor_group_id}' - ld.properties['ubidots']['label'] = ubi_label - else: - lu.cid_logger.info('Using a UUID as Ubidots label', extra=msg) - ld.properties['ubidots']['label'] = uuid.uuid4() - - # - # So it doesn't get lost in all the surrounding code, here is where the - # data is posted to Ubidots. - # - ubidots_dev_label = ld.properties['ubidots']['label'] - ubidots.post_device_data(ubidots_dev_label, ubidots_payload) - - if new_device: - # Update the Ubidots device with info from the source device and/or the - # broker. - lu.cid_logger.info('Updating Ubidots device with information from source device.', extra=msg) - patch_obj = {'name': ld.name} - patch_obj['properties'] = {} - - # Prefer the logical device location, fall back to the mapped physical device - # location, if any. - loc = ld.location if ld.location is not None else pd.location - if loc is not None: - patch_obj['properties'] |= {'_location_type': 'manual', '_location_fixed': {'lat': loc.lat, 'lng': loc.long}} - - # We could include the correlation id of the message that caused the device to be created - # in the same format as the QR code id below, but I'm not sure that's useful and it might clutter - # up the Ubidots UI. - - if pd.source_name == BrokerConstants.TTN: - if BrokerConstants.TTN in pd.properties: - ttn_props = pd.properties[BrokerConstants.TTN] - if 'description' in ttn_props: - patch_obj['description'] = ttn_props['description'] - - if 'attributes' in ttn_props and 'uid' in ttn_props['attributes']: - cfg = {'dpi-uid': {'text': 'DPI UID', 'type': 'text', 'description': 'The uid from the DPI QR code used to activate the device.'}} - patch_obj['properties'] |= {'_config': cfg, 'dpi-uid': ttn_props['attributes']['uid']} - - # TODO: What about Green Brain devices? - - ubidots.update_device(ubidots_dev_label, patch_obj) - - # Update the newly created logical device properties with the information - # returned from Ubidots, but nothing else. We don't want to overwite the - # last_seen value because that should be set to the timestamp from the - # message, which was done in the mapper process. - lu.cid_logger.info('Updating logical device properties from Ubidots.', extra=msg) - ud = ubidots.get_device(ubidots_dev_label) - if ud is not None: - ld.properties['ubidots'] = ud.properties['ubidots'] - dao.update_logical_device(ld) - - # This tells RabbitMQ the message is handled and can be deleted from the queue. - _channel.basic_ack(delivery_tag) - #lu.cid_logger.info(f'ACK delivery tag {delivery_tag}', extra=msg) + except (ValueError, TypeError): + # Ubidots will not accept values that are not floats, so skip this value. 
+ pass + + # + # TODO: Add some way to abstract the source-specific details of creating the Ubidots device. + # Anywhere this code has something like 'if pd.source_name...' it should be handled better. + # + # One idea is that once the broker is live (and if Ubidots supports this) we can stop the + # logical mapper for a short while and do a bulk relabel of all Ubidots devices to some + # scheme that does not require source-specific information, change this code, and restart + # the logical mapper. + # + new_device = False + if not 'ubidots' in ld.properties or not 'label' in ld.properties['ubidots']: + # If the Ubidots label is not in the logical device properties, the device + # may not exist in Ubidots yet so we must remember to read the Ubidots + # device back after writing the timeseries data so the device info can be + # stored in the logical device properties. + lu.cid_logger.info('No Ubidots label found in logical device.', extra=msg) + + new_device = True + ld.properties['ubidots'] = {} + # TODO: Remove the device source specific code here and always use a random + # UUID for the Ubidots label. This cannot be done until the current TTN ubifunction + # is switched off because the broker must be able to determine the same device label + # used by the ubifunction when it creates Ubidots devices. + if pd.source_name == BrokerConstants.TTN: + ld.properties['ubidots']['label'] = pd.source_ids['dev_eui'] + lu.cid_logger.info(f'Using physical device eui for label: {ld.properties["ubidots"]["label"]}', extra=msg) + elif pd.source_name == BrokerConstants.GREENBRAIN: + lu.cid_logger.info('Using system-station-sensor-group ids as label', extra=msg) + system_id = pd.source_ids['system_id'] + station_id = pd.source_ids['station_id'] + sensor_group_id = pd.source_ids['sensor_group_id'] + ubi_label = f'{system_id}-{station_id}-{sensor_group_id}' + ld.properties['ubidots']['label'] = ubi_label + else: + lu.cid_logger.info('Using a UUID as Ubidots label', extra=msg) + ld.properties['ubidots']['label'] = uuid.uuid4() + + # + # So it doesn't get lost in all the surrounding code, here is where the + # data is posted to Ubidots. + # + ubidots_dev_label = ld.properties['ubidots']['label'] + if not ubidots.post_device_data(ubidots_dev_label, ubidots_payload, {BrokerConstants.CORRELATION_ID_KEY: msg[BrokerConstants.CORRELATION_ID_KEY]}): + # The write to Ubidots failed. + lu.cid_logger.error('Delivery to Ubidots failed at API call.', extra=msg) + if retry_count > 4: + lu.cid_logger.error(f'Retried message 5 times, giving up.', extra=msg) + return BaseWriter.MSG_FAIL + else: + time.sleep(5) # Pause in case Ubidots is flooded. + return BaseWriter.MSG_RETRY + + if new_device: + # Update the Ubidots device with info from the source device and/or the + # broker. + lu.cid_logger.info('Updating Ubidots device with information from source device.', extra=msg) + patch_obj = {'name': ld.name} + patch_obj['properties'] = {} + + # Prefer the logical device location, fall back to the mapped physical device + # location, if any. 
+ loc = ld.location if ld.location is not None else pd.location + if loc is not None: + patch_obj['properties'] |= {'_location_type': 'manual', '_location_fixed': {'lat': loc.lat, 'lng': loc.long}} + + # We could include the correlation id of the message that caused the device to be created + # in the same format as the QR code id below, but I'm not sure that's useful and it might clutter + # up the Ubidots UI. + + if pd.source_name == BrokerConstants.TTN: + if BrokerConstants.TTN in pd.properties: + ttn_props = pd.properties[BrokerConstants.TTN] + if 'description' in ttn_props: + patch_obj['description'] = ttn_props['description'] + + if 'attributes' in ttn_props and 'uid' in ttn_props['attributes']: + cfg = {'dpi-uid': {'text': 'DPI UID', 'type': 'text', 'description': 'The uid from the DPI QR code used to activate the device.'}} + patch_obj['properties'] |= {'_config': cfg, 'dpi-uid': ttn_props['attributes']['uid']} + + # TODO: What about Green Brain devices? + + ubidots.update_device(ubidots_dev_label, patch_obj, {BrokerConstants.CORRELATION_ID_KEY: msg[BrokerConstants.CORRELATION_ID_KEY]}) + + # Update the newly created logical device properties with the information + # returned from Ubidots, but nothing else. We don't want to overwite the + # last_seen value because that should be set to the timestamp from the + # message, which was done in the mapper process. + lu.cid_logger.info('Updating logical device properties from Ubidots.', extra=msg) + ud = ubidots.get_device(ubidots_dev_label, {BrokerConstants.CORRELATION_ID_KEY: msg[BrokerConstants.CORRELATION_ID_KEY]}) + if ud is not None: + ld.properties['ubidots'] = ud.properties['ubidots'] + dao.update_logical_device(ld) + + return self.MSG_OK + + except BaseException: + lu.cid_logger.exception('Error while processing message.', extra=msg) + return self.MSG_FAIL if __name__ == '__main__': - # Docker sends SIGTERM to tell the process the container is stopping so set - # a handler to catch the signal and initiate an orderly shutdown. - signal.signal(signal.SIGTERM, sigterm_handler) - - # Does not return until SIGTERM is received. - main() + UbidotsWriter().run() logging.info('Exiting.') From 69ad24ed9ac4e1104bfe7175fb4b91673e0bf8ee Mon Sep 17 00:00:00 2001 From: David Taylor Date: Wed, 11 Sep 2024 18:39:08 +1000 Subject: [PATCH 2/2] No longer using deveui from TTN devices for Ubidots device label, webapp map now colour codes icons based upon time last seen. --- src/python/delivery/UbidotsWriter.py | 10 +-- src/www/Dockerfile | 4 -- src/www/app/main.py | 104 ++++++++++++++++++++++----- src/www/requirements.txt | Bin 1142 -> 1130 bytes 4 files changed, 90 insertions(+), 28 deletions(-) diff --git a/src/python/delivery/UbidotsWriter.py b/src/python/delivery/UbidotsWriter.py index 9e0378f9..425c8329 100644 --- a/src/python/delivery/UbidotsWriter.py +++ b/src/python/delivery/UbidotsWriter.py @@ -107,13 +107,8 @@ def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any], retr new_device = True ld.properties['ubidots'] = {} # TODO: Remove the device source specific code here and always use a random - # UUID for the Ubidots label. This cannot be done until the current TTN ubifunction - # is switched off because the broker must be able to determine the same device label - # used by the ubifunction when it creates Ubidots devices. 
- if pd.source_name == BrokerConstants.TTN: - ld.properties['ubidots']['label'] = pd.source_ids['dev_eui'] - lu.cid_logger.info(f'Using physical device eui for label: {ld.properties["ubidots"]["label"]}', extra=msg) - elif pd.source_name == BrokerConstants.GREENBRAIN: + # UUID for the Ubidots label. + if pd.source_name == BrokerConstants.GREENBRAIN: lu.cid_logger.info('Using system-station-sensor-group ids as label', extra=msg) system_id = pd.source_ids['system_id'] station_id = pd.source_ids['station_id'] @@ -121,7 +116,6 @@ def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any], retr ubi_label = f'{system_id}-{station_id}-{sensor_group_id}' ld.properties['ubidots']['label'] = ubi_label else: - lu.cid_logger.info('Using a UUID as Ubidots label', extra=msg) ld.properties['ubidots']['label'] = uuid.uuid4() # diff --git a/src/www/Dockerfile b/src/www/Dockerfile index 5ac49ba6..9279ccdf 100644 --- a/src/www/Dockerfile +++ b/src/www/Dockerfile @@ -1,5 +1,3 @@ -# syntax=docker/dockerfile:1 - FROM python:3.10 WORKDIR /app @@ -7,8 +5,6 @@ WORKDIR /app COPY src/www/requirements.txt requirements.txt RUN pip install -r requirements.txt -COPY . . - EXPOSE 5000 CMD [ "python", "./app/main.py"] diff --git a/src/www/app/main.py b/src/www/app/main.py index e0e09285..397957a7 100644 --- a/src/www/app/main.py +++ b/src/www/app/main.py @@ -3,13 +3,15 @@ import logging import pandas as pd import time -from typing import Tuple +from typing import Dict, Tuple import uuid from zoneinfo import ZoneInfo from flask import Flask, render_template, request, redirect, url_for, session, send_from_directory, send_file import folium +import folium.plugins + import paho.mqtt.client as mqtt import os from datetime import timedelta, datetime, timezone @@ -78,7 +80,11 @@ def parse_location(loc_str: str) -> Tuple[bool, Location | None]: return True, location -def time_since(date: datetime) -> str: +_warning_seconds = 3600 * 6 ### How many seconds ago a device was seen to show a warning. +_error_seconds = 3600 * 12 ### How many seconds ago a device was seen to show an error. +_seconds_per_day = 3600 * 24 + +def time_since(date: datetime) -> Dict[str, int|str]: now = datetime.now(timezone.utc) date_utc = date.astimezone(timezone.utc) delta = now - date_utc @@ -86,14 +92,24 @@ def time_since(date: datetime) -> str: hours = int(delta.seconds / 3600) minutes = int((delta.seconds % 3600) / 60) seconds = int(delta.seconds % 60) + + ret_val = { + 'days': days, + 'hours': hours, + 'minutes': minutes, + 'delta_seconds': delta.seconds + (delta.days * _seconds_per_day) + } + if days > 0: - return f'{days} days ago' + ret_val['desc'] = f'{days} days ago' elif hours > 0: - return f'{hours} hours ago' + ret_val['desc'] = f'{hours} hours ago' elif minutes > 0: - return f'{minutes} minutes ago' + ret_val['desc'] = f'{minutes} minutes ago' + else: + ret_val['desc'] = f'{seconds} seconds ago' - return f'{seconds} seconds ago' + return ret_val #------------- @@ -474,17 +490,46 @@ def logical_device_form(uid): @app.route('/map', methods=['GET']) def show_map(): try: - center_map = folium.Map(location=[-32.2400951991083, 148.6324743348766], title='PhysicalDeviceMap', zoom_start=10) - # folium.Marker([-31.956194913619864, 115.85911692112582], popup="Mt. Hood Meadows", tooltip='click me').add_to(center_map) + # Map limits cover NSW. 
+ center_map = folium.Map( + location=[-32.42, 147.5], + min_lat=-36.8, max_lat=-29.6, min_lon=141.7, max_lon=152.9, + max_bounds=True, + title='IoTa Logical Devices', + zoom_start=7) + + live_nodes = folium.FeatureGroup(name='Live', show=False) + late_nodes = folium.FeatureGroup(name='Late') + dead_nodes = folium.FeatureGroup(name='Missing') + + live_markers = [] + late_markers = [] + dead_markers = [] + data: List[LogicalDevice] = get_logical_devices(session.get('token'), include_properties=True) for dev in data: if dev.location is not None and dev.location.lat is not None and dev.location.long is not None: - color = 'blue' + color = 'green' + icon_name = 'circle' + marker_list = live_markers + last_seen = None if dev.last_seen is None: last_seen_desc = 'Never' + icon_name = 'circle-question' + color = 'red' + marker_list = dead_markers else: - last_seen_desc = time_since(dev.last_seen) + last_seen = time_since(dev.last_seen) + last_seen_desc = last_seen['desc'] + if last_seen['delta_seconds'] > _error_seconds: + color = 'red' + icon_name = 'circle-xmark' + marker_list = dead_markers + elif last_seen['delta_seconds'] > _warning_seconds: + color = 'orange' + icon_name = 'circle-exclamation' + marker_list = late_markers popup_str = f'Device: {dev.uid} / {dev.name}
Last seen: {last_seen_desc}' @@ -497,14 +542,41 @@ def show_map(): popup_str = popup_str + '
' - folium.Marker([dev.location.lat, dev.location.long], + marker = folium.Marker([dev.location.lat, dev.location.long], popup=popup_str, - icon=folium.Icon(color=color, icon='cloud'), - tooltip=dev.name).add_to(center_map) + icon=folium.Icon(color=color, icon=icon_name, prefix='fa'), + tooltip=f'{dev.name}, last seen {last_seen_desc}') + + marker_list.append(marker) + + # This was an attempt to set the draw order of the markers. It did not work + # but the code has been kept in case having this structure is useful or a + # way to make it work is found. + for marker in live_markers: + live_nodes.add_child(marker) + + for marker in late_markers: + late_nodes.add_child(marker) + + for marker in dead_markers: + dead_nodes.add_child(marker) + + center_map.add_child(live_nodes) + center_map.add_child(late_nodes) + center_map.add_child(dead_nodes) + + # It seems to be important to add the LayerControl down here. Doing it before + # the FeatureGroups are defined doesn't work. + folium.LayerControl(collapsed=False).add_to(center_map) + folium.plugins.Fullscreen( + position="topleft", + title="Full screen", + title_cancel="Exit full screen", + force_separate_button=True, + ).add_to(center_map) + + return center_map.get_root().render() - return center_map._repr_html_() - # center_map - # return render_template('map.html') except requests.exceptions.HTTPError as e: return render_template('error_page.html', reason=e), e.response.status_code diff --git a/src/www/requirements.txt b/src/www/requirements.txt index b25e4633b1fd6bee9e90e63d2d04b0d5bb923317..e9592485f80fcf8bf6fe497cb5ab4ed6223e1672 100644 GIT binary patch delta 23 ecmeyy@rq-D1*6$UOJhb>a|S&IgUz*!KbZhoI|kGM delta 35 pcmaFG@r`4G1*7RkOJhbsBL+PNLm)0-$Y&^KC}A+%T*&y72>__)2=o8|
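For reference, here is a minimal sketch of how a new delivery service plugs into the BaseWriter protocol described in the first commit message. The BaseWriter class, the MSG_* constants, the on_message() signature and run() come from this patch; the ExampleWriter name, the MAX_RETRIES limit and the _deliver() helper are illustrative assumptions only, not part of the patch.

    from typing import Any

    import logging

    import util.LoggingUtil as lu
    from delivery.BaseWriter import BaseWriter
    from pdmodels.Models import LogicalDevice, PhysicalDevice


    class ExampleWriter(BaseWriter):
        MAX_RETRIES = 5  # Assumption: each service chooses its own retry limit.

        def __init__(self) -> None:
            # The name selects the backing table, e.g. example_delivery_q.
            super().__init__('example')

        def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any], retry_count: int) -> int:
            try:
                if self._deliver(ld, msg):
                    return BaseWriter.MSG_OK    # Delivered; row removed from the backing table.

                if retry_count >= self.MAX_RETRIES:
                    return BaseWriter.MSG_FAIL  # Give up; row removed from the backing table.

                return BaseWriter.MSG_RETRY     # Row kept; retry_count incremented for the next attempt.
            except BaseException:
                lu.cid_logger.exception('Error while processing message.', extra=msg)
                return BaseWriter.MSG_FAIL

        def _deliver(self, ld: LogicalDevice, msg: dict[Any]) -> bool:
            # Hypothetical destination call; a real service would post to its platform here
            # and return False on a temporary failure.
            return True


    if __name__ == '__main__':
        ExampleWriter().run()
        logging.info('Exiting.')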