From c0cd4656037bf54fe34b45c283e31d97098b8c25 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 31 Jul 2024 09:39:41 +0200 Subject: [PATCH 1/3] Replace cuDF (de)serializer with cuDF spill-aware (de)serializer (#1369) Replace cuDF (de)serializer with cuDF spill-aware (de)serializer, using both together should be avoided as that will cause excessive spilling. Additionally add: - Missing test of cuDF internal spill mechanism with `LocalCUDACluster`; - `dask cuda worker` warning to alert the user that cuDF spilling mechanism requires client/scheduler to enable it as well. Closes #1363 . Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - Mads R. B. Kristensen (https://github.com/madsbk) URL: https://github.com/rapidsai/dask-cuda/pull/1369 --- dask_cuda/__init__.py | 19 ++++ dask_cuda/cuda_worker.py | 8 ++ dask_cuda/local_cuda_cluster.py | 7 ++ dask_cuda/tests/test_dask_cuda_worker.py | 27 +++++ dask_cuda/tests/test_spill.py | 132 ++++++++++++++++++++--- 5 files changed, 177 insertions(+), 16 deletions(-) diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py index 516599da3..5711ac08b 100644 --- a/dask_cuda/__init__.py +++ b/dask_cuda/__init__.py @@ -9,6 +9,8 @@ import dask.dataframe.shuffle import dask.dataframe.multi import dask.bag.core +from distributed.protocol.cuda import cuda_deserialize, cuda_serialize +from distributed.protocol.serialize import dask_deserialize, dask_serialize from ._version import __git_commit__, __version__ from .cuda_worker import CUDAWorker @@ -48,3 +50,20 @@ dask.dataframe.shuffle.shuffle_group ) dask.dataframe.core._concat = unproxify_decorator(dask.dataframe.core._concat) + + +def _register_cudf_spill_aware(): + import cudf + + # Only enable Dask/cuDF spilling if cuDF spilling is disabled, see + # https://github.com/rapidsai/dask-cuda/issues/1363 + if not cudf.get_option("spill"): + # This reproduces the implementation of `_register_cudf`, see + # https://github.com/dask/distributed/blob/40fcd65e991382a956c3b879e438be1b100dff97/distributed/protocol/__init__.py#L106-L115 + from cudf.comm import serialize + + +for registry in [cuda_serialize, cuda_deserialize, dask_serialize, dask_deserialize]: + for lib in ["cudf", "dask_cudf"]: + if lib in registry._lazy: + registry._lazy[lib] = _register_cudf_spill_aware diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py index b88c9bc98..3e03ed297 100644 --- a/dask_cuda/cuda_worker.py +++ b/dask_cuda/cuda_worker.py @@ -195,6 +195,14 @@ def del_pid_file(): }, ) + cudf_spill_warning = dask.config.get("cudf-spill-warning", default=True) + if enable_cudf_spill and cudf_spill_warning: + warnings.warn( + "cuDF spilling is enabled, please ensure the client and scheduler " + "processes set `CUDF_SPILL=on` as well. To disable this warning " + "set `DASK_CUDF_SPILL_WARNING=False`." + ) + self.nannies = [ Nanny( scheduler, diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py index 202373e9d..c037223b2 100644 --- a/dask_cuda/local_cuda_cluster.py +++ b/dask_cuda/local_cuda_cluster.py @@ -244,6 +244,13 @@ def __init__( # initialization happens before we can set CUDA_VISIBLE_DEVICES os.environ["RAPIDS_NO_INITIALIZE"] = "True" + if enable_cudf_spill: + import cudf + + # cuDF spilling must be enabled in the client/scheduler process too. + cudf.set_option("spill", enable_cudf_spill) + cudf.set_option("spill_stats", cudf_spill_stats) + if threads_per_worker < 1: raise ValueError("threads_per_worker must be higher than 0.") diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py index 505af12f1..049fe85f4 100644 --- a/dask_cuda/tests/test_dask_cuda_worker.py +++ b/dask_cuda/tests/test_dask_cuda_worker.py @@ -567,3 +567,30 @@ def test_worker_timeout(): assert "reason: nanny-close" in ret.stderr.lower() assert ret.returncode == 0 + + +@pytest.mark.parametrize("enable_cudf_spill_warning", [False, True]) +def test_worker_cudf_spill_warning(enable_cudf_spill_warning): # noqa: F811 + pytest.importorskip("rmm") + + environ = {"CUDA_VISIBLE_DEVICES": "0"} + if not enable_cudf_spill_warning: + environ["DASK_CUDF_SPILL_WARNING"] = "False" + + with patch.dict(os.environ, environ): + ret = subprocess.run( + [ + "dask", + "cuda", + "worker", + "127.0.0.1:9369", + "--enable-cudf-spill", + "--death-timeout", + "1", + ], + capture_output=True, + ) + if enable_cudf_spill_warning: + assert b"UserWarning: cuDF spilling is enabled" in ret.stderr + else: + assert b"UserWarning: cuDF spilling is enabled" not in ret.stderr diff --git a/dask_cuda/tests/test_spill.py b/dask_cuda/tests/test_spill.py index f8df7e04f..bdd012d50 100644 --- a/dask_cuda/tests/test_spill.py +++ b/dask_cuda/tests/test_spill.py @@ -11,6 +11,8 @@ from distributed.sizeof import sizeof from distributed.utils_test import gen_cluster, gen_test, loop # noqa: F401 +import dask_cudf + from dask_cuda import LocalCUDACluster, utils from dask_cuda.utils_test import IncreasedCloseTimeoutNanny @@ -18,6 +20,57 @@ pytest.skip("Not enough GPU memory", allow_module_level=True) +def _set_cudf_device_limit(): + """Ensure spilling for objects of all sizes""" + import cudf + + cudf.set_option("spill_device_limit", 0) + + +def _assert_cudf_spill_stats(enable_cudf_spill, dask_worker=None): + """Ensure cuDF has spilled data with its internal mechanism""" + import cudf + + global_manager = cudf.core.buffer.spill_manager.get_global_manager() + + if enable_cudf_spill: + stats = global_manager.statistics + buffers = global_manager.buffers() + assert stats.spill_totals[("gpu", "cpu")][0] > 1000 + assert stats.spill_totals[("cpu", "gpu")][0] > 1000 + assert len(buffers) > 0 + else: + assert global_manager is None + + +@pytest.fixture(params=[False, True]) +def cudf_spill(request): + """Fixture to enable and clear cuDF spill manager in client process""" + cudf = pytest.importorskip("cudf") + + enable_cudf_spill = request.param + + if enable_cudf_spill: + # If the global spill manager was previously set, fail. + assert cudf.core.buffer.spill_manager._global_manager is None + + cudf.set_option("spill", True) + cudf.set_option("spill_stats", True) + + # This change is to prevent changing RMM resource stack in cuDF, + # workers do not need this because they are spawned as new + # processes for every new test that runs. + cudf.set_option("spill_on_demand", False) + + _set_cudf_device_limit() + + yield enable_cudf_spill + + cudf.set_option("spill", False) + cudf.core.buffer.spill_manager._global_manager_uninitialized = True + cudf.core.buffer.spill_manager._global_manager = None + + def device_host_file_size_matches( dhf, total_bytes, device_chunk_overhead=0, serialized_chunk_overhead=1024 ): @@ -244,9 +297,11 @@ async def test_cupy_cluster_device_spill(params): ], ) @gen_test(timeout=30) -async def test_cudf_cluster_device_spill(params): +async def test_cudf_cluster_device_spill(params, cudf_spill): cudf = pytest.importorskip("cudf") + enable_cudf_spill = cudf_spill + with dask.config.set( { "distributed.comm.compression": False, @@ -266,6 +321,7 @@ async def test_cudf_cluster_device_spill(params): device_memory_limit=params["device_memory_limit"], memory_limit=params["memory_limit"], worker_class=IncreasedCloseTimeoutNanny, + enable_cudf_spill=enable_cudf_spill, ) as cluster: async with Client(cluster, asynchronous=True) as client: @@ -294,21 +350,28 @@ async def test_cudf_cluster_device_spill(params): del cdf gc.collect() - await client.run( - assert_host_chunks, - params["spills_to_disk"], - ) - await client.run( - assert_disk_chunks, - params["spills_to_disk"], - ) - - await client.run( - worker_assert, - nbytes, - 32, - 2048, - ) + if enable_cudf_spill: + await client.run( + worker_assert, + 0, + 0, + 0, + ) + else: + await client.run( + assert_host_chunks, + params["spills_to_disk"], + ) + await client.run( + assert_disk_chunks, + params["spills_to_disk"], + ) + await client.run( + worker_assert, + nbytes, + 32, + 2048, + ) del cdf2 @@ -324,3 +387,40 @@ async def test_cudf_cluster_device_spill(params): gc.collect() else: break + + +@gen_test(timeout=30) +async def test_cudf_spill_cluster(cudf_spill): + cudf = pytest.importorskip("cudf") + enable_cudf_spill = cudf_spill + + async with LocalCUDACluster( + n_workers=1, + scheduler_port=0, + silence_logs=False, + dashboard_address=None, + asynchronous=True, + device_memory_limit=None, + memory_limit=None, + worker_class=IncreasedCloseTimeoutNanny, + enable_cudf_spill=enable_cudf_spill, + cudf_spill_stats=enable_cudf_spill, + ) as cluster: + async with Client(cluster, asynchronous=True) as client: + + await client.wait_for_workers(1) + await client.run(_set_cudf_device_limit) + + cdf = cudf.DataFrame( + { + "a": list(range(200)), + "b": list(reversed(range(200))), + "c": list(range(200)), + } + ) + + ddf = dask_cudf.from_cudf(cdf, npartitions=2).sum().persist() + await wait(ddf) + + await client.run(_assert_cudf_spill_stats, enable_cudf_spill) + _assert_cudf_spill_stats(enable_cudf_spill) From bf84f99de5f477c4a230a6b9890b49c06908f55f Mon Sep 17 00:00:00 2001 From: Ray Douglass Date: Wed, 7 Aug 2024 10:42:31 -0400 Subject: [PATCH 2/3] Update Changelog [skip ci] --- CHANGELOG.md | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ea704c1f..37c588511 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,29 @@ +# dask-cuda 24.08.00 (7 Aug 2024) + +## 🐛 Bug Fixes + +- Fix partitioning in explicit-comms shuffle ([#1356](https://github.com/rapidsai/dask-cuda/pull/1356)) [@rjzamora](https://github.com/rjzamora) +- Update cuDF's `assert_eq` import ([#1353](https://github.com/rapidsai/dask-cuda/pull/1353)) [@pentschev](https://github.com/pentschev) + +## 🚀 New Features + +- Add arguments to enable cuDF spilling and set statistics ([#1362](https://github.com/rapidsai/dask-cuda/pull/1362)) [@pentschev](https://github.com/pentschev) +- Allow disabling RMM in benchmarks ([#1352](https://github.com/rapidsai/dask-cuda/pull/1352)) [@pentschev](https://github.com/pentschev) + +## 🛠️ Improvements + +- consolidate cuda_suffixed=false blocks in dependencies.yaml, fix update-version.sh ([#1367](https://github.com/rapidsai/dask-cuda/pull/1367)) [@jameslamb](https://github.com/jameslamb) +- split up CUDA-suffixed dependencies in dependencies.yaml ([#1364](https://github.com/rapidsai/dask-cuda/pull/1364)) [@jameslamb](https://github.com/jameslamb) +- Use verify-alpha-spec hook ([#1360](https://github.com/rapidsai/dask-cuda/pull/1360)) [@KyleFromNVIDIA](https://github.com/KyleFromNVIDIA) +- Use workflow branch 24.08 again ([#1359](https://github.com/rapidsai/dask-cuda/pull/1359)) [@KyleFromNVIDIA](https://github.com/KyleFromNVIDIA) +- Build and test with CUDA 12.5.1 ([#1357](https://github.com/rapidsai/dask-cuda/pull/1357)) [@KyleFromNVIDIA](https://github.com/KyleFromNVIDIA) +- Drop `setup.py` ([#1354](https://github.com/rapidsai/dask-cuda/pull/1354)) [@jakirkham](https://github.com/jakirkham) +- remove .gitattributes ([#1350](https://github.com/rapidsai/dask-cuda/pull/1350)) [@jameslamb](https://github.com/jameslamb) +- make conda recipe data-loading stricter ([#1349](https://github.com/rapidsai/dask-cuda/pull/1349)) [@jameslamb](https://github.com/jameslamb) +- Adopt CI/packaging codeowners ([#1347](https://github.com/rapidsai/dask-cuda/pull/1347)) [@bdice](https://github.com/bdice) +- Remove text builds of documentation ([#1346](https://github.com/rapidsai/dask-cuda/pull/1346)) [@vyasr](https://github.com/vyasr) +- use rapids-build-backend ([#1343](https://github.com/rapidsai/dask-cuda/pull/1343)) [@jameslamb](https://github.com/jameslamb) + # dask-cuda 24.06.00 (5 Jun 2024) ## 🐛 Bug Fixes From 00c37dc55bee1a34f7d9f6599a89a3f89c15651b Mon Sep 17 00:00:00 2001 From: Kyle Edwards Date: Thu, 8 Aug 2024 11:50:49 -0400 Subject: [PATCH 3/3] Update pre-commit hooks (#1373) This PR updates pre-commit hooks to the latest versions that are supported without causing style check errors. Authors: - Kyle Edwards (https://github.com/KyleFromNVIDIA) Approvers: - James Lamb (https://github.com/jameslamb) URL: https://github.com/rapidsai/dask-cuda/pull/1373 --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 335080816..1def5e1aa 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -33,7 +33,7 @@ repos: args: ["--module=dask_cuda", "--ignore-missing-imports"] pass_filenames: false - repo: https://github.com/rapidsai/pre-commit-hooks - rev: v0.3.0 + rev: v0.3.1 hooks: - id: verify-alpha-spec - repo: https://github.com/rapidsai/dependency-file-generator