Skip to content

Commit

Permalink
start
Browse files Browse the repository at this point in the history
  • Loading branch information
martindurant committed Mar 28, 2024
1 parent 052aff5 commit b18b3af
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 338 deletions.
164 changes: 4 additions & 160 deletions src/dask_awkward/layers/layers.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,6 @@ def from_blockwise(cls, layer: Blockwise) -> AwkwardBlockwiseLayer:
ob.__dict__.update(layer.__dict__)
return ob

def mock(self) -> AwkwardBlockwiseLayer:
layer = copy.copy(self)
nb = layer.numblocks
layer.numblocks = {k: tuple(1 for _ in v) for k, v in nb.items()}
layer.__dict__.pop("_dims", None)
return layer

def __getstate__(self) -> dict:
# Indicator that this layer has been serialised
state = self.__dict__.copy()
Expand All @@ -54,10 +47,6 @@ def __call__(self, *args, **kwargs): ...
T = TypeVar("T")


class ImplementsMocking(ImplementsIOFunction, Protocol):
def mock(self) -> AwkwardArray: ...


class ImplementsMockEmpty(ImplementsIOFunction, Protocol):
def mock_empty(self, backend: BackendT) -> AwkwardArray: ...

Expand All @@ -67,9 +56,7 @@ class ImplementsReport(ImplementsIOFunction, Protocol):
def return_report(self) -> bool: ...


class ImplementsProjection(ImplementsMocking, Protocol[T]):
def prepare_for_projection(self) -> tuple[AwkwardArray, TypeTracerReport, T]: ...

class ImplementsProjection(Protocol[T]):
def project(self, report: TypeTracerReport, state: T) -> ImplementsIOFunction: ...


Expand All @@ -79,7 +66,7 @@ def necessary_columns(
) -> frozenset[str]: ...


class IOFunctionWithMocking(ImplementsMocking, ImplementsIOFunction):
class IOFunctionWithMocking(ImplementsIOFunction):
def __init__(self, meta: AwkwardArray, io_func: ImplementsIOFunction):
self._meta = meta
self._io_func = io_func
Expand All @@ -92,23 +79,11 @@ def __getstate__(self) -> dict:
def __call__(self, *args, **kwargs):
return self._io_func(*args, **kwargs)

def mock(self) -> AwkwardArray:
assert self._meta is not None
return self._meta


def io_func_implements_projection(func: ImplementsIOFunction) -> bool:
return hasattr(func, "prepare_for_projection")


def io_func_implements_mocking(func: ImplementsIOFunction) -> bool:
return hasattr(func, "mock")


def io_func_implements_mock_empty(func: ImplementsIOFunction) -> bool:
return hasattr(func, "mock_empty")


def io_func_implements_columnar(func: ImplementsIOFunction) -> bool:
return hasattr(func, "necessary_columns")

Expand Down Expand Up @@ -179,78 +154,10 @@ def is_projectable(self) -> bool:
io_func_implements_projection(self.io_func) and not self.has_been_unpickled
)

@property
def is_mockable(self) -> bool:
# isinstance(self.io_func, ImplementsMocking)
return io_func_implements_mocking(self.io_func)

@property
def is_columnar(self) -> bool:
return io_func_implements_columnar(self.io_func)

def mock(self) -> AwkwardInputLayer:
assert self.is_mockable
return AwkwardInputLayer(
name=self.name,
inputs=[None][: int(list(self.numblocks.values())[0][0])],
io_func=lambda *_, **__: cast(ImplementsMocking, self.io_func).mock(),
label=self.label,
produces_tasks=self.produces_tasks,
creation_info=self.creation_info,
annotations=self.annotations,
)

def prepare_for_projection(self) -> tuple[AwkwardInputLayer, TypeTracerReport, T]:
"""Mock the input layer as starting with a data-less typetracer.
This method is used to create new dask task graphs that
operate purely on typetracer Arrays (that is, array with
awkward structure but without real data buffers). This allows
us to test which parts of a real awkward array will be used in
a real computation. We do this by running a graph which starts
with mocked AwkwardInputLayers.
We mock an AwkwardInputLayer in these steps:
1. Ask the IO function to prepare a new meta array, and return
any transient state.
2. Build a new AwkwardInputLayer whose IO function just returns
this meta (typetracer) array
3. Return the new input layer and the transient state
When this new layer is added to a dask task graph and that
graph is computed, the report object will be mutated.
Inspecting the report object after the compute tells us which
buffers from the original form would be required for a real
compute with the same graph.
Returns
-------
AwkwardInputLayer
Copy of the input layer with data-less input.
TypeTracerReport
The report object used to track touched buffers.
Any
The black-box state object returned by the IO function.
"""
assert self.is_projectable
new_meta_array, report, state = cast(
ImplementsProjection, self.io_func
).prepare_for_projection()

new_return = new_meta_array
if io_func_implements_report(self.io_func):
if cast(ImplementsReport, self.io_func).return_report:
new_return = (new_meta_array, type(new_meta_array)([]))

new_input_layer = AwkwardInputLayer(
name=self.name,
inputs=[None][: int(list(self.numblocks.values())[0][0])],
io_func=AwkwardTokenizable(new_return, self.name),
label=self.label,
produces_tasks=self.produces_tasks,
creation_info=self.creation_info,
annotations=self.annotations,
)
return new_input_layer, report, state

def project(
self,
report: TypeTracerReport,
Expand Down Expand Up @@ -290,68 +197,5 @@ def __init__(
self.fn = fn
super().__init__(mapping, **kwargs)

def mock(self) -> MaterializedLayer:
mapping = copy.copy(self.mapping)
if not mapping:
# no partitions at all
return self
name = next(iter(mapping))[0]

npln = len(self.previous_layer_names)
# one previous layer name
#
# this case is used for mocking repartition or slicing where
# we maybe have multiple partitions that need to be included
# in a task.
if npln == 1:
prev_name: str = self.previous_layer_names[0]
if (name, 0) in mapping:
task = mapping[(name, 0)]
task = tuple(
(
(prev_name, 0)
if isinstance(v, tuple) and len(v) == 2 and v[0] == prev_name
else v
)
for v in task
)

# when using Array.partitions we need to mock that we
# just want the first partition.
if len(task) == 2 and isinstance(task[1], int) and task[1] > 0:
task = (task[0], 0)
return MaterializedLayer({(name, 0): task})
return self

# zero previous layers; this is likely a known scalar.
#
# we just use the existing mapping
elif npln == 0:
return MaterializedLayer({(name, 0): mapping[(name, 0)]})

# more than one previous_layer_names
#
# this case is needed for dak.concatenate on axis=0; we need
# the first partition of _each_ of the previous layer names!
else:
if self.fn is None:
raise ValueError(
"For multiple previous layers the fn argument cannot be None."
)
name0s = tuple((name, 0) for name in self.previous_layer_names)
task = (self.fn, *name0s)
return MaterializedLayer({(name, 0): task})


class AwkwardTreeReductionLayer(DataFrameTreeReduction):
def mock(self) -> AwkwardTreeReductionLayer:
return AwkwardTreeReductionLayer(
name=self.name,
name_input=self.name_input,
npartitions_input=1,
concat_func=self.concat_func,
tree_node_func=self.tree_node_func,
finalize_func=self.finalize_func,
split_every=self.split_every,
tree_node_name=self.tree_node_name,
)

class AwkwardTreeReductionLayer(DataFrameTreeReduction): ...
16 changes: 12 additions & 4 deletions src/dask_awkward/lib/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,10 @@ def _rebuild(self, dsk, *, rename=None):
def __reduce__(self):
return (Scalar, (self.dask, self.name, None, self.dtype, self.known_value))

@property
def report(self):
return getattr(self.meta, "report", set())

@property
def dask(self) -> HighLevelGraph:
return self._dask
Expand Down Expand Up @@ -937,6 +941,10 @@ def reset_meta(self) -> None:
"""Assign an empty typetracer array as the collection metadata."""
self._meta = empty_typetracer()

@property
def report(self):
return getattr(self.meta, "report", set())

def repartition(
self,
npartitions: int | None = None,
Expand Down Expand Up @@ -1734,6 +1742,7 @@ def new_array_object(
attrs: Mapping[str, Any] | None = None,
npartitions: int | None = None,
divisions: tuple[int, ...] | tuple[None, ...] | None = None,
report=set(),
) -> Array:
"""Instantiate a new Array collection object.
Expand Down Expand Up @@ -1801,6 +1810,9 @@ def new_array_object(
actual_meta.attrs = attrs

out = Array(dsk, name, actual_meta, divs)
if report:
[r.commit(out.name) for r in report]
actual_meta._report = report
if actual_meta.__doc__ != actual_meta.__class__.__doc__:
out.__doc__ = actual_meta.__doc__

Expand Down Expand Up @@ -2195,10 +2207,6 @@ def non_trivial_reduction(
if combiner is None:
combiner = reducer

# is_positional == True is not implemented
# if is_positional:
# assert combiner is reducer

# For `axis=None`, we prepare each array to have the following structure:
# [[[ ... [x1 x2 x3 ... xN] ... ]]] (length-1 outer lists)
# This makes the subsequent reductions an `axis=-1` reduction
Expand Down
25 changes: 0 additions & 25 deletions src/dask_awkward/lib/io/columnar.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,6 @@ class ColumnProjectionMixin(ImplementsNecessaryColumns[FormStructure]):
when only metadata buffers are required.
"""

def mock(self: S) -> AwkwardArray:
return cast(
AwkwardArray,
typetracer_from_form(self.form, behavior=self.behavior, attrs=self.attrs),
)

def mock_empty(self: S, backend: BackendT = "cpu") -> AwkwardArray:
return cast(
AwkwardArray,
Expand All @@ -75,25 +69,6 @@ def mock_empty(self: S, backend: BackendT = "cpu") -> AwkwardArray:
),
)

def prepare_for_projection(
self: S,
) -> tuple[AwkwardArray, TypeTracerReport, FormStructure]:
form = form_with_unique_keys(self.form, "@")

# Build typetracer and associated report object
(meta, report) = typetracer_with_report(
form,
highlevel=True,
behavior=self.behavior,
buffer_key=render_buffer_key,
)

return (
cast(AwkwardArray, meta),
report,
trace_form_structure(form, buffer_key=render_buffer_key),
)

def necessary_columns(
self: S,
report: TypeTracerReport,
Expand Down
27 changes: 18 additions & 9 deletions src/dask_awkward/lib/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import dask.config
import numpy as np
from awkward.types.numpytype import primitive_to_dtype
from awkward.typetracer import length_zero_if_typetracer
from awkward.typetracer import length_zero_if_typetracer, typetracer_with_report
from dask.base import flatten, tokenize
from dask.highlevelgraph import HighLevelGraph
from dask.local import identity
Expand All @@ -23,10 +23,9 @@
AwkwardInputLayer,
AwkwardMaterializedLayer,
AwkwardTreeReductionLayer,
ImplementsMocking,
ImplementsReport,
IOFunctionWithMocking,
io_func_implements_mocking,
io_func_implements_projection,
io_func_implements_report,
)
from dask_awkward.lib.core import (
Expand All @@ -37,6 +36,7 @@
typetracer_array,
)
from dask_awkward.lib.io.columnar import ColumnProjectionMixin
from dask_awkward.lib.utils import render_buffer_key
from dask_awkward.utils import first, second

if TYPE_CHECKING:
Expand Down Expand Up @@ -620,11 +620,18 @@ def from_map(
packed=packed,
)

# Special `io_func` implementations can implement mocking and optionally
# support buffer projection.
if io_func_implements_mocking(func):
kw = {}
if io_func_implements_projection(func):
# Special `io_func` implementations can do buffer projection - choosing columns
# so here we start with a blank report
io_func = func
array_meta = cast(ImplementsMocking, func).mock()
array_meta, report = typetracer_with_report(
io_func.form,
highlevel=True,
behavior=io_func.behavior,
buffer_key=render_buffer_key,
)
kw["report"] = {report} # column tracking report, not failure report, below
# If we know the meta, we can spoof mocking
elif meta is not None:
io_func = IOFunctionWithMocking(meta, func)
Expand All @@ -638,9 +645,11 @@ def from_map(

hlg = HighLevelGraph.from_collections(name, dsk)
if divisions is not None:
result = new_array_object(hlg, name, meta=array_meta, divisions=divisions)
result = new_array_object(hlg, name, meta=array_meta, divisions=divisions, **kw)
else:
result = new_array_object(hlg, name, meta=array_meta, npartitions=len(inputs))
result = new_array_object(
hlg, name, meta=array_meta, npartitions=len(inputs), **kw
)

if io_func_implements_report(io_func):
if cast(ImplementsReport, io_func).return_report:
Expand Down
2 changes: 2 additions & 0 deletions src/dask_awkward/lib/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def concatenate(
name = f"{label}-{token}"

metas = [c._meta for c in arrays]
report = set.union(getattr(m, "_report", set()) for m in metas)

if len(metas) == 0:
raise ValueError("Need at least one array to concatenate")
Expand Down Expand Up @@ -127,6 +128,7 @@ def concatenate(
name,
meta=meta_no_report,
npartitions=sum(a.npartitions for a in arrays),
report=report,
)

if axis > 0:
Expand Down
Loading

0 comments on commit b18b3af

Please sign in to comment.