From f9bc3d28a903d917dca6cf179e629ec08dda10ab Mon Sep 17 00:00:00 2001 From: atsag Date: Thu, 11 Jul 2024 14:28:51 +0300 Subject: [PATCH 01/13] Miscellaneous improvements Added required dependencies in the setup script Improved logging during dataset creation Added README.md file Change-Id: I67e98938f3660656d2ced916f258dfe5d475ecd3 --- .gitignore | 0 .gitreview | 5 --- .hadolint.yaml | 2 - .yamllint | 0 LICENSE | 0 .../.helmignore | 0 .../Chart.yaml | 0 .../templates/NOTES.txt | 0 .../templates/_helpers.tpl | 0 .../templates/deployment.yaml | 0 .../templates/hpa.yaml | 0 .../templates/ingress.yaml | 0 .../templates/service.yaml | 0 .../templates/serviceaccount.yaml | 0 .../values.yaml | 0 exponential-smoothing-predictor/Dockerfile | 0 exponential-smoothing-predictor/README.md | 37 +++++++++++++++++++ .../src/exn/__init__.py | 0 .../src/exn/connector.py | 0 .../src/exn/core/__init__.py | 0 .../src/exn/core/consumer.py | 0 .../src/exn/core/context.py | 0 .../src/exn/core/handler.py | 0 .../src/exn/core/link.py | 0 .../src/exn/core/manager.py | 0 .../src/exn/core/publisher.py | 0 .../src/exn/core/schedule_publisher.py | 0 .../src/exn/core/state_publisher.py | 0 .../src/exn/handler/__init__.py | 0 .../src/exn/handler/connector_handler.py | 0 .../src/exn/settings/__init__.py | 0 .../src/exn/settings/base.py | 0 .../src/prepare_python_dependencies.sh | 0 .../src/r_predictors/__init__.py | 0 .../src/r_predictors/default_application.csv | 0 .../r_predictors/forecasting_real_workload.R | 0 ...rediction_configuration-windows.properties | 0 .../prediction_configuration.properties | 2 +- .../src/r_predictors/r_commands.R | 0 .../src/requirements.txt | 0 .../src/runtime/Predictor.py | 17 ++++++--- .../src/runtime/__init__.py | 0 .../operational_status/ApplicationState.py | 20 ++++------ .../operational_status/EsPredictorState.py | 0 .../runtime/operational_status/__init__.py | 0 .../src/runtime/predictions/Prediction.py | 0 .../src/runtime/predictions/__init__.py | 0 .../predictions/prediction_requirements.txt | 0 .../runtime/utilities/InfluxDBConnector.py | 0 .../runtime/utilities/PredictionPublisher.py | 0 .../src/runtime/utilities/Utilities.py | 0 .../src/runtime/utilities/__init__.py | 0 exponential-smoothing-predictor/src/setup.py | 8 +++- java-spring-boot-demo/.gitignore | 0 java-spring-boot-demo/Dockerfile | 0 java-spring-boot-demo/pom.xml | 0 .../com/example/demo/DemoApplication.java | 0 .../java/com/example/demo/DemoController.java | 0 .../src/main/resources/application.properties | 0 .../example/demo/DemoApplicationTests.java | 0 noxfile.py | 0 zuul.d/jobs.yaml | 0 zuul.d/project.yaml | 0 63 files changed, 64 insertions(+), 27 deletions(-) mode change 100644 => 100755 .gitignore delete mode 100644 .gitreview delete mode 100644 .hadolint.yaml mode change 100644 => 100755 .yamllint mode change 100644 => 100755 LICENSE mode change 100644 => 100755 charts/nebulous-exponential-smoothing-predictor/.helmignore mode change 100644 => 100755 charts/nebulous-exponential-smoothing-predictor/Chart.yaml mode change 100644 => 100755 charts/nebulous-exponential-smoothing-predictor/templates/NOTES.txt mode change 100644 => 100755 charts/nebulous-exponential-smoothing-predictor/templates/_helpers.tpl mode change 100644 => 100755 charts/nebulous-exponential-smoothing-predictor/templates/deployment.yaml mode change 100644 => 100755 charts/nebulous-exponential-smoothing-predictor/templates/hpa.yaml mode change 100644 => 100755 charts/nebulous-exponential-smoothing-predictor/templates/ingress.yaml mode change 
100644 => 100755 charts/nebulous-exponential-smoothing-predictor/templates/service.yaml mode change 100644 => 100755 charts/nebulous-exponential-smoothing-predictor/templates/serviceaccount.yaml mode change 100644 => 100755 charts/nebulous-exponential-smoothing-predictor/values.yaml mode change 100644 => 100755 exponential-smoothing-predictor/Dockerfile create mode 100755 exponential-smoothing-predictor/README.md mode change 100644 => 100755 exponential-smoothing-predictor/src/exn/__init__.py mode change 100644 => 100755 exponential-smoothing-predictor/src/exn/connector.py mode change 100644 => 100755 exponential-smoothing-predictor/src/exn/core/__init__.py mode change 100644 => 100755 exponential-smoothing-predictor/src/exn/core/consumer.py mode change 100644 => 100755 exponential-smoothing-predictor/src/exn/core/context.py mode change 100644 => 100755 exponential-smoothing-predictor/src/exn/core/handler.py mode change 100644 => 100755 exponential-smoothing-predictor/src/exn/core/link.py mode change 100644 => 100755 exponential-smoothing-predictor/src/exn/core/manager.py mode change 100644 => 100755 exponential-smoothing-predictor/src/exn/core/publisher.py mode change 100644 => 100755 exponential-smoothing-predictor/src/exn/core/schedule_publisher.py mode change 100644 => 100755 exponential-smoothing-predictor/src/exn/core/state_publisher.py mode change 100644 => 100755 exponential-smoothing-predictor/src/exn/handler/__init__.py mode change 100644 => 100755 exponential-smoothing-predictor/src/exn/handler/connector_handler.py mode change 100644 => 100755 exponential-smoothing-predictor/src/exn/settings/__init__.py mode change 100644 => 100755 exponential-smoothing-predictor/src/exn/settings/base.py mode change 100644 => 100755 exponential-smoothing-predictor/src/prepare_python_dependencies.sh mode change 100644 => 100755 exponential-smoothing-predictor/src/r_predictors/__init__.py mode change 100644 => 100755 exponential-smoothing-predictor/src/r_predictors/default_application.csv mode change 100644 => 100755 exponential-smoothing-predictor/src/r_predictors/forecasting_real_workload.R mode change 100644 => 100755 exponential-smoothing-predictor/src/r_predictors/prediction_configuration-windows.properties mode change 100644 => 100755 exponential-smoothing-predictor/src/r_predictors/prediction_configuration.properties mode change 100644 => 100755 exponential-smoothing-predictor/src/r_predictors/r_commands.R mode change 100644 => 100755 exponential-smoothing-predictor/src/requirements.txt mode change 100644 => 100755 exponential-smoothing-predictor/src/runtime/Predictor.py mode change 100644 => 100755 exponential-smoothing-predictor/src/runtime/__init__.py mode change 100644 => 100755 exponential-smoothing-predictor/src/runtime/operational_status/ApplicationState.py mode change 100644 => 100755 exponential-smoothing-predictor/src/runtime/operational_status/EsPredictorState.py mode change 100644 => 100755 exponential-smoothing-predictor/src/runtime/operational_status/__init__.py mode change 100644 => 100755 exponential-smoothing-predictor/src/runtime/predictions/Prediction.py mode change 100644 => 100755 exponential-smoothing-predictor/src/runtime/predictions/__init__.py mode change 100644 => 100755 exponential-smoothing-predictor/src/runtime/predictions/prediction_requirements.txt mode change 100644 => 100755 exponential-smoothing-predictor/src/runtime/utilities/InfluxDBConnector.py mode change 100644 => 100755 exponential-smoothing-predictor/src/runtime/utilities/PredictionPublisher.py mode 
change 100644 => 100755 exponential-smoothing-predictor/src/runtime/utilities/Utilities.py mode change 100644 => 100755 exponential-smoothing-predictor/src/runtime/utilities/__init__.py mode change 100644 => 100755 exponential-smoothing-predictor/src/setup.py mode change 100644 => 100755 java-spring-boot-demo/.gitignore mode change 100644 => 100755 java-spring-boot-demo/Dockerfile mode change 100644 => 100755 java-spring-boot-demo/pom.xml mode change 100644 => 100755 java-spring-boot-demo/src/main/java/com/example/demo/DemoApplication.java mode change 100644 => 100755 java-spring-boot-demo/src/main/java/com/example/demo/DemoController.java mode change 100644 => 100755 java-spring-boot-demo/src/main/resources/application.properties mode change 100644 => 100755 java-spring-boot-demo/src/test/java/com/example/demo/DemoApplicationTests.java mode change 100644 => 100755 noxfile.py mode change 100644 => 100755 zuul.d/jobs.yaml mode change 100644 => 100755 zuul.d/project.yaml diff --git a/.gitignore b/.gitignore old mode 100644 new mode 100755 diff --git a/.gitreview b/.gitreview deleted file mode 100644 index 21767f4..0000000 --- a/.gitreview +++ /dev/null @@ -1,5 +0,0 @@ -[gerrit] -host=review.opendev.org -port=29418 -project=nebulous/exponential-smoothing-predictor.git -defaultbranch=master diff --git a/.hadolint.yaml b/.hadolint.yaml deleted file mode 100644 index 0aa5f7c..0000000 --- a/.hadolint.yaml +++ /dev/null @@ -1,2 +0,0 @@ -ignored: - - DL3008 \ No newline at end of file diff --git a/.yamllint b/.yamllint old mode 100644 new mode 100755 diff --git a/LICENSE b/LICENSE old mode 100644 new mode 100755 diff --git a/charts/nebulous-exponential-smoothing-predictor/.helmignore b/charts/nebulous-exponential-smoothing-predictor/.helmignore old mode 100644 new mode 100755 diff --git a/charts/nebulous-exponential-smoothing-predictor/Chart.yaml b/charts/nebulous-exponential-smoothing-predictor/Chart.yaml old mode 100644 new mode 100755 diff --git a/charts/nebulous-exponential-smoothing-predictor/templates/NOTES.txt b/charts/nebulous-exponential-smoothing-predictor/templates/NOTES.txt old mode 100644 new mode 100755 diff --git a/charts/nebulous-exponential-smoothing-predictor/templates/_helpers.tpl b/charts/nebulous-exponential-smoothing-predictor/templates/_helpers.tpl old mode 100644 new mode 100755 diff --git a/charts/nebulous-exponential-smoothing-predictor/templates/deployment.yaml b/charts/nebulous-exponential-smoothing-predictor/templates/deployment.yaml old mode 100644 new mode 100755 diff --git a/charts/nebulous-exponential-smoothing-predictor/templates/hpa.yaml b/charts/nebulous-exponential-smoothing-predictor/templates/hpa.yaml old mode 100644 new mode 100755 diff --git a/charts/nebulous-exponential-smoothing-predictor/templates/ingress.yaml b/charts/nebulous-exponential-smoothing-predictor/templates/ingress.yaml old mode 100644 new mode 100755 diff --git a/charts/nebulous-exponential-smoothing-predictor/templates/service.yaml b/charts/nebulous-exponential-smoothing-predictor/templates/service.yaml old mode 100644 new mode 100755 diff --git a/charts/nebulous-exponential-smoothing-predictor/templates/serviceaccount.yaml b/charts/nebulous-exponential-smoothing-predictor/templates/serviceaccount.yaml old mode 100644 new mode 100755 diff --git a/charts/nebulous-exponential-smoothing-predictor/values.yaml b/charts/nebulous-exponential-smoothing-predictor/values.yaml old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/Dockerfile 
b/exponential-smoothing-predictor/Dockerfile old mode 100644 new mode 100755
diff --git a/exponential-smoothing-predictor/README.md b/exponential-smoothing-predictor/README.md
new file mode 100755
index 0000000..9d59045
--- /dev/null
+++ b/exponential-smoothing-predictor/README.md
@@ -0,0 +1,37 @@
+# Exponential Smoothing predictor
+
+## Introduction
+
+The exponential smoothing predictor is one of the predictors that will be available to the NebulOuS platform for forecasting. Along with other prediction methods, it provides its input to the Prediction Orchestrator, which then aggregates it and forwards finalized predictions.
+
+It follows the process outlined in the NebulOuS wiki page on the generation of predictions:
+
+https://github.com/eu-nebulous/nebulous/wiki/5.3-Predictions-Generation-Process
+
+The exponential smoothing forecaster uses the Holt-Winters R library to generate predictions. It uses data stored by the monitoring data persistor module (https://github.com/eu-nebulous/monitoring-data-persistor). Data is resampled so as not to place too much load on the forecasting component. Based on the input events, it generates a number of forward predictions, which the Prediction Orchestrator can then exploit.
+
+## Component outline and use
+
+Before starting the component, please ensure that there is at least one recent data point (where "recent" means at most as old as the expected difference between the time of the first prediction and the current time), and at least one data point that is at least as old as three times that difference.
+To illustrate, assume that the current time is 1705046000 and the first prediction will happen at 1705046500. Then, one recent data point is needed (not older than 1705045500) and at least one older data point is needed (not more recent than 1705044500).
+Usually, a couple of minutes of recent data is enough for predictions a few seconds ahead (please scale accordingly, depending on your scenario).
+
+The component initially waits for a `start_forecasting` event sent to the `eu.nebulouscloud.forecasting.start_forecasting.exponentialsmoothing` topic.
+An example payload for this event follows:
+
+```json
+{
+    "name": "_Application1",
+    "metrics": ["cpu_usage"],
+    "timestamp": 1705046535,
+    "epoch_start": 1705046500,
+    "number_of_forward_predictions": 5,
+    "prediction_horizon": 10
+}
+```
+
+Once this event is sent, the component subscribes to the appropriate metric topics and publishes values as required (in the scenario above, 5 predictions every 10 seconds for the cpu_usage metric).
+
+Optionally, and preferably before the start_forecasting event, the component can receive a metric_list event (Event type 3 in the SLO Severity-based Violation Detector interface). Using the metric information contained in that event, it constrains its predictions to an admissible range for each metric it provides predictions for.
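+An illustrative (non-normative) sketch of such a metric_list payload might look as follows; the per-metric fields shown here are placeholders, and the authoritative schema is described in the interface documentation linked below:
+
+```json
+{
+    "name": "_Application1",
+    "version": 1,
+    "metric_list": [
+        {"name": "cpu_usage", "lower_bound": 0, "upper_bound": 100}
+    ]
+}
+```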
+ +https://github.com/eu-nebulous/nebulous/wiki/5.2-The-SLO-Severity%E2%80%90based-Violation-Detector-Interface diff --git a/exponential-smoothing-predictor/src/exn/__init__.py b/exponential-smoothing-predictor/src/exn/__init__.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/exn/connector.py b/exponential-smoothing-predictor/src/exn/connector.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/exn/core/__init__.py b/exponential-smoothing-predictor/src/exn/core/__init__.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/exn/core/consumer.py b/exponential-smoothing-predictor/src/exn/core/consumer.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/exn/core/context.py b/exponential-smoothing-predictor/src/exn/core/context.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/exn/core/handler.py b/exponential-smoothing-predictor/src/exn/core/handler.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/exn/core/link.py b/exponential-smoothing-predictor/src/exn/core/link.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/exn/core/manager.py b/exponential-smoothing-predictor/src/exn/core/manager.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/exn/core/publisher.py b/exponential-smoothing-predictor/src/exn/core/publisher.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/exn/core/schedule_publisher.py b/exponential-smoothing-predictor/src/exn/core/schedule_publisher.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/exn/core/state_publisher.py b/exponential-smoothing-predictor/src/exn/core/state_publisher.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/exn/handler/__init__.py b/exponential-smoothing-predictor/src/exn/handler/__init__.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/exn/handler/connector_handler.py b/exponential-smoothing-predictor/src/exn/handler/connector_handler.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/exn/settings/__init__.py b/exponential-smoothing-predictor/src/exn/settings/__init__.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/exn/settings/base.py b/exponential-smoothing-predictor/src/exn/settings/base.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/prepare_python_dependencies.sh b/exponential-smoothing-predictor/src/prepare_python_dependencies.sh old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/r_predictors/__init__.py b/exponential-smoothing-predictor/src/r_predictors/__init__.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/r_predictors/default_application.csv b/exponential-smoothing-predictor/src/r_predictors/default_application.csv old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/r_predictors/forecasting_real_workload.R b/exponential-smoothing-predictor/src/r_predictors/forecasting_real_workload.R old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/r_predictors/prediction_configuration-windows.properties b/exponential-smoothing-predictor/src/r_predictors/prediction_configuration-windows.properties old mode 100644 new mode 100755 diff 
--git a/exponential-smoothing-predictor/src/r_predictors/prediction_configuration.properties b/exponential-smoothing-predictor/src/r_predictors/prediction_configuration.properties old mode 100644 new mode 100755 index d24ac5c..0e38281 --- a/exponential-smoothing-predictor/src/r_predictors/prediction_configuration.properties +++ b/exponential-smoothing-predictor/src/r_predictors/prediction_configuration.properties @@ -1,4 +1,4 @@ -#Thu May 16 12:31:21 UTC 2024 +#Thu Jul 11 09:34:44 UTC 2024 APP_NAME=default_application METHOD=exponential_smoothing INFLUXDB_HOSTNAME=nebulous-influxdb diff --git a/exponential-smoothing-predictor/src/r_predictors/r_commands.R b/exponential-smoothing-predictor/src/r_predictors/r_commands.R old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/requirements.txt b/exponential-smoothing-predictor/src/requirements.txt old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/runtime/Predictor.py b/exponential-smoothing-predictor/src/runtime/Predictor.py old mode 100644 new mode 100755 index f2c18e3..cd11653 --- a/exponential-smoothing-predictor/src/runtime/Predictor.py +++ b/exponential-smoothing-predictor/src/runtime/Predictor.py @@ -109,7 +109,7 @@ def predict_attribute(application_state, attribute, configuration_file_location, process_output = run(command, shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True) if (process_output.stdout==""): - print_with_time("Empty output from R predictions - the error output is the following:") + logging.info("Empty output from R predictions - the error output is the following:") print(process_output.stderr) #There was an error during the calculation of the predicted value process_output_string_list = process_output.stdout.replace("[1] ", "").replace("\"", "").split() @@ -139,8 +139,14 @@ def predict_attribute(application_state, attribute, configuration_file_location, prediction_valid = True print_with_time("The prediction for attribute " + attribute + " is " + str(prediction_value)+ " and the confidence interval is "+prediction_confidence_interval) else: - print_with_time("There was an error during the calculation of the predicted value for "+str(attribute)+", the error log follows") - print_with_time(process_output.stdout) + logging.info("There was an error during the calculation of the predicted value for "+str(attribute)+", the error log follows") + logging.info(process_output.stdout) + logging.info("\n") + logging.info("----------------------") + logging.info("Printing stderr") + logging.info("----------------------") + logging.info("\n") + logging.info(process_output.stderr) output_prediction = Prediction(prediction_value, prediction_confidence_interval,prediction_valid,prediction_mae,prediction_mse,prediction_mape,prediction_smape) return output_prediction @@ -454,8 +460,9 @@ def main(): #Change to the appropriate directory in order i) To invoke the forecasting script appropriately and ii) To store the monitoring data necessary for predictions from sys import platform - if platform == "win32": - os.chdir("exponential-smoothing-predictor/src/r_predictors") + if platform == "win32" or bool(os.environ["TEST_RUN"]): + print(os.listdir(".")) + os.chdir("../r_predictors") # linux elif platform == "linux" or platform == "linux2": os.chdir("/home/r_predictions") diff --git a/exponential-smoothing-predictor/src/runtime/__init__.py b/exponential-smoothing-predictor/src/runtime/__init__.py old mode 100644 new mode 100755 diff --git 
a/exponential-smoothing-predictor/src/runtime/operational_status/ApplicationState.py b/exponential-smoothing-predictor/src/runtime/operational_status/ApplicationState.py old mode 100644 new mode 100755 index cf7f80d..b86d922 --- a/exponential-smoothing-predictor/src/runtime/operational_status/ApplicationState.py +++ b/exponential-smoothing-predictor/src/runtime/operational_status/ApplicationState.py @@ -77,20 +77,10 @@ def update_monitoring_data(self): Utilities.print_with_time("Starting dataset creation process...") try: - """ - Deprecated functionality to retrieve dataset creation details. Relevant functionality moved inside the load configuration method - influxdb_hostname = os.environ.get("INFLUXDB_HOSTNAME","localhost") - influxdb_port = int(os.environ.get("INFLUXDB_PORT","8086")) - influxdb_username = os.environ.get("INFLUXDB_USERNAME","morphemic") - influxdb_password = os.environ.get("INFLUXDB_PASSWORD","password") - influxdb_dbname = os.environ.get("INFLUXDB_DBNAME","morphemic") - influxdb_org = os.environ.get("INFLUXDB_ORG","morphemic") - application_name = "default_application" - """ for metric_name in self.metrics_to_predict: time_interval_to_get_data_for = str(EsPredictorState.number_of_days_to_use_data_from) + "d" print_data_from_db = True - query_string = 'from(bucket: "'+self.influxdb_bucket+'") |> range(start:-'+time_interval_to_get_data_for+') |> filter(fn: (r) => r["_measurement"] == "'+metric_name+'")' + query_string = 'from (bucket: "'+self.influxdb_bucket+'") |> range(start:-'+time_interval_to_get_data_for+') |> filter(fn: (r) => r["_measurement"] == "'+metric_name+'")' influx_connector = InfluxDBConnector() logging.info("performing query for application with bucket "+str(self.influxdb_bucket)) logging.info("The body of the query is "+query_string) @@ -98,9 +88,13 @@ def update_monitoring_data(self): current_time = time.time() result = influx_connector.client.query_api().query(query_string, EsPredictorState.influxdb_organization) elapsed_time = time.time()-current_time - logging.info("performed query, it took "+str(elapsed_time) + " seconds") + prediction_dataset_filename = self.get_prediction_data_filename(EsPredictorState.configuration_file_location, metric_name) + if len(result)>0: + logging.info(f"Performed query to the database, it took "+str(elapsed_time) + f" seconds to receive {len(result[0].records)} entries (from the first and possibly only table returned). 
Now logging to {prediction_dataset_filename}") + else: + logging.info("No records returned from database") #print(result.to_values()) - with open(self.get_prediction_data_filename(EsPredictorState.configuration_file_location, metric_name), 'w') as file: + with open(prediction_dataset_filename, 'w') as file: for table in result: #print header row file.write("Timestamp,ems_time,"+metric_name+"\r\n") diff --git a/exponential-smoothing-predictor/src/runtime/operational_status/EsPredictorState.py b/exponential-smoothing-predictor/src/runtime/operational_status/EsPredictorState.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/runtime/operational_status/__init__.py b/exponential-smoothing-predictor/src/runtime/operational_status/__init__.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/runtime/predictions/Prediction.py b/exponential-smoothing-predictor/src/runtime/predictions/Prediction.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/runtime/predictions/__init__.py b/exponential-smoothing-predictor/src/runtime/predictions/__init__.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/runtime/predictions/prediction_requirements.txt b/exponential-smoothing-predictor/src/runtime/predictions/prediction_requirements.txt old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/runtime/utilities/InfluxDBConnector.py b/exponential-smoothing-predictor/src/runtime/utilities/InfluxDBConnector.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/runtime/utilities/PredictionPublisher.py b/exponential-smoothing-predictor/src/runtime/utilities/PredictionPublisher.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/runtime/utilities/Utilities.py b/exponential-smoothing-predictor/src/runtime/utilities/Utilities.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/runtime/utilities/__init__.py b/exponential-smoothing-predictor/src/runtime/utilities/__init__.py old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/src/setup.py b/exponential-smoothing-predictor/src/setup.py old mode 100644 new mode 100755 index 1ac1643..045ccde --- a/exponential-smoothing-predictor/src/setup.py +++ b/exponential-smoothing-predictor/src/setup.py @@ -35,7 +35,13 @@ # Dependent packages (distributions) install_requires=[ "python-slugify", - "jproperties" + "jproperties", + "requests", + "numpy", + "python-qpid-proton", + "influxdb-client", + "python-dotenv", + "python-dateutil" ], #package_dir={'': '.'}, entry_points={ diff --git a/java-spring-boot-demo/.gitignore b/java-spring-boot-demo/.gitignore old mode 100644 new mode 100755 diff --git a/java-spring-boot-demo/Dockerfile b/java-spring-boot-demo/Dockerfile old mode 100644 new mode 100755 diff --git a/java-spring-boot-demo/pom.xml b/java-spring-boot-demo/pom.xml old mode 100644 new mode 100755 diff --git a/java-spring-boot-demo/src/main/java/com/example/demo/DemoApplication.java b/java-spring-boot-demo/src/main/java/com/example/demo/DemoApplication.java old mode 100644 new mode 100755 diff --git a/java-spring-boot-demo/src/main/java/com/example/demo/DemoController.java b/java-spring-boot-demo/src/main/java/com/example/demo/DemoController.java old mode 100644 new mode 100755 diff --git a/java-spring-boot-demo/src/main/resources/application.properties 
b/java-spring-boot-demo/src/main/resources/application.properties old mode 100644 new mode 100755 diff --git a/java-spring-boot-demo/src/test/java/com/example/demo/DemoApplicationTests.java b/java-spring-boot-demo/src/test/java/com/example/demo/DemoApplicationTests.java old mode 100644 new mode 100755 diff --git a/noxfile.py b/noxfile.py old mode 100644 new mode 100755 diff --git a/zuul.d/jobs.yaml b/zuul.d/jobs.yaml old mode 100644 new mode 100755 diff --git a/zuul.d/project.yaml b/zuul.d/project.yaml old mode 100644 new mode 100755 From d8b93194228bff42425b3d2204bfce88300e814e Mon Sep 17 00:00:00 2001 From: atsag Date: Thu, 11 Jul 2024 14:35:15 +0300 Subject: [PATCH 02/13] Readme file location correction Corrected the location of the README.md file Change-Id: I89b86b7471123d653592cd98f513ff25181d2520 --- .github/workflows/ci.yml | 0 exponential-smoothing-predictor/README.md => README.md | 0 2 files changed, 0 insertions(+), 0 deletions(-) mode change 100644 => 100755 .github/workflows/ci.yml rename exponential-smoothing-predictor/README.md => README.md (100%) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml old mode 100644 new mode 100755 diff --git a/exponential-smoothing-predictor/README.md b/README.md similarity index 100% rename from exponential-smoothing-predictor/README.md rename to README.md From 0ba7c362f47123f851229020a9742a25342b2b94 Mon Sep 17 00:00:00 2001 From: atsag Date: Mon, 22 Jul 2024 13:40:53 +0300 Subject: [PATCH 03/13] Environment variable correction Added default value for an environmental variable Change-Id: I81ca8f427c78aaec62d3eef9bad47d685e2a84ba --- exponential-smoothing-predictor/src/runtime/Predictor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exponential-smoothing-predictor/src/runtime/Predictor.py b/exponential-smoothing-predictor/src/runtime/Predictor.py index cd11653..14c59a3 100755 --- a/exponential-smoothing-predictor/src/runtime/Predictor.py +++ b/exponential-smoothing-predictor/src/runtime/Predictor.py @@ -460,7 +460,7 @@ def main(): #Change to the appropriate directory in order i) To invoke the forecasting script appropriately and ii) To store the monitoring data necessary for predictions from sys import platform - if platform == "win32" or bool(os.environ["TEST_RUN"]): + if platform == "win32" or bool(os.environ.get("TEST_RUN",False)): print(os.listdir(".")) os.chdir("../r_predictors") # linux From 3ac146bd1017b6ab4a19889b6280ae63d340214f Mon Sep 17 00:00:00 2001 From: atsag Date: Mon, 16 Sep 2024 12:42:03 +0300 Subject: [PATCH 04/13] Small configuration file improvements --- .../prediction_configuration-windows.properties | 8 -------- .../src/r_predictors/prediction_configuration.properties | 3 --- 2 files changed, 11 deletions(-) diff --git a/exponential-smoothing-predictor/src/r_predictors/prediction_configuration-windows.properties b/exponential-smoothing-predictor/src/r_predictors/prediction_configuration-windows.properties index 3afcc14..2bdeedc 100755 --- a/exponential-smoothing-predictor/src/r_predictors/prediction_configuration-windows.properties +++ b/exponential-smoothing-predictor/src/r_predictors/prediction_configuration-windows.properties @@ -1,16 +1,8 @@ -#AMQ_HOST=ems -#AMQ_USER=aaa -#AMQ_PASSWORD=111 -#AMQ_PORT_BROKER=61610 -APP_NAME=default_application -METHOD=exponential_smoothing - INFLUXDB_HOSTNAME=nebulous-influxdb INFLUXDB_PORT=8086 INFLUXDB_USERNAME=my-user INFLUXDB_PASSWORD=my-password INFLUXDB_ORG=my-org -INFLUXDB_ORG_ID=9c929742d57cca02 broker_address=nebulous-activemq 
broker_port=5672 diff --git a/exponential-smoothing-predictor/src/r_predictors/prediction_configuration.properties b/exponential-smoothing-predictor/src/r_predictors/prediction_configuration.properties index 0e38281..150380d 100755 --- a/exponential-smoothing-predictor/src/r_predictors/prediction_configuration.properties +++ b/exponential-smoothing-predictor/src/r_predictors/prediction_configuration.properties @@ -1,12 +1,9 @@ #Thu Jul 11 09:34:44 UTC 2024 -APP_NAME=default_application -METHOD=exponential_smoothing INFLUXDB_HOSTNAME=nebulous-influxdb INFLUXDB_PORT=8086 INFLUXDB_USERNAME=my-user INFLUXDB_PASSWORD=my-password INFLUXDB_ORG=my-org -INFLUXDB_ORG_ID=9c929742d57cca02 broker_address=nebulous-activemq broker_port=5672 broker_username=morphemic From 7622c62147e08ca65b0c629f612af86d63c7bdb7 Mon Sep 17 00:00:00 2001 From: atsag Date: Thu, 19 Sep 2024 15:46:56 +0300 Subject: [PATCH 05/13] Corrected the publishing of forecasts for a particular application --- .../src/runtime/utilities/PredictionPublisher.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/exponential-smoothing-predictor/src/runtime/utilities/PredictionPublisher.py b/exponential-smoothing-predictor/src/runtime/utilities/PredictionPublisher.py index af63a49..094f185 100755 --- a/exponential-smoothing-predictor/src/runtime/utilities/PredictionPublisher.py +++ b/exponential-smoothing-predictor/src/runtime/utilities/PredictionPublisher.py @@ -1,11 +1,12 @@ from exn import core +from runtime.operational_status.EsPredictorState import EsPredictorState class PredictionPublisher(core.publisher.Publisher): metric_name = "" def __init__(self,application_name,metric_name): - super().__init__('publisher_'+application_name+'-'+metric_name, 'eu.nebulouscloud.preliminary_predicted.'+metric_name, True,True) + super().__init__('publisher_'+application_name+'-'+metric_name, 'eu.nebulouscloud.preliminary_predicted.'+EsPredictorState.forecaster_name+"."+metric_name, True,True) self.metric_name = metric_name def send(self, body={}, application=""): - super(PredictionPublisher, self).send(body) + super(PredictionPublisher, self).send(body, application) From 64027ddb441f706fbfec6e50ef6b395d534951d1 Mon Sep 17 00:00:00 2001 From: atsag Date: Thu, 19 Sep 2024 16:59:20 +0300 Subject: [PATCH 06/13] Retrieve the appropriate publisher to publish predictions, using the application name --- exponential-smoothing-predictor/src/runtime/Predictor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/exponential-smoothing-predictor/src/runtime/Predictor.py b/exponential-smoothing-predictor/src/runtime/Predictor.py index 14c59a3..c7e5c66 100755 --- a/exponential-smoothing-predictor/src/runtime/Predictor.py +++ b/exponential-smoothing-predictor/src/runtime/Predictor.py @@ -192,7 +192,7 @@ def update_prediction_time(epoch_start,prediction_horizon,maximum_time_for_predi def calculate_and_publish_predictions(application_state,maximum_time_required_for_prediction): start_forecasting = application_state.start_forecasting - + application_name = application_state.application_name while start_forecasting: print_with_time("Using " + EsPredictorState.configuration_file_location + " for configuration details...") application_state.next_prediction_time = update_prediction_time(application_state.epoch_start, application_state.prediction_horizon,maximum_time_required_for_prediction) @@ -254,8 +254,8 @@ def calculate_and_publish_predictions(application_state,maximum_time_required_fo for publisher in 
EsPredictorState.broker_publishers: #if publisher.address=="eu.nebulouscloud.monitoring.preliminary_predicted.exponentialsmoothing"+attribute: - if publisher.key=="publisher_"+attribute: - publisher.send(prediction_message_body) + if publisher.key=="publisher_"+application_name+"-"+attribute: + publisher.send(prediction_message_body,application_name) #State.connection.send_to_topic('intermediate_prediction.%s.%s' % (id, attribute), prediction_message_body) @@ -303,7 +303,7 @@ def on_message(self, key, address, body, context, **kwargs): address = address.replace("topic://"+EsPredictorState.GENERAL_TOPIC_PREFIX,"") if (address).startswith(EsPredictorState.MONITORING_DATA_PREFIX): address = address.replace(EsPredictorState.MONITORING_DATA_PREFIX, "", 1) - + logging.info("New monitoring data arrived at topic "+address) if address == 'metric_list': application_name = body["name"] From b9a5d551f87bee758dd5a93dc07dea0af10ac762 Mon Sep 17 00:00:00 2001 From: atsag Date: Thu, 19 Sep 2024 17:43:53 +0300 Subject: [PATCH 07/13] Fix the check for metric_list message version --- exponential-smoothing-predictor/src/runtime/Predictor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/exponential-smoothing-predictor/src/runtime/Predictor.py b/exponential-smoothing-predictor/src/runtime/Predictor.py index c7e5c66..2e5c821 100755 --- a/exponential-smoothing-predictor/src/runtime/Predictor.py +++ b/exponential-smoothing-predictor/src/runtime/Predictor.py @@ -306,13 +306,14 @@ def on_message(self, key, address, body, context, **kwargs): logging.info("New monitoring data arrived at topic "+address) if address == 'metric_list': + application_name = body["name"] message_version = body["version"] application_state = None individual_application_state = {} application_already_defined = application_name in EsPredictorState.individual_application_state if ( application_already_defined and - ( message_version == EsPredictorState.individual_application_state[application_state].message_version ) + ( message_version == EsPredictorState.individual_application_state[application_name].message_version ) ): individual_application_state = EsPredictorState.individual_application_state application_state = individual_application_state[application_name] From 1df4d1534583a215937134242e56e02961d210c4 Mon Sep 17 00:00:00 2001 From: atsag Date: Fri, 27 Sep 2024 13:45:15 +0300 Subject: [PATCH 08/13] Necessary Improvements to allow multi-application support Fixed the function call procedure for individual attribute predictions Fixed the handling of upper/lower bounds of individual attributes in prediction sanitization --- .../prediction_configuration.properties | 2 +- .../src/runtime/Predictor.py | 100 +++++++++++------- .../operational_status/ApplicationState.py | 1 + 3 files changed, 61 insertions(+), 42 deletions(-) diff --git a/exponential-smoothing-predictor/src/r_predictors/prediction_configuration.properties b/exponential-smoothing-predictor/src/r_predictors/prediction_configuration.properties index 150380d..54df708 100755 --- a/exponential-smoothing-predictor/src/r_predictors/prediction_configuration.properties +++ b/exponential-smoothing-predictor/src/r_predictors/prediction_configuration.properties @@ -1,4 +1,4 @@ -#Thu Jul 11 09:34:44 UTC 2024 +#Fri Sep 27 10:28:45 UTC 2024 INFLUXDB_HOSTNAME=nebulous-influxdb INFLUXDB_PORT=8086 INFLUXDB_USERNAME=my-user diff --git a/exponential-smoothing-predictor/src/runtime/Predictor.py b/exponential-smoothing-predictor/src/runtime/Predictor.py index 2e5c821..0808839 
100755 --- a/exponential-smoothing-predictor/src/runtime/Predictor.py +++ b/exponential-smoothing-predictor/src/runtime/Predictor.py @@ -26,7 +26,7 @@ print_with_time = Utilities.print_with_time -def sanitize_prediction_statistics(prediction_confidence_interval, prediction_value, metric_name, application_state): +def sanitize_prediction_statistics(prediction_confidence_interval, prediction_value, metric_name, lower_bound_value, upper_bound_value): print_with_time("Inside the sanitization process with an interval of " + prediction_confidence_interval +" and a prediction of " + str(prediction_value)) lower_value_prediction_confidence_interval = float(prediction_confidence_interval.split(",")[0]) @@ -36,76 +36,79 @@ def sanitize_prediction_statistics(prediction_confidence_interval, prediction_va print_with_time("There is an issue with the application name"+application_name+" not existing in individual application states") return prediction_confidence_interval,prediction_value_produced""" - lower_bound_value = application_state.lower_bound_value - upper_bound_value = application_state.upper_bound_value + #lower_bound_value = application_state.lower_bound_value + #upper_bound_value = application_state.upper_bound_value - print("Lower_bound_value is "+str(lower_bound_value)) confidence_interval_modified = False new_prediction_confidence_interval = prediction_confidence_interval - if (not (metric_name in lower_bound_value)) or (not (metric_name in upper_bound_value)): + if ((lower_bound_value is None) and (upper_bound_value is None)): print_with_time(f"Lower value is unmodified - {lower_value_prediction_confidence_interval} and upper value is unmodified - {upper_value_prediction_confidence_interval}") return new_prediction_confidence_interval,prediction_value - - if (lower_value_prediction_confidence_interval < lower_bound_value[metric_name]): - lower_value_prediction_confidence_interval = lower_bound_value[metric_name] - confidence_interval_modified = True - elif (lower_value_prediction_confidence_interval > upper_bound_value[metric_name]): - lower_value_prediction_confidence_interval = upper_bound_value[metric_name] - confidence_interval_modified = True - if (upper_value_prediction_confidence_interval> upper_bound_value[metric_name]): - upper_value_prediction_confidence_interval = upper_bound_value[metric_name] - confidence_interval_modified = True - elif (upper_value_prediction_confidence_interval < lower_bound_value[metric_name]): - upper_value_prediction_confidence_interval = lower_bound_value[metric_name] - confidence_interval_modified = True + elif (lower_bound_value is not None): + if (upper_value_prediction_confidence_interval < lower_bound_value[metric_name]): + upper_value_prediction_confidence_interval = lower_bound_value[metric_name] + lower_value_prediction_confidence_interval = lower_bound_value[metric_name] + confidence_interval_modified = True + elif (lower_value_prediction_confidence_interval < lower_bound_value): + lower_value_prediction_confidence_interval = lower_bound_value[metric_name] + confidence_interval_modified = True + elif (upper_bound_value is not None): + if (lower_value_prediction_confidence_interval > upper_bound_value[metric_name]): + lower_value_prediction_confidence_interval = upper_bound_value[metric_name] + upper_value_prediction_confidence_interval = upper_bound_value[metric_name] + confidence_interval_modified = True + elif (upper_value_prediction_confidence_interval> upper_bound_value[metric_name]): + upper_value_prediction_confidence_interval = 
upper_bound_value[metric_name] + confidence_interval_modified = True + if confidence_interval_modified: new_prediction_confidence_interval = str(lower_value_prediction_confidence_interval)+","+str(upper_value_prediction_confidence_interval) print_with_time("The confidence interval "+prediction_confidence_interval+" was modified, becoming "+str(new_prediction_confidence_interval)+", taking into account the values of the metric") - if (prediction_value upper_bound_value[metric_name]): + elif (prediction_value > upper_bound_value): print_with_time("The prediction value of " + str(prediction_value) + " for metric " + metric_name + " was sanitized to " + str(upper_bound_value)) prediction_value = upper_bound_value return new_prediction_confidence_interval,prediction_value -def predict_attribute(application_state, attribute, configuration_file_location,next_prediction_time): +def predict_attribute(attribute,prediction_data_filename,lower_bound_value,upper_bound_value,next_prediction_time): prediction_confidence_interval_produced = False prediction_value_produced = False prediction_valid = False #os.chdir(os.path.dirname(configuration_file_location)) - application_state.prediction_data_filename = application_state.get_prediction_data_filename(configuration_file_location,attribute) + from sys import platform if EsPredictorState.testing_prediction_functionality: print_with_time("Testing, so output will be based on the horizon setting from the properties file and the last timestamp in the data") - print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(application_state.prediction_data_filename)+" "+attribute) + print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(prediction_data_filename)+" "+attribute) # Windows if platform == "win32": - command = ['Rscript', 'forecasting_real_workload.R', application_state.prediction_data_filename, attribute] + command = ['Rscript', 'forecasting_real_workload.R', prediction_data_filename, attribute] # linux elif platform == "linux" or platform == "linux2": - command = ["Rscript forecasting_real_workload.R "+str(application_state.prediction_data_filename) + " "+ str(attribute)] + command = ["Rscript forecasting_real_workload.R "+str(prediction_data_filename) + " "+ str(attribute)] #Choosing the solution of linux else: - command = ["Rscript forecasting_real_workload.R "+str(application_state.prediction_data_filename) + " "+ str(attribute)] + command = ["Rscript forecasting_real_workload.R "+str(prediction_data_filename) + " "+ str(attribute)] else: print_with_time("The current directory is "+os.path.abspath(os.getcwd())) - print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(application_state.prediction_data_filename)+" "+attribute+" "+next_prediction_time) + print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(prediction_data_filename)+" "+attribute+" "+next_prediction_time) # Windows if platform == "win32": - command = ['Rscript', 'forecasting_real_workload.R', application_state.prediction_data_filename, attribute, next_prediction_time] + command = ['Rscript', 'forecasting_real_workload.R', prediction_data_filename, attribute, next_prediction_time] # Linux elif platform == "linux" or platform == "linux2": - command = ["Rscript forecasting_real_workload.R "+str(application_state.prediction_data_filename) + " "+ str(attribute)+" "+str(next_prediction_time) + " 2>&1"] + command = ["Rscript forecasting_real_workload.R "+str(prediction_data_filename) + " "+ str(attribute)+" 
"+str(next_prediction_time) + " 2>&1"] #Choosing the solution of linux else: - command = ["Rscript forecasting_real_workload.R "+str(application_state.prediction_data_filename) + " "+ str(attribute)+" "+str(next_prediction_time)] + command = ["Rscript forecasting_real_workload.R "+str(prediction_data_filename) + " "+ str(attribute)+" "+str(next_prediction_time)] process_output = run(command, shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True) if (process_output.stdout==""): @@ -135,7 +138,7 @@ def predict_attribute(application_state, attribute, configuration_file_location, elif (string.startswith("smape:")): prediction_smape = string.replace("smape:", "") if (prediction_confidence_interval_produced and prediction_value_produced): - prediction_confidence_interval,prediction_value = sanitize_prediction_statistics(prediction_confidence_interval,float(prediction_value),attribute,application_state) + prediction_confidence_interval,prediction_value = sanitize_prediction_statistics(prediction_confidence_interval,float(prediction_value),attribute,lower_bound_value,upper_bound_value) prediction_valid = True print_with_time("The prediction for attribute " + attribute + " is " + str(prediction_value)+ " and the confidence interval is "+prediction_confidence_interval) else: @@ -156,16 +159,23 @@ def predict_attributes(application_state,next_prediction_time): attributes = application_state.metrics_to_predict pool = multiprocessing.Pool(len(attributes)) print_with_time("Prediction thread pool size set to " + str(len(attributes))) + prediction_results = {} attribute_predictions = {} for attribute in attributes: print_with_time("Starting " + attribute + " prediction thread") start_time = time.time() - attribute_predictions[attribute] = pool.apply_async(predict_attribute, args=[application_state,attribute, EsPredictorState.configuration_file_location, str(next_prediction_time)]) + application_state.prediction_data_filename = application_state.get_prediction_data_filename(EsPredictorState.configuration_file_location,attribute) + prediction_results[attribute] = pool.apply_async(predict_attribute, args=[attribute,application_state.prediction_data_filename, application_state.lower_bound_value[attribute],application_state.upper_bound_value[attribute],str(next_prediction_time)] + ) #attribute_predictions[attribute] = pool.apply_async(predict_attribute, args=[attribute, configuration_file_location,str(next_prediction_time)]).get() + #for attribute in attributes: + # prediction_results[attribute].wait() #wait until the process is finished + #pool.close() + #pool.join() for attribute in attributes: - attribute_predictions[attribute] = attribute_predictions[attribute].get() #get the results of the processing + attribute_predictions[attribute] = prediction_results[attribute].get() #get the results of the processing attribute_predictions[attribute].set_last_prediction_time_needed(int(time.time() - start_time)) #prediction_time_needed[attribute]) @@ -194,7 +204,7 @@ def calculate_and_publish_predictions(application_state,maximum_time_required_fo start_forecasting = application_state.start_forecasting application_name = application_state.application_name while start_forecasting: - print_with_time("Using " + EsPredictorState.configuration_file_location + " for configuration details...") + print_with_time("Using " + EsPredictorState.configuration_file_location + f" for configuration details related to forecasts of {application_state.application_name}...") application_state.next_prediction_time = 
update_prediction_time(application_state.epoch_start, application_state.prediction_horizon,maximum_time_required_for_prediction) for attribute in application_state.metrics_to_predict: @@ -285,8 +295,6 @@ class BootStrap(ConnectorHandler): pass class ConsumerHandler(Handler): - prediction_thread = None - def ready(self, context): if context.has_publisher('state'): context.publishers['state'].starting() @@ -367,9 +375,19 @@ def on_message(self, key, address, body, context, **kwargs): EsPredictorState.individual_application_state[application_name] = ApplicationState(application_name,message_version) application_state = EsPredictorState.individual_application_state[application_name] - if (not application_state.start_forecasting) or ((application_state.metrics_to_predict is not None) and (len(application_state.metrics_to_predict)<=len(body["metrics"]))): + if (not application_state.start_forecasting) or ((application_state.metrics_to_predict is not None) and (set(application_state.metrics_to_predict)!=set(body["metrics"]))): application_state.metrics_to_predict = body["metrics"] print_with_time("Received request to start predicting the following metrics: "+ ",".join(application_state.metrics_to_predict)+" for application "+application_name+", proceeding with the prediction process") + if (not application_state.start_forecasting): + #Coarse initialization, needs to be improved with metric_list message + for metric in application_state.metrics_to_predict: + application_state.lower_bound_value[metric] = None + application_state.upper_bound_value[metric] = None + else: + new_metrics = set(body["metrics"]) - set(application_state.metrics_to_predict) + for metric in new_metrics: + application_state.lower_bound_value[metric] = None + application_state.upper_bound_value[metric] = None else: application_state.metrics_to_predict = body["metrics"] print_with_time("Received request to start predicting the following metrics: "+ body["metrics"]+" for application "+application_name+"but it was perceived as a duplicate") @@ -429,9 +447,9 @@ def on_message(self, key, address, body, context, **kwargs): maximum_time_required_for_prediction = EsPredictorState.prediction_processing_time_safety_margin_seconds #initialization, assuming X seconds processing time to derive a first prediction - if ((self.prediction_thread is None) or (not self.prediction_thread.is_alive())): - self.prediction_thread = threading.Thread(target = calculate_and_publish_predictions, args =[application_state,maximum_time_required_for_prediction]) - self.prediction_thread.start() + if ((EsPredictorState.individual_application_state[application_name].prediction_thread is None) or (not EsPredictorState.individual_application_state[application_name].prediction_thread.is_alive())): + EsPredictorState.individual_application_state[application_name].prediction_thread = threading.Thread(target = calculate_and_publish_predictions, args =[application_state,maximum_time_required_for_prediction]) + EsPredictorState.individual_application_state[application_name].prediction_thread.start() #waitfor(first period) @@ -447,7 +465,7 @@ def on_message(self, key, address, body, context, **kwargs): application_state.metrics_to_predict.remove(metric) if len(application_state.metrics_to_predict)==0: EsPredictorState.individual_application_state[application_name].start_forecasting = False - self.prediction_thread.join() + EsPredictorState[application_name].prediction_thread.join() else: print_with_time("The address was "+ address +" and did not match 
metrics_to_predict/test.exponentialsmoothing/start_forecasting.exponentialsmoothing/stop_forecasting.exponentialsmoothing")

diff --git a/exponential-smoothing-predictor/src/runtime/operational_status/ApplicationState.py b/exponential-smoothing-predictor/src/runtime/operational_status/ApplicationState.py
index b86d922..a318a48 100755
--- a/exponential-smoothing-predictor/src/runtime/operational_status/ApplicationState.py
+++ b/exponential-smoothing-predictor/src/runtime/operational_status/ApplicationState.py
@@ -25,6 +25,7 @@ def get_prediction_data_filename(self,configuration_file_location,metric_name):
         path_to_datasets = Utilities.fix_path_ending(path_to_datasets)
         return "" + str(path_to_datasets) + str(self.application_name) + "_"+metric_name+ ".csv"
     def __init__(self,application_name, message_version):
+        self.prediction_thread = None
         self.message_version = message_version
         self.application_name = application_name
         self.influxdb_bucket = EsPredictorState.application_name_prefix+application_name+"_bucket"

From b97f8aabf4108d22354bf958bd0f1af21c3f73c7 Mon Sep 17 00:00:00 2001
From: atsag
Date: Mon, 7 Oct 2024 17:46:39 +0300
Subject: [PATCH 09/13] Miscellaneous improvements

Improved the logging of the forecaster
Fixed the handling of upper/lower bounds of individual attributes in
prediction sanitization

---
 .../src/exn/connector.py                      |  2 +-
 .../prediction_configuration.properties       |  2 +-
 .../src/runtime/Predictor.py                  | 44 +++++++++----------
 .../operational_status/ApplicationState.py    | 16 +++----
 .../src/runtime/utilities/Utilities.py        |  4 +-
 5 files changed, 34 insertions(+), 34 deletions(-)

diff --git a/exponential-smoothing-predictor/src/exn/connector.py b/exponential-smoothing-predictor/src/exn/connector.py
index d0193c2..22775f3 100755
--- a/exponential-smoothing-predictor/src/exn/connector.py
+++ b/exponential-smoothing-predictor/src/exn/connector.py
@@ -9,7 +9,7 @@ from .settings import base
 from .handler import connector_handler
 
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
 
 _logger = logging.getLogger(__name__)
 
diff --git a/exponential-smoothing-predictor/src/r_predictors/prediction_configuration.properties b/exponential-smoothing-predictor/src/r_predictors/prediction_configuration.properties
index 54df708..e454903 100755
--- a/exponential-smoothing-predictor/src/r_predictors/prediction_configuration.properties
+++ b/exponential-smoothing-predictor/src/r_predictors/prediction_configuration.properties
@@ -1,4 +1,4 @@
-#Fri Sep 27 10:28:45 UTC 2024
+#Mon Oct 07 14:21:24 UTC 2024
 INFLUXDB_HOSTNAME=nebulous-influxdb
 INFLUXDB_PORT=8086
 INFLUXDB_USERNAME=my-user
diff --git a/exponential-smoothing-predictor/src/runtime/Predictor.py b/exponential-smoothing-predictor/src/runtime/Predictor.py
index 0808839..20ab040 100755
--- a/exponential-smoothing-predictor/src/runtime/Predictor.py
+++ b/exponential-smoothing-predictor/src/runtime/Predictor.py
@@ -45,20 +45,20 @@ def sanitize_prediction_statistics(prediction_confidence_interval, prediction_va
             print_with_time(f"Lower value is unmodified - {lower_value_prediction_confidence_interval} and upper value is unmodified - {upper_value_prediction_confidence_interval}")
         return new_prediction_confidence_interval,prediction_value
     elif (lower_bound_value is not None):
-        if (upper_value_prediction_confidence_interval < lower_bound_value[metric_name]):
-            upper_value_prediction_confidence_interval = lower_bound_value[metric_name]
-            lower_value_prediction_confidence_interval = lower_bound_value[metric_name]
+        if (upper_value_prediction_confidence_interval < lower_bound_value):
+            upper_value_prediction_confidence_interval = lower_bound_value
+            lower_value_prediction_confidence_interval = lower_bound_value
             confidence_interval_modified = True
         elif (lower_value_prediction_confidence_interval < lower_bound_value):
-            lower_value_prediction_confidence_interval = lower_bound_value[metric_name]
+            lower_value_prediction_confidence_interval = lower_bound_value
             confidence_interval_modified = True
     elif (upper_bound_value is not None):
-        if (lower_value_prediction_confidence_interval > upper_bound_value[metric_name]):
-            lower_value_prediction_confidence_interval = upper_bound_value[metric_name]
-            upper_value_prediction_confidence_interval = upper_bound_value[metric_name]
+        if (lower_value_prediction_confidence_interval > upper_bound_value):
+            lower_value_prediction_confidence_interval = upper_bound_value
+            upper_value_prediction_confidence_interval = upper_bound_value
             confidence_interval_modified = True
-        elif (upper_value_prediction_confidence_interval> upper_bound_value[metric_name]):
-            upper_value_prediction_confidence_interval = upper_bound_value[metric_name]
+        elif (upper_value_prediction_confidence_interval> upper_bound_value):
+            upper_value_prediction_confidence_interval = upper_bound_value
             confidence_interval_modified = True
 
     if confidence_interval_modified:
@@ -112,7 +112,7 @@ def predict_attribute(attribute,prediction_data_filename,lower_bound_value,upper
     process_output = run(command, shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
     if (process_output.stdout==""):
-        logging.info("Empty output from R predictions - the error output is the following:")
+        logging.error("Empty output from R predictions - the error output is the following:")
         print(process_output.stderr) #There was an error during the calculation of the predicted value
     process_output_string_list = process_output.stdout.replace("[1] ", "").replace("\"", "").split()
@@ -140,16 +140,16 @@ def predict_attribute(attribute,prediction_data_filename,lower_bound_value,upper
     if (prediction_confidence_interval_produced and prediction_value_produced):
         prediction_confidence_interval,prediction_value = sanitize_prediction_statistics(prediction_confidence_interval,float(prediction_value),attribute,lower_bound_value,upper_bound_value)
         prediction_valid = True
-        print_with_time("The prediction for attribute " + attribute + " is " + str(prediction_value)+ " and the confidence interval is "+prediction_confidence_interval)
+        print_with_time("The prediction for attribute " + attribute + " is " + str(prediction_value)+ " and the confidence interval is "+prediction_confidence_interval + " for prediction time "+str(next_prediction_time))
     else:
-        logging.info("There was an error during the calculation of the predicted value for "+str(attribute)+", the error log follows")
-        logging.info(process_output.stdout)
-        logging.info("\n")
-        logging.info("----------------------")
-        logging.info("Printing stderr")
-        logging.info("----------------------")
-        logging.info("\n")
-        logging.info(process_output.stderr)
+        logging.error("There was an error during the calculation of the predicted value for "+str(attribute)+", the error log follows")
+        logging.error(process_output.stdout)
+        logging.error("\n")
+        logging.error("----------------------")
+        logging.error("Printing stderr")
+        logging.error("----------------------")
+        logging.error("\n")
+        logging.error(process_output.stderr)
 
     output_prediction = Prediction(prediction_value, prediction_confidence_interval,prediction_valid,prediction_mae,prediction_mse,prediction_mape,prediction_smape)
     return output_prediction
@@ -312,7 +312,7 @@ def on_message(self, key, address, body, context, **kwargs):
         if (address).startswith(EsPredictorState.MONITORING_DATA_PREFIX):
             address = address.replace(EsPredictorState.MONITORING_DATA_PREFIX, "", 1)
-            logging.info("New monitoring data arrived at topic "+address)
+            logging.debug("New monitoring data arrived at topic "+address)
 
             if address == 'metric_list':
                 application_name = body["name"]
@@ -365,7 +365,7 @@ def on_message(self, key, address, body, context, **kwargs):
             application_name = body["name"]
             message_version = 0
             if (not "version" in body):
-                logging.info("There was an issue in finding the message version in the body of the start forecasting message, assuming it is 1")
+                logging.debug("There was an issue in finding the message version in the body of the start forecasting message, assuming it is 1")
                 message_version = 1
             else:
                 message_version = body["version"]
@@ -493,7 +493,7 @@ def main():
     Utilities.update_influxdb_organization_id()
 
     # Subscribe to retrieve the metrics which should be used
-
+    logging.basicConfig(level=logging.info)
     id = "exponentialsmoothing"
     EsPredictorState.disconnected = True
diff --git a/exponential-smoothing-predictor/src/runtime/operational_status/ApplicationState.py b/exponential-smoothing-predictor/src/runtime/operational_status/ApplicationState.py
index a318a48..c0e2eab 100755
--- a/exponential-smoothing-predictor/src/runtime/operational_status/ApplicationState.py
+++ b/exponential-smoothing-predictor/src/runtime/operational_status/ApplicationState.py
@@ -51,14 +51,14 @@ def __init__(self,application_name, message_version):
 
         response = requests.get(list_bucket_url, headers=headers)
 
-        logging.info("The response for listing a possibly existing bucket is "+str(response.status_code)+" for application "+application_name)
+        logging.debug("The response for listing a possibly existing bucket is "+str(response.status_code)+" for application "+application_name)
         if ((response.status_code==200) and ("buckets" in response.json()) and (len(response.json()["buckets"])>0)):
-            logging.info("The bucket already existed for the particular application, skipping its creation...")
+            logging.debug("The bucket already existed for the particular application, skipping its creation...")
         else:
-            logging.info("The response in the request to list a bucket is "+str(response.json()))
+            logging.debug("The response in the request to list a bucket is "+str(response.json()))
             logging.info("The bucket did not exist for the particular application, creation in process...")
             response = requests.post(create_bucket_url, headers=headers, data=json.dumps(data))
-            logging.info("The response for creating a new bucket is "+str(response.status_code))
+            logging.debug("The response for creating a new bucket is "+str(response.status_code))
         self.start_forecasting = False # Whether the component should start (or keep on) forecasting
         self.prediction_data_filename = application_name+".csv"
         self.dataset_file_name = "exponential_smoothing_dataset_"+application_name+".csv"
@@ -83,9 +83,9 @@ def update_monitoring_data(self):
             print_data_from_db = True
             query_string = 'from (bucket: "'+self.influxdb_bucket+'") |> range(start:-'+time_interval_to_get_data_for+') |> filter(fn: (r) => r["_measurement"] == "'+metric_name+'")'
             influx_connector = InfluxDBConnector()
-            logging.info("performing query for application with bucket "+str(self.influxdb_bucket))
-            logging.info("The body of the query is "+query_string)
-            logging.info("The configuration of the client is "+Utilities.get_fields_and_values(influx_connector))
+            logging.debug("performing query for application with bucket "+str(self.influxdb_bucket))
+            logging.debug("The body of the query is "+query_string)
+            logging.debug("The configuration of the client is "+Utilities.get_fields_and_values(influx_connector))
             current_time = time.time()
             result = influx_connector.client.query_api().query(query_string, EsPredictorState.influxdb_organization)
             elapsed_time = time.time()-current_time
@@ -93,7 +93,7 @@ def update_monitoring_data(self):
             if len(result)>0:
                 logging.info(f"Performed query to the database, it took "+str(elapsed_time) + f" seconds to receive {len(result[0].records)} entries (from the first and possibly only table returned). Now logging to {prediction_dataset_filename}")
             else:
-                logging.info("No records returned from database")
+                logging.warning("No records returned from database")
             #print(result.to_values())
             with open(prediction_dataset_filename, 'w') as file:
                 for table in result:
diff --git a/exponential-smoothing-predictor/src/runtime/utilities/Utilities.py b/exponential-smoothing-predictor/src/runtime/utilities/Utilities.py
index 7292859..28e362f 100755
--- a/exponential-smoothing-predictor/src/runtime/utilities/Utilities.py
+++ b/exponential-smoothing-predictor/src/runtime/utilities/Utilities.py
@@ -42,7 +42,7 @@ def load_configuration():
         EsPredictorState.influxdb_password = EsPredictorState.configuration_details.get("INFLUXDB_PASSWORD").data
         EsPredictorState.influxdb_org = EsPredictorState.configuration_details.get("INFLUXDB_ORG").data
         #This method accesses influx db to retrieve the most recent metric values.
-        Utilities.print_with_time("The configuration effective currently is the following\n "+Utilities.get_fields_and_values(EsPredictorState))
+        logging.debug("The configuration effective currently is the following\n "+Utilities.get_fields_and_values(EsPredictorState))
 
     @staticmethod
     def update_influxdb_organization_id():
@@ -54,7 +54,7 @@ def update_influxdb_organization_id():
         # Find the organization by name and print its ID
         for org in organizations:
             if org.name == EsPredictorState.influxdb_organization:
-                logging.info(f"Organization Name: {org.name}, ID: {org.id}")
+                logging.debug(f"Organization Name: {org.name}, ID: {org.id}")
                 EsPredictorState.influxdb_organization_id = org.id
                 break
 
     @staticmethod

From bce7f43e8b1cdee64caa690d8a58a62f98a90136 Mon Sep 17 00:00:00 2001
From: atsag
Date: Mon, 7 Oct 2024 18:08:03 +0300
Subject: [PATCH 10/13] Corrections to the logging level

---
 exponential-smoothing-predictor/src/exn/connector.py     | 2 +-
 exponential-smoothing-predictor/src/exn/core/consumer.py | 2 +-
 exponential-smoothing-predictor/src/exn/core/context.py  | 2 +-
 exponential-smoothing-predictor/src/exn/core/manager.py  | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/exponential-smoothing-predictor/src/exn/connector.py b/exponential-smoothing-predictor/src/exn/connector.py
index 22775f3..7a2601c 100755
--- a/exponential-smoothing-predictor/src/exn/connector.py
+++ b/exponential-smoothing-predictor/src/exn/connector.py
@@ -9,7 +9,7 @@ from .settings import base
 from .handler import connector_handler
 
-logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
+logging.basicConfig(level=logging.WARN, format='%(asctime)s - %(levelname)s - %(message)s')
 
 _logger = logging.getLogger(__name__)
 
diff --git a/exponential-smoothing-predictor/src/exn/core/consumer.py b/exponential-smoothing-predictor/src/exn/core/consumer.py
index a92a1c2..7422409 100755
--- a/exponential-smoothing-predictor/src/exn/core/consumer.py
+++ b/exponential-smoothing-predictor/src/exn/core/consumer.py
@@ -8,7 +8,7 @@ from proton.handlers import MessagingHandler
 
 _logger = logging.getLogger(__name__)
-_logger.setLevel(level=logging.DEBUG)
+_logger.setLevel(level=logging.WARN)
 
 
 class Consumer(link.Link, MessagingHandler):
diff --git a/exponential-smoothing-predictor/src/exn/core/context.py b/exponential-smoothing-predictor/src/exn/core/context.py
index db5cde6..fd668a2 100755
--- a/exponential-smoothing-predictor/src/exn/core/context.py
+++ b/exponential-smoothing-predictor/src/exn/core/context.py
@@ -7,7 +7,7 @@
 
 _logger = logging.getLogger(__name__)
-_logger.setLevel(logging.DEBUG)
+_logger.setLevel(logging.WARN)
 
 
 class Context:
diff --git a/exponential-smoothing-predictor/src/exn/core/manager.py b/exponential-smoothing-predictor/src/exn/core/manager.py
index dd4025e..9319ad0 100755
--- a/exponential-smoothing-predictor/src/exn/core/manager.py
+++ b/exponential-smoothing-predictor/src/exn/core/manager.py
@@ -9,7 +9,7 @@ from .publisher import Publisher
 
 _logger = logging.getLogger(__name__)
-_logger.setLevel(logging.DEBUG)
+_logger.setLevel(logging.WARN)
 
 
 class SessionPerConsumer(object):

From f92de61522cd448e4c9d8f92d2ec4c59f0ec3112 Mon Sep 17 00:00:00 2001
From: atsag
Date: Mon, 7 Oct 2024 18:24:11 +0300
Subject: [PATCH 11/13] Additional correction to the logging level

---
 exponential-smoothing-predictor/src/runtime/Predictor.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/exponential-smoothing-predictor/src/runtime/Predictor.py b/exponential-smoothing-predictor/src/runtime/Predictor.py
index 950c296..3204cc2 100755
--- a/exponential-smoothing-predictor/src/runtime/Predictor.py
+++ b/exponential-smoothing-predictor/src/runtime/Predictor.py
@@ -494,7 +494,7 @@ def main():
     Utilities.update_influxdb_organization_id()
 
     # Subscribe to retrieve the metrics which should be used
-    logging.basicConfig(level=logging.info)
+    logging.basicConfig(level=logging.INFO)
     id = "exponentialsmoothing"
     EsPredictorState.disconnected = True

From 071aa81f37f294c1c83a423e994883a9ece403a8 Mon Sep 17 00:00:00 2001
From: atsag
Date: Tue, 8 Oct 2024 13:22:40 +0300
Subject: [PATCH 12/13] Amendment in the previous correction to the forecasting
 termination process

---
 exponential-smoothing-predictor/src/runtime/Predictor.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/exponential-smoothing-predictor/src/runtime/Predictor.py b/exponential-smoothing-predictor/src/runtime/Predictor.py
index 3204cc2..978040d 100755
--- a/exponential-smoothing-predictor/src/runtime/Predictor.py
+++ b/exponential-smoothing-predictor/src/runtime/Predictor.py
@@ -459,12 +459,12 @@ def on_message(self, key, address, body, context, **kwargs):
             application_name = body["name"]
             application_state = EsPredictorState.individual_application_state[application_name]
             print_with_time("Received message to stop predicting some of the metrics")
-            metrics_to_remove = json.loads(body)["metrics"]
+            metrics_to_remove = body["metrics"]
             for metric in metrics_to_remove:
                 if (application_state.metrics_to_predict.__contains__(metric)):
                     print_with_time("Stopping generating predictions for metric "+metric)
                     application_state.metrics_to_predict.remove(metric)
-            if len(application_state.metrics_to_predict)==0:
+            if (len(metrics_to_remove)==0 or len(application_state.metrics_to_predict)==0):
                 EsPredictorState.individual_application_state[application_name].start_forecasting = False
                 EsPredictorState[application_name].prediction_thread.join()

From de4986c8f4b3685844bd7638dc9bbafdf28dac3c Mon Sep 17 00:00:00 2001
From: atsag
Date: Tue, 26 Nov 2024 14:42:47 +0200
Subject: [PATCH 13/13] Miscellaneous improvements

Support for preliminary predictions
Improved logging
Changes in the handling of connectors

---
 .../src/runtime/Predictor.py                  | 33 +++++++++++++++++--
 .../operational_status/EsPredictorState.py    | 12 ++++++-
 .../runtime/utilities/PredictionPublisher.py  | 13 ++++++--
 .../src/runtime/utilities/Utilities.py        |  4 ++-
 4 files changed, 54 insertions(+), 8 deletions(-)

diff --git a/exponential-smoothing-predictor/src/runtime/Predictor.py b/exponential-smoothing-predictor/src/runtime/Predictor.py
index 978040d..8b130ba 100755
--- a/exponential-smoothing-predictor/src/runtime/Predictor.py
+++ b/exponential-smoothing-predictor/src/runtime/Predictor.py
@@ -272,8 +272,8 @@ def calculate_and_publish_predictions(application_state,maximum_time_required_fo
                 #State.connection.send_to_topic('training_models',training_models_message_body)
                 message_not_sent = False
-                print_with_time("Successfully sent prediction message for %s to topic eu.nebulouscloud.preliminary_predicted.%s.%s:\n\n%s\n\n" % (attribute, EsPredictorState.forecaster_name, attribute, prediction_message_body))
-            except ConnectionError as exception:
+                print_with_time("Successfully sent prediction message for "+str(attribute)+" to topic "+EsPredictorState.get_prediction_publishing_topic(attribute)+":\n\n%s\n\n" % (prediction_message_body))
+            except Exception as exception:
                 #State.connection.disconnect()
                 #State.connection = messaging.morphemic.Connection('admin', 'admin')
                 #State.connection.connect()
@@ -471,6 +471,22 @@ def on_message(self, key, address, body, context, **kwargs):
             else:
                 print_with_time("The address was "+ address +" and did not match metrics_to_predict/test.exponentialsmoothing/start_forecasting.exponentialsmoothing/stop_forecasting.exponentialsmoothing")
                 # logging.info(f"Received {key} => {address}")
+
+        elif (address).startswith(EsPredictorState.COMPONENT_STATE_PREFIX):
+
+            import os
+            # Specify the directory and filename
+            directory = "/home"
+            filename = "is_alive.txt"
+
+            # Create the file
+            with open(os.path.join(directory, filename), "w") as f:
+                current_message = print_with_time(f"Liveness probe received at {address}")
+                f.write(current_message)
+
+            #print(f"Liveness probe file created at {directory}/{filename}")
+
+
         else:
             print_with_time("Received message "+body+" but could not handle it")
 
 def get_dataset_file(attribute):
@@ -508,15 +524,26 @@ def main():
 
     while True:
         topics_to_subscribe = ["eu.nebulouscloud.monitoring.metric_list","eu.nebulouscloud.monitoring.realtime.>","eu.nebulouscloud.forecasting.start_forecasting.exponentialsmoothing","eu.nebulouscloud.forecasting.stop_forecasting.exponentialsmoothing"]
+
+        topics_to_publish = ["eu.nebulouscloud.state.exponentialsmoothing.isalive"]
+
         current_consumers = []
+        current_publishers = []
 
         for topic in topics_to_subscribe:
-            current_consumer = core.consumer.Consumer(key='monitoring_'+topic,address=topic,handler=ConsumerHandler(), topic=True,fqdn=True)
+            current_consumer = core.consumer.Consumer(key='exsmoothing_forecasting_'+topic,address=topic,handler=ConsumerHandler(), topic=True,fqdn=True)
             EsPredictorState.broker_consumers.append(current_consumer)
             current_consumers.append(current_consumer)
+
+        for topic in topics_to_publish:
+            current_publisher = core.publisher.Publisher(key='exsmoothing_forecasting_'+topic,address=topic, topic=True,fqdn=True)
+            EsPredictorState.broker_publishers.append(current_publisher)
+            current_publishers.append(current_publisher)
+
         EsPredictorState.subscribing_connector = connector.EXN(EsPredictorState.forecaster_name, handler=BootStrap(),
                                                                #consumers=list(State.broker_consumers),
                                                                consumers=EsPredictorState.broker_consumers,
+                                                               publishers=current_publishers,
                                                                url=EsPredictorState.broker_address,
                                                                port=EsPredictorState.broker_port,
                                                                username=EsPredictorState.broker_username,
diff --git a/exponential-smoothing-predictor/src/runtime/operational_status/EsPredictorState.py b/exponential-smoothing-predictor/src/runtime/operational_status/EsPredictorState.py
index e7d49cc..a553bf1 100755
--- a/exponential-smoothing-predictor/src/runtime/operational_status/EsPredictorState.py
+++ b/exponential-smoothing-predictor/src/runtime/operational_status/EsPredictorState.py
@@ -22,10 +22,14 @@ class EsPredictorState:
     """
     Fail-safe default values introduced below
     """
+    preliminary_prediction_publishing_topic_prefix = "eu.nebulouscloud.preliminary_predicted."
+    prediction_publishing_topic_prefix = "eu.nebulouscloud.monitoring.predicted."
+    publish_predictions_as_preliminary = True
     application_name_prefix = "nebulous_"
     GENERAL_TOPIC_PREFIX = "eu.nebulouscloud."
     MONITORING_DATA_PREFIX = "monitoring."
     FORECASTING_CONTROL_PREFIX = "forecasting."
+    COMPONENT_STATE_PREFIX = "state."
     #Used to create the dataset from the InfluxDB
     influxdb_organization = "my-org"
@@ -64,4 +68,10 @@ class EsPredictorState:
     def check_stale_connection():
         return (not EsPredictorState.subscribing_connector)
 
-
+    @staticmethod
+    def get_prediction_publishing_topic(metric_name):
+        if EsPredictorState.publish_predictions_as_preliminary:
+            return EsPredictorState.preliminary_prediction_publishing_topic_prefix+EsPredictorState.forecaster_name+"."+metric_name
+            #'eu.nebulouscloud.preliminary_predicted.'+EsPredictorState.forecaster_name+"."+metric_name
+        else:
+            return EsPredictorState.prediction_publishing_topic_prefix+metric_name
\ No newline at end of file
diff --git a/exponential-smoothing-predictor/src/runtime/utilities/PredictionPublisher.py b/exponential-smoothing-predictor/src/runtime/utilities/PredictionPublisher.py
index 094f185..edcf977 100755
--- a/exponential-smoothing-predictor/src/runtime/utilities/PredictionPublisher.py
+++ b/exponential-smoothing-predictor/src/runtime/utilities/PredictionPublisher.py
@@ -4,9 +4,16 @@ class PredictionPublisher(core.publisher.Publisher):
     metric_name = ""
+    private_publisher = None
     def __init__(self,application_name,metric_name):
-        super().__init__('publisher_'+application_name+'-'+metric_name, 'eu.nebulouscloud.preliminary_predicted.'+EsPredictorState.forecaster_name+"."+metric_name, True,True)
+        super().__init__('publisher_'+application_name+'-'+metric_name, EsPredictorState.get_prediction_publishing_topic(metric_name), True,True)
         self.metric_name = metric_name
+        self.private_publisher = self
 
-    def send(self, body={}, application=""):
-        super(PredictionPublisher, self).send(body, application)
+    def send(self, body={}, application_name=""):
+        try:
+            super(PredictionPublisher, self).send(body, application_name)
+
+        except Exception as e:
+            self.private_publisher = super().__init__('publisher_'+application_name+'-'+self.metric_name, EsPredictorState.get_prediction_publishing_topic(self.metric_name), True,True)
+            super(PredictionPublisher, self).send(body, application_name)
diff --git a/exponential-smoothing-predictor/src/runtime/utilities/Utilities.py b/exponential-smoothing-predictor/src/runtime/utilities/Utilities.py
index 28e362f..d5c573a 100755
--- a/exponential-smoothing-predictor/src/runtime/utilities/Utilities.py
+++ b/exponential-smoothing-predictor/src/runtime/utilities/Utilities.py
@@ -20,7 +20,9 @@ class Utilities:
     @staticmethod
     def print_with_time(x):
         now = datetime.datetime.now()
-        print("["+now.strftime('%Y-%m-%d %H:%M:%S')+"] "+str(x))
+        string_to_print = "["+now.strftime('%Y-%m-%d %H:%M:%S')+"] "+str(x)
+        print(string_to_print)
+        return string_to_print
 
     @staticmethod
     def load_configuration():
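
Note on the bound handling fixed in [PATCH 09/13]: after that change, lower_bound_value and upper_bound_value are treated as plain numbers rather than per-metric dictionaries, and the confidence interval reported for an attribute is clamped against whichever bound is supplied. The short standalone Python sketch below mirrors that clamping logic under those assumptions; the function and variable names are illustrative only and do not appear in the component's code.

    # Illustrative sketch only (not the component's actual helper): clamp a
    # confidence interval to optional scalar lower/upper bounds, mirroring the
    # bound handling described in [PATCH 09/13].
    def clamp_confidence_interval(interval_low, interval_high,
                                  lower_bound=None, upper_bound=None):
        modified = False
        if lower_bound is not None:
            if interval_high < lower_bound:
                # The whole interval lies below the lower bound: collapse it onto the bound.
                interval_low = interval_high = lower_bound
                modified = True
            elif interval_low < lower_bound:
                interval_low = lower_bound
                modified = True
        elif upper_bound is not None:
            if interval_low > upper_bound:
                # The whole interval lies above the upper bound: collapse it onto the bound.
                interval_low = interval_high = upper_bound
                modified = True
            elif interval_high > upper_bound:
                interval_high = upper_bound
                modified = True
        return interval_low, interval_high, modified

    # Example: an interval entirely below a lower bound of 10.0 collapses to (10.0, 10.0, True).
    print(clamp_confidence_interval(5.0, 7.0, lower_bound=10.0))

As in the patched code, only one bound is applied per call (the lower bound takes precedence when both are given), and the caller is told whether the interval was modified so it can log the adjustment.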