diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 00000000..dfd0e308 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,10 @@ +# Set update schedule for GitHub Actions + +version: 2 +updates: + + - package-ecosystem: "github-actions" + directory: "/" + schedule: + # Check for updates to GitHub Actions every week + interval: "weekly" diff --git a/.github/workflows/awkward-main.yml b/.github/workflows/awkward-main.yml index c27d57fd..e3e519b2 100644 --- a/.github/workflows/awkward-main.yml +++ b/.github/workflows/awkward-main.yml @@ -28,7 +28,7 @@ jobs: - name: Install run: | python3 -m pip install pip wheel -U - python3 -m pip install -q --no-cache-dir -e .[complete] + python3 -m pip install -q --no-cache-dir -e .[complete,test] python3 -m pip uninstall -y awkward && pip install git+https://github.com/scikit-hep/awkward.git@main - name: Run tests run: | diff --git a/.github/workflows/conda-tests.yml b/.github/workflows/conda-tests.yml index 959b345d..4fd22332 100644 --- a/.github/workflows/conda-tests.yml +++ b/.github/workflows/conda-tests.yml @@ -37,7 +37,7 @@ jobs: shell: bash -l {0} run: | conda activate test-environment - python -m pip install -q --no-cache-dir .[complete] + python -m pip install -q --no-cache-dir .[complete,test] - name: Run tests shell: bash -l {0} run: | diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 025336cd..6e0ab9a1 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -29,7 +29,8 @@ jobs: run: | pip install wheel pip install pytest-cov - pip install -q --no-cache-dir -e .[complete] + pip install dask[complete] + pip install -q --no-cache-dir -e .[complete,test] pytest --cov=dask_awkward --cov-report=xml - name: Upload Coverage to Codecov uses: codecov/codecov-action@v3 diff --git a/.github/workflows/pypi-tests.yml b/.github/workflows/pypi-tests.yml index 3225f7d1..3d705c18 100644 --- a/.github/workflows/pypi-tests.yml +++ b/.github/workflows/pypi-tests.yml @@ -31,7 +31,7 @@ jobs: run: | pip install pip wheel -U pip install dask[complete] - pip install -q --no-cache-dir .[complete] + pip install -q --no-cache-dir .[complete,test] pip list - name: test run: | diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index dd111878..86187130 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -13,15 +13,15 @@ repos: - id: trailing-whitespace - repo: https://github.com/psf/black - rev: 23.3.0 + rev: 23.7.0 hooks: - id: black language_version: python3 args: - --target-version=py38 -- repo: https://github.com/charliermarsh/ruff-pre-commit - rev: v0.0.272 +- repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.0.277 hooks: - id: ruff @@ -32,7 +32,7 @@ repos: language_version: python3 - repo: https://github.com/asottile/pyupgrade - rev: v3.6.0 + rev: v3.9.0 hooks: - id: pyupgrade args: @@ -47,3 +47,10 @@ repos: rev: v1.5.0 hooks: - id: yesqa + +- repo: https://github.com/adamchainz/blacken-docs + rev: 1.15.0 + hooks: + - id: blacken-docs + additional_dependencies: + - black diff --git a/docs/api-inspect.rst b/docs/api-inspect.rst index d7049fb2..0e03a8c4 100644 --- a/docs/api-inspect.rst +++ b/docs/api-inspect.rst @@ -6,5 +6,7 @@ Inspection .. 
autosummary:: :toctree: generated/ + partition_compatibility + PartitionCompatibility necessary_columns sample diff --git a/docs/gs-intro.rst b/docs/gs-intro.rst index e8c13305..8282fc3f 100644 --- a/docs/gs-intro.rst +++ b/docs/gs-intro.rst @@ -43,6 +43,7 @@ notice the use of wildcard syntax ("*"). from pathlib import Path import awkward as ak + file = Path("data.00.json") x = ak.from_json(file, line_delimited=True) x = x[ak.num(x.foo) > 2] @@ -52,6 +53,7 @@ notice the use of wildcard syntax ("*"). .. code-block:: python import dask_awkward as dak + # dask-awkward only supports line-delimited=True x = dak.from_json("data.*.json") x = x[dak.num(x.foo) > 2] diff --git a/docs/gs-terminology.rst b/docs/gs-terminology.rst index 985d7e40..d0208309 100644 --- a/docs/gs-terminology.rst +++ b/docs/gs-terminology.rst @@ -29,6 +29,7 @@ objects from the namespaces: # don't do this! from dask_awkward import Array + # or this! from awkward import Array diff --git a/docs/ht-behaviors.rst b/docs/ht-behaviors.rst index 9a346f41..4e28fc99 100644 --- a/docs/ht-behaviors.rst +++ b/docs/ht-behaviors.rst @@ -23,28 +23,32 @@ topic). behavior: dict = {} + @ak.mixin_class(behavior) class Point: def distance(self, other): - return np.sqrt( - (self.x - other.x) ** 2 + (self.y - other.y) ** 2 - ) - - points1 = ak.Array([ - [{"x": 1.0, "y": 1.1}, {"x": 2.0, "y": 2.2}, {"x": 3, "y": 3.3}], - [], - [{"x": 4.0, "y": 4.4}, {"x": 5.0, "y": 5.5}], - [{"x": 6.0, "y": 6.6}], - [{"x": 7.0, "y": 7.7}, {"x": 8.0, "y": 8.8}, {"x": 9, "y": 9.9}], - ]) - - points2 = ak.Array([ - [{"x": 0.9, "y": 1.0}, {"x": 2.0, "y": 2.2}, {"x": 2.9, "y": 3.0}], - [], - [{"x": 3.9, "y": 4.0}, {"x": 5.0, "y": 5.5}], - [{"x": 5.9, "y": 6.0}], - [{"x": 6.9, "y": 7.0}, {"x": 8.0, "y": 8.8}, {"x": 8.9, "y": 9.0}], - ]) + return np.sqrt((self.x - other.x) ** 2 + (self.y - other.y) ** 2) + + + points1 = ak.Array( + [ + [{"x": 1.0, "y": 1.1}, {"x": 2.0, "y": 2.2}, {"x": 3, "y": 3.3}], + [], + [{"x": 4.0, "y": 4.4}, {"x": 5.0, "y": 5.5}], + [{"x": 6.0, "y": 6.6}], + [{"x": 7.0, "y": 7.7}, {"x": 8.0, "y": 8.8}, {"x": 9, "y": 9.9}], + ] + ) + + points2 = ak.Array( + [ + [{"x": 0.9, "y": 1.0}, {"x": 2.0, "y": 2.2}, {"x": 2.9, "y": 3.0}], + [], + [{"x": 3.9, "y": 4.0}, {"x": 5.0, "y": 5.5}], + [{"x": 5.9, "y": 6.0}], + [{"x": 6.9, "y": 7.0}, {"x": 8.0, "y": 8.8}, {"x": 8.9, "y": 9.0}], + ] + ) array1 = dak.from_awkward(points1, npartitions=2) array2 = dak.from_awkward(points2, npartitions=2) diff --git a/docs/ht-io.rst b/docs/ht-io.rst index 75c49983..20d5687a 100644 --- a/docs/ht-io.rst +++ b/docs/ht-io.rst @@ -7,7 +7,7 @@ datasets stored in Parquet or JSON format. Take this code-block for example: -.. code:: python +.. code:: pycon >>> import dask_awkward as dak >>> ds1 = dak.from_parquet("s3://path/to/dataset") diff --git a/docs/me-faq.rst b/docs/me-faq.rst index 94906239..4a713815 100644 --- a/docs/me-faq.rst +++ b/docs/me-faq.rst @@ -39,7 +39,7 @@ on the first partition will occur." what does that mean?** is called ``awkward.compute-unknown-meta``. The default setting is ``True``. In code you can do something like this: - .. code-block:: python + .. code-block:: pycon with dask.config.set({"awkward.compute-unknown-meta": False}): # ... your code diff --git a/docs/me-optimization.rst b/docs/me-optimization.rst index 9205f6eb..0d2117d8 100644 --- a/docs/me-optimization.rst +++ b/docs/me-optimization.rst @@ -40,7 +40,7 @@ column of floats for ``bar.y``. If our task graph is of the form: -.. code:: python +.. 
code:: pycon >>> ds = dak.from_parquet("/path/to/data") >>> result = ds.bar.x / ds.foo @@ -84,7 +84,7 @@ You can see which columns are determined to be necessary by calling (it returns a mapping that pairs an input layer with the list of necessary columns): -.. code:: python +.. code:: pycon >>> dak.necessary_columns(result) {"some-layer-name": ["foo", "bar.x"]} @@ -112,12 +112,13 @@ which columns should be read from disk. The determine how one should use the ``columns=`` argument. Using our above example, we write -.. code:: python +.. code:: pycon >>> ds = dak.from_parquet("/path/to/data", columns=["bar.x", "foo"]) >>> result = ds.bar.x / ds.foo >>> with dask.config.set({"awkward.optimization.enabled": False}): ... result.compute() + ... With this code we can save a little bit of overhead by not running the necessary columns optimization after already defining, by hand, the diff --git a/pyproject.toml b/pyproject.toml index fc2fbe2c..da6e207a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,7 @@ classifiers = [ "Topic :: Scientific/Engineering", ] dependencies = [ - "awkward >=2.2.2", + "awkward >=2.2.4", "dask >=2023.04.0", ] dynamic = ["version"] @@ -37,29 +37,24 @@ Homepage = "https://github.com/dask-contrib/dask-awkward" "Bug Tracker" = "https://github.com/dask-contrib/dask-awkward/issues" [project.optional-dependencies] -complete = [ +io = [ "aiohttp", "pyarrow", - "pytest >=6.0", - "pytest-cov >=3.0.0", - "requests >=2.27.1", ] +complete = [ + "dask-awkward[io]", +] +# `docs` and `test` are separate from user installs docs = [ + "dask-awkward[complete]", "dask-sphinx-theme >=3.0.2", - "pyarrow", "sphinx-design", - "pytest >=6.0", - "pytest-cov >=3.0.0", "requests >=2.27.1", ] -io = [ - "aiohttp", - "pyarrow", -] test = [ - "aiohttp", + "dask-awkward[complete]", "distributed", - "pyarrow", + "pandas", "pytest >=6.0", "pytest-cov >=3.0.0", "requests >=2.27.1", diff --git a/src/dask_awkward/layers/__init__.py b/src/dask_awkward/layers/__init__.py index 137b680c..ebdeb516 100644 --- a/src/dask_awkward/layers/__init__.py +++ b/src/dask_awkward/layers/__init__.py @@ -2,6 +2,12 @@ AwkwardBlockwiseLayer, AwkwardInputLayer, AwkwardMaterializedLayer, + AwkwardTreeReductionLayer, ) -__all__ = ("AwkwardInputLayer", "AwkwardBlockwiseLayer", "AwkwardMaterializedLayer") +__all__ = ( + "AwkwardInputLayer", + "AwkwardBlockwiseLayer", + "AwkwardMaterializedLayer", + "AwkwardTreeReductionLayer", +) diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index 5ab130a0..eb9c1805 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -7,6 +7,7 @@ from dask.blockwise import Blockwise, BlockwiseDepDict, blockwise_token from dask.highlevelgraph import MaterializedLayer +from dask.layers import DataFrameTreeReduction from dask_awkward.utils import LazyInputsDict @@ -18,12 +19,12 @@ class AwkwardBlockwiseLayer(Blockwise): """Just like upstream Blockwise, except we override pickling""" @classmethod - def from_blockwise(cls, layer) -> AwkwardBlockwiseLayer: + def from_blockwise(cls, layer: Blockwise) -> AwkwardBlockwiseLayer: ob = object.__new__(cls) ob.__dict__.update(layer.__dict__) return ob - def mock(self): + def mock(self) -> tuple[AwkwardBlockwiseLayer, Any | None]: layer = copy.copy(self) nb = layer.numblocks layer.numblocks = {k: tuple(1 for _ in v) for k, v in nb.items()} @@ -205,26 +206,77 @@ def project_columns(self, columns: list[str]) -> AwkwardInputLayer: class AwkwardMaterializedLayer(MaterializedLayer): - def 
__init__(self, mapping, previous_layer_name, **kwargs): - self.prev_name = previous_layer_name + def __init__( + self, + mapping: dict, + *, + previous_layer_names: list[str], + fn: Callable | None = None, + **kwargs: Any, + ): + self.previous_layer_names: list[str] = previous_layer_names + self.fn = fn super().__init__(mapping, **kwargs) - def mock(self): + def mock(self) -> tuple[MaterializedLayer, Any | None]: mapping = self.mapping.copy() if not mapping: # no partitions at all return self, None name = next(iter(mapping))[0] - if (name, 0) in mapping: - task = mapping[(name, 0)] - task = tuple( - (self.prev_name, 0) - if isinstance(v, tuple) and len(v) == 2 and v[0] == self.prev_name - else v - for v in task - ) + # one previous layer name + # + # this case is used for mocking repartition or slicing where + # we maybe have multiple partitions that need to be included + # in a task. + if len(self.previous_layer_names) == 1: + prev_name: str = self.previous_layer_names[0] + if (name, 0) in mapping: + task = mapping[(name, 0)] + task = tuple( + (prev_name, 0) + if isinstance(v, tuple) and len(v) == 2 and v[0] == prev_name + else v + for v in task + ) + + # when using Array.partitions we need to mock that we + # just want the first partition. + if len(task) == 2 and task[1] > 0: + task = (task[0], 0) + return MaterializedLayer({(name, 0): task}), None + return self, None + + # more than one previous_layer_names + # + # this case is needed for dak.concatenate on axis=0; we need + # the first partition of _each_ of the previous layer names! + else: + if self.fn is None: + raise ValueError( + "For multiple previous layers the fn argument cannot be None." + ) + name0s = tuple((name, 0) for name in self.previous_layer_names) + task = (self.fn, *name0s) return MaterializedLayer({(name, 0): task}), None # failed to cull during column opt return self, None + + +class AwkwardTreeReductionLayer(DataFrameTreeReduction): + def mock(self) -> tuple[AwkwardTreeReductionLayer, Any | None]: + return ( + AwkwardTreeReductionLayer( + name=self.name, + name_input=self.name_input, + npartitions_input=1, + concat_func=self.concat_func, + tree_node_func=self.tree_node_func, + finalize_func=self.finalize_func, + split_every=self.split_every, + tree_node_name=self.tree_node_name, + ), + None, + ) diff --git a/src/dask_awkward/lib/__init__.py b/src/dask_awkward/lib/__init__.py index 9e8b0d49..197d141c 100644 --- a/src/dask_awkward/lib/__init__.py +++ b/src/dask_awkward/lib/__init__.py @@ -1,6 +1,11 @@ -from dask_awkward.lib.core import Array, Record, Scalar +from dask_awkward.lib.core import Array, PartitionCompatibility, Record, Scalar from dask_awkward.lib.core import _type as type -from dask_awkward.lib.core import map_partitions, typetracer_from_form +from dask_awkward.lib.core import ( + compatible_partitions, + map_partitions, + partition_compatibility, + typetracer_from_form, +) from dask_awkward.lib.describe import fields from dask_awkward.lib.inspect import necessary_columns, sample from dask_awkward.lib.io.io import ( @@ -47,6 +52,7 @@ cartesian, combinations, copy, + drop_none, fill_none, firsts, flatten, diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py index 60ad37d9..c287b77b 100644 --- a/src/dask_awkward/lib/core.py +++ b/src/dask_awkward/lib/core.py @@ -8,9 +8,10 @@ import sys import warnings from collections.abc import Callable, Hashable, Mapping, Sequence +from enum import IntEnum from functools import cached_property, partial from numbers import Number -from typing import 
TYPE_CHECKING, Any, TypeVar +from typing import TYPE_CHECKING, Any, Literal, TypeVar, Union, overload import awkward as ak import dask.config @@ -21,16 +22,21 @@ TypeTracerArray, is_unknown_scalar, ) -from awkward.highlevel import _dir_pattern -from dask.base import DaskMethodsMixin, dont_optimize, is_dask_collection, tokenize +from awkward.highlevel import NDArrayOperatorsMixin, _dir_pattern +from dask.base import ( + DaskMethodsMixin, + dont_optimize, + is_dask_collection, + tokenize, + unpack_collections, +) from dask.blockwise import BlockwiseDep from dask.blockwise import blockwise as dask_blockwise from dask.context import globalmethod from dask.delayed import Delayed from dask.highlevelgraph import HighLevelGraph from dask.threaded import get as threaded_get -from dask.utils import IndexCallable, funcname, key_split -from numpy.lib.mixins import NDArrayOperatorsMixin +from dask.utils import IndexCallable, funcname, is_arraylike, key_split from tlz import first from dask_awkward.layers import AwkwardBlockwiseLayer, AwkwardMaterializedLayer @@ -71,7 +77,7 @@ def _finalize_array(results: Sequence[Any]) -> Any: # sometimes we just check the length of partitions so all results # will be integers, just make an array out of that. - elif isinstance(results, tuple) and all( + elif isinstance(results, (tuple, list)) and all( isinstance(r, (int, np.integer)) for r in results ): return ak.Array(list(results)) @@ -555,6 +561,40 @@ def reset_meta(self) -> None: """Assign an empty typetracer array as the collection metadata.""" self._meta = empty_typetracer() + def repartition(self, npartitions=None, divisions=None, rows_per_partition=None): + from dask_awkward.layers import AwkwardMaterializedLayer + from dask_awkward.lib.structure import repartition_layer + + if sum(bool(_) for _ in [npartitions, divisions, rows_per_partition]) != 1: + raise ValueError("Please specify exactly one of the inputs") + if not self.known_divisions: + self.eager_compute_divisions() + nrows = self.divisions[-1] + if npartitions: + rows_per_partition = math.ceil(nrows / npartitions) + if rows_per_partition: + divisions = list(range(0, nrows, rows_per_partition)) + divisions.append(nrows) + + token = tokenize(self, divisions) + key = f"repartition-{token}" + + new_layer_raw = repartition_layer(self, key, divisions) + new_layer = AwkwardMaterializedLayer( + new_layer_raw, + previous_layer_names=[self.name], + ) + new_graph = HighLevelGraph.from_collections( + key, new_layer, dependencies=(self,) + ) + return new_array_object( + new_graph, + key, + meta=self._meta, + behavior=self.behavior, + divisions=divisions, + ) + def __len__(self) -> int: if not self.known_divisions: self.eager_compute_divisions() @@ -642,7 +682,7 @@ def divisions(self) -> tuple[int | None, ...]: @property def known_divisions(self) -> bool: - """True of the divisions are known (absence of ``None`` in the tuple).""" + """True if the divisions are known (absence of ``None`` in the tuple).""" return len(self.divisions) > 0 and None not in self.divisions @property @@ -695,6 +735,7 @@ def keys_array(self) -> np.ndarray: return np.array(self.__dask_keys__(), dtype=object) def _partitions(self, index: Any) -> Array: + # TODO: this produces a materialized layer, but could work like repartition() and slice() if not isinstance(index, tuple): index = (index,) token = tokenize(self, index) @@ -707,12 +748,13 @@ def _partitions(self, index: Any) -> Array: dsk = {(name, i): tuple(key) for i, key in enumerate(new_keys)} graph = HighLevelGraph.from_collections( name, 
- AwkwardMaterializedLayer(dsk, previous_layer_name=self.name), - dependencies=[self], + AwkwardMaterializedLayer(dsk, previous_layer_names=[self.name]), + dependencies=(self,), ) # if a single partition was requested we trivially know the new divisions. if len(raw) == 1 and isinstance(raw[0], int) and self.known_divisions: + # TODO: don't we always know the divisions? new_divisions = ( 0, self.divisions[raw[0] + 1] - self.divisions[raw[0]], # type: ignore @@ -777,7 +819,7 @@ def _getitem_outer_bool_or_int_lazy_array( self, where: Array | tuple[Any, ...] ) -> Any: ba = where if isinstance(where, Array) else where[0] - if not compatible_partitions(self, ba): + if partition_compatibility(self, ba) == PartitionCompatibility.NO: raise IncompatiblePartitions("getitem", self, ba) new_meta: Any | None = None @@ -843,6 +885,14 @@ def _getitem_outer_int(self, where: int | tuple[Any, ...]) -> Any: partition = self.partitions[pidx] if partition._meta is not None: new_meta = partition._meta[where] + # new_meta = make_unknown_length(partition._meta)[where] + # new_meta = ak.Array( + # ak.to_backend( + # partition._meta, + # "typetracer", + # highlevel=False, + # ).to_typetracer(forget_length=True) + # )[where] # if we know a new array is going to be made, just call the # trivial inner on the new partition. @@ -866,7 +916,7 @@ def _getitem_outer_int(self, where: int | tuple[Any, ...]) -> Any: } hlg = HighLevelGraph.from_collections( name, - AwkwardMaterializedLayer(dsk, previous_layer_name=self.name), + AwkwardMaterializedLayer(dsk, previous_layer_names=[self.name]), dependencies=[partition], ) if isinstance(new_meta, ak.Record): @@ -933,7 +983,7 @@ def _getitem_slice_on_zero(self, where: tuple[slice, ...]): hlg = HighLevelGraph.from_collections( name, - AwkwardMaterializedLayer(dask, previous_layer_name=self.name), + AwkwardMaterializedLayer(dask, previous_layer_names=[self.name]), dependencies=[self], ) return new_array_object( @@ -1154,6 +1204,7 @@ def map_partitions( self, func: Callable, *args: Any, + traverse: bool = True, **kwargs: Any, ) -> Array: """Map a function across all partitions of the collection. @@ -1167,6 +1218,8 @@ def map_partitions( collection, if arguments are Array collections they must be compatibly partitioned with the object this method is being called from. + traverse : bool + Unpack basic python containers to find dask collections. **kwargs : Any Additional keyword arguments passed to the `func`. @@ -1180,7 +1233,7 @@ def map_partitions( dask_awkward.map_partitions """ - return map_partitions(func, self, *args, **kwargs) + return map_partitions(func, self, *args, traverse=traverse, **kwargs) def eager_compute_divisions(self) -> None: """Force a compute of the divisions.""" @@ -1190,38 +1243,35 @@ def clear_divisions(self) -> None: """Clear the divisions of a Dask Awkward Collection.""" self._divisions = (None,) * (self.npartitions + 1) - def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): - if method != "__call__": - raise RuntimeError("Array ufunc supports only method == '__call__'") + def __awkward_function__(self, func, array_likes, args, kwargs): + import dask_awkward - new_meta = None + if any(isinstance(arg, ak.Array) for arg in array_likes): + raise TypeError("cannot mix awkward.Array and dask_awkward.Array") - # divisions need to be compat. (identical for now?) 
+ fn_name = func.__qualname__ + try: + fn = getattr(dask_awkward, fn_name) + except AttributeError: + return NotImplemented + return fn(*args, **kwargs) - inputs_meta = [] - for inp in inputs: - # if input is a Dask Awkward Array collection, grab it's meta - if isinstance(inp, Array): - inputs_meta.append(inp._meta) - # if input is a concrete Awkward Array, grab it's typetracer - elif isinstance(inp, ak.Array): - inputs_meta.append(typetracer_array(inp)) - # otherwise pass along - else: - inputs_meta.append(inp) + def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): + if method != "__call__": + raise RuntimeError("Array ufunc supports only method == '__call__'") - # compute new meta from inputs - new_meta = ufunc(*inputs_meta) + dak_arrays = tuple(a for a in inputs if isinstance(a, Array)) + if partition_compatibility(*dak_arrays) == PartitionCompatibility.NO: + raise IncompatiblePartitions(*dak_arrays) return map_partitions( ufunc, *inputs, - meta=new_meta, output_divisions=1, **kwargs, ) - def __array__(self, *args, **kwargs): + def __array__(self, *_, **__): raise NotImplementedError def to_delayed(self, optimize_graph: bool = True) -> list[Delayed]: @@ -1421,6 +1471,9 @@ def partitionwise_layer( pairs.extend([arg, "i"]) elif len(arg.numblocks) == 2: pairs.extend([arg, "ij"]) + elif is_arraylike(arg) and is_dask_collection(arg) and arg.ndim == 1: + pairs.extend([arg.name, "i"]) + numblocks[arg.name] = arg.numblocks elif is_dask_collection(arg): raise DaskAwkwardNotImplemented( "Use of Array with other Dask collections is currently unsupported." @@ -1442,22 +1495,58 @@ def partitionwise_layer( return layer +class ArgsKwargsPackedFunction: + def __init__(self, the_fn, arg_repackers, kwarg_repacker, arg_lens_for_repackers): + self.fn = the_fn + self.arg_repackers = arg_repackers + self.kwarg_repacker = kwarg_repacker + self.arg_lens_for_repackers = arg_lens_for_repackers + + def __call__(self, *args_deps_expanded): + """This packing function receives a list of strictly + ordered arguments. The first range of arguments, + [0:sum(self.arg_lens_for_repackers)], corresponds to + the original *args of self.fn but flattened to a list of + dask collections and non-dask-collection-containing arguments. + The remainder are the dask-collection-deps of self.fn's original + kwargs. The lengths of expected flattened inputs for each arg are + specified when this class is created, and we use that to process + the input flattened list of arguments sequentially. + + The various repackers deal with restructuring the received flattened + list into the shape that self.fn expects. + """ + args = [] + len_args = 0 + for repacker, n_args in zip(self.arg_repackers, self.arg_lens_for_repackers): + args.append( + repacker(args_deps_expanded[len_args : len_args + n_args])[0] + if repacker is not None + else args_deps_expanded[len_args] + ) + len_args += n_args + kwargs = self.kwarg_repacker(args_deps_expanded[len_args:])[0] + return self.fn(*args, **kwargs) + + def map_partitions( - fn: Callable, + base_fn: Callable, *args: Any, label: str | None = None, token: str | None = None, meta: Any | None = None, output_divisions: int | None = None, opt_touch_all: bool = False, + traverse: bool = True, **kwargs: Any, ) -> Array: """Map a callable across all partitions of any number of collections. Parameters ---------- - fn : Callable - Function to apply on all partitions. + base_fn : Callable + Function to apply on all partitions; this will get wrapped to + handle kwargs, including dask collections.
*args : Collections and function arguments Arguments passed to the function. Partitioned arguments (i.e. Dask collections) will have `fn` applied to each partition. @@ -1488,6 +1577,8 @@ def map_partitions( opt_touch_all : bool Touch all layers in this graph during typetracer based optimization. + traverse : bool + Unpack basic python containers to find dask collections. **kwargs : Any Additional keyword arguments passed to the `fn`. @@ -1523,35 +1614,58 @@ def map_partitions( This is effectively the same as `d = c * a` """ - token = token or tokenize(fn, *args, meta, **kwargs) - label = label or funcname(fn) + token = token or tokenize(base_fn, *args, meta, **kwargs) + label = hyphenize(label or funcname(base_fn)) name = f"{label}-{token}" + kwarg_flat_deps, kwarg_repacker = unpack_collections(kwargs, traverse=traverse) + flat_deps, _ = unpack_collections(*args, *kwargs.values(), traverse=traverse) + + arg_flat_deps_expanded = [] + arg_repackers = [] + arg_lens_for_repackers = [] + for arg in args: + this_arg_flat_deps, repacker = unpack_collections(arg, traverse=traverse) + if ( + len(this_arg_flat_deps) > 0 + ): # if the deps list is empty this arg does not contain any dask collection, no need to repack! + arg_flat_deps_expanded.extend(this_arg_flat_deps) + arg_repackers.append(repacker) + arg_lens_for_repackers.append(len(this_arg_flat_deps)) + else: + arg_flat_deps_expanded.append(arg) + arg_repackers.append(None) + arg_lens_for_repackers.append(1) + + fn = ArgsKwargsPackedFunction( + base_fn, + arg_repackers, + kwarg_repacker, + arg_lens_for_repackers, + ) + lay = partitionwise_layer( fn, name, - *args, + *arg_flat_deps_expanded, + *kwarg_flat_deps, opt_touch_all=opt_touch_all, - **kwargs, ) - deps = [a for a in args if is_dask_collection(a)] + [ - v for _, v in kwargs.items() if is_dask_collection(v) - ] if meta is None: - meta = map_meta(fn, *args, **kwargs) + meta = map_meta(fn, *arg_flat_deps_expanded, *kwarg_flat_deps) hlg = HighLevelGraph.from_collections( name, lay, - dependencies=deps, + dependencies=flat_deps, ) if output_divisions is not None: if output_divisions == 1: - new_divisions = deps[0].divisions + new_divisions = flat_deps[0].divisions else: new_divisions = tuple( - map(lambda x: x * output_divisions, deps[0].divisions) + map(lambda x: x * output_divisions, flat_deps[0].divisions) ) return new_array_object( hlg, @@ -1564,83 +1678,145 @@ def map_partitions( hlg, name=name, meta=meta, - npartitions=deps[0].npartitions, + npartitions=flat_deps[0].npartitions, ) -def _from_iter(obj): - """Try to run ak.from_iter, but have fallbacks. +PartialReductionType = ak.Array - This function first tries to call ak.form_iter on the input (which - should be some iterable). We expect a list of Scalar typetracers - to fail, so if the call fails due to ValueError or TypeError then - we manually do some typetracer operations to return the proper - representation of the input iterable-of-typetracers. 
- """ - try: - return ak.from_iter(obj) - except (ValueError, TypeError): - first_obj = obj[0] +def _chunk_reducer_non_positional( + chunk: ak.Array | PartialReductionType, + is_axis_none: bool, + *, + reducer: Callable, + mask_identity: bool, +) -> PartialReductionType: + return reducer( + chunk, + keepdims=True, + axis=-1 if is_axis_none else 0, + mask_identity=mask_identity, + ) - if isinstance(first_obj, MaybeNone): - first_obj = first_obj.content - return ak.Array( - ak.Array(first_obj) - .layout.form.length_one_array() - .layout.to_typetracer(forget_length=True) - ) +def _concat_reducer_non_positional( + partials: list[PartialReductionType], is_axis_none: bool +) -> ak.Array: + concat_axis = -1 if is_axis_none else 0 + return ak.concatenate(partials, axis=concat_axis) -def total_reduction_to_scalar( +def _finalise_reducer_non_positional( + partial: PartialReductionType, + is_axis_none: bool, + *, + reducer: Callable, + mask_identity: bool, + keepdims: bool, +) -> ak.Array: + return reducer( + partial, + axis=None if is_axis_none else 0, + keepdims=keepdims, + mask_identity=mask_identity, + ) + + +def _prepare_axis_none_chunk(chunk: ak.Array) -> ak.Array: + # TODO: this is private Awkward code. We should figure out how to export it + # if needed + (layout,) = ak._do.remove_structure( + ak.to_layout(chunk), + flatten_records=False, + drop_nones=False, + keepdims=True, + allow_records=False, + ) + return ak.Array(layout, behavior=chunk.behavior) + + +def non_trivial_reduction( *, label: str, array: Array, - meta: Any, - chunked_fn: Callable, - comb_fn: Callable | None = None, - agg_fn: Callable | None = None, + axis: Literal[0] | None, + is_positional: bool, + keepdims: bool, + mask_identity: bool, + reducer: Callable, + combiner: Callable | None = None, token: str | None = None, dtype: Any | None = None, split_every: int | bool | None = None, - chunked_kwargs: dict[str, Any] | None = None, - comb_kwargs: dict[str, Any] | None = None, - agg_kwargs: dict[str, Any] | None = None, -) -> Scalar: - from dask.layers import DataFrameTreeReduction +): + if is_positional: + raise NotImplementedError("positional reducers at axis=0 or axis=None") + + # Regularise the axis to (0, None) + if axis == 0 or axis == -1 * array.ndim: + axis = 0 + elif axis is not None: + raise ValueError(axis) + + if combiner is None: + combiner = reducer + + if is_positional: + assert combiner is reducer + + # For `axis=None`, we prepare each array to have the following structure: + # [[[ ... [x1 x2 x3 ... xN] ... 
]]] (length-1 outer lists) + # This makes the subsequent reductions an `axis=-1` reduction + if axis is None: + prepared_array = map_partitions(_prepare_axis_none_chunk, array) + else: + prepared_array = array + + chunked_fn = _chunk_reducer_non_positional + tree_node_fn = _chunk_reducer_non_positional + concat_fn = _concat_reducer_non_positional + finalize_fn = _finalise_reducer_non_positional + + chunked_kwargs = { + "reducer": reducer, + "is_axis_none": axis is None, + "mask_identity": mask_identity, + } + tree_node_kwargs = { + "reducer": combiner, + "is_axis_none": axis is None, + "mask_identity": mask_identity, + } + + concat_kwargs = {"is_axis_none": axis is None} + finalize_kwargs = { + "reducer": combiner, + "mask_identity": mask_identity, + "keepdims": keepdims, + "is_axis_none": axis is None, + } + + from dask_awkward.layers import AwkwardTreeReductionLayer - chunked_kwargs = chunked_kwargs or {} token = token or tokenize( array, - chunked_fn, - comb_fn, - agg_fn, + reducer, label, dtype, split_every, chunked_kwargs, - comb_kwargs, - agg_kwargs, + tree_node_kwargs, + concat_kwargs, + finalize_kwargs, ) - name_comb = f"{label}-combine-{token}" - name_agg = f"{label}-agg-{token}" - - comb_kwargs = comb_kwargs or chunked_kwargs - agg_kwargs = agg_kwargs or comb_kwargs - - comb_fn = comb_fn or chunked_fn - agg_fn = agg_fn or comb_fn + name_tree_node = f"{label}-tree-node-{token}" + name_finalize = f"{label}-finalize-{token}" chunked_fn = partial(chunked_fn, **chunked_kwargs) - comb_fn = partial(comb_fn, **comb_kwargs) - agg_fn = partial(agg_fn, **agg_kwargs) - - chunked_result = map_partitions( - chunked_fn, - array, - meta=empty_typetracer(), - ) + tree_node_fn = partial(tree_node_fn, **tree_node_kwargs) + concat_fn = partial(concat_fn, **concat_kwargs) + finalize_fn = partial(finalize_fn, **finalize_kwargs) if split_every is None: split_every = 8 @@ -1649,21 +1825,31 @@ def total_reduction_to_scalar( else: pass - dftr = DataFrameTreeReduction( - name=name_agg, - name_input=chunked_result.name, - npartitions_input=chunked_result.npartitions, - concat_func=_from_iter, - tree_node_func=comb_fn, - finalize_func=agg_fn, + chunked = map_partitions(chunked_fn, prepared_array, meta=empty_typetracer()) + + trl = AwkwardTreeReductionLayer( + name=name_finalize, + name_input=chunked.name, + npartitions_input=prepared_array.npartitions, + concat_func=concat_fn, + tree_node_func=tree_node_fn, + finalize_func=finalize_fn, split_every=split_every, - tree_node_name=name_comb, + tree_node_name=name_tree_node, ) - graph = HighLevelGraph.from_collections( - name_agg, dftr, dependencies=(chunked_result,) + graph = HighLevelGraph.from_collections(name_finalize, trl, dependencies=(chunked,)) + + meta = reducer( + array._meta, + axis=axis, + keepdims=keepdims, + mask_identity=mask_identity, ) - return new_scalar_object(graph, name_agg, meta=meta) + if isinstance(meta, ak.highlevel.Array): + return new_array_object(graph, name_finalize, meta=meta, npartitions=1) + else: + return new_scalar_object(graph, name_finalize, meta=meta) def calculate_known_divisions(array: Array) -> tuple[int, ...]: @@ -1713,24 +1899,6 @@ def _type(array: Array) -> Type | None: return None -def ndim(array: Array) -> int: - """Number of dimensions before reaching a numeric type or a record. - - Parameters - ---------- - array : dask_awkward.Array - The collection - - Returns - ------- - int or None - Number of dimensions as an integer, or ``None`` if the - collection does not contain metadata. 
- - """ - return array.ndim - - def is_awkward_collection(obj: Any) -> bool: """Check if an object is a Dask Awkward collection. @@ -1814,24 +1982,40 @@ def meta_or_identity(obj: Any) -> Any: """ if is_awkward_collection(obj): return obj._meta + elif is_dask_collection(obj) and is_arraylike(obj): + return ak.Array( + ak.from_numpy(obj._meta).layout.to_typetracer(forget_length=True) + ) return obj +@overload def to_meta(objects: Sequence[Any]) -> tuple[Any, ...]: - """In a sequence convert Dask Awkward collections to their metas. + ... + + +@overload +def to_meta(objects: dict[str, Any]) -> dict[str, Any]: + ... + + +def to_meta(objects): + """Convert sequence or dict of Dask Awkward collections to their metas. Parameters ---------- - objects : Sequence[Any] - Sequence of objects. + objects : Sequence[Any] or dict[str, Any] + Sequence or dictionary of objects to retrieve metas from. Returns ------- - tuple[Any, ...] - The sequence of objects where collections have been replaced - with their metadata. + tuple[Any, ...] or dict[str, Any] + The sequence of objects (or dictionary) where collections have + been replaced with their metadata. """ + if isinstance(objects, dict): + return {k: meta_or_identity(v) for k, v in objects.items()} return tuple(map(meta_or_identity, objects)) @@ -1848,10 +2032,11 @@ def to_length_zero_arrays(objects: Sequence[Any]) -> tuple[Any, ...]: return tuple(map(length_zero_array_or_identity, objects)) -def map_meta(fn: Callable, *args: Any, **kwargs: Any) -> ak.Array | None: - metas = to_meta(args) +def map_meta(fn: ArgsKwargsPackedFunction, *deps: Any) -> ak.Array | None: + # NOTE: fn is assumed to be a *packed* function + # as defined up in map_partitions. be careful! try: - meta = fn(*metas, **kwargs) + meta = fn(*to_meta(deps)) return meta except Exception as err: # if compute-unknown-meta is False then we don't care about @@ -1874,8 +2059,8 @@ def map_meta(fn: Callable, *args: Any, **kwargs: Any) -> ak.Array | None: ) pass try: - lzas = to_length_zero_arrays(args) - meta = typetracer_from_form(fn(*lzas, **kwargs).layout.form) + arg_lzas = to_length_zero_arrays(deps) + meta = typetracer_from_form(fn(*arg_lzas).layout.form) return meta except Exception: # if compute-unknown-meta is True and we've gotten to this @@ -1883,9 +2068,7 @@ def map_meta(fn: Callable, *args: Any, **kwargs: Any) -> ak.Array | None: # to happen as a consequence of us not being able to determine # metadata. if dask.config.get("awkward.compute-unknown-meta"): - extras = ( - f"function call: {fn}\n" f"metadata: {metas}\n" f"kwargs: {kwargs}\n" - ) + extras = f"function call: {fn}\n" f"metadata: {deps}\n" warnings.warn( "metadata could not be determined; " "a compute on the first partition will occur.\n" @@ -1921,50 +2104,6 @@ def typetracer_array(a: ak.Array | Array) -> ak.Array: raise TypeError(msg) -def compatible_partitions(*args: Array) -> bool: - """Check if all arguments are compatibly partitioned. - - In operations where the blocks of multiple collections are used - simultaneously, we need the collections to be equally partitioned. - If the first argument has known divisions, other collections with - known divisions will be tested against the first arguments - divisions. - - Parameters - ---------- - *args : Array - Array collections of interest. - - Returns - ------- - bool - ``True`` if the collections appear to be equally partitioned. 
- - """ - a = args[0] - - for arg in args[1:]: - if a.npartitions != arg.npartitions: - return False - - if a.known_divisions: - for arg in args[1:]: - if arg.known_divisions: - if a.divisions != arg.divisions: - return False - - return True - - -def compatible_divisions(*args: Array) -> bool: - if not all(a.known_divisions for a in args): - return False - for arg in args[1:]: - if arg.divisions != args[0].divisions: - return False - return True - - def empty_typetracer() -> ak.Array: """Instantiate a typetracer array with unknown length. @@ -2051,3 +2190,195 @@ def typetracer_from_form(form: Form) -> ak.Array: """ layout = form.length_zero_array(highlevel=False) return ak.Array(layout.to_typetracer(forget_length=True)) + + +def make_unknown_length(array: ak.Array) -> ak.Array: + """Make any highlevel Array a highlevel typetracer Array with unknown length. + + Parameters + ---------- + array : ak.Array + Array of interest + + Returns + ------- + ak.Array + Highlevel typetracer Array with unknown length. + + """ + return ak.Array(ak.to_layout(array).to_typetracer(forget_length=True)) + + +class PartitionCompatibility(IntEnum): + """Sum type for describing partition compatibility. + + Use the :func:`partition_compatibility` function as an entry point + to instances of this class. + + Attributes + ---------- + NO + The compatibility is absolutely false; either an unequal + number of partitions or known divisions do not match. + MAYBE + The compatibility is possible; the total numbers of partitions + are equal, but some divisions are unknown, so it is possible + that partitions are not compatible; this cannot be determined + without some compute. + YES + The compatibility is absolutely true; equal number of + partitions and known divisions match. + + See Also + -------- + dask_awkward.partition_compatibility + + """ + + NO = 0 + MAYBE = 1 + YES = 2 + + @staticmethod + def _check(*args: Array) -> PartitionCompatibility: + # first check to see if all arguments have the same number of + # partitions; this is _always_ defined. + for arg in args[1:]: + if args[0].npartitions != arg.npartitions: + return PartitionCompatibility.NO + + # now we check if divisions are compatible. Sometimes divisions + # are unknown and we just have a tuple of Nones; but if divisions + # are known we want to check if they are compatible. + refarr: Array | None = None + for arg in args: + if arg.known_divisions: + refarr = arg + break + # if we never hit the break just return MAYBE because we have no + # known-division Arrays. + else: + return PartitionCompatibility.MAYBE + + # at this point we have a reference array to compare divisions + ngood = 0 + for arg in args: + if arg.known_divisions: + if arg.divisions != refarr.divisions: + return PartitionCompatibility.NO + else: + ngood += 1 + + # the ngood counter tells us if all divisions were present and are equal + if ngood == len(args): + return PartitionCompatibility.YES + + # if ngood is less than len(args) then we fall back on maybe compatible + return PartitionCompatibility.MAYBE + + +def partition_compatibility(*args: Array) -> PartitionCompatibility: + """Check if multiple collections have compatible partitions. + + Parameters + ---------- + *args : Array + Any number of array collections to check. + + Returns + ------- + PartitionCompatibility + Result of the check.
+ + Examples + -------- + + Starting with an absolutely compatible comparison: + + >>> import dask_awkward as dak + >>> import awkward as ak + >>> concrete = ak.Array([[1, 2, 3], [4], [5, 6], [0, 0, 0, 0]]) + >>> lazy = dak.from_awkward(concrete, npartitions=2) + >>> selection = dak.sum(lazy, axis=1) == 0 + >>> dak.partition_compatibility(lazy, selection) + <PartitionCompatibility.YES: 2> + + The selection doesn't change the length of the arrays at each + partition, so the divisions are known to be conserved for those + operations (the sum on ``axis=1`` along with the equality + comparison). + + In general we have no way of knowing what the resulting divisions + will be after a boolean selection, but the total number of + partitions will be conserved, so we have to report ``MAYBE``: + + >>> lazy_selection = lazy[selection] + >>> dak.partition_compatibility(lazy, lazy_selection) + <PartitionCompatibility.MAYBE: 1> + + Due to the simple nature of this example we know that after the + selection the partitions will not be compatible (because it's + clear only 1 element of the original array will survive the + selection, so the divisions will change after that compute). Now + we can eagerly compute what the divisions will be on the + ``lazy_selection`` collection and get a ``NO`` result: + + >>> lazy_selection.eager_compute_divisions() + >>> dak.partition_compatibility(lazy, lazy_selection) + <PartitionCompatibility.NO: 0> + + Remember that :func:`Array.eager_compute_divisions` is going to + trigger a compute to determine the divisions (to know divisions we + need to know the length of each partition). + + """ + return PartitionCompatibility._check(*args) + + +HowStrictT = Union[Literal[1], Literal[2], PartitionCompatibility] + + +def compatible_partitions( + *args: Array, + how_strict: HowStrictT = PartitionCompatibility.MAYBE, ) -> bool: + """Check if all arguments are compatibly partitioned. + + In operations where the blocks of multiple collections are used + simultaneously, we need the collections to be equally partitioned. + If the first argument has known divisions, other collections with + known divisions will be tested against the first argument's + divisions. + + Parameters + ---------- + *args : Array + Array collections of interest. + how_strict : PartitionCompatibility or Literal[1] or Literal[2] + Strictness level for the compatibility. If + ``PartitionCompatibility.MAYBE`` or the integer 1, the check + will return ``True`` if the arrays are maybe compatible (that + is, some unknown divisions exist but the total numbers of + partitions are compatible). If ``PartitionCompatibility.YES`` + or the integer 2, the check will return ``True`` if and only + if the arrays are absolutely compatible (that is, all + divisions are known and they are equal). + + Returns + ------- + bool + ``True`` if the collections have compatible partitions at the + level of requested strictness.
+ + See Also + -------- + dask_awkward.PartitionCompatibility + dask_awkward.partition_compatibility + + """ + partcomp = partition_compatibility(*args) + if partcomp == PartitionCompatibility.NO: + return False + elif partcomp == PartitionCompatibility.MAYBE: + return how_strict == 1 + return True diff --git a/src/dask_awkward/lib/io/io.py b/src/dask_awkward/lib/io/io.py index 1f852a49..c47bec44 100644 --- a/src/dask_awkward/lib/io/io.py +++ b/src/dask_awkward/lib/io/io.py @@ -12,6 +12,7 @@ from dask.utils import funcname from dask_awkward.layers import AwkwardBlockwiseLayer, AwkwardInputLayer +from dask_awkward.layers.layers import AwkwardMaterializedLayer from dask_awkward.lib.core import ( empty_typetracer, map_partitions, @@ -184,7 +185,10 @@ def from_delayed( parts = [source] if isinstance(source, Delayed) else source name = f"{prefix}-{tokenize(parts)}" - dsk = {(name, i): part.key for i, part in enumerate(parts)} + dsk = AwkwardMaterializedLayer( + {(name, i): part.key for i, part in enumerate(parts)}, + previous_layer_names=[parts[0].name], + ) if divisions is None: divs: tuple[int | None, ...] = (None,) * (len(parts) + 1) else: diff --git a/src/dask_awkward/lib/io/parquet.py b/src/dask_awkward/lib/io/parquet.py index c0e633b5..0bb5147d 100644 --- a/src/dask_awkward/lib/io/parquet.py +++ b/src/dask_awkward/lib/io/parquet.py @@ -215,7 +215,7 @@ def from_parquet( fs, tok, paths = get_fs_token_paths( path, mode="rb", storage_options=storage_options ) - label = "read-parquet" + label = "from-parquet" token = tokenize( tok, paths, ignore_metadata, columns, filters, scan_files, split_row_groups ) diff --git a/src/dask_awkward/lib/operations.py b/src/dask_awkward/lib/operations.py index a46da9e9..9abbef3c 100644 --- a/src/dask_awkward/lib/operations.py +++ b/src/dask_awkward/lib/operations.py @@ -4,11 +4,13 @@ from dask.base import tokenize from dask.highlevelgraph import HighLevelGraph +from dask_awkward.layers import AwkwardMaterializedLayer from dask_awkward.lib.core import ( Array, - compatible_partitions, + PartitionCompatibility, map_partitions, new_array_object, + partition_compatibility, ) from dask_awkward.utils import DaskAwkwardNotImplemented, IncompatiblePartitions @@ -21,6 +23,10 @@ def __call__(self, *args): return ak.concatenate(list(args), **self.kwargs) +def _concatenate_axis0_multiarg(*args): + return ak.concatenate(list(args), axis=0) + + def concatenate( arrays: list[Array], axis: int = 0, @@ -45,11 +51,17 @@ def concatenate( meta = ak.concatenate(metas) + prev_names = [iarr.name for iarr in arrays] + g = AwkwardMaterializedLayer( + g, + previous_layer_names=prev_names, + fn=_concatenate_axis0_multiarg, + ) hlg = HighLevelGraph.from_collections(name, g, dependencies=arrays) return new_array_object(hlg, name, meta=meta, npartitions=npartitions) if axis > 0: - if not compatible_partitions(*arrays): + if partition_compatibility(*arrays) == PartitionCompatibility.NO: raise IncompatiblePartitions("concatenate", *arrays) fn = _ConcatenateFnAxisGT0(axis=axis) diff --git a/src/dask_awkward/lib/optimize.py b/src/dask_awkward/lib/optimize.py index e48fdd00..f2dbe201 100644 --- a/src/dask_awkward/lib/optimize.py +++ b/src/dask_awkward/lib/optimize.py @@ -349,6 +349,8 @@ def _get_column_reports(dsk: HighLevelGraph) -> dict[str, Any]: layers = dsk.layers.copy() # type: ignore deps = dsk.dependencies.copy() # type: ignore + dependents = dsk.dependents + reports = {} # make labelled report @@ -367,12 +369,30 @@ def _get_column_reports(dsk: HighLevelGraph) -> dict[str, Any]: 
layers[name] = _touch_and_call(layers[name]) hlg = HighLevelGraph(layers, deps) - outlayer = hlg.layers[hlg._toposort_layers()[-1]] + # this loop builds up the possible final leaf nodes by + # inspecting the dependents dictionary. If something does not have + # a dependent, it must be the end of a graph. These are the things + # we need to compute for; we only use a single partition (the + # first). For a single collection `.compute()` this list will just + # be length 1; but if we are using `dask.compute` to pass in + # multiple collections to be computed simultaneously, this list + # will increase in length. + leaf_layers_keys = [ + (k, 0) for k, v in dependents.items() if isinstance(v, set) and len(v) == 0 + ] + + # now we try to compute for each possible output layer key (leaf + # node on partition 0); this will cause the typetracer reports to + # get correct fields/columns touched. If the result is a record or + # an array we of course want to touch all of the data/fields. try: for layer in hlg.layers.values(): layer.__dict__.pop("_cached_dict", None) - out = get_sync(hlg, list(outlayer.keys())[0]) + results = get_sync(hlg, leaf_layers_keys) + for out in results: + if isinstance(out, (ak.Array, ak.Record)): + out.layout._touch_data(recursive=True) except Exception as err: on_fail = dask.config.get("awkward.optimization.on-fail") # this is the default, throw a warning but skip the optimization. @@ -394,8 +414,6 @@ def _get_column_reports(dsk: HighLevelGraph) -> dict[str, Any]: "Valid options are 'warn', 'pass', or 'raise'." ) - if isinstance(out, (ak.Array, ak.Record)): - out.layout._touch_data(recursive=True) return reports diff --git a/src/dask_awkward/lib/reducers.py b/src/dask_awkward/lib/reducers.py index f869a49a..716327cc 100644 --- a/src/dask_awkward/lib/reducers.py +++ b/src/dask_awkward/lib/reducers.py @@ -3,10 +3,8 @@ from typing import TYPE_CHECKING, Any import awkward as ak -import numpy as np -from awkward._nplikes.typetracer import TypeTracerArray -from dask_awkward.lib.core import map_partitions, total_reduction_to_scalar +from dask_awkward.lib.core import map_partitions, non_trivial_reduction from dask_awkward.utils import DaskAwkwardNotImplemented, borrow_docstring if TYPE_CHECKING: @@ -42,7 +40,17 @@ def all( keepdims: bool = False, mask_identity: bool = False, ) -> Any: - if axis and axis != 0: + if axis is None or axis == 0 or axis == -1 * array.ndim: + return non_trivial_reduction( + axis=axis, + label="all", + array=array, + reducer=ak.all, + is_positional=False, + keepdims=keepdims, + mask_identity=mask_identity, + ) + else: return map_partitions( ak.all, array, @@ -51,7 +59,6 @@ keepdims=keepdims, mask_identity=mask_identity, ) - raise DaskAwkwardNotImplemented(f"axis={axis} is a TODO") @borrow_docstring(ak.any) def any( array: Array, axis: int | None = None, keepdims: bool = False, mask_identity: bool = False, ) -> Any: - if axis and axis != 0: + if axis is None or axis == 0 or axis == -1 * array.ndim: + return non_trivial_reduction( + axis=axis, + label="any", + array=array, + reducer=ak.any, + is_positional=False, + keepdims=keepdims, + mask_identity=mask_identity, + ) + else: return map_partitions( ak.any, array, @@ -70,7 +87,6 @@ keepdims=keepdims, mask_identity=mask_identity, ) - raise DaskAwkwardNotImplemented(f"axis={axis} is a TODO") @borrow_docstring(ak.argmax) def argmax( array: Array, axis: int | None = None, keepdims: bool = False, mask_identity: bool = True, ) -> Any: - if axis and axis >= 1: + if axis is None or axis == 0 or axis == -1 * array.ndim: + return
non_trivial_reduction( + axis=axis, + label="argmax", + array=array, + reducer=ak.argmax, + is_positional=True, + keepdims=keepdims, + mask_identity=mask_identity, + ) + else: return map_partitions( ak.argmax, array, @@ -89,7 +115,6 @@ def argmax( keepdims=keepdims, mask_identity=mask_identity, ) - raise DaskAwkwardNotImplemented(f"axis={axis} is a TODO") @borrow_docstring(ak.argmin) @@ -99,7 +124,17 @@ def argmin( keepdims: bool = False, mask_identity: bool = True, ) -> Any: - if axis and axis >= 1: + if axis is None or axis == 0 or axis == -1 * array.ndim: + return non_trivial_reduction( + axis=axis, + label="argmin", + array=array, + reducer=ak.argmin, + is_positional=True, + keepdims=keepdims, + mask_identity=mask_identity, + ) + else: return map_partitions( ak.argmin, array, @@ -108,7 +143,6 @@ def argmin( keepdims=keepdims, mask_identity=mask_identity, ) - raise DaskAwkwardNotImplemented(f"axis={axis} is a TODO") @borrow_docstring(ak.corr) @@ -130,11 +164,18 @@ def count( keepdims: bool = False, mask_identity: bool = False, ) -> Any: - if axis == 0 or axis == -1 * array.ndim: - raise DaskAwkwardNotImplemented( - f"axis={axis} is not supported for this array yet." + if axis is None or axis == 0 or axis == -1 * array.ndim: + return non_trivial_reduction( + axis=axis, + label="count", + array=array, + reducer=ak.count, + combiner=ak.sum, + is_positional=False, + keepdims=keepdims, + mask_identity=mask_identity, ) - if axis and axis != 0: + else: return map_partitions( ak.count, array, @@ -143,20 +184,6 @@ def count( keepdims=keepdims, mask_identity=mask_identity, ) - elif axis is None: - return total_reduction_to_scalar( - label="count", - array=array, - meta=TypeTracerArray._new(dtype=np.int64, shape=()), - chunked_fn=ak.count, - chunked_kwargs={"axis": 1}, - comb_fn=ak.sum, - comb_kwargs={"axis": None}, - agg_fn=ak.sum, - agg_kwargs={"axis": None}, - ) - else: - raise ValueError("axis must be None or an integer.") @borrow_docstring(ak.count_nonzero) @@ -166,33 +193,26 @@ def count_nonzero( keepdims: bool = False, mask_identity: bool = False, ) -> Any: - if axis == 0 or axis == -1 * array.ndim: - raise DaskAwkwardNotImplemented( - f"axis={axis} is not supported for this array yet." + if axis is None or axis == 0 or axis == -1 * array.ndim: + return non_trivial_reduction( + axis=axis, + label="count_nonzero", + array=array, + reducer=ak.count_nonzero, + combiner=ak.sum, + is_positional=False, + keepdims=keepdims, + mask_identity=mask_identity, ) - if axis and axis != 0: + else: return map_partitions( ak.count_nonzero, array, output_divisions=1, - axis=1, + axis=axis, keepdims=keepdims, mask_identity=mask_identity, ) - elif axis is None: - return total_reduction_to_scalar( - label="count_nonzero", - array=array, - meta=TypeTracerArray._new(dtype=np.int64, shape=()), - chunked_fn=ak.count_nonzero, - chunked_kwargs={"axis": 1}, - comb_fn=ak.sum, - comb_kwargs={"axis": None}, - agg_fn=ak.sum, - agg_kwargs={"axis": None}, - ) - else: - raise ValueError("axis must be None or an integer.") @borrow_docstring(ak.covar) @@ -227,33 +247,25 @@ def max( initial: float | None = None, mask_identity: bool = True, ) -> Any: - if axis == 0 or axis == -1 * array.ndim: - raise DaskAwkwardNotImplemented( - f"axis={axis} is not supported for this array yet." 
+ if axis is None or axis == 0 or axis == -1 * array.ndim: + return non_trivial_reduction( + axis=axis, + label="max", + array=array, + reducer=ak.max, + is_positional=False, + keepdims=keepdims, + mask_identity=mask_identity, ) - if axis and axis != 0: + else: return map_partitions( ak.max, array, output_divisions=1, axis=axis, keepdims=keepdims, - initial=initial, mask_identity=mask_identity, ) - if axis is None: - return total_reduction_to_scalar( - label="max", - array=array, - chunked_fn=ak.max, - chunked_kwargs={ - "axis": None, - "mask_identity": mask_identity, - }, - meta=ak.max(array._meta, axis=None), - ) - else: - raise DaskAwkwardNotImplemented(f"axis={axis} is a TODO") @borrow_docstring(ak.mean) @@ -288,33 +300,25 @@ def min( initial: float | None = None, mask_identity: bool = True, ) -> Any: - if axis == 0 or axis == -1 * array.ndim: - raise DaskAwkwardNotImplemented( - f"axis={axis} is not supported for this array yet." + if axis is None or axis == 0 or axis == -1 * array.ndim: + return non_trivial_reduction( + axis=axis, + label="min", + array=array, + reducer=ak.min, + is_positional=False, + keepdims=keepdims, + mask_identity=mask_identity, ) - if axis and axis != 0: + else: return map_partitions( ak.min, array, output_divisions=1, axis=axis, keepdims=keepdims, - initial=initial, mask_identity=mask_identity, ) - if axis is None: - return total_reduction_to_scalar( - label="min", - array=array, - chunked_fn=ak.min, - chunked_kwargs={ - "axis": None, - "mask_identity": mask_identity, - }, - meta=ak.min(array._meta, axis=None), - ) - else: - raise DaskAwkwardNotImplemented(f"axis={axis} is a TODO") @borrow_docstring(ak.moment) @@ -331,7 +335,25 @@ def moment( @borrow_docstring(ak.prod) def prod(array, axis=None, keepdims=False, mask_identity=False): - raise DaskAwkwardNotImplemented("TODO") + if axis is None or axis == 0 or axis == -1 * array.ndim: + return non_trivial_reduction( + axis=axis, + label="prod", + array=array, + reducer=ak.prod, + is_positional=False, + keepdims=keepdims, + mask_identity=mask_identity, + ) + else: + return map_partitions( + ak.prod, + array, + output_divisions=1, + axis=axis, + keepdims=keepdims, + mask_identity=mask_identity, + ) @borrow_docstring(ak.ptp) @@ -388,11 +410,17 @@ def sum( keepdims: bool = False, mask_identity: bool = False, ) -> Any: - if axis == 0 or axis == -1 * array.ndim: - raise DaskAwkwardNotImplemented( - f"axis={axis} is not supported for this array yet." + if axis is None or axis == 0 or axis == -1 * array.ndim: + return non_trivial_reduction( + axis=axis, + label="sum", + array=array, + reducer=ak.sum, + is_positional=False, + keepdims=keepdims, + mask_identity=mask_identity, ) - if axis and axis != 0: + else: return map_partitions( ak.sum, array, @@ -401,23 +429,6 @@ def sum( keepdims=keepdims, mask_identity=mask_identity, ) - elif axis is None: - return total_reduction_to_scalar( - label="sum", - array=array, - chunked_fn=ak.sum, - chunked_kwargs={ - "axis": None, - "mask_identity": mask_identity, - }, - meta=ak.sum(array._meta, axis=None), - ) - elif axis == 0: - raise DaskAwkwardNotImplemented( - f"axis={axis} is not supported for this array yet." 
- ) - else: - raise ValueError("axis must be none or an integer") class _VarFn: diff --git a/src/dask_awkward/lib/structure.py b/src/dask_awkward/lib/structure.py index af7318ec..f0df5a48 100644 --- a/src/dask_awkward/lib/structure.py +++ b/src/dask_awkward/lib/structure.py @@ -7,16 +7,13 @@ from typing import TYPE_CHECKING, Any import awkward as ak -import numpy as np -from awkward._nplikes.typetracer import TypeTracerArray from dask.base import is_dask_collection from dask_awkward.lib.core import ( Array, - compatible_partitions, + PartitionCompatibility, map_partitions, - new_known_scalar, - total_reduction_to_scalar, + partition_compatibility, ) from dask_awkward.utils import ( DaskAwkwardNotImplemented, @@ -37,6 +34,7 @@ "cartesian", "combinations", "copy", + "drop_none", "fill_none", "firsts", "flatten", @@ -208,7 +206,7 @@ def broadcast_arrays(*arrays, highlevel=True, **kwargs): if not highlevel: raise ValueError("Only highlevel=True is supported") - if not compatible_partitions(*arrays): + if partition_compatibility(*arrays) == PartitionCompatibility.NO: raise IncompatiblePartitions("broadcast_arrays", *arrays) array_metas = (array._meta for array in arrays) @@ -342,6 +340,28 @@ def fill_none( return map_partitions(fn, array, label="fill-none", output_divisions=1) +class _DropNoneFn: + def __init__(self, **kwargs): + self.kwargs = kwargs + + def __call__(self, arr): + return ak.drop_none(arr, **self.kwargs) + + +@borrow_docstring(ak.drop_none) +def drop_none( + array: Array, + axis: int | None = None, + highlevel: bool = True, + behavior: dict | None = None, +) -> Array: + if not highlevel: + raise ValueError("Only highlevel=True is supported") + + fn = _DropNoneFn(axis=axis, highlevel=highlevel, behavior=behavior) + return map_partitions(fn, array, label="drop-none", output_divisions=1) + + class _FirstsFn: def __init__(self, **kwargs): self.kwargs = kwargs @@ -439,6 +459,7 @@ def full_like(array, fill_value, highlevel=True, behavior=None, dtype=None): highlevel=highlevel, behavior=behavior, dtype=dtype, + output_divisions=1, ) @@ -449,7 +470,7 @@ def isclose( if not highlevel: raise ValueError("Only highlevel=True is supported") - if not compatible_partitions(a, b): + if partition_compatibility(a, b) == PartitionCompatibility.NO: raise IncompatiblePartitions("isclose", a, b) return map_partitions( @@ -462,6 +483,7 @@ def isclose( highlevel=highlevel, behavior=behavior, label="is-close", + output_divisions=1, ) @@ -497,7 +519,7 @@ def local_index(array, axis=-1, highlevel=True, behavior=None): @borrow_docstring(ak.mask) def mask(array, mask, valid_when=True, highlevel=True, behavior=None): - if not compatible_partitions(array, mask): + if partition_compatibility(array, mask) == PartitionCompatibility.NO: raise IncompatiblePartitions("mask", array, mask) return map_partitions( ak.mask, @@ -549,20 +571,10 @@ def num( axis=axis, highlevel=highlevel, behavior=behavior, + output_divisions=1, ) if axis == 0: - if array.known_divisions: - return new_known_scalar(array.divisions[-1], dtype=int) - else: - return total_reduction_to_scalar( - label="num", - array=array, - meta=TypeTracerArray._new(dtype=np.int64, shape=()), - chunked_fn=ak.num, - chunked_kwargs={"axis": 0}, - comb_fn=ak.sum, - comb_kwargs={"axis": None}, - ) + return len(array) raise DaskAwkwardNotImplemented("TODO") @@ -578,10 +590,10 @@ def ones_like( return map_partitions( ak.ones_like, array, - output_divisions=1, label="ones-like", behavior=behavior, dtype=dtype, + output_divisions=1, ) @@ -834,7 +846,7 @@ def where( 
"The condition argugment to where must be a dask_awkward.Array" ) - if not compatible_partitions(*dask_args): + if partition_compatibility(*dask_args) == PartitionCompatibility.NO: raise IncompatiblePartitions("where", *dask_args) return map_partitions( @@ -883,8 +895,8 @@ def with_field(base, what, where=None, highlevel=True, behavior=None): maybe_dask_args = [base, what] dask_args = tuple(arg for arg in maybe_dask_args if is_dask_collection(arg)) - if not compatible_partitions(*dask_args): - raise IncompatiblePartitions("with_field", base, what) + if partition_compatibility(*dask_args) == PartitionCompatibility.NO: + raise IncompatiblePartitions("with_field", *dask_args) return map_partitions( _WithFieldFn(where=where, highlevel=highlevel, behavior=behavior), base, @@ -986,10 +998,10 @@ def zeros_like( return map_partitions( ak.zeros_like, array, - output_divisions=1, label="zeros-like", behavior=behavior, dtype=dtype, + output_divisions=1, ) @@ -1082,3 +1094,51 @@ def zip( raise DaskAwkwardNotImplemented( "only sized iterables are supported by dak.zip (dict, list, or tuple)" ) + + +def _repartition_func(*stuff): + import builtins + + import awkward as ak + + *data, slices = stuff + data = [ + d[sl[0] : sl[1]] if sl is not None else d + for d, sl in builtins.zip(data, slices) + ] + return ak.concatenate(data) + + +def repartition_layer(arr: Array, key: str, divisions: list[int, ...]): + layer = {} + + indivs = arr.divisions + i = 0 + for index, (start, end) in enumerate(builtins.zip(divisions[:-1], divisions[1:])): + pp = [] + ss = [] + while indivs[i] <= start: + i += 1 + j = i + i -= 1 + while indivs[j] < end: + j += 1 + for k in range(i, j): + if start < indivs[k]: + st = None + elif start < indivs[k + 1]: + st = start - indivs[k] + else: + continue + if end < indivs[k]: + continue + elif end < indivs[k + 1]: + en = end - indivs[k] + else: + en = None + pp.append(k) + ss.append((st, en)) + layer[(key, index)] = ( + (_repartition_func,) + tuple((arr.name, part) for part in pp) + (ss,) + ) + return layer diff --git a/src/dask_awkward/lib/testutils.py b/src/dask_awkward/lib/testutils.py index fd87d350..e5a5961f 100644 --- a/src/dask_awkward/lib/testutils.py +++ b/src/dask_awkward/lib/testutils.py @@ -4,7 +4,9 @@ from typing import Any import awkward as ak +import numpy as np from dask.base import is_dask_collection +from packaging.version import Version from dask_awkward.lib.core import Array, Record, typetracer_array from dask_awkward.lib.io.io import from_lists @@ -15,10 +17,15 @@ DEFAULT_SCHEDULER: Any = "sync" +NP_LTE_1_25_0 = Version(np.__version__) >= Version("1.25.0") +AK_GTE_2_2_3 = Version(ak.__version__) <= Version("2.2.3") +BAD_NP_AK_MIXIN_VERSIONING = NP_LTE_1_25_0 and AK_GTE_2_2_3 + + def assert_eq( a: Any, b: Any, - check_forms: bool = True, + check_forms: bool = False, check_divisions: bool = True, scheduler: Any | None = None, **kwargs: Any, @@ -43,7 +50,7 @@ def assert_eq_arrays( a: Array | ak.Array, b: Array | ak.Array, isclose_equal_nan: bool = False, - check_forms: bool = True, + check_forms: bool = False, check_divisions: bool = True, scheduler: Any | None = None, ) -> None: @@ -52,8 +59,6 @@ def assert_eq_arrays( b_is_coll = is_dask_collection(b) a_comp = a.compute(scheduler=scheduler) if a_is_coll else a b_comp = b.compute(scheduler=scheduler) if b_is_coll else b - a_comp_form = a_comp.layout.form - b_comp_form = b_comp.layout.form a_tt = typetracer_array(a) b_tt = typetracer_array(b) @@ -61,8 +66,14 @@ def assert_eq_arrays( assert b_tt is not None if check_forms: 
diff --git a/src/dask_awkward/lib/testutils.py b/src/dask_awkward/lib/testutils.py
index fd87d350..e5a5961f 100644
--- a/src/dask_awkward/lib/testutils.py
+++ b/src/dask_awkward/lib/testutils.py
@@ -4,7 +4,9 @@
 from typing import Any

 import awkward as ak
+import numpy as np
 from dask.base import is_dask_collection
+from packaging.version import Version

 from dask_awkward.lib.core import Array, Record, typetracer_array
 from dask_awkward.lib.io.io import from_lists
@@ -15,10 +17,15 @@

 DEFAULT_SCHEDULER: Any = "sync"

+NP_GTE_1_25_0 = Version(np.__version__) >= Version("1.25.0")
+AK_LTE_2_2_3 = Version(ak.__version__) <= Version("2.2.3")
+BAD_NP_AK_MIXIN_VERSIONING = NP_GTE_1_25_0 and AK_LTE_2_2_3
+
+
 def assert_eq(
     a: Any,
     b: Any,
-    check_forms: bool = True,
+    check_forms: bool = False,
     check_divisions: bool = True,
     scheduler: Any | None = None,
     **kwargs: Any,
@@ -43,7 +50,7 @@ def assert_eq_arrays(
     a: Array | ak.Array,
     b: Array | ak.Array,
     isclose_equal_nan: bool = False,
-    check_forms: bool = True,
+    check_forms: bool = False,
     check_divisions: bool = True,
     scheduler: Any | None = None,
 ) -> None:
@@ -52,8 +59,6 @@
     b_is_coll = is_dask_collection(b)
     a_comp = a.compute(scheduler=scheduler) if a_is_coll else a
     b_comp = b.compute(scheduler=scheduler) if b_is_coll else b
-    a_comp_form = a_comp.layout.form
-    b_comp_form = b_comp.layout.form

     a_tt = typetracer_array(a)
     b_tt = typetracer_array(b)
@@ -61,8 +66,14 @@
     assert b_tt is not None

     if check_forms:
-        assert a_comp_form == a.layout.form
-        assert a_comp_form == b.layout.form
+        # concatenating an array with an empty slice of itself normalizes
+        # the form, so logically-equal layouts compare equal below
+        a_form = ak.concatenate([a.layout, a.layout[0:0]], highlevel=False).form
+        b_form = ak.concatenate([b.layout, b.layout[0:0]], highlevel=False).form
+        a_comp_form = a_comp.layout.form
+        b_comp_form = b_comp.layout.form
+        assert a_comp_form == a_form
+        assert a_comp_form == b_form
         assert b_comp_form == a_comp_form

     if check_divisions:
diff --git a/src/dask_awkward/lib/unproject_layout.py b/src/dask_awkward/lib/unproject_layout.py
index 34dbacbc..eb6d4052 100644
--- a/src/dask_awkward/lib/unproject_layout.py
+++ b/src/dask_awkward/lib/unproject_layout.py
@@ -374,5 +374,33 @@ def _unproject_layout(form, layout, length, backend):
     raise AssertionError(f"unexpected combination: {type(form)} and {type(layout)}")


-def unproject_layout(form: Form, layout: Content) -> Content:
+def unproject_layout(form: Form | None, layout: Content) -> Content:
+    """Rehydrate a layout to include all parts of an original form.
+
+    When we perform the necessary columns optimization we drop fields
+    that are not necessary for a computed result. Sometimes we have
+    task graphs that expect to see fields in name only (but not their
+    data). To protect against a FieldNotFound exception we "unproject"
+    or "rehydrate" the layout with the original form. This reapplies
+    all of the original fields, but the ones that were originally
+    projected away are data-less.
+
+    Parameters
+    ----------
+    form : awkward.forms.form.Form, optional
+        The complete Form to apply to a projected layout. If ``None``,
+        the layout will be returned without unprojection (this case
+        assumes column projection did not occur).
+    layout : awkward.contents.content.Content
+        The projected layout.
+
+    Returns
+    -------
+    awkward.contents.content.Content
+        Unprojected layout (all fields from the original form that did
+        not appear in the projected layout will be PlaceholderArrays).
+
+    """
+    if form is None:
+        return layout
     return _unproject_layout(form, layout, layout.length, layout.backend)
diff --git a/src/dask_awkward/utils.py b/src/dask_awkward/utils.py
index c0642fa9..a90b12cb 100644
--- a/src/dask_awkward/utils.py
+++ b/src/dask_awkward/utils.py
@@ -1,7 +1,11 @@
 from __future__ import annotations

 from collections.abc import Callable, Mapping
-from typing import Any, TypeVar
+from typing import TYPE_CHECKING, Any, TypeVar
+
+if TYPE_CHECKING:
+    from dask_awkward.lib.core import Array
+

 T = TypeVar("T")

@@ -19,12 +23,12 @@ def __init__(self, msg: str | None = None) -> None:

 class IncompatiblePartitions(ValueError):
-    def __init__(self, name, *args):
+    def __init__(self, name: str, *args: Array) -> None:
         msg = self.divisions_msg(name, *args)
         super().__init__(msg)

     @staticmethod
-    def divisions_msg(name: str, *args: Any) -> str:
+    def divisions_msg(name: str, *args: Array) -> str:
         msg = f"The inputs to {name} are incompatibly partitioned\n"
         for i, arg in enumerate(args):
             msg += f"- arg{i} divisions: {arg.divisions}\n"
@@ -37,7 +41,7 @@ class LazyInputsDict(Mapping):
     Parameters
     ----------
     inputs : list[Any]
-        The list of dicionary values.
+        The list of dictionary values.
""" diff --git a/tests/conftest.py b/tests/conftest.py index a5b4fe39..881f287d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,8 +5,6 @@ except ImportError: import json # type: ignore[no-redef] -from pathlib import Path - import awkward as ak import fsspec import pytest @@ -16,8 +14,8 @@ @pytest.fixture(scope="session") -def single_record_file(tmpdir_factory: pytest.TempdirFactory) -> str: - fname = Path(tmpdir_factory.mktemp("data")) / "single_record.json" +def single_record_file(tmp_path_factory: pytest.TempPathFactory) -> str: + fname = tmp_path_factory.mktemp("data") / "single_record.json" record = [{"record": [1, 2, 3]}] with fsspec.open(fname, "w") as f: print(json.dumps(record), file=f) @@ -25,9 +23,9 @@ def single_record_file(tmpdir_factory: pytest.TempdirFactory) -> str: @pytest.fixture(scope="session") -def ndjson_points1(tmpdir_factory: pytest.TempdirFactory) -> str: +def ndjson_points1(tmp_path_factory: pytest.TempPathFactory) -> str: array = daktu.awkward_xy_points() - fname = Path(tmpdir_factory.mktemp("data")) / "points_ndjson1.json" + fname = tmp_path_factory.mktemp("data") / "points_ndjson1.json" with fsspec.open(fname, "w") as f: for entry in array.tolist(): print(json.dumps({"points": entry}), file=f) @@ -35,9 +33,9 @@ def ndjson_points1(tmpdir_factory: pytest.TempdirFactory) -> str: @pytest.fixture(scope="session") -def ndjson_points1_str(tmpdir_factory: pytest.TempdirFactory) -> str: +def ndjson_points1_str(tmp_path_factory: pytest.TempPathFactory) -> str: array = daktu.awkward_xy_points_str() - fname = Path(tmpdir_factory.mktemp("data")) / "points_ndjson1.json" + fname = tmp_path_factory.mktemp("data") / "points_ndjson1.json" with fsspec.open(fname, "w") as f: for entry in array.tolist(): print(json.dumps({"points": entry}), file=f) @@ -45,9 +43,9 @@ def ndjson_points1_str(tmpdir_factory: pytest.TempdirFactory) -> str: @pytest.fixture(scope="session") -def ndjson_points2(tmpdir_factory: pytest.TempdirFactory) -> str: +def ndjson_points2(tmp_path_factory: pytest.TempPathFactory) -> str: array = daktu.awkward_xy_points() - fname = Path(tmpdir_factory.mktemp("data")) / "points_ndjson2.json" + fname = tmp_path_factory.mktemp("data") / "points_ndjson2.json" with fsspec.open(fname, "w") as f: for entry in array.tolist(): print(json.dumps({"points": entry}), file=f) @@ -65,10 +63,22 @@ def ndjson_points_file_str(ndjson_points1_str: str) -> str: @pytest.fixture(scope="session") -def daa(ndjson_points1: str) -> dak.Array: +def daa_old(ndjson_points1: str) -> dak.Array: return dak.from_json([ndjson_points1] * 3) +@pytest.fixture(scope="session") +def pq_points_dir(daa_old, tmp_path_factory) -> str: + pqdir = tmp_path_factory.mktemp("pqfiles") + dak.to_parquet(daa_old, str(pqdir), compute=True) + return str(pqdir) + + +@pytest.fixture(scope="session") +def daa(pq_points_dir: str) -> dak.Array: + return dak.from_parquet(pq_points_dir) + + @pytest.fixture(scope="session") def daa_str(ndjson_points1_str: str) -> dak.Array: return dak.from_json([ndjson_points1_str] * 3) @@ -157,16 +167,16 @@ def L4() -> list[list[dict[str, float]] | None]: @pytest.fixture(scope="session") -def caa_parquet(caa: ak.Array, tmpdir_factory: pytest.TempdirFactory) -> str: - fname = tmpdir_factory.mktemp("parquet_data") / "caa.parquet" # type: ignore +def caa_parquet(caa: ak.Array, tmp_path_factory: pytest.TempPathFactory) -> str: + fname = tmp_path_factory.mktemp("parquet_data") / "caa.parquet" # type: ignore ak.to_parquet(caa, str(fname), extensionarray=False) return str(fname) 
@pytest.fixture(scope="session") -def unnamed_root_parquet_file(tmpdir_factory: pytest.TempdirFactory) -> str: +def unnamed_root_parquet_file(tmp_path_factory: pytest.TempPathFactory) -> str: from dask_awkward.lib.testutils import unnamed_root_ds - fname = Path(tmpdir_factory.mktemp("unnamed_parquet_data")) / "file.parquet" + fname = tmp_path_factory.mktemp("unnamed_parquet_data") / "file.parquet" ak.to_parquet(unnamed_root_ds(), str(fname), extensionarray=False, row_group_size=3) return str(fname) diff --git a/tests/test_behavior.py b/tests/test_behavior.py index ea090335..67d4f78a 100644 --- a/tests/test_behavior.py +++ b/tests/test_behavior.py @@ -5,7 +5,7 @@ import pytest import dask_awkward as dak -from dask_awkward.lib.testutils import assert_eq +from dask_awkward.lib.testutils import BAD_NP_AK_MIXIN_VERSIONING, assert_eq behaviors: dict = {} @@ -31,6 +31,10 @@ def non_dask_method(self, _dask_array_=None): return _dask_array_ +@pytest.mark.xfail( + BAD_NP_AK_MIXIN_VERSIONING, + reason="NumPy 1.25 mixin __slots__ change", +) def test_distance_behavior( daa_p1: dak.Array, daa_p2: dak.Array, @@ -45,6 +49,10 @@ def test_distance_behavior( assert_eq(np.abs(daa1), np.abs(caa1)) +@pytest.mark.xfail( + BAD_NP_AK_MIXIN_VERSIONING, + reason="NumPy 1.25 mixin __slots__ change", +) def test_property_behavior(daa_p1: dak.Array, caa_p1: ak.Array) -> None: daa = dak.with_name(daa_p1.points, name="Point", behavior=behaviors) caa = ak.Array(caa_p1.points, with_name="Point", behavior=behaviors) @@ -57,6 +65,10 @@ def test_property_behavior(daa_p1: dak.Array, caa_p1: ak.Array) -> None: assert repr(daa.non_dask_method()) == repr(daa) +@pytest.mark.xfail( + BAD_NP_AK_MIXIN_VERSIONING, + reason="NumPy 1.25 mixin __slots__ change", +) def test_nonexistent_behavior(daa_p1: dak.Array, daa_p2: dak.Array) -> None: daa1 = dak.with_name(daa_p1["points"], "Point", behavior=behaviors) daa2 = daa_p2 diff --git a/tests/test_core.py b/tests/test_core.py index 3ca34efa..5e4f295e 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -4,6 +4,7 @@ from typing import TYPE_CHECKING, Any import awkward as ak +import dask.array as da import fsspec import numpy as np import pytest @@ -20,7 +21,6 @@ Record, Scalar, calculate_known_divisions, - compatible_partitions, compute_typetracer, is_typetracer, meta_or_identity, @@ -33,6 +33,7 @@ typetracer_array, ) from dask_awkward.lib.testutils import assert_eq +from dask_awkward.utils import IncompatiblePartitions if TYPE_CHECKING: from dask_awkward.lib.core import Array @@ -48,7 +49,7 @@ def test_clear_divisions(ndjson_points_file: str) -> None: def test_dunder_str(daa: Array) -> None: - assert str(daa) == "dask.awkward" + assert str(daa) == "dask.awkward" def test_calculate_known_divisions(ndjson_points_file: str) -> None: @@ -418,7 +419,7 @@ def test_scalar_dtype() -> None: def test_scalar_pickle(daa: Array) -> None: - import pickle + import cloudpickle as pickle s = 2 c1 = new_known_scalar(s) @@ -449,18 +450,103 @@ def test_scalar_to_delayed(daa: Array, optimize_graph: bool) -> None: def test_compatible_partitions(ndjson_points_file: str) -> None: daa1 = dak.from_json([ndjson_points_file] * 5) daa2 = dak.from_awkward(daa1.compute(), npartitions=4) - assert compatible_partitions(daa1, daa1) - assert compatible_partitions(daa1, daa1, daa1) - assert not compatible_partitions(daa1, daa2) + assert dak.compatible_partitions(daa1, daa1) + assert dak.compatible_partitions(daa1, daa1, daa1) + assert not dak.compatible_partitions(daa1, daa2) daa1.eager_compute_divisions() - assert 
compatible_partitions(daa1, daa1) + assert dak.compatible_partitions(daa1, daa1) x = ak.Array([[1, 2, 3], [1, 2, 3], [3, 4, 5]]) y = ak.Array([[1, 2, 3], [3, 4, 5]]) x = dak.from_awkward(x, npartitions=2) y = dak.from_awkward(y, npartitions=2) - assert not compatible_partitions(x, y) - assert not compatible_partitions(x, x, y) - assert compatible_partitions(y, y) + assert not dak.compatible_partitions(x, y) + assert not dak.compatible_partitions(x, x, y) + assert dak.compatible_partitions(y, y) + + +def test_compatible_partitions_after_slice() -> None: + a = [[1, 2, 3], [4, 5]] + b = [[5, 6, 7, 8], [], [9]] + lazy = dak.from_lists([a, b]) + ccrt = ak.Array(a + b) + + # sanity + assert_eq(lazy, ccrt) + + # sanity + assert dak.compatible_partitions(lazy, lazy + 2) + assert dak.compatible_partitions(lazy, dak.num(lazy, axis=1) > 2) + + assert not dak.compatible_partitions(lazy[:-2], lazy) + assert not dak.compatible_partitions(lazy[:-2], dak.num(lazy, axis=1) != 3) + + with pytest.raises(IncompatiblePartitions, match="incompatibly partitioned"): + (lazy[:-2] + lazy).compute() + + +def test_compatible_partitions_mixed() -> None: + a = ak.Array([[1, 2, 3], [0, 0, 0, 0], [5, 6, 7, 8, 9], [0, 0, 0, 0]]) + b = dak.from_awkward(a, npartitions=2) + assert b.known_divisions + c = b[dak.num(b, axis=1) == 4] + d = b[dak.num(b, axis=1) >= 3] + assert not c.known_divisions + # compatible partitions is going to get called in the __add__ ufunc + e = b + c + f = b + d + with pytest.raises(ValueError): + e.compute() + assert_eq(f, a + a) + + +def test_compatible_partitions_all_unknown() -> None: + a = ak.Array([[1, 2, 3], [0, 0, 0, 0], [5, 6, 7, 8, 9], [0, 0, 0, 0]]) + b = dak.from_awkward(a, npartitions=2) + c = b[dak.sum(b, axis=1) == 0] + d = b[dak.sum(b, axis=1) == 6] + # this will pass compatible partitions which gets called in the + # __add__ ufunc; both have unknown divisions but equal number of + # partitions. the unknown divisions are going to materialize to be + # incompatible so an exception will get raised at compute time. 
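+    # (this is the PartitionCompatibility.MAYBE case that
+    # test_partition_compatibility below asserts explicitly)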
+    e = c + d
+    with pytest.raises(ValueError):
+        e.compute()
+
+
+def test_partition_compatibility() -> None:
+    a = ak.Array([[1, 2, 3], [0, 0, 0, 0], [5, 6, 7, 8, 9], [0, 0, 0, 0]])
+    b = dak.from_awkward(a, npartitions=2)
+    c = b[dak.sum(b, axis=1) == 0]
+    d = b[dak.sum(b, axis=1) == 6]
+    assert dak.partition_compatibility(c, d) == dak.PartitionCompatibility.MAYBE
+    assert dak.partition_compatibility(b, c, d) == dak.PartitionCompatibility.MAYBE
+    assert (
+        dak.partition_compatibility(b, dak.num(b, axis=1))
+        == dak.PartitionCompatibility.YES
+    )
+    c.eager_compute_divisions()
+    assert dak.partition_compatibility(b, c) == dak.PartitionCompatibility.NO
+
+
+def test_partition_compat_with_strictness() -> None:
+    a = ak.Array([[1, 2, 3], [0, 0, 0, 0], [5, 6, 7, 8, 9], [0, 0, 0, 0]])
+    b = dak.from_awkward(a, npartitions=2)
+    c = b[dak.sum(b, axis=1) == 0]
+    d = b[dak.sum(b, axis=1) == 6]
+
+    assert dak.compatible_partitions(c, d, how_strict=1)
+    assert dak.compatible_partitions(
+        c,
+        d,
+        how_strict=dak.PartitionCompatibility.MAYBE,
+    )
+
+    assert not dak.compatible_partitions(c, d, how_strict=2)
+    assert not dak.compatible_partitions(
+        c,
+        d,
+        how_strict=dak.PartitionCompatibility.YES,
+    )


@pytest.mark.parametrize("meta", [5, False, [1, 2, 3]])
@@ -522,7 +608,6 @@ def test_scalar_persist_and_rebuild(daa: Array) -> None:

def test_output_divisions(daa: Array) -> None:
    assert dak.max(daa.points.y, axis=1).divisions == daa.divisions
-    assert dak.num(daa.points.y, axis=1).divisions == (None,) * (daa.npartitions + 1)
    assert daa["points"][["x", "y"]].divisions == daa.divisions
    assert daa["points"].divisions == daa.divisions

@@ -601,3 +686,123 @@ def test_optimize_chain_multiple(daa):
    result = (daa.points.x**2 - daa.points.y) + 1

    assert len(result.compute()) > 0
+
+
+def test_make_unknown_length():
+    from dask_awkward.lib.core import make_unknown_length
+
+    arr = ak.Array(
+        [
+            {"a": [1, 2, 3], "b": 5},
+            {"a": [], "b": -1},
+            {"a": [9, 8, 7, 6], "b": 0},
+        ]
+    )
+    tt1 = ak.Array(arr.layout.to_typetracer())
+
+    # sanity checks
+    assert ak.backend(tt1) == "typetracer"
+    assert len(tt1) == 3
+
+    ul_arr = make_unknown_length(arr)
+    ul_tt1 = make_unknown_length(tt1)
+
+    assert ul_arr.layout.form == ul_tt1.layout.form
+
+    with pytest.raises(TypeError, match="cannot interpret unknown lengths"):
+        len(ul_arr)
+
+    with pytest.raises(TypeError, match="cannot interpret unknown lengths"):
+        len(ul_tt1)
+
+
+def my_power(arg_x, *, kwarg_y=None):
+    return arg_x**kwarg_y
+
+
+def structured_function(*, inputs={}):
+    return inputs["x"] + inputs["y"] * inputs["z"]
+
+
+def scaled_structured_function(scale, *, inputs={}):
+    return scale * (inputs["x"] + inputs["y"] * inputs["z"])
+
+
+def mix_arg_and_kwarg_with_scalar_broadcasting(aaa, bbb, *, ccc=None, ddd=None):
+    return (aaa + bbb) ** ccc - ddd
+
+
+def test_map_partitions_args_and_kwargs_have_collection():
+    xc = ak.Array([[1, 2, 3], [4, 5], [6, 7, 8]])
+    yc = ak.Array([0, 1, 2])
+    xl = dak.from_awkward(xc, npartitions=3)
+    yl = dak.from_awkward(yc, npartitions=3)
+
+    zc = my_power(xc, kwarg_y=yc)
+    zl = dak.map_partitions(my_power, xl, kwarg_y=yl)
+
+    assert_eq(zc, zl)
+
+    zd = structured_function(inputs={"x": xc, "y": xc, "z": yc})
+    zm = dak.map_partitions(structured_function, inputs={"x": xl, "y": xl, "z": yl})
+
+    assert_eq(zd, zm)
+
+    ze = scaled_structured_function(2.0, inputs={"x": xc, "y": xc, "z": yc})
+    zn = dak.map_partitions(
+        scaled_structured_function, 2.0, inputs={"x": xl, "y": xl, "z": yl}
+    )
+
+    assert_eq(ze, zn)
+
+    zf =
scaled_structured_function(2.0, inputs={"x": xc, "y": xc, "z": 4.0}) + zo = dak.map_partitions( + scaled_structured_function, 2.0, inputs={"x": xl, "y": xl, "z": 4.0} + ) + + assert_eq(zf, zo) + + zg = my_power(xc, kwarg_y=2.0) + zp = dak.map_partitions(my_power, xl, kwarg_y=2.0) + + assert_eq(zg, zp) + + a = ak.Array( + [ + [ + 1, + 2, + 3, + ], + [4, 5], + [6, 7, 8], + ] + ) + b = ak.Array([[-10, -10, -10], [-10, -10], [-10, -10, -10]]) + c = ak.Array([0, 1, 2]) + d = 1 + + aa = dak.from_awkward(a, npartitions=2) + bb = dak.from_awkward(b, npartitions=2) + cc = dak.from_awkward(c, npartitions=2) + dd = d + + res1 = mix_arg_and_kwarg_with_scalar_broadcasting(a, b, ccc=c, ddd=d) + res2 = dak.map_partitions( + mix_arg_and_kwarg_with_scalar_broadcasting, + aa, + bb, + ccc=cc, + ddd=dd, + ) + assert_eq(res1, res2) + + +def test_dask_array_in_map_partitions(daa, caa): + x1 = dak.zeros_like(daa.points.x) + y1 = da.ones(len(x1), chunks=x1.divisions[1]) + z1 = x1 + y1 + x2 = ak.zeros_like(caa.points.x) + y2 = np.ones(len(x2)) + z2 = x2 + y2 + assert_eq(z1, z2) diff --git a/tests/test_distributed.py b/tests/test_distributed.py index fc1dc9ef..1aea8db8 100644 --- a/tests/test_distributed.py +++ b/tests/test_distributed.py @@ -18,7 +18,7 @@ from distributed.utils_test import cluster, gen_cluster import dask_awkward as dak -from dask_awkward.lib.testutils import assert_eq +from dask_awkward.lib.testutils import BAD_NP_AK_MIXIN_VERSIONING, assert_eq # @pytest.fixture(scope="session") # def small_cluster(): @@ -101,6 +101,10 @@ def point_abs(self): return np.sqrt(self.x**2 + self.y**2) +@pytest.mark.xfail( + BAD_NP_AK_MIXIN_VERSIONING, + reason="NumPy 1.25 mixin __slots__ change", +) def test_from_list_behaviorized(loop, L1, L2): # noqa with cluster() as (s, [a, b]): with Client(s["address"], loop=loop) as client: diff --git a/tests/test_operations.py b/tests/test_operations.py index 01deb974..8acf106a 100644 --- a/tests/test_operations.py +++ b/tests/test_operations.py @@ -8,10 +8,11 @@ from dask_awkward.utils import IncompatiblePartitions -def test_concatenate_simple(daa, caa): +@pytest.mark.parametrize("axis", [0, 1]) +def test_concatenate_simple(daa, caa, axis): assert_eq( - ak.concatenate([caa.points.x, caa.points.y], axis=0), - dak.concatenate([daa.points.x, daa.points.y], axis=0), + ak.concatenate([caa.points.x, caa.points.y], axis=axis), + dak.concatenate([daa.points.x, daa.points.y], axis=axis), ) diff --git a/tests/test_optimize.py b/tests/test_optimize.py new file mode 100644 index 00000000..66108048 --- /dev/null +++ b/tests/test_optimize.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +import awkward as ak +import dask + +import dask_awkward as dak + + +def test_multiple_computes(pq_points_dir) -> None: + ds1 = dak.from_parquet(pq_points_dir) + # add a columns= argument to force a new tokenize result in + # from_parquet so we get two unique collections. 
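+    # (identical arguments would tokenize to the same name and the two
+    # reads would collapse into a single collection)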
+ ds2 = dak.from_parquet(pq_points_dir, columns=["points"]) + + lists = [[[1, 2, 3], [4, 5]], [[], [0, 0, 0]]] + ds3 = dak.from_lists(lists) + + assert ds1.name != ds2.name + things1 = dask.compute(ds1.points.x, ds2.points.y) + things2 = dask.compute(ds1.points) + assert things2[0].x.tolist() == things1[0].tolist() + + things3 = dask.compute(ds2.points.y, ds1.points.partitions[0]) + assert things3[0].tolist() == things1[1].tolist() + + assert len(things3[1]) < len(things3[0]) + + things = dask.compute(ds1.points, ds2.points.x, ds2.points.y, ds1.points.y, ds3) + assert things[-1].tolist() == ak.Array(lists[0] + lists[1]).tolist() diff --git a/tests/test_reducers.py b/tests/test_reducers.py index dbba2daf..183666ba 100644 --- a/tests/test_reducers.py +++ b/tests/test_reducers.py @@ -7,7 +7,7 @@ from dask_awkward.lib.testutils import assert_eq -@pytest.mark.parametrize("axis", [1, -1]) +@pytest.mark.parametrize("axis", [1, -1, 0, None]) @pytest.mark.parametrize("keepdims", [True, False]) @pytest.mark.parametrize("mask_identity", [True, False]) @pytest.mark.parametrize("testval", [-1, 3, 100]) @@ -28,7 +28,7 @@ def test_all( assert_eq(ar, dr) -@pytest.mark.parametrize("axis", [1, -1]) +@pytest.mark.parametrize("axis", [1, -1, 0, None]) @pytest.mark.parametrize("keepdims", [True, False]) @pytest.mark.parametrize("mask_identity", [True, False]) @pytest.mark.parametrize("testval", [-1, 3, 100]) @@ -49,41 +49,121 @@ def test_any( assert_eq(ar, dr) -@pytest.mark.parametrize("axis", [1]) -def test_argmax(daa: dak.Array, caa: ak.Array, axis: int) -> None: +@pytest.mark.parametrize("axis", [1, -1]) +@pytest.mark.parametrize("keepdims", [True, False]) +@pytest.mark.parametrize("mask_identity", [True, False]) +def test_argmax( + daa: dak.Array, + caa: ak.Array, + axis: int, + keepdims: bool, + mask_identity: bool, +) -> None: xd = daa.points.x xc = caa.points.x - dr = dak.argmax(xd, axis=axis) - ar = ak.argmax(xc, axis=axis) + dr = dak.argmax(xd, axis=axis, keepdims=keepdims, mask_identity=mask_identity) + ar = ak.argmax(xc, axis=axis, keepdims=keepdims, mask_identity=mask_identity) assert_eq(dr, ar) -@pytest.mark.parametrize("axis", [1]) -def test_argmin(daa: dak.Array, caa: ak.Array, axis: int) -> None: +@pytest.mark.xfail( + reason="positional reducers are not supported for axis=0 and axis=None" +) +@pytest.mark.parametrize("axis", [0, None]) +@pytest.mark.parametrize("keepdims", [True, False]) +@pytest.mark.parametrize("mask_identity", [True, False]) +def test_argmax_axis_0_none( + daa: dak.Array, + caa: ak.Array, + axis: int, + keepdims: bool, + mask_identity: bool, +) -> None: xd = daa.points.x xc = caa.points.x - dr = dak.argmin(xd, axis=axis) - ar = ak.argmin(xc, axis=axis) + dr = dak.argmax(xd, axis=axis, keepdims=keepdims, mask_identity=mask_identity) + ar = ak.argmax(xc, axis=axis, keepdims=keepdims, mask_identity=mask_identity) assert_eq(dr, ar) -@pytest.mark.parametrize("axis", [None, 1, -1]) -def test_count(daa: dak.Array, caa: ak.Array, axis: int | None) -> None: - ar = ak.count(caa["points"]["x"], axis=axis) - dr = dak.count(daa["points"]["x"], axis=axis) +@pytest.mark.parametrize("axis", [1, -1]) +@pytest.mark.parametrize("keepdims", [True, False]) +@pytest.mark.parametrize("mask_identity", [True, False]) +def test_argmin( + daa: dak.Array, + caa: ak.Array, + axis: int, + keepdims: bool, + mask_identity: bool, +) -> None: + xd = daa.points.x + xc = caa.points.x + dr = dak.argmin(xd, axis=axis, keepdims=keepdims, mask_identity=mask_identity) + ar = ak.argmin(xc, axis=axis, 
keepdims=keepdims, mask_identity=mask_identity) + assert_eq(dr, ar) + + +@pytest.mark.xfail( + reason="positional reducers are not supported for axis=0 and axis=None" +) +@pytest.mark.parametrize("axis", [0, None]) +@pytest.mark.parametrize("keepdims", [True, False]) +@pytest.mark.parametrize("mask_identity", [True, False]) +def test_argmin_axis_0_none( + daa: dak.Array, + caa: ak.Array, + axis: int, + keepdims: bool, + mask_identity: bool, +) -> None: + xd = daa.points.x + xc = caa.points.x + dr = dak.argmin(xd, axis=axis, keepdims=keepdims, mask_identity=mask_identity) + ar = ak.argmin(xc, axis=axis, keepdims=keepdims, mask_identity=mask_identity) + assert_eq(dr, ar) + + +@pytest.mark.parametrize("axis", [1, -1, 0, None]) +@pytest.mark.parametrize("keepdims", [True, False]) +@pytest.mark.parametrize("mask_identity", [True, False]) +def test_count( + daa: dak.Array, + caa: ak.Array, + axis: int, + keepdims: bool, + mask_identity: bool, +) -> None: + ar = ak.count( + caa["points"]["x"], axis=axis, keepdims=keepdims, mask_identity=mask_identity + ) + dr = dak.count( + daa["points"]["x"], axis=axis, keepdims=keepdims, mask_identity=mask_identity + ) assert_eq(ar, dr) -@pytest.mark.parametrize("axis", [None, 1, -1]) -def test_count_nonzero(daa: dak.Array, caa: ak.Array, axis: int | None) -> None: - ar = ak.count_nonzero(caa["points"]["x"], axis=axis) - dr = dak.count_nonzero(daa["points"]["x"], axis=axis) +@pytest.mark.parametrize("axis", [1, -1, 0, None]) +@pytest.mark.parametrize("keepdims", [True, False]) +@pytest.mark.parametrize("mask_identity", [True, False]) +def test_count_nonzero( + daa: dak.Array, + caa: ak.Array, + axis: int, + keepdims: bool, + mask_identity: bool, +) -> None: + ar = ak.count_nonzero( + caa["points"]["x"], axis=axis, keepdims=keepdims, mask_identity=mask_identity + ) + dr = dak.count_nonzero( + daa["points"]["x"], axis=axis, keepdims=keepdims, mask_identity=mask_identity + ) assert_eq(ar, dr) @pytest.mark.parametrize("axis", [None, 1, -1]) @pytest.mark.parametrize("attr", ["x", "y"]) -def test_max(daa: dak.Array, caa: ak.Array, axis: int | None, attr: str) -> None: +def test_max(daa: dak.Array, caa: ak.Array, axis: int, attr: str) -> None: ar = ak.max(caa.points[attr], axis=axis) dr = dak.max(daa.points[attr], axis=axis) assert_eq(ar, dr) @@ -91,7 +171,7 @@ def test_max(daa: dak.Array, caa: ak.Array, axis: int | None, attr: str) -> None @pytest.mark.parametrize("axis", [1, -1]) @pytest.mark.parametrize("attr", ["y", "x"]) -def test_mean(daa: dak.Array, caa: ak.Array, axis: int | None, attr: str) -> None: +def test_mean(daa: dak.Array, caa: ak.Array, axis: int, attr: str) -> None: ar = ak.mean(caa.points[attr], axis=axis) dr = dak.mean(daa.points[attr], axis=axis) assert_eq(ar, dr, isclose_equal_nan=True) @@ -99,7 +179,7 @@ def test_mean(daa: dak.Array, caa: ak.Array, axis: int | None, attr: str) -> Non @pytest.mark.parametrize("axis", [None, 1, -1]) @pytest.mark.parametrize("attr", ["x", "y"]) -def test_min(daa: dak.Array, caa: ak.Array, axis: int | None, attr: str) -> None: +def test_min(daa: dak.Array, caa: ak.Array, axis: int, attr: str) -> None: ar = ak.min(caa.points[attr], axis=axis) dr = dak.min(daa["points"][attr], axis=axis) assert_eq(ar, dr) @@ -107,7 +187,7 @@ def test_min(daa: dak.Array, caa: ak.Array, axis: int | None, attr: str) -> None @pytest.mark.parametrize("axis", [None, 1, -1]) @pytest.mark.parametrize("attr", ["x", "y"]) -def test_sum(daa: dak.Array, caa: ak.Array, axis: int | None, attr: str) -> None: +def test_sum(daa: dak.Array, caa: 
ak.Array, axis: int, attr: str) -> None: ar = ak.sum(caa.points[attr], axis=axis) dr = dak.sum(daa.points[attr], axis=axis) assert_eq(ar, dr) @@ -115,7 +195,7 @@ def test_sum(daa: dak.Array, caa: ak.Array, axis: int | None, attr: str) -> None @pytest.mark.parametrize("axis", [1, -1]) @pytest.mark.parametrize("attr", ["y", "x"]) -def test_var(daa: dak.Array, caa: ak.Array, axis: int | None, attr: str) -> None: +def test_var(daa: dak.Array, caa: ak.Array, axis: int, attr: str) -> None: ar = ak.var(caa.points[attr], axis=axis) dr = dak.var(daa.points[attr], axis=axis) assert_eq(ar, dr, isclose_equal_nan=True) @@ -123,7 +203,7 @@ def test_var(daa: dak.Array, caa: ak.Array, axis: int | None, attr: str) -> None @pytest.mark.parametrize("axis", [1, -1]) @pytest.mark.parametrize("attr", ["y", "x"]) -def test_std(daa: dak.Array, caa: ak.Array, axis: int | None, attr: str) -> None: +def test_std(daa: dak.Array, caa: ak.Array, axis: int, attr: str) -> None: ar = ak.std(caa.points[attr], axis=axis) dr = dak.std(daa.points[attr], axis=axis) assert_eq(ar, dr, isclose_equal_nan=True) diff --git a/tests/test_structure.py b/tests/test_structure.py index 5be93d9a..296d84c5 100644 --- a/tests/test_structure.py +++ b/tests/test_structure.py @@ -138,6 +138,16 @@ def test_fill_none(vf: int | float | str, axis: int | None) -> None: assert_eq(d, e, check_forms=(not isinstance(vf, str))) +@pytest.mark.parametrize("axis", [None, 0, 1, -1]) +def test_drop_none(axis: int) -> None: + a = [[1, 2, None], [], [None], [5, 6, 7, None], [1, 2], None] + b = [[None, 2, 1], [None], [], None, [7, 6, None, 5], [None, None]] + c = dak.from_lists([a, b]) + d = dak.drop_none(c) + e = ak.drop_none(ak.from_iter(a + b)) + assert_eq(d, e) + + @pytest.mark.parametrize("axis", [0, 1, -1]) def test_is_none(axis: int) -> None: a: list[Any] = [[1, 2, None], None, None, [], [None], [5, 6, 7, None], [1, 2], None] @@ -468,7 +478,6 @@ def test_from_regular(caa): ) -@pytest.mark.xfail(reason="typetracer") def test_to_regular(caa): regular = ak.to_packed(caa[[0, 4, 5, 9, 10, 14]].points.x) dregular = dak.from_awkward(regular, 3) @@ -497,3 +506,29 @@ def test_values_astype(daa, caa): dak.values_astype(daa, np.float32), ak.values_astype(caa, np.float32), ) + + +def test_repartition_whole(daa): + daa1 = daa.repartition(npartitions=1) + assert daa1.npartitions == 1 + assert_eq(daa, daa1, check_divisions=False) + + +def test_repartition_no_change(daa): + daa1 = daa.repartition(divisions=(0, 5, 10, 15)) + assert daa1.npartitions == 3 + assert_eq(daa, daa1, check_divisions=False) + + +def test_repartition_split_all(daa): + daa1 = daa.repartition(rows_per_partition=1) + assert daa1.npartitions == len(daa) + out = daa1.compute() + assert out.tolist() == daa.compute().tolist() + + +def test_repartition_uneven(daa): + daa1 = daa.repartition(divisions=(0, 7, 8, 11, 12)) + assert daa1.npartitions == 4 + out = daa1.compute() + assert out.tolist() == daa.compute()[:12].tolist()
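
A minimal sketch (toy data, not part of the patch) of the reducer behavior
enabled at the top of this diff: ``axis=None`` and ``axis=0`` reductions now
run through ``non_trivial_reduction`` instead of raising
``DaskAwkwardNotImplemented``:

    import awkward as ak
    import dask_awkward as dak

    x = dak.from_awkward(ak.Array([[1, 2, 3], [4, 5], [6]]), npartitions=2)

    dak.sum(x, axis=None).compute()  # 21: scalar total across all partitions
    dak.sum(x, axis=0).compute()     # [11, 7, 3]: positional sums across lists
    dak.sum(x, axis=1).compute()     # [6, 9, 6]: per-list, still via map_partitions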