From d0d6a68f9d066ba659c651769fb10fd3700d0d3c Mon Sep 17 00:00:00 2001 From: Liyun Xiu Date: Wed, 24 Apr 2024 17:38:39 +0800 Subject: [PATCH] Add result output option Signed-off-by: Liyun Xiu --- osbenchmark/benchmark.py | 33 +++--- osbenchmark/tuning/optimal_finder.py | 43 ++++---- osbenchmark/tuning/publisher.py | 148 +++++++++++++++++++++++++++ osbenchmark/tuning/result.py | 7 +- osbenchmark/tuning/schedule.py | 5 +- tests/tuning/optimal_finder_test.py | 20 ++-- tests/tuning/schedule_test.py | 2 +- 7 files changed, 208 insertions(+), 50 deletions(-) create mode 100644 osbenchmark/tuning/publisher.py diff --git a/osbenchmark/benchmark.py b/osbenchmark/benchmark.py index 665fbad45..d57d6180b 100644 --- a/osbenchmark/benchmark.py +++ b/osbenchmark/benchmark.py @@ -41,6 +41,7 @@ from osbenchmark.workload_generator import workload_generator from osbenchmark.utils import io, convert, process, console, net, opts, versions from osbenchmark.tuning import optimal_finder +from osbenchmark.tuning.publisher import TuningPublisher def create_arg_parser(): @@ -523,6 +524,20 @@ def add_workload_source(subparser): help=f"Define a comma-separated list of client options to use. The options will be passed to the OpenSearch " f"Python client (default: {opts.ClientOptions.DEFAULT_CLIENT_OPTIONS}).", default=opts.ClientOptions.DEFAULT_CLIENT_OPTIONS) + p.add_argument( + "--results-format", + help="Define the output format for the command line results (default: markdown).", + choices=["markdown", "csv"], + default="markdown") + p.add_argument( + "--results-numbers-align", + help="Define the output column number alignment for the command line results (default: right).", + choices=["right", "center", "left", "decimal"], + default="right") + p.add_argument( + "--results-file", + help="Write the command line results also to the provided file.", + default="") test_execution_parser.add_argument("--on-error", choices=["continue", "abort"], @@ -555,25 +570,11 @@ def add_workload_source(subparser): help="Define a user-specific key-value pair (separated by ':'). It is added to each metric record as meta info. " "Example: intention:baseline-ticket-12345", default="") - test_execution_parser.add_argument( - "--results-format", - help="Define the output format for the command line results (default: markdown).", - choices=["markdown", "csv"], - default="markdown") - test_execution_parser.add_argument( - "--results-numbers-align", - help="Define the output column number alignment for the command line results (default: right).", - choices=["right", "center", "left", "decimal"], - default="right") test_execution_parser.add_argument( "--show-in-results", help="Define which values are shown in the summary results published (default: available).", choices=["available", "all-percentiles", "all"], default="available") - test_execution_parser.add_argument( - "--results-file", - help="Write the command line results also to the provided file.", - default="") test_execution_parser.add_argument( "--preserve-install", help=f"Keep the benchmark candidate and its index. (default: {str(preserve_install).lower()}).", @@ -976,7 +977,9 @@ def dispatch_sub_command(arg_parser, args, cfg): configure_workload_params(arg_parser, args, cfg) workload.workload_info(cfg) elif sub_command == "tuning": - optimal_finder.run(args) + configure_results_publishing_params(args, cfg) + results = optimal_finder.run(args) + TuningPublisher(cfg).publish(results) else: raise exceptions.SystemSetupError(f"Unknown subcommand [{sub_command}]") return True diff --git a/osbenchmark/tuning/optimal_finder.py b/osbenchmark/tuning/optimal_finder.py index 67d772502..df1cf6ced 100644 --- a/osbenchmark/tuning/optimal_finder.py +++ b/osbenchmark/tuning/optimal_finder.py @@ -31,9 +31,13 @@ import subprocess from datetime import datetime from timeit import default_timer as timer +from osbenchmark.utils import console from osbenchmark.tuning.schedule import BatchSizeSchedule, BulkSizeSchedule, ClientSchedule, ScheduleRunner from osbenchmark.tuning.result import Result +METRIC_KEY = "Metric" +TOTAL_TIME_KEY = "Total time" + def get_benchmark_params(args, batch_size, bulk_size, number_of_client, temp_output_file): params = {} @@ -83,7 +87,7 @@ def run_benchmark(params): return proc.returncode == 0, stderr.decode('ascii') except KeyboardInterrupt as e: proc.terminate() - print("Process is terminated!") + console.info("Process is terminated!") raise e @@ -97,29 +101,29 @@ def run_batch_bulk_client_tests(args, test_id, batch, bulk, client): _, filename = tempfile.mkstemp() params = get_benchmark_params(args, batch, bulk, client, filename) - logger.info("test_id: %s, batch: %d, bulk: %d, client: %d", test_id, batch, bulk, client) + console.info(f"Running benchmark with: bulk size: {bulk}, number of clients: {client}, batch size: {batch}") success = False err = None start = timer() try: success, err = run_benchmark(params) finally: - end = timer() + total_time = int(timer() - start) if success: with open(filename, 'r', newline='') as csvfile: - line_reader = csv.reader(csvfile, delimiter=',') + line_reader = csv.DictReader(csvfile, delimiter=',') output = {} for row in line_reader: - output[row[0]] = row[2] - result.set_output(True, int(end - start), output) + output[row[METRIC_KEY]] = row + output[TOTAL_TIME_KEY] = {METRIC_KEY: TOTAL_TIME_KEY, "Task": "", "Value": str(total_time), "Unit": "s"} + result.set_output(True, total_time, output) else: logger.error(err) - result.set_output(False, int(end - start), None) + result.set_output(False, total_time, None) if os.path.exists(filename): os.remove(filename) - - return result, success, err + return result def batch_bulk_client_tuning(args): @@ -131,28 +135,27 @@ def batch_bulk_client_tuning(args): number_of_clients = client_schedule.steps total = len(batches) * len(bulks) * len(number_of_clients) - print(f"There will be {total} tests to run with {len(batches)} batch sizes, { len(bulks)} bulk sizes, " + console.info(f"There will be {total} tests to run with {len(bulks)} bulk sizes, {len(batches)} batch sizes, " f"{len(number_of_clients)} client numbers.") schedule_runner = ScheduleRunner(args, batch_schedule, bulk_schedule, client_schedule) results = schedule_runner.run(run_batch_bulk_client_tests) - - successful_result_ids = get_successful_ids(results, float(args.allowed_error_rate)) - optimal = find_optimal_result([results[result_id] for result_id in successful_result_ids]) + successful_results = get_successful_results(results, float(args.allowed_error_rate)) + optimal = find_optimal_result(successful_results) if not optimal: - print("All tests failed, couldn't find any results!") + console.info("All tests failed, couldn't find any results!") else: - print(f"the optimal variable combination is: bulk size: {optimal.bulk_size}, " + console.info(f"The optimal variable combination is: bulk size: {optimal.bulk_size}, " f"batch size: {optimal.batch_size}, number of clients: {optimal.number_of_client}") return results -def get_successful_ids(results, allowed_error_rate): - successful_ids = [] +def get_successful_results(results, allowed_error_rate): + successful_results = [] for result in results: if result.success and result.error_rate <= allowed_error_rate: - successful_ids.append(result.test_id) - return successful_ids + successful_results.append(result) + return successful_results def find_optimal_result(results): @@ -166,4 +169,4 @@ def find_optimal_result(results): def run(args): - batch_bulk_client_tuning(args) + return batch_bulk_client_tuning(args) diff --git a/osbenchmark/tuning/publisher.py b/osbenchmark/tuning/publisher.py new file mode 100644 index 000000000..56c3502ca --- /dev/null +++ b/osbenchmark/tuning/publisher.py @@ -0,0 +1,148 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# Modifications Copyright OpenSearch Contributors. See +# GitHub history for details. +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from functools import partial +from osbenchmark.results_publisher import format_as_markdown, format_as_csv +from osbenchmark import exceptions +from osbenchmark.utils import io as rio + +VALUE_KEY = "Value" +TASK_KEY = "Task" +UNIT_KEY = "Unit" +KEYS = [ + "Cumulative indexing time of primary shards", + "Min cumulative indexing time across primary shards", + "Median cumulative indexing time across primary shards", + "Max cumulative indexing time across primary shards", + "Cumulative indexing throttle time of primary shards", + "Min cumulative indexing throttle time across primary shards", + "Median cumulative indexing throttle time across primary shards", + "Max cumulative indexing throttle time across primary shards", + "Cumulative merge time of primary shards", + "Cumulative merge count of primary shards", + "Min cumulative merge time across primary shards", + "Median cumulative merge time across primary shards", + "Max cumulative merge time across primary shards", + "Cumulative merge throttle time of primary shards", + "Min cumulative merge throttle time across primary shards", + "Median cumulative merge throttle time across primary shards", + "Max cumulative merge throttle time across primary shards", + "Cumulative refresh time of primary shards", + "Cumulative refresh count of primary shards", + "Min cumulative refresh time across primary shards", + "Median cumulative refresh time across primary shards", + "Max cumulative refresh time across primary shards", + "Cumulative flush time of primary shards", + "Cumulative flush count of primary shards", + "Min cumulative flush time across primary shards", + "Median cumulative flush time across primary shards", + "Max cumulative flush time across primary shards", + "Total Young Gen GC time", + "Total Young Gen GC count", + "Total Old Gen GC time", + "Total Old Gen GC count", + "Store size", + "Translog size", + "Heap used for segments", + "Heap used for doc values", + "Heap used for terms", + "Heap used for norms", + "Heap used for points", + "Heap used for stored fields", + "Segment count", + "Min Throughput", + "Mean Throughput", + "Median Throughput", + "Max Throughput", + "50th percentile latency", + "90th percentile latency", + "99th percentile latency", + "99.9th percentile latency", + "99.99th percentile latency", + "100th percentile latency", + "50th percentile service time", + "90th percentile service time", + "99th percentile service time", + "99.9th percentile service time", + "99.99th percentile service time", + "100th percentile service time", + "error rate", + "Total time" +] + + +class TuningPublisher: + def __init__(self, config): + self.results_file = config.opts("results_publishing", "output.path", mandatory=False) + self.results_format = config.opts("results_publishing", "format") + self.numbers_align = config.opts("results_publishing", "numbers.align", + mandatory=False, default_value="right") + self.cwd = config.opts("node", "benchmark.cwd") + + def publish(self, results): + write_results(self.results_format, self.numbers_align, self.results_file, self.cwd, results) + + +def construct_data(results, keys): + lines = [] + for key in keys: + last_result_with_key = None + for result in results: + if key in result.output: + last_result_with_key = result + + if not last_result_with_key: + continue + line = [key, last_result_with_key.output[key][TASK_KEY]] + for result in results: + if key not in result.output: + line.append("") + else: + line.append(result.output[key][VALUE_KEY]) + line.append(last_result_with_key.output[key][UNIT_KEY]) + lines.append(line) + return lines + + +def write_results(results_format, numbers_align, results_file, cwd, results): + if results_format == "markdown": + formatter = partial(format_as_markdown, numbers_align=numbers_align) + elif results_format == "csv": + formatter = format_as_csv + else: + raise exceptions.SystemSetupError("Unknown publish format '%s'" % results_format) + + headers = ["Metric", "Task"] + for result in results: + headers.append(str(result)) + headers.append("Unit") + + lines = construct_data(results, KEYS) + + if len(results_file) > 0: + normalized_results_file = rio.normalize_path(results_file, cwd) + # ensure that the parent folder already exists when we try to write the file... + rio.ensure_dir(rio.dirname(normalized_results_file)) + with open(normalized_results_file, mode="a+", encoding="utf-8") as f: + f.writelines(formatter(headers, lines)) diff --git a/osbenchmark/tuning/result.py b/osbenchmark/tuning/result.py index 1218c4327..e3d386572 100644 --- a/osbenchmark/tuning/result.py +++ b/osbenchmark/tuning/result.py @@ -23,6 +23,7 @@ # under the License. ERROR_RATE_KEY = "error rate" +VALUE_KEY = "Value" class Result: @@ -42,4 +43,8 @@ def set_output(self, success, total_time, output): if not output: return self.output = output - self.error_rate = float(output[ERROR_RATE_KEY]) if ERROR_RATE_KEY in output else 0 # percentage + if output and ERROR_RATE_KEY in output: + self.error_rate = float(output[ERROR_RATE_KEY][VALUE_KEY]) + + def __str__(self): + return f"bulk size: {self.bulk_size}, batch size: {self.batch_size}, number of clients: {self.number_of_client}" diff --git a/osbenchmark/tuning/schedule.py b/osbenchmark/tuning/schedule.py index 6d01baa9d..9c6fe08fd 100644 --- a/osbenchmark/tuning/schedule.py +++ b/osbenchmark/tuning/schedule.py @@ -121,11 +121,10 @@ def __init__(self, args, *schedules): self.args = args def run(self, callback): - results = {} - + results = [] schedule_list = [schedule.steps for schedule in self.schedules] for args in list(itertools.product(*schedule_list)): test_id = str(uuid.uuid4()) result = callback(self.args, test_id, *args) - results[test_id] = result + results.append(result) return results diff --git a/tests/tuning/optimal_finder_test.py b/tests/tuning/optimal_finder_test.py index e211e4599..24558af2f 100644 --- a/tests/tuning/optimal_finder_test.py +++ b/tests/tuning/optimal_finder_test.py @@ -23,7 +23,7 @@ # under the License. import pytest -from osbenchmark.tuning.optimal_finder import find_optimal_result, get_successful_ids +from osbenchmark.tuning.optimal_finder import find_optimal_result, get_successful_results from osbenchmark.tuning.result import Result @@ -44,19 +44,19 @@ def test_find_optimal_result(results): assert find_optimal_result(results).test_id == "id2" -def test_get_successful_ids_all_failed(results): +def test_get_successful_results_all_failed(results): results[0].set_output(False, 25, None) results[1].set_output(False, 15, None) results[2].set_output(False, 45, None) results[3].set_output(False, 125, None) - assert len(get_successful_ids(results, 0)) == 0 + assert len(get_successful_results(results, 0)) == 0 def test_get_successful_ids_error_rate(results): - results[0].set_output(False, 25, {"error rate": 0.1}) - results[1].set_output(True, 15, {"error rate": 0.2}) - results[2].set_output(True, 45, {"error rate": 0.3}) - results[3].set_output(True, 125, {"error rate": 0.4}) - assert len(get_successful_ids(results, 0.21)) == 1 - assert len(get_successful_ids(results, 0.31)) == 2 - assert len(get_successful_ids(results, 0.4)) == 3 + results[0].set_output(False, 25, {"error rate": {"Value": 0.1}}) + results[1].set_output(True, 15, {"error rate": {"Value": 0.2}}) + results[2].set_output(True, 45, {"error rate": {"Value": 0.3}}) + results[3].set_output(True, 125, {"error rate": {"Value": 0.4}}) + assert len(get_successful_results(results, 0.21)) == 1 + assert len(get_successful_results(results, 0.31)) == 2 + assert len(get_successful_results(results, 0.4)) == 3 diff --git a/tests/tuning/schedule_test.py b/tests/tuning/schedule_test.py index 5316ad6a2..00b7fb13e 100644 --- a/tests/tuning/schedule_test.py +++ b/tests/tuning/schedule_test.py @@ -60,5 +60,5 @@ def test_ScheduleRunner(self): schedule2 = FakeSchedule([4, 5]) args = {} runner = ScheduleRunner(args, schedule1, schedule2) - results = runner.run(fake_callback).values() + results = runner.run(fake_callback) self.assertEqual({(result["arg1"], result["arg2"]) for result in results}, {(1,4), (2,4), (1,5), (2,5)})