Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

origin/misc_improvements #13

Merged
merged 16 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion exponential-smoothing-predictor/src/exn/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from .settings import base
from .handler import connector_handler

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
_logger = logging.getLogger(__name__)


Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#Fri Sep 27 10:28:45 UTC 2024
INFLUXDB_HOSTNAME=nebulous-influxdb
INFLUXDB_PORT=8086
INFLUXDB_USERNAME=my-user
Expand Down
45 changes: 23 additions & 22 deletions exponential-smoothing-predictor/src/runtime/Predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,20 @@ def sanitize_prediction_statistics(prediction_confidence_interval, prediction_va
print_with_time(f"Lower value is unmodified - {lower_value_prediction_confidence_interval} and upper value is unmodified - {upper_value_prediction_confidence_interval}")
return new_prediction_confidence_interval,prediction_value
elif (lower_bound_value is not None):
if (upper_value_prediction_confidence_interval < lower_bound_value[metric_name]):
upper_value_prediction_confidence_interval = lower_bound_value[metric_name]
lower_value_prediction_confidence_interval = lower_bound_value[metric_name]
if (upper_value_prediction_confidence_interval < lower_bound_value):
upper_value_prediction_confidence_interval = lower_bound_value
lower_value_prediction_confidence_interval = lower_bound_value
confidence_interval_modified = True
elif (lower_value_prediction_confidence_interval < lower_bound_value):
lower_value_prediction_confidence_interval = lower_bound_value[metric_name]
lower_value_prediction_confidence_interval = lower_bound_value
confidence_interval_modified = True
elif (upper_bound_value is not None):
if (lower_value_prediction_confidence_interval > upper_bound_value[metric_name]):
lower_value_prediction_confidence_interval = upper_bound_value[metric_name]
upper_value_prediction_confidence_interval = upper_bound_value[metric_name]
if (lower_value_prediction_confidence_interval > upper_bound_value):
lower_value_prediction_confidence_interval = upper_bound_value
upper_value_prediction_confidence_interval = upper_bound_value
confidence_interval_modified = True
elif (upper_value_prediction_confidence_interval> upper_bound_value[metric_name]):
upper_value_prediction_confidence_interval = upper_bound_value[metric_name]
elif (upper_value_prediction_confidence_interval> upper_bound_value):
upper_value_prediction_confidence_interval = upper_bound_value
confidence_interval_modified = True

if confidence_interval_modified:
Expand Down Expand Up @@ -112,7 +112,7 @@ def predict_attribute(attribute,prediction_data_filename,lower_bound_value,upper

process_output = run(command, shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
if (process_output.stdout==""):
logging.info("Empty output from R predictions - the error output is the following:")
logging.error("Empty output from R predictions - the error output is the following:")
print(process_output.stderr) #There was an error during the calculation of the predicted value

process_output_string_list = process_output.stdout.replace("[1] ", "").replace("\"", "").split()
Expand Down Expand Up @@ -140,16 +140,16 @@ def predict_attribute(attribute,prediction_data_filename,lower_bound_value,upper
if (prediction_confidence_interval_produced and prediction_value_produced):
prediction_confidence_interval,prediction_value = sanitize_prediction_statistics(prediction_confidence_interval,float(prediction_value),attribute,lower_bound_value,upper_bound_value)
prediction_valid = True
print_with_time("The prediction for attribute " + attribute + " is " + str(prediction_value)+ " and the confidence interval is "+prediction_confidence_interval)
print_with_time("The prediction for attribute " + attribute + " is " + str(prediction_value)+ " and the confidence interval is "+prediction_confidence_interval + " for prediction time "+str(next_prediction_time))
else:
logging.info("There was an error during the calculation of the predicted value for "+str(attribute)+", the error log follows")
logging.info(process_output.stdout)
logging.info("\n")
logging.info("----------------------")
logging.info("Printing stderr")
logging.info("----------------------")
logging.info("\n")
logging.info(process_output.stderr)
logging.error("There was an error during the calculation of the predicted value for "+str(attribute)+", the error log follows")
logging.error(process_output.stdout)
logging.error("\n")
logging.error("----------------------")
logging.error("Printing stderr")
logging.error("----------------------")
logging.error("\n")
logging.error(process_output.stderr)

output_prediction = Prediction(prediction_value, prediction_confidence_interval,prediction_valid,prediction_mae,prediction_mse,prediction_mape,prediction_smape)
return output_prediction
Expand Down Expand Up @@ -312,7 +312,8 @@ def on_message(self, key, address, body, context, **kwargs):
if (address).startswith(EsPredictorState.MONITORING_DATA_PREFIX):
address = address.replace(EsPredictorState.MONITORING_DATA_PREFIX, "", 1)

logging.info("New monitoring data arrived at topic "+address)
logging.debug("New monitoring data arrived at topic "+address)

if address == 'metric_list':

application_name = body["name"]
Expand Down Expand Up @@ -365,7 +366,7 @@ def on_message(self, key, address, body, context, **kwargs):
application_name = body["name"]
message_version = 0
if (not "version" in body):
logging.info("There was an issue in finding the message version in the body of the start forecasting message, assuming it is 1")
logging.debug("There was an issue in finding the message version in the body of the start forecasting message, assuming it is 1")
message_version = 1
else:
message_version = body["version"]
Expand Down Expand Up @@ -493,7 +494,7 @@ def main():
Utilities.update_influxdb_organization_id()
# Subscribe to retrieve the metrics which should be used


logging.basicConfig(level=logging.info)
id = "exponentialsmoothing"
EsPredictorState.disconnected = True

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ def __init__(self,application_name, message_version):

response = requests.get(list_bucket_url, headers=headers)

logging.info("The response for listing a possibly existing bucket is "+str(response.status_code)+" for application "+application_name)
logging.debug("The response for listing a possibly existing bucket is "+str(response.status_code)+" for application "+application_name)
if ((response.status_code==200) and ("buckets" in response.json()) and (len(response.json()["buckets"])>0)):
logging.info("The bucket already existed for the particular application, skipping its creation...")
logging.debug("The bucket already existed for the particular application, skipping its creation...")
else:
logging.info("The response in the request to list a bucket is "+str(response.json()))
logging.debug("The response in the request to list a bucket is "+str(response.json()))
logging.info("The bucket did not exist for the particular application, creation in process...")
response = requests.post(create_bucket_url, headers=headers, data=json.dumps(data))
logging.info("The response for creating a new bucket is "+str(response.status_code))
logging.debug("The response for creating a new bucket is "+str(response.status_code))
self.start_forecasting = False # Whether the component should start (or keep on) forecasting
self.prediction_data_filename = application_name+".csv"
self.dataset_file_name = "exponential_smoothing_dataset_"+application_name+".csv"
Expand All @@ -83,17 +83,17 @@ def update_monitoring_data(self):
print_data_from_db = True
query_string = 'from (bucket: "'+self.influxdb_bucket+'") |> range(start:-'+time_interval_to_get_data_for+') |> filter(fn: (r) => r["_measurement"] == "'+metric_name+'")'
influx_connector = InfluxDBConnector()
logging.info("performing query for application with bucket "+str(self.influxdb_bucket))
logging.info("The body of the query is "+query_string)
logging.info("The configuration of the client is "+Utilities.get_fields_and_values(influx_connector))
logging.debug("performing query for application with bucket "+str(self.influxdb_bucket))
logging.debug("The body of the query is "+query_string)
logging.debug("The configuration of the client is "+Utilities.get_fields_and_values(influx_connector))
current_time = time.time()
result = influx_connector.client.query_api().query(query_string, EsPredictorState.influxdb_organization)
elapsed_time = time.time()-current_time
prediction_dataset_filename = self.get_prediction_data_filename(EsPredictorState.configuration_file_location, metric_name)
if len(result)>0:
logging.info(f"Performed query to the database, it took "+str(elapsed_time) + f" seconds to receive {len(result[0].records)} entries (from the first and possibly only table returned). Now logging to {prediction_dataset_filename}")
else:
logging.info("No records returned from database")
logging.warning("No records returned from database")
#print(result.to_values())
with open(prediction_dataset_filename, 'w') as file:
for table in result:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def load_configuration():
EsPredictorState.influxdb_password = EsPredictorState.configuration_details.get("INFLUXDB_PASSWORD").data
EsPredictorState.influxdb_org = EsPredictorState.configuration_details.get("INFLUXDB_ORG").data
#This method accesses influx db to retrieve the most recent metric values.
Utilities.print_with_time("The configuration effective currently is the following\n "+Utilities.get_fields_and_values(EsPredictorState))
logging.debug("The configuration effective currently is the following\n "+Utilities.get_fields_and_values(EsPredictorState))

@staticmethod
def update_influxdb_organization_id():
Expand All @@ -54,7 +54,7 @@ def update_influxdb_organization_id():
# Find the organization by name and print its ID
for org in organizations:
if org.name == EsPredictorState.influxdb_organization:
logging.info(f"Organization Name: {org.name}, ID: {org.id}")
logging.debug(f"Organization Name: {org.name}, ID: {org.id}")
EsPredictorState.influxdb_organization_id = org.id
break
@staticmethod
Expand Down