Add support for custom iteration values #696

Merged · 5 commits · Dec 5, 2024
Changes from 3 commits
34 changes: 22 additions & 12 deletions osbenchmark/aggregator.py
@@ -13,23 +13,31 @@ def __init__(self, cfg, test_executions_dict, args):
        self.args = args
        self.test_executions = test_executions_dict
        self.accumulated_results: Dict[str, Dict[str, List[Any]]] = {}
-       self.accumulated_iterations: Dict[str, int] = {}
+       self.accumulated_iterations: Dict[str, Dict[str, int]] = {}
        self.metrics = ["throughput", "latency", "service_time", "client_processing_time", "processing_time", "error_rate", "duration"]
        self.test_store = metrics.test_execution_store(self.config)
        self.cwd = cfg.opts("node", "benchmark.cwd")

-   def count_iterations_for_each_op(self) -> None:
+   def count_iterations_for_each_op(self, test_execution) -> None:
        loaded_workload = workload.load_workload(self.config)
IanHoang (Collaborator) commented on Dec 2, 2024:
Seeing if we can improve this method:

  • Saw that we are loading the workload each time we pass in a test execution. Could we load the workload once and reuse it?
  • Saw that this method has some steps that check whether the test procedure is valid. It might be better to move those into the designated method, test_execution_compatibility_check.

OVI3D0 (Member Author) replied:

To your first point: good call. Now that the workload is loaded just once, this should be notably faster for larger aggregations.

Also removed that extra validation logic 👍

        test_procedure_name = self.config.opts("workload", "test_procedure.name")
IanHoang (Collaborator) commented:
Should we initialize this as an instance variable so that it can be used throughout the Aggregator class? I see several places where the aggregator has to fetch the test procedure name.

OVI3D0 (Member Author) replied:
Yeah. For clarity I also renamed test_procedure in the build_aggregated_results function, since it's not just the string name of the test procedure but a test_procedure object whose other attributes are needed there.
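Both suggestions point at the same refactor. A minimal sketch of what it might look like, assuming the workload and the test procedure name are cached in __init__ (the actual change lands in a later commit outside this three-commit view):

    class Aggregator:
        def __init__(self, cfg, test_executions_dict, args):
            ...
            # Hypothetical: load the workload once and cache the test procedure
            # name, instead of re-fetching both for every test execution.
            self.loaded_workload = workload.load_workload(self.config)
            self.test_procedure_name = self.config.opts("workload", "test_procedure.name")

        def count_iterations_for_each_op(self, test_execution) -> None:
            # Reuse the cached workload and name rather than reloading them here.
            for test_procedure in self.loaded_workload.test_procedures:
                if test_procedure.name == self.test_procedure_name:
                    ...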

        test_procedure_found = False
+       workload_params = {} if getattr(test_execution, 'workload_params') is None else test_execution.workload_params
OVI3D0 marked this conversation as resolved.

+       test_execution_id = test_execution.test_execution_id
+       self.accumulated_iterations[test_execution_id] = {}

        for test_procedure in loaded_workload.test_procedures:
            if test_procedure.name == test_procedure_name:
                test_procedure_found = True
IanHoang (Collaborator) commented on Dec 2, 2024:
Super nit: since loaded_workload.test_procedures is a list, we can reduce these three lines to:

    if test_procedure_name in loaded_workload.test_procedures:
        # Count iterations in each operation
    else:
        # raise value error

That way we can also remove the need for test_procedure_found.

OVI3D0 (Member Author) replied:

Hmm, I don't think so, because loaded_workload.test_procedures returns a list of test_procedure objects, so I'd have to match test_procedure_name against the name attribute of each of those objects.
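A small illustration of the point, using a stand-in class for the test procedure objects (names here are hypothetical):

    # Membership tests compare the objects themselves, not their .name attributes.
    class TestProcedure:
        def __init__(self, name):
            self.name = name

    procedures = [TestProcedure("default"), TestProcedure("append-only")]
    print("default" in procedures)                          # False: string vs. objects
    print(any(tp.name == "default" for tp in procedures))   # True: compares names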

OVI3D0 (Member Author) added:
Although I could use Python's next() to search for the first object whose name attribute matches the test_procedure_name. Something like:

    matching_test_procedure = next((tp for tp in loaded_workload.test_procedures if tp.name == self.test_procedure_name), None)
    if matching_test_procedure:
        # count iterations
    else:
        # raise value error

                for task in test_procedure.schedule:
                    task_name = task.name
-                   iterations = task.iterations or 1
-                   self.accumulated_iterations[task_name] = self.accumulated_iterations.get(task_name, 0) + iterations
+                   custom_key = f"{task_name}_iterations"
OVI3D0 marked this conversation as resolved.
+                   if custom_key in workload_params:
+                       iterations = int(workload_params[custom_key])
+                   else:
+                       iterations = task.iterations or 1
+                   self.accumulated_iterations[test_execution_id][task_name] = iterations
            else:
                continue  # skip to the next test procedure if the name doesn't match
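With this change, a task's iteration count can be overridden per test execution through its workload parameters, using a "<task_name>_iterations" key. A minimal sketch of the lookup in isolation, with an illustrative task name and value (not taken from this PR):

    # Hypothetical workload_params for an execution whose schedule has a task "op1".
    workload_params = {"op1_iterations": "10"}

    task_name = "op1"
    custom_key = f"{task_name}_iterations"  # -> "op1_iterations"
    # Fall back to the task's own iteration count (here 1) when no override is given.
    iterations = int(workload_params[custom_key]) if custom_key in workload_params else 1
    print(iterations)  # 10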

@@ -119,8 +127,7 @@ def build_aggregated_results(self):
        }

        for task, task_metrics in self.accumulated_results.items():
-           iterations = self.accumulated_iterations.get(task, 1)
-           aggregated_task_metrics = self.calculate_weighted_average(task_metrics, iterations)
+           aggregated_task_metrics = self.calculate_weighted_average(task_metrics, task)
            op_metric = {
                "task": task,
                "operation": task,
@@ -191,10 +198,13 @@ def build_aggregated_results(self):

        return test_execution

-   def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], iterations: int) -> Dict[str, Any]:
+   def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], task_name: str) -> Dict[str, Any]:
        weighted_metrics = {}
-       num_executions = len(next(iter(task_metrics.values())))
-       total_iterations = iterations * num_executions
+
+       # Get iterations for each test execution
+       iterations_per_execution = [self.accumulated_iterations[test_id][task_name]
+                                   for test_id in self.test_executions.keys()]
+       total_iterations = sum(iterations_per_execution)

        for metric, values in task_metrics.items():
            if isinstance(values[0], dict):
@@ -209,10 +219,10 @@ def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], iterations: int) -> Dict[str, Any]:
                else:
                    # for items like median or containing percentile values
                    item_values = [value.get(metric_field, 0) for value in values]
-                   weighted_sum = sum(value * iterations for value in item_values)
+                   weighted_sum = sum(value * iterations for value, iterations in zip(item_values, iterations_per_execution))
                    weighted_metrics[metric][metric_field] = weighted_sum / total_iterations
            else:
-               weighted_sum = sum(value * iterations for value in values)
+               weighted_sum = sum(value * iterations for value, iterations in zip(values, iterations_per_execution))
                weighted_metrics[metric] = weighted_sum / total_iterations

        return weighted_metrics
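A worked example of the new per-execution weighting, using the numbers from the updated test_calculate_weighted_average below (two executions of op1 with 2 and 3 iterations and throughputs 100 and 200):

    iterations_per_execution = [2, 3]   # op1 iterations in test1 and test2
    values = [100, 200]                 # per-execution throughput
    total_iterations = sum(iterations_per_execution)  # 5

    # 100*2 + 200*3 = 800; 800 / 5 = 160.0, matching the test's expectation
    weighted_sum = sum(v * i for v, i in zip(values, iterations_per_execution))
    print(weighted_sum / total_iterations)  # 160.0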
@@ -256,7 +266,7 @@ def aggregate(self) -> None:
            if test_execution:
                self.config.add(config.Scope.applicationOverride, "workload", "repository.name", self.args.workload_repository)
                self.config.add(config.Scope.applicationOverride, "workload", "workload.name", test_execution.workload)
-               self.count_iterations_for_each_op()
+               self.count_iterations_for_each_op(test_execution)
                self.accumulate_results(test_execution)

        aggregated_results = self.build_aggregated_results()
47 changes: 26 additions & 21 deletions tests/aggregator_test.py
@@ -1,4 +1,4 @@
-from unittest.mock import Mock, patch, mock_open
+from unittest.mock import Mock, patch
import pytest
from osbenchmark import config
from osbenchmark.aggregator import Aggregator, AggregatedResults
@@ -50,15 +50,17 @@ def test_count_iterations_for_each_op(aggregator):
    mock_test_procedure.schedule = mock_schedule
    mock_workload.test_procedures = [mock_test_procedure]

-   # Update the config mock to return the correct test_procedure_name
+   mock_test_execution = Mock(test_execution_id="test1", workload_params={})
+
+   # update the config mock to return the correct test_procedure_name
    aggregator.config.opts.side_effect = lambda *args: \
        mock_test_procedure.name if args == ("workload", "test_procedure.name") else "/path/to/root"
    with patch('osbenchmark.workload.load_workload', return_value=mock_workload):
-       aggregator.count_iterations_for_each_op()
+       aggregator.count_iterations_for_each_op(mock_test_execution)

-   print(f"accumulated_iterations: {aggregator.accumulated_iterations}")  # Debug print
-   assert "op1" in aggregator.accumulated_iterations, "op1 not found in accumulated_iterations"
-   assert aggregator.accumulated_iterations["op1"] == 5
+   assert "test1" in aggregator.accumulated_iterations, "test1 not found in accumulated_iterations"
+   assert "op1" in aggregator.accumulated_iterations["test1"], "op1 not found in accumulated_iterations for test1"
+   assert aggregator.accumulated_iterations["test1"]["op1"] == 5

def test_accumulate_results(aggregator):
    mock_test_execution = Mock()
@@ -103,12 +105,19 @@ def test_calculate_weighted_average(aggregator):
"throughput": [100, 200],
"latency": [{"avg": 10, "unit": "ms"}, {"avg": 20, "unit": "ms"}]
}
iterations = 2
task_name = "op1"

# set up accumulated_iterations
aggregator.accumulated_iterations = {
"test1": {"op1": 2},
"test2": {"op1": 3}
}
aggregator.test_executions = {"test1": Mock(), "test2": Mock()}

result = aggregator.calculate_weighted_average(task_metrics, iterations)
result = aggregator.calculate_weighted_average(task_metrics, task_name)

assert result["throughput"] == 150
assert result["latency"]["avg"] == 15
assert result["throughput"] == 160 # (100*2 + 200*3) / (2+3)
assert result["latency"]["avg"] == 16 # (10*2 + 20*3) / (2+3)
assert result["latency"]["unit"] == "ms"

def test_calculate_rsd(aggregator):
@@ -134,26 +143,22 @@ def test_aggregate(aggregator):
        patch.object(aggregator, 'count_iterations_for_each_op'), \
        patch.object(aggregator, 'accumulate_results'), \
        patch.object(aggregator, 'build_aggregated_results', return_value=mock_aggregated_results) as mock_build, \
-       patch('osbenchmark.aggregator.FileTestExecutionStore') as mock_store_class, \
-       patch('osbenchmark.utils.io.ensure_dir') as mock_ensure_dir, \
-       patch('builtins.open', mock_open()) as mock_file:
+       patch('osbenchmark.aggregator.FileTestExecutionStore') as mock_store_class:

        mock_store = mock_store_class.return_value
        mock_store.store_aggregated_execution.side_effect = lambda x: print(f"Storing aggregated execution: {x}")

-       aggregator.aggregate()
+       # mock test_store to return a Mock object for each test execution
+       aggregator.test_store.find_by_test_execution_id.side_effect = [
+           Mock(test_execution_id="test1", workload_params={}),
+           Mock(test_execution_id="test2", workload_params={})
+       ]

-       print(f"mock_build called: {mock_build.called}")
-       print(f"mock_store.store_aggregated_execution called: {mock_store.store_aggregated_execution.called}")
+       aggregator.aggregate()

        assert mock_build.called, "build_aggregated_results was not called"
+       mock_store.store_aggregated_execution.assert_called_once_with(mock_aggregated_results)

-       print(f"ensure_dir called: {mock_ensure_dir.called}")
-       print(f"ensure_dir call args: {mock_ensure_dir.call_args_list}")
-       print(f"open called: {mock_file.called}")
-       print(f"open call args: {mock_file.call_args_list}")
-
-       assert mock_store.store_aggregated_execution.called, "store_aggregated_execution was not called"
def test_aggregated_results():