diff --git a/osbenchmark/worker_coordinator/worker_coordinator.py b/osbenchmark/worker_coordinator/worker_coordinator.py index 8029a14fd..6d7c27475 100644 --- a/osbenchmark/worker_coordinator/worker_coordinator.py +++ b/osbenchmark/worker_coordinator/worker_coordinator.py @@ -1649,6 +1649,9 @@ async def __call__(self, *args, **kwargs): self.complete.set() +request_context_holder = client.RequestContextHolder() + + async def execute_single(runner, opensearch, params, on_error): """ Invokes the given runner once and provides the runner's return value in a uniform structure. @@ -1675,6 +1678,7 @@ async def execute_single(runner, opensearch, params, on_error): total_ops_unit = "ops" request_meta_data = {"success": True} except opensearchpy.TransportError as e: + request_context_holder.on_client_request_end() # we *specifically* want to distinguish connection refused (a node died?) from connection timeouts # pylint: disable=unidiomatic-typecheck if type(e) is opensearchpy.ConnectionError: @@ -1701,6 +1705,7 @@ async def execute_single(runner, opensearch, params, on_error): error_description = str(e.error) request_meta_data["error-description"] = error_description except KeyError as e: + request_context_holder.on_client_request_end() logging.getLogger(__name__).exception("Cannot execute runner [%s]; most likely due to missing parameters.", str(runner)) msg = "Cannot execute [%s]. Provided parameters are: %s. Error: [%s]." % (str(runner), list(params.keys()), str(e)) console.error(msg) diff --git a/tests/worker_coordinator/worker_coordinator_test.py b/tests/worker_coordinator/worker_coordinator_test.py index ef89e4ee1..240ce17a9 100644 --- a/tests/worker_coordinator/worker_coordinator_test.py +++ b/tests/worker_coordinator/worker_coordinator_test.py @@ -1578,8 +1578,9 @@ async def test_execute_single_dict(self): "success": True }, request_meta_data) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @run_async - async def test_execute_single_with_connection_error_always_aborts(self): + async def test_execute_single_with_connection_error_always_aborts(self, on_client_request_end): for on_error in ["abort", "continue"]: with self.subTest(): opensearch = None @@ -1593,8 +1594,9 @@ async def test_execute_single_with_connection_error_always_aborts(self): "Request returned an error. Error type: transport, Description: no route to host", ctx.exception.args[0]) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @run_async - async def test_execute_single_with_http_400_aborts_when_specified(self): + async def test_execute_single_with_http_400_aborts_when_specified(self, on_client_request_end): opensearch = None params = None runner = mock.Mock(side_effect= @@ -1606,9 +1608,9 @@ async def test_execute_single_with_http_400_aborts_when_specified(self): "Request returned an error. Error type: transport, Description: not found (the requested document could not be found)", ctx.exception.args[0]) - + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @run_async - async def test_execute_single_with_http_400(self): + async def test_execute_single_with_http_400(self, on_client_request_end): opensearch = None params = None runner = mock.Mock(side_effect= @@ -1626,8 +1628,9 @@ async def test_execute_single_with_http_400(self): "success": False }, request_meta_data) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @run_async - async def test_execute_single_with_http_413(self): + async def test_execute_single_with_http_413(self, on_client_request_end): opensearch = None params = None runner = mock.Mock(side_effect= @@ -1645,8 +1648,9 @@ async def test_execute_single_with_http_413(self): "success": False }, request_meta_data) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @run_async - async def test_execute_single_with_key_error(self): + async def test_execute_single_with_key_error(self, on_client_request_end): class FailingRunner: async def __call__(self, *args): raise KeyError("bulk-size missing")