From 8f5c1a6231eeca16c29e054a856698246b14ec26 Mon Sep 17 00:00:00 2001 From: atsag Date: Thu, 3 Oct 2024 12:05:52 +0300 Subject: [PATCH] Improved the robustness of the component Better handling of metric_list messages with an identical version Improved the robustness of the component in case of a broker connector failure Change-Id: I79480b1cebc769d1ea09ba6d17f75f535e5c1569 --- .../src/main/runtime/DataPersistor.py | 95 ++++++++++++------- .../src/main/runtime/State.py | 1 + 2 files changed, 61 insertions(+), 35 deletions(-) diff --git a/monitoring-data-persistor/src/main/runtime/DataPersistor.py b/monitoring-data-persistor/src/main/runtime/DataPersistor.py index b2c6f58..d009ca0 100755 --- a/monitoring-data-persistor/src/main/runtime/DataPersistor.py +++ b/monitoring-data-persistor/src/main/runtime/DataPersistor.py @@ -1,11 +1,14 @@ import logging import sys import threading +from time import sleep + from jproperties import Properties from influxdb_client import Point, WritePrecision import exn +from main.runtime import State from main.runtime.Constants import Constants from main.runtime.InfluxDBConnector import InfluxDBConnector from exn import connector, core @@ -47,43 +50,65 @@ def GenericConsumerHandler(self): def on_message(self, key, address, body, context, **kwargs): if (str(address)).startswith(Constants.monitoring_prefix+Constants.metric_list_topic): + need_to_restart_connector = False application_name = body["name"] + message_version = body["version"] logging.info("New metrics list message for application "+application_name + " - registering new connector") - if (application_name in self.application_consumer_handler_connectors.keys()): - logging.info("Stopping the old existing connector...") - self.application_consumer_handler_connectors[application_name].stop() - logging.info("Attempting to register new connector...") - self.application_consumer_handler_connectors[application_name] = exn.connector.EXN( - Constants.data_persistor_name + "-" + application_name, handler=Bootstrap(), - consumers=[ - core.consumer.Consumer('monitoring-'+application_name, - Constants.monitoring_broker_topic + '.realtime.>', - application=application_name, - topic=True, - fqdn=True, - handler=ConsumerHandler(application_name=application_name) - ), - ], - url=Constants.broker_ip, - port=Constants.broker_port, - username=Constants.broker_username, - password=Constants.broker_password - ) - logging.info("Connector ready to be registered") - #connector.start() - #self.application_consumer_handler_connectors[application_name] = self.initialized_connector - #self.application_consumer_handler_connectors[application_name].start() - logging.info(f"Application specific connector registered for application {application_name}") - #self.initialized_connector.start() - - #If threading support is explicitly required, uncomment these lines - self.application_threads[application_name] = threading.Thread(target=self.application_consumer_handler_connectors[application_name].start,args=()) - self.application_threads[application_name].start() - logging.info(f"Application specific connector started for application {application_name}") - self.application_threads[application_name].join() - #connector_thread = threading.Thread(target=self.application_consumer_handler_connectors[application_name].start,args=()) - #connector_thread.start() - #connector_thread.join() + + if (application_name in State.metric_list_message_versions and message_version > State.metric_list_message_versions[application_name]): + logging.info("Stopping the old connector (as a new message version arrived)...") + State.metric_list_message_versions[application_name] = message_version + need_to_restart_connector = True + elif not (application_name in State.metric_list_message_versions): + logging.info("Starting a new connector...") + State.metric_list_message_versions[application_name] = message_version + need_to_restart_connector = True + else: + logging.info("No need to stop the old existing connector, as the message version has not been updated...") + + while (need_to_restart_connector): + try: + need_to_restart_connector = False + if (application_name in self.application_consumer_handler_connectors.keys()): + self.application_consumer_handler_connectors[application_name].stop() + logging.info("Attempting to register new connector...") + self.application_consumer_handler_connectors[application_name] = exn.connector.EXN( + Constants.data_persistor_name + "-" + application_name, handler=Bootstrap(), + consumers=[ + core.consumer.Consumer('monitoring-'+application_name, + Constants.monitoring_broker_topic + '.realtime.>', + application=application_name, + topic=True, + fqdn=True, + handler=ConsumerHandler(application_name=application_name) + ), + ], + url=Constants.broker_ip, + port=Constants.broker_port, + username=Constants.broker_username, + password=Constants.broker_password + ) + logging.info("Connector ready to be registered") + #connector.start() + #self.application_consumer_handler_connectors[application_name] = self.initialized_connector + #self.application_consumer_handler_connectors[application_name].start() + logging.info(f"Application specific connector registered for application {application_name}") + #self.initialized_connector.start() + + #If threading support is explicitly required, uncomment these lines + self.application_threads[application_name] = threading.Thread(target=self.application_consumer_handler_connectors[application_name].start,args=()) + self.application_threads[application_name].start() + logging.info(f"Application specific connector started for application {application_name}") + self.application_threads[application_name].join() + #connector_thread = threading.Thread(target=self.application_consumer_handler_connectors[application_name].start,args=()) + #connector_thread.start() + #connector_thread.join() + except Exception as e: + logging.error("Exception occurred while trying to re-create connector for application %s, will retry in 5 seconds",application_name) + logging.exception(e) + need_to_restart_connector = True + sleep(5) + def update_properties(configuration_file_location): p = Properties() diff --git a/monitoring-data-persistor/src/main/runtime/State.py b/monitoring-data-persistor/src/main/runtime/State.py index e69de29..36e69e7 100755 --- a/monitoring-data-persistor/src/main/runtime/State.py +++ b/monitoring-data-persistor/src/main/runtime/State.py @@ -0,0 +1 @@ +metric_list_message_versions={} \ No newline at end of file