diff --git a/exponential-smoothing-predictor/src/runtime/Predictor.py b/exponential-smoothing-predictor/src/runtime/Predictor.py index 978040d..8b130ba 100755 --- a/exponential-smoothing-predictor/src/runtime/Predictor.py +++ b/exponential-smoothing-predictor/src/runtime/Predictor.py @@ -272,8 +272,8 @@ def calculate_and_publish_predictions(application_state,maximum_time_required_fo #State.connection.send_to_topic('training_models',training_models_message_body) message_not_sent = False - print_with_time("Successfully sent prediction message for %s to topic eu.nebulouscloud.preliminary_predicted.%s.%s:\n\n%s\n\n" % (attribute, EsPredictorState.forecaster_name, attribute, prediction_message_body)) - except ConnectionError as exception: + print_with_time("Successfully sent prediction message for "+str(attribute)+" to topic "+EsPredictorState.get_prediction_publishing_topic(attribute)+":\n\n%s\n\n" % (prediction_message_body)) + except Exception as exception: #State.connection.disconnect() #State.connection = messaging.morphemic.Connection('admin', 'admin') #State.connection.connect() @@ -471,6 +471,22 @@ def on_message(self, key, address, body, context, **kwargs): else: print_with_time("The address was "+ address +" and did not match metrics_to_predict/test.exponentialsmoothing/start_forecasting.exponentialsmoothing/stop_forecasting.exponentialsmoothing") # logging.info(f"Received {key} => {address}") + + elif (address).startswith(EsPredictorState.COMPONENT_STATE_PREFIX): + + import os + # Specify the directory and filename + directory = "/home" + filename = "is_alive.txt" + + # Create the file + with open(os.path.join(directory, filename), "w") as f: + current_message = print_with_time(f"Liveness probe received at {address}") + f.write(current_message) + + #print(f"Liveness probe file created at {directory}/{filename}") + + else: print_with_time("Received message "+body+" but could not handle it") def get_dataset_file(attribute): @@ -508,15 +524,26 @@ def main(): while True: topics_to_subscribe = ["eu.nebulouscloud.monitoring.metric_list","eu.nebulouscloud.monitoring.realtime.>","eu.nebulouscloud.forecasting.start_forecasting.exponentialsmoothing","eu.nebulouscloud.forecasting.stop_forecasting.exponentialsmoothing"] + + topics_to_publish = ["eu.nebulouscloud.state.exponentialsmoothing.isalive"] + current_consumers = [] + current_publishers = [] for topic in topics_to_subscribe: - current_consumer = core.consumer.Consumer(key='monitoring_'+topic,address=topic,handler=ConsumerHandler(), topic=True,fqdn=True) + current_consumer = core.consumer.Consumer(key='exsmoothing_forecasting_'+topic,address=topic,handler=ConsumerHandler(), topic=True,fqdn=True) EsPredictorState.broker_consumers.append(current_consumer) current_consumers.append(current_consumer) + + for topic in topics_to_publish: + current_publisher = core.publisher.Publisher(key='exsmoothing_forecasting_'+topic,address=topic, topic=True,fqdn=True) + EsPredictorState.broker_publishers.append(current_publisher) + current_publishers.append(current_publisher) + EsPredictorState.subscribing_connector = connector.EXN(EsPredictorState.forecaster_name, handler=BootStrap(), #consumers=list(State.broker_consumers), consumers=EsPredictorState.broker_consumers, + publishers=current_publishers, url=EsPredictorState.broker_address, port=EsPredictorState.broker_port, username=EsPredictorState.broker_username, diff --git a/exponential-smoothing-predictor/src/runtime/operational_status/EsPredictorState.py b/exponential-smoothing-predictor/src/runtime/operational_status/EsPredictorState.py index e7d49cc..a553bf1 100755 --- a/exponential-smoothing-predictor/src/runtime/operational_status/EsPredictorState.py +++ b/exponential-smoothing-predictor/src/runtime/operational_status/EsPredictorState.py @@ -22,10 +22,14 @@ class EsPredictorState: """ Fail-safe default values introduced below """ + preliminary_prediction_publishing_topic_prefix = "eu.nebulouscloud.preliminary_predicted." + prediction_publishing_topic_prefix = "eu.nebulouscloud.monitoring.predicted." + publish_predictions_as_preliminary = True application_name_prefix = "nebulous_" GENERAL_TOPIC_PREFIX = "eu.nebulouscloud." MONITORING_DATA_PREFIX = "monitoring." FORECASTING_CONTROL_PREFIX = "forecasting." + COMPONENT_STATE_PREFIX = "state." #Used to create the dataset from the InfluxDB influxdb_organization = "my-org" @@ -64,4 +68,10 @@ class EsPredictorState: def check_stale_connection(): return (not EsPredictorState.subscribing_connector) - + @staticmethod + def get_prediction_publishing_topic(metric_name): + if EsPredictorState.publish_predictions_as_preliminary: + return EsPredictorState.preliminary_prediction_publishing_topic_prefix+EsPredictorState.forecaster_name+"."+metric_name + #'eu.nebulouscloud.preliminary_predicted.'+EsPredictorState.forecaster_name+"."+metric_name + else: + return EsPredictorState.prediction_publishing_topic_prefix+metric_name \ No newline at end of file diff --git a/exponential-smoothing-predictor/src/runtime/utilities/PredictionPublisher.py b/exponential-smoothing-predictor/src/runtime/utilities/PredictionPublisher.py index 094f185..edcf977 100755 --- a/exponential-smoothing-predictor/src/runtime/utilities/PredictionPublisher.py +++ b/exponential-smoothing-predictor/src/runtime/utilities/PredictionPublisher.py @@ -4,9 +4,16 @@ class PredictionPublisher(core.publisher.Publisher): metric_name = "" + private_publisher = None def __init__(self,application_name,metric_name): - super().__init__('publisher_'+application_name+'-'+metric_name, 'eu.nebulouscloud.preliminary_predicted.'+EsPredictorState.forecaster_name+"."+metric_name, True,True) + super().__init__('publisher_'+application_name+'-'+metric_name, EsPredictorState.get_prediction_publishing_topic(metric_name), True,True) self.metric_name = metric_name + self.private_publisher = self - def send(self, body={}, application=""): - super(PredictionPublisher, self).send(body, application) + def send(self, body={}, application_name=""): + try: + super(PredictionPublisher, self).send(body, application_name) + + except Exception as e: + self.private_publisher = super().__init__('publisher_'+application_name+'-'+self.metric_name, EsPredictorState.get_prediction_publishing_topic(self.metric_name), True,True) + super(PredictionPublisher, self).send(body, application_name) diff --git a/exponential-smoothing-predictor/src/runtime/utilities/Utilities.py b/exponential-smoothing-predictor/src/runtime/utilities/Utilities.py index 28e362f..d5c573a 100755 --- a/exponential-smoothing-predictor/src/runtime/utilities/Utilities.py +++ b/exponential-smoothing-predictor/src/runtime/utilities/Utilities.py @@ -20,7 +20,9 @@ class Utilities: @staticmethod def print_with_time(x): now = datetime.datetime.now() - print("["+now.strftime('%Y-%m-%d %H:%M:%S')+"] "+str(x)) + string_to_print = "["+now.strftime('%Y-%m-%d %H:%M:%S')+"] "+str(x) + print(string_to_print) + return string_to_print @staticmethod def load_configuration():