Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dask performance on xhistogram #8

Open
shanicetbailey opened this issue Apr 19, 2021 · 19 comments
Open

Dask performance on xhistogram #8

shanicetbailey opened this issue Apr 19, 2021 · 19 comments

Comments

@shanicetbailey
Copy link

shanicetbailey commented Apr 19, 2021

Hi everone,

I'm having trouble loading my variables when I call histogram() from xhistogram and am not sure what is going on. I'm running with a cluster and I'm noticing my Dask dashboard is not performing well - especially when I use the dims argument in histogram(). I've attached a simple workflow that'll replicate this issue and you will see from the dask performance reports (the two links below) that the variable, theta_dist_with_dims, is performing poorly compared to when dims is not given in the theta_dist_nodims variable.

I need to specify the dimensions argument for my variables, so I would appreciate any help in figuring out the issue here and a solution/workaround to making dask perform better with xhistogram, thanks!

from dask_gateway import GatewayCluster
cluster = GatewayCluster()
cluster.scale(30)
client = cluster.get_client()

import xarray as xr
from xhistogram.xarray import histogram
import gcsfs
import numpy as np

url_ocean = 'gs://pangeo-forge-us-central1/pangeo-forge/soda/soda3.4.2_5dy_ocean_or'
fs = gcsfs.GCSFileSystem()
ocean = xr.open_zarr(fs.get_mapper(url_ocean), consolidated=True, decode_times=False)

delta_theta = 0.1
theta_bins = np.arange(-2, 35, delta_theta)

theta_dist_nodims = histogram(ocean.temp, bins=[theta_bins])
theta_dist_nodims.load()

theta_dist_with_dims = histogram(ocean.temp, bins=[theta_bins], dim=['xt_ocean', 'yt_ocean', 'st_ocean'])
theta_dist_with_dims.load()

theta_dist_nodims-dask-report

theta_dist_with_dims-dask-report

To get dask reports when running load():

from dask.distributed import performance_report

with performance_report(filename="theta_dist_nodims-dask-report.html"):
    theta_dist_nodims.load()
@rabernat
Copy link
Contributor

Thanks Shanice for the detailed and actionable bug report!

@jbusecke
Copy link
Collaborator

I am pretty sure this is the same issue I am having for a project (I mentioned this in the last meeting). Thanks for the issue @stb2145. Very curious on how to improve this.

@jrbourbeau
Copy link

Yeah, thanks for the nice example @stb2145 -- that helps a lot

Which Pangeo Cloud JupyterHub are you running on (e.g. the normal gcp deployment vs. https://staging.us-central1-b.gcp.pangeo.io)? Also, what worker instance types are you running on (e.g. small, medium, large)?

Here are the performance reports I got when running your example on the staging deployment with medium instances:

I do see some difference in performance (~180s vs. ~210s for without dim vs with dim, respectively). However, since the with vs. without dim cases involve computing different histograms, I wouldn't be surprised if there is some difference in performance we should expect to see.

That said, I don't see the ~3x time difference which you observed. In particular, your theta_dist_with_dims-dask-report performance report seems to have some extra white space and large dependency transfers which I didn't run into. Do you consistently see a large performance discrepancy here, or is it sporadic?

@shanicetbailey
Copy link
Author

  • I am using staging pangeo
  • Worker instance is Large
  • I do consistently see a performance issue. From my last meeting with @rabernat, we suspect it may be how xhistogram reshapes the data to make it not optimal for dask tasks...

I can run the example again and send you updated dask reports. I'll try to also send dask reports of the actual data I'm working on so you can see the real performance issue - usually the workers die and give up on the .load() cell.

@jbusecke
Copy link
Collaborator

I can also try to submit a similar use case with a different data source as comparison. Will have to create those nice performance reports tomorrow. Thanks for working on this

@rabernat
Copy link
Contributor

So I am 90% sure this is an xhistogram bug and the problem is in this code:

https://github.com/xgcm/xhistogram/blob/2681aee6fe04e7656c458f32277f87e76653b6e8/xhistogram/core.py#L238-L254

I believe it can be fixed by changing the xhistogram implementation to use map_blocks and avoiding dask.array.reshape.

@shanicetbailey
Copy link
Author

Following up on dask reports for the actual data I'm working on, I can't provide info on the performance because I always have to eventually stop the kernel since the workers seem to freeze and not finish the tasks.

I reproduced the dask reports for the theta_dims and theta_no_dims and the discrepancy was similar to yours @jrbourbeau.

It is worth investigating further to see if changing xhistogram implementation will do the trick, as @rabernat already pointed out ☝️.

@jbusecke
Copy link
Collaborator

I will try to help out over here today. My idea was to first run a very simple test of the following.

Use xarray.map_blocks to apply histogram to chunks. This should use xhistogram on numpy arrays, and I hope to bypass that dask reshaping pointed out by @rabernat.

If this shows promise, I will try to dig deeper into the xhistogram code.

Do you see any issues with this @jrbourbeau?

@jrbourbeau
Copy link

@jbusecke trying out xarray.map_blocks seems like a good cross-check to do 👍

@gjoseph92 could you look at the section of xhistogram Ryan pointed out to see how we might be able to refactor to use map_blocks instead of reshape.

@jbusecke
Copy link
Collaborator

Much appreciate the help on the xhistogram side @gjoseph92, since I am not very familiar with the internals at all.

@jrbourbeau
Copy link

Neither are we : ) I'm looking forward to digging in on xhistogram

@shanicetbailey
Copy link
Author

So not sure why, but today I've been able to load multiple variables with dask and xhistogram. I didn't do anything different, just ran the same nb that had the issue and I've been able to move forward... Not sure if this means it was a fluke, or if there is still an underlying problem with how xhistogram works with dask... @jbusecke could you weigh in with your own issue you've been having, and have things 'miraculously' starting working again for you?

@jbusecke
Copy link
Collaborator

I ran @stb2145 notebook both on staging and production(with upstream fastjmd95 package) and was able to confirm that these calculation work. My suspicion was that perhaps the dask version was updated on the notebook images, and that somehow solved the issue? Or maybe some other transient problem on the kubernetes workers.

I suggested to leave this issue open while I run my (very similar) computations and can confirm that these are not experiencing any issue.

A performance audit on xhistogram might still be worth it though, just because it is pretty widely used in our group.

@jbusecke
Copy link
Collaborator

I am working on another use case that is blowing up my memory, but I am struggling with getting enough workers online to test it properly (currently crawling along on 1 worker 😭).

@jbusecke
Copy link
Collaborator

Ok I just pushed a sample notebook that emulates my workflow.

This should be a rather straightforward use of xhistogram, and I am using synthetic data (rather small in the context of the CMIP6 data available), yet the computation is basically showing worker comms all over the place, and eventually dies. Note that I have already set the time chunks to the smallest value (1) here, the data usually has time chunks along the size of 3-8.

I also pushed the performance report, but am not sure how to embed it as nicely as @stb2145 did.

I think this computation can be used as a good benchmark for xhistogram.

@jrbourbeau
Copy link

My suspicion was that perhaps the dask version was updated on the notebook images, and that somehow solved the issue?

There was this commit pangeo-data/pangeo-cloud-federation@e0b8afd a few days ago which updated packages on staging, but prod hasn't been updated in a few months 🤷

Ok I just pushed a sample notebook that emulates my workflow

Thanks @jbusecke -- I'll take a look

@shanicetbailey
Copy link
Author

I also pushed the performance report, but am not sure how to embed it as nicely as @stb2145 did.

It's a little extra steps by

  1. saving html file as a gist
  2. then pasting the raw file version GitHub link in raw githack to get a public, shareable link
  3. then all you have to do is hyperlink or paste the "URL in production"

(Like I said, a little extra but convenient for those you are sharing with!)

@gjoseph92
Copy link

First: should we split this into two issues? I'm not sure if @stb2145's issues with multiple dims are the same as @jbusecke's. I've just been working with @jbusecke's notebook assuming it was representative of all the problems, but after re-reading this thread from the top, I'm not sure.

Re the dims issue: (after reading a lot about joint-distribution histograms because I'm not that good at math) I would expect that a histogram along 3 dimensions is ~3x "harder" than a histogram of the flattened array. The output is 3x larger, plus you have to bring all 3 chunks together into one place to compute their conditional probabilities relative to each other—there's no way to parallelize that step. However, the computation should still run fine, even if it takes longer. So if it's consistently failing, something's wrong.

Brief update on our progress so far:

  • I opened Array: non-dask-arrays and broadcasting in ravel_multi_index dask/dask#7594, which will fix Error with multiple dask args xgcm/xhistogram#48 and Compliance with Dask version 2021.03 ? xgcm/xhistogram#27, so xhistogram will work with the next released version of dask
  • Last week, I tried running @jbusecke's sample notebook on https://staging.us-central1-b.gcp.pangeo.io/ with 10 workers. It actually ran pretty well for me.
  • I also tried running the notebook locally on my laptop (8CPU, 32GiB ram). Unsurprisingly, it eventually ran out of memory and ground to a halt. Surprisingly, if I set all the dask memory management and spill-to-disk thresholds to None, the notebook ran no problem. I have a suspicion that dask can be overly conservative about measuring memory use, and starts trying to compensate for low-memory situations when in fact memory is not a problem yet. And once spill-to-disk starts, it's usually game-over for your computation, and may ironically even make memory usage worse: Spill to disk may cause data duplication dask/distributed#3756. I haven't tested whether this helps on a multi-machine cluster yet though.
  • Spent some time understanding xhistogram, and thinking about what could improve. I'm not actually sure there's much need to make major changes (like rewriting lots of stuff to map_blocks or blockwise). I may be wrong here, but I also don't think chunks get combined in 4d array reshape dask/dask#5544 is coming up with the reshape; the graphs don't seem to include unnecessary merging of chunks, but I need to check on this more. From a brief profile, we could speed up compute a bit by replacing digitize with a fastpath for equal-interval bins like NumPy uses (if bins are uniformly spaced, to find the bin that a value belongs to, you don't need to binary-search the bins—you can just scale-and-offset the value to the range of the bins). But I'd like to be sure that compute is actually be bottleneck before doing that.
  • To look at the graph structure, I set nt = 4 (instead of nt = 1980) to reduce the size a lot. The graph structure doesn't look that bad to me; not much spaghetti. There's some communication—which is going to be impossible to avoid in a joint distribution histogram—but the work does seem to separate into mostly-independent "towers"
Graphs
hist.data.visualize()

image

hist.data.visualize(color="order")

image

hist.data.visualize(optimize_graph=True)

image

hist.data.dask.visualize("hlg.png")

image

Next steps:

  • Rerun the sample notebook on pangeo, try to reproduce the failures
  • Run on pangeo with memory management turned off ("distributed.worker.memory.spill": None, etc.)
    • If disabling memory management helps, try running on Coiled (easier to use custom software environments there) with jemalloc, or with malloc_trim as a gc callback, since this might be a manifestation of Memory leak with Numpy arrays and the threaded scheduler dask/dask#3530 (this is a good walkthrough of the issue; it's for ruby, but a similar situation). If all the problems boil down to "workers think they're out of memory and start flailing but really they're not", that's pretty good, since there are a number of hacks available to work around it while we wait for bigger fixes to distributed.
  • Look at the original dims issue; see if we can reproduce that.

@jbusecke
Copy link
Collaborator

Hi @gjoseph92, thanks so much for this.

I just realized that my example was actually flawed because the resulting dataarray is quite large (and as you point out, grinds to a halt due to the .load() command at the end. I am currently experimenting a bit more with it and will open a separate issue as suggested.

For context: My 'real-world' calculation actually succeeds now, but I still thing the performance is far from optimal, so I think using both of these cases as 'benchmarks' for xhistogram is still worth pursuing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants