Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add automatic testing and aggregation to OSB #655

Merged
merged 4 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions osbenchmark/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ def build_aggregated_results(self):
test_procedure = loaded_workload.find_test_procedure_or_default(test_exe.test_procedure)

test_execution = metrics.create_test_execution(self.config, loaded_workload, test_procedure, test_exe.workload_revision)
test_execution.user_tags = {
"aggregation-of-runs": list(self.test_executions.keys())
}
test_execution.add_results(AggregatedResults(aggregated_results))
test_execution.distribution_version = test_exe.distribution_version
test_execution.revision = test_exe.revision
Expand Down
134 changes: 89 additions & 45 deletions osbenchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,24 @@ def add_workload_source(subparser):
f"high values favor the most common queries. "
f"Ignored if randomization is off (default: {workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_ALPHA}).",
default=workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_ALPHA)
test_execution_parser.add_argument(
"--test-iterations",
help="The number of times to run the workload (default: 1).",
default=1)
test_execution_parser.add_argument(
"--aggregate",
type=lambda x: (str(x).lower() in ['true', '1', 'yes', 'y']),
help="Aggregate the results of multiple test executions (default: true).",
default=True)
test_execution_parser.add_argument(
"--sleep-timer",
help="Sleep for the specified number of seconds before starting the next test execution (default: 5).",
default=5)
test_execution_parser.add_argument(
"--cancel-on-error",
action="store_true",
help="Stop executing tests if an error occurs in one of the test iterations (default: false).",
)

###############################################################################
#
Expand All @@ -634,8 +652,8 @@ def add_workload_source(subparser):
action="store_true",
default=False)

for p in [list_parser, test_execution_parser, compare_parser, aggregate_parser, download_parser, install_parser,
start_parser, stop_parser, info_parser, create_workload_parser]:
for p in [list_parser, test_execution_parser, compare_parser, aggregate_parser,
download_parser, install_parser, start_parser, stop_parser, info_parser, create_workload_parser]:
# This option is needed to support a separate configuration for the integration tests on the same machine
p.add_argument(
"--configuration-name",
Expand Down Expand Up @@ -863,6 +881,49 @@ def prepare_test_executions_dict(args, cfg):
test_executions_dict[execution] = None
return test_executions_dict

def configure_test(arg_parser, args, cfg):
# As the execute-test command is doing more work than necessary at the moment, we duplicate several parameters
# in this section that actually belong to dedicated subcommands (like install, start or stop). Over time
# these duplicated parameters will vanish as we move towards dedicated subcommands and use "execute-test" only
# to run the actual benchmark (i.e. generating load).
print_test_execution_id(args)
if args.effective_start_date:
cfg.add(config.Scope.applicationOverride, "system", "time.start", args.effective_start_date)
cfg.add(config.Scope.applicationOverride, "system", "test_execution.id", args.test_execution_id)
# use the test_execution id implicitly also as the install id.
cfg.add(config.Scope.applicationOverride, "system", "install.id", args.test_execution_id)
cfg.add(config.Scope.applicationOverride, "test_execution", "pipeline", args.pipeline)
cfg.add(config.Scope.applicationOverride, "test_execution", "user.tag", args.user_tag)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "profiling", args.enable_worker_coordinator_profiling)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "assertions", args.enable_assertions)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "on.error", args.on_error)
cfg.add(
config.Scope.applicationOverride,
"worker_coordinator",
"load_worker_coordinator_hosts",
opts.csv_to_list(args.load_worker_coordinator_hosts))
cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode)
cfg.add(config.Scope.applicationOverride, "workload", "latency.percentiles", args.latency_percentiles)
cfg.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", args.throughput_percentiles)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.enabled", args.randomization_enabled)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.repeat_frequency", args.randomization_repeat_frequency)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.n", args.randomization_n)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.alpha", args.randomization_alpha)
configure_workload_params(arg_parser, args, cfg)
configure_connection_params(arg_parser, args, cfg)
configure_telemetry_params(args, cfg)
configure_builder_params(args, cfg)
cfg.add(config.Scope.applicationOverride, "builder", "runtime.jdk", args.runtime_jdk)
cfg.add(config.Scope.applicationOverride, "builder", "source.revision", args.revision)
cfg.add(config.Scope.applicationOverride, "builder",
"provision_config_instance.plugins", opts.csv_to_list(
args.opensearch_plugins))
cfg.add(config.Scope.applicationOverride, "builder", "plugin.params", opts.to_dict(args.plugin_params))
cfg.add(config.Scope.applicationOverride, "builder", "preserve.install", convert.to_bool(args.preserve_install))
cfg.add(config.Scope.applicationOverride, "builder", "skip.rest.api.check", convert.to_bool(args.skip_rest_api_check))

configure_results_publishing_params(args, cfg)

def print_test_execution_id(args):
console.info(f"[Test Execution ID]: {args.test_execution_id}")

Expand Down Expand Up @@ -920,49 +981,32 @@ def dispatch_sub_command(arg_parser, args, cfg):
cfg.add(config.Scope.applicationOverride, "system", "install.id", args.installation_id)
builder.stop(cfg)
elif sub_command == "execute-test":
# As the execute-test command is doing more work than necessary at the moment, we duplicate several parameters
# in this section that actually belong to dedicated subcommands (like install, start or stop). Over time
# these duplicated parameters will vanish as we move towards dedicated subcommands and use "execute-test" only
# to run the actual benchmark (i.e. generating load).
print_test_execution_id(args)
if args.effective_start_date:
cfg.add(config.Scope.applicationOverride, "system", "time.start", args.effective_start_date)
cfg.add(config.Scope.applicationOverride, "system", "test_execution.id", args.test_execution_id)
# use the test_execution id implicitly also as the install id.
cfg.add(config.Scope.applicationOverride, "system", "install.id", args.test_execution_id)
cfg.add(config.Scope.applicationOverride, "test_execution", "pipeline", args.pipeline)
cfg.add(config.Scope.applicationOverride, "test_execution", "user.tag", args.user_tag)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "profiling", args.enable_worker_coordinator_profiling)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "assertions", args.enable_assertions)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "on.error", args.on_error)
cfg.add(
config.Scope.applicationOverride,
"worker_coordinator",
"load_worker_coordinator_hosts",
opts.csv_to_list(args.load_worker_coordinator_hosts))
cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode)
cfg.add(config.Scope.applicationOverride, "workload", "latency.percentiles", args.latency_percentiles)
cfg.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", args.throughput_percentiles)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.enabled", args.randomization_enabled)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.repeat_frequency", args.randomization_repeat_frequency)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.n", args.randomization_n)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.alpha", args.randomization_alpha)
configure_workload_params(arg_parser, args, cfg)
configure_connection_params(arg_parser, args, cfg)
configure_telemetry_params(args, cfg)
configure_builder_params(args, cfg)
cfg.add(config.Scope.applicationOverride, "builder", "runtime.jdk", args.runtime_jdk)
cfg.add(config.Scope.applicationOverride, "builder", "source.revision", args.revision)
cfg.add(config.Scope.applicationOverride, "builder",
"provision_config_instance.plugins", opts.csv_to_list(
args.opensearch_plugins))
cfg.add(config.Scope.applicationOverride, "builder", "plugin.params", opts.to_dict(args.plugin_params))
cfg.add(config.Scope.applicationOverride, "builder", "preserve.install", convert.to_bool(args.preserve_install))
cfg.add(config.Scope.applicationOverride, "builder", "skip.rest.api.check", convert.to_bool(args.skip_rest_api_check))

configure_results_publishing_params(args, cfg)

execute_test(cfg, args.kill_running_processes)
iterations = int(args.test_iterations)
if iterations > 1:
test_exes = []
for _ in range(iterations):
try:
configure_test(arg_parser, args, cfg)
execute_test(cfg, args.kill_running_processes)
time.sleep(int(args.sleep_timer))
test_exes.append(args.test_execution_id)
args.test_execution_id = str(uuid.uuid4())
except Exception as e:
console.error(f"Error occurred during test execution {_+1}: {str(e)}")
if args.cancel_on_error:
console.info("Cancelling remaining test executions.")
break

if args.aggregate:
args.test_executions = test_exes
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this exception block gets reached, test_exeswill still include the failed test_execution_id since it was appended in line 990 and we'd be including the failed test execution id in the aggregation, which might result in a failure when OSB attempts to aggregate the successful results.

Maybe move line 990 to be after line 992, assuming execute_test succeeds, or we can pop from the list in the exception block.

On a side note, based on the way execute_test() function is constructed, I don't think we reach this exception block and aggregate the successful tests in cases where the user cancels the test. We'll need to look into this route but can keep it for now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch, I moved like 990 so it only adds the test ID after the execute_test is called.

test_executions_dict = prepare_test_executions_dict(args, cfg)
aggregator_instance = aggregator.Aggregator(cfg, test_executions_dict, args)
aggregator_instance.aggregate()
elif args.test_iterations == 1:
configure_test(arg_parser, args, cfg)
execute_test(cfg, args.kill_running_processes)
else:
console.info("Please enter a valid number of test iterations")
Comment on lines +1008 to +1009
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we ever reach this point since argparse by default sets the value to 1 if --test-iterations is not provided? If not, let's remove else statement

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, I removed this. Although users can potentially enter invalid values

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, you're right. Let's add it back

elif sub_command == "create-workload":
cfg.add(config.Scope.applicationOverride, "generator", "indices", args.indices)
cfg.add(config.Scope.applicationOverride, "generator", "number_of_docs", args.number_of_docs)
Expand Down
Loading