diff --git a/osbenchmark/aggregator.py b/osbenchmark/aggregator.py
index 02676daa..ede2e0f0 100644
--- a/osbenchmark/aggregator.py
+++ b/osbenchmark/aggregator.py
@@ -1,4 +1,6 @@
+import math
 import os
+import statistics
 from typing import Any, Dict, List, Union
 import uuid
 
@@ -120,17 +122,14 @@ def build_aggregated_results(self):
         for task, task_metrics in self.accumulated_results.items():
             iterations = self.accumulated_iterations.get(task, 1)
             aggregated_task_metrics = self.calculate_weighted_average(task_metrics, iterations)
+
             op_metric = {
                 "task": task,
                 "operation": task,
-                "throughput": aggregated_task_metrics["throughput"],
-                "latency": aggregated_task_metrics["latency"],
-                "service_time": aggregated_task_metrics["service_time"],
-                "client_processing_time": aggregated_task_metrics["client_processing_time"],
-                "processing_time": aggregated_task_metrics["processing_time"],
-                "error_rate": aggregated_task_metrics["error_rate"],
-                "duration": aggregated_task_metrics["duration"]
             }
+
+            op_metric = self.build_op_metric(task_metrics, op_metric, aggregated_task_metrics)
+
             aggregated_results["op_metrics"].append(op_metric)
 
         # extract the necessary data from the first test execution, since the configurations should be identical for all test executions
@@ -180,6 +179,27 @@ def build_aggregated_results(self):
 
         return test_execution
 
+    def build_op_metric(self, task_metrics, op_metric, aggregated_task_metrics):
+        for metric, value in aggregated_task_metrics.items():
+            if isinstance(value, dict):
+                op_metric[metric] = {k: v for k, v in value.items() if k in ['50_0', '100_0', 'mean', 'mean_median', 'unit']}
+                if metric in self.metrics:
+                    values = task_metrics[metric]
+                    if isinstance(values[0], dict):
+                        for key in ['50_0', '100_0', 'mean']:
+                            if key in value:
+                                vals = [v[key] for v in values]
+                                rsd = self.calculate_rsd(vals)
+                                op_metric[metric][f"{key}_rsd"] = rsd
+            elif not metric.endswith('_median'):
+                op_metric[metric] = value
+                if metric in self.metrics:
+                    rsd = self.calculate_rsd(task_metrics[metric])
+                    op_metric[f"{metric}_rsd"] = rsd
+            elif metric == 'mean_median':
+                op_metric[metric] = value
+        return op_metric
+
     def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], iterations: int) -> Dict[str, Any]:
         weighted_metrics = {}
 
@@ -194,18 +214,31 @@ def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], iterati
                    if iterations > 1:
                        weighted_sum = sum(value * iterations for value in item_values)
                        total_iterations = iterations * len(values)
-                        weighted_metrics[metric][item_key] = weighted_sum / total_iterations
+                        weighted_avg = weighted_sum / total_iterations
                    else:
-                        weighted_metrics[metric][item_key] = sum(item_values) / len(item_values)
+                        weighted_avg = sum(item_values) / len(item_values)
+                    weighted_metrics[metric][item_key] = weighted_avg
+                    if item_key == 'mean':
+                        weighted_metrics[metric]['mean_median'] = statistics.median(item_values)
            else:
                if iterations > 1:
                    weighted_sum = sum(value * iterations for value in values)
                    total_iterations = iterations * len(values)
-                    weighted_metrics[metric] = weighted_sum / total_iterations
+                    weighted_avg = weighted_sum / total_iterations
                else:
-                    weighted_metrics[metric] = sum(values) / len(values)
+                    weighted_avg = sum(values) / len(values)
+                weighted_metrics[metric] = weighted_avg
+                weighted_metrics[f"{metric}_median"] = statistics.median(values)
        return weighted_metrics
 
+    def calculate_rsd(self, values):
+        if not values:
+            return 0
+        mean = sum(values) / len(values)
+        variance = sum((x - mean) ** 2 for x in values) / len(values)
+        std_dev = math.sqrt(variance)
+        return (std_dev / mean) * 100 if mean != 0 else 0
+
     def test_execution_compatibility_check(self) -> None:
         first_test_execution = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
         workload = first_test_execution.workload
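
For reference, a minimal standalone sketch of the relative-standard-deviation math this patch introduces. The helper below mirrors the new calculate_rsd method from the diff; the per-iteration latency values are made up for illustration:

import math
import statistics

def calculate_rsd(values):
    # Population RSD: standard deviation divided by the mean, expressed as a percentage.
    if not values:
        return 0
    mean = sum(values) / len(values)
    variance = sum((x - mean) ** 2 for x in values) / len(values)
    std_dev = math.sqrt(variance)
    return (std_dev / mean) * 100 if mean != 0 else 0

# Hypothetical mean latencies (ms) for the same task across three test executions.
latency_means = [102.0, 98.5, 110.3]

print(round(calculate_rsd(latency_means), 2))  # ~4.78, stored under a "mean_rsd" key on the op metric
print(statistics.median(latency_means))        # 102.0, the value kept as "mean_median"

Because the variance is divided by len(values) (population form), aggregating a single test execution simply yields an RSD of 0 rather than an error.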