[WIP] Add new dask_cudf.read_parquet API #17250

Draft: wants to merge 8 commits into base branch-24.12
136 changes: 3 additions & 133 deletions python/dask_cudf/dask_cudf/backends.py

@@ -700,140 +700,10 @@ def from_dict(
     )
 
     @staticmethod
-    def read_parquet(path, *args, filesystem="fsspec", engine=None, **kwargs):
-        import dask_expr as dx
-        import fsspec
-
-        if (
-            isinstance(filesystem, fsspec.AbstractFileSystem)
-            or isinstance(filesystem, str)
-            and filesystem.lower() == "fsspec"
-        ):
-            # Default "fsspec" filesystem
-            from dask_cudf._legacy.io.parquet import CudfEngine
+    def read_parquet(*args, **kwargs):
+        from dask_cudf.io.parquet import read_parquet as read_parquet_expr
 
-            _raise_unsupported_parquet_kwargs(**kwargs)
-            return _default_backend(
-                dx.read_parquet,
-                path,
-                *args,
-                filesystem=filesystem,
-                engine=CudfEngine,
-                **kwargs,
-            )
-
-        else:
-            # EXPERIMENTAL filesystem="arrow" support.
-            # This code path uses PyArrow for IO, which is only
-            # beneficial for remote storage (e.g. S3)
-
-            from fsspec.utils import stringify_path
-            from pyarrow import fs as pa_fs
-
-            # CudfReadParquetPyarrowFS requires import of distributed beforehand
-            # (See: https://github.com/dask/dask/issues/11352)
-            import distributed  # noqa: F401
-            from dask.core import flatten
-            from dask.dataframe.utils import pyarrow_strings_enabled
-
-            from dask_cudf.io.parquet import CudfReadParquetPyarrowFS
-
-            if args:
-                raise ValueError(f"Unexpected positional arguments: {args}")
-
-            if not (
-                isinstance(filesystem, pa_fs.FileSystem)
-                or isinstance(filesystem, str)
-                and filesystem.lower() in ("arrow", "pyarrow")
-            ):
-                raise ValueError(f"Unexpected filesystem value: {filesystem}.")
-
-            if not PYARROW_GE_15:
-                raise NotImplementedError(
-                    "Experimental Arrow filesystem support requires pyarrow>=15"
-                )
-
-            if not isinstance(path, str):
-                path = stringify_path(path)
-
-            # Extract kwargs
-            columns = kwargs.pop("columns", None)
-            filters = kwargs.pop("filters", None)
-            categories = kwargs.pop("categories", None)
-            index = kwargs.pop("index", None)
-            storage_options = kwargs.pop("storage_options", None)
-            dtype_backend = kwargs.pop("dtype_backend", None)
-            calculate_divisions = kwargs.pop("calculate_divisions", False)
-            ignore_metadata_file = kwargs.pop("ignore_metadata_file", False)
-            metadata_task_size = kwargs.pop("metadata_task_size", None)
-            split_row_groups = kwargs.pop("split_row_groups", "infer")
-            blocksize = kwargs.pop("blocksize", "default")
-            aggregate_files = kwargs.pop("aggregate_files", None)
-            parquet_file_extension = kwargs.pop(
-                "parquet_file_extension", (".parq", ".parquet", ".pq")
-            )
-            arrow_to_pandas = kwargs.pop("arrow_to_pandas", None)
-            open_file_options = kwargs.pop("open_file_options", None)
-
-            # Validate and normalize kwargs
-            kwargs["dtype_backend"] = dtype_backend
-            if arrow_to_pandas is not None:
-                raise ValueError(
-                    "arrow_to_pandas not supported for the 'cudf' backend."
-                )
-            if open_file_options is not None:
-                raise ValueError(
-                    "The open_file_options argument is no longer supported "
-                    "by the 'cudf' backend."
-                )
-            if filters is not None:
-                for filter in flatten(filters, container=list):
-                    _, op, val = filter
-                    if op == "in" and not isinstance(val, (set, list, tuple)):
-                        raise TypeError(
-                            "Value of 'in' filter must be a list, set or tuple."
-                        )
-            if metadata_task_size is not None:
-                raise NotImplementedError(
-                    "metadata_task_size is not supported when using the pyarrow filesystem."
-                )
-            if split_row_groups != "infer":
-                raise NotImplementedError(
-                    "split_row_groups is not supported when using the pyarrow filesystem."
-                )
-            if parquet_file_extension != (".parq", ".parquet", ".pq"):
-                raise NotImplementedError(
-                    "parquet_file_extension is not supported when using the pyarrow filesystem."
-                )
-            if blocksize is not None and blocksize != "default":
-                warnings.warn(
-                    "blocksize is not supported when using the pyarrow filesystem. "
-                    "blocksize argument will be ignored."
-                )
-            if aggregate_files is not None:
-                warnings.warn(
-                    "aggregate_files is not supported when using the pyarrow filesystem. "
-                    "Please use the 'dataframe.parquet.minimum-partition-size' config. "
-                    "aggregate_files argument will be ignored."
-                )
-
-            return dx.new_collection(
-                CudfReadParquetPyarrowFS(
-                    path,
-                    columns=dx._util._convert_to_list(columns),
-                    filters=filters,
-                    categories=categories,
-                    index=index,
-                    calculate_divisions=calculate_divisions,
-                    storage_options=storage_options,
-                    filesystem=filesystem,
-                    ignore_metadata_file=ignore_metadata_file,
-                    arrow_to_pandas=arrow_to_pandas,
-                    pyarrow_strings_enabled=pyarrow_strings_enabled(),
-                    kwargs=kwargs,
-                    _series=isinstance(columns, str),
-                )
-            )
+        return read_parquet_expr(*args, **kwargs)
 
     @staticmethod
     def read_csv(
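For context, here is a minimal sketch of how this backend entrypoint is typically exercised, assuming the standard Dask dataframe.backend config switch; the dataset path and column names below are placeholders, not part of the PR:

import dask
import dask.dataframe as dd

# Select the cudf backend so that dd.read_parquet dispatches to the
# read_parquet staticmethod shown in the diff above.
dask.config.set({"dataframe.backend": "cudf"})

# After this change, all arguments are forwarded untouched to
# dask_cudf.io.parquet.read_parquet, which handles both the default
# fsspec path and the pyarrow-filesystem path internally.
df = dd.read_parquet(
    "data/*.parquet",    # placeholder path
    columns=["a", "b"],  # placeholder column projection
)
print(df.head())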
4 changes: 1 addition & 3 deletions python/dask_cudf/dask_cudf/io/__init__.py

@@ -22,9 +22,7 @@
 read_text = _deprecated_api(
     "dask_cudf.io.read_text", new_api="dask_cudf.read_text"
 )
-read_parquet = _deprecated_api(
-    "dask_cudf.io.read_parquet", new_api="dask_cudf.read_parquet"
-)
+read_parquet = parquet.read_parquet
 to_parquet = _deprecated_api(
     "dask_cudf.io.to_parquet",
     new_api="dask_cudf._legacy.io.parquet.to_parquet",
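The io/__init__.py change above turns dask_cudf.io.read_parquet from a deprecation shim back into a plain alias. A small sketch of what that means for callers (the path is a placeholder):

import dask_cudf.io
import dask_cudf.io.parquet

# After this change, the io-level name is a direct alias for the new
# implementation rather than a _deprecated_api wrapper, so the
# deprecation warning previously raised here goes away.
assert dask_cudf.io.read_parquet is dask_cudf.io.parquet.read_parquet

df = dask_cudf.io.read_parquet("data/*.parquet")  # placeholder path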