diff --git a/.gitignore b/.gitignore index 5b0e17785..bc398cad7 100644 --- a/.gitignore +++ b/.gitignore @@ -115,3 +115,6 @@ recipes/ccr/ccr-target-hosts.json # Tracker tracks tracks/ + +# Visual Studio Code for Contributors +.vscode/ diff --git a/osbenchmark/resources/base-workload.json.j2 b/osbenchmark/resources/base-workload.json.j2 index cfb6ced98..111549e28 100644 --- a/osbenchmark/resources/base-workload.json.j2 +++ b/osbenchmark/resources/base-workload.json.j2 @@ -22,35 +22,10 @@ ] }{% endfor %} ], - "schedule": [ - { - "operation": "delete-index" - },{% raw %} - { - "operation": { - "operation-type": "create-index", - "settings": {{index_settings | default({}) | tojson}} - } - },{% endraw %} - { - "operation": { - "operation-type": "cluster-health", - "index": {{ indices | map(attribute='name') | list | join(',') | tojson }},{% raw %} - "request-params": { - "wait_for_status": "{{cluster_health | default('green')}}", - "wait_for_no_relocating_shards": "true" - }, - "retry-until-success": true - } - }, - { - "operation": { - "operation-type": "bulk", - "bulk-size": {{bulk_size | default(5000)}}, - "ingest-percentage": {{ingest_percentage | default(100)}} - }, - "clients": {{bulk_indexing_clients | default(8)}} - },{% endraw -%} - {% block queries %}{% endblock %} + "operations": [ + {% raw %}{{ benchmark.collect(parts="operations/*.json") }}{% endraw %} + ], + "test_procedures": [ + {% raw %}{{ benchmark.collect(parts="test_procedures/*.json") }}{% endraw %} ] -} +} \ No newline at end of file diff --git a/osbenchmark/resources/custom-operations.json.j2 b/osbenchmark/resources/custom-operations.json.j2 new file mode 100644 index 000000000..329a0b93c --- /dev/null +++ b/osbenchmark/resources/custom-operations.json.j2 @@ -0,0 +1,16 @@ + { + "name": "index-append", + "operation-type": "bulk", + "bulk-size": {{bulk_size}}, + "ingest-percentage": {{ingest_percentage}} + }, + { + "name": "search", + "operation-type": "search", + "index": {{ indices | map(attribute='name') | list | join(',') | tojson }}, + "body": { + "query": { + "match_all": {} + } + } + } \ No newline at end of file diff --git a/osbenchmark/resources/custom-query-workload.json.j2 b/osbenchmark/resources/custom-query-workload.json.j2 deleted file mode 100644 index 76028db77..000000000 --- a/osbenchmark/resources/custom-query-workload.json.j2 +++ /dev/null @@ -1,14 +0,0 @@ -{% extends "base-workload.json.j2" %} - -{%- block queries -%} - {% for query in custom_queries %} - { - "operation": { - "name": "{{query.name}}", - "operation-type": "{{query['operation-type']}}", - "index": {{ indices | map(attribute='name') | list | join(',') | tojson }}, - "body": {{query.body | replace("'", '"') }} - } - }{% if not loop.last %},{% endif -%} - {% endfor %} -{%- endblock %} diff --git a/osbenchmark/resources/custom-test-procedures.json.j2 b/osbenchmark/resources/custom-test-procedures.json.j2 new file mode 100644 index 000000000..01482e7e2 --- /dev/null +++ b/osbenchmark/resources/custom-test-procedures.json.j2 @@ -0,0 +1,58 @@ +{ + "name": "append-no-conflicts", + "schedule": [ + { + "operation": "delete-index" + },{% raw %} + { + "operation": { + "operation-type": "create-index", + "settings": {{index_settings | tojson}} + } + },{% endraw %} + { + "operation": { + "operation-type": "cluster-health", + "index": {{ indices | map(attribute='name') | list | join(',') | tojson }},{% raw %} + "request-params": { + "wait_for_status": "{{cluster_health}}", + "wait_for_no_relocating_shards": "true" + }, + "retry-until-success": true + } + 
},{% endraw -%} + { + "operation": "index-append", + {% raw -%}"clients": {{bulk_indexing_clients}}, + "ignore-response-error-level": "{{error_level}}"{% endraw -%} + }, + { + "name": "refresh-after-index", + "operation": "refresh" + }, + { + "name": "search-after-index", + "operation": "search" + }, + {%- block queries -%} + {% for query in custom_queries %} + { + "operation": "{{query.name}}", + "warmup-iterations": {{query.warmup_iterations}}, + "iterations": {{query.iterations}}, + {%- if query.target_throughput %} + "target-throughput": {{ query.target_throughput }} + {%- else %} + "target-throughput": {{ query.target_throughput | tojson }} + {%- endif %} + {%- if search_clients is defined and search_clients %} + ,"clients": {{ search_clients | tojson }} + {%- endif %} + }{% if not loop.last %},{% endif -%} + {% endfor %} + {%- endblock %} + ] +} + diff --git a/osbenchmark/resources/default-operations.json.j2 b/osbenchmark/resources/default-operations.json.j2 new file mode 100644 index 000000000..67a903ac0 --- /dev/null +++ b/osbenchmark/resources/default-operations.json.j2 @@ -0,0 +1,16 @@ + { + "name": "index-append", + "operation-type": "bulk", + "bulk-size": {{bulk_size | default(5000)}}, + "ingest-percentage": {{ingest_percentage | default(100)}} + }, + { + "name": "search", + "operation-type": "search", + "index": {{ indices | map(attribute='name') | list | join(',') | tojson }}, + "body": { + "query": { + "match_all": {} + } + } + } \ No newline at end of file diff --git a/osbenchmark/resources/default-query-workload.json.j2 b/osbenchmark/resources/default-query-workload.json.j2 deleted file mode 100644 index 38c296738..000000000 --- a/osbenchmark/resources/default-query-workload.json.j2 +++ /dev/null @@ -1,16 +0,0 @@ -{% extends "base-workload.json.j2" %} - -{% block queries %} - { - "operation": { - "operation-type": "search", - "index": {{ indices | map(attribute='name') | list | join(',') | tojson }}, - "body": { - "query": { - "match_all": {} - } - } - },{% raw %} - "clients": {{search_clients | default(8)}}{% endraw %} - } -{%- endblock %} \ No newline at end of file diff --git a/osbenchmark/resources/default-test-procedures.json.j2 b/osbenchmark/resources/default-test-procedures.json.j2 new file mode 100644 index 000000000..1b8bfe163 --- /dev/null +++ b/osbenchmark/resources/default-test-procedures.json.j2 @@ -0,0 +1,41 @@ +{ + "name": "append-no-conflicts", + "description": "Indexes the whole document corpus using OpenSearch default settings. We only adjust the number of replicas as we benchmark a single node cluster and Benchmark will only start the benchmark if the cluster turns green. 
Document ids are unique so all index operations are append only.", + "default": true, + "schedule": [ + { + "operation": "delete-index" + },{% raw %} + { + "operation": { + "operation-type": "create-index", + "settings": {{index_settings | default({}) | tojson}} + } + },{% endraw %} + { + "operation": { + "operation-type": "cluster-health", + "index": {{ indices | map(attribute='name') | list | join(',') | tojson }},{% raw %} + "request-params": { + "wait_for_status": "{{cluster_health | default('green')}}", + "wait_for_no_relocating_shards": "true" + }, + "retry-until-success": true + } + },{% endraw -%} + { + "operation": "index-append", + {% raw -%} "clients": {{bulk_indexing_clients | default(8)}},{% endraw -%} + {% raw -%}"ignore-response-error-level": "{{error_level | default('non-fatal')}}"{% endraw -%} + }, + { + "name": "refresh-after-index", + "operation": "refresh" + }, + { + "name": "search-after-index", + "operation": "search" + } + ] +} + diff --git a/osbenchmark/workload/loader.py b/osbenchmark/workload/loader.py index d9a99bb1f..d685c3f2c 100644 --- a/osbenchmark/workload/loader.py +++ b/osbenchmark/workload/loader.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -38,7 +38,16 @@ from osbenchmark import exceptions, time, PROGRAM_NAME, config, version from osbenchmark.workload import params, workload -from osbenchmark.utils import io, collections, convert, net, console, modules, opts, repo +from osbenchmark.utils import ( + io, + collections, + convert, + net, + console, + modules, + opts, + repo, +) class WorkloadSyntaxError(exceptions.InvalidSyntax): @@ -74,10 +83,15 @@ def on_prepare_workload(self, workload, data_root_dir): class WorkloadProcessorRegistry: def __init__(self, cfg): - self.required_processors = [TaskFilterWorkloadProcessor(cfg), TestModeWorkloadProcessor(cfg)] + self.required_processors = [ + TaskFilterWorkloadProcessor(cfg), + TestModeWorkloadProcessor(cfg), + ] self.workload_processors = [] self.offline = cfg.opts("system", "offline.mode") - self.test_mode = cfg.opts("workload", "test.mode.enabled", mandatory=False, default_value=False) + self.test_mode = cfg.opts( + "workload", "test.mode.enabled", mandatory=False, default_value=False + ) self.base_config = cfg self.custom_configuration = False @@ -97,7 +111,9 @@ def register_workload_processor(self, processor): @property def processors(self): if not self.custom_configuration: - self.register_workload_processor(DefaultWorkloadPreparator(self.base_config)) + self.register_workload_processor( + DefaultWorkloadPreparator(self.base_config) + ) return [*self.required_processors, *self.workload_processors] @@ -111,24 +127,39 @@ def workloads(cfg): :return: A list of workloads that are available for the provided distribution version or else for the master version. 
""" repo = workload_repo(cfg) - return [_load_single_workload(cfg, repo, workload_name) for workload_name in repo.workload_names] + return [ + _load_single_workload(cfg, repo, workload_name) + for workload_name in repo.workload_names + ] def list_workloads(cfg): available_workloads = workloads(cfg) - only_auto_generated_test_procedures = all(t.default_test_procedure.auto_generated for t in available_workloads) + only_auto_generated_test_procedures = all( + t.default_test_procedure.auto_generated for t in available_workloads + ) data = [] for t in available_workloads: - line = [t.name, t.description, convert.number_to_human_string(t.number_of_documents), - convert.bytes_to_human_string(t.compressed_size_in_bytes), - convert.bytes_to_human_string(t.uncompressed_size_in_bytes)] + line = [ + t.name, + t.description, + convert.number_to_human_string(t.number_of_documents), + convert.bytes_to_human_string(t.compressed_size_in_bytes), + convert.bytes_to_human_string(t.uncompressed_size_in_bytes), + ] if not only_auto_generated_test_procedures: line.append(t.default_test_procedure) line.append(",".join(map(str, t.test_procedures))) data.append(line) - headers = ["Name", "Description", "Documents", "Compressed Size", "Uncompressed Size"] + headers = [ + "Name", + "Description", + "Documents", + "Compressed Size", + "Uncompressed Size", + ] if not only_auto_generated_test_procedures: headers.append("Default TestProcedure") headers.append("All TestProcedures") @@ -160,7 +191,11 @@ def test_procedure_info(c): if task.nested: console.println(format_task(task, suffix=":", num="{}. ".format(num))) for leaf_num, leaf_task in enumerate(task, start=1): - console.println(format_task(leaf_task, indent="\t", num="{}.{} ".format(num, leaf_num))) + console.println( + format_task( + leaf_task, indent="\t", num="{}.{} ".format(num, leaf_num) + ) + ) else: console.println(format_task(task, num="{}. 
".format(num))) @@ -168,9 +203,21 @@ def test_procedure_info(c): console.println("Showing details for workload [{}]:\n".format(t.name)) console.println("* Description: {}".format(t.description)) if t.number_of_documents: - console.println("* Documents: {}".format(convert.number_to_human_string(t.number_of_documents))) - console.println("* Compressed Size: {}".format(convert.bytes_to_human_string(t.compressed_size_in_bytes))) - console.println("* Uncompressed Size: {}".format(convert.bytes_to_human_string(t.uncompressed_size_in_bytes))) + console.println( + "* Documents: {}".format( + convert.number_to_human_string(t.number_of_documents) + ) + ) + console.println( + "* Compressed Size: {}".format( + convert.bytes_to_human_string(t.compressed_size_in_bytes) + ) + ) + console.println( + "* Uncompressed Size: {}".format( + convert.bytes_to_human_string(t.uncompressed_size_in_bytes) + ) + ) console.println("") if t.selected_test_procedure: @@ -197,28 +244,44 @@ def _load_single_workload(cfg, workload_repository, workload_name): try: workload_dir = workload_repository.workload_dir(workload_name) reader = WorkloadFileReader(cfg) - current_workload = reader.read(workload_name, workload_repository.workload_file(workload_name), workload_dir) + current_workload = reader.read( + workload_name, + workload_repository.workload_file(workload_name), + workload_dir, + ) tpr = WorkloadProcessorRegistry(cfg) - has_plugins = load_workload_plugins(cfg, workload_name, register_workload_processor=tpr.register_workload_processor) + has_plugins = load_workload_plugins( + cfg, + workload_name, + register_workload_processor=tpr.register_workload_processor, + ) current_workload.has_plugins = has_plugins for processor in tpr.processors: processor.on_after_load_workload(current_workload) return current_workload except FileNotFoundError as e: - logging.getLogger(__name__).exception("Cannot load workload [%s]", workload_name) - raise exceptions.SystemSetupError(f"Cannot load workload [{workload_name}]. " - f"List the available workloads with [{PROGRAM_NAME} list workloads].") from e + logging.getLogger(__name__).exception( + "Cannot load workload [%s]", workload_name + ) + raise exceptions.SystemSetupError( + f"Cannot load workload [{workload_name}]. " + f"List the available workloads with [{PROGRAM_NAME} list workloads]." + ) from e except BaseException: - logging.getLogger(__name__).exception("Cannot load workload [%s]", workload_name) + logging.getLogger(__name__).exception( + "Cannot load workload [%s]", workload_name + ) raise -def load_workload_plugins(cfg, - workload_name, - register_runner=None, - register_scheduler=None, - register_workload_processor=None, - force_update=False): +def load_workload_plugins( + cfg, + workload_name, + register_runner=None, + register_scheduler=None, + register_workload_processor=None, + force_update=False, +): """ Loads plugins that are defined for the current workload (as specified by the configuration). 
@@ -233,8 +296,17 @@ def load_workload_plugins(cfg, """ repo = workload_repo(cfg, fetch=force_update, update=force_update) workload_plugin_path = repo.workload_dir(workload_name) - logging.getLogger(__name__).debug("Invoking plugin_reader with name [%s] resolved to path [%s]", workload_name, workload_plugin_path) - plugin_reader = WorkloadPluginReader(workload_plugin_path, register_runner, register_scheduler, register_workload_processor) + logging.getLogger(__name__).debug( + "Invoking plugin_reader with name [%s] resolved to path [%s]", + workload_name, + workload_plugin_path, + ) + plugin_reader = WorkloadPluginReader( + workload_plugin_path, + register_runner, + register_scheduler, + register_workload_processor, + ) if plugin_reader.can_load(): plugin_reader.load() @@ -266,9 +338,13 @@ def first_existing(root_dirs, f): for document_set in corpus.documents: # At this point we can assume that the file is available locally. Check which path exists and set it. if document_set.document_archive: - document_set.document_archive = first_existing(data_root, document_set.document_archive) + document_set.document_archive = first_existing( + data_root, document_set.document_archive + ) if document_set.document_file: - document_set.document_file = first_existing(data_root, document_set.document_file) + document_set.document_file = first_existing( + data_root, document_set.document_file + ) def is_simple_workload_mode(cfg): @@ -299,7 +375,9 @@ def data_dir(cfg, workload_name, corpus_name): :param corpus_name: Name of the current corpus. :return: A list containing either one or two elements. Each element contains a path to a directory which may contain document files. """ - corpus_dir = os.path.join(cfg.opts("benchmarks", "local.dataset.cache"), corpus_name) + corpus_dir = os.path.join( + cfg.opts("benchmarks", "local.dataset.cache"), corpus_name + ) if is_simple_workload_mode(cfg): workload_path = cfg.opts("workload", "workload.path") r = SimpleWorkloadRepository(workload_path) @@ -314,7 +392,9 @@ class GitWorkloadRepository: def __init__(self, cfg, fetch, update, repo_class=repo.BenchmarkRepository): # current workload name (if any) self.workload_name = cfg.opts("workload", "workload.name", mandatory=False) - distribution_version = cfg.opts("builder", "distribution.version", mandatory=False) + distribution_version = cfg.opts( + "builder", "distribution.version", mandatory=False + ) repo_name = cfg.opts("workload", "repository.name") repo_revision = cfg.opts("workload", "repository.revision", mandatory=False) offline = cfg.opts("system", "offline.mode") @@ -323,17 +403,27 @@ def __init__(self, cfg, fetch, update, repo_class=repo.BenchmarkRepository): workload_repositories = cfg.opts("benchmarks", "workload.repository.dir") workloads_dir = os.path.join(root, workload_repositories) - self.repo = repo_class(remote_url, workloads_dir, repo_name, "workloads", offline, fetch) + self.repo = repo_class( + remote_url, workloads_dir, repo_name, "workloads", offline, fetch + ) if update: if repo_revision: self.repo.checkout(repo_revision) else: self.repo.update(distribution_version) - cfg.add(config.Scope.applicationOverride, "workload", "repository.revision", self.repo.revision) + cfg.add( + config.Scope.applicationOverride, + "workload", + "repository.revision", + self.repo.revision, + ) @property def workload_names(self): - return filter(lambda p: os.path.exists(self.workload_file(p)), next(os.walk(self.repo.repo_dir))[1]) + return filter( + lambda p: os.path.exists(self.workload_file(p)), + 
next(os.walk(self.repo.repo_dir))[1], + ) def workload_dir(self, workload_name): return os.path.join(self.repo.repo_dir, workload_name) @@ -345,30 +435,43 @@ def workload_file(self, workload_name): class SimpleWorkloadRepository: def __init__(self, workload_path): if not os.path.exists(workload_path): - raise exceptions.SystemSetupError("Workload path %s does not exist" % workload_path) + raise exceptions.SystemSetupError( + "Workload path %s does not exist" % workload_path + ) if os.path.isdir(workload_path): self.workload_name = io.basename(workload_path) self._workload_dir = workload_path self._workload_file = os.path.join(workload_path, "workload.json") if not os.path.exists(self._workload_file): - raise exceptions.SystemSetupError("Could not find workload.json in %s" % workload_path) + raise exceptions.SystemSetupError( + "Could not find workload.json in %s" % workload_path + ) elif os.path.isfile(workload_path): if io.has_extension(workload_path, ".json"): self._workload_dir = io.dirname(workload_path) self._workload_file = workload_path self.workload_name = io.splitext(io.basename(workload_path))[0] else: - raise exceptions.SystemSetupError("%s has to be a JSON file" % workload_path) + raise exceptions.SystemSetupError( + "%s has to be a JSON file" % workload_path + ) else: - raise exceptions.SystemSetupError("%s is neither a file nor a directory" % workload_path) + raise exceptions.SystemSetupError( + "%s is neither a file nor a directory" % workload_path + ) @property def workload_names(self): return [self.workload_name] def workload_dir(self, workload_name): - assert workload_name == self.workload_name, "Expect provided workload name [%s] to match [%s]" % (workload_name, self.workload_name) + assert ( + workload_name == self.workload_name + ), "Expect provided workload name [%s] to match [%s]" % ( + workload_name, + self.workload_name, + ) return self._workload_dir def workload_file(self, workload_name): @@ -412,12 +515,19 @@ def prepare_docs(cfg, workload, corpus, preparator): for document_set in corpus.documents: if document_set.is_bulk: data_root = data_dir(cfg, workload.name, corpus.name) - logging.getLogger(__name__).info("Resolved data root directory for document corpus [%s] in workload [%s] " - "to [%s].", corpus.name, workload.name, data_root) + logging.getLogger(__name__).info( + "Resolved data root directory for document corpus [%s] in workload [%s] " + "to [%s].", + corpus.name, + workload.name, + data_root, + ) if len(data_root) == 1: preparator.prepare_document_set(document_set, data_root[0]) # attempt to prepare everything in the current directory and fallback to the corpus directory - elif not preparator.prepare_bundled_document_set(document_set, data_root[0]): + elif not preparator.prepare_bundled_document_set( + document_set, data_root[0] + ): preparator.prepare_document_set(document_set, data_root[1]) def on_prepare_workload(self, workload, data_root_dir): @@ -427,7 +537,7 @@ def on_prepare_workload(self, workload, data_root_dir): "cfg": self.cfg, "workload": workload, "corpus": corpus, - "preparator": prep + "preparator": prep, } yield DefaultWorkloadPreparator.prepare_docs, params @@ -438,8 +548,10 @@ def __init__(self): def decompress(self, archive_path, documents_path, uncompressed_size): if uncompressed_size: - msg = f"Decompressing workload data from [{archive_path}] to [{documents_path}] (resulting size: " \ - f"[{convert.bytes_to_gb(uncompressed_size):.2f}] GB) ... 
" + msg = ( + f"Decompressing workload data from [{archive_path}] to [{documents_path}] (resulting size: " + f"[{convert.bytes_to_gb(uncompressed_size):.2f}] GB) ... " + ) else: msg = f"Decompressing workload data from [{archive_path}] to [{documents_path}] ... " @@ -449,12 +561,15 @@ def decompress(self, archive_path, documents_path, uncompressed_size): if not os.path.isfile(documents_path): raise exceptions.DataError( f"Decompressing [{archive_path}] did not create [{documents_path}]. Please check with the workload " - f"author if the compressed archive has been created correctly.") + f"author if the compressed archive has been created correctly." + ) extracted_bytes = os.path.getsize(documents_path) if uncompressed_size is not None and extracted_bytes != uncompressed_size: - raise exceptions.DataError(f"[{documents_path}] is corrupt. Extracted [{extracted_bytes}] bytes " - f"but [{uncompressed_size}] bytes are expected.") + raise exceptions.DataError( + f"[{documents_path}] is corrupt. Extracted [{extracted_bytes}] bytes " + f"but [{uncompressed_size}] bytes are expected." + ) class Downloader: @@ -467,9 +582,13 @@ def download(self, base_url, target_path, size_in_bytes): file_name = os.path.basename(target_path) if not base_url: - raise exceptions.DataError("Cannot download data because no base URL is provided.") + raise exceptions.DataError( + "Cannot download data because no base URL is provided." + ) if self.offline: - raise exceptions.SystemSetupError(f"Cannot find [{target_path}]. Please disable offline mode and retry.") + raise exceptions.SystemSetupError( + f"Cannot find [{target_path}]. Please disable offline mode and retry." + ) if base_url.endswith("/"): separator = "" @@ -481,19 +600,32 @@ def download(self, base_url, target_path, size_in_bytes): io.ensure_dir(os.path.dirname(target_path)) if size_in_bytes: size_in_mb = round(convert.bytes_to_mb(size_in_bytes)) - self.logger.info("Downloading data from [%s] (%s MB) to [%s].", data_url, size_in_mb, target_path) + self.logger.info( + "Downloading data from [%s] (%s MB) to [%s].", + data_url, + size_in_mb, + target_path, + ) else: - self.logger.info("Downloading data from [%s] to [%s].", data_url, target_path) + self.logger.info( + "Downloading data from [%s] to [%s].", data_url, target_path + ) # we want to have a bit more accurate download progress as these files are typically very large progress = net.Progress("[INFO] Downloading workload data", accuracy=1) - net.download(data_url, target_path, size_in_bytes, progress_indicator=progress) + net.download( + data_url, target_path, size_in_bytes, progress_indicator=progress + ) progress.finish() - self.logger.info("Downloaded data from [%s] to [%s].", data_url, target_path) + self.logger.info( + "Downloaded data from [%s] to [%s].", data_url, target_path + ) except urllib.error.HTTPError as e: if e.code == 404 and self.test_mode: - raise exceptions.DataError("This workload does not support test mode. Ask the workload author to add it or" - " disable test mode and retry.") from None + raise exceptions.DataError( + "This workload does not support test mode. Ask the workload author to add it or" + " disable test mode and retry." 
+ ) from None else: msg = f"Could not download [{data_url}] to [{target_path}]" if e.reason: @@ -502,16 +634,22 @@ def download(self, base_url, target_path, size_in_bytes): msg += f" (HTTP status: {e.code})" raise exceptions.DataError(msg) from e except urllib.error.URLError as e: - raise exceptions.DataError(f"Could not download [{data_url}] to [{target_path}].") from e + raise exceptions.DataError( + f"Could not download [{data_url}] to [{target_path}]." + ) from e if not os.path.isfile(target_path): - raise exceptions.SystemSetupError(f"Could not download [{data_url}] to [{target_path}]. Verify data " - f"are available at [{data_url}] and check your Internet connection.") + raise exceptions.SystemSetupError( + f"Could not download [{data_url}] to [{target_path}]. Verify data " + f"are available at [{data_url}] and check your Internet connection." + ) actual_size = os.path.getsize(target_path) if size_in_bytes is not None and actual_size != size_in_bytes: - raise exceptions.DataError(f"[{target_path}] is corrupt. Downloaded [{actual_size}] bytes " - f"but [{size_in_bytes}] bytes are expected.") + raise exceptions.DataError( + f"[{target_path}] is corrupt. Downloaded [{actual_size}] bytes " + f"but [{size_in_bytes}] bytes are expected." + ) class DocumentSetPreparator: @@ -531,8 +669,10 @@ def create_file_offset_table(self, document_file_path, expected_number_of_lines) lines_read = io.prepare_file_offset_table(document_file_path) if lines_read and lines_read != expected_number_of_lines: io.remove_file_offset_table(document_file_path) - raise exceptions.DataError(f"Data in [{document_file_path}] for workload [{self.workload_name}] are invalid. " - f"Expected [{expected_number_of_lines}] lines but got [{lines_read}].") + raise exceptions.DataError( + f"Data in [{document_file_path}] for workload [{self.workload_name}] are invalid. " + f"Expected [{expected_number_of_lines}] lines but got [{lines_read}]." + ) def prepare_document_set(self, document_set, data_root): """ @@ -551,15 +691,26 @@ def prepare_document_set(self, document_set, data_root): :param data_root: The data root directory for this document set. 
""" doc_path = os.path.join(data_root, document_set.document_file) - archive_path = os.path.join(data_root, document_set.document_archive) if document_set.has_compressed_corpus() else None + archive_path = ( + os.path.join(data_root, document_set.document_archive) + if document_set.has_compressed_corpus() + else None + ) while True: - if self.is_locally_available(doc_path) and \ - self.has_expected_size(doc_path, document_set.uncompressed_size_in_bytes): + if self.is_locally_available(doc_path) and self.has_expected_size( + doc_path, document_set.uncompressed_size_in_bytes + ): break - if document_set.has_compressed_corpus() and \ - self.is_locally_available(archive_path) and \ - self.has_expected_size(archive_path, document_set.compressed_size_in_bytes): - self.decompressor.decompress(archive_path, doc_path, document_set.uncompressed_size_in_bytes) + if ( + document_set.has_compressed_corpus() + and self.is_locally_available(archive_path) + and self.has_expected_size( + archive_path, document_set.compressed_size_in_bytes + ) + ): + self.decompressor.decompress( + archive_path, doc_path, document_set.uncompressed_size_in_bytes + ) else: if document_set.has_compressed_corpus(): target_path = archive_path @@ -569,16 +720,25 @@ def prepare_document_set(self, document_set, data_root): expected_size = document_set.uncompressed_size_in_bytes else: # this should not happen in practice as the JSON schema should take care of this - raise exceptions.BenchmarkAssertionError(f"Workload {self.workload_name} specifies documents but no corpus") + raise exceptions.BenchmarkAssertionError( + f"Workload {self.workload_name} specifies documents but no corpus" + ) try: - self.downloader.download(document_set.base_url, target_path, expected_size) + self.downloader.download( + document_set.base_url, target_path, expected_size + ) except exceptions.DataError as e: - if e.message == "Cannot download data because no base URL is provided." and \ - self.is_locally_available(target_path): - raise exceptions.DataError(f"[{target_path}] is present but does not have the expected " - f"size of [{expected_size}] bytes and it cannot be downloaded " - f"because no base URL is provided.") from None + if ( + e.message + == "Cannot download data because no base URL is provided." + and self.is_locally_available(target_path) + ): + raise exceptions.DataError( + f"[{target_path}] is present but does not have the expected " + f"size of [{expected_size}] bytes and it cannot be downloaded " + f"because no base URL is provided." + ) from None else: raise @@ -604,26 +764,44 @@ def prepare_bundled_document_set(self, document_set, data_root): :return: See postcondition. 
""" doc_path = os.path.join(data_root, document_set.document_file) - archive_path = os.path.join(data_root, document_set.document_archive) if document_set.has_compressed_corpus() else None + archive_path = ( + os.path.join(data_root, document_set.document_archive) + if document_set.has_compressed_corpus() + else None + ) while True: if self.is_locally_available(doc_path): - if self.has_expected_size(doc_path, document_set.uncompressed_size_in_bytes): - self.create_file_offset_table(doc_path, document_set.number_of_lines) + if self.has_expected_size( + doc_path, document_set.uncompressed_size_in_bytes + ): + self.create_file_offset_table( + doc_path, document_set.number_of_lines + ) return True else: - raise exceptions.DataError(f"[{doc_path}] is present but does not have the expected size " - f"of [{document_set.uncompressed_size_in_bytes}] bytes.") - - if document_set.has_compressed_corpus() and self.is_locally_available(archive_path): - if self.has_expected_size(archive_path, document_set.compressed_size_in_bytes): - self.decompressor.decompress(archive_path, doc_path, document_set.uncompressed_size_in_bytes) + raise exceptions.DataError( + f"[{doc_path}] is present but does not have the expected size " + f"of [{document_set.uncompressed_size_in_bytes}] bytes." + ) + + if document_set.has_compressed_corpus() and self.is_locally_available( + archive_path + ): + if self.has_expected_size( + archive_path, document_set.compressed_size_in_bytes + ): + self.decompressor.decompress( + archive_path, doc_path, document_set.uncompressed_size_in_bytes + ) else: # treat this is an error because if the file is present but the size does not match, something is # really fishy. It is likely that the user is currently creating a new workload and did not specify # the file size correctly. - raise exceptions.DataError(f"[{archive_path}] is present but does not have " - f"the expected size of [{document_set.compressed_size_in_bytes}] bytes.") + raise exceptions.DataError( + f"[{archive_path}] is present but does not have " + f"the expected size of [{document_set.compressed_size_in_bytes}] bytes." + ) else: return False @@ -635,9 +813,13 @@ class TemplateSource: benchmark.collect(parts=... 
""" - collect_parts_re = re.compile(r"{{\ +?benchmark\.collect\(parts=\"(.+?(?=\"))\"\)\ +?}}") + collect_parts_re = re.compile( + r"{{\ +?benchmark\.collect\(parts=\"(.+?(?=\"))\"\)\ +?}}" + ) - def __init__(self, base_path, template_file_name, source=io.FileSource, fileglobber=glob.glob): + def __init__( + self, base_path, template_file_name, source=io.FileSource, fileglobber=glob.glob + ): self.base_path = base_path self.template_file_name = template_file_name self.source = source @@ -648,12 +830,17 @@ def __init__(self, base_path, template_file_name, source=io.FileSource, fileglob def load_template_from_file(self): loader = jinja2.FileSystemLoader(self.base_path) try: - base_workload = loader.get_source(jinja2.Environment( - autoescape=select_autoescape(['html', 'xml'])), - self.template_file_name) + base_workload = loader.get_source( + jinja2.Environment(autoescape=select_autoescape(["html", "xml"])), + self.template_file_name, + ) except jinja2.TemplateNotFound: - self.logger.exception("Could not load workload from [%s].", self.template_file_name) - raise WorkloadSyntaxError("Could not load workload from '{}'".format(self.template_file_name)) + self.logger.exception( + "Could not load workload from [%s].", self.template_file_name + ) + raise WorkloadSyntaxError( + "Could not load workload from '{}'".format(self.template_file_name) + ) self.assembled_source = self.replace_includes(self.base_path, base_workload[0]) def load_template_from_string(self, template_source): @@ -667,7 +854,9 @@ def replace_includes(self, base_path, workload_fragment): for glob_pattern in match: full_glob_path = os.path.join(base_path, glob_pattern) sub_source = self.read_glob_files(full_glob_path) - repl[glob_pattern] = self.replace_includes(base_path=io.dirname(full_glob_path), workload_fragment=sub_source) + repl[glob_pattern] = self.replace_includes( + base_path=io.dirname(full_glob_path), workload_fragment=sub_source + ) def replstring(matchobj): # matchobj.groups() is a tuple and first element contains the matched group id @@ -691,17 +880,14 @@ def default_internal_template_vars(glob_helper=lambda f: [], clock=time.Clock): """ return { - "globals": { - "now": clock.now(), - "glob": glob_helper - }, - "filters": { - "days_ago": time.days_ago - } + "globals": {"now": clock.now(), "glob": glob_helper}, + "filters": {"days_ago": time.days_ago}, } -def render_template(template_source, template_vars=None, template_internal_vars=None, loader=None): +def render_template( + template_source, template_vars=None, template_internal_vars=None, loader=None +): macros = [ """ {% macro collect(parts) -%} @@ -723,17 +909,19 @@ def render_template(template_source, template_vars=None, template_internal_vars= {% endif %} {% endif %} {%- endmacro %} - """ + """, ] # place helpers dict loader first to prevent users from overriding our macros. 
env = jinja2.Environment( - loader=jinja2.ChoiceLoader([ - jinja2.DictLoader({"benchmark.helpers": "".join(macros)}), - jinja2.BaseLoader(), - loader - ]), - autoescape=select_autoescape(['html', 'xml']) + loader=jinja2.ChoiceLoader( + [ + jinja2.DictLoader({"benchmark.helpers": "".join(macros)}), + jinja2.BaseLoader(), + loader, + ] + ), + autoescape=select_autoescape(["html", "xml"]), ) if template_vars: @@ -742,7 +930,9 @@ def render_template(template_source, template_vars=None, template_internal_vars= # ensure that user variables never override our internal variables if template_internal_vars: for macro_type in template_internal_vars: - for env_global_key, env_global_value in template_internal_vars[macro_type].items(): + for env_global_key, env_global_value in template_internal_vars[ + macro_type + ].items(): getattr(env, macro_type)[env_global_key] = env_global_value template = env.from_string(template_source) @@ -750,12 +940,14 @@ def render_template(template_source, template_vars=None, template_internal_vars= def register_all_params_in_workload(assembled_source, complete_workload_params=None): - j2env = jinja2.Environment(autoescape=select_autoescape(['html', 'xml'])) + j2env = jinja2.Environment(autoescape=select_autoescape(["html", "xml"])) # we don't need the following j2 filters/macros but we define them anyway to prevent parsing failures internal_template_vars = default_internal_template_vars() for macro_type in internal_template_vars: - for env_global_key, env_global_value in internal_template_vars[macro_type].items(): + for env_global_key, env_global_value in internal_template_vars[ + macro_type + ].items(): getattr(j2env, macro_type)[env_global_key] = env_global_value ast = j2env.parse(assembled_source) @@ -764,7 +956,9 @@ def register_all_params_in_workload(assembled_source, complete_workload_params=N complete_workload_params.populate_workload_defined_params(j2_variables) -def render_template_from_file(template_file_name, template_vars, complete_workload_params=None): +def render_template_from_file( + template_file_name, template_vars, complete_workload_params=None +): def relative_glob(start, f): result = glob.glob(os.path.join(start, f)) if result: @@ -775,12 +969,18 @@ def relative_glob(start, f): base_path = io.dirname(template_file_name) template_source = TemplateSource(base_path, io.basename(template_file_name)) template_source.load_template_from_file() - register_all_params_in_workload(template_source.assembled_source, complete_workload_params) + register_all_params_in_workload( + template_source.assembled_source, complete_workload_params + ) - return render_template(loader=jinja2.FileSystemLoader(base_path), - template_source=template_source.assembled_source, - template_vars=template_vars, - template_internal_vars=default_internal_template_vars(glob_helper=lambda f: relative_glob(base_path, f))) + return render_template( + loader=jinja2.FileSystemLoader(base_path), + template_source=template_source.assembled_source, + template_vars=template_vars, + template_internal_vars=default_internal_template_vars( + glob_helper=lambda f: relative_glob(base_path, f) + ), + ) class TaskFilterWorkloadProcessor(WorkloadProcessor): @@ -810,10 +1010,14 @@ def _filters_from_filtered_tasks(self, filtered_tasks): elif spec[0] == "tag": filters.append(workload.TaskTagFilter(spec[1])) else: - raise exceptions.SystemSetupError(f"Invalid format for filtered tasks: [{t}]. 
" - f"Expected [type] but got [{spec[0]}].") + raise exceptions.SystemSetupError( + f"Invalid format for filtered tasks: [{t}]. " + f"Expected [type] but got [{spec[0]}]." + ) else: - raise exceptions.SystemSetupError(f"Invalid format for filtered tasks: [{t}]") + raise exceptions.SystemSetupError( + f"Invalid format for filtered tasks: [{t}]" + ) return filters def _filter_out_match(self, task): @@ -840,11 +1044,18 @@ def on_after_load_workload(self, workload): if self._filter_out_match(leaf_task): leafs_to_remove.append(leaf_task) for leaf_task in leafs_to_remove: - self.logger.info("Removing sub-task [%s] from test_procedure [%s] due to task filter.", - leaf_task, test_procedure) + self.logger.info( + "Removing sub-task [%s] from test_procedure [%s] due to task filter.", + leaf_task, + test_procedure, + ) task.remove_task(leaf_task) for task in tasks_to_remove: - self.logger.info("Removing task [%s] from test_procedure [%s] due to task filter.", task, test_procedure) + self.logger.info( + "Removing task [%s] from test_procedure [%s] due to task filter.", + task, + test_procedure, + ) test_procedure.remove_task(task) return workload @@ -852,7 +1063,9 @@ def on_after_load_workload(self, workload): class TestModeWorkloadProcessor(WorkloadProcessor): def __init__(self, cfg): - self.test_mode_enabled = cfg.opts("workload", "test.mode.enabled", mandatory=False, default_value=False) + self.test_mode_enabled = cfg.opts( + "workload", "test.mode.enabled", mandatory=False, default_value=False + ) self.logger = logging.getLogger(__name__) def on_after_load_workload(self, workload): @@ -861,7 +1074,9 @@ def on_after_load_workload(self, workload): self.logger.info("Preparing workload [%s] for test mode.", str(workload)) for corpus in workload.corpora: if self.logger.isEnabledFor(logging.DEBUG): - self.logger.debug("Reducing corpus size to 1000 documents for [%s]", corpus.name) + self.logger.debug( + "Reducing corpus size to 1000 documents for [%s]", corpus.name + ) for document_set in corpus.documents: # TODO #341: Should we allow this for snapshots too? if document_set.is_bulk: @@ -877,8 +1092,10 @@ def on_after_load_workload(self, workload): path, ext = io.splitext(document_set.document_file) document_set.document_file = f"{path}-1k{ext}" else: - raise exceptions.BenchmarkAssertionError(f"Document corpus [{corpus.name}] has neither compressed " - f"nor uncompressed corpus.") + raise exceptions.BenchmarkAssertionError( + f"Document corpus [{corpus.name}] has neither compressed " + f"nor uncompressed corpus." + ) # we don't want to check sizes document_set.compressed_size_in_bytes = None @@ -890,33 +1107,58 @@ def on_after_load_workload(self, workload): for leaf_task in task: # iteration-based schedules are divided among all clients and we should provide # at least one iteration for each client. 
- if leaf_task.warmup_iterations is not None and leaf_task.warmup_iterations > leaf_task.clients: + if ( + leaf_task.warmup_iterations is not None + and leaf_task.warmup_iterations > leaf_task.clients + ): count = leaf_task.clients if self.logger.isEnabledFor(logging.DEBUG): - self.logger.debug("Resetting warmup iterations to %d for [%s]", count, str(leaf_task)) + self.logger.debug( + "Resetting warmup iterations to %d for [%s]", + count, + str(leaf_task), + ) leaf_task.warmup_iterations = count - if leaf_task.iterations is not None and leaf_task.iterations > leaf_task.clients: + if ( + leaf_task.iterations is not None + and leaf_task.iterations > leaf_task.clients + ): count = leaf_task.clients if self.logger.isEnabledFor(logging.DEBUG): - self.logger.debug("Resetting measurement iterations to %d for [%s]", count, str(leaf_task)) + self.logger.debug( + "Resetting measurement iterations to %d for [%s]", + count, + str(leaf_task), + ) leaf_task.iterations = count - if leaf_task.warmup_time_period is not None and leaf_task.warmup_time_period > 0: + if ( + leaf_task.warmup_time_period is not None + and leaf_task.warmup_time_period > 0 + ): leaf_task.warmup_time_period = 0 if self.logger.isEnabledFor(logging.DEBUG): - self.logger.debug("Resetting warmup time period for [%s] to [%d] seconds.", - str(leaf_task), leaf_task.warmup_time_period) + self.logger.debug( + "Resetting warmup time period for [%s] to [%d] seconds.", + str(leaf_task), + leaf_task.warmup_time_period, + ) if leaf_task.time_period is not None and leaf_task.time_period > 10: leaf_task.time_period = 10 if self.logger.isEnabledFor(logging.DEBUG): - self.logger.debug("Resetting measurement time period for [%s] to [%d] seconds.", - str(leaf_task), leaf_task.time_period) + self.logger.debug( + "Resetting measurement time period for [%s] to [%d] seconds.", + str(leaf_task), + leaf_task.time_period, + ) # Keep throttled to expose any errors but increase the target throughput for short execution times. 
if leaf_task.target_throughput: original_throughput = leaf_task.target_throughput leaf_task.params.pop("target-throughput", None) leaf_task.params.pop("target-interval", None) - leaf_task.params["target-throughput"] = f"{sys.maxsize} {original_throughput.unit}" + leaf_task.params[ + "target-throughput" + ] = f"{sys.maxsize} {original_throughput.unit}" return workload @@ -924,7 +1166,9 @@ def on_after_load_workload(self, workload): class CompleteWorkloadParams: def __init__(self, user_specified_workload_params=None): self.workload_defined_params = set() - self.user_specified_workload_params = user_specified_workload_params if user_specified_workload_params else {} + self.user_specified_workload_params = ( + user_specified_workload_params if user_specified_workload_params else {} + ) def populate_workload_defined_params(self, list_of_workload_params=None): self.workload_defined_params.update(set(list_of_workload_params)) @@ -948,15 +1192,21 @@ class WorkloadFileReader: """ def __init__(self, cfg): - workload_schema_file = os.path.join(cfg.opts("node", "benchmark.root"), "resources", "workload-schema.json") + workload_schema_file = os.path.join( + cfg.opts("node", "benchmark.root"), "resources", "workload-schema.json" + ) with open(workload_schema_file, mode="rt", encoding="utf-8") as f: self.workload_schema = json.loads(f.read()) self.workload_params = cfg.opts("workload", "params", mandatory=False) - self.complete_workload_params = CompleteWorkloadParams(user_specified_workload_params=self.workload_params) + self.complete_workload_params = CompleteWorkloadParams( + user_specified_workload_params=self.workload_params + ) self.read_workload = WorkloadSpecificationReader( workload_params=self.workload_params, complete_workload_params=self.complete_workload_params, - selected_test_procedure=cfg.opts("workload", "test_procedure.name", mandatory=False) + selected_test_procedure=cfg.opts( + "workload", "test_procedure.name", mandatory=False + ), ) self.logger = logging.getLogger(__name__) @@ -970,21 +1220,31 @@ def read(self, workload_name, workload_spec_file, mapping_dir): :return: A corresponding workload instance if the workload file is valid. """ - self.logger.info("Reading workload specification file [%s].", workload_spec_file) + self.logger.info( + "Reading workload specification file [%s].", workload_spec_file + ) # render the workload to a temporary file instead of dumping it into the logs. It is easier to check for error messages # involving lines numbers and it also does not bloat Benchmark's log file so much. 
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".json") try: rendered = render_template_from_file( - workload_spec_file, self.workload_params, - complete_workload_params=self.complete_workload_params) + workload_spec_file, + self.workload_params, + complete_workload_params=self.complete_workload_params, + ) with open(tmp.name, "wt", encoding="utf-8") as f: f.write(rendered) - self.logger.info("Final rendered workload for '%s' has been written to '%s'.", workload_spec_file, tmp.name) + self.logger.info( + "Final rendered workload for '%s' has been written to '%s'.", + workload_spec_file, + tmp.name, + ) workload_spec = json.loads(rendered) except jinja2.exceptions.TemplateNotFound: self.logger.exception("Could not load [%s]", workload_spec_file) - raise exceptions.SystemSetupError("Workload {} does not exist".format(workload_name)) + raise exceptions.SystemSetupError( + "Workload {} does not exist".format(workload_name) + ) except json.JSONDecodeError as e: self.logger.exception("Could not load [%s].", workload_spec_file) msg = "Could not load '{}': {}.".format(workload_spec_file, str(e)) @@ -995,45 +1255,73 @@ def read(self, workload_name, workload_spec_file, mapping_dir): ctx_start = max(0, line_idx - ctx_line_count) ctx_end = min(line_idx + ctx_line_count, len(lines)) erroneous_lines = lines[ctx_start:ctx_end] - erroneous_lines.insert(line_idx - ctx_start + 1, "-" * (e.colno - 1) + "^ Error is here") - msg += " Lines containing the error:\n\n{}\n\n".format("\n".join(erroneous_lines)) - msg += "The complete workload has been written to '{}' for diagnosis.".format(tmp.name) + erroneous_lines.insert( + line_idx - ctx_start + 1, "-" * (e.colno - 1) + "^ Error is here" + ) + msg += " Lines containing the error:\n\n{}\n\n".format( + "\n".join(erroneous_lines) + ) + msg += ( + "The complete workload has been written to '{}' for diagnosis.".format( + tmp.name + ) + ) raise WorkloadSyntaxError(msg) except Exception as e: self.logger.exception("Could not load [%s].", workload_spec_file) - msg = "Could not load '{}'. The complete workload has been written to '{}' for diagnosis.".format(workload_spec_file, tmp.name) + msg = "Could not load '{}'. The complete workload has been written to '{}' for diagnosis.".format( + workload_spec_file, tmp.name + ) # Convert to string early on to avoid serialization errors with Jinja exceptions. raise WorkloadSyntaxError(msg, str(e)) # check the workload version before even attempting to validate the JSON format to avoid bogus errors. 
- raw_version = workload_spec.get("version", WorkloadFileReader.MAXIMUM_SUPPORTED_TRACK_VERSION) + raw_version = workload_spec.get( + "version", WorkloadFileReader.MAXIMUM_SUPPORTED_TRACK_VERSION + ) try: workload_version = int(raw_version) except ValueError: - raise exceptions.InvalidSyntax("version identifier for workload %s must be numeric but was [%s]" % ( - workload_name, str(raw_version))) + raise exceptions.InvalidSyntax( + "version identifier for workload %s must be numeric but was [%s]" + % (workload_name, str(raw_version)) + ) if WorkloadFileReader.MINIMUM_SUPPORTED_TRACK_VERSION > workload_version: - raise exceptions.BenchmarkError("Workload {} is on version {} but needs to be updated at least to version {} to work with the " - "current version of Benchmark.".format(workload_name, workload_version, - WorkloadFileReader.MINIMUM_SUPPORTED_TRACK_VERSION)) + raise exceptions.BenchmarkError( + "Workload {} is on version {} but needs to be updated at least to version {} to work with the " + "current version of Benchmark.".format( + workload_name, + workload_version, + WorkloadFileReader.MINIMUM_SUPPORTED_TRACK_VERSION, + ) + ) if WorkloadFileReader.MAXIMUM_SUPPORTED_TRACK_VERSION < workload_version: - raise exceptions.BenchmarkError("Workload {} requires a newer version of Benchmark. " - "Please upgrade Benchmark (supported workload version: {}, " - "required workload version: {}).".format( - workload_name, - WorkloadFileReader.MAXIMUM_SUPPORTED_TRACK_VERSION, - workload_version)) + raise exceptions.BenchmarkError( + "Workload {} requires a newer version of Benchmark. " + "Please upgrade Benchmark (supported workload version: {}, " + "required workload version: {}).".format( + workload_name, + WorkloadFileReader.MAXIMUM_SUPPORTED_TRACK_VERSION, + workload_version, + ) + ) try: jsonschema.validate(workload_spec, self.workload_schema) except jsonschema.exceptions.ValidationError as ve: raise WorkloadSyntaxError( "Workload '{}' is invalid.\n\nError details: {}\nInstance: {}\nPath: {}\nSchema path: {}".format( - workload_name, ve.message, json.dumps( - ve.instance, indent=4, sort_keys=True), - ve.absolute_path, ve.absolute_schema_path)) + workload_name, + ve.message, + json.dumps(ve.instance, indent=4, sort_keys=True), + ve.absolute_path, + ve.absolute_schema_path, + ) + ) current_workload = self.read_workload(workload_name, workload_spec, mapping_dir) - unused_user_defined_workload_params = self.complete_workload_params.unused_user_defined_workload_params() + unused_user_defined_workload_params = ( + self.complete_workload_params.unused_user_defined_workload_params() + ) if len(unused_user_defined_workload_params) > 0: err_msg = ( "Some of your workload parameter(s) {} are not used by this workload; perhaps you intend to use {} instead.\n\n" @@ -1041,19 +1329,39 @@ def read(self, workload_name, workload_spec_file, mapping_dir): "{}\n\n" "All parameters exposed by this workload:\n" "{}".format( - ",".join(opts.double_quoted_list_of(sorted(unused_user_defined_workload_params))), - ",".join(opts.double_quoted_list_of(sorted(opts.make_list_of_close_matches( - unused_user_defined_workload_params, - self.complete_workload_params.workload_defined_params - )))), - "\n".join(opts.bulleted_list_of(sorted(list(self.workload_params.keys())))), - "\n".join(opts.bulleted_list_of(self.complete_workload_params.sorted_workload_defined_params)))) + ",".join( + opts.double_quoted_list_of( + sorted(unused_user_defined_workload_params) + ) + ), + ",".join( + opts.double_quoted_list_of( + sorted( + 
opts.make_list_of_close_matches( + unused_user_defined_workload_params, + self.complete_workload_params.workload_defined_params, + ) + ) + ) + ), + "\n".join( + opts.bulleted_list_of(sorted(list(self.workload_params.keys()))) + ), + "\n".join( + opts.bulleted_list_of( + self.complete_workload_params.sorted_workload_defined_params + ) + ), + ) + ) self.logger.critical(err_msg) # also dump the message on the console console.println(err_msg) raise exceptions.WorkloadConfigError( - "Unused workload parameters {}.".format(sorted(unused_user_defined_workload_params)) + "Unused workload parameters {}.".format( + sorted(unused_user_defined_workload_params) + ) ) return current_workload @@ -1063,11 +1371,19 @@ class WorkloadPluginReader: Loads workload plugins """ - def __init__(self, workload_plugin_path, runner_registry=None, scheduler_registry=None, workload_processor_registry=None): + def __init__( + self, + workload_plugin_path, + runner_registry=None, + scheduler_registry=None, + workload_processor_registry=None, + ): self.runner_registry = runner_registry self.scheduler_registry = scheduler_registry self.workload_processor_registry = workload_processor_registry - self.loader = modules.ComponentLoader(root_path=workload_plugin_path, component_entry_point="workload") + self.loader = modules.ComponentLoader( + root_path=workload_plugin_path, component_entry_point="workload" + ) def can_load(self): return self.loader.can_load() @@ -1099,10 +1415,7 @@ def register_workload_processor(self, workload_processor): @property def meta_data(self): - return { - "benchmark_version": version.release_version(), - "async_runner": True - } + return {"benchmark_version": version.release_version(), "async_runner": True} class WorkloadSpecificationReader: @@ -1110,7 +1423,13 @@ class WorkloadSpecificationReader: Creates a workload instances based on its parsed JSON description. 
""" - def __init__(self, workload_params=None, complete_workload_params=None, selected_test_procedure=None, source=io.FileSource): + def __init__( + self, + workload_params=None, + complete_workload_params=None, + selected_test_procedure=None, + source=io.FileSource, + ): self.name = None self.workload_params = workload_params if workload_params else {} self.complete_workload_params = complete_workload_params @@ -1120,31 +1439,76 @@ def __init__(self, workload_params=None, complete_workload_params=None, selected def __call__(self, workload_name, workload_specification, mapping_dir): self.name = workload_name - description = self._r(workload_specification, "description", mandatory=False, default_value="") + description = self._r( + workload_specification, "description", mandatory=False, default_value="" + ) meta_data = self._r(workload_specification, "meta", mandatory=False) - indices = [self._create_index(idx, mapping_dir) - for idx in self._r(workload_specification, "indices", mandatory=False, default_value=[])] - data_streams = [self._create_data_stream(idx) - for idx in self._r(workload_specification, "data-streams", mandatory=False, default_value=[])] + indices = [ + self._create_index(idx, mapping_dir) + for idx in self._r( + workload_specification, "indices", mandatory=False, default_value=[] + ) + ] + data_streams = [ + self._create_data_stream(idx) + for idx in self._r( + workload_specification, + "data-streams", + mandatory=False, + default_value=[], + ) + ] if len(indices) > 0 and len(data_streams) > 0: # we guard against this early and support either or - raise WorkloadSyntaxError("indices and data-streams cannot both be specified") - templates = [self._create_index_template(tpl, mapping_dir) - for tpl in self._r(workload_specification, "templates", mandatory=False, default_value=[])] - composable_templates = [self._create_index_template(tpl, mapping_dir) - for tpl in self._r(workload_specification, "composable-templates", mandatory=False, default_value=[])] - component_templates = [self._create_component_template(tpl, mapping_dir) - for tpl in self._r(workload_specification, "component-templates", mandatory=False, default_value=[])] - corpora = self._create_corpora(self._r(workload_specification, "corpora", mandatory=False, default_value=[]), - indices, data_streams) + raise WorkloadSyntaxError( + "indices and data-streams cannot both be specified" + ) + templates = [ + self._create_index_template(tpl, mapping_dir) + for tpl in self._r( + workload_specification, "templates", mandatory=False, default_value=[] + ) + ] + composable_templates = [ + self._create_index_template(tpl, mapping_dir) + for tpl in self._r( + workload_specification, + "composable-templates", + mandatory=False, + default_value=[], + ) + ] + component_templates = [ + self._create_component_template(tpl, mapping_dir) + for tpl in self._r( + workload_specification, + "component-templates", + mandatory=False, + default_value=[], + ) + ] + corpora = self._create_corpora( + self._r( + workload_specification, "corpora", mandatory=False, default_value=[] + ), + indices, + data_streams, + ) test_procedures = self._create_test_procedures(workload_specification) # at this point, *all* workload params must have been referenced in the templates - return workload.Workload(name=self.name, meta_data=meta_data, - description=description, test_procedures=test_procedures, - indices=indices, - data_streams=data_streams, templates=templates, composable_templates=composable_templates, - component_templates=component_templates, 
corpora=corpora) + return workload.Workload( + name=self.name, + meta_data=meta_data, + description=description, + test_procedures=test_procedures, + indices=indices, + data_streams=data_streams, + templates=templates, + composable_templates=composable_templates, + component_templates=component_templates, + corpora=corpora, + ) def _error(self, msg): raise WorkloadSyntaxError("Workload '%s' is invalid. %s" % (self.name, msg)) @@ -1161,7 +1525,10 @@ def _r(self, root, path, error_ctx=None, mandatory=True, default_value=None): except KeyError: if mandatory: if error_ctx: - self._error("Mandatory element '%s' is missing in '%s'." % (".".join(path), error_ctx)) + self._error( + "Mandatory element '%s' is missing in '%s'." + % (".".join(path), error_ctx) + ) else: self._error("Mandatory element '%s' is missing." % ".".join(path)) else: @@ -1176,11 +1543,16 @@ def _create_index(self, index_spec, mapping_dir): idx_body_tmpl_src.load_template_from_string(f.read()) body = self._load_template( idx_body_tmpl_src.assembled_source, - "definition for index {} in {}".format(index_name, body_file)) + "definition for index {} in {}".format(index_name, body_file), + ) else: body = None - return workload.Index(name=index_name, body=body, types=self._r(index_spec, "types", mandatory=False, default_value=[])) + return workload.Index( + name=index_name, + body=body, + types=self._r(index_spec, "types", mandatory=False, default_value=[]), + ) def _create_data_stream(self, data_stream_spec): return workload.DataStream(name=self._r(data_stream_spec, "name")) @@ -1194,37 +1566,48 @@ def _create_component_template(self, tpl_spec, mapping_dir): idx_tmpl_src.load_template_from_string(f.read()) template_content = self._load_template( idx_tmpl_src.assembled_source, - f"definition for component template {name} in {template_file}") + f"definition for component template {name} in {template_file}", + ) return workload.ComponentTemplate(name, template_content) def _create_index_template(self, tpl_spec, mapping_dir): name = self._r(tpl_spec, "name") template_file = self._r(tpl_spec, "template") index_pattern = self._r(tpl_spec, "index-pattern") - delete_matching_indices = self._r(tpl_spec, "delete-matching-indices", mandatory=False, default_value=True) + delete_matching_indices = self._r( + tpl_spec, "delete-matching-indices", mandatory=False, default_value=True + ) template_file = os.path.join(mapping_dir, template_file) idx_tmpl_src = TemplateSource(mapping_dir, template_file, self.source) with self.source(template_file, "rt") as f: idx_tmpl_src.load_template_from_string(f.read()) template_content = self._load_template( idx_tmpl_src.assembled_source, - "definition for index template {} in {}".format(name, template_file)) - return workload.IndexTemplate(name, index_pattern, template_content, delete_matching_indices) + "definition for index template {} in {}".format(name, template_file), + ) + return workload.IndexTemplate( + name, index_pattern, template_content, delete_matching_indices + ) def _load_template(self, contents, description): self.logger.info("Loading template [%s].", description) register_all_params_in_workload(contents, self.complete_workload_params) try: - rendered = render_template(template_source=contents, - template_vars=self.workload_params) + rendered = render_template( + template_source=contents, template_vars=self.workload_params + ) return json.loads(rendered) except Exception as e: self.logger.exception("Could not load file template for %s.", description) - raise WorkloadSyntaxError("Could not load 
file template for '%s'" % description, str(e)) + raise WorkloadSyntaxError( + "Could not load file template for '%s'" % description, str(e) + ) def _create_corpora(self, corpora_specs, indices, data_streams): if len(indices) > 0 and len(data_streams) > 0: - raise WorkloadSyntaxError("indices and data-streams cannot both be specified") + raise WorkloadSyntaxError( + "indices and data-streams cannot both be specified" + ) document_corpora = [] known_corpora_names = set() for corpus_spec in corpora_specs: @@ -1237,35 +1620,74 @@ def _create_corpora(self, corpora_specs, indices, data_streams): meta_data = self._r(corpus_spec, "meta", error_ctx=name, mandatory=False) corpus = workload.DocumentCorpus(name=name, meta_data=meta_data) # defaults on corpus level - default_base_url = self._r(corpus_spec, "base-url", mandatory=False, default_value=None) - default_source_format = self._r(corpus_spec, "source-format", mandatory=False, - default_value=workload.Documents.SOURCE_FORMAT_BULK) - default_action_and_meta_data = self._r(corpus_spec, "includes-action-and-meta-data", mandatory=False, - default_value=False) + default_base_url = self._r( + corpus_spec, "base-url", mandatory=False, default_value=None + ) + default_source_format = self._r( + corpus_spec, + "source-format", + mandatory=False, + default_value=workload.Documents.SOURCE_FORMAT_BULK, + ) + default_action_and_meta_data = self._r( + corpus_spec, + "includes-action-and-meta-data", + mandatory=False, + default_value=False, + ) corpus_target_idx = None corpus_target_ds = None corpus_target_type = None if len(indices) == 1: - corpus_target_idx = self._r(corpus_spec, "target-index", mandatory=False, default_value=indices[0].name) + corpus_target_idx = self._r( + corpus_spec, + "target-index", + mandatory=False, + default_value=indices[0].name, + ) elif len(indices) > 0: - corpus_target_idx = self._r(corpus_spec, "target-index", mandatory=False) + corpus_target_idx = self._r( + corpus_spec, "target-index", mandatory=False + ) if len(data_streams) == 1: - corpus_target_ds = self._r(corpus_spec, "target-data-stream", mandatory=False, - default_value=data_streams[0].name) + corpus_target_ds = self._r( + corpus_spec, + "target-data-stream", + mandatory=False, + default_value=data_streams[0].name, + ) elif len(data_streams) > 0: - corpus_target_ds = self._r(corpus_spec, "target-data-stream", mandatory=False) + corpus_target_ds = self._r( + corpus_spec, "target-data-stream", mandatory=False + ) if len(indices) == 1 and len(indices[0].types) == 1: - corpus_target_type = self._r(corpus_spec, "target-type", mandatory=False, - default_value=indices[0].types[0]) + corpus_target_type = self._r( + corpus_spec, + "target-type", + mandatory=False, + default_value=indices[0].types[0], + ) elif len(indices) > 0: - corpus_target_type = self._r(corpus_spec, "target-type", mandatory=False) + corpus_target_type = self._r( + corpus_spec, "target-type", mandatory=False + ) for doc_spec in self._r(corpus_spec, "documents"): - base_url = self._r(doc_spec, "base-url", mandatory=False, default_value=default_base_url) - source_format = self._r(doc_spec, "source-format", mandatory=False, default_value=default_source_format) + base_url = self._r( + doc_spec, + "base-url", + mandatory=False, + default_value=default_base_url, + ) + source_format = self._r( + doc_spec, + "source-format", + mandatory=False, + default_value=default_source_format, + ) if source_format == workload.Documents.SOURCE_FORMAT_BULK: docs = self._r(doc_spec, "source-file") @@ -1276,82 +1698,143 @@ def 
_create_corpora(self, corpora_specs, indices, data_streams): document_archive = None document_file = docs num_docs = self._r(doc_spec, "document-count") - compressed_bytes = self._r(doc_spec, "compressed-bytes", mandatory=False) - uncompressed_bytes = self._r(doc_spec, "uncompressed-bytes", mandatory=False) - doc_meta_data = self._r(doc_spec, "meta", error_ctx=name, mandatory=False) - - includes_action_and_meta_data = self._r(doc_spec, "includes-action-and-meta-data", mandatory=False, - default_value=default_action_and_meta_data) + compressed_bytes = self._r( + doc_spec, "compressed-bytes", mandatory=False + ) + uncompressed_bytes = self._r( + doc_spec, "uncompressed-bytes", mandatory=False + ) + doc_meta_data = self._r( + doc_spec, "meta", error_ctx=name, mandatory=False + ) + + includes_action_and_meta_data = self._r( + doc_spec, + "includes-action-and-meta-data", + mandatory=False, + default_value=default_action_and_meta_data, + ) if includes_action_and_meta_data: target_idx = None target_type = None target_ds = None else: - target_type = self._r(doc_spec, "target-type", mandatory=False, - default_value=corpus_target_type, error_ctx=docs) + target_type = self._r( + doc_spec, + "target-type", + mandatory=False, + default_value=corpus_target_type, + error_ctx=docs, + ) # require to be specified if we're using data streams and we have no default - target_ds = self._r(doc_spec, "target-data-stream", - mandatory=len(data_streams) > 0 and corpus_target_ds is None, - default_value=corpus_target_ds, - error_ctx=docs) + target_ds = self._r( + doc_spec, + "target-data-stream", + mandatory=len(data_streams) > 0 + and corpus_target_ds is None, + default_value=corpus_target_ds, + error_ctx=docs, + ) if target_ds and len(indices) > 0: # if indices are in use we error - raise WorkloadSyntaxError("target-data-stream cannot be used when using indices") + raise WorkloadSyntaxError( + "target-data-stream cannot be used when using indices" + ) elif target_ds and target_type: - raise WorkloadSyntaxError("target-type cannot be used when using data-streams") + raise WorkloadSyntaxError( + "target-type cannot be used when using data-streams" + ) # need an index if we're using indices and no meta-data are present and we don't have a default - target_idx = self._r(doc_spec, "target-index", - mandatory=len(indices) > 0 and corpus_target_idx is None, - default_value=corpus_target_idx, - error_ctx=docs) + target_idx = self._r( + doc_spec, + "target-index", + mandatory=len(indices) > 0 and corpus_target_idx is None, + default_value=corpus_target_idx, + error_ctx=docs, + ) # either target_idx or target_ds if target_idx and len(data_streams) > 0: # if data streams are in use we error - raise WorkloadSyntaxError("target-index cannot be used when using data-streams") + raise WorkloadSyntaxError( + "target-index cannot be used when using data-streams" + ) # we need one or the other if target_idx is None and target_ds is None: - raise WorkloadSyntaxError(f"a {'target-index' if len(indices) > 0 else 'target-data-stream'} " - f"is required for {docs}" ) - - docs = workload.Documents(source_format=source_format, - document_file=document_file, - document_archive=document_archive, - base_url=base_url, - includes_action_and_meta_data=includes_action_and_meta_data, - number_of_documents=num_docs, - compressed_size_in_bytes=compressed_bytes, - uncompressed_size_in_bytes=uncompressed_bytes, - target_index=target_idx, target_type=target_type, - target_data_stream=target_ds, meta_data=doc_meta_data) + raise WorkloadSyntaxError( + 
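+                                # reached when neither this document spec nor the corpus defines a target index or target data stream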
f"a {'target-index' if len(indices) > 0 else 'target-data-stream'} " + f"is required for {docs}" + ) + + docs = workload.Documents( + source_format=source_format, + document_file=document_file, + document_archive=document_archive, + base_url=base_url, + includes_action_and_meta_data=includes_action_and_meta_data, + number_of_documents=num_docs, + compressed_size_in_bytes=compressed_bytes, + uncompressed_size_in_bytes=uncompressed_bytes, + target_index=target_idx, + target_type=target_type, + target_data_stream=target_ds, + meta_data=doc_meta_data, + ) corpus.documents.append(docs) else: - self._error("Unknown source-format [%s] in document corpus [%s]." % (source_format, name)) + self._error( + "Unknown source-format [%s] in document corpus [%s]." + % (source_format, name) + ) document_corpora.append(corpus) return document_corpora def _create_test_procedures(self, workload_spec): - ops = self.parse_operations(self._r(workload_spec, "operations", mandatory=False, default_value=[])) - workload_params = self._r(workload_spec, "parameters", mandatory=False, default_value={}) + ops = self.parse_operations( + self._r(workload_spec, "operations", mandatory=False, default_value=[]) + ) + workload_params = self._r( + workload_spec, "parameters", mandatory=False, default_value={} + ) test_procedures = [] known_test_procedure_names = set() default_test_procedure = None - test_procedure_specs, auto_generated = self._get_test_procedure_specs(workload_spec) + test_procedure_specs, auto_generated = self._get_test_procedure_specs( + workload_spec + ) number_of_test_procedures = len(test_procedure_specs) for test_procedure_spec in test_procedure_specs: name = self._r(test_procedure_spec, "name", error_ctx="test_procedures") - description = self._r(test_procedure_spec, "description", error_ctx=name, mandatory=False) - user_info = self._r(test_procedure_spec, "user-info", error_ctx=name, mandatory=False) - test_procedure_params = self._r(test_procedure_spec, "parameters", error_ctx=name, mandatory=False, default_value={}) - meta_data = self._r(test_procedure_spec, "meta", error_ctx=name, mandatory=False) + description = self._r( + test_procedure_spec, "description", error_ctx=name, mandatory=False + ) + user_info = self._r( + test_procedure_spec, "user-info", error_ctx=name, mandatory=False + ) + test_procedure_params = self._r( + test_procedure_spec, + "parameters", + error_ctx=name, + mandatory=False, + default_value={}, + ) + meta_data = self._r( + test_procedure_spec, "meta", error_ctx=name, mandatory=False + ) # if we only have one test_procedure it is treated as default test_procedure, no matter what the user has specified - default = number_of_test_procedures == 1 or self._r(test_procedure_spec, "default", error_ctx=name, mandatory=False) - selected = number_of_test_procedures == 1 or self.selected_test_procedure == name + default = number_of_test_procedures == 1 or self._r( + test_procedure_spec, "default", error_ctx=name, mandatory=False + ) + selected = ( + number_of_test_procedures == 1 or self.selected_test_procedure == name + ) if default and default_test_procedure is not None: - self._error("Both '%s' and '%s' are defined as default test_procedures. Please define only one of them as default." - % (default_test_procedure.name, name)) + self._error( + "Both '%s' and '%s' are defined as default test_procedures. Please define only one of them as default." + % (default_test_procedure.name, name) + ) if name in known_test_procedure_names: self._error("Duplicate test_procedure with name '%s'." 
% name) known_test_procedure_names.add(name) @@ -1370,23 +1853,30 @@ def _create_test_procedures(self, workload_spec): for task in schedule: for sub_task in task: if sub_task.name in known_task_names: - self._error("TestProcedure '%s' contains multiple tasks with the name '%s'. Please use the task's name property to " - "assign a unique name for each task." % (name, sub_task.name)) + self._error( + "TestProcedure '%s' contains multiple tasks with the name '%s'. Please use the task's name property to " + "assign a unique name for each task." + % (name, sub_task.name) + ) else: known_task_names.add(sub_task.name) # merge params - final_test_procedure_params = dict(collections.merge_dicts(workload_params, test_procedure_params)) - - test_procedure = workload.TestProcedure(name=name, - parameters=final_test_procedure_params, - meta_data=meta_data, - description=description, - user_info=user_info, - default=default, - selected=selected, - auto_generated=auto_generated, - schedule=schedule) + final_test_procedure_params = dict( + collections.merge_dicts(workload_params, test_procedure_params) + ) + + test_procedure = workload.TestProcedure( + name=name, + parameters=final_test_procedure_params, + meta_data=meta_data, + description=description, + user_info=user_info, + default=default, + selected=selected, + auto_generated=auto_generated, + schedule=schedule, + ) if default: default_test_procedure = test_procedure @@ -1394,8 +1884,9 @@ def _create_test_procedures(self, workload_spec): if test_procedures and default_test_procedure is None: self._error( - "No default test_procedure specified. Please edit the workload and add \"default\": true to one of the test_procedures %s." - % ", ".join([c.name for c in test_procedures])) + 'No default test_procedure specified. Please edit the workload and add "default": true to one of the test_procedures %s.' + % ", ".join([c.name for c in test_procedures]) + ) return test_procedures def _get_test_procedure_specs(self, workload_spec): @@ -1403,40 +1894,69 @@ def _get_test_procedure_specs(self, workload_spec): test_procedure = self._r(workload_spec, "test_procedure", mandatory=False) test_procedures = self._r(workload_spec, "test_procedures", mandatory=False) - count_defined = len(list(filter(lambda e: e is not None, [schedule, test_procedure, test_procedures]))) + count_defined = len( + list( + filter( + lambda e: e is not None, [schedule, test_procedure, test_procedures] + ) + ) + ) if count_defined == 0: - self._error("You must define 'test_procedure', 'test_procedures' or 'schedule' but none is specified.") + self._error( + "You must define 'test_procedure', 'test_procedures' or 'schedule' but none is specified." + ) elif count_defined > 1: - self._error("Multiple out of 'test_procedure', 'test_procedures' or 'schedule' are defined but only one of them is allowed.") + self._error( + "Multiple out of 'test_procedure', 'test_procedures' or 'schedule' are defined but only one of them is allowed." 
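+                # a workload must declare its schedule in exactly one of 'test_procedure', 'test_procedures' or 'schedule'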
+ ) elif test_procedure is not None: return [test_procedure], False elif test_procedures is not None: return test_procedures, False elif schedule is not None: - return [{ - "name": "default", - "schedule": schedule - }], True + return [{"name": "default", "schedule": schedule}], True else: raise AssertionError( "Unexpected: schedule=[{}], test_procedure=[{}], test_procedures=[{}]".format( - schedule, test_procedure, test_procedures)) + schedule, test_procedure, test_procedures + ) + ) def parse_parallel(self, ops_spec, ops, test_procedure_name): # use same default values as #parseTask() in case the 'parallel' element did not specify anything - default_warmup_iterations = self._r(ops_spec, "warmup-iterations", error_ctx="parallel", mandatory=False) - default_iterations = self._r(ops_spec, "iterations", error_ctx="parallel", mandatory=False) - default_warmup_time_period = self._r(ops_spec, "warmup-time-period", error_ctx="parallel", mandatory=False) - default_time_period = self._r(ops_spec, "time-period", error_ctx="parallel", mandatory=False) + default_warmup_iterations = self._r( + ops_spec, "warmup-iterations", error_ctx="parallel", mandatory=False + ) + default_iterations = self._r( + ops_spec, "iterations", error_ctx="parallel", mandatory=False + ) + default_warmup_time_period = self._r( + ops_spec, "warmup-time-period", error_ctx="parallel", mandatory=False + ) + default_time_period = self._r( + ops_spec, "time-period", error_ctx="parallel", mandatory=False + ) clients = self._r(ops_spec, "clients", error_ctx="parallel", mandatory=False) - completed_by = self._r(ops_spec, "completed-by", error_ctx="parallel", mandatory=False) + completed_by = self._r( + ops_spec, "completed-by", error_ctx="parallel", mandatory=False + ) # now descent to each operation tasks = [] for task in self._r(ops_spec, "tasks", error_ctx="parallel"): - tasks.append(self.parse_task(task, ops, test_procedure_name, default_warmup_iterations, default_iterations, - default_warmup_time_period, default_time_period, completed_by)) + tasks.append( + self.parse_task( + task, + ops, + test_procedure_name, + default_warmup_iterations, + default_iterations, + default_warmup_time_period, + default_time_period, + completed_by, + ) + ) if completed_by: completion_task = None for task in tasks: @@ -1445,49 +1965,109 @@ def parse_parallel(self, ops_spec, ops, test_procedure_name): elif task.completes_parent: self._error( "'parallel' element for test_procedure '%s' contains multiple tasks with the name '%s' which are marked with " - "'completed-by' but only task is allowed to match." % (test_procedure_name, completed_by)) + "'completed-by' but only task is allowed to match." + % (test_procedure_name, completed_by) + ) if not completion_task: - self._error("'parallel' element for test_procedure '%s' is marked with 'completed-by' with task name '%s' but no task with " - "this name exists." % (test_procedure_name, completed_by)) + self._error( + "'parallel' element for test_procedure '%s' is marked with 'completed-by' with task name '%s' but no task with " + "this name exists." 
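+                # the name given in 'completed-by' did not match any task inside this 'parallel' element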
% (test_procedure_name, completed_by) + ) return workload.Parallel(tasks, clients) - def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterations=None, default_iterations=None, - default_warmup_time_period=None, default_time_period=None, completed_by_name=None): - + def parse_task( + self, + task_spec, + ops, + test_procedure_name, + default_warmup_iterations=None, + default_iterations=None, + default_warmup_time_period=None, + default_time_period=None, + completed_by_name=None, + ): + # raise ValueError("Invalid task specification: %s" % task_spec) op_spec = task_spec["operation"] if isinstance(op_spec, str) and op_spec in ops: op = ops[op_spec] else: # may as well an inline operation - op = self.parse_operation(op_spec, error_ctx="inline operation in test_procedure %s" % test_procedure_name) + op = self.parse_operation( + op_spec, + error_ctx="inline operation in test_procedure %s" % test_procedure_name, + ) schedule = self._r(task_spec, "schedule", error_ctx=op.name, mandatory=False) - task_name = self._r(task_spec, "name", error_ctx=op.name, mandatory=False, default_value=op.name) - task = workload.Task(name=task_name, - operation=op, - tags=self._r(task_spec, "tags", error_ctx=op.name, mandatory=False), - meta_data=self._r(task_spec, "meta", error_ctx=op.name, mandatory=False), - warmup_iterations=self._r(task_spec, "warmup-iterations", error_ctx=op.name, mandatory=False, - default_value=default_warmup_iterations), - iterations=self._r(task_spec, "iterations", error_ctx=op.name, mandatory=False, default_value=default_iterations), - warmup_time_period=self._r(task_spec, "warmup-time-period", error_ctx=op.name, - mandatory=False, - default_value=default_warmup_time_period), - time_period=self._r(task_spec, "time-period", error_ctx=op.name, mandatory=False, - default_value=default_time_period), - clients=self._r(task_spec, "clients", error_ctx=op.name, mandatory=False, default_value=1), - completes_parent=(task_name == completed_by_name), - schedule=schedule, - # this is to provide scheduler-specific parameters for custom schedulers. - params=task_spec) + task_name = self._r( + task_spec, "name", error_ctx=op.name, mandatory=False, default_value=op.name + ) + task = workload.Task( + name=task_name, + operation=op, + tags=self._r(task_spec, "tags", error_ctx=op.name, mandatory=False), + meta_data=self._r(task_spec, "meta", error_ctx=op.name, mandatory=False), + warmup_iterations=self._r( + task_spec, + "warmup-iterations", + error_ctx=op.name, + mandatory=False, + default_value=default_warmup_iterations, + ), + iterations=self._r( + task_spec, + "iterations", + error_ctx=op.name, + mandatory=False, + default_value=default_iterations, + ), + warmup_time_period=self._r( + task_spec, + "warmup-time-period", + error_ctx=op.name, + mandatory=False, + default_value=default_warmup_time_period, + ), + time_period=self._r( + task_spec, + "time-period", + error_ctx=op.name, + mandatory=False, + default_value=default_time_period, + ), + clients=self._r( + task_spec, + "clients", + error_ctx=op.name, + mandatory=False, + default_value=1, + ), + completes_parent=(task_name == completed_by_name), + schedule=schedule, + # this is to provide scheduler-specific parameters for custom schedulers. + params=task_spec, + ) if task.warmup_iterations is not None and task.time_period is not None: self._error( "Operation '%s' in test_procedure '%s' defines '%d' warmup iterations and a time period of '%d' seconds. Please do not " - "mix time periods and iterations." 
% (op.name, test_procedure_name, task.warmup_iterations, task.time_period)) + "mix time periods and iterations." + % ( + op.name, + test_procedure_name, + task.warmup_iterations, + task.time_period, + ) + ) elif task.warmup_time_period is not None and task.iterations is not None: self._error( "Operation '%s' in test_procedure '%s' defines a warmup time period of '%d' seconds and '%d' iterations. Please do not " - "mix time periods and iterations." % (op.name, test_procedure_name, task.warmup_time_period, task.iterations)) + "mix time periods and iterations." + % ( + op.name, + test_procedure_name, + task.warmup_time_period, + task.iterations, + ) + ) return task @@ -1516,8 +2096,16 @@ def parse_operation(self, op_spec, error_ctx="operations"): # Benchmark's core operations will still use enums then but we'll allow users to define arbitrary operations op_type_name = self._r(op_spec, "operation-type", error_ctx=error_ctx) # fallback to use the operation type as the operation name - op_name = self._r(op_spec, "name", error_ctx=error_ctx, mandatory=False, default_value=op_type_name) - param_source = self._r(op_spec, "param-source", error_ctx=error_ctx, mandatory=False) + op_name = self._r( + op_spec, + "name", + error_ctx=error_ctx, + mandatory=False, + default_value=op_type_name, + ) + param_source = self._r( + op_spec, "param-source", error_ctx=error_ctx, mandatory=False + ) # just pass-through all parameters by default params = op_spec @@ -1525,13 +2113,25 @@ def parse_operation(self, op_spec, error_ctx="operations"): op = workload.OperationType.from_hyphenated_string(op_type_name) if "include-in-results_publishing" not in params: params["include-in-results_publishing"] = not op.admin_op - self.logger.debug("Using built-in operation type [%s] for operation [%s].", op_type_name, op_name) + self.logger.debug( + "Using built-in operation type [%s] for operation [%s].", + op_type_name, + op_name, + ) except KeyError: - self.logger.info("Using user-provided operation type [%s] for operation [%s].", op_type_name, op_name) + self.logger.info( + "Using user-provided operation type [%s] for operation [%s].", + op_type_name, + op_name, + ) try: - return workload.Operation(name=op_name, meta_data=meta_data, - operation_type=op_type_name, params=params, - param_source=param_source) + return workload.Operation( + name=op_name, + meta_data=meta_data, + operation_type=op_type_name, + params=params, + param_source=param_source, + ) except exceptions.InvalidSyntax as e: raise WorkloadSyntaxError("Invalid operation [%s]: %s" % (op_name, str(e))) diff --git a/osbenchmark/workload_generator/index.py b/osbenchmark/workload_generator/index.py index 71301101d..a2a5bfc39 100644 --- a/osbenchmark/workload_generator/index.py +++ b/osbenchmark/workload_generator/index.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. 
# You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -26,14 +26,16 @@ import logging import os -INDEX_SETTINGS_EPHEMERAL_KEYS = ["uuid", - "creation_date", - "version", - "provided_name", - "store"] +INDEX_SETTINGS_EPHEMERAL_KEYS = [ + "uuid", + "creation_date", + "version", + "provided_name", + "store", +] INDEX_SETTINGS_PARAMETERS = { "number_of_replicas": "{{{{number_of_replicas | default({orig})}}}}", - "number_of_shards": "{{{{number_of_shards | default({orig})}}}}" + "number_of_shards": "{{{{number_of_shards | default({orig})}}}}", } @@ -81,13 +83,13 @@ def extract_index_mapping_and_settings(client, index_pattern): valid, reason = is_valid(index) if valid: mappings = details["mappings"] - index_settings = filter_ephemeral_index_settings(details["settings"]["index"]) + index_settings = filter_ephemeral_index_settings( + details["settings"]["index"] + ) update_index_setting_parameters(index_settings) results[index] = { "mappings": mappings, - "settings": { - "index": index_settings - } + "settings": {"index": index_settings}, } else: logger.info("Skipping index [%s] (reason: %s).", index, reason) @@ -112,9 +114,11 @@ def extract(client, outdir, index_pattern): with open(outpath, "w") as outfile: json.dump(details, outfile, indent=4, sort_keys=True) outfile.write("\n") - results.append({ - "name": index, - "path": outpath, - "filename": filename, - }) + results.append( + { + "name": index, + "path": outpath, + "filename": filename, + } + ) return results diff --git a/osbenchmark/workload_generator/workload_generator.py b/osbenchmark/workload_generator/workload_generator.py index 3bcaacc42..e89201bce 100644 --- a/osbenchmark/workload_generator/workload_generator.py +++ b/osbenchmark/workload_generator/workload_generator.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -25,6 +25,7 @@ import logging import os import json +import shutil from opensearchpy import OpenSearchException from jinja2 import Environment, FileSystemLoader, select_autoescape @@ -35,12 +36,13 @@ from osbenchmark.utils import io, opts, console -def process_template(templates_path, template_filename, template_vars, output_path): - env = Environment(loader=FileSystemLoader(templates_path), autoescape=select_autoescape(['html', 'xml'])) - template = env.get_template(template_filename) +def extract_template(templates_path, template_filename): + env = Environment( + loader=FileSystemLoader(templates_path), + autoescape=select_autoescape(["html", "xml"]), + ) + return env.get_template(template_filename) - with open(output_path, "w") as f: - f.write(template.render(template_vars)) def validate_indices_docs_map(indices, indices_docs_map, docs_were_requested): if not docs_were_requested: @@ -48,18 +50,21 @@ def validate_indices_docs_map(indices, indices_docs_map, docs_were_requested): if len(indices) < len(indices_docs_map): raise exceptions.SystemSetupError( - "Number of : pairs exceeds number of indices in --indices. " + - "Ensure number of : pairs is less than or equal to number of indices in --indices." 
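+            # guard: the requested per-index document counts (indices_docs_map) may not list more entries than --indices selects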
+ "Number of : pairs exceeds number of indices in --indices. " + + "Ensure number of : pairs is less than or equal to number of indices in --indices." ) for index_name in indices_docs_map: if index_name not in indices: raise exceptions.SystemSetupError( - "Index from : pair was not found in --indices. " + - "Ensure that indices from all : pairs exist in --indices." + "Index from : pair was not found in --indices. " + + "Ensure that indices from all : pairs exist in --indices." ) -def extract_mappings_and_corpora(client, output_path, indices_to_extract, indices_docs_map): + +def extract_mappings_and_corpora( + client, output_path, indices_to_extract, indices_docs_map +): indices = [] corpora = [] docs_were_requested = indices_docs_map is not None and len(indices_docs_map) > 0 @@ -72,7 +77,9 @@ def extract_mappings_and_corpora(client, output_path, indices_to_extract, indice try: indices += index.extract(client, output_path, index_name) except OpenSearchException: - logging.getLogger(__name__).exception("Failed to extract index [%s]", index_name) + logging.getLogger(__name__).exception( + "Failed to extract index [%s]", index_name + ) # That list only contains valid indices (with index patterns already resolved) # For each index, check if docs were requested. If so, extract the number of docs from the map @@ -87,13 +94,16 @@ def extract_mappings_and_corpora(client, output_path, indices_to_extract, indice f"The string [{indices_docs_map.get(i['name'])}] in : pair cannot be converted to an integer." ) - logging.getLogger(__name__).info("Extracting [%s] docs for index [%s]", custom_docs_to_extract, i["name"]) + logging.getLogger(__name__).info( + "Extracting [%s] docs for index [%s]", custom_docs_to_extract, i["name"] + ) c = corpus.extract(client, output_path, i["name"], custom_docs_to_extract) if c: corpora.append(c) return indices, corpora + def process_custom_queries(custom_queries): if not custom_queries: return [] @@ -104,10 +114,13 @@ def process_custom_queries(custom_queries): if isinstance(data, dict): data = [data] except ValueError as err: - raise exceptions.SystemSetupError(f"Ensure JSON schema is valid and queries are contained in a list: {err}") + raise exceptions.SystemSetupError( + f"Ensure JSON schema is valid and queries are contained in a list: {err}" + ) return data + def create_workload(cfg): logger = logging.getLogger(__name__) @@ -123,16 +136,36 @@ def create_workload(cfg): logger.info("Creating workload [%s] matching indices [%s]", workload_name, indices) logger.info("Number of Docs: %s", number_of_docs) - client = OsClientFactory(hosts=target_hosts.all_hosts[opts.TargetHosts.DEFAULT], - client_options=client_options.all_client_options[opts.TargetHosts.DEFAULT]).create() + client = OsClientFactory( + hosts=target_hosts.all_hosts[opts.TargetHosts.DEFAULT], + client_options=client_options.all_client_options[opts.TargetHosts.DEFAULT], + ).create() info = client.info() - console.info(f"Connected to OpenSearch cluster [{info['name']}] version [{info['version']['number']}].\n", logger=logger) + console.info( + f"Connected to OpenSearch cluster [{info['name']}] version [{info['version']['number']}].\n", + logger=logger, + ) + + output_path = os.path.abspath( + os.path.join(io.normalize_path(root_path), workload_name) + ) + + operations_path = os.path.join(output_path, "operations") + test_procedures_path = os.path.join(output_path, "test_procedures") + + try: + shutil.rmtree(output_path) + except OSError: + pass - output_path = 
os.path.abspath(os.path.join(io.normalize_path(root_path), workload_name)) io.ensure_dir(output_path) + io.ensure_dir(operations_path) + io.ensure_dir(test_procedures_path) - indices, corpora = extract_mappings_and_corpora(client, output_path, indices, number_of_docs) + indices, corpora = extract_mappings_and_corpora( + client, output_path, indices, number_of_docs + ) if len(indices) == 0: raise RuntimeError("Failed to extract any indices for workload!") @@ -141,18 +174,49 @@ def create_workload(cfg): "workload_name": workload_name, "indices": indices, "corpora": corpora, - "custom_queries": custom_queries + "custom_queries": custom_queries, } logger.info("Template Vars: %s", template_vars) workload_path = os.path.join(output_path, "workload.json") + operations_path = os.path.join(operations_path, "default.json") + test_procedures_path = os.path.join(test_procedures_path, "default.json") templates_path = os.path.join(cfg.opts("node", "benchmark.root"), "resources") - if custom_queries: - process_template(templates_path, "custom-query-workload.json.j2", template_vars, workload_path) - else: - process_template(templates_path, "default-query-workload.json.j2", template_vars, workload_path) + render_templates( + workload_path, + templates_path, + template_vars, + custom_queries, + operations_path, + test_procedures_path, + ) console.println("") - console.info(f"Workload {workload_name} has been created. Run it with: {PROGRAM_NAME} --workload-path={output_path}") + console.info( + f"Workload {workload_name} has been created. Run it with: {PROGRAM_NAME} --workload-path={output_path}" + ) + + +def render_templates( + workload_path, + templates_path, + template_vars, + custom_queries, + operations_path, + test_procedures_path, +): + def write_template(output_path, template_file): + template = extract_template(templates_path, template_file + ".json.j2") + with open(output_path, "w") as f: + f.write(template.render(template_vars)) + + write_template(workload_path, "base-workload") + + if custom_queries: + write_template(operations_path, "custom-operations") + write_template(test_procedures_path, "custom-test-procedures") + else: + write_template(operations_path, "default-operations") + write_template(test_procedures_path, "default-test-procedures")
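Reviewer note (not part of the diff): the sketch below is a minimal, self-contained illustration of the rendering flow that the new extract_template/render_templates pair implements, i.e. each Jinja2 template is rendered separately and written into the split layout (workload.json plus operations/default.json and test_procedures/default.json). The template filenames are the ones referenced above; the output directory and the template variables are illustrative placeholders, and a real run passes the workload_name/indices/corpora/custom_queries that create_workload collects from the cluster and the CLI.

import os
from jinja2 import Environment, FileSystemLoader, select_autoescape

def render(templates_path, template_filename, template_vars, output_path):
    # mirrors extract_template(): build an Environment, load one template, write the rendered result
    env = Environment(
        loader=FileSystemLoader(templates_path),
        autoescape=select_autoescape(["html", "xml"]),
    )
    template = env.get_template(template_filename)
    with open(output_path, "w") as f:
        f.write(template.render(template_vars))

# illustrative inputs only
templates_path = "osbenchmark/resources"
output_path = "/tmp/example-workload"
template_vars = {"workload_name": "example-workload", "indices": [], "corpora": [], "custom_queries": []}

# the generator now prepares the split directory layout before rendering
os.makedirs(os.path.join(output_path, "operations"), exist_ok=True)
os.makedirs(os.path.join(output_path, "test_procedures"), exist_ok=True)

# default templates; the custom-operations / custom-test-procedures variants are rendered instead when custom queries are supplied
render(templates_path, "base-workload.json.j2", template_vars, os.path.join(output_path, "workload.json"))
render(templates_path, "default-operations.json.j2", template_vars, os.path.join(output_path, "operations", "default.json"))
render(templates_path, "default-test-procedures.json.j2", template_vars, os.path.join(output_path, "test_procedures", "default.json"))

Wiping output_path up front (the shutil.rmtree wrapped in try/except OSError above) presumably keeps stale files from a previous generation out of operations/ and test_procedures/ before the new layout is written.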