diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py index 03608e59..ecc208bf 100644 --- a/src/dask_awkward/layers/layers.py +++ b/src/dask_awkward/layers/layers.py @@ -177,12 +177,6 @@ def project( annotations=self.annotations, ) - def necessary_columns(self, report: TypeTracerReport, state: T) -> frozenset[str]: - assert self.is_columnar - return cast(ImplementsNecessaryColumns, self.io_func).necessary_columns( - report=report, state=state - ) - class AwkwardMaterializedLayer(MaterializedLayer): def __init__( diff --git a/src/dask_awkward/lib/io/columnar.py b/src/dask_awkward/lib/io/columnar.py index a21b8b10..7268b816 100644 --- a/src/dask_awkward/lib/io/columnar.py +++ b/src/dask_awkward/lib/io/columnar.py @@ -6,23 +6,13 @@ import awkward as ak from awkward import Array as AwkwardArray from awkward.forms import Form -from awkward.typetracer import typetracer_from_form, typetracer_with_report from dask_awkward.layers.layers import ( BackendT, ImplementsIOFunction, ImplementsNecessaryColumns, ) -from dask_awkward.lib.utils import ( - METADATA_ATTRIBUTES, - FormStructure, - buffer_keys_required_to_compute_shapes, - form_with_unique_keys, - parse_buffer_key, - render_buffer_key, - trace_form_structure, - walk_graph_depth_first, -) +from dask_awkward.lib.utils import METADATA_ATTRIBUTES, FormStructure if TYPE_CHECKING: from awkward._nplikes.typetracer import TypeTracerReport @@ -60,6 +50,7 @@ class ColumnProjectionMixin(ImplementsNecessaryColumns[FormStructure]): """ def mock_empty(self: S, backend: BackendT = "cpu") -> AwkwardArray: + # used by failure report generation return cast( AwkwardArray, ak.to_backend( @@ -68,86 +59,3 @@ def mock_empty(self: S, backend: BackendT = "cpu") -> AwkwardArray: highlevel=True, ), ) - - def necessary_columns( - self: S, - report: TypeTracerReport, - state: FormStructure, - ) -> frozenset[str]: - ## Read from stash - # Form hierarchy information - form_key_to_parent_form_key = state["form_key_to_parent_form_key"] - form_key_to_child_form_keys: dict[str, list[str]] = {} - for child_key, parent_key in form_key_to_parent_form_key.items(): - form_key_to_child_form_keys.setdefault(parent_key, []).append(child_key) # type: ignore - form_key_to_form = state["form_key_to_form"] - # Buffer hierarchy information - form_key_to_buffer_keys = state["form_key_to_buffer_keys"] - # Column hierarchy information - form_key_to_path = state["form_key_to_path"] - - # Require the data of metadata buffers above shape-only requests - data_buffers = { - *report.data_touched, - *buffer_keys_required_to_compute_shapes( - parse_buffer_key, - report.shape_touched, - form_key_to_parent_form_key, - form_key_to_buffer_keys, - ), - } - - # We can't read buffers directly, but if we encounter a metadata - # buffer, then we should be able to pick any child. - paths = set() - wildcard_form_key = set() - for buffer_key in data_buffers: - form_key, attribute = parse_buffer_key(buffer_key) - if attribute in METADATA_ATTRIBUTES: - wildcard_form_key.add(form_key) - else: - paths.add(form_key_to_path[form_key]) - - # Select the most appropriate column for each wildcard - for form_key in wildcard_form_key: - # Find (DFS) any non-empty record form in any child - recursive_child_forms = ( - form_key_to_form[k] - for k in walk_graph_depth_first(form_key, form_key_to_child_form_keys) - ) - record_form_keys_with_contents = ( - f.form_key - for f in recursive_child_forms - if isinstance(f, ak.forms.RecordForm) and f.contents - ) - # Now find the deepest of such records - try: - last_record_form_key = next(record_form_keys_with_contents) - except StopIteration: - # This is a leaf! Therefore, we read this column - paths.add(form_key_to_path[form_key]) - continue - else: - # Ensure we get the "actual" last form key - for last_record_form_key in record_form_keys_with_contents: - ... - - # First see if any child is already included - for any_child_form_key in form_key_to_child_form_keys[last_record_form_key]: - any_child_path = form_key_to_path[any_child_form_key] - if any_child_path in paths: - break - # Otherwise, add the last child - else: - paths.add(any_child_path) - return frozenset({".".join(p) for p in paths if p}) - - def project( - self: S, - report: TypeTracerReport, - state: FormStructure, - ) -> ImplementsIOFunction: - if not self.use_optimization: # type: ignore[attr-defined] - return self - - return self.project_columns(self.necessary_columns(report, state)) diff --git a/src/dask_awkward/lib/optimize.py b/src/dask_awkward/lib/optimize.py index 4dcde735..cc792db6 100644 --- a/src/dask_awkward/lib/optimize.py +++ b/src/dask_awkward/lib/optimize.py @@ -254,9 +254,3 @@ def _recursive_replace(args, layer, parent, indices): else: args2.append(arg) return args2 - - -def _buffer_keys_for_layer( - buffer_keys: Iterable[str], known_buffer_keys: frozenset[str] -) -> set[str]: - return {k for k in buffer_keys if k in known_buffer_keys} diff --git a/src/dask_awkward/lib/testutils.py b/src/dask_awkward/lib/testutils.py index e47d5c8d..3dee6d36 100644 --- a/src/dask_awkward/lib/testutils.py +++ b/src/dask_awkward/lib/testutils.py @@ -7,7 +7,6 @@ import awkward as ak import numpy as np -from awkward.typetracer import typetracer_from_form from dask.base import is_dask_collection from packaging.version import Version