sync from_parquet function arguments with upstream awkward
douglasdavis committed Jul 24, 2023
1 parent d159390 commit 8e09b6d
Showing 1 changed file with 117 additions and 94 deletions.
211 changes: 117 additions & 94 deletions src/dask_awkward/lib/io/parquet.py
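
For orientation, a minimal, hypothetical sketch of calling the synced API. The path and column names are made up; the keyword arguments shown (columns, split_row_groups, max_gap, max_block, footer_sample_size, generate_bitmasks, behavior) are the ones this commit threads through from upstream awkward:

import dask_awkward as dak

# hypothetical dataset location; any fsspec-compatible URL works
arr = dak.from_parquet(
    "s3://bucket/dataset",
    columns=["pt", "eta"],         # optional column projection
    split_row_groups=False,        # one partition per parquet file
    max_gap=64_000,                # byte-range coalescing options forwarded
    max_block=256_000_000,         # to awkward's low-level _load
    footer_sample_size=1_000_000,
    generate_bitmasks=False,
    behavior=None,                 # awkward behavior dict attached to partitions
)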
@@ -8,15 +8,15 @@
from typing import Any, Sequence

import awkward as ak
import fsspec
import awkward.operations.ak_from_parquet as ak_from_parquet
import awkward.operations.ak_to_arrow_table as ak_to_arrow_table
from awkward.forms.form import Form
from awkward.operations import ak_from_parquet, to_arrow_table
from awkward.operations.ak_from_parquet import _load
from dask.base import tokenize
from dask.blockwise import BlockIndex
from dask.highlevelgraph import HighLevelGraph
from fsspec import AbstractFileSystem
from fsspec.core import get_fs_token_paths
from fsspec.core import get_fs_token_paths, url_to_fs

from dask_awkward.lib.core import (
Array,
@@ -40,6 +40,8 @@ def __init__(
listsep: str = "list.item",
unnamed_root: bool = False,
original_form: Form | None = None,
behavior: dict | None = None,
**kwargs: Any,
) -> None:
self.fs = fs
self.form = form
@@ -49,6 +51,8 @@ def __init__(
if self.unnamed_root:
self.columns = [f".{c}" for c in self.columns]
self.original_form = original_form
self.behavior = behavior
self.kwargs = kwargs

@abc.abstractmethod
def __call__(self, source: Any) -> ak.Array:
@@ -69,7 +73,11 @@ def __repr__(self) -> str:
f" listsep={self.listsep}\n"
f" unnamed_root={self.unnamed_root}\n"
f" columns={self.columns}\n"
f" behavior={self.behavior}\n"
)
for key, val in self.kwargs.items():
s += f" {key}={val}\n"
s = f"{s})"
return s

def __str__(self) -> str:
@@ -85,21 +93,29 @@ def __init__(
listsep: str = "list.item",
unnamed_root: bool = False,
original_form: Form | None = None,
behavior: dict | None = None,
**kwargs: Any,
) -> None:
super().__init__(
fs=fs,
form=form,
listsep=listsep,
unnamed_root=unnamed_root,
original_form=original_form,
behavior=behavior,
**kwargs,
)

def __call__(self, source: Any) -> Any:
array = _file_to_partition(
source,
self.fs,
self.columns,
self.form,
array = _load(
[source],
parquet_columns=self.columns,
subrg=[None],
subform=self.form,
highlevel=True,
fs=self.fs,
behavior=self.behavior,
**self.kwargs,
)
return ak.Array(unproject_layout(self.original_form, array.layout))

@@ -110,21 +126,16 @@ def project_columns(
) -> _FromParquetFileWiseFn:
if columns is None:
return self

new_form = self.form.select_columns(columns)
new = _FromParquetFileWiseFn(
fs=self.fs,
form=new_form,
listsep=self.listsep,
unnamed_root=self.unnamed_root,
original_form=original_form,
behavior=self.behavior,
**self.kwargs,
)

log.debug(f"project_columns received: {columns}")
log.debug(f"new form is {repr(new_form)}")
log.debug(f"new form columns are: {new_form.columns(self.listsep)}")
log.debug(new)

return new


@@ -137,26 +148,40 @@ def __init__(
listsep: str = "list.item",
unnamed_root: bool = False,
original_form: Form | None = None,
behavior: dict | None = None,
**kwargs: Any,
) -> None:
super().__init__(
fs=fs,
form=form,
listsep=listsep,
unnamed_root=unnamed_root,
original_form=original_form,
behavior=behavior,
**kwargs,
)

def __call__(self, pair: Any) -> ak.Array:
subrg, source = pair
if isinstance(subrg, int):
subrg = [[subrg]]
array = _file_to_partition(
source,
self.fs,
self.columns,
self.form,
array = _load(
[source],
parquet_columns=self.columns,
subrg=subrg,
subform=self.form,
highlevel=True,
fs=self.fs,
behavior=self.behavior,
**self.kwargs,
)
# array = _file_to_partition(
# source,
# self.fs,
# self.columns,
# self.form,
# subrg=subrg,
# )
return ak.Array(unproject_layout(self.original_form, array.layout))

def project_columns(
@@ -171,65 +196,65 @@ def project_columns(
form=self.form.select_columns(columns),
unnamed_root=self.unnamed_root,
original_form=original_form,
behavior=self.behavior,
**self.kwargs,
)


def from_parquet(
path: Any,
storage_options: dict | None = None,
path: str | list[str],
*,
columns: str | list[str] | None = None,
storage_options: dict[str, Any] | None = None,
max_gap: int = 64_000,
max_block: int = 256_000_000,
footer_sample_size: int = 1_000_000,
generate_bitmasks: bool = False,
highlevel: bool = True,
behavior: dict | None = None,
ignore_metadata: bool = True,
scan_files: bool = False,
columns: Sequence[str] | None = None,
filters: Any | None = None,
split_row_groups: Any | None = None,
split_row_groups: bool = False,
) -> Array:
"""Create an Array collection from a Parquet dataset.
if not highlevel:
raise ValueError("dask-awkward only supports highlevel=True")

Parameters
----------
url : str
Location of data, including protocol (e.g. ``s3://``)
storage_options : dict
For creating filesystem (see ``fsspec`` documentation).
ignore_metadata : bool
Ignore parquet metadata associated with the input dataset (the
``_metadata`` file).
scan_files : bool
TBD
columns : list[str], optional
Select columns to load
filters : list[list[tuple]], optional
Parquet-style filters for excluding row groups based on column statistics
split_row_groups: bool, optional
If True, each row group becomes a partition. If False, each
file becomes a partition. If None, the existence of a
``_metadata`` file and ignore_metadata=False implies True,
else False.
Returns
-------
Array
Array collection from the parquet dataset.
"""
fs, tok, paths = get_fs_token_paths(
path, mode="rb", storage_options=storage_options
fs, token, paths = get_fs_token_paths(
path,
mode="rb",
storage_options=storage_options,
)
label = "from-parquet"
token = tokenize(
tok, paths, ignore_metadata, columns, filters, scan_files, split_row_groups
token,
paths,
columns,
max_gap,
max_block,
footer_sample_size,
generate_bitmasks,
behavior,
ignore_metadata,
scan_files,
split_row_groups,
)

# same as ak_metadata_from_parquet
results = ak_from_parquet.metadata(
(
parquet_columns,
subform,
actual_paths,
fs,
subrg,
row_counts,
metadata,
) = ak_from_parquet.metadata(
path,
storage_options,
row_groups=None,
columns=columns,
ignore_metadata=ignore_metadata,
scan_files=scan_files,
)
parquet_columns, subform, actual_paths, fs, subrg, row_counts, metadata = results

listsep = "list.item"
unnamed_root = False
@@ -244,26 +269,34 @@ def from_parquet(
split_row_groups = row_counts is not None and len(row_counts) > 1

meta = ak.Array(
subform.length_zero_array(highlevel=False).to_typetracer(forget_length=True)
subform.length_zero_array(highlevel=False).to_typetracer(forget_length=True),
behavior=behavior,
)

if split_row_groups is False or subrg is None:
# file-wise

fn = _FromParquetFileWiseFn(
fs=fs,
form=subform,
listsep=listsep,
unnamed_root=unnamed_root,
max_gap=max_gap,
max_block=max_block,
footer_sample_size=footer_sample_size,
generate_bitmasks=generate_bitmasks,
behavior=behavior,
)

return from_map(
_FromParquetFileWiseFn(
fs=fs,
form=subform,
listsep=listsep,
unnamed_root=unnamed_root,
),
fn,
actual_paths,
label=label,
token=token,
meta=typetracer_array(meta),
)
else:
# row-group wise

if set(subrg) == {None}:
rgs_paths = {path: 0 for path in actual_paths}
for i in range(metadata.num_row_groups):
@@ -283,13 +316,21 @@

for isubrg, path in zip(subrg, actual_paths):
pairs.extend([(irg, path) for irg in isubrg])

fn = _FromParquetFragmentWiseFn(
fs=fs,
form=subform,
listsep=listsep,
unnamed_root=unnamed_root,
max_gap=max_gap,
max_block=max_block,
footer_sample_size=footer_sample_size,
generate_bitmasks=generate_bitmasks,
behavior=behavior,
)

return from_map(
_FromParquetFragmentWiseFn(
fs=fs,
form=subform,
listsep=listsep,
unnamed_root=unnamed_root,
),
fn,
pairs,
label=label,
token=token,
@@ -298,24 +339,6 @@
)
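
To make the two branches above concrete, a hedged sketch (glob path hypothetical) of how ``split_row_groups`` selects between the file-wise and row-group-wise mappers:

import dask_awkward as dak

# file-wise: one partition per parquet file (_FromParquetFileWiseFn)
a = dak.from_parquet("data/*.parquet", split_row_groups=False)

# row-group-wise: one partition per row group (_FromParquetFragmentWiseFn)
b = dak.from_parquet("data/*.parquet", split_row_groups=True)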


def _file_to_partition(path, fs, columns, form, subrg=None):
"""read a whole parquet file to awkward"""
return _load(
actual_paths=[path],
fs=fs,
parquet_columns=columns,
subrg=subrg or [None],
footer_sample_size=2**15,
max_gap=2**10,
max_block=2**22,
generate_bitmasks=False,
metadata=None,
highlevel=True,
subform=form,
behavior=None,
)


def _metadata_file_from_data_files(path_list, fs, out_path):
"""
Aggregate _metadata and _common_metadata from data files
@@ -377,7 +400,7 @@ def _write_partition(
):
import pyarrow.parquet as pq

t = to_arrow_table(
t = ak_to_arrow_table.to_arrow_table(
data,
list_to32=True,
string_to32=True,
Expand Down Expand Up @@ -414,7 +437,7 @@ def __init__(
path: Any,
return_metadata: bool = False,
compression: Any | None = None,
head: Any | None = None,
head: bool = False,
npartitions: int | None = None,
prefix: str | None = None,
):
@@ -484,7 +507,7 @@ def to_parquet(
# - parquet 2 for full set of time and int types
# - v2 data page (for possible later fastparquet implementation)
# - dict encoding always off
fs, _ = fsspec.core.url_to_fs(path, **(storage_options or {}))
fs, _ = url_to_fs(path, **(storage_options or {}))
name = f"write-parquet-{tokenize(fs, data, path)}"

map_res = map_partitions(
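The diff view is truncated here. As a hedged sketch of the write path this last hunk drives, assuming ``to_parquet`` takes the collection and a destination (which it resolves with ``url_to_fs`` as shown above); paths are hypothetical:

import dask_awkward as dak

arr = dak.from_parquet("data/*.parquet")  # hypothetical input
# write the collection back out, one parquet file per partition
dak.to_parquet(arr, "out/dataset", storage_options=None)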
