Skip to content

Commit

Permalink
fix IO report
Browse files Browse the repository at this point in the history
@LGrey - moved from IO reporting layers returning a tuple to returning
a record (or dict) for my own sanity.
  • Loading branch information
martindurant committed Aug 2, 2024
1 parent 961dd0c commit c8b254b
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 62 deletions.
66 changes: 9 additions & 57 deletions src/dask_awkward/lib/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,22 @@
import math
from collections.abc import Callable, Iterable, Mapping
from dataclasses import dataclass
from functools import partial
from typing import TYPE_CHECKING, Any, cast

import awkward as ak
import dask.config
import numpy as np
from awkward.types.numpytype import primitive_to_dtype
from awkward.typetracer import length_zero_if_typetracer, typetracer_with_report
from dask.base import flatten, tokenize
from dask.highlevelgraph import HighLevelGraph
from dask.local import identity
from dask.utils import funcname, is_integer, parse_bytes
from fsspec.utils import infer_compression

from dask_awkward.layers.layers import (
AwkwardBlockwiseLayer,
AwkwardInputLayer,
AwkwardMaterializedLayer,
AwkwardTreeReductionLayer,
ImplementsReport,
io_func_implements_projection,
io_func_implements_report,
)
from dask_awkward.lib.core import (
Array,
Expand All @@ -36,7 +30,6 @@
)
from dask_awkward.lib.io.columnar import ColumnProjectionMixin
from dask_awkward.lib.utils import form_with_unique_keys, render_buffer_key
from dask_awkward.utils import first, second

if TYPE_CHECKING:
from dask.array.core import Array as DaskArray
Expand Down Expand Up @@ -661,6 +654,13 @@ def from_map(
dsk = AwkwardInputLayer(name=name, inputs=inputs, io_func=io_func)

hlg = HighLevelGraph.from_collections(name, dsk)
making_report = getattr(io_func, "return_report", False)
if making_report:
array_meta = ak.Array(
{"ioreport": ak.Array([0]).layout.to_typetracer(True), "data": array_meta}
)
array_meta._report = {report}

if divisions is not None:
result = new_array_object(hlg, name, meta=array_meta, divisions=divisions, **kw)
else:
Expand All @@ -669,56 +669,8 @@ def from_map(
)
dsk.meta = result._meta

if io_func_implements_report(io_func):
if cast(ImplementsReport, io_func).return_report:
# first element of each output tuple is the actual data
res = result.map_partitions(
first, meta=empty_typetracer(), label=label, output_divisions=1
)
res._meta = array_meta

concat_fn = partial(
ak.concatenate,
axis=0,
)

split_every = dask.config.get("awkward.aggregation.split-every", 8)

rep_trl_label = f"{label}-report"
rep_trl_token = tokenize(result, second, concat_fn, split_every)
rep_trl_name = f"{rep_trl_label}-{rep_trl_token}"
rep_trl_tree_node_name = f"{rep_trl_label}-tree-node-{rep_trl_token}"

# second element of each output tuple is the result, which does not
# depend on any of the actual data
rep_part = result.map_partitions(
second, meta=empty_typetracer(), label=f"{label}-partitioned-report"
)

rep_trl = AwkwardTreeReductionLayer(
name=rep_trl_name,
name_input=rep_part.name,
npartitions_input=rep_part.npartitions,
concat_func=concat_fn,
tree_node_func=identity,
finalize_func=identity,
split_every=split_every,
tree_node_name=rep_trl_tree_node_name,
)

rep_graph = HighLevelGraph.from_collections(
rep_trl_name, rep_trl, dependencies=[rep_part]
)
rep_trl.meta = empty_typetracer()

rep = new_array_object(
rep_graph,
rep_trl_name,
meta=rep_trl.meta,
npartitions=len(rep_trl.output_partitions),
)

return res, rep
if making_report:
return result.data, result.ioreport

return result

Expand Down
7 changes: 5 additions & 2 deletions src/dask_awkward/lib/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,12 @@ def __call__(self, *args, **kwargs):
if self.return_report:
try:
result = self.read_fn(source)
return result, report_success(self.columns, source)
return {
"data": result,
"ioreport": report_success(self.columns, source),
}
except self.allowed_exceptions as err:
return self.mock_empty(), report_failure(err, source)
return {"data": ak.Array([]), "ioreport": report_failure(err, source)}

return self.read_fn(source)

Expand Down
2 changes: 1 addition & 1 deletion src/dask_awkward/lib/optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def _optimize_columns(dsk, all_layers):
rep = getattr(lay.meta, "_report", None)
if not rep:
continue
rep = first(rep) # each meta of an IL layer should have just one report
rep = first(rep) # each meta of an IO layer should have just one report
cols = set()
# this loop not required after next ak release
for ln in all_layers:
Expand Down
10 changes: 8 additions & 2 deletions src/dask_awkward/lib/testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,15 @@ def __call__(self, *args, **kwargs):
if self.return_report:
try:
result, time = time_it(self.read_fn)(*args, **kwargs)
return result, self.make_success_report(time, *args, **kwargs)
return {
"data": result,
"ioreport": self.make_success_report(time, *args, **kwargs),
}
except self.allowed_exceptions as err:
return self.mock_empty(), self.make_fail_report(err, *args, **kwargs)
return {
"data": self.mock_empty(),
"ioreport": self.make_fail_report(err, *args, **kwargs),
}

return self.read_fn(*args, **kwargs)

Expand Down

0 comments on commit c8b254b

Please sign in to comment.