Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add relative standard deviation to aggregated test execution metrics #681

Merged
merged 4 commits into from
Oct 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 37 additions & 20 deletions osbenchmark/aggregator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import statistics
from typing import Any, Dict, List, Union
import uuid

Expand Down Expand Up @@ -123,14 +124,24 @@ def build_aggregated_results(self):
op_metric = {
"task": task,
"operation": task,
"throughput": aggregated_task_metrics["throughput"],
"latency": aggregated_task_metrics["latency"],
"service_time": aggregated_task_metrics["service_time"],
"client_processing_time": aggregated_task_metrics["client_processing_time"],
"processing_time": aggregated_task_metrics["processing_time"],
"error_rate": aggregated_task_metrics["error_rate"],
"duration": aggregated_task_metrics["duration"]
}
for metric in self.metrics:
op_metric[metric] = aggregated_task_metrics[metric]

# Handle standard metrics (like latency, service_time) which are stored as dictionaries
if isinstance(aggregated_task_metrics[metric], dict):
# Calculate RSD for the mean values across all test executions
# We use mean here as it's more sensitive to outliers, which is desirable for assessing variability
mean_values = [v['mean'] for v in task_metrics[metric]]
rsd = self.calculate_rsd(mean_values, f"{task}.{metric}.mean")
op_metric[metric]['mean_rsd'] = rsd

# Handle derived metrics (like error_rate, duration) which are stored as simple values
else:
# Calculate RSD directly from the metric values across all test executions
rsd = self.calculate_rsd(task_metrics[metric], f"{task}.{metric}")
op_metric[f"{metric}_rsd"] = rsd

aggregated_results["op_metrics"].append(op_metric)

# extract the necessary data from the first test execution, since the configurations should be identical for all test executions
Expand Down Expand Up @@ -184,28 +195,34 @@ def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], iterati
weighted_metrics = {}

for metric, values in task_metrics.items():
weighted_metrics[metric] = {}
if isinstance(values[0], dict):
weighted_metrics[metric] = {}
for item_key in values[0].keys():
if item_key == 'unit':
weighted_metrics[metric][item_key] = values[0][item_key]
else:
item_values = [value.get(item_key, 0) for value in values]
if iterations > 1:
weighted_sum = sum(value * iterations for value in item_values)
total_iterations = iterations * len(values)
weighted_metrics[metric][item_key] = weighted_sum / total_iterations
else:
weighted_metrics[metric][item_key] = sum(item_values) / len(item_values)
weighted_sum = sum(value * iterations for value in item_values)
total_iterations = iterations * len(item_values)
weighted_avg = weighted_sum / total_iterations
weighted_metrics[metric][item_key] = weighted_avg
else:
if iterations > 1:
weighted_sum = sum(value * iterations for value in values)
total_iterations = iterations * len(values)
weighted_metrics[metric] = weighted_sum / total_iterations
else:
weighted_metrics[metric] = sum(values) / len(values)
weighted_sum = sum(value * iterations for value in values)
total_iterations = iterations * len(values)
weighted_avg = weighted_sum / total_iterations
weighted_metrics[metric] = weighted_avg

return weighted_metrics

def calculate_rsd(self, values: List[Union[int, float]], metric_name: str) -> Union[float, str]:
    """Return the relative standard deviation (RSD) of *values* as a percentage.

    The RSD is the sample standard deviation divided by the absolute value
    of the mean, multiplied by 100.

    Args:
        values: Numeric observations of one metric.
        metric_name: Name of the metric; used only in the error message.

    Returns:
        The RSD in percent, the string ``"NA"`` when only a single value is
        given, or ``float('inf')`` when the mean of the values is zero.

    Raises:
        ValueError: If *values* is empty.
    """
    if not values:
        raise ValueError(f"Cannot calculate RSD for metric '{metric_name}': empty list of values")
    if len(values) == 1:
        return "NA"  # RSD is not applicable for a single value

    mean = statistics.mean(values)
    if mean == 0:
        # Dividing by a zero mean is undefined; report unbounded variability.
        return float('inf')

    std_dev = statistics.stdev(values)
    # Divide by |mean| so the result is never negative, even when the
    # observations average below zero.
    return (std_dev / abs(mean)) * 100

def test_execution_compatibility_check(self) -> None:
first_test_execution = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
workload = first_test_execution.workload
Expand Down
Loading