Skip to content

Commit

Permalink
Remove unused
Browse files Browse the repository at this point in the history
  • Loading branch information
martindurant committed Mar 28, 2024
1 parent b18b3af commit 3cdc778
Show file tree
Hide file tree
Showing 4 changed files with 2 additions and 107 deletions.
6 changes: 0 additions & 6 deletions src/dask_awkward/layers/layers.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,6 @@ def project(
annotations=self.annotations,
)

def necessary_columns(self, report: TypeTracerReport, state: T) -> frozenset[str]:
assert self.is_columnar
return cast(ImplementsNecessaryColumns, self.io_func).necessary_columns(
report=report, state=state
)


class AwkwardMaterializedLayer(MaterializedLayer):
def __init__(
Expand Down
96 changes: 2 additions & 94 deletions src/dask_awkward/lib/io/columnar.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,13 @@
import awkward as ak
from awkward import Array as AwkwardArray
from awkward.forms import Form
from awkward.typetracer import typetracer_from_form, typetracer_with_report

from dask_awkward.layers.layers import (
BackendT,
ImplementsIOFunction,
ImplementsNecessaryColumns,
)
from dask_awkward.lib.utils import (
METADATA_ATTRIBUTES,
FormStructure,
buffer_keys_required_to_compute_shapes,
form_with_unique_keys,
parse_buffer_key,
render_buffer_key,
trace_form_structure,
walk_graph_depth_first,
)
from dask_awkward.lib.utils import METADATA_ATTRIBUTES, FormStructure

if TYPE_CHECKING:
from awkward._nplikes.typetracer import TypeTracerReport
Expand Down Expand Up @@ -60,6 +50,7 @@ class ColumnProjectionMixin(ImplementsNecessaryColumns[FormStructure]):
"""

def mock_empty(self: S, backend: BackendT = "cpu") -> AwkwardArray:
# used by failure report generation
return cast(
AwkwardArray,
ak.to_backend(
Expand All @@ -68,86 +59,3 @@ def mock_empty(self: S, backend: BackendT = "cpu") -> AwkwardArray:
highlevel=True,
),
)

def necessary_columns(
self: S,
report: TypeTracerReport,
state: FormStructure,
) -> frozenset[str]:
## Read from stash
# Form hierarchy information
form_key_to_parent_form_key = state["form_key_to_parent_form_key"]
form_key_to_child_form_keys: dict[str, list[str]] = {}
for child_key, parent_key in form_key_to_parent_form_key.items():
form_key_to_child_form_keys.setdefault(parent_key, []).append(child_key) # type: ignore
form_key_to_form = state["form_key_to_form"]
# Buffer hierarchy information
form_key_to_buffer_keys = state["form_key_to_buffer_keys"]
# Column hierarchy information
form_key_to_path = state["form_key_to_path"]

# Require the data of metadata buffers above shape-only requests
data_buffers = {
*report.data_touched,
*buffer_keys_required_to_compute_shapes(
parse_buffer_key,
report.shape_touched,
form_key_to_parent_form_key,
form_key_to_buffer_keys,
),
}

# We can't read buffers directly, but if we encounter a metadata
# buffer, then we should be able to pick any child.
paths = set()
wildcard_form_key = set()
for buffer_key in data_buffers:
form_key, attribute = parse_buffer_key(buffer_key)
if attribute in METADATA_ATTRIBUTES:
wildcard_form_key.add(form_key)
else:
paths.add(form_key_to_path[form_key])

# Select the most appropriate column for each wildcard
for form_key in wildcard_form_key:
# Find (DFS) any non-empty record form in any child
recursive_child_forms = (
form_key_to_form[k]
for k in walk_graph_depth_first(form_key, form_key_to_child_form_keys)
)
record_form_keys_with_contents = (
f.form_key
for f in recursive_child_forms
if isinstance(f, ak.forms.RecordForm) and f.contents
)
# Now find the deepest of such records
try:
last_record_form_key = next(record_form_keys_with_contents)
except StopIteration:
# This is a leaf! Therefore, we read this column
paths.add(form_key_to_path[form_key])
continue
else:
# Ensure we get the "actual" last form key
for last_record_form_key in record_form_keys_with_contents:
...

# First see if any child is already included
for any_child_form_key in form_key_to_child_form_keys[last_record_form_key]:
any_child_path = form_key_to_path[any_child_form_key]
if any_child_path in paths:
break
# Otherwise, add the last child
else:
paths.add(any_child_path)
return frozenset({".".join(p) for p in paths if p})

def project(
self: S,
report: TypeTracerReport,
state: FormStructure,
) -> ImplementsIOFunction:
if not self.use_optimization: # type: ignore[attr-defined]
return self

return self.project_columns(self.necessary_columns(report, state))
6 changes: 0 additions & 6 deletions src/dask_awkward/lib/optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,3 @@ def _recursive_replace(args, layer, parent, indices):
else:
args2.append(arg)
return args2


def _buffer_keys_for_layer(
buffer_keys: Iterable[str], known_buffer_keys: frozenset[str]
) -> set[str]:
return {k for k in buffer_keys if k in known_buffer_keys}
1 change: 0 additions & 1 deletion src/dask_awkward/lib/testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import awkward as ak
import numpy as np
from awkward.typetracer import typetracer_from_form
from dask.base import is_dask_collection
from packaging.version import Version

Expand Down

0 comments on commit 3cdc778

Please sign in to comment.