diff --git a/osbenchmark/aggregator.py b/osbenchmark/aggregator.py
index 02676daa..dbc29d57 100644
--- a/osbenchmark/aggregator.py
+++ b/osbenchmark/aggregator.py
@@ -241,7 +241,7 @@ def aggregate(self) -> None:
 
             aggregated_results = self.build_aggregated_results()
             file_test_exe_store = FileTestExecutionStore(self.config)
-            file_test_exe_store.store_test_execution(aggregated_results)
+            file_test_exe_store.store_aggregated_execution(aggregated_results)
         else:
             raise ValueError("Incompatible test execution results")
 
diff --git a/osbenchmark/benchmark.py b/osbenchmark/benchmark.py
index 5334abe2..8af429a7 100644
--- a/osbenchmark/benchmark.py
+++ b/osbenchmark/benchmark.py
@@ -120,7 +120,8 @@ def add_workload_source(subparser):
         metavar="configuration",
         help="The configuration for which Benchmark should show the available options. "
              "Possible values are: telemetry, workloads, pipelines, test_executions, provision_config_instances, opensearch-plugins",
-        choices=["telemetry", "workloads", "pipelines", "test_executions", "provision_config_instances", "opensearch-plugins"])
+        choices=["telemetry", "workloads", "pipelines", "test_executions", "aggregated_results",
+                 "provision_config_instances", "opensearch-plugins"])
     list_parser.add_argument(
         "--limit",
         help="Limit the number of search results for recent test_executions (default: 10).",
@@ -697,6 +698,8 @@ def dispatch_list(cfg):
         test_execution_orchestrator.list_pipelines()
     elif what == "test_executions":
         metrics.list_test_executions(cfg)
+    elif what == "aggregated_results":
+        metrics.list_aggregated_results(cfg)
     elif what == "provision_config_instances":
         provision_config.list_provision_config_instances(cfg)
     elif what == "opensearch-plugins":
diff --git a/osbenchmark/metrics.py b/osbenchmark/metrics.py
index 80751224..e6e3c89f 100644
--- a/osbenchmark/metrics.py
+++ b/osbenchmark/metrics.py
@@ -1275,7 +1275,7 @@ def results_store(cfg):
     return NoopResultsStore()
 
 
-def list_test_executions(cfg):
+def list_test_helper(store_item, title):
     def format_dict(d):
         if d:
             items = sorted(d.items())
@@ -1284,7 +1284,7 @@ def format_dict(d):
             return None
 
     test_executions = []
-    for test_execution in test_execution_store(cfg).list():
+    for test_execution in store_item:
         test_executions.append([
             test_execution.test_execution_id,
             time.to_iso8601(test_execution.test_execution_timestamp),
@@ -1297,7 +1297,7 @@ def format_dict(d):
             test_execution.provision_config_revision])
 
     if len(test_executions) > 0:
-        console.println("\nRecent test_executions:\n")
+        console.println(f"\nRecent {title}:\n")
         console.println(tabulate.tabulate(
             test_executions,
             headers=[
@@ -1313,8 +1313,13 @@ def format_dict(d):
             ]))
     else:
         console.println("")
-        console.println("No recent test_executions found.")
+        console.println(f"No recent {title} found.")
+
+def list_test_executions(cfg):
+    list_test_helper(test_execution_store(cfg).list(), "test_executions")
+def list_aggregated_results(cfg):
+    list_test_helper(test_execution_store(cfg).list_aggregations(), "aggregated_results")
 
 
 def create_test_execution(cfg, workload, test_procedure, workload_revision=None):
     provision_config_instance = cfg.opts("builder", "provision_config_instance.names")
@@ -1550,16 +1555,33 @@ def store_test_execution(self, test_execution):
         with open(self._test_execution_file(), mode="wt", encoding="utf-8") as f:
             f.write(json.dumps(doc, indent=True, ensure_ascii=False))
 
-    def _test_execution_file(self, test_execution_id=None):
-        return os.path.join(paths.test_execution_root(cfg=self.cfg, test_execution_id=test_execution_id), "test_execution.json")
+    def store_aggregated_execution(self, test_execution):
+        doc = test_execution.as_dict()
+        aggregated_execution_path = paths.aggregated_results_root(self.cfg, test_execution_id=test_execution.test_execution_id)
+        io.ensure_dir(aggregated_execution_path)
+        aggregated_file = os.path.join(aggregated_execution_path, "aggregated_test_execution.json")
+        with open(aggregated_file, mode="wt", encoding="utf-8") as f:
+            f.write(json.dumps(doc, indent=True, ensure_ascii=False))
+
+    def _test_execution_file(self, test_execution_id=None, is_aggregated=False):
+        if is_aggregated:
+            return os.path.join(paths.aggregated_results_root(cfg=self.cfg, test_execution_id=test_execution_id),
+                                "aggregated_test_execution.json")
+        else:
+            return os.path.join(paths.test_execution_root(cfg=self.cfg, test_execution_id=test_execution_id), "test_execution.json")
 
     def list(self):
         results = glob.glob(self._test_execution_file(test_execution_id="*"))
         all_test_executions = self._to_test_executions(results)
         return all_test_executions[:self._max_results()]
 
+    def list_aggregations(self):
+        aggregated_results = glob.glob(self._test_execution_file(test_execution_id="*", is_aggregated=True))
+        return self._to_test_executions(aggregated_results)
+
     def find_by_test_execution_id(self, test_execution_id):
-        test_execution_file = self._test_execution_file(test_execution_id=test_execution_id)
+        is_aggregated = test_execution_id.startswith('aggregate')
+        test_execution_file = self._test_execution_file(test_execution_id=test_execution_id, is_aggregated=is_aggregated)
         if io.exists(test_execution_file):
             test_executions = self._to_test_executions([test_execution_file])
             if test_executions:
diff --git a/osbenchmark/paths.py b/osbenchmark/paths.py
index 869c7f9c..305275fe 100644
--- a/osbenchmark/paths.py
+++ b/osbenchmark/paths.py
@@ -42,6 +42,10 @@ def test_execution_root(cfg, test_execution_id=None):
         test_execution_id = cfg.opts("system", "test_execution.id")
     return os.path.join(test_excecutions_root(cfg), test_execution_id)
 
+def aggregated_results_root(cfg, test_execution_id=None):
+    if not test_execution_id:
+        test_execution_id = cfg.opts("system", "test_execution.id")
+    return os.path.join(cfg.opts("node", "root.dir"), "aggregated_results", test_execution_id)
 
 def install_root(cfg=None):
     install_id = cfg.opts("system", "install.id")