Add support for ramping up clients
Signed-off-by: Rishabh Singh <[email protected]>
rishabh6788 committed Dec 19, 2024
1 parent 29b11a9 commit fd2bc3b
Showing 7 changed files with 319 additions and 56 deletions.
15 changes: 15 additions & 0 deletions osbenchmark/resources/workload-schema.json
@@ -28,6 +28,11 @@
"type": "integer",
"minimum": 1
},
"ramp-up-time-period": {
"type": "integer",
"minimum": 0,
"description": "Defines the time period in seconds to gradually increase the number of clients."
},
"warmup-time-period": {
"type": "integer",
"minimum": 0,
@@ -75,6 +80,11 @@
"minimum": 1,
"description": "Defines the number of times to run the operation."
},
"ramp-up-time-period": {
"type": "integer",
"minimum": 0,
"description": "Defines the time period in seconds to gradually increase the number of clients."
},
"warmup-time-period": {
"type": "integer",
"minimum": 0,
@@ -146,6 +156,11 @@
"minimum": 1,
"description": "Defines the number of times to run the operation."
},
"ramp-up-time-period": {
"type": "integer",
"minimum": 0,
"description": "Defines the time period in seconds to gradually increase the number of clients."
},
"warmup-time-period": {
"type": "integer",
"minimum": 0,
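For context, a minimal sketch of a task specification that exercises the new property; the operation name and all values here are hypothetical, not taken from this commit. The keys mirror the schema additions above, and the pairing with warmup-time-period anticipates the loader validation further down, which requires the warmup period to be at least as long as the ramp-up period.

# Hypothetical task spec; written as a Python dict, but the equivalent JSON
# document is what the schema above actually validates.
task_spec = {
    "operation": "bulk-index",   # hypothetical operation name
    "clients": 8,
    "ramp-up-time-period": 120,  # seconds over which the 8 clients start
    "warmup-time-period": 300,   # must be >= ramp-up-time-period (see loader.py below)
    "time-period": 600,
}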
75 changes: 52 additions & 23 deletions osbenchmark/worker_coordinator/worker_coordinator.py
@@ -32,6 +32,7 @@
import math
import multiprocessing
import queue
import sys
import threading
from dataclasses import dataclass
from typing import Callable
@@ -657,12 +658,10 @@ def start_benchmark(self):
self.logger.info("Attaching cluster-level telemetry devices.")
self.telemetry.on_benchmark_start()
self.logger.info("Cluster-level telemetry devices are now attached.")

allocator = Allocator(self.test_procedure.schedule)
self.allocations = allocator.allocations
self.number_of_steps = len(allocator.join_points) - 1
self.number_of_steps = len(allocator.join_points) - 1 ## 1
self.tasks_per_join_point = allocator.tasks_per_joinpoint

self.logger.info("Benchmark consists of [%d] steps executed by [%d] clients.",
self.number_of_steps, len(self.allocations))
# avoid flooding the log if there are too many clients
@@ -672,8 +671,8 @@ def start_benchmark(self):
worker_assignments = calculate_worker_assignments(self.load_worker_coordinator_hosts, allocator.clients)
worker_id = 0
for assignment in worker_assignments:
host = assignment["host"]
for clients in assignment["workers"]:
host = assignment["host"] #localhost
for clients in assignment["workers"]: # cpu cores [0] [1] [2] [3]
# don't assign workers without any clients
if len(clients) > 0:
self.logger.info("Allocating worker [%d] on [%s] with [%d] clients.", worker_id, host, len(clients))
@@ -1009,7 +1008,6 @@ def calculate_worker_assignments(host_configs, client_count):
remaining_clients -= clients_on_this_host

assert remaining_clients == 0

return assignments


@@ -1201,7 +1199,7 @@ def current_tasks_and_advance(self):
self.current_task_index = self.next_task_index
current = self.client_allocations.tasks(self.current_task_index)
self.next_task_index += 1
self.logger.debug("Worker[%d] is at task index [%d].", self.worker_id, self.current_task_index)
self.logger.info("Worker[%d] is at task index [%d].", self.worker_id, self.current_task_index)
return current

def send_samples(self):
@@ -1522,7 +1520,7 @@ def os_clients(all_hosts, all_client_options):
#
# Now we need to ensure that we start partitioning parameters correctly in both cases. And that means we
# need to start from (client) index 0 in both cases instead of 0 for indexA and 4 for indexB.
schedule = schedule_for(task, task_allocation.client_index_in_task, params_per_task[task])
schedule = schedule_for(task_allocation, params_per_task[task])
async_executor = AsyncExecutor(
client_id, task, schedule, opensearch, self.sampler, self.cancel, self.complete,
task.error_behavior(self.abort_on_error), self.cfg)
@@ -1607,6 +1605,15 @@ async def __call__(self, *args, **kwargs):
# lazily initialize the schedule
self.logger.debug("Initializing schedule for client id [%s].", self.client_id)
schedule = self.schedule_handle()
self.schedule_handle.start()
rampup_wait_time = self.schedule_handle.ramp_up_wait_time
if rampup_wait_time:
self.logger.info("client id [%s] waiting [%.2f]s for ramp-up.", self.client_id, rampup_wait_time)
await asyncio.sleep(rampup_wait_time)

if rampup_wait_time:
console.println(f" Client id {self.client_id} is running now.")

self.logger.debug("Entering main loop for client id [%s].", self.client_id)
# noinspection PyBroadException
try:
@@ -1806,18 +1813,21 @@ def __repr__(self, *args, **kwargs):


class TaskAllocation:
def __init__(self, task, client_index_in_task):
def __init__(self, task, client_index_in_task, global_client_index, total_clients):
self.task = task
self.client_index_in_task = client_index_in_task
self.global_client_index = global_client_index
self.total_clients = total_clients

def __hash__(self):
return hash(self.task) ^ hash(self.client_index_in_task)
return hash(self.task) ^ hash(self.global_client_index)

def __eq__(self, other):
return isinstance(other, type(self)) and self.task == other.task and self.client_index_in_task == other.client_index_in_task
return isinstance(other, type(self)) and self.task == other.task and self.global_client_index == other.global_client_index

def __repr__(self, *args, **kwargs):
return "TaskAllocation [%d/%d] for %s" % (self.client_index_in_task, self.task.clients, self.task)
return f"TaskAllocation [{self.client_index_in_task}/{self.task.clients}] for {self.task} " \
f"and [{self.global_client_index}/{self.total_clients}] in total"


class Allocator:
@@ -1852,7 +1862,6 @@ def allocations(self):
for client_index in range(max_clients):
allocations[client_index].append(next_join_point)
join_point_id += 1

for task in self.schedule:
start_client_index = 0
clients_executing_completing_task = []
@@ -1863,7 +1872,14 @@
physical_client_index = client_index % max_clients
if sub_task.completes_parent:
clients_executing_completing_task.append(physical_client_index)
allocations[physical_client_index].append(TaskAllocation(sub_task, client_index - start_client_index))
ta = TaskAllocation(task=sub_task,
client_index_in_task=client_index - start_client_index,
global_client_index=client_index,
# if task represents a parallel structure this is the total number of clients
# executing sub-tasks concurrently.
total_clients=task.clients)

allocations[physical_client_index].append(ta)
start_client_index += sub_task.clients

# uneven distribution between tasks and clients, e.g. there are 5 (parallel) tasks but only 2 clients. Then, one of them
@@ -1918,7 +1934,6 @@ def tasks_per_joinpoint(self):
elif isinstance(allocation, JoinPoint) and len(current_tasks) > 0:
tasks.append(current_tasks)
current_tasks = set()

return tasks

@property
@@ -1941,7 +1956,7 @@ def clients(self):

# Runs a concrete schedule on one worker client
# Needs to determine the runners and concrete iterations per client.
def schedule_for(task, client_index, parameter_source):
def schedule_for(task_allocation, parameter_source):
"""
Calculates a client's schedule for a given task.
@@ -1951,15 +1966,17 @@ def schedule_for(task_allocation, parameter_source):
:return: A generator for the operations the given client needs to perform for this task.
"""
logger = logging.getLogger(__name__)
task = task_allocation.task
op = task.operation
num_clients = task.clients
sched = scheduler.scheduler_for(task)

client_index = task_allocation.client_index_in_task
# guard all logging statements with the client index and only emit them for the first client. This information is
# repetitive and may cause issues in thespian with many clients (an excessive number of actor messages is sent).
if client_index == 0:
logger.info("Choosing [%s] for [%s].", sched, task)
runner_for_op = runner.runner_for(op.type)
params_for_op = parameter_source.partition(client_index, num_clients)
params_for_op = parameter_source.partition(client_index, task.clients)
if hasattr(sched, "parameter_source"):
if client_index == 0:
logger.debug("Setting parameter source [%s] for scheduler [%s]", params_for_op, sched)
@@ -1992,7 +2009,7 @@ def schedule_for(task, client_index, parameter_source):
else:
logger.info("%s schedule will determine when the schedule for [%s] terminates.", str(loop_control), task.name)

return ScheduleHandle(task.name, sched, loop_control, runner_for_op, params_for_op)
return ScheduleHandle(task_allocation, sched, loop_control, runner_for_op, params_for_op)


def requires_time_period_schedule(task, task_runner, params):
@@ -2009,7 +2026,7 @@ def requires_time_period_schedule(task, task_runner, params):


class ScheduleHandle:
def __init__(self, task_name, sched, task_progress_control, runner, params):
def __init__(self, task_allocation, sched, task_progress_control, runner, params):
"""
Creates a generator that will yield individual task invocations for the provided schedule.
@@ -2020,7 +2037,7 @@ def __init__(self, task_name, sched, task_progress_control, runner, params):
:param params: The parameter source for a given operation.
:return: A generator for the corresponding parameters.
"""
self.task_name = task_name
self.task_allocation = task_allocation
self.sched = sched
self.task_progress_control = task_progress_control
self.runner = runner
@@ -2030,6 +2047,20 @@ def __init__(self, task_name, sched, task_progress_control, runner, params):
#import asyncio
#self.io_pool_exc = ThreadPoolExecutor(max_workers=1)
#self.loop = asyncio.get_event_loop()

@property
def ramp_up_wait_time(self):
"""
:return: the number of seconds to wait until this client should start so load can gradually ramp-up.
"""
ramp_up_time_period = self.task_allocation.task.ramp_up_time_period
if ramp_up_time_period:
return ramp_up_time_period * (self.task_allocation.global_client_index / self.task_allocation.total_clients)
else:
return 0

def start(self):
self.task_progress_control.start()

def before_request(self, now):
self.sched.before_request(now)
@@ -2041,7 +2072,6 @@ async def __call__(self):
next_scheduled = 0
if self.task_progress_control.infinite:
param_source_knows_progress = hasattr(self.params, "percent_completed")
self.task_progress_control.start()
while True:
try:
next_scheduled = self.sched.next(next_scheduled)
@@ -2054,7 +2084,6 @@
except StopIteration:
return
else:
self.task_progress_control.start()
while not self.task_progress_control.completed:
try:
next_scheduled = self.sched.next(next_scheduled)
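To make the stagger concrete, here is a self-contained sketch of the arithmetic behind ScheduleHandle.ramp_up_wait_time above; the helper function is ours, not part of the commit. Client k of N delays its start by ramp_up_time_period * k / N seconds, so clients come online at evenly spaced offsets and the last one starts just before the ramp-up period elapses.

def ramp_up_wait_times(ramp_up_time_period, total_clients):
    # Mirrors ScheduleHandle.ramp_up_wait_time for all clients at once:
    # a missing or zero ramp-up period means every client starts immediately.
    if not ramp_up_time_period:
        return [0.0] * total_clients
    return [ramp_up_time_period * (k / total_clients) for k in range(total_clients)]

# For example, four clients with a 120s ramp-up start at t = 0s, 30s, 60s and 90s.
assert ramp_up_wait_times(120, 4) == [0.0, 30.0, 60.0, 90.0]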
32 changes: 30 additions & 2 deletions osbenchmark/workload/loader.py
@@ -1716,14 +1716,26 @@ def parse_parallel(self, ops_spec, ops, test_procedure_name):
default_iterations = self._r(ops_spec, "iterations", error_ctx="parallel", mandatory=False)
default_warmup_time_period = self._r(ops_spec, "warmup-time-period", error_ctx="parallel", mandatory=False)
default_time_period = self._r(ops_spec, "time-period", error_ctx="parallel", mandatory=False)
default_ramp_up_time_period = self._r(ops_spec, "ramp-up-time-period", error_ctx="parallel", mandatory=False)
clients = self._r(ops_spec, "clients", error_ctx="parallel", mandatory=False)
completed_by = self._r(ops_spec, "completed-by", error_ctx="parallel", mandatory=False)

# now descent to each operation
tasks = []
for task in self._r(ops_spec, "tasks", error_ctx="parallel"):
tasks.append(self.parse_task(task, ops, test_procedure_name, default_warmup_iterations, default_iterations,
default_warmup_time_period, default_time_period, completed_by))
default_warmup_time_period, default_time_period, default_ramp_up_time_period,
completed_by))

for task in tasks:
if task.ramp_up_time_period != default_ramp_up_time_period:
if default_ramp_up_time_period is None:
self._error(f"task '{task.name}' in 'parallel' element of test-procedure '{test_procedure_name}' specifies "
f"a ramp-up-time-period but it is only allowed on the 'parallel' element.")
else:
self._error(f"task '{task.name}' specifies a different ramp-up-time-period than its enclosing "
f"'parallel' element in test-procedure '{test_procedure_name}'.")

if completed_by:
completion_task = None
for task in tasks:
@@ -1739,7 +1751,7 @@ def parse_parallel(self, ops_spec, ops, test_procedure_name):
return workload.Parallel(tasks, clients)

def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterations=None, default_iterations=None,
default_warmup_time_period=None, default_time_period=None, completed_by_name=None):
default_warmup_time_period=None, default_time_period=None, default_ramp_up_time_period=None, completed_by_name=None):

op_spec = task_spec["operation"]
if isinstance(op_spec, str) and op_spec in ops:
@@ -1762,6 +1774,8 @@ def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterati
default_value=default_warmup_time_period),
time_period=self._r(task_spec, "time-period", error_ctx=op.name, mandatory=False,
default_value=default_time_period),
ramp_up_time_period=self._r(task_spec, "ramp-up-time-period", error_ctx=op.name,
mandatory=False, default_value=default_ramp_up_time_period),
clients=self._r(task_spec, "clients", error_ctx=op.name, mandatory=False, default_value=1),
completes_parent=(task_name == completed_by_name),
schedule=schedule,
@@ -1775,6 +1789,20 @@ def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterati
self._error(
"Operation '%s' in test_procedure '%s' defines a warmup time period of '%d' seconds and '%d' iterations. Please do not "
"mix time periods and iterations." % (op.name, test_procedure_name, task.warmup_time_period, task.iterations))

if (task.warmup_iterations is not None or task.iterations is not None) and task.ramp_up_time_period is not None:
self._error(f"Operation '{op.name}' in test_procedure '{test_procedure_name}' defines a ramp-up time period of "
f"{task.ramp_up_time_period} seconds as well as {task.warmup_iterations} warmup iterations and "
f"{task.iterations} iterations but mixing time periods and iterations is not allowed.")

if task.ramp_up_time_period is not None:
if task.warmup_time_period is None:
self._error(f"Operation '{op.name}' in test_procedure '{test_procedure_name}' defines a ramp-up time period of "
f"{task.ramp_up_time_period} seconds but no warmup-time-period.")
elif task.warmup_time_period < task.ramp_up_time_period:
self._error(f"The warmup-time-period of operation '{op.name}' in test_procedure '{test_procedure_name}' is "
f"{task.warmup_time_period} seconds but must be greater than or equal to the "
f"ramp-up-time-period of {task.ramp_up_time_period} seconds.")

return task

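The three checks added to parse_task compose into a small decision procedure. Below is a condensed sketch under the assumption that raising ValueError stands in for the real code's self._error reporting; the standalone function name is ours.

def validate_ramp_up(warmup_iterations, iterations, warmup_time_period, ramp_up_time_period):
    # No ramp-up requested: nothing to check.
    if ramp_up_time_period is None:
        return
    # Ramp-up is time-based, so it must not be combined with iteration counts.
    if warmup_iterations is not None or iterations is not None:
        raise ValueError("mixing time periods and iterations is not allowed")
    # The ramp-up must fit entirely inside the warmup phase.
    if warmup_time_period is None:
        raise ValueError("a ramp-up-time-period requires a warmup-time-period")
    if warmup_time_period < ramp_up_time_period:
        raise ValueError("warmup-time-period must be >= ramp-up-time-period")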
9 changes: 5 additions & 4 deletions osbenchmark/workload/workload.py
@@ -894,7 +894,7 @@ class Task:
IGNORE_RESPONSE_ERROR_LEVEL_WHITELIST = ["non-fatal"]

def __init__(self, name, operation, tags=None, meta_data=None, warmup_iterations=None, iterations=None,
warmup_time_period=None, time_period=None, clients=1, completes_parent=False, schedule=None, params=None):
warmup_time_period=None, time_period=None, ramp_up_time_period=None, clients=1, completes_parent=False, schedule=None, params=None):
self.name = name
self.operation = operation
if isinstance(tags, str):
@@ -908,6 +908,7 @@ def __init__(self, name, operation, tags=None, meta_data=None, warmup_iterations
self.iterations = iterations
self.warmup_time_period = warmup_time_period
self.time_period = time_period
self.ramp_up_time_period = ramp_up_time_period
self.clients = clients
self.completes_parent = completes_parent
self.schedule = schedule
@@ -988,15 +989,15 @@ def error_behavior(self, default_error_behavior):
def __hash__(self):
# Note that we do not include `params` in __hash__ and __eq__ (the other attributes suffice to uniquely define a task)
return hash(self.name) ^ hash(self.operation) ^ hash(self.warmup_iterations) ^ hash(self.iterations) ^ \
hash(self.warmup_time_period) ^ hash(self.time_period) ^ hash(self.clients) ^ hash(self.schedule) ^ \
hash(self.warmup_time_period) ^ hash(self.time_period) ^ hash(self.ramp_up_time_period) ^ hash(self.clients) ^ hash(self.schedule) ^ \
hash(self.completes_parent)

def __eq__(self, other):
# Note that we do not include `params` in __hash__ and __eq__ (the other attributes suffice to uniquely define a task)
return isinstance(other, type(self)) and (self.name, self.operation, self.warmup_iterations, self.iterations,
self.warmup_time_period, self.time_period, self.clients, self.schedule,
self.warmup_time_period, self.time_period, self.ramp_up_time_period, self.clients, self.schedule,
self.completes_parent) == (other.name, other.operation, other.warmup_iterations,
other.iterations, other.warmup_time_period, other.time_period,
other.iterations, other.warmup_time_period, other.time_period, other.ramp_up_time_period,
other.clients, other.schedule, other.completes_parent)

def __iter__(self):