Adds controllable randomization to range queries in workloads #455
@@ -25,6 +25,7 @@
 import json
 import logging
 import os
+import random
 import re
 import sys
 import tempfile
@@ -48,7 +49,7 @@ class WorkloadSyntaxError(exceptions.InvalidSyntax):


 class WorkloadProcessor:
-    def on_after_load_workload(self, workload):
+    def on_after_load_workload(self, input_workload, **kwargs):
         """
         This method is called by Benchmark after a workload has been loaded. Implementations are expected to modify the
         provided workload object in place.
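With the new signature, a plugin-side processor receives the workload as `input_workload` plus arbitrary keyword arguments. A minimal hedged sketch of a conforming processor (this class is illustrative, not part of the PR):

```python
import logging

# Hypothetical downstream processor using the updated hook signature.
# Accepting **kwargs keeps it compatible if Benchmark passes extra
# arguments to this hook in the future.
class LoggingWorkloadProcessor(WorkloadProcessor):
    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def on_after_load_workload(self, input_workload, **kwargs):
        self.logger.info("Workload [%s] loaded; no modifications applied.", str(input_workload))
        return input_workload
```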
@@ -74,7 +75,7 @@ def on_prepare_workload(self, workload, data_root_dir):


 class WorkloadProcessorRegistry:
     def __init__(self, cfg):
-        self.required_processors = [TaskFilterWorkloadProcessor(cfg), TestModeWorkloadProcessor(cfg)]
+        self.required_processors = [TaskFilterWorkloadProcessor(cfg), TestModeWorkloadProcessor(cfg), QueryRandomizerWorkloadProcessor(cfg)]
         self.workload_processors = []
         self.offline = cfg.opts("system", "offline.mode")
         self.test_mode = cfg.opts("workload", "test.mode.enabled", mandatory=False, default_value=False)
@@ -824,11 +825,11 @@ def _filter_out_match(self, task):
             return self.exclude
         return not self.exclude

-    def on_after_load_workload(self, workload):
+    def on_after_load_workload(self, input_workload, **kwargs):
         if not self.filters:
-            return workload
+            return input_workload

-        for test_procedure in workload.test_procedures:
+        for test_procedure in input_workload.test_procedures:
             # don't modify the schedule while iterating over it
             tasks_to_remove = []
             for task in test_procedure.schedule:
@@ -847,19 +848,19 @@ def on_after_load_workload(self, workload):
                     self.logger.info("Removing task [%s] from test_procedure [%s] due to task filter.", task, test_procedure)
                     test_procedure.remove_task(task)

-        return workload
+        return input_workload


 class TestModeWorkloadProcessor(WorkloadProcessor):
     def __init__(self, cfg):
         self.test_mode_enabled = cfg.opts("workload", "test.mode.enabled", mandatory=False, default_value=False)
         self.logger = logging.getLogger(__name__)

-    def on_after_load_workload(self, workload):
+    def on_after_load_workload(self, input_workload, **kwargs):
         if not self.test_mode_enabled:
-            return workload
-        self.logger.info("Preparing workload [%s] for test mode.", str(workload))
-        for corpus in workload.corpora:
+            return input_workload
+        self.logger.info("Preparing workload [%s] for test mode.", str(input_workload))
+        for corpus in input_workload.corpora:
             if self.logger.isEnabledFor(logging.DEBUG):
                 self.logger.debug("Reducing corpus size to 1000 documents for [%s]", corpus.name)
             for document_set in corpus.documents:
@@ -884,7 +885,7 @@ def on_after_load_workload(self, workload):
                     document_set.compressed_size_in_bytes = None
                     document_set.uncompressed_size_in_bytes = None

-        for test_procedure in workload.test_procedures:
+        for test_procedure in input_workload.test_procedures:
             for task in test_procedure.schedule:
                 # we need to iterate over leaf tasks and avoid iterating over possible intermediate 'parallel' elements
                 for leaf_task in task:
@@ -918,8 +919,181 @@ def on_after_load_workload(self, workload):
                         leaf_task.params.pop("target-interval", None)
                         leaf_task.params["target-throughput"] = f"{sys.maxsize} {original_throughput.unit}"

-        return workload
+        return input_workload
+
+
+class QueryRandomizerWorkloadProcessor(WorkloadProcessor):
+    DEFAULT_RF = 0.3
+    DEFAULT_N = 5000
+
+    def __init__(self, cfg):
+        self.randomization_enabled = cfg.opts("workload", "randomization.enabled", mandatory=False, default_value=False)
+        self.rf = float(cfg.opts("workload", "randomization.rf", mandatory=False, default_value=self.DEFAULT_RF))
+        self.logger = logging.getLogger(__name__)
+        self.N = int(cfg.opts("workload", "randomization.n", mandatory=False, default_value=self.DEFAULT_N))
+        self.zipf_alpha = 1
+        self.H_list = self.precompute_H(self.N, self.zipf_alpha)
+    # Helper functions for computing Zipf distribution
+    def H(self, i, H_list):
+        # compute the harmonic number H_n,m = sum over i from 1 to n of (1 / i^m)
+        return H_list[i-1]
+
+    def precompute_H(self, n, m):
+        H_list = [1]
+        for j in range(2, n+1):
+            H_list.append(H_list[-1] + 1 / (j ** m))
+        return H_list
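For intuition, here is a standalone check (not part of the diff) that the incremental build in `precompute_H` agrees with the direct definition H_n,m = sum over i from 1 to n of 1/i^m:

```python
# Standalone sanity check: precompute_H builds generalized harmonic numbers
# incrementally, so H_list[i-1] should equal sum(1 / k**m for k in 1..i).
def precompute_H(n, m):
    H_list = [1]
    for j in range(2, n + 1):
        H_list.append(H_list[-1] + 1 / (j ** m))
    return H_list

n, m = 10, 1
H_list = precompute_H(n, m)
for i in range(1, n + 1):
    direct = sum(1 / k ** m for k in range(1, i + 1))
    assert abs(H_list[i - 1] - direct) < 1e-12
print(H_list[-1])  # H_10,1 is roughly 2.929
```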
+    def zipf_cdf_inverse(self, u, H_list):
+        # To map a uniformly distributed u from [0, 1] to some probability distribution we plug it into its inverse CDF.
+        # As the Zipf CDF is discontinuous there is no true inverse, but we can use this solution:
+        # https://math.stackexchange.com/questions/53671/how-to-calculate-the-inverse-cdf-for-the-zipf-distribution
+        # Precompute all values H_i,alpha for a fixed alpha and pass in as H_list
+        if (u < 0 or u >= 1):
+            raise Exception("Input u must have 0 <= u < 1")

Reviewer: Could we use a more descriptive exception here rather than the generic `Exception`?

Author: Oops - I had pasted this in from a test script and forgot to change this to an OSB exception. Will fix.
+        n = len(H_list)
+        candidate_return = 1
+        denominator = self.H(n, H_list)
+        numerator = 0
+        while candidate_return < n:
+            numerator = self.H(candidate_return, H_list)
+            if u < numerator / denominator:
+                return candidate_return
+            candidate_return += 1
+        return n
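As a sanity check of the inverse-CDF sampling (again a standalone sketch, not PR code): rank r is returned when u first falls below H_r / H_n, so for alpha = 1 rank 1 should be drawn about twice as often as rank 2:

```python
import random
from collections import Counter

# Standalone version of the same inverse-CDF idea: P(rank 1) = 1/H_n,
# P(rank 2) = (1/2)/H_n, and so on, giving the classic Zipf rank weights.
def zipf_cdf_inverse(u, H_list):
    n = len(H_list)
    denominator = H_list[-1]
    for candidate_return in range(1, n):
        if u < H_list[candidate_return - 1] / denominator:
            return candidate_return
    return n

H_list = precompute_H(1000, 1)  # from the snippet above
counts = Counter(zipf_cdf_inverse(random.random(), H_list) for _ in range(100_000))
print(counts[1] / counts[2])  # should be close to 2
```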
+    def get_dict_from_previous_path(self, root, current_path):
+        curr = root
+        for value in current_path:
+            curr = curr[value]
+        return curr
+    def extract_fields_helper(self, root, current_path):
+        # Recursively called to find the location of ranges in an OpenSearch range query.
+        # Return the field and the current path if we're currently scanning the field name in a range query,
+        # otherwise return an empty list.
+        fields = []  # pairs of (field, path_to_field)
+        curr = self.get_dict_from_previous_path(root, current_path)
+        if isinstance(curr, dict) and curr != {}:
+            if len(current_path) > 0 and current_path[-1] == "range":
+                for key in curr.keys():
+                    if isinstance(curr[key], dict):
+                        if ("gte" in curr[key] or "gt" in curr[key]) and ("lte" in curr[key] or "lt" in curr[key]):
+                            fields.append((key, current_path))
+                return fields
+            else:
+                for key in curr.keys():
+                    fields += self.extract_fields_helper(root, current_path + [key])
+                return fields
+        elif isinstance(curr, list) and curr != []:
+            for i in range(len(curr)):
+                fields += self.extract_fields_helper(root, current_path + [i])
+            return fields
+        else:
+            # leaf node
+            return []
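To see what the traversal produces, consider a hypothetical query body (the field name and bounds are invented for illustration):

```python
# Hypothetical walkthrough, not part of the diff.
query_root = {
    "bool": {
        "filter": [
            {"range": {"trip_distance": {"gte": 0, "lte": 4.9}}}
        ]
    }
}
# extract_fields_helper(query_root, []) descends through "bool" -> "filter"
# -> list index 0 -> "range" and reports the bounded field it finds there:
# [("trip_distance", ["bool", "filter", 0, "range"])]
```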
+    def extract_fields_and_paths(self, params):
+        # Search for fields used in range queries, and the paths to those fields.
+        # Return pairs of (field, path_to_field).
+        # TODO: Maybe only do this the first time, and assume that for a given task, the same query structure is used.
+        # We could achieve this by passing in the task name to get_randomized_values as a kwarg?
+        try:
+            root = params["body"]["query"]
+        except KeyError:
+            raise exceptions.SystemSetupError("Cannot extract range query fields from these params, missing params[\"body\"][\"query\"]")
+        fields_and_paths = self.extract_fields_helper(root, [])
+        return fields_and_paths

Reviewer: Might be helpful to elaborate on which files users can commonly fix this error in, similar to what was added in line 73 of params.py.
+    def set_range(self, params, fields_and_paths, new_values):
+        assert len(fields_and_paths) == len(new_values)
+        for field_and_path, new_value in zip(fields_and_paths, new_values):
+            field = field_and_path[0]
+            path = field_and_path[1]
+            # get the section of the query corresponding to the field name
+            range_section = self.get_dict_from_previous_path(params["body"]["query"], path)[field]
+            for greater_than in ["gte", "gt"]:
+                if greater_than in range_section:
+                    range_section[greater_than] = new_value["gte"]
+            for less_than in ["lte", "lt"]:
+                if less_than in range_section:
+                    range_section[less_than] = new_value["lte"]
+            if "format" in new_value:
+                range_section["format"] = new_value["format"]
+        return params
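Continuing the hypothetical walkthrough above, substituting fresh bounds into that query would look like:

```python
# Hypothetical continuation: overwrite the bounds found at
# ("trip_distance", ["bool", "filter", 0, "range"]) with a new pair.
new_values = [{"gte": 2.0, "lte": 7.5}]
# After set_range(params, fields_and_paths, new_values) the body reads:
# {"bool": {"filter": [{"range": {"trip_distance": {"gte": 2.0, "lte": 7.5}}}]}}
```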
+    def get_repeated_value_index(self):
+        # minus 1 for mapping [1, N] to [0, N-1] of list indices
+        return self.zipf_cdf_inverse(random.random(), self.H_list) - 1
+    def get_randomized_values(self, input_workload, input_params,
+                              get_standard_value=params.get_standard_value,
+                              get_standard_value_source=params.get_standard_value_source,  # made configurable for simpler unit tests
+                              **kwargs):
+        # The queries as listed in operations/default.json don't have the index param,
+        # unlike the custom ones you would specify in workload.py, so we have to add it ourselves.
+        if "index" not in input_params:
+            input_params["index"] = params.get_target(input_workload, input_params)
+
+        fields_and_paths = self.extract_fields_and_paths(input_params)
+
+        if random.random() < self.rf:
+            # Draw a potentially repeated value from the saved standard values.
+            # Use the same index for all fields in one query, otherwise the
+            # probability of repeats in a multi-field query would be very low.
+            index = self.get_repeated_value_index()
+            new_values = [get_standard_value(kwargs["op_name"], field_and_path[0], index) for field_and_path in fields_and_paths]
+            input_params = self.set_range(input_params, fields_and_paths, new_values)
+        else:
+            # Generate a new random value from the standard value source function. This will be new (a cache miss).
+            new_values = [get_standard_value_source(kwargs["op_name"], field_and_path[0])() for field_and_path in fields_and_paths]
+            input_params = self.set_range(input_params, fields_and_paths, new_values)
+        return input_params
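The repeat-frequency logic above is the core of the feature: with probability rf a query reuses one of N precomputed standard values (Zipf-weighted, so a few values recur often and can exercise caches), otherwise it draws a brand-new value. A hedged standalone sketch of that behavior:

```python
import random

# Illustrative only; names and values are made up. rf = 0.3 means about
# 30% of queries reuse a precomputed "standard" value.
rf, N = 0.3, 5000
standard_values = [f"standard-{i}" for i in range(N)]  # stand-ins for saved gte/lte pairs

def next_value():
    if random.random() < rf:
        # The real processor picks the index with the Zipf inverse CDF;
        # uniform choice here keeps the sketch short.
        return standard_values[random.randrange(N)]
    return f"fresh-{random.random()}"  # brand-new value, a likely cache miss
```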
+    def create_param_source_lambda(self, op_name, get_standard_value, get_standard_value_source):
+        return lambda w, p, **kwargs: self.get_randomized_values(w, p,
+                                                                 get_standard_value=get_standard_value,
+                                                                 get_standard_value_source=get_standard_value_source,
+                                                                 op_name=op_name, **kwargs)
+    def on_after_load_workload(self, input_workload, **kwargs):
+        if not self.randomization_enabled:
+            return input_workload

Reviewer: Nit: Logging here would be good too, e.g. a note that randomization is disabled so the processor is skipping the workload.
+        # By default, use params for standard values and generate new standard values the first time an op/field is seen.
+        # In unit tests, we should be able to supply our own sources independent of params.
+        # This is done in kwargs because pylint didn't like having specific keyword args that weren't in the parent method.
+        generate_new_standard_values = False
+        if "get_standard_value" not in kwargs:
+            kwargs["get_standard_value"] = params.get_standard_value
+            generate_new_standard_values = True
+        if "get_standard_value_source" not in kwargs:
+            kwargs["get_standard_value_source"] = params.get_standard_value_source
+            generate_new_standard_values = True
+
+        default_test_procedure = None
+        for test_procedure in input_workload.test_procedures:
+            if test_procedure.default:
+                default_test_procedure = test_procedure  # TODO - not sure if this is correct
+                break
+        for task in default_test_procedure.schedule:
+            for leaf_task in task:
+                try:
+                    op_type = workload.OperationType.from_hyphenated_string(leaf_task.operation.type)
+                except KeyError:
+                    op_type = None

Reviewer: Nit: For debugging purposes, it would be nice to have a logging statement in the exception handler here.

+                if op_type == workload.OperationType.Search:
+                    op_name = leaf_task.operation.name
+                    param_source_name = op_name + "-randomized"
+                    params.register_param_source_for_name(
+                        param_source_name,
+                        self.create_param_source_lambda(op_name, get_standard_value=kwargs["get_standard_value"],
+                                                        get_standard_value_source=kwargs["get_standard_value_source"]))
+                    leaf_task.operation.param_source = param_source_name
+                    # Generate the right number of standard values for each field, if not already present
+                    for field_and_path in self.extract_fields_and_paths(leaf_task.operation.params):
+                        if generate_new_standard_values:
+                            params.generate_standard_values_if_absent(op_name, field_and_path[0], self.N)
+        return input_workload


 class CompleteWorkloadParams:
     def __init__(self, user_specified_workload_params=None):
@@ -1097,6 +1271,10 @@ def register_workload_processor(self, workload_processor):
         if self.workload_processor_registry:
             self.workload_processor_registry(workload_processor)

+    def register_standard_value_source(self, op_name, field_name, standard_value_source):
+        # Define a value source for parameters for a given operation name and field name, for use in randomization
+        params.register_standard_value_source(op_name, field_name, standard_value_source)
+
     @property
     def meta_data(self):
         return {
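A workload would call this registry hook from its workload.py to supply per-field value sources. A hedged sketch (the operation name, field, and bounds are hypothetical):

```python
import random

# Hypothetical value source for a field named "trip_distance"; realistic
# bounds depend on the dataset. Each call returns one gte/lte pair.
def random_trip_distance_range():
    lower = random.uniform(0, 20)
    return {"gte": lower, "lte": lower + random.uniform(0, 5)}

def register(registry):
    # "default" and "trip_distance" are assumptions for this sketch
    registry.register_standard_value_source("default", "trip_distance", random_trip_distance_range)
```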
In params.py:
@@ -46,6 +46,8 @@
 __PARAM_SOURCES_BY_OP = {}
 __PARAM_SOURCES_BY_NAME = {}

+__STANDARD_VALUE_SOURCES = {}
+__STANDARD_VALUES = {}

 def param_source_for_operation(op_type, workload, params, task_name):
     try:
@@ -63,6 +65,14 @@ def param_source_for_name(name, workload, params):
     else:
         return param_source(workload, params)

+def get_standard_value_source(op_name, field_name):
+    try:
+        return __STANDARD_VALUE_SOURCES[op_name][field_name]
+    except KeyError:
+        raise exceptions.SystemSetupError(
+            "Could not find standard value source for operation {}, field {}! Make sure this is registered in workload.py"
+            .format(op_name, field_name))

Reviewer: Nice! This is a helpful suggestion. It'd be good for us to add documentation related to this in the official docs. Once we have that, we can add a link to that guide to this error message.

 def ensure_valid_param_source(param_source):
     if not inspect.isfunction(param_source) and not inspect.isclass(param_source):

@@ -78,13 +88,50 @@ def register_param_source_for_name(name, param_source_class):
     ensure_valid_param_source(param_source_class)
     __PARAM_SOURCES_BY_NAME[name] = param_source_class
+# These may not belong in params.py - they're here for now by analogy with register_param_source

+def register_standard_value_source(op_name, field_name, standard_value_source):
+    # We have to allow re-registration for the same op/field, since plugins are loaded many times when a workload is run
+    if op_name in __STANDARD_VALUE_SOURCES:
+        __STANDARD_VALUE_SOURCES[op_name][field_name] = standard_value_source
+    else:
+        __STANDARD_VALUE_SOURCES[op_name] = {field_name: standard_value_source}
+def generate_standard_values_if_absent(op_name, field_name, n):
+    if op_name not in __STANDARD_VALUES:
+        __STANDARD_VALUES[op_name] = {}
+    if field_name not in __STANDARD_VALUES[op_name]:
+        __STANDARD_VALUES[op_name][field_name] = []
+        try:
+            standard_value_source = __STANDARD_VALUE_SOURCES[op_name][field_name]
+        except KeyError:
+            raise exceptions.SystemSetupError(
+                "Cannot generate standard values for operation {}, field {}. Standard value source is missing"
+                .format(op_name, field_name))
+        for _i in range(n):
+            __STANDARD_VALUES[op_name][field_name].append(standard_value_source())
+def get_standard_value(op_name, field_name, i):
+    try:
+        return __STANDARD_VALUES[op_name][field_name][i]
+    except KeyError:
+        raise exceptions.SystemSetupError("No standard values generated for operation {}, field {}".format(op_name, field_name))
+    except IndexError:
+        raise exceptions.SystemSetupError(
+            "Standard value index {} out of range for operation {}, field name {} ({} values total)"
+            .format(i, op_name, field_name, len(__STANDARD_VALUES[op_name][field_name])))
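Taken together, these helpers implement a register-once, generate-once, look-up-many pattern. A hedged standalone sketch of that flow using plain dicts (names are illustrative):

```python
import random

# Illustrative mirror of the helpers above: register a source, materialize
# n values once, then index into them on later lookups.
STANDARD_VALUE_SOURCES = {}
STANDARD_VALUES = {}

def register(op_name, field_name, source):
    STANDARD_VALUE_SOURCES.setdefault(op_name, {})[field_name] = source

def generate_if_absent(op_name, field_name, n):
    values = STANDARD_VALUES.setdefault(op_name, {})
    if field_name not in values:
        source = STANDARD_VALUE_SOURCES[op_name][field_name]
        values[field_name] = [source() for _ in range(n)]

register("default", "trip_distance", lambda: {"gte": 0, "lte": random.uniform(1, 10)})
generate_if_absent("default", "trip_distance", 5000)
print(STANDARD_VALUES["default"]["trip_distance"][0])  # stable across repeated lookups
```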

 # only intended for tests
 def _unregister_param_source_for_name(name):
     # We intentionally do not specify a default value if the key does not exist. If we try to remove a key that we didn't insert then
     # something is fishy with the test and we'd rather know early.
     __PARAM_SOURCES_BY_NAME.pop(name)

+# only intended for tests
+def _clear_standard_values():
+    # without the global declaration, these assignments would only bind
+    # function-local names and leave the module-level registries untouched
+    global __STANDARD_VALUES, __STANDARD_VALUE_SOURCES
+    __STANDARD_VALUES = {}
+    __STANDARD_VALUE_SOURCES = {}

 # Default
 class ParamSource:
Reviewer: We should aim to use full-length arguments as best practice, such as --randomization-repeat-frequency, in addition to the short-hand format, like --randomization-rf or -rrf. That way users don't have to look up the meaning of the flags as much.

Reviewer: @peteralfonsi Integration tests are failing because the extra parameter uses -- and is not considered a "short-form" flag. Think you'll need to change this to "-rf", "-rrf", or some variation with a single hyphen.

Author: Ahh - thanks, that probably would have taken me a while to figure out!

Reviewer: On second glance, --randomization-repeat-frequency is a bit lengthy and can be cumbersome for users to type out every time. Short-form flags should be reserved for more commonly used options like --target-hosts or --workload; --randomization-repeat-frequency is a specific use case, and I think it will be fine to keep it as --randomization-rf. For this option, we can either do what you did before and keep --randomization-rf, or have the full-length version --randomization-repeat-frequency plus a short-form version -rf. I'll leave it up to your preference!
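For reference, the long-plus-short pattern discussed above can be expressed with argparse; this is an illustrative sketch, not OSB's actual CLI wiring:

```python
import argparse

# Illustrative only: register a long flag together with a single-hyphen
# short form, as suggested in the thread above.
parser = argparse.ArgumentParser()
parser.add_argument(
    "-rf", "--randomization-repeat-frequency",
    dest="randomization_rf",
    type=float,
    default=0.3,  # matches DEFAULT_RF in the processor
    help="Fraction of range queries that reuse a saved standard value.")

args = parser.parse_args(["--randomization-repeat-frequency", "0.5"])
print(args.randomization_rf)  # 0.5
```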