A NB draft showing an example of LSDB pipeline #18

Merged: 3 commits, May 30, 2024
27 changes: 20 additions & 7 deletions docs/gettingstarted/best_practices.ipynb
@@ -18,15 +18,15 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Like Dask, Nested-Dask is focused towards working with large amounts of data. In particular, the threshold where this really will matter is when the amount of data exceeds the available memory of your machine/system. In such cases, Nested-Dask provides built-in tooling for working with these datasets and is recommended over using Nested-Pandas. These tools encompassing (but not limited to): \n",
"Like Dask, Nested-Dask is focused towards working with large amounts of data. In particular, the threshold where this really will matter is when the amount of data exceeds the available memory of your machine/system and/or if parallel computing is needed. In such cases, Nested-Dask provides built-in tooling for working with these datasets and is recommended over using Nested-Pandas. These tools encompassing (but not limited to): \n",
"\n",
"* **lazy computation**: enabling construction of workflows with more control over when computation actually begins\n",
"\n",
"* **partitioning**: breaking data up into smaller partitions that can fit into memory, enabling work on each chunk while keeping the overall memory footprint smaller than the full dataset size\n",
"\n",
"* **progress tracking**: The [Dask Dashboard](https://docs.dask.org/en/latest/dashboard.html) can be used to track the progress of complex workflows, assess memory usage, find bottlenecks, etc.\n",
"\n",
"* **parallel processing**: Dask workers are able to work in parallel on the partitions of a dataset."
"* **parallel processing**: Dask workers are able to work in parallel on the partitions of a dataset, both on a local machine and on a distributed cluster."
]
},
{
@@ -47,7 +47,7 @@
"metadata": {},
"outputs": [],
"source": [
"# Setting up a Dask client\n",
"# Setting up a Dask client, which would apply parallel processing\n",
"from dask.distributed import Client\n",
"\n",
"client = Client()\n",
@@ -65,7 +65,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"By contrast, when working with smaller datasets able to fit into memory it's often better to work directly with Nested-Pandas. This is particularly relevant for workflows that start with large amounts of data and filter down to a small dataset. By the nature of lazy computation, these filtering operations are not automatically applied to the dataset and therefore you're still working effectively at scale. Let's walk through an example where we load a \"large\" dataset, in this case it will fit into memory but let's imagine that it is larger than memory."
"By contrast, when working with smaller datasets able to fit into memory it's often better to work directly with Nested-Pandas. This is particularly relevant for workflows that start with large amounts of data and filter down to a small dataset and do not require computationally heavy processing of this small dataset. By the nature of lazy computation, these filtering operations are not automatically applied to the dataset, and therefore you're still working effectively at scale. Let's walk through an example where we load a \"large\" dataset, in this case it will fit into memory but let's imagine that it is larger than memory."
]
},
{
@@ -100,7 +100,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"When `compute()` is called above, the Dask task graph is executed. However, the ndf object above is still a lazy Dask object meaning that any subsequent work will still need to apply this query work all over again."
"When `compute()` is called above, the Dask task graph is executed and the query is being run. However, the ndf object above is still a lazy Dask object meaning that any subsequent `.compute()`-like method (e.g. `.head()` or `.to_parquet()`) will still need to apply this query work all over again."
]
},
{
@@ -115,8 +115,11 @@
"# The result will be a series with float values\n",
"meta = pd.Series(name=\"mean\", dtype=float)\n",
"\n",
"# Dask has to reapply the query here\n",
"ndf.reduce(np.mean, \"nested.flux\", meta=meta).compute()"
"# Apply a mean operation on the \"nested.flux\" column\n",
"mean_flux = ndf.reduce(np.mean, \"nested.flux\", meta=meta)\n",
"\n",
"# Dask has to reapply the query over `ndf` here, then apply the mean operation\n",
"mean_flux.compute()"
]
},
{
@@ -140,6 +143,16 @@
"isinstance(nf, npd.NestedFrame)"
]
},
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Now we can apply the mean operation directly to the nested_pandas.NestedFrame\n",
+ "nf.reduce(np.mean, \"nested.flux\")"
+ ]
+ },
{
"cell_type": "markdown",
"metadata": {},
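Taken together, the pattern these cells recommend is: build the lazy query in Nested-Dask, call `compute()` once, then keep working in memory with Nested-Pandas. A minimal sketch of that pattern (the query expression is illustrative, and `ndf` stands for the notebook's Nested-Dask NestedFrame):

import numpy as np
import nested_pandas as npd

# Lazy: this only records work in the Dask task graph
filtered = ndf.query("nested.flux > 100")  # illustrative filter

# Execute the graph once; the result is an in-memory nested_pandas.NestedFrame
nf = filtered.compute()
assert isinstance(nf, npd.NestedFrame)

# Subsequent operations run in memory, with no re-execution of the query
nf.reduce(np.mean, "nested.flux")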
3 changes: 0 additions & 3 deletions docs/gettingstarted/contributing.rst
@@ -14,6 +14,3 @@ Download code and install dependencies in a conda environment. Run unit tests at
git clone https://github.com/lincc-frameworks/nested-dask.git
cd nested-dask/
bash ./.setup_dev.sh

- pip install pytest
- pytest
7 changes: 1 addition & 6 deletions docs/gettingstarted/installation.rst
@@ -25,13 +25,8 @@ development version of nested-dask, you should instead build 'nested-dask' from
git clone https://github.com/lincc-frameworks/nested-dask.git
cd nested-dask
pip install .
- pip install .[dev] # it may be necessary to use `pip install .'[dev]'` (with single quotes) depending on your machine.
+ pip install '.[dev]'

The ``pip install .[dev]`` command is optional, and installs dependencies needed to run the unit tests and build
the documentation. The latest source version of nested-dask may be less stable than a release, and so we recommend
running the unit test suite to verify that your local install is performing as expected.

- .. code-block:: bash
-
- pip install pytest
- pytest
4 changes: 2 additions & 2 deletions docs/gettingstarted/quickstart.ipynb
@@ -11,7 +11,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"With a valid Python environment, nested-dask and it's dependencies are easy to install using the `pip` package manager. The following command can be used to install it:"
"With a valid Python environment, nested-dask and its dependencies are easy to install using the `pip` package manager. The following command can be used to install it:"
]
},
{
@@ -54,7 +54,7 @@
"* `npartitions=1` indicates how many partitions the dataset has been split into.\n",
"* The `0` and `9` tell us the \"divisions\" of the partitions. When the dataset is sorted by the index, these divisions are ranges to show which index values reside in each partition.\n",
"\n",
"We can signal to Dask that we'd like to actually view the data by using `compute`."
"We can signal to Dask that we'd like to actually obtain the data as `nested_pandas.NestedFrame` by using `compute`."
]
},
{
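The npartitions/divisions behavior the quickstart describes can be checked on the lazy object before computing anything (a sketch; `ndf` stands for the quickstart's Nested-Dask NestedFrame):

# Inspecting Dask-level metadata does not load any data
print(ndf.npartitions)  # e.g. 1
print(ndf.divisions)    # e.g. (0, 9): the index range covered by each partition

# compute() materializes the data as a nested_pandas.NestedFrame
nf = ndf.compute()
type(nf)  # nested_pandas.NestedFrame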
236 changes: 236 additions & 0 deletions docs/tutorials/work_with_lsdb.ipynb
@@ -0,0 +1,236 @@
{
@dougbrn (Collaborator) commented on May 24, 2024:

Line #22.    nested_ddf

Are you getting that this is actually a Nested-Dask NestedFrame object? On my end it seems like this is stuck as a Dask DataFrame. Interestingly, from_dask_dataframe works when loading the data directly using dask. Investigating...

import dask.dataframe as dd

test_dd = dd.read_parquet(
    f"{catalogs_dir}/ztf_object",
    columns=["ra", "dec", "ps1_objid"],
)

type(NestedFrame.from_dask_dataframe(test_dd))  # NestedFrame

Collaborator:

I think this might be an issue with LSDB using legacy frames, vs dask natively using dask-expr dataframes.

Collaborator:

Yep.

from dask_expr import from_legacy_dataframe

object_ndf = NestedFrame.from_dask_dataframe(from_legacy_dataframe(lsdb_object._ddf))
type(object_ndf)  # nested_dask.core.NestedFrame

@dougbrn (Collaborator) commented on May 24, 2024:

Line #4.    %pip install aiohttp lsdb

Just a note: I was going to suggest we add dask-expr as a dependency for this workflow, but it's already a requirement of the package, so it's not needed here.

@dougbrn (Collaborator) commented on May 24, 2024:

See below comment on from_legacy_dataframe.

Do we need to actually do the LSDB join for this use case? Would it be better to just do the join at the NestedFrame level:

from dask_expr import from_legacy_dataframe

nf_object = NestedFrame.from_dask_dataframe(from_legacy_dataframe(lsdb_object._ddf))
nf_source = NestedFrame.from_dask_dataframe(from_legacy_dataframe(lsdb_source._ddf))

nf_joined = nf_object.add_nested(nf_source, "ztf_source", how="left")

@dougbrn (Collaborator) commented on May 24, 2024:

Were you planning on adding a bit of analysis to this? Maybe just a query and a function run would be helpful.

"cells": [
{
"cell_type": "markdown",
"id": "6171e5bbd47ce869",
"metadata": {},
"source": [
"# Load large catalog data from the LSDB\n",
"\n",
"Here we load a small part of ZTF DR14 stored as HiPSCat catalog using the [LSDB](https://lsdb.readthedocs.io/)."
]
},
{
"cell_type": "markdown",
"id": "c055a44b8ce3b34",
"metadata": {},
"source": [
"## Install LSDB and its dependencies and import the necessary modules\n",
"\n",
"We also need `aiohttp`, which is an optional LSDB's dependency, needed to access the catalog data from the web."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "af1710055600582",
"metadata": {
"ExecuteTime": {
"end_time": "2024-05-24T12:54:00.759441Z",
"start_time": "2024-05-24T12:53:58.854875Z"
}
},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"# Comment the following line to skip LSDB installation\n",
"%pip install aiohttp lsdb"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e4d03aa76aeeb1c0",
"metadata": {
"ExecuteTime": {
"end_time": "2024-05-24T12:54:03.834087Z",
"start_time": "2024-05-24T12:54:00.761330Z"
}
},
"outputs": [],
"source": [
"import nested_pandas as npd\n",
"from lsdb import read_hipscat\n",
"from nested_dask import NestedFrame\n",
"from nested_pandas.series.packer import pack"
]
},
{
"cell_type": "markdown",
"id": "e169686259687cb2",
"metadata": {},
"source": [
"## Load ZTF DR14\n",
"For the demonstration purposes we use a light version of the ZTF DR14 catalog distributed by LINCC Frameworks, a half-degree circle around RA=180, Dec=10.\n",
"We load the data from HTTPS as two LSDB catalogs: objects (metadata catalog) and source (light curve catalog)."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a403e00e2fd8d081",
"metadata": {
"ExecuteTime": {
"end_time": "2024-05-24T12:54:06.745405Z",
"start_time": "2024-05-24T12:54:03.834904Z"
}
},
"outputs": [],
"source": [
"catalogs_dir = \"https://epyc.astro.washington.edu/~lincc-frameworks/half_degree_surveys/ztf/\"\n",
"\n",
"lsdb_object = read_hipscat(\n",
" f\"{catalogs_dir}/ztf_object\",\n",
" columns=[\"ra\", \"dec\", \"ps1_objid\"],\n",
")\n",
"lsdb_source = read_hipscat(\n",
" f\"{catalogs_dir}/ztf_source\",\n",
" columns=[\"mjd\", \"mag\", \"magerr\", \"band\", \"ps1_objid\", \"catflags\"],\n",
")"
]
},
{
"cell_type": "markdown",
"id": "4ed4201f2c59f542",
"metadata": {},
"source": [
"We need to merge these two catalogs to get the light curve data.\n",
"It is done with LSDB's `.join()` method which would give us a new catalog with all the columns from both catalogs. "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b9b57bced7f810c0",
"metadata": {
"ExecuteTime": {
"end_time": "2024-05-24T12:54:06.770931Z",
"start_time": "2024-05-24T12:54:06.746786Z"
}
},
"outputs": [],
"source": [
"# We can ignore warning here - for this particular case we don't need margin cache\n",
"lsdb_joined = lsdb_object.join(\n",
" lsdb_source,\n",
" left_on=\"ps1_objid\",\n",
" right_on=\"ps1_objid\",\n",
" suffixes=(\"\", \"\"),\n",
")\n",
"joined_ddf = lsdb_joined._ddf\n",
"joined_ddf"
]
},
{
"cell_type": "markdown",
"id": "8ac9c6ceaf6bc3d2",
"metadata": {},
"source": [
"## Convert LSDB joined catalog to `nested_dask.NestedFrame`"
]
},
{
"cell_type": "markdown",
"id": "347f97583a3c1ba4",
"metadata": {},
"source": [
"First, we plan the computation to convert the joined Dask DataFrame to a NestedFrame."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9522ce0977ff9fdf",
"metadata": {
"ExecuteTime": {
"end_time": "2024-05-24T12:54:06.789983Z",
"start_time": "2024-05-24T12:54:06.772721Z"
}
},
"outputs": [],
"source": [
"def convert_to_nested_frame(df: pd.DataFrame, nested_columns: list[str]):\n",
" other_columns = [col for col in df.columns if col not in nested_columns]\n",
"\n",
" # Since object rows are repeated, we just drop duplicates\n",
" object_df = df[other_columns].groupby(level=0).first()\n",
" nested_frame = npd.NestedFrame(object_df)\n",
"\n",
" source_df = df[nested_columns]\n",
" # lc is for light curve\n",
" # https://github.com/lincc-frameworks/nested-pandas/issues/88\n",
" # nested_frame.add_nested(source_df, 'lc')\n",
" nested_frame[\"lc\"] = pack(source_df, name=\"lc\")\n",
"\n",
" return nested_frame\n",
"\n",
"\n",
"ddf = joined_ddf.map_partitions(\n",
" lambda df: convert_to_nested_frame(df, nested_columns=lsdb_source.columns),\n",
" meta=convert_to_nested_frame(joined_ddf._meta, nested_columns=lsdb_source.columns),\n",
")\n",
"nested_ddf = NestedFrame.from_dask_dataframe(ddf)\n",
"nested_ddf"
]
},
{
"cell_type": "markdown",
"id": "de6820724bcd4781",
"metadata": {},
"source": [
"Second, we compute the NestedFrame."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a82bd831fc1f6f92",
"metadata": {
"ExecuteTime": {
"end_time": "2024-05-24T12:54:19.282406Z",
"start_time": "2024-05-24T12:54:06.790699Z"
}
},
"outputs": [],
"source": [
"ndf = nested_ddf.compute()\n",
"ndf"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9c82f593efca9d30",
"metadata": {
"ExecuteTime": {
"end_time": "2024-05-24T12:54:19.284710Z",
"start_time": "2024-05-24T12:54:19.283179Z"
}
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
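As a sketch of the quick analysis the reviewer asks for above, one could query and reduce the computed frame directly in nested-pandas (the catflags cut and the per-object mean are illustrative, not part of the notebook; `ndf` is the nested_pandas.NestedFrame computed above):

import numpy as np

# Keep only unflagged detections, via a hierarchical query on the nested "lc" column
good = ndf.query("lc.catflags == 0")

# Mean magnitude per object, computed in memory
good.reduce(np.mean, "lc.mag")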