From 67ebbd01d1f9fc0a369be7916f6a121bbd8ff346 Mon Sep 17 00:00:00 2001 From: AkshathRaghav Date: Wed, 1 Nov 2023 13:12:34 -0400 Subject: [PATCH 01/10] Testing complete, rendering incomplete Signed-off-by: AkshathRaghav --- osbenchmark/workload_generator/corpus.py | 164 +++++++++++++++++++---- 1 file changed, 141 insertions(+), 23 deletions(-) diff --git a/osbenchmark/workload_generator/corpus.py b/osbenchmark/workload_generator/corpus.py index 7512d5c77..780b43081 100644 --- a/osbenchmark/workload_generator/corpus.py +++ b/osbenchmark/workload_generator/corpus.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -27,27 +27,32 @@ import logging import os +from concurrent.futures import ThreadPoolExecutor from osbenchmark.utils import console +from threading import Lock +progress_lock = Lock() DOCS_COMPRESSOR = bz2.BZ2Compressor COMP_EXT = ".bz2" +OUT_EXT = ".json" def template_vars(index_name, out_path, doc_count): - comp_outpath = out_path + COMP_EXT + comp_outpath = out_path + OUT_EXT + COMP_EXT + out_path = out_path + OUT_EXT return { "index_name": index_name, "filename": os.path.basename(comp_outpath), "path": comp_outpath, "doc_count": doc_count, "uncompressed_bytes": os.path.getsize(out_path), - "compressed_bytes": os.path.getsize(comp_outpath) + "compressed_bytes": os.path.getsize(comp_outpath), } def get_doc_outpath(outdir, name, suffix=""): - return os.path.join(outdir, f"{name}-documents{suffix}.json") + return os.path.join(outdir, f"{name}-documents{suffix}") def extract(client, output_path, index, number_of_docs_requested=None): @@ -64,49 +69,162 @@ def extract(client, output_path, index, number_of_docs_requested=None): number_of_docs = client.count(index=index)["count"] - total_docs = number_of_docs if not number_of_docs_requested else min(number_of_docs, number_of_docs_requested) + total_docs = ( + number_of_docs + if not number_of_docs_requested + else min(number_of_docs, number_of_docs_requested) + ) if total_docs > 0: - logger.info("[%d] total docs in index [%s]. Extracting [%s] docs.", number_of_docs, index, total_docs) + logger.info( + "[%d] total docs in index [%s]. 
Extracting [%s] docs.", + number_of_docs, + index, + total_docs, + ) docs_path = get_doc_outpath(output_path, index) - dump_documents(client, index, get_doc_outpath(output_path, index, "-1k"), min(total_docs, 1000), " for test mode") + dump_documents( + client, + index, + get_doc_outpath(output_path, index, "-1k"), + min(total_docs, 1000), + " for test mode", + ) dump_documents(client, index, docs_path, total_docs) return template_vars(index, docs_path, total_docs) else: - logger.info("Skipping corpus extraction fo index [%s] as it contains no documents.", index) + logger.info( + "Skipping corpus extraction fo index [%s] as it contains no documents.", + index, + ) return None -def dump_documents(client, index, out_path, number_of_docs, progress_message_suffix=""): - # pylint: disable=import-outside-toplevel - from opensearchpy import helpers +def dump_documents_range( + client, index, out_path, start_doc, end_doc, total_docs, progress_message_suffix="" +): + """ + Extract documents in the range of start_doc and end_doc and write to induvidual files + + :param client: OpenSearch client used to extract data + :param index: Name of index to dump + :param out_path: Destination directory for corpus dump + :param start_doc: Start index of the document chunk + :param end_doc: End index of the document chunk + :param total_docs: Total number of documents + :return: dict of properties describing the corpus for templates + """ logger = logging.getLogger(__name__) - freq = max(1, number_of_docs // 1000) + freq = max(1, total_docs // 1000) progress = console.progress() compressor = DOCS_COMPRESSOR() + out_path = f"{out_path}_{start_doc}_{end_doc}" + OUT_EXT comp_outpath = out_path + COMP_EXT + with open(out_path, "wb") as outfile: with open(comp_outpath, "wb") as comp_outfile: - logger.info("Dumping corpus for index [%s] to [%s].", index, out_path) - query = {"query": {"match_all": {}}} - for n, doc in enumerate(helpers.scan(client, query=query, index=index)): - if n >= number_of_docs: + logger.info( + f"Dumping corpus for index [{index}] to [{out_path}] for docs {start_doc}-{end_doc}." + ) + query = { + "query": {"match_all": {}}, + "from": start_doc, + "size": end_doc - start_doc, + } + + search_after = None + + n = 0 + + while n < (end_doc - start_doc): + if search_after: + query = { + "query": {"match_all": {}}, + "size": 1, + "sort": [{"_id": "asc"}], + "search_after": search_after, + } + else: + query = { + "query": {"match_all": {}}, + "size": 1, + "sort": [{"_id": "asc"}], + "from": start_doc, + } + + response = client.search(index=index, body=query) + hits = response["hits"]["hits"] + + if not hits: break - data = (json.dumps(doc["_source"], separators=(",", ":")) + "\n").encode("utf-8") + + doc = hits[0] + search_after = doc["sort"] + data = ( + json.dumps(doc["_source"], separators=(",", ":")) + "\n" + ).encode("utf-8") outfile.write(data) comp_outfile.write(compressor.compress(data)) - render_progress(progress, progress_message_suffix, index, n + 1, number_of_docs, freq) + render_progress( + progress, + progress_message_suffix, + index, + start_doc + n + 1, + total_docs, + freq, + ) + + n += 1 comp_outfile.write(compressor.flush()) - progress.finish() + + +def dump_documents(client, index, out_path, number_of_docs, progress_message_suffix=""): + """ + Splits the dumping process into 8 threads. + First, they split the documents into chunks to be dumped. 
Then, they are dumped as "{index}-documents{suffix}_{start}_{end}.json(.bz2)" + Finally, they are all collated into their file "{out_path}-documents{suffix}.json(.bz2)" format. + + :param client: OpenSearch client used to extract data + :param index: Name of index to dump + :param out_path: Destination directory for corpus dump + :param number_of_docs: Total number of documents + """ + + num_threads = 8 + with ThreadPoolExecutor(max_workers=num_threads) as executor: + step = number_of_docs // num_threads + ranges = [(i, i + step) for i in range(0, number_of_docs, step)] + executor.map( + lambda args: dump_documents_range( + client, index, out_path, *args, number_of_docs, progress_message_suffix + ), + ranges, + ) + merge_json_files(out_path, ranges) + + +def merge_json_files(out_path, ranges): + for EXT in [OUT_EXT, OUT_EXT + COMP_EXT]: + merged_file_path = f"{out_path}" + EXT + with open(merged_file_path, "wb") as merged_file: + for start, end in ranges: + file_path = f"{out_path}_{start}_{end}" + EXT + with open(file_path, "rb") as f: + for line in f: + merged_file.write(line) + os.remove(file_path) def render_progress(progress, progress_message_suffix, index, cur, total, freq): - if cur % freq == 0 or total - cur < freq: - msg = f"Extracting documents for index [{index}]{progress_message_suffix}..." - percent = (cur * 100) / total - progress.print(msg, f"{cur}/{total} docs [{percent:.1f}% done]") + with progress_lock: + if cur % freq == 0 or total - cur < freq: + msg = ( + f"Extracting documents for index [{index}]{progress_message_suffix}..." + ) + percent = (cur * 100) / total + progress.print(msg, f"{cur}/{total} docs [{percent:.1f}% done]") From 058deabee86b17c34146c1add14208ed812eb305 Mon Sep 17 00:00:00 2001 From: AkshathRaghav Date: Wed, 1 Nov 2023 17:26:08 -0400 Subject: [PATCH 02/10] Added changes, requires 2+ index testing Signed-off-by: AkshathRaghav --- .vscode/settings.json | 6 + osbenchmark/benchmark.py | 995 +++++++++++++----- osbenchmark/workload_generator/corpus.py | 141 +-- .../workload_generator/workload_generator.py | 83 +- 4 files changed, 885 insertions(+), 340 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 000000000..6ba1afd2f --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,6 @@ +{ + "[python]": { + "editor.defaultFormatter": "ms-python.black-formatter" + }, + "python.formatting.provider": "none" +} \ No newline at end of file diff --git a/osbenchmark/benchmark.py b/osbenchmark/benchmark.py index ff5527e89..a08e0d5f5 100644 --- a/osbenchmark/benchmark.py +++ b/osbenchmark/benchmark.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. 
# You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -33,10 +33,27 @@ import thespian.actors -from osbenchmark import PROGRAM_NAME, BANNER, FORUM_LINK, SKULL, check_python_version, doc_link, telemetry -from osbenchmark import version, actor, config, paths, \ - test_execution_orchestrator, results_publisher, \ - metrics, workload, exceptions, log +from osbenchmark import ( + PROGRAM_NAME, + BANNER, + FORUM_LINK, + SKULL, + check_python_version, + doc_link, + telemetry, +) +from osbenchmark import ( + version, + actor, + config, + paths, + test_execution_orchestrator, + results_publisher, + metrics, + workload, + exceptions, + log, +) from osbenchmark.builder import provision_config, builder from osbenchmark.workload_generator import workload_generator from osbenchmark.utils import io, convert, process, console, net, opts, versions @@ -52,7 +69,9 @@ def positive_number(v): def non_empty_list(arg): lst = opts.csv_to_list(arg) if len(lst) < 1: - raise argparse.ArgumentError(argument=None, message="At least one argument required!") + raise argparse.ArgumentError( + argument=None, message="At least one argument required!" + ) return lst def runtime_jdk(v): @@ -62,14 +81,18 @@ def runtime_jdk(v): try: return positive_number(v) except argparse.ArgumentTypeError: - raise argparse.ArgumentTypeError(f"must be a positive number or 'bundled' but was {v}") + raise argparse.ArgumentTypeError( + f"must be a positive number or 'bundled' but was {v}" + ) def supported_os_version(v): if v: min_os_version = versions.Version.from_string(version.minimum_os_version()) specified_version = versions.Version.from_string(v) if specified_version < min_os_version: - raise argparse.ArgumentTypeError(f"must be at least {min_os_version} but was {v}") + raise argparse.ArgumentTypeError( + f"must be at least {min_os_version} but was {v}" + ) return v def add_workload_source(subparser): @@ -78,48 +101,67 @@ def add_workload_source(subparser): "--workload-repository", help="Define the repository from where Benchmark will load workloads (default: default).", # argparse is smart enough to use this default only if the user did not use --workload-path and also did not specify anything - default="default" + default="default", ) workload_source_group.add_argument( - "--workload-path", - help="Define the path to a workload.") + "--workload-path", help="Define the path to a workload." 
+ ) subparser.add_argument( "--workload-revision", help="Define a specific revision in the workload repository that Benchmark should use.", - default=None) + default=None, + ) # try to preload configurable defaults, but this does not work together with `--configuration-name` (which is undocumented anyway) cfg = config.Config() if cfg.config_present(): cfg.load_config() - preserve_install = cfg.opts("defaults", "preserve_benchmark_candidate", default_value=False, mandatory=False) + preserve_install = cfg.opts( + "defaults", + "preserve_benchmark_candidate", + default_value=False, + mandatory=False, + ) else: preserve_install = False - parser = argparse.ArgumentParser(prog=PROGRAM_NAME, - description=BANNER + "\n\n A benchmarking tool for OpenSearch", - epilog="Find out more about Benchmark at {}".format(console.format.link(doc_link())), - formatter_class=argparse.RawDescriptionHelpFormatter) - parser.add_argument('--version', action='version', version="%(prog)s " + version.version()) + parser = argparse.ArgumentParser( + prog=PROGRAM_NAME, + description=BANNER + "\n\n A benchmarking tool for OpenSearch", + epilog="Find out more about Benchmark at {}".format( + console.format.link(doc_link()) + ), + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + "--version", action="version", version="%(prog)s " + version.version() + ) if len(sys.argv) == 1: parser.print_help() sys.exit(1) - subparsers = parser.add_subparsers( - title="subcommands", - dest="subcommand", - help="") + subparsers = parser.add_subparsers(title="subcommands", dest="subcommand", help="") - test_execution_parser = subparsers.add_parser("execute-test", help="Run a benchmark") + test_execution_parser = subparsers.add_parser( + "execute-test", help="Run a benchmark" + ) # change in favor of "list telemetry", "list workloads", "list pipelines" list_parser = subparsers.add_parser("list", help="List configuration options") list_parser.add_argument( "configuration", metavar="configuration", help="The configuration for which Benchmark should show the available options. " - "Possible values are: telemetry, workloads, pipelines, test_executions, provision_config_instances, opensearch-plugins", - choices=["telemetry", "workloads", "pipelines", "test_executions", "provision_config_instances", "opensearch-plugins"]) + "Possible values are: telemetry, workloads, pipelines, test_executions, provision_config_instances, opensearch-plugins", + choices=[ + "telemetry", + "workloads", + "pipelines", + "test_executions", + "provision_config_instances", + "opensearch-plugins", + ], + ) list_parser.add_argument( "--limit", help="Limit the number of search results for recent test_executions (default: 10).", @@ -139,115 +181,143 @@ def add_workload_source(subparser): info_parser.add_argument( "--workload-params", help="Define a comma-separated list of key:value pairs that are injected verbatim to the workload as variables.", - default="" + default="", ) info_parser.add_argument( "--test-procedure", - help=f"Define the test_procedure to use. List possible test_procedures for workloads with `{PROGRAM_NAME} list workloads`." + help=f"Define the test_procedure to use. List possible test_procedures for workloads with `{PROGRAM_NAME} list workloads`.", ) info_task_filter_group = info_parser.add_mutually_exclusive_group() info_task_filter_group.add_argument( "--include-tasks", - help="Defines a comma-separated list of tasks to run. 
By default all tasks of a test_procedure are run.") + help="Defines a comma-separated list of tasks to run. By default all tasks of a test_procedure are run.", + ) info_task_filter_group.add_argument( "--exclude-tasks", - help="Defines a comma-separated list of tasks not to run. By default all tasks of a test_procedure are run.") + help="Defines a comma-separated list of tasks not to run. By default all tasks of a test_procedure are run.", + ) - create_workload_parser = subparsers.add_parser("create-workload", help="Create a Benchmark workload from existing data") + create_workload_parser = subparsers.add_parser( + "create-workload", help="Create a Benchmark workload from existing data" + ) create_workload_parser.add_argument( - "--workload", - required=True, - help="Name of the generated workload") + "--workload", required=True, help="Name of the generated workload" + ) create_workload_parser.add_argument( "--indices", type=non_empty_list, required=True, - help="Comma-separated list of indices to include in the workload") + help="Comma-separated list of indices to include in the workload", + ) create_workload_parser.add_argument( "--target-hosts", default="", required=True, - help="Comma-separated list of host:port pairs which should be targeted") + help="Comma-separated list of host:port pairs which should be targeted", + ) create_workload_parser.add_argument( "--client-options", default=opts.ClientOptions.DEFAULT_CLIENT_OPTIONS, - help=f"Comma-separated list of client options to use. (default: {opts.ClientOptions.DEFAULT_CLIENT_OPTIONS})") + help=f"Comma-separated list of client options to use. (default: {opts.ClientOptions.DEFAULT_CLIENT_OPTIONS})", + ) create_workload_parser.add_argument( "--output-path", default=os.path.join(os.getcwd(), "workloads"), - help="Workload output directory (default: workloads/)") + help="Workload output directory (default: workloads/)", + ) create_workload_parser.add_argument( "--custom-queries", - type=argparse.FileType('r'), - help="Input JSON file to use containing custom workload queries that override the default match_all query") + type=argparse.FileType("r"), + help="Input JSON file to use containing custom workload queries that override the default match_all query", + ) create_workload_parser.add_argument( "--number-of-docs", action=opts.StoreKeyPairAsDict, - nargs='+', + nargs="+", metavar="KEY:VAL", - help="Map of index name and integer doc count to extract. Ensure that index name also exists in --indices parameter. " + - "To specify several indices and doc counts, use format: : : ...") + help="Map of index name and integer doc count to extract. Ensure that index name also exists in --indices parameter. 
" + + "To specify several indices and doc counts, use format: : : ...", + ) + create_workload_parser.add_argument( + "--concurrent", + action="store_true", + help="Whether to generate workload concurrently (default: false)", + ) - compare_parser = subparsers.add_parser("compare", help="Compare two test_executions") + compare_parser = subparsers.add_parser( + "compare", help="Compare two test_executions" + ) compare_parser.add_argument( "--baseline", required=True, - help=f"TestExecution ID of the baseline (see {PROGRAM_NAME} list test_executions).") + help=f"TestExecution ID of the baseline (see {PROGRAM_NAME} list test_executions).", + ) compare_parser.add_argument( "--contender", required=True, - help=f"TestExecution ID of the contender (see {PROGRAM_NAME} list test_executions).") + help=f"TestExecution ID of the contender (see {PROGRAM_NAME} list test_executions).", + ) compare_parser.add_argument( "--results-format", help="Define the output format for the command line results (default: markdown).", choices=["markdown", "csv"], - default="markdown") + default="markdown", + ) compare_parser.add_argument( "--results-numbers-align", help="Define the output column number alignment for the command line results (default: right).", choices=["right", "center", "left", "decimal"], - default="right") + default="right", + ) compare_parser.add_argument( "--results-file", help="Write the command line results also to the provided file.", - default="") + default="", + ) compare_parser.add_argument( "--show-in-results", help="Whether to include the comparison in the results file.", - default=True) + default=True, + ) download_parser = subparsers.add_parser("download", help="Downloads an artifact") download_parser.add_argument( "--provision-config-repository", help="Define the repository from where Benchmark will load provision_configs and provision_config_instances (default: default).", - default="default") + default="default", + ) download_parser.add_argument( "--provision-config-revision", help="Define a specific revision in the provision_config repository that Benchmark should use.", - default=None) + default=None, + ) download_parser.add_argument( "--provision-config-path", - help="Define the path to the provision_config_instance and plugin configurations to use.") + help="Define the path to the provision_config_instance and plugin configurations to use.", + ) download_parser.add_argument( "--distribution-version", type=supported_os_version, help="Define the version of the OpenSearch distribution to download. " - "Check https://opensearch.org/docs/version-history/ for released versions.", - default="") + "Check https://opensearch.org/docs/version-history/ for released versions.", + default="", + ) download_parser.add_argument( "--distribution-repository", help="Define the repository from where the OpenSearch distribution should be downloaded (default: release).", - default="release") + default="release", + ) download_parser.add_argument( "--provision-config-instance", help=f"Define the provision_config_instance to use. 
List possible " f"provision_config_instances with `{PROGRAM_NAME} list " f"provision_config_instances` (default: defaults).", - default="defaults") # optimized for local usage + default="defaults", + ) # optimized for local usage download_parser.add_argument( "--provision-config-instance-params", help="Define a comma-separated list of key:value pairs that are injected verbatim as variables for the provision_config_instance.", - default="" + default="", ) download_parser.add_argument( "--target-os", @@ -258,93 +328,102 @@ def add_workload_source(subparser): help="The name of the CPU architecture for which an artifact should be downloaded (default: current architecture)", ) - install_parser = subparsers.add_parser("install", help="Installs an OpenSearch node locally") + install_parser = subparsers.add_parser( + "install", help="Installs an OpenSearch node locally" + ) install_parser.add_argument( "--revision", help="Define the source code revision for building the benchmark candidate. 'current' uses the source tree as is," - " 'latest' fetches the latest version on main. It is also possible to specify a commit id or a timestamp." - " The timestamp must be specified as: \"@ts\" where \"ts\" must be a valid ISO 8601 timestamp, " - "e.g. \"@2013-07-27T10:37:00Z\" (default: current).", - default="current") # optimized for local usage, don't fetch sources + " 'latest' fetches the latest version on main. It is also possible to specify a commit id or a timestamp." + ' The timestamp must be specified as: "@ts" where "ts" must be a valid ISO 8601 timestamp, ' + 'e.g. "@2013-07-27T10:37:00Z" (default: current).', + default="current", + ) # optimized for local usage, don't fetch sources # Intentionally undocumented as we do not consider Docker a fully supported option. install_parser.add_argument( - "--build-type", - help=argparse.SUPPRESS, - choices=["tar", "docker"], - default="tar") + "--build-type", help=argparse.SUPPRESS, choices=["tar", "docker"], default="tar" + ) install_parser.add_argument( "--provision-config-repository", help="Define the repository from where Benchmark will load provision_configs and provision_config_instances (default: default).", - default="default") + default="default", + ) install_parser.add_argument( "--provision-config-revision", help="Define a specific revision in the provision_config repository that Benchmark should use.", - default=None) + default=None, + ) install_parser.add_argument( "--provision-config-path", - help="Define the path to the provision_config_instance and plugin configurations to use.") + help="Define the path to the provision_config_instance and plugin configurations to use.", + ) install_parser.add_argument( "--runtime-jdk", type=runtime_jdk, help="The major version of the runtime JDK to use during installation.", - default=None) + default=None, + ) install_parser.add_argument( "--distribution-repository", help="Define the repository from where the OpenSearch distribution should be downloaded (default: release).", - default="release") + default="release", + ) install_parser.add_argument( "--distribution-version", type=supported_os_version, help="Define the version of the OpenSearch distribution to download. " - "Check https://opensearch.org/docs/version-history/ for released versions.", - default="") + "Check https://opensearch.org/docs/version-history/ for released versions.", + default="", + ) install_parser.add_argument( "--provision-config-instance", help=f"Define the provision_config_instance to use. 
List possible " f"provision_config_instances with `{PROGRAM_NAME} list " f"provision_config_instances` (default: defaults).", - default="defaults") # optimized for local usage + default="defaults", + ) # optimized for local usage install_parser.add_argument( "--provision-config-instance-params", help="Define a comma-separated list of key:value pairs that are injected verbatim as variables for the provision_config_instance.", - default="" + default="", ) install_parser.add_argument( "--opensearch-plugins", help="Define the OpenSearch plugins to install. (default: install no plugins).", - default="") + default="", + ) install_parser.add_argument( "--plugin-params", help="Define a comma-separated list of key:value pairs that are injected verbatim to all plugins as variables.", - default="" + default="", ) install_parser.add_argument( "--network-host", help="The IP address to bind to and publish", - default="127.0.0.1" + default="127.0.0.1", ) install_parser.add_argument( - "--http-port", - help="The port to expose for HTTP traffic", - default="39200" + "--http-port", help="The port to expose for HTTP traffic", default="39200" ) install_parser.add_argument( "--node-name", help="The name of this OpenSearch node", - default="benchmark-node-0" + default="benchmark-node-0", ) install_parser.add_argument( "--master-nodes", help="A comma-separated list of the initial master node names", - default="" + default="", ) install_parser.add_argument( "--seed-hosts", help="A comma-separated list of the initial seed host IPs", - default="" + default="", ) - start_parser = subparsers.add_parser("start", help="Starts an OpenSearch node locally") + start_parser = subparsers.add_parser( + "start", help="Starts an OpenSearch node locally" + ) start_parser.add_argument( "--installation-id", required=True, @@ -352,26 +431,30 @@ def add_workload_source(subparser): # the default will be dynamically derived by # test_execution_orchestrator based on the # presence / absence of other command line options - default="") + default="", + ) start_parser.add_argument( "--test-execution-id", required=True, help="Define a unique id for this test_execution.", - default="") + default="", + ) start_parser.add_argument( "--runtime-jdk", type=runtime_jdk, help="The major version of the runtime JDK to use.", - default=None) + default=None, + ) start_parser.add_argument( "--telemetry", help=f"Enable the provided telemetry devices, provided as a comma-separated list. List possible telemetry " - f"devices with `{PROGRAM_NAME} list telemetry`.", - default="") + f"devices with `{PROGRAM_NAME} list telemetry`.", + default="", + ) start_parser.add_argument( "--telemetry-params", help="Define a comma-separated list of key:value pairs that are injected verbatim to the telemetry devices as parameters.", - default="" + default="", ) stop_parser = subparsers.add_parser("stop", help="Stops an OpenSearch node locally") @@ -382,177 +465,208 @@ def add_workload_source(subparser): # the default will be dynamically derived by # test_execution_orchestrator based on the # presence / absence of other command line options - default="") + default="", + ) stop_parser.add_argument( "--preserve-install", help=f"Keep the benchmark candidate and its index. (default: {str(preserve_install).lower()}).", default=preserve_install, - action="store_true") + action="store_true", + ) for p in [list_parser, test_execution_parser]: p.add_argument( "--distribution-version", type=supported_os_version, help="Define the version of the OpenSearch distribution to download. 
" - "Check https://opensearch.org/docs/version-history/ for released versions.", - default="") + "Check https://opensearch.org/docs/version-history/ for released versions.", + default="", + ) p.add_argument( "--provision-config-path", - help="Define the path to the provision_config_instance and plugin configurations to use.") + help="Define the path to the provision_config_instance and plugin configurations to use.", + ) p.add_argument( "--provision-config-repository", help="Define repository from where Benchmark will load provision_configs and provision_config_instances (default: default).", - default="default") + default="default", + ) p.add_argument( "--provision-config-revision", help="Define a specific revision in the provision_config repository that Benchmark should use.", - default=None) + default=None, + ) test_execution_parser.add_argument( "--test-execution-id", help="Define a unique id for this test_execution.", - default=str(uuid.uuid4())) + default=str(uuid.uuid4()), + ) test_execution_parser.add_argument( "--pipeline", help="Select the pipeline to run.", # the default will be dynamically derived by # test_execution_orchestrator based on the # presence / absence of other command line options - default="") + default="", + ) test_execution_parser.add_argument( "--revision", help="Define the source code revision for building the benchmark candidate. 'current' uses the source tree as is," - " 'latest' fetches the latest version on main. It is also possible to specify a commit id or a timestamp." - " The timestamp must be specified as: \"@ts\" where \"ts\" must be a valid ISO 8601 timestamp, " - "e.g. \"@2013-07-27T10:37:00Z\" (default: current).", - default="current") # optimized for local usage, don't fetch sources + " 'latest' fetches the latest version on main. It is also possible to specify a commit id or a timestamp." + ' The timestamp must be specified as: "@ts" where "ts" must be a valid ISO 8601 timestamp, ' + 'e.g. "@2013-07-27T10:37:00Z" (default: current).', + default="current", + ) # optimized for local usage, don't fetch sources add_workload_source(test_execution_parser) test_execution_parser.add_argument( "--workload", - help=f"Define the workload to use. List possible workloads with `{PROGRAM_NAME} list workloads`." + help=f"Define the workload to use. List possible workloads with `{PROGRAM_NAME} list workloads`.", ) test_execution_parser.add_argument( "--workload-params", help="Define a comma-separated list of key:value pairs that are injected verbatim to the workload as variables.", - default="" + default="", ) test_execution_parser.add_argument( "--test-procedure", - help=f"Define the test_procedure to use. List possible test_procedures for workloads with `{PROGRAM_NAME} list workloads`.") + help=f"Define the test_procedure to use. List possible test_procedures for workloads with `{PROGRAM_NAME} list workloads`.", + ) test_execution_parser.add_argument( "--provision-config-instance", help=f"Define the provision_config_instance to use. 
List possible " f"provision_config_instances with `{PROGRAM_NAME} list " f"provision_config_instances` (default: defaults).", - default="defaults") # optimized for local usage + default="defaults", + ) # optimized for local usage test_execution_parser.add_argument( "--provision-config-instance-params", help="Define a comma-separated list of key:value pairs that are injected verbatim as variables for the provision_config_instance.", - default="" + default="", ) test_execution_parser.add_argument( "--runtime-jdk", type=runtime_jdk, help="The major version of the runtime JDK to use.", - default=None) + default=None, + ) test_execution_parser.add_argument( "--opensearch-plugins", help="Define the OpenSearch plugins to install. (default: install no plugins).", - default="") + default="", + ) test_execution_parser.add_argument( "--plugin-params", help="Define a comma-separated list of key:value pairs that are injected verbatim to all plugins as variables.", - default="" + default="", ) test_execution_parser.add_argument( "--target-hosts", help="Define a comma-separated list of host:port pairs which should be targeted if using the pipeline 'benchmark-only' " - "(default: localhost:9200).", - default="") # actually the default is pipeline specific and it is set later + "(default: localhost:9200).", + default="", + ) # actually the default is pipeline specific and it is set later test_execution_parser.add_argument( "--load-worker-coordinator-hosts", help="Define a comma-separated list of hosts which should generate load (default: localhost).", - default="localhost") + default="localhost", + ) test_execution_parser.add_argument( "--client-options", help=f"Define a comma-separated list of client options to use. The options will be passed to the OpenSearch " - f"Python client (default: {opts.ClientOptions.DEFAULT_CLIENT_OPTIONS}).", - default=opts.ClientOptions.DEFAULT_CLIENT_OPTIONS) - test_execution_parser.add_argument("--on-error", - choices=["continue", "abort"], - help="Controls how Benchmark behaves on response errors (default: continue).", - default="continue") + f"Python client (default: {opts.ClientOptions.DEFAULT_CLIENT_OPTIONS}).", + default=opts.ClientOptions.DEFAULT_CLIENT_OPTIONS, + ) + test_execution_parser.add_argument( + "--on-error", + choices=["continue", "abort"], + help="Controls how Benchmark behaves on response errors (default: continue).", + default="continue", + ) test_execution_parser.add_argument( "--telemetry", help=f"Enable the provided telemetry devices, provided as a comma-separated list. List possible telemetry " - f"devices with `{PROGRAM_NAME} list telemetry`.", - default="") + f"devices with `{PROGRAM_NAME} list telemetry`.", + default="", + ) test_execution_parser.add_argument( "--telemetry-params", help="Define a comma-separated list of key:value pairs that are injected verbatim to the telemetry devices as parameters.", - default="" + default="", ) test_execution_parser.add_argument( "--distribution-repository", help="Define the repository from where the OpenSearch distribution should be downloaded (default: release).", - default="release") + default="release", + ) task_filter_group = test_execution_parser.add_mutually_exclusive_group() task_filter_group.add_argument( "--include-tasks", - help="Defines a comma-separated list of tasks to run. By default all tasks of a test_procedure are run.") + help="Defines a comma-separated list of tasks to run. 
By default all tasks of a test_procedure are run.", + ) task_filter_group.add_argument( "--exclude-tasks", - help="Defines a comma-separated list of tasks not to run. By default all tasks of a test_procedure are run.") + help="Defines a comma-separated list of tasks not to run. By default all tasks of a test_procedure are run.", + ) test_execution_parser.add_argument( "--user-tag", help="Define a user-specific key-value pair (separated by ':'). It is added to each metric record as meta info. " - "Example: intention:baseline-ticket-12345", - default="") + "Example: intention:baseline-ticket-12345", + default="", + ) test_execution_parser.add_argument( "--results-format", help="Define the output format for the command line results (default: markdown).", choices=["markdown", "csv"], - default="markdown") + default="markdown", + ) test_execution_parser.add_argument( "--results-numbers-align", help="Define the output column number alignment for the command line results (default: right).", choices=["right", "center", "left", "decimal"], - default="right") + default="right", + ) test_execution_parser.add_argument( "--show-in-results", help="Define which values are shown in the summary publish (default: available).", choices=["available", "all-percentiles", "all"], - default="available") + default="available", + ) test_execution_parser.add_argument( "--results-file", help="Write the command line results also to the provided file.", - default="") + default="", + ) test_execution_parser.add_argument( "--preserve-install", help=f"Keep the benchmark candidate and its index. (default: {str(preserve_install).lower()}).", default=preserve_install, - action="store_true") + action="store_true", + ) test_execution_parser.add_argument( "--test-mode", help="Runs the given workload in 'test mode'. Meant to check a workload for errors but not for real benchmarks (default: false).", default=False, - action="store_true") + action="store_true", + ) test_execution_parser.add_argument( "--enable-worker-coordinator-profiling", help="Enables a profiler for analyzing the performance of calls in Benchmark's worker coordinator (default: false).", default=False, - action="store_true") + action="store_true", + ) test_execution_parser.add_argument( "--enable-assertions", help="Enables assertion checks for tasks (default: false).", default=False, - action="store_true") + action="store_true", + ) test_execution_parser.add_argument( "--kill-running-processes", action="store_true", default=False, - help="If any processes is running, it is going to kill them and allow Benchmark to continue to run." 
+ help="If any processes is running, it is going to kill them and allow Benchmark to continue to run.", ) ############################################################################### @@ -566,31 +680,41 @@ def add_workload_source(subparser): "--effective-start-date", help=argparse.SUPPRESS, type=lambda s: datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S"), - default=None) + default=None, + ) # Skips checking that the REST API is available before proceeding with the benchmark test_execution_parser.add_argument( "--skip-rest-api-check", help=argparse.SUPPRESS, action="store_true", - default=False) + default=False, + ) - for p in [list_parser, test_execution_parser, compare_parser, download_parser, install_parser, - start_parser, stop_parser, info_parser, create_workload_parser]: + for p in [ + list_parser, + test_execution_parser, + compare_parser, + download_parser, + install_parser, + start_parser, + stop_parser, + info_parser, + create_workload_parser, + ]: # This option is needed to support a separate configuration for the integration tests on the same machine - p.add_argument( - "--configuration-name", - help=argparse.SUPPRESS, - default=None) + p.add_argument("--configuration-name", help=argparse.SUPPRESS, default=None) p.add_argument( "--quiet", help="Suppress as much as output as possible (default: false).", default=False, - action="store_true") + action="store_true", + ) p.add_argument( "--offline", help="Assume that Benchmark has no connection to the Internet (default: false).", default=False, - action="store_true") + action="store_true", + ) return parser @@ -610,7 +734,9 @@ def dispatch_list(cfg): elif what == "opensearch-plugins": provision_config.list_plugins(cfg) else: - raise exceptions.SystemSetupError("Cannot list unknown configuration option [%s]" % what) + raise exceptions.SystemSetupError( + "Cannot list unknown configuration option [%s]" % what + ) def print_help_on_errors(): @@ -619,9 +745,13 @@ def print_help_on_errors(): console.println(console.format.underline_for(heading)) console.println(f"* Check the log files in {paths.logs()} for errors.") console.println(f"* Read the documentation at {console.format.link(doc_link())}.") - console.println(f"* Ask a question on the forum at {console.format.link(FORUM_LINK)}.") - console.println(f"* Raise an issue at {console.format.link('https://github.com/opensearch-project/OpenSearch-Benchmark/issues')} " - f"and include the log files in {paths.logs()}.") + console.println( + f"* Ask a question on the forum at {console.format.link(FORUM_LINK)}." + ) + console.println( + f"* Raise an issue at {console.format.link('https://github.com/opensearch-project/OpenSearch-Benchmark/issues')} " + f"and include the log files in {paths.logs()}." + ) def execute_test(cfg, kill_running_processes=False): @@ -636,16 +766,19 @@ def execute_test(cfg, kill_running_processes=False): process.kill_running_benchmark_instances() except BaseException: logger.exception( - "Could not terminate potentially running Benchmark instances correctly. Attempting to go on anyway.") + "Could not terminate potentially running Benchmark instances correctly. Attempting to go on anyway." 
+ ) else: other_benchmark_processes = process.find_all_other_benchmark_processes() if other_benchmark_processes: pids = [p.pid for p in other_benchmark_processes] - msg = f"There are other Benchmark processes running on this machine (PIDs: {pids}) but only one Benchmark " \ - f"benchmark is allowed to run at the same time.\n\nYou can use --kill-running-processes flag " \ - f"to kill running processes automatically and allow Benchmark to continue to run a new benchmark. " \ - f"Otherwise, you need to manually kill them." + msg = ( + f"There are other Benchmark processes running on this machine (PIDs: {pids}) but only one Benchmark " + f"benchmark is allowed to run at the same time.\n\nYou can use --kill-running-processes flag " + f"to kill running processes automatically and allow Benchmark to continue to run a new benchmark. " + f"Otherwise, you need to manually kill them." + ) raise exceptions.BenchmarkError(msg) with_actor_system(test_execution_orchestrator.run, cfg) @@ -656,9 +789,16 @@ def with_actor_system(runnable, cfg): already_running = actor.actor_system_already_running() logger.info("Actor system already running locally? [%s]", str(already_running)) try: - actors = actor.bootstrap_actor_system(try_join=already_running, prefer_local_only=not already_running) + actors = actor.bootstrap_actor_system( + try_join=already_running, prefer_local_only=not already_running + ) # We can only support remote benchmarks if we have a dedicated daemon that is not only bound to 127.0.0.1 - cfg.add(config.Scope.application, "system", "remote.benchmarking.supported", already_running) + cfg.add( + config.Scope.application, + "system", + "remote.benchmarking.supported", + already_running, + ) # This happens when the admin process could not be started, e.g. because it could not open a socket. except thespian.actors.InvalidActorAddress: logger.info("Falling back to offline actor system.") @@ -667,8 +807,10 @@ def with_actor_system(runnable, cfg): except Exception as e: logger.exception("Could not bootstrap actor system.") if str(e) == "Unable to determine valid external socket address.": - console.warn("Could not determine a socket address. Are you running without any network? Switching to degraded mode.", - logger=logger) + console.warn( + "Could not determine a socket address. Are you running without any network? Switching to degraded mode.", + logger=logger, + ) logger.info("Falling back to offline actor system.") actor.use_offline_actor_system() actors = actor.bootstrap_actor_system(try_join=True) @@ -697,82 +839,181 @@ def with_actor_system(runnable, cfg): shutdown_complete = True logger.info("Shutdown completed.") else: - logger.warning("Shutdown timed out. Actor system is still running.") + logger.warning( + "Shutdown timed out. Actor system is still running." + ) break except KeyboardInterrupt: times_interrupted += 1 - logger.warning("User interrupted shutdown of internal actor system.") - console.info("Please wait a moment for Benchmark's internal components to shutdown.") + logger.warning( + "User interrupted shutdown of internal actor system." + ) + console.info( + "Please wait a moment for Benchmark's internal components to shutdown." 
+ ) if not shutdown_complete and times_interrupted > 0: - logger.warning("Terminating after user has interrupted actor system shutdown explicitly for [%d] times.", - times_interrupted) + logger.warning( + "Terminating after user has interrupted actor system shutdown explicitly for [%d] times.", + times_interrupted, + ) console.println("") - console.warn("Terminating now at the risk of leaving child processes behind.") + console.warn( + "Terminating now at the risk of leaving child processes behind." + ) console.println("") - console.warn("The next test_execution may fail due to an unclean shutdown.") + console.warn( + "The next test_execution may fail due to an unclean shutdown." + ) console.println("") console.println(SKULL) console.println("") elif not shutdown_complete: - console.warn("Could not terminate all internal processes within timeout. Please check and force-terminate " - "all Benchmark processes.") - + console.warn( + "Could not terminate all internal processes within timeout. Please check and force-terminate " + "all Benchmark processes." + ) def configure_telemetry_params(args, cfg): - cfg.add(config.Scope.applicationOverride, "telemetry", "devices", opts.csv_to_list(args.telemetry)) - cfg.add(config.Scope.applicationOverride, "telemetry", "params", opts.to_dict(args.telemetry_params)) + cfg.add( + config.Scope.applicationOverride, + "telemetry", + "devices", + opts.csv_to_list(args.telemetry), + ) + cfg.add( + config.Scope.applicationOverride, + "telemetry", + "params", + opts.to_dict(args.telemetry_params), + ) def configure_workload_params(arg_parser, args, cfg, command_requires_workload=True): - cfg.add(config.Scope.applicationOverride, "workload", "repository.revision", args.workload_revision) + cfg.add( + config.Scope.applicationOverride, + "workload", + "repository.revision", + args.workload_revision, + ) # We can assume here that if a workload-path is given, the user did not specify a repository either (although argparse sets it to # its default value) if args.workload_path: - cfg.add(config.Scope.applicationOverride, "workload", "workload.path", os.path.abspath(io.normalize_path(args.workload_path))) + cfg.add( + config.Scope.applicationOverride, + "workload", + "workload.path", + os.path.abspath(io.normalize_path(args.workload_path)), + ) cfg.add(config.Scope.applicationOverride, "workload", "repository.name", None) if args.workload_revision: # stay as close as possible to argparse errors although we have a custom validation. - arg_parser.error("argument --workload-revision not allowed with argument --workload-path") + arg_parser.error( + "argument --workload-revision not allowed with argument --workload-path" + ) if command_requires_workload and args.workload: # stay as close as possible to argparse errors although we have a custom validation. 
- arg_parser.error("argument --workload not allowed with argument --workload-path") + arg_parser.error( + "argument --workload not allowed with argument --workload-path" + ) else: - cfg.add(config.Scope.applicationOverride, "workload", "repository.name", args.workload_repository) + cfg.add( + config.Scope.applicationOverride, + "workload", + "repository.name", + args.workload_repository, + ) if command_requires_workload: if not args.workload: raise arg_parser.error("argument --workload is required") - cfg.add(config.Scope.applicationOverride, "workload", "workload.name", args.workload) + cfg.add( + config.Scope.applicationOverride, + "workload", + "workload.name", + args.workload, + ) if command_requires_workload: - cfg.add(config.Scope.applicationOverride, "workload", "params", opts.to_dict(args.workload_params)) - cfg.add(config.Scope.applicationOverride, "workload", "test_procedure.name", args.test_procedure) - cfg.add(config.Scope.applicationOverride, "workload", "include.tasks", opts.csv_to_list(args.include_tasks)) - cfg.add(config.Scope.applicationOverride, "workload", "exclude.tasks", opts.csv_to_list(args.exclude_tasks)) + cfg.add( + config.Scope.applicationOverride, + "workload", + "params", + opts.to_dict(args.workload_params), + ) + cfg.add( + config.Scope.applicationOverride, + "workload", + "test_procedure.name", + args.test_procedure, + ) + cfg.add( + config.Scope.applicationOverride, + "workload", + "include.tasks", + opts.csv_to_list(args.include_tasks), + ) + cfg.add( + config.Scope.applicationOverride, + "workload", + "exclude.tasks", + opts.csv_to_list(args.exclude_tasks), + ) -def configure_builder_params(args, cfg, command_requires_provision_config_instance=True): +def configure_builder_params( + args, cfg, command_requires_provision_config_instance=True +): if args.provision_config_path: cfg.add( - config.Scope.applicationOverride, "builder", - "provision_config.path", os.path.abspath( - io.normalize_path(args.provision_config_path))) + config.Scope.applicationOverride, + "builder", + "provision_config.path", + os.path.abspath(io.normalize_path(args.provision_config_path)), + ) cfg.add(config.Scope.applicationOverride, "builder", "repository.name", None) - cfg.add(config.Scope.applicationOverride, "builder", "repository.revision", None) + cfg.add( + config.Scope.applicationOverride, "builder", "repository.revision", None + ) else: - cfg.add(config.Scope.applicationOverride, "builder", "repository.name", args.provision_config_repository) - cfg.add(config.Scope.applicationOverride, "builder", "repository.revision", args.provision_config_revision) + cfg.add( + config.Scope.applicationOverride, + "builder", + "repository.name", + args.provision_config_repository, + ) + cfg.add( + config.Scope.applicationOverride, + "builder", + "repository.revision", + args.provision_config_revision, + ) if command_requires_provision_config_instance: if args.distribution_version: - cfg.add(config.Scope.applicationOverride, "builder", "distribution.version", args.distribution_version) - cfg.add(config.Scope.applicationOverride, "builder", "distribution.repository", args.distribution_repository) - cfg.add(config.Scope.applicationOverride, "builder", - "provision_config_instance.names", opts.csv_to_list( - args.provision_config_instance)) - cfg.add(config.Scope.applicationOverride, "builder", - "provision_config_instance.params", opts.to_dict( - args.provision_config_instance_params)) + cfg.add( + config.Scope.applicationOverride, + "builder", + "distribution.version", + 
args.distribution_version, + ) + cfg.add( + config.Scope.applicationOverride, + "builder", + "distribution.repository", + args.distribution_repository, + ) + cfg.add( + config.Scope.applicationOverride, + "builder", + "provision_config_instance.names", + opts.csv_to_list(args.provision_config_instance), + ) + cfg.add( + config.Scope.applicationOverride, + "builder", + "provision_config_instance.params", + opts.to_dict(args.provision_config_instance_params), + ) def configure_connection_params(arg_parser, args, cfg): @@ -782,16 +1023,40 @@ def configure_connection_params(arg_parser, args, cfg): client_options = opts.ClientOptions(args.client_options, target_hosts=target_hosts) cfg.add(config.Scope.applicationOverride, "client", "options", client_options) if "timeout" not in client_options.default: - console.info("You did not provide an explicit timeout in the client options. Assuming default of 10 seconds.") + console.info( + "You did not provide an explicit timeout in the client options. Assuming default of 10 seconds." + ) if list(target_hosts.all_hosts) != list(client_options.all_client_options): - arg_parser.error("--target-hosts and --client-options must define the same keys for multi cluster setups.") + arg_parser.error( + "--target-hosts and --client-options must define the same keys for multi cluster setups." + ) def configure_results_publishing_params(args, cfg): - cfg.add(config.Scope.applicationOverride, "results_publishing", "format", args.results_format) - cfg.add(config.Scope.applicationOverride, "results_publishing", "values", args.show_in_results) - cfg.add(config.Scope.applicationOverride, "results_publishing", "output.path", args.results_file) - cfg.add(config.Scope.applicationOverride, "results_publishing", "numbers.align", args.results_numbers_align) + cfg.add( + config.Scope.applicationOverride, + "results_publishing", + "format", + args.results_format, + ) + cfg.add( + config.Scope.applicationOverride, + "results_publishing", + "values", + args.show_in_results, + ) + cfg.add( + config.Scope.applicationOverride, + "results_publishing", + "output.path", + args.results_file, + ) + cfg.add( + config.Scope.applicationOverride, + "results_publishing", + "numbers.align", + args.results_numbers_align, + ) def dispatch_sub_command(arg_parser, args, cfg): @@ -805,41 +1070,137 @@ def dispatch_sub_command(arg_parser, args, cfg): configure_results_publishing_params(args, cfg) results_publisher.compare(cfg, args.baseline, args.contender) elif sub_command == "list": - cfg.add(config.Scope.applicationOverride, "system", "list.config.option", args.configuration) - cfg.add(config.Scope.applicationOverride, "system", "list.test_executions.max_results", args.limit) - configure_builder_params(args, cfg, command_requires_provision_config_instance=False) - configure_workload_params(arg_parser, args, cfg, command_requires_workload=False) + cfg.add( + config.Scope.applicationOverride, + "system", + "list.config.option", + args.configuration, + ) + cfg.add( + config.Scope.applicationOverride, + "system", + "list.test_executions.max_results", + args.limit, + ) + configure_builder_params( + args, cfg, command_requires_provision_config_instance=False + ) + configure_workload_params( + arg_parser, args, cfg, command_requires_workload=False + ) dispatch_list(cfg) elif sub_command == "download": - cfg.add(config.Scope.applicationOverride, "builder", "target.os", args.target_os) - cfg.add(config.Scope.applicationOverride, "builder", "target.arch", args.target_arch) + cfg.add( + 
config.Scope.applicationOverride, "builder", "target.os", args.target_os + ) + cfg.add( + config.Scope.applicationOverride, + "builder", + "target.arch", + args.target_arch, + ) configure_builder_params(args, cfg) builder.download(cfg) elif sub_command == "install": - cfg.add(config.Scope.applicationOverride, "system", "install.id", str(uuid.uuid4())) - cfg.add(config.Scope.applicationOverride, "builder", "network.host", args.network_host) - cfg.add(config.Scope.applicationOverride, "builder", "network.http.port", args.http_port) - cfg.add(config.Scope.applicationOverride, "builder", "source.revision", args.revision) - cfg.add(config.Scope.applicationOverride, "builder", "build.type", args.build_type) - cfg.add(config.Scope.applicationOverride, "builder", "runtime.jdk", args.runtime_jdk) - cfg.add(config.Scope.applicationOverride, "builder", "node.name", args.node_name) - cfg.add(config.Scope.applicationOverride, "builder", "master.nodes", opts.csv_to_list(args.master_nodes)) - cfg.add(config.Scope.applicationOverride, "builder", "seed.hosts", opts.csv_to_list(args.seed_hosts)) - cfg.add(config.Scope.applicationOverride, "builder", - "provision_config_instance.plugins", opts.csv_to_list( - args.opensearch_plugins)) - cfg.add(config.Scope.applicationOverride, "builder", "plugin.params", opts.to_dict(args.plugin_params)) + cfg.add( + config.Scope.applicationOverride, + "system", + "install.id", + str(uuid.uuid4()), + ) + cfg.add( + config.Scope.applicationOverride, + "builder", + "network.host", + args.network_host, + ) + cfg.add( + config.Scope.applicationOverride, + "builder", + "network.http.port", + args.http_port, + ) + cfg.add( + config.Scope.applicationOverride, + "builder", + "source.revision", + args.revision, + ) + cfg.add( + config.Scope.applicationOverride, + "builder", + "build.type", + args.build_type, + ) + cfg.add( + config.Scope.applicationOverride, + "builder", + "runtime.jdk", + args.runtime_jdk, + ) + cfg.add( + config.Scope.applicationOverride, "builder", "node.name", args.node_name + ) + cfg.add( + config.Scope.applicationOverride, + "builder", + "master.nodes", + opts.csv_to_list(args.master_nodes), + ) + cfg.add( + config.Scope.applicationOverride, + "builder", + "seed.hosts", + opts.csv_to_list(args.seed_hosts), + ) + cfg.add( + config.Scope.applicationOverride, + "builder", + "provision_config_instance.plugins", + opts.csv_to_list(args.opensearch_plugins), + ) + cfg.add( + config.Scope.applicationOverride, + "builder", + "plugin.params", + opts.to_dict(args.plugin_params), + ) configure_builder_params(args, cfg) builder.install(cfg) elif sub_command == "start": - cfg.add(config.Scope.applicationOverride, "system", "test_execution.id", args.test_execution_id) - cfg.add(config.Scope.applicationOverride, "system", "install.id", args.installation_id) - cfg.add(config.Scope.applicationOverride, "builder", "runtime.jdk", args.runtime_jdk) + cfg.add( + config.Scope.applicationOverride, + "system", + "test_execution.id", + args.test_execution_id, + ) + cfg.add( + config.Scope.applicationOverride, + "system", + "install.id", + args.installation_id, + ) + cfg.add( + config.Scope.applicationOverride, + "builder", + "runtime.jdk", + args.runtime_jdk, + ) configure_telemetry_params(args, cfg) builder.start(cfg) elif sub_command == "stop": - cfg.add(config.Scope.applicationOverride, "builder", "preserve.install", convert.to_bool(args.preserve_install)) - cfg.add(config.Scope.applicationOverride, "system", "install.id", args.installation_id) + cfg.add( + 
config.Scope.applicationOverride, + "builder", + "preserve.install", + convert.to_bool(args.preserve_install), + ) + cfg.add( + config.Scope.applicationOverride, + "system", + "install.id", + args.installation_id, + ) builder.stop(cfg) elif sub_command == "execute-test": # As the execute-test command is doing more work than necessary at the moment, we duplicate several parameters @@ -847,43 +1208,145 @@ def dispatch_sub_command(arg_parser, args, cfg): # these duplicated parameters will vanish as we move towards dedicated subcommands and use "execute-test" only # to run the actual benchmark (i.e. generating load). if args.effective_start_date: - cfg.add(config.Scope.applicationOverride, "system", "time.start", args.effective_start_date) - cfg.add(config.Scope.applicationOverride, "system", "test_execution.id", args.test_execution_id) + cfg.add( + config.Scope.applicationOverride, + "system", + "time.start", + args.effective_start_date, + ) + cfg.add( + config.Scope.applicationOverride, + "system", + "test_execution.id", + args.test_execution_id, + ) # use the test_execution id implicitly also as the install id. - cfg.add(config.Scope.applicationOverride, "system", "install.id", args.test_execution_id) - cfg.add(config.Scope.applicationOverride, "test_execution", "pipeline", args.pipeline) - cfg.add(config.Scope.applicationOverride, "test_execution", "user.tag", args.user_tag) - cfg.add(config.Scope.applicationOverride, "worker_coordinator", "profiling", args.enable_worker_coordinator_profiling) - cfg.add(config.Scope.applicationOverride, "worker_coordinator", "assertions", args.enable_assertions) - cfg.add(config.Scope.applicationOverride, "worker_coordinator", "on.error", args.on_error) + cfg.add( + config.Scope.applicationOverride, + "system", + "install.id", + args.test_execution_id, + ) + cfg.add( + config.Scope.applicationOverride, + "test_execution", + "pipeline", + args.pipeline, + ) + cfg.add( + config.Scope.applicationOverride, + "test_execution", + "user.tag", + args.user_tag, + ) + cfg.add( + config.Scope.applicationOverride, + "worker_coordinator", + "profiling", + args.enable_worker_coordinator_profiling, + ) + cfg.add( + config.Scope.applicationOverride, + "worker_coordinator", + "assertions", + args.enable_assertions, + ) + cfg.add( + config.Scope.applicationOverride, + "worker_coordinator", + "on.error", + args.on_error, + ) cfg.add( config.Scope.applicationOverride, "worker_coordinator", "load_worker_coordinator_hosts", - opts.csv_to_list(args.load_worker_coordinator_hosts)) - cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode) + opts.csv_to_list(args.load_worker_coordinator_hosts), + ) + cfg.add( + config.Scope.applicationOverride, + "workload", + "test.mode.enabled", + args.test_mode, + ) configure_workload_params(arg_parser, args, cfg) configure_connection_params(arg_parser, args, cfg) configure_telemetry_params(args, cfg) configure_builder_params(args, cfg) - cfg.add(config.Scope.applicationOverride, "builder", "runtime.jdk", args.runtime_jdk) - cfg.add(config.Scope.applicationOverride, "builder", "source.revision", args.revision) - cfg.add(config.Scope.applicationOverride, "builder", - "provision_config_instance.plugins", opts.csv_to_list( - args.opensearch_plugins)) - cfg.add(config.Scope.applicationOverride, "builder", "plugin.params", opts.to_dict(args.plugin_params)) - cfg.add(config.Scope.applicationOverride, "builder", "preserve.install", convert.to_bool(args.preserve_install)) - cfg.add(config.Scope.applicationOverride, 
"builder", "skip.rest.api.check", convert.to_bool(args.skip_rest_api_check)) + cfg.add( + config.Scope.applicationOverride, + "builder", + "runtime.jdk", + args.runtime_jdk, + ) + cfg.add( + config.Scope.applicationOverride, + "builder", + "source.revision", + args.revision, + ) + cfg.add( + config.Scope.applicationOverride, + "builder", + "provision_config_instance.plugins", + opts.csv_to_list(args.opensearch_plugins), + ) + cfg.add( + config.Scope.applicationOverride, + "builder", + "plugin.params", + opts.to_dict(args.plugin_params), + ) + cfg.add( + config.Scope.applicationOverride, + "builder", + "preserve.install", + convert.to_bool(args.preserve_install), + ) + cfg.add( + config.Scope.applicationOverride, + "builder", + "skip.rest.api.check", + convert.to_bool(args.skip_rest_api_check), + ) configure_results_publishing_params(args, cfg) execute_test(cfg, args.kill_running_processes) elif sub_command == "create-workload": - cfg.add(config.Scope.applicationOverride, "generator", "indices", args.indices) - cfg.add(config.Scope.applicationOverride, "generator", "number_of_docs", args.number_of_docs) - cfg.add(config.Scope.applicationOverride, "generator", "output.path", args.output_path) - cfg.add(config.Scope.applicationOverride, "workload", "workload.name", args.workload) - cfg.add(config.Scope.applicationOverride, "workload", "custom_queries", args.custom_queries) + cfg.add( + config.Scope.applicationOverride, "generator", "indices", args.indices + ) + cfg.add( + config.Scope.applicationOverride, + "generator", + "number_of_docs", + args.number_of_docs, + ) + cfg.add( + config.Scope.applicationOverride, + "generator", + "output.path", + args.output_path, + ) + cfg.add( + config.Scope.applicationOverride, + "workload", + "workload.name", + args.workload, + ) + cfg.add( + config.Scope.applicationOverride, + "workload", + "custom_queries", + args.custom_queries, + ) + cfg.add( + config.Scope.applicationOverride, + "workload", + "concurrent", + args.concurrent, + ) configure_connection_params(arg_parser, args, cfg) workload_generator.create_workload(cfg) @@ -894,7 +1357,9 @@ def dispatch_sub_command(arg_parser, args, cfg): raise exceptions.SystemSetupError(f"Unknown subcommand [{sub_command}]") return True except exceptions.BenchmarkError as e: - logging.getLogger(__name__).exception("Cannot run subcommand [%s].", sub_command) + logging.getLogger(__name__).exception( + "Cannot run subcommand [%s].", sub_command + ) msg = str(e.message) nesting = 0 while hasattr(e, "cause") and e.cause: @@ -910,7 +1375,9 @@ def dispatch_sub_command(arg_parser, args, cfg): print_help_on_errors() return False except BaseException as e: - logging.getLogger(__name__).exception("A fatal error occurred while running subcommand [%s].", sub_command) + logging.getLogger(__name__).exception( + "A fatal error occurred while running subcommand [%s].", sub_command + ) console.error("Cannot %s. %s." 
% (sub_command, e)) console.println("") print_help_on_errors() @@ -937,7 +1404,9 @@ def main(): if not cfg.config_present(): cfg.install_default_config() cfg.load_config(auto_upgrade=True) - cfg.add(config.Scope.application, "system", "time.start", datetime.datetime.utcnow()) + cfg.add( + config.Scope.application, "system", "time.start", datetime.datetime.utcnow() + ) # Local config per node cfg.add(config.Scope.application, "node", "benchmark.root", paths.benchmark_root()) cfg.add(config.Scope.application, "node", "benchmark.cwd", os.getcwd()) @@ -949,10 +1418,14 @@ def main(): # Configure networking net.init() if not args.offline: - probing_url = cfg.opts("system", "probing.url", default_value="https://github.com", mandatory=False) + probing_url = cfg.opts( + "system", "probing.url", default_value="https://github.com", mandatory=False + ) if not net.has_internet_connection(probing_url): - console.warn("No Internet connection detected. Automatic download of workload data sets etc. is disabled.", - logger=logger) + console.warn( + "No Internet connection detected. Automatic download of workload data sets etc. is disabled.", + logger=logger, + ) cfg.add(config.Scope.applicationOverride, "system", "offline.mode", True) else: logger.info("Detected a working Internet connection.") @@ -962,10 +1435,14 @@ def main(): end = time.time() if success: console.println("") - console.info("SUCCESS (took %d seconds)" % (end - start), overline="-", underline="-") + console.info( + "SUCCESS (took %d seconds)" % (end - start), overline="-", underline="-" + ) else: console.println("") - console.info("FAILURE (took %d seconds)" % (end - start), overline="-", underline="-") + console.info( + "FAILURE (took %d seconds)" % (end - start), overline="-", underline="-" + ) sys.exit(64) diff --git a/osbenchmark/workload_generator/corpus.py b/osbenchmark/workload_generator/corpus.py index 780b43081..864772597 100644 --- a/osbenchmark/workload_generator/corpus.py +++ b/osbenchmark/workload_generator/corpus.py @@ -26,13 +26,9 @@ import json import logging import os - from concurrent.futures import ThreadPoolExecutor -from osbenchmark.utils import console - -from threading import Lock +from tqdm import tqdm -progress_lock = Lock() DOCS_COMPRESSOR = bz2.BZ2Compressor COMP_EXT = ".bz2" OUT_EXT = ".json" @@ -55,7 +51,9 @@ def get_doc_outpath(outdir, name, suffix=""): return os.path.join(outdir, f"{name}-documents{suffix}") -def extract(client, output_path, index, number_of_docs_requested=None): +def extract( + client, output_path, index, number_of_docs_requested=None, concurrent=False +): """ Scroll an index with a match-all query, dumping document source to ``outdir/documents.json``. 
@@ -84,13 +82,14 @@ def extract(client, output_path, index, number_of_docs_requested=None): ) docs_path = get_doc_outpath(output_path, index) dump_documents( + concurrent, client, index, get_doc_outpath(output_path, index, "-1k"), min(total_docs, 1000), - " for test mode", + "for test mode", ) - dump_documents(client, index, docs_path, total_docs) + dump_documents(concurrent, client, index, docs_path, total_docs) return template_vars(index, docs_path, total_docs) else: logger.info( @@ -101,13 +100,20 @@ def extract(client, output_path, index, number_of_docs_requested=None): def dump_documents_range( - client, index, out_path, start_doc, end_doc, total_docs, progress_message_suffix="" + pbar, + client, + index, + out_path, + start_doc, + end_doc, + total_docs, + progress_message_suffix="", ): """ - Extract documents in the range of start_doc and end_doc and write to induvidual files + Extract documents in the range of start_doc and end_doc and write to individual files :param client: OpenSearch client used to extract data - :param index: Name of index to dump + :param index: Name of OpenSearch index to extract documents from :param out_path: Destination directory for corpus dump :param start_doc: Start index of the document chunk :param end_doc: End index of the document chunk @@ -116,9 +122,7 @@ def dump_documents_range( """ logger = logging.getLogger(__name__) - freq = max(1, total_docs // 1000) - progress = console.progress() compressor = DOCS_COMPRESSOR() out_path = f"{out_path}_{start_doc}_{end_doc}" + OUT_EXT comp_outpath = out_path + COMP_EXT @@ -126,7 +130,11 @@ def dump_documents_range( with open(out_path, "wb") as outfile: with open(comp_outpath, "wb") as comp_outfile: logger.info( - f"Dumping corpus for index [{index}] to [{out_path}] for docs {start_doc}-{end_doc}." + "Dumping corpus for index [%s] to [%s] for docs %s-%s.", + index, + out_path, + start_doc, + end_doc, ) query = { "query": {"match_all": {}}, @@ -134,22 +142,22 @@ def dump_documents_range( "size": end_doc - start_doc, } + batch_size = (end_doc - start_doc) // 5 search_after = None - n = 0 while n < (end_doc - start_doc): if search_after: query = { "query": {"match_all": {}}, - "size": 1, + "size": batch_size, "sort": [{"_id": "asc"}], "search_after": search_after, } else: query = { "query": {"match_all": {}}, - "size": 1, + "size": batch_size, "sort": [{"_id": "asc"}], "from": start_doc, } @@ -160,52 +168,77 @@ def dump_documents_range( if not hits: break - doc = hits[0] - search_after = doc["sort"] - data = ( - json.dumps(doc["_source"], separators=(",", ":")) + "\n" - ).encode("utf-8") - - outfile.write(data) - comp_outfile.write(compressor.compress(data)) - - render_progress( - progress, - progress_message_suffix, - index, - start_doc + n + 1, - total_docs, - freq, - ) + for doc in hits: + search_after = doc["sort"] + data = ( + json.dumps(doc["_source"], separators=(",", ":")) + "\n" + ).encode("utf-8") + + outfile.write(data) + comp_outfile.write(compressor.compress(data)) - n += 1 + n += 1 + pbar.update(1) + if n >= (end_doc - start_doc): + break comp_outfile.write(compressor.flush()) -def dump_documents(client, index, out_path, number_of_docs, progress_message_suffix=""): +def dump_documents( + concurrent, client, index, out_path, number_of_docs, progress_message_suffix="" +): """ Splits the dumping process into 8 threads. First, they split the documents into chunks to be dumped. 
Then, they are dumped as "{index}-documents{suffix}_{start}_{end}.json(.bz2)" Finally, they are all collated into their file "{out_path}-documents{suffix}.json(.bz2)" format. :param client: OpenSearch client used to extract data - :param index: Name of index to dump + :param index: Name of OpenSearch index to extract documents from :param out_path: Destination directory for corpus dump :param number_of_docs: Total number of documents """ - - num_threads = 8 - with ThreadPoolExecutor(max_workers=num_threads) as executor: - step = number_of_docs // num_threads - ranges = [(i, i + step) for i in range(0, number_of_docs, step)] - executor.map( - lambda args: dump_documents_range( - client, index, out_path, *args, number_of_docs, progress_message_suffix - ), - ranges, - ) - merge_json_files(out_path, ranges) + if concurrent: + num_threads = 8 + with tqdm( + total=number_of_docs, + desc="Extracting documents" + + (f" [{progress_message_suffix}]" if progress_message_suffix else ""), + unit="doc", + ) as pbar: + with ThreadPoolExecutor(max_workers=num_threads) as executor: + step = number_of_docs // num_threads + ranges = [(i, i + step) for i in range(0, number_of_docs, step)] + executor.map( + lambda args: dump_documents_range( + pbar, + client, + index, + out_path, + *args, + number_of_docs, + progress_message_suffix, + ), + ranges, + ) + merge_json_files(out_path, ranges) + else: + with tqdm( + total=number_of_docs, + desc="Extracting documents" + + (f" [{progress_message_suffix}]" if progress_message_suffix else ""), + unit="doc", + ) as pbar: + dump_documents_range( + pbar, + client, + index, + out_path, + 0, + number_of_docs, + number_of_docs, + progress_message_suffix, + ) def merge_json_files(out_path, ranges): @@ -218,13 +251,3 @@ def merge_json_files(out_path, ranges): for line in f: merged_file.write(line) os.remove(file_path) - - -def render_progress(progress, progress_message_suffix, index, cur, total, freq): - with progress_lock: - if cur % freq == 0 or total - cur < freq: - msg = ( - f"Extracting documents for index [{index}]{progress_message_suffix}..." - ) - percent = (cur * 100) / total - progress.print(msg, f"{cur}/{total} docs [{percent:.1f}% done]") diff --git a/osbenchmark/workload_generator/workload_generator.py b/osbenchmark/workload_generator/workload_generator.py index 3bcaacc42..e4bac6277 100644 --- a/osbenchmark/workload_generator/workload_generator.py +++ b/osbenchmark/workload_generator/workload_generator.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -36,30 +36,37 @@ def process_template(templates_path, template_filename, template_vars, output_path): - env = Environment(loader=FileSystemLoader(templates_path), autoescape=select_autoescape(['html', 'xml'])) + env = Environment( + loader=FileSystemLoader(templates_path), + autoescape=select_autoescape(["html", "xml"]), + ) template = env.get_template(template_filename) with open(output_path, "w") as f: f.write(template.render(template_vars)) + def validate_indices_docs_map(indices, indices_docs_map, docs_were_requested): if not docs_were_requested: return if len(indices) < len(indices_docs_map): raise exceptions.SystemSetupError( - "Number of : pairs exceeds number of indices in --indices. 
" + - "Ensure number of : pairs is less than or equal to number of indices in --indices." + "Number of : pairs exceeds number of indices in --indices. " + + "Ensure number of : pairs is less than or equal to number of indices in --indices." ) for index_name in indices_docs_map: if index_name not in indices: raise exceptions.SystemSetupError( - "Index from : pair was not found in --indices. " + - "Ensure that indices from all : pairs exist in --indices." + "Index from : pair was not found in --indices. " + + "Ensure that indices from all : pairs exist in --indices." ) -def extract_mappings_and_corpora(client, output_path, indices_to_extract, indices_docs_map): + +def extract_mappings_and_corpora( + client, output_path, indices_to_extract, indices_docs_map, concurrent=False +): indices = [] corpora = [] docs_were_requested = indices_docs_map is not None and len(indices_docs_map) > 0 @@ -72,7 +79,9 @@ def extract_mappings_and_corpora(client, output_path, indices_to_extract, indice try: indices += index.extract(client, output_path, index_name) except OpenSearchException: - logging.getLogger(__name__).exception("Failed to extract index [%s]", index_name) + logging.getLogger(__name__).exception( + "Failed to extract index [%s]", index_name + ) # That list only contains valid indices (with index patterns already resolved) # For each index, check if docs were requested. If so, extract the number of docs from the map @@ -87,13 +96,18 @@ def extract_mappings_and_corpora(client, output_path, indices_to_extract, indice f"The string [{indices_docs_map.get(i['name'])}] in : pair cannot be converted to an integer." ) - logging.getLogger(__name__).info("Extracting [%s] docs for index [%s]", custom_docs_to_extract, i["name"]) - c = corpus.extract(client, output_path, i["name"], custom_docs_to_extract) + logging.getLogger(__name__).info( + "Extracting [%s] docs for index [%s]", custom_docs_to_extract, i["name"] + ) + c = corpus.extract( + client, output_path, i["name"], custom_docs_to_extract, concurrent + ) if c: corpora.append(c) return indices, corpora + def process_custom_queries(custom_queries): if not custom_queries: return [] @@ -104,10 +118,13 @@ def process_custom_queries(custom_queries): if isinstance(data, dict): data = [data] except ValueError as err: - raise exceptions.SystemSetupError(f"Ensure JSON schema is valid and queries are contained in a list: {err}") + raise exceptions.SystemSetupError( + f"Ensure JSON schema is valid and queries are contained in a list: {err}" + ) return data + def create_workload(cfg): logger = logging.getLogger(__name__) @@ -118,21 +135,31 @@ def create_workload(cfg): client_options = cfg.opts("client", "options") number_of_docs = cfg.opts("generator", "number_of_docs") unprocessed_custom_queries = cfg.opts("workload", "custom_queries") + concurrent = cfg.opts("workload", "concurrent") custom_queries = process_custom_queries(unprocessed_custom_queries) logger.info("Creating workload [%s] matching indices [%s]", workload_name, indices) logger.info("Number of Docs: %s", number_of_docs) - client = OsClientFactory(hosts=target_hosts.all_hosts[opts.TargetHosts.DEFAULT], - client_options=client_options.all_client_options[opts.TargetHosts.DEFAULT]).create() + client = OsClientFactory( + hosts=target_hosts.all_hosts[opts.TargetHosts.DEFAULT], + client_options=client_options.all_client_options[opts.TargetHosts.DEFAULT], + ).create() info = client.info() - console.info(f"Connected to OpenSearch cluster [{info['name']}] version [{info['version']['number']}].\n", logger=logger) 
- - output_path = os.path.abspath(os.path.join(io.normalize_path(root_path), workload_name)) + console.info( + f"Connected to OpenSearch cluster [{info['name']}] version [{info['version']['number']}].\n", + logger=logger, + ) + + output_path = os.path.abspath( + os.path.join(io.normalize_path(root_path), workload_name) + ) io.ensure_dir(output_path) - indices, corpora = extract_mappings_and_corpora(client, output_path, indices, number_of_docs) + indices, corpora = extract_mappings_and_corpora( + client, output_path, indices, number_of_docs, concurrent + ) if len(indices) == 0: raise RuntimeError("Failed to extract any indices for workload!") @@ -141,7 +168,7 @@ def create_workload(cfg): "workload_name": workload_name, "indices": indices, "corpora": corpora, - "custom_queries": custom_queries + "custom_queries": custom_queries, } logger.info("Template Vars: %s", template_vars) @@ -150,9 +177,21 @@ def create_workload(cfg): templates_path = os.path.join(cfg.opts("node", "benchmark.root"), "resources") if custom_queries: - process_template(templates_path, "custom-query-workload.json.j2", template_vars, workload_path) + process_template( + templates_path, + "custom-query-workload.json.j2", + template_vars, + workload_path, + ) else: - process_template(templates_path, "default-query-workload.json.j2", template_vars, workload_path) + process_template( + templates_path, + "default-query-workload.json.j2", + template_vars, + workload_path, + ) - console.println("") - console.info(f"Workload {workload_name} has been created. Run it with: {PROGRAM_NAME} --workload-path={output_path}") + console.println("\n") + console.info( + f"Workload {workload_name} has been created. Run it with: {PROGRAM_NAME} --workload-path={output_path}" + ) From fe92788abd5963e40f79b9f8a0d29e69817bf57d Mon Sep 17 00:00:00 2001 From: AkshathRaghav Date: Wed, 1 Nov 2023 17:29:35 -0400 Subject: [PATCH 03/10] TQDM added to setup.py Signed-off-by: AkshathRaghav --- setup.py | 126 +++++++++++++++++++++++++++++-------------------------- 1 file changed, 67 insertions(+), 59 deletions(-) diff --git a/setup.py b/setup.py index f93e9d000..e4cbd4204 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. 
# You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -92,6 +92,8 @@ def str_from_file(name): "google-auth==1.22.1", # License: MIT "wheel==0.38.4", + # License: MIT + "tqdm==4.66.1", # License: Apache 2.0 # transitive dependencies: # botocore: Apache 2.0 @@ -104,7 +106,7 @@ def str_from_file(name): "ujson", "pytest==7.2.2", "pytest-benchmark==3.2.2", - "pytest-asyncio==0.14.0" + "pytest-asyncio==0.14.0", ] # These packages are only required when developing Benchmark @@ -115,65 +117,71 @@ def str_from_file(name): "wheel==0.38.4", "github3.py==1.3.0", "pylint==2.6.0", - "pylint-quotes==0.2.1" + "pylint-quotes==0.2.1", ] -python_version_classifiers = ["Programming Language :: Python :: {}.{}".format(major, minor) - for major, minor in supported_python_versions] +python_version_classifiers = [ + "Programming Language :: Python :: {}.{}".format(major, minor) + for major, minor in supported_python_versions +] -first_supported_version = "{}.{}".format(supported_python_versions[0][0], supported_python_versions[0][1]) +first_supported_version = "{}.{}".format( + supported_python_versions[0][0], supported_python_versions[0][1] +) # next minor after the latest supported version -first_unsupported_version = "{}.{}".format(supported_python_versions[-1][0], supported_python_versions[-1][1] + 1) +first_unsupported_version = "{}.{}".format( + supported_python_versions[-1][0], supported_python_versions[-1][1] + 1 +) -setup(name="opensearch-benchmark", - maintainer="Ian Hoang, Govind Kamat", - maintainer_email="hoangia@amazon.com, govkamat@amazon.com", - version=__versionstr__, - description="Macrobenchmarking framework for OpenSearch", - long_description=long_description, - long_description_content_type='text/markdown', - url="https://github.com/opensearch-project/OpenSearch-Benchmark", - license="Apache License, Version 2.0", - packages=find_packages( - where=".", - exclude=("tests*", "benchmarks*", "it*") - ), - include_package_data=True, - # supported Python versions. This will prohibit pip (> 9.0.0) from even installing Benchmark on an unsupported - # Python version. - # See also https://packaging.python.org/guides/distributing-packages-using-setuptools/#python-requires - # - # According to https://www.python.org/dev/peps/pep-0440/#version-matching, a trailing ".*" should - # ignore patch versions: - # - # "additional trailing segments will be ignored when determining whether or not a version identifier matches - # the clause" - # - # However, with the pattern ">=3.5.*,<=3.8.*", the version "3.8.0" is not accepted. Therefore, we match - # the minor version after the last supported one (i.e. 
if 3.8 is the last supported, we'll emit "<3.9") - python_requires=">={},<{}".format(first_supported_version, first_unsupported_version), - package_data={"": ["*.json", "*.yml"]}, - install_requires=install_requires, - test_suite="tests", - tests_require=tests_require, - extras_require={ - "develop": tests_require + develop_require - }, - entry_points={ - "console_scripts": [ - "opensearch-benchmark=osbenchmark.benchmark:main", - "opensearch-benchmarkd=osbenchmark.benchmarkd:main" - ], - }, - scripts=['scripts/expand-data-corpus.py'], - classifiers=[ - "Topic :: System :: Benchmark", - "Development Status :: 5 - Production/Stable", - "License :: OSI Approved :: Apache Software License", - "Intended Audience :: Developers", - "Operating System :: MacOS :: MacOS X", - "Operating System :: POSIX", - "Programming Language :: Python", - "Programming Language :: Python :: 3", - ] + python_version_classifiers, - zip_safe=False) +setup( + name="opensearch-benchmark", + maintainer="Ian Hoang, Govind Kamat", + maintainer_email="hoangia@amazon.com, govkamat@amazon.com", + version=__versionstr__, + description="Macrobenchmarking framework for OpenSearch", + long_description=long_description, + long_description_content_type="text/markdown", + url="https://github.com/opensearch-project/OpenSearch-Benchmark", + license="Apache License, Version 2.0", + packages=find_packages(where=".", exclude=("tests*", "benchmarks*", "it*")), + include_package_data=True, + # supported Python versions. This will prohibit pip (> 9.0.0) from even installing Benchmark on an unsupported + # Python version. + # See also https://packaging.python.org/guides/distributing-packages-using-setuptools/#python-requires + # + # According to https://www.python.org/dev/peps/pep-0440/#version-matching, a trailing ".*" should + # ignore patch versions: + # + # "additional trailing segments will be ignored when determining whether or not a version identifier matches + # the clause" + # + # However, with the pattern ">=3.5.*,<=3.8.*", the version "3.8.0" is not accepted. Therefore, we match + # the minor version after the last supported one (i.e. 
if 3.8 is the last supported, we'll emit "<3.9") + python_requires=">={},<{}".format( + first_supported_version, first_unsupported_version + ), + package_data={"": ["*.json", "*.yml"]}, + install_requires=install_requires, + test_suite="tests", + tests_require=tests_require, + extras_require={"develop": tests_require + develop_require}, + entry_points={ + "console_scripts": [ + "opensearch-benchmark=osbenchmark.benchmark:main", + "opensearch-benchmarkd=osbenchmark.benchmarkd:main", + ], + }, + scripts=["scripts/expand-data-corpus.py"], + classifiers=[ + "Topic :: System :: Benchmark", + "Development Status :: 5 - Production/Stable", + "License :: OSI Approved :: Apache Software License", + "Intended Audience :: Developers", + "Operating System :: MacOS :: MacOS X", + "Operating System :: POSIX", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + ] + + python_version_classifiers, + zip_safe=False, +) From 828b10e487e97f505905d7c41c0dd07e730360c2 Mon Sep 17 00:00:00 2001 From: AkshathRaghav Date: Fri, 3 Nov 2023 01:00:40 -0400 Subject: [PATCH 04/10] Adding concurrent flag Signed-off-by: AkshathRaghav --- osbenchmark/utils/opts.py | 28 +++++++++++++++++------- osbenchmark/workload_generator/corpus.py | 6 ++++- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/osbenchmark/utils/opts.py b/osbenchmark/utils/opts.py index 866cfec92..5326b2bc5 100644 --- a/osbenchmark/utils/opts.py +++ b/osbenchmark/utils/opts.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -38,6 +38,7 @@ def csv_to_list(csv): else: return [e.strip() for e in csv.split(",")] + def to_bool(v): if v is None: return None @@ -98,7 +99,7 @@ def bulleted_list_of(src_list): def double_quoted_list_of(src_list): - return ["\"{}\"".format(param) for param in src_list] + return ['"{}"'.format(param) for param in src_list] def make_list_of_close_matches(word_list, all_possibilities): @@ -118,21 +119,24 @@ def make_list_of_close_matches(word_list, all_possibilities): return close_matches + class StoreKeyPairAsDict(argparse.Action): """ Custom Argparse action that allows users to pass in a key:value pairs after specifying a parameter. Used as action for --number-of-docs parameter for create-workload subcommand. """ + def __call__(self, parser, namespace, values, option_string=None): custom_dict = {} for kv in values: try: - k,v = kv.split(":") + print(kv) + k, v = kv.split(":") custom_dict[k] = v except ValueError: raise exceptions.InvalidSyntax( "StoreKeyPairAsDict: Could not convert string to dict due to invalid syntax." 
- ) + ) setattr(namespace, self.dest, custom_dict) return custom_dict @@ -180,6 +184,7 @@ def normalize_to_dict(arg): """ # pylint: disable=import-outside-toplevel from opensearchpy.client import _normalize_hosts + return {TargetHosts.DEFAULT: _normalize_hosts(arg)} self.parsed_options = to_dict(self.argvalue, default_parser=normalize_to_dict) @@ -218,12 +223,19 @@ def normalize_to_dict(arg): return {TargetHosts.DEFAULT: kv_to_map(arg)} - if self.argvalue == ClientOptions.DEFAULT_CLIENT_OPTIONS and self.target_hosts is not None: + if ( + self.argvalue == ClientOptions.DEFAULT_CLIENT_OPTIONS + and self.target_hosts is not None + ): # --client-options unset but multi-clusters used in --target-hosts? apply options defaults for all cluster names. - self.parsed_options = {cluster_name: kv_to_map([ClientOptions.DEFAULT_CLIENT_OPTIONS]) - for cluster_name in self.target_hosts.all_hosts.keys()} + self.parsed_options = { + cluster_name: kv_to_map([ClientOptions.DEFAULT_CLIENT_OPTIONS]) + for cluster_name in self.target_hosts.all_hosts.keys() + } else: - self.parsed_options = to_dict(self.argvalue, default_parser=normalize_to_dict) + self.parsed_options = to_dict( + self.argvalue, default_parser=normalize_to_dict + ) @property def all_client_options(self): diff --git a/osbenchmark/workload_generator/corpus.py b/osbenchmark/workload_generator/corpus.py index 864772597..098f5e033 100644 --- a/osbenchmark/workload_generator/corpus.py +++ b/osbenchmark/workload_generator/corpus.py @@ -169,7 +169,11 @@ def dump_documents_range( break for doc in hits: - search_after = doc["sort"] + try: + search_after = doc["sort"] + except KeyError: + print(doc) + logger.info("%s", doc) data = ( json.dumps(doc["_source"], separators=(",", ":")) + "\n" ).encode("utf-8") From fbf5504adc901eb01b36f98ea25c4ea5d9092a5f Mon Sep 17 00:00:00 2001 From: AkshathRaghav Date: Sun, 5 Nov 2023 11:00:37 -0500 Subject: [PATCH 05/10] Fixing bugs, removing existing workload before making, fixing argparse for documents. 
Needs documentation update, and configurability Signed-off-by: AkshathRaghav --- osbenchmark/utils/opts.py | 2 +- osbenchmark/workload_generator/corpus.py | 34 +++++++------------ .../workload_generator/workload_generator.py | 5 +++ 3 files changed, 19 insertions(+), 22 deletions(-) diff --git a/osbenchmark/utils/opts.py b/osbenchmark/utils/opts.py index 5326b2bc5..827fda4bc 100644 --- a/osbenchmark/utils/opts.py +++ b/osbenchmark/utils/opts.py @@ -128,9 +128,9 @@ class StoreKeyPairAsDict(argparse.Action): def __call__(self, parser, namespace, values, option_string=None): custom_dict = {} + values = values[0].split(",") for kv in values: try: - print(kv) k, v = kv.split(":") custom_dict[k] = v except ValueError: diff --git a/osbenchmark/workload_generator/corpus.py b/osbenchmark/workload_generator/corpus.py index 098f5e033..d786fc7ef 100644 --- a/osbenchmark/workload_generator/corpus.py +++ b/osbenchmark/workload_generator/corpus.py @@ -107,7 +107,6 @@ def dump_documents_range( start_doc, end_doc, total_docs, - progress_message_suffix="", ): """ Extract documents in the range of start_doc and end_doc and write to individual files @@ -129,24 +128,23 @@ def dump_documents_range( with open(out_path, "wb") as outfile: with open(comp_outpath, "wb") as comp_outfile: + max_doc = total_docs if end_doc > total_docs else end_doc + logger.info( "Dumping corpus for index [%s] to [%s] for docs %s-%s.", index, out_path, start_doc, - end_doc, + max_doc, ) - query = { - "query": {"match_all": {}}, - "from": start_doc, - "size": end_doc - start_doc, - } - batch_size = (end_doc - start_doc) // 5 + batch_size = (max_doc - start_doc) // 5 + if batch_size < 1: + batch_size = 1 search_after = None n = 0 - while n < (end_doc - start_doc): + while n < max_doc - start_doc: if search_after: query = { "query": {"match_all": {}}, @@ -172,7 +170,6 @@ def dump_documents_range( try: search_after = doc["sort"] except KeyError: - print(doc) logger.info("%s", doc) data = ( json.dumps(doc["_source"], separators=(",", ":")) + "\n" @@ -183,7 +180,7 @@ def dump_documents_range( n += 1 pbar.update(1) - if n >= (end_doc - start_doc): + if n >= (max_doc - start_doc): break comp_outfile.write(compressor.flush()) @@ -206,7 +203,7 @@ def dump_documents( num_threads = 8 with tqdm( total=number_of_docs, - desc="Extracting documents" + desc=f"Extracting documents from {index}" + (f" [{progress_message_suffix}]" if progress_message_suffix else ""), unit="doc", ) as pbar: @@ -215,21 +212,15 @@ def dump_documents( ranges = [(i, i + step) for i in range(0, number_of_docs, step)] executor.map( lambda args: dump_documents_range( - pbar, - client, - index, - out_path, - *args, - number_of_docs, - progress_message_suffix, + pbar, client, index, out_path, *args, number_of_docs ), ranges, ) - merge_json_files(out_path, ranges) + merge_json_files(out_path, ranges) else: with tqdm( total=number_of_docs, - desc="Extracting documents" + desc=f"Extracting documents from {index}" + (f" [{progress_message_suffix}]" if progress_message_suffix else ""), unit="doc", ) as pbar: @@ -243,6 +234,7 @@ def dump_documents( number_of_docs, progress_message_suffix, ) + merge_json_files(out_path, [(0, number_of_docs)]) def merge_json_files(out_path, ranges): diff --git a/osbenchmark/workload_generator/workload_generator.py b/osbenchmark/workload_generator/workload_generator.py index e4bac6277..61e412d15 100644 --- a/osbenchmark/workload_generator/workload_generator.py +++ b/osbenchmark/workload_generator/workload_generator.py @@ -25,6 +25,7 @@ import logging import 
os import json +import shutil from opensearchpy import OpenSearchException from jinja2 import Environment, FileSystemLoader, select_autoescape @@ -155,6 +156,10 @@ def create_workload(cfg): output_path = os.path.abspath( os.path.join(io.normalize_path(root_path), workload_name) ) + + if os.path.isdir(output_path): + shutil.rmtree(output_path) + io.ensure_dir(output_path) indices, corpora = extract_mappings_and_corpora( From 8f06bddcbc2d5fc208c4057429273baf760ac25a Mon Sep 17 00:00:00 2001 From: AkshathRaghav Date: Tue, 7 Nov 2023 19:30:47 -0500 Subject: [PATCH 06/10] Adding configurable params Signed-off-by: AkshathRaghav --- osbenchmark/benchmark.py | 36 +++++++++++ osbenchmark/exceptions.py | 9 ++- osbenchmark/utils/opts.py | 18 ++++++ osbenchmark/workload_generator/corpus.py | 60 +++++++++++++++---- .../workload_generator/workload_generator.py | 35 ++++++++++- 5 files changed, 143 insertions(+), 15 deletions(-) diff --git a/osbenchmark/benchmark.py b/osbenchmark/benchmark.py index a08e0d5f5..28d0e8998 100644 --- a/osbenchmark/benchmark.py +++ b/osbenchmark/benchmark.py @@ -243,6 +243,24 @@ def add_workload_source(subparser): action="store_true", help="Whether to generate workload concurrently (default: false)", ) + create_workload_parser.add_argument( + "--threads", + type=positive_number, + default=8, + help="Number of threads for dumping documents from indices (default: false)", + ) + create_workload_parser.add_argument( + "--bsize", + type=positive_number, + default=0, # 0 means dynamic batching within corpus.py `dump_documents_range()` + help="Batch size to use for dumping documents (default: false)", + ) + create_workload_parser.add_argument( + "--custom_dump_query", + action=opts.ProcessDumpQuery, + type=argparse.FileType("r"), + help="File path for custom dumping query of documents in index (default: false)", + ) compare_parser = subparsers.add_parser( "compare", help="Compare two test_executions" @@ -1347,6 +1365,24 @@ def dispatch_sub_command(arg_parser, args, cfg): "concurrent", args.concurrent, ) + cfg.add( + config.Scope.applicationOverride, + "workload", + "threads", + args.threads, + ) + cfg.add( + config.Scope.applicationOverride, + "workload", + "bsize", + args.bsize, + ) + cfg.add( + config.Scope.applicationOverride, + "workload", + "custom_dump_query", + args.custom_dump_query, + ) configure_connection_params(arg_parser, args, cfg) workload_generator.create_workload(cfg) diff --git a/osbenchmark/exceptions.py b/osbenchmark/exceptions.py index 19a90a412..ec210909f 100644 --- a/osbenchmark/exceptions.py +++ b/osbenchmark/exceptions.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. 
# You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -45,6 +45,7 @@ class LaunchError(BenchmarkError): Thrown whenever there was a problem launching the benchmark candidate """ + class InstallError(BenchmarkError): """ Thrown whenever there was a problem installing the benchmark candidate @@ -108,5 +109,11 @@ class WorkloadConfigError(BenchmarkError): """ +class QueryProcessingError(BenchmarkError): + """ + Thrown when there is an error processing a query from the file path given in + """ + + class NotFound(BenchmarkError): pass diff --git a/osbenchmark/utils/opts.py b/osbenchmark/utils/opts.py index 827fda4bc..1a213d1ad 100644 --- a/osbenchmark/utils/opts.py +++ b/osbenchmark/utils/opts.py @@ -25,6 +25,7 @@ import difflib import json import argparse +import os from osbenchmark import exceptions from osbenchmark.utils import io @@ -142,6 +143,23 @@ def __call__(self, parser, namespace, values, option_string=None): return custom_dict +class ProcessDumpQuery(argparse.Action): + def __call__(self, parser, namespace, values, option_string=None): + query = {} + try: + with open(str(values.name), "r") as f: + query = json.load(f) + setattr(namespace, self.dest, query) + except Exception as e: + print(e) + setattr(namespace, self.dest, "{}") + raise exceptions.QueryProcessingError( + "ProcessDumpQuery: Could not process query from file." + ) + + return query + + class ConnectOptions: """ Base Class to help either parsing --target-hosts or --client-options diff --git a/osbenchmark/workload_generator/corpus.py b/osbenchmark/workload_generator/corpus.py index d786fc7ef..cd4f46bac 100644 --- a/osbenchmark/workload_generator/corpus.py +++ b/osbenchmark/workload_generator/corpus.py @@ -52,7 +52,14 @@ def get_doc_outpath(outdir, name, suffix=""): def extract( - client, output_path, index, number_of_docs_requested=None, concurrent=False + client, + output_path, + index, + number_of_docs_requested=None, + concurrent=False, + threads=None, + bsize=None, + custom_dump_query=None, ): """ Scroll an index with a match-all query, dumping document source to ``outdir/documents.json``. 
@@ -87,9 +94,21 @@ def extract( index, get_doc_outpath(output_path, index, "-1k"), min(total_docs, 1000), - "for test mode", + progress_message_suffix="for test mode", + threads=threads, + bsize=bsize, + custom_dump_query=custom_dump_query, + ) + dump_documents( + concurrent, + client, + index, + docs_path, + total_docs, + threads=threads, + bsize=bsize, + custom_dump_query=custom_dump_query, ) - dump_documents(concurrent, client, index, docs_path, total_docs) return template_vars(index, docs_path, total_docs) else: logger.info( @@ -107,6 +126,8 @@ def dump_documents_range( start_doc, end_doc, total_docs, + bsize=None, + custom_dump_query=None, ): """ Extract documents in the range of start_doc and end_doc and write to individual files @@ -138,7 +159,7 @@ def dump_documents_range( max_doc, ) - batch_size = (max_doc - start_doc) // 5 + batch_size = bsize if bsize > 0 else (max_doc - start_doc) // 5 if batch_size < 1: batch_size = 1 search_after = None @@ -147,14 +168,15 @@ def dump_documents_range( while n < max_doc - start_doc: if search_after: query = { - "query": {"match_all": {}}, + "query": custom_dump_query, "size": batch_size, "sort": [{"_id": "asc"}], "search_after": search_after, } else: query = { - "query": {"match_all": {}}, + # {"match_all": {}} + "query": custom_dump_query, "size": batch_size, "sort": [{"_id": "asc"}], "from": start_doc, @@ -187,7 +209,15 @@ def dump_documents_range( def dump_documents( - concurrent, client, index, out_path, number_of_docs, progress_message_suffix="" + concurrent, + client, + index, + out_path, + number_of_docs, + progress_message_suffix="", + threads=None, + bsize=None, + custom_dump_query=None, ): """ Splits the dumping process into 8 threads. @@ -200,19 +230,27 @@ def dump_documents( :param number_of_docs: Total number of documents """ if concurrent: - num_threads = 8 + threads = int(threads) + bsize = int(bsize) with tqdm( total=number_of_docs, desc=f"Extracting documents from {index}" + (f" [{progress_message_suffix}]" if progress_message_suffix else ""), unit="doc", ) as pbar: - with ThreadPoolExecutor(max_workers=num_threads) as executor: - step = number_of_docs // num_threads + with ThreadPoolExecutor(max_workers=threads) as executor: + step = number_of_docs // threads ranges = [(i, i + step) for i in range(0, number_of_docs, step)] executor.map( lambda args: dump_documents_range( - pbar, client, index, out_path, *args, number_of_docs + pbar, + client, + index, + out_path, + *args, + number_of_docs, + bsize, + custom_dump_query, ), ranges, ) diff --git a/osbenchmark/workload_generator/workload_generator.py b/osbenchmark/workload_generator/workload_generator.py index 61e412d15..dcd1250cb 100644 --- a/osbenchmark/workload_generator/workload_generator.py +++ b/osbenchmark/workload_generator/workload_generator.py @@ -66,7 +66,14 @@ def validate_indices_docs_map(indices, indices_docs_map, docs_were_requested): def extract_mappings_and_corpora( - client, output_path, indices_to_extract, indices_docs_map, concurrent=False + client, + output_path, + indices_to_extract, + indices_docs_map, + concurrent=False, + threads=None, + bsize=None, + custom_dump_query=None, ): indices = [] corpora = [] @@ -101,7 +108,14 @@ def extract_mappings_and_corpora( "Extracting [%s] docs for index [%s]", custom_docs_to_extract, i["name"] ) c = corpus.extract( - client, output_path, i["name"], custom_docs_to_extract, concurrent + client, + output_path, + i["name"], + custom_docs_to_extract, + concurrent, + threads, + bsize, + custom_dump_query, ) if c: 
corpora.append(c) @@ -137,6 +151,14 @@ def create_workload(cfg): number_of_docs = cfg.opts("generator", "number_of_docs") unprocessed_custom_queries = cfg.opts("workload", "custom_queries") concurrent = cfg.opts("workload", "concurrent") + threads = cfg.opts("workload", "threads") + bsize = cfg.opts("workload", "bsize") + custom_dump_query = cfg.opts("workload", "custom_dump_query") + + if (threads or bsize or custom_dump_query) and not concurrent: + raise exceptions.SystemSetupError( + "Cannot set --threads, --bsize, or --dump-query without setting --concurrent." + ) custom_queries = process_custom_queries(unprocessed_custom_queries) @@ -163,7 +185,14 @@ def create_workload(cfg): io.ensure_dir(output_path) indices, corpora = extract_mappings_and_corpora( - client, output_path, indices, number_of_docs, concurrent + client, + output_path, + indices, + number_of_docs, + concurrent, + threads, + bsize, + custom_dump_query, ) if len(indices) == 0: From c5add322de98828727e3747b7a971497409be80c Mon Sep 17 00:00:00 2001 From: dosa_chammandi Date: Tue, 7 Nov 2023 23:21:36 +0530 Subject: [PATCH 07/10] Rename parameters for Distributed Workload Generation - Issue 258 (#407) Signed-off-by: vivek palakkat Signed-off-by: AkshathRaghav --- osbenchmark/benchmark.py | 6 +++--- osbenchmark/worker_coordinator/worker_coordinator.py | 10 +++++----- tests/worker_coordinator/worker_coordinator_test.py | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/osbenchmark/benchmark.py b/osbenchmark/benchmark.py index 28d0e8998..efe89b5c9 100644 --- a/osbenchmark/benchmark.py +++ b/osbenchmark/benchmark.py @@ -585,7 +585,7 @@ def add_workload_source(subparser): default="", ) # actually the default is pipeline specific and it is set later test_execution_parser.add_argument( - "--load-worker-coordinator-hosts", + "--worker-ips", help="Define a comma-separated list of hosts which should generate load (default: localhost).", default="localhost", ) @@ -1278,8 +1278,8 @@ def dispatch_sub_command(arg_parser, args, cfg): cfg.add( config.Scope.applicationOverride, "worker_coordinator", - "load_worker_coordinator_hosts", - opts.csv_to_list(args.load_worker_coordinator_hosts), + "worker_ips", + opts.csv_to_list(args.worker_ips), ) cfg.add( config.Scope.applicationOverride, diff --git a/osbenchmark/worker_coordinator/worker_coordinator.py b/osbenchmark/worker_coordinator/worker_coordinator.py index 98b6f551e..05ac03535 100644 --- a/osbenchmark/worker_coordinator/worker_coordinator.py +++ b/osbenchmark/worker_coordinator/worker_coordinator.py @@ -528,7 +528,7 @@ def __init__(self, target, config, os_client_factory_class=client.OsClientFactor self.workload = None self.test_procedure = None self.metrics_store = None - self.load_worker_coordinator_hosts = [] + self.worker_ips = [] self.workers = [] # which client ids are assigned to which workers? self.clients_per_worker = {} @@ -636,7 +636,7 @@ def prepare_benchmark(self, t): # are not useful and attempts to connect to a non-existing cluster just lead to exception traces in logs. 
self.prepare_telemetry(os_clients, enable=not uses_static_responses) - for host in self.config.opts("worker_coordinator", "load_worker_coordinator_hosts"): + for host in self.config.opts("worker_coordinator", "worker_ips"): host_config = { # for simplicity we assume that all benchmark machines have the same specs "cores": num_cores(self.config) @@ -646,9 +646,9 @@ def prepare_benchmark(self, t): else: host_config["host"] = host - self.load_worker_coordinator_hosts.append(host_config) + self.worker_ips.append(host_config) - self.target.prepare_workload([h["host"] for h in self.load_worker_coordinator_hosts], self.config, self.workload) + self.target.prepare_workload([h["host"] for h in self.worker_ips], self.config, self.workload) def start_benchmark(self): self.logger.info("Benchmark is about to start.") @@ -669,7 +669,7 @@ def start_benchmark(self): if allocator.clients < 128: self.logger.info("Allocation matrix:\n%s", "\n".join([str(a) for a in self.allocations])) - worker_assignments = calculate_worker_assignments(self.load_worker_coordinator_hosts, allocator.clients) + worker_assignments = calculate_worker_assignments(self.worker_ips, allocator.clients) worker_id = 0 for assignment in worker_assignments: host = assignment["host"] diff --git a/tests/worker_coordinator/worker_coordinator_test.py b/tests/worker_coordinator/worker_coordinator_test.py index bf7865288..31391f3c4 100644 --- a/tests/worker_coordinator/worker_coordinator_test.py +++ b/tests/worker_coordinator/worker_coordinator_test.py @@ -110,7 +110,7 @@ def setUp(self): self.cfg.add(config.Scope.application, "client", "hosts", WorkerCoordinatorTests.Holder(all_hosts={"default": ["localhost:9200"]})) self.cfg.add(config.Scope.application, "client", "options", WorkerCoordinatorTests.Holder(all_client_options={"default": {}})) - self.cfg.add(config.Scope.application, "worker_coordinator", "load_worker_coordinator_hosts", ["localhost"]) + self.cfg.add(config.Scope.application, "worker_coordinator", "worker_ips", ["localhost"]) self.cfg.add(config.Scope.application, "results_publishing", "datastore.type", "in-memory") default_test_procedure = workload.TestProcedure("default", default=True, schedule=[ @@ -135,7 +135,7 @@ def create_test_worker_coordinator_target(self): @mock.patch("osbenchmark.utils.net.resolve") def test_start_benchmark_and_prepare_workload(self, resolve): # override load worker_coordinator host - self.cfg.add(config.Scope.applicationOverride, "worker_coordinator", "load_worker_coordinator_hosts", ["10.5.5.1", "10.5.5.2"]) + self.cfg.add(config.Scope.applicationOverride, "worker_coordinator", "worker_ips", ["10.5.5.1", "10.5.5.2"]) resolve.side_effect = ["10.5.5.1", "10.5.5.2"] target = self.create_test_worker_coordinator_target() From a4bbb2d91fc388e2321c9cb36c030f5c81c40634 Mon Sep 17 00:00:00 2001 From: AkshathRaghav Date: Wed, 8 Nov 2023 16:31:09 -0500 Subject: [PATCH 08/10] Adding unit tests Signed-off-by: AkshathRaghav --- it/tracker_test.py | 39 ++++++--- osbenchmark/workload_generator/corpus.py | 27 +++--- tests/workload_generator/corpus_test.py | 105 ++++++++++++++++++----- 3 files changed, 123 insertions(+), 48 deletions(-) diff --git a/it/tracker_test.py b/it/tracker_test.py index cb407a5e1..ba5b35773 100644 --- a/it/tracker_test.py +++ b/it/tracker_test.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. 
# You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -38,7 +38,12 @@ def test_cluster(): test_execution_id = str(uuid.uuid4()) it.wait_until_port_is_free(port_number=port) - cluster.install(distribution_version=dist, node_name="benchmark-node", provision_config_instance="4gheap", http_port=port) + cluster.install( + distribution_version=dist, + node_name="benchmark-node", + provision_config_instance="4gheap", + http_port=port, + ) cluster.start(test_execution_id=test_execution_id) yield cluster cluster.stop() @@ -47,23 +52,33 @@ def test_cluster(): @it.benchmark_in_mem def test_create_workload(cfg, tmp_path, test_cluster): # prepare some data - cmd = f"--test-mode --pipeline=benchmark-only --target-hosts=127.0.0.1:{test_cluster.http_port} " \ - f" --workload=geonames --test-procedure=append-no-conflicts-index-only --quiet" + cmd = ( + f"--test-mode --pipeline=benchmark-only --target-hosts=127.0.0.1:{test_cluster.http_port} " + f" --workload=geonames --test-procedure=append-no-conflicts-index-only --quiet" + ) assert it.execute_test(cfg, cmd) == 0 # create the workload workload_name = f"test-workload-{uuid.uuid4()}" workload_path = tmp_path / workload_name - assert it.osbenchmark(cfg, f"create-workload --target-hosts=127.0.0.1:{test_cluster.http_port} --indices=geonames " - f"--workload={workload_name} --output-path={tmp_path}") == 0 + assert ( + it.osbenchmark( + cfg, + f"create-workload --target-hosts=127.0.0.1:{test_cluster.http_port} --indices=geonames " + f"--workload={workload_name} --output-path={tmp_path}", + ) + == 0 + ) - expected_files = ["workload.json", - "geonames.json", - "geonames-documents-1k.json", - "geonames-documents.json", - "geonames-documents-1k.json.bz2", - "geonames-documents.json.bz2"] + expected_files = [ + "workload.json", + "geonames.json", + "geonames-documents-1k.json", + "geonames-documents.json", + "geonames-documents-1k.json.bz2", + "geonames-documents.json.bz2", + ] for f in expected_files: full_path = workload_path / f diff --git a/osbenchmark/workload_generator/corpus.py b/osbenchmark/workload_generator/corpus.py index cd4f46bac..3fbb9bca3 100644 --- a/osbenchmark/workload_generator/corpus.py +++ b/osbenchmark/workload_generator/corpus.py @@ -126,8 +126,8 @@ def dump_documents_range( start_doc, end_doc, total_docs, - bsize=None, - custom_dump_query=None, + bsize=0, + custom_dump_query='{"match_all": {}}', ): """ Extract documents in the range of start_doc and end_doc and write to individual files @@ -151,14 +151,6 @@ def dump_documents_range( with open(comp_outpath, "wb") as comp_outfile: max_doc = total_docs if end_doc > total_docs else end_doc - logger.info( - "Dumping corpus for index [%s] to [%s] for docs %s-%s.", - index, - out_path, - start_doc, - max_doc, - ) - batch_size = bsize if bsize > 0 else (max_doc - start_doc) // 5 if batch_size < 1: batch_size = 1 @@ -175,7 +167,6 @@ def dump_documents_range( } else: query = { - # {"match_all": {}} "query": custom_dump_query, "size": batch_size, "sort": [{"_id": "asc"}], @@ -192,7 +183,7 @@ def dump_documents_range( try: search_after = doc["sort"] except KeyError: - logger.info("%s", doc) + logger.info("Error in response format: %s", doc) data = ( json.dumps(doc["_source"], separators=(",", ":")) + "\n" ).encode("utf-8") @@ -207,6 +198,8 @@ def dump_documents_range( 
comp_outfile.write(compressor.flush()) + logger.info("Finished dumping corpus for index [%s] to [%s].", index, out_path) + def dump_documents( concurrent, @@ -270,12 +263,13 @@ def dump_documents( 0, number_of_docs, number_of_docs, - progress_message_suffix, ) merge_json_files(out_path, [(0, number_of_docs)]) def merge_json_files(out_path, ranges): + logger = logging.getLogger(__name__) + for EXT in [OUT_EXT, OUT_EXT + COMP_EXT]: merged_file_path = f"{out_path}" + EXT with open(merged_file_path, "wb") as merged_file: @@ -284,4 +278,9 @@ def merge_json_files(out_path, ranges): with open(file_path, "rb") as f: for line in f: merged_file.write(line) - os.remove(file_path) + try: + os.remove(file_path) + except: + pass + + logger.info("Finished merging shards into [%s].", merged_file_path) diff --git a/tests/workload_generator/corpus_test.py b/tests/workload_generator/corpus_test.py index 30f089260..81259f07d 100644 --- a/tests/workload_generator/corpus_test.py +++ b/tests/workload_generator/corpus_test.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -35,31 +35,23 @@ def serialize_doc(doc): @mock.patch("builtins.open", new_callable=mock.mock_open) @mock.patch("opensearchpy.OpenSearch") def test_extract(client, mo): - doc = { - "field1": "stuff", - "field2": "things" - } + doc = {"field1": "stuff", "field2": "things"} doc_data = serialize_doc(doc) - client.count.return_value = { - "count": 1001 - } + client.count.return_value = {"count": 1001} client.search.return_value = { "_scroll_id": "uohialjrknf", - "_shards": { - "successful": 1, - "total": 1, - "skipped": 0 - }, + "_shards": {"successful": 1, "total": 1, "skipped": 0}, "hits": { "hits": [ { "_index": "test", "_id": "0", "_score": 0, - "_source": doc + "_source": doc, + "sort": [0], } ] - } + }, } def set_corp_size(*args, **kwargs): @@ -79,12 +71,16 @@ def set_corp_size(*args, **kwargs): with mock.patch("os.stat") as osstat: osstat.side_effect = set_corp_size res = corpus.extract(client, outdir, index) - assert mo.call_count == 4 - mo.assert_has_calls([call("/abs/outpath/to/workloads/test-documents.json", "wb"), - call("/abs/outpath/to/workloads/test-documents.json.bz2", "wb"), - call("/abs/outpath/to/workloads/test-documents-1k.json", "wb"), - call("/abs/outpath/to/workloads/test-documents-1k.json.bz2", "wb") - ], any_order=True) + assert mo.call_count == 12 + mo.assert_has_calls( + [ + call("/abs/outpath/to/workloads/test-documents.json", "wb"), + call("/abs/outpath/to/workloads/test-documents.json.bz2", "wb"), + call("/abs/outpath/to/workloads/test-documents-1k.json", "wb"), + call("/abs/outpath/to/workloads/test-documents-1k.json.bz2", "wb"), + ], + any_order=True, + ) assert res == { "filename": "test-documents.json.bz2", @@ -92,7 +88,72 @@ def set_corp_size(*args, **kwargs): "compressed_bytes": 500, "index_name": "test", "doc_count": 1001, - "uncompressed_bytes": 1000 + "uncompressed_bytes": 1000, + } + + file_mock = mo.return_value + file_mock.assert_has_calls([call.write(doc_data)]) + + +@mock.patch("builtins.open", new_callable=mock.mock_open) +@mock.patch("opensearchpy.OpenSearch") +def test_extract_concurrent(client, mo): + doc = {"field1": "stuff", "field2": "things"} + doc_data = serialize_doc(doc) + client.count.return_value = 
{"count": 1501} + client.search.return_value = { + "_scroll_id": "uohialjrknf", + "_shards": {"successful": 1, "total": 1, "skipped": 0}, + "hits": { + "hits": [ + { + "_index": "test", + "_id": "0", + "_score": 0, + "_source": doc, + "sort": [0], + } + ] + }, + } + + def set_corp_size(*args, **kwargs): + path = args[0] + mockstat = mock.Mock() + if ".bz2" in path: + mockstat.st_size = 500 + else: + mockstat.st_size = 1000 + return mockstat + + client.scroll.return_value = {} + + index = "test" + outdir = "/abs/outpath/to/workloads/" + + with mock.patch("os.stat") as osstat: + osstat.side_effect = set_corp_size + res = corpus.extract( + client, outdir, index, concurrent=True, threads=4, bsize=100 + ) + assert mo.call_count == 40 + mo.assert_has_calls( + [ + call("/abs/outpath/to/workloads/test-documents.json", "wb"), + call("/abs/outpath/to/workloads/test-documents.json.bz2", "wb"), + call("/abs/outpath/to/workloads/test-documents-1k.json", "wb"), + call("/abs/outpath/to/workloads/test-documents-1k.json.bz2", "wb"), + ], + any_order=True, + ) + + assert res == { + "filename": "test-documents.json.bz2", + "path": "/abs/outpath/to/workloads/test-documents.json.bz2", + "compressed_bytes": 500, + "index_name": "test", + "doc_count": 1501, + "uncompressed_bytes": 1000, } file_mock = mo.return_value From 83f165058bef8bac4f32195b849b50aab046eff6 Mon Sep 17 00:00:00 2001 From: AkshathRaghav Date: Wed, 8 Nov 2023 16:42:27 -0500 Subject: [PATCH 09/10] Lint check Signed-off-by: AkshathRaghav --- osbenchmark/utils/opts.py | 1 - 1 file changed, 1 deletion(-) diff --git a/osbenchmark/utils/opts.py b/osbenchmark/utils/opts.py index 1a213d1ad..8d14bada3 100644 --- a/osbenchmark/utils/opts.py +++ b/osbenchmark/utils/opts.py @@ -25,7 +25,6 @@ import difflib import json import argparse -import os from osbenchmark import exceptions from osbenchmark.utils import io From 2b1fe8ac7e9f283af303e2b0de6483c4a0430e13 Mon Sep 17 00:00:00 2001 From: AkshathRaghav Date: Wed, 8 Nov 2023 17:15:49 -0500 Subject: [PATCH 10/10] Changing config error after looking at docs Signed-off-by: AkshathRaghav --- osbenchmark/workload_generator/workload_generator.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/osbenchmark/workload_generator/workload_generator.py b/osbenchmark/workload_generator/workload_generator.py index dcd1250cb..02387e883 100644 --- a/osbenchmark/workload_generator/workload_generator.py +++ b/osbenchmark/workload_generator/workload_generator.py @@ -155,9 +155,9 @@ def create_workload(cfg): bsize = cfg.opts("workload", "bsize") custom_dump_query = cfg.opts("workload", "custom_dump_query") - if (threads or bsize or custom_dump_query) and not concurrent: - raise exceptions.SystemSetupError( - "Cannot set --threads, --bsize, or --dump-query without setting --concurrent." + if threads and not concurrent: + raise exceptions.WorkloadConfigError( + "Cannot set --threads without setting --concurrent." ) custom_queries = process_custom_queries(unprocessed_custom_queries)