store aggregated results in separate folder #683

Merged: 6 commits, Oct 25, 2024

Changes from 5 commits
2 changes: 1 addition & 1 deletion osbenchmark/aggregator.py
@@ -241,7 +241,7 @@ def aggregate(self) -> None:

aggregated_results = self.build_aggregated_results()
file_test_exe_store = FileTestExecutionStore(self.config)
file_test_exe_store.store_test_execution(aggregated_results)
file_test_exe_store.store_aggregated_execution(aggregated_results)
else:
raise ValueError("Incompatible test execution results")

5 changes: 4 additions & 1 deletion osbenchmark/benchmark.py
@@ -120,7 +120,8 @@ def add_workload_source(subparser):
metavar="configuration",
help="The configuration for which Benchmark should show the available options. "
"Possible values are: telemetry, workloads, pipelines, test_executions, provision_config_instances, opensearch-plugins",
choices=["telemetry", "workloads", "pipelines", "test_executions", "provision_config_instances", "opensearch-plugins"])
choices=["telemetry", "workloads", "pipelines", "test_executions", "aggregated_results",
"provision_config_instances", "opensearch-plugins"])
list_parser.add_argument(
"--limit",
help="Limit the number of search results for recent test_executions (default: 10).",
@@ -697,6 +698,8 @@ def dispatch_list(cfg):
test_execution_orchestrator.list_pipelines()
elif what == "test_executions":
metrics.list_test_executions(cfg)
elif what == "aggregated_results":

Collaborator:
Since you've introduced a new method called list_aggregations(), how do you feel about using aggregations here as opposed to aggregated_results? Aggregations would be simpler for users to type out. But on second thought, aggregations might be confused with OpenSearch aggregations. I'm open to either.

Member Author:
Personally, I think aggregated_results is more explicit and clearer about what it's referring to. But maybe we could make it so that either aggregations or aggregated_results triggers the same function?
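
To illustrate that suggestion only (a hypothetical sketch, not the merged behavior), both spellings could be accepted by the argument parser and routed to the same listing call:

import argparse

# Hypothetical: "aggregations" added as an alias alongside "aggregated_results".
parser = argparse.ArgumentParser()
parser.add_argument(
    "configuration",
    choices=["telemetry", "workloads", "pipelines", "test_executions",
             "aggregated_results", "aggregations",
             "provision_config_instances", "opensearch-plugins"])
args = parser.parse_args(["aggregations"])

what = args.configuration
if what in ("aggregated_results", "aggregations"):
    # Both values would dispatch to the same listing function.
    print("dispatching to metrics.list_aggregated_test_results(cfg)")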

Collaborator:
aggregated_results is more explicit, so we can go forward with that.

metrics.list_aggregated_test_results(cfg)
elif what == "provision_config_instances":
provision_config.list_provision_config_instances(cfg)
elif what == "opensearch-plugins":
62 changes: 59 additions & 3 deletions osbenchmark/metrics.py
@@ -1315,6 +1315,45 @@ def format_dict(d):
console.println("")
console.println("No recent test_executions found.")

def list_aggregated_test_results(cfg):
def format_dict(d):
if d:
items = sorted(d.items())
return ", ".join(["%s=%s" % (k, v) for k, v in items])
else:
return None

Collaborator:
We should avoid repeating helper functions. This already exists in the list_test_executions() function. It'd be better to move it out of both so that both functions can use it.
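
A minimal sketch of that suggestion, assuming format_dict is hoisted to module level in metrics.py so both listing functions can share it (placement and exact form are assumptions, not the merged code):

def format_dict(d):
    # Render a dict as comma-separated "key=value" pairs sorted by key, or None if empty.
    if not d:
        return None
    return ", ".join("%s=%s" % (k, v) for k, v in sorted(d.items()))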


aggregated_test_executions = []
for test_execution in test_execution_store(cfg).list_aggregations():
aggregated_test_executions.append([
test_execution.test_execution_id,
time.to_iso8601(test_execution.test_execution_timestamp),
test_execution.workload,
format_dict(test_execution.workload_params),
test_execution.test_procedure_name,
test_execution.provision_config_instance_name,
format_dict(test_execution.user_tags),
test_execution.workload_revision,
test_execution.provision_config_revision])

if len(aggregated_test_executions) > 0:
console.println("\nRecent aggregated test executions:\n")
console.println(tabulate.tabulate(
aggregated_test_executions,
headers=[
"TestExecution ID",
"TestExecution Timestamp",
"Workload",
"Workload Parameters",
"TestProcedure",
"ProvisionConfigInstance",
"User Tags",
"workload Revision",
"Provision Config Revision"
]))
else:
console.println("")
console.println("No recent aggregate tests found.")

Collaborator:
This works, but a lot of this is duplicated from list_test_executions(). How do you feel about creating a separate helper method and just passing in the methods, like the following pseudocode:

def list_test_executions(cfg):
    list_test_helper(test_execution_store(cfg).list(), "test executions")

def list_aggregated_test_results(cfg):
    list_test_helper(test_execution_store(cfg).list_aggregations(), "aggregation tests")

def list_test_helper(test_store_method, test_execution_type):
    def format_dict(d):
        if d:
            items = sorted(d.items())
            return ", ".join(["%s=%s" % (k, v) for k, v in items])
        else:
            return None
    
    test_executions = []
    listed_test_executions = test_store_method
    for test_execution in listed_test_executions:
        ...
    console.println(f"No recent {test_execution_type} found.")
     ...

By putting the boilerplate code in list_test_helper(), we will avoid repeating code and slim down both list_test_executions(cfg) and list_aggregated_test_results(cfg).

Member Author:
Good point! Did this
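
For illustration, the consolidated helper could look roughly like the following. This is a sketch of the suggestion above, not necessarily the merged code; it assumes the module-level format_dict sketched earlier and reuses the column layout from this diff:

def list_test_helper(listed_test_executions, test_execution_type):
    # Build one table row per test execution, in the same column order as the diff above.
    rows = []
    for test_execution in listed_test_executions:
        rows.append([
            test_execution.test_execution_id,
            time.to_iso8601(test_execution.test_execution_timestamp),
            test_execution.workload,
            format_dict(test_execution.workload_params),
            test_execution.test_procedure_name,
            test_execution.provision_config_instance_name,
            format_dict(test_execution.user_tags),
            test_execution.workload_revision,
            test_execution.provision_config_revision])

    if rows:
        console.println(f"\nRecent {test_execution_type}:\n")
        console.println(tabulate.tabulate(
            rows,
            headers=["TestExecution ID", "TestExecution Timestamp", "Workload", "Workload Parameters",
                     "TestProcedure", "ProvisionConfigInstance", "User Tags", "Workload Revision",
                     "Provision Config Revision"]))
    else:
        console.println("")
        console.println(f"No recent {test_execution_type} found.")

def list_test_executions(cfg):
    list_test_helper(test_execution_store(cfg).list(), "test executions")

def list_aggregated_test_results(cfg):
    list_test_helper(test_execution_store(cfg).list_aggregations(), "aggregated test executions")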


def create_test_execution(cfg, workload, test_procedure, workload_revision=None):
provision_config_instance = cfg.opts("builder", "provision_config_instance.names")
@@ -1550,16 +1589,33 @@ def store_test_execution(self, test_execution):
with open(self._test_execution_file(), mode="wt", encoding="utf-8") as f:
f.write(json.dumps(doc, indent=True, ensure_ascii=False))

def _test_execution_file(self, test_execution_id=None):
return os.path.join(paths.test_execution_root(cfg=self.cfg, test_execution_id=test_execution_id), "test_execution.json")
def store_aggregated_execution(self, test_execution):
doc = test_execution.as_dict()
aggregated_execution_path = paths.aggregated_results_root(self.cfg, test_execution_id=test_execution.test_execution_id)
io.ensure_dir(aggregated_execution_path)
aggregated_file = os.path.join(aggregated_execution_path, "aggregated_test_execution.json")
with open(aggregated_file, mode="wt", encoding="utf-8") as f:
f.write(json.dumps(doc, indent=True, ensure_ascii=False))

def _test_execution_file(self, test_execution_id=None, is_aggregated=False):
if is_aggregated:
return os.path.join(paths.aggregated_results_root(cfg=self.cfg, test_execution_id=test_execution_id),
"aggregated_test_execution.json")
else:
return os.path.join(paths.test_execution_root(cfg=self.cfg, test_execution_id=test_execution_id), "test_execution.json")

def list(self):
results = glob.glob(self._test_execution_file(test_execution_id="*"))
all_test_executions = self._to_test_executions(results)
return all_test_executions[:self._max_results()]

def list_aggregations(self):
aggregated_results = glob.glob(self._test_execution_file(test_execution_id="*", is_aggregated=True))
return self._to_test_executions(aggregated_results)

def find_by_test_execution_id(self, test_execution_id):
test_execution_file = self._test_execution_file(test_execution_id=test_execution_id)
is_aggregated = test_execution_id.startswith('aggregate')
test_execution_file = self._test_execution_file(test_execution_id=test_execution_id, is_aggregated=is_aggregated)
if io.exists(test_execution_file):
test_executions = self._to_test_executions([test_execution_file])
if test_executions:
4 changes: 4 additions & 0 deletions osbenchmark/paths.py
@@ -42,6 +42,10 @@ def test_execution_root(cfg, test_execution_id=None):
test_execution_id = cfg.opts("system", "test_execution.id")
return os.path.join(test_excecutions_root(cfg), test_execution_id)

def aggregated_results_root(cfg, test_execution_id=None):
if not test_execution_id:
test_execution_id = cfg.opts("system", "test_execution.id")
return os.path.join(cfg.opts("node", "root.dir"), "aggregated_results", test_execution_id)

def install_root(cfg=None):
install_id = cfg.opts("system", "install.id")
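
For context, a rough illustration of how the two roots diverge on disk. The root value and IDs below are assumptions for illustration, and test_excecutions_root() is assumed to resolve to <root.dir>/test_executions:

# Illustration only (assumed root.dir and IDs):
#   test_execution_root(cfg, test_execution_id="1234")
#       -> <root.dir>/test_executions/1234
#   aggregated_results_root(cfg, test_execution_id="aggregate_results_1234")
#       -> <root.dir>/aggregated_results/aggregate_results_1234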