Merge branch 'branch-24.10' into remote-io-bench

rjzamora authored Aug 12, 2024
2 parents 1505890 + 00c37dc commit 3860705
Showing 7 changed files with 204 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -33,7 +33,7 @@ repos:
args: ["--module=dask_cuda", "--ignore-missing-imports"]
pass_filenames: false
- repo: https://github.com/rapidsai/pre-commit-hooks
rev: v0.3.0
rev: v0.3.1
hooks:
- id: verify-alpha-spec
- repo: https://github.com/rapidsai/dependency-file-generator
26 changes: 26 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,29 @@
# dask-cuda 24.08.00 (7 Aug 2024)

## 🐛 Bug Fixes

- Fix partitioning in explicit-comms shuffle ([#1356](https://github.com/rapidsai/dask-cuda/pull/1356)) [@rjzamora](https://github.com/rjzamora)
- Update cuDF's `assert_eq` import ([#1353](https://github.com/rapidsai/dask-cuda/pull/1353)) [@pentschev](https://github.com/pentschev)

## 🚀 New Features

- Add arguments to enable cuDF spilling and set statistics ([#1362](https://github.com/rapidsai/dask-cuda/pull/1362)) [@pentschev](https://github.com/pentschev)
- Allow disabling RMM in benchmarks ([#1352](https://github.com/rapidsai/dask-cuda/pull/1352)) [@pentschev](https://github.com/pentschev)

## 🛠️ Improvements

- consolidate cuda_suffixed=false blocks in dependencies.yaml, fix update-version.sh ([#1367](https://github.com/rapidsai/dask-cuda/pull/1367)) [@jameslamb](https://github.com/jameslamb)
- split up CUDA-suffixed dependencies in dependencies.yaml ([#1364](https://github.com/rapidsai/dask-cuda/pull/1364)) [@jameslamb](https://github.com/jameslamb)
- Use verify-alpha-spec hook ([#1360](https://github.com/rapidsai/dask-cuda/pull/1360)) [@KyleFromNVIDIA](https://github.com/KyleFromNVIDIA)
- Use workflow branch 24.08 again ([#1359](https://github.com/rapidsai/dask-cuda/pull/1359)) [@KyleFromNVIDIA](https://github.com/KyleFromNVIDIA)
- Build and test with CUDA 12.5.1 ([#1357](https://github.com/rapidsai/dask-cuda/pull/1357)) [@KyleFromNVIDIA](https://github.com/KyleFromNVIDIA)
- Drop `setup.py` ([#1354](https://github.com/rapidsai/dask-cuda/pull/1354)) [@jakirkham](https://github.com/jakirkham)
- remove .gitattributes ([#1350](https://github.com/rapidsai/dask-cuda/pull/1350)) [@jameslamb](https://github.com/jameslamb)
- make conda recipe data-loading stricter ([#1349](https://github.com/rapidsai/dask-cuda/pull/1349)) [@jameslamb](https://github.com/jameslamb)
- Adopt CI/packaging codeowners ([#1347](https://github.com/rapidsai/dask-cuda/pull/1347)) [@bdice](https://github.com/bdice)
- Remove text builds of documentation ([#1346](https://github.com/rapidsai/dask-cuda/pull/1346)) [@vyasr](https://github.com/vyasr)
- use rapids-build-backend ([#1343](https://github.com/rapidsai/dask-cuda/pull/1343)) [@jameslamb](https://github.com/jameslamb)

# dask-cuda 24.06.00 (5 Jun 2024)

## 🐛 Bug Fixes
19 changes: 19 additions & 0 deletions dask_cuda/__init__.py
@@ -9,6 +9,8 @@
import dask.dataframe.shuffle
import dask.dataframe.multi
import dask.bag.core
from distributed.protocol.cuda import cuda_deserialize, cuda_serialize
from distributed.protocol.serialize import dask_deserialize, dask_serialize

from ._version import __git_commit__, __version__
from .cuda_worker import CUDAWorker
@@ -48,3 +50,20 @@
dask.dataframe.shuffle.shuffle_group
)
dask.dataframe.core._concat = unproxify_decorator(dask.dataframe.core._concat)


def _register_cudf_spill_aware():
import cudf

# Only enable Dask/cuDF spilling if cuDF spilling is disabled, see
# https://github.com/rapidsai/dask-cuda/issues/1363
if not cudf.get_option("spill"):
# This reproduces the implementation of `_register_cudf`, see
# https://github.com/dask/distributed/blob/40fcd65e991382a956c3b879e438be1b100dff97/distributed/protocol/__init__.py#L106-L115
from cudf.comm import serialize


for registry in [cuda_serialize, cuda_deserialize, dask_serialize, dask_deserialize]:
for lib in ["cudf", "dask_cudf"]:
if lib in registry._lazy:
registry._lazy[lib] = _register_cudf_spill_aware
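The patch above swaps the value stored in each registry's `_lazy` table, so the first cuDF (de)serialization triggers `_register_cudf_spill_aware` instead of distributed's default `_register_cudf`. For readers unfamiliar with that mechanism, here is a minimal sketch of the lazy-registration pattern being relied on; the class and names below are illustrative only, not distributed's real implementation:

```python
# Illustrative sketch of a lazy serializer registry: registration for a
# library is deferred until a type from that library is first looked up.
import decimal


class LazyRegistry(dict):
    def __init__(self):
        super().__init__()
        # top-level module name -> zero-argument registration callable
        self._lazy = {}

    def __missing__(self, typ):
        # On a miss, pop and run the lazy hook for the module defining the
        # type, then retry the lookup it was expected to satisfy.
        toplevel = typ.__module__.split(".")[0]
        register = self._lazy.pop(toplevel, None)
        if register is None:
            raise KeyError(typ)
        register()
        return self[typ]


registry = LazyRegistry()
registry._lazy["decimal"] = lambda: registry.update({decimal.Decimal: str})

serializer = registry[decimal.Decimal]  # first lookup triggers the hook
assert serializer(decimal.Decimal("1.5")) == "1.5"
```

Because the hook is consumed on first use, replacing the stored callable (as the diff does for "cudf" and "dask_cudf") is enough to redirect registration exactly once, with no cost until cuDF objects actually cross the wire.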
8 changes: 8 additions & 0 deletions dask_cuda/cuda_worker.py
@@ -195,6 +195,14 @@ def del_pid_file():
},
)

cudf_spill_warning = dask.config.get("cudf-spill-warning", default=True)
if enable_cudf_spill and cudf_spill_warning:
warnings.warn(
"cuDF spilling is enabled, please ensure the client and scheduler "
"processes set `CUDF_SPILL=on` as well. To disable this warning "
"set `DASK_CUDF_SPILL_WARNING=False`."
)

self.nannies = [
Nanny(
scheduler,
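For context on the `DASK_CUDF_SPILL_WARNING` knob named in the warning text: Dask maps `DASK_`-prefixed environment variables onto config keys and treats hyphens and underscores in key names as equivalent, so the variable resolves to the `cudf-spill-warning` key read above. A small sketch, assuming a standard Dask installation:

```python
import os

# Set before dask collects the environment; values are parsed with
# ast.literal_eval, so the string "False" becomes the boolean False.
os.environ["DASK_CUDF_SPILL_WARNING"] = "False"

import dask

dask.config.refresh()  # re-collect environment-derived configuration

# Hyphens and underscores are interchangeable in dask config keys, so this
# lookup matches the key set via DASK_CUDF_SPILL_WARNING above.
assert dask.config.get("cudf-spill-warning", default=True) is False
```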
7 changes: 7 additions & 0 deletions dask_cuda/local_cuda_cluster.py
@@ -244,6 +244,13 @@ def __init__(
# initialization happens before we can set CUDA_VISIBLE_DEVICES
os.environ["RAPIDS_NO_INITIALIZE"] = "True"

if enable_cudf_spill:
import cudf

# cuDF spilling must be enabled in the client/scheduler process too.
cudf.set_option("spill", enable_cudf_spill)
cudf.set_option("spill_stats", cudf_spill_stats)

if threads_per_worker < 1:
raise ValueError("threads_per_worker must be higher than 0.")

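With this change, enabling cuDF spilling through the Python API also configures the client/scheduler process, not just the workers. A short usage sketch of the `enable_cudf_spill` and `cudf_spill_stats` arguments added in #1362:

```python
from distributed import Client

from dask_cuda import LocalCUDACluster

if __name__ == "__main__":
    # Per the diff above, enable_cudf_spill=True also calls
    # cudf.set_option("spill", True) in this (client) process.
    cluster = LocalCUDACluster(
        enable_cudf_spill=True,
        cudf_spill_stats=1,  # statistics level forwarded to cudf's "spill_stats"
    )
    client = Client(cluster)
```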
27 changes: 27 additions & 0 deletions dask_cuda/tests/test_dask_cuda_worker.py
@@ -567,3 +567,30 @@ def test_worker_timeout():
assert "reason: nanny-close" in ret.stderr.lower()

assert ret.returncode == 0


@pytest.mark.parametrize("enable_cudf_spill_warning", [False, True])
def test_worker_cudf_spill_warning(enable_cudf_spill_warning): # noqa: F811
pytest.importorskip("rmm")

environ = {"CUDA_VISIBLE_DEVICES": "0"}
if not enable_cudf_spill_warning:
environ["DASK_CUDF_SPILL_WARNING"] = "False"

with patch.dict(os.environ, environ):
ret = subprocess.run(
[
"dask",
"cuda",
"worker",
"127.0.0.1:9369",
"--enable-cudf-spill",
"--death-timeout",
"1",
],
capture_output=True,
)
if enable_cudf_spill_warning:
assert b"UserWarning: cuDF spilling is enabled" in ret.stderr
else:
assert b"UserWarning: cuDF spilling is enabled" not in ret.stderr
132 changes: 116 additions & 16 deletions dask_cuda/tests/test_spill.py
@@ -11,13 +11,66 @@
from distributed.sizeof import sizeof
from distributed.utils_test import gen_cluster, gen_test, loop # noqa: F401

import dask_cudf

from dask_cuda import LocalCUDACluster, utils
from dask_cuda.utils_test import IncreasedCloseTimeoutNanny

if utils.get_device_total_memory() < 1e10:
pytest.skip("Not enough GPU memory", allow_module_level=True)


def _set_cudf_device_limit():
"""Ensure spilling for objects of all sizes"""
import cudf

cudf.set_option("spill_device_limit", 0)


def _assert_cudf_spill_stats(enable_cudf_spill, dask_worker=None):
"""Ensure cuDF has spilled data with its internal mechanism"""
import cudf

global_manager = cudf.core.buffer.spill_manager.get_global_manager()

if enable_cudf_spill:
stats = global_manager.statistics
buffers = global_manager.buffers()
assert stats.spill_totals[("gpu", "cpu")][0] > 1000
assert stats.spill_totals[("cpu", "gpu")][0] > 1000
assert len(buffers) > 0
else:
assert global_manager is None


@pytest.fixture(params=[False, True])
def cudf_spill(request):
"""Fixture to enable and clear cuDF spill manager in client process"""
cudf = pytest.importorskip("cudf")

enable_cudf_spill = request.param

if enable_cudf_spill:
# If the global spill manager was previously set, fail.
assert cudf.core.buffer.spill_manager._global_manager is None

cudf.set_option("spill", True)
cudf.set_option("spill_stats", True)

# Disable spill-on-demand to avoid altering cuDF's RMM resource stack
# in this (client) process; workers do not need this because they are
# spawned as new processes for every test that runs.
cudf.set_option("spill_on_demand", False)

_set_cudf_device_limit()

yield enable_cudf_spill

cudf.set_option("spill", False)
cudf.core.buffer.spill_manager._global_manager_uninitialized = True
cudf.core.buffer.spill_manager._global_manager = None


def device_host_file_size_matches(
dhf, total_bytes, device_chunk_overhead=0, serialized_chunk_overhead=1024
):
@@ -244,9 +297,11 @@ async def test_cupy_cluster_device_spill(params):
],
)
@gen_test(timeout=30)
async def test_cudf_cluster_device_spill(params):
async def test_cudf_cluster_device_spill(params, cudf_spill):
cudf = pytest.importorskip("cudf")

enable_cudf_spill = cudf_spill

with dask.config.set(
{
"distributed.comm.compression": False,
@@ -266,6 +321,7 @@ async def test_cudf_cluster_device_spill(params):
device_memory_limit=params["device_memory_limit"],
memory_limit=params["memory_limit"],
worker_class=IncreasedCloseTimeoutNanny,
enable_cudf_spill=enable_cudf_spill,
) as cluster:
async with Client(cluster, asynchronous=True) as client:

@@ -294,21 +350,28 @@ async def test_cudf_cluster_device_spill(params):
del cdf
gc.collect()

await client.run(
assert_host_chunks,
params["spills_to_disk"],
)
await client.run(
assert_disk_chunks,
params["spills_to_disk"],
)

await client.run(
worker_assert,
nbytes,
32,
2048,
)
if enable_cudf_spill:
await client.run(
worker_assert,
0,
0,
0,
)
else:
await client.run(
assert_host_chunks,
params["spills_to_disk"],
)
await client.run(
assert_disk_chunks,
params["spills_to_disk"],
)
await client.run(
worker_assert,
nbytes,
32,
2048,
)

del cdf2

@@ -324,3 +387,40 @@ async def test_cudf_cluster_device_spill(params):
gc.collect()
else:
break


@gen_test(timeout=30)
async def test_cudf_spill_cluster(cudf_spill):
cudf = pytest.importorskip("cudf")
enable_cudf_spill = cudf_spill

async with LocalCUDACluster(
n_workers=1,
scheduler_port=0,
silence_logs=False,
dashboard_address=None,
asynchronous=True,
device_memory_limit=None,
memory_limit=None,
worker_class=IncreasedCloseTimeoutNanny,
enable_cudf_spill=enable_cudf_spill,
cudf_spill_stats=enable_cudf_spill,
) as cluster:
async with Client(cluster, asynchronous=True) as client:

await client.wait_for_workers(1)
await client.run(_set_cudf_device_limit)

cdf = cudf.DataFrame(
{
"a": list(range(200)),
"b": list(reversed(range(200))),
"c": list(range(200)),
}
)

ddf = dask_cudf.from_cudf(cdf, npartitions=2).sum().persist()
await wait(ddf)

await client.run(_assert_cudf_spill_stats, enable_cudf_spill)
_assert_cudf_spill_stats(enable_cudf_spill)

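As a companion to `_assert_cudf_spill_stats` above, here is a hedged sketch of inspecting cuDF's spill statistics interactively, assuming cuDF is installed and the spill options are set before any device allocations (as the test fixture does):

```python
import cudf

cudf.set_option("spill", True)
cudf.set_option("spill_stats", True)
cudf.set_option("spill_device_limit", 0)  # spill objects of any size, as in the tests

df = cudf.DataFrame({"a": range(1000)})
_ = df.sum()  # touch the data so buffers move between device and host

# Same accessors the test assertions use above.
manager = cudf.core.buffer.spill_manager.get_global_manager()
print(manager.statistics.spill_totals)  # keyed by (src, dst), e.g. ("gpu", "cpu")
```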