From b44e661e322af7f4cf8aa316c00dbccf1c487ee4 Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Mon, 18 Dec 2023 08:58:57 -0500
Subject: [PATCH] Add cuDF spilling statistics to RMM/GPU memory plot (#8148)
---
continuous_integration/gpuci/build.sh | 4 +
distributed/dashboard/components/rmm.py | 262 +++++++++---------
distributed/dashboard/components/scheduler.py | 19 +-
distributed/diagnostics/cudf.py | 25 ++
.../tests/test_cudf_diagnostics.py | 45 +++
distributed/distributed-schema.yaml | 8 +-
distributed/distributed.yaml | 1 +
distributed/worker.py | 16 ++
8 files changed, 245 insertions(+), 135 deletions(-)
create mode 100644 distributed/diagnostics/cudf.py
create mode 100644 distributed/diagnostics/tests/test_cudf_diagnostics.py
diff --git a/continuous_integration/gpuci/build.sh b/continuous_integration/gpuci/build.sh
index 87cc643f5a..ab3bc95c48 100644
--- a/continuous_integration/gpuci/build.sh
+++ b/continuous_integration/gpuci/build.sh
@@ -56,3 +56,7 @@ conda list --show-channel-urls
rapids-logger "Python py.test for distributed"
py.test distributed -v -m gpu --runslow --junitxml="$WORKSPACE/junit-distributed.xml"
+
+# cuDF spill stats monitoring must be enabled for this test
+CUDF_SPILL=on CUDF_SPILL_STATS=1 DASK_DISTRIBUTED__DIAGNOSTICS__CUDF=1 \
+ py.test distributed/diagnostics/tests/test_cudf_diagnostics.py -v -m gpu --runslow --junitxml="$WORKSPACE/junit-distributed.xml"
diff --git a/distributed/dashboard/components/rmm.py b/distributed/dashboard/components/rmm.py
index f955033b33..7376476570 100644
--- a/distributed/dashboard/components/rmm.py
+++ b/distributed/dashboard/components/rmm.py
@@ -1,7 +1,7 @@
from __future__ import annotations
-import math
-from textwrap import dedent
+from collections.abc import Iterable
+from typing import TypeVar
from bokeh.core.properties import without_property_validation
from bokeh.models import (
@@ -10,6 +10,7 @@
HoverTool,
NumeralTickFormatter,
OpenURL,
+ Range1d,
TapTool,
)
from bokeh.plotting import figure
@@ -18,12 +19,19 @@
from dask.utils import format_bytes
from distributed.dashboard.components import DashboardComponent, add_periodic_callback
-from distributed.dashboard.components.scheduler import BOKEH_THEME, TICKS_1024
+from distributed.dashboard.components.scheduler import (
+ BOKEH_THEME,
+ TICKS_1024,
+ XLABEL_ORIENTATION,
+ MemoryColor,
+)
from distributed.dashboard.utils import update
from distributed.utils import log_errors
+T = TypeVar("T")
+
-class RMMMemoryUsage(DashboardComponent):
+class RMMMemoryUsage(DashboardComponent, MemoryColor):
"""
GPU memory usage plot that includes information about memory
managed by RMM. If an RMM pool is being used, shows the amount of
@@ -32,168 +40,166 @@ class RMMMemoryUsage(DashboardComponent):
@log_errors
def __init__(self, scheduler, width=600, **kwargs):
+ DashboardComponent.__init__(self)
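+ # NVIDIA green (#76B900) as the neutral (healthy) bar color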
+ MemoryColor.__init__(self, neutral_color="#76B900")
+
self.last = 0
self.scheduler = scheduler
self.source = ColumnDataSource(
{
- "rmm-used": [1, 2],
- "rmm-used-half": [0.5, 1],
- "rmm-total": [2, 4],
- "rmm-total-half": [1, 2],
- "external-used": [2, 1],
- "external-used-x": [3, 4.5],
- "worker": ["a", "b"],
- "gpu-index": [0, 0],
- "y": [1, 2],
- "escaped_worker": ["a", "b"],
- "rmm_memory_text": [
- "RMM memory used: 1B/1B\nTotal GPU memory used: 1B/2B",
- "RMM memory used: 1B/1B\nTotal GPU memory used: 1B/2B",
- ],
+ "width": [],
+ "x": [],
+ "y": [],
+ "color": [],
+ "alpha": [],
+ "worker": [],
+ "escaped_worker": [],
+ "rmm_used": [],
+ "rmm_total": [],
+ "gpu_used": [],
+ "gpu_total": [],
+ "spilled": [],
}
)
- memory = figure(
- title="RMM Memory",
+ self.root = figure(
+ title="RMM memory used",
tools="",
width=int(width / 2),
- name="rmm_memory_histogram",
+ name="rmm_memory",
**kwargs,
)
-
- rect = memory.rect(
- source=self.source,
- x="rmm-used-half",
- y="y",
- width="rmm-used",
- height=1,
- color="#76B900",
- alpha=1.0,
- )
- rect.nonselection_glyph = None
-
- rect = memory.rect(
+ rect = self.root.rect(
source=self.source,
- x="rmm-total-half",
+ x="x",
y="y",
- width="rmm-total",
- height=1,
- color="#76B900",
- alpha=0.75,
+ width="width",
+ height=0.9,
+ color="color",
+ fill_alpha="alpha",
+ line_width=0,
)
rect.nonselection_glyph = None
- rect = memory.rect(
- source=self.source,
- x="external-used-x",
- y="y",
- width="external-used",
- height=1,
- color="#76B900",
- alpha=0.5,
+ self.root.axis[0].ticker = BasicTicker(**TICKS_1024)
+ self.root.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
+ self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION
+ self.root.xaxis.minor_tick_line_alpha = 0
+ self.root.x_range = Range1d(start=0)
+ self.root.yaxis.visible = False
+ self.root.ygrid.visible = False
+ self.root.toolbar_location = None
+
+ tap = TapTool(callback=OpenURL(url="./info/worker/@escaped_worker.html"))
+ self.root.add_tools(tap)
+
+ hover = HoverTool(
+ point_policy="follow_mouse",
+ tooltips="""
+
+ Worker:
+ @worker
+
+
+ RMM memory used:
+ @rmm_used{0.00 b} / @rmm_total{0.00 b}
+
+
+ GPU memory used:
+ @gpu_used{0.00 b} / @gpu_total{0.00 b}
+
+
+ Spilled to CPU:
+ @spilled{0.00 b}
+
+ """,
)
- rect.nonselection_glyph = None
-
- memory.axis[0].ticker = BasicTicker(**TICKS_1024)
- memory.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
- memory.xaxis.major_label_orientation = -math.pi / 12
- memory.x_range.start = 0
-
- for fig in [memory]:
- fig.xaxis.minor_tick_line_alpha = 0
- fig.yaxis.visible = False
- fig.ygrid.visible = False
-
- tap = TapTool(callback=OpenURL(url="./info/worker/@escaped_worker.html"))
- fig.add_tools(tap)
-
- fig.toolbar_location = None
- fig.yaxis.visible = False
-
- hover = HoverTool()
- hover.tooltips = "@worker : @rmm_memory_text"
- hover.point_policy = "follow_mouse"
- memory.add_tools(hover)
-
- self.memory_figure = memory
+ self.root.add_tools(hover)
@without_property_validation
@log_errors
def update(self):
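+ # Each worker is drawn as four stacked bar segments (RMM used, unused
+ # RMM pool, non-RMM GPU memory, spilled to host), so per-worker values
+ # are repeated four times to line up with the glyph data columns.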
+ def quadlist(i: Iterable[T]) -> list[T]:
+ out = []
+ for ii in i:
+ out += [ii, ii, ii, ii]
+ return out
+
workers = list(self.scheduler.workers.values())
- rmm_total = []
+
+ width = []
+ x = []
+ color = []
+ max_limit = 0
rmm_used = []
- external_used = []
- gpu_index = []
- y = []
- worker = []
- external_used_x = []
- memory_max = 0
+ rmm_total = []
+ gpu_used = []
gpu_total = []
- rmm_memory_text = []
+ spilled = []
- for idx, ws in enumerate(workers):
+ for ws in workers:
try:
rmm_metrics = ws.metrics["rmm"]
gpu_metrics = ws.metrics["gpu"]
gpu_info = ws.extra["gpu"]
except KeyError:
- continue
- rmm_total_worker = rmm_metrics["rmm-total"] # RMM memory only
- rmm_used_worker = rmm_metrics["rmm-used"]
- gpu_total_worker = gpu_info["memory-total"] # All GPU memory
- gpu_used_worker = gpu_metrics["memory-used"]
+ rmm_metrics = {"rmm-used": 0, "rmm-total": 0}
+ gpu_metrics = {"memory-used": 0}
+ gpu_info = {"memory-total": 0}
+
+ try:
+ cudf_metrics = ws.metrics["cudf"]
+ except KeyError:
+ cudf_metrics = {"cudf-spilled": 0}
- external_used_worker = gpu_used_worker - rmm_total_worker
+ rmm_used_worker = rmm_metrics["rmm-used"] # RMM memory only
+ rmm_total_worker = rmm_metrics["rmm-total"]
+ gpu_used_worker = gpu_metrics["memory-used"] # All GPU memory
+ gpu_total_worker = gpu_info["memory-total"]
+ spilled_worker = cudf_metrics["cudf-spilled"] or 0 # memory spilled to host
- rmm_total.append(rmm_total_worker)
+ max_limit = max(
+ max_limit, gpu_total_worker, gpu_used_worker + spilled_worker
+ )
+ color_i = self._memory_color(gpu_used_worker, gpu_total_worker, ws.status)
+
+ width += [
+ rmm_used_worker,
+ rmm_total_worker - rmm_used_worker,
+ gpu_used_worker - rmm_total_worker,
+ spilled_worker,
+ ]
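+ # Center each segment at the cumulative width of the segments before
+ # it plus half of its own width, so the four rects stack horizontally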
+ x += [sum(width[-4:i]) + width[i] / 2 for i in range(-4, 0)]
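+ # The spilled-to-host segment is always grey; the GPU memory segments
+ # follow the worker's memory status color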
+ color += [color_i, color_i, color_i, "grey"]
+
+ # memory info
rmm_used.append(rmm_used_worker)
+ rmm_total.append(rmm_total_worker)
+ gpu_used.append(gpu_used_worker)
gpu_total.append(gpu_total_worker)
- external_used.append(external_used_worker)
- external_used_x.append(rmm_total_worker + external_used_worker / 2)
- worker.append(ws.address)
- gpu_index.append(idx)
- y.append(idx)
-
- memory_max = max(memory_max, gpu_total_worker)
-
- rmm_memory_text.append(
- "RMM memory used: {}/{}\nTotal GPU memory used: {}/{}".format(
- format_bytes(rmm_used_worker),
- format_bytes(rmm_total_worker),
- format_bytes(gpu_used_worker),
- format_bytes(gpu_total_worker),
- )
- )
+ spilled.append(spilled_worker)
- self.memory_figure.title.text = dedent(
- """\
- RMM Utilization: {} / {}
- GPU Memory: {} / {}
- """.format(
- format_bytes(sum(rmm_used)),
- format_bytes(sum(rmm_total)),
- format_bytes(sum([*rmm_total, *external_used])),
- format_bytes(sum(gpu_total)),
- )
- )
+ title = f"RMM memory used: {format_bytes(sum(rmm_used))} / {format_bytes(sum(rmm_total))}\nGPU memory used: {format_bytes(sum(gpu_used))} / {format_bytes(sum(gpu_total))}"
+ if sum(spilled):
+ title += f" + {format_bytes(sum(spilled))} spilled to CPU"
+ self.root.title.text = title
result = {
- "rmm-total": rmm_total,
- "rmm-used": rmm_used,
- "external-used": external_used,
- "rmm-total-half": [m // 2 for m in rmm_total],
- "rmm-used-half": [m // 2 for m in rmm_used],
- "external-used-x": external_used_x,
- "worker": worker,
- "gpu-index": gpu_index,
- "y": y,
- "escaped_worker": [escape.url_escape(w) for w in worker],
- "rmm_memory_text": rmm_memory_text,
+ "width": width,
+ "x": x,
+ "y": quadlist(range(len(workers))),
+ "color": color,
+ "alpha": [1, 0.7, 0.4, 1] * len(workers),
+ "worker": quadlist(ws.address for ws in workers),
+ "escaped_worker": quadlist(escape.url_escape(ws.address) for ws in workers),
+ "rmm_used": quadlist(rmm_used),
+ "rmm_total": quadlist(rmm_total),
+ "gpu_used": quadlist(gpu_used),
+ "gpu_total": quadlist(gpu_total),
+ "spilled": quadlist(spilled),
}
- self.memory_figure.x_range.end = memory_max
-
+ self.root.x_range.end = max_limit
update(self.source, result)
@@ -202,5 +208,5 @@ def rmm_memory_doc(scheduler, extra, doc):
rmm_load = RMMMemoryUsage(scheduler, sizing_mode="stretch_both")
rmm_load.update()
add_periodic_callback(doc, rmm_load, 100)
- doc.add_root(rmm_load.memory_figure)
+ doc.add_root(rmm_load.root)
doc.theme = BOKEH_THEME
diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py
index 730aef89b0..aacd4b21ff 100644
--- a/distributed/dashboard/components/scheduler.py
+++ b/distributed/dashboard/components/scheduler.py
@@ -276,10 +276,17 @@ class MemoryColor:
orange: float
red: float
- def __init__(self):
+ def __init__(
+ self, neutral_color="blue", target_color="orange", terminated_color="red"
+ ):
+ self.neutral_color = neutral_color
+ self.target_color = target_color
+ self.terminated_color = terminated_color
+
target = dask.config.get("distributed.worker.memory.target")
spill = dask.config.get("distributed.worker.memory.spill")
terminate = dask.config.get("distributed.worker.memory.terminate")
+
# These values can be False. It's also common to configure them to impossibly
# high values to achieve the same effect.
self.orange = min(target or math.inf, spill or math.inf)
@@ -287,14 +294,14 @@ def __init__(self):
def _memory_color(self, current: int, limit: int, status: Status) -> str:
if status != Status.running:
- return "red"
+ return self.terminated_color
if not limit:
- return "blue"
+ return self.neutral_color
if current >= limit * self.red:
- return "red"
+ return self.terminated_color
if current >= limit * self.orange:
- return "orange"
- return "blue"
+ return self.target_color
+ return self.neutral_color
class ClusterMemory(DashboardComponent, MemoryColor):
diff --git a/distributed/diagnostics/cudf.py b/distributed/diagnostics/cudf.py
new file mode 100644
index 0000000000..c118f7e503
--- /dev/null
+++ b/distributed/diagnostics/cudf.py
@@ -0,0 +1,25 @@
+"""
+Diagnostics for memory spilling managed by cuDF.
+"""
+
+from __future__ import annotations
+
+try:
+ from cudf.core.buffer.spill_manager import get_global_manager
+except ImportError:
+ get_global_manager = None
+
+
+def real_time():
+ if get_global_manager is None:
+ return {"cudf-spilled": None}
+ mgr = get_global_manager()
+ if mgr is None:
+ return {"cudf-spilled": None}
+
+ totals = mgr.statistics.spill_totals
+
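+ # Net bytes currently spilled to host: total gpu->cpu spills minus
+ # total cpu->gpu unspills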
+ return {
+ "cudf-spilled": totals.get(("gpu", "cpu"), (0,))[0]
+ - totals.get(("cpu", "gpu"), (0,))[0]
+ }
diff --git a/distributed/diagnostics/tests/test_cudf_diagnostics.py b/distributed/diagnostics/tests/test_cudf_diagnostics.py
new file mode 100644
index 0000000000..feb5681855
--- /dev/null
+++ b/distributed/diagnostics/tests/test_cudf_diagnostics.py
@@ -0,0 +1,45 @@
+from __future__ import annotations
+
+import os
+
+import pytest
+
+from distributed.utils_test import gen_cluster
+
+pytestmark = [
+ pytest.mark.gpu,
+ pytest.mark.skipif(
+ os.environ.get("CUDF_SPILL", "off") != "on"
+ or os.environ.get("CUDF_SPILL_STATS", "0") != "1"
+ or os.environ.get("DASK_DISTRIBUTED__DIAGNOSTICS__CUDF", "0") != "1",
+ reason="cuDF spill stats monitoring must be enabled manually",
+ ),
+]
+
+cudf = pytest.importorskip("cudf")
+
+
+def force_spill():
+ from cudf.core.buffer.spill_manager import get_global_manager
+
+ manager = get_global_manager()
+
+ # 24 bytes
+ df = cudf.DataFrame({"a": [1, 2, 3]})  # noqa: F841
+
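+ # A device memory limit of 1 byte forces the dataframe above to spill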
+ return manager.spill_to_device_limit(1)
+
+
+@pytest.mark.flaky(reruns=10, reruns_delay=5)
+@gen_cluster(
+ client=True,
+ nthreads=[("127.0.0.1", 1)],
+)
+async def test_cudf_metrics(c, s, *workers):
+ w = list(s.workers.values())[0]
+ assert "cudf" in w.metrics
+ assert w.metrics["cudf"]["cudf-spilled"] == 0
+
+ await c.run(force_spill)
+
+ assert w.metrics["cudf"]["cudf-spilled"] == 24
diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml
index 3d7cd5ead8..fffb9b3772 100644
--- a/distributed/distributed-schema.yaml
+++ b/distributed/distributed-schema.yaml
@@ -984,6 +984,12 @@ properties:
not a problem and will be automatically disabled if no GPUs are found in the
system, but in certain cases it may be desirable to completely disable NVML
diagnostics.
+ cudf:
+ type: boolean
+ description: |
+ If ``True``, enables tracking of GPU spilling and unspilling managed by cuDF
+ (if cuDF spilling is enabled). Note that this forces a cuDF import at worker
+ startup, which may be undesirable due to its startup-time and memory overhead.
computations:
type: object
properties:
@@ -991,7 +997,7 @@ properties:
type: integer
minimum: 0
description: |
- The maximum number of Computations to remember.
+ The maximum number of computations to remember.
nframes:
type: integer
minimum: 0
diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml
index 4f5b31249c..8f30c69363 100644
--- a/distributed/distributed.yaml
+++ b/distributed/distributed.yaml
@@ -264,6 +264,7 @@ distributed:
diagnostics:
nvml: True
+ cudf: False
computations:
max-history: 100
nframes: 0
diff --git a/distributed/worker.py b/distributed/worker.py
index d1ff008f51..d87dfe57c8 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -3228,6 +3228,22 @@ async def rmm_metric(worker):
DEFAULT_METRICS["rmm"] = rmm_metric
del _rmm
+# avoid importing cuDF unless explicitly enabled
+if dask.config.get("distributed.diagnostics.cudf"):
+ try:
+ import cudf as _cudf # noqa: F401
+ except Exception:
+ pass
+ else:
+ from distributed.diagnostics import cudf
+
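+ # Gather spill statistics off the event loop, since polling the cuDF
+ # spill manager may block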
+ async def cudf_metric(worker):
+ result = await offload(cudf.real_time)
+ return result
+
+ DEFAULT_METRICS["cudf"] = cudf_metric
+ del _cudf
+
def print(
*args,