From a22baf7369d3355057438ae97130e6d4802dd724 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 11 Oct 2023 06:46:25 -0700 Subject: [PATCH] Add a timeout to `exchange_peer_info` After some debugging of Distributed tests with UCX it was observed that sometimes `exchange_peer_info` hangs indefinitely, specifically when executing `stream_recv` on the client side. The cause of this is unknown, but it is believed to be due to messages being lost, either when multiple stream messages are being transferred simultaneously among various endpoints or when the receiving end takes too long to launch `stream_recv`; see https://github.com/rapidsai/ucx-py/pull/509 for a similar issue related to the stream API. Adding a timeout doesn't allow recovery, but it at least allows a UCX-Py client to retry upon failure to establish the endpoint. --- ucp/core.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/ucp/core.py b/ucp/core.py index 6f5ddf3c0..8da1ef4e6 100644 --- a/ucp/core.py +++ b/ucp/core.py @@ -35,7 +35,7 @@ def _get_ctx(): return _ctx -async def exchange_peer_info(endpoint, msg_tag, ctrl_tag, listener): +async def exchange_peer_info(endpoint, msg_tag, ctrl_tag, listener, stream_timeout=5.0): """Help function that exchange endpoint information""" # Pack peer information incl. a checksum @@ -48,11 +48,23 @@ async def exchange_peer_info(endpoint, msg_tag, ctrl_tag, listener): # Send/recv peer information. 
Notice, we force an `await` between the two # streaming calls (see ) if listener is True: - await comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes) - await comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes) + await asyncio.wait_for( + comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes), + timeout=stream_timeout, + ) + await asyncio.wait_for( + comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes), + timeout=stream_timeout, + ) else: - await comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes) - await comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes) + await asyncio.wait_for( + comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes), + timeout=stream_timeout, + ) + await asyncio.wait_for( + comm.stream_send(endpoint, my_info_arr, my_info_arr.nbytes), + timeout=stream_timeout, + ) # Unpacking and sanity check of the peer information ret = {}