From d5ff27ab625591d75569ea47025716d4dfb8d065 Mon Sep 17 00:00:00 2001 From: Michael Oviedo Date: Fri, 6 Dec 2024 21:45:49 +0000 Subject: [PATCH 1/4] refactor count_iterations function Signed-off-by: Michael Oviedo --- osbenchmark/aggregator.py | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/osbenchmark/aggregator.py b/osbenchmark/aggregator.py index 016d90f4..da698bde 100644 --- a/osbenchmark/aggregator.py +++ b/osbenchmark/aggregator.py @@ -22,25 +22,19 @@ def __init__(self, cfg, test_executions_dict, args): self.loaded_workload = None def count_iterations_for_each_op(self, test_execution) -> None: - matching_test_procedure = next((tp for tp in self.loaded_workload.test_procedures if tp.name == self.test_procedure_name), None) + """Count iterations for each operation in the test execution.""" workload_params = test_execution.workload_params if test_execution.workload_params else {} - test_execution_id = test_execution.test_execution_id self.accumulated_iterations[test_execution_id] = {} - if matching_test_procedure: - for task in matching_test_procedure.schedule: - task_name = task.name - task_name_iterations = f"{task_name}_iterations" - if task_name_iterations in workload_params: - iterations = int(workload_params[task_name_iterations]) - else: - iterations = task.iterations or 1 - self.accumulated_iterations[test_execution_id][task_name] = iterations - else: - raise ValueError(f"Test procedure '{self.test_procedure_name}' not found in the loaded workload.") + for task in self.loaded_workload.find_test_procedure_or_default(self.test_procedure_name).schedule: + task_name = task.name + task_name_iterations = f"{task_name}_iterations" + iterations = int(workload_params.get(task_name_iterations, task.iterations or 1)) + self.accumulated_iterations[test_execution_id][task_name] = iterations def accumulate_results(self, test_execution: Any) -> None: + """Accumulate results from a single test execution.""" for item in test_execution.results.get("op_metrics", []): task = item.get("task", "") self.accumulated_results.setdefault(task, {}) @@ -49,9 +43,9 @@ def accumulate_results(self, test_execution: Any) -> None: self.accumulated_results[task][metric].append(item.get(metric)) def aggregate_json_by_key(self, key_path: Union[str, List[str]]) -> Any: + """Aggregate JSON results by a given key path.""" all_jsons = [self.test_store.find_by_test_execution_id(id).results for id in self.test_executions.keys()] - # retrieve nested value from a dictionary given a key path def get_nested_value(obj: Dict[str, Any], path: List[str]) -> Any: for key in path: if isinstance(obj, dict): @@ -66,8 +60,7 @@ def aggregate_helper(objects: List[Any]) -> Any: if not objects: return None if all(isinstance(obj, (int, float)) for obj in objects): - avg = sum(objects) / len(objects) - return avg + return sum(objects) / len(objects) if all(isinstance(obj, dict) for obj in objects): keys = set().union(*objects) return {key: aggregate_helper([obj.get(key) for obj in objects]) for key in keys} From 7c2efb0fc431567a684bc8f54a2f27e10ad9a61e Mon Sep 17 00:00:00 2001 From: Michael Oviedo Date: Mon, 9 Dec 2024 19:27:10 +0000 Subject: [PATCH 2/4] refactor aggregator class + fix unit tests Signed-off-by: Michael Oviedo --- osbenchmark/aggregator.py | 62 ++++++++++++++++++++++++--------------- tests/aggregator_test.py | 1 + 2 files changed, 40 insertions(+), 23 deletions(-) diff --git a/osbenchmark/aggregator.py b/osbenchmark/aggregator.py index da698bde..f82779e8 100644 --- 
a/osbenchmark/aggregator.py +++ b/osbenchmark/aggregator.py @@ -22,7 +22,7 @@ def __init__(self, cfg, test_executions_dict, args): self.loaded_workload = None def count_iterations_for_each_op(self, test_execution) -> None: - """Count iterations for each operation in the test execution.""" + """Count iterations for each operation in the test execution""" workload_params = test_execution.workload_params if test_execution.workload_params else {} test_execution_id = test_execution.test_execution_id self.accumulated_iterations[test_execution_id] = {} @@ -33,20 +33,26 @@ def count_iterations_for_each_op(self, test_execution) -> None: iterations = int(workload_params.get(task_name_iterations, task.iterations or 1)) self.accumulated_iterations[test_execution_id][task_name] = iterations - def accumulate_results(self, test_execution: Any) -> None: - """Accumulate results from a single test execution.""" - for item in test_execution.results.get("op_metrics", []): - task = item.get("task", "") + def accumulate_results(self, test_execution) -> None: + """Accumulate results from a single test execution""" + for operation_metric in test_execution.results.get("op_metrics", []): + task = operation_metric.get("task", "") self.accumulated_results.setdefault(task, {}) for metric in self.metrics: self.accumulated_results[task].setdefault(metric, []) - self.accumulated_results[task][metric].append(item.get(metric)) + self.accumulated_results[task][metric].append(operation_metric.get(metric)) def aggregate_json_by_key(self, key_path: Union[str, List[str]]) -> Any: - """Aggregate JSON results by a given key path.""" + """ + Aggregates JSON results across multiple test executions using a specified key path. + Handles nested dictionary structures and calculates averages for numeric values + """ all_jsons = [self.test_store.find_by_test_execution_id(id).results for id in self.test_executions.keys()] def get_nested_value(obj: Dict[str, Any], path: List[str]) -> Any: + """ + Retrieves a value from a nested dictionary structure using a path of keys. + """ for key in path: if isinstance(obj, dict): obj = obj.get(key, {}) @@ -75,8 +81,8 @@ def aggregate_helper(objects: List[Any]) -> Any: values = [get_nested_value(json, key_path) for json in all_jsons] return aggregate_helper(values) - def build_aggregated_results(self): - test_exe = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0]) + def build_aggregated_results_dict(self): + """Builds a dictionary of aggregated metrics from all test executions""" aggregated_results = { "op_metrics": [], "correctness_metrics": self.aggregate_json_by_key("correctness_metrics"), @@ -140,8 +146,30 @@ def build_aggregated_results(self): aggregated_results["op_metrics"].append(op_metric) - # extract the necessary data from the first test execution, since the configurations should be identical for all test executions + return aggregated_results + + def update_config_object(self, test_execution): + """ + Updates the configuration object with values from a test execution. 
+ Uses the first test execution as reference since configurations should be identical + """ current_timestamp = self.config.opts("system", "time.start") + self.config.add(config.Scope.applicationOverride, "builder", + "provision_config_instance.names", test_execution.provision_config_instance) + self.config.add(config.Scope.applicationOverride, "system", + "env.name", test_execution.environment_name) + self.config.add(config.Scope.applicationOverride, "system", "time.start", current_timestamp) + self.config.add(config.Scope.applicationOverride, "test_execution", "pipeline", test_execution.pipeline) + self.config.add(config.Scope.applicationOverride, "workload", "params", test_execution.workload_params) + self.config.add(config.Scope.applicationOverride, "builder", + "provision_config_instance.params", test_execution.provision_config_instance_params) + self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", test_execution.plugin_params) + self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", test_execution.latency_percentiles) + self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_execution.throughput_percentiles) + + def build_aggregated_results(self): + test_exe = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0]) + aggregated_results = self.build_aggregated_results_dict() if hasattr(self.args, 'results_file') and self.args.results_file != "": normalized_results_file = rio.normalize_path(self.args.results_file, self.cwd) @@ -158,19 +186,7 @@ def build_aggregated_results(self): print("Aggregate test execution ID: ", test_execution_id) - # add values to the configuration object - self.config.add(config.Scope.applicationOverride, "builder", - "provision_config_instance.names", test_exe.provision_config_instance) - self.config.add(config.Scope.applicationOverride, "system", - "env.name", test_exe.environment_name) - self.config.add(config.Scope.applicationOverride, "system", "time.start", current_timestamp) - self.config.add(config.Scope.applicationOverride, "test_execution", "pipeline", test_exe.pipeline) - self.config.add(config.Scope.applicationOverride, "workload", "params", test_exe.workload_params) - self.config.add(config.Scope.applicationOverride, "builder", - "provision_config_instance.params", test_exe.provision_config_instance_params) - self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", test_exe.plugin_params) - self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", test_exe.latency_percentiles) - self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_exe.throughput_percentiles) + self.update_config_object(test_exe) loaded_workload = workload.load_workload(self.config) test_procedure_object = loaded_workload.find_test_procedure_or_default(self.test_procedure_name) diff --git a/tests/aggregator_test.py b/tests/aggregator_test.py index 32be06cb..d963de60 100644 --- a/tests/aggregator_test.py +++ b/tests/aggregator_test.py @@ -50,6 +50,7 @@ def test_count_iterations_for_each_op(aggregator): mock_test_procedure.schedule = mock_schedule mock_workload.test_procedures = [mock_test_procedure] + mock_workload.find_test_procedure_or_default = Mock(return_value=mock_test_procedure) mock_test_execution = Mock(test_execution_id="test1", workload_params={}) aggregator.loaded_workload = mock_workload From d4a29f24eb0025e4506b66d5b0799a2f158d02d4 Mon Sep 17 00:00:00 2001 From: Michael 
Oviedo Date: Tue, 17 Dec 2024 20:03:55 +0000 Subject: [PATCH 3/4] add return types, type hints, rename variables Signed-off-by: Michael Oviedo --- osbenchmark/aggregator.py | 60 +++-- osbenchmark/test_run_orchestrator.py | 386 +++++++++++++++++++++++++++ 2 files changed, 418 insertions(+), 28 deletions(-) create mode 100644 osbenchmark/test_run_orchestrator.py diff --git a/osbenchmark/aggregator.py b/osbenchmark/aggregator.py index f82779e8..b79f8c47 100644 --- a/osbenchmark/aggregator.py +++ b/osbenchmark/aggregator.py @@ -3,12 +3,12 @@ from typing import Any, Dict, List, Union import uuid -from osbenchmark.metrics import FileTestExecutionStore +from osbenchmark.metrics import FileTestExecutionStore, TestExecution from osbenchmark import metrics, workload, config from osbenchmark.utils import io as rio class Aggregator: - def __init__(self, cfg, test_executions_dict, args): + def __init__(self, cfg, test_executions_dict, args) -> None: self.config = cfg self.args = args self.test_executions = test_executions_dict @@ -21,7 +21,7 @@ def __init__(self, cfg, test_executions_dict, args): self.test_procedure_name = None self.loaded_workload = None - def count_iterations_for_each_op(self, test_execution) -> None: + def count_iterations_for_each_op(self, test_execution: TestExecution) -> None: """Count iterations for each operation in the test execution""" workload_params = test_execution.workload_params if test_execution.workload_params else {} test_execution_id = test_execution.test_execution_id @@ -33,7 +33,7 @@ def count_iterations_for_each_op(self, test_execution) -> None: iterations = int(workload_params.get(task_name_iterations, task.iterations or 1)) self.accumulated_iterations[test_execution_id][task_name] = iterations - def accumulate_results(self, test_execution) -> None: + def accumulate_results(self, test_execution: TestExecution) -> None: """Accumulate results from a single test execution""" for operation_metric in test_execution.results.get("op_metrics", []): task = operation_metric.get("task", "") @@ -47,41 +47,45 @@ def aggregate_json_by_key(self, key_path: Union[str, List[str]]) -> Any: Aggregates JSON results across multiple test executions using a specified key path. Handles nested dictionary structures and calculates averages for numeric values """ - all_jsons = [self.test_store.find_by_test_execution_id(id).results for id in self.test_executions.keys()] + all_json_results = [self.test_store.find_by_test_execution_id(id).results for id in self.test_executions.keys()] - def get_nested_value(obj: Dict[str, Any], path: List[str]) -> Any: + def get_nested_value(json_data: Dict[str, Any], path: List[str]) -> Any: """ Retrieves a value from a nested dictionary structure using a path of keys. 
""" for key in path: - if isinstance(obj, dict): - obj = obj.get(key, {}) - elif isinstance(obj, list) and key.isdigit(): - obj = obj[int(key)] if int(key) < len(obj) else {} + if isinstance(json_data, dict): + json_data = json_data.get(key, {}) + elif isinstance(json_data, list) and key.isdigit(): + json_data = json_data[int(key)] if int(key) < len(json_data) else {} else: return None - return obj + return json_data - def aggregate_helper(objects: List[Any]) -> Any: - if not objects: + def aggregate_json_elements(json_elements: List[Any]) -> Any: + if not json_elements: return None - if all(isinstance(obj, (int, float)) for obj in objects): - return sum(objects) / len(objects) - if all(isinstance(obj, dict) for obj in objects): - keys = set().union(*objects) - return {key: aggregate_helper([obj.get(key) for obj in objects]) for key in keys} - if all(isinstance(obj, list) for obj in objects): - max_length = max(len(obj) for obj in objects) - return [aggregate_helper([obj[i] if i < len(obj) else None for obj in objects]) for i in range(max_length)] - return next((obj for obj in objects if obj is not None), None) + # If all elements are numbers, calculate the average + if all(isinstance(obj, (int, float)) for obj in json_elements): + return sum(json_elements) / len(json_elements) + # If all elements are dictionaries, recursively aggregate their values + if all(isinstance(obj, dict) for obj in json_elements): + keys = set().union(*json_elements) + return {key: aggregate_json_elements([obj.get(key) for obj in json_elements]) for key in keys} + # If all elements are lists, recursively aggregate corresponding elements + if all(isinstance(obj, list) for obj in json_elements): + max_length = max(len(obj) for obj in json_elements) + return [aggregate_json_elements([obj[i] if i < len(obj) else None for obj in json_elements]) for i in range(max_length)] + # If elements are of mixed types, return the first non-None value + return next((obj for obj in json_elements if obj is not None), None) if isinstance(key_path, str): key_path = key_path.split('.') - values = [get_nested_value(json, key_path) for json in all_jsons] - return aggregate_helper(values) + nested_values = [get_nested_value(json_result, key_path) for json_result in all_json_results] + return aggregate_json_elements(nested_values) - def build_aggregated_results_dict(self): + def build_aggregated_results_dict(self) -> Dict[str, Any]: """Builds a dictionary of aggregated metrics from all test executions""" aggregated_results = { "op_metrics": [], @@ -148,7 +152,7 @@ def build_aggregated_results_dict(self): return aggregated_results - def update_config_object(self, test_execution): + def update_config_object(self, test_execution: TestExecution) -> None: """ Updates the configuration object with values from a test execution. 
Uses the first test execution as reference since configurations should be identical @@ -167,7 +171,7 @@ def update_config_object(self, test_execution): self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", test_execution.latency_percentiles) self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_execution.throughput_percentiles) - def build_aggregated_results(self): + def build_aggregated_results(self) -> TestExecution: test_exe = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0]) aggregated_results = self.build_aggregated_results_dict() @@ -232,7 +236,7 @@ def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], task_na return weighted_metrics - def calculate_rsd(self, values: List[Union[int, float]], metric_name: str): + def calculate_rsd(self, values: List[Union[int, float]], metric_name: str) -> Union[float, str]: if not values: raise ValueError(f"Cannot calculate RSD for metric '{metric_name}': empty list of values") if len(values) == 1: diff --git a/osbenchmark/test_run_orchestrator.py b/osbenchmark/test_run_orchestrator.py new file mode 100644 index 00000000..e84c7b5c --- /dev/null +++ b/osbenchmark/test_run_orchestrator.py @@ -0,0 +1,386 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# Modifications Copyright OpenSearch Contributors. See +# GitHub history for details. +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import collections +import logging +import os +import sys + +import tabulate +import thespian.actors + +from osbenchmark import actor, config, doc_link, \ + worker_coordinator, exceptions, builder, metrics, \ + results_publisher, workload, version, PROGRAM_NAME +from osbenchmark.utils import console, opts, versions + + +pipelines = collections.OrderedDict() + + +class Pipeline: + """ + Describes a whole execution pipeline. A pipeline can consist of one or more steps. Each pipeline should contain roughly of the following + steps: + + * Prepare the benchmark candidate: It can build OpenSearch from sources, download a ZIP from somewhere etc. + * Launch the benchmark candidate: This can be done directly, with tools like Ansible or it can assume the candidate is already launched + * Run the benchmark + * Publish results + """ + + def __init__(self, name, description, target, stable=True): + """ + Creates a new pipeline. + + :param name: A short name of the pipeline. This name will be used to reference it from the command line. + :param description: A human-readable description what the pipeline does. 
+ :param target: A function that implements this pipeline + :param stable True iff the pipeline is considered production quality. + """ + self.name = name + self.description = description + self.target = target + self.stable = stable + pipelines[name] = self + + def __call__(self, cfg): + self.target(cfg) + + +class Setup: + def __init__(self, cfg, sources=False, distribution=False, external=False, docker=False): + self.cfg = cfg + self.sources = sources + self.distribution = distribution + self.external = external + self.docker = docker + + +class Success: + pass + + +class BenchmarkActor(actor.BenchmarkActor): + def __init__(self): + super().__init__() + self.cfg = None + self.start_sender = None + self.builder = None + self.main_worker_coordinator = None + self.coordinator = None + + def receiveMsg_PoisonMessage(self, msg, sender): + self.logger.info("BenchmarkActor got notified of poison message [%s] (forwarding).", (str(msg))) + if self.coordinator: + self.coordinator.error = True + self.send(self.start_sender, msg) + + def receiveUnrecognizedMessage(self, msg, sender): + self.logger.info("BenchmarkActor received unknown message [%s] (ignoring).", (str(msg))) + + @actor.no_retry("test run orchestrator") # pylint: disable=no-value-for-parameter + def receiveMsg_Setup(self, msg, sender): + self.start_sender = sender + self.cfg = msg.cfg + self.coordinator = BenchmarkCoordinator(msg.cfg) + self.coordinator.setup(sources=msg.sources) + self.logger.info("Asking builder to start the engine.") + self.builder = self.createActor(builder.BuilderActor, targetActorRequirements={"coordinator": True}) + self.send(self.builder, builder.StartEngine(self.cfg, + self.coordinator.metrics_store.open_context, + msg.sources, + msg.distribution, + msg.external, + msg.docker)) + + @actor.no_retry("test run orchestrator") # pylint: disable=no-value-for-parameter + def receiveMsg_EngineStarted(self, msg, sender): + self.logger.info("Builder has started engine successfully.") + self.coordinator.test_run.provision_config_revision = msg.provision_config_revision + self.main_worker_coordinator = self.createActor( + worker_coordinator.WorkerCoordinatorActor, + targetActorRequirements={"coordinator": True} + ) + self.logger.info("Telling worker_coordinator to prepare for benchmarking.") + self.send(self.main_worker_coordinator, worker_coordinator.PrepareBenchmark(self.cfg, self.coordinator.current_workload)) + + @actor.no_retry("test run orchestrator") # pylint: disable=no-value-for-parameter + def receiveMsg_PreparationComplete(self, msg, sender): + self.coordinator.on_preparation_complete(msg.distribution_flavor, msg.distribution_version, msg.revision) + self.logger.info("Telling worker_coordinator to start benchmark.") + self.send(self.main_worker_coordinator, worker_coordinator.StartBenchmark()) + + @actor.no_retry("test run orchestrator") # pylint: disable=no-value-for-parameter + def receiveMsg_TaskFinished(self, msg, sender): + self.coordinator.on_task_finished(msg.metrics) + # We choose *NOT* to reset our own metrics store's timer as this one is only used to collect complete metrics records from + # other stores (used by worker_coordinator and builder). Hence there is no need to reset the timer in our own metrics store. 
+ self.send(self.builder, builder.ResetRelativeTime(msg.next_task_scheduled_in)) + + @actor.no_retry("test run orchestrator") # pylint: disable=no-value-for-parameter + def receiveMsg_BenchmarkCancelled(self, msg, sender): + self.coordinator.cancelled = True + # even notify the start sender if it is the originator. The reason is that we call #ask() which waits for a reply. + # We also need to ask in order to avoid test_runs between this notification and the following ActorExitRequest. + self.send(self.start_sender, msg) + + @actor.no_retry("test run orchestrator") # pylint: disable=no-value-for-parameter + def receiveMsg_BenchmarkFailure(self, msg, sender): + self.logger.info("Received a benchmark failure from [%s] and will forward it now.", sender) + self.coordinator.error = True + self.send(self.start_sender, msg) + + @actor.no_retry("test run orchestrator") # pylint: disable=no-value-for-parameter + def receiveMsg_BenchmarkComplete(self, msg, sender): + self.coordinator.on_benchmark_complete(msg.metrics) + self.send(self.main_worker_coordinator, thespian.actors.ActorExitRequest()) + self.main_worker_coordinator = None + self.logger.info("Asking builder to stop the engine.") + self.send(self.builder, builder.StopEngine()) + + @actor.no_retry("test run orchestrator") # pylint: disable=no-value-for-parameter + def receiveMsg_EngineStopped(self, msg, sender): + self.logger.info("Builder has stopped engine successfully.") + self.send(self.start_sender, Success()) + + +class BenchmarkCoordinator: + def __init__(self, cfg): + self.logger = logging.getLogger(__name__) + self.cfg = cfg + self.test_run = None + self.metrics_store = None + self.test_run_store = None + self.cancelled = False + self.error = False + self.workload_revision = None + self.current_workload = None + self.current_test_procedure = None + + def setup(self, sources=False): + # to load the workload we need to know the correct cluster distribution version. Usually, this value should be set + # but there are rare cases (external pipeline and user did not specify the distribution version) where we need + # to derive it ourselves. For source builds we always assume "master" + if not sources and not self.cfg.exists("builder", "distribution.version"): + distribution_version = builder.cluster_distribution_version(self.cfg) + self.logger.info("Automatically derived distribution version [%s]", distribution_version) + self.cfg.add(config.Scope.benchmark, "builder", "distribution.version", distribution_version) + min_os_version = versions.Version.from_string(version.minimum_os_version()) + specified_version = versions.Version.from_string(distribution_version) + if specified_version < min_os_version: + raise exceptions.SystemSetupError(f"Cluster version must be at least [{min_os_version}] but was [{distribution_version}]") + + self.current_workload = workload.load_workload(self.cfg) + self.workload_revision = self.cfg.opts("workload", "repository.revision", mandatory=False) + test_procedure_name = self.cfg.opts("workload", "test_procedure.name") + self.current_test_procedure = self.current_workload.find_test_procedure_or_default(test_procedure_name) + if self.current_test_procedure is None: + raise exceptions.SystemSetupError( + "Workload [{}] does not provide test_procedure [{}]. 
List the available workloads with {} list workloads.".format( + self.current_workload.name, test_procedure_name, PROGRAM_NAME)) + if self.current_test_procedure.user_info: + console.info(self.current_test_procedure.user_info) + self.test_run = metrics.create_test_run( + self.cfg, self.current_workload, + self.current_test_procedure, + self.workload_revision) + + self.metrics_store = metrics.metrics_store( + self.cfg, + workload=self.test_run.workload_name, + test_procedure=self.test_run.test_procedure_name, + read_only=False + ) + self.test_run_store = metrics.test_run_store(self.cfg) + + def on_preparation_complete(self, distribution_flavor, distribution_version, revision): + self.test_run.distribution_flavor = distribution_flavor + self.test_run.distribution_version = distribution_version + self.test_run.revision = revision + # store test_run initially (without any results) so other components can retrieve full metadata + self.test_run_store.store_test_run(self.test_run) + if self.test_run.test_procedure.auto_generated: + console.info("Executing test with workload [{}] and provision_config_instance {} with version [{}].\n" + .format(self.test_run.workload_name, + self.test_run.provision_config_instance, + self.test_run.distribution_version)) + else: + console.info("Executing test with workload [{}], test_procedure [{}] and provision_config_instance {} with version [{}].\n" + .format( + self.test_run.workload_name, + self.test_run.test_procedure_name, + self.test_run.provision_config_instance, + self.test_run.distribution_version + )) + + def on_task_finished(self, new_metrics): + self.logger.info("Task has finished.") + self.logger.info("Bulk adding request metrics to metrics store.") + self.metrics_store.bulk_add(new_metrics) + + def on_benchmark_complete(self, new_metrics): + self.logger.info("Benchmark is complete.") + self.logger.info("Bulk adding request metrics to metrics store.") + self.metrics_store.bulk_add(new_metrics) + self.metrics_store.flush() + if not self.cancelled and not self.error: + final_results = metrics.calculate_results(self.metrics_store, self.test_run) + self.test_run.add_results(final_results) + self.test_run_store.store_test_run(self.test_run) + metrics.results_store(self.cfg).store_results(self.test_run) + results_publisher.summarize(final_results, self.cfg) + else: + self.logger.info("Suppressing output of summary results. Cancelled = [%r], Error = [%r].", self.cancelled, self.error) + self.metrics_store.close() + + +def run(cfg, sources=False, distribution=False, external=False, docker=False): + logger = logging.getLogger(__name__) + # at this point an actor system has to run and we should only join + actor_system = actor.bootstrap_actor_system(try_join=True) + benchmark_actor = actor_system.createActor(BenchmarkActor, targetActorRequirements={"coordinator": True}) + try: + result = actor_system.ask(benchmark_actor, Setup(cfg, sources, distribution, external, docker)) + if isinstance(result, Success): + logger.info("Benchmark has finished successfully.") + # may happen if one of the load generators has detected that the user has cancelled the benchmark. + elif isinstance(result, actor.BenchmarkCancelled): + logger.info("User has cancelled the benchmark (detected by actor).") + elif isinstance(result, actor.BenchmarkFailure): + logger.error("A benchmark failure has occurred") + raise exceptions.BenchmarkError(result.message, result.cause) + else: + raise exceptions.BenchmarkError("Got an unexpected result during benchmarking: [%s]." 
% str(result)) + except KeyboardInterrupt: + logger.info("User has cancelled the benchmark (detected by test run orchestrator).") + # notify the coordinator so it can properly handle this state. Do it blocking so we don't have a test run between this message + # and the actor exit request. + actor_system.ask(benchmark_actor, actor.BenchmarkCancelled()) + finally: + logger.info("Telling benchmark actor to exit.") + actor_system.tell(benchmark_actor, thespian.actors.ActorExitRequest()) + + +def set_default_hosts(cfg, host="127.0.0.1", port=9200): + logger = logging.getLogger(__name__) + configured_hosts = cfg.opts("client", "hosts") + if len(configured_hosts.default) != 0: + logger.info("Using configured hosts %s", configured_hosts.default) + else: + logger.info("Setting default host to [%s:%d]", host, port) + default_host_object = opts.TargetHosts("{}:{}".format(host,port)) + cfg.add(config.Scope.benchmark, "client", "hosts", default_host_object) + + +# Poor man's curry +def from_sources(cfg): + port = cfg.opts("provisioning", "node.http.port") + set_default_hosts(cfg, port=port) + return run(cfg, sources=True) + + +def from_distribution(cfg): + port = cfg.opts("provisioning", "node.http.port") + set_default_hosts(cfg, port=port) + return run(cfg, distribution=True) + + +def benchmark_only(cfg): + set_default_hosts(cfg) + # We'll use a special provision_config_instance name for external benchmarks. + cfg.add(config.Scope.benchmark, "builder", "provision_config_instance.names", ["external"]) + return run(cfg, external=True) + + +def docker(cfg): + set_default_hosts(cfg) + return run(cfg, docker=True) + + +Pipeline("from-sources", + "Builds and provisions OpenSearch, runs a benchmark and publishes results.", from_sources) + +Pipeline("from-distribution", + "Downloads an OpenSearch distribution, provisions it, runs a benchmark and publishes results.", from_distribution) + +Pipeline("benchmark-only", + "Assumes an already running OpenSearch instance, runs a benchmark and publishes results", benchmark_only) + +# Very experimental Docker pipeline. Should only be used with great care and is also not supported on all platforms. +Pipeline("docker", + "Runs a benchmark against the official OpenSearch Docker container and publishes results", docker, stable=False) + + +def available_pipelines(): + return [[pipeline.name, pipeline.description] for pipeline in pipelines.values() if pipeline.stable] + + +def list_pipelines(): + console.println("Available pipelines:\n") + console.println(tabulate.tabulate(available_pipelines(), headers=["Name", "Description"])) + + +def run(cfg): + logger = logging.getLogger(__name__) + # pipeline is no more mandatory, will default to benchmark-only + name = cfg.opts("test_run", "pipeline", mandatory=False) + test_run_id = cfg.opts("system", "test_run.id") + logger.info("test run id [%s]", test_run_id) + if not name: + # assume from-distribution pipeline if distribution.version has been specified + if cfg.exists("builder", "distribution.version"): + name = "from-distribution" + else: + name = "benchmark-only" + logger.info("User did not specify distribution.version or pipeline. 
Using default pipeline [%s].", name) + + cfg.add(config.Scope.applicationOverride, "test_run", "pipeline", name) + else: + logger.info("User specified pipeline [%s].", name) + + if os.environ.get("BENCHMARK_RUNNING_IN_DOCKER", "").upper() == "TRUE": + # in this case only benchmarking remote OpenSearch clusters makes sense + if name != "benchmark-only": + raise exceptions.SystemSetupError( + "Only the [benchmark-only] pipeline is supported by the Benchmark Docker image.\n" + "Add --pipeline=benchmark-only in your Benchmark arguments and try again.\n" + "For more details read the docs for the benchmark-only pipeline in {}\n".format( + doc_link(""))) + + try: + pipeline = pipelines[name] + except KeyError: + raise exceptions.SystemSetupError( + "Unknown pipeline [%s]. List the available pipelines with %s list pipelines." % (name, PROGRAM_NAME)) + try: + pipeline(cfg) + except exceptions.BenchmarkError as e: + # just pass on our own errors. It should be treated differently on top-level + raise e + except KeyboardInterrupt: + logger.info("User has cancelled the benchmark.") + except BaseException: + tb = sys.exc_info()[2] + raise exceptions.BenchmarkError("This test_run ended with a fatal crash.").with_traceback(tb) From e0d98152ab16e2b1916757ed3639ed2fd2700834 Mon Sep 17 00:00:00 2001 From: Michael Oviedo Date: Tue, 17 Dec 2024 20:12:29 +0000 Subject: [PATCH 4/4] remove unintended file test_run_orchestrator.py Signed-off-by: Michael Oviedo --- osbenchmark/test_run_orchestrator.py | 386 --------------------------- 1 file changed, 386 deletions(-) delete mode 100644 osbenchmark/test_run_orchestrator.py diff --git a/osbenchmark/test_run_orchestrator.py b/osbenchmark/test_run_orchestrator.py deleted file mode 100644 index e84c7b5c..00000000 --- a/osbenchmark/test_run_orchestrator.py +++ /dev/null @@ -1,386 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# The OpenSearch Contributors require contributions made to -# this file be licensed under the Apache-2.0 license or a -# compatible open source license. -# Modifications Copyright OpenSearch Contributors. See -# GitHub history for details. -# Licensed to Elasticsearch B.V. under one or more contributor -# license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright -# ownership. Elasticsearch B.V. licenses this file to you under -# the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import collections -import logging -import os -import sys - -import tabulate -import thespian.actors - -from osbenchmark import actor, config, doc_link, \ - worker_coordinator, exceptions, builder, metrics, \ - results_publisher, workload, version, PROGRAM_NAME -from osbenchmark.utils import console, opts, versions - - -pipelines = collections.OrderedDict() - - -class Pipeline: - """ - Describes a whole execution pipeline. A pipeline can consist of one or more steps. 
Each pipeline should contain roughly of the following - steps: - - * Prepare the benchmark candidate: It can build OpenSearch from sources, download a ZIP from somewhere etc. - * Launch the benchmark candidate: This can be done directly, with tools like Ansible or it can assume the candidate is already launched - * Run the benchmark - * Publish results - """ - - def __init__(self, name, description, target, stable=True): - """ - Creates a new pipeline. - - :param name: A short name of the pipeline. This name will be used to reference it from the command line. - :param description: A human-readable description what the pipeline does. - :param target: A function that implements this pipeline - :param stable True iff the pipeline is considered production quality. - """ - self.name = name - self.description = description - self.target = target - self.stable = stable - pipelines[name] = self - - def __call__(self, cfg): - self.target(cfg) - - -class Setup: - def __init__(self, cfg, sources=False, distribution=False, external=False, docker=False): - self.cfg = cfg - self.sources = sources - self.distribution = distribution - self.external = external - self.docker = docker - - -class Success: - pass - - -class BenchmarkActor(actor.BenchmarkActor): - def __init__(self): - super().__init__() - self.cfg = None - self.start_sender = None - self.builder = None - self.main_worker_coordinator = None - self.coordinator = None - - def receiveMsg_PoisonMessage(self, msg, sender): - self.logger.info("BenchmarkActor got notified of poison message [%s] (forwarding).", (str(msg))) - if self.coordinator: - self.coordinator.error = True - self.send(self.start_sender, msg) - - def receiveUnrecognizedMessage(self, msg, sender): - self.logger.info("BenchmarkActor received unknown message [%s] (ignoring).", (str(msg))) - - @actor.no_retry("test run orchestrator") # pylint: disable=no-value-for-parameter - def receiveMsg_Setup(self, msg, sender): - self.start_sender = sender - self.cfg = msg.cfg - self.coordinator = BenchmarkCoordinator(msg.cfg) - self.coordinator.setup(sources=msg.sources) - self.logger.info("Asking builder to start the engine.") - self.builder = self.createActor(builder.BuilderActor, targetActorRequirements={"coordinator": True}) - self.send(self.builder, builder.StartEngine(self.cfg, - self.coordinator.metrics_store.open_context, - msg.sources, - msg.distribution, - msg.external, - msg.docker)) - - @actor.no_retry("test run orchestrator") # pylint: disable=no-value-for-parameter - def receiveMsg_EngineStarted(self, msg, sender): - self.logger.info("Builder has started engine successfully.") - self.coordinator.test_run.provision_config_revision = msg.provision_config_revision - self.main_worker_coordinator = self.createActor( - worker_coordinator.WorkerCoordinatorActor, - targetActorRequirements={"coordinator": True} - ) - self.logger.info("Telling worker_coordinator to prepare for benchmarking.") - self.send(self.main_worker_coordinator, worker_coordinator.PrepareBenchmark(self.cfg, self.coordinator.current_workload)) - - @actor.no_retry("test run orchestrator") # pylint: disable=no-value-for-parameter - def receiveMsg_PreparationComplete(self, msg, sender): - self.coordinator.on_preparation_complete(msg.distribution_flavor, msg.distribution_version, msg.revision) - self.logger.info("Telling worker_coordinator to start benchmark.") - self.send(self.main_worker_coordinator, worker_coordinator.StartBenchmark()) - - @actor.no_retry("test run orchestrator") # pylint: disable=no-value-for-parameter - 
def receiveMsg_TaskFinished(self, msg, sender): - self.coordinator.on_task_finished(msg.metrics) - # We choose *NOT* to reset our own metrics store's timer as this one is only used to collect complete metrics records from - # other stores (used by worker_coordinator and builder). Hence there is no need to reset the timer in our own metrics store. - self.send(self.builder, builder.ResetRelativeTime(msg.next_task_scheduled_in)) - - @actor.no_retry("test run orchestrator") # pylint: disable=no-value-for-parameter - def receiveMsg_BenchmarkCancelled(self, msg, sender): - self.coordinator.cancelled = True - # even notify the start sender if it is the originator. The reason is that we call #ask() which waits for a reply. - # We also need to ask in order to avoid test_runs between this notification and the following ActorExitRequest. - self.send(self.start_sender, msg) - - @actor.no_retry("test run orchestrator") # pylint: disable=no-value-for-parameter - def receiveMsg_BenchmarkFailure(self, msg, sender): - self.logger.info("Received a benchmark failure from [%s] and will forward it now.", sender) - self.coordinator.error = True - self.send(self.start_sender, msg) - - @actor.no_retry("test run orchestrator") # pylint: disable=no-value-for-parameter - def receiveMsg_BenchmarkComplete(self, msg, sender): - self.coordinator.on_benchmark_complete(msg.metrics) - self.send(self.main_worker_coordinator, thespian.actors.ActorExitRequest()) - self.main_worker_coordinator = None - self.logger.info("Asking builder to stop the engine.") - self.send(self.builder, builder.StopEngine()) - - @actor.no_retry("test run orchestrator") # pylint: disable=no-value-for-parameter - def receiveMsg_EngineStopped(self, msg, sender): - self.logger.info("Builder has stopped engine successfully.") - self.send(self.start_sender, Success()) - - -class BenchmarkCoordinator: - def __init__(self, cfg): - self.logger = logging.getLogger(__name__) - self.cfg = cfg - self.test_run = None - self.metrics_store = None - self.test_run_store = None - self.cancelled = False - self.error = False - self.workload_revision = None - self.current_workload = None - self.current_test_procedure = None - - def setup(self, sources=False): - # to load the workload we need to know the correct cluster distribution version. Usually, this value should be set - # but there are rare cases (external pipeline and user did not specify the distribution version) where we need - # to derive it ourselves. 
For source builds we always assume "master" - if not sources and not self.cfg.exists("builder", "distribution.version"): - distribution_version = builder.cluster_distribution_version(self.cfg) - self.logger.info("Automatically derived distribution version [%s]", distribution_version) - self.cfg.add(config.Scope.benchmark, "builder", "distribution.version", distribution_version) - min_os_version = versions.Version.from_string(version.minimum_os_version()) - specified_version = versions.Version.from_string(distribution_version) - if specified_version < min_os_version: - raise exceptions.SystemSetupError(f"Cluster version must be at least [{min_os_version}] but was [{distribution_version}]") - - self.current_workload = workload.load_workload(self.cfg) - self.workload_revision = self.cfg.opts("workload", "repository.revision", mandatory=False) - test_procedure_name = self.cfg.opts("workload", "test_procedure.name") - self.current_test_procedure = self.current_workload.find_test_procedure_or_default(test_procedure_name) - if self.current_test_procedure is None: - raise exceptions.SystemSetupError( - "Workload [{}] does not provide test_procedure [{}]. List the available workloads with {} list workloads.".format( - self.current_workload.name, test_procedure_name, PROGRAM_NAME)) - if self.current_test_procedure.user_info: - console.info(self.current_test_procedure.user_info) - self.test_run = metrics.create_test_run( - self.cfg, self.current_workload, - self.current_test_procedure, - self.workload_revision) - - self.metrics_store = metrics.metrics_store( - self.cfg, - workload=self.test_run.workload_name, - test_procedure=self.test_run.test_procedure_name, - read_only=False - ) - self.test_run_store = metrics.test_run_store(self.cfg) - - def on_preparation_complete(self, distribution_flavor, distribution_version, revision): - self.test_run.distribution_flavor = distribution_flavor - self.test_run.distribution_version = distribution_version - self.test_run.revision = revision - # store test_run initially (without any results) so other components can retrieve full metadata - self.test_run_store.store_test_run(self.test_run) - if self.test_run.test_procedure.auto_generated: - console.info("Executing test with workload [{}] and provision_config_instance {} with version [{}].\n" - .format(self.test_run.workload_name, - self.test_run.provision_config_instance, - self.test_run.distribution_version)) - else: - console.info("Executing test with workload [{}], test_procedure [{}] and provision_config_instance {} with version [{}].\n" - .format( - self.test_run.workload_name, - self.test_run.test_procedure_name, - self.test_run.provision_config_instance, - self.test_run.distribution_version - )) - - def on_task_finished(self, new_metrics): - self.logger.info("Task has finished.") - self.logger.info("Bulk adding request metrics to metrics store.") - self.metrics_store.bulk_add(new_metrics) - - def on_benchmark_complete(self, new_metrics): - self.logger.info("Benchmark is complete.") - self.logger.info("Bulk adding request metrics to metrics store.") - self.metrics_store.bulk_add(new_metrics) - self.metrics_store.flush() - if not self.cancelled and not self.error: - final_results = metrics.calculate_results(self.metrics_store, self.test_run) - self.test_run.add_results(final_results) - self.test_run_store.store_test_run(self.test_run) - metrics.results_store(self.cfg).store_results(self.test_run) - results_publisher.summarize(final_results, self.cfg) - else: - self.logger.info("Suppressing output of summary 
results. Cancelled = [%r], Error = [%r].", self.cancelled, self.error) - self.metrics_store.close() - - -def run(cfg, sources=False, distribution=False, external=False, docker=False): - logger = logging.getLogger(__name__) - # at this point an actor system has to run and we should only join - actor_system = actor.bootstrap_actor_system(try_join=True) - benchmark_actor = actor_system.createActor(BenchmarkActor, targetActorRequirements={"coordinator": True}) - try: - result = actor_system.ask(benchmark_actor, Setup(cfg, sources, distribution, external, docker)) - if isinstance(result, Success): - logger.info("Benchmark has finished successfully.") - # may happen if one of the load generators has detected that the user has cancelled the benchmark. - elif isinstance(result, actor.BenchmarkCancelled): - logger.info("User has cancelled the benchmark (detected by actor).") - elif isinstance(result, actor.BenchmarkFailure): - logger.error("A benchmark failure has occurred") - raise exceptions.BenchmarkError(result.message, result.cause) - else: - raise exceptions.BenchmarkError("Got an unexpected result during benchmarking: [%s]." % str(result)) - except KeyboardInterrupt: - logger.info("User has cancelled the benchmark (detected by test run orchestrator).") - # notify the coordinator so it can properly handle this state. Do it blocking so we don't have a test run between this message - # and the actor exit request. - actor_system.ask(benchmark_actor, actor.BenchmarkCancelled()) - finally: - logger.info("Telling benchmark actor to exit.") - actor_system.tell(benchmark_actor, thespian.actors.ActorExitRequest()) - - -def set_default_hosts(cfg, host="127.0.0.1", port=9200): - logger = logging.getLogger(__name__) - configured_hosts = cfg.opts("client", "hosts") - if len(configured_hosts.default) != 0: - logger.info("Using configured hosts %s", configured_hosts.default) - else: - logger.info("Setting default host to [%s:%d]", host, port) - default_host_object = opts.TargetHosts("{}:{}".format(host,port)) - cfg.add(config.Scope.benchmark, "client", "hosts", default_host_object) - - -# Poor man's curry -def from_sources(cfg): - port = cfg.opts("provisioning", "node.http.port") - set_default_hosts(cfg, port=port) - return run(cfg, sources=True) - - -def from_distribution(cfg): - port = cfg.opts("provisioning", "node.http.port") - set_default_hosts(cfg, port=port) - return run(cfg, distribution=True) - - -def benchmark_only(cfg): - set_default_hosts(cfg) - # We'll use a special provision_config_instance name for external benchmarks. - cfg.add(config.Scope.benchmark, "builder", "provision_config_instance.names", ["external"]) - return run(cfg, external=True) - - -def docker(cfg): - set_default_hosts(cfg) - return run(cfg, docker=True) - - -Pipeline("from-sources", - "Builds and provisions OpenSearch, runs a benchmark and publishes results.", from_sources) - -Pipeline("from-distribution", - "Downloads an OpenSearch distribution, provisions it, runs a benchmark and publishes results.", from_distribution) - -Pipeline("benchmark-only", - "Assumes an already running OpenSearch instance, runs a benchmark and publishes results", benchmark_only) - -# Very experimental Docker pipeline. Should only be used with great care and is also not supported on all platforms. 
-Pipeline("docker", - "Runs a benchmark against the official OpenSearch Docker container and publishes results", docker, stable=False) - - -def available_pipelines(): - return [[pipeline.name, pipeline.description] for pipeline in pipelines.values() if pipeline.stable] - - -def list_pipelines(): - console.println("Available pipelines:\n") - console.println(tabulate.tabulate(available_pipelines(), headers=["Name", "Description"])) - - -def run(cfg): - logger = logging.getLogger(__name__) - # pipeline is no more mandatory, will default to benchmark-only - name = cfg.opts("test_run", "pipeline", mandatory=False) - test_run_id = cfg.opts("system", "test_run.id") - logger.info("test run id [%s]", test_run_id) - if not name: - # assume from-distribution pipeline if distribution.version has been specified - if cfg.exists("builder", "distribution.version"): - name = "from-distribution" - else: - name = "benchmark-only" - logger.info("User did not specify distribution.version or pipeline. Using default pipeline [%s].", name) - - cfg.add(config.Scope.applicationOverride, "test_run", "pipeline", name) - else: - logger.info("User specified pipeline [%s].", name) - - if os.environ.get("BENCHMARK_RUNNING_IN_DOCKER", "").upper() == "TRUE": - # in this case only benchmarking remote OpenSearch clusters makes sense - if name != "benchmark-only": - raise exceptions.SystemSetupError( - "Only the [benchmark-only] pipeline is supported by the Benchmark Docker image.\n" - "Add --pipeline=benchmark-only in your Benchmark arguments and try again.\n" - "For more details read the docs for the benchmark-only pipeline in {}\n".format( - doc_link(""))) - - try: - pipeline = pipelines[name] - except KeyError: - raise exceptions.SystemSetupError( - "Unknown pipeline [%s]. List the available pipelines with %s list pipelines." % (name, PROGRAM_NAME)) - try: - pipeline(cfg) - except exceptions.BenchmarkError as e: - # just pass on our own errors. It should be treated differently on top-level - raise e - except KeyboardInterrupt: - logger.info("User has cancelled the benchmark.") - except BaseException: - tb = sys.exc_info()[2] - raise exceptions.BenchmarkError("This test_run ended with a fatal crash.").with_traceback(tb)