Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GPU-friendly loads / merge_frames #2565

Closed
TomAugspurger opened this issue Mar 15, 2019 · 14 comments
Closed

GPU-friendly loads / merge_frames #2565

TomAugspurger opened this issue Mar 15, 2019 · 14 comments

Comments

@TomAugspurger
Copy link
Member

TomAugspurger commented Mar 15, 2019

Just throwing this up here for now, need to investigate more.

I'm working on a distributed cudf join using UCX. Things progress fine until, in the Client process, we attempt to deserialize some data (I think the final result?). We end up calling calling loads with deserialize=True:

def loads(frames, deserialize=True, deserializers=None):

which calls merge_frames:

out.append(b''.join(map(ensure_bytes, L)))

which attempt to convert the data to a byte string.

At this point in the client process, frames is a list of objects representing device memory. If possible (and I think it's possible), I'd like to avoid copying to the host here.

Actually, this may only be possible if the Client happens to have a GPU as well. In this case that's true, but not in general.


TODO:

  1. figure out exactly where the client is calling this
  2. ...
@mrocklin
Copy link
Member

Hrm, that does seem like incorrect behavior in the GPU memory case

It also means that these lines weren't triggered.

if all(len(f) == l for f, l in zip(frames, lengths)):
return frames

This is somewhat surprising, especially given that UCX handles message framing itself. I wonder if this is a case where we should be using nbytes rather than len.

@TomAugspurger
Copy link
Member Author

Ahh... I completely missed those lines. That may actually explain it. I've been hacking up the serialization code to make things work. I likely have a bug in there.

@TomAugspurger
Copy link
Member Author

TomAugspurger commented Mar 15, 2019

So, after looking at this further, there may still be some merit to this issue.

I'm sending a cudf DataFrame with three columns and no nulls, so that's a total of 3 numba device arrays (I ignore the index for now).

Somewhere on the send size (frame_split_size perhaps?) the three arrays are split into 9. Which means that the condition for the early return at

if all(len(f) == l for f, l in zip(frames, lengths)):
return frames
is not met.

(Pdb) len(frames)
72
(Pdb) len(lengths)
3
(Pdb) len(gpu_frames)
9
(Pdb) sum(x.nbytes for x in gpu_frames) == sum(lengths)
True

At the cost of another memory allocation (and a numba dependency), we can do something similar to the host-memory case. For each of the 3 "expected" frames, I allocate an empty numba DeviceNDArray with the expected length. I then iterate over the "split" frames, filling in the empty DeviceNDArray. When I do that, returning the 3 new DeviceArrays from merge_frames, the join complete successfully (kinda, the float dtypes are integers. Probably a bug in deserialization).

@TomAugspurger
Copy link
Member Author

TomAugspurger commented Mar 15, 2019

After writing this up, I had the bright idea of confirming that maybe_split_size is the culprit by commenting it out. And indeed that seems to remove the need for my hacky "gpu merge frames."


And, finally getting to the point where we're starting to see some payoff:

# tcp
$ python  bench_cudf_join.py tcp://10.33.225.160:13337
Persist : 2.305909065529704
Schedule: 0.015540305525064468
Compute : 30.916105058044195
Total   : 33.237554429098964
<class 'cudf.dataframe.dataframe.DataFrame'>
                     x    id                    y
0   0.9394083735568608   970   0.9702642813249537
1   0.7923831257671888  1348   0.8483160493672297
2  0.49662136329473205  1680   0.2193417408099766
3   0.6129094562139971   871   0.9695896748672598
4   0.1669684452065503  1879   0.5988319607085707
5  0.46667134167832713  3372   0.8666944707556485
6   0.6380139895729927   808   0.9165479452875207
7    0.687046533493771  2927   0.2885059556684315
8   0.7397596886497526  3041   0.8309951012597302
9   0.3836856679061439  4100  0.24057533536253295
[199996235 more rows]

# ucx
$ python  bench_cudf_join.py ucx://10.33.225.160:13337
left  : <dask_cudf.DataFrame | 1 tasks | 1 npartitions>
right : <dask_cudf.DataFrame | 1 tasks | 1 npartitions>
Persist : 0.8973543355241418
Schedule: 0.022366967983543873
Compute : 16.4527532113716
Total   : 17.372474514879286
<class 'cudf.dataframe.dataframe.DataFrame'>
                      x    id                     y
0    0.8885290155613642   247   0.37342561353977854
1    0.5687751493374146  4297   0.48730493687795373
2    0.5156553384003363   503    0.2275636769697652
3  0.028027519964668013   956     0.683257107064665
4    0.4630480881482437  2911   0.10907716396611178
5   0.42176101990271087  2996    0.9287751488462233
6   0.33437379243815235  2876    0.6852693979572206
7    0.4443354982173291  3807  0.046435227683446434
8    0.6094465927697827   125   0.14380404397157476
9    0.8957993173272103  3258   0.09619118829477646
[200008846 more rows]

I wouldn't put too much stock in those numbers yet, but it's at least encouraging. In particular this is mixing

  1. ucx for comms
  2. custom serializers for cudf to avoid (extra) device <-> host copies: https://github.com/TomAugspurger/distributed/blob/ucx/distributed/protocol/cudf.py

but it's something :)

@mrocklin
Copy link
Member

My guess is that we need a concat function that dispatchs on the type of the frame objects. Dask will include implementations for bytes, bytearray, and memoryview, and maybe we include a lazily registered function that does something like what you describe. Thoughts?

@TomAugspurger
Copy link
Member Author

Yeah, I think that seems to be necessary. We would dispatch upon header['type'], right?

It's a little unfortunate. bytes/memoryview is sufficient for all host-memory things. It'd be nice if there was a device-equivalent, so we would just need to include a kind of 'is_gpu' key in the header and dispatch to host or GPU based on that.

@mrocklin
Copy link
Member

We would dispatch upon header['type'], right

Or, at this point, on the type of the frames (so that this wouldn't be as tied to Dask's protocol).

It'd be nice if there was a device-equivalent

We are probably the most qualified group to start defining what that looks like. I encourage you to consider what requirements would look like.

@TomAugspurger
Copy link
Member Author

TomAugspurger commented Mar 15, 2019 via email

@mrocklin
Copy link
Member

This is the first time I've thought "I could see this being a PEP someday".

I suspect that we would start with something that was lighter weight, like a protocol (like __cuda_array_interface__) or a small library that ucx-py and possibly others would depend on.

@jakirkham
Copy link
Member

jakirkham commented Apr 27, 2020

Am curious what other things we wanted to support here @TomAugspurger 🙂 Are there things that we still haven't implemented today?

Edit: It may be that PR ( #3732 ) solves this or at least makes it much easier to solve whatever is left here IIUC what is being asked.

@TomAugspurger
Copy link
Member Author

Does the benchmark in https://github.com/TomAugspurger/dask-perf/blob/master/bench_cudf_join.py run (I don't have easy access to a GPU anymore, otherwise I would check)? If so, I'm guessing things are sufficiently fixed :)

@jakirkham
Copy link
Member

Hmm...I'm not actually sure how one should run that these days. It looks like we need to setup our own scheduler. Also distributed.protocol.cudf no longer exists. At least a naive attempt to get this to run locally doesn't seem to work. In any event, we are able to run similar routines today. So my guess is this can work if the code were updated.

@jakirkham
Copy link
Member

To the spirit of the issue, we do have cuda_dumps and cuda_loads. Thus far merge_frames doesn't behave how we would expect ( #3580 ) so we mostly avoid it. Though PR ( #3732 ) has merge and split frame style functions. So maybe that solves that piece of this issue?

@TomAugspurger
Copy link
Member Author

I think we'll close this and reopen if we come across it with the current implementations.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants