Skip to content

Commit

Permalink
Merge branch 'branch-24.12' into tst/deterministic
Browse files Browse the repository at this point in the history
  • Loading branch information
mroeschke authored Nov 8, 2024
2 parents dbd440f + 990734f commit 30aafc1
Showing 1 changed file with 16 additions and 10 deletions.
26 changes: 16 additions & 10 deletions python/dask_cudf/dask_cudf/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from dask_expr.io.io import FusedParquetIO
from dask_expr.io.parquet import FragmentWrapper, ReadParquetPyarrowFS

from dask._task_spec import Task

import cudf

from dask_cudf import _deprecated_api
Expand All @@ -19,7 +21,7 @@ def _load_multiple_files(
frag_filters,
columns,
schema,
*to_pandas_args,
**to_pandas_kwargs,
):
import pyarrow as pa

Expand All @@ -46,7 +48,7 @@ def _load_multiple_files(
)
return CudfReadParquetPyarrowFS._table_to_pandas(
get(dsk, name),
*to_pandas_args,
**to_pandas_kwargs,
)


Expand Down Expand Up @@ -89,7 +91,7 @@ def _table_to_pandas(table, index_name):
df = df.set_index(index_name)
return df

def _filtered_task(self, index: int):
def _filtered_task(self, name, index: int):
columns = self.columns.copy()
index_name = self.index.name
if self.index is not None:
Expand All @@ -99,16 +101,20 @@ def _filtered_task(self, index: int):
if columns is None:
columns = list(schema.names)
columns.append(index_name)
return (
return Task(
name,
self._table_to_pandas,
(
Task(
None,
self._fragment_to_table,
FragmentWrapper(self.fragments[index], filesystem=self.fs),
self.filters,
columns,
schema,
fragment_wrapper=FragmentWrapper(
self.fragments[index], filesystem=self.fs
),
filters=self.filters,
columns=columns,
schema=schema,
),
index_name,
index_name=index_name,
)

def _tune_up(self, parent):
Expand Down

0 comments on commit 30aafc1

Please sign in to comment.