From fd2bc3b0ecf8f449807f85da7e4348057455e92e Mon Sep 17 00:00:00 2001 From: Rishabh Singh Date: Thu, 12 Dec 2024 13:21:06 -0800 Subject: [PATCH] Add support for ramping up clients Signed-off-by: Rishabh Singh --- osbenchmark/resources/workload-schema.json | 15 ++ .../worker_coordinator/worker_coordinator.py | 75 +++++--- osbenchmark/workload/loader.py | 32 +++- osbenchmark/workload/workload.py | 9 +- tests/worker_coordinator/runner_test.py | 16 +- .../worker_coordinator_test.py | 164 +++++++++++++++--- tests/workload/loader_test.py | 64 ++++++- 7 files changed, 319 insertions(+), 56 deletions(-) diff --git a/osbenchmark/resources/workload-schema.json b/osbenchmark/resources/workload-schema.json index 7d2102e5a..34f2fd4d4 100644 --- a/osbenchmark/resources/workload-schema.json +++ b/osbenchmark/resources/workload-schema.json @@ -28,6 +28,11 @@ "type": "integer", "minimum": 1 }, + "ramp-up-time-period": { + "type": "integer", + "minimum": 0, + "description": "Defines the time period in seconds to gradually increase the number of clients." + }, "warmup-time-period": { "type": "integer", "minimum": 0, @@ -75,6 +80,11 @@ "minimum": 1, "description": "Defines the number of times to run the operation." }, + "ramp-up-time-period": { + "type": "integer", + "minimum": 0, + "description": "Defines the time period in seconds to gradually increase the number of clients." + }, "warmup-time-period": { "type": "integer", "minimum": 0, @@ -146,6 +156,11 @@ "minimum": 1, "description": "Defines the number of times to run the operation." }, + "ramp-up-time-period": { + "type": "integer", + "minimum": 0, + "description": "Defines the time period in seconds to gradually increase the number of clients." + }, "warmup-time-period": { "type": "integer", "minimum": 0, diff --git a/osbenchmark/worker_coordinator/worker_coordinator.py b/osbenchmark/worker_coordinator/worker_coordinator.py index 7b17e4959..d08242a91 100644 --- a/osbenchmark/worker_coordinator/worker_coordinator.py +++ b/osbenchmark/worker_coordinator/worker_coordinator.py @@ -657,12 +658,10 @@ def start_benchmark(self): self.logger.info("Attaching cluster-level telemetry devices.") self.telemetry.on_benchmark_start() self.logger.info("Cluster-level telemetry devices are now attached.") - allocator = Allocator(self.test_procedure.schedule) self.allocations = allocator.allocations self.number_of_steps = len(allocator.join_points) - 1 self.tasks_per_join_point = allocator.tasks_per_joinpoint - self.logger.info("Benchmark consists of [%d] steps executed by [%d] clients.", self.number_of_steps, len(self.allocations)) # avoid flooding the log if there are too many clients @@ -1009,7 +1008,6 @@ def calculate_worker_assignments(host_configs, client_count): remaining_clients -= clients_on_this_host assert 
remaining_clients == 0 - return assignments @@ -1201,7 +1199,7 @@ def current_tasks_and_advance(self): self.current_task_index = self.next_task_index current = self.client_allocations.tasks(self.current_task_index) self.next_task_index += 1 - self.logger.debug("Worker[%d] is at task index [%d].", self.worker_id, self.current_task_index) + self.logger.info("Worker[%d] is at task index [%d].", self.worker_id, self.current_task_index) return current def send_samples(self): @@ -1522,7 +1520,7 @@ def os_clients(all_hosts, all_client_options): # # Now we need to ensure that we start partitioning parameters correctly in both cases. And that means we # need to start from (client) index 0 in both cases instead of 0 for indexA and 4 for indexB. - schedule = schedule_for(task, task_allocation.client_index_in_task, params_per_task[task]) + schedule = schedule_for(task_allocation, params_per_task[task]) async_executor = AsyncExecutor( client_id, task, schedule, opensearch, self.sampler, self.cancel, self.complete, task.error_behavior(self.abort_on_error), self.cfg) @@ -1607,6 +1605,15 @@ async def __call__(self, *args, **kwargs): # lazily initialize the schedule self.logger.debug("Initializing schedule for client id [%s].", self.client_id) schedule = self.schedule_handle() + self.schedule_handle.start() + rampup_wait_time = self.schedule_handle.ramp_up_wait_time + if rampup_wait_time: + self.logger.info("client id [%s] waiting [%.2f]s for ramp-up.", self.client_id, rampup_wait_time) + await asyncio.sleep(rampup_wait_time) + + if rampup_wait_time: + console.println(f" Client id {self.client_id} is running now.") + self.logger.debug("Entering main loop for client id [%s].", self.client_id) # noinspection PyBroadException try: @@ -1806,18 +1813,21 @@ def __repr__(self, *args, **kwargs): class TaskAllocation: - def __init__(self, task, client_index_in_task): + def __init__(self, task, client_index_in_task, global_client_index, total_clients): self.task = task self.client_index_in_task = client_index_in_task + self.global_client_index = global_client_index + self.total_clients = total_clients def __hash__(self): - return hash(self.task) ^ hash(self.client_index_in_task) + return hash(self.task) ^ hash(self.global_client_index) def __eq__(self, other): - return isinstance(other, type(self)) and self.task == other.task and self.client_index_in_task == other.client_index_in_task + return isinstance(other, type(self)) and self.task == other.task and self.global_client_index == other.global_client_index def __repr__(self, *args, **kwargs): - return "TaskAllocation [%d/%d] for %s" % (self.client_index_in_task, self.task.clients, self.task) + return f"TaskAllocation [{self.client_index_in_task}/{self.task.clients}] for {self.task} " \ + f"and [{self.global_client_index}/{self.total_clients}] in total" class Allocator: @@ -1852,7 +1862,6 @@ def allocations(self): for client_index in range(max_clients): allocations[client_index].append(next_join_point) join_point_id += 1 - for task in self.schedule: start_client_index = 0 clients_executing_completing_task = [] @@ -1863,7 +1872,14 @@ def allocations(self): physical_client_index = client_index % max_clients if sub_task.completes_parent: clients_executing_completing_task.append(physical_client_index) - allocations[physical_client_index].append(TaskAllocation(sub_task, client_index - start_client_index)) + ta = TaskAllocation(task=sub_task, + client_index_in_task=client_index - start_client_index, + global_client_index=client_index, + # if task represents a parallel 
structure this is the total number of clients + # executing sub-tasks concurrently. + total_clients=task.clients) + + allocations[physical_client_index].append(ta) start_client_index += sub_task.clients # uneven distribution between tasks and clients, e.g. there are 5 (parallel) tasks but only 2 clients. Then, one of them @@ -1918,7 +1934,6 @@ def tasks_per_joinpoint(self): elif isinstance(allocation, JoinPoint) and len(current_tasks) > 0: tasks.append(current_tasks) current_tasks = set() - return tasks @property @@ -1941,7 +1956,7 @@ def clients(self): # Runs a concrete schedule on one worker client # Needs to determine the runners and concrete iterations per client. -def schedule_for(task, client_index, parameter_source): +def schedule_for(task_allocation, parameter_source): """ Calculates a client's schedule for a given task. @@ -1951,15 +1966,17 @@ def schedule_for(task, client_index, parameter_source): :return: A generator for the operations the given client needs to perform for this task. """ logger = logging.getLogger(__name__) + task = task_allocation.task op = task.operation - num_clients = task.clients sched = scheduler.scheduler_for(task) + + client_index = task_allocation.client_index_in_task # guard all logging statements with the client index and only emit them for the first client. This information is # repetitive and may cause issues in thespian with many clients (an excessive number of actor messages is sent). if client_index == 0: logger.info("Choosing [%s] for [%s].", sched, task) runner_for_op = runner.runner_for(op.type) - params_for_op = parameter_source.partition(client_index, num_clients) + params_for_op = parameter_source.partition(client_index, task.clients) if hasattr(sched, "parameter_source"): if client_index == 0: logger.debug("Setting parameter source [%s] for scheduler [%s]", params_for_op, sched) @@ -1992,7 +2009,7 @@ def schedule_for(task, client_index, parameter_source): else: logger.info("%s schedule will determine when the schedule for [%s] terminates.", str(loop_control), task.name) - return ScheduleHandle(task.name, sched, loop_control, runner_for_op, params_for_op) + return ScheduleHandle(task_allocation, sched, loop_control, runner_for_op, params_for_op) def requires_time_period_schedule(task, task_runner, params): @@ -2009,7 +2026,7 @@ def requires_time_period_schedule(task, task_runner, params): class ScheduleHandle: - def __init__(self, task_name, sched, task_progress_control, runner, params): + def __init__(self, task_allocation, sched, task_progress_control, runner, params): """ Creates a generator that will yield individual task invocations for the provided schedule. @@ -2020,7 +2037,7 @@ def __init__(self, task_name, sched, task_progress_control, runner, params): :param params: The parameter source for a given operation. :return: A generator for the corresponding parameters. """ - self.task_name = task_name + self.task_allocation = task_allocation self.sched = sched self.task_progress_control = task_progress_control self.runner = runner @@ -2030,6 +2047,20 @@ def __init__(self, task_name, sched, task_progress_control, runner, params): #import asyncio #self.io_pool_exc = ThreadPoolExecutor(max_workers=1) #self.loop = asyncio.get_event_loop() + + @property + def ramp_up_wait_time(self): + """ + :return: the number of seconds to wait until this client should start so load can gradually ramp-up. 
+ """ + ramp_up_time_period = self.task_allocation.task.ramp_up_time_period + if ramp_up_time_period: + return ramp_up_time_period * (self.task_allocation.global_client_index / self.task_allocation.total_clients) + else: + return 0 + + def start(self): + self.task_progress_control.start() def before_request(self, now): self.sched.before_request(now) @@ -2041,7 +2072,6 @@ async def __call__(self): next_scheduled = 0 if self.task_progress_control.infinite: param_source_knows_progress = hasattr(self.params, "percent_completed") - self.task_progress_control.start() while True: try: next_scheduled = self.sched.next(next_scheduled) @@ -2054,7 +2084,6 @@ async def __call__(self): except StopIteration: return else: - self.task_progress_control.start() while not self.task_progress_control.completed: try: next_scheduled = self.sched.next(next_scheduled) diff --git a/osbenchmark/workload/loader.py b/osbenchmark/workload/loader.py index a57c3d901..8905c6fc6 100644 --- a/osbenchmark/workload/loader.py +++ b/osbenchmark/workload/loader.py @@ -1716,6 +1716,7 @@ def parse_parallel(self, ops_spec, ops, test_procedure_name): default_iterations = self._r(ops_spec, "iterations", error_ctx="parallel", mandatory=False) default_warmup_time_period = self._r(ops_spec, "warmup-time-period", error_ctx="parallel", mandatory=False) default_time_period = self._r(ops_spec, "time-period", error_ctx="parallel", mandatory=False) + default_ramp_up_time_period = self._r(ops_spec, "ramp-up-time-period", error_ctx="parallel", mandatory=False) clients = self._r(ops_spec, "clients", error_ctx="parallel", mandatory=False) completed_by = self._r(ops_spec, "completed-by", error_ctx="parallel", mandatory=False) @@ -1723,7 +1724,18 @@ def parse_parallel(self, ops_spec, ops, test_procedure_name): tasks = [] for task in self._r(ops_spec, "tasks", error_ctx="parallel"): tasks.append(self.parse_task(task, ops, test_procedure_name, default_warmup_iterations, default_iterations, - default_warmup_time_period, default_time_period, completed_by)) + default_warmup_time_period, default_time_period, default_ramp_up_time_period, + completed_by)) + + for task in tasks: + if task.ramp_up_time_period != default_ramp_up_time_period: + if default_ramp_up_time_period is None: + self._error(f"task '{task.name}' in 'parallel' element of test-procedure '{test_procedure_name}' specifies " + f"a ramp-up-time-period but it is only allowed on the 'parallel' element.") + else: + self._error(f"task '{task.name}' specifies a different ramp-up-time-period than its enclosing " + f"'parallel' element in test-procedure '{test_procedure_name}'.") + if completed_by: completion_task = None for task in tasks: @@ -1739,7 +1751,7 @@ def parse_parallel(self, ops_spec, ops, test_procedure_name): return workload.Parallel(tasks, clients) def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterations=None, default_iterations=None, - default_warmup_time_period=None, default_time_period=None, completed_by_name=None): + default_warmup_time_period=None, default_time_period=None, default_ramp_up_time_period=None, completed_by_name=None): op_spec = task_spec["operation"] if isinstance(op_spec, str) and op_spec in ops: @@ -1762,6 +1774,8 @@ def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterati default_value=default_warmup_time_period), time_period=self._r(task_spec, "time-period", error_ctx=op.name, mandatory=False, default_value=default_time_period), + ramp_up_time_period=self._r(task_spec, "ramp-up-time-period", error_ctx=op.name, + 
mandatory=False, default_value=default_ramp_up_time_period), clients=self._r(task_spec, "clients", error_ctx=op.name, mandatory=False, default_value=1), completes_parent=(task_name == completed_by_name), schedule=schedule, @@ -1775,6 +1789,20 @@ def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterati self._error( "Operation '%s' in test_procedure '%s' defines a warmup time period of '%d' seconds and '%d' iterations. Please do not " "mix time periods and iterations." % (op.name, test_procedure_name, task.warmup_time_period, task.iterations)) + + if (task.warmup_iterations is not None or task.iterations is not None) and task.ramp_up_time_period is not None: + self._error(f"Operation '{op.name}' in test_procedure '{test_procedure_name}' defines a ramp-up time period of " + f"{task.ramp_up_time_period} seconds as well as {task.warmup_iterations} warmup iterations and " + f"{task.iterations} iterations but mixing time periods and iterations is not allowed.") + + if task.ramp_up_time_period is not None: + if task.warmup_time_period is None: + self._error(f"Operation '{op.name}' in test_procedure '{test_procedure_name}' defines a ramp-up time period of " + f"{task.ramp_up_time_period} seconds but no warmup-time-period.") + elif task.warmup_time_period < task.ramp_up_time_period: + self._error(f"The warmup-time-period of operation '{op.name}' in test_procedure '{test_procedure_name}' is " + f"{task.warmup_time_period} seconds but must be greater than or equal to the " + f"ramp-up-time-period of {task.ramp_up_time_period} seconds.") return task diff --git a/osbenchmark/workload/workload.py b/osbenchmark/workload/workload.py index d221e22e5..0a141221a 100644 --- a/osbenchmark/workload/workload.py +++ b/osbenchmark/workload/workload.py @@ -894,7 +894,7 @@ class Task: IGNORE_RESPONSE_ERROR_LEVEL_WHITELIST = ["non-fatal"] def __init__(self, name, operation, tags=None, meta_data=None, warmup_iterations=None, iterations=None, - warmup_time_period=None, time_period=None, clients=1, completes_parent=False, schedule=None, params=None): + warmup_time_period=None, time_period=None, ramp_up_time_period=None, clients=1, completes_parent=False, schedule=None, params=None): self.name = name self.operation = operation if isinstance(tags, str): @@ -908,6 +908,7 @@ def __init__(self, name, operation, tags=None, meta_data=None, warmup_iterations self.iterations = iterations self.warmup_time_period = warmup_time_period self.time_period = time_period + self.ramp_up_time_period = ramp_up_time_period self.clients = clients self.completes_parent = completes_parent self.schedule = schedule @@ -988,15 +989,15 @@ def error_behavior(self, default_error_behavior): def __hash__(self): # Note that we do not include `params` in __hash__ and __eq__ (the other attributes suffice to uniquely define a task) return hash(self.name) ^ hash(self.operation) ^ hash(self.warmup_iterations) ^ hash(self.iterations) ^ \ - hash(self.warmup_time_period) ^ hash(self.time_period) ^ hash(self.clients) ^ hash(self.schedule) ^ \ + hash(self.warmup_time_period) ^ hash(self.time_period) ^ hash(self.ramp_up_time_period) ^ hash(self.clients) ^ hash(self.schedule) ^ \ hash(self.completes_parent) def __eq__(self, other): # Note that we do not include `params` in __hash__ and __eq__ (the other attributes suffice to uniquely define a task) return isinstance(other, type(self)) and (self.name, self.operation, self.warmup_iterations, self.iterations, - self.warmup_time_period, self.time_period, self.clients, self.schedule, + 
self.warmup_time_period, self.time_period, self.ramp_up_time_period, self.clients, self.schedule, self.completes_parent) == (other.name, other.operation, other.warmup_iterations, - other.iterations, other.warmup_time_period, other.time_period, + other.iterations, other.warmup_time_period, other.time_period, other.ramp_up_time_period, other.clients, other.schedule, other.completes_parent) def __iter__(self): diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index b43bd53be..5e1926296 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -3435,9 +3435,15 @@ def params(self): runner.register_runner(operation_type=workload.OperationType.VectorSearch, runner=runner.Query(), async_runner=True) param_source = workload.operation_parameters(test_workload, task) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) # pylint: disable=C0415 import threading - schedule = worker_coordinator.schedule_for(task, 0, param_source) + schedule = worker_coordinator.schedule_for(task_allocation, param_source) # pylint: disable=C0415 def create_config(): cfg = config.Config() @@ -3570,9 +3576,15 @@ def params(self): runner.register_runner(operation_type=workload.OperationType.VectorSearch, runner=runner.Query(), async_runner=True) param_source = workload.operation_parameters(test_workload, task) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) # pylint: disable=C0415 import threading - schedule = worker_coordinator.schedule_for(task, 0, param_source) + schedule = worker_coordinator.schedule_for(task_allocation, param_source) def create_config(): cfg = config.Config() cfg.add(config.Scope.application, "system", "available.cores", 8) diff --git a/tests/worker_coordinator/worker_coordinator_test.py b/tests/worker_coordinator/worker_coordinator_test.py index 240ce17a9..ab7eba636 100644 --- a/tests/worker_coordinator/worker_coordinator_test.py +++ b/tests/worker_coordinator/worker_coordinator_test.py @@ -577,8 +577,10 @@ class AllocatorTests(TestCase): def setUp(self): params.register_param_source_for_name("worker-coordinator-test-param-source", WorkerCoordinatorTestParamSource) - def ta(self, task, client_index_in_task): - return worker_coordinator.TaskAllocation(task, client_index_in_task) + def ta(self, task, client_index_in_task, global_client_index=None, total_clients=None): + return worker_coordinator.TaskAllocation(task, client_index_in_task, + client_index_in_task if global_client_index is None else global_client_index, + task.clients if total_clients is None else total_clients) def test_allocates_one_task(self): task = workload.Task("index", op("index", workload.OperationType.Bulk)) @@ -666,17 +668,24 @@ def test_allocates_more_tasks_than_clients(self): self.assertEqual(2, allocator.clients) allocations = allocator.allocations - # 2 clients self.assertEqual(2, len(allocations)) # join_point, index_a, index_c, index_e, join_point self.assertEqual(5, len(allocations[0])) # we really have no chance to extract the join point so we just take what is there... 
- self.assertEqual([allocations[0][0], self.ta(index_a, 0), self.ta(index_c, 0), self.ta(index_e, 0), allocations[0][4]], + self.assertEqual([allocations[0][0], + self.ta(index_a, client_index_in_task=0, global_client_index=0, total_clients=2), + self.ta(index_c, client_index_in_task=0, global_client_index=2, total_clients=2), + self.ta(index_e, client_index_in_task=0, global_client_index=4, total_clients=2), + allocations[0][4]], allocations[0]) # join_point, index_a, index_c, None, join_point self.assertEqual(5, len(allocator.allocations[1])) - self.assertEqual([allocations[1][0], self.ta(index_b, 0), self.ta(index_d, 0), None, allocations[1][4]], allocations[1]) + self.assertEqual([allocations[1][0], + self.ta(index_b, client_index_in_task=0, global_client_index=1, total_clients=2), + self.ta(index_d, client_index_in_task=0, global_client_index=3, total_clients=2), + None, allocations[1][4]], + allocations[1]) self.assertEqual([{index_a, index_b, index_c, index_d, index_e}], allocator.tasks_per_joinpoint) self.assertEqual(2, len(allocator.join_points)) @@ -703,16 +712,28 @@ def test_considers_number_of_clients_per_subtask(self): # join_point, index_a, index_c, join_point self.assertEqual(4, len(allocations[0])) # we really have no chance to extract the join point so we just take what is there... - self.assertEqual([allocations[0][0], self.ta(index_a, 0), self.ta(index_c, 1), allocations[0][3]], allocations[0]) + self.assertEqual([allocations[0][0], + self.ta(index_a, client_index_in_task=0, global_client_index=0, total_clients=3), + self.ta(index_c, client_index_in_task=1, global_client_index=3, total_clients=3), + allocations[0][3]], + allocations[0]) # task that client 1 will execute: # join_point, index_b, None, join_point self.assertEqual(4, len(allocator.allocations[1])) - self.assertEqual([allocations[1][0], self.ta(index_b, 0), None, allocations[1][3]], allocations[1]) + self.assertEqual([allocations[1][0], + self.ta(index_b, client_index_in_task=0, global_client_index=1, total_clients=3), + None, + allocations[1][3]], + allocations[1]) # tasks that client 2 will execute: self.assertEqual(4, len(allocator.allocations[2])) - self.assertEqual([allocations[2][0], self.ta(index_c, 0), None, allocations[2][3]], allocations[2]) + self.assertEqual([allocations[2][0], + self.ta(index_c, client_index_in_task=0, global_client_index=2, total_clients=3), + None, + allocations[2][3]], + allocations[2]) self.assertEqual([{index_a, index_b, index_c}], allocator.tasks_per_joinpoint) @@ -839,6 +860,7 @@ def next(self, current): async def assert_schedule(self, expected_schedule, schedule_handle, infinite_schedule=False): idx = 0 + schedule_handle.start() async for invocation_time, sample_type, progress_percent, runner, params in schedule_handle(): schedule_handle.before_request(now=idx) exp_invocation_time, exp_sample_type, exp_progress_percent, exp_params = expected_schedule[idx] @@ -882,7 +904,13 @@ def test_injects_parameter_source_into_scheduler(self): }) param_source = workload.operation_parameters(self.test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) + schedule = worker_coordinator.schedule_for(task_allocation, param_source) self.assertIsNotNone(schedule.sched.parameter_source, "Parameter source has not been injected into scheduler") self.assertEqual(param_source, schedule.sched.parameter_source) @@ 
-893,7 +921,13 @@ async def test_search_task_one_client(self): param_source="worker-coordinator-test-param-source"), warmup_iterations=3, iterations=5, clients=1, params={"target-throughput": 10, "clients": 1}) param_source = workload.operation_parameters(self.test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) + schedule = worker_coordinator.schedule_for(task_allocation, param_source) expected_schedule = [ (0, metrics.SampleType.Warmup, 1 / 8, {}), @@ -913,7 +947,13 @@ async def test_search_task_two_clients(self): param_source="worker-coordinator-test-param-source"), warmup_iterations=1, iterations=5, clients=2, params={"target-throughput": 10, "clients": 2}) param_source = workload.operation_parameters(self.test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) + schedule = worker_coordinator.schedule_for(task_allocation, param_source) expected_schedule = [ (0, metrics.SampleType.Warmup, 1 / 6, {}), @@ -934,7 +974,13 @@ async def test_schedule_param_source_determines_iterations_no_warmup(self): clients=4, params={"target-throughput": 4}) param_source = workload.operation_parameters(self.test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) + schedule = worker_coordinator.schedule_for(task_allocation, param_source) await self.assert_schedule([ (0.0, metrics.SampleType.Normal, 1 / 3, {"body": ["a"], "size": 3}), @@ -950,7 +996,13 @@ async def test_schedule_param_source_determines_iterations_including_warmup(self warmup_iterations=2, clients=4, params={"target-throughput": 4}) param_source = workload.operation_parameters(self.test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) + schedule = worker_coordinator.schedule_for(task_allocation, param_source) await self.assert_schedule([ (0.0, metrics.SampleType.Warmup, 1 / 5, {"body": ["a"], "size": 5}), @@ -969,7 +1021,13 @@ async def test_schedule_defaults_to_iteration_based(self): clients=1, params={"target-throughput": 4, "clients": 4}) param_source = workload.operation_parameters(self.test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) + schedule = worker_coordinator.schedule_for(task_allocation, param_source) await self.assert_schedule([ (0.0, metrics.SampleType.Normal, 1 / 1, {"body": ["a"]}), @@ -983,7 +1041,13 @@ async def test_schedule_for_warmup_time_based(self): warmup_time_period=0, clients=4, params={"target-throughput": 4, "clients": 4}) param_source = workload.operation_parameters(self.test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + 
total_clients=task.clients + ) + schedule = worker_coordinator.schedule_for(task_allocation, param_source) await self.assert_schedule([ (0.0, metrics.SampleType.Normal, 1 / 11, {"body": ["a"], "size": 11}), @@ -1007,7 +1071,13 @@ async def test_infinite_schedule_without_progress_indication(self): warmup_time_period=0, clients=4, params={"target-throughput": 4, "clients": 4}) param_source = workload.operation_parameters(self.test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) + schedule = worker_coordinator.schedule_for(task_allocation, param_source) await self.assert_schedule([ (0.0, metrics.SampleType.Normal, None, {"body": ["a"]}), @@ -1025,7 +1095,13 @@ async def test_finite_schedule_with_progress_indication(self): warmup_time_period=0, clients=4, params={"target-throughput": 4, "clients": 4}) param_source = workload.operation_parameters(self.test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) + schedule = worker_coordinator.schedule_for(task_allocation,param_source) await self.assert_schedule([ (0.0, metrics.SampleType.Normal, 1 / 5, {"body": ["a"], "size": 5}), @@ -1044,7 +1120,13 @@ async def test_schedule_with_progress_determined_by_runner(self): params={"target-throughput": 1, "clients": 1}) param_source = workload.operation_parameters(self.test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) + schedule = worker_coordinator.schedule_for(task_allocation, param_source) await self.assert_schedule([ (0.0, metrics.SampleType.Normal, None, {"body": ["a"]}), @@ -1064,7 +1146,15 @@ async def test_schedule_for_time_based(self): clients=1) param_source = workload.operation_parameters(self.test_workload, task) - schedule_handle = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) + schedule_handle = worker_coordinator.schedule_for(task_allocation, param_source) + schedule_handle.start() + self.assertEqual(0.0, schedule_handle.ramp_up_wait_time) schedule = schedule_handle() last_progress = -1 @@ -1198,7 +1288,11 @@ async def test_execute_schedule_in_throughput_mode(self, opensearch, on_client_r param_source="worker-coordinator-test-param-source"), warmup_time_period=0, clients=4) param_source = workload.operation_parameters(test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation(task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients) + schedule = worker_coordinator.schedule_for(task_allocation, param_source) sampler = worker_coordinator.Sampler(start_timestamp=task_start) cancel = threading.Event() @@ -1253,7 +1347,11 @@ async def test_execute_schedule_with_progress_determined_by_runner(self, opensea "size": None }, param_source="worker-coordinator-test-param-source"), warmup_time_period=0, clients=4) param_source = workload.operation_parameters(test_workload, 
task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation(task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients) + schedule = worker_coordinator.schedule_for(task_allocation, param_source) sampler = worker_coordinator.Sampler(start_timestamp=task_start) cancel = threading.Event() @@ -1315,7 +1413,11 @@ async def test_execute_schedule_runner_overrides_times(self, opensearch): param_source="worker-coordinator-test-param-source"), warmup_iterations=0, iterations=1, clients=1) param_source = workload.operation_parameters(test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation(task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients) + schedule = worker_coordinator.schedule_for(task_allocation, param_source) sampler = worker_coordinator.Sampler(start_timestamp=task_start) cancel = threading.Event() @@ -1394,7 +1496,11 @@ def perform_request(*args, **kwargs): complete = threading.Event() param_source = workload.operation_parameters(test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation(task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients) + schedule = worker_coordinator.schedule_for(task_allocation, param_source) execute_schedule = worker_coordinator.AsyncExecutor(client_id=0, task=task, schedule=schedule, @@ -1446,7 +1552,11 @@ async def test_cancel_execute_schedule(self, opensearch): params={"target-throughput": target_throughput, "clients": 4}) param_source = workload.operation_parameters(test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation(task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients) + schedule = worker_coordinator.schedule_for(task_allocation, param_source) sampler = worker_coordinator.Sampler(start_timestamp=0) cancel = threading.Event() @@ -1482,12 +1592,18 @@ def run(*args, **kwargs): raise ExpectedUnitTestException() class ScheduleHandle: + def __init__(self): + self.ramp_up_wait_time = 0 + def before_request(self, now): pass def after_request(self, now, weight, unit, meta_data): pass + def start(self): + pass + async def __call__(self): invocations = [(0, metrics.SampleType.Warmup, 0, AsyncExecutorTests.context_managed(run), None)] for invocation in invocations: diff --git a/tests/workload/loader_test.py b/tests/workload/loader_test.py index 562afea0c..b7e01805d 100644 --- a/tests/workload/loader_test.py +++ b/tests/workload/loader_test.py @@ -2269,6 +2269,63 @@ def test_parse_missing_test_procedure_or_test_procedures(self, mocked_params_che self.assertEqual("Workload 'unittest' is invalid. 
You must define 'test_procedure', 'test_procedures' or " "'schedule' but none is specified.", ctx.exception.args[0]) + + @mock.patch("osbenchmark.workload.loader.register_all_params_in_workload") + def test_parse_ramp_up_time_period_with_iterations(self, mocked_params_checker): + workload_specification = { "description": "description for unit test", "indices": [ { "name": "test-index", "body": "index.json", "types": ["docs"] } ], "corpora": [ { "name": "test", "documents": [ { "source-file": "documents-main.json.bz2", "document-count": 10, "compressed-bytes": 100, "uncompressed-bytes": 10000 } ] } ], "operations": [ { "name": "index-append", "operation-type": "bulk", "bulk-size": 5000, } ], "test_procedures": [ { "name": "default-challenge", "schedule": [ { "clients": 8, "operation": "index-append", "ramp-up-time-period": 120, "warmup-iterations": 3, "iterations": 5 } ] } ] } reader = loader.WorkloadSpecificationReader(source=io.DictStringFileSourceFactory({ "/mappings/index.json": ['{"mappings": {"docs": "empty-for-test"}}'], })) with self.assertRaises(loader.WorkloadSyntaxError) as ctx: reader("unittest", workload_specification, "/mappings") self.assertEqual("Workload 'unittest' is invalid. Operation 'index-append' in test_procedure 'default-challenge' defines a ramp-up time period of " "120 seconds as well as 3 warmup iterations and 5 iterations but mixing time periods and iterations is not allowed.", ctx.exception.args[0]) @mock.patch("osbenchmark.workload.loader.register_all_params_in_workload") def test_parse_test_procedure_and_test_procedures_are_defined(self, mocked_params_checker): @@ -3651,13 +3708,18 @@ def test_supports_target_throughput(self): { "operation": "index-append", "target-throughput": 10, + "warmup-time-period": 120, + "ramp-up-time-period": 60 } ] } } reader = loader.WorkloadSpecificationReader() resulting_workload = reader("unittest", workload_specification, "/mappings") - self.assertEqual(10, resulting_workload.test_procedures[0].schedule[0].params["target-throughput"]) + indexing_task = resulting_workload.test_procedures[0].schedule[0] + self.assertEqual(10, indexing_task.params["target-throughput"]) + self.assertEqual(120, indexing_task.warmup_time_period) + self.assertEqual(60, indexing_task.ramp_up_time_period) def test_supports_target_interval(self): workload_specification = {
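Reviewer note: the staggering that ScheduleHandle.ramp_up_wait_time implements can be sanity-checked in isolation. A minimal sketch, assuming the same formula as the property; the standalone function below is illustrative and not part of the patch:

def ramp_up_wait_time(ramp_up_time_period, global_client_index, total_clients):
    # Each client's start delay grows linearly with its global client index,
    # so client starts are spread evenly across the ramp-up period.
    if not ramp_up_time_period:
        return 0
    return ramp_up_time_period * (global_client_index / total_clients)

# With ramp-up-time-period=120 and 8 clients, starts are 15s apart: client 0
# starts immediately, client 7 starts at 105s, and all clients are running
# before the 120s ramp-up period has elapsed.
assert [ramp_up_wait_time(120, i, 8) for i in range(8)] == [0.0, 15.0, 30.0, 45.0, 60.0, 75.0, 90.0, 105.0]

Late-starting clients are still inside the warmup window, which is presumably why the loader insists that warmup-time-period be at least as long as ramp-up-time-period.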
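For completeness, a hypothetical workload snippet that satisfies the loader's constraints (ramp-up is only valid for time-period based tasks, never with iterations, and warmup-time-period must cover the ramp-up); the operation name and values are illustrative:

{
  "schedule": [
    {
      "operation": "index-append",
      "clients": 8,
      "ramp-up-time-period": 120,
      "warmup-time-period": 120,
      "time-period": 600,
      "target-throughput": 10
    }
  ]
}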