Skip to content

Commit

Permalink
store aggregated results in separate folder (#683)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Oviedo <[email protected]>
  • Loading branch information
OVI3D0 authored Oct 25, 2024
1 parent 107d527 commit bf1debc
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 9 deletions.
2 changes: 1 addition & 1 deletion osbenchmark/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ def aggregate(self) -> None:

aggregated_results = self.build_aggregated_results()
file_test_exe_store = FileTestExecutionStore(self.config)
file_test_exe_store.store_test_execution(aggregated_results)
file_test_exe_store.store_aggregated_execution(aggregated_results)
else:
raise ValueError("Incompatible test execution results")

Expand Down
5 changes: 4 additions & 1 deletion osbenchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ def add_workload_source(subparser):
metavar="configuration",
help="The configuration for which Benchmark should show the available options. "
"Possible values are: telemetry, workloads, pipelines, test_executions, provision_config_instances, opensearch-plugins",
choices=["telemetry", "workloads", "pipelines", "test_executions", "provision_config_instances", "opensearch-plugins"])
choices=["telemetry", "workloads", "pipelines", "test_executions", "aggregated_results",
"provision_config_instances", "opensearch-plugins"])
list_parser.add_argument(
"--limit",
help="Limit the number of search results for recent test_executions (default: 10).",
Expand Down Expand Up @@ -697,6 +698,8 @@ def dispatch_list(cfg):
test_execution_orchestrator.list_pipelines()
elif what == "test_executions":
metrics.list_test_executions(cfg)
elif what == "aggregated_results":
metrics.list_aggregated_results(cfg)
elif what == "provision_config_instances":
provision_config.list_provision_config_instances(cfg)
elif what == "opensearch-plugins":
Expand Down
36 changes: 29 additions & 7 deletions osbenchmark/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -1275,7 +1275,7 @@ def results_store(cfg):
return NoopResultsStore()


def list_test_executions(cfg):
def list_test_helper(store_item, title):
def format_dict(d):
if d:
items = sorted(d.items())
Expand All @@ -1284,7 +1284,7 @@ def format_dict(d):
return None

test_executions = []
for test_execution in test_execution_store(cfg).list():
for test_execution in store_item:
test_executions.append([
test_execution.test_execution_id,
time.to_iso8601(test_execution.test_execution_timestamp),
Expand All @@ -1297,7 +1297,7 @@ def format_dict(d):
test_execution.provision_config_revision])

if len(test_executions) > 0:
console.println("\nRecent test_executions:\n")
console.println(f"\nRecent {title}:\n")
console.println(tabulate.tabulate(
test_executions,
headers=[
Expand All @@ -1313,8 +1313,13 @@ def format_dict(d):
]))
else:
console.println("")
console.println("No recent test_executions found.")
console.println(f"No recent {title} found.")

def list_test_executions(cfg):
list_test_helper(test_execution_store(cfg).list(), "test_executions")

def list_aggregated_results(cfg):
list_test_helper(test_execution_store(cfg).list_aggregations(), "aggregated_results")

def create_test_execution(cfg, workload, test_procedure, workload_revision=None):
provision_config_instance = cfg.opts("builder", "provision_config_instance.names")
Expand Down Expand Up @@ -1550,16 +1555,33 @@ def store_test_execution(self, test_execution):
with open(self._test_execution_file(), mode="wt", encoding="utf-8") as f:
f.write(json.dumps(doc, indent=True, ensure_ascii=False))

def _test_execution_file(self, test_execution_id=None):
return os.path.join(paths.test_execution_root(cfg=self.cfg, test_execution_id=test_execution_id), "test_execution.json")
def store_aggregated_execution(self, test_execution):
doc = test_execution.as_dict()
aggregated_execution_path = paths.aggregated_results_root(self.cfg, test_execution_id=test_execution.test_execution_id)
io.ensure_dir(aggregated_execution_path)
aggregated_file = os.path.join(aggregated_execution_path, "aggregated_test_execution.json")
with open(aggregated_file, mode="wt", encoding="utf-8") as f:
f.write(json.dumps(doc, indent=True, ensure_ascii=False))

def _test_execution_file(self, test_execution_id=None, is_aggregated=False):
if is_aggregated:
return os.path.join(paths.aggregated_results_root(cfg=self.cfg, test_execution_id=test_execution_id),
"aggregated_test_execution.json")
else:
return os.path.join(paths.test_execution_root(cfg=self.cfg, test_execution_id=test_execution_id), "test_execution.json")

def list(self):
results = glob.glob(self._test_execution_file(test_execution_id="*"))
all_test_executions = self._to_test_executions(results)
return all_test_executions[:self._max_results()]

def list_aggregations(self):
aggregated_results = glob.glob(self._test_execution_file(test_execution_id="*", is_aggregated=True))
return self._to_test_executions(aggregated_results)

def find_by_test_execution_id(self, test_execution_id):
test_execution_file = self._test_execution_file(test_execution_id=test_execution_id)
is_aggregated = test_execution_id.startswith('aggregate')
test_execution_file = self._test_execution_file(test_execution_id=test_execution_id, is_aggregated=is_aggregated)
if io.exists(test_execution_file):
test_executions = self._to_test_executions([test_execution_file])
if test_executions:
Expand Down
4 changes: 4 additions & 0 deletions osbenchmark/paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ def test_execution_root(cfg, test_execution_id=None):
test_execution_id = cfg.opts("system", "test_execution.id")
return os.path.join(test_excecutions_root(cfg), test_execution_id)

def aggregated_results_root(cfg, test_execution_id=None):
if not test_execution_id:
test_execution_id = cfg.opts("system", "test_execution.id")
return os.path.join(cfg.opts("node", "root.dir"), "aggregated_results", test_execution_id)

def install_root(cfg=None):
install_id = cfg.opts("system", "install.id")
Expand Down

0 comments on commit bf1debc

Please sign in to comment.