diff --git a/notebooks/tuning_ingest_params.ipynb b/notebooks/tuning_ingest_params.ipynb
new file mode 100644
index 000000000..03c65f0d7
--- /dev/null
+++ b/notebooks/tuning_ingest_params.ipynb
@@ -0,0 +1,202 @@
+{
+ "cells": [
+  {
+   "cell_type": "raw",
+   "id": "561e61a3-3ba6-4bc8-a747-a8b51e34c493",
+   "metadata": {},
+   "source": [
+    "# Run benchmarks with different variables to find the combination that leads to optimal ingestion performance\n",
+    "When ingesting data into OpenSearch with the bulk API, different variables can lead to very different ingestion performance, for example the number of documents per bulk request or the number of OpenSearch clients sending requests. It is not easy for users to experiment with every combination of these variables and find the one that gives the best ingestion performance. OpenSearch 2.15.0 introduced a new bulk API parameter, \"batch size\", which can significantly reduce ingestion time when used with the `text_embedding` and `sparse_encoding` processors. However, this additional factor makes the tuning even harder.\n",
+    "\n",
+    "This tool takes the pain out of tuning these performance-related variables by finding their optimal combination automatically. It uses OpenSearch-Benchmark to run benchmarks with different variable combinations, then collects, analyzes and visualizes the results.\n",
+    "\n",
+    "There are three variables you can test: bulk size, number of OpenSearch clients and batch size. If you already have a preferred value for a variable, you can fix it and only test the others. For example, if you always run the bulk API with 1 client and a bulk size of 100, you can try only different batch sizes to see which one performs best."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "e40b1cc3-a2b2-4fac-b857-6f40943ef4f8",
+   "metadata": {},
+   "source": [
+    "## Step 1: Import Packages"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "140f4d8b-83d3-48fc-83c5-0c631104c355",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import sys\n",
+    "from pathlib import Path\n",
+    "\n",
+    "sys.path.insert(0, \"../\")\n",
+    "\n",
+    "from osbenchmark.tuning.optimal_finder import run\n",
+    "from osbenchmark.benchmark import create_arg_parser"
+   ]
+  },
+  {
+   "cell_type": "raw",
+   "id": "e8c59ff8-ac0b-4a4c-ad05-944abc3dae13",
+   "metadata": {},
+   "source": [
+    "## Step 2: Prepare for test inputs\n",
+    "### Variable Test Schedule\n",
+    "Each of the three variables has two parameters: one to set a fixed value and one to set a testing schedule. The schedule supports two patterns:\n",
+    "1. Set the minimum value, maximum value, trend (positive for ascending, negative for descending) and step size, separated by `:`, e.g. \"10:100:1:10\" means we test with \"10, 20, 30, 40, 50, 60, 70, 80, 90, 100\", while \"20:100:-1:20\" means we test in reverse with \"100, 80, 60, 40, 20\".\n",
+    "2. List the testing values explicitly by adding the prefix `@` and separating the values with `:`, e.g. \"@10:20:50\" means we only test with 10, 20 and 50.\n",
+    "\n",
+    "Use `BULK_SIZE` to set a fixed bulk size value, e.g. `BULK_SIZE=100`\n",
+    "Use `BULK_SIZE_SCHEDULE` to set a testing schedule for bulk size, e.g. `BULK_SIZE_SCHEDULE=\"@10:20:50:100\"`\n",
+    "\n",
+    "Use `BATCH_SIZE` to set a fixed batch size value, e.g. `BATCH_SIZE=100`\n",
+    "Use `BATCH_SIZE_SCHEDULE` to set a testing schedule for batch size, e.g. 
`BATCH_SIZE_SCHEDULE=\"10:100:1:10\"`\n", + "\n", + "Use `CLIENT` to set a fixed client count, e.g. `CLIENT=1`\n", + "Use `CLIENT_SCHEDULE` to set a testing schedule for client count. e.g. `CLIENT_SCHEDULE=\"@1:2:4\"`\n", + "\n", + "### Parameters shared with OpenSearch-Benchmark\n", + "We reuse these parameters with OpenSearch-Benchmark `execute-test`:\n", + "1. WORKLOAD_PATH same as \"--workload-path\" in OSB `execute-test`, \"Define the path to a workload\"\n", + "2. TARGET_HOSTS same as \"--target-hosts\" in OSB `execute-test`, \"Define a comma-separated list of host:port pairs which should be targeted if using the pipeline 'benchmark-only' (default: localhost:9200).\"\n", + "3. CLIENT_OPTIONS same as \"--client-options\" in OSB `execute-test`, \"Define a comma-separated list of client options to use. The options will be passed to the OpenSearch Python client (default: timeout:60).\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7f656ab2-1659-414a-81db-76812a43132d", + "metadata": {}, + "outputs": [], + "source": [ + "# bulk size\n", + "BULK_SIZE=100\n", + "\n", + "# the clients used to run test in parallel\n", + "CLIENT=1\n", + "\n", + "# single test with error rate higher than this will be dropped\n", + "ALLOWED_ERROR_RATE=0\n", + "\n", + "# directory where the workload files locate\n", + "WORKLOAD_PATH=\"\"\n", + "\n", + "# remote ML server type, based on type we can recommend a set of testing parameters.\n", + "# choices are: \"sagemaker\", \"cohere\", \"openai\", \"unknown\". \n", + "REMOTE_ML_SERVER_TYPE=\"unknown\"\n", + "\n", + "# a comma-separated list of host:port pairs\n", + "TARGET_HOSTS=\"localhost:9200\"\n", + "\n", + "# a comma-separated list of client options to use\n", + "CLIENT_OPTIONS=\"timeout:60\"\n", + "\n", + "BATCH_SIZE_SCHEDULE=\"1:100:1:20\"" + ] + }, + { + "cell_type": "markdown", + "id": "e3513460-b440-4a4e-bf9a-db2a8c9917d6", + "metadata": {}, + "source": [ + "## Step 3: Run tests" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "00b00450-97f1-4f58-ab76-259e112c3361", + "metadata": {}, + "outputs": [], + "source": [ + "# construct arguments for testing\n", + "argv = [\n", + " \"tuning\",\n", + " \"--allowed-error-rate\", str(ALLOWED_ERROR_RATE),\n", + " \"--bulk-size\", str(BULK_SIZE),\n", + " \"--client\", str(CLIENT),\n", + " \"--workload-path\", WORKLOAD_PATH,\n", + " \"--remote-ml-server-type\", REMOTE_ML_SERVER_TYPE,\n", + " \"--target-hosts\", TARGET_HOSTS,\n", + " \"--client-options\", CLIENT_OPTIONS,\n", + " \"--batch-size-schedule\", BATCH_SIZE_SCHEDULE\n", + "]\n", + "\n", + "# validate arguments\n", + "if not Path(WORKLOAD_PATH).exists():\n", + " print(\"WORKLOAD_PATH does not exist!\")\n", + "\n", + "# construct arguments to run\n", + "parser = create_arg_parser()\n", + "args = parser.parse_args(argv)\n", + "\n", + "# run tests with different arguments\n", + "results = run(args)" + ] + }, + { + "cell_type": "markdown", + "id": "97fdb2c2-7846-4daf-aced-67e6d2ea8aa2", + "metadata": {}, + "source": [ + "## Step 4: Visualize test results" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ef26cfa8-7509-4d58-99f1-8e0bac115f96", + "metadata": {}, + "outputs": [], + "source": [ + "# visualize benchmark result\n", + "import matplotlib.pyplot as plt \n", + "\n", + "if results:\n", + " \n", + " batches = [int(result.batch_size) for result in results.values()]\n", + " latencies = [result.total_time for result in results.values()]\n", + " \n", + " plt.plot(batches, latencies)\n", + " 
\n", + " plt.xlabel('batch size')\n", + " plt.ylabel('latency')\n", + " \n", + " plt.show()\n", + "else:\n", + " print(\"please wait until last step completed!\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7cf5f074-bad9-481e-a4b3-056aafc77431", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/osbenchmark/benchmark.py b/osbenchmark/benchmark.py index e92f00f30..d57d6180b 100644 --- a/osbenchmark/benchmark.py +++ b/osbenchmark/benchmark.py @@ -40,6 +40,8 @@ from osbenchmark.builder import provision_config, builder from osbenchmark.workload_generator import workload_generator from osbenchmark.utils import io, convert, process, console, net, opts, versions +from osbenchmark.tuning import optimal_finder +from osbenchmark.tuning.publisher import TuningPublisher def create_arg_parser(): @@ -221,6 +223,42 @@ def add_workload_source(subparser): help="Whether to include the comparison in the results file.", default=True) + tuning_parser = subparsers.add_parser("tuning", help="Run benchmarks with different combinations of " + "arguments, then compare results to find one which leads " + "to optimal result.") + tuning_parser.add_argument("--bulk-size", + type=int, + default=100, + help="A fixed bulk size used in benchmark") + tuning_parser.add_argument("--bulk-size-schedule", + help="A schedule for a series of bulk size used in benchmark, see " + "'https://tiny.amazon.com/zgliokjl' on the schedule pattern") + tuning_parser.add_argument("--client", + type=int, + default=1, + help="A fixed number of clients used in benchmark") + tuning_parser.add_argument("--client-schedule", + help="A schedule for a series of number of client used in benchmark, see " + "'https://tiny.amazon.com/zgliokjl' on the schedule pattern") + # hide these two variables until batch ingestion feature is launched + # https://github.com/opensearch-project/OpenSearch/issues/12457 + tuning_parser.add_argument("--batch-size", + type=int, + default=1, + help=argparse.SUPPRESS) + tuning_parser.add_argument("--batch-size-schedule", + help=argparse.SUPPRESS) + tuning_parser.add_argument("--remote-ml-server-type", + default="unknown", + choices=["sagemaker", "cohere", "openai", "unknown"], + help=argparse.SUPPRESS) + tuning_parser.add_argument("--allowed-error-rate", + type=float, + default=0, + help="The maximum allowed error rate from a single test within which could indicate a " + "successful run") + add_workload_source(tuning_parser) + download_parser = subparsers.add_parser("download", help="Downloads an artifact") download_parser.add_argument( "--provision-config-repository", @@ -469,20 +507,38 @@ def add_workload_source(subparser): help="Define a comma-separated list of key:value pairs that are injected verbatim to all plugins as variables.", default="" ) - test_execution_parser.add_argument( - "--target-hosts", - help="Define a comma-separated list of host:port pairs which should be targeted if using the pipeline 'benchmark-only' " - "(default: localhost:9200).", - default="") # actually the default is pipeline specific and it is set later + 
test_execution_parser.add_argument( "--load-worker-coordinator-hosts", help="Define a comma-separated list of hosts which should generate load (default: localhost).", default="localhost") - test_execution_parser.add_argument( - "--client-options", - help=f"Define a comma-separated list of client options to use. The options will be passed to the OpenSearch " - f"Python client (default: {opts.ClientOptions.DEFAULT_CLIENT_OPTIONS}).", - default=opts.ClientOptions.DEFAULT_CLIENT_OPTIONS) + + for p in [test_execution_parser, tuning_parser]: + p.add_argument( + "--target-hosts", + help="Define a comma-separated list of host:port pairs which should be targeted if using the pipeline 'benchmark-only' " + "(default: localhost:9200).", + default="") # actually the default is pipeline specific and it is set later + p.add_argument( + "--client-options", + help=f"Define a comma-separated list of client options to use. The options will be passed to the OpenSearch " + f"Python client (default: {opts.ClientOptions.DEFAULT_CLIENT_OPTIONS}).", + default=opts.ClientOptions.DEFAULT_CLIENT_OPTIONS) + p.add_argument( + "--results-format", + help="Define the output format for the command line results (default: markdown).", + choices=["markdown", "csv"], + default="markdown") + p.add_argument( + "--results-numbers-align", + help="Define the output column number alignment for the command line results (default: right).", + choices=["right", "center", "left", "decimal"], + default="right") + p.add_argument( + "--results-file", + help="Write the command line results also to the provided file.", + default="") + test_execution_parser.add_argument("--on-error", choices=["continue", "abort"], help="Controls how Benchmark behaves on response errors (default: continue).", @@ -514,25 +570,11 @@ def add_workload_source(subparser): help="Define a user-specific key-value pair (separated by ':'). It is added to each metric record as meta info. " "Example: intention:baseline-ticket-12345", default="") - test_execution_parser.add_argument( - "--results-format", - help="Define the output format for the command line results (default: markdown).", - choices=["markdown", "csv"], - default="markdown") - test_execution_parser.add_argument( - "--results-numbers-align", - help="Define the output column number alignment for the command line results (default: right).", - choices=["right", "center", "left", "decimal"], - default="right") test_execution_parser.add_argument( "--show-in-results", help="Define which values are shown in the summary results published (default: available).", choices=["available", "all-percentiles", "all"], default="available") - test_execution_parser.add_argument( - "--results-file", - help="Write the command line results also to the provided file.", - default="") test_execution_parser.add_argument( "--preserve-install", help=f"Keep the benchmark candidate and its index. 
(default: {str(preserve_install).lower()}).", @@ -608,7 +650,7 @@ def add_workload_source(subparser): default=False) for p in [list_parser, test_execution_parser, compare_parser, download_parser, install_parser, - start_parser, stop_parser, info_parser, create_workload_parser]: + start_parser, stop_parser, info_parser, create_workload_parser, tuning_parser]: # This option is needed to support a separate configuration for the integration tests on the same machine p.add_argument( "--configuration-name", @@ -934,6 +976,10 @@ def dispatch_sub_command(arg_parser, args, cfg): elif sub_command == "info": configure_workload_params(arg_parser, args, cfg) workload.workload_info(cfg) + elif sub_command == "tuning": + configure_results_publishing_params(args, cfg) + results = optimal_finder.run(args) + TuningPublisher(cfg).publish(results) else: raise exceptions.SystemSetupError(f"Unknown subcommand [{sub_command}]") return True diff --git a/osbenchmark/tuning/__init__.py b/osbenchmark/tuning/__init__.py new file mode 100644 index 000000000..e7a26e8e3 --- /dev/null +++ b/osbenchmark/tuning/__init__.py @@ -0,0 +1,23 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# Modifications Copyright OpenSearch Contributors. See +# GitHub history for details. +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/osbenchmark/tuning/optimal_finder.py b/osbenchmark/tuning/optimal_finder.py new file mode 100644 index 000000000..df1cf6ced --- /dev/null +++ b/osbenchmark/tuning/optimal_finder.py @@ -0,0 +1,172 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# Modifications Copyright OpenSearch Contributors. See +# GitHub history for details. +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
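+
+# Overview: for every combination of batch size, bulk size and client count produced
+# by the schedules, this module shells out to `opensearch-benchmark execute-test`
+# against a remote cluster (pipeline "benchmark-only"), writes the summary results to
+# a temporary CSV file, parses that CSV into a per-metric dictionary and wraps it in a
+# Result. Among runs whose error rate stays within --allowed-error-rate, the
+# combination with the lowest total run time is reported as the optimal one.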
+ +import os +import sys +import csv +import uuid +import logging +import tempfile +import subprocess +from datetime import datetime +from timeit import default_timer as timer +from osbenchmark.utils import console +from osbenchmark.tuning.schedule import BatchSizeSchedule, BulkSizeSchedule, ClientSchedule, ScheduleRunner +from osbenchmark.tuning.result import Result + +METRIC_KEY = "Metric" +TOTAL_TIME_KEY = "Total time" + + +def get_benchmark_params(args, batch_size, bulk_size, number_of_client, temp_output_file): + params = {} + params["--target-hosts"] = args.target_hosts + if args.client_options: + params["--client-options"] = args.client_options + params["--kill-running-processes"] = None + # we only test remote cluster + params["--pipeline"] = "benchmark-only" + params["--telemetry"] = "node-stats" + params["--telemetry-params"] = ("node-stats-include-indices:true," + "node-stats-sample-interval:10," + "node-stats-include-mem:true," + "node-stats-include-process:true") + params["--workload-path"] = args.workload_path + params["--workload-params"] = get_workload_params(batch_size, bulk_size, number_of_client) + # generate output + params["--results-format"] = "csv" + params["--results-file"] = temp_output_file + return params + + +def get_workload_params(batch_size, bulk_size, number_of_client): + params = [f"bulk_size:{bulk_size}", + f"batch_size:{batch_size}", + f"bulk_indexing_clients:{number_of_client}", + f"index_name:{generate_random_index_name()}"] + + return ",".join(params) + + +def run_benchmark(params): + commands = ["opensearch-benchmark", "execute-test"] + for k, v in params.items(): + commands.append(k) + if v: + commands.append(v) + + proc = None + try: + proc = subprocess.Popen( + commands, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + _, stderr = proc.communicate() + return proc.returncode == 0, stderr.decode('ascii') + except KeyboardInterrupt as e: + proc.terminate() + console.info("Process is terminated!") + raise e + + +def generate_random_index_name(): + return str(datetime.now().timestamp()) + "_" + str(uuid.uuid4()) + + +def run_batch_bulk_client_tests(args, test_id, batch, bulk, client): + logger = logging.getLogger(__name__) + result = Result(test_id, batch, bulk, client) + _, filename = tempfile.mkstemp() + params = get_benchmark_params(args, batch, bulk, client, filename) + + console.info(f"Running benchmark with: bulk size: {bulk}, number of clients: {client}, batch size: {batch}") + success = False + err = None + start = timer() + try: + success, err = run_benchmark(params) + finally: + total_time = int(timer() - start) + if success: + with open(filename, 'r', newline='') as csvfile: + line_reader = csv.DictReader(csvfile, delimiter=',') + output = {} + for row in line_reader: + output[row[METRIC_KEY]] = row + output[TOTAL_TIME_KEY] = {METRIC_KEY: TOTAL_TIME_KEY, "Task": "", "Value": str(total_time), "Unit": "s"} + result.set_output(True, total_time, output) + else: + logger.error(err) + result.set_output(False, total_time, None) + + if os.path.exists(filename): + os.remove(filename) + return result + + +def batch_bulk_client_tuning(args): + batch_schedule = BatchSizeSchedule(args) + bulk_schedule = BulkSizeSchedule(args) + client_schedule = ClientSchedule(args) + batches = batch_schedule.steps + bulks = bulk_schedule.steps + number_of_clients = client_schedule.steps + + total = len(batches) * len(bulks) * len(number_of_clients) + console.info(f"There will be {total} tests to run with {len(bulks)} bulk sizes, {len(batches)} batch sizes, " + 
f"{len(number_of_clients)} client numbers.") + + schedule_runner = ScheduleRunner(args, batch_schedule, bulk_schedule, client_schedule) + results = schedule_runner.run(run_batch_bulk_client_tests) + successful_results = get_successful_results(results, float(args.allowed_error_rate)) + optimal = find_optimal_result(successful_results) + if not optimal: + console.info("All tests failed, couldn't find any results!") + else: + console.info(f"The optimal variable combination is: bulk size: {optimal.bulk_size}, " + f"batch size: {optimal.batch_size}, number of clients: {optimal.number_of_client}") + return results + + +def get_successful_results(results, allowed_error_rate): + successful_results = [] + for result in results: + if result.success and result.error_rate <= allowed_error_rate: + successful_results.append(result) + return successful_results + + +def find_optimal_result(results): + total_time = sys.maxsize + optimal = None + for result in results: + if result.total_time < total_time: + total_time = result.total_time + optimal = result + return optimal + + +def run(args): + return batch_bulk_client_tuning(args) diff --git a/osbenchmark/tuning/publisher.py b/osbenchmark/tuning/publisher.py new file mode 100644 index 000000000..56c3502ca --- /dev/null +++ b/osbenchmark/tuning/publisher.py @@ -0,0 +1,148 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# Modifications Copyright OpenSearch Contributors. See +# GitHub history for details. +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from functools import partial +from osbenchmark.results_publisher import format_as_markdown, format_as_csv +from osbenchmark import exceptions +from osbenchmark.utils import io as rio + +VALUE_KEY = "Value" +TASK_KEY = "Task" +UNIT_KEY = "Unit" +KEYS = [ + "Cumulative indexing time of primary shards", + "Min cumulative indexing time across primary shards", + "Median cumulative indexing time across primary shards", + "Max cumulative indexing time across primary shards", + "Cumulative indexing throttle time of primary shards", + "Min cumulative indexing throttle time across primary shards", + "Median cumulative indexing throttle time across primary shards", + "Max cumulative indexing throttle time across primary shards", + "Cumulative merge time of primary shards", + "Cumulative merge count of primary shards", + "Min cumulative merge time across primary shards", + "Median cumulative merge time across primary shards", + "Max cumulative merge time across primary shards", + "Cumulative merge throttle time of primary shards", + "Min cumulative merge throttle time across primary shards", + "Median cumulative merge throttle time across primary shards", + "Max cumulative merge throttle time across primary shards", + "Cumulative refresh time of primary shards", + "Cumulative refresh count of primary shards", + "Min cumulative refresh time across primary shards", + "Median cumulative refresh time across primary shards", + "Max cumulative refresh time across primary shards", + "Cumulative flush time of primary shards", + "Cumulative flush count of primary shards", + "Min cumulative flush time across primary shards", + "Median cumulative flush time across primary shards", + "Max cumulative flush time across primary shards", + "Total Young Gen GC time", + "Total Young Gen GC count", + "Total Old Gen GC time", + "Total Old Gen GC count", + "Store size", + "Translog size", + "Heap used for segments", + "Heap used for doc values", + "Heap used for terms", + "Heap used for norms", + "Heap used for points", + "Heap used for stored fields", + "Segment count", + "Min Throughput", + "Mean Throughput", + "Median Throughput", + "Max Throughput", + "50th percentile latency", + "90th percentile latency", + "99th percentile latency", + "99.9th percentile latency", + "99.99th percentile latency", + "100th percentile latency", + "50th percentile service time", + "90th percentile service time", + "99th percentile service time", + "99.9th percentile service time", + "99.99th percentile service time", + "100th percentile service time", + "error rate", + "Total time" +] + + +class TuningPublisher: + def __init__(self, config): + self.results_file = config.opts("results_publishing", "output.path", mandatory=False) + self.results_format = config.opts("results_publishing", "format") + self.numbers_align = config.opts("results_publishing", "numbers.align", + mandatory=False, default_value="right") + self.cwd = config.opts("node", "benchmark.cwd") + + def publish(self, results): + write_results(self.results_format, self.numbers_align, self.results_file, self.cwd, results) + + +def construct_data(results, keys): + lines = [] + for key in keys: + last_result_with_key = None + for result in results: + if key in result.output: + last_result_with_key = result + + if not last_result_with_key: + continue + line = [key, last_result_with_key.output[key][TASK_KEY]] + for result in results: + if key not in result.output: + line.append("") + else: + line.append(result.output[key][VALUE_KEY]) + 
line.append(last_result_with_key.output[key][UNIT_KEY]) + lines.append(line) + return lines + + +def write_results(results_format, numbers_align, results_file, cwd, results): + if results_format == "markdown": + formatter = partial(format_as_markdown, numbers_align=numbers_align) + elif results_format == "csv": + formatter = format_as_csv + else: + raise exceptions.SystemSetupError("Unknown publish format '%s'" % results_format) + + headers = ["Metric", "Task"] + for result in results: + headers.append(str(result)) + headers.append("Unit") + + lines = construct_data(results, KEYS) + + if len(results_file) > 0: + normalized_results_file = rio.normalize_path(results_file, cwd) + # ensure that the parent folder already exists when we try to write the file... + rio.ensure_dir(rio.dirname(normalized_results_file)) + with open(normalized_results_file, mode="a+", encoding="utf-8") as f: + f.writelines(formatter(headers, lines)) diff --git a/osbenchmark/tuning/result.py b/osbenchmark/tuning/result.py new file mode 100644 index 000000000..e3d386572 --- /dev/null +++ b/osbenchmark/tuning/result.py @@ -0,0 +1,50 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# Modifications Copyright OpenSearch Contributors. See +# GitHub history for details. +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +ERROR_RATE_KEY = "error rate" +VALUE_KEY = "Value" + + +class Result: + def __init__(self, test_id, batch_size, bulk_size, number_of_client): + self.success = None + self.test_id = test_id + self.batch_size = batch_size + self.bulk_size = bulk_size + self.number_of_client = number_of_client + self.total_time = 0 + self.error_rate = 0 + self.output = None + + def set_output(self, success, total_time, output): + self.success = success + self.total_time = total_time + if not output: + return + self.output = output + if output and ERROR_RATE_KEY in output: + self.error_rate = float(output[ERROR_RATE_KEY][VALUE_KEY]) + + def __str__(self): + return f"bulk size: {self.bulk_size}, batch size: {self.batch_size}, number of clients: {self.number_of_client}" diff --git a/osbenchmark/tuning/schedule.py b/osbenchmark/tuning/schedule.py new file mode 100644 index 000000000..9c6fe08fd --- /dev/null +++ b/osbenchmark/tuning/schedule.py @@ -0,0 +1,130 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# Modifications Copyright OpenSearch Contributors. See +# GitHub history for details. +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import uuid +import itertools +from functools import partial + + +def get_int_from_list_or_default(l, idx, default_val): + return int(l[idx]) if idx < len(l) else default_val + + +def get_recommended_maximum_batch_size(args): + ml_server = args.remote_ml_server_type + if ml_server == "sagemaker": + return 100 # sagemaker doesn't have a restriction, it's magic number for now + elif ml_server == "cohere": + # https://docs.cohere.com/reference/embed + return 96 + elif ml_server == "openai": + # https://community.openai.com/t/embeddings-api-max-batch-size/655329 + return 2048 + else: # ml_server == "unknown" + return 200 + + +def exceeding_bound_check(bound, trend, current): + if trend > 0: + return current > bound + else: + return current < bound + + +def exceeding_and_equal_check(bound, trend, current): + if trend > 0: + return current >= bound + else: + return current <= bound + + +class Schedule: + def __init__(self, single_val, schedule_val, default_minimal, default_maximal, default_step_size): + self.default_step_size = default_step_size + self.default_maximal = default_maximal + self.default_minimal = default_minimal + self.schedule_val = schedule_val + self.single_val = single_val + self.steps = self._get_steps() + + def _get_steps(self): + if self.schedule_val: + # user specified schedule "@10:20:90" + if self.schedule_val[0] == "@": + schedule = self.schedule_val[1:] + return [int(s) for s in schedule.split(":")] + # a pattern to calculate schedule + else: + sections = [] if self.schedule_val is None else self.schedule_val.split(":") + minimum_batch_size = get_int_from_list_or_default(sections, 0, self.default_minimal) + maximum_batch_size = get_int_from_list_or_default(sections, 1, self.default_maximal) + trend = 1 if get_int_from_list_or_default(sections, 2, 1) > 0 else -1 + step_size = get_int_from_list_or_default(sections, 3, self.default_step_size) + current = minimum_batch_size if trend > 0 else maximum_batch_size + + steps = [] + open_bound_check = partial(exceeding_bound_check, + maximum_batch_size if trend > 0 else minimum_batch_size, trend) + close_bound_check = partial(exceeding_and_equal_check, + maximum_batch_size if trend > 0 else minimum_batch_size, trend) + while not open_bound_check(current): + steps.append(current) + previous = current + current = current + trend * step_size + if not close_bound_check(previous): + current = min(current, maximum_batch_size) if trend > 0 else max(current, minimum_batch_size) + return steps + else: + return [int(self.single_val)] + + +class BatchSizeSchedule(Schedule): + def __init__(self, args): + super().__init__(args.batch_size, args.batch_size_schedule, 1, get_recommended_maximum_batch_size(args), + 20) + + +class BulkSizeSchedule(Schedule): + def __init__(self, args): + super().__init__(args.bulk_size, args.bulk_size_schedule, 100, 1000, 
100) + + +class ClientSchedule(Schedule): + def __init__(self, args): + super().__init__(args.client, args.client_schedule, 1, 10, 1) + + +class ScheduleRunner: + def __init__(self, args, *schedules): + self.schedules = list(schedules) + self.args = args + + def run(self, callback): + results = [] + schedule_list = [schedule.steps for schedule in self.schedules] + for args in list(itertools.product(*schedule_list)): + test_id = str(uuid.uuid4()) + result = callback(self.args, test_id, *args) + results.append(result) + return results diff --git a/osbenchmark/utils/process.py b/osbenchmark/utils/process.py index 95446e7b2..a6553e934 100644 --- a/osbenchmark/utils/process.py +++ b/osbenchmark/utils/process.py @@ -141,6 +141,15 @@ def is_benchmark_process(p): os.path.basename(cmdline[1]) == "opensearch-benchmark") +def is_benchmark_observer_process(p): + cmdline = p.cmdline() + return is_benchmark_process(p) and len(cmdline) > 2 and cmdline[2] == "tuning" + + +def is_benchmark_but_not_observer_process(p): + return is_benchmark_process(p) and not is_benchmark_observer_process(p) + + def find_all_other_benchmark_processes(): others = [] for_all_other_processes(is_benchmark_process, others.append) @@ -174,4 +183,4 @@ def for_all_other_processes(predicate, action): def kill_running_benchmark_instances(): - kill_all(is_benchmark_process) + kill_all(is_benchmark_but_not_observer_process) diff --git a/tests/tuning/optimal_finder_test.py b/tests/tuning/optimal_finder_test.py new file mode 100644 index 000000000..24558af2f --- /dev/null +++ b/tests/tuning/optimal_finder_test.py @@ -0,0 +1,62 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# Modifications Copyright OpenSearch Contributors. See +# GitHub history for details. +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
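+
+# These tests cover result selection: find_optimal_result() returns the run with the
+# lowest total time, and get_successful_results() drops failed runs as well as runs
+# whose error rate exceeds the allowed threshold.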
+ +import pytest +from osbenchmark.tuning.optimal_finder import find_optimal_result, get_successful_results +from osbenchmark.tuning.result import Result + + +@pytest.fixture() +def results(): + result1 = Result("id1", 0, 0, 0) + result2 = Result("id2", 0, 0, 0) + result3 = Result("id3", 0, 0, 0) + result4 = Result("id4", 0, 0, 0) + return [result1, result2, result3, result4] + + +def test_find_optimal_result(results): + results[0].set_output(True, 25, None) + results[1].set_output(True, 15, None) + results[2].set_output(True, 45, None) + results[3].set_output(True, 125, None) + assert find_optimal_result(results).test_id == "id2" + + +def test_get_successful_results_all_failed(results): + results[0].set_output(False, 25, None) + results[1].set_output(False, 15, None) + results[2].set_output(False, 45, None) + results[3].set_output(False, 125, None) + assert len(get_successful_results(results, 0)) == 0 + + +def test_get_successful_ids_error_rate(results): + results[0].set_output(False, 25, {"error rate": {"Value": 0.1}}) + results[1].set_output(True, 15, {"error rate": {"Value": 0.2}}) + results[2].set_output(True, 45, {"error rate": {"Value": 0.3}}) + results[3].set_output(True, 125, {"error rate": {"Value": 0.4}}) + assert len(get_successful_results(results, 0.21)) == 1 + assert len(get_successful_results(results, 0.31)) == 2 + assert len(get_successful_results(results, 0.4)) == 3 diff --git a/tests/tuning/schedule_test.py b/tests/tuning/schedule_test.py new file mode 100644 index 000000000..00b7fb13e --- /dev/null +++ b/tests/tuning/schedule_test.py @@ -0,0 +1,64 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# Modifications Copyright OpenSearch Contributors. See +# GitHub history for details. +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
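+
+# Schedule strings follow the pattern documented in the tuning notebook:
+#   "min:max:trend:step" expands to an ascending series (positive trend) or a
+#   descending one (negative trend), always including the boundary value even when
+#   the step size overshoots it, while "@v1:v2:..." lists the exact values to test.
+# Omitted fields fall back to the defaults passed to Schedule(), as in the "10" case.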
+ +from unittest import TestCase +from osbenchmark.tuning.schedule import Schedule, ScheduleRunner + + +class TestSchedule(TestCase): + def test_Schedule_with_batch_size(self): + schedule = Schedule("1", None, 0, 0, 0) + self.assertEqual([1], schedule.steps) + + def test_Schedule_with_schedule_val(self): + schedule = Schedule(None, "10:100:1:10", 0, 0, 0) + self.assertEqual(list(range(10, 101, 10)), schedule.steps) + + schedule = Schedule("1", "10:100:-11:10", 0, 0, 0) + self.assertEqual(list(range(100, 9, -10)), schedule.steps) + + schedule = Schedule("1", "@10:20:100", 0, 0, 0) + self.assertEqual([10, 20, 100], schedule.steps) + + schedule = Schedule(None, "10", 0, 100, 20) + self.assertEqual([10, 30, 50, 70, 90, 100], schedule.steps) + + +class FakeSchedule: + def __init__(self, steps): + self.steps = steps + + +def fake_callback(args, test_id, arg1, arg2): + return {"args": args, "arg1": arg1, "arg2": arg2} + + +class TestScheduleRunner(TestCase): + def test_ScheduleRunner(self): + schedule1 = FakeSchedule([1, 2]) + schedule2 = FakeSchedule([4, 5]) + args = {} + runner = ScheduleRunner(args, schedule1, schedule2) + results = runner.run(fake_callback) + self.assertEqual({(result["arg1"], result["arg2"]) for result in results}, {(1,4), (2,4), (1,5), (2,5)})