
[Exploration]: How Dask works and how it is utilized in xarray #352

Closed · 1 task
tomvothecoder opened this issue Sep 27, 2022 · 3 comments
Labels: type: docs (Updates to documentation)
@tomvothecoder (Collaborator) commented Sep 27, 2022

  • Explore how Dask works and how it is utilized in xarray.
    • How do chunks communicate with one another?
    • Does chunking affect edge cases (e.g., DJF seasons that are sliced by chunk)?
    • When might we recommend chunking with xcdat?

Experiment with bounds and edges:

  • Core dims vs. non-core dims, which determine how the data should be chunked

Performance factors:

  • Given a large enough dataset, parallel processing may yield a speed-up
  • Local vs. network
  • Dask cluster set up correctly (10 GBps network)
  • Multiple threads/processes on a single machine (see the cluster setup sketch below)
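A minimal sketch of the single-machine cluster setup mentioned above, assuming dask.distributed is installed; the worker/thread counts and memory limit are illustrative only:

```python
# Hypothetical setup: a local Dask cluster with explicit worker and thread
# counts (values here are illustrative, not recommendations).
from dask.distributed import Client, LocalCluster

if __name__ == "__main__":
    # Multiple processes, each with its own thread pool, on a single machine.
    cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit="4GB")
    client = Client(cluster)
    print(client.dashboard_link)  # inspect task streams and worker memory here

    client.close()
    cluster.close()
```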
@tomvothecoder added the type: docs (Updates to documentation) and Priority: Medium labels on Sep 27, 2022
@jasonb5 (Collaborator) commented Oct 10, 2022

Here's a small intro based on what I can recall from when I was working with Xarray and Dask a lot.

Most of my experience with these libraries came from working with the ESGF Compute Service. This service would translate WPS requests into an Xarray DAG and then execute it on a Dask cluster that was allocated using Dask Gateway. The service also tried to utilize Xarray's ability to read Zarr-formatted datasets from S3 stores to improve throughput for parallelized operations.

Here's a quick intro to Dask. Anything built with a Dask array, bag, dataframe, delayed, or future is turned into a task graph; the scheduler can optimize the graph and finally assign the tasks to workers. To answer the first question, the communication depends on the scheduler. There's either a single-machine or a distributed scheduler. For single-machine, you have single-threaded, multi-threaded, or process-based execution. Multi-threaded is pretty straightforward since tasks can share variables within the thread pool, but the process-based scheduler actually uses cloudpickle to serialize/deserialize messages and data passed between processes. The same serialize/deserialize pattern is used when running distributed on local/remote clusters.
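
As a rough illustration of the task-graph and single-machine scheduler points above (a sketch, not Compute Service code):

```python
# A minimal sketch of how a Dask array builds a task graph and how the
# single-machine schedulers differ; nothing here is xcdat-specific.
import dask.array as da

x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))  # 10 x 10 chunks
y = (x + x.T).mean(axis=0)  # no computation yet, only a task graph

# Shared-memory thread pool: tasks share data, no serialization between them.
result_threads = y.compute(scheduler="threads")

# Process pool: tasks and results are (de)serialized with cloudpickle.
result_processes = y.compute(scheduler="processes")
```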

In my experience, chunking is recommended when dealing with out-of-core operations. I remember losing performance with small datasets and chunking on a LocalCluster due to the communication overhead. Chunking works best when you have an independent dimension, e.g. if you're averaging over time you could chunk by lat, lon, lev, or some combination. You can still benefit from chunking even if some of the tasks are not operating on an independent dimension, e.g. when building large task graphs. An issue I ran into when working on the Compute Service was using groupby functions, which would cause all the data to load prematurely; I think there was a Dask/Xarray issue about this. Another time to utilize chunking is when operating with large task graphs where the same chunk of data has multiple operations performed on it.
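
For example, chunking along the dimensions that are independent of the reduction might look like the following sketch; the file name, variable name, and chunk sizes are assumptions:

```python
# Hedged example: chunk along dims that are independent of the reduction
# ("non-core" dims), assuming a CF-like dataset with time/lat/lon.
import xarray as xr

# Keep "time" in a single chunk (it is the dim being reduced) and split the
# independent lat/lon dims so chunks can be reduced in parallel.
ds = xr.open_dataset("tas.nc", chunks={"time": -1, "lat": 90, "lon": 90})

tas_mean = ds["tas"].mean(dim="time")  # still lazy; one task per lat/lon chunk
tas_mean = tas_mean.compute()
```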

@jasonb5 (Collaborator) commented Oct 26, 2022

Here are some related links.

@tomvothecoder (Collaborator, Author) commented Oct 31, 2022

An issue I ran into when working on the Compute Service was using groupby functions, which would cause all the data to load prematurely; I think there was a Dask/Xarray issue about this.

It is good to know that groupby operations are potentially eager rather than lazy, since xcdat's temporal averaging APIs use groupby internally.

I found the related xarray issue: pydata/xarray#2852.
Comments from that issue:

It is very hard to make this sort of groupby lazy, because you are grouping over the variable label itself. Groupby uses a split-apply-combine paradigm to transform the data. The apply and combine steps can be lazy. But the split step cannot. Xarray uses the group variable to determine how to index the array, i.e. which items belong in which group. To do this, it needs to read the whole variable into memory.
-- pydata/xarray#2852 (comment)

The current design of GroupBy.apply() in xarray is entirely ignorant of dask: it simply uses a for loop over the grouped variable to build up a computation with high level array operations.

This makes operations that group over large keys stored in dask inefficient. This could be done efficiently (dask.dataframe does this, and might be worth trying in your case) but it's a more challenging distributed computing problem, and xarray's current data model would not know how large of a dimension to create for the returned arrays (doing this properly would require supporting arrays with unknown dimension sizes).
-- pydata/xarray#2852 (comment)
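
To make the split-apply-combine point concrete, here is a small sketch (file and variable names are assumptions): grouping a dask-backed variable by an in-memory coordinate such as time.season keeps the result lazy, because only the group keys need to be in memory for the split step.

```python
# Sketch: the split step only needs the (small, in-memory) time coordinate,
# so the apply/combine steps on the dask-backed data stay lazy.
import xarray as xr

ds = xr.open_dataset("tas.nc", chunks={"time": 120})

# "time.season" derives group keys from the time coordinate, which xarray
# loads into memory anyway; the "tas" data itself is never computed here.
seasonal_mean = ds["tas"].groupby("time.season").mean()
print(type(seasonal_mean.data))  # a dask array -> still lazy
```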

Workarounds:

I think these comments suggest that xcdat's temporal averaging is either partially lazy or not lazy at all. More investigation needs to be done here to confirm.

Action Items:

  • Investigate xcdat's temporal averaging APIs to determine if they are lazy, partially lazy, or eager (see the sketch after this list).
    • If partially lazy, which steps are lazy and which are eager?

In xarray, Dask arrays are not loaded into memory unexpectedly (an exception is raised instead). In xcdat, we load Dask arrays into memory in specific spots.

When you load data as a Dask array in an xarray data structure, almost all xarray operations will keep it as a Dask array; when this is not possible, they will raise an exception rather than unexpectedly loading data into memory. Converting a Dask array into memory generally requires an explicit conversion step. One notable exception is indexing operations: to enable label based indexing, xarray will automatically load coordinate labels into memory.
-- https://docs.xarray.dev/en/stable/user-guide/dask.html#using-dask-with-xarray
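
A small illustration of the quoted behavior (file and variable names assumed):

```python
# Label-based indexing loads coordinate labels, but the data stays lazy;
# converting to in-memory numpy requires an explicit step.
import xarray as xr

ds = xr.open_dataset("tas.nc", chunks={"time": 120})

# .sel() needs the time coordinate in memory to build the index, but the
# "tas" values themselves remain a dask array.
subset = ds["tas"].sel(time=slice("2000-01-01", "2000-12-31"))

# Explicit conversion into memory.
subset = subset.load()  # or .compute(), which returns a new object
```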

More investigation items

  • List all of the locations in the xcdat codebase where Dask arrays are explicitly loaded into memory
  • Investigate the performance implications of having to load these Dask arrays into memory
    • How large can these arrays be, typically?
    • After loading arrays into memory, do they stay in memory?

@xCDAT locked and limited conversation to collaborators on Nov 1, 2022
@tomvothecoder converted this issue into discussion #376 on Nov 1, 2022
