From 5538546e9f1d73fce58c4834164cc1045811426d Mon Sep 17 00:00:00 2001 From: Ian Hoang <51065478+IanHoang@users.noreply.github.com> Date: Thu, 25 Jan 2024 13:10:36 -0600 Subject: [PATCH 1/2] Create-Workload Improvements: Separate operations and test procedures from workload.json (#446) Signed-off-by: Ian Hoang Co-authored-by: Ian Hoang --- osbenchmark/resources/base-workload.json.j2 | 35 ++------ .../resources/custom-operations.json.j2 | 27 +++++++ .../resources/custom-query-workload.json.j2 | 14 ---- .../resources/custom-test-procedures.json.j2 | 64 +++++++++++++++ .../resources/default-operations.json.j2 | 28 +++++++ .../resources/default-query-workload.json.j2 | 16 ---- .../resources/default-test-procedures.json.j2 | 59 ++++++++++++++ .../workload_generator/workload_generator.py | 79 +++++++++++++++---- 8 files changed, 247 insertions(+), 75 deletions(-) create mode 100644 osbenchmark/resources/custom-operations.json.j2 delete mode 100644 osbenchmark/resources/custom-query-workload.json.j2 create mode 100644 osbenchmark/resources/custom-test-procedures.json.j2 create mode 100644 osbenchmark/resources/default-operations.json.j2 delete mode 100644 osbenchmark/resources/default-query-workload.json.j2 create mode 100644 osbenchmark/resources/default-test-procedures.json.j2 diff --git a/osbenchmark/resources/base-workload.json.j2 b/osbenchmark/resources/base-workload.json.j2 index cfb6ced98..ca0c5668a 100644 --- a/osbenchmark/resources/base-workload.json.j2 +++ b/osbenchmark/resources/base-workload.json.j2 @@ -22,35 +22,10 @@ ] }{% endfor %} ], - "schedule": [ - { - "operation": "delete-index" - },{% raw %} - { - "operation": { - "operation-type": "create-index", - "settings": {{index_settings | default({}) | tojson}} - } - },{% endraw %} - { - "operation": { - "operation-type": "cluster-health", - "index": {{ indices | map(attribute='name') | list | join(',') | tojson }},{% raw %} - "request-params": { - "wait_for_status": "{{cluster_health | default('green')}}", - "wait_for_no_relocating_shards": "true" - }, - "retry-until-success": true - } - }, - { - "operation": { - "operation-type": "bulk", - "bulk-size": {{bulk_size | default(5000)}}, - "ingest-percentage": {{ingest_percentage | default(100)}} - }, - "clients": {{bulk_indexing_clients | default(8)}} - },{% endraw -%} - {% block queries %}{% endblock %} + "operations": [ + {% raw %}{{ benchmark.collect(parts="operations/*.json") }}{% endraw %} + ], + "test_procedures": [ + {% raw %}{{ benchmark.collect(parts="test_procedures/*.json") }}{% endraw %} ] } diff --git a/osbenchmark/resources/custom-operations.json.j2 b/osbenchmark/resources/custom-operations.json.j2 new file mode 100644 index 000000000..16fb38606 --- /dev/null +++ b/osbenchmark/resources/custom-operations.json.j2 @@ -0,0 +1,27 @@ +{ + "name": "index-append", + "operation-type": "bulk", + "bulk-size": {{bulk_size | default(5000)}}, + "ingest-percentage": {{ingest_percentage | default(100)}} +}, +{ + "name": "wait-until-merges-finish", + "operation-type": "index-stats", + "index": "_all", + "condition": { + "path": "_all.total.merges.current", + "expected-value": 0 + }, + "retry-until-success": true, + "include-in-reporting": false +}, +{%- block queries -%} +{% for query in custom_queries %} +{ + "name": "{{query.name}}", + "operation-type": "{{query['operation-type']}}", + "index": {{ indices | map(attribute='name') | list | join(',') | tojson }}, + "body": {{query.body | replace("'", '"') }} +}{% if not loop.last %},{% endif -%} +{% endfor %} +{%- endblock %} \ No newline 
at end of file diff --git a/osbenchmark/resources/custom-query-workload.json.j2 b/osbenchmark/resources/custom-query-workload.json.j2 deleted file mode 100644 index 76028db77..000000000 --- a/osbenchmark/resources/custom-query-workload.json.j2 +++ /dev/null @@ -1,14 +0,0 @@ -{% extends "base-workload.json.j2" %} - -{%- block queries -%} - {% for query in custom_queries %} - { - "operation": { - "name": "{{query.name}}", - "operation-type": "{{query['operation-type']}}", - "index": {{ indices | map(attribute='name') | list | join(',') | tojson }}, - "body": {{query.body | replace("'", '"') }} - } - }{% if not loop.last %},{% endif -%} - {% endfor %} -{%- endblock %} diff --git a/osbenchmark/resources/custom-test-procedures.json.j2 b/osbenchmark/resources/custom-test-procedures.json.j2 new file mode 100644 index 000000000..5fc247db1 --- /dev/null +++ b/osbenchmark/resources/custom-test-procedures.json.j2 @@ -0,0 +1,64 @@ +{ + "name": "custom-test-procedures", + "description": "Customized test procedure with custom operations generated by create-workload feature in OpenSearch Benchmark.", + "default": true, + "schedule": [ + { + "operation": "delete-index" + }, + { + "operation": { + "operation-type": "create-index", + {% raw %}"settings": {{ index_settings | default({}) | tojson }} + {% endraw %}} + }, + { + "operation": { + "operation-type": "cluster-health", + "index": {{ indices | map(attribute='name') | list | join(',') | tojson }}, + "request-params": { + {% raw %}"wait_for_status": "{{ cluster_health | default('green') }}", + {% endraw -%}"wait_for_no_relocating_shards": "true" + }, + "retry-until-success": true + } + }, + { + "operation": "index-append", + {% raw -%}"clients": {{ bulk_indexing_clients | default(8) }}, + {% endraw -%} + {% raw -%}"ignore-response-error-level": "{{ error_level | default('non-fatal') }}" + {% endraw -%}}, + { + "name": "refresh-after-index", + "operation": "refresh" + }, + { + "operation": { + "operation-type": "force-merge", + "request-timeout": 7200{%- if force_merge_max_num_segments is defined %}, + "max-num-segments": {{ force_merge_max_num_segments | tojson }} + {%- endif %} + } + }, + { + "name": "refresh-after-force-merge", + "operation": "refresh" + }, + { + "operation": "wait-until-merges-finish" + }, +{%- block queries -%} + {% for query in custom_queries %} + { + "operation":"{{query.name}}", + {% raw -%} + "warmup-iterations": {{ warmup_iterations | default(50) }}, + "iterations": {{ iterations | default(100) }}, + "target-throughput": {{ target_throughput | default(3) }}, + "clients": {{ search_clients | default(1) }} + {% endraw -%}}{% if not loop.last %},{% endif -%} + {% endfor %} +{%- endblock %} + ] +} diff --git a/osbenchmark/resources/default-operations.json.j2 b/osbenchmark/resources/default-operations.json.j2 new file mode 100644 index 000000000..969a3b913 --- /dev/null +++ b/osbenchmark/resources/default-operations.json.j2 @@ -0,0 +1,28 @@ +{ + "name": "index-append", + "operation-type": "bulk", + "bulk-size": {{bulk_size | default(5000)}}, + "ingest-percentage": {{ingest_percentage | default(100)}} +}, +{ + "name": "wait-until-merges-finish", + "operation-type": "index-stats", + "index": "_all", + "condition": { + "path": "_all.total.merges.current", + "expected-value": 0 + }, + "retry-until-success": true, + "include-in-reporting": false +}, +{ + "name": "match-all", + "operation-type": "search", + "index": {{ indices | map(attribute='name') | list | join(',') | tojson }}, + "body": { + "size": {{match_all_size | default(10)}}, + 
"query": { + "match_all": {} + } + } +} \ No newline at end of file diff --git a/osbenchmark/resources/default-query-workload.json.j2 b/osbenchmark/resources/default-query-workload.json.j2 deleted file mode 100644 index 38c296738..000000000 --- a/osbenchmark/resources/default-query-workload.json.j2 +++ /dev/null @@ -1,16 +0,0 @@ -{% extends "base-workload.json.j2" %} - -{% block queries %} - { - "operation": { - "operation-type": "search", - "index": {{ indices | map(attribute='name') | list | join(',') | tojson }}, - "body": { - "query": { - "match_all": {} - } - } - },{% raw %} - "clients": {{search_clients | default(8)}}{% endraw %} - } -{%- endblock %} \ No newline at end of file diff --git a/osbenchmark/resources/default-test-procedures.json.j2 b/osbenchmark/resources/default-test-procedures.json.j2 new file mode 100644 index 000000000..b8f4c73da --- /dev/null +++ b/osbenchmark/resources/default-test-procedures.json.j2 @@ -0,0 +1,59 @@ +{ + "name": "default-test-procedure", + "description": "Customized test procedure with custom operations generated by create-workload feature in OpenSearch Benchmark. Workload deletes existing indexes, creates indexes, ingests documents, and runs default match-all query.", + "default": true, + "schedule": [ + { + "operation": "delete-index" + }, + { + "operation": { + "operation-type": "create-index", + {% raw %}"settings": {{ index_settings | default({}) | tojson }} + {% endraw %}} + }, + { + "operation": { + "operation-type": "cluster-health", + "index": {{ indices | map(attribute='name') | list | join(',') | tojson }}, + "request-params": { + {% raw %}"wait_for_status": "{{ cluster_health | default('green') }}", + {% endraw -%}"wait_for_no_relocating_shards": "true" + }, + "retry-until-success": true + } + }, + { + "operation": "index-append", + {% raw -%}"clients": {{ bulk_indexing_clients | default(8) }}, + {% endraw -%} + {% raw -%}"ignore-response-error-level": "{{ error_level | default('non-fatal') }}" + {% endraw -%}}, + { + "name": "refresh-after-index", + "operation": "refresh" + }, + { + "operation": { + "operation-type": "force-merge", + "request-timeout": 7200{%- if force_merge_max_num_segments is defined %}, + "max-num-segments": {{ force_merge_max_num_segments | tojson }} + {%- endif %} + } + }, + { + "name": "refresh-after-force-merge", + "operation": "refresh" + }, + { + "operation": "wait-until-merges-finish" + },{% raw %} + { + "operation": "match-all", + "warmup-iterations": {{ warmup_iterations | default(50) }}, + "iterations": {{ iterations | default(100) }}, + "target-throughput": {{ target_throughput | default(3) }}, + "clients": {{ search_clients | default(1) }} + }{% endraw -%} + ] +} \ No newline at end of file diff --git a/osbenchmark/workload_generator/workload_generator.py b/osbenchmark/workload_generator/workload_generator.py index 3bcaacc42..60163701e 100644 --- a/osbenchmark/workload_generator/workload_generator.py +++ b/osbenchmark/workload_generator/workload_generator.py @@ -24,6 +24,7 @@ import logging import os +import shutil import json from opensearchpy import OpenSearchException @@ -34,14 +35,6 @@ from osbenchmark.workload_generator import corpus, index from osbenchmark.utils import io, opts, console - -def process_template(templates_path, template_filename, template_vars, output_path): - env = Environment(loader=FileSystemLoader(templates_path), autoescape=select_autoescape(['html', 'xml'])) - template = env.get_template(template_filename) - - with open(output_path, "w") as f: - 
f.write(template.render(template_vars)) - def validate_indices_docs_map(indices, indices_docs_map, docs_were_requested): if not docs_were_requested: return @@ -108,9 +101,37 @@ def process_custom_queries(custom_queries): return data +def write_template(template_vars, templates_path, output_path, template_file): + template = get_template(template_file, templates_path) + with open(output_path, "w") as f: + f.write(template.render(template_vars)) + +def get_template(template_file, templates_path): + template_file_name = template_file + ".json.j2" + + env = Environment(loader=FileSystemLoader(templates_path), autoescape=select_autoescape(['html', 'xml'])) + + return env.get_template(template_file_name) + +def render_templates(workload_path, + operations_path, + test_procedures_path, + templates_path, + template_vars, + custom_queries): + write_template(template_vars, templates_path, workload_path, "base-workload") + + if custom_queries: + write_template(template_vars, templates_path, operations_path, "custom-operations") + write_template(template_vars, templates_path, test_procedures_path, "custom-test-procedures") + else: + write_template(template_vars, templates_path, operations_path, "default-operations") + write_template(template_vars, templates_path, test_procedures_path, "default-test-procedures") + def create_workload(cfg): logger = logging.getLogger(__name__) + # All inputs provided by user workload_name = cfg.opts("workload", "workload.name") indices = cfg.opts("generator", "indices") root_path = cfg.opts("generator", "output.path") @@ -118,25 +139,46 @@ def create_workload(cfg): client_options = cfg.opts("client", "options") number_of_docs = cfg.opts("generator", "number_of_docs") unprocessed_custom_queries = cfg.opts("workload", "custom_queries") + templates_path = os.path.join(cfg.opts("node", "benchmark.root"), "resources") + # Process custom queries custom_queries = process_custom_queries(unprocessed_custom_queries) logger.info("Creating workload [%s] matching indices [%s]", workload_name, indices) logger.info("Number of Docs: %s", number_of_docs) + + # Initialize client factory client = OsClientFactory(hosts=target_hosts.all_hosts[opts.TargetHosts.DEFAULT], client_options=client_options.all_client_options[opts.TargetHosts.DEFAULT]).create() - info = client.info() console.info(f"Connected to OpenSearch cluster [{info['name']}] version [{info['version']['number']}].\n", logger=logger) + # Establish output paths directory output_path = os.path.abspath(os.path.join(io.normalize_path(root_path), workload_name)) + + operations_path = os.path.join(output_path, "operations") + test_procedures_path = os.path.join(output_path, "test_procedures") + + if os.path.exists(output_path): + try: + logger.info("Workload already exists. 
Removing existing workload [%s] in path [%s]", workload_name, output_path) + shutil.rmtree(output_path) + except OSError: + logger.error("Had issues removing existing workload [%s] in path [%s]", workload_name, output_path) + io.ensure_dir(output_path) + io.ensure_dir(operations_path) + io.ensure_dir(test_procedures_path) + # Extract Indices and Corpora + logger.info("Extracting indices and corpora") indices, corpora = extract_mappings_and_corpora(client, output_path, indices, number_of_docs) + logger.info("Finished extracting indices and corpora") if len(indices) == 0: raise RuntimeError("Failed to extract any indices for workload!") + # Collect all itmes into dictionary template_vars = { "workload_name": workload_name, "indices": indices, @@ -147,12 +189,19 @@ def create_workload(cfg): logger.info("Template Vars: %s", template_vars) workload_path = os.path.join(output_path, "workload.json") - templates_path = os.path.join(cfg.opts("node", "benchmark.root"), "resources") - - if custom_queries: - process_template(templates_path, "custom-query-workload.json.j2", template_vars, workload_path) - else: - process_template(templates_path, "default-query-workload.json.j2", template_vars, workload_path) + operations_path = os.path.join(operations_path, "default.json") + test_procedures_path = os.path.join(test_procedures_path, "default.json") + + # Render all templates + logger.info("Rendering templates") + render_templates( + workload_path, + operations_path, + test_procedures_path, + templates_path, + template_vars, + custom_queries + ) console.println("") console.info(f"Workload {workload_name} has been created. Run it with: {PROGRAM_NAME} --workload-path={output_path}") From c74c993a927c1b78bce4adec0c1aa8f373969dc3 Mon Sep 17 00:00:00 2001 From: peteralfonsi Date: Tue, 30 Jan 2024 09:10:25 -0800 Subject: [PATCH 2/2] Adds command line argument to specify throughput percentiles to display (#449) Signed-off-by: Peter Alfonsi Co-authored-by: Peter Alfonsi --- osbenchmark/benchmark.py | 7 +++ osbenchmark/metrics.py | 75 ++++++++++++++++++++++++-------- osbenchmark/results_publisher.py | 14 ++++-- tests/metrics_test.py | 20 +++++---- 4 files changed, 86 insertions(+), 30 deletions(-) diff --git a/osbenchmark/benchmark.py b/osbenchmark/benchmark.py index c4128f9d5..e70d4568c 100644 --- a/osbenchmark/benchmark.py +++ b/osbenchmark/benchmark.py @@ -560,6 +560,12 @@ def add_workload_source(subparser): f"(default: {metrics.GlobalStatsCalculator.DEFAULT_LATENCY_PERCENTILES}).", default=metrics.GlobalStatsCalculator.DEFAULT_LATENCY_PERCENTILES ) + test_execution_parser.add_argument( + "--throughput-percentiles", + help=f"A comma-separated list of percentiles to report for throughput, in addition to mean/median/max/min " + f"(default: {metrics.GlobalStatsCalculator.DEFAULT_THROUGHPUT_PERCENTILES}).", + default=metrics.GlobalStatsCalculator.DEFAULT_THROUGHPUT_PERCENTILES + ) ############################################################################### # @@ -869,6 +875,7 @@ def dispatch_sub_command(arg_parser, args, cfg): opts.csv_to_list(args.load_worker_coordinator_hosts)) cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode) cfg.add(config.Scope.applicationOverride, "workload", "latency.percentiles", args.latency_percentiles) + cfg.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", args.throughput_percentiles) configure_workload_params(arg_parser, args, cfg) configure_connection_params(arg_parser, args, cfg) configure_telemetry_params(args, 
cfg) diff --git a/osbenchmark/metrics.py b/osbenchmark/metrics.py index e9a480288..cd143a45b 100644 --- a/osbenchmark/metrics.py +++ b/osbenchmark/metrics.py @@ -336,7 +336,8 @@ def calculate_results(store, test_execution): store, test_execution.workload, test_execution.test_procedure, - latency_percentiles=test_execution.latency_percentiles + latency_percentiles=test_execution.latency_percentiles, + throughput_percentiles=test_execution.throughput_percentiles ) return calc() @@ -1297,13 +1298,19 @@ def create_test_execution(cfg, workload, test_procedure, workload_revision=None) plugin_params = cfg.opts("builder", "plugin.params") benchmark_version = version.version() benchmark_revision = version.revision() - latency_percentiles = cfg.opts("workload", "latency.percentiles", mandatory=False) + latency_percentiles = cfg.opts("workload", "latency.percentiles", mandatory=False, + default_value=GlobalStatsCalculator.DEFAULT_LATENCY_PERCENTILES) + throughput_percentiles = cfg.opts("workload", "throughput.percentiles", mandatory=False, + default_value=GlobalStatsCalculator.DEFAULT_THROUGHPUT_PERCENTILES) + # In tests, we don't get the default command-line arg value for percentiles, + # so supply them as defaults here as well return TestExecution(benchmark_version, benchmark_revision, environment, test_execution_id, test_execution_timestamp, pipeline, user_tags, workload, workload_params, test_procedure, provision_config_instance, provision_config_instance_params, - plugin_params, workload_revision, latency_percentiles=latency_percentiles) + plugin_params, workload_revision, latency_percentiles=latency_percentiles, + throughput_percentiles=throughput_percentiles) class TestExecution: @@ -1313,7 +1320,7 @@ def __init__(self, benchmark_version, benchmark_revision, environment_name, provision_config_instance_params, plugin_params, workload_revision=None, provision_config_revision=None, distribution_version=None, distribution_flavor=None, - revision=None, results=None, meta_data=None, latency_percentiles=None): + revision=None, results=None, meta_data=None, latency_percentiles=None, throughput_percentiles=None): if results is None: results = {} # this happens when the test execution is created initially @@ -1326,6 +1333,8 @@ def __init__(self, benchmark_version, benchmark_revision, environment_name, if latency_percentiles: # split comma-separated string into list of floats latency_percentiles = [float(value) for value in latency_percentiles.split(",")] + if throughput_percentiles: + throughput_percentiles = [float(value) for value in throughput_percentiles.split(",")] self.benchmark_version = benchmark_version self.benchmark_revision = benchmark_revision self.environment_name = environment_name @@ -1347,6 +1356,7 @@ def __init__(self, benchmark_version, benchmark_revision, environment_name, self.results = results self.meta_data = meta_data self.latency_percentiles = latency_percentiles + self.throughput_percentiles = throughput_percentiles @property @@ -1684,33 +1694,36 @@ def filter_percentiles_by_sample_size(sample_size, percentiles): raise AssertionError("Percentiles require at least one sample") filtered_percentiles = [] - # Treat the 1 and 2-10 case separately, to return p50 and p100 if present + # Treat the cases below 10 separately, to return p25, 50, 75, 100 if present if sample_size == 1: filtered_percentiles = [100] - elif sample_size < 10: + elif sample_size < 4: for p in [50, 100]: if p in percentiles: filtered_percentiles.append(p) + elif sample_size < 10: + for p in [25, 50, 75, 100]: + 
if p in percentiles: + filtered_percentiles.append(p) else: effective_sample_size = 10 ** (int(math.log10(sample_size))) # round down to nearest power of ten delta = 0.000001 # If (p / 100) * effective_sample_size is within this value of a whole number, # assume the discrepancy is due to floating point and allow it for p in percentiles: fraction = p / 100 - # check if fraction * effective_sample_size is close enough to a whole number - if abs((effective_sample_size * fraction) - round(effective_sample_size*fraction)) < delta: + if abs((effective_sample_size * fraction) - round(effective_sample_size*fraction)) < delta or p in [25, 75]: filtered_percentiles.append(p) # if no percentiles are suitable, just return 100 if len(filtered_percentiles) == 0: return [100] return filtered_percentiles -def percentiles_for_sample_size(sample_size, latency_percentiles=None): +def percentiles_for_sample_size(sample_size, percentiles_list=None): # If latency_percentiles is present, as a list, display those values instead (assuming there are enough samples) - percentiles = GlobalStatsCalculator.DEFAULT_LATENCY_PERCENTILES_LIST - if latency_percentiles: - percentiles = latency_percentiles # Defaults get overridden if a value is provided + percentiles = [] + if percentiles_list: + percentiles = percentiles_list # Defaults get overridden if a value is provided percentiles.sort() return filter_percentiles_by_sample_size(sample_size, percentiles) @@ -1718,12 +1731,19 @@ class GlobalStatsCalculator: DEFAULT_LATENCY_PERCENTILES = "50,90,99,99.9,99.99,100" DEFAULT_LATENCY_PERCENTILES_LIST = [float(value) for value in DEFAULT_LATENCY_PERCENTILES.split(",")] - def __init__(self, store, workload, test_procedure, latency_percentiles=None): + DEFAULT_THROUGHPUT_PERCENTILES = "" + DEFAULT_THROUGHPUT_PERCENTILES_LIST = [] + + OTHER_PERCENTILES = [50,90,99,99.9,99.99,100] + # Use these percentiles when the single_latency fn is called for something other than latency + + def __init__(self, store, workload, test_procedure, latency_percentiles=None, throughput_percentiles=None): self.store = store self.logger = logging.getLogger(__name__) self.workload = workload self.test_procedure = test_procedure self.latency_percentiles = latency_percentiles + self.throughput_percentiles = throughput_percentiles def __call__(self): result = GlobalStats() @@ -1739,7 +1759,7 @@ def __call__(self): result.add_op_metrics( t, task.operation.name, - self.summary_stats("throughput", t, op_type), + self.summary_stats("throughput", t, op_type, percentiles_list=self.throughput_percentiles), self.single_latency(t, op_type), self.single_latency(t, op_type, metric_name="service_time"), self.single_latency(t, op_type, metric_name="processing_time"), @@ -1817,13 +1837,15 @@ def sum(self, metric_name): def one(self, metric_name): return self.store.get_one(metric_name) - def summary_stats(self, metric_name, task_name, operation_type): + def summary_stats(self, metric_name, task_name, operation_type, percentiles_list=None): mean = self.store.get_mean(metric_name, task=task_name, operation_type=operation_type, sample_type=SampleType.Normal) median = self.store.get_median(metric_name, task=task_name, operation_type=operation_type, sample_type=SampleType.Normal) unit = self.store.get_unit(metric_name, task=task_name, operation_type=operation_type) stats = self.store.get_stats(metric_name, task=task_name, operation_type=operation_type, sample_type=SampleType.Normal) + + result = {} if mean and median and stats: - return { + result = { "min": stats["min"], 
"mean": mean, "median": median, @@ -1831,7 +1853,7 @@ def summary_stats(self, metric_name, task_name, operation_type): "unit": unit } else: - return { + result = { "min": None, "mean": None, "median": None, @@ -1839,6 +1861,20 @@ def summary_stats(self, metric_name, task_name, operation_type): "unit": unit } + if percentiles_list: # modified from single_latency() + sample_size = stats["count"] + percentiles = self.store.get_percentiles(metric_name, + task=task_name, + operation_type=operation_type, + sample_type=SampleType.Normal, + percentiles=percentiles_for_sample_size( + sample_size, + percentiles_list=percentiles_list)) + for k, v in percentiles.items(): + # safely encode so we don't have any dots in field names + result[encode_float_key(k)] = v + return result + def shard_stats(self, metric_name): values = self.store.get_raw(metric_name, mapper=lambda doc: doc["per-shard"]) unit = self.store.get_unit(metric_name) @@ -1896,6 +1932,9 @@ def single_latency(self, task, operation_type, metric_name="latency"): sample_type = SampleType.Normal stats = self.store.get_stats(metric_name, task=task, operation_type=operation_type, sample_type=sample_type) sample_size = stats["count"] if stats else 0 + percentiles_list = self.OTHER_PERCENTILES + if metric_name == "latency": + percentiles_list = self.latency_percentiles if sample_size > 0: # The custom latency percentiles have to be supplied here as the workload runs, # or else they aren't present when results are published @@ -1905,7 +1944,7 @@ def single_latency(self, task, operation_type, metric_name="latency"): sample_type=sample_type, percentiles=percentiles_for_sample_size( sample_size, - latency_percentiles=self.latency_percentiles + percentiles_list=percentiles_list )) mean = self.store.get_mean(metric_name, task=task, diff --git a/osbenchmark/results_publisher.py b/osbenchmark/results_publisher.py index 2a0690d3a..61999ae7c 100644 --- a/osbenchmark/results_publisher.py +++ b/osbenchmark/results_publisher.py @@ -122,7 +122,10 @@ def __init__(self, results, config): self.show_processing_time = convert.to_bool(config.opts("results_publishing", "output.processingtime", mandatory=False, default_value=False)) self.cwd = config.opts("node", "benchmark.cwd") - self.latency_percentiles = comma_separated_string_to_number_list(config.opts("workload", "latency.percentiles", mandatory=False)) + self.display_percentiles = { + "throughput":comma_separated_string_to_number_list(config.opts("workload", "throughput.percentiles", mandatory=False)), + "latency": comma_separated_string_to_number_list(config.opts("workload", "latency.percentiles", mandatory=False)) + } def publish(self): print_header(FINAL_SCORE) @@ -184,7 +187,8 @@ def _publish_throughput(self, values, task): self._line("Min Throughput", task, throughput["min"], unit, lambda v: "%.2f" % v), self._line("Mean Throughput", task, throughput["mean"], unit, lambda v: "%.2f" % v), self._line("Median Throughput", task, throughput["median"], unit, lambda v: "%.2f" % v), - self._line("Max Throughput", task, throughput["max"], unit, lambda v: "%.2f" % v) + self._line("Max Throughput", task, throughput["max"], unit, lambda v: "%.2f" % v), + *self._publish_percentiles("throughput", task, throughput) ) def _publish_latency(self, values, task): @@ -198,8 +202,10 @@ def _publish_processing_time(self, values, task): def _publish_percentiles(self, name, task, value): lines = [] + percentiles = self.display_percentiles.get(name, metrics.GlobalStatsCalculator.OTHER_PERCENTILES) + if value: - for percentile in 
metrics.percentiles_for_sample_size(sys.maxsize, latency_percentiles=self.latency_percentiles): + for percentile in metrics.percentiles_for_sample_size(sys.maxsize, percentiles_list=percentiles): percentile_value = value.get(metrics.encode_float_key(percentile)) a_line = self._line("%sth percentile %s" % (percentile, name), task, percentile_value, "ms", force=self.publish_all_percentile_values) @@ -436,7 +442,7 @@ def _publish_processing_time(self, baseline_stats, contender_stats, task): def _publish_percentiles(self, name, task, baseline_values, contender_values): lines = [] - for percentile in metrics.percentiles_for_sample_size(sys.maxsize, latency_percentiles=self.latency_percentiles): + for percentile in metrics.percentiles_for_sample_size(sys.maxsize, percentiles_list=self.latency_percentiles): baseline_value = baseline_values.get(metrics.encode_float_key(percentile)) contender_value = contender_values.get(metrics.encode_float_key(percentile)) self._append_non_empty(lines, self._line("%sth percentile %s" % (percentile, name), diff --git a/tests/metrics_test.py b/tests/metrics_test.py index dfc0ff08d..f4415b120 100644 --- a/tests/metrics_test.py +++ b/tests/metrics_test.py @@ -1797,9 +1797,11 @@ def test_filter_percentiles_by_sample_size(self): 4, 10, 10.01, + 25, 45, 46.001, 50, + 75, 80.1, 90, 98.9, @@ -1813,14 +1815,16 @@ def test_filter_percentiles_by_sample_size(self): 100] sample_size_to_result_map = { 1: [100], - 5: [50, 100], - 10: [0, 10, 50, 90, 100], - 99: [0, 10, 50, 90, 100], - 100: [0, 4, 10, 45, 50, 90, 99, 100], - 1000: [0, 0.1, 4, 10, 45, 50, 80.1, 90, 98.9, 99, 99.9, 100], - 10000: [0, 0.01, 0.1, 4, 10, 10.01, 45, 50, 80.1, 90, 98.9, 98.91, 99, 99.9, 99.99, 100], - 100000: [0, 0.001, 0.01, 0.1, 4, 10, 10.01, 45, 46.001, 50, 80.1, 90, 98.9, 98.91, 98.999, 99, 99.9, 99.99, 99.999, 100], - 1000000: [0, 0.0001, 0.001, 0.01, 0.1, 4, 10, 10.01, 45, 46.001, 50, + 2: [50, 100], + 4: [25, 50, 75, 100], + 10: [0, 10, 25, 50, 75, 90, 100], + 99: [0, 10, 25, 50, 75, 90, 100], + 100: [0, 4, 10, 25, 45, 50, 75, 90, 99, 100], + 1000: [0, 0.1, 4, 10, 25, 45, 50, 75, 80.1, 90, 98.9, 99, 99.9, 100], + 10000: [0, 0.01, 0.1, 4, 10, 10.01, 25, 45, 50, 75, 80.1, 90, 98.9, 98.91, 99, 99.9, 99.99, 100], + 100000: [0, 0.001, 0.01, 0.1, 4, 10, 10.01, 25, 45, 46.001, 50, 75, + 80.1, 90, 98.9, 98.91, 98.999, 99, 99.9, 99.99, 99.999, 100], + 1000000: [0, 0.0001, 0.001, 0.01, 0.1, 4, 10, 10.01, 25, 45, 46.001, 50, 75, 80.1, 90, 98.9, 98.91, 98.999, 99, 99.9, 99.99, 99.999, 99.9999, 100] } # 100,000 corresponds to 0.001% which is the order of magnitude we round to, # so at higher orders (>=1M samples) all values are permitted
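
Reviewer note on PATCH 2/2: the new percentile filtering is easier to follow outside the diff. The sketch below re-implements the rule that filter_percentiles_by_sample_size applies after this change, based on the hunks above and the expectations in tests/metrics_test.py; the name filter_percentiles_sketch and the closing assertion are illustrative only, not code from the patch. At run time the extra throughput percentiles are requested with the new --throughput-percentiles option (a comma-separated list, empty by default), analogous to the existing --latency-percentiles option.

import math

def filter_percentiles_sketch(sample_size, percentiles):
    # A single sample can only report p100.
    if sample_size == 1:
        return [100]
    # Fewer than 4 samples: only the coarse percentiles, if requested.
    if sample_size < 4:
        allowed = [p for p in (50, 100) if p in percentiles]
    # Fewer than 10 samples: also allow the quartiles.
    elif sample_size < 10:
        allowed = [p for p in (25, 50, 75, 100) if p in percentiles]
    else:
        # Round the sample size down to a power of ten and keep any requested
        # percentile that lands on a whole-numbered rank; p25 and p75 are
        # always allowed once there are at least 10 samples.
        effective = 10 ** int(math.log10(sample_size))
        allowed = [
            p for p in percentiles
            if abs(effective * p / 100 - round(effective * p / 100)) < 1e-6
            or p in (25, 75)
        ]
    # Fall back to p100 if nothing survived the filter.
    return allowed or [100]

# Mirrors the sample_size == 10 row of sample_size_to_result_map in metrics_test.py:
assert filter_percentiles_sketch(
    10, [0, 4, 10, 25, 45, 50, 75, 90, 99, 100]
) == [0, 10, 25, 50, 75, 90, 100]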