Skip to content

Commit

Permalink
fix patching
Browse files Browse the repository at this point in the history
  • Loading branch information
rjzamora committed Oct 29, 2024
1 parent 86b3fa6 commit 60853f6
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 120 deletions.
2 changes: 2 additions & 0 deletions python/dask_cudf/dask_cudf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ def inner_func(*args, **kwargs):

if QUERY_PLANNING_ON:
from ._collection import DataFrame, Index, Series # noqa: E402
from ._expr import _patch_dask_expr

groupby_agg = raise_not_implemented_error("groupby_agg")
read_text = DataFrame.read_text
to_orc = raise_not_implemented_error("to_orc")
_patch_dask_expr()

else:
from .legacy.core import DataFrame, Index, Series # noqa: F401
Expand Down
248 changes: 128 additions & 120 deletions python/dask_cudf/dask_cudf/_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,134 +62,142 @@ def _simplify_down(self):
##


# This can be removed after cudf#15176 is addressed.
# See: https://github.com/rapidsai/cudf/issues/15176
class PatchCumulativeBlockwise(CumulativeBlockwise):
@property
def _args(self) -> list:
return self.operands[:1]

@property
def _kwargs(self) -> dict:
# Must pass axis and skipna as kwargs in cudf
return {"axis": self.axis, "skipna": self.skipna}


CumulativeBlockwise._args = PatchCumulativeBlockwise._args
CumulativeBlockwise._kwargs = PatchCumulativeBlockwise._kwargs


# The upstream Var code uses `Series.values`, and relies on numpy
# for most of the logic. Unfortunately, cudf -> cupy conversion
# is not supported for data containing null values. Therefore,
# we must implement our own version of Var for now. This logic
# is mostly copied from dask-cudf.


class VarCudf(Reduction):
# Uses the parallel version of Welford's online algorithm (Chan '79)
# (http://i.stanford.edu/pub/cstr/reports/cs/tr/79/773/CS-TR-79-773.pdf)
_parameters = ["frame", "skipna", "ddof", "numeric_only", "split_every"]
_defaults = {
"skipna": True,
"ddof": 1,
"numeric_only": False,
"split_every": False,
}

@functools.cached_property
def _meta(self):
return make_meta(
meta_nonempty(self.frame._meta).var(
skipna=self.skipna, numeric_only=self.numeric_only
def _patch_dask_expr():
# This can be removed after cudf#15176 is addressed.
# See: https://github.com/rapidsai/cudf/issues/15176
class PatchCumulativeBlockwise(CumulativeBlockwise):
@property
def _args(self) -> list:
return self.operands[:1]

@property
def _kwargs(self) -> dict:
# Must pass axis and skipna as kwargs in cudf
return {"axis": self.axis, "skipna": self.skipna}

CumulativeBlockwise._args = PatchCumulativeBlockwise._args
CumulativeBlockwise._kwargs = PatchCumulativeBlockwise._kwargs

# The upstream Var code uses `Series.values`, and relies on numpy
# for most of the logic. Unfortunately, cudf -> cupy conversion
# is not supported for data containing null values. Therefore,
# we must implement our own version of Var for now. This logic
# is mostly copied from dask-cudf.

class VarCudf(Reduction):
# Uses the parallel version of Welford's online algorithm (Chan '79)
# (http://i.stanford.edu/pub/cstr/reports/cs/tr/79/773/CS-TR-79-773.pdf)
_parameters = [
"frame",
"skipna",
"ddof",
"numeric_only",
"split_every",
]
_defaults = {
"skipna": True,
"ddof": 1,
"numeric_only": False,
"split_every": False,
}

@functools.cached_property
def _meta(self):
return make_meta(
meta_nonempty(self.frame._meta).var(
skipna=self.skipna, numeric_only=self.numeric_only
)
)
)

@property
def chunk_kwargs(self):
return dict(skipna=self.skipna, numeric_only=self.numeric_only)

@property
def combine_kwargs(self):
return {}

@property
def aggregate_kwargs(self):
return dict(ddof=self.ddof)

@classmethod
def reduction_chunk(cls, x, skipna=True, numeric_only=False):
kwargs = {"numeric_only": numeric_only} if is_dataframe_like(x) else {}
if skipna or numeric_only:
n = x.count(**kwargs)
kwargs["skipna"] = skipna
avg = x.mean(**kwargs)
else:
# Not skipping nulls, so might as well
# avoid the full `count` operation
n = len(x)
kwargs["skipna"] = skipna
avg = x.sum(**kwargs) / n
if numeric_only:
# Workaround for cudf bug
# (see: https://github.com/rapidsai/cudf/issues/13731)
x = x[n.index]
m2 = ((x - avg) ** 2).sum(**kwargs)
return n, avg, m2

@classmethod
def reduction_combine(cls, parts):
n, avg, m2 = parts[0]
for i in range(1, len(parts)):
n_a, avg_a, m2_a = n, avg, m2
n_b, avg_b, m2_b = parts[i]
n = n_a + n_b
avg = (n_a * avg_a + n_b * avg_b) / n
delta = avg_b - avg_a
m2 = m2_a + m2_b + delta**2 * n_a * n_b / n
return n, avg, m2

@classmethod
def reduction_aggregate(cls, vals, ddof=1):
vals = cls.reduction_combine(vals)
n, _, m2 = vals
return m2 / (n - ddof)


def _patched_var(
self, axis=0, skipna=True, ddof=1, numeric_only=False, split_every=False
):
if axis == 0:
if hasattr(self._meta, "to_pandas"):
return VarCudf(self, skipna, ddof, numeric_only, split_every)
else:
return Var(self, skipna, ddof, numeric_only, split_every)
elif axis == 1:
return VarColumns(self, skipna, ddof, numeric_only)
else:
raise ValueError(f"axis={axis} not supported. Please specify 0 or 1")
@property
def chunk_kwargs(self):
return dict(skipna=self.skipna, numeric_only=self.numeric_only)

@property
def combine_kwargs(self):
return {}

Expr.var = _patched_var
@property
def aggregate_kwargs(self):
return dict(ddof=self.ddof)

@classmethod
def reduction_chunk(cls, x, skipna=True, numeric_only=False):
kwargs = (
{"numeric_only": numeric_only} if is_dataframe_like(x) else {}
)
if skipna or numeric_only:
n = x.count(**kwargs)
kwargs["skipna"] = skipna
avg = x.mean(**kwargs)
else:
# Not skipping nulls, so might as well
# avoid the full `count` operation
n = len(x)
kwargs["skipna"] = skipna
avg = x.sum(**kwargs) / n
if numeric_only:
# Workaround for cudf bug
# (see: https://github.com/rapidsai/cudf/issues/13731)
x = x[n.index]
m2 = ((x - avg) ** 2).sum(**kwargs)
return n, avg, m2

@classmethod
def reduction_combine(cls, parts):
n, avg, m2 = parts[0]
for i in range(1, len(parts)):
n_a, avg_a, m2_a = n, avg, m2
n_b, avg_b, m2_b = parts[i]
n = n_a + n_b
avg = (n_a * avg_a + n_b * avg_b) / n
delta = avg_b - avg_a
m2 = m2_a + m2_b + delta**2 * n_a * n_b / n
return n, avg, m2

@classmethod
def reduction_aggregate(cls, vals, ddof=1):
vals = cls.reduction_combine(vals)
n, _, m2 = vals
return m2 / (n - ddof)

def _patched_var(
self,
axis=0,
skipna=True,
ddof=1,
numeric_only=False,
split_every=False,
):
if axis == 0:
if hasattr(self._meta, "to_pandas"):
return VarCudf(self, skipna, ddof, numeric_only, split_every)
else:
return Var(self, skipna, ddof, numeric_only, split_every)
elif axis == 1:
return VarColumns(self, skipna, ddof, numeric_only)
else:
raise ValueError(
f"axis={axis} not supported. Please specify 0 or 1"
)

# Temporary work-around for missing cudf + categorical support
# See: https://github.com/rapidsai/cudf/issues/11795
# TODO: Fix RepartitionQuantiles and remove this in cudf>24.06

_original_get_divisions = _shuffle_module._get_divisions
Expr.var = _patched_var

# Temporary work-around for missing cudf + categorical support
# See: https://github.com/rapidsai/cudf/issues/11795
# TODO: Fix RepartitionQuantiles and remove this in cudf>24.06

def _patched_get_divisions(frame, other, *args, **kwargs):
# NOTE: The following two lines contains the "patch"
# (we simply convert the partitioning column to pandas)
if is_categorical_dtype(other._meta.dtype) and hasattr(
other.frame._meta, "to_pandas"
):
other = new_collection(other).to_backend("pandas")._expr
_original_get_divisions = _shuffle_module._get_divisions

# Call "original" function
return _original_get_divisions(frame, other, *args, **kwargs)
def _patched_get_divisions(frame, other, *args, **kwargs):
# NOTE: The following two lines contains the "patch"
# (we simply convert the partitioning column to pandas)
if is_categorical_dtype(other._meta.dtype) and hasattr(
other.frame._meta, "to_pandas"
):
other = new_collection(other).to_backend("pandas")._expr

# Call "original" function
return _original_get_divisions(frame, other, *args, **kwargs)

_shuffle_module._get_divisions = _patched_get_divisions
_shuffle_module._get_divisions = _patched_get_divisions

0 comments on commit 60853f6

Please sign in to comment.