From b5da7f2fcc4e782da55cdceeb0f6cddd64ce3a6e Mon Sep 17 00:00:00 2001 From: Michael Oviedo Date: Tue, 17 Sep 2024 21:12:08 +0000 Subject: [PATCH] revision for aggregate PR Signed-off-by: Michael Oviedo --- osbenchmark/aggregator.py | 116 ++++++++++++++++++++++---------------- osbenchmark/benchmark.py | 23 ++++++-- osbenchmark/metrics.py | 2 +- tests/aggregator_test.py | 55 +++++++++++------- 4 files changed, 122 insertions(+), 74 deletions(-) diff --git a/osbenchmark/aggregator.py b/osbenchmark/aggregator.py index 0769e777c..94d614c29 100644 --- a/osbenchmark/aggregator.py +++ b/osbenchmark/aggregator.py @@ -1,39 +1,41 @@ +import os from typing import Any, Dict, List, Union import uuid from osbenchmark.metrics import FileTestExecutionStore from osbenchmark import metrics, workload, config +from osbenchmark.utils import io as rio class Aggregator: - def __init__(self, cfg, test_executions_dict): + def __init__(self, cfg, test_executions_dict, args): self.config = cfg + self.args = args self.test_executions = test_executions_dict self.accumulated_results: Dict[str, Dict[str, List[Any]]] = {} self.accumulated_iterations: Dict[str, int] = {} + self.statistics = ["throughput", "latency", "service_time", "client_processing_time", "processing_time", "error_rate", "duration"] + self.test_store = metrics.test_execution_store(self.config) + self.cwd = cfg.opts("node", "benchmark.cwd") - # count iterations for each operation in the workload - def iterations(self) -> None: + def count_iterations_for_each_op(self) -> None: loaded_workload = workload.load_workload(self.config) - for task in loaded_workload.test_procedures: - for operation in task.schedule: - operation_name = operation.name - iterations = operation.iterations or 1 - self.accumulated_iterations.setdefault(operation_name, 0) - self.accumulated_iterations[operation_name] += iterations - - # accumulate metrics for each task from test execution results - def results(self, test_execution: Any) -> None: + for test_procedure in loaded_workload.test_procedures: + for task in test_procedure.schedule: + task_name = task.name + iterations = task.iterations or 1 + self.accumulated_iterations.setdefault(task_name, 0) + self.accumulated_iterations[task_name] += iterations + + def accumulate_results(self, test_execution: Any) -> None: for item in test_execution.results.get("op_metrics", []): task = item.get("task", "") self.accumulated_results.setdefault(task, {}) - for metric in ["throughput", "latency", "service_time", "client_processing_time", "processing_time", "error_rate", "duration"]: + for metric in self.statistics: self.accumulated_results[task].setdefault(metric, []) self.accumulated_results[task][metric].append(item.get(metric)) - # aggregate values from multiple test execution result JSON objects by a specified key path def aggregate_json_by_key(self, key_path: Union[str, List[str]]) -> Any: - test_store = metrics.test_execution_store(self.config) - all_jsons = [test_store.find_by_test_execution_id(id).results for id in self.test_executions.keys()] + all_jsons = [self.test_store.find_by_test_execution_id(id).results for id in self.test_executions.keys()] # retrieve nested value from a dictionary given a key path def get_nested_value(obj: Dict[str, Any], path: List[str]) -> Any: @@ -46,7 +48,6 @@ def get_nested_value(obj: Dict[str, Any], path: List[str]) -> Any: return None return obj - # recursively aggregate values, handling different data types def aggregate_helper(objects: List[Any]) -> Any: if not objects: return None @@ -67,9 +68,8 @@ 
def aggregate_helper(objects: List[Any]) -> Any: values = [get_nested_value(json, key_path) for json in all_jsons] return aggregate_helper(values) - # construct aggregated results dict - def build_aggregated_results(self, test_store): - test_exe = test_store.find_by_test_execution_id(list(self.test_executions.keys())[0]) + def build_aggregated_results(self): + test_exe = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0]) aggregated_results = { "op-metrics": [], "correctness_metrics": self.aggregate_json_by_key("correctness_metrics"), @@ -125,31 +125,42 @@ def build_aggregated_results(self, test_store): aggregated_results["op-metrics"].append(op_metric) # extract the necessary data from the first test execution, since the configurations should be identical for all test executions - test_exe_store = metrics.test_execution_store(self.config) - first_test_execution = test_exe_store.find_by_test_execution_id(list(self.test_executions.keys())[0]) current_timestamp = self.config.opts("system", "time.start") + if hasattr(self.args, 'results_file') and self.args.results_file != "": + normalized_results_file = rio.normalize_path(self.args.results_file, self.cwd) + # ensure that the parent folder already exists when we try to write the file... + rio.ensure_dir(rio.dirname(normalized_results_file)) + test_execution_id = os.path.basename(normalized_results_file) + self.config.add(config.Scope.applicationOverride, "system", "test_execution.id", normalized_results_file) + elif hasattr(self.args, 'test_execution_id') and self.args.test_execution_id: + test_execution_id = f"aggregate_results_{test_exe.workload}_{self.args.test_execution_id}" + self.config.add(config.Scope.applicationOverride, "system", "test_execution.id", test_execution_id) + else: + test_execution_id = f"aggregate_results_{test_exe.workload}_{str(uuid.uuid4())}" + self.config.add(config.Scope.applicationOverride, "system", "test_execution.id", test_execution_id) + + print("Aggregate test execution ID: ", test_execution_id) + # add values to the configuration object self.config.add(config.Scope.applicationOverride, "builder", - "provision_config_instance.names", first_test_execution.provision_config_instance) + "provision_config_instance.names", test_exe.provision_config_instance) self.config.add(config.Scope.applicationOverride, "system", - "env.name", first_test_execution.environment_name) - self.config.add(config.Scope.applicationOverride, "system", "test_execution.id", - f"aggregate_results_{first_test_execution.workload}_{str(uuid.uuid4())}") + "env.name", test_exe.environment_name) self.config.add(config.Scope.applicationOverride, "system", "time.start", current_timestamp) - self.config.add(config.Scope.applicationOverride, "test_execution", "pipeline", first_test_execution.pipeline) - self.config.add(config.Scope.applicationOverride, "workload", "params", first_test_execution.workload_params) + self.config.add(config.Scope.applicationOverride, "test_execution", "pipeline", test_exe.pipeline) + self.config.add(config.Scope.applicationOverride, "workload", "params", test_exe.workload_params) self.config.add(config.Scope.applicationOverride, "builder", - "provision_config_instance.params", first_test_execution.provision_config_instance_params) - self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", first_test_execution.plugin_params) - self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", first_test_execution.latency_percentiles) - 
self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", first_test_execution.throughput_percentiles) + "provision_config_instance.params", test_exe.provision_config_instance_params) + self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", test_exe.plugin_params) + self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", test_exe.latency_percentiles) + self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_exe.throughput_percentiles) loaded_workload = workload.load_workload(self.config) - test_procedure = loaded_workload.find_test_procedure_or_default(first_test_execution.test_procedure) + test_procedure = loaded_workload.find_test_procedure_or_default(test_exe.test_procedure) - test_execution = metrics.create_test_execution(self.config, loaded_workload, test_procedure, first_test_execution.workload_revision) - test_execution.add_results(aggregated_results) + test_execution = metrics.create_test_execution(self.config, loaded_workload, test_procedure, test_exe.workload_revision) + test_execution.add_results(AggregatedResults(aggregated_results)) test_execution.distribution_version = test_exe.distribution_version test_execution.revision = test_exe.revision test_execution.distribution_flavor = test_exe.distribution_flavor @@ -157,7 +168,6 @@ def build_aggregated_results(self, test_store): return test_execution - # calculate weighted averages for task metrics def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], iterations: int) -> Dict[str, Any]: weighted_metrics = {} @@ -184,33 +194,43 @@ def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], iterati weighted_metrics[metric] = sum(values) / len(values) return weighted_metrics - # verify that all test executions have the same workload - def compatibility_check(self, test_store) -> None: - first_test_execution = test_store.find_by_test_execution_id(list(self.test_executions.keys())[0]) + def test_execution_compatibility_check(self) -> None: + first_test_execution = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0]) workload = first_test_execution.workload + test_procedure = first_test_execution.test_procedure for id in self.test_executions.keys(): - test_execution = test_store.find_by_test_execution_id(id) + test_execution = self.test_store.find_by_test_execution_id(id) if test_execution: if test_execution.workload != workload: raise ValueError(f"Incompatible workload: test {id} has workload '{test_execution.workload}' instead of '{workload}'") + if test_execution.test_procedure != test_procedure: + raise ValueError(f"Incompatible test procedure: test {id} has test procedure + '{test_execution.test_procedure}' instead of '{test_procedure}'") else: raise ValueError("Test execution not found: ", id) + + self.config.add(config.Scope.applicationOverride, "workload", "test_procedure.name", first_test_execution.test_procedure) return True - # driver code def aggregate(self) -> None: - test_execution_store = metrics.test_execution_store(self.config) - if self.compatibility_check(test_execution_store): + if self.test_execution_compatibility_check(): for id in self.test_executions.keys(): - test_execution = test_execution_store.find_by_test_execution_id(id) + test_execution = self.test_store.find_by_test_execution_id(id) if test_execution: - self.config.add(config.Scope.applicationOverride, "workload", "repository.name", "default") + 
self.config.add(config.Scope.applicationOverride, "workload", "repository.name", self.args.workload_repository) self.config.add(config.Scope.applicationOverride, "workload", "workload.name", test_execution.workload) - self.iterations() - self.results(test_execution) + self.count_iterations_for_each_op() + self.accumulate_results(test_execution) - aggregated_results = self.build_aggregated_results(test_execution_store) + aggregated_results = self.build_aggregated_results() file_test_exe_store = FileTestExecutionStore(self.config) file_test_exe_store.store_test_execution(aggregated_results) else: raise ValueError("Incompatible test execution results") + +class AggregatedResults: + def __init__(self, results): + self.results = results + + def as_dict(self): + return self.results diff --git a/osbenchmark/benchmark.py b/osbenchmark/benchmark.py index 46588cc1f..32f28e902 100644 --- a/osbenchmark/benchmark.py +++ b/osbenchmark/benchmark.py @@ -229,6 +229,18 @@ def add_workload_source(subparser): type=non_empty_list, required=True, help="Comma-separated list of TestExecution IDs to aggregate") + aggregate_parser.add_argument( + "--test-execution-id", + help="Define a unique id for this aggregated test_execution.", + default="") + aggregate_parser.add_argument( + "--results-file", + help="Write the aggregated results to the provided file.", + default="") + aggregate_parser.add_argument( + "--workload-repository", + help="Define the repository from where OSB will load workloads (default: default).", + default="default") download_parser = subparsers.add_parser("download", help="Downloads an artifact") download_parser.add_argument( @@ -841,10 +853,11 @@ def configure_results_publishing_params(args, cfg): cfg.add(config.Scope.applicationOverride, "results_publishing", "output.path", args.results_file) cfg.add(config.Scope.applicationOverride, "results_publishing", "numbers.align", args.results_numbers_align) -def prepare_test_executions_dict(test_executions_arg): +def prepare_test_executions_dict(args, cfg): + cfg.add(config.Scope.applicationOverride, "results_publishing", "output.path", args.results_file) test_executions_dict = {} - if test_executions_arg: - for execution in test_executions_arg: + if args.test_executions: + for execution in args.test_executions: execution = execution.strip() if execution: test_executions_dict[execution] = None @@ -865,8 +878,8 @@ def dispatch_sub_command(arg_parser, args, cfg): cfg.add(config.Scope.applicationOverride, "results_publishing", "percentiles", args.percentiles) results_publisher.compare(cfg, args.baseline, args.contender) elif sub_command == "aggregate": - test_executions_dict = prepare_test_executions_dict(args.test_executions) - aggregator_instance = aggregator.Aggregator(cfg, test_executions_dict) + test_executions_dict = prepare_test_executions_dict(args, cfg) + aggregator_instance = aggregator.Aggregator(cfg, test_executions_dict, args) aggregator_instance.aggregate() elif sub_command == "list": cfg.add(config.Scope.applicationOverride, "system", "list.config.option", args.configuration) diff --git a/osbenchmark/metrics.py b/osbenchmark/metrics.py index 79ce7bd7e..80751224b 100644 --- a/osbenchmark/metrics.py +++ b/osbenchmark/metrics.py @@ -1428,7 +1428,7 @@ def as_dict(self): } } if self.results: - d["results"] = self.results if isinstance(self.results, dict) else self.results.as_dict() + d["results"] = self.results.as_dict() if self.workload_revision: d["workload-revision"] = self.workload_revision if not self.test_procedure.auto_generated: diff 
--git a/tests/aggregator_test.py b/tests/aggregator_test.py index f7aab9b5a..6a6e5d042 100644 --- a/tests/aggregator_test.py +++ b/tests/aggregator_test.py @@ -6,7 +6,9 @@ @pytest.fixture def mock_config(): - return Mock(spec=config.Config) + mock_cfg = Mock(spec=config.Config) + mock_cfg.opts.side_effect = lambda *args: "/path/to/root" if args == ("node", "root.dir") else None + return mock_cfg @pytest.fixture def mock_test_executions(): @@ -16,8 +18,27 @@ def mock_test_executions(): } @pytest.fixture -def aggregator(mock_config, mock_test_executions): - return Aggregator(mock_config, mock_test_executions) +def mock_args(): + return Mock( + results_file="", + test_execution_id="", + workload_repository="default" + ) + +@pytest.fixture +def mock_test_store(): + mock_store = Mock() + mock_store.find_by_test_execution_id.side_effect = [ + Mock(results={"key1": {"nested": 10}}), + Mock(results={"key1": {"nested": 20}}) + ] + return mock_store + +@pytest.fixture +def aggregator(mock_config, mock_test_executions, mock_args, mock_test_store): + aggregator = Aggregator(mock_config, mock_test_executions, mock_args) + aggregator.test_store = mock_test_store + return aggregator def test_iterations(aggregator): mock_workload = Mock() @@ -27,7 +48,7 @@ def test_iterations(aggregator): mock_workload.test_procedures = [mock_task] with patch('osbenchmark.workload.load_workload', return_value=mock_workload): - aggregator.iterations() + aggregator.count_iterations_for_each_op() assert aggregator.accumulated_iterations == {mock_operation.name: 5} @@ -48,7 +69,7 @@ def test_results(aggregator): ] } - aggregator.results(mock_test_execution) + aggregator.accumulate_results(mock_test_execution) assert "task1" in aggregator.accumulated_results assert all(metric in aggregator.accumulated_results["task1"] for metric in @@ -56,15 +77,7 @@ def test_results(aggregator): "processing_time", "error_rate", "duration"]) def test_aggregate_json_by_key(aggregator): - mock_test_store = Mock() - mock_test_store.find_by_test_execution_id.side_effect = [ - Mock(results={"key1": {"nested": 10}}), - Mock(results={"key1": {"nested": 20}}) - ] - - with patch('osbenchmark.metrics.test_execution_store', return_value=mock_test_store): - result = aggregator.aggregate_json_by_key("key1.nested") - + result = aggregator.aggregate_json_by_key("key1.nested") assert result == 15 def test_calculate_weighted_average(aggregator): @@ -81,14 +94,16 @@ def test_calculate_weighted_average(aggregator): assert result["latency"]["unit"] == "ms" def test_compatibility_check(aggregator): + mock_test_procedure = Mock(name="test_procedure") mock_test_store = Mock() mock_test_store.find_by_test_execution_id.side_effect = [ - Mock(workload="workload1"), - Mock(workload="workload1"), - Mock(workload="workload1") + Mock(workload="workload1", test_procedure=mock_test_procedure), + Mock(workload="workload1", test_procedure=mock_test_procedure), + Mock(workload="workload1", test_procedure=mock_test_procedure) ] + aggregator.test_store = mock_test_store + assert aggregator.test_execution_compatibility_check() - assert aggregator.compatibility_check(mock_test_store) def test_compatibility_check_incompatible(aggregator): mock_test_store = Mock() @@ -97,6 +112,6 @@ def test_compatibility_check_incompatible(aggregator): Mock(workload="workload2"), Mock(workload="workload1") ] - + aggregator.test_store = mock_test_store with pytest.raises(ValueError): - aggregator.compatibility_check(mock_test_store) + aggregator.test_execution_compatibility_check()
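
Note on the aggregation logic: the behaviour exercised by test_aggregate_json_by_key and calculate_weighted_average comes down to two ideas, averaging a value found under a nested key path across the result documents of several test executions, and weighting per-task metrics by how many iterations each execution ran. The self-contained Python sketch below illustrates both; the helper names (get_nested_value, average_by_key, weighted_mean) and the simple scalar mean are illustrative assumptions, not the exact implementation in osbenchmark/aggregator.py.

    from typing import Any, Dict, List

    def get_nested_value(obj: Dict[str, Any], path: List[str]) -> Any:
        # walk a dotted key path, e.g. "key1.nested" -> ["key1", "nested"]
        for key in path:
            if not isinstance(obj, dict):
                return None
            obj = obj.get(key)
        return obj

    def average_by_key(results: List[Dict[str, Any]], key_path: str) -> Any:
        # average the value found under key_path across all result documents
        values = [get_nested_value(r, key_path.split(".")) for r in results]
        values = [v for v in values if v is not None]
        return sum(values) / len(values) if values else None

    def weighted_mean(values: List[float], iterations: List[int]) -> float:
        # weight each execution's metric by the number of iterations it ran
        return sum(v * w for v, w in zip(values, iterations)) / sum(iterations)

    # mirrors test_aggregate_json_by_key: (10 + 20) / 2 == 15
    docs = [{"key1": {"nested": 10}}, {"key1": {"nested": 20}}]
    print(average_by_key(docs, "key1.nested"))          # 15.0

    # two executions with mean latencies of 100 ms and 200 ms that ran
    # 1000 and 3000 iterations respectively -> 175 ms overall
    print(weighted_mean([100.0, 200.0], [1000, 3000]))  # 175.0

The patch's aggregate_helper additionally recurses into non-scalar values; the sketch only covers the scalar leaf case.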
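
Usage note (the command name is assumed to be the standard opensearch-benchmark entry point; the IDs are placeholders): with the flags added in benchmark.py, an aggregation run would look roughly like opensearch-benchmark aggregate --test-executions=<id1>,<id2> --test-execution-id=<name> --workload-repository=default. Per build_aggregated_results, --results-file takes precedence and derives the aggregated test execution ID from the normalized file path, --test-execution-id yields aggregate_results_<workload>_<given id>, and with neither flag a UUID-based ID of the form aggregate_results_<workload>_<uuid> is generated.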