diff --git a/osbenchmark/aggregator.py b/osbenchmark/aggregator.py index c94527c5..8199f875 100644 --- a/osbenchmark/aggregator.py +++ b/osbenchmark/aggregator.py @@ -193,6 +193,8 @@ def build_aggregated_results(self): def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], iterations: int) -> Dict[str, Any]: weighted_metrics = {} + num_executions = len(next(iter(task_metrics.values()))) + total_iterations = iterations * num_executions for metric, values in task_metrics.items(): if isinstance(values[0], dict): @@ -200,23 +202,17 @@ def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], iterati for item_key in values[0].keys(): if item_key == 'unit': weighted_metrics[metric][item_key] = values[0][item_key] + elif item_key == 'min': + weighted_metrics[metric]['overall_min'] = min(value.get(item_key, 0) for value in values) + elif item_key == 'max': + weighted_metrics[metric]['overall_max'] = max(value.get(item_key, 0) for value in values) else: + # for items like median or percentile values item_values = [value.get(item_key, 0) for value in values] - if item_key == 'min': - weighted_metrics[metric]['overall_min'] = min(item_values) - elif item_key == 'max': - weighted_metrics[metric]['overall_max'] = max(item_values) - elif item_key == 'median': - weighted_sum = sum(value * iterations for value in item_values) - total_iterations = iterations * len(item_values) - weighted_metrics[metric][item_key] = weighted_sum / total_iterations - else: - weighted_sum = sum(value * iterations for value in item_values) - total_iterations = iterations * len(item_values) - weighted_metrics[metric][item_key] = weighted_sum / total_iterations + weighted_sum = sum(value * iterations for value in item_values) + weighted_metrics[metric][item_key] = weighted_sum / total_iterations else: weighted_sum = sum(value * iterations for value in values) - total_iterations = iterations * len(values) weighted_metrics[metric] = weighted_sum / total_iterations return weighted_metrics diff --git a/tests/aggregator_test.py b/tests/aggregator_test.py index d40afc56..6436fe42 100644 --- a/tests/aggregator_test.py +++ b/tests/aggregator_test.py @@ -1,13 +1,12 @@ -from unittest.mock import patch, Mock +from unittest.mock import Mock, patch, mock_open import pytest - from osbenchmark import config -from osbenchmark.aggregator import Aggregator +from osbenchmark.aggregator import Aggregator, AggregatedResults @pytest.fixture def mock_config(): mock_cfg = Mock(spec=config.Config) - mock_cfg.opts.side_effect = lambda *args: "/path/to/root" if args == ("node", "root.dir") else None + mock_cfg.opts.side_effect = lambda *args: "test_procedure_name" if args == ("workload", "test_procedure.name") else "/path/to/root" return mock_cfg @pytest.fixture @@ -29,8 +28,8 @@ def mock_args(): def mock_test_store(): mock_store = Mock() mock_store.find_by_test_execution_id.side_effect = [ - Mock(results={"key1": {"nested": 10}}), - Mock(results={"key1": {"nested": 20}}) + Mock(results={"key1": {"nested": 10}}, workload="workload1", test_procedure="test_proc1"), + Mock(results={"key1": {"nested": 20}}, workload="workload1", test_procedure="test_proc1") ] return mock_store @@ -40,28 +39,35 @@ def aggregator(mock_config, mock_test_executions, mock_args, mock_test_store): aggregator.test_store = mock_test_store return aggregator -def test_iterations(aggregator, mock_args): +def test_count_iterations_for_each_op(aggregator): mock_workload = Mock() - mock_schedule = [Mock(name="op1", iterations=5)] - mock_task = Mock(name="task1", schedule=mock_schedule) - mock_workload.test_procedures = [mock_task] - - # Mock the config.opts call to return the same test_procedure.name - aggregator.config.opts.side_effect = lambda *args: mock_task.name if args == ("workload", "test_procedure.name") else None - + mock_task = Mock(spec=['name', 'iterations']) + mock_task.name = "op1" + mock_task.iterations = 5 + mock_schedule = [mock_task] + mock_test_procedure = Mock(spec=['name', 'schedule']) + mock_test_procedure.name = "test_procedure_name" + mock_test_procedure.schedule = mock_schedule + mock_workload.test_procedures = [mock_test_procedure] + + # Update the config mock to return the correct test_procedure_name + aggregator.config.opts.side_effect = lambda *args: \ + mock_test_procedure.name if args == ("workload", "test_procedure.name") else "/path/to/root" with patch('osbenchmark.workload.load_workload', return_value=mock_workload): aggregator.count_iterations_for_each_op() - assert list(aggregator.accumulated_iterations.values())[0] == 5 + print(f"accumulated_iterations: {aggregator.accumulated_iterations}") # Debug print + assert "op1" in aggregator.accumulated_iterations, "op1 not found in accumulated_iterations" + assert aggregator.accumulated_iterations["op1"] == 5 -def test_results(aggregator): +def test_accumulate_results(aggregator): mock_test_execution = Mock() mock_test_execution.results = { "op_metrics": [ { "task": "task1", "throughput": 100, - "latency": 10, + "latency": {"avg": 10, "unit": "ms"}, "service_time": 5, "client_processing_time": 2, "processing_time": 3, @@ -74,9 +80,19 @@ def test_results(aggregator): aggregator.accumulate_results(mock_test_execution) assert "task1" in aggregator.accumulated_results - assert all(metric in aggregator.accumulated_results["task1"] for metric in - ["throughput", "latency", "service_time", "client_processing_time", - "processing_time", "error_rate", "duration"]) + assert all(metric in aggregator.accumulated_results["task1"] for metric in aggregator.metrics) + +def test_test_execution_compatibility_check(aggregator): + mock_test_store = Mock() + mock_test_store.find_by_test_execution_id.side_effect = [ + Mock(workload="workload1", test_procedure="test_proc1"), + Mock(workload="workload1", test_procedure="test_proc1"), + Mock(workload="workload1", test_procedure="test_proc1"), # Add one more mock response + ] + aggregator.test_store = mock_test_store + aggregator.test_executions = {"test1": Mock(), "test2": Mock()} + + assert aggregator.test_execution_compatibility_check() def test_aggregate_json_by_key(aggregator): result = aggregator.aggregate_json_by_key("key1.nested") @@ -95,25 +111,52 @@ def test_calculate_weighted_average(aggregator): assert result["latency"]["avg"] == 15 assert result["latency"]["unit"] == "ms" -def test_compatibility_check(aggregator): - mock_test_procedure = Mock(name="test_procedure") - mock_test_store = Mock() - mock_test_store.find_by_test_execution_id.side_effect = [ - Mock(workload="workload1", test_procedure=mock_test_procedure), - Mock(workload="workload1", test_procedure=mock_test_procedure), - Mock(workload="workload1", test_procedure=mock_test_procedure) - ] - aggregator.test_store = mock_test_store - assert aggregator.test_execution_compatibility_check() - +def test_calculate_rsd(aggregator): + values = [1, 2, 3, 4, 5] + rsd = aggregator.calculate_rsd(values, "test_metric") + assert isinstance(rsd, float) -def test_compatibility_check_incompatible(aggregator): +def test_test_execution_compatibility_check_incompatible(aggregator): mock_test_store = Mock() mock_test_store.find_by_test_execution_id.side_effect = [ - Mock(workload="workload1"), - Mock(workload="workload2"), - Mock(workload="workload1") + Mock(workload="workload1", test_procedure="test_proc1"), + Mock(workload="workload2", test_procedure="test_proc1"), ] aggregator.test_store = mock_test_store + aggregator.test_executions = {"test1": Mock(), "test2": Mock()} with pytest.raises(ValueError): aggregator.test_execution_compatibility_check() + +def test_aggregate(aggregator): + mock_aggregated_results = Mock(test_execution_id="mock_id", as_dict=lambda: {}) + + with patch.object(aggregator, 'test_execution_compatibility_check', return_value=True), \ + patch.object(aggregator, 'count_iterations_for_each_op'), \ + patch.object(aggregator, 'accumulate_results'), \ + patch.object(aggregator, 'build_aggregated_results', return_value=mock_aggregated_results) as mock_build, \ + patch('osbenchmark.aggregator.FileTestExecutionStore') as mock_store_class, \ + patch('osbenchmark.utils.io.ensure_dir') as mock_ensure_dir, \ + patch('builtins.open', mock_open()) as mock_file: + + mock_store = mock_store_class.return_value + mock_store.store_aggregated_execution.side_effect = lambda x: print(f"Storing aggregated execution: {x}") + + aggregator.aggregate() + + print(f"mock_build called: {mock_build.called}") + print(f"mock_store.store_aggregated_execution called: {mock_store.store_aggregated_execution.called}") + + assert mock_build.called, "build_aggregated_results was not called" + mock_store.store_aggregated_execution.assert_called_once_with(mock_aggregated_results) + + print(f"ensure_dir called: {mock_ensure_dir.called}") + print(f"ensure_dir call args: {mock_ensure_dir.call_args_list}") + print(f"open called: {mock_file.called}") + print(f"open call args: {mock_file.call_args_list}") + + assert mock_store.store_aggregated_execution.called, "store_aggregated_execution was not called" + +def test_aggregated_results(): + results = {"key": "value"} + agg_results = AggregatedResults(results) + assert agg_results.as_dict() == results diff --git a/tests/results_publisher_test.py b/tests/results_publisher_test.py index 98caa395..25544d40 100644 --- a/tests/results_publisher_test.py +++ b/tests/results_publisher_test.py @@ -23,10 +23,11 @@ # under the License. from unittest import TestCase +from unittest.mock import Mock, patch from osbenchmark import results_publisher - +# pylint: disable=protected-access class FormatterTests(TestCase): def setUp(self): self.empty_header = ["Header"] @@ -57,3 +58,58 @@ def test_formats_as_csv(self): formatted = results_publisher.format_as_csv(self.metrics_header, self.metrics_data) # 1 header line, no separation line + 3 data lines self.assertEqual(1 + 3, len(formatted.splitlines())) + + @patch('osbenchmark.results_publisher.convert.to_bool') + def test_publish_throughput_handles_different_metrics(self, mock_to_bool): + config = Mock() + + # Configure mock to return appropriate values for different calls + def config_opts_side_effect(*args, **kwargs): + if args[0] == "results_publishing": + if args[1] == "output.processingtime": + return False + elif args[1] == "percentiles": + return None + return Mock() + + config.opts.side_effect = config_opts_side_effect + + publisher = results_publisher.ComparisonResultsPublisher(config) + + # Mock for regular test execution + regular_stats = Mock() + regular_stats.metrics.return_value = { + "throughput": { + "min": 100, + "max": 200, + "mean": 150, + "median": 160, + "unit": "ops/s" + } + } + + # Mock for aggregated test execution + aggregated_stats = Mock() + aggregated_stats.metrics.return_value = { + "throughput": { + "overall_min": 95, + "overall_max": 205, + "min": 100, + "max": 200, + "mean": 150, + "median": 160, + "unit": "ops/s" + } + } + + # Test with regular stats + result_regular = publisher._publish_throughput(regular_stats, regular_stats, "test_task") + self.assertEqual(len(result_regular), 4) + self.assertEqual(result_regular[0][2], 100) # baseline min + self.assertEqual(result_regular[3][3], 200) # contender max + + # Test with aggregated stats + result_aggregated = publisher._publish_throughput(aggregated_stats, aggregated_stats, "test_task") + self.assertEqual(len(result_aggregated), 4) + self.assertEqual(result_aggregated[0][2], 95) # baseline overall_min + self.assertEqual(result_aggregated[3][3], 205) # contender overall_max