Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into more-json-dev
Browse files Browse the repository at this point in the history
  • Loading branch information
douglasdavis committed Jul 13, 2023
2 parents 8826cf8 + 35563a5 commit 5aa5d01
Show file tree
Hide file tree
Showing 35 changed files with 1,396 additions and 454 deletions.
10 changes: 10 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Set update schedule for GitHub Actions

version: 2
updates:

- package-ecosystem: "github-actions"
directory: "/"
schedule:
# Check for updates to GitHub Actions every week
interval: "weekly"
2 changes: 1 addition & 1 deletion .github/workflows/awkward-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
- name: Install
run: |
python3 -m pip install pip wheel -U
python3 -m pip install -q --no-cache-dir -e .[complete]
python3 -m pip install -q --no-cache-dir -e .[complete,test]
python3 -m pip uninstall -y awkward && pip install git+https://github.com/scikit-hep/awkward.git@main
- name: Run tests
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/conda-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
shell: bash -l {0}
run: |
conda activate test-environment
python -m pip install -q --no-cache-dir .[complete]
python -m pip install -q --no-cache-dir .[complete,test]
- name: Run tests
shell: bash -l {0}
run: |
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ jobs:
run: |
pip install wheel
pip install pytest-cov
pip install -q --no-cache-dir -e .[complete]
pip install dask[complete]
pip install -q --no-cache-dir -e .[complete,test]
pytest --cov=dask_awkward --cov-report=xml
- name: Upload Coverage to Codecov
uses: codecov/codecov-action@v3
2 changes: 1 addition & 1 deletion .github/workflows/pypi-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
run: |
pip install pip wheel -U
pip install dask[complete]
pip install -q --no-cache-dir .[complete]
pip install -q --no-cache-dir .[complete,test]
pip list
- name: test
run: |
Expand Down
15 changes: 11 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ repos:
- id: trailing-whitespace

- repo: https://github.com/psf/black
rev: 23.3.0
rev: 23.7.0
hooks:
- id: black
language_version: python3
args:
- --target-version=py38

- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: v0.0.272
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.0.277
hooks:
- id: ruff

Expand All @@ -32,7 +32,7 @@ repos:
language_version: python3

- repo: https://github.com/asottile/pyupgrade
rev: v3.6.0
rev: v3.9.0
hooks:
- id: pyupgrade
args:
Expand All @@ -47,3 +47,10 @@ repos:
rev: v1.5.0
hooks:
- id: yesqa

- repo: https://github.com/adamchainz/blacken-docs
rev: 1.15.0
hooks:
- id: blacken-docs
additional_dependencies:
- black
2 changes: 2 additions & 0 deletions docs/api-inspect.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@ Inspection
.. autosummary::
:toctree: generated/

partition_compatibility
PartitionCompatibility
necessary_columns
sample
2 changes: 2 additions & 0 deletions docs/gs-intro.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ notice the use of wildcard syntax ("*").
from pathlib import Path
import awkward as ak
file = Path("data.00.json")
x = ak.from_json(file, line_delimited=True)
x = x[ak.num(x.foo) > 2]
Expand All @@ -52,6 +53,7 @@ notice the use of wildcard syntax ("*").
.. code-block:: python
import dask_awkward as dak
# dask-awkward only supports line_delimited=True
x = dak.from_json("data.*.json")
x = x[dak.num(x.foo) > 2]
Expand Down
1 change: 1 addition & 0 deletions docs/gs-terminology.rst
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ objects from the namespaces:
# don't do this!
from dask_awkward import Array
# or this!
from awkward import Array
Expand Down
42 changes: 23 additions & 19 deletions docs/ht-behaviors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,32 @@ topic).
behavior: dict = {}
@ak.mixin_class(behavior)
class Point:
def distance(self, other):
return np.sqrt(
(self.x - other.x) ** 2 + (self.y - other.y) ** 2
)
points1 = ak.Array([
[{"x": 1.0, "y": 1.1}, {"x": 2.0, "y": 2.2}, {"x": 3, "y": 3.3}],
[],
[{"x": 4.0, "y": 4.4}, {"x": 5.0, "y": 5.5}],
[{"x": 6.0, "y": 6.6}],
[{"x": 7.0, "y": 7.7}, {"x": 8.0, "y": 8.8}, {"x": 9, "y": 9.9}],
])
points2 = ak.Array([
[{"x": 0.9, "y": 1.0}, {"x": 2.0, "y": 2.2}, {"x": 2.9, "y": 3.0}],
[],
[{"x": 3.9, "y": 4.0}, {"x": 5.0, "y": 5.5}],
[{"x": 5.9, "y": 6.0}],
[{"x": 6.9, "y": 7.0}, {"x": 8.0, "y": 8.8}, {"x": 8.9, "y": 9.0}],
])
return np.sqrt((self.x - other.x) ** 2 + (self.y - other.y) ** 2)
points1 = ak.Array(
[
[{"x": 1.0, "y": 1.1}, {"x": 2.0, "y": 2.2}, {"x": 3, "y": 3.3}],
[],
[{"x": 4.0, "y": 4.4}, {"x": 5.0, "y": 5.5}],
[{"x": 6.0, "y": 6.6}],
[{"x": 7.0, "y": 7.7}, {"x": 8.0, "y": 8.8}, {"x": 9, "y": 9.9}],
]
)
points2 = ak.Array(
[
[{"x": 0.9, "y": 1.0}, {"x": 2.0, "y": 2.2}, {"x": 2.9, "y": 3.0}],
[],
[{"x": 3.9, "y": 4.0}, {"x": 5.0, "y": 5.5}],
[{"x": 5.9, "y": 6.0}],
[{"x": 6.9, "y": 7.0}, {"x": 8.0, "y": 8.8}, {"x": 8.9, "y": 9.0}],
]
)
array1 = dak.from_awkward(points1, npartitions=2)
array2 = dak.from_awkward(points2, npartitions=2)
Expand Down
2 changes: 1 addition & 1 deletion docs/ht-io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ datasets stored in Parquet or JSON format.

Take this code-block for example:

.. code:: python
.. code:: pycon
>>> import dask_awkward as dak
>>> ds1 = dak.from_parquet("s3://path/to/dataset")
Expand Down
2 changes: 1 addition & 1 deletion docs/me-faq.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ on the first partition will occur." what does that mean?**
is called ``awkward.compute-unknown-meta``. The default setting is
``True``. In code you can do something like this:

.. code-block:: python
.. code-block:: pycon
with dask.config.set({"awkward.compute-unknown-meta": False}):
# ... your code
Expand Down
7 changes: 4 additions & 3 deletions docs/me-optimization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ column of floats for ``bar.y``.
If our task graph is of the form:

.. code:: python
.. code:: pycon
>>> ds = dak.from_parquet("/path/to/data")
>>> result = ds.bar.x / ds.foo
Expand Down Expand Up @@ -84,7 +84,7 @@ You can see which columns are determined to be necessary by calling
(it returns a mapping that pairs an input layer with the list of
necessary columns):

.. code:: python
.. code:: pycon
>>> dak.necessary_columns(result)
{"some-layer-name": ["foo", "bar.x"]}
Expand Down Expand Up @@ -112,12 +112,13 @@ which columns should be read from disk. The
determine how one should use the ``columns=`` argument. Using our
above example, we write

.. code:: python
.. code:: pycon
>>> ds = dak.from_parquet("/path/to/data", columns=["bar.x", "foo"])
>>> result = ds.bar.x / ds.foo
>>> with dask.config.set({"awkward.optimization.enabled": False}):
... result.compute()
...
With this code we can save a little bit of overhead by not running the
necessary columns optimization after already defining, by hand, the
Expand Down
23 changes: 9 additions & 14 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ classifiers = [
"Topic :: Scientific/Engineering",
]
dependencies = [
"awkward >=2.2.2",
"awkward >=2.2.4",
"dask >=2023.04.0",
]
dynamic = ["version"]
Expand All @@ -37,29 +37,24 @@ Homepage = "https://github.com/dask-contrib/dask-awkward"
"Bug Tracker" = "https://github.com/dask-contrib/dask-awkward/issues"

[project.optional-dependencies]
complete = [
io = [
"aiohttp",
"pyarrow",
"pytest >=6.0",
"pytest-cov >=3.0.0",
"requests >=2.27.1",
]
complete = [
"dask-awkward[io]",
]
# `docs` and `test` are separate from user installs
docs = [
"dask-awkward[complete]",
"dask-sphinx-theme >=3.0.2",
"pyarrow",
"sphinx-design",
"pytest >=6.0",
"pytest-cov >=3.0.0",
"requests >=2.27.1",
]
io = [
"aiohttp",
"pyarrow",
]
test = [
"aiohttp",
"dask-awkward[complete]",
"distributed",
"pyarrow",
"pandas",
"pytest >=6.0",
"pytest-cov >=3.0.0",
"requests >=2.27.1",
Expand Down
8 changes: 7 additions & 1 deletion src/dask_awkward/layers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
AwkwardBlockwiseLayer,
AwkwardInputLayer,
AwkwardMaterializedLayer,
AwkwardTreeReductionLayer,
)

__all__ = ("AwkwardInputLayer", "AwkwardBlockwiseLayer", "AwkwardMaterializedLayer")
__all__ = (
"AwkwardInputLayer",
"AwkwardBlockwiseLayer",
"AwkwardMaterializedLayer",
"AwkwardTreeReductionLayer",
)
78 changes: 65 additions & 13 deletions src/dask_awkward/layers/layers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from dask.blockwise import Blockwise, BlockwiseDepDict, blockwise_token
from dask.highlevelgraph import MaterializedLayer
from dask.layers import DataFrameTreeReduction

from dask_awkward.utils import LazyInputsDict

Expand All @@ -18,12 +19,12 @@ class AwkwardBlockwiseLayer(Blockwise):
"""Just like upstream Blockwise, except we override pickling"""

@classmethod
def from_blockwise(cls, layer) -> AwkwardBlockwiseLayer:
def from_blockwise(cls, layer: Blockwise) -> AwkwardBlockwiseLayer:
ob = object.__new__(cls)
ob.__dict__.update(layer.__dict__)
return ob

def mock(self):
def mock(self) -> tuple[AwkwardBlockwiseLayer, Any | None]:
layer = copy.copy(self)
nb = layer.numblocks
layer.numblocks = {k: tuple(1 for _ in v) for k, v in nb.items()}
Expand Down Expand Up @@ -205,26 +206,77 @@ def project_columns(self, columns: list[str]) -> AwkwardInputLayer:


class AwkwardMaterializedLayer(MaterializedLayer):
def __init__(self, mapping, previous_layer_name, **kwargs):
self.prev_name = previous_layer_name
def __init__(
self,
mapping: dict,
*,
previous_layer_names: list[str],
fn: Callable | None = None,
**kwargs: Any,
):
self.previous_layer_names: list[str] = previous_layer_names
self.fn = fn
super().__init__(mapping, **kwargs)

def mock(self):
def mock(self) -> tuple[MaterializedLayer, Any | None]:
mapping = self.mapping.copy()
if not mapping:
# no partitions at all
return self, None
name = next(iter(mapping))[0]

if (name, 0) in mapping:
task = mapping[(name, 0)]
task = tuple(
(self.prev_name, 0)
if isinstance(v, tuple) and len(v) == 2 and v[0] == self.prev_name
else v
for v in task
)
# one previous layer name
#
# this case is used for mocking repartition or slicing where
# we may have multiple partitions that need to be included
# in a task.
if len(self.previous_layer_names) == 1:
prev_name: str = self.previous_layer_names[0]
if (name, 0) in mapping:
task = mapping[(name, 0)]
task = tuple(
(prev_name, 0)
if isinstance(v, tuple) and len(v) == 2 and v[0] == prev_name
else v
for v in task
)

# when using Array.partitions we need to mock that we
# just want the first partition.
if len(task) == 2 and task[1] > 0:
task = (task[0], 0)
return MaterializedLayer({(name, 0): task}), None
return self, None

# more than one previous_layer_names
#
# this case is needed for dak.concatenate on axis=0; we need
# the first partition of _each_ of the previous layer names!
else:
if self.fn is None:
raise ValueError(
"For multiple previous layers the fn argument cannot be None."
)
name0s = tuple((name, 0) for name in self.previous_layer_names)
task = (self.fn, *name0s)
return MaterializedLayer({(name, 0): task}), None

# failed to cull during column opt
return self, None


class AwkwardTreeReductionLayer(DataFrameTreeReduction):
    """Tree-reduction layer that can build a mocked copy of itself."""

    def mock(self) -> tuple[AwkwardTreeReductionLayer, Any | None]:
        """Build a stand-in layer that reduces over a single input partition.

        The name, input name, concat / tree-node / finalize functions,
        ``split_every`` and the tree-node name are copied from this layer;
        only ``npartitions_input`` differs, being pinned to 1.

        Returns
        -------
        tuple
            The newly constructed layer paired with ``None``.
        """
        mocked = AwkwardTreeReductionLayer(
            name=self.name,
            name_input=self.name_input,
            # the mock only ever looks at one input partition
            npartitions_input=1,
            concat_func=self.concat_func,
            tree_node_func=self.tree_node_func,
            finalize_func=self.finalize_func,
            split_every=self.split_every,
            tree_node_name=self.tree_node_name,
        )
        return mocked, None
Loading

0 comments on commit 5aa5d01

Please sign in to comment.