Commit 17f8eb6
Added client processing time

Signed-off-by: saimedhi <[email protected]>
saimedhi committed Dec 27, 2023
1 parent c5b677e commit 17f8eb6
Showing 2 changed files with 54 additions and 38 deletions.
17 changes: 9 additions & 8 deletions osbenchmark/client.py
@@ -59,11 +59,11 @@ def request_end(self):

     @property
     def client_request_start(self):
-        return self.ctx["server_request_start"]
+        return self.ctx["client_request_start"]

     @property
     def client_request_end(self):
-        return self.ctx["server_request_end"]
+        return self.ctx["client_request_end"]

     async def __aexit__(self, exc_type, exc_val, exc_tb):
         # propagate earliest request start and most recent request end to parent
@@ -76,8 +76,8 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
         if self.token.old_value != contextvars.Token.MISSING:
             self.ctx_holder.update_request_start(request_start)
             self.ctx_holder.update_request_end(request_end)
-            self.ctx_holder.update_server_request_start(client_request_start)
-            self.ctx_holder.update_server_request_end(client_request_end)
+            self.ctx_holder.update_client_request_start(client_request_start)
+            self.ctx_holder.update_client_request_end(client_request_end)
         self.token = None
         return False

@@ -109,14 +109,15 @@ def update_request_start(cls, new_request_start):
         meta["request_start"] = new_request_start

     @classmethod
-    def update_client_request_start(cls, new_server_request_start):
+    def update_client_request_start(cls, new_client_request_start):
         meta = cls.request_context.get()
-        meta["server_request_start"] = new_server_request_start
+        if "request_start" not in meta:
+            meta["client_request_start"] = new_client_request_start

     @classmethod
-    def update_client_request_end(cls, new_server_request_end):
+    def update_client_request_end(cls, new_client_request_end):
         meta = cls.request_context.get()
-        meta["server_request_end"] = new_server_request_end
+        meta["client_request_end"] = new_client_request_end

     @classmethod
     def update_request_end(cls, new_request_end):
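Taken together, the client.py changes record a client-side start/end pair alongside the existing server-side request pair. Below is a minimal sketch of what that enables, assuming client processing time is the client-side span minus the server round trip; the derivation itself is not shown in this diff, the dict keys match the code above, and the numbers are made up:

    # Hypothetical request context populated by the timestamps above.
    ctx = {
        "client_request_start": 0.0000,  # client begins building the request
        "request_start": 0.0002,         # request goes on the wire
        "request_end": 0.0072,           # response received from the server
        "client_request_end": 0.0077,    # client finishes handling the response
    }

    # Assumed derivation: client-side span minus the server round trip.
    service_time = ctx["request_end"] - ctx["request_start"]            # ~0.007 s
    client_processing_time = (ctx["client_request_end"]
                              - ctx["client_request_start"]) - service_time  # ~0.0007 s

These example values mirror the 0.007 s service time and 0.0007 s client processing time used in the test samples below.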
75 changes: 45 additions & 30 deletions tests/worker_coordinator/worker_coordinator_test.py
@@ -276,6 +276,9 @@ def processing_time(self, absolute_time, relative_time, value):

     def latency(self, absolute_time, relative_time, value):
         return self.request_metric(absolute_time, relative_time, "latency", value)
+
+    def client_processing_time(self, absolute_time, relative_time, value):
+        return self.request_metric(absolute_time, relative_time, "client_processing_time", value)

     def request_metric(self, absolute_time, relative_time, name, value):
         return mock.call(name=name,
@@ -300,19 +303,19 @@ def test_all_samples(self, metrics_store):
         samples = [
             worker_coordinator.Sample(
                 0, 38598, 24, 0, task, metrics.SampleType.Normal,
-                None, 0.01, 0.007, 0.009, None, 5000, "docs", 1, 1 / 2),
+                None, 0.01, 0.0007, 0.007, 0.009, None, 5000, "docs", 1, 1 / 2),
             worker_coordinator.Sample(
                 0, 38599, 25, 0, task, metrics.SampleType.Normal,
-                None, 0.01, 0.007, 0.009, None, 5000, "docs", 2, 2 / 2),
+                None, 0.01, 0.0007, 0.007, 0.009, None, 5000, "docs", 2, 2 / 2),
         ]

         post_process(samples)

         calls = [
-            self.latency(38598, 24, 10.0), self.service_time(38598, 24, 7.0), self.processing_time(38598, 24, 9.0),
-            self.latency(38599, 25, 10.0), self.service_time(38599, 25, 7.0), self.processing_time(38599, 25, 9.0),
-            self.throughput(38598, 24, 5000),
-            self.throughput(38599, 25, 5000),
+            self.latency(38598, 24, 10.0), self.client_processing_time(38598, 24, 0.7), self.service_time(38598, 24, 7.0), self.processing_time(38598, 24, 9.0),
+            self.latency(38599, 25, 10.0), self.client_processing_time(38599, 25, 0.7), self.service_time(38599, 25, 7.0), self.processing_time(38599, 25, 9.0),
+            self.throughput(38598, 24, 5000.0),
+            self.throughput(38599, 25, 5000.0),
         ]
         metrics_store.put_value_cluster_level.assert_has_calls(calls)
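A note on units, inferred from the test data above rather than stated in the diff: Sample carries timings in seconds, while the asserted metric values are milliseconds.

    import math

    # Inferred seconds-to-milliseconds conversion between Sample values and
    # the asserted metric calls (0.0007 s -> 0.7 ms, 0.007 s -> 7.0 ms).
    assert math.isclose(0.0007 * 1000, 0.7)
    assert math.isclose(0.007 * 1000, 7.0)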

@@ -328,19 +331,20 @@ def test_downsamples(self, metrics_store):
         samples = [
             worker_coordinator.Sample(
                 0, 38598, 24, 0, task, metrics.SampleType.Normal,
-                None, 0.01, 0.007, 0.009, None, 5000, "docs", 1, 1 / 2),
+                None, 0.01, 0.0007, 0.007, 0.009, None, 5000, "docs", 1, 1 / 2),
             worker_coordinator.Sample(
                 0, 38599, 25, 0, task, metrics.SampleType.Normal,
-                None, 0.01, 0.007, 0.009, None, 5000, "docs", 2, 2 / 2),
+                None, 0.01, 0.0007, 0.007, 0.009, None, 5000, "docs", 2, 2 / 2),
         ]

         post_process(samples)

         calls = [
             # only the first out of two request samples is included, throughput metrics are still complete
-            self.latency(38598, 24, 10.0), self.service_time(38598, 24, 7.0), self.processing_time(38598, 24, 9.0),
-            self.throughput(38598, 24, 5000),
-            self.throughput(38599, 25, 5000),
+            self.latency(38598, 24, 10.0), self.client_processing_time(38598, 24, 0.7),
+            self.service_time(38598, 24, 7.0), self.processing_time(38598, 24, 9.0),
+            self.throughput(38598, 24, 5000.0),
+            self.throughput(38599, 25, 5000.0),
         ]
         metrics_store.put_value_cluster_level.assert_has_calls(calls)

@@ -355,7 +359,7 @@ def test_dependent_samples(self, metrics_store):
         samples = [
             worker_coordinator.Sample(
                 0, 38598, 24, 0, task, metrics.SampleType.Normal,
-                None, 0.01, 0.007, 0.009, None, 5000, "docs", 1, 1 / 2,
+                None, 0.01, 0.0007, 0.007, 0.009, None, 5000, "docs", 1, 1 / 2,
                 dependent_timing=[
                     {
                         "absolute_time": 38601,
@@ -377,11 +381,12 @@

         post_process(samples)

         calls = [
-            self.latency(38598, 24, 10.0), self.service_time(38598, 24, 7.0), self.processing_time(38598, 24, 9.0),
+            self.latency(38598, 24, 10.0), self.client_processing_time(38598, 24, 0.7),
+            self.service_time(38598, 24, 7.0), self.processing_time(38598, 24, 9.0),
             # dependent timings
             self.service_time(38601, 25, 50.0),
             self.service_time(38602, 26, 80.0),
-            self.throughput(38598, 24, 5000),
+            self.throughput(38598, 24, 5000.0),
         ]
         metrics_store.put_value_cluster_level.assert_has_calls(calls)

@@ -725,8 +730,8 @@ def test_different_sample_types(self):
         op = workload.Operation("index", workload.OperationType.Bulk, param_source="worker-coordinator-test-param-source")

         samples = [
-            worker_coordinator.Sample(0, 1470838595, 21, 0, op, metrics.SampleType.Warmup, None, -1, -1, -1, None, 3000, "docs", 1, 1),
-            worker_coordinator.Sample(0, 1470838595.5, 21.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 2500, "docs", 1, 1),
+            worker_coordinator.Sample(0, 1470838595, 21, 0, op, metrics.SampleType.Warmup, None, -1, -1, -1, -1, None, 3000, "docs", 1, 1),
+            worker_coordinator.Sample(0, 1470838595.5, 21.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 2500, "docs", 1, 1),
         ]

         aggregated = self.calculate_global_throughput(samples)
@@ -743,15 +748,15 @@ def test_single_metrics_aggregation(self):
         op = workload.Operation("index", workload.OperationType.Bulk, param_source="worker-coordinator-test-param-source")

         samples = [
-            worker_coordinator.Sample(0, 38595, 21, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 1, 1 / 9),
-            worker_coordinator.Sample(0, 38596, 22, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 2, 2 / 9),
-            worker_coordinator.Sample(0, 38597, 23, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 3, 3 / 9),
-            worker_coordinator.Sample(0, 38598, 24, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 4, 4 / 9),
-            worker_coordinator.Sample(0, 38599, 25, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 5, 5 / 9),
-            worker_coordinator.Sample(0, 38600, 26, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 6, 6 / 9),
-            worker_coordinator.Sample(1, 38598.5, 24.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 4.5, 7 / 9),
-            worker_coordinator.Sample(1, 38599.5, 25.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 5.5, 8 / 9),
-            worker_coordinator.Sample(1, 38600.5, 26.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 6.5, 9 / 9)
+            worker_coordinator.Sample(0, 38595, 21, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 1, 1 / 9),
+            worker_coordinator.Sample(0, 38596, 22, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 2, 2 / 9),
+            worker_coordinator.Sample(0, 38597, 23, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 3, 3 / 9),
+            worker_coordinator.Sample(0, 38598, 24, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 4, 4 / 9),
+            worker_coordinator.Sample(0, 38599, 25, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 5, 5 / 9),
+            worker_coordinator.Sample(0, 38600, 26, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 6, 6 / 9),
+            worker_coordinator.Sample(1, 38598.5, 24.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 4.5, 7 / 9),
+            worker_coordinator.Sample(1, 38599.5, 25.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 5.5, 8 / 9),
+            worker_coordinator.Sample(1, 38600.5, 26.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 6.5, 9 / 9)
         ]

         aggregated = self.calculate_global_throughput(samples)
@@ -774,9 +779,9 @@ def test_use_provided_throughput(self):
                                   param_source="worker-coordinator-test-param-source")

         samples = [
-            worker_coordinator.Sample(0, 38595, 21, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, 8000, 5000, "byte", 1, 1 / 3),
-            worker_coordinator.Sample(0, 38596, 22, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, 8000, 5000, "byte", 2, 2 / 3),
-            worker_coordinator.Sample(0, 38597, 23, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, 8000, 5000, "byte", 3, 3 / 3),
+            worker_coordinator.Sample(0, 38595, 21, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, 8000, 5000, "byte", 1, 1 / 3),
+            worker_coordinator.Sample(0, 38596, 22, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, 8000, 5000, "byte", 2, 2 / 3),
+            worker_coordinator.Sample(0, 38597, 23, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, 8000, 5000, "byte", 3, 3 / 3),
         ]

         aggregated = self.calculate_global_throughput(samples)
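Each Sample call in these throughput tests gains one extra positional slot for the new metric. Lining the test_all_samples values up against the asserted calls suggests the following layout for the request-metric arguments (an inference from the test data, with hypothetical slot names, not a documented signature):

    # Inferred layout of the Sample arguments following the sample type,
    # using the values from test_all_samples (slot names are assumptions):
    inferred_slots = {
        "request_meta_data": None,
        "latency": 0.01,                   # asserted as 10.0 ms
        "client_processing_time": 0.0007,  # the new slot, asserted as 0.7 ms
        "service_time": 0.007,             # asserted as 7.0 ms
        "processing_time": 0.009,          # asserted as 9.0 ms
        "throughput": None,                # 8000 in test_use_provided_throughput
        "total_ops": 5000,
        "total_ops_unit": "docs",
    }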
@@ -1102,6 +1107,14 @@ async def __aenter__(self):
     @property
     def request_start(self):
         return self.current_request_start
+
+    @property
+    def client_request_start(self):
+        return self.current_request_start - 0.0025
+
+    @property
+    def client_request_end(self):
+        return self.current_request_start + 0.0525

     @property
     def request_end(self):
@@ -1334,8 +1347,10 @@ def perform_request(*args, **kwargs):
            return as_future()

        opensearch.init_request_context.return_value = {
-           "request_start": 0,
-           "request_end": 10
+           "client_request_start": 0,
+           "request_start": 1,
+           "request_end": 10,
+           "client_request_end": 11,
        }
        # as this method is called several times we need to return a fresh instance every time as the previous
        # one has been "consumed".
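As a quick sanity check (hypothetical arithmetic on the mocked values, not part of the test), the mocked context implies a 9-unit server span inside an 11-unit client span, leaving 2 units of client-side work, consistent with the derivation sketched after the client.py diff:

    ctx = {"client_request_start": 0, "request_start": 1,
           "request_end": 10, "client_request_end": 11}

    service_time = ctx["request_end"] - ctx["request_start"]                # 9
    client_span = ctx["client_request_end"] - ctx["client_request_start"]   # 11
    client_processing_time = client_span - service_time                     # 2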
