diff --git a/README.md b/README.md index 54e1cfd..78c5907 100644 --- a/README.md +++ b/README.md @@ -8,8 +8,8 @@ [![Read The Docs](https://img.shields.io/readthedocs/nested-dask)](https://nested-dask.readthedocs.io/) [![Benchmarks](https://img.shields.io/github/actions/workflow/status/lincc-frameworks/nested-dask/asv-main.yml?label=benchmarks)](https://lincc-frameworks.github.io/nested-dask/) -A ![dask](https://www.dask.org/) extension of -![nested-pandas](https://nested-pandas.readthedocs.io/en/latest/). +A [dask](https://www.dask.org/) extension of +[nested-pandas](https://nested-pandas.readthedocs.io/en/latest/). Nested-pandas is a pandas extension package that empowers efficient analysis of nested associated datasets. This package wraps the majority of the diff --git a/benchmarks/benchmarks.py b/benchmarks/benchmarks.py index 26fc7be..a220f98 100644 --- a/benchmarks/benchmarks.py +++ b/benchmarks/benchmarks.py @@ -31,8 +31,8 @@ def _generate_benchmark_data(add_nested=True): layer_nf = npd.NestedFrame(data=layer_data).set_index("index").sort_index() # Convert to Dask - base_nf = nd.NestedFrame.from_nested_pandas(base_nf).repartition(npartitions=5) - layer_nf = nd.NestedFrame.from_nested_pandas(layer_nf).repartition(npartitions=50) + base_nf = nd.NestedFrame.from_pandas(base_nf).repartition(npartitions=5) + layer_nf = nd.NestedFrame.from_pandas(layer_nf).repartition(npartitions=50) # Return based on add_nested if add_nested: diff --git a/docs/tutorials/loading_data.ipynb b/docs/tutorials/loading_data.ipynb index ddd9843..242314a 100644 --- a/docs/tutorials/loading_data.ipynb +++ b/docs/tutorials/loading_data.ipynb @@ -28,7 +28,7 @@ "source": [ "## From Nested-Pandas\n", "\n", - "Nested-Dask can load data from Nested-Pandas `NestedFrame` objects by using the `from_nested_pandas` class function." + "Nested-Dask can load data from Nested-Pandas `NestedFrame` objects by using the `from_pandas` class function." ] }, { @@ -48,7 +48,7 @@ "nf = nf.add_nested(nested, \"nested\")\n", "\n", "# Convert to Nested-Dask NestedFrame\n", - "nf = nd.NestedFrame.from_nested_pandas(nf)\n", + "nf = nd.NestedFrame.from_pandas(nf)\n", "nf" ] }, @@ -225,7 +225,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.3" + "version": "3.10.11" } }, "nbformat": 4, diff --git a/docs/tutorials/work_with_lsdb.ipynb b/docs/tutorials/work_with_lsdb.ipynb index f028496..c3fed88 100644 --- a/docs/tutorials/work_with_lsdb.ipynb +++ b/docs/tutorials/work_with_lsdb.ipynb @@ -439,7 +439,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", - "version": "2.7.6" + "version": "3.10.11" } }, "nbformat": 4, diff --git a/src/nested_dask/backends.py b/src/nested_dask/backends.py index bd589f2..041cabf 100644 --- a/src/nested_dask/backends.py +++ b/src/nested_dask/backends.py @@ -23,7 +23,9 @@ def make_meta_frame(x, index=None) -> npd.NestedFrame: """Create an empty NestedFrame to use as Dask's underlying object meta.""" dtypes = x.dtypes.to_dict() - result = npd.NestedFrame({key: pd.Series(dtype=d) for key, d in dtypes.items()}) + index = index if index is not None else x.index + index = index[:0].copy() + result = npd.NestedFrame({key: pd.Series(dtype=d) for key, d in dtypes.items()}, index=index) return result @@ -31,6 +33,8 @@ def make_meta_frame(x, index=None) -> npd.NestedFrame: def _nonempty_nestedframe(x, index=None) -> npd.NestedFrame: """Construct a new NestedFrame with the same underlying data.""" df = meta_nonempty_dataframe(x) + if index is not None: + df.index = index return npd.NestedFrame(df) diff --git a/src/nested_dask/core.py b/src/nested_dask/core.py index dc4da09..13f418e 100644 --- a/src/nested_dask/core.py +++ b/src/nested_dask/core.py @@ -64,7 +64,7 @@ def __getitem__(self, key): return result @classmethod - def from_nested_pandas( + def from_pandas( cls, data, npartitions=None, @@ -72,11 +72,11 @@ def from_nested_pandas( sort=True, ) -> NestedFrame: """Returns an Nested-Dask NestedFrame constructed from a Nested-Pandas - NestedFrame. + NestedFrame or Pandas DataFrame. Parameters ---------- - data: `NestedFrame` + data: `NestedFrame` or `DataFrame` Nested-Pandas NestedFrame containing the underlying data npartitions: `int`, optional The number of partitions of the index to create. Note that depending on @@ -98,7 +98,7 @@ def from_nested_pandas( return NestedFrame.from_dask_dataframe(result) @classmethod - def from_dask_dataframe(cls, df) -> NestedFrame: + def from_dask_dataframe(cls, df: dd.DataFrame) -> NestedFrame: """Converts a Dask Dataframe to a Dask-Nested NestedFrame Parameters @@ -110,7 +110,118 @@ def from_dask_dataframe(cls, df) -> NestedFrame: ------- `nested_dask.NestedFrame` """ - return df.map_partitions(npd.NestedFrame) + return df.map_partitions(npd.NestedFrame, meta=npd.NestedFrame(df._meta.copy())) + + @classmethod + def from_delayed(cls, dfs, meta=None, divisions=None, prefix="from-delayed", verify_meta=True): + """ + Create Nested-Dask NestedFrames from many Dask Delayed objects. + + Docstring is copied from `dask.dataframe.from_delayed`. + + Parameters + ---------- + dfs : + A ``dask.delayed.Delayed``, a ``distributed.Future``, or an iterable of either + of these objects, e.g. returned by ``client.submit``. These comprise the + individual partitions of the resulting dataframe. + If a single object is provided (not an iterable), then the resulting dataframe + will have only one partition. + meta: + An empty NestedFrame, pd.DataFrame, or pd.Series that matches the dtypes and column names of + the output. This metadata is necessary for many algorithms in dask dataframe + to work. For ease of use, some alternative inputs are also available. Instead of a + DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided (note that + the order of the names should match the order of the columns). Instead of a series, a tuple of + (name, dtype) can be used. If not provided, dask will try to infer the metadata. This may lead + to unexpected results, so providing meta is recommended. For more information, see + dask.dataframe.utils.make_meta. + divisions : + Partition boundaries along the index. + For tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions + For string 'sorted' will compute the delayed values to find index + values. Assumes that the indexes are mutually sorted. + If None, then won't use index information + prefix : + Prefix to prepend to the keys. + verify_meta : + If True check that the partitions have consistent metadata, defaults to True. + + """ + nf = dd.from_delayed(dfs=dfs, meta=meta, divisions=divisions, prefix=prefix, verify_meta=verify_meta) + return NestedFrame.from_dask_dataframe(nf) + + @classmethod + def from_map( + cls, + func, + *iterables, + args=None, + meta=None, + divisions=None, + label=None, + enforce_metadata=True, + **kwargs, + ): + """ + Create a DataFrame collection from a custom function map + + WARNING: The ``from_map`` API is experimental, and stability is not + yet guaranteed. Use at your own risk! + + Parameters + ---------- + func : callable + Function used to create each partition. If ``func`` satisfies the + ``DataFrameIOFunction`` protocol, column projection will be enabled. + *iterables : Iterable objects + Iterable objects to map to each output partition. All iterables must + be the same length. This length determines the number of partitions + in the output collection (only one element of each iterable will + be passed to ``func`` for each partition). + args : list or tuple, optional + Positional arguments to broadcast to each output partition. Note + that these arguments will always be passed to ``func`` after the + ``iterables`` positional arguments. + meta: + An empty NestedFrame, pd.DataFrame, or pd.Series that matches the dtypes and column names of + the output. This metadata is necessary for many algorithms in dask dataframe + to work. For ease of use, some alternative inputs are also available. Instead of a + DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided (note that + the order of the names should match the order of the columns). Instead of a series, a tuple of + (name, dtype) can be used. If not provided, dask will try to infer the metadata. This may lead + to unexpected results, so providing meta is recommended. For more information, see + dask.dataframe.utils.make_meta. + divisions : tuple, str, optional + Partition boundaries along the index. + For tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions + For string 'sorted' will compute the delayed values to find index + values. Assumes that the indexes are mutually sorted. + If None, then won't use index information + label : str, optional + String to use as the function-name label in the output + collection-key names. + enforce_metadata : bool, default True + Whether to enforce at runtime that the structure of the DataFrame + produced by ``func`` actually matches the structure of ``meta``. + This will rename and reorder columns for each partition, + and will raise an error if this doesn't work, + but it won't raise if dtypes don't match. + **kwargs: + Key-word arguments to broadcast to each output partition. These + same arguments will be passed to ``func`` for every output partition. + """ + nf = dd.from_map( + func, + *iterables, + args=args, + meta=meta, + divisions=divisions, + label=label, + enforce_metadata=enforce_metadata, + **kwargs, + ) + return NestedFrame.from_dask_dataframe(nf) def compute(self, **kwargs): """Compute this Dask collection, returning the underlying dataframe or series.""" diff --git a/src/nested_dask/datasets/generation.py b/src/nested_dask/datasets/generation.py index 6c283f3..70873d7 100644 --- a/src/nested_dask/datasets/generation.py +++ b/src/nested_dask/datasets/generation.py @@ -37,7 +37,7 @@ def generate_data(n_base, n_layer, npartitions=1, seed=None) -> nd.NestedFrame: base_nf = datasets.generate_data(n_base, n_layer, seed=seed) # Convert to nested-dask - base_nf = nd.NestedFrame.from_nested_pandas(base_nf).repartition(npartitions=npartitions) + base_nf = nd.NestedFrame.from_pandas(base_nf).repartition(npartitions=npartitions) return base_nf diff --git a/tests/nested_dask/conftest.py b/tests/nested_dask/conftest.py index 778553b..bdcfad7 100644 --- a/tests/nested_dask/conftest.py +++ b/tests/nested_dask/conftest.py @@ -23,8 +23,8 @@ def test_dataset(): } layer_nf = npd.NestedFrame(data=layer_data).set_index("index").sort_index() - base_nd = nd.NestedFrame.from_nested_pandas(base_nf, npartitions=5) - layer_nd = nd.NestedFrame.from_nested_pandas(layer_nf, npartitions=10) + base_nd = nd.NestedFrame.from_pandas(base_nf, npartitions=5) + layer_nd = nd.NestedFrame.from_pandas(layer_nf, npartitions=10) return base_nd.add_nested(layer_nd, "nested") @@ -53,8 +53,8 @@ def test_dataset_with_nans(): } layer_nf = npd.NestedFrame(data=layer_data).set_index("index") - base_nd = nd.NestedFrame.from_nested_pandas(base_nf, npartitions=5) - layer_nd = nd.NestedFrame.from_nested_pandas(layer_nf, npartitions=10) + base_nd = nd.NestedFrame.from_pandas(base_nf, npartitions=5) + layer_nd = nd.NestedFrame.from_pandas(layer_nf, npartitions=10) return base_nd.add_nested(layer_nd, "nested") @@ -78,7 +78,7 @@ def test_dataset_no_add_nested(): } layer_nf = npd.NestedFrame(data=layer_data).set_index("index") - base_nd = nd.NestedFrame.from_nested_pandas(base_nf, npartitions=5) - layer_nd = nd.NestedFrame.from_nested_pandas(layer_nf, npartitions=10) + base_nd = nd.NestedFrame.from_pandas(base_nf, npartitions=5) + layer_nd = nd.NestedFrame.from_pandas(layer_nf, npartitions=10) return (base_nd, layer_nd) diff --git a/tests/nested_dask/test_accessor.py b/tests/nested_dask/test_accessor.py index 8cc7603..2d89adf 100644 --- a/tests/nested_dask/test_accessor.py +++ b/tests/nested_dask/test_accessor.py @@ -1,3 +1,4 @@ +import nested_dask as nd import pandas as pd import pyarrow as pa import pytest @@ -19,75 +20,90 @@ def test_fields(test_dataset): assert test_dataset.nested.nest.fields == ["t", "flux", "band"] -def test_to_flat(test_dataset): +def test_to_flat(): """test the to_flat function""" - flat_ztf = test_dataset.nested.nest.to_flat() + nf = nd.datasets.generate_data(10, 100, npartitions=2, seed=1) + + flat_nf = nf.nested.nest.to_flat() # check dtypes - assert flat_ztf.dtypes["t"] == pd.ArrowDtype(pa.float64()) - assert flat_ztf.dtypes["flux"] == pd.ArrowDtype(pa.float64()) - assert flat_ztf.dtypes["band"] == pd.ArrowDtype(pa.large_string()) + assert flat_nf.dtypes["t"] == pd.ArrowDtype(pa.float64()) + assert flat_nf.dtypes["flux"] == pd.ArrowDtype(pa.float64()) + assert flat_nf.dtypes["band"] == pd.ArrowDtype(pa.string()) # Make sure we retain all rows - assert len(flat_ztf.loc[1]) == 500 + assert len(flat_nf.loc[1]) == 100 + + one_row = flat_nf.compute().iloc[0] - one_row = flat_ztf.loc[1].compute().iloc[1] - assert pytest.approx(one_row["t"], 0.01) == 5.4584 - assert pytest.approx(one_row["flux"], 0.01) == 84.1573 + assert pytest.approx(one_row["t"], 0.01) == 16.0149 + assert pytest.approx(one_row["flux"], 0.01) == 51.2061 assert one_row["band"] == "r" -def test_to_flat_with_fields(test_dataset): +def test_to_flat_with_fields(): """test the to_flat function""" - flat_ztf = test_dataset.nested.nest.to_flat(fields=["t", "flux"]) + nf = nd.datasets.generate_data(10, 100, npartitions=2, seed=1) + + flat_nf = nf.nested.nest.to_flat(fields=["t", "flux"]) + + assert "band" not in flat_nf.columns # check dtypes - assert flat_ztf.dtypes["t"] == pd.ArrowDtype(pa.float64()) - assert flat_ztf.dtypes["flux"] == pd.ArrowDtype(pa.float64()) + assert flat_nf.dtypes["t"] == pd.ArrowDtype(pa.float64()) + assert flat_nf.dtypes["flux"] == pd.ArrowDtype(pa.float64()) # Make sure we retain all rows - assert len(flat_ztf.loc[1]) == 500 + assert len(flat_nf.loc[1]) == 100 - one_row = flat_ztf.loc[1].compute().iloc[1] - assert pytest.approx(one_row["t"], 0.01) == 5.4584 - assert pytest.approx(one_row["flux"], 0.01) == 84.1573 + one_row = flat_nf.compute().iloc[0] + assert pytest.approx(one_row["t"], 0.01) == 16.0149 + assert pytest.approx(one_row["flux"], 0.01) == 51.2061 -def test_to_lists(test_dataset): + +def test_to_lists(): """test the to_lists function""" - list_ztf = test_dataset.nested.nest.to_lists() + + nf = nd.datasets.generate_data(10, 100, npartitions=2, seed=1) + list_nf = nf.nested.nest.to_lists() # check dtypes - assert list_ztf.dtypes["t"] == pd.ArrowDtype(pa.list_(pa.float64())) - assert list_ztf.dtypes["flux"] == pd.ArrowDtype(pa.list_(pa.float64())) - assert list_ztf.dtypes["band"] == pd.ArrowDtype(pa.list_(pa.large_string())) + assert list_nf.dtypes["t"] == pd.ArrowDtype(pa.list_(pa.float64())) + assert list_nf.dtypes["flux"] == pd.ArrowDtype(pa.list_(pa.float64())) + assert list_nf.dtypes["band"] == pd.ArrowDtype(pa.list_(pa.string())) # Make sure we have a single row for an id - assert len(list_ztf.loc[1]) == 1 + assert len(list_nf.loc[1]) == 1 # Make sure we retain all rows -- double loc for speed and pandas get_item - assert len(list_ztf.loc[1].compute().loc[1]["t"]) == 500 + assert len(list_nf.loc[1].compute().loc[1]["t"]) == 100 + one_row = list_nf.compute().iloc[1] # spot-check values - assert pytest.approx(list_ztf.loc[1].compute().loc[1]["t"][0], 0.01) == 7.5690279 - assert pytest.approx(list_ztf.loc[1].compute().loc[1]["flux"][0], 0.01) == 79.6886 - assert list_ztf.loc[1].compute().loc[1]["band"][0] == "g" + assert pytest.approx(one_row["t"][0], 0.01) == 19.3652 + assert pytest.approx(one_row["flux"][0], 0.01) == 61.7461 + assert one_row["band"][0] == "g" -def test_to_lists_with_fields(test_dataset): +def test_to_lists_with_fields(): """test the to_lists function""" - list_ztf = test_dataset.nested.nest.to_lists(fields=["t", "flux"]) + nf = nd.datasets.generate_data(10, 100, npartitions=2, seed=1) + list_nf = nf.nested.nest.to_lists(fields=["t", "flux"]) + + assert "band" not in list_nf.columns # check dtypes - assert list_ztf.dtypes["t"] == pd.ArrowDtype(pa.list_(pa.float64())) - assert list_ztf.dtypes["flux"] == pd.ArrowDtype(pa.list_(pa.float64())) + assert list_nf.dtypes["t"] == pd.ArrowDtype(pa.list_(pa.float64())) + assert list_nf.dtypes["flux"] == pd.ArrowDtype(pa.list_(pa.float64())) # Make sure we have a single row for an id - assert len(list_ztf.loc[1]) == 1 + assert len(list_nf.loc[1]) == 1 # Make sure we retain all rows -- double loc for speed and pandas get_item - assert len(list_ztf.loc[1].compute().loc[1]["t"]) == 500 + assert len(list_nf.loc[1].compute().loc[1]["t"]) == 100 + one_row = list_nf.compute().iloc[1] # spot-check values - assert pytest.approx(list_ztf.loc[1].compute().loc[1]["t"][0], 0.01) == 7.5690279 - assert pytest.approx(list_ztf.loc[1].compute().loc[1]["flux"][0], 0.01) == 79.6886 + assert pytest.approx(one_row["t"][0], 0.01) == 19.3652 + assert pytest.approx(one_row["flux"][0], 0.01) == 61.7461 diff --git a/tests/nested_dask/test_datasets.py b/tests/nested_dask/test_datasets.py index 26f9ad7..7a2e66a 100644 --- a/tests/nested_dask/test_datasets.py +++ b/tests/nested_dask/test_datasets.py @@ -1,4 +1,5 @@ import nested_dask as nd +import pytest def test_generate_data(): @@ -18,3 +19,8 @@ def test_generate_data(): # test the length assert len(generate_1) == 10 assert len(generate_1.nested.nest.to_flat()) == 1000 + + # test seed stability + assert pytest.approx(generate_1.compute().loc[0]["a"], 0.1) == 0.417 + assert pytest.approx(generate_1.compute().loc[0]["b"], 0.1) == 0.838 + assert pytest.approx(generate_1.nested.nest.to_flat().compute().iloc[0]["t"], 0.1) == 16.015 diff --git a/tests/nested_dask/test_nestedframe.py b/tests/nested_dask/test_nestedframe.py index e4b9295..d110ecd 100644 --- a/tests/nested_dask/test_nestedframe.py +++ b/tests/nested_dask/test_nestedframe.py @@ -1,4 +1,6 @@ +import dask.dataframe as dd import nested_dask as nd +import nested_pandas as npd import numpy as np import pandas as pd import pytest @@ -12,6 +14,18 @@ def test_nestedframe_construction(test_dataset): assert isinstance(test_dataset["nested"].dtype, NestedDtype) +def test_nestedframe_from_dask_keeps_index_name(): + """test index name is set in from_dask_dataframe""" + index_name = "test" + a = pd.DataFrame({"a": [1, 2, 3]}) + a.index.name = index_name + ddf = dd.from_pandas(a) + assert ddf.index.name == index_name + ndf = nd.NestedFrame.from_dask_dataframe(ddf) + assert isinstance(ndf, nd.NestedFrame) + assert ndf.index.name == index_name + + def test_all_columns(test_dataset): """all_columns property test""" all_cols = test_dataset.all_columns @@ -173,3 +187,53 @@ def test_from_epyc(): # just make sure the result was successfully computed assert len(result) == 9817 + + +@pytest.mark.parametrize("pkg", ["pandas", "nested-pandas"]) +@pytest.mark.parametrize("with_nested", [True, False]) +def test_from_pandas(pkg, with_nested): + """Test that from_pandas returns a NestedFrame""" + + if pkg == "pandas": + df = pd.DataFrame({"a": [1, 2, 3]}, index=[1, 2, 3]) + elif pkg == "nested-pandas": + df = npd.NestedFrame({"a": [1, 2, 3]}, index=[1, 2, 3]) + if with_nested: + nested = npd.NestedFrame({"b": [5, 10, 15, 20, 25, 30]}, index=[1, 1, 2, 2, 3, 3]) + df = df.add_nested(nested, "nested") + + ndf = nd.NestedFrame.from_pandas(df) + assert isinstance(ndf, nd.NestedFrame) + + +@pytest.mark.parametrize("with_nested", [True, False]) +def test_from_delayed(with_nested): + """Test that from_delayed returns a NestedFrame""" + + nf = nd.datasets.generate_data(10, 10) + if not with_nested: + nf = nf.drop("nested", axis=1) + + delayed = nf.to_delayed() + + ndf = nd.NestedFrame.from_delayed(dfs=delayed, meta=nf._meta) + assert isinstance(ndf, nd.NestedFrame) + + +def test_from_map(test_dataset, tmp_path): + """Test that from_map returns a NestedFrame""" + + # Setup a temporary directory for files + test_save_path = tmp_path / "test_dataset" + + # Save Base to Parquet + test_dataset[["a", "b"]].to_parquet(test_save_path, write_index=True) + + # Load from_map + paths = [ + tmp_path / "test_dataset" / "0.parquet", + tmp_path / "test_dataset" / "1.parquet", + tmp_path / "test_dataset" / "2.parquet", + ] + ndf = nd.NestedFrame.from_map(nd.read_parquet, paths, meta=test_dataset[["a", "b"]]._meta) + assert isinstance(ndf, nd.NestedFrame)