Commit
Merge branch 'opensearch-project:main' into client_time/final
saimedhi authored Jan 31, 2024
2 parents 777d2b2 + c74c993 commit d02da12
Showing 12 changed files with 333 additions and 105 deletions.
7 changes: 7 additions & 0 deletions osbenchmark/benchmark.py
@@ -560,6 +560,12 @@ def add_workload_source(subparser):
f"(default: {metrics.GlobalStatsCalculator.DEFAULT_LATENCY_PERCENTILES}).",
default=metrics.GlobalStatsCalculator.DEFAULT_LATENCY_PERCENTILES
)
test_execution_parser.add_argument(
"--throughput-percentiles",
help=f"A comma-separated list of percentiles to report for throughput, in addition to mean/median/max/min "
f"(default: {metrics.GlobalStatsCalculator.DEFAULT_THROUGHPUT_PERCENTILES}).",
default=metrics.GlobalStatsCalculator.DEFAULT_THROUGHPUT_PERCENTILES
)

###############################################################################
#
@@ -869,6 +875,7 @@ def dispatch_sub_command(arg_parser, args, cfg):
opts.csv_to_list(args.load_worker_coordinator_hosts))
cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode)
cfg.add(config.Scope.applicationOverride, "workload", "latency.percentiles", args.latency_percentiles)
cfg.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", args.throughput_percentiles)
configure_workload_params(arg_parser, args, cfg)
configure_connection_params(arg_parser, args, cfg)
configure_telemetry_params(args, cfg)
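
For orientation, here is a minimal sketch of how a value passed to the new --throughput-percentiles flag gets consumed downstream. The split-and-convert step mirrors the parsing added to TestExecution.__init__ in osbenchmark/metrics.py below; the helper name is hypothetical.

# Hypothetical helper mirroring the comma-separated percentile parsing
# that TestExecution.__init__ performs on the flag's value.
def parse_percentiles(raw: str) -> list:
    if not raw:
        return []
    return [float(value) for value in raw.split(",")]

print(parse_percentiles("25,50,75"))    # [25.0, 50.0, 75.0]
print(parse_percentiles("50,90,99.9"))  # [50.0, 90.0, 99.9]
print(parse_percentiles(""))            # [] -- the flag's default is the empty
                                        # string, i.e. no extra throughput percentiles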
75 changes: 57 additions & 18 deletions osbenchmark/metrics.py
@@ -336,7 +336,8 @@ def calculate_results(store, test_execution):
store,
test_execution.workload,
test_execution.test_procedure,
latency_percentiles=test_execution.latency_percentiles
latency_percentiles=test_execution.latency_percentiles,
throughput_percentiles=test_execution.throughput_percentiles
)
return calc()

@@ -1297,13 +1298,19 @@ def create_test_execution(cfg, workload, test_procedure, workload_revision=None)
plugin_params = cfg.opts("builder", "plugin.params")
benchmark_version = version.version()
benchmark_revision = version.revision()
latency_percentiles = cfg.opts("workload", "latency.percentiles", mandatory=False)
latency_percentiles = cfg.opts("workload", "latency.percentiles", mandatory=False,
default_value=GlobalStatsCalculator.DEFAULT_LATENCY_PERCENTILES)
throughput_percentiles = cfg.opts("workload", "throughput.percentiles", mandatory=False,
default_value=GlobalStatsCalculator.DEFAULT_THROUGHPUT_PERCENTILES)
# In tests, we don't get the default command-line arg value for percentiles,
# so supply them as defaults here as well

return TestExecution(benchmark_version, benchmark_revision,
environment, test_execution_id, test_execution_timestamp,
pipeline, user_tags, workload,
workload_params, test_procedure, provision_config_instance, provision_config_instance_params,
plugin_params, workload_revision, latency_percentiles=latency_percentiles)
plugin_params, workload_revision, latency_percentiles=latency_percentiles,
throughput_percentiles=throughput_percentiles)


class TestExecution:
@@ -1313,7 +1320,7 @@ def __init__(self, benchmark_version, benchmark_revision, environment_name,
provision_config_instance_params, plugin_params,
workload_revision=None, provision_config_revision=None,
distribution_version=None, distribution_flavor=None,
revision=None, results=None, meta_data=None, latency_percentiles=None):
revision=None, results=None, meta_data=None, latency_percentiles=None, throughput_percentiles=None):
if results is None:
results = {}
# this happens when the test execution is created initially
@@ -1326,6 +1333,8 @@ def __init__(self, benchmark_version, benchmark_revision, environment_name,
if latency_percentiles:
# split comma-separated string into list of floats
latency_percentiles = [float(value) for value in latency_percentiles.split(",")]
if throughput_percentiles:
throughput_percentiles = [float(value) for value in throughput_percentiles.split(",")]
self.benchmark_version = benchmark_version
self.benchmark_revision = benchmark_revision
self.environment_name = environment_name
@@ -1347,6 +1356,7 @@ def __init__(self, benchmark_version, benchmark_revision, environment_name,
self.results = results
self.meta_data = meta_data
self.latency_percentiles = latency_percentiles
self.throughput_percentiles = throughput_percentiles


@property
@@ -1684,46 +1694,56 @@ def filter_percentiles_by_sample_size(sample_size, percentiles):
raise AssertionError("Percentiles require at least one sample")

filtered_percentiles = []
# Treat the 1 and 2-10 case separately, to return p50 and p100 if present
# Treat the cases below 10 separately, to return p25, 50, 75, 100 if present
if sample_size == 1:
filtered_percentiles = [100]
elif sample_size < 10:
elif sample_size < 4:
for p in [50, 100]:
if p in percentiles:
filtered_percentiles.append(p)
elif sample_size < 10:
for p in [25, 50, 75, 100]:
if p in percentiles:
filtered_percentiles.append(p)
else:
effective_sample_size = 10 ** (int(math.log10(sample_size))) # round down to nearest power of ten
delta = 0.000001 # If (p / 100) * effective_sample_size is within this value of a whole number,
# assume the discrepancy is due to floating point and allow it
for p in percentiles:
fraction = p / 100

# check if fraction * effective_sample_size is close enough to a whole number
if abs((effective_sample_size * fraction) - round(effective_sample_size*fraction)) < delta:
if abs((effective_sample_size * fraction) - round(effective_sample_size*fraction)) < delta or p in [25, 75]:
filtered_percentiles.append(p)
# if no percentiles are suitable, just return 100
if len(filtered_percentiles) == 0:
return [100]
return filtered_percentiles

def percentiles_for_sample_size(sample_size, latency_percentiles=None):
def percentiles_for_sample_size(sample_size, percentiles_list=None):
# If latency_percentiles is present, as a list, display those values instead (assuming there are enough samples)
percentiles = GlobalStatsCalculator.DEFAULT_LATENCY_PERCENTILES_LIST
if latency_percentiles:
percentiles = latency_percentiles # Defaults get overridden if a value is provided
percentiles = []
if percentiles_list:
percentiles = percentiles_list # Defaults get overridden if a value is provided
percentiles.sort()
return filter_percentiles_by_sample_size(sample_size, percentiles)

class GlobalStatsCalculator:
DEFAULT_LATENCY_PERCENTILES = "50,90,99,99.9,99.99,100"
DEFAULT_LATENCY_PERCENTILES_LIST = [float(value) for value in DEFAULT_LATENCY_PERCENTILES.split(",")]

def __init__(self, store, workload, test_procedure, latency_percentiles=None):
DEFAULT_THROUGHPUT_PERCENTILES = ""
DEFAULT_THROUGHPUT_PERCENTILES_LIST = []

OTHER_PERCENTILES = [50,90,99,99.9,99.99,100]
# Use these percentiles when the single_latency fn is called for something other than latency

def __init__(self, store, workload, test_procedure, latency_percentiles=None, throughput_percentiles=None):
self.store = store
self.logger = logging.getLogger(__name__)
self.workload = workload
self.test_procedure = test_procedure
self.latency_percentiles = latency_percentiles
self.throughput_percentiles = throughput_percentiles

def __call__(self):
result = GlobalStats()
@@ -1739,7 +1759,7 @@ def __call__(self):
result.add_op_metrics(
t,
task.operation.name,
self.summary_stats("throughput", t, op_type),
self.summary_stats("throughput", t, op_type, percentiles_list=self.throughput_percentiles),
self.single_latency(t, op_type),
self.single_latency(t, op_type, metric_name="service_time"),
self.single_latency(t, op_type, metric_name="client_processing_time"),
@@ -1818,28 +1838,44 @@ def sum(self, metric_name):
def one(self, metric_name):
return self.store.get_one(metric_name)

def summary_stats(self, metric_name, task_name, operation_type):
def summary_stats(self, metric_name, task_name, operation_type, percentiles_list=None):
mean = self.store.get_mean(metric_name, task=task_name, operation_type=operation_type, sample_type=SampleType.Normal)
median = self.store.get_median(metric_name, task=task_name, operation_type=operation_type, sample_type=SampleType.Normal)
unit = self.store.get_unit(metric_name, task=task_name, operation_type=operation_type)
stats = self.store.get_stats(metric_name, task=task_name, operation_type=operation_type, sample_type=SampleType.Normal)

result = {}
if mean and median and stats:
return {
result = {
"min": stats["min"],
"mean": mean,
"median": median,
"max": stats["max"],
"unit": unit
}
else:
return {
result = {
"min": None,
"mean": None,
"median": None,
"max": None,
"unit": unit
}

if percentiles_list: # modified from single_latency()
sample_size = stats["count"]
percentiles = self.store.get_percentiles(metric_name,
task=task_name,
operation_type=operation_type,
sample_type=SampleType.Normal,
percentiles=percentiles_for_sample_size(
sample_size,
percentiles_list=percentiles_list))
for k, v in percentiles.items():
# safely encode so we don't have any dots in field names
result[encode_float_key(k)] = v
return result

def shard_stats(self, metric_name):
values = self.store.get_raw(metric_name, mapper=lambda doc: doc["per-shard"])
unit = self.store.get_unit(metric_name)
@@ -1897,6 +1933,9 @@ def single_latency(self, task, operation_type, metric_name="latency"):
sample_type = SampleType.Normal
stats = self.store.get_stats(metric_name, task=task, operation_type=operation_type, sample_type=sample_type)
sample_size = stats["count"] if stats else 0
percentiles_list = self.OTHER_PERCENTILES
if metric_name == "latency":
percentiles_list = self.latency_percentiles
if sample_size > 0:
# The custom latency percentiles have to be supplied here as the workload runs,
# or else they aren't present when results are published
Expand All @@ -1906,7 +1945,7 @@ def single_latency(self, task, operation_type, metric_name="latency"):
sample_type=sample_type,
percentiles=percentiles_for_sample_size(
sample_size,
latency_percentiles=self.latency_percentiles
percentiles_list=percentiles_list
))
mean = self.store.get_mean(metric_name,
task=task,
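
To make the percentile-filtering rules above concrete, here is a standalone copy of the committed filter_percentiles_by_sample_size logic with illustrative calls. The guard condition at the top is not visible in the diff and is assumed here; callers go through percentiles_for_sample_size, which sorts the list before filtering.

import math

def filter_percentiles_by_sample_size(sample_size, percentiles):
    # Guard clause assumed; only the raise is visible in the diff above.
    if sample_size <= 0:
        raise AssertionError("Percentiles require at least one sample")

    filtered_percentiles = []
    if sample_size == 1:
        filtered_percentiles = [100]
    elif sample_size < 4:
        for p in [50, 100]:
            if p in percentiles:
                filtered_percentiles.append(p)
    elif sample_size < 10:
        for p in [25, 50, 75, 100]:
            if p in percentiles:
                filtered_percentiles.append(p)
    else:
        # round down to the nearest power of ten
        effective_sample_size = 10 ** (int(math.log10(sample_size)))
        delta = 0.000001
        for p in percentiles:
            fraction = p / 100
            # keep p if it resolves to a whole sample rank at this size;
            # 25 and 75 are now always allowed through by this commit
            if abs((effective_sample_size * fraction) - round(effective_sample_size * fraction)) < delta or p in [25, 75]:
                filtered_percentiles.append(p)
    if len(filtered_percentiles) == 0:
        return [100]
    return filtered_percentiles

# 100 samples: p99.9 would need 1,000 samples, so it is dropped.
print(filter_percentiles_by_sample_size(100, [25, 50, 75, 99.9, 100]))
# -> [25, 50, 75, 100]

# 5 samples: only 25/50/75/100 are considered at all.
print(filter_percentiles_by_sample_size(5, [50, 90, 99, 100]))
# -> [50, 100]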
35 changes: 5 additions & 30 deletions osbenchmark/resources/base-workload.json.j2
@@ -22,35 +22,10 @@
]
}{% endfor %}
],
"schedule": [
{
"operation": "delete-index"
},{% raw %}
{
"operation": {
"operation-type": "create-index",
"settings": {{index_settings | default({}) | tojson}}
}
},{% endraw %}
{
"operation": {
"operation-type": "cluster-health",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},{% raw %}
"request-params": {
"wait_for_status": "{{cluster_health | default('green')}}",
"wait_for_no_relocating_shards": "true"
},
"retry-until-success": true
}
},
{
"operation": {
"operation-type": "bulk",
"bulk-size": {{bulk_size | default(5000)}},
"ingest-percentage": {{ingest_percentage | default(100)}}
},
"clients": {{bulk_indexing_clients | default(8)}}
},{% endraw -%}
{% block queries %}{% endblock %}
"operations": [
{% raw %}{{ benchmark.collect(parts="operations/*.json") }}{% endraw %}
],
"test_procedures": [
{% raw %}{{ benchmark.collect(parts="test_procedures/*.json") }}{% endraw %}
]
}
27 changes: 27 additions & 0 deletions osbenchmark/resources/custom-operations.json.j2
@@ -0,0 +1,27 @@
{
"name": "index-append",
"operation-type": "bulk",
"bulk-size": {{bulk_size | default(5000)}},
"ingest-percentage": {{ingest_percentage | default(100)}}
},
{
"name": "wait-until-merges-finish",
"operation-type": "index-stats",
"index": "_all",
"condition": {
"path": "_all.total.merges.current",
"expected-value": 0
},
"retry-until-success": true,
"include-in-reporting": false
},
{%- block queries -%}
{% for query in custom_queries %}
{
"name": "{{query.name}}",
"operation-type": "{{query['operation-type']}}",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},
"body": {{query.body | replace("'", '"') }}
}{% if not loop.last %},{% endif -%}
{% endfor %}
{%- endblock %}
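
As a rough illustration of what the query block in this template expands to, here is a minimal sketch that renders a trimmed copy of the loop with sample data (assumes Jinja2 is installed; the index name and query values are illustrative, not from the source):

from jinja2 import Template

# Trimmed copy of the query loop from custom-operations.json.j2.
template = Template('''
{%- for query in custom_queries %}
{
  "name": "{{query.name}}",
  "operation-type": "{{query['operation-type']}}",
  "index": {{ indices | map(attribute='name') | list | join(',') | tojson }},
  "body": {{query.body | replace("'", '"') }}
}{% if not loop.last %},{% endif -%}
{% endfor %}
''')

print(template.render(
    custom_queries=[{"name": "match-all", "operation-type": "search",
                     "body": "{'query': {'match_all': {}}}"}],
    indices=[{"name": "logs-2024"}],
))
# Renders one operation entry:
# {
#   "name": "match-all",
#   "operation-type": "search",
#   "index": "logs-2024",
#   "body": {"query": {"match_all": {}}}
# }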
14 changes: 0 additions & 14 deletions osbenchmark/resources/custom-query-workload.json.j2

This file was deleted.

64 changes: 64 additions & 0 deletions osbenchmark/resources/custom-test-procedures.json.j2
@@ -0,0 +1,64 @@
{
"name": "custom-test-procedures",
"description": "Customized test procedure with custom operations generated by create-workload feature in OpenSearch Benchmark.",
"default": true,
"schedule": [
{
"operation": "delete-index"
},
{
"operation": {
"operation-type": "create-index",
{% raw %}"settings": {{ index_settings | default({}) | tojson }}
{% endraw %}}
},
{
"operation": {
"operation-type": "cluster-health",
"index": {{ indices | map(attribute='name') | list | join(',') | tojson }},
"request-params": {
{% raw %}"wait_for_status": "{{ cluster_health | default('green') }}",
{% endraw -%}"wait_for_no_relocating_shards": "true"
},
"retry-until-success": true
}
},
{
"operation": "index-append",
{% raw -%}"clients": {{ bulk_indexing_clients | default(8) }},
{% endraw -%}
{% raw -%}"ignore-response-error-level": "{{ error_level | default('non-fatal') }}"
{% endraw -%}},
{
"name": "refresh-after-index",
"operation": "refresh"
},
{
"operation": {
"operation-type": "force-merge",
"request-timeout": 7200{%- if force_merge_max_num_segments is defined %},
"max-num-segments": {{ force_merge_max_num_segments | tojson }}
{%- endif %}
}
},
{
"name": "refresh-after-force-merge",
"operation": "refresh"
},
{
"operation": "wait-until-merges-finish"
},
{%- block queries -%}
{% for query in custom_queries %}
{
"operation":"{{query.name}}",
{% raw -%}
"warmup-iterations": {{ warmup_iterations | default(50) }},
"iterations": {{ iterations | default(100) }},
"target-throughput": {{ target_throughput | default(3) }},
"clients": {{ search_clients | default(1) }}
{% endraw -%}}{% if not loop.last %},{% endif -%}
{% endfor %}
{%- endblock %}
]
}
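
One subtlety worth noting: the {% raw %} sections above survive workload generation verbatim, so the generated test-procedure file is itself a Jinja2 template whose placeholders (bulk_indexing_clients, warmup_iterations, and so on) the end user fills in at benchmark time. A minimal demonstration of that pass-through behaviour:

from jinja2 import Template

# raw blocks emit their contents untouched, leaving the inner
# placeholders for a later rendering pass.
nested = Template('{% raw %}"clients": {{ bulk_indexing_clients | default(8) }}{% endraw %}')
print(nested.render())
# -> "clients": {{ bulk_indexing_clients | default(8) }}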