Skip to content

Commit

Permalink
Robustness improvements (#6)
Browse files Browse the repository at this point in the history
* Reconcile repositories

Change-Id: I5651affc68165e1739d145f97fb68c7e5d1979cc

* Timestamp fix

Change-Id: Ib537b56bc075de29e3e369349152a3269c6d3e38

* Improved the robustness of the component

Better handling of metric_list messages with an identical version

Improved the robustness of the component in case of a broker connector failure

Change-Id: I79480b1cebc769d1ea09ba6d17f75f535e5c1569
  • Loading branch information
atsag authored Oct 3, 2024
1 parent 5bfd92e commit 7a314d2
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 35 deletions.
95 changes: 60 additions & 35 deletions monitoring-data-persistor/src/main/runtime/DataPersistor.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import logging
import sys
import threading
from time import sleep

from jproperties import Properties

from influxdb_client import Point, WritePrecision

import exn
from main.runtime import State
from main.runtime.Constants import Constants
from main.runtime.InfluxDBConnector import InfluxDBConnector
from exn import connector, core
Expand Down Expand Up @@ -47,43 +50,65 @@ def GenericConsumerHandler(self):
def on_message(self, key, address, body, context, **kwargs):

if (str(address)).startswith(Constants.monitoring_prefix+Constants.metric_list_topic):
need_to_restart_connector = False
application_name = body["name"]
message_version = body["version"]
logging.info("New metrics list message for application "+application_name + " - registering new connector")
if (application_name in self.application_consumer_handler_connectors.keys()):
logging.info("Stopping the old existing connector...")
self.application_consumer_handler_connectors[application_name].stop()
logging.info("Attempting to register new connector...")
self.application_consumer_handler_connectors[application_name] = exn.connector.EXN(
Constants.data_persistor_name + "-" + application_name, handler=Bootstrap(),
consumers=[
core.consumer.Consumer('monitoring-'+application_name,
Constants.monitoring_broker_topic + '.realtime.>',
application=application_name,
topic=True,
fqdn=True,
handler=ConsumerHandler(application_name=application_name)
),
],
url=Constants.broker_ip,
port=Constants.broker_port,
username=Constants.broker_username,
password=Constants.broker_password
)
logging.info("Connector ready to be registered")
#connector.start()
#self.application_consumer_handler_connectors[application_name] = self.initialized_connector
#self.application_consumer_handler_connectors[application_name].start()
logging.info(f"Application specific connector registered for application {application_name}")
#self.initialized_connector.start()

#If threading support is explicitly required, uncomment these lines
self.application_threads[application_name] = threading.Thread(target=self.application_consumer_handler_connectors[application_name].start,args=())
self.application_threads[application_name].start()
logging.info(f"Application specific connector started for application {application_name}")
self.application_threads[application_name].join()
#connector_thread = threading.Thread(target=self.application_consumer_handler_connectors[application_name].start,args=())
#connector_thread.start()
#connector_thread.join()

if (application_name in State.metric_list_message_versions and message_version > State.metric_list_message_versions[application_name]):
logging.info("Stopping the old connector (as a new message version arrived)...")
State.metric_list_message_versions[application_name] = message_version
need_to_restart_connector = True
elif not (application_name in State.metric_list_message_versions):
logging.info("Starting a new connector...")
State.metric_list_message_versions[application_name] = message_version
need_to_restart_connector = True
else:
logging.info("No need to stop the old existing connector, as the message version has not been updated...")

while (need_to_restart_connector):
try:
need_to_restart_connector = False
if (application_name in self.application_consumer_handler_connectors.keys()):
self.application_consumer_handler_connectors[application_name].stop()
logging.info("Attempting to register new connector...")
self.application_consumer_handler_connectors[application_name] = exn.connector.EXN(
Constants.data_persistor_name + "-" + application_name, handler=Bootstrap(),
consumers=[
core.consumer.Consumer('monitoring-'+application_name,
Constants.monitoring_broker_topic + '.realtime.>',
application=application_name,
topic=True,
fqdn=True,
handler=ConsumerHandler(application_name=application_name)
),
],
url=Constants.broker_ip,
port=Constants.broker_port,
username=Constants.broker_username,
password=Constants.broker_password
)
logging.info("Connector ready to be registered")
#connector.start()
#self.application_consumer_handler_connectors[application_name] = self.initialized_connector
#self.application_consumer_handler_connectors[application_name].start()
logging.info(f"Application specific connector registered for application {application_name}")
#self.initialized_connector.start()

#If threading support is explicitly required, uncomment these lines
self.application_threads[application_name] = threading.Thread(target=self.application_consumer_handler_connectors[application_name].start,args=())
self.application_threads[application_name].start()
logging.info(f"Application specific connector started for application {application_name}")
self.application_threads[application_name].join()
#connector_thread = threading.Thread(target=self.application_consumer_handler_connectors[application_name].start,args=())
#connector_thread.start()
#connector_thread.join()
except Exception as e:
logging.error("Exception occurred while trying to re-create connector for application %s, will retry in 5 seconds",application_name)
logging.exception(e)
need_to_restart_connector = True
sleep(5)


def update_properties(configuration_file_location):
p = Properties()
Expand Down
1 change: 1 addition & 0 deletions monitoring-data-persistor/src/main/runtime/State.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
metric_list_message_versions={}

0 comments on commit 7a314d2

Please sign in to comment.