Skip to content

Commit

Permalink
Add result output option
Browse files Browse the repository at this point in the history
Signed-off-by: Liyun Xiu <[email protected]>
  • Loading branch information
chishui committed Apr 28, 2024
1 parent 4800397 commit d0d6a68
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 50 deletions.
33 changes: 18 additions & 15 deletions osbenchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from osbenchmark.workload_generator import workload_generator
from osbenchmark.utils import io, convert, process, console, net, opts, versions
from osbenchmark.tuning import optimal_finder
from osbenchmark.tuning.publisher import TuningPublisher


def create_arg_parser():
Expand Down Expand Up @@ -523,6 +524,20 @@ def add_workload_source(subparser):
help=f"Define a comma-separated list of client options to use. The options will be passed to the OpenSearch "
f"Python client (default: {opts.ClientOptions.DEFAULT_CLIENT_OPTIONS}).",
default=opts.ClientOptions.DEFAULT_CLIENT_OPTIONS)
p.add_argument(
"--results-format",
help="Define the output format for the command line results (default: markdown).",
choices=["markdown", "csv"],
default="markdown")
p.add_argument(
"--results-numbers-align",
help="Define the output column number alignment for the command line results (default: right).",
choices=["right", "center", "left", "decimal"],
default="right")
p.add_argument(
"--results-file",
help="Write the command line results also to the provided file.",
default="")

test_execution_parser.add_argument("--on-error",
choices=["continue", "abort"],
Expand Down Expand Up @@ -555,25 +570,11 @@ def add_workload_source(subparser):
help="Define a user-specific key-value pair (separated by ':'). It is added to each metric record as meta info. "
"Example: intention:baseline-ticket-12345",
default="")
test_execution_parser.add_argument(
"--results-format",
help="Define the output format for the command line results (default: markdown).",
choices=["markdown", "csv"],
default="markdown")
test_execution_parser.add_argument(
"--results-numbers-align",
help="Define the output column number alignment for the command line results (default: right).",
choices=["right", "center", "left", "decimal"],
default="right")
test_execution_parser.add_argument(
"--show-in-results",
help="Define which values are shown in the summary results published (default: available).",
choices=["available", "all-percentiles", "all"],
default="available")
test_execution_parser.add_argument(
"--results-file",
help="Write the command line results also to the provided file.",
default="")
test_execution_parser.add_argument(
"--preserve-install",
help=f"Keep the benchmark candidate and its index. (default: {str(preserve_install).lower()}).",
Expand Down Expand Up @@ -976,7 +977,9 @@ def dispatch_sub_command(arg_parser, args, cfg):
configure_workload_params(arg_parser, args, cfg)
workload.workload_info(cfg)
elif sub_command == "tuning":
optimal_finder.run(args)
configure_results_publishing_params(args, cfg)
results = optimal_finder.run(args)
TuningPublisher(cfg).publish(results)
else:
raise exceptions.SystemSetupError(f"Unknown subcommand [{sub_command}]")
return True
Expand Down
43 changes: 23 additions & 20 deletions osbenchmark/tuning/optimal_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@
import subprocess
from datetime import datetime
from timeit import default_timer as timer
from osbenchmark.utils import console
from osbenchmark.tuning.schedule import BatchSizeSchedule, BulkSizeSchedule, ClientSchedule, ScheduleRunner
from osbenchmark.tuning.result import Result

METRIC_KEY = "Metric"
TOTAL_TIME_KEY = "Total time"


def get_benchmark_params(args, batch_size, bulk_size, number_of_client, temp_output_file):
params = {}
Expand Down Expand Up @@ -83,7 +87,7 @@ def run_benchmark(params):
return proc.returncode == 0, stderr.decode('ascii')
except KeyboardInterrupt as e:
proc.terminate()
print("Process is terminated!")
console.info("Process is terminated!")
raise e


Expand All @@ -97,29 +101,29 @@ def run_batch_bulk_client_tests(args, test_id, batch, bulk, client):
_, filename = tempfile.mkstemp()
params = get_benchmark_params(args, batch, bulk, client, filename)

logger.info("test_id: %s, batch: %d, bulk: %d, client: %d", test_id, batch, bulk, client)
console.info(f"Running benchmark with: bulk size: {bulk}, number of clients: {client}, batch size: {batch}")
success = False
err = None
start = timer()
try:
success, err = run_benchmark(params)
finally:
end = timer()
total_time = int(timer() - start)
if success:
with open(filename, 'r', newline='') as csvfile:
line_reader = csv.reader(csvfile, delimiter=',')
line_reader = csv.DictReader(csvfile, delimiter=',')
output = {}
for row in line_reader:
output[row[0]] = row[2]
result.set_output(True, int(end - start), output)
output[row[METRIC_KEY]] = row
output[TOTAL_TIME_KEY] = {METRIC_KEY: TOTAL_TIME_KEY, "Task": "", "Value": str(total_time), "Unit": "s"}
result.set_output(True, total_time, output)
else:
logger.error(err)
result.set_output(False, int(end - start), None)
result.set_output(False, total_time, None)

if os.path.exists(filename):
os.remove(filename)

return result, success, err
return result


def batch_bulk_client_tuning(args):
Expand All @@ -131,28 +135,27 @@ def batch_bulk_client_tuning(args):
number_of_clients = client_schedule.steps

total = len(batches) * len(bulks) * len(number_of_clients)
print(f"There will be {total} tests to run with {len(batches)} batch sizes, { len(bulks)} bulk sizes, "
console.info(f"There will be {total} tests to run with {len(bulks)} bulk sizes, {len(batches)} batch sizes, "
f"{len(number_of_clients)} client numbers.")

schedule_runner = ScheduleRunner(args, batch_schedule, bulk_schedule, client_schedule)
results = schedule_runner.run(run_batch_bulk_client_tests)

successful_result_ids = get_successful_ids(results, float(args.allowed_error_rate))
optimal = find_optimal_result([results[result_id] for result_id in successful_result_ids])
successful_results = get_successful_results(results, float(args.allowed_error_rate))
optimal = find_optimal_result(successful_results)
if not optimal:
print("All tests failed, couldn't find any results!")
console.info("All tests failed, couldn't find any results!")
else:
print(f"the optimal variable combination is: bulk size: {optimal.bulk_size}, "
console.info(f"The optimal variable combination is: bulk size: {optimal.bulk_size}, "
f"batch size: {optimal.batch_size}, number of clients: {optimal.number_of_client}")
return results


def get_successful_ids(results, allowed_error_rate):
successful_ids = []
def get_successful_results(results, allowed_error_rate):
successful_results = []
for result in results:
if result.success and result.error_rate <= allowed_error_rate:
successful_ids.append(result.test_id)
return successful_ids
successful_results.append(result)
return successful_results


def find_optimal_result(results):
Expand All @@ -166,4 +169,4 @@ def find_optimal_result(results):


def run(args):
batch_bulk_client_tuning(args)
return batch_bulk_client_tuning(args)
148 changes: 148 additions & 0 deletions osbenchmark/tuning/publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from functools import partial
from osbenchmark.results_publisher import format_as_markdown, format_as_csv
from osbenchmark import exceptions
from osbenchmark.utils import io as rio

VALUE_KEY = "Value"
TASK_KEY = "Task"
UNIT_KEY = "Unit"
KEYS = [
"Cumulative indexing time of primary shards",
"Min cumulative indexing time across primary shards",
"Median cumulative indexing time across primary shards",
"Max cumulative indexing time across primary shards",
"Cumulative indexing throttle time of primary shards",
"Min cumulative indexing throttle time across primary shards",
"Median cumulative indexing throttle time across primary shards",
"Max cumulative indexing throttle time across primary shards",
"Cumulative merge time of primary shards",
"Cumulative merge count of primary shards",
"Min cumulative merge time across primary shards",
"Median cumulative merge time across primary shards",
"Max cumulative merge time across primary shards",
"Cumulative merge throttle time of primary shards",
"Min cumulative merge throttle time across primary shards",
"Median cumulative merge throttle time across primary shards",
"Max cumulative merge throttle time across primary shards",
"Cumulative refresh time of primary shards",
"Cumulative refresh count of primary shards",
"Min cumulative refresh time across primary shards",
"Median cumulative refresh time across primary shards",
"Max cumulative refresh time across primary shards",
"Cumulative flush time of primary shards",
"Cumulative flush count of primary shards",
"Min cumulative flush time across primary shards",
"Median cumulative flush time across primary shards",
"Max cumulative flush time across primary shards",
"Total Young Gen GC time",
"Total Young Gen GC count",
"Total Old Gen GC time",
"Total Old Gen GC count",
"Store size",
"Translog size",
"Heap used for segments",
"Heap used for doc values",
"Heap used for terms",
"Heap used for norms",
"Heap used for points",
"Heap used for stored fields",
"Segment count",
"Min Throughput",
"Mean Throughput",
"Median Throughput",
"Max Throughput",
"50th percentile latency",
"90th percentile latency",
"99th percentile latency",
"99.9th percentile latency",
"99.99th percentile latency",
"100th percentile latency",
"50th percentile service time",
"90th percentile service time",
"99th percentile service time",
"99.9th percentile service time",
"99.99th percentile service time",
"100th percentile service time",
"error rate",
"Total time"
]


class TuningPublisher:
def __init__(self, config):
self.results_file = config.opts("results_publishing", "output.path", mandatory=False)
self.results_format = config.opts("results_publishing", "format")
self.numbers_align = config.opts("results_publishing", "numbers.align",
mandatory=False, default_value="right")
self.cwd = config.opts("node", "benchmark.cwd")

def publish(self, results):
write_results(self.results_format, self.numbers_align, self.results_file, self.cwd, results)


def construct_data(results, keys):
lines = []
for key in keys:
last_result_with_key = None
for result in results:
if key in result.output:
last_result_with_key = result

if not last_result_with_key:
continue
line = [key, last_result_with_key.output[key][TASK_KEY]]
for result in results:
if key not in result.output:
line.append("")
else:
line.append(result.output[key][VALUE_KEY])
line.append(last_result_with_key.output[key][UNIT_KEY])
lines.append(line)
return lines


def write_results(results_format, numbers_align, results_file, cwd, results):
if results_format == "markdown":
formatter = partial(format_as_markdown, numbers_align=numbers_align)
elif results_format == "csv":
formatter = format_as_csv
else:
raise exceptions.SystemSetupError("Unknown publish format '%s'" % results_format)

headers = ["Metric", "Task"]
for result in results:
headers.append(str(result))
headers.append("Unit")

lines = construct_data(results, KEYS)

if len(results_file) > 0:
normalized_results_file = rio.normalize_path(results_file, cwd)
# ensure that the parent folder already exists when we try to write the file...
rio.ensure_dir(rio.dirname(normalized_results_file))
with open(normalized_results_file, mode="a+", encoding="utf-8") as f:
f.writelines(formatter(headers, lines))
7 changes: 6 additions & 1 deletion osbenchmark/tuning/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
# under the License.

ERROR_RATE_KEY = "error rate"
VALUE_KEY = "Value"


class Result:
Expand All @@ -42,4 +43,8 @@ def set_output(self, success, total_time, output):
if not output:
return
self.output = output
self.error_rate = float(output[ERROR_RATE_KEY]) if ERROR_RATE_KEY in output else 0 # percentage
if output and ERROR_RATE_KEY in output:
self.error_rate = float(output[ERROR_RATE_KEY][VALUE_KEY])

def __str__(self):
return f"bulk size: {self.bulk_size}, batch size: {self.batch_size}, number of clients: {self.number_of_client}"
5 changes: 2 additions & 3 deletions osbenchmark/tuning/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,10 @@ def __init__(self, args, *schedules):
self.args = args

def run(self, callback):
results = {}

results = []
schedule_list = [schedule.steps for schedule in self.schedules]
for args in list(itertools.product(*schedule_list)):
test_id = str(uuid.uuid4())
result = callback(self.args, test_id, *args)
results[test_id] = result
results.append(result)
return results
20 changes: 10 additions & 10 deletions tests/tuning/optimal_finder_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
# under the License.

import pytest
from osbenchmark.tuning.optimal_finder import find_optimal_result, get_successful_ids
from osbenchmark.tuning.optimal_finder import find_optimal_result, get_successful_results
from osbenchmark.tuning.result import Result


Expand All @@ -44,19 +44,19 @@ def test_find_optimal_result(results):
assert find_optimal_result(results).test_id == "id2"


def test_get_successful_ids_all_failed(results):
def test_get_successful_results_all_failed(results):
results[0].set_output(False, 25, None)
results[1].set_output(False, 15, None)
results[2].set_output(False, 45, None)
results[3].set_output(False, 125, None)
assert len(get_successful_ids(results, 0)) == 0
assert len(get_successful_results(results, 0)) == 0


def test_get_successful_ids_error_rate(results):
results[0].set_output(False, 25, {"error rate": 0.1})
results[1].set_output(True, 15, {"error rate": 0.2})
results[2].set_output(True, 45, {"error rate": 0.3})
results[3].set_output(True, 125, {"error rate": 0.4})
assert len(get_successful_ids(results, 0.21)) == 1
assert len(get_successful_ids(results, 0.31)) == 2
assert len(get_successful_ids(results, 0.4)) == 3
results[0].set_output(False, 25, {"error rate": {"Value": 0.1}})
results[1].set_output(True, 15, {"error rate": {"Value": 0.2}})
results[2].set_output(True, 45, {"error rate": {"Value": 0.3}})
results[3].set_output(True, 125, {"error rate": {"Value": 0.4}})
assert len(get_successful_results(results, 0.21)) == 1
assert len(get_successful_results(results, 0.31)) == 2
assert len(get_successful_results(results, 0.4)) == 3
2 changes: 1 addition & 1 deletion tests/tuning/schedule_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,5 @@ def test_ScheduleRunner(self):
schedule2 = FakeSchedule([4, 5])
args = {}
runner = ScheduleRunner(args, schedule1, schedule2)
results = runner.run(fake_callback).values()
results = runner.run(fake_callback)
self.assertEqual({(result["arg1"], result["arg2"]) for result in results}, {(1,4), (2,4), (1,5), (2,5)})

0 comments on commit d0d6a68

Please sign in to comment.