diff --git a/src/dask_awkward/lib/io/parquet.py b/src/dask_awkward/lib/io/parquet.py
index aad3936c..ad9163b2 100644
--- a/src/dask_awkward/lib/io/parquet.py
+++ b/src/dask_awkward/lib/io/parquet.py
@@ -8,15 +8,15 @@
 from typing import Any, Sequence
 
 import awkward as ak
-import fsspec
+import awkward.operations.ak_from_parquet as ak_from_parquet
+import awkward.operations.ak_to_arrow_table as ak_to_arrow_table
 from awkward.forms.form import Form
-from awkward.operations import ak_from_parquet, to_arrow_table
 from awkward.operations.ak_from_parquet import _load
 from dask.base import tokenize
 from dask.blockwise import BlockIndex
 from dask.highlevelgraph import HighLevelGraph
 from fsspec import AbstractFileSystem
-from fsspec.core import get_fs_token_paths
+from fsspec.core import get_fs_token_paths, url_to_fs
 
 from dask_awkward.lib.core import (
     Array,
@@ -40,6 +40,8 @@ def __init__(
         listsep: str = "list.item",
         unnamed_root: bool = False,
         original_form: Form | None = None,
+        behavior: dict | None = None,
+        **kwargs: Any,
     ) -> None:
         self.fs = fs
         self.form = form
@@ -49,6 +51,8 @@
         if self.unnamed_root:
             self.columns = [f".{c}" for c in self.columns]
         self.original_form = original_form
+        self.behavior = behavior
+        self.kwargs = kwargs
 
     @abc.abstractmethod
     def __call__(self, source: Any) -> ak.Array:
@@ -69,7 +73,11 @@ def __repr__(self) -> str:
             f"  listsep={self.listsep}\n"
             f"  unnamed_root={self.unnamed_root}\n"
             f"  columns={self.columns}\n"
+            f"  behavior={self.behavior}\n"
         )
+        for key, val in self.kwargs.items():
+            s += f"  {key}={val}\n"
+        s = f"{s})"
         return s
 
     def __str__(self) -> str:
@@ -85,6 +93,8 @@ def __init__(
         listsep: str = "list.item",
         unnamed_root: bool = False,
         original_form: Form | None = None,
+        behavior: dict | None = None,
+        **kwargs: Any,
     ) -> None:
         super().__init__(
             fs=fs,
@@ -92,14 +102,20 @@
             listsep=listsep,
             unnamed_root=unnamed_root,
             original_form=original_form,
+            behavior=behavior,
+            **kwargs,
         )
 
     def __call__(self, source: Any) -> Any:
-        array = _file_to_partition(
-            source,
-            self.fs,
-            self.columns,
-            self.form,
+        array = _load(
+            [source],
+            parquet_columns=self.columns,
+            subrg=[None],
+            subform=self.form,
+            highlevel=True,
+            fs=self.fs,
+            behavior=self.behavior,
+            **self.kwargs,
         )
         return ak.Array(unproject_layout(self.original_form, array.layout))
 
@@ -110,7 +126,6 @@ def project_columns(
     ) -> _FromParquetFileWiseFn:
         if columns is None:
             return self
-
         new_form = self.form.select_columns(columns)
         new = _FromParquetFileWiseFn(
             fs=self.fs,
@@ -118,13 +133,9 @@
             listsep=self.listsep,
             unnamed_root=self.unnamed_root,
             original_form=original_form,
+            behavior=self.behavior,
+            **self.kwargs,
         )
-
-        log.debug(f"project_columns received: {columns}")
-        log.debug(f"new form is {repr(new_form)}")
-        log.debug(f"new form columns are: {new_form.columns(self.listsep)}")
-        log.debug(new)
-
         return new
 
 
@@ -137,6 +148,8 @@ def __init__(
         listsep: str = "list.item",
         unnamed_root: bool = False,
         original_form: Form | None = None,
+        behavior: dict | None = None,
+        **kwargs: Any,
     ) -> None:
         super().__init__(
             fs=fs,
@@ -144,19 +157,31 @@
             listsep=listsep,
             unnamed_root=unnamed_root,
             original_form=original_form,
+            behavior=behavior,
+            **kwargs,
         )
 
     def __call__(self, pair: Any) -> ak.Array:
         subrg, source = pair
         if isinstance(subrg, int):
             subrg = [[subrg]]
-        array = _file_to_partition(
-            source,
-            self.fs,
-            self.columns,
-            self.form,
+        array = _load(
+            [source],
+            parquet_columns=self.columns,
             subrg=subrg,
+            subform=self.form,
+            highlevel=True,
+            fs=self.fs,
+            behavior=self.behavior,
+            **self.kwargs,
         )
+        # array = _file_to_partition(
+        #     source,
+        #     self.fs,
+        #     self.columns,
+        #     self.form,
+        #     subrg=subrg,
+        # )
         return ak.Array(unproject_layout(self.original_form, array.layout))
 
     def project_columns(
@@ -171,57 +196,58 @@
             form=self.form.select_columns(columns),
             unnamed_root=self.unnamed_root,
             original_form=original_form,
+            behavior=self.behavior,
+            **self.kwargs,
         )
 
 
 def from_parquet(
-    path: Any,
-    storage_options: dict | None = None,
+    path: str | list[str],
+    *,
+    columns: str | list[str] | None = None,
+    storage_options: dict[str, Any] | None = None,
+    max_gap: int = 64_000,
+    max_block: int = 256_000_000,
+    footer_sample_size: int = 1_000_000,
+    generate_bitmasks: bool = False,
+    highlevel: bool = True,
+    behavior: dict | None = None,
     ignore_metadata: bool = True,
     scan_files: bool = False,
-    columns: Sequence[str] | None = None,
-    filters: Any | None = None,
-    split_row_groups: Any | None = None,
+    split_row_groups: bool = False,
 ) -> Array:
-    """Create an Array collection from a Parquet dataset.
+    if not highlevel:
+        raise ValueError("dask-awkward only supports highlevel=True")
 
-    Parameters
-    ----------
-    url : str
-        Location of data, including protocol (e.g. ``s3://``)
-    storage_options : dict
-        For creating filesystem (see ``fsspec`` documentation).
-    ignore_metadata : bool
-        Ignore parquet metadata associated with the input dataset (the
-        ``_metadata`` file).
-    scan_files : bool
-        TBD
-    columns : list[str], optional
-        Select columns to load
-    filters : list[list[tuple]], optional
-        Parquet-style filters for excluding row groups based on column statistics
-    split_row_groups: bool, optional
-        If True, each row group becomes a partition. If False, each
-        file becomes a partition. If None, the existence of a
-        ``_metadata`` file and ignore_metadata=False implies True,
-        else False.
-
-    Returns
-    -------
-    Array
-        Array collection from the parquet dataset.
- - """ - fs, tok, paths = get_fs_token_paths( - path, mode="rb", storage_options=storage_options + fs, token, paths = get_fs_token_paths( + path, + mode="rb", + storage_options=storage_options, ) label = "from-parquet" token = tokenize( - tok, paths, ignore_metadata, columns, filters, scan_files, split_row_groups + token, + paths, + columns, + max_gap, + max_block, + footer_sample_size, + generate_bitmasks, + behavior, + ignore_metadata, + scan_files, + split_row_groups, ) - # same as ak_metadata_from_parquet - results = ak_from_parquet.metadata( + ( + parquet_columns, + subform, + actual_paths, + fs, + subrg, + row_counts, + metadata, + ) = ak_from_parquet.metadata( path, storage_options, row_groups=None, @@ -229,7 +255,6 @@ def from_parquet( ignore_metadata=ignore_metadata, scan_files=scan_files, ) - parquet_columns, subform, actual_paths, fs, subrg, row_counts, metadata = results listsep = "list.item" unnamed_root = False @@ -244,18 +269,27 @@ def from_parquet( split_row_groups = row_counts is not None and len(row_counts) > 1 meta = ak.Array( - subform.length_zero_array(highlevel=False).to_typetracer(forget_length=True) + subform.length_zero_array(highlevel=False).to_typetracer(forget_length=True), + behavior=behavior, ) if split_row_groups is False or subrg is None: # file-wise + + fn = _FromParquetFileWiseFn( + fs=fs, + form=subform, + listsep=listsep, + unnamed_root=unnamed_root, + max_gap=max_gap, + max_block=max_block, + footer_sample_size=footer_sample_size, + generate_bitmasks=generate_bitmasks, + behavior=behavior, + ) + return from_map( - _FromParquetFileWiseFn( - fs=fs, - form=subform, - listsep=listsep, - unnamed_root=unnamed_root, - ), + fn, actual_paths, label=label, token=token, @@ -263,7 +297,6 @@ def from_parquet( ) else: # row-group wise - if set(subrg) == {None}: rgs_paths = {path: 0 for path in actual_paths} for i in range(metadata.num_row_groups): @@ -283,13 +316,21 @@ def from_parquet( for isubrg, path in zip(subrg, actual_paths): pairs.extend([(irg, path) for irg in isubrg]) + + fn = _FromParquetFragmentWiseFn( + fs=fs, + form=subform, + listsep=listsep, + unnamed_root=unnamed_root, + max_gap=max_gap, + max_block=max_block, + footer_sample_size=footer_sample_size, + generate_bitmasks=generate_bitmasks, + behavior=behavior, + ) + return from_map( - _FromParquetFragmentWiseFn( - fs=fs, - form=subform, - listsep=listsep, - unnamed_root=unnamed_root, - ), + fn, pairs, label=label, token=token, @@ -298,24 +339,6 @@ def from_parquet( ) -def _file_to_partition(path, fs, columns, form, subrg=None): - """read a whole parquet file to awkward""" - return _load( - actual_paths=[path], - fs=fs, - parquet_columns=columns, - subrg=subrg or [None], - footer_sample_size=2**15, - max_gap=2**10, - max_block=2**22, - generate_bitmasks=False, - metadata=None, - highlevel=True, - subform=form, - behavior=None, - ) - - def _metadata_file_from_data_files(path_list, fs, out_path): """ Aggregate _metadata and _common_metadata from data files @@ -377,7 +400,7 @@ def _write_partition( ): import pyarrow.parquet as pq - t = to_arrow_table( + t = ak_to_arrow_table.to_arrow_table( data, list_to32=True, string_to32=True, @@ -414,7 +437,7 @@ def __init__( path: Any, return_metadata: bool = False, compression: Any | None = None, - head: Any | None = None, + head: bool = False, npartitions: int | None = None, prefix: str | None = None, ): @@ -484,7 +507,7 @@ def to_parquet( # - parquet 2 for full set of time and int types # - v2 data page (for possible later fastparquet implementation) # - dict 
     # - dict encoding always off
-    fs, _ = fsspec.core.url_to_fs(path, **(storage_options or {}))
+    fs, _ = url_to_fs(path, **(storage_options or {}))
     name = f"write-parquet-{tokenize(fs, data, path)}"
 
     map_res = map_partitions(
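
Usage sketch (illustrative, not part of the patch): after this change, behavior is attached to the typetracer meta and forwarded to every partition read, while the byte-range tuning knobs (max_gap, max_block, footer_sample_size, generate_bitmasks) travel through **kwargs into awkward's _load. The dataset location and the empty behavior dict below are hypothetical placeholders.

    import dask_awkward as dak

    # Hypothetical behavior mapping; in practice usually populated via
    # awkward's mixin-class machinery rather than left empty.
    behavior: dict = {}

    arr = dak.from_parquet(
        "s3://bucket/dataset",           # hypothetical dataset location
        columns=["x", "y"],              # projection pushed down into the read
        split_row_groups=True,           # one partition per row group, not per file
        behavior=behavior,               # forwarded to ak.Array for meta and each partition
        max_gap=64_000,                  # coalesce byte ranges separated by less than this
        max_block=256_000_000,           # cap on a single coalesced read
        storage_options={"anon": True},  # handed to fsspec via get_fs_token_paths
    )

    x = arr.x.compute()                  # materialize one projected field

Because the reader callables now also accept **kwargs, any additional keyword accepted by awkward's parquet reader can be threaded through from_parquet without further changes to this module; the repr of the reader functions prints those extra keywords for easier debugging.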