store aggregated results in separate folder #683

Merged
merged 6 commits on Oct 25, 2024
2 changes: 1 addition & 1 deletion osbenchmark/aggregator.py
@@ -241,7 +241,7 @@ def aggregate(self) -> None:

aggregated_results = self.build_aggregated_results()
file_test_exe_store = FileTestExecutionStore(self.config)
- file_test_exe_store.store_test_execution(aggregated_results)
+ file_test_exe_store.store_aggregated_execution(aggregated_results)
else:
raise ValueError("Incompatible test execution results")

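For context on the one-line aggregator change above, here is a minimal sketch of the new storage call, assuming FileTestExecutionStore is importable from osbenchmark.metrics (as the aggregator code suggests) and that config and aggregated_results are an already-built benchmark Config and aggregated test execution object (both variable names are illustrative):

    from osbenchmark.metrics import FileTestExecutionStore

    store = FileTestExecutionStore(config)  # config: benchmark Config object (illustrative)
    # After this PR the aggregated document is written under a dedicated
    # aggregated_results folder instead of the regular test_executions location.
    store.store_aggregated_execution(aggregated_results)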
5 changes: 4 additions & 1 deletion osbenchmark/benchmark.py
@@ -120,7 +120,8 @@ def add_workload_source(subparser):
metavar="configuration",
help="The configuration for which Benchmark should show the available options. "
"Possible values are: telemetry, workloads, pipelines, test_executions, provision_config_instances, opensearch-plugins",
choices=["telemetry", "workloads", "pipelines", "test_executions", "provision_config_instances", "opensearch-plugins"])
choices=["telemetry", "workloads", "pipelines", "test_executions", "aggregated_results",
"provision_config_instances", "opensearch-plugins"])
list_parser.add_argument(
"--limit",
help="Limit the number of search results for recent test_executions (default: 10).",
@@ -697,6 +698,8 @@ def dispatch_list(cfg):
test_execution_orchestrator.list_pipelines()
elif what == "test_executions":
metrics.list_test_executions(cfg)
elif what == "aggregated_results":

Collaborator:
Since you've introduced a new method called list_aggregations(), how do you feel about using aggregations here as opposed to aggregated_results? Aggregations would be simpler for users to type out. But on second thought, aggregations might be confused with OpenSearch aggregations. I'm open to either.

Member Author:
Personally, I think aggregated_results is more explicit and clearer about what it's referring to. But maybe we could make it so that either aggregations or aggregated_results triggers the same function?

Collaborator:
aggregated_results is more explicit, and we can go forward with that.

+ metrics.list_aggregated_results(cfg)
elif what == "provision_config_instances":
provision_config.list_provision_config_instances(cfg)
elif what == "opensearch-plugins":
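With aggregated_results added to the list subcommand choices above, listing stored aggregations from the command line should look roughly like this (the opensearch-benchmark entry point name is assumed here and is not part of this diff):

    opensearch-benchmark list aggregated_results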
36 changes: 29 additions & 7 deletions osbenchmark/metrics.py
@@ -1275,7 +1275,7 @@ def results_store(cfg):
return NoopResultsStore()


- def list_test_executions(cfg):
+ def list_test_helper(store_item, title):
def format_dict(d):
if d:
items = sorted(d.items())
@@ -1284,7 +1284,7 @@ def format_dict(d):
return None

test_executions = []
- for test_execution in test_execution_store(cfg).list():
+ for test_execution in store_item:
test_executions.append([
test_execution.test_execution_id,
time.to_iso8601(test_execution.test_execution_timestamp),
@@ -1297,7 +1297,7 @@ def format_dict(d):
test_execution.provision_config_revision])

if len(test_executions) > 0:
console.println("\nRecent test_executions:\n")
console.println(f"\nRecent {title}:\n")
console.println(tabulate.tabulate(
test_executions,
headers=[
@@ -1313,8 +1313,13 @@ def format_dict(d):
]))
else:
console.println("")
console.println("No recent test_executions found.")
console.println(f"No recent {title} found.")

def list_test_executions(cfg):
list_test_helper(test_execution_store(cfg).list(), "test_executions")

def list_aggregated_results(cfg):
list_test_helper(test_execution_store(cfg).list_aggregations(), "aggregated_results")

def create_test_execution(cfg, workload, test_procedure, workload_revision=None):
provision_config_instance = cfg.opts("builder", "provision_config_instance.names")
@@ -1550,16 +1555,33 @@ def store_test_execution(self, test_execution):
with open(self._test_execution_file(), mode="wt", encoding="utf-8") as f:
f.write(json.dumps(doc, indent=True, ensure_ascii=False))

- def _test_execution_file(self, test_execution_id=None):
-     return os.path.join(paths.test_execution_root(cfg=self.cfg, test_execution_id=test_execution_id), "test_execution.json")
+ def store_aggregated_execution(self, test_execution):
+     doc = test_execution.as_dict()
+     aggregated_execution_path = paths.aggregated_results_root(self.cfg, test_execution_id=test_execution.test_execution_id)
+     io.ensure_dir(aggregated_execution_path)
+     aggregated_file = os.path.join(aggregated_execution_path, "aggregated_test_execution.json")
+     with open(aggregated_file, mode="wt", encoding="utf-8") as f:
+         f.write(json.dumps(doc, indent=True, ensure_ascii=False))

+ def _test_execution_file(self, test_execution_id=None, is_aggregated=False):
+     if is_aggregated:
+         return os.path.join(paths.aggregated_results_root(cfg=self.cfg, test_execution_id=test_execution_id),
+                             "aggregated_test_execution.json")
+     else:
+         return os.path.join(paths.test_execution_root(cfg=self.cfg, test_execution_id=test_execution_id), "test_execution.json")

def list(self):
results = glob.glob(self._test_execution_file(test_execution_id="*"))
all_test_executions = self._to_test_executions(results)
return all_test_executions[:self._max_results()]

+ def list_aggregations(self):
+     aggregated_results = glob.glob(self._test_execution_file(test_execution_id="*", is_aggregated=True))
+     return self._to_test_executions(aggregated_results)

def find_by_test_execution_id(self, test_execution_id):
- test_execution_file = self._test_execution_file(test_execution_id=test_execution_id)
+ is_aggregated = test_execution_id.startswith('aggregate')
+ test_execution_file = self._test_execution_file(test_execution_id=test_execution_id, is_aggregated=is_aggregated)
if io.exists(test_execution_file):
test_executions = self._to_test_executions([test_execution_file])
if test_executions:
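A rough sketch of how the new FileTestExecutionStore methods fit together, assuming an existing store instance; the id value below is made up, and only the startswith('aggregate') prefix convention comes from the diff above:

    # Regular runs still resolve to <test executions root>/<id>/test_execution.json
    recent_runs = store.list()

    # Aggregated runs resolve to <root.dir>/aggregated_results/<id>/aggregated_test_execution.json
    aggregated_runs = store.list_aggregations()

    # Lookup picks the aggregated path whenever the id starts with "aggregate"
    one_run = store.find_by_test_execution_id("aggregate_results_1234")  # hypothetical id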
4 changes: 4 additions & 0 deletions osbenchmark/paths.py
@@ -42,6 +42,10 @@ def test_execution_root(cfg, test_execution_id=None):
test_execution_id = cfg.opts("system", "test_execution.id")
return os.path.join(test_excecutions_root(cfg), test_execution_id)

+ def aggregated_results_root(cfg, test_execution_id=None):
+     if not test_execution_id:
+         test_execution_id = cfg.opts("system", "test_execution.id")
+     return os.path.join(cfg.opts("node", "root.dir"), "aggregated_results", test_execution_id)

def install_root(cfg=None):
install_id = cfg.opts("system", "install.id")
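Combining aggregated_results_root() with the store changes in metrics.py, the resulting on-disk layout should look roughly like this (the test_executions directory for regular runs comes from test_excecutions_root(), which is outside this diff, so that part is assumed):

    <node root.dir>/test_executions/<test_execution_id>/test_execution.json
    <node root.dir>/aggregated_results/<test_execution_id>/aggregated_test_execution.json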