Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add computable report when graceful-read-failure is active in from_map #415

Closed
wants to merge 16 commits into from
Closed
46 changes: 35 additions & 11 deletions src/dask_awkward/layers/layers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from collections.abc import Callable, Mapping
from typing import TYPE_CHECKING, Any, Literal, Protocol, TypeVar, Union, cast

import awkward as ak
from dask.blockwise import Blockwise, BlockwiseDepDict, blockwise_token
from dask.highlevelgraph import MaterializedLayer
from dask.layers import DataFrameTreeReduction
Expand Down Expand Up @@ -107,20 +108,30 @@ def mock_empty(self, backend: BackendT = "cpu") -> AwkwardArray:
)


def io_func_empty_on_error_wrapped(func: ImplementsIOFunction) -> bool:
return hasattr(func, "_empty_on_error_wrapped")


def maybe_unwrap(func: Callable) -> Callable:
if io_func_empty_on_error_wrapped(func):
return func._empty_on_error_wrapped # type: ignore
return func


def io_func_implements_projection(func: ImplementsIOFunction) -> bool:
return hasattr(func, "prepare_for_projection")
return hasattr(maybe_unwrap(func), "prepare_for_projection")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be tempted to remove the unwrapping here, and require that the caller unwrap the IO function. That way it's a bit more explicit?



def io_func_implements_mocking(func: ImplementsIOFunction) -> bool:
return hasattr(func, "mock")
return hasattr(maybe_unwrap(func), "mock")


def io_func_implements_mock_empty(func: ImplementsIOFunction) -> bool:
return hasattr(func, "mock_empty")
return hasattr(maybe_unwrap(func), "mock_empty")


def io_func_implements_columnar(func: ImplementsIOFunction) -> bool:
return hasattr(func, "necessary_columns")
return hasattr(maybe_unwrap(func), "necessary_columns")


class AwkwardInputLayer(AwkwardBlockwiseLayer):
Expand Down Expand Up @@ -184,10 +195,12 @@ def is_columnar(self) -> bool:
def mock(self) -> AwkwardInputLayer:
assert self.is_mockable

fn = maybe_unwrap(self.io_func)

return AwkwardInputLayer(
name=self.name,
inputs=[None][: int(list(self.numblocks.values())[0][0])],
io_func=lambda *_, **__: cast(ImplementsMocking, self.io_func).mock(),
io_func=lambda *_, **__: cast(ImplementsMocking, fn).mock(),
label=self.label,
produces_tasks=self.produces_tasks,
creation_info=self.creation_info,
Expand Down Expand Up @@ -225,14 +238,20 @@ def prepare_for_projection(self) -> tuple[AwkwardInputLayer, TypeTracerReport, T
The black-box state object returned by the IO function.
"""
assert self.is_projectable
fn = maybe_unwrap(self.io_func)
new_meta_array, report, state = cast(
ImplementsProjection, self.io_func
ImplementsProjection, fn
).prepare_for_projection()

if io_func_empty_on_error_wrapped(self.io_func):
new_return = (new_meta_array, ak.from_iter([]))
else:
new_return = new_meta_array

new_input_layer = AwkwardInputLayer(
name=self.name,
inputs=[None][: int(list(self.numblocks.values())[0][0])],
io_func=lambda *_, **__: new_meta_array,
io_func=lambda *_, **__: new_return,
label=self.label,
produces_tasks=self.produces_tasks,
creation_info=self.creation_info,
Expand All @@ -246,12 +265,16 @@ def project(
state: T,
) -> AwkwardInputLayer:
assert self.is_projectable
fn = maybe_unwrap(self.io_func)
io_func = cast(ImplementsProjection, fn).project(report=report, state=state)

if io_func_empty_on_error_wrapped(self.io_func):
io_func = self.io_func.recreate(io_func) # type: ignore

return AwkwardInputLayer(
name=self.name,
inputs=self.inputs,
io_func=cast(ImplementsProjection, self.io_func).project(
report=report, state=state
),
io_func=io_func,
label=self.label,
produces_tasks=self.produces_tasks,
creation_info=self.creation_info,
Expand All @@ -260,7 +283,8 @@ def project(

def necessary_columns(self, report: TypeTracerReport, state: T) -> frozenset[str]:
assert self.is_columnar
return cast(ImplementsNecessaryColumns, self.io_func).necessary_columns(
fn = maybe_unwrap(self.io_func)
return cast(ImplementsNecessaryColumns, fn).necessary_columns(
report=report, state=state
)

Expand Down
174 changes: 152 additions & 22 deletions src/dask_awkward/lib/io/io.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from __future__ import annotations

import functools
import logging
import math
from collections.abc import Callable, Iterable, Mapping
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, Any, cast, overload

import awkward as ak
import numpy as np
from awkward.forms.listoffsetform import ListOffsetForm
from awkward.forms.numpyform import NumpyForm
from awkward.forms.recordform import RecordForm
from awkward.types.numpytype import primitive_to_dtype
from awkward.typetracer import length_zero_if_typetracer
from dask.base import flatten, tokenize
Expand All @@ -32,6 +34,7 @@
new_array_object,
typetracer_array,
)
from dask_awkward.utils import first, second

if TYPE_CHECKING:
from dask.array.core import Array as DaskArray
Expand Down Expand Up @@ -496,29 +499,147 @@ def __call__(self, packed_arg):
)


_default_failure_array_form = RecordForm(
[
ListOffsetForm(
"i64",
ListOffsetForm(
"i64",
NumpyForm("uint8", parameters={"__array__": "char"}),
parameters={"__array__": "string"},
),
),
ListOffsetForm(
"i64",
ListOffsetForm(
"i64",
ListOffsetForm(
"i64",
NumpyForm("uint8", parameters={"__array__": "char"}),
parameters={"__array__": "string"},
),
),
),
ListOffsetForm(
"i64",
NumpyForm("uint8", parameters={"__array__": "char"}),
parameters={"__array__": "string"},
),
ListOffsetForm(
"i64",
NumpyForm("uint8", parameters={"__array__": "char"}),
parameters={"__array__": "string"},
),
],
["args", "kwargs", "exception", "message"],
)


def on_success_default(*args: Any, **kwargs: Any) -> ak.Array:
return ak.Array(_default_failure_array_form.length_one_array(highlevel=False))


def on_failure_default(
exception: type[BaseException],
*args: Any,
**kwargs: Any,
) -> ak.Array:
return ak.Array(
[
{
"args": [repr(a) for a in args],
"kwargs": [[k, repr(v)] for k, v in kwargs.items()],
"exception": type(exception).__name__,
"message": str(exception),
},
],
)


class ReturnEmptyOnRaise:
def __init__(
self,
fn: Callable[..., ak.Array],
allowed_exceptions: tuple[type[BaseException], ...],
backend: BackendT,
on_success: Callable[..., ak.Array],
on_failure: Callable[..., ak.Array],
):
self._empty_on_error_wrapped = fn
self.fn = fn
self.allowed_exceptions = allowed_exceptions
self.backend = backend
self.on_success = on_success
self.on_failure = on_failure

def recreate(self, fn):
return return_empty_on_raise(
fn,
self.allowed_exceptions,
self.backend,
self.on_success,
self.on_failure,
)

def __call__(self, *args, **kwargs):
try:
result = self.fn(*args, **kwargs)
return result, self.on_success(*args, **kwargs)
except self.allowed_exceptions as err:
result = self.fn.mock_empty(self.backend)
return result, self.on_failure(err, *args, **kwargs)


def return_empty_on_raise(
fn: Callable,
fn: Callable[..., ak.Array],
allowed_exceptions: tuple[type[BaseException], ...],
backend: BackendT,
) -> Callable:
@functools.wraps(fn)
def wrapped(*args, **kwargs):
try:
return fn(*args, **kwargs)
except allowed_exceptions as err:
logmsg = (
"%s call failed with args %s and kwargs %s; empty array returned. %s"
% (
str(fn),
str(args),
str(kwargs),
str(err),
)
)
logger.info(logmsg)
return fn.mock_empty(backend)
on_success: Callable[..., ak.Array],
on_failure: Callable[..., ak.Array],
) -> ReturnEmptyOnRaise:
return ReturnEmptyOnRaise(
fn,
allowed_exceptions,
backend,
on_success,
on_failure,
)


@overload
def from_map(
func: Callable,
*iterables: Iterable,
args: tuple[Any, ...] | None = None,
label: str | None = None,
token: str | None = None,
divisions: tuple[int, ...] | tuple[None, ...] | None = None,
meta: ak.Array | None = None,
empty_on_raise: None = None,
empty_backend: None = None,
on_success: Callable[..., ak.Array] = on_success_default,
on_failure: Callable[..., ak.Array] = on_failure_default,
**kwargs: Any,
) -> Array:
...

return wrapped

@overload
def from_map(
func: Callable,
*iterables: Iterable,
empty_on_raise: tuple[type[BaseException], ...],
empty_backend: BackendT,
args: tuple[Any, ...] | None = None,
label: str | None = None,
token: str | None = None,
divisions: tuple[int, ...] | tuple[None, ...] | None = None,
meta: ak.Array | None = None,
on_success: Callable[..., ak.Array] = on_success_default,
on_failure: Callable[..., ak.Array] = on_failure_default,
**kwargs: Any,
) -> tuple[Array, Array]:
...


def from_map(
Expand All @@ -531,8 +652,10 @@ def from_map(
meta: ak.Array | None = None,
empty_on_raise: tuple[type[BaseException], ...] | None = None,
empty_backend: BackendT | None = None,
on_success: Callable[..., ak.Array] = on_success_default,
on_failure: Callable[..., ak.Array] = on_failure_default,
**kwargs: Any,
) -> Array:
) -> Array | tuple[Array, Array]:
"""Create an Array collection from a custom mapping.

Parameters
Expand Down Expand Up @@ -654,6 +777,8 @@ def from_map(
io_func,
allowed_exceptions=empty_on_raise,
backend=empty_backend,
on_success=on_success,
on_failure=on_failure,
)

dsk = AwkwardInputLayer(name=name, inputs=inputs, io_func=io_func)
Expand All @@ -664,6 +789,11 @@ def from_map(
else:
result = new_array_object(hlg, name, meta=array_meta, npartitions=len(inputs))

if empty_on_raise and empty_backend:
res = result.map_partitions(first, meta=array_meta, output_divisions=1)
rep = result.map_partitions(second, meta=empty_typetracer())
Comment on lines +793 to +794
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here res is the actual interesting Array collection. The array_meta object is the meta we'd normally expect without going through the process of dual returns with the report. We can manually pass that meta object to map_partitions to override its default behavior of automatically determining a new meta, we also make sure to preserve divisions if they're known.

And rep is the new report Array collection, it doesn't matter what the meta or divisions are, this should be a pretty small object once computed. If/when we converge on a proper schema for a report array perhaps we can pass in its typetracer version for absolute correctness here

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not specify meta and let dask-awkward figure it out using the graph?

return res, rep

return result


Expand Down
6 changes: 6 additions & 0 deletions src/dask_awkward/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,9 @@ def first(seq: Iterable[T]) -> T:

"""
return next(iter(seq))


def second(seq: Iterable[T]) -> T:
the_iter = iter(seq)
next(the_iter)
return next(the_iter)
Loading
Loading