diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index 69b0de5f..3d097bcd 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -28,7 +28,7 @@ concurrency:
jobs:
conda-python-build:
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/conda-python-build.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/conda-python-build.yaml@branch-24.12
with:
build_type: ${{ inputs.build_type || 'branch' }}
branch: ${{ inputs.branch }}
@@ -38,7 +38,7 @@ jobs:
if: github.ref_type == 'branch'
needs: [conda-python-build]
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/custom-job.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/custom-job.yaml@branch-24.12
with:
arch: "amd64"
branch: ${{ inputs.branch }}
@@ -51,7 +51,7 @@ jobs:
upload-conda:
needs: [conda-python-build]
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/conda-upload-packages.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/conda-upload-packages.yaml@branch-24.12
with:
build_type: ${{ inputs.build_type || 'branch' }}
branch: ${{ inputs.branch }}
@@ -59,7 +59,7 @@ jobs:
sha: ${{ inputs.sha }}
wheel-build:
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/wheels-build.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/wheels-build.yaml@branch-24.12
with:
build_type: ${{ inputs.build_type || 'branch' }}
branch: ${{ inputs.branch }}
@@ -72,7 +72,7 @@ jobs:
wheel-publish:
needs: wheel-build
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/wheels-publish.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/wheels-publish.yaml@branch-24.12
with:
build_type: ${{ inputs.build_type || 'branch' }}
branch: ${{ inputs.branch }}
diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml
index 4e56d24d..0e20bdaf 100644
--- a/.github/workflows/pr.yaml
+++ b/.github/workflows/pr.yaml
@@ -18,26 +18,26 @@ jobs:
- docs-build
- wheel-build
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/pr-builder.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/pr-builder.yaml@branch-24.12
checks:
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/checks.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/checks.yaml@branch-24.12
conda-python-build:
needs: checks
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/conda-python-build.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/conda-python-build.yaml@branch-24.12
with:
build_type: pull-request
conda-python-tests:
needs: conda-python-build
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/conda-python-tests.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/conda-python-tests.yaml@branch-24.12
with:
build_type: pull-request
docs-build:
needs: conda-python-build
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/custom-job.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/custom-job.yaml@branch-24.12
with:
build_type: pull-request
node_type: "gpu-v100-latest-1"
@@ -46,7 +46,7 @@ jobs:
run_script: "ci/build_docs.sh"
wheel-build:
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/wheels-build.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/wheels-build.yaml@branch-24.12
with:
build_type: pull-request
# Package is pure Python and only ever requires one build.
diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index 7a884c5c..631a6173 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -16,7 +16,7 @@ on:
jobs:
conda-python-tests:
secrets: inherit
- uses: rapidsai/shared-workflows/.github/workflows/conda-python-tests.yaml@branch-24.08
+ uses: rapidsai/shared-workflows/.github/workflows/conda-python-tests.yaml@branch-24.12
with:
build_type: nightly
branch: ${{ inputs.branch }}
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index b10be12a..a2202df3 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -13,7 +13,7 @@ repos:
hooks:
- id: black
- repo: https://github.com/PyCQA/flake8
- rev: 3.8.3
+ rev: 7.1.1
hooks:
- id: flake8
- repo: https://github.com/codespell-project/codespell
@@ -32,8 +32,12 @@ repos:
additional_dependencies: [types-cachetools]
args: ["--module=dask_cuda", "--ignore-missing-imports"]
pass_filenames: false
+ - repo: https://github.com/rapidsai/pre-commit-hooks
+ rev: v0.4.0
+ hooks:
+ - id: verify-alpha-spec
- repo: https://github.com/rapidsai/dependency-file-generator
- rev: v1.13.11
+ rev: v1.16.0
hooks:
- id: rapids-dependency-file-generator
args: ["--clean"]
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3ea704c1..f8c992fb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,58 @@
+# dask-cuda 24.10.00 (9 Oct 2024)
+
+## 🚨 Breaking Changes
+
+- Replace cuDF (de)serializer with cuDF spill-aware (de)serializer ([#1369](https://github.com/rapidsai/dask-cuda/pull/1369)) [@pentschev](https://github.com/pentschev)
+
+## 📖 Documentation
+
+- Fix typo in spilling documentation ([#1384](https://github.com/rapidsai/dask-cuda/pull/1384)) [@rjzamora](https://github.com/rjzamora)
+- Add notes on cudf spilling to docs ([#1383](https://github.com/rapidsai/dask-cuda/pull/1383)) [@rjzamora](https://github.com/rjzamora)
+
+## 🚀 New Features
+
+- [Benchmark] Add parquet read benchmark ([#1371](https://github.com/rapidsai/dask-cuda/pull/1371)) [@rjzamora](https://github.com/rjzamora)
+- Replace cuDF (de)serializer with cuDF spill-aware (de)serializer ([#1369](https://github.com/rapidsai/dask-cuda/pull/1369)) [@pentschev](https://github.com/pentschev)
+
+## 🛠️ Improvements
+
+- Update update-version.sh to use packaging lib ([#1387](https://github.com/rapidsai/dask-cuda/pull/1387)) [@AyodeAwe](https://github.com/AyodeAwe)
+- Use CI workflow branch 'branch-24.10' again ([#1386](https://github.com/rapidsai/dask-cuda/pull/1386)) [@jameslamb](https://github.com/jameslamb)
+- Update to flake8 7.1.1. ([#1385](https://github.com/rapidsai/dask-cuda/pull/1385)) [@bdice](https://github.com/bdice)
+- enable Python 3.12 tests on PRs ([#1382](https://github.com/rapidsai/dask-cuda/pull/1382)) [@jameslamb](https://github.com/jameslamb)
+- Add support for Python 3.12 ([#1380](https://github.com/rapidsai/dask-cuda/pull/1380)) [@jameslamb](https://github.com/jameslamb)
+- Update rapidsai/pre-commit-hooks ([#1379](https://github.com/rapidsai/dask-cuda/pull/1379)) [@KyleFromNVIDIA](https://github.com/KyleFromNVIDIA)
+- Drop Python 3.9 support ([#1377](https://github.com/rapidsai/dask-cuda/pull/1377)) [@jameslamb](https://github.com/jameslamb)
+- Remove NumPy <2 pin ([#1375](https://github.com/rapidsai/dask-cuda/pull/1375)) [@seberg](https://github.com/seberg)
+- Update pre-commit hooks ([#1373](https://github.com/rapidsai/dask-cuda/pull/1373)) [@KyleFromNVIDIA](https://github.com/KyleFromNVIDIA)
+- Merge branch-24.08 into branch-24.10 ([#1368](https://github.com/rapidsai/dask-cuda/pull/1368)) [@jameslamb](https://github.com/jameslamb)
+
+# dask-cuda 24.08.00 (7 Aug 2024)
+
+## 🐛 Bug Fixes
+
+- Fix partitioning in explicit-comms shuffle ([#1356](https://github.com/rapidsai/dask-cuda/pull/1356)) [@rjzamora](https://github.com/rjzamora)
+- Update cuDF's `assert_eq` import ([#1353](https://github.com/rapidsai/dask-cuda/pull/1353)) [@pentschev](https://github.com/pentschev)
+
+## 🚀 New Features
+
+- Add arguments to enable cuDF spilling and set statistics ([#1362](https://github.com/rapidsai/dask-cuda/pull/1362)) [@pentschev](https://github.com/pentschev)
+- Allow disabling RMM in benchmarks ([#1352](https://github.com/rapidsai/dask-cuda/pull/1352)) [@pentschev](https://github.com/pentschev)
+
+## 🛠️ Improvements
+
+- consolidate cuda_suffixed=false blocks in dependencies.yaml, fix update-version.sh ([#1367](https://github.com/rapidsai/dask-cuda/pull/1367)) [@jameslamb](https://github.com/jameslamb)
+- split up CUDA-suffixed dependencies in dependencies.yaml ([#1364](https://github.com/rapidsai/dask-cuda/pull/1364)) [@jameslamb](https://github.com/jameslamb)
+- Use verify-alpha-spec hook ([#1360](https://github.com/rapidsai/dask-cuda/pull/1360)) [@KyleFromNVIDIA](https://github.com/KyleFromNVIDIA)
+- Use workflow branch 24.08 again ([#1359](https://github.com/rapidsai/dask-cuda/pull/1359)) [@KyleFromNVIDIA](https://github.com/KyleFromNVIDIA)
+- Build and test with CUDA 12.5.1 ([#1357](https://github.com/rapidsai/dask-cuda/pull/1357)) [@KyleFromNVIDIA](https://github.com/KyleFromNVIDIA)
+- Drop `setup.py` ([#1354](https://github.com/rapidsai/dask-cuda/pull/1354)) [@jakirkham](https://github.com/jakirkham)
+- remove .gitattributes ([#1350](https://github.com/rapidsai/dask-cuda/pull/1350)) [@jameslamb](https://github.com/jameslamb)
+- make conda recipe data-loading stricter ([#1349](https://github.com/rapidsai/dask-cuda/pull/1349)) [@jameslamb](https://github.com/jameslamb)
+- Adopt CI/packaging codeowners ([#1347](https://github.com/rapidsai/dask-cuda/pull/1347)) [@bdice](https://github.com/bdice)
+- Remove text builds of documentation ([#1346](https://github.com/rapidsai/dask-cuda/pull/1346)) [@vyasr](https://github.com/vyasr)
+- use rapids-build-backend ([#1343](https://github.com/rapidsai/dask-cuda/pull/1343)) [@jameslamb](https://github.com/jameslamb)
+
# dask-cuda 24.06.00 (5 Jun 2024)
## 🐛 Bug Fixes
diff --git a/VERSION b/VERSION
index ec8489fd..af28c42b 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-24.08.00
+24.12.00
diff --git a/ci/build_docs.sh b/ci/build_docs.sh
index c2a65a41..58da36c7 100755
--- a/ci/build_docs.sh
+++ b/ci/build_docs.sh
@@ -5,6 +5,8 @@ set -euo pipefail
rapids-logger "Create test conda environment"
. /opt/conda/etc/profile.d/conda.sh
+RAPIDS_VERSION="$(rapids-version)"
+
rapids-dependency-file-generator \
--output conda \
--file-key docs \
@@ -21,9 +23,8 @@ PYTHON_CHANNEL=$(rapids-download-conda-from-s3 python)
rapids-mamba-retry install \
--channel "${PYTHON_CHANNEL}" \
- dask-cuda
+ "dask-cuda=${RAPIDS_VERSION}"
-export RAPIDS_VERSION_NUMBER="24.08"
export RAPIDS_DOCS_DIR="$(mktemp -d)"
rapids-logger "Build Python docs"
@@ -33,4 +34,4 @@ mkdir -p "${RAPIDS_DOCS_DIR}/dask-cuda/"html
mv _html/* "${RAPIDS_DOCS_DIR}/dask-cuda/html"
popd
-rapids-upload-docs
+RAPIDS_VERSION_NUMBER="$(rapids-version-major-minor)" rapids-upload-docs
diff --git a/ci/build_python.sh b/ci/build_python.sh
index 48cece32..c12a0dde 100755
--- a/ci/build_python.sh
+++ b/ci/build_python.sh
@@ -5,12 +5,8 @@ set -euo pipefail
rapids-configure-conda-channels
-source rapids-configure-sccache
-
source rapids-date-string
-export CMAKE_GENERATOR=Ninja
-
rapids-print-env
rapids-generate-version > ./VERSION
diff --git a/ci/build_wheel.sh b/ci/build_wheel.sh
index 828972dc..760e46e3 100755
--- a/ci/build_wheel.sh
+++ b/ci/build_wheel.sh
@@ -3,11 +3,11 @@
set -euo pipefail
-source rapids-configure-sccache
source rapids-date-string
rapids-generate-version > ./VERSION
-python -m pip wheel . -w dist -vvv --no-deps --disable-pip-version-check
+python -m pip wheel . -w dist -v --no-deps --disable-pip-version-check
+./ci/validate_wheel.sh dist
RAPIDS_PY_WHEEL_NAME="dask-cuda" rapids-upload-wheels-to-s3 dist
diff --git a/ci/release/update-version.sh b/ci/release/update-version.sh
index ac834e5e..b229d280 100755
--- a/ci/release/update-version.sh
+++ b/ci/release/update-version.sh
@@ -22,7 +22,7 @@ CURRENT_SHORT_TAG=${CURRENT_MAJOR}.${CURRENT_MINOR}
NEXT_MAJOR=$(echo $NEXT_FULL_TAG | awk '{split($0, a, "."); print a[1]}')
NEXT_MINOR=$(echo $NEXT_FULL_TAG | awk '{split($0, a, "."); print a[2]}')
NEXT_SHORT_TAG=${NEXT_MAJOR}.${NEXT_MINOR}
-NEXT_SHORT_TAG_PEP440=$(python -c "from setuptools.extern import packaging; print(packaging.version.Version('${NEXT_SHORT_TAG}'))")
+NEXT_SHORT_TAG_PEP440=$(python -c "from packaging.version import Version; print(Version('${NEXT_SHORT_TAG}'))")
NEXT_UCXPY_VERSION="$(curl -s https://version.gpuci.io/rapids/${NEXT_SHORT_TAG})"
echo "Preparing release $CURRENT_TAG => $NEXT_FULL_TAG"
@@ -45,17 +45,29 @@ DEPENDENCIES=(
kvikio
rapids-dask-dependency
)
-for FILE in dependencies.yaml conda/environments/*.yaml; do
- for DEP in "${DEPENDENCIES[@]}"; do
+for DEP in "${DEPENDENCIES[@]}"; do
+ for FILE in dependencies.yaml conda/environments/*.yaml; do
sed_runner "/-.* ${DEP}\(-cu[[:digit:]]\{2\}\)\{0,1\}==/ s/==.*/==${NEXT_SHORT_TAG_PEP440}.*,>=0.0.0a0/g" "${FILE}"
done
+ sed_runner "/\"${DEP}==/ s/==.*\"/==${NEXT_SHORT_TAG_PEP440}.*,>=0.0.0a0\"/g" pyproject.toml
+done
+
+UCX_DEPENDENCIES=(
+ distributed-ucxx
+ ucx-py
+ ucxx
+)
+for DEP in "${UCX_DEPENDENCIES[@]}"; do
+ for FILE in dependencies.yaml conda/environments/*.yaml; do
+ sed_runner "/-.* ${DEP}\(-cu[[:digit:]]\{2\}\)\{0,1\}==/ s/==.*/==${NEXT_UCXPY_VERSION}.*,>=0.0.0a0/g" "${FILE}"
+ done
+ sed_runner "/\"${DEP}==/ s/==.*\"/==${NEXT_UCXPY_VERSION}.*,>=0.0.0a0\"/g" pyproject.toml
done
# CI files
for FILE in .github/workflows/*.yaml; do
sed_runner "/shared-workflows/ s/@.*/@branch-${NEXT_SHORT_TAG}/g" "${FILE}"
done
-sed_runner "s/RAPIDS_VERSION_NUMBER=\".*/RAPIDS_VERSION_NUMBER=\"${NEXT_SHORT_TAG}\"/g" ci/build_docs.sh
# Docs referencing source code
find docs/source/ -type f -name *.rst -print0 | while IFS= read -r -d '' filename; do
diff --git a/ci/test_python.sh b/ci/test_python.sh
index 78330a40..18dd88cf 100755
--- a/ci/test_python.sh
+++ b/ci/test_python.sh
@@ -5,6 +5,8 @@ set -euo pipefail
. /opt/conda/etc/profile.d/conda.sh
+RAPIDS_VERSION="$(rapids-version)"
+
rapids-logger "Generate Python testing dependencies"
rapids-dependency-file-generator \
--output conda \
@@ -29,7 +31,7 @@ rapids-print-env
rapids-mamba-retry install \
--channel "${PYTHON_CHANNEL}" \
- dask-cuda
+ "dask-cuda=${RAPIDS_VERSION}"
rapids-logger "Check GPU usage"
nvidia-smi
@@ -50,9 +52,9 @@ DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
UCXPY_IFNAME=eth0 \
UCX_WARN_UNUSED_ENV_VARS=n \
UCX_MEMTYPE_CACHE=n \
-timeout 60m pytest \
+timeout 90m pytest \
-vv \
- --durations=0 \
+ --durations=50 \
--capture=no \
--cache-clear \
--junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cuda.xml" \
@@ -60,7 +62,7 @@ timeout 60m pytest \
--cov=dask_cuda \
--cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cuda-coverage.xml" \
--cov-report=term \
- tests -k "not ucxx"
+ tests
popd
rapids-logger "pytest explicit-comms (legacy dd)"
@@ -71,9 +73,9 @@ DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
UCXPY_IFNAME=eth0 \
UCX_WARN_UNUSED_ENV_VARS=n \
UCX_MEMTYPE_CACHE=n \
-timeout 30m pytest \
+timeout 60m pytest \
-vv \
- --durations=0 \
+ --durations=50 \
--capture=no \
--cache-clear \
--junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cuda-legacy.xml" \
@@ -81,7 +83,7 @@ timeout 30m pytest \
--cov=dask_cuda \
--cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cuda-coverage-legacy.xml" \
--cov-report=term \
- tests/test_explicit_comms.py -k "not ucxx"
+ tests/test_explicit_comms.py
popd
rapids-logger "Run local benchmark (dask-expr)"
diff --git a/ci/validate_wheel.sh b/ci/validate_wheel.sh
new file mode 100755
index 00000000..60a80fce
--- /dev/null
+++ b/ci/validate_wheel.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+# Copyright (c) 2024, NVIDIA CORPORATION.
+
+set -euo pipefail
+
+wheel_dir_relative_path=$1
+
+rapids-logger "validate packages with 'pydistcheck'"
+
+pydistcheck \
+ --inspect \
+ "$(echo ${wheel_dir_relative_path}/*.whl)"
+
+rapids-logger "validate packages with 'twine'"
+
+twine check \
+ --strict \
+ "$(echo ${wheel_dir_relative_path}/*.whl)"
diff --git a/conda/environments/all_cuda-114_arch-x86_64.yaml b/conda/environments/all_cuda-114_arch-x86_64.yaml
index 1cc125d4..4a6e2cd5 100644
--- a/conda/environments/all_cuda-114_arch-x86_64.yaml
+++ b/conda/environments/all_cuda-114_arch-x86_64.yaml
@@ -10,28 +10,28 @@ dependencies:
- click >=8.1
- cuda-version=11.4
- cudatoolkit
-- cudf==24.8.*,>=0.0.0a0
-- dask-cudf==24.8.*,>=0.0.0a0
-- distributed-ucxx==0.39.*,>=0.0.0a0
-- kvikio==24.8.*,>=0.0.0a0
+- cudf==24.12.*,>=0.0.0a0
+- dask-cudf==24.12.*,>=0.0.0a0
+- distributed-ucxx==0.41.*,>=0.0.0a0
+- kvikio==24.12.*,>=0.0.0a0
- numactl-devel-cos7-x86_64
- numba>=0.57
-- numpy>=1.23,<2.0a0
+- numpy>=1.23,<3.0a0
- numpydoc>=1.1.0
- pandas>=1.3
- pre-commit
- pynvml>=11.0.0
- pytest
- pytest-cov
-- python>=3.9,<3.12
+- python>=3.10,<3.13
- rapids-build-backend>=0.3.0,<0.4.0dev0
-- rapids-dask-dependency==24.8.*,>=0.0.0a0
+- rapids-dask-dependency==24.12.*,>=0.0.0a0
- setuptools>=64.0.0
- sphinx
- sphinx-click>=2.7.1
- sphinx-rtd-theme>=0.5.1
- ucx-proc=*=gpu
-- ucx-py==0.39.*,>=0.0.0a0
-- ucxx==0.39.*,>=0.0.0a0
+- ucx-py==0.41.*,>=0.0.0a0
+- ucxx==0.41.*,>=0.0.0a0
- zict>=2.0.0
name: all_cuda-114_arch-x86_64
diff --git a/conda/environments/all_cuda-118_arch-x86_64.yaml b/conda/environments/all_cuda-118_arch-x86_64.yaml
index f8b5d17f..6b6a637b 100644
--- a/conda/environments/all_cuda-118_arch-x86_64.yaml
+++ b/conda/environments/all_cuda-118_arch-x86_64.yaml
@@ -10,28 +10,28 @@ dependencies:
- click >=8.1
- cuda-version=11.8
- cudatoolkit
-- cudf==24.8.*,>=0.0.0a0
-- dask-cudf==24.8.*,>=0.0.0a0
-- distributed-ucxx==0.39.*,>=0.0.0a0
-- kvikio==24.8.*,>=0.0.0a0
+- cudf==24.12.*,>=0.0.0a0
+- dask-cudf==24.12.*,>=0.0.0a0
+- distributed-ucxx==0.41.*,>=0.0.0a0
+- kvikio==24.12.*,>=0.0.0a0
- numactl-devel-cos7-x86_64
- numba>=0.57
-- numpy>=1.23,<2.0a0
+- numpy>=1.23,<3.0a0
- numpydoc>=1.1.0
- pandas>=1.3
- pre-commit
- pynvml>=11.0.0
- pytest
- pytest-cov
-- python>=3.9,<3.12
+- python>=3.10,<3.13
- rapids-build-backend>=0.3.0,<0.4.0dev0
-- rapids-dask-dependency==24.8.*,>=0.0.0a0
+- rapids-dask-dependency==24.12.*,>=0.0.0a0
- setuptools>=64.0.0
- sphinx
- sphinx-click>=2.7.1
- sphinx-rtd-theme>=0.5.1
- ucx-proc=*=gpu
-- ucx-py==0.39.*,>=0.0.0a0
-- ucxx==0.39.*,>=0.0.0a0
+- ucx-py==0.41.*,>=0.0.0a0
+- ucxx==0.41.*,>=0.0.0a0
- zict>=2.0.0
name: all_cuda-118_arch-x86_64
diff --git a/conda/environments/all_cuda-122_arch-x86_64.yaml b/conda/environments/all_cuda-125_arch-x86_64.yaml
similarity index 64%
rename from conda/environments/all_cuda-122_arch-x86_64.yaml
rename to conda/environments/all_cuda-125_arch-x86_64.yaml
index 881fcba0..f97f3771 100644
--- a/conda/environments/all_cuda-122_arch-x86_64.yaml
+++ b/conda/environments/all_cuda-125_arch-x86_64.yaml
@@ -10,29 +10,29 @@ dependencies:
- click >=8.1
- cuda-nvcc-impl
- cuda-nvrtc
-- cuda-version=12.2
-- cudf==24.8.*,>=0.0.0a0
-- dask-cudf==24.8.*,>=0.0.0a0
-- distributed-ucxx==0.39.*,>=0.0.0a0
-- kvikio==24.8.*,>=0.0.0a0
+- cuda-version=12.5
+- cudf==24.12.*,>=0.0.0a0
+- dask-cudf==24.12.*,>=0.0.0a0
+- distributed-ucxx==0.41.*,>=0.0.0a0
+- kvikio==24.12.*,>=0.0.0a0
- numactl-devel-cos7-x86_64
- numba>=0.57
-- numpy>=1.23,<2.0a0
+- numpy>=1.23,<3.0a0
- numpydoc>=1.1.0
- pandas>=1.3
- pre-commit
- pynvml>=11.0.0
- pytest
- pytest-cov
-- python>=3.9,<3.12
+- python>=3.10,<3.13
- rapids-build-backend>=0.3.0,<0.4.0dev0
-- rapids-dask-dependency==24.8.*,>=0.0.0a0
+- rapids-dask-dependency==24.12.*,>=0.0.0a0
- setuptools>=64.0.0
- sphinx
- sphinx-click>=2.7.1
- sphinx-rtd-theme>=0.5.1
- ucx-proc=*=gpu
-- ucx-py==0.39.*,>=0.0.0a0
-- ucxx==0.39.*,>=0.0.0a0
+- ucx-py==0.41.*,>=0.0.0a0
+- ucxx==0.41.*,>=0.0.0a0
- zict>=2.0.0
-name: all_cuda-122_arch-x86_64
+name: all_cuda-125_arch-x86_64
diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py
index 516599da..5711ac08 100644
--- a/dask_cuda/__init__.py
+++ b/dask_cuda/__init__.py
@@ -9,6 +9,8 @@
import dask.dataframe.shuffle
import dask.dataframe.multi
import dask.bag.core
+from distributed.protocol.cuda import cuda_deserialize, cuda_serialize
+from distributed.protocol.serialize import dask_deserialize, dask_serialize
from ._version import __git_commit__, __version__
from .cuda_worker import CUDAWorker
@@ -48,3 +50,20 @@
dask.dataframe.shuffle.shuffle_group
)
dask.dataframe.core._concat = unproxify_decorator(dask.dataframe.core._concat)
+
+
+def _register_cudf_spill_aware():
+ import cudf
+
+ # Only enable Dask/cuDF spilling if cuDF spilling is disabled, see
+ # https://github.com/rapidsai/dask-cuda/issues/1363
+ if not cudf.get_option("spill"):
+ # This reproduces the implementation of `_register_cudf`, see
+ # https://github.com/dask/distributed/blob/40fcd65e991382a956c3b879e438be1b100dff97/distributed/protocol/__init__.py#L106-L115
+ from cudf.comm import serialize
+
+
+for registry in [cuda_serialize, cuda_deserialize, dask_serialize, dask_deserialize]:
+ for lib in ["cudf", "dask_cudf"]:
+ if lib in registry._lazy:
+ registry._lazy[lib] = _register_cudf_spill_aware
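
A hedged illustration of what this loop accomplishes, assuming a fresh interpreter in which no cudf objects have been serialized yet; it inspects the same private `_lazy` registries used above and is not part of this diff.

```python
# Hedged sketch, not part of this PR: in a fresh interpreter, importing
# dask_cuda swaps distributed's lazy "cudf" registration hook for the
# spill-aware variant defined above.
from distributed.protocol.cuda import cuda_serialize

import dask_cuda

assert cuda_serialize._lazy["cudf"] is dask_cuda._register_cudf_spill_aware
```
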
diff --git a/dask_cuda/benchmarks/common.py b/dask_cuda/benchmarks/common.py
index 7f48d4fa..49676fee 100644
--- a/dask_cuda/benchmarks/common.py
+++ b/dask_cuda/benchmarks/common.py
@@ -1,3 +1,4 @@
+import contextlib
from argparse import Namespace
from functools import partial
from typing import Any, Callable, List, Mapping, NamedTuple, Optional, Tuple
@@ -7,7 +8,7 @@
import pandas as pd
import dask
-from distributed import Client
+from distributed import Client, performance_report
from dask_cuda.benchmarks.utils import (
address_to_index,
@@ -87,12 +88,20 @@ def run_benchmark(client: Client, args: Namespace, config: Config):
If ``args.profile`` is set, the final run is profiled.
"""
+
results = []
- for _ in range(max(1, args.runs) - 1):
- res = config.bench_once(client, args, write_profile=None)
- results.append(res)
- results.append(config.bench_once(client, args, write_profile=args.profile))
- return results
+ for _ in range(max(0, args.warmup_runs)):
+ config.bench_once(client, args, write_profile=None)
+
+ ctx = contextlib.nullcontext()
+ if args.profile is not None:
+ ctx = performance_report(filename=args.profile)
+ with ctx:
+ for _ in range(max(1, args.runs) - 1):
+ res = config.bench_once(client, args, write_profile=None)
+ results.append(res)
+ results.append(config.bench_once(client, args, write_profile=args.profile_last))
+ return results
def gather_bench_results(client: Client, args: Namespace, config: Config):
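
The same context-selection pattern recurs in each benchmark's `bench_once` below; a standalone sketch of it follows (the helper name `profile_ctx` and its `filename` parameter are illustrative, not part of this change).

```python
# Illustrative sketch of the pattern used above: a no-op context by default,
# a Dask performance report only when a filename was actually requested.
import contextlib

from distributed import performance_report


def profile_ctx(filename=None):
    # `filename` plays the role of ``args.profile`` / ``write_profile``.
    if filename is None:
        return contextlib.nullcontext()
    return performance_report(filename=filename)
```
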
diff --git a/dask_cuda/benchmarks/local_cudf_groupby.py b/dask_cuda/benchmarks/local_cudf_groupby.py
index 2f07e3df..a9e7d833 100644
--- a/dask_cuda/benchmarks/local_cudf_groupby.py
+++ b/dask_cuda/benchmarks/local_cudf_groupby.py
@@ -7,7 +7,7 @@
import dask
import dask.dataframe as dd
from dask.distributed import performance_report, wait
-from dask.utils import format_bytes, parse_bytes
+from dask.utils import format_bytes
from dask_cuda.benchmarks.common import Config, execute_benchmark
from dask_cuda.benchmarks.utils import (
@@ -98,10 +98,9 @@ def bench_once(client, args, write_profile=None):
"False": False,
}.get(args.shuffle, args.shuffle)
- if write_profile is None:
- ctx = contextlib.nullcontext()
- else:
- ctx = performance_report(filename=args.profile)
+ ctx = contextlib.nullcontext()
+ if write_profile is not None:
+ ctx = performance_report(filename=write_profile)
with ctx:
t1 = clock()
@@ -260,19 +259,6 @@ def parse_args():
"type": str,
"help": "Do shuffle with GPU or CPU dataframes (default 'gpu')",
},
- {
- "name": "--ignore-size",
- "default": "1 MiB",
- "metavar": "nbytes",
- "type": parse_bytes,
- "help": "Ignore messages smaller than this (default '1 MB')",
- },
- {
- "name": "--runs",
- "default": 3,
- "type": int,
- "help": "Number of runs",
- },
]
return parse_benchmark_args(
diff --git a/dask_cuda/benchmarks/local_cudf_merge.py b/dask_cuda/benchmarks/local_cudf_merge.py
index 6a68ad78..6ebe005a 100644
--- a/dask_cuda/benchmarks/local_cudf_merge.py
+++ b/dask_cuda/benchmarks/local_cudf_merge.py
@@ -9,7 +9,7 @@
import dask
import dask.dataframe as dd
from dask.distributed import performance_report, wait
-from dask.utils import format_bytes, parse_bytes
+from dask.utils import format_bytes
from dask_cuda.benchmarks.common import Config, execute_benchmark
from dask_cuda.benchmarks.utils import (
@@ -190,7 +190,7 @@ def bench_once(client, args, write_profile=None):
if args.backend == "explicit-comms":
ctx1 = dask.config.set(explicit_comms=True)
if write_profile is not None:
- ctx2 = performance_report(filename=args.profile)
+ ctx2 = performance_report(filename=write_profile)
with ctx1:
with ctx2:
@@ -335,13 +335,6 @@ def parse_args():
"action": "store_true",
"help": "Use shuffle join (takes precedence over '--broadcast-join').",
},
- {
- "name": "--ignore-size",
- "default": "1 MiB",
- "metavar": "nbytes",
- "type": parse_bytes,
- "help": "Ignore messages smaller than this (default '1 MB')",
- },
{
"name": "--frac-match",
"default": 0.3,
@@ -353,12 +346,6 @@ def parse_args():
"action": "store_true",
"help": "Don't shuffle the keys of the left (base) dataframe.",
},
- {
- "name": "--runs",
- "default": 3,
- "type": int,
- "help": "Number of runs",
- },
{
"name": [
"-s",
diff --git a/dask_cuda/benchmarks/local_cudf_shuffle.py b/dask_cuda/benchmarks/local_cudf_shuffle.py
index a1129dd3..3a0955c4 100644
--- a/dask_cuda/benchmarks/local_cudf_shuffle.py
+++ b/dask_cuda/benchmarks/local_cudf_shuffle.py
@@ -121,10 +121,9 @@ def create_data(
def bench_once(client, args, write_profile=None):
data_processed, df = create_data(client, args)
- if write_profile is None:
- ctx = contextlib.nullcontext()
- else:
- ctx = performance_report(filename=args.profile)
+ ctx = contextlib.nullcontext()
+ if write_profile is not None:
+ ctx = performance_report(filename=write_profile)
with ctx:
if args.backend in {"dask", "dask-noop"}:
@@ -228,19 +227,6 @@ def parse_args():
"type": str,
"help": "Do shuffle with GPU or CPU dataframes (default 'gpu')",
},
- {
- "name": "--ignore-size",
- "default": "1 MiB",
- "metavar": "nbytes",
- "type": parse_bytes,
- "help": "Ignore messages smaller than this (default '1 MB')",
- },
- {
- "name": "--runs",
- "default": 3,
- "type": int,
- "help": "Number of runs",
- },
{
"name": "--ignore-index",
"action": "store_true",
diff --git a/dask_cuda/benchmarks/local_cupy.py b/dask_cuda/benchmarks/local_cupy.py
index 22c51556..ba88db30 100644
--- a/dask_cuda/benchmarks/local_cupy.py
+++ b/dask_cuda/benchmarks/local_cupy.py
@@ -8,7 +8,7 @@
from dask import array as da
from dask.distributed import performance_report, wait
-from dask.utils import format_bytes, parse_bytes
+from dask.utils import format_bytes
from dask_cuda.benchmarks.common import Config, execute_benchmark
from dask_cuda.benchmarks.utils import (
@@ -141,12 +141,11 @@ def bench_once(client, args, write_profile=None):
chunksize = x.chunksize
data_processed = sum(arg.nbytes for arg in func_args)
- # Execute the operations to benchmark
- if args.profile is not None and write_profile is not None:
- ctx = performance_report(filename=args.profile)
- else:
- ctx = contextlib.nullcontext()
+ ctx = contextlib.nullcontext()
+ if write_profile is not None:
+ ctx = performance_report(filename=write_profile)
+ # Execute the operations to benchmark
with ctx:
rng = start_range(message=args.operation, color="purple")
result = func(*func_args)
@@ -297,19 +296,6 @@ def parse_args():
"type": int,
"help": "Chunk size (default 2500).",
},
- {
- "name": "--ignore-size",
- "default": "1 MiB",
- "metavar": "nbytes",
- "type": parse_bytes,
- "help": "Ignore messages smaller than this (default '1 MB').",
- },
- {
- "name": "--runs",
- "default": 3,
- "type": int,
- "help": "Number of runs (default 3).",
- },
{
"name": [
"-b",
diff --git a/dask_cuda/benchmarks/local_cupy_map_overlap.py b/dask_cuda/benchmarks/local_cupy_map_overlap.py
index 8250c9f9..ecefa52a 100644
--- a/dask_cuda/benchmarks/local_cupy_map_overlap.py
+++ b/dask_cuda/benchmarks/local_cupy_map_overlap.py
@@ -10,7 +10,7 @@
from dask import array as da
from dask.distributed import performance_report, wait
-from dask.utils import format_bytes, parse_bytes
+from dask.utils import format_bytes
from dask_cuda.benchmarks.common import Config, execute_benchmark
from dask_cuda.benchmarks.utils import (
@@ -42,12 +42,11 @@ def bench_once(client, args, write_profile=None):
data_processed = x.nbytes
- # Execute the operations to benchmark
- if args.profile is not None and write_profile is not None:
- ctx = performance_report(filename=args.profile)
- else:
- ctx = contextlib.nullcontext()
+ ctx = contextlib.nullcontext()
+ if write_profile is not None:
+ ctx = performance_report(filename=write_profile)
+ # Execute the operations to benchmark
with ctx:
result = x.map_overlap(mean_filter, args.kernel_size, shape=ks)
if args.backend == "dask-noop":
@@ -168,19 +167,6 @@ def parse_args():
"type": int,
"help": "Kernel size, 2*k+1, in each dimension (default 1)",
},
- {
- "name": "--ignore-size",
- "default": "1 MiB",
- "metavar": "nbytes",
- "type": parse_bytes,
- "help": "Ignore messages smaller than this (default '1 MB')",
- },
- {
- "name": "--runs",
- "default": 3,
- "type": int,
- "help": "Number of runs",
- },
{
"name": [
"-b",
diff --git a/dask_cuda/benchmarks/read_parquet.py b/dask_cuda/benchmarks/read_parquet.py
new file mode 100644
index 00000000..bce69673
--- /dev/null
+++ b/dask_cuda/benchmarks/read_parquet.py
@@ -0,0 +1,268 @@
+import contextlib
+from collections import ChainMap
+from time import perf_counter as clock
+
+import fsspec
+import pandas as pd
+
+import dask
+import dask.dataframe as dd
+from dask.base import tokenize
+from dask.distributed import performance_report
+from dask.utils import format_bytes, parse_bytes
+
+from dask_cuda.benchmarks.common import Config, execute_benchmark
+from dask_cuda.benchmarks.utils import (
+ parse_benchmark_args,
+ print_key_value,
+ print_separator,
+ print_throughput_bandwidth,
+)
+
+DISK_SIZE_CACHE = {}
+OPTIONS_CACHE = {}
+
+
+def _noop(df):
+ return df
+
+
+def read_data(paths, columns, backend, **kwargs):
+ with dask.config.set({"dataframe.backend": backend}):
+ return dd.read_parquet(
+ paths,
+ columns=columns,
+ **kwargs,
+ )
+
+
+def get_fs_paths_kwargs(args):
+ kwargs = {}
+
+ storage_options = {}
+ if args.key:
+ storage_options["key"] = args.key
+ if args.secret:
+ storage_options["secret"] = args.secret
+
+ if args.filesystem == "arrow":
+ import pyarrow.fs as pa_fs
+ from fsspec.implementations.arrow import ArrowFSWrapper
+
+ _mapping = {
+ "key": "access_key",
+ "secret": "secret_key",
+ } # See: pyarrow.fs.S3FileSystem docs
+ s3_args = {}
+ for k, v in storage_options.items():
+ s3_args[_mapping[k]] = v
+
+ fs = pa_fs.FileSystem.from_uri(args.path)[0]
+ try:
+ region = {"region": fs.region}
+ except AttributeError:
+ region = {}
+ kwargs["filesystem"] = type(fs)(**region, **s3_args)
+ fsspec_fs = ArrowFSWrapper(kwargs["filesystem"])
+
+ if args.type == "gpu":
+ kwargs["blocksize"] = args.blocksize
+ else:
+ fsspec_fs = fsspec.core.get_fs_token_paths(
+ args.path, mode="rb", storage_options=storage_options
+ )[0]
+ kwargs["filesystem"] = fsspec_fs
+ kwargs["blocksize"] = args.blocksize
+ kwargs["aggregate_files"] = args.aggregate_files
+
+ # Collect list of paths
+ stripped_url_path = fsspec_fs._strip_protocol(args.path)
+ if stripped_url_path.endswith("/"):
+ stripped_url_path = stripped_url_path[:-1]
+ paths = fsspec_fs.glob(f"{stripped_url_path}/*.parquet")
+ if args.file_count:
+ paths = paths[: args.file_count]
+
+ return fsspec_fs, paths, kwargs
+
+
+def bench_once(client, args, write_profile=None):
+ global OPTIONS_CACHE
+ global DISK_SIZE_CACHE
+
+ # Construct kwargs
+ token = tokenize(args)
+ try:
+ fsspec_fs, paths, kwargs = OPTIONS_CACHE[token]
+ except KeyError:
+ fsspec_fs, paths, kwargs = get_fs_paths_kwargs(args)
+ OPTIONS_CACHE[token] = (fsspec_fs, paths, kwargs)
+
+ if write_profile is None:
+ ctx = contextlib.nullcontext()
+ else:
+ ctx = performance_report(filename=args.profile)
+
+ with ctx:
+ t1 = clock()
+ df = read_data(
+ paths,
+ columns=args.columns,
+ backend="cudf" if args.type == "gpu" else "pandas",
+ **kwargs,
+ )
+ num_rows = len(
+ # Use opaque `map_partitions` call to "block"
+ # dask-expr from using pq metadata to get length
+ df.map_partitions(
+ _noop,
+ meta=df._meta,
+ enforce_metadata=False,
+ )
+ )
+ t2 = clock()
+
+ # Extract total size of files on disk
+ token = tokenize(paths)
+ try:
+ disk_size = DISK_SIZE_CACHE[token]
+ except KeyError:
+ disk_size = sum(fsspec_fs.sizes(paths))
+ DISK_SIZE_CACHE[token] = disk_size
+
+ return (disk_size, num_rows, t2 - t1)
+
+
+def pretty_print_results(args, address_to_index, p2p_bw, results):
+ if args.markdown:
+ print("```")
+ print("Parquet read benchmark")
+ data_processed, row_count, durations = zip(*results)
+ print_separator(separator="-")
+ backend = "cudf" if args.type == "gpu" else "pandas"
+ print_key_value(key="Path", value=args.path)
+ print_key_value(key="Columns", value=f"{args.columns}")
+ print_key_value(key="Backend", value=f"{backend}")
+ print_key_value(key="Filesystem", value=f"{args.filesystem}")
+ print_key_value(key="Blocksize", value=f"{format_bytes(args.blocksize)}")
+ print_key_value(key="Aggregate files", value=f"{args.aggregate_files}")
+ print_key_value(key="Row count", value=f"{row_count[0]}")
+ print_key_value(key="Size on disk", value=f"{format_bytes(data_processed[0])}")
+ if args.markdown:
+ print("\n```")
+ args.no_show_p2p_bandwidth = True
+ print_throughput_bandwidth(
+ args, durations, data_processed, p2p_bw, address_to_index
+ )
+ print_separator(separator="=")
+
+
+def create_tidy_results(args, p2p_bw, results):
+ configuration = {
+ "path": args.path,
+ "columns": args.columns,
+ "backend": "cudf" if args.type == "gpu" else "pandas",
+ "filesystem": args.filesystem,
+ "blocksize": args.blocksize,
+ "aggregate_files": args.aggregate_files,
+ }
+ timing_data = pd.DataFrame(
+ [
+ pd.Series(
+ data=ChainMap(
+ configuration,
+ {
+ "wallclock": duration,
+ "data_processed": data_processed,
+ "num_rows": num_rows,
+ },
+ )
+ )
+ for data_processed, num_rows, duration in results
+ ]
+ )
+ return timing_data, p2p_bw
+
+
+def parse_args():
+ special_args = [
+ {
+ "name": "path",
+ "type": str,
+ "help": "Parquet directory to read from (must be a flat directory).",
+ },
+ {
+ "name": "--blocksize",
+ "default": "256MB",
+ "type": parse_bytes,
+ "help": "How to set the blocksize option",
+ },
+ {
+ "name": "--aggregate-files",
+ "default": False,
+ "action": "store_true",
+ "help": "How to set the aggregate_files option",
+ },
+ {
+ "name": "--file-count",
+ "type": int,
+ "help": "Maximum number of files to read.",
+ },
+ {
+ "name": "--columns",
+ "type": str,
+ "help": "Columns to read/select from data.",
+ },
+ {
+ "name": "--key",
+ "type": str,
+ "help": "Public S3 key.",
+ },
+ {
+ "name": "--secret",
+ "type": str,
+ "help": "Secret S3 key.",
+ },
+ {
+ "name": [
+ "-t",
+ "--type",
+ ],
+ "choices": ["cpu", "gpu"],
+ "default": "gpu",
+ "type": str,
+ "help": "Use GPU or CPU dataframes (default 'gpu')",
+ },
+ {
+ "name": "--filesystem",
+ "choices": ["arrow", "fsspec"],
+ "default": "fsspec",
+ "type": str,
+ "help": "Filesystem backend",
+ },
+ {
+ "name": "--runs",
+ "default": 3,
+ "type": int,
+ "help": "Number of runs",
+ },
+ ]
+
+ args = parse_benchmark_args(
+ description="Parquet read benchmark",
+ args_list=special_args,
+ check_explicit_comms=False,
+ )
+ args.no_show_p2p_bandwidth = True
+ return args
+
+
+if __name__ == "__main__":
+ execute_benchmark(
+ Config(
+ args=parse_args(),
+ bench_once=bench_once,
+ create_tidy_results=create_tidy_results,
+ pretty_print_results=pretty_print_results,
+ )
+ )
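
A hedged usage sketch for the new benchmark: the dataset path and option values below are placeholders rather than anything defined in this change, and a GPU plus access to the referenced storage are assumed.

```python
# Hedged usage sketch; the S3 path is a placeholder, and GPUs plus pyarrow/s3
# access are assumed.
import subprocess
import sys

subprocess.run(
    [
        sys.executable, "-m", "dask_cuda.benchmarks.read_parquet",
        "s3://example-bucket/dataset/",  # hypothetical flat directory of *.parquet files
        "--type", "gpu",
        "--filesystem", "arrow",
        "--blocksize", "256MB",
        "--runs", "3",
        "--warmup-runs", "1",
    ],
    check=True,
)
```
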
diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py
index 48e4755f..4f87a025 100644
--- a/dask_cuda/benchmarks/utils.py
+++ b/dask_cuda/benchmarks/utils.py
@@ -323,7 +323,16 @@ def parse_benchmark_args(
metavar="PATH",
default=None,
type=str,
- help="Write dask profile report (E.g. dask-report.html)",
+ help="Write dask profile report (E.g. dask-report.html) on all "
+ "iterations (excluding warmup).",
+ )
+ parser.add_argument(
+ "--profile-last",
+ metavar="PATH",
+ default=None,
+ type=str,
+ help="Write dask profile report (E.g. dask-report.html) on last "
+ "iteration only.",
)
# See save_benchmark_data for more information
parser.add_argument(
@@ -337,6 +346,25 @@ def parse_benchmark_args(
"If the files already exist, new files are created with a uniquified "
"BASENAME.",
)
+ parser.add_argument(
+ "--ignore-size",
+ default="1 MiB",
+ metavar="nbytes",
+ type=parse_bytes,
+        help="Bandwidth statistics: ignore messages smaller than this (default '1 MiB')",
+ )
+ parser.add_argument(
+ "--runs",
+ default=3,
+ type=int,
+ help="Number of runs",
+ )
+ parser.add_argument(
+ "--warmup-runs",
+ default=1,
+ type=int,
+ help="Number of warmup runs",
+ )
for args in args_list:
name = args.pop("name")
@@ -765,7 +793,7 @@ def print_throughput_bandwidth(
)
print_key_value(
key="Wall clock",
- value=f"{format_time(durations.mean())} +/- {format_time(durations.std()) }",
+ value=f"{format_time(durations.mean())} +/- {format_time(durations.std())}",
)
if not args.no_show_p2p_bandwidth:
print_separator(separator="=")
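
With these options now shared by every benchmark, `--profile` covers all timed iterations while `--profile-last` keeps the previous last-iteration-only behavior. An illustrative invocation follows; the module choice, file names, and values are examples only, and a GPU is required.

```python
# Illustrative only: profile every timed iteration to one report and the final
# iteration to another. Filenames and sizes are arbitrary examples.
import subprocess
import sys

subprocess.run(
    [
        sys.executable, "-m", "dask_cuda.benchmarks.local_cudf_merge",
        "--runs", "3",
        "--warmup-runs", "1",
        "--ignore-size", "4 MiB",
        "--profile", "all-runs.html",
        "--profile-last", "last-run.html",
    ],
    check=True,
)
```
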
diff --git a/dask_cuda/cli.py b/dask_cuda/cli.py
index ba58fe3e..8101f020 100644
--- a/dask_cuda/cli.py
+++ b/dask_cuda/cli.py
@@ -13,7 +13,7 @@
from distributed.utils import import_term
from .cuda_worker import CUDAWorker
-from .utils import print_cluster_config
+from .utils import CommaSeparatedChoice, print_cluster_config
logger = logging.getLogger(__name__)
@@ -101,6 +101,20 @@ def cuda():
total device memory), string (like ``"5GB"`` or ``"5000M"``), or ``"auto"`` or 0 to
disable spilling to host (i.e. allow full device memory usage).""",
)
+@click.option(
+ "--enable-cudf-spill/--disable-cudf-spill",
+ default=False,
+ show_default=True,
+ help="""Enable automatic cuDF spilling. WARNING: This should NOT be used with
+ JIT-Unspill.""",
+)
+@click.option(
+ "--cudf-spill-stats",
+ type=int,
+ default=0,
+ help="""Set the cuDF spilling statistics level. This option has no effect if
+ `--enable-cudf-spill` is not specified.""",
+)
@click.option(
"--rmm-pool-size",
default=None,
@@ -150,13 +164,24 @@ def cuda():
incompatible with RMM pools and managed memory, trying to enable both will
result in failure.""",
)
+@click.option(
+ "--set-rmm-allocator-for-libs",
+ "rmm_allocator_external_lib_list",
+ type=CommaSeparatedChoice(["cupy", "torch"]),
+ default=None,
+ show_default=True,
+ help="""
+ Set RMM as the allocator for external libraries. Provide a comma-separated
+ list of libraries to set, e.g., "torch,cupy".""",
+)
@click.option(
"--rmm-release-threshold",
default=None,
- help="""When ``rmm.async`` is ``True`` and the pool size grows beyond this value, unused
- memory held by the pool will be released at the next synchronization point. Can be
- an integer (bytes), float (fraction of total device memory), string (like ``"5GB"``
- or ``"5000M"``) or ``None``. By default, this feature is disabled.
+ help="""When ``rmm.async`` is ``True`` and the pool size grows beyond this
+ value, unused memory held by the pool will be released at the next
+ synchronization point. Can be an integer (bytes), float (fraction of total
+ device memory), string (like ``"5GB"`` or ``"5000M"``) or ``None``. By
+ default, this feature is disabled.
.. note::
This size is a per-worker configuration, and not cluster-wide.""",
@@ -330,10 +355,13 @@ def worker(
name,
memory_limit,
device_memory_limit,
+ enable_cudf_spill,
+ cudf_spill_stats,
rmm_pool_size,
rmm_maximum_pool_size,
rmm_managed_memory,
rmm_async,
+ rmm_allocator_external_lib_list,
rmm_release_threshold,
rmm_log_directory,
rmm_track_allocations,
@@ -402,10 +430,13 @@ def worker(
name,
memory_limit,
device_memory_limit,
+ enable_cudf_spill,
+ cudf_spill_stats,
rmm_pool_size,
rmm_maximum_pool_size,
rmm_managed_memory,
rmm_async,
+ rmm_allocator_external_lib_list,
rmm_release_threshold,
rmm_log_directory,
rmm_track_allocations,
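
An illustrative worker launch using the new flags; the scheduler address is a placeholder, and the equivalent `LocalCUDACluster` keywords appear further below.

```python
# Illustrative only: launch a worker with cuDF spilling enabled and RMM set as
# the CuPy/PyTorch allocator. The scheduler address is a placeholder.
import subprocess

worker = subprocess.Popen(
    [
        "dask", "cuda", "worker", "tcp://scheduler:8786",
        "--enable-cudf-spill",
        "--cudf-spill-stats", "1",
        "--set-rmm-allocator-for-libs", "cupy,torch",
    ]
)
```
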
diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py
index e25a7c14..30c14450 100644
--- a/dask_cuda/cuda_worker.py
+++ b/dask_cuda/cuda_worker.py
@@ -20,7 +20,7 @@
from .device_host_file import DeviceHostFile
from .initialize import initialize
-from .plugins import CPUAffinity, PreImport, RMMSetup
+from .plugins import CPUAffinity, CUDFSetup, PreImport, RMMSetup
from .proxify_host_file import ProxifyHostFile
from .utils import (
cuda_visible_devices,
@@ -41,10 +41,13 @@ def __init__(
name=None,
memory_limit="auto",
device_memory_limit="auto",
+ enable_cudf_spill=False,
+ cudf_spill_stats=0,
rmm_pool_size=None,
rmm_maximum_pool_size=None,
rmm_managed_memory=False,
rmm_async=False,
+ rmm_allocator_external_lib_list=None,
rmm_release_threshold=None,
rmm_log_directory=None,
rmm_track_allocations=False,
@@ -166,6 +169,12 @@ def del_pid_file():
if device_memory_limit is None and memory_limit is None:
data = lambda _: {}
elif jit_unspill:
+ if enable_cudf_spill:
+ warnings.warn(
+ "Enabling cuDF spilling and JIT-Unspill together is not "
+ "safe, consider disabling JIT-Unspill."
+ )
+
data = lambda i: (
ProxifyHostFile,
{
@@ -187,6 +196,14 @@ def del_pid_file():
},
)
+ cudf_spill_warning = dask.config.get("cudf-spill-warning", default=True)
+ if enable_cudf_spill and cudf_spill_warning:
+ warnings.warn(
+ "cuDF spilling is enabled, please ensure the client and scheduler "
+ "processes set `CUDF_SPILL=on` as well. To disable this warning "
+ "set `DASK_CUDF_SPILL_WARNING=False`."
+ )
+
self.nannies = [
Nanny(
scheduler,
@@ -215,8 +232,10 @@ def del_pid_file():
release_threshold=rmm_release_threshold,
log_directory=rmm_log_directory,
track_allocations=rmm_track_allocations,
+ external_lib_list=rmm_allocator_external_lib_list,
),
PreImport(pre_import),
+ CUDFSetup(spill=enable_cudf_spill, spill_stats=cudf_spill_stats),
},
name=name if nprocs == 1 or name is None else str(name) + "-" + str(i),
local_directory=local_directory,
diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py
index 1b81c770..7a24df43 100644
--- a/dask_cuda/local_cuda_cluster.py
+++ b/dask_cuda/local_cuda_cluster.py
@@ -10,7 +10,7 @@
from .device_host_file import DeviceHostFile
from .initialize import initialize
-from .plugins import CPUAffinity, PreImport, RMMSetup
+from .plugins import CPUAffinity, CUDFSetup, PreImport, RMMSetup
from .proxify_host_file import ProxifyHostFile
from .utils import (
cuda_visible_devices,
@@ -73,6 +73,14 @@ class LocalCUDACluster(LocalCluster):
starts spilling to host memory. Can be an integer (bytes), float (fraction of
total device memory), string (like ``"5GB"`` or ``"5000M"``), or ``"auto"``, 0,
or ``None`` to disable spilling to host (i.e. allow full device memory usage).
+ enable_cudf_spill : bool, default False
+ Enable automatic cuDF spilling.
+
+ .. warning::
+ This should NOT be used together with JIT-Unspill.
+ cudf_spill_stats : int, default 0
+ Set the cuDF spilling statistics level. This option has no effect if
+ ``enable_cudf_spill=False``.
local_directory : str or None, default None
Path on local machine to store temporary files. Can be a string (like
``"path/to/files"``) or ``None`` to fall back on the value of
@@ -135,6 +143,11 @@ class LocalCUDACluster(LocalCluster):
The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also
incompatible with RMM pools and managed memory. Trying to enable both will
result in an exception.
+ rmm_allocator_external_lib_list: str, list or None, default None
+ List of external libraries for which to set RMM as the allocator.
+ Supported options are: ``["torch", "cupy"]``. Can be a comma-separated string
+ (like ``"torch,cupy"``) or a list of strings (like ``["torch", "cupy"]``).
+ If ``None``, no external libraries will use RMM as their allocator.
rmm_release_threshold: int, str or None, default None
When ``rmm.async is True`` and the pool size grows beyond this value, unused
memory held by the pool will be released at the next synchronization point.
@@ -209,6 +222,8 @@ def __init__(
threads_per_worker=1,
memory_limit="auto",
device_memory_limit=0.8,
+ enable_cudf_spill=False,
+ cudf_spill_stats=0,
data=None,
local_directory=None,
shared_filesystem=None,
@@ -221,6 +236,7 @@ def __init__(
rmm_maximum_pool_size=None,
rmm_managed_memory=False,
rmm_async=False,
+ rmm_allocator_external_lib_list=None,
rmm_release_threshold=None,
rmm_log_directory=None,
rmm_track_allocations=False,
@@ -234,6 +250,13 @@ def __init__(
# initialization happens before we can set CUDA_VISIBLE_DEVICES
os.environ["RAPIDS_NO_INITIALIZE"] = "True"
+ if enable_cudf_spill:
+ import cudf
+
+ # cuDF spilling must be enabled in the client/scheduler process too.
+ cudf.set_option("spill", enable_cudf_spill)
+ cudf.set_option("spill_stats", cudf_spill_stats)
+
if threads_per_worker < 1:
raise ValueError("threads_per_worker must be higher than 0.")
@@ -248,6 +271,19 @@ def __init__(
n_workers = len(CUDA_VISIBLE_DEVICES)
if n_workers < 1:
raise ValueError("Number of workers cannot be less than 1.")
+
+ if rmm_allocator_external_lib_list is not None:
+ if isinstance(rmm_allocator_external_lib_list, str):
+ rmm_allocator_external_lib_list = [
+ v.strip() for v in rmm_allocator_external_lib_list.split(",")
+ ]
+ elif not isinstance(rmm_allocator_external_lib_list, list):
+ raise ValueError(
+ "rmm_allocator_external_lib_list must be either a comma-separated "
+ "string or a list of strings. Examples: 'torch,cupy' "
+ "or ['torch', 'cupy']"
+ )
+
# Set nthreads=1 when parsing mem_limit since it only depends on n_workers
logger = logging.getLogger(__name__)
self.memory_limit = parse_memory_limit(
@@ -259,12 +295,16 @@ def __init__(
self.device_memory_limit = parse_device_memory_limit(
device_memory_limit, device_index=nvml_device_index(0, CUDA_VISIBLE_DEVICES)
)
+ self.enable_cudf_spill = enable_cudf_spill
+ self.cudf_spill_stats = cudf_spill_stats
self.rmm_pool_size = rmm_pool_size
self.rmm_maximum_pool_size = rmm_maximum_pool_size
self.rmm_managed_memory = rmm_managed_memory
self.rmm_async = rmm_async
self.rmm_release_threshold = rmm_release_threshold
+ self.rmm_allocator_external_lib_list = rmm_allocator_external_lib_list
+
if rmm_pool_size is not None or rmm_managed_memory or rmm_async:
try:
import rmm # noqa F401
@@ -302,6 +342,12 @@ def __init__(
if device_memory_limit is None and memory_limit is None:
data = {}
elif jit_unspill:
+ if enable_cudf_spill:
+ warnings.warn(
+ "Enabling cuDF spilling and JIT-Unspill together is not "
+ "safe, consider disabling JIT-Unspill."
+ )
+
data = (
ProxifyHostFile,
{
@@ -412,8 +458,10 @@ def new_worker_spec(self):
release_threshold=self.rmm_release_threshold,
log_directory=self.rmm_log_directory,
track_allocations=self.rmm_track_allocations,
+ external_lib_list=self.rmm_allocator_external_lib_list,
),
PreImport(self.pre_import),
+ CUDFSetup(self.enable_cudf_spill, self.cudf_spill_stats),
},
}
)
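
A minimal usage sketch of the new keyword arguments; it requires GPUs with cudf and rmm installed (plus cupy/torch if listed), and the pool size and library list are illustrative values rather than defaults from this change.

```python
# Minimal sketch, not part of this PR. Values are illustrative.
from dask_cuda import LocalCUDACluster
from distributed import Client

cluster = LocalCUDACluster(
    enable_cudf_spill=True,      # also enables cuDF spilling in this (client) process
    cudf_spill_stats=1,
    rmm_pool_size="24GB",        # arbitrary example size
    rmm_allocator_external_lib_list=["cupy", "torch"],  # or the string "cupy,torch"
)
client = Client(cluster)
```
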
diff --git a/dask_cuda/plugins.py b/dask_cuda/plugins.py
index 4eba97f2..cd1928af 100644
--- a/dask_cuda/plugins.py
+++ b/dask_cuda/plugins.py
@@ -1,5 +1,6 @@
import importlib
import os
+from typing import Callable, Dict
from distributed import WorkerPlugin
@@ -14,6 +15,21 @@ def setup(self, worker=None):
os.sched_setaffinity(0, self.cores)
+class CUDFSetup(WorkerPlugin):
+ def __init__(self, spill, spill_stats):
+ self.spill = spill
+ self.spill_stats = spill_stats
+
+ def setup(self, worker=None):
+ try:
+ import cudf
+
+ cudf.set_option("spill", self.spill)
+ cudf.set_option("spill_stats", self.spill_stats)
+ except ImportError:
+ pass
+
+
class RMMSetup(WorkerPlugin):
def __init__(
self,
@@ -24,6 +40,7 @@ def __init__(
release_threshold,
log_directory,
track_allocations,
+ external_lib_list,
):
if initial_pool_size is None and maximum_pool_size is not None:
raise ValueError(
@@ -46,6 +63,7 @@ def __init__(
self.logging = log_directory is not None
self.log_directory = log_directory
self.rmm_track_allocations = track_allocations
+ self.external_lib_list = external_lib_list
def setup(self, worker=None):
if self.initial_pool_size is not None:
@@ -108,6 +126,70 @@ def setup(self, worker=None):
mr = rmm.mr.get_current_device_resource()
rmm.mr.set_current_device_resource(rmm.mr.TrackingResourceAdaptor(mr))
+ if self.external_lib_list is not None:
+ for lib in self.external_lib_list:
+ enable_rmm_memory_for_library(lib)
+
+
+def enable_rmm_memory_for_library(lib_name: str) -> None:
+ """Enable RMM memory pool support for a specified third-party library.
+
+ This function allows the given library to utilize RMM's memory pool if it supports
+ integration with RMM. The library name is passed as a string argument, and if the
+ library is compatible, its memory allocator will be configured to use RMM.
+
+ Parameters
+ ----------
+ lib_name : str
+ The name of the third-party library to enable RMM memory pool support for.
+ Supported libraries are "cupy" and "torch".
+
+ Raises
+ ------
+ ValueError
+ If the library name is not supported or does not have RMM integration.
+ ImportError
+ If the required library is not installed.
+ """
+
+ # Mapping of supported libraries to their respective setup functions
+ setup_functions: Dict[str, Callable[[], None]] = {
+ "torch": _setup_rmm_for_torch,
+ "cupy": _setup_rmm_for_cupy,
+ }
+
+ if lib_name not in setup_functions:
+ supported_libs = ", ".join(setup_functions.keys())
+ raise ValueError(
+ f"The library '{lib_name}' is not supported for RMM integration. "
+ f"Supported libraries are: {supported_libs}."
+ )
+
+ # Call the setup function for the specified library
+ setup_functions[lib_name]()
+
+
+def _setup_rmm_for_torch() -> None:
+ try:
+ import torch
+ except ImportError as e:
+ raise ImportError("PyTorch is not installed.") from e
+
+ from rmm.allocators.torch import rmm_torch_allocator
+
+ torch.cuda.memory.change_current_allocator(rmm_torch_allocator)
+
+
+def _setup_rmm_for_cupy() -> None:
+ try:
+ import cupy
+ except ImportError as e:
+ raise ImportError("CuPy is not installed.") from e
+
+ from rmm.allocators.cupy import rmm_cupy_allocator
+
+ cupy.cuda.set_allocator(rmm_cupy_allocator)
+
class PreImport(WorkerPlugin):
def __init__(self, libraries):
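
These plugins are ordinary `WorkerPlugin`s, so a hedged sketch of attaching `CUDFSetup` to an already-running cluster follows; the scheduler address is a placeholder, and older `distributed` releases use `register_worker_plugin` instead of `register_plugin`.

```python
# Hedged sketch: toggle cuDF spilling on every worker of an existing cluster.
# The scheduler address is a placeholder; cudf must be installed on the workers.
from distributed import Client

from dask_cuda.plugins import CUDFSetup

client = Client("tcp://scheduler:8786")
client.register_plugin(CUDFSetup(spill=True, spill_stats=1))
```
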
diff --git a/dask_cuda/tests/pytest.ini b/dask_cuda/tests/pytest.ini
new file mode 100644
index 00000000..7b0a9f29
--- /dev/null
+++ b/dask_cuda/tests/pytest.ini
@@ -0,0 +1,4 @@
+# Copyright (c) 2024, NVIDIA CORPORATION.
+
+[pytest]
+addopts = --tb=native
diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py
index 974ad131..049fe85f 100644
--- a/dask_cuda/tests/test_dask_cuda_worker.py
+++ b/dask_cuda/tests/test_dask_cuda_worker.py
@@ -231,6 +231,64 @@ def test_rmm_logging(loop): # noqa: F811
assert v is rmm.mr.LoggingResourceAdaptor
+def test_cudf_spill_disabled(loop): # noqa: F811
+ cudf = pytest.importorskip("cudf")
+ with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]):
+ with popen(
+ [
+ "dask",
+ "cuda",
+ "worker",
+ "127.0.0.1:9369",
+ "--host",
+ "127.0.0.1",
+ "--no-dashboard",
+ ]
+ ):
+ with Client("127.0.0.1:9369", loop=loop) as client:
+ assert wait_workers(client, n_gpus=get_n_gpus())
+
+ cudf_spill = client.run(
+ cudf.get_option,
+ "spill",
+ )
+ for v in cudf_spill.values():
+ assert v is False
+
+ cudf_spill_stats = client.run(cudf.get_option, "spill_stats")
+ for v in cudf_spill_stats.values():
+ assert v == 0
+
+
+def test_cudf_spill(loop): # noqa: F811
+ cudf = pytest.importorskip("cudf")
+ with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]):
+ with popen(
+ [
+ "dask",
+ "cuda",
+ "worker",
+ "127.0.0.1:9369",
+ "--host",
+ "127.0.0.1",
+ "--no-dashboard",
+ "--enable-cudf-spill",
+ "--cudf-spill-stats",
+ "2",
+ ]
+ ):
+ with Client("127.0.0.1:9369", loop=loop) as client:
+ assert wait_workers(client, n_gpus=get_n_gpus())
+
+ cudf_spill = client.run(cudf.get_option, "spill")
+ for v in cudf_spill.values():
+ assert v is True
+
+ cudf_spill_stats = client.run(cudf.get_option, "spill_stats")
+ for v in cudf_spill_stats.values():
+ assert v == 2
+
+
@patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0"})
def test_dashboard_address(loop): # noqa: F811
with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]):
@@ -509,3 +567,30 @@ def test_worker_timeout():
assert "reason: nanny-close" in ret.stderr.lower()
assert ret.returncode == 0
+
+
+@pytest.mark.parametrize("enable_cudf_spill_warning", [False, True])
+def test_worker_cudf_spill_warning(enable_cudf_spill_warning): # noqa: F811
+ pytest.importorskip("rmm")
+
+ environ = {"CUDA_VISIBLE_DEVICES": "0"}
+ if not enable_cudf_spill_warning:
+ environ["DASK_CUDF_SPILL_WARNING"] = "False"
+
+ with patch.dict(os.environ, environ):
+ ret = subprocess.run(
+ [
+ "dask",
+ "cuda",
+ "worker",
+ "127.0.0.1:9369",
+ "--enable-cudf-spill",
+ "--death-timeout",
+ "1",
+ ],
+ capture_output=True,
+ )
+ if enable_cudf_spill_warning:
+ assert b"UserWarning: cuDF spilling is enabled" in ret.stderr
+ else:
+ assert b"UserWarning: cuDF spilling is enabled" not in ret.stderr
diff --git a/dask_cuda/tests/test_gds.py b/dask_cuda/tests/test_gds.py
index c8667025..262369e6 100644
--- a/dask_cuda/tests/test_gds.py
+++ b/dask_cuda/tests/test_gds.py
@@ -38,7 +38,7 @@ def test_gds(gds_enabled, cuda_lib):
a = data_create()
header, frames = serialize(a, serializers=("disk",))
b = deserialize(header, frames)
- assert type(a) == type(b)
+ assert type(a) is type(b)
assert data_compare(a, b)
finally:
ProxifyHostFile.register_disk_spilling() # Reset disk spilling options
diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py
index b05389e4..b144d111 100644
--- a/dask_cuda/tests/test_local_cuda_cluster.py
+++ b/dask_cuda/tests/test_local_cuda_cluster.py
@@ -500,6 +500,54 @@ async def test_worker_fraction_limits():
)
+@gen_test(timeout=20)
+async def test_cudf_spill_disabled():
+ cudf = pytest.importorskip("cudf")
+
+ async with LocalCUDACluster(
+ asynchronous=True,
+ ) as cluster:
+ async with Client(cluster, asynchronous=True) as client:
+ cudf_spill = await client.run(
+ cudf.get_option,
+ "spill",
+ )
+ for v in cudf_spill.values():
+ assert v is False
+
+ cudf_spill_stats = await client.run(
+ cudf.get_option,
+ "spill_stats",
+ )
+ for v in cudf_spill_stats.values():
+ assert v == 0
+
+
+@gen_test(timeout=20)
+async def test_cudf_spill():
+ cudf = pytest.importorskip("cudf")
+
+ async with LocalCUDACluster(
+ enable_cudf_spill=True,
+ cudf_spill_stats=2,
+ asynchronous=True,
+ ) as cluster:
+ async with Client(cluster, asynchronous=True) as client:
+ cudf_spill = await client.run(
+ cudf.get_option,
+ "spill",
+ )
+ for v in cudf_spill.values():
+ assert v is True
+
+ cudf_spill_stats = await client.run(
+ cudf.get_option,
+ "spill_stats",
+ )
+ for v in cudf_spill_stats.values():
+ assert v == 2
+
+
@pytest.mark.parametrize(
"protocol",
["ucx", "ucxx"],
diff --git a/dask_cuda/tests/test_proxify_host_file.py b/dask_cuda/tests/test_proxify_host_file.py
index 2683ea36..56fe7f8d 100644
--- a/dask_cuda/tests/test_proxify_host_file.py
+++ b/dask_cuda/tests/test_proxify_host_file.py
@@ -252,7 +252,7 @@ def task(x):
assert "ProxyObject" in str(type(x))
assert x._pxy_get().serializer == "dask"
else:
- assert type(x) == cudf.DataFrame
+ assert type(x) is cudf.DataFrame
assert len(x) == 10 # Trigger deserialization
return x
diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py
index 31a9e996..90b84e90 100644
--- a/dask_cuda/tests/test_proxy.py
+++ b/dask_cuda/tests/test_proxy.py
@@ -114,7 +114,7 @@ def test_proxy_object_of_array(serializers, backend):
pxy = proxy_object.asproxy(org.copy(), serializers=serializers)
expect = op(org)
got = op(pxy)
- assert type(expect) == type(got)
+ assert type(expect) is type(got)
assert expect == got
# Check unary operators
@@ -124,7 +124,7 @@ def test_proxy_object_of_array(serializers, backend):
pxy = proxy_object.asproxy(org.copy(), serializers=serializers)
expect = op(org)
got = op(pxy)
- assert type(expect) == type(got)
+ assert type(expect) is type(got)
assert all(expect == got)
# Check binary operators that takes a scalar as second argument
@@ -134,7 +134,7 @@ def test_proxy_object_of_array(serializers, backend):
pxy = proxy_object.asproxy(org.copy(), serializers=serializers)
expect = op(org, 2)
got = op(pxy, 2)
- assert type(expect) == type(got)
+ assert type(expect) is type(got)
assert all(expect == got)
# Check binary operators
@@ -192,7 +192,7 @@ def test_proxy_object_of_array(serializers, backend):
pxy = proxy_object.asproxy(org.copy(), serializers=serializers)
expect = op(org)
got = op(pxy)
- assert type(expect) == type(got)
+ assert type(expect) is type(got)
assert expect == got
# Check reflected methods
@@ -297,7 +297,7 @@ def task(x):
assert "ProxyObject" in str(type(x))
assert x._pxy_get().serializer == "dask"
else:
- assert type(x) == cudf.DataFrame
+ assert type(x) is cudf.DataFrame
assert len(x) == 10 # Trigger deserialization
return x
diff --git a/dask_cuda/tests/test_spill.py b/dask_cuda/tests/test_spill.py
index f8df7e04..bdd012d5 100644
--- a/dask_cuda/tests/test_spill.py
+++ b/dask_cuda/tests/test_spill.py
@@ -11,6 +11,8 @@
from distributed.sizeof import sizeof
from distributed.utils_test import gen_cluster, gen_test, loop # noqa: F401
+import dask_cudf
+
from dask_cuda import LocalCUDACluster, utils
from dask_cuda.utils_test import IncreasedCloseTimeoutNanny
@@ -18,6 +20,57 @@
pytest.skip("Not enough GPU memory", allow_module_level=True)
+def _set_cudf_device_limit():
+ """Ensure spilling for objects of all sizes"""
+ import cudf
+
+ cudf.set_option("spill_device_limit", 0)
+
+
+def _assert_cudf_spill_stats(enable_cudf_spill, dask_worker=None):
+ """Ensure cuDF has spilled data with its internal mechanism"""
+ import cudf
+
+ global_manager = cudf.core.buffer.spill_manager.get_global_manager()
+
+ if enable_cudf_spill:
+ stats = global_manager.statistics
+ buffers = global_manager.buffers()
+ assert stats.spill_totals[("gpu", "cpu")][0] > 1000
+ assert stats.spill_totals[("cpu", "gpu")][0] > 1000
+ assert len(buffers) > 0
+ else:
+ assert global_manager is None
+
+
+@pytest.fixture(params=[False, True])
+def cudf_spill(request):
+ """Fixture to enable and clear cuDF spill manager in client process"""
+ cudf = pytest.importorskip("cudf")
+
+ enable_cudf_spill = request.param
+
+ if enable_cudf_spill:
+ # If the global spill manager was previously set, fail.
+ assert cudf.core.buffer.spill_manager._global_manager is None
+
+ cudf.set_option("spill", True)
+ cudf.set_option("spill_stats", True)
+
+ # This is done to avoid changing the RMM resource stack in cuDF;
+ # workers do not need it because they are spawned as new
+ # processes for every test that runs.
+ cudf.set_option("spill_on_demand", False)
+
+ _set_cudf_device_limit()
+
+ yield enable_cudf_spill
+
+ cudf.set_option("spill", False)
+ cudf.core.buffer.spill_manager._global_manager_uninitialized = True
+ cudf.core.buffer.spill_manager._global_manager = None
+
+
def device_host_file_size_matches(
dhf, total_bytes, device_chunk_overhead=0, serialized_chunk_overhead=1024
):
@@ -244,9 +297,11 @@ async def test_cupy_cluster_device_spill(params):
],
)
@gen_test(timeout=30)
-async def test_cudf_cluster_device_spill(params):
+async def test_cudf_cluster_device_spill(params, cudf_spill):
cudf = pytest.importorskip("cudf")
+ enable_cudf_spill = cudf_spill
+
with dask.config.set(
{
"distributed.comm.compression": False,
@@ -266,6 +321,7 @@ async def test_cudf_cluster_device_spill(params):
device_memory_limit=params["device_memory_limit"],
memory_limit=params["memory_limit"],
worker_class=IncreasedCloseTimeoutNanny,
+ enable_cudf_spill=enable_cudf_spill,
) as cluster:
async with Client(cluster, asynchronous=True) as client:
@@ -294,21 +350,28 @@ async def test_cudf_cluster_device_spill(params):
del cdf
gc.collect()
- await client.run(
- assert_host_chunks,
- params["spills_to_disk"],
- )
- await client.run(
- assert_disk_chunks,
- params["spills_to_disk"],
- )
-
- await client.run(
- worker_assert,
- nbytes,
- 32,
- 2048,
- )
+ if enable_cudf_spill:
+ await client.run(
+ worker_assert,
+ 0,
+ 0,
+ 0,
+ )
+ else:
+ await client.run(
+ assert_host_chunks,
+ params["spills_to_disk"],
+ )
+ await client.run(
+ assert_disk_chunks,
+ params["spills_to_disk"],
+ )
+ await client.run(
+ worker_assert,
+ nbytes,
+ 32,
+ 2048,
+ )
del cdf2
@@ -324,3 +387,40 @@ async def test_cudf_cluster_device_spill(params):
gc.collect()
else:
break
+
+
+@gen_test(timeout=30)
+async def test_cudf_spill_cluster(cudf_spill):
+ cudf = pytest.importorskip("cudf")
+ enable_cudf_spill = cudf_spill
+
+ async with LocalCUDACluster(
+ n_workers=1,
+ scheduler_port=0,
+ silence_logs=False,
+ dashboard_address=None,
+ asynchronous=True,
+ device_memory_limit=None,
+ memory_limit=None,
+ worker_class=IncreasedCloseTimeoutNanny,
+ enable_cudf_spill=enable_cudf_spill,
+ cudf_spill_stats=enable_cudf_spill,
+ ) as cluster:
+ async with Client(cluster, asynchronous=True) as client:
+
+ await client.wait_for_workers(1)
+ await client.run(_set_cudf_device_limit)
+
+ cdf = cudf.DataFrame(
+ {
+ "a": list(range(200)),
+ "b": list(reversed(range(200))),
+ "c": list(range(200)),
+ }
+ )
+
+ ddf = dask_cudf.from_cudf(cdf, npartitions=2).sum().persist()
+ await wait(ddf)
+
+ await client.run(_assert_cudf_spill_stats, enable_cudf_spill)
+ _assert_cudf_spill_stats(enable_cudf_spill)
diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py
index ff4dbbae..74596fe2 100644
--- a/dask_cuda/utils.py
+++ b/dask_cuda/utils.py
@@ -9,6 +9,7 @@
from multiprocessing import cpu_count
from typing import Optional
+import click
import numpy as np
import pynvml
import toolz
@@ -764,3 +765,13 @@ def get_rmm_memory_resource_stack(mr) -> list:
if isinstance(mr, rmm.mr.StatisticsResourceAdaptor):
return mr.allocation_counts["current_bytes"]
return None
+
+
+class CommaSeparatedChoice(click.Choice):
+ def convert(self, value, param, ctx):
+ values = [v.strip() for v in value.split(",")]
+ for v in values:
+ if v not in self.choices:
+ choices_str = ", ".join(f"'{c}'" for c in self.choices)
+ self.fail(f"invalid choice(s): {v}. (choices are: {choices_str})")
+ return values
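+
+
+# Usage sketch (hypothetical option name, for illustration only): the type
+# accepts a comma-separated string on the command line, validates each item
+# against the allowed choices, and passes the callback a list of values:
+#
+#   @click.command()
+#   @click.option(
+#       "--protocols",
+#       type=CommaSeparatedChoice(["tcp", "ucx", "ucxx"]),
+#       default="tcp",
+#   )
+#   def main(protocols):
+#       # "--protocols ucx,ucxx" on the CLI yields protocols == ["ucx", "ucxx"]
+#       ...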
diff --git a/dependencies.yaml b/dependencies.yaml
index a9183cc2..47c1a71a 100644
--- a/dependencies.yaml
+++ b/dependencies.yaml
@@ -3,7 +3,7 @@ files:
all:
output: conda
matrix:
- cuda: ["11.4", "11.8", "12.2"]
+ cuda: ["11.4", "11.8", "12.5"]
arch: [x86_64]
includes:
- build_python
@@ -100,6 +100,10 @@ dependencies:
cuda: "12.2"
packages:
- cuda-version=12.2
+ - matrix:
+ cuda: "12.5"
+ packages:
+ - cuda-version=12.5
cuda:
specific:
- output_types: conda
@@ -130,10 +134,6 @@ dependencies:
specific:
- output_types: conda
matrices:
- - matrix:
- py: "3.9"
- packages:
- - python=3.9
- matrix:
py: "3.10"
packages:
@@ -142,19 +142,23 @@ dependencies:
py: "3.11"
packages:
- python=3.11
+ - matrix:
+ py: "3.12"
+ packages:
+ - python=3.12
- matrix:
packages:
- - python>=3.9,<3.12
+ - python>=3.10,<3.13
run_python:
common:
- output_types: [conda, requirements, pyproject]
packages:
- click >=8.1
- numba>=0.57
- - numpy>=1.23,<2.0a0
+ - numpy>=1.23,<3.0a0
- pandas>=1.3
- pynvml>=11.0.0
- - rapids-dask-dependency==24.8.*,>=0.0.0a0
+ - rapids-dask-dependency==24.12.*,>=0.0.0a0
- zict>=2.0.0
test_python:
common:
@@ -164,13 +168,13 @@ dependencies:
- pytest-cov
- output_types: [conda]
packages:
- - &cudf_conda cudf==24.8.*,>=0.0.0a0
- - &dask_cudf_conda dask-cudf==24.8.*,>=0.0.0a0
- - distributed-ucxx==0.39.*,>=0.0.0a0
- - &kvikio_conda kvikio==24.8.*,>=0.0.0a0
- - &ucx_py_conda ucx-py==0.39.*,>=0.0.0a0
+ - &cudf_unsuffixed cudf==24.12.*,>=0.0.0a0
+ - &dask_cudf_unsuffixed dask-cudf==24.12.*,>=0.0.0a0
+ - distributed-ucxx==0.41.*,>=0.0.0a0
+ - &kvikio_unsuffixed kvikio==24.12.*,>=0.0.0a0
+ - &ucx_py_unsuffixed ucx-py==0.41.*,>=0.0.0a0
- ucx-proc=*=gpu
- - ucxx==0.39.*,>=0.0.0a0
+ - ucxx==0.41.*,>=0.0.0a0
specific:
- output_types: conda
matrices:
@@ -186,19 +190,23 @@ dependencies:
matrices:
# kvikio should be added to the CUDA-version-specific matrices once there are wheels available
# ref: https://github.com/rapidsai/kvikio/pull/369
- - matrix: {cuda: "12.*"}
+ - matrix:
+ cuda: "12.*"
+ cuda_suffixed: "true"
packages:
- - cudf-cu12==24.8.*,>=0.0.0a0
- - dask-cudf-cu12==24.8.*,>=0.0.0a0
- - ucx-py-cu12==0.39.*,>=0.0.0a0
- - matrix: {cuda: "11.*"}
+ - cudf-cu12==24.12.*,>=0.0.0a0
+ - dask-cudf-cu12==24.12.*,>=0.0.0a0
+ - ucx-py-cu12==0.41.*,>=0.0.0a0
+ - matrix:
+ cuda: "11.*"
+ cuda_suffixed: "true"
packages:
- - cudf-cu11==24.8.*,>=0.0.0a0
- - dask-cudf-cu11==24.8.*,>=0.0.0a0
- - ucx-py-cu11==0.39.*,>=0.0.0a0
+ - cudf-cu11==24.12.*,>=0.0.0a0
+ - dask-cudf-cu11==24.12.*,>=0.0.0a0
+ - ucx-py-cu11==0.41.*,>=0.0.0a0
- matrix:
packages:
- - *cudf_conda
- - *dask_cudf_conda
- - *kvikio_conda
- - *ucx_py_conda
+ - *cudf_unsuffixed
+ - *dask_cudf_unsuffixed
+ - *kvikio_unsuffixed
+ - *ucx_py_unsuffixed
diff --git a/docs/source/examples/best-practices.rst b/docs/source/examples/best-practices.rst
index 2de3809c..d0ddc510 100644
--- a/docs/source/examples/best-practices.rst
+++ b/docs/source/examples/best-practices.rst
@@ -44,6 +44,15 @@ We also recommend allocating most, though not all, of the GPU memory space. We d
Additionally, when using `Accelerated Networking`_, we only need to register a single IPC handle for the whole pool (which is expensive, but only done once), since from the IPC point of view there's only a single allocation, as opposed to just using RMM without a pool, where each new allocation must be registered with IPC.
+Spilling from Device
+~~~~~~~~~~~~~~~~~~~~
+
+Dask-CUDA offers several different ways to enable automatic spilling from device memory.
+The best method often depends on the specific workflow. For classic ETL workloads using
+`Dask cuDF `_, native cuDF spilling is usually
+the best place to start. See :ref:`Dask-CUDA's spilling documentation <spilling-from-device>`
+for more details.
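+
+For example, native cuDF spilling can be enabled directly when the cluster is
+created (a minimal sketch using the ``enable_cudf_spill`` argument covered in
+the spilling documentation):
+
+.. code-block::
+
+ >>> from dask_cuda import LocalCUDACluster
+
+ >>> cluster = LocalCUDACluster(enable_cudf_spill=True)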
+
Accelerated Networking
~~~~~~~~~~~~~~~~~~~~~~
diff --git a/docs/source/explicit_comms.rst b/docs/source/explicit_comms.rst
index 9fde8756..db621977 100644
--- a/docs/source/explicit_comms.rst
+++ b/docs/source/explicit_comms.rst
@@ -14,4 +14,4 @@ Usage
In order to use explicit-comms in Dask/Distributed automatically, simply define the environment variable ``DASK_EXPLICIT_COMMS=True`` or set the ``"explicit-comms"``
key in the `Dask configuration `_.
-It is also possible to use explicit-comms in tasks manually, see the `API <../api/#explicit-comms>`_ and our `implementation of shuffle `_ for guidance.
+It is also possible to use explicit-comms in tasks manually; see the `API <../api/#explicit-comms>`_ and our `implementation of shuffle `_ for guidance.
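+
+For example, the configuration key mentioned above can also be set programmatically
+(a minimal sketch, equivalent to exporting ``DASK_EXPLICIT_COMMS=True``):
+
+.. code-block::
+
+ >>> import dask
+ >>> dask.config.set({"explicit-comms": True})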
diff --git a/docs/source/install.rst b/docs/source/install.rst
index e522ae3c..43082a67 100644
--- a/docs/source/install.rst
+++ b/docs/source/install.rst
@@ -12,11 +12,11 @@ To use Dask-CUDA on your system, you will need:
- A version of NVIDIA CUDA Toolkit compatible with the installed driver version; see Table 1 of `CUDA Compatibility -- Binary Compatibility `_ for an overview of CUDA Toolkit driver requirements
Once the proper CUDA Toolkit version has been determined, it can be installed along with Dask-CUDA using ``conda``.
-To install the latest version of Dask-CUDA along with CUDA Toolkit 12.0:
+To install the latest version of Dask-CUDA along with CUDA Toolkit 12.5:
.. code-block:: bash
- conda install -c rapidsai -c conda-forge -c nvidia dask-cuda cuda-version=12.0
+ conda install -c rapidsai -c conda-forge -c nvidia dask-cuda cuda-version=12.5
Pip
---
diff --git a/docs/source/spilling.rst b/docs/source/spilling.rst
index a237adf7..c86b5ce4 100644
--- a/docs/source/spilling.rst
+++ b/docs/source/spilling.rst
@@ -1,3 +1,5 @@
+.. _spilling-from-device:
+
Spilling from device
====================
@@ -105,3 +107,80 @@ type checking doesn't:
Thus, if encountering problems, remember that it is always possible to use ``unproxy()``
to access the proxied object directly, or set ``DASK_JIT_UNSPILL_COMPATIBILITY_MODE=True``
to enable compatibility mode, which automatically calls ``unproxy()`` on all function inputs.
+
+
+cuDF Spilling
+-------------
+
+When executing an ETL workflow with `Dask cuDF `_
+(i.e. Dask DataFrame), it is usually best to leverage `native spilling support in cuDF
+`_.
+
+Native cuDF spilling has an important advantage over the other methodologies mentioned
+above. When JIT-unspill or default spilling are used, the worker is only able to spill
+the input or output of a task. This means that any data that is created within the task
+is completely off limits until the task is done executing. When cuDF spilling is used,
+however, individual device buffers can be spilled/unspilled as needed while the task
+is executing.
+
+When deploying a ``LocalCUDACluster``, cuDF spilling can be enabled with the ``enable_cudf_spill`` argument:
+
+.. code-block::
+
+ >>> from distributed import Client
+ >>> from dask_cuda import LocalCUDACluster
+
+ >>> cluster = LocalCUDACluster(n_workers=10, enable_cudf_spill=True)
+ >>> client = Client(cluster)
+
+The same applies for ``dask cuda worker``:
+
+.. code-block::
+
+ $ dask scheduler
+ distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:8786
+
+ $ dask cuda worker --enable-cudf-spill
+
+
+Statistics
+~~~~~~~~~~
+
+When cuDF spilling is enabled, it is also possible to have cuDF collect basic
+spill statistics. Collecting this information can be a useful way to understand
+the performance of memory-intensive workflows using cuDF.
+
+When deploying a ``LocalCUDACluster``, cuDF spill statistics can be enabled with the
+``cudf_spill_stats`` argument:
+
+.. code-block::
+
+ >>> cluster = LocalCUDACluster(n_workers=10, enable_cudf_spill=True, cudf_spill_stats=1)
+
+The same applies for ``dask cuda worker``:
+
+.. code-block::
+
+ $ dask cuda worker --enable-cudf-spill --cudf-spill-stats 1
+
+To have each dask-cuda worker print spill statistics within the workflow, do something like:
+
+.. code-block::
+
+ def spill_info():
+ from cudf.core.buffer.spill_manager import get_global_manager
+ print(get_global_manager().statistics)
+ client.submit(spill_info)
+
+See the `cuDF spilling documentation
+`_
+for more information on the available spill-statistics options.
+
+Limitations
+~~~~~~~~~~~
+
+Although cuDF spilling is the best option for most ETL workflows using Dask cuDF,
+it will be much less effective if that workflow converts between ``cudf.DataFrame``
+and other data formats (e.g. ``cupy.ndarray``). Once the underlying device buffers
+are "exposed" to external memory references, they become "unspillable" by cuDF.
+In cases like this (e.g., Dask-CUDA + XGBoost), JIT-Unspill is usually a better choice.
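+
+As a rough illustration of the pattern to avoid (assuming standard cuDF/CuPy
+interoperability rather than anything introduced here):
+
+.. code-block::
+
+ >>> import cudf
+
+ >>> df = cudf.DataFrame({"a": range(1_000_000)})
+ >>> arr = df["a"].values  # CuPy view of the column's device buffer
+ >>> # The buffer backing "a" is now exposed to an external reference,
+ >>> # so cuDF can no longer spill it.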
diff --git a/pyproject.toml b/pyproject.toml
index 8daac618..11802a1e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -14,14 +14,14 @@ authors = [
{ name = "NVIDIA Corporation" },
]
license = { text = "Apache 2.0" }
-requires-python = ">=3.9"
+requires-python = ">=3.10"
dependencies = [
"click >=8.1",
"numba>=0.57",
- "numpy>=1.23,<2.0a0",
+ "numpy>=1.23,<3.0a0",
"pandas>=1.3",
"pynvml>=11.0.0",
- "rapids-dask-dependency==24.8.*,>=0.0.0a0",
+ "rapids-dask-dependency==24.12.*,>=0.0.0a0",
"zict>=2.0.0",
] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit dependencies.yaml and run `rapids-dependency-file-generator`.
classifiers = [
@@ -30,9 +30,9 @@ classifiers = [
"Topic :: Scientific/Engineering",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3",
- "Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
+ "Programming Language :: Python :: 3.12",
]
[project.scripts]
@@ -50,12 +50,12 @@ docs = [
"sphinx-rtd-theme>=0.5.1",
] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit dependencies.yaml and run `rapids-dependency-file-generator`.
test = [
- "cudf==24.8.*,>=0.0.0a0",
- "dask-cudf==24.8.*,>=0.0.0a0",
- "kvikio==24.8.*,>=0.0.0a0",
+ "cudf==24.12.*,>=0.0.0a0",
+ "dask-cudf==24.12.*,>=0.0.0a0",
+ "kvikio==24.12.*,>=0.0.0a0",
"pytest",
"pytest-cov",
- "ucx-py==0.39.*,>=0.0.0a0",
+ "ucx-py==0.41.*,>=0.0.0a0",
] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit dependencies.yaml and run `rapids-dependency-file-generator`.
[project.urls]
@@ -128,12 +128,16 @@ filterwarnings = [
# is enabled in both dask-cudf and dask-cuda.
# See: https://github.com/rapidsai/dask-cuda/issues/1311
"ignore:Dask DataFrame implementation is deprecated:DeprecationWarning",
+ # Dask now loudly throws warnings: https://github.com/dask/dask/pull/11437
+ # When the legacy implementation is removed we can remove this warning and stop running pytests with `DASK_DATAFRAME__QUERY_PLANNING=False`
+ "ignore:The legacy Dask DataFrame implementation is deprecated and will be removed in a future version.*:FutureWarning",
]
[tool.rapids-build-backend]
build-backend = "setuptools.build_meta"
dependencies-file = "dependencies.yaml"
disable-cuda = true
+matrix-entry = "cuda_suffixed=true"
[tool.setuptools]
license-files = ["LICENSE"]
@@ -148,3 +152,11 @@ exclude = [
"docs.*",
"tests.*",
]
+
+[tool.pydistcheck]
+select = [
+ "distro-too-large-compressed",
+]
+
+# PyPI limit is 100 MiB, fail CI before we get too close to that
+max_allowed_size_compressed = '75M'