From f9159fd9c94584a21c8646ebde6cf57b097baf46 Mon Sep 17 00:00:00 2001 From: Michael Oviedo Date: Mon, 9 Dec 2024 19:27:10 +0000 Subject: [PATCH] refactor aggregator class + fix unit tests Signed-off-by: Michael Oviedo --- osbenchmark/aggregator.py | 64 ++++++++++++++++++++++++--------------- tests/aggregator_test.py | 1 + 2 files changed, 41 insertions(+), 24 deletions(-) diff --git a/osbenchmark/aggregator.py b/osbenchmark/aggregator.py index da698bde..6e9b66bc 100644 --- a/osbenchmark/aggregator.py +++ b/osbenchmark/aggregator.py @@ -22,7 +22,7 @@ def __init__(self, cfg, test_executions_dict, args): self.loaded_workload = None def count_iterations_for_each_op(self, test_execution) -> None: - """Count iterations for each operation in the test execution.""" + """Count iterations for each operation in the test execution""" workload_params = test_execution.workload_params if test_execution.workload_params else {} test_execution_id = test_execution.test_execution_id self.accumulated_iterations[test_execution_id] = {} @@ -33,20 +33,26 @@ def count_iterations_for_each_op(self, test_execution) -> None: iterations = int(workload_params.get(task_name_iterations, task.iterations or 1)) self.accumulated_iterations[test_execution_id][task_name] = iterations - def accumulate_results(self, test_execution: Any) -> None: - """Accumulate results from a single test execution.""" - for item in test_execution.results.get("op_metrics", []): - task = item.get("task", "") + def accumulate_results(self, test_execution) -> None: + """Accumulate results from a single test execution""" + for operation_metric in test_execution.results.get("op_metrics", []): + task = operation_metric.get("task", "") self.accumulated_results.setdefault(task, {}) for metric in self.metrics: self.accumulated_results[task].setdefault(metric, []) - self.accumulated_results[task][metric].append(item.get(metric)) + self.accumulated_results[task][metric].append(operation_metric.get(metric)) def aggregate_json_by_key(self, key_path: Union[str, List[str]]) -> Any: - """Aggregate JSON results by a given key path.""" + """ + Aggregates JSON results across multiple test executions using a specified key path. + Handles nested dictionary structures and calculates averages for numeric values + """ all_jsons = [self.test_store.find_by_test_execution_id(id).results for id in self.test_executions.keys()] def get_nested_value(obj: Dict[str, Any], path: List[str]) -> Any: + """ + Retrieves a value from a nested dictionary structure using a path of keys. + """ for key in path: if isinstance(obj, dict): obj = obj.get(key, {}) @@ -75,8 +81,8 @@ def aggregate_helper(objects: List[Any]) -> Any: values = [get_nested_value(json, key_path) for json in all_jsons] return aggregate_helper(values) - def build_aggregated_results(self): - test_exe = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0]) + def build_aggregated_results_dict(self): + """Builds a dictionary of aggregated metrics from all test executions""" aggregated_results = { "op_metrics": [], "correctness_metrics": self.aggregate_json_by_key("correctness_metrics"), @@ -139,9 +145,31 @@ def build_aggregated_results(self): op_metric[f"{metric}_rsd"] = rsd aggregated_results["op_metrics"].append(op_metric) - - # extract the necessary data from the first test execution, since the configurations should be identical for all test executions + + return aggregated_results + + def update_config_object(self, test_execution): + """ + Updates the configuration object with values from a test execution. + Uses the first test execution as reference since configurations should be identical + """ current_timestamp = self.config.opts("system", "time.start") + self.config.add(config.Scope.applicationOverride, "builder", + "provision_config_instance.names", test_execution.provision_config_instance) + self.config.add(config.Scope.applicationOverride, "system", + "env.name", test_execution.environment_name) + self.config.add(config.Scope.applicationOverride, "system", "time.start", current_timestamp) + self.config.add(config.Scope.applicationOverride, "test_execution", "pipeline", test_execution.pipeline) + self.config.add(config.Scope.applicationOverride, "workload", "params", test_execution.workload_params) + self.config.add(config.Scope.applicationOverride, "builder", + "provision_config_instance.params", test_execution.provision_config_instance_params) + self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", test_execution.plugin_params) + self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", test_execution.latency_percentiles) + self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_execution.throughput_percentiles) + + def build_aggregated_results(self): + test_exe = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0]) + aggregated_results = self.build_aggregated_results_dict() if hasattr(self.args, 'results_file') and self.args.results_file != "": normalized_results_file = rio.normalize_path(self.args.results_file, self.cwd) @@ -158,19 +186,7 @@ def build_aggregated_results(self): print("Aggregate test execution ID: ", test_execution_id) - # add values to the configuration object - self.config.add(config.Scope.applicationOverride, "builder", - "provision_config_instance.names", test_exe.provision_config_instance) - self.config.add(config.Scope.applicationOverride, "system", - "env.name", test_exe.environment_name) - self.config.add(config.Scope.applicationOverride, "system", "time.start", current_timestamp) - self.config.add(config.Scope.applicationOverride, "test_execution", "pipeline", test_exe.pipeline) - self.config.add(config.Scope.applicationOverride, "workload", "params", test_exe.workload_params) - self.config.add(config.Scope.applicationOverride, "builder", - "provision_config_instance.params", test_exe.provision_config_instance_params) - self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", test_exe.plugin_params) - self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", test_exe.latency_percentiles) - self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_exe.throughput_percentiles) + self.update_config_object(test_exe) loaded_workload = workload.load_workload(self.config) test_procedure_object = loaded_workload.find_test_procedure_or_default(self.test_procedure_name) diff --git a/tests/aggregator_test.py b/tests/aggregator_test.py index 32be06cb..d963de60 100644 --- a/tests/aggregator_test.py +++ b/tests/aggregator_test.py @@ -50,6 +50,7 @@ def test_count_iterations_for_each_op(aggregator): mock_test_procedure.schedule = mock_schedule mock_workload.test_procedures = [mock_test_procedure] + mock_workload.find_test_procedure_or_default = Mock(return_value=mock_test_procedure) mock_test_execution = Mock(test_execution_id="test1", workload_params={}) aggregator.loaded_workload = mock_workload