Skip to content

Commit

Permalink
feat: add attrs and highlevel
Browse files Browse the repository at this point in the history
  • Loading branch information
agoose77 committed Nov 8, 2023
1 parent 850c9c2 commit 9ebaa37
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 72 deletions.
9 changes: 9 additions & 0 deletions src/dask_awkward/lib/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1489,6 +1489,7 @@ def new_array_object(
*,
meta: ak.Array | None = None,
behavior: dict | None = None,
attrs: dict | None = None,
npartitions: int | None = None,
divisions: tuple[int, ...] | tuple[None, ...] | None = None,
) -> Array:
Expand All @@ -1507,6 +1508,10 @@ def new_array_object(
typetracer for the new Array. If the configuration option
``awkward.compute-unknown-meta`` is set to ``False``,
undefined `meta` will be assigned an empty typetracer.
behavior : dict, optional
Custom #ak.behavior for the output array.
attrs : dict, optional
Custom attributes for the output array.
npartitions : int, optional
Total number of partitions; if used `divisions` will be a
tuple of length `npartitions` + 1 with all elements``None``.
Expand Down Expand Up @@ -1550,6 +1555,8 @@ def new_array_object(

if behavior is not None:
actual_meta.behavior = behavior
if attrs is not None:
actual_meta.attrs = attrs

out = Array(dsk, name, actual_meta, divs)
if actual_meta.__doc__ != actual_meta.__class__.__doc__:
Expand Down Expand Up @@ -1879,6 +1886,8 @@ def non_trivial_reduction(
keepdims: bool,
mask_identity: bool,
reducer: Callable,
behavior: dict | None = None,
attrs: dict | None = None,
combiner: Callable | None = None,
token: str | None = None,
dtype: Any | None = None,
Expand Down
14 changes: 8 additions & 6 deletions src/dask_awkward/lib/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,17 @@ def from_awkward(


class _FromListsFn:
def __init__(self, behavior: dict | None = None):
def __init__(self, behavior: dict | None = None, attrs: dict | None = None):
self.behavior = behavior
self.attrs = attrs

def __call__(self, x: list) -> ak.Array:
return ak.Array(x, behavior=self.behavior)
return ak.Array(x, behavior=self.behavior, attrs=self.attrs)


def from_lists(source: list, behavior: dict | None = None) -> Array:
def from_lists(
source: list, behavior: dict | None = None, attrs: dict | None = None
) -> Array:
"""Create an Array collection from a list of lists.
Parameters
Expand Down Expand Up @@ -140,11 +143,10 @@ def from_lists(source: list, behavior: dict | None = None) -> Array:
lists = list(source)
divs = (0, *np.cumsum(list(map(len, lists))))
return from_map(
_FromListsFn(behavior=behavior),
_FromListsFn(behavior=behavior, attrs=attrs),
lists,
meta=typetracer_array(ak.Array(lists[0])),
meta=typetracer_array(ak.Array(lists[0], attrs=attrs, behavior=behavior)),
divisions=divs,
behavior=behavior,
label="from-lists",
)

Expand Down
1 change: 1 addition & 0 deletions src/dask_awkward/lib/io/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ def from_json(
resize: float = 8,
highlevel: bool = True,
behavior: dict | None = None,
attrs: dict | None = None,
blocksize: int | str | None = None,
delimiter: bytes | None = None,
compression: str | None = "infer",
Expand Down
37 changes: 29 additions & 8 deletions src/dask_awkward/lib/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(
unnamed_root: bool = False,
original_form: Form | None = None,
behavior: dict | None = None,
attrs: dict | None = None,
**kwargs: Any,
) -> None:
self.fs = fs
Expand All @@ -53,6 +54,7 @@ def __init__(
self.columns = [f".{c}" for c in self.columns]
self.original_form = original_form
self.behavior = behavior
self.attrs = attrs
self.kwargs = kwargs

@abc.abstractmethod
Expand Down Expand Up @@ -111,17 +113,22 @@ def __init__(
)

def __call__(self, source: Any) -> Any:
array = ak_from_parquet._load(
layout = ak_from_parquet._load(
[source],
parquet_columns=self.columns,
subrg=[None],
subform=self.form,
highlevel=True,
highlevel=False,
attrs=None,
behavior=None,
fs=self.fs,
behavior=self.behavior,
**self.kwargs,
)
return ak.Array(unproject_layout(self.original_form, array.layout))
return ak.Array(
unproject_layout(self.original_form, layout),
attrs=self.attrs,
behavior=self.behavior,
)

def project_columns(self, columns):
return _FromParquetFileWiseFn(
Expand All @@ -130,6 +137,7 @@ def project_columns(self, columns):
listsep=self.listsep,
unnamed_root=self.unnamed_root,
original_form=self.form,
attrs=self.attrs,
behavior=self.behavior,
**self.kwargs,
)
Expand All @@ -145,6 +153,7 @@ def __init__(
unnamed_root: bool = False,
original_form: Form | None = None,
behavior: dict | None = None,
attrs: dict | None = None,
**kwargs: Any,
) -> None:
super().__init__(
Expand All @@ -154,24 +163,30 @@ def __init__(
unnamed_root=unnamed_root,
original_form=original_form,
behavior=behavior,
attrs=attrs,
**kwargs,
)

def __call__(self, pair: Any) -> ak.Array:
subrg, source = pair
if isinstance(subrg, int):
subrg = [[subrg]]
array = ak_from_parquet._load(
layout = ak_from_parquet._load(
[source],
parquet_columns=self.columns,
subrg=subrg,
subform=self.form,
highlevel=True,
highlevel=False,
attrs=None,
behavior=None,
fs=self.fs,
behavior=self.behavior,
**self.kwargs,
)
return ak.Array(unproject_layout(self.original_form, array.layout))
return ak.Array(
unproject_layout(self.original_form, layout),
behavior=self.behavior,
attrs=self.attrs,
)

def project_columns(self, columns):
return _FromParquetFragmentWiseFn(
Expand All @@ -180,6 +195,7 @@ def project_columns(self, columns):
unnamed_root=self.unnamed_root,
original_form=self.form,
behavior=self.behavior,
attrs=self.attrs,
**self.kwargs,
)

Expand All @@ -194,6 +210,7 @@ def from_parquet(
generate_bitmasks: bool = False,
highlevel: bool = True,
behavior: dict | None = None,
attrs: dict | None = None,
ignore_metadata: bool = True,
scan_files: bool = False,
split_row_groups: bool | None = False,
Expand Down Expand Up @@ -263,6 +280,8 @@ def from_parquet(
ignore_metadata,
scan_files,
split_row_groups,
behavior,
attrs,
)

(
Expand Down Expand Up @@ -307,6 +326,7 @@ def from_parquet(
footer_sample_size=footer_sample_size,
generate_bitmasks=generate_bitmasks,
behavior=behavior,
attrs=attrs,
),
actual_paths,
label=label,
Expand Down Expand Up @@ -345,6 +365,7 @@ def from_parquet(
footer_sample_size=footer_sample_size,
generate_bitmasks=generate_bitmasks,
behavior=behavior,
attrs=attrs,
),
pairs,
label=label,
Expand Down
5 changes: 3 additions & 2 deletions src/dask_awkward/lib/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def concatenate(
mergebool: bool = True,
highlevel: bool = True,
behavior: dict | None = None,
attrs: dict | None = None,
) -> Array:
label = "concatenate"
token = tokenize(arrays, axis, mergebool, highlevel, behavior)
Expand All @@ -49,7 +50,7 @@ def concatenate(
g[(name, i)] = k
i += 1

meta = ak.concatenate(metas)
meta = ak.concatenate(metas, behavior=behavior, attrs=attrs)
assert isinstance(meta, ak.Array)

prev_names = [iarr.name for iarr in arrays]
Expand All @@ -65,7 +66,7 @@ def concatenate(
if partition_compatibility(*arrays) == PartitionCompatibility.NO:
raise IncompatiblePartitions("concatenate", *arrays)

fn = _ConcatenateFnAxisGT0(axis=axis)
fn = _ConcatenateFnAxisGT0(axis=axis, behavior=behavior, attrs=attrs)
return map_partitions(fn, *arrays)

else:
Expand Down
Loading

0 comments on commit 9ebaa37

Please sign in to comment.