From 78c734c3667b41e3042232b6b12cfbb758fdb4aa Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 6 Nov 2023 21:16:21 +0100 Subject: [PATCH] Add argument to enable/disable endpoint error handling in benchmarks (#1007) Some transports (e.g., `sm`) do not support endpoint error handling. To be able to benchmark them, add a new argument to benchmarks to enable or disable endpoint error handling. Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - Lawrence Mitchell (https://github.com/wence-) URL: https://github.com/rapidsai/ucx-py/pull/1007 --- ucp/benchmarks/backends/ucp_async.py | 12 ++++++++++-- ucp/benchmarks/backends/ucp_core.py | 4 ++-- ucp/benchmarks/send_recv.py | 6 ++++++ 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/ucp/benchmarks/backends/ucp_async.py b/ucp/benchmarks/backends/ucp_async.py index e28d8f17d..6205458f8 100644 --- a/ucp/benchmarks/backends/ucp_async.py +++ b/ucp/benchmarks/backends/ucp_async.py @@ -81,7 +81,11 @@ async def server_handler(ep): await ep.close() lf.close() - lf = ucp.create_listener(server_handler, port=self.args.port) + lf = ucp.create_listener( + server_handler, + port=self.args.port, + endpoint_error_handling=self.args.error_handling, + ) self.queue.put(lf.port) while not lf.closed(): @@ -114,7 +118,11 @@ async def run(self): register_am_allocators(self.args) - ep = await ucp.create_endpoint(self.server_address, self.port) + ep = await ucp.create_endpoint( + self.server_address, + self.port, + endpoint_error_handling=self.args.error_handling, + ) if self.args.enable_am: msg = xp.arange(self.args.n_bytes, dtype="u1") diff --git a/ucp/benchmarks/backends/ucp_core.py b/ucp/benchmarks/backends/ucp_core.py index 30f0393b4..1360602b5 100644 --- a/ucp/benchmarks/backends/ucp_core.py +++ b/ucp/benchmarks/backends/ucp_core.py @@ -125,7 +125,7 @@ def _listener_handler(conn_request, msg): self.ep = ucx_api.UCXEndpoint.create_from_conn_request( worker, conn_request, - endpoint_error_handling=True, + endpoint_error_handling=self.args.error_handling, ) # Wireup before starting to transfer data @@ -229,7 +229,7 @@ def run(self): worker, self.server_address, self.port, - endpoint_error_handling=True, + endpoint_error_handling=self.args.error_handling, ) send_msg = xp.arange(self.args.n_bytes, dtype="u1") diff --git a/ucp/benchmarks/send_recv.py b/ucp/benchmarks/send_recv.py index 7a65c3436..3dc167ded 100644 --- a/ucp/benchmarks/send_recv.py +++ b/ucp/benchmarks/send_recv.py @@ -324,6 +324,12 @@ def parse_args(): help="Only applies to 'ucp-core' backend: number of maximum outstanding " "operations, see --delay-progress. (Default: 32)", ) + parser.add_argument( + "--error-handling", + action=argparse.BooleanOptionalAction, + default=True, + help="Enable endpoint error handling.", + ) args = parser.parse_args()