Consolidate messages in UCX #3732

Open

jakirkham wants to merge 39 commits into main

Changes from 34 commits

Commits (39)
070a19f
Define `as_cuda_array`
jakirkham Apr 21, 2020
bee6f0b
Send/recv host and device frames in a message each
jakirkham Apr 21, 2020
a9b3161
Compute host and device total frames size
jakirkham Apr 23, 2020
5ed7332
Fast path cases with 0 or 1 frames
jakirkham Apr 23, 2020
0473527
Add concat helper functions
jakirkham Apr 23, 2020
610e864
Add split helper functions
jakirkham Apr 23, 2020
87c85cf
Coerce other types to NumPy/CuPy arrays
jakirkham Apr 28, 2020
0dc0bb0
Only return `DeviceBuffer`s/`memoryview`s
jakirkham Apr 28, 2020
3d325f8
Make sure global split functions are overridden
jakirkham Apr 28, 2020
107a2db
Drop leftover line
jakirkham Apr 28, 2020
dbd57cf
Finish accumulation
jakirkham Apr 28, 2020
6fba794
Fix variable name
jakirkham Apr 28, 2020
c04bb39
Fix other variable names
jakirkham Apr 28, 2020
820fbc4
Ensure `uint8` is used in concat/split
jakirkham Apr 28, 2020
18d4331
Use `nbytes` with buffer objects
jakirkham Apr 28, 2020
19dfbf6
Move sync before send/recv of device buffers
jakirkham Apr 28, 2020
1a4a324
Use RMM allocator with CuPy
jakirkham Apr 28, 2020
5bf32e0
Fix arg to `device_concat` in fallback
jakirkham Apr 28, 2020
fb6ba72
Add `device_split` fallback
jakirkham Apr 28, 2020
0dcbd5c
Assign `cupy.split` before copying each array
jakirkham Apr 28, 2020
5c3ad3a
Skip last size when splitting
jakirkham Apr 28, 2020
5663983
Cast Numba arrays to `uint8`
jakirkham Apr 28, 2020
791fb26
Skip test that segfaults now
jakirkham Apr 28, 2020
6eac4d4
Use `as_cuda_array` to take view
jakirkham Apr 28, 2020
99a73a0
Check `len` to `concat` frames
jakirkham Apr 30, 2020
c4c6801
Allocate frames only when needed
jakirkham Apr 30, 2020
62f7f12
Restore test that was previously segfaulting
jakirkham Apr 30, 2020
877dab4
Run black
jakirkham Apr 30, 2020
81f718b
Compute total frame sizes during allocation
jakirkham Apr 30, 2020
ee528d5
Update comments
jakirkham Apr 30, 2020
af94abb
Collect individual frame metadata then collect it
jakirkham Apr 30, 2020
94aee85
Group `nframes` with other data collection
jakirkham Apr 30, 2020
a9900a0
Move comment closer to `synchronize_stream`
jakirkham Apr 30, 2020
f37951a
Rewrite `device_split` to use `cupy.copyto`
jakirkham Apr 30, 2020
a236401
Drop synchronize call
jakirkham Apr 30, 2020
b26b58d
Allocate `device_array`s directly in `split`
jakirkham Apr 30, 2020
a05293b
Use `device_array` to allocate memory in concat
jakirkham Apr 30, 2020
0d046d7
Drop unneeded slice
jakirkham Apr 30, 2020
e4a6d1e
Run black
jakirkham Apr 30, 2020
219 changes: 184 additions & 35 deletions distributed/comm/ucx.py
@@ -5,6 +5,7 @@

.. _UCX: https://github.com/openucx/ucx
"""
import itertools
import logging
import struct
import weakref
@@ -34,7 +35,11 @@
# variables to be set before being imported.
ucp = None
host_array = None
host_concat = None
host_split = None
device_array = None
device_concat = None
device_split = None


def synchronize_stream(stream=0):
@@ -47,7 +52,7 @@ def synchronize_stream(stream=0):


def init_once():
global ucp, host_array, device_array
global ucp, host_array, host_concat, host_split, device_array, device_concat, device_split
if ucp is not None:
return

@@ -61,12 +66,45 @@ def init_once():
ucp.init(options=ucx_config, env_takes_precedence=True)

# Find the function, `host_array()`, to use when allocating new host arrays
# Also find `host_concat()` and `host_split()` to merge/split frames
try:
import numpy

host_array = lambda n: numpy.empty((n,), dtype="u1")
host_array = lambda n: numpy.empty((n,), dtype="u1").data
host_concat = lambda arys: numpy.concatenate(
[numpy.asarray(memoryview(e)).view("u1") for e in arys], axis=None
).data
host_split = lambda ary, indices: [
e.copy().data
for e in numpy.split(numpy.asarray(memoryview(ary)).view("u1"), indices)
]
except ImportError:
host_array = lambda n: bytearray(n)
host_array = lambda n: memoryview(bytearray(n))

def host_concat(arys):
arys = [memoryview(a) for a in arys]
sizes = [nbytes(a) for a in arys]
r = host_array(sum(sizes))
r_view = memoryview(r)
for each_ary, each_size in zip(arys, sizes):
if each_size:
r_view[:each_size] = each_ary[:]
r_view = r_view[each_size:]
return r

def host_split(a, indices):
arys = []
a_view = memoryview(a)
indices = list(indices)
for each_ij in zip([0] + indices, indices + [nbytes(a)]):
each_size = each_ij[1] - each_ij[0]
each_slice = slice(*each_ij)
each_ary = host_array(each_size)
if each_size:
each_ary_view = memoryview(each_ary)
each_ary_view[:] = a_view[each_slice]
arys.append(each_ary)
return arys

# Find the function, `cuda_array()`, to use when allocating new CUDA arrays
try:
@@ -100,6 +138,72 @@ def device_array(n):
"In order to send/recv CUDA arrays, Numba or RMM is required"
)

# Find the functions `device_concat` and `device_split`
try:
import cupy

def device_concat(arys):
with cupy.cuda.using_allocator(rmm.rmm_cupy_allocator):
arys = [cupy.asarray(e).view("u1") for e in arys]
result = cupy.concatenate(arys, axis=None)
result_buffer = result.data.mem._owner
return result_buffer

def device_split(ary, indices):
with cupy.cuda.using_allocator(rmm.rmm_cupy_allocator):
ary = cupy.asarray(ary).view("u1")
ary_split = cupy.split(ary, indices)
results = []
result_buffers = []
for e in ary_split:
e2 = cupy.empty_like(e)
cupy.copyto(e2, e)
Member Author:

Originally this was calling e.copy(); however, that calls cudaMemcpy, which is synchronous under the hood. So I have switched to allocating arrays and calling copyto, which uses cudaMemcpyAsync. This should cut down on the overhead of copying data into smaller buffers during unpacking.
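
A minimal sketch of the before/after pattern being described, assuming CuPy is installed and a GPU is present (the src/dst names are illustrative and not part of the PR):

import cupy

src = cupy.arange(1 << 20, dtype="u1")

# Blocking route: per the comment above, `ndarray.copy()` goes through
# cudaMemcpy, so each call synchronizes the host.
dst_blocking = src.copy()

# Route used in this PR: allocate the destination first, then `cupy.copyto`,
# which issues cudaMemcpyAsync on the current stream.
dst_async = cupy.empty_like(src)
cupy.copyto(dst_async, src)

# A single synchronize at the end covers all of the queued copies.
cupy.cuda.stream.get_current_stream().synchronize()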

results.append(e2)
result_buffers.append(e2.data.mem._owner)
cupy.cuda.stream.get_current_stream().synchronize()
Member Author:

Note that we synchronize once to ensure all of the copies have completed. We do this to make sure the data is valid before the DeviceBuffer it came from is freed.

Member:

Once the DeviceBuffer goes out of scope, I believe the actual freeing of memory is enqueued onto the stream, so you should be fine due to stream ordering. @harrism @jrhemstad, is that correct from the perspective of RMM?

If this is just a normal CUDA memory allocation that will call cudaFree, then that also gets enqueued onto the stream, so I believe you should be safe without this synchronize in general.

Member Author:

Cool, I went ahead and dropped it. If we need it back, it is easy to revert.

Contributor:

You need to make sure that the buffer's memory was never used on a stream other than the one it will be deleted on, without synchronizing those streams before deleting.

[Answer to Keith's question: it's not strictly correct to say the freeing of memory is enqueued onto the stream, just that an allocation on the same stream will be allowed to use the freed block in stream order (i.e. safely). An allocation on a different stream will only use that block after the streams are synchronized.]
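
As an illustration of that rule, a minimal sketch assuming CuPy (the stream and array names are illustrative, not from the PR): work enqueued on a non-default stream should be synchronized before the last reference to the buffer it touches is dropped.

import cupy

side_stream = cupy.cuda.Stream(non_blocking=True)

src = cupy.arange(1 << 20, dtype="u1")
dst = cupy.empty_like(src)

with side_stream:
    # The copy is enqueued on `side_stream`, not on the default stream.
    cupy.copyto(dst, src)

# `src` was read on `side_stream`; wait for that work to finish before the
# last reference to `src` goes away, so freeing its memory cannot race with
# the still-pending copy.
side_stream.synchronize()
del src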

return result_buffers

except ImportError:
try:
import numba.cuda

def device_concat(arys):
arys = [numba.cuda.as_cuda_array(a).view("u1") for a in arys]
sizes = [nbytes(a) for a in arys]
r = device_array(sum(sizes))
r_view = numba.cuda.as_cuda_array(r)
for each_ary, each_size in zip(arys, sizes):
if each_size:
r_view[:each_size] = each_ary[:]
r_view = r_view[each_size:]
return r

def device_split(a, indices):
arys = []
a_view = numba.cuda.as_cuda_array(a).view("u1")
indices = list(indices)
for each_ij in zip([0] + indices, indices + [a.size]):
each_size = each_ij[1] - each_ij[0]
each_slice = slice(*each_ij)
each_ary = device_array(each_size)
if each_size:
each_ary_view = numba.cuda.as_cuda_array(each_ary)
each_ary_view[:] = a_view[each_slice]
arys.append(each_ary)
return arys

except ImportError:

def device_concat(arys):
raise RuntimeError(
"In order to send/recv CUDA arrays, CuPy or Numba is required"
)

def device_split(a, indices):
raise RuntimeError(
"In order to send/recv CUDA arrays, CuPy or Numba is required"
)

pool_size_str = dask.config.get("rmm.pool-size")
if pool_size_str is not None:
pool_size = parse_bytes(pool_size_str)
@@ -178,16 +282,22 @@ async def write(
frames = await to_frames(
msg, serializers=serializers, on_error=on_error
)

nframes = len(frames)
cuda_frames = tuple(
hasattr(f, "__cuda_array_interface__") for f in frames
)
sizes = tuple(nbytes(f) for f in frames)
send_frames = [
each_frame
for each_frame, each_size in zip(frames, sizes)
if each_size
]
cuda_frames = []
sizes = []
device_frames = []
host_frames = []
for each_frame in frames:
is_cuda = hasattr(each_frame, "__cuda_array_interface__")
each_size = nbytes(each_frame)
cuda_frames.append(is_cuda)
sizes.append(each_size)
if each_size:
if is_cuda:
device_frames.append(each_frame)
else:
host_frames.append(each_frame)

# Send meta data

Expand All @@ -201,16 +311,24 @@ async def write(

# Send frames

# It is necessary to first synchronize the default stream before start sending
# We synchronize the default stream because UCX is not stream-ordered and
# syncing the default stream will wait for other non-blocking CUDA streams.
# Note this is only sufficient if the memory being sent is not currently in use on
# non-blocking CUDA streams.
if any(cuda_frames):
if host_frames:
if len(host_frames) == 1:
host_frames = host_frames[0]
else:
host_frames = host_concat(host_frames)
await self.ep.send(host_frames)
if device_frames:
if len(device_frames) == 1:
device_frames = device_frames[0]
else:
device_frames = device_concat(device_frames)
# It is necessary to first synchronize the default stream before start sending
# We synchronize the default stream because UCX is not stream-ordered and
# syncing the default stream will wait for other non-blocking CUDA streams.
# Note this is only sufficient if the memory being sent is not currently in use on
# non-blocking CUDA streams.
synchronize_stream(0)

for each_frame in send_frames:
await self.ep.send(each_frame)
await self.ep.send(device_frames)
return sum(sizes)
except (ucp.exceptions.UCXBaseException):
self.abort()
@@ -245,21 +363,52 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")):
raise CommClosedError("While reading, the connection was closed")
else:
# Recv frames
frames = [
device_array(each_size) if is_cuda else host_array(each_size)
for is_cuda, each_size in zip(cuda_frames, sizes)
]
recv_frames = [
each_frame for each_frame in frames if len(each_frame) > 0
]

# It is necessary to first populate `frames` with CUDA arrays and synchronize
# the default stream before starting receiving to ensure buffers have been allocated
if any(cuda_frames):
synchronize_stream(0)
host_frame_sizes = []
device_frame_sizes = []
for is_cuda, each_size in zip(cuda_frames, sizes):
if is_cuda:
device_frame_sizes.append(each_size)
else:
host_frame_sizes.append(each_size)

if host_frame_sizes:
host_frames = host_array(sum(host_frame_sizes))
if host_frames.nbytes:
await self.ep.recv(host_frames)
if device_frame_sizes:
device_frames = device_array(sum(device_frame_sizes))
if device_frames.nbytes:
# It is necessary to first populate `frames` with CUDA arrays and synchronize
# the default stream before starting receiving to ensure buffers have been allocated
synchronize_stream(0)
await self.ep.recv(device_frames)

if len(host_frame_sizes) == 0:
host_frames = []
elif len(host_frame_sizes) == 1:
host_frames = [host_frames]
else:
host_frames = host_split(
host_frames, list(itertools.accumulate(host_frame_sizes[:-1]))
)

if len(device_frame_sizes) == 0:
device_frames = []
elif len(device_frame_sizes) == 1:
device_frames = [device_frames]
else:
device_frames = device_split(
device_frames,
list(itertools.accumulate(device_frame_sizes[:-1])),
)

frames = []
for is_cuda in cuda_frames:
if is_cuda:
frames.append(device_frames.pop(0))
else:
frames.append(host_frames.pop(0))

for each_frame in recv_frames:
await self.ep.recv(each_frame)
msg = await from_frames(
frames, deserialize=self.deserialize, deserializers=deserializers
)