diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py index d9cd6dfb..bd677022 100644 --- a/dask_cuda/tests/test_explicit_comms.py +++ b/dask_cuda/tests/test_explicit_comms.py @@ -17,7 +17,7 @@ import dask_cuda from dask_cuda.explicit_comms import comms from dask_cuda.explicit_comms.dataframe.shuffle import shuffle as explicit_comms_shuffle -from dask_cuda.local_cuda_cluster import IncreasedCloseTimeoutNanny +from dask_cuda.utils_test import IncreasedCloseTimeoutNanny mp = mp.get_context("spawn") # type: ignore ucp = pytest.importorskip("ucp") diff --git a/dask_cuda/tests/test_initialize.py b/dask_cuda/tests/test_initialize.py index 60c7a798..05b72f99 100644 --- a/dask_cuda/tests/test_initialize.py +++ b/dask_cuda/tests/test_initialize.py @@ -10,6 +10,7 @@ from dask_cuda.initialize import initialize from dask_cuda.utils import get_ucx_config +from dask_cuda.utils_test import IncreasedCloseTimeoutNanny mp = mp.get_context("spawn") # type: ignore ucp = pytest.importorskip("ucp") @@ -29,6 +30,7 @@ def _test_initialize_ucx_tcp(): n_workers=1, threads_per_worker=1, processes=True, + worker_class=IncreasedCloseTimeoutNanny, config={"distributed.comm.ucx": get_ucx_config(**kwargs)}, ) as cluster: with Client(cluster) as client: @@ -64,6 +66,7 @@ def _test_initialize_ucx_nvlink(): n_workers=1, threads_per_worker=1, processes=True, + worker_class=IncreasedCloseTimeoutNanny, config={"distributed.comm.ucx": get_ucx_config(**kwargs)}, ) as cluster: with Client(cluster) as client: @@ -100,6 +103,7 @@ def _test_initialize_ucx_infiniband(): n_workers=1, threads_per_worker=1, processes=True, + worker_class=IncreasedCloseTimeoutNanny, config={"distributed.comm.ucx": get_ucx_config(**kwargs)}, ) as cluster: with Client(cluster) as client: @@ -138,6 +142,7 @@ def _test_initialize_ucx_all(): n_workers=1, threads_per_worker=1, processes=True, + worker_class=IncreasedCloseTimeoutNanny, config={"distributed.comm.ucx": get_ucx_config()}, ) as cluster: with Client(cluster) as client: diff --git a/dask_cuda/tests/test_proxify_host_file.py b/dask_cuda/tests/test_proxify_host_file.py index 2e3f8269..191f62fe 100644 --- a/dask_cuda/tests/test_proxify_host_file.py +++ b/dask_cuda/tests/test_proxify_host_file.py @@ -19,6 +19,7 @@ from dask_cuda.proxify_host_file import ProxifyHostFile from dask_cuda.proxy_object import ProxyObject, asproxy, unproxy from dask_cuda.utils import get_device_total_memory +from dask_cuda.utils_test import IncreasedCloseTimeoutNanny cupy = pytest.importorskip("cupy") cupy.cuda.set_allocator(None) @@ -393,7 +394,10 @@ def is_proxy_object(x): with dask.config.set(jit_unspill_compatibility_mode=compatibility_mode): async with dask_cuda.LocalCUDACluster( - n_workers=1, jit_unspill=True, asynchronous=True + n_workers=1, + jit_unspill=True, + worker_class=IncreasedCloseTimeoutNanny, + asynchronous=True, ) as cluster: async with Client(cluster, asynchronous=True) as client: ddf = dask.dataframe.from_pandas( diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py index c779a39e..8de56a5c 100644 --- a/dask_cuda/tests/test_proxy.py +++ b/dask_cuda/tests/test_proxy.py @@ -23,6 +23,7 @@ from dask_cuda.disk_io import SpillToDiskFile from dask_cuda.proxify_device_objects import proxify_device_objects from dask_cuda.proxify_host_file import ProxifyHostFile +from dask_cuda.utils_test import IncreasedCloseTimeoutNanny # Make the "disk" serializer available and use a directory that are # remove on exit. @@ -422,6 +423,7 @@ def task(x): async with dask_cuda.LocalCUDACluster( n_workers=1, protocol=protocol, + worker_class=IncreasedCloseTimeoutNanny, asynchronous=True, ) as cluster: async with Client(cluster, asynchronous=True) as client: diff --git a/dask_cuda/tests/test_spill.py b/dask_cuda/tests/test_spill.py index 6172b0bc..f8df7e04 100644 --- a/dask_cuda/tests/test_spill.py +++ b/dask_cuda/tests/test_spill.py @@ -12,6 +12,7 @@ from distributed.utils_test import gen_cluster, gen_test, loop # noqa: F401 from dask_cuda import LocalCUDACluster, utils +from dask_cuda.utils_test import IncreasedCloseTimeoutNanny if utils.get_device_total_memory() < 1e10: pytest.skip("Not enough GPU memory", allow_module_level=True) @@ -160,6 +161,7 @@ async def test_cupy_cluster_device_spill(params): asynchronous=True, device_memory_limit=params["device_memory_limit"], memory_limit=params["memory_limit"], + worker_class=IncreasedCloseTimeoutNanny, ) as cluster: async with Client(cluster, asynchronous=True) as client: @@ -263,6 +265,7 @@ async def test_cudf_cluster_device_spill(params): asynchronous=True, device_memory_limit=params["device_memory_limit"], memory_limit=params["memory_limit"], + worker_class=IncreasedCloseTimeoutNanny, ) as cluster: async with Client(cluster, asynchronous=True) as client: