From 9152dba0b28e86dcd988c8d006a1b7baf8e5164b Mon Sep 17 00:00:00 2001 From: Michael Oviedo Date: Wed, 16 Oct 2024 22:13:27 +0000 Subject: [PATCH 1/6] store aggregated results in separate folder Signed-off-by: Michael Oviedo --- osbenchmark/aggregator.py | 2 +- osbenchmark/metrics.py | 8 ++++++++ osbenchmark/paths.py | 4 ++++ 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/osbenchmark/aggregator.py b/osbenchmark/aggregator.py index 02676daa6..dbc29d578 100644 --- a/osbenchmark/aggregator.py +++ b/osbenchmark/aggregator.py @@ -241,7 +241,7 @@ def aggregate(self) -> None: aggregated_results = self.build_aggregated_results() file_test_exe_store = FileTestExecutionStore(self.config) - file_test_exe_store.store_test_execution(aggregated_results) + file_test_exe_store.store_aggregated_execution(aggregated_results) else: raise ValueError("Incompatible test execution results") diff --git a/osbenchmark/metrics.py b/osbenchmark/metrics.py index 80751224b..dd91f1d8e 100644 --- a/osbenchmark/metrics.py +++ b/osbenchmark/metrics.py @@ -1550,6 +1550,14 @@ def store_test_execution(self, test_execution): with open(self._test_execution_file(), mode="wt", encoding="utf-8") as f: f.write(json.dumps(doc, indent=True, ensure_ascii=False)) + def store_aggregated_execution(self, test_execution): + doc = test_execution.as_dict() + aggregated_execution_path = paths.aggregated_results_root(self.cfg, test_execution_id=test_execution.test_execution_id) + io.ensure_dir(aggregated_execution_path) + aggregated_file = os.path.join(aggregated_execution_path, "test_execution.json") + with open(aggregated_file, mode="wt", encoding="utf-8") as f: + f.write(json.dumps(doc, indent=True, ensure_ascii=False)) + def _test_execution_file(self, test_execution_id=None): return os.path.join(paths.test_execution_root(cfg=self.cfg, test_execution_id=test_execution_id), "test_execution.json") diff --git a/osbenchmark/paths.py b/osbenchmark/paths.py index 869c7f9c3..305275fe7 100644 --- a/osbenchmark/paths.py +++ b/osbenchmark/paths.py @@ -42,6 +42,10 @@ def test_execution_root(cfg, test_execution_id=None): test_execution_id = cfg.opts("system", "test_execution.id") return os.path.join(test_excecutions_root(cfg), test_execution_id) +def aggregated_results_root(cfg, test_execution_id=None): + if not test_execution_id: + test_execution_id = cfg.opts("system", "test_execution.id") + return os.path.join(cfg.opts("node", "root.dir"), "aggregated_results", test_execution_id) def install_root(cfg=None): install_id = cfg.opts("system", "install.id") From 63067671f8102df964400763d6eec4cca4a50fb7 Mon Sep 17 00:00:00 2001 From: Michael Oviedo Date: Wed, 23 Oct 2024 18:44:04 +0000 Subject: [PATCH 2/6] update aggregated test exe json file name Signed-off-by: Michael Oviedo --- osbenchmark/metrics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osbenchmark/metrics.py b/osbenchmark/metrics.py index dd91f1d8e..6e94ce134 100644 --- a/osbenchmark/metrics.py +++ b/osbenchmark/metrics.py @@ -1554,7 +1554,7 @@ def store_aggregated_execution(self, test_execution): doc = test_execution.as_dict() aggregated_execution_path = paths.aggregated_results_root(self.cfg, test_execution_id=test_execution.test_execution_id) io.ensure_dir(aggregated_execution_path) - aggregated_file = os.path.join(aggregated_execution_path, "test_execution.json") + aggregated_file = os.path.join(aggregated_execution_path, "aggregated_test_execution.json") with open(aggregated_file, mode="wt", encoding="utf-8") as f: f.write(json.dumps(doc, indent=True, ensure_ascii=False)) From 447b7d6dffdf716cfd7d03185500541cfc265b02 Mon Sep 17 00:00:00 2001 From: Michael Oviedo Date: Wed, 23 Oct 2024 19:31:39 +0000 Subject: [PATCH 3/6] update OSB to find aggregated test results Signed-off-by: Michael Oviedo --- osbenchmark/metrics.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/osbenchmark/metrics.py b/osbenchmark/metrics.py index 6e94ce134..88fc9c06f 100644 --- a/osbenchmark/metrics.py +++ b/osbenchmark/metrics.py @@ -1558,16 +1558,22 @@ def store_aggregated_execution(self, test_execution): with open(aggregated_file, mode="wt", encoding="utf-8") as f: f.write(json.dumps(doc, indent=True, ensure_ascii=False)) - def _test_execution_file(self, test_execution_id=None): - return os.path.join(paths.test_execution_root(cfg=self.cfg, test_execution_id=test_execution_id), "test_execution.json") + def _test_execution_file(self, test_execution_id=None, is_aggregated=False): + if is_aggregated: + return os.path.join(paths.aggregated_results_root(cfg=self.cfg, test_execution_id=test_execution_id), + "aggregated_test_execution.json") + else: + return os.path.join(paths.test_execution_root(cfg=self.cfg, test_execution_id=test_execution_id), "test_execution.json") def list(self): results = glob.glob(self._test_execution_file(test_execution_id="*")) - all_test_executions = self._to_test_executions(results) + aggregated_results = glob.glob(self._test_execution_file(test_execution_id="*", is_aggregated=True)) + all_test_executions = self._to_test_executions(results + aggregated_results) return all_test_executions[:self._max_results()] def find_by_test_execution_id(self, test_execution_id): - test_execution_file = self._test_execution_file(test_execution_id=test_execution_id) + is_aggregated = test_execution_id.startswith('aggregate') + test_execution_file = self._test_execution_file(test_execution_id=test_execution_id, is_aggregated=is_aggregated) if io.exists(test_execution_file): test_executions = self._to_test_executions([test_execution_file]) if test_executions: From f8b8c8260f5ae2954aa73b5bc3fc8d00c503bd2f Mon Sep 17 00:00:00 2001 From: Michael Oviedo Date: Thu, 24 Oct 2024 19:24:07 +0000 Subject: [PATCH 4/6] add separate option to list aggregations Signed-off-by: Michael Oviedo --- osbenchmark/benchmark.py | 5 ++++- osbenchmark/metrics.py | 46 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/osbenchmark/benchmark.py b/osbenchmark/benchmark.py index 5334abe29..67684a171 100644 --- a/osbenchmark/benchmark.py +++ b/osbenchmark/benchmark.py @@ -120,7 +120,8 @@ def add_workload_source(subparser): metavar="configuration", help="The configuration for which Benchmark should show the available options. " "Possible values are: telemetry, workloads, pipelines, test_executions, provision_config_instances, opensearch-plugins", - choices=["telemetry", "workloads", "pipelines", "test_executions", "provision_config_instances", "opensearch-plugins"]) + choices=["telemetry", "workloads", "pipelines", "test_executions", "aggregations", + "provision_config_instances", "opensearch-plugins"]) list_parser.add_argument( "--limit", help="Limit the number of search results for recent test_executions (default: 10).", @@ -697,6 +698,8 @@ def dispatch_list(cfg): test_execution_orchestrator.list_pipelines() elif what == "test_executions": metrics.list_test_executions(cfg) + elif what == "aggregations": + metrics.list_aggregated_test_results(cfg) elif what == "provision_config_instances": provision_config.list_provision_config_instances(cfg) elif what == "opensearch-plugins": diff --git a/osbenchmark/metrics.py b/osbenchmark/metrics.py index 88fc9c06f..fa6282c97 100644 --- a/osbenchmark/metrics.py +++ b/osbenchmark/metrics.py @@ -1315,6 +1315,45 @@ def format_dict(d): console.println("") console.println("No recent test_executions found.") +def list_aggregated_test_results(cfg): + def format_dict(d): + if d: + items = sorted(d.items()) + return ", ".join(["%s=%s" % (k, v) for k, v in items]) + else: + return None + + aggregated_test_executions = [] + for test_execution in test_execution_store(cfg).list_aggregations(): + aggregated_test_executions.append([ + test_execution.test_execution_id, + time.to_iso8601(test_execution.test_execution_timestamp), + test_execution.workload, + format_dict(test_execution.workload_params), + test_execution.test_procedure_name, + test_execution.provision_config_instance_name, + format_dict(test_execution.user_tags), + test_execution.workload_revision, + test_execution.provision_config_revision]) + + if len(aggregated_test_executions) > 0: + console.println("\nRecent aggregated test executions:\n") + console.println(tabulate.tabulate( + aggregated_test_executions, + headers=[ + "TestExecution ID", + "TestExecution Timestamp", + "Workload", + "Workload Parameters", + "TestProcedure", + "ProvisionConfigInstance", + "User Tags", + "workload Revision", + "Provision Config Revision" + ])) + else: + console.println("") + console.println("No recent aggregate tests found.") def create_test_execution(cfg, workload, test_procedure, workload_revision=None): provision_config_instance = cfg.opts("builder", "provision_config_instance.names") @@ -1567,10 +1606,13 @@ def _test_execution_file(self, test_execution_id=None, is_aggregated=False): def list(self): results = glob.glob(self._test_execution_file(test_execution_id="*")) - aggregated_results = glob.glob(self._test_execution_file(test_execution_id="*", is_aggregated=True)) - all_test_executions = self._to_test_executions(results + aggregated_results) + all_test_executions = self._to_test_executions(results) return all_test_executions[:self._max_results()] + def list_aggregations(self): + aggregated_results = glob.glob(self._test_execution_file(test_execution_id="*", is_aggregated=True)) + return self._to_test_executions(aggregated_results) + def find_by_test_execution_id(self, test_execution_id): is_aggregated = test_execution_id.startswith('aggregate') test_execution_file = self._test_execution_file(test_execution_id=test_execution_id, is_aggregated=is_aggregated) From 43a9cc5ebf46badabda3731fe9b6c3eadf64a87f Mon Sep 17 00:00:00 2001 From: Michael Oviedo Date: Thu, 24 Oct 2024 19:58:32 +0000 Subject: [PATCH 5/6] change aggregations to aggregated_results Signed-off-by: Michael Oviedo --- osbenchmark/benchmark.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/osbenchmark/benchmark.py b/osbenchmark/benchmark.py index 67684a171..aee837abb 100644 --- a/osbenchmark/benchmark.py +++ b/osbenchmark/benchmark.py @@ -120,7 +120,7 @@ def add_workload_source(subparser): metavar="configuration", help="The configuration for which Benchmark should show the available options. " "Possible values are: telemetry, workloads, pipelines, test_executions, provision_config_instances, opensearch-plugins", - choices=["telemetry", "workloads", "pipelines", "test_executions", "aggregations", + choices=["telemetry", "workloads", "pipelines", "test_executions", "aggregated_results", "provision_config_instances", "opensearch-plugins"]) list_parser.add_argument( "--limit", @@ -698,7 +698,7 @@ def dispatch_list(cfg): test_execution_orchestrator.list_pipelines() elif what == "test_executions": metrics.list_test_executions(cfg) - elif what == "aggregations": + elif what == "aggregated_results": metrics.list_aggregated_test_results(cfg) elif what == "provision_config_instances": provision_config.list_provision_config_instances(cfg) From 7156d479b0370ffd8d5370874dfebadfd880b17e Mon Sep 17 00:00:00 2001 From: Michael Oviedo Date: Thu, 24 Oct 2024 22:03:57 +0000 Subject: [PATCH 6/6] abstract logic to list test executions + aggregated results Signed-off-by: Michael Oviedo --- osbenchmark/benchmark.py | 2 +- osbenchmark/metrics.py | 50 +++++++--------------------------------- 2 files changed, 9 insertions(+), 43 deletions(-) diff --git a/osbenchmark/benchmark.py b/osbenchmark/benchmark.py index aee837abb..8af429a77 100644 --- a/osbenchmark/benchmark.py +++ b/osbenchmark/benchmark.py @@ -699,7 +699,7 @@ def dispatch_list(cfg): elif what == "test_executions": metrics.list_test_executions(cfg) elif what == "aggregated_results": - metrics.list_aggregated_test_results(cfg) + metrics.list_aggregated_results(cfg) elif what == "provision_config_instances": provision_config.list_provision_config_instances(cfg) elif what == "opensearch-plugins": diff --git a/osbenchmark/metrics.py b/osbenchmark/metrics.py index fa6282c97..e6e3c89fe 100644 --- a/osbenchmark/metrics.py +++ b/osbenchmark/metrics.py @@ -1275,7 +1275,7 @@ def results_store(cfg): return NoopResultsStore() -def list_test_executions(cfg): +def list_test_helper(store_item, title): def format_dict(d): if d: items = sorted(d.items()) @@ -1284,7 +1284,7 @@ def format_dict(d): return None test_executions = [] - for test_execution in test_execution_store(cfg).list(): + for test_execution in store_item: test_executions.append([ test_execution.test_execution_id, time.to_iso8601(test_execution.test_execution_timestamp), @@ -1297,7 +1297,7 @@ def format_dict(d): test_execution.provision_config_revision]) if len(test_executions) > 0: - console.println("\nRecent test_executions:\n") + console.println(f"\nRecent {title}:\n") console.println(tabulate.tabulate( test_executions, headers=[ @@ -1313,47 +1313,13 @@ def format_dict(d): ])) else: console.println("") - console.println("No recent test_executions found.") - -def list_aggregated_test_results(cfg): - def format_dict(d): - if d: - items = sorted(d.items()) - return ", ".join(["%s=%s" % (k, v) for k, v in items]) - else: - return None + console.println(f"No recent {title} found.") - aggregated_test_executions = [] - for test_execution in test_execution_store(cfg).list_aggregations(): - aggregated_test_executions.append([ - test_execution.test_execution_id, - time.to_iso8601(test_execution.test_execution_timestamp), - test_execution.workload, - format_dict(test_execution.workload_params), - test_execution.test_procedure_name, - test_execution.provision_config_instance_name, - format_dict(test_execution.user_tags), - test_execution.workload_revision, - test_execution.provision_config_revision]) +def list_test_executions(cfg): + list_test_helper(test_execution_store(cfg).list(), "test_executions") - if len(aggregated_test_executions) > 0: - console.println("\nRecent aggregated test executions:\n") - console.println(tabulate.tabulate( - aggregated_test_executions, - headers=[ - "TestExecution ID", - "TestExecution Timestamp", - "Workload", - "Workload Parameters", - "TestProcedure", - "ProvisionConfigInstance", - "User Tags", - "workload Revision", - "Provision Config Revision" - ])) - else: - console.println("") - console.println("No recent aggregate tests found.") +def list_aggregated_results(cfg): + list_test_helper(test_execution_store(cfg).list_aggregations(), "aggregated_results") def create_test_execution(cfg, workload, test_procedure, workload_revision=None): provision_config_instance = cfg.opts("builder", "provision_config_instance.names")