Skip to content

Commit

Permalink
refactor based on ians comments
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Oviedo <[email protected]>
  • Loading branch information
OVI3D0 committed Dec 3, 2024
1 parent 460fa64 commit 388ee35
Showing 1 changed file with 22 additions and 25 deletions.
47 changes: 22 additions & 25 deletions osbenchmark/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,28 @@ def __init__(self, cfg, test_executions_dict, args):
self.metrics = ["throughput", "latency", "service_time", "client_processing_time", "processing_time", "error_rate", "duration"]
self.test_store = metrics.test_execution_store(self.config)
self.cwd = cfg.opts("node", "benchmark.cwd")
self.test_execution = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
self.test_procedure_name = self.test_execution.test_procedure
self.loaded_workload = None

def count_iterations_for_each_op(self, test_execution) -> None:
loaded_workload = workload.load_workload(self.config)
test_procedure_name = self.config.opts("workload", "test_procedure.name")
test_procedure_found = False
workload_params = {} if getattr(test_execution, 'workload_params') is None else test_execution.workload_params
matching_test_procedure = next((tp for tp in self.loaded_workload.test_procedures if tp.name == self.test_procedure_name), None)
workload_params = getattr(test_execution, 'workload_params', {})

test_execution_id = test_execution.test_execution_id
self.accumulated_iterations[test_execution_id] = {}

for test_procedure in loaded_workload.test_procedures:
if test_procedure.name == test_procedure_name:
test_procedure_found = True
for task in test_procedure.schedule:
task_name = task.name
custom_key = f"{task_name}_iterations"
if custom_key in workload_params:
iterations = int(workload_params[custom_key])
else:
iterations = task.iterations or 1
self.accumulated_iterations[test_execution_id][task_name] = iterations
else:
continue # skip to the next test procedure if the name doesn't match

if not test_procedure_found:
raise ValueError(f"Test procedure '{test_procedure_name}' not found in the loaded workload.")
if matching_test_procedure:
for task in matching_test_procedure.schedule:
task_name = task.name
task_name_iterations = f"{task_name}_iterations"
if task_name_iterations in workload_params:
iterations = int(workload_params[task_name_iterations])
else:
iterations = task.iterations or 1
self.accumulated_iterations[test_execution_id][task_name] = iterations
else:
raise ValueError(f"Test procedure '{self.test_procedure_name}' not found in the loaded workload.")

def accumulate_results(self, test_execution: Any) -> None:
for item in test_execution.results.get("op_metrics", []):
Expand Down Expand Up @@ -184,9 +180,9 @@ def build_aggregated_results(self):
self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_exe.throughput_percentiles)

loaded_workload = workload.load_workload(self.config)
test_procedure = loaded_workload.find_test_procedure_or_default(test_exe.test_procedure)
test_procedure_object = loaded_workload.find_test_procedure_or_default(self.test_procedure_name)

test_execution = metrics.create_test_execution(self.config, loaded_workload, test_procedure, test_exe.workload_revision)
test_execution = metrics.create_test_execution(self.config, loaded_workload, test_procedure_object, test_exe.workload_revision)
test_execution.user_tags = {
"aggregation-of-runs": list(self.test_executions.keys())
}
Expand Down Expand Up @@ -256,16 +252,17 @@ def test_execution_compatibility_check(self) -> None:
else:
raise ValueError(f"Test execution not found: {id}. Ensure that all provided test IDs are valid.")

self.config.add(config.Scope.applicationOverride, "workload", "test_procedure.name", first_test_execution.test_procedure)
self.config.add(config.Scope.applicationOverride, "workload", "test_procedure.name", self.test_procedure_name)
return True

def aggregate(self) -> None:
if self.test_execution_compatibility_check():
self.config.add(config.Scope.applicationOverride, "workload", "repository.name", self.args.workload_repository)
self.config.add(config.Scope.applicationOverride, "workload", "workload.name", self.test_execution.workload)
self.loaded_workload = workload.load_workload(self.config)
for id in self.test_executions.keys():
test_execution = self.test_store.find_by_test_execution_id(id)
if test_execution:
self.config.add(config.Scope.applicationOverride, "workload", "repository.name", self.args.workload_repository)
self.config.add(config.Scope.applicationOverride, "workload", "workload.name", test_execution.workload)
self.count_iterations_for_each_op(test_execution)
self.accumulate_results(test_execution)

Expand Down

0 comments on commit 388ee35

Please sign in to comment.