diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index 48cea7266af..a7a116875ea 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -5,6 +5,8 @@ from dask_expr.io.io import FusedParquetIO from dask_expr.io.parquet import FragmentWrapper, ReadParquetPyarrowFS +from dask._task_spec import Task + import cudf from dask_cudf import _deprecated_api @@ -19,7 +21,7 @@ def _load_multiple_files( frag_filters, columns, schema, - *to_pandas_args, + **to_pandas_kwargs, ): import pyarrow as pa @@ -46,7 +48,7 @@ def _load_multiple_files( ) return CudfReadParquetPyarrowFS._table_to_pandas( get(dsk, name), - *to_pandas_args, + **to_pandas_kwargs, ) @@ -89,7 +91,7 @@ def _table_to_pandas(table, index_name): df = df.set_index(index_name) return df - def _filtered_task(self, index: int): + def _filtered_task(self, name, index: int): columns = self.columns.copy() index_name = self.index.name if self.index is not None: @@ -99,16 +101,20 @@ def _filtered_task(self, index: int): if columns is None: columns = list(schema.names) columns.append(index_name) - return ( + return Task( + name, self._table_to_pandas, - ( + Task( + None, self._fragment_to_table, - FragmentWrapper(self.fragments[index], filesystem=self.fs), - self.filters, - columns, - schema, + fragment_wrapper=FragmentWrapper( + self.fragments[index], filesystem=self.fs + ), + filters=self.filters, + columns=columns, + schema=schema, ), - index_name, + index_name=index_name, ) def _tune_up(self, parent):