Skip to content

Commit

Permalink
Add IncreasedCloseTimeoutNanny to appropriate tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pentschev committed Oct 23, 2023
1 parent 21a57cd commit be4e1cc
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 2 deletions.
2 changes: 1 addition & 1 deletion dask_cuda/tests/test_explicit_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import dask_cuda
from dask_cuda.explicit_comms import comms
from dask_cuda.explicit_comms.dataframe.shuffle import shuffle as explicit_comms_shuffle
from dask_cuda.local_cuda_cluster import IncreasedCloseTimeoutNanny
from dask_cuda.utils_test import IncreasedCloseTimeoutNanny

mp = mp.get_context("spawn") # type: ignore
ucp = pytest.importorskip("ucp")
Expand Down
5 changes: 5 additions & 0 deletions dask_cuda/tests/test_initialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from dask_cuda.initialize import initialize
from dask_cuda.utils import get_ucx_config
from dask_cuda.utils_test import IncreasedCloseTimeoutNanny

mp = mp.get_context("spawn") # type: ignore
ucp = pytest.importorskip("ucp")
Expand All @@ -29,6 +30,7 @@ def _test_initialize_ucx_tcp():
n_workers=1,
threads_per_worker=1,
processes=True,
worker_class=IncreasedCloseTimeoutNanny,
config={"distributed.comm.ucx": get_ucx_config(**kwargs)},
) as cluster:
with Client(cluster) as client:
Expand Down Expand Up @@ -64,6 +66,7 @@ def _test_initialize_ucx_nvlink():
n_workers=1,
threads_per_worker=1,
processes=True,
worker_class=IncreasedCloseTimeoutNanny,
config={"distributed.comm.ucx": get_ucx_config(**kwargs)},
) as cluster:
with Client(cluster) as client:
Expand Down Expand Up @@ -100,6 +103,7 @@ def _test_initialize_ucx_infiniband():
n_workers=1,
threads_per_worker=1,
processes=True,
worker_class=IncreasedCloseTimeoutNanny,
config={"distributed.comm.ucx": get_ucx_config(**kwargs)},
) as cluster:
with Client(cluster) as client:
Expand Down Expand Up @@ -138,6 +142,7 @@ def _test_initialize_ucx_all():
n_workers=1,
threads_per_worker=1,
processes=True,
worker_class=IncreasedCloseTimeoutNanny,
config={"distributed.comm.ucx": get_ucx_config()},
) as cluster:
with Client(cluster) as client:
Expand Down
6 changes: 5 additions & 1 deletion dask_cuda/tests/test_proxify_host_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from dask_cuda.proxify_host_file import ProxifyHostFile
from dask_cuda.proxy_object import ProxyObject, asproxy, unproxy
from dask_cuda.utils import get_device_total_memory
from dask_cuda.utils_test import IncreasedCloseTimeoutNanny

cupy = pytest.importorskip("cupy")
cupy.cuda.set_allocator(None)
Expand Down Expand Up @@ -393,7 +394,10 @@ def is_proxy_object(x):

with dask.config.set(jit_unspill_compatibility_mode=compatibility_mode):
async with dask_cuda.LocalCUDACluster(
n_workers=1, jit_unspill=True, asynchronous=True
n_workers=1,
jit_unspill=True,
worker_class=IncreasedCloseTimeoutNanny,
asynchronous=True,
) as cluster:
async with Client(cluster, asynchronous=True) as client:
ddf = dask.dataframe.from_pandas(
Expand Down
2 changes: 2 additions & 0 deletions dask_cuda/tests/test_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dask_cuda.disk_io import SpillToDiskFile
from dask_cuda.proxify_device_objects import proxify_device_objects
from dask_cuda.proxify_host_file import ProxifyHostFile
from dask_cuda.utils_test import IncreasedCloseTimeoutNanny

# Make the "disk" serializer available and use a directory that are
# remove on exit.
Expand Down Expand Up @@ -422,6 +423,7 @@ def task(x):
async with dask_cuda.LocalCUDACluster(
n_workers=1,
protocol=protocol,
worker_class=IncreasedCloseTimeoutNanny,
asynchronous=True,
) as cluster:
async with Client(cluster, asynchronous=True) as client:
Expand Down
3 changes: 3 additions & 0 deletions dask_cuda/tests/test_spill.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from distributed.utils_test import gen_cluster, gen_test, loop # noqa: F401

from dask_cuda import LocalCUDACluster, utils
from dask_cuda.utils_test import IncreasedCloseTimeoutNanny

if utils.get_device_total_memory() < 1e10:
pytest.skip("Not enough GPU memory", allow_module_level=True)
Expand Down Expand Up @@ -160,6 +161,7 @@ async def test_cupy_cluster_device_spill(params):
asynchronous=True,
device_memory_limit=params["device_memory_limit"],
memory_limit=params["memory_limit"],
worker_class=IncreasedCloseTimeoutNanny,
) as cluster:
async with Client(cluster, asynchronous=True) as client:

Expand Down Expand Up @@ -263,6 +265,7 @@ async def test_cudf_cluster_device_spill(params):
asynchronous=True,
device_memory_limit=params["device_memory_limit"],
memory_limit=params["memory_limit"],
worker_class=IncreasedCloseTimeoutNanny,
) as cluster:
async with Client(cluster, asynchronous=True) as client:

Expand Down

0 comments on commit be4e1cc

Please sign in to comment.