Adds a command-line argument to specify more latency percentiles at the end of a workload. #441

Merged · 2 commits · Jan 24, 2024
7 changes: 7 additions & 0 deletions osbenchmark/benchmark.py
@@ -554,6 +554,12 @@ def add_workload_source(subparser):
default=False,
help="If any processes is running, it is going to kill them and allow Benchmark to continue to run."
)
test_execution_parser.add_argument(
"--latency-percentiles",
help=f"A comma-separated list of percentiles to report for latency "
f"(default: {metrics.GlobalStatsCalculator.DEFAULT_LATENCY_PERCENTILES}).",
default=metrics.GlobalStatsCalculator.DEFAULT_LATENCY_PERCENTILES
)

###############################################################################
#
@@ -862,6 +868,7 @@ def dispatch_sub_command(arg_parser, args, cfg):
"load_worker_coordinator_hosts",
opts.csv_to_list(args.load_worker_coordinator_hosts))
cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode)
cfg.add(config.Scope.applicationOverride, "workload", "latency.percentiles", args.latency_percentiles)
configure_workload_params(arg_parser, args, cfg)
configure_connection_params(arg_parser, args, cfg)
configure_telemetry_params(args, cfg)
76 changes: 58 additions & 18 deletions osbenchmark/metrics.py
@@ -332,7 +332,12 @@ class MetaInfoScope(Enum):


def calculate_results(store, test_execution):
calc = GlobalStatsCalculator(store, test_execution.workload, test_execution.test_procedure)
calc = GlobalStatsCalculator(
store,
test_execution.workload,
test_execution.test_procedure,
latency_percentiles=test_execution.latency_percentiles
)
return calc()


@@ -1292,12 +1297,13 @@ def create_test_execution(cfg, workload, test_procedure, workload_revision=None)
plugin_params = cfg.opts("builder", "plugin.params")
benchmark_version = version.version()
benchmark_revision = version.revision()
latency_percentiles = cfg.opts("workload", "latency.percentiles", mandatory=False)

return TestExecution(benchmark_version, benchmark_revision,
environment, test_execution_id, test_execution_timestamp,
pipeline, user_tags, workload,
workload_params, test_procedure, provision_config_instance, provision_config_instance_params,
plugin_params, workload_revision)
plugin_params, workload_revision, latency_percentiles=latency_percentiles)


class TestExecution:
@@ -1307,7 +1313,7 @@ def __init__(self, benchmark_version, benchmark_revision, environment_name,
provision_config_instance_params, plugin_params,
workload_revision=None, provision_config_revision=None,
distribution_version=None, distribution_flavor=None,
revision=None, results=None, meta_data=None):
revision=None, results=None, meta_data=None, latency_percentiles=None):
if results is None:
results = {}
# this happens when the test execution is created initially
@@ -1317,6 +1323,9 @@ def __init__(self, benchmark_version, benchmark_revision, environment_name,
meta_data.update(workload.meta_data)
if test_procedure:
meta_data.update(test_procedure.meta_data)
if latency_percentiles:
# split comma-separated string into list of floats
latency_percentiles = [float(value) for value in latency_percentiles.split(",")]
self.benchmark_version = benchmark_version
self.benchmark_revision = benchmark_revision
self.environment_name = environment_name
@@ -1337,6 +1346,8 @@ def __init__(self, benchmark_version, benchmark_revision, environment_name,
self.revision = revision
self.results = results
self.meta_data = meta_data
self.latency_percentiles = latency_percentiles
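
A small aside with illustrative values (not from the PR): this is the shape of the conversion performed in `TestExecution.__init__` above. The raw CLI string becomes a list of floats, and a malformed value would surface as a ValueError at this point.

```python
# Illustrative only: the parsing done in TestExecution.__init__ above.
raw = "25,50,75,99.5"                                  # value of --latency-percentiles
parsed = [float(value) for value in raw.split(",")]
print(parsed)                                          # [25.0, 50.0, 75.0, 99.5]

# A malformed value (an empty entry here) raises ValueError.
try:
    [float(value) for value in "25,,99.5".split(",")]
except ValueError as err:
    print(f"rejected: {err}")
```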


@property
def workload_name(self):
@@ -1665,30 +1676,54 @@ def encode_float_key(k):
return str(float(k)).replace(".", "_")


def percentiles_for_sample_size(sample_size):
# if needed we can come up with something smarter but it'll do for now
def filter_percentiles_by_sample_size(sample_size, percentiles):
# Don't show percentiles if there aren't enough samples for the value to be distinct.
# For example, only show p99.9 or p45.6 if there are at least 1,000 values, and p0.01 only if there are at least 10,000.
# If nothing is suitable, default to returning [100] rather than an empty list.
if sample_size < 1:
raise AssertionError("Percentiles require at least one sample")
elif sample_size == 1:
return [100]
elif 1 < sample_size < 10:
return [50, 100]
elif 10 <= sample_size < 100:
return [50, 90, 100]
elif 100 <= sample_size < 1000:
return [50, 90, 99, 100]
elif 1000 <= sample_size < 10000:
return [50, 90, 99, 99.9, 100]

filtered_percentiles = []
# Treat sample sizes below 10 separately: a single sample reports only p100; 2-9 samples report p50 and p100 if requested
if sample_size == 1:
filtered_percentiles = [100]
elif sample_size < 10:
for p in [50, 100]:
if p in percentiles:
filtered_percentiles.append(p)
else:
return [50, 90, 99, 99.9, 99.99, 100]
effective_sample_size = 10 ** (int(math.log10(sample_size))) # round down to nearest power of ten
delta = 0.000001 # If (p / 100) * effective_sample_size is within this value of a whole number,
# assume the discrepancy is due to floating point and allow it
for p in percentiles:
fraction = p / 100

# check if fraction * effective_sample_size is close enough to a whole number
if abs((effective_sample_size * fraction) - round(effective_sample_size*fraction)) < delta:
filtered_percentiles.append(p)
# if no percentiles are suitable, just return 100
if len(filtered_percentiles) == 0:
return [100]
return filtered_percentiles

def percentiles_for_sample_size(sample_size, latency_percentiles=None):
# If latency_percentiles is provided as a list, report those values instead (subject to the sample-size filter)
percentiles = GlobalStatsCalculator.DEFAULT_LATENCY_PERCENTILES_LIST
if latency_percentiles:
percentiles = latency_percentiles # Defaults get overridden if a value is provided
percentiles.sort()
return filter_percentiles_by_sample_size(sample_size, percentiles)
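
A quick usage sketch of the two functions above, assuming this branch of `osbenchmark.metrics` is importable and using a hypothetical custom list (e.g. from `--latency-percentiles=25,50,75,99.5,100`); the expected outputs follow from the power-of-ten rounding in `filter_percentiles_by_sample_size`.

```python
from osbenchmark import metrics  # assumes this PR's branch is installed

custom = [25.0, 50.0, 75.0, 99.5, 100.0]

# 10 samples: only multiples of 10 are distinct enough.
print(metrics.filter_percentiles_by_sample_size(10, custom))    # [50.0, 100.0]

# 100 samples: whole-number percentiles are kept, 99.5 is still dropped.
print(metrics.filter_percentiles_by_sample_size(100, custom))   # [25.0, 50.0, 75.0, 100.0]

# 1000 samples: one decimal place is allowed, so 99.5 survives.
print(metrics.filter_percentiles_by_sample_size(1000, custom))  # [25.0, 50.0, 75.0, 99.5, 100.0]

# Without a custom list, percentiles_for_sample_size falls back to the defaults.
print(metrics.percentiles_for_sample_size(1000))                # [50.0, 90.0, 99.0, 99.9, 100.0]
```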

class GlobalStatsCalculator:
def __init__(self, store, workload, test_procedure):
DEFAULT_LATENCY_PERCENTILES = "50,90,99,99.9,99.99,100"
DEFAULT_LATENCY_PERCENTILES_LIST = [float(value) for value in DEFAULT_LATENCY_PERCENTILES.split(",")]

def __init__(self, store, workload, test_procedure, latency_percentiles=None):
self.store = store
self.logger = logging.getLogger(__name__)
self.workload = workload
self.test_procedure = test_procedure
self.latency_percentiles = latency_percentiles

def __call__(self):
result = GlobalStats()
@@ -1862,11 +1897,16 @@ def single_latency(self, task, operation_type, metric_name="latency"):
stats = self.store.get_stats(metric_name, task=task, operation_type=operation_type, sample_type=sample_type)
sample_size = stats["count"] if stats else 0
if sample_size > 0:
# The custom latency percentiles have to be supplied here as the workload runs,
# or else they aren't present when results are published
percentiles = self.store.get_percentiles(metric_name,
task=task,
operation_type=operation_type,
sample_type=sample_type,
percentiles=percentiles_for_sample_size(sample_size))
percentiles=percentiles_for_sample_size(
sample_size,
latency_percentiles=self.latency_percentiles
))
mean = self.store.get_mean(metric_name,
task=task,
operation_type=operation_type,
19 changes: 17 additions & 2 deletions osbenchmark/results_publisher.py
@@ -95,6 +95,19 @@ def format_as_csv(headers, data):
writer.writerow(metric_record)
return out.getvalue()

def comma_separated_string_to_number_list(string_list):
# Convert a comma-separated string into a list of numbers. Whole numbers become ints,
# so they display without decimals.
# If the input is None or empty, return None.
if string_list is None or len(string_list) == 0:
return None
results = [float(value) for value in string_list.split(",")]
for i, value in enumerate(results):
if round(value) == value:
results[i] = int(value)
return results
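
A short illustration of the helper above (assuming it is importable from this branch): whole-valued percentiles come back as ints, so a p50 prints as "50" rather than "50.0", while empty or missing config falls through to None.

```python
from osbenchmark.results_publisher import comma_separated_string_to_number_list  # this PR's branch

print(comma_separated_string_to_number_list("50,90,99.9,100"))   # [50, 90, 99.9, 100]
print(comma_separated_string_to_number_list(""))                  # None
print(comma_separated_string_to_number_list(None))                # None
```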



class SummaryResultsPublisher:
def __init__(self, results, config):
@@ -109,6 +122,7 @@ def __init__(self, results, config):
self.show_processing_time = convert.to_bool(config.opts("results_publishing", "output.processingtime",
mandatory=False, default_value=False))
self.cwd = config.opts("node", "benchmark.cwd")
self.latency_percentiles = comma_separated_string_to_number_list(config.opts("workload", "latency.percentiles", mandatory=False))

def publish(self):
print_header(FINAL_SCORE)
@@ -185,7 +199,7 @@ def _publish_processing_time(self, values, task):
def _publish_percentiles(self, name, task, value):
lines = []
if value:
for percentile in metrics.percentiles_for_sample_size(sys.maxsize):
for percentile in metrics.percentiles_for_sample_size(sys.maxsize, latency_percentiles=self.latency_percentiles):
percentile_value = value.get(metrics.encode_float_key(percentile))
a_line = self._line("%sth percentile %s" % (percentile, name), task, percentile_value, "ms",
force=self.publish_all_percentile_values)
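
This publisher (and the comparison publisher below) passes `sys.maxsize` as the sample size. Rounded down to a power of ten, that becomes 10**18, so no requested percentile is filtered out at publishing time and the full custom list is printed. A minimal check, assuming this branch of `osbenchmark.metrics`:

```python
import sys

from osbenchmark import metrics  # assumes this PR's branch is installed

# With sys.maxsize as the sample size, the effective sample size is 10**18,
# so every requested percentile survives the filter.
print(metrics.percentiles_for_sample_size(sys.maxsize,
                                          latency_percentiles=[0.5, 25.0, 99.5]))
# [0.5, 25.0, 99.5]
```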
@@ -324,6 +338,7 @@ def __init__(self, config):
self.cwd = config.opts("node", "benchmark.cwd")
self.show_processing_time = convert.to_bool(config.opts("results_publishing", "output.processingtime",
mandatory=False, default_value=False))
self.latency_percentiles = comma_separated_string_to_number_list(config.opts("workload", "latency.percentiles", mandatory=False))
self.plain = False

def publish(self, r1, r2):
@@ -421,7 +436,7 @@ def _publish_processing_time(self, baseline_stats, contender_stats, task):

def _publish_percentiles(self, name, task, baseline_values, contender_values):
lines = []
for percentile in metrics.percentiles_for_sample_size(sys.maxsize):
for percentile in metrics.percentiles_for_sample_size(sys.maxsize, latency_percentiles=self.latency_percentiles):
baseline_value = baseline_values.get(metrics.encode_float_key(percentile))
contender_value = contender_values.get(metrics.encode_float_key(percentile))
self._append_non_empty(lines, self._line("%sth percentile %s" % (percentile, name),
43 changes: 43 additions & 0 deletions tests/metrics_test.py
@@ -1787,6 +1787,49 @@ def assert_equal_percentiles(self, name, percentiles, expected_percentiles):
self.assertAlmostEqual(expected_percentiles[percentile], actual_percentile_value, places=1,
msg=str(percentile) + "th percentile differs")

def test_filter_percentiles_by_sample_size(self):
test_percentiles = [
0,
0.0001,
0.001,
0.01,
0.1,
4,
10,
10.01,
45,
46.001,
50,
80.1,
90,
98.9,
98.91,
98.999,
99,
99.9,
99.99,
99.999,
99.9999,
100]
sample_size_to_result_map = {
1: [100],
5: [50, 100],
10: [0, 10, 50, 90, 100],
99: [0, 10, 50, 90, 100],
100: [0, 4, 10, 45, 50, 90, 99, 100],
1000: [0, 0.1, 4, 10, 45, 50, 80.1, 90, 98.9, 99, 99.9, 100],
10000: [0, 0.01, 0.1, 4, 10, 10.01, 45, 50, 80.1, 90, 98.9, 98.91, 99, 99.9, 99.99, 100],
100000: [0, 0.001, 0.01, 0.1, 4, 10, 10.01, 45, 46.001, 50, 80.1, 90, 98.9, 98.91, 98.999, 99, 99.9, 99.99, 99.999, 100],
1000000: [0, 0.0001, 0.001, 0.01, 0.1, 4, 10, 10.01, 45, 46.001, 50,
80.1, 90, 98.9, 98.91, 98.999, 99, 99.9, 99.99, 99.999, 99.9999, 100]
} # At 100,000 samples the filter keeps percentiles specified to 0.001 granularity (e.g. 99.999);
# the finest values above use four decimal places, so at higher orders (>= 1M samples) all values are permitted
for sample_size, expected_results in sample_size_to_result_map.items():
filtered = metrics.filter_percentiles_by_sample_size(sample_size, test_percentiles)
self.assertEqual(len(filtered), len(expected_results))
for res, exp in zip(filtered, expected_results):
self.assertEqual(res, exp)

def test_externalize_and_bulk_add(self):
self.metrics_store.open(InMemoryMetricsStoreTests.TEST_EXECUTION_ID, InMemoryMetricsStoreTests.TEST_EXECUTION_TIMESTAMP,
"test", "append-no-conflicts", "defaults", create=True)