Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds controllable randomization to range queries in workloads #455

Merged
merged 7 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions osbenchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,22 @@ def add_workload_source(subparser):
f"(default: {metrics.GlobalStatsCalculator.DEFAULT_THROUGHPUT_PERCENTILES}).",
default=metrics.GlobalStatsCalculator.DEFAULT_THROUGHPUT_PERCENTILES
)
test_execution_parser.add_argument(
"--randomization-enabled",
help="Runs the given workload with query randomization enabled (default: false).",
default=False,
action="store_true")
test_execution_parser.add_argument(
"--randomization-repeat-frequency",
"-rf",
help=f"The repeat_frequency for query randomization. Ignored if randomization is off"
f"(default: {workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_RF}).",
default=workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_RF)
test_execution_parser.add_argument(
"--randomization-n",
help=f"The number of standard values to generate for each field for query randomization."
f"Ignored if randomization is off (default: {workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_N}).",
default=workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_N)

###############################################################################
#
Expand Down Expand Up @@ -876,6 +892,9 @@ def dispatch_sub_command(arg_parser, args, cfg):
cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode)
cfg.add(config.Scope.applicationOverride, "workload", "latency.percentiles", args.latency_percentiles)
cfg.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", args.throughput_percentiles)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.enabled", args.randomization_enabled)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.repeat_frequency", args.randomization_repeat_frequency)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.n", args.randomization_n)
configure_workload_params(arg_parser, args, cfg)
configure_connection_params(arg_parser, args, cfg)
configure_telemetry_params(args, cfg)
Expand Down
210 changes: 198 additions & 12 deletions osbenchmark/workload/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import json
import logging
import os
import random
import re
import sys
import tempfile
Expand All @@ -48,7 +49,7 @@ class WorkloadSyntaxError(exceptions.InvalidSyntax):


class WorkloadProcessor:
def on_after_load_workload(self, workload):
def on_after_load_workload(self, input_workload, **kwargs):
"""
This method is called by Benchmark after a workload has been loaded. Implementations are expected to modify the
provided workload object in place.
Expand All @@ -74,7 +75,7 @@ def on_prepare_workload(self, workload, data_root_dir):

class WorkloadProcessorRegistry:
def __init__(self, cfg):
self.required_processors = [TaskFilterWorkloadProcessor(cfg), TestModeWorkloadProcessor(cfg)]
self.required_processors = [TaskFilterWorkloadProcessor(cfg), TestModeWorkloadProcessor(cfg), QueryRandomizerWorkloadProcessor(cfg)]
self.workload_processors = []
self.offline = cfg.opts("system", "offline.mode")
self.test_mode = cfg.opts("workload", "test.mode.enabled", mandatory=False, default_value=False)
Expand Down Expand Up @@ -824,11 +825,11 @@ def _filter_out_match(self, task):
return self.exclude
return not self.exclude

def on_after_load_workload(self, workload):
def on_after_load_workload(self, input_workload, **kwargs):
if not self.filters:
return workload
return input_workload

for test_procedure in workload.test_procedures:
for test_procedure in input_workload.test_procedures:
# don't modify the schedule while iterating over it
tasks_to_remove = []
for task in test_procedure.schedule:
Expand All @@ -847,19 +848,19 @@ def on_after_load_workload(self, workload):
self.logger.info("Removing task [%s] from test_procedure [%s] due to task filter.", task, test_procedure)
test_procedure.remove_task(task)

return workload
return input_workload


class TestModeWorkloadProcessor(WorkloadProcessor):
def __init__(self, cfg):
self.test_mode_enabled = cfg.opts("workload", "test.mode.enabled", mandatory=False, default_value=False)
self.logger = logging.getLogger(__name__)

def on_after_load_workload(self, workload):
def on_after_load_workload(self, input_workload, **kwargs):
if not self.test_mode_enabled:
return workload
self.logger.info("Preparing workload [%s] for test mode.", str(workload))
for corpus in workload.corpora:
return input_workload
self.logger.info("Preparing workload [%s] for test mode.", str(input_workload))
for corpus in input_workload.corpora:
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug("Reducing corpus size to 1000 documents for [%s]", corpus.name)
for document_set in corpus.documents:
Expand All @@ -884,7 +885,7 @@ def on_after_load_workload(self, workload):
document_set.compressed_size_in_bytes = None
document_set.uncompressed_size_in_bytes = None

for test_procedure in workload.test_procedures:
for test_procedure in input_workload.test_procedures:
for task in test_procedure.schedule:
# we need iterate over leaf tasks and await iterating over possible intermediate 'parallel' elements
for leaf_task in task:
Expand Down Expand Up @@ -918,8 +919,189 @@ def on_after_load_workload(self, workload):
leaf_task.params.pop("target-interval", None)
leaf_task.params["target-throughput"] = f"{sys.maxsize} {original_throughput.unit}"

return workload
return input_workload

class QueryRandomizerWorkloadProcessor(WorkloadProcessor):
DEFAULT_RF = 0.3
DEFAULT_N = 5000
def __init__(self, cfg):
self.randomization_enabled = cfg.opts("workload", "randomization.enabled", mandatory=False, default_value=False)
self.rf = float(cfg.opts("workload", "randomization.repeat_frequency", mandatory=False, default_value=self.DEFAULT_RF))
self.logger = logging.getLogger(__name__)
self.N = int(cfg.opts("workload", "randomization.n", mandatory=False, default_value=self.DEFAULT_N))
self.zipf_alpha = 1
self.H_list = self.precompute_H(self.N, self.zipf_alpha)

# Helper functions for computing Zipf distribution
def H(self, i, H_list):
# compute the harmonic number H_n,m = sum over i from 1 to n of (1 / i^m)
return H_list[i-1]

def precompute_H(self, n, m):
H_list = [1]
for j in range(2, n+1):
H_list.append(H_list[-1] + 1 / (j ** m))
return H_list

def zipf_cdf_inverse(self, u, H_list):
# To map a uniformly distributed u from [0, 1] to some probability distribution we plug it into its inverse CDF.
# as the zipf cdf is discontinuous there is no real inverse but we can use this solution:
# https://math.stackexchange.com/questions/53671/how-to-calculate-the-inverse-cdf-for-the-zipf-distribution
# Precompute all values H_i,alpha for a fixed alpha and pass in as H_list
if (u < 0 or u >= 1):
raise exceptions.ExecutorError(
"Input u must have 0 <= u < 1. This error shouldn't appear, please raise an issue if it does")
n = len(H_list)
candidate_return = 1
denominator = self.H(n, H_list)
numerator = 0
while candidate_return < n:
numerator = self.H(candidate_return, H_list)
if u < numerator / denominator:
return candidate_return
candidate_return += 1
return n

def get_dict_from_previous_path(self, root, current_path):
curr = root
for value in current_path:
curr = curr[value]
return curr

def extract_fields_helper(self, root, current_path):
# Recursively called to find the location of ranges in an OpenSearch range query.
# Return the field and the current path if we're currently scanning the field name in a range query, otherwise return an empty list.
fields = [] # pairs of (field, path_to_field)
curr = self.get_dict_from_previous_path(root, current_path)
if isinstance(curr, dict) and curr != {}:
if len(current_path) > 0 and current_path[-1] == "range":
for key in curr.keys():
if isinstance(curr, dict):
if ("gte" in curr[key] or "gt" in curr[key]) and ("lte" in curr[key] or "lt" in curr[key]):
fields.append((key, current_path))
return fields
else:
for key in curr.keys():
fields += self.extract_fields_helper(root, current_path + [key])
return fields
elif isinstance(curr, list) and curr != []:
for i in range(len(curr)):
fields += self.extract_fields_helper(root, current_path + [i])
return fields
else:
# leaf node
return []

def extract_fields_and_paths(self, params):
# Search for fields used in range queries, and the paths to those fields
# Return pairs of (field, path_to_field)
# TODO: Maybe only do this the first time, and assume for a given task, the same query structure is used.
# We could achieve this by passing in the task name to get_randomized_values as a kwarg?
try:
root = params["body"]["query"]
except KeyError:
raise exceptions.SystemSetupError(
f"Cannot extract range query fields from these params: {params}\n, missing params[\"body\"][\"query\"]\n"
f"Make sure the operation in operations/default.json is well-formed")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

fields_and_paths = self.extract_fields_helper(root, [])
return fields_and_paths

def set_range(self, params, fields_and_paths, new_values):
assert len(fields_and_paths) == len(new_values)
for field_and_path, new_value in zip(fields_and_paths, new_values):
field = field_and_path[0]
path = field_and_path[1]
range_section = self.get_dict_from_previous_path(params["body"]["query"], path)[field]
# get the section of the query corresponding to the field name
for greater_than in ["gte", "gt"]:
if greater_than in range_section:
range_section[greater_than] = new_value["gte"]
for less_than in ["lte", "lt"]:
if less_than in range_section:
range_section[less_than] = new_value["lte"]
if "format" in new_values:
range_section["format"] = new_values["format"]
return params

def get_repeated_value_index(self):
# minus 1 for mapping [1, N] to [0, N-1] of list indices
return self.zipf_cdf_inverse(random.random(), self.H_list) - 1

def get_randomized_values(self, input_workload, input_params,
get_standard_value=params.get_standard_value,
get_standard_value_source=params.get_standard_value_source, # Made these configurable for simpler unit tests
**kwargs):

# The queries as listed in operations/default.json don't have the index param,
# unlike the custom ones you would specify in workload.py, so we have to add them ourselves
if not "index" in input_params:
input_params["index"] = params.get_target(input_workload, input_params)

fields_and_paths = self.extract_fields_and_paths(input_params)

if random.random() < self.rf:
# Draw a potentially repeated value from the saved standard values
index = self.get_repeated_value_index()
new_values = [get_standard_value(kwargs["op_name"], field_and_path[0], index) for field_and_path in fields_and_paths]
# Use the same index for all fields in one query, otherwise the probability of repeats in a multi-field query would be very low
input_params = self.set_range(input_params, fields_and_paths, new_values)
else:
# Generate a new random value, from the standard value source function. This will be new (a cache miss)
new_values = [get_standard_value_source(kwargs["op_name"], field_and_path[0])() for field_and_path in fields_and_paths]
input_params = self.set_range(input_params, fields_and_paths, new_values)
return input_params

def create_param_source_lambda(self, op_name, get_standard_value, get_standard_value_source):
return lambda w, p, **kwargs: self.get_randomized_values(w, p,
get_standard_value=get_standard_value,
get_standard_value_source=get_standard_value_source,
op_name=op_name, **kwargs)

def on_after_load_workload(self, input_workload, **kwargs):
if not self.randomization_enabled:
Copy link
Collaborator

@IanHoang IanHoang Feb 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Logging here would be good too. Something along the lines of:

self.logger.info("Query randomization is disabled.")

self.logger.info("Query randomization is disabled.")
return input_workload
self.logger.info("Query randomization is enabled, with repeat frequency = %d, n = %d",self.rf, self.N)

# By default, use params for standard values and generate new standard values the first time an op/field is seen.
# In unit tests, we should be able to supply our own sources independent of params.
# This is done in kwargs because pylint didn't like having specific keyword args that weren't in the parent method.
generate_new_standard_values = False
if "get_standard_value" not in kwargs:
kwargs["get_standard_value"] = params.get_standard_value
generate_new_standard_values = True
if "get_standard_value_source" not in kwargs:
kwargs["get_standard_value_source"] = params.get_standard_value_source
generate_new_standard_values = True

default_test_procedure = None
for test_procedure in input_workload.test_procedures:
if test_procedure.default:
default_test_procedure = test_procedure
break

for task in default_test_procedure.schedule:
for leaf_task in task:
try:
op_type = workload.OperationType.from_hyphenated_string(leaf_task.operation.type)
except KeyError:
op_type = None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: For debugging purposes, it would be nice to have a logging statement in the exception here.

self.logger.info(
"Found operation %s in default schedule with type %s, which couldn't be converted to a known OperationType",
leaf_task.operation.name, leaf_task.operation.type)
if op_type == workload.OperationType.Search:
op_name = leaf_task.operation.name
param_source_name = op_name + "-randomized"
params.register_param_source_for_name(
param_source_name,
self.create_param_source_lambda(op_name, get_standard_value=kwargs["get_standard_value"],
get_standard_value_source=kwargs["get_standard_value_source"]))
leaf_task.operation.param_source = param_source_name
# Generate the right number of standard values for this field, if not already present
for field_and_path in self.extract_fields_and_paths(leaf_task.operation.params):
if generate_new_standard_values:
params.generate_standard_values_if_absent(op_name, field_and_path[0], self.N)
return input_workload

class CompleteWorkloadParams:
def __init__(self, user_specified_workload_params=None):
Expand Down Expand Up @@ -1097,6 +1279,10 @@ def register_workload_processor(self, workload_processor):
if self.workload_processor_registry:
self.workload_processor_registry(workload_processor)

def register_standard_value_source(self, op_name, field_name, standard_value_source):
# Define a value source for parameters for a given operation name and field name, for use in randomization
params.register_standard_value_source(op_name, field_name, standard_value_source)

@property
def meta_data(self):
return {
Expand Down
45 changes: 45 additions & 0 deletions osbenchmark/workload/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
__PARAM_SOURCES_BY_OP = {}
__PARAM_SOURCES_BY_NAME = {}

__STANDARD_VALUE_SOURCES = {}
__STANDARD_VALUES = {}

def param_source_for_operation(op_type, workload, params, task_name):
try:
Expand All @@ -63,6 +65,14 @@ def param_source_for_name(name, workload, params):
else:
return param_source(workload, params)

def get_standard_value_source(op_name, field_name):
try:
return __STANDARD_VALUE_SOURCES[op_name][field_name]
except KeyError:
raise exceptions.SystemSetupError(
"Could not find standard value source for operation {}, field {}! Make sure this is registered in workload.py"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! This is a helpful suggestion. It'd be good for us to add documentation related to this in the official docs. Once we have that, we can add a link to that guide to this error message.

.format(op_name, field_name))


def ensure_valid_param_source(param_source):
if not inspect.isfunction(param_source) and not inspect.isclass(param_source):
Expand All @@ -78,13 +88,48 @@ def register_param_source_for_name(name, param_source_class):
ensure_valid_param_source(param_source_class)
__PARAM_SOURCES_BY_NAME[name] = param_source_class

def register_standard_value_source(op_name, field_name, standard_value_source):
if op_name in __STANDARD_VALUE_SOURCES:
__STANDARD_VALUE_SOURCES[op_name][field_name] = standard_value_source
# We have to allow re-registration for the same op/field, since plugins are loaded many times when a workload is run
else:
__STANDARD_VALUE_SOURCES[op_name] = {field_name:standard_value_source}

def generate_standard_values_if_absent(op_name, field_name, n):
if not op_name in __STANDARD_VALUES:
__STANDARD_VALUES[op_name] = {}
if not field_name in __STANDARD_VALUES[op_name]:
__STANDARD_VALUES[op_name][field_name] = []
try:
standard_value_source = __STANDARD_VALUE_SOURCES[op_name][field_name]
except KeyError:
raise exceptions.SystemSetupError(
"Cannot generate standard values for operation {}, field {}. Standard value source is missing"
.format(op_name, field_name))
for _i in range(n):
__STANDARD_VALUES[op_name][field_name].append(standard_value_source())

def get_standard_value(op_name, field_name, i):
try:
return __STANDARD_VALUES[op_name][field_name][i]
except KeyError:
raise exceptions.SystemSetupError("No standard values generated for operation {}, field {}".format(op_name, field_name))
except IndexError:
raise exceptions.SystemSetupError(
"Standard value index {} out of range for operation {}, field name {} ({} values total)"
.format(i, op_name, field_name, len(__STANDARD_VALUES[op_name][field_name])))


# only intended for tests
def _unregister_param_source_for_name(name):
# We intentionally do not specify a default value if the key does not exist. If we try to remove a key that we didn't insert then
# something is fishy with the test and we'd rather know early.
__PARAM_SOURCES_BY_NAME.pop(name)

# only intended for tests
def _clear_standard_values():
__STANDARD_VALUES = {}
__STANDARD_VALUE_SOURCES = {}

# Default
class ParamSource:
Expand Down
Loading
Loading