Multiple search clients for automatic scaling
Signed-off-by: Finn Roblin <[email protected]>
finnroblin committed Aug 7, 2024
1 parent 61ec362 commit 94e555c
Showing 3 changed files with 107 additions and 12 deletions.
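In short: a schedule entry in a workload may now carry a clients_list; the loader expands that entry into one task per client count, and the results publisher then reports full operational statistics only for the variant that achieved the highest mean throughput, together with the winning client count. A minimal sketch of such a schedule entry, written as a Python dict in the style of the unit test below (names and values are illustrative, not taken from the commit):

    # Hypothetical schedule entry. The loader expands it into three tasks named
    # "<test-procedure-name>_1_clients", "<test-procedure-name>_2_clients" and
    # "<test-procedure-name>_8_clients", one per entry in clients_list.
    schedule_entry = {
        "name": "search-scaling",
        "operation": "search",
        "clients_list": [1, 2, 8]
    }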
54 changes: 46 additions & 8 deletions osbenchmark/results_publisher.py
@@ -27,6 +27,8 @@
 import io
 import logging
 import sys
+import re
+from enum import Enum

 import tabulate

@@ -43,6 +45,11 @@
 ------------------------------------------------------
 """

+class Throughput(Enum):
+    MEAN = "mean"
+    MAX = "max"
+    MIN = "min"
+    MEDIAN = "median"

 def summarize(results, cfg):
     SummaryResultsPublisher(results, cfg).publish()
@@ -127,6 +134,16 @@ def __init__(self, results, config):
             "latency": comma_separated_string_to_number_list(config.opts("workload", "latency.percentiles", mandatory=False))
         }

+    def publish_operational_statistics(self, metrics_table: list, warnings: list, record, task):
+        metrics_table.extend(self._publish_throughput(record, task))
+        metrics_table.extend(self._publish_latency(record, task))
+        metrics_table.extend(self._publish_service_time(record, task))
+        # this is mostly needed for debugging purposes but not so relevant to end users
+        if self.show_processing_time:
+            metrics_table.extend(self._publish_processing_time(record, task))
+        metrics_table.extend(self._publish_error_rate(record, task))
+        self.add_warnings(warnings, record, task)
+
     def publish(self):
         print_header(FINAL_SCORE)

@@ -145,16 +162,33 @@ def publish(self):

         metrics_table.extend(self._publish_transform_stats(stats))

+        max_throughput = -1
+        record_with_best_throughput = None
+
+        throughput_pattern = r"_(\d+)_clients$"
+
+
         for record in stats.op_metrics:
             task = record["task"]
-            metrics_table.extend(self._publish_throughput(record, task))
-            metrics_table.extend(self._publish_latency(record, task))
-            metrics_table.extend(self._publish_service_time(record, task))
-            # this is mostly needed for debugging purposes but not so relevant to end users
-            if self.show_processing_time:
-                metrics_table.extend(self._publish_processing_time(record, task))
-            metrics_table.extend(self._publish_error_rate(record, task))
-            self.add_warnings(warnings, record, task)
+            maybe_match_task_is_part_of_throughput_testing = re.search(throughput_pattern, task)
+            if maybe_match_task_is_part_of_throughput_testing:
+
+                # assumption: all units are the same and only maximizing throughput over one operation (i.e. not both ingest and search).
+                # To maximize throughput over multiple operations, would need a list/dictionary of maximum throughputs.
+                task_throughput = record["throughput"][Throughput.MEAN.value]
+                logger = logging.getLogger(__name__)
+                logger.info("Task %s has throughput %s", task, task_throughput)
+                if task_throughput > max_throughput:
+                    max_throughput = task_throughput
+                    record_with_best_throughput = record
+
+            else:
+                self.publish_operational_statistics(metrics_table=metrics_table, warnings=warnings, record=record, task=task)
+
+        if max_throughput != -1 and record_with_best_throughput is not None:
+            self.publish_operational_statistics(metrics_table=metrics_table, warnings=warnings, record=record_with_best_throughput,
+                                                task=record_with_best_throughput["task"])
+            metrics_table.extend(self._publish_best_client_settings(record_with_best_throughput, record_with_best_throughput["task"]))

         for record in stats.correctness_metrics:
             task = record["task"]
@@ -217,6 +251,10 @@ def _publish_recall(self, values, task):
             self._line("Mean recall@1", task, recall_1_mean, "", lambda v: "%.2f" % v)
         )

+    def _publish_best_client_settings(self, record, task):
+        num_clients = re.search(r"_(\d+)_clients$", task).group(1)
+        return self._join(self._line("Num clients that achieved maximium throughput", "", num_clients, ""))
+
     def _publish_percentiles(self, name, task, value, unit="ms"):
         lines = []
         percentiles = self.display_percentiles.get(name, metrics.GlobalStatsCalculator.OTHER_PERCENTILES)
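The selection logic in publish() above can be exercised on its own; here is a minimal, self-contained sketch in which the records are simplified stand-ins for stats.op_metrics entries, with illustrative values:

    import re

    # Simplified stand-ins for stats.op_metrics records (illustrative values).
    op_metrics = [
        {"task": "default-test_procedure_1_clients", "throughput": {"mean": 95.0}},
        {"task": "default-test_procedure_2_clients", "throughput": {"mean": 140.0}},
        {"task": "index-append", "throughput": {"mean": 5000.0}},  # no _N_clients suffix
    ]

    throughput_pattern = r"_(\d+)_clients$"
    max_throughput, best_record = -1, None
    for record in op_metrics:
        # Only tasks generated from a clients_list compete on mean throughput.
        if re.search(throughput_pattern, record["task"]):
            if record["throughput"]["mean"] > max_throughput:
                max_throughput = record["throughput"]["mean"]
                best_record = record

    # The winning variant's client count is recovered from its task name.
    num_clients = re.search(throughput_pattern, best_record["task"]).group(1)
    print(best_record["task"], max_throughput, num_clients)
    # -> default-test_procedure_2_clients 140.0 2

Tasks without the _N_clients suffix, such as "index-append" here, fall through to the normal per-task publishing path, consistent with the diff's comment about maximizing throughput over a single operation only.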
19 changes: 15 additions & 4 deletions osbenchmark/workload/loader.py
@@ -1596,11 +1596,22 @@ def _create_test_procedures(self, workload_spec):
            schedule = []

            for op in self._r(test_procedure_spec, "schedule", error_ctx=name):
-                if "parallel" in op:
-                    task = self.parse_parallel(op["parallel"], ops, name)
+                if "clients_list" in op:
+                    self.logger.info("Clients list specified, running multiple search tasks with %s clients.", op["clients_list"])
+                    for client in op["clients_list"]:
+                        op["clients"] = client
+
+                        new_name = name + "_" + str(client) + "_clients"
+                        new_task = self.parse_task(op, ops, new_name)
+                        new_task.name = new_name
+                        schedule.append(new_task)
                 else:
-                    task = self.parse_task(op, ops, name)
-                schedule.append(task)
+                    if "parallel" in op:
+                        task = self.parse_parallel(op["parallel"], ops, name)
+                    else:
+                        task = self.parse_task(op, ops, name)
+
+                    schedule.append(task)

            # verify we don't have any duplicate task names (which can be confusing / misleading in results_publishing).
            known_task_names = set()
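One subtlety worth noting: `name` in this loop is the enclosing test procedure's name, not the schedule entry's own "name" key, so expanded tasks are named after the procedure. That is why the unit test below expects "default-test_procedure_1_clients" rather than "search-one-client_1_clients". A stripped-down sketch of the expansion, using the diff's variable names but none of the loader machinery:

    # Sketch of the clients_list expansion (illustrative values).
    name = "default-test_procedure"  # the enclosing test procedure's name
    op = {"name": "search-one-client", "operation": "search", "clients_list": [1, 2, 3]}

    for client in op["clients_list"]:
        op["clients"] = client  # each expanded task gets its own client count
        new_name = name + "_" + str(client) + "_clients"
        print(new_name, op["clients"])
    # -> default-test_procedure_1_clients 1
    # -> default-test_procedure_2_clients 2
    # -> default-test_procedure_3_clients 3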
46 changes: 46 additions & 0 deletions tests/workload/loader_test.py
@@ -2477,6 +2477,52 @@ def test_parse_unique_task_names(self):
         self.assertEqual("search-two-clients", schedule[1].name)
         self.assertEqual("search", schedule[1].operation.name)

+    def test_parse_clients_list(self):
+        workload_specification = {
+            "description": "description for unit test",
+            "operations": [
+                {
+                    "name": "search",
+                    "operation-type": "search",
+                    "index": "_all"
+                }
+            ],
+            "test_procedure": {
+                "name": "default-test_procedure",
+                "schedule": [
+                    {
+                        "name": "search-one-client",
+                        "operation": "search",
+                        "clients": 1,
+                        "clients_list": [1,2,3]
+                    },
+                    {
+                        "name": "search-two-clients",
+                        "operation": "search",
+                        "clients": 2
+                    }
+                ]
+            }
+        }
+
+        reader = loader.WorkloadSpecificationReader(selected_test_procedure="default-test_procedure")
+        resulting_workload = reader("unittest", workload_specification, "/mappings")
+        self.assertEqual("unittest", resulting_workload.name)
+        test_procedure = resulting_workload.test_procedures[0]
+        self.assertTrue(test_procedure.selected)
+        schedule = test_procedure.schedule
+        self.assertEqual(4, len(schedule))
+
+        self.assertEqual("default-test_procedure_1_clients", schedule[0].name)
+        self.assertEqual("search", schedule[0].operation.name)
+        self.assertEqual("default-test_procedure_2_clients", schedule[1].name)
+        self.assertEqual("search", schedule[1].operation.name)
+        self.assertEqual("default-test_procedure_3_clients", schedule[2].name)
+        self.assertEqual("search", schedule[2].operation.name)
+
+        self.assertEqual("search-two-clients", schedule[3].name)
+        self.assertEqual("search", schedule[3].operation.name)
+
     def test_parse_indices_valid_workload_specification(self):
         workload_specification = {
             "description": "description for unit test",