diff --git a/osbenchmark/aggregator.py b/osbenchmark/aggregator.py
index 016d90f4..b79f8c47 100644
--- a/osbenchmark/aggregator.py
+++ b/osbenchmark/aggregator.py
@@ -3,12 +3,12 @@
 from typing import Any, Dict, List, Union
 import uuid
 
-from osbenchmark.metrics import FileTestExecutionStore
+from osbenchmark.metrics import FileTestExecutionStore, TestExecution
 from osbenchmark import metrics, workload, config
 from osbenchmark.utils import io as rio
 
 class Aggregator:
-    def __init__(self, cfg, test_executions_dict, args):
+    def __init__(self, cfg, test_executions_dict, args) -> None:
         self.config = cfg
         self.args = args
         self.test_executions = test_executions_dict
@@ -21,69 +21,72 @@ def __init__(self, cfg, test_executions_dict, args):
         self.test_procedure_name = None
         self.loaded_workload = None
 
-    def count_iterations_for_each_op(self, test_execution) -> None:
-        matching_test_procedure = next((tp for tp in self.loaded_workload.test_procedures if tp.name == self.test_procedure_name), None)
+    def count_iterations_for_each_op(self, test_execution: TestExecution) -> None:
+        """Count iterations for each operation in the test execution"""
         workload_params = test_execution.workload_params if test_execution.workload_params else {}
-
         test_execution_id = test_execution.test_execution_id
         self.accumulated_iterations[test_execution_id] = {}
 
-        if matching_test_procedure:
-            for task in matching_test_procedure.schedule:
-                task_name = task.name
-                task_name_iterations = f"{task_name}_iterations"
-                if task_name_iterations in workload_params:
-                    iterations = int(workload_params[task_name_iterations])
-                else:
-                    iterations = task.iterations or 1
-                self.accumulated_iterations[test_execution_id][task_name] = iterations
-        else:
-            raise ValueError(f"Test procedure '{self.test_procedure_name}' not found in the loaded workload.")
+        for task in self.loaded_workload.find_test_procedure_or_default(self.test_procedure_name).schedule:
+            task_name = task.name
+            task_name_iterations = f"{task_name}_iterations"
+            iterations = int(workload_params.get(task_name_iterations, task.iterations or 1))
+            self.accumulated_iterations[test_execution_id][task_name] = iterations
 
-    def accumulate_results(self, test_execution: Any) -> None:
-        for item in test_execution.results.get("op_metrics", []):
-            task = item.get("task", "")
+    def accumulate_results(self, test_execution: TestExecution) -> None:
+        """Accumulate results from a single test execution"""
+        for operation_metric in test_execution.results.get("op_metrics", []):
+            task = operation_metric.get("task", "")
             self.accumulated_results.setdefault(task, {})
             for metric in self.metrics:
                 self.accumulated_results[task].setdefault(metric, [])
-                self.accumulated_results[task][metric].append(item.get(metric))
+                self.accumulated_results[task][metric].append(operation_metric.get(metric))
 
     def aggregate_json_by_key(self, key_path: Union[str, List[str]]) -> Any:
-        all_jsons = [self.test_store.find_by_test_execution_id(id).results for id in self.test_executions.keys()]
-
-        # retrieve nested value from a dictionary given a key path
-        def get_nested_value(obj: Dict[str, Any], path: List[str]) -> Any:
+        """
+        Aggregates JSON results across multiple test executions using a specified key path.
+        Handles nested dictionary structures and calculates averages for numeric values
+        """
+        all_json_results = [self.test_store.find_by_test_execution_id(id).results for id in self.test_executions.keys()]
+
+        def get_nested_value(json_data: Dict[str, Any], path: List[str]) -> Any:
+            """
+            Retrieves a value from a nested dictionary structure using a path of keys.
+            """
             for key in path:
-                if isinstance(obj, dict):
-                    obj = obj.get(key, {})
-                elif isinstance(obj, list) and key.isdigit():
-                    obj = obj[int(key)] if int(key) < len(obj) else {}
+                if isinstance(json_data, dict):
+                    json_data = json_data.get(key, {})
+                elif isinstance(json_data, list) and key.isdigit():
+                    json_data = json_data[int(key)] if int(key) < len(json_data) else {}
                 else:
                     return None
-            return obj
+            return json_data
 
-        def aggregate_helper(objects: List[Any]) -> Any:
-            if not objects:
+        def aggregate_json_elements(json_elements: List[Any]) -> Any:
+            if not json_elements:
                 return None
-            if all(isinstance(obj, (int, float)) for obj in objects):
-                avg = sum(objects) / len(objects)
-                return avg
-            if all(isinstance(obj, dict) for obj in objects):
-                keys = set().union(*objects)
-                return {key: aggregate_helper([obj.get(key) for obj in objects]) for key in keys}
-            if all(isinstance(obj, list) for obj in objects):
-                max_length = max(len(obj) for obj in objects)
-                return [aggregate_helper([obj[i] if i < len(obj) else None for obj in objects]) for i in range(max_length)]
-            return next((obj for obj in objects if obj is not None), None)
+            # If all elements are numbers, calculate the average
+            if all(isinstance(obj, (int, float)) for obj in json_elements):
+                return sum(json_elements) / len(json_elements)
+            # If all elements are dictionaries, recursively aggregate their values
+            if all(isinstance(obj, dict) for obj in json_elements):
+                keys = set().union(*json_elements)
+                return {key: aggregate_json_elements([obj.get(key) for obj in json_elements]) for key in keys}
+            # If all elements are lists, recursively aggregate corresponding elements
+            if all(isinstance(obj, list) for obj in json_elements):
+                max_length = max(len(obj) for obj in json_elements)
+                return [aggregate_json_elements([obj[i] if i < len(obj) else None for obj in json_elements]) for i in range(max_length)]
+            # If elements are of mixed types, return the first non-None value
+            return next((obj for obj in json_elements if obj is not None), None)
 
         if isinstance(key_path, str):
             key_path = key_path.split('.')
-        values = [get_nested_value(json, key_path) for json in all_jsons]
-        return aggregate_helper(values)
+        nested_values = [get_nested_value(json_result, key_path) for json_result in all_json_results]
+        return aggregate_json_elements(nested_values)
 
-    def build_aggregated_results(self):
-        test_exe = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
+    def build_aggregated_results_dict(self) -> Dict[str, Any]:
+        """Builds a dictionary of aggregated metrics from all test executions"""
         aggregated_results = {
             "op_metrics": [],
             "correctness_metrics": self.aggregate_json_by_key("correctness_metrics"),
@@ -147,8 +150,30 @@ def build_aggregated_results(self):
 
         aggregated_results["op_metrics"].append(op_metric)
 
-        # extract the necessary data from the first test execution, since the configurations should be identical for all test executions
+        return aggregated_results
+
+    def update_config_object(self, test_execution: TestExecution) -> None:
+        """
+        Updates the configuration object with values from a test execution.
+        Uses the first test execution as reference since configurations should be identical
+        """
         current_timestamp = self.config.opts("system", "time.start")
+        self.config.add(config.Scope.applicationOverride, "builder",
+                        "provision_config_instance.names", test_execution.provision_config_instance)
+        self.config.add(config.Scope.applicationOverride, "system",
+                        "env.name", test_execution.environment_name)
+        self.config.add(config.Scope.applicationOverride, "system", "time.start", current_timestamp)
+        self.config.add(config.Scope.applicationOverride, "test_execution", "pipeline", test_execution.pipeline)
+        self.config.add(config.Scope.applicationOverride, "workload", "params", test_execution.workload_params)
+        self.config.add(config.Scope.applicationOverride, "builder",
+                        "provision_config_instance.params", test_execution.provision_config_instance_params)
+        self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", test_execution.plugin_params)
+        self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", test_execution.latency_percentiles)
+        self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_execution.throughput_percentiles)
+
+    def build_aggregated_results(self) -> TestExecution:
+        test_exe = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
+        aggregated_results = self.build_aggregated_results_dict()
 
         if hasattr(self.args, 'results_file') and self.args.results_file != "":
             normalized_results_file = rio.normalize_path(self.args.results_file, self.cwd)
@@ -165,19 +190,7 @@ def build_aggregated_results(self):
 
         print("Aggregate test execution ID: ", test_execution_id)
 
-        # add values to the configuration object
-        self.config.add(config.Scope.applicationOverride, "builder",
-                        "provision_config_instance.names", test_exe.provision_config_instance)
-        self.config.add(config.Scope.applicationOverride, "system",
-                        "env.name", test_exe.environment_name)
-        self.config.add(config.Scope.applicationOverride, "system", "time.start", current_timestamp)
-        self.config.add(config.Scope.applicationOverride, "test_execution", "pipeline", test_exe.pipeline)
-        self.config.add(config.Scope.applicationOverride, "workload", "params", test_exe.workload_params)
-        self.config.add(config.Scope.applicationOverride, "builder",
-                        "provision_config_instance.params", test_exe.provision_config_instance_params)
-        self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", test_exe.plugin_params)
-        self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", test_exe.latency_percentiles)
-        self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_exe.throughput_percentiles)
+        self.update_config_object(test_exe)
 
         loaded_workload = workload.load_workload(self.config)
         test_procedure_object = loaded_workload.find_test_procedure_or_default(self.test_procedure_name)
@@ -223,7 +236,7 @@ def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], task_na
 
         return weighted_metrics
 
-    def calculate_rsd(self, values: List[Union[int, float]], metric_name: str):
+    def calculate_rsd(self, values: List[Union[int, float]], metric_name: str) -> Union[float, str]:
         if not values:
             raise ValueError(f"Cannot calculate RSD for metric '{metric_name}': empty list of values")
 
         if len(values) == 1:
diff --git a/tests/aggregator_test.py b/tests/aggregator_test.py
index 32be06cb..d963de60 100644
--- a/tests/aggregator_test.py
+++ b/tests/aggregator_test.py
@@ -50,6 +50,7 @@ def test_count_iterations_for_each_op(aggregator):
     mock_test_procedure.schedule = mock_schedule
     mock_workload.test_procedures = [mock_test_procedure]
+    mock_workload.find_test_procedure_or_default = Mock(return_value=mock_test_procedure)
 
     mock_test_execution = Mock(test_execution_id="test1", workload_params={})
     aggregator.loaded_workload = mock_workload
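Reviewer note (not part of the patch): the sketch below is a self-contained copy of the `aggregate_json_elements` helper introduced inside `aggregate_json_by_key` above, run against a made-up pair of result payloads to illustrate how numeric leaves are averaged while dictionary keys and list positions are merged recursively. The sample keys (`correctness_metrics`, `recall@k`, `hits`) are invented for illustration and are not taken from OSB output.

```python
# Standalone sketch mirroring the aggregate_json_elements helper from the diff above.
from typing import Any, List

def aggregate_json_elements(json_elements: List[Any]) -> Any:
    if not json_elements:
        return None
    # Numbers: return the average
    if all(isinstance(obj, (int, float)) for obj in json_elements):
        return sum(json_elements) / len(json_elements)
    # Dicts: take the union of keys and aggregate each key's values recursively
    if all(isinstance(obj, dict) for obj in json_elements):
        keys = set().union(*json_elements)
        return {key: aggregate_json_elements([obj.get(key) for obj in json_elements]) for key in keys}
    # Lists: aggregate position by position, padding shorter lists with None
    if all(isinstance(obj, list) for obj in json_elements):
        max_length = max(len(obj) for obj in json_elements)
        return [aggregate_json_elements([obj[i] if i < len(obj) else None for obj in json_elements])
                for i in range(max_length)]
    # Mixed or non-numeric types: fall back to the first non-None value
    return next((obj for obj in json_elements if obj is not None), None)

# Hypothetical results from two test executions of the same workload
run_1 = {"correctness_metrics": {"recall@k": {"mean": 0.90}, "hits": [10, 20]}}
run_2 = {"correctness_metrics": {"recall@k": {"mean": 0.80}, "hits": [30]}}
print(aggregate_json_elements([run_1, run_2]))
# -> {'correctness_metrics': {'recall@k': {'mean': ~0.85}, 'hits': [20.0, 20]}}
```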