Skip to content

Commit

Permalink
Merge branch 'count_nested' of https://github.com/lincc-frameworks/da…
Browse files Browse the repository at this point in the history
…sk-nested into count_nested
  • Loading branch information
dougbrn committed Jul 16, 2024
2 parents 5cf6320 + d2e372e commit 23810a6
Show file tree
Hide file tree
Showing 11 changed files with 257 additions and 56 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
[![Read The Docs](https://img.shields.io/readthedocs/nested-dask)](https://nested-dask.readthedocs.io/)
[![Benchmarks](https://img.shields.io/github/actions/workflow/status/lincc-frameworks/nested-dask/asv-main.yml?label=benchmarks)](https://lincc-frameworks.github.io/nested-dask/)

A ![dask](https://www.dask.org/) extension of
![nested-pandas](https://nested-pandas.readthedocs.io/en/latest/).
A [dask](https://www.dask.org/) extension of
[nested-pandas](https://nested-pandas.readthedocs.io/en/latest/).

Nested-pandas is a pandas extension package that empowers efficient analysis
of nested associated datasets. This package wraps the majority of the
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def _generate_benchmark_data(add_nested=True):
layer_nf = npd.NestedFrame(data=layer_data).set_index("index").sort_index()

# Convert to Dask
base_nf = nd.NestedFrame.from_nested_pandas(base_nf).repartition(npartitions=5)
layer_nf = nd.NestedFrame.from_nested_pandas(layer_nf).repartition(npartitions=50)
base_nf = nd.NestedFrame.from_pandas(base_nf).repartition(npartitions=5)
layer_nf = nd.NestedFrame.from_pandas(layer_nf).repartition(npartitions=50)

# Return based on add_nested
if add_nested:
Expand Down
6 changes: 3 additions & 3 deletions docs/tutorials/loading_data.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"source": [
"## From Nested-Pandas\n",
"\n",
"Nested-Dask can load data from Nested-Pandas `NestedFrame` objects by using the `from_nested_pandas` class function."
"Nested-Dask can load data from Nested-Pandas `NestedFrame` objects by using the `from_pandas` class function."
]
},
{
Expand All @@ -48,7 +48,7 @@
"nf = nf.add_nested(nested, \"nested\")\n",
"\n",
"# Convert to Nested-Dask NestedFrame\n",
"nf = nd.NestedFrame.from_nested_pandas(nf)\n",
"nf = nd.NestedFrame.from_pandas(nf)\n",
"nf"
]
},
Expand Down Expand Up @@ -225,7 +225,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.3"
"version": "3.10.11"
}
},
"nbformat": 4,
Expand Down
2 changes: 1 addition & 1 deletion docs/tutorials/work_with_lsdb.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.6"
"version": "3.10.11"
}
},
"nbformat": 4,
Expand Down
6 changes: 5 additions & 1 deletion src/nested_dask/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,18 @@ def make_meta_frame(x, index=None) -> npd.NestedFrame:
"""Create an empty NestedFrame to use as Dask's underlying object meta."""

dtypes = x.dtypes.to_dict()
result = npd.NestedFrame({key: pd.Series(dtype=d) for key, d in dtypes.items()})
index = index if index is not None else x.index
index = index[:0].copy()
result = npd.NestedFrame({key: pd.Series(dtype=d) for key, d in dtypes.items()}, index=index)
return result


@meta_nonempty.register(npd.NestedFrame)
def _nonempty_nestedframe(x, index=None) -> npd.NestedFrame:
    """Produce the NestedFrame registered as the ``meta_nonempty`` result for ``x``.

    Parameters
    ----------
    x : npd.NestedFrame
        Frame handed to ``meta_nonempty_dataframe`` to generate the result.
    index : optional
        When not None, assigned as the index of the generated frame.

    Returns
    -------
    npd.NestedFrame
        The output of ``meta_nonempty_dataframe`` (with its index replaced
        when ``index`` is given), wrapped in a NestedFrame.
    """
    nonempty = meta_nonempty_dataframe(x)
    # An explicitly supplied index takes precedence over the generated one.
    if index is not None:
        nonempty.index = index
    return npd.NestedFrame(nonempty)


Expand Down
121 changes: 116 additions & 5 deletions src/nested_dask/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,19 @@ def __getitem__(self, key):
return result

@classmethod
def from_nested_pandas(
def from_pandas(
cls,
data,
npartitions=None,
chunksize=None,
sort=True,
) -> NestedFrame:
"""Returns a Nested-Dask NestedFrame constructed from a Nested-Pandas
NestedFrame.
NestedFrame or Pandas DataFrame.
Parameters
----------
data: `NestedFrame`
data: `NestedFrame` or `DataFrame`
Nested-Pandas NestedFrame or Pandas DataFrame containing the underlying data
npartitions: `int`, optional
The number of partitions of the index to create. Note that depending on
Expand All @@ -98,7 +98,7 @@ def from_nested_pandas(
return NestedFrame.from_dask_dataframe(result)

@classmethod
def from_dask_dataframe(cls, df) -> NestedFrame:
def from_dask_dataframe(cls, df: dd.DataFrame) -> NestedFrame:
"""Converts a Dask DataFrame to a Nested-Dask NestedFrame
Parameters
Expand All @@ -110,7 +110,118 @@ def from_dask_dataframe(cls, df) -> NestedFrame:
-------
`nested_dask.NestedFrame`
"""
return df.map_partitions(npd.NestedFrame)
return df.map_partitions(npd.NestedFrame, meta=npd.NestedFrame(df._meta.copy()))

@classmethod
def from_delayed(cls, dfs, meta=None, divisions=None, prefix="from-delayed", verify_meta=True):
    """Build a Nested-Dask NestedFrame out of Dask Delayed objects.

    Thin wrapper: every argument is forwarded unchanged to
    ``dd.from_delayed`` and the result is converted with
    ``NestedFrame.from_dask_dataframe``. The parameter descriptions below
    are taken from `dask.dataframe.from_delayed`.

    Parameters
    ----------
    dfs :
        A ``dask.delayed.Delayed``, a ``distributed.Future``, or an iterable
        of either of these objects, e.g. returned by ``client.submit``.
        These comprise the individual partitions of the resulting
        dataframe. If a single object is provided (not an iterable), the
        resulting dataframe will have only one partition.
    meta :
        An empty NestedFrame, pd.DataFrame, or pd.Series that matches the
        dtypes and column names of the output. This metadata is necessary
        for many algorithms in dask dataframe to work. For ease of use,
        some alternative inputs are also available. Instead of a DataFrame,
        a dict of {name: dtype} or iterable of (name, dtype) can be
        provided (note that the order of the names should match the order
        of the columns). Instead of a series, a tuple of (name, dtype) can
        be used. If not provided, dask will try to infer the metadata.
        This may lead to unexpected results, so providing meta is
        recommended. For more information, see
        dask.dataframe.utils.make_meta.
    divisions :
        Partition boundaries along the index.
        For tuple, see
        https://docs.dask.org/en/latest/dataframe-design.html#partitions
        For string 'sorted' will compute the delayed values to find index
        values. Assumes that the indexes are mutually sorted.
        If None, then won't use index information.
    prefix :
        Prefix to prepend to the keys.
    verify_meta :
        If True check that the partitions have consistent metadata,
        defaults to True.

    Returns
    -------
    NestedFrame
        Result of ``NestedFrame.from_dask_dataframe`` applied to the
        collection assembled by ``dd.from_delayed``.
    """
    return NestedFrame.from_dask_dataframe(
        dd.from_delayed(
            dfs=dfs,
            meta=meta,
            divisions=divisions,
            prefix=prefix,
            verify_meta=verify_meta,
        )
    )

@classmethod
def from_map(
    cls,
    func,
    *iterables,
    args=None,
    meta=None,
    divisions=None,
    label=None,
    enforce_metadata=True,
    **kwargs,
):
    """Build a Nested-Dask NestedFrame collection from a custom function map.

    Thin wrapper: every argument is forwarded unchanged to ``dd.from_map``
    and the result is converted with ``NestedFrame.from_dask_dataframe``.

    WARNING: The ``from_map`` API is experimental, and stability is not
    yet guaranteed. Use at your own risk!

    Parameters
    ----------
    func : callable
        Function used to create each partition. If ``func`` satisfies the
        ``DataFrameIOFunction`` protocol, column projection will be
        enabled.
    *iterables : Iterable objects
        Iterable objects to map to each output partition. All iterables
        must be the same length. This length determines the number of
        partitions in the output collection (only one element of each
        iterable will be passed to ``func`` for each partition).
    args : list or tuple, optional
        Positional arguments to broadcast to each output partition. Note
        that these arguments will always be passed to ``func`` after the
        ``iterables`` positional arguments.
    meta :
        An empty NestedFrame, pd.DataFrame, or pd.Series that matches the
        dtypes and column names of the output. This metadata is necessary
        for many algorithms in dask dataframe to work. For ease of use,
        some alternative inputs are also available. Instead of a DataFrame,
        a dict of {name: dtype} or iterable of (name, dtype) can be
        provided (note that the order of the names should match the order
        of the columns). Instead of a series, a tuple of (name, dtype) can
        be used. If not provided, dask will try to infer the metadata.
        This may lead to unexpected results, so providing meta is
        recommended. For more information, see
        dask.dataframe.utils.make_meta.
    divisions : tuple, str, optional
        Partition boundaries along the index.
        For tuple, see
        https://docs.dask.org/en/latest/dataframe-design.html#partitions
        For string 'sorted' will compute the delayed values to find index
        values. Assumes that the indexes are mutually sorted.
        If None, then won't use index information.
    label : str, optional
        String to use as the function-name label in the output
        collection-key names.
    enforce_metadata : bool, default True
        Whether to enforce at runtime that the structure of the DataFrame
        produced by ``func`` actually matches the structure of ``meta``.
        This will rename and reorder columns for each partition, and will
        raise an error if this doesn't work, but it won't raise if dtypes
        don't match.
    **kwargs :
        Keyword arguments to broadcast to each output partition. These
        same arguments will be passed to ``func`` for every output
        partition.

    Returns
    -------
    NestedFrame
        Result of ``NestedFrame.from_dask_dataframe`` applied to the
        collection assembled by ``dd.from_map``.
    """
    mapped_frame = dd.from_map(
        func,
        *iterables,
        args=args,
        meta=meta,
        divisions=divisions,
        label=label,
        enforce_metadata=enforce_metadata,
        **kwargs,
    )
    return NestedFrame.from_dask_dataframe(mapped_frame)

def compute(self, **kwargs):
"""Compute this Dask collection, returning the underlying dataframe or series."""
Expand Down
2 changes: 1 addition & 1 deletion src/nested_dask/datasets/generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def generate_data(n_base, n_layer, npartitions=1, seed=None) -> nd.NestedFrame:
base_nf = datasets.generate_data(n_base, n_layer, seed=seed)

# Convert to nested-dask
base_nf = nd.NestedFrame.from_nested_pandas(base_nf).repartition(npartitions=npartitions)
base_nf = nd.NestedFrame.from_pandas(base_nf).repartition(npartitions=npartitions)

return base_nf

Expand Down
12 changes: 6 additions & 6 deletions tests/nested_dask/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ def test_dataset():
}
layer_nf = npd.NestedFrame(data=layer_data).set_index("index").sort_index()

base_nd = nd.NestedFrame.from_nested_pandas(base_nf, npartitions=5)
layer_nd = nd.NestedFrame.from_nested_pandas(layer_nf, npartitions=10)
base_nd = nd.NestedFrame.from_pandas(base_nf, npartitions=5)
layer_nd = nd.NestedFrame.from_pandas(layer_nf, npartitions=10)

return base_nd.add_nested(layer_nd, "nested")

Expand Down Expand Up @@ -53,8 +53,8 @@ def test_dataset_with_nans():
}
layer_nf = npd.NestedFrame(data=layer_data).set_index("index")

base_nd = nd.NestedFrame.from_nested_pandas(base_nf, npartitions=5)
layer_nd = nd.NestedFrame.from_nested_pandas(layer_nf, npartitions=10)
base_nd = nd.NestedFrame.from_pandas(base_nf, npartitions=5)
layer_nd = nd.NestedFrame.from_pandas(layer_nf, npartitions=10)

return base_nd.add_nested(layer_nd, "nested")

Expand All @@ -78,7 +78,7 @@ def test_dataset_no_add_nested():
}
layer_nf = npd.NestedFrame(data=layer_data).set_index("index")

base_nd = nd.NestedFrame.from_nested_pandas(base_nf, npartitions=5)
layer_nd = nd.NestedFrame.from_nested_pandas(layer_nf, npartitions=10)
base_nd = nd.NestedFrame.from_pandas(base_nf, npartitions=5)
layer_nd = nd.NestedFrame.from_pandas(layer_nf, npartitions=10)

return (base_nd, layer_nd)
Loading

0 comments on commit 23810a6

Please sign in to comment.