diff --git a/osbenchmark/resources/base-workload.json.j2 b/osbenchmark/resources/base-workload.json.j2 index cfb6ced98..ca0c5668a 100644 --- a/osbenchmark/resources/base-workload.json.j2 +++ b/osbenchmark/resources/base-workload.json.j2 @@ -22,35 +22,10 @@ ] }{% endfor %} ], - "schedule": [ - { - "operation": "delete-index" - },{% raw %} - { - "operation": { - "operation-type": "create-index", - "settings": {{index_settings | default({}) | tojson}} - } - },{% endraw %} - { - "operation": { - "operation-type": "cluster-health", - "index": {{ indices | map(attribute='name') | list | join(',') | tojson }},{% raw %} - "request-params": { - "wait_for_status": "{{cluster_health | default('green')}}", - "wait_for_no_relocating_shards": "true" - }, - "retry-until-success": true - } - }, - { - "operation": { - "operation-type": "bulk", - "bulk-size": {{bulk_size | default(5000)}}, - "ingest-percentage": {{ingest_percentage | default(100)}} - }, - "clients": {{bulk_indexing_clients | default(8)}} - },{% endraw -%} - {% block queries %}{% endblock %} + "operations": [ + {% raw %}{{ benchmark.collect(parts="operations/*.json") }}{% endraw %} + ], + "test_procedures": [ + {% raw %}{{ benchmark.collect(parts="test_procedures/*.json") }}{% endraw %} ] } diff --git a/osbenchmark/resources/custom-operations.json.j2 b/osbenchmark/resources/custom-operations.json.j2 new file mode 100644 index 000000000..16fb38606 --- /dev/null +++ b/osbenchmark/resources/custom-operations.json.j2 @@ -0,0 +1,27 @@ +{ + "name": "index-append", + "operation-type": "bulk", + "bulk-size": {{bulk_size | default(5000)}}, + "ingest-percentage": {{ingest_percentage | default(100)}} +}, +{ + "name": "wait-until-merges-finish", + "operation-type": "index-stats", + "index": "_all", + "condition": { + "path": "_all.total.merges.current", + "expected-value": 0 + }, + "retry-until-success": true, + "include-in-reporting": false +}, +{%- block queries -%} +{% for query in custom_queries %} +{ + "name": "{{query.name}}", + "operation-type": "{{query['operation-type']}}", + "index": {{ indices | map(attribute='name') | list | join(',') | tojson }}, + "body": {{query.body | replace("'", '"') }} +}{% if not loop.last %},{% endif -%} +{% endfor %} +{%- endblock %} \ No newline at end of file diff --git a/osbenchmark/resources/custom-query-workload.json.j2 b/osbenchmark/resources/custom-query-workload.json.j2 deleted file mode 100644 index 76028db77..000000000 --- a/osbenchmark/resources/custom-query-workload.json.j2 +++ /dev/null @@ -1,14 +0,0 @@ -{% extends "base-workload.json.j2" %} - -{%- block queries -%} - {% for query in custom_queries %} - { - "operation": { - "name": "{{query.name}}", - "operation-type": "{{query['operation-type']}}", - "index": {{ indices | map(attribute='name') | list | join(',') | tojson }}, - "body": {{query.body | replace("'", '"') }} - } - }{% if not loop.last %},{% endif -%} - {% endfor %} -{%- endblock %} diff --git a/osbenchmark/resources/custom-test-procedures.json.j2 b/osbenchmark/resources/custom-test-procedures.json.j2 new file mode 100644 index 000000000..5fc247db1 --- /dev/null +++ b/osbenchmark/resources/custom-test-procedures.json.j2 @@ -0,0 +1,64 @@ +{ + "name": "custom-test-procedures", + "description": "Customized test procedure with custom operations generated by create-workload feature in OpenSearch Benchmark.", + "default": true, + "schedule": [ + { + "operation": "delete-index" + }, + { + "operation": { + "operation-type": "create-index", + {% raw %}"settings": {{ index_settings | default({}) | tojson }} + {% endraw %}} + }, + { + "operation": { + "operation-type": "cluster-health", + "index": {{ indices | map(attribute='name') | list | join(',') | tojson }}, + "request-params": { + {% raw %}"wait_for_status": "{{ cluster_health | default('green') }}", + {% endraw -%}"wait_for_no_relocating_shards": "true" + }, + "retry-until-success": true + } + }, + { + "operation": "index-append", + {% raw -%}"clients": {{ bulk_indexing_clients | default(8) }}, + {% endraw -%} + {% raw -%}"ignore-response-error-level": "{{ error_level | default('non-fatal') }}" + {% endraw -%}}, + { + "name": "refresh-after-index", + "operation": "refresh" + }, + { + "operation": { + "operation-type": "force-merge", + "request-timeout": 7200{%- if force_merge_max_num_segments is defined %}, + "max-num-segments": {{ force_merge_max_num_segments | tojson }} + {%- endif %} + } + }, + { + "name": "refresh-after-force-merge", + "operation": "refresh" + }, + { + "operation": "wait-until-merges-finish" + }, +{%- block queries -%} + {% for query in custom_queries %} + { + "operation":"{{query.name}}", + {% raw -%} + "warmup-iterations": {{ warmup_iterations | default(50) }}, + "iterations": {{ iterations | default(100) }}, + "target-throughput": {{ target_throughput | default(3) }}, + "clients": {{ search_clients | default(1) }} + {% endraw -%}}{% if not loop.last %},{% endif -%} + {% endfor %} +{%- endblock %} + ] +} diff --git a/osbenchmark/resources/default-operations.json.j2 b/osbenchmark/resources/default-operations.json.j2 new file mode 100644 index 000000000..969a3b913 --- /dev/null +++ b/osbenchmark/resources/default-operations.json.j2 @@ -0,0 +1,28 @@ +{ + "name": "index-append", + "operation-type": "bulk", + "bulk-size": {{bulk_size | default(5000)}}, + "ingest-percentage": {{ingest_percentage | default(100)}} +}, +{ + "name": "wait-until-merges-finish", + "operation-type": "index-stats", + "index": "_all", + "condition": { + "path": "_all.total.merges.current", + "expected-value": 0 + }, + "retry-until-success": true, + "include-in-reporting": false +}, +{ + "name": "match-all", + "operation-type": "search", + "index": {{ indices | map(attribute='name') | list | join(',') | tojson }}, + "body": { + "size": {{match_all_size | default(10)}}, + "query": { + "match_all": {} + } + } +} \ No newline at end of file diff --git a/osbenchmark/resources/default-query-workload.json.j2 b/osbenchmark/resources/default-query-workload.json.j2 deleted file mode 100644 index 38c296738..000000000 --- a/osbenchmark/resources/default-query-workload.json.j2 +++ /dev/null @@ -1,16 +0,0 @@ -{% extends "base-workload.json.j2" %} - -{% block queries %} - { - "operation": { - "operation-type": "search", - "index": {{ indices | map(attribute='name') | list | join(',') | tojson }}, - "body": { - "query": { - "match_all": {} - } - } - },{% raw %} - "clients": {{search_clients | default(8)}}{% endraw %} - } -{%- endblock %} \ No newline at end of file diff --git a/osbenchmark/resources/default-test-procedures.json.j2 b/osbenchmark/resources/default-test-procedures.json.j2 new file mode 100644 index 000000000..b8f4c73da --- /dev/null +++ b/osbenchmark/resources/default-test-procedures.json.j2 @@ -0,0 +1,59 @@ +{ + "name": "default-test-procedure", + "description": "Customized test procedure with custom operations generated by create-workload feature in OpenSearch Benchmark. Workload deletes existing indexes, creates indexes, ingests documents, and runs default match-all query.", + "default": true, + "schedule": [ + { + "operation": "delete-index" + }, + { + "operation": { + "operation-type": "create-index", + {% raw %}"settings": {{ index_settings | default({}) | tojson }} + {% endraw %}} + }, + { + "operation": { + "operation-type": "cluster-health", + "index": {{ indices | map(attribute='name') | list | join(',') | tojson }}, + "request-params": { + {% raw %}"wait_for_status": "{{ cluster_health | default('green') }}", + {% endraw -%}"wait_for_no_relocating_shards": "true" + }, + "retry-until-success": true + } + }, + { + "operation": "index-append", + {% raw -%}"clients": {{ bulk_indexing_clients | default(8) }}, + {% endraw -%} + {% raw -%}"ignore-response-error-level": "{{ error_level | default('non-fatal') }}" + {% endraw -%}}, + { + "name": "refresh-after-index", + "operation": "refresh" + }, + { + "operation": { + "operation-type": "force-merge", + "request-timeout": 7200{%- if force_merge_max_num_segments is defined %}, + "max-num-segments": {{ force_merge_max_num_segments | tojson }} + {%- endif %} + } + }, + { + "name": "refresh-after-force-merge", + "operation": "refresh" + }, + { + "operation": "wait-until-merges-finish" + },{% raw %} + { + "operation": "match-all", + "warmup-iterations": {{ warmup_iterations | default(50) }}, + "iterations": {{ iterations | default(100) }}, + "target-throughput": {{ target_throughput | default(3) }}, + "clients": {{ search_clients | default(1) }} + }{% endraw -%} + ] +} \ No newline at end of file diff --git a/osbenchmark/workload_generator/workload_generator.py b/osbenchmark/workload_generator/workload_generator.py index 3bcaacc42..60163701e 100644 --- a/osbenchmark/workload_generator/workload_generator.py +++ b/osbenchmark/workload_generator/workload_generator.py @@ -24,6 +24,7 @@ import logging import os +import shutil import json from opensearchpy import OpenSearchException @@ -34,14 +35,6 @@ from osbenchmark.workload_generator import corpus, index from osbenchmark.utils import io, opts, console - -def process_template(templates_path, template_filename, template_vars, output_path): - env = Environment(loader=FileSystemLoader(templates_path), autoescape=select_autoescape(['html', 'xml'])) - template = env.get_template(template_filename) - - with open(output_path, "w") as f: - f.write(template.render(template_vars)) - def validate_indices_docs_map(indices, indices_docs_map, docs_were_requested): if not docs_were_requested: return @@ -108,9 +101,37 @@ def process_custom_queries(custom_queries): return data +def write_template(template_vars, templates_path, output_path, template_file): + template = get_template(template_file, templates_path) + with open(output_path, "w") as f: + f.write(template.render(template_vars)) + +def get_template(template_file, templates_path): + template_file_name = template_file + ".json.j2" + + env = Environment(loader=FileSystemLoader(templates_path), autoescape=select_autoescape(['html', 'xml'])) + + return env.get_template(template_file_name) + +def render_templates(workload_path, + operations_path, + test_procedures_path, + templates_path, + template_vars, + custom_queries): + write_template(template_vars, templates_path, workload_path, "base-workload") + + if custom_queries: + write_template(template_vars, templates_path, operations_path, "custom-operations") + write_template(template_vars, templates_path, test_procedures_path, "custom-test-procedures") + else: + write_template(template_vars, templates_path, operations_path, "default-operations") + write_template(template_vars, templates_path, test_procedures_path, "default-test-procedures") + def create_workload(cfg): logger = logging.getLogger(__name__) + # All inputs provided by user workload_name = cfg.opts("workload", "workload.name") indices = cfg.opts("generator", "indices") root_path = cfg.opts("generator", "output.path") @@ -118,25 +139,46 @@ def create_workload(cfg): client_options = cfg.opts("client", "options") number_of_docs = cfg.opts("generator", "number_of_docs") unprocessed_custom_queries = cfg.opts("workload", "custom_queries") + templates_path = os.path.join(cfg.opts("node", "benchmark.root"), "resources") + # Process custom queries custom_queries = process_custom_queries(unprocessed_custom_queries) logger.info("Creating workload [%s] matching indices [%s]", workload_name, indices) logger.info("Number of Docs: %s", number_of_docs) + + # Initialize client factory client = OsClientFactory(hosts=target_hosts.all_hosts[opts.TargetHosts.DEFAULT], client_options=client_options.all_client_options[opts.TargetHosts.DEFAULT]).create() - info = client.info() console.info(f"Connected to OpenSearch cluster [{info['name']}] version [{info['version']['number']}].\n", logger=logger) + # Establish output paths directory output_path = os.path.abspath(os.path.join(io.normalize_path(root_path), workload_name)) + + operations_path = os.path.join(output_path, "operations") + test_procedures_path = os.path.join(output_path, "test_procedures") + + if os.path.exists(output_path): + try: + logger.info("Workload already exists. Removing existing workload [%s] in path [%s]", workload_name, output_path) + shutil.rmtree(output_path) + except OSError: + logger.error("Had issues removing existing workload [%s] in path [%s]", workload_name, output_path) + io.ensure_dir(output_path) + io.ensure_dir(operations_path) + io.ensure_dir(test_procedures_path) + # Extract Indices and Corpora + logger.info("Extracting indices and corpora") indices, corpora = extract_mappings_and_corpora(client, output_path, indices, number_of_docs) + logger.info("Finished extracting indices and corpora") if len(indices) == 0: raise RuntimeError("Failed to extract any indices for workload!") + # Collect all itmes into dictionary template_vars = { "workload_name": workload_name, "indices": indices, @@ -147,12 +189,19 @@ def create_workload(cfg): logger.info("Template Vars: %s", template_vars) workload_path = os.path.join(output_path, "workload.json") - templates_path = os.path.join(cfg.opts("node", "benchmark.root"), "resources") - - if custom_queries: - process_template(templates_path, "custom-query-workload.json.j2", template_vars, workload_path) - else: - process_template(templates_path, "default-query-workload.json.j2", template_vars, workload_path) + operations_path = os.path.join(operations_path, "default.json") + test_procedures_path = os.path.join(test_procedures_path, "default.json") + + # Render all templates + logger.info("Rendering templates") + render_templates( + workload_path, + operations_path, + test_procedures_path, + templates_path, + template_vars, + custom_queries + ) console.println("") console.info(f"Workload {workload_name} has been created. Run it with: {PROGRAM_NAME} --workload-path={output_path}")