diff --git a/osbenchmark/client.py b/osbenchmark/client.py index cc3fbc152..19abfe35f 100644 --- a/osbenchmark/client.py +++ b/osbenchmark/client.py @@ -59,11 +59,11 @@ def request_end(self): @property def client_request_start(self): - return self.ctx["server_request_start"] + return self.ctx["client_request_start"] @property def client_request_end(self): - return self.ctx["server_request_end"] + return self.ctx["client_request_end"] async def __aexit__(self, exc_type, exc_val, exc_tb): # propagate earliest request start and most recent request end to parent @@ -76,8 +76,8 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): if self.token.old_value != contextvars.Token.MISSING: self.ctx_holder.update_request_start(request_start) self.ctx_holder.update_request_end(request_end) - self.ctx_holder.update_server_request_start(client_request_start) - self.ctx_holder.update_server_request_end(client_request_end) + self.ctx_holder.update_client_request_start(client_request_start) + self.ctx_holder.update_client_request_end(client_request_end) self.token = None return False @@ -109,14 +109,15 @@ def update_request_start(cls, new_request_start): meta["request_start"] = new_request_start @classmethod - def update_client_request_start(cls, new_server_request_start): + def update_client_request_start(cls, new_client_request_start): meta = cls.request_context.get() - meta["server_request_start"] = new_server_request_start + if "request_start" not in meta: + meta["client_request_start"] = new_client_request_start @classmethod - def update_client_request_end(cls, new_server_request_end): + def update_client_request_end(cls, new_client_request_end): meta = cls.request_context.get() - meta["server_request_end"] = new_server_request_end + meta["client_request_end"] = new_client_request_end @classmethod def update_request_end(cls, new_request_end): diff --git a/tests/worker_coordinator/worker_coordinator_test.py b/tests/worker_coordinator/worker_coordinator_test.py index 31391f3c4..ea471dd2a 100644 --- a/tests/worker_coordinator/worker_coordinator_test.py +++ b/tests/worker_coordinator/worker_coordinator_test.py @@ -276,6 +276,9 @@ def processing_time(self, absolute_time, relative_time, value): def latency(self, absolute_time, relative_time, value): return self.request_metric(absolute_time, relative_time, "latency", value) + + def client_processing_time(self, absolute_time, relative_time, value): + return self.request_metric(absolute_time, relative_time, "client_processing_time", value) def request_metric(self, absolute_time, relative_time, name, value): return mock.call(name=name, @@ -300,19 +303,19 @@ def test_all_samples(self, metrics_store): samples = [ worker_coordinator.Sample( 0, 38598, 24, 0, task, metrics.SampleType.Normal, - None, 0.01, 0.007, 0.009, None, 5000, "docs", 1, 1 / 2), + None, 0.01, 0.0007, 0.007, 0.009, None, 5000, "docs", 1, 1 / 2), worker_coordinator.Sample( 0, 38599, 25, 0, task, metrics.SampleType.Normal, - None, 0.01, 0.007, 0.009, None, 5000, "docs", 2, 2 / 2), + None, 0.01, 0.0007, 0.007, 0.009, None, 5000, "docs", 2, 2 / 2), ] post_process(samples) calls = [ - self.latency(38598, 24, 10.0), self.service_time(38598, 24, 7.0), self.processing_time(38598, 24, 9.0), - self.latency(38599, 25, 10.0), self.service_time(38599, 25, 7.0), self.processing_time(38599, 25, 9.0), - self.throughput(38598, 24, 5000), - self.throughput(38599, 25, 5000), + self.latency(38598, 24, 10.0), self.client_processing_time(38598, 24, 0.7), self.service_time(38598, 24, 7.0), self.processing_time(38598, 24, 9.0), + self.latency(38599, 25, 10.0), self.client_processing_time(38599, 25, 0.7), self.service_time(38599, 25, 7.0), self.processing_time(38599, 25, 9.0), + self.throughput(38598, 24, 5000.0), + self.throughput(38599, 25, 5000.0), ] metrics_store.put_value_cluster_level.assert_has_calls(calls) @@ -328,19 +331,20 @@ def test_downsamples(self, metrics_store): samples = [ worker_coordinator.Sample( 0, 38598, 24, 0, task, metrics.SampleType.Normal, - None, 0.01, 0.007, 0.009, None, 5000, "docs", 1, 1 / 2), + None, 0.01, 0.0007, 0.007, 0.009, None, 5000, "docs", 1, 1 / 2), worker_coordinator.Sample( 0, 38599, 25, 0, task, metrics.SampleType.Normal, - None, 0.01, 0.007, 0.009, None, 5000, "docs", 2, 2 / 2), + None, 0.01, 0.0007, 0.007, 0.009, None, 5000, "docs", 2, 2 / 2), ] post_process(samples) calls = [ # only the first out of two request samples is included, throughput metrics are still complete - self.latency(38598, 24, 10.0), self.service_time(38598, 24, 7.0), self.processing_time(38598, 24, 9.0), - self.throughput(38598, 24, 5000), - self.throughput(38599, 25, 5000), + self.latency(38598, 24, 10.0), self.client_processing_time(38598, 24, 0.7), + self.service_time(38598, 24, 7.0), self.processing_time(38598, 24, 9.0), + self.throughput(38598, 24, 5000.0), + self.throughput(38599, 25, 5000.0), ] metrics_store.put_value_cluster_level.assert_has_calls(calls) @@ -355,7 +359,7 @@ def test_dependent_samples(self, metrics_store): samples = [ worker_coordinator.Sample( 0, 38598, 24, 0, task, metrics.SampleType.Normal, - None, 0.01, 0.007, 0.009, None, 5000, "docs", 1, 1 / 2, + None, 0.01, 0.0007, 0.007, 0.009, None, 5000, "docs", 1, 1 / 2, dependent_timing=[ { "absolute_time": 38601, @@ -377,11 +381,12 @@ def test_dependent_samples(self, metrics_store): post_process(samples) calls = [ - self.latency(38598, 24, 10.0), self.service_time(38598, 24, 7.0), self.processing_time(38598, 24, 9.0), + self.latency(38598, 24, 10.0), self.client_processing_time(38598, 24, 0.7), + self.service_time(38598, 24, 7.0), self.processing_time(38598, 24, 9.0), # dependent timings self.service_time(38601, 25, 50.0), self.service_time(38602, 26, 80.0), - self.throughput(38598, 24, 5000), + self.throughput(38598, 24, 5000.0), ] metrics_store.put_value_cluster_level.assert_has_calls(calls) @@ -725,8 +730,8 @@ def test_different_sample_types(self): op = workload.Operation("index", workload.OperationType.Bulk, param_source="worker-coordinator-test-param-source") samples = [ - worker_coordinator.Sample(0, 1470838595, 21, 0, op, metrics.SampleType.Warmup, None, -1, -1, -1, None, 3000, "docs", 1, 1), - worker_coordinator.Sample(0, 1470838595.5, 21.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 2500, "docs", 1, 1), + worker_coordinator.Sample(0, 1470838595, 21, 0, op, metrics.SampleType.Warmup, None, -1, -1, -1, -1, None, 3000, "docs", 1, 1), + worker_coordinator.Sample(0, 1470838595.5, 21.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 2500, "docs", 1, 1), ] aggregated = self.calculate_global_throughput(samples) @@ -743,15 +748,15 @@ def test_single_metrics_aggregation(self): op = workload.Operation("index", workload.OperationType.Bulk, param_source="worker-coordinator-test-param-source") samples = [ - worker_coordinator.Sample(0, 38595, 21, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 1, 1 / 9), - worker_coordinator.Sample(0, 38596, 22, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 2, 2 / 9), - worker_coordinator.Sample(0, 38597, 23, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 3, 3 / 9), - worker_coordinator.Sample(0, 38598, 24, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 4, 4 / 9), - worker_coordinator.Sample(0, 38599, 25, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 5, 5 / 9), - worker_coordinator.Sample(0, 38600, 26, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 6, 6 / 9), - worker_coordinator.Sample(1, 38598.5, 24.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 4.5, 7 / 9), - worker_coordinator.Sample(1, 38599.5, 25.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 5.5, 8 / 9), - worker_coordinator.Sample(1, 38600.5, 26.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, None, 5000, "docs", 6.5, 9 / 9) + worker_coordinator.Sample(0, 38595, 21, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 1, 1 / 9), + worker_coordinator.Sample(0, 38596, 22, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 2, 2 / 9), + worker_coordinator.Sample(0, 38597, 23, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 3, 3 / 9), + worker_coordinator.Sample(0, 38598, 24, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 4, 4 / 9), + worker_coordinator.Sample(0, 38599, 25, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 5, 5 / 9), + worker_coordinator.Sample(0, 38600, 26, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 6, 6 / 9), + worker_coordinator.Sample(1, 38598.5, 24.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 4.5, 7 / 9), + worker_coordinator.Sample(1, 38599.5, 25.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 5.5, 8 / 9), + worker_coordinator.Sample(1, 38600.5, 26.5, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, None, 5000, "docs", 6.5, 9 / 9) ] aggregated = self.calculate_global_throughput(samples) @@ -774,9 +779,9 @@ def test_use_provided_throughput(self): param_source="worker-coordinator-test-param-source") samples = [ - worker_coordinator.Sample(0, 38595, 21, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, 8000, 5000, "byte", 1, 1 / 3), - worker_coordinator.Sample(0, 38596, 22, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, 8000, 5000, "byte", 2, 2 / 3), - worker_coordinator.Sample(0, 38597, 23, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, 8000, 5000, "byte", 3, 3 / 3), + worker_coordinator.Sample(0, 38595, 21, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, 8000, 5000, "byte", 1, 1 / 3), + worker_coordinator.Sample(0, 38596, 22, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, 8000, 5000, "byte", 2, 2 / 3), + worker_coordinator.Sample(0, 38597, 23, 0, op, metrics.SampleType.Normal, None, -1, -1, -1, -1, 8000, 5000, "byte", 3, 3 / 3), ] aggregated = self.calculate_global_throughput(samples) @@ -1102,6 +1107,14 @@ async def __aenter__(self): @property def request_start(self): return self.current_request_start + + @property + def client_request_start(self): + return self.current_request_start - 0.0025 + + @property + def client_request_end(self): + return self.current_request_start + 0.0525 @property def request_end(self): @@ -1334,8 +1347,10 @@ def perform_request(*args, **kwargs): return as_future() opensearch.init_request_context.return_value = { - "request_start": 0, - "request_end": 10 + "client_request_start": 0, + "request_start": 1, + "request_end": 10, + "client_request_end": 11, } # as this method is called several times we need to return a fresh instance every time as the previous # one has been "consumed".