Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add client ramp up feature. #716

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/python
{
"name": "Python 3",
Comment on lines +1 to +4
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For users unfamiliar with dev cointainers, might be good to add some comments at the top for what scenarios and how this might be used?

// Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
"image": "mcr.microsoft.com/devcontainers/python:1-3.11-bullseye"

// Features to add to the dev container. More info: https://containers.dev/features.
// "features": {},

// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],

// Use 'postCreateCommand' to run commands after the container is created.
// "postCreateCommand": "pip3 install --user -r requirements.txt",

// Configure tool-specific properties.
// "customizations": {},

// Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
// "remoteUser": "root"
}
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ indent-after-paren=4
indent-string=' '

# Maximum number of characters on a single line.
max-line-length=140
max-line-length=180

# Maximum number of lines in a module.
max-module-lines=1000
Expand Down
13 changes: 13 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
opensearch-benchmark = {file = ".", editable = true}

[dev-packages]

[requires]
python_version = "3.11"
python_full_version = "3.11.11"
1,392 changes: 1,392 additions & 0 deletions Pipfile.lock

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions osbenchmark/resources/workload-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
"type": "integer",
"minimum": 1
},
"ramp-up-time-period": {
"type": "integer",
"minimum": 0,
"description": "Defines the time period in seconds to gradually increase the number of clients."
},
Comment on lines +31 to +35
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the OSB workloads also need to be updated to accommodate this change?

"warmup-time-period": {
"type": "integer",
"minimum": 0,
Expand Down Expand Up @@ -75,6 +80,11 @@
"minimum": 1,
"description": "Defines the number of times to run the operation."
},
"ramp-up-time-period": {
"type": "integer",
"minimum": 0,
"description": "Defines the time period in seconds to gradually increase the number of clients."
},
"warmup-time-period": {
"type": "integer",
"minimum": 0,
Expand Down Expand Up @@ -146,6 +156,11 @@
"minimum": 1,
"description": "Defines the number of times to run the operation."
},
"ramp-up-time-period": {
"type": "integer",
"minimum": 0,
"description": "Defines the time period in seconds to gradually increase the number of clients."
},
"warmup-time-period": {
"type": "integer",
"minimum": 0,
Expand Down
453 changes: 303 additions & 150 deletions osbenchmark/worker_coordinator/worker_coordinator.py

Large diffs are not rendered by default.

650 changes: 422 additions & 228 deletions osbenchmark/workload/loader.py

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions osbenchmark/workload/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,7 @@ class Task:
IGNORE_RESPONSE_ERROR_LEVEL_WHITELIST = ["non-fatal"]

def __init__(self, name, operation, tags=None, meta_data=None, warmup_iterations=None, iterations=None,
warmup_time_period=None, time_period=None, clients=1, completes_parent=False, schedule=None, params=None):
warmup_time_period=None, time_period=None, ramp_up_time_period=None, clients=1, completes_parent=False, schedule=None, params=None):
self.name = name
self.operation = operation
if isinstance(tags, str):
Expand All @@ -908,6 +908,7 @@ def __init__(self, name, operation, tags=None, meta_data=None, warmup_iterations
self.iterations = iterations
self.warmup_time_period = warmup_time_period
self.time_period = time_period
self.ramp_up_time_period = ramp_up_time_period
self.clients = clients
self.completes_parent = completes_parent
self.schedule = schedule
Expand Down Expand Up @@ -988,15 +989,15 @@ def error_behavior(self, default_error_behavior):
def __hash__(self):
# Note that we do not include `params` in __hash__ and __eq__ (the other attributes suffice to uniquely define a task)
return hash(self.name) ^ hash(self.operation) ^ hash(self.warmup_iterations) ^ hash(self.iterations) ^ \
hash(self.warmup_time_period) ^ hash(self.time_period) ^ hash(self.clients) ^ hash(self.schedule) ^ \
hash(self.warmup_time_period) ^ hash(self.time_period) ^ hash(self.ramp_up_time_period) ^ hash(self.clients) ^ hash(self.schedule) ^ \
hash(self.completes_parent)

def __eq__(self, other):
# Note that we do not include `params` in __hash__ and __eq__ (the other attributes suffice to uniquely define a task)
return isinstance(other, type(self)) and (self.name, self.operation, self.warmup_iterations, self.iterations,
self.warmup_time_period, self.time_period, self.clients, self.schedule,
self.warmup_time_period, self.time_period, self.ramp_up_time_period, self.clients, self.schedule,
self.completes_parent) == (other.name, other.operation, other.warmup_iterations,
other.iterations, other.warmup_time_period, other.time_period,
other.iterations, other.warmup_time_period, other.time_period, other.ramp_up_time_period,
other.clients, other.schedule, other.completes_parent)

def __iter__(self):
Expand Down
16 changes: 14 additions & 2 deletions tests/worker_coordinator/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3435,9 +3435,15 @@ def params(self):

runner.register_runner(operation_type=workload.OperationType.VectorSearch, runner=runner.Query(), async_runner=True)
param_source = workload.operation_parameters(test_workload, task)
task_allocation = worker_coordinator.TaskAllocation(
task=task,
client_index_in_task=0,
global_client_index=0,
total_clients=task.clients
)
# pylint: disable=C0415
import threading
schedule = worker_coordinator.schedule_for(task, 0, param_source)
schedule = worker_coordinator.schedule_for(task_allocation, param_source)
# pylint: disable=C0415
def create_config():
cfg = config.Config()
Expand Down Expand Up @@ -3570,9 +3576,15 @@ def params(self):

runner.register_runner(operation_type=workload.OperationType.VectorSearch, runner=runner.Query(), async_runner=True)
param_source = workload.operation_parameters(test_workload, task)
task_allocation = worker_coordinator.TaskAllocation(
task=task,
client_index_in_task=0,
global_client_index=0,
total_clients=task.clients
)
# pylint: disable=C0415
import threading
schedule = worker_coordinator.schedule_for(task, 0, param_source)
schedule = worker_coordinator.schedule_for(task_allocation, param_source)
def create_config():
cfg = config.Config()
cfg.add(config.Scope.application, "system", "available.cores", 8)
Expand Down
Loading
Loading