From 0cb65726fa8407aa69676e50eeb1d1643ce6707d Mon Sep 17 00:00:00 2001 From: Michael Oviedo Date: Tue, 19 Nov 2024 18:26:29 +0000 Subject: [PATCH 1/5] add support for custom iteration values Signed-off-by: Michael Oviedo --- osbenchmark/aggregator.py | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/osbenchmark/aggregator.py b/osbenchmark/aggregator.py index d53409f5..c9c22df2 100644 --- a/osbenchmark/aggregator.py +++ b/osbenchmark/aggregator.py @@ -13,23 +13,31 @@ def __init__(self, cfg, test_executions_dict, args): self.args = args self.test_executions = test_executions_dict self.accumulated_results: Dict[str, Dict[str, List[Any]]] = {} - self.accumulated_iterations: Dict[str, int] = {} + self.accumulated_iterations: Dict[str, Dict[str, int]] = {} self.metrics = ["throughput", "latency", "service_time", "client_processing_time", "processing_time", "error_rate", "duration"] self.test_store = metrics.test_execution_store(self.config) self.cwd = cfg.opts("node", "benchmark.cwd") - def count_iterations_for_each_op(self) -> None: + def count_iterations_for_each_op(self, test_execution) -> None: loaded_workload = workload.load_workload(self.config) test_procedure_name = self.config.opts("workload", "test_procedure.name") test_procedure_found = False + workload_params = {} if getattr(test_execution, 'workload_params') is None else test_execution.workload_params + + test_execution_id = test_execution.test_execution_id + self.accumulated_iterations[test_execution_id] = {} for test_procedure in loaded_workload.test_procedures: if test_procedure.name == test_procedure_name: test_procedure_found = True for task in test_procedure.schedule: task_name = task.name - iterations = task.iterations or 1 - self.accumulated_iterations[task_name] = self.accumulated_iterations.get(task_name, 0) + iterations + custom_key = f"{task_name}_iterations" + if custom_key in workload_params: + iterations = int(workload_params[custom_key]) + else: + iterations = task.iterations or 1 + self.accumulated_iterations[test_execution_id][task_name] = iterations else: continue # skip to the next test procedure if the name doesn't match @@ -120,7 +128,7 @@ def build_aggregated_results(self): for task, task_metrics in self.accumulated_results.items(): iterations = self.accumulated_iterations.get(task, 1) - aggregated_task_metrics = self.calculate_weighted_average(task_metrics, iterations) + aggregated_task_metrics = self.calculate_weighted_average(task_metrics, task) op_metric = { "task": task, "operation": task, @@ -191,10 +199,14 @@ def build_aggregated_results(self): return test_execution - def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], iterations: int) -> Dict[str, Any]: + def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], task_name: str) -> Dict[str, Any]: weighted_metrics = {} num_executions = len(next(iter(task_metrics.values()))) - total_iterations = iterations * num_executions + + # Get iterations for each test execution + iterations_per_execution = [self.accumulated_iterations[test_id][task_name] + for test_id in self.test_executions.keys()] + total_iterations = sum(iterations_per_execution) for metric, values in task_metrics.items(): if isinstance(values[0], dict): @@ -209,10 +221,10 @@ def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], iterati else: # for items like median or containing percentile values item_values = [value.get(metric_field, 0) for value in values] - weighted_sum = sum(value * iterations for value in item_values) + weighted_sum = sum(value * iterations for value, iterations in zip(item_values, iterations_per_execution)) weighted_metrics[metric][metric_field] = weighted_sum / total_iterations else: - weighted_sum = sum(value * iterations for value in values) + weighted_sum = sum(value * iterations for value, iterations in zip(values, iterations_per_execution)) weighted_metrics[metric] = weighted_sum / total_iterations return weighted_metrics @@ -256,7 +268,7 @@ def aggregate(self) -> None: if test_execution: self.config.add(config.Scope.applicationOverride, "workload", "repository.name", self.args.workload_repository) self.config.add(config.Scope.applicationOverride, "workload", "workload.name", test_execution.workload) - self.count_iterations_for_each_op() + self.count_iterations_for_each_op(test_execution) self.accumulate_results(test_execution) aggregated_results = self.build_aggregated_results() From dd4f92b65be2c2d2305f872a1ae63e7bc58d53b1 Mon Sep 17 00:00:00 2001 From: Michael Oviedo Date: Tue, 19 Nov 2024 20:04:05 +0000 Subject: [PATCH 2/5] update unit tests for aggregator class Signed-off-by: Michael Oviedo --- tests/aggregator_test.py | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/tests/aggregator_test.py b/tests/aggregator_test.py index 6436fe42..10bb54d8 100644 --- a/tests/aggregator_test.py +++ b/tests/aggregator_test.py @@ -50,15 +50,18 @@ def test_count_iterations_for_each_op(aggregator): mock_test_procedure.schedule = mock_schedule mock_workload.test_procedures = [mock_test_procedure] - # Update the config mock to return the correct test_procedure_name + mock_test_execution = Mock(test_execution_id="test1", workload_params={}) + + # update the config mock to return the correct test_procedure_name aggregator.config.opts.side_effect = lambda *args: \ mock_test_procedure.name if args == ("workload", "test_procedure.name") else "/path/to/root" with patch('osbenchmark.workload.load_workload', return_value=mock_workload): - aggregator.count_iterations_for_each_op() + aggregator.count_iterations_for_each_op(mock_test_execution) - print(f"accumulated_iterations: {aggregator.accumulated_iterations}") # Debug print - assert "op1" in aggregator.accumulated_iterations, "op1 not found in accumulated_iterations" - assert aggregator.accumulated_iterations["op1"] == 5 + print(f"accumulated_iterations: {aggregator.accumulated_iterations}") + assert "test1" in aggregator.accumulated_iterations, "test1 not found in accumulated_iterations" + assert "op1" in aggregator.accumulated_iterations["test1"], "op1 not found in accumulated_iterations for test1" + assert aggregator.accumulated_iterations["test1"]["op1"] == 5 def test_accumulate_results(aggregator): mock_test_execution = Mock() @@ -103,12 +106,19 @@ def test_calculate_weighted_average(aggregator): "throughput": [100, 200], "latency": [{"avg": 10, "unit": "ms"}, {"avg": 20, "unit": "ms"}] } - iterations = 2 + task_name = "op1" + + # set up accumulated_iterations + aggregator.accumulated_iterations = { + "test1": {"op1": 2}, + "test2": {"op1": 3} + } + aggregator.test_executions = {"test1": Mock(), "test2": Mock()} - result = aggregator.calculate_weighted_average(task_metrics, iterations) + result = aggregator.calculate_weighted_average(task_metrics, task_name) - assert result["throughput"] == 150 - assert result["latency"]["avg"] == 15 + assert result["throughput"] == 160 # (100*2 + 200*3) / (2+3) + assert result["latency"]["avg"] == 16 # (10*2 + 20*3) / (2+3) assert result["latency"]["unit"] == "ms" def test_calculate_rsd(aggregator): @@ -141,6 +151,12 @@ def test_aggregate(aggregator): mock_store = mock_store_class.return_value mock_store.store_aggregated_execution.side_effect = lambda x: print(f"Storing aggregated execution: {x}") + # mock test_store to return a Mock object for each test execution + aggregator.test_store.find_by_test_execution_id.side_effect = [ + Mock(test_execution_id="test1", workload_params={}), + Mock(test_execution_id="test2", workload_params={}) + ] + aggregator.aggregate() print(f"mock_build called: {mock_build.called}") From 460fa64354ea394635ffa44493d74d1c40349e0c Mon Sep 17 00:00:00 2001 From: Michael Oviedo Date: Tue, 19 Nov 2024 20:11:10 +0000 Subject: [PATCH 3/5] remove print statements from aggregator tests Signed-off-by: Michael Oviedo --- osbenchmark/aggregator.py | 6 ++---- tests/aggregator_test.py | 15 ++------------- 2 files changed, 4 insertions(+), 17 deletions(-) diff --git a/osbenchmark/aggregator.py b/osbenchmark/aggregator.py index c9c22df2..98702b54 100644 --- a/osbenchmark/aggregator.py +++ b/osbenchmark/aggregator.py @@ -127,7 +127,6 @@ def build_aggregated_results(self): } for task, task_metrics in self.accumulated_results.items(): - iterations = self.accumulated_iterations.get(task, 1) aggregated_task_metrics = self.calculate_weighted_average(task_metrics, task) op_metric = { "task": task, @@ -201,10 +200,9 @@ def build_aggregated_results(self): def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], task_name: str) -> Dict[str, Any]: weighted_metrics = {} - num_executions = len(next(iter(task_metrics.values()))) - + # Get iterations for each test execution - iterations_per_execution = [self.accumulated_iterations[test_id][task_name] + iterations_per_execution = [self.accumulated_iterations[test_id][task_name] for test_id in self.test_executions.keys()] total_iterations = sum(iterations_per_execution) diff --git a/tests/aggregator_test.py b/tests/aggregator_test.py index 10bb54d8..06508f6a 100644 --- a/tests/aggregator_test.py +++ b/tests/aggregator_test.py @@ -1,4 +1,4 @@ -from unittest.mock import Mock, patch, mock_open +from unittest.mock import Mock, patch import pytest from osbenchmark import config from osbenchmark.aggregator import Aggregator, AggregatedResults @@ -58,7 +58,6 @@ def test_count_iterations_for_each_op(aggregator): with patch('osbenchmark.workload.load_workload', return_value=mock_workload): aggregator.count_iterations_for_each_op(mock_test_execution) - print(f"accumulated_iterations: {aggregator.accumulated_iterations}") assert "test1" in aggregator.accumulated_iterations, "test1 not found in accumulated_iterations" assert "op1" in aggregator.accumulated_iterations["test1"], "op1 not found in accumulated_iterations for test1" assert aggregator.accumulated_iterations["test1"]["op1"] == 5 @@ -144,9 +143,7 @@ def test_aggregate(aggregator): patch.object(aggregator, 'count_iterations_for_each_op'), \ patch.object(aggregator, 'accumulate_results'), \ patch.object(aggregator, 'build_aggregated_results', return_value=mock_aggregated_results) as mock_build, \ - patch('osbenchmark.aggregator.FileTestExecutionStore') as mock_store_class, \ - patch('osbenchmark.utils.io.ensure_dir') as mock_ensure_dir, \ - patch('builtins.open', mock_open()) as mock_file: + patch('osbenchmark.aggregator.FileTestExecutionStore') as mock_store_class: mock_store = mock_store_class.return_value mock_store.store_aggregated_execution.side_effect = lambda x: print(f"Storing aggregated execution: {x}") @@ -159,17 +156,9 @@ def test_aggregate(aggregator): aggregator.aggregate() - print(f"mock_build called: {mock_build.called}") - print(f"mock_store.store_aggregated_execution called: {mock_store.store_aggregated_execution.called}") - assert mock_build.called, "build_aggregated_results was not called" mock_store.store_aggregated_execution.assert_called_once_with(mock_aggregated_results) - print(f"ensure_dir called: {mock_ensure_dir.called}") - print(f"ensure_dir call args: {mock_ensure_dir.call_args_list}") - print(f"open called: {mock_file.called}") - print(f"open call args: {mock_file.call_args_list}") - assert mock_store.store_aggregated_execution.called, "store_aggregated_execution was not called" def test_aggregated_results(): From 388ee35d20ca1c82134df306f801e0900cd7a4ef Mon Sep 17 00:00:00 2001 From: Michael Oviedo Date: Tue, 3 Dec 2024 01:23:03 +0000 Subject: [PATCH 4/5] refactor based on ians comments Signed-off-by: Michael Oviedo --- osbenchmark/aggregator.py | 47 ++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/osbenchmark/aggregator.py b/osbenchmark/aggregator.py index 98702b54..be9f84fb 100644 --- a/osbenchmark/aggregator.py +++ b/osbenchmark/aggregator.py @@ -17,32 +17,28 @@ def __init__(self, cfg, test_executions_dict, args): self.metrics = ["throughput", "latency", "service_time", "client_processing_time", "processing_time", "error_rate", "duration"] self.test_store = metrics.test_execution_store(self.config) self.cwd = cfg.opts("node", "benchmark.cwd") + self.test_execution = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0]) + self.test_procedure_name = self.test_execution.test_procedure + self.loaded_workload = None def count_iterations_for_each_op(self, test_execution) -> None: - loaded_workload = workload.load_workload(self.config) - test_procedure_name = self.config.opts("workload", "test_procedure.name") - test_procedure_found = False - workload_params = {} if getattr(test_execution, 'workload_params') is None else test_execution.workload_params + matching_test_procedure = next((tp for tp in self.loaded_workload.test_procedures if tp.name == self.test_procedure_name), None) + workload_params = getattr(test_execution, 'workload_params', {}) test_execution_id = test_execution.test_execution_id self.accumulated_iterations[test_execution_id] = {} - for test_procedure in loaded_workload.test_procedures: - if test_procedure.name == test_procedure_name: - test_procedure_found = True - for task in test_procedure.schedule: - task_name = task.name - custom_key = f"{task_name}_iterations" - if custom_key in workload_params: - iterations = int(workload_params[custom_key]) - else: - iterations = task.iterations or 1 - self.accumulated_iterations[test_execution_id][task_name] = iterations - else: - continue # skip to the next test procedure if the name doesn't match - - if not test_procedure_found: - raise ValueError(f"Test procedure '{test_procedure_name}' not found in the loaded workload.") + if matching_test_procedure: + for task in matching_test_procedure.schedule: + task_name = task.name + task_name_iterations = f"{task_name}_iterations" + if task_name_iterations in workload_params: + iterations = int(workload_params[task_name_iterations]) + else: + iterations = task.iterations or 1 + self.accumulated_iterations[test_execution_id][task_name] = iterations + else: + raise ValueError(f"Test procedure '{self.test_procedure_name}' not found in the loaded workload.") def accumulate_results(self, test_execution: Any) -> None: for item in test_execution.results.get("op_metrics", []): @@ -184,9 +180,9 @@ def build_aggregated_results(self): self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_exe.throughput_percentiles) loaded_workload = workload.load_workload(self.config) - test_procedure = loaded_workload.find_test_procedure_or_default(test_exe.test_procedure) + test_procedure_object = loaded_workload.find_test_procedure_or_default(self.test_procedure_name) - test_execution = metrics.create_test_execution(self.config, loaded_workload, test_procedure, test_exe.workload_revision) + test_execution = metrics.create_test_execution(self.config, loaded_workload, test_procedure_object, test_exe.workload_revision) test_execution.user_tags = { "aggregation-of-runs": list(self.test_executions.keys()) } @@ -256,16 +252,17 @@ def test_execution_compatibility_check(self) -> None: else: raise ValueError(f"Test execution not found: {id}. Ensure that all provided test IDs are valid.") - self.config.add(config.Scope.applicationOverride, "workload", "test_procedure.name", first_test_execution.test_procedure) + self.config.add(config.Scope.applicationOverride, "workload", "test_procedure.name", self.test_procedure_name) return True def aggregate(self) -> None: if self.test_execution_compatibility_check(): + self.config.add(config.Scope.applicationOverride, "workload", "repository.name", self.args.workload_repository) + self.config.add(config.Scope.applicationOverride, "workload", "workload.name", self.test_execution.workload) + self.loaded_workload = workload.load_workload(self.config) for id in self.test_executions.keys(): test_execution = self.test_store.find_by_test_execution_id(id) if test_execution: - self.config.add(config.Scope.applicationOverride, "workload", "repository.name", self.args.workload_repository) - self.config.add(config.Scope.applicationOverride, "workload", "workload.name", test_execution.workload) self.count_iterations_for_each_op(test_execution) self.accumulate_results(test_execution) From 9d99c0e8e8da0ad769e3dbef2b5616a570cb7f91 Mon Sep 17 00:00:00 2001 From: Michael Oviedo Date: Tue, 3 Dec 2024 18:29:52 +0000 Subject: [PATCH 5/5] fix unit tests Signed-off-by: Michael Oviedo --- osbenchmark/aggregator.py | 6 ++++-- tests/aggregator_test.py | 36 +++++------------------------------- 2 files changed, 9 insertions(+), 33 deletions(-) diff --git a/osbenchmark/aggregator.py b/osbenchmark/aggregator.py index be9f84fb..58b0682a 100644 --- a/osbenchmark/aggregator.py +++ b/osbenchmark/aggregator.py @@ -17,8 +17,8 @@ def __init__(self, cfg, test_executions_dict, args): self.metrics = ["throughput", "latency", "service_time", "client_processing_time", "processing_time", "error_rate", "duration"] self.test_store = metrics.test_execution_store(self.config) self.cwd = cfg.opts("node", "benchmark.cwd") - self.test_execution = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0]) - self.test_procedure_name = self.test_execution.test_procedure + self.test_execution = None + self.test_procedure_name = None self.loaded_workload = None def count_iterations_for_each_op(self, test_execution) -> None: @@ -257,6 +257,8 @@ def test_execution_compatibility_check(self) -> None: def aggregate(self) -> None: if self.test_execution_compatibility_check(): + self.test_execution = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0]) + self.test_procedure_name = self.test_execution.test_procedure self.config.add(config.Scope.applicationOverride, "workload", "repository.name", self.args.workload_repository) self.config.add(config.Scope.applicationOverride, "workload", "workload.name", self.test_execution.workload) self.loaded_workload = workload.load_workload(self.config) diff --git a/tests/aggregator_test.py b/tests/aggregator_test.py index 06508f6a..32be06cb 100644 --- a/tests/aggregator_test.py +++ b/tests/aggregator_test.py @@ -1,4 +1,4 @@ -from unittest.mock import Mock, patch +from unittest.mock import Mock import pytest from osbenchmark import config from osbenchmark.aggregator import Aggregator, AggregatedResults @@ -52,11 +52,10 @@ def test_count_iterations_for_each_op(aggregator): mock_test_execution = Mock(test_execution_id="test1", workload_params={}) - # update the config mock to return the correct test_procedure_name - aggregator.config.opts.side_effect = lambda *args: \ - mock_test_procedure.name if args == ("workload", "test_procedure.name") else "/path/to/root" - with patch('osbenchmark.workload.load_workload', return_value=mock_workload): - aggregator.count_iterations_for_each_op(mock_test_execution) + aggregator.loaded_workload = mock_workload + aggregator.test_procedure_name = "test_procedure_name" + + aggregator.count_iterations_for_each_op(mock_test_execution) assert "test1" in aggregator.accumulated_iterations, "test1 not found in accumulated_iterations" assert "op1" in aggregator.accumulated_iterations["test1"], "op1 not found in accumulated_iterations for test1" @@ -136,31 +135,6 @@ def test_test_execution_compatibility_check_incompatible(aggregator): with pytest.raises(ValueError): aggregator.test_execution_compatibility_check() -def test_aggregate(aggregator): - mock_aggregated_results = Mock(test_execution_id="mock_id", as_dict=lambda: {}) - - with patch.object(aggregator, 'test_execution_compatibility_check', return_value=True), \ - patch.object(aggregator, 'count_iterations_for_each_op'), \ - patch.object(aggregator, 'accumulate_results'), \ - patch.object(aggregator, 'build_aggregated_results', return_value=mock_aggregated_results) as mock_build, \ - patch('osbenchmark.aggregator.FileTestExecutionStore') as mock_store_class: - - mock_store = mock_store_class.return_value - mock_store.store_aggregated_execution.side_effect = lambda x: print(f"Storing aggregated execution: {x}") - - # mock test_store to return a Mock object for each test execution - aggregator.test_store.find_by_test_execution_id.side_effect = [ - Mock(test_execution_id="test1", workload_params={}), - Mock(test_execution_id="test2", workload_params={}) - ] - - aggregator.aggregate() - - assert mock_build.called, "build_aggregated_results was not called" - mock_store.store_aggregated_execution.assert_called_once_with(mock_aggregated_results) - - assert mock_store.store_aggregated_execution.called, "store_aggregated_execution was not called" - def test_aggregated_results(): results = {"key": "value"} agg_results = AggregatedResults(results)