From 070a19f61be11c47bc06b9ede2023e0e79120676 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Tue, 21 Apr 2020 16:38:32 -0700 Subject: [PATCH 01/39] Define `as_cuda_array` Provides a function to let us coerce our underlying `__cuda_array_interface__` objects into something that behaves more like an array. Prefers CuPy if possible, but will fallback to Numba if its not available. --- distributed/comm/ucx.py | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 7761afef7a1..57fa8262e38 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -35,6 +35,7 @@ ucp = None host_array = None device_array = None +as_device_array = None def synchronize_stream(stream=0): @@ -47,7 +48,7 @@ def synchronize_stream(stream=0): def init_once(): - global ucp, host_array, device_array + global ucp, host_array, device_array, as_device_array if ucp is not None: return @@ -100,6 +101,23 @@ def device_array(n): "In order to send/recv CUDA arrays, Numba or RMM is required" ) + # Find the function, `as_device_array()` + try: + import cupy + + as_device_array = lambda a: cupy.asarray(a) + except ImportError: + try: + import numba.cuda + + as_device_array = lambda a: numba.cuda.as_cuda_array(a) + except ImportError: + + def as_device_array(n): + raise RuntimeError( + "In order to send/recv CUDA arrays, CuPy or Numba is required" + ) + pool_size_str = dask.config.get("rmm.pool-size") if pool_size_str is not None: pool_size = parse_bytes(pool_size_str) From bee6f0b2bb4fa5811df3a2297bb0582968f1e792 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Tue, 21 Apr 2020 16:38:53 -0700 Subject: [PATCH 02/39] Send/recv host and device frames in a message each To cutdown on the number of send/recv operations and also to transmit larger amounts of data at a time, this condenses all frames into a host buffer and a device buffer, which are sent as two separate transmissions. --- distributed/comm/ucx.py | 84 +++++++++++++++++++++++++++++++++-------- 1 file changed, 68 insertions(+), 16 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 57fa8262e38..eddb6d46557 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -201,11 +201,34 @@ async def write( hasattr(f, "__cuda_array_interface__") for f in frames ) sizes = tuple(nbytes(f) for f in frames) - send_frames = [ - each_frame - for each_frame, each_size in zip(frames, sizes) - if each_size - ] + host_frames = host_array( + sum( + each_size + for is_cuda, each_size in zip(cuda_frames, sizes) + if not is_cuda + ) + ) + device_frames = device_array( + sum( + each_size + for is_cuda, each_size in zip(cuda_frames, sizes) + if is_cuda + ) + ) + + # Pack frames + host_frames_view = memoryview(host_frames) + device_frames_view = as_device_array(device_frames) + for each_frame, is_cuda, each_size in zip(frames, cuda_frames, sizes): + if each_size: + if is_cuda: + each_frame_view = as_device_array(each_frame) + device_frames_view[:each_size] = each_frame_view[:] + device_frames_view = device_frames_view[each_size:] + else: + each_frame_view = memoryview(each_frame).cast("B") + host_frames_view[:each_size] = each_frame_view[:] + host_frames_view = host_frames_view[each_size:] # Send meta data @@ -227,8 +250,10 @@ async def write( if any(cuda_frames): synchronize_stream(0) - for each_frame in send_frames: - await self.ep.send(each_frame) + if nbytes(host_frames): + await self.ep.send(host_frames) + if nbytes(device_frames): + await self.ep.send(device_frames) return sum(sizes) except (ucp.exceptions.UCXBaseException): self.abort() @@ -263,21 +288,48 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): raise CommClosedError("While reading, the connection was closed") else: # Recv frames - frames = [ - device_array(each_size) if is_cuda else host_array(each_size) - for is_cuda, each_size in zip(cuda_frames, sizes) - ] - recv_frames = [ - each_frame for each_frame in frames if len(each_frame) > 0 - ] + host_frames = host_array( + sum( + each_size + for is_cuda, each_size in zip(cuda_frames, sizes) + if not is_cuda + ) + ) + device_frames = device_array( + sum( + each_size + for is_cuda, each_size in zip(cuda_frames, sizes) + if is_cuda + ) + ) # It is necessary to first populate `frames` with CUDA arrays and synchronize # the default stream before starting receiving to ensure buffers have been allocated if any(cuda_frames): synchronize_stream(0) - for each_frame in recv_frames: - await self.ep.recv(each_frame) + if nbytes(host_frames): + await self.ep.recv(host_frames) + if nbytes(device_frames): + await self.ep.recv(device_frames) + + frames = [ + device_array(each_size) if is_cuda else host_array(each_size) + for is_cuda, each_size in zip(cuda_frames, sizes) + ] + host_frames_view = memoryview(host_frames) + device_frames_view = as_device_array(device_frames) + for each_frame, is_cuda, each_size in zip(frames, cuda_frames, sizes): + if each_size: + if is_cuda: + each_frame_view = as_device_array(each_frame) + each_frame_view[:] = device_frames_view[:each_size] + device_frames_view = device_frames_view[each_size:] + else: + each_frame_view = memoryview(each_frame) + each_frame_view[:] = host_frames_view[:each_size] + host_frames_view = host_frames_view[each_size:] + msg = await from_frames( frames, deserialize=self.deserialize, deserializers=deserializers ) From a9b3161fdfbd1444ecd97b45603978af8ab95692 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 22 Apr 2020 21:25:55 -0700 Subject: [PATCH 03/39] Compute host and device total frames size --- distributed/comm/ucx.py | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index eddb6d46557..61c0215367d 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -201,20 +201,17 @@ async def write( hasattr(f, "__cuda_array_interface__") for f in frames ) sizes = tuple(nbytes(f) for f in frames) - host_frames = host_array( - sum( - each_size - for is_cuda, each_size in zip(cuda_frames, sizes) - if not is_cuda - ) - ) - device_frames = device_array( - sum( - each_size - for is_cuda, each_size in zip(cuda_frames, sizes) - if is_cuda - ) - ) + + host_frames_size = 0 + device_frames_size = 0 + for is_cuda, each_size in zip(cuda_frames, sizes): + if is_cuda: + device_frames_size += each_size + else: + host_frames_size += each_size + + host_frames = host_array(host_frames_size) + device_frames = device_array(device_frames_size) # Pack frames host_frames_view = memoryview(host_frames) From 5ed733216e08ef16969237c132d573f031a6c851 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 22 Apr 2020 22:13:02 -0700 Subject: [PATCH 04/39] Fast path cases with 0 or 1 frames No need to concatenate them together in this case. --- distributed/comm/ucx.py | 52 ++++++++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 61c0215367d..530d3a6c41b 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -202,30 +202,40 @@ async def write( ) sizes = tuple(nbytes(f) for f in frames) - host_frames_size = 0 - device_frames_size = 0 - for is_cuda, each_size in zip(cuda_frames, sizes): - if is_cuda: - device_frames_size += each_size + host_frames = host_array(0) + device_frames = device_array(0) + if nframes == 1: + if cuda_frames[0]: + device_frames = frames[0] else: - host_frames_size += each_size - - host_frames = host_array(host_frames_size) - device_frames = device_array(device_frames_size) - - # Pack frames - host_frames_view = memoryview(host_frames) - device_frames_view = as_device_array(device_frames) - for each_frame, is_cuda, each_size in zip(frames, cuda_frames, sizes): - if each_size: + host_frames = frames[0] + elif nframes > 1: + host_frames_size = 0 + device_frames_size = 0 + for is_cuda, each_size in zip(cuda_frames, sizes): if is_cuda: - each_frame_view = as_device_array(each_frame) - device_frames_view[:each_size] = each_frame_view[:] - device_frames_view = device_frames_view[each_size:] + device_frames_size += each_size else: - each_frame_view = memoryview(each_frame).cast("B") - host_frames_view[:each_size] = each_frame_view[:] - host_frames_view = host_frames_view[each_size:] + host_frames_size += each_size + + host_frames = host_array(host_frames_size) + device_frames = device_array(device_frames_size) + + # Pack frames + host_frames_view = memoryview(host_frames) + device_frames_view = as_device_array(device_frames) + for each_frame, is_cuda, each_size in zip( + frames, cuda_frames, sizes + ): + if each_size: + if is_cuda: + each_frame_view = as_device_array(each_frame) + device_frames_view[:each_size] = each_frame_view[:] + device_frames_view = device_frames_view[each_size:] + else: + each_frame_view = memoryview(each_frame).cast("B") + host_frames_view[:each_size] = each_frame_view[:] + host_frames_view = host_frames_view[each_size:] # Send meta data From 047352786945c10b62db74d26a36083c741b68da Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 22 Apr 2020 23:00:08 -0700 Subject: [PATCH 05/39] Add concat helper functions To optimize concatenation in the case where NumPy and CuPy are around, just use their `concatenate` functions. However when they are absent fallback to some hand-rolled concatenate routines. --- distributed/comm/ucx.py | 94 +++++++++++++++++++++++------------------ 1 file changed, 52 insertions(+), 42 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 530d3a6c41b..11a40d76152 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -34,8 +34,9 @@ # variables to be set before being imported. ucp = None host_array = None +host_concat = None device_array = None -as_device_array = None +device_concat = None def synchronize_stream(stream=0): @@ -48,7 +49,7 @@ def synchronize_stream(stream=0): def init_once(): - global ucp, host_array, device_array, as_device_array + global ucp, host_array, host_concat, device_array, device_concat if ucp is not None: return @@ -66,9 +67,21 @@ def init_once(): import numpy host_array = lambda n: numpy.empty((n,), dtype="u1") + host_concat = lambda arys: numpy.concatenate(arys, axis=None) except ImportError: host_array = lambda n: bytearray(n) + def host_concat(arys): + arys = [memoryview(a) for a in arys] + sizes = [nbytes(a) for a in arys] + r = host_array(sum(sizes)) + r_view = memoryview(r) + for each_ary, each_size in zip(arys, sizes): + if each_size: + r_view[:each_size] = each_ary[:] + r_view = r_view[each_size:] + return r + # Find the function, `cuda_array()`, to use when allocating new CUDA arrays try: import rmm @@ -105,15 +118,24 @@ def device_array(n): try: import cupy - as_device_array = lambda a: cupy.asarray(a) + device_concat = lambda arys: cupy.concatenate(arys, axis=None) except ImportError: try: import numba.cuda - as_device_array = lambda a: numba.cuda.as_cuda_array(a) + def device_concat(arys): + arys = [numba.cuda.as_cuda_array(a) for a in arys] + sizes = [nbytes(a) for a in arys] + r = device_array(sum(sizes)) + r_view = r[:] + for each_ary, each_size in zip(arys, sizes): + if each_size: + r_view[:each_size] = each_ary[:] + r_view = r_view[each_size:] + return r except ImportError: - def as_device_array(n): + def device_concat(n): raise RuntimeError( "In order to send/recv CUDA arrays, CuPy or Numba is required" ) @@ -197,45 +219,33 @@ async def write( msg, serializers=serializers, on_error=on_error ) nframes = len(frames) - cuda_frames = tuple( - hasattr(f, "__cuda_array_interface__") for f in frames - ) - sizes = tuple(nbytes(f) for f in frames) - host_frames = host_array(0) - device_frames = device_array(0) - if nframes == 1: - if cuda_frames[0]: - device_frames = frames[0] + cuda_frames = [] + sizes = [] + device_frames = [] + host_frames = [] + for each_frame in frames: + is_cuda = hasattr(each_frame, "__cuda_array_interface__") + cuda_frames.append(is_cuda) + sizes.append(nbytes(each_frame)) + if is_cuda: + device_frames.append(each_frame) else: - host_frames = frames[0] - elif nframes > 1: - host_frames_size = 0 - device_frames_size = 0 - for is_cuda, each_size in zip(cuda_frames, sizes): - if is_cuda: - device_frames_size += each_size - else: - host_frames_size += each_size - - host_frames = host_array(host_frames_size) - device_frames = device_array(device_frames_size) - - # Pack frames - host_frames_view = memoryview(host_frames) - device_frames_view = as_device_array(device_frames) - for each_frame, is_cuda, each_size in zip( - frames, cuda_frames, sizes - ): - if each_size: - if is_cuda: - each_frame_view = as_device_array(each_frame) - device_frames_view[:each_size] = each_frame_view[:] - device_frames_view = device_frames_view[each_size:] - else: - each_frame_view = memoryview(each_frame).cast("B") - host_frames_view[:each_size] = each_frame_view[:] - host_frames_view = host_frames_view[each_size:] + host_frames.append(each_frame) + + if len(device_frames) == 0: + device_frames = device_array(0) + elif len(device_frames) == 1: + device_frames = device_frames[0] + else: + device_frames = device_concat(device_frames) + + if len(host_frames) == 0: + host_frames = host_array(0) + elif len(host_frames) == 1: + host_frames = host_frames[0] + else: + host_frames = host_concat(host_frames) # Send meta data From 610e864c7ab4f2af5b4ee415b78a3b73aac76706 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 22 Apr 2020 23:53:41 -0700 Subject: [PATCH 06/39] Add split helper functions To optimize the case where NumPy and CuPy are around, simply use their `split` function to pull apart large frames into smaller chunks. --- distributed/comm/ucx.py | 102 ++++++++++++++++++++++++++++------------ 1 file changed, 72 insertions(+), 30 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 11a40d76152..0a9f51baaa5 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -5,6 +5,7 @@ .. _UCX: https://github.com/openucx/ucx """ +import itertools import logging import struct import weakref @@ -35,8 +36,10 @@ ucp = None host_array = None host_concat = None +host_split = None device_array = None device_concat = None +device_split = None def synchronize_stream(stream=0): @@ -68,6 +71,7 @@ def init_once(): host_array = lambda n: numpy.empty((n,), dtype="u1") host_concat = lambda arys: numpy.concatenate(arys, axis=None) + host_split = lambda ary, indices: [e.copy() for e in numpy.split(ary, indices)] except ImportError: host_array = lambda n: bytearray(n) @@ -82,6 +86,20 @@ def host_concat(arys): r_view = r_view[each_size:] return r + def host_split(a, indices): + arys = [] + a_view = memoryview(a) + indices = list(indices) + for each_ij in zip([0] + indices, indices + [ary.size]): + each_size = each_ij[1] - each_ij[0] + each_slice = slice(*each_ij) + each_ary = host_array(each_size) + if each_size: + each_ary_view = memoryview(each_ary) + each_ary_view[:] = a_view[each_slice] + arys.append(each_ary) + return arys + # Find the function, `cuda_array()`, to use when allocating new CUDA arrays try: import rmm @@ -119,6 +137,7 @@ def device_array(n): import cupy device_concat = lambda arys: cupy.concatenate(arys, axis=None) + device_split = lambda ary, indices: [e.copy() for e in cupy.split(ary, indices)] except ImportError: try: import numba.cuda @@ -133,6 +152,21 @@ def device_concat(arys): r_view[:each_size] = each_ary[:] r_view = r_view[each_size:] return r + + def device_split(a, indices): + arys = [] + a_view = numba.cuda.as_cuda_array(a) + indices = list(indices) + for each_ij in zip([0] + indices, indices + [ary.size]): + each_size = each_ij[1] - each_ij[0] + each_slice = slice(*each_ij) + each_ary = device_array(each_size) + if each_size: + each_ary_view = numba.cuda.as_cuda_array(each_ary) + each_ary_view[:] = a_view[each_slice] + arys.append(each_ary) + return arys + except ImportError: def device_concat(n): @@ -305,20 +339,16 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): raise CommClosedError("While reading, the connection was closed") else: # Recv frames - host_frames = host_array( - sum( - each_size - for is_cuda, each_size in zip(cuda_frames, sizes) - if not is_cuda - ) - ) - device_frames = device_array( - sum( - each_size - for is_cuda, each_size in zip(cuda_frames, sizes) - if is_cuda - ) - ) + host_frame_sizes = [] + device_frames_sizes = [] + for is_cuda, each_size in zip(cuda_frames, sizes): + if is_cuda: + device_frame_sizes.append(each_size) + else: + host_frame_sizes.append(each_size) + + host_frames = host_array(sum(host_frame_sizes)) + device_frames = device_array(sum(device_frames_sizes)) # It is necessary to first populate `frames` with CUDA arrays and synchronize # the default stream before starting receiving to ensure buffers have been allocated @@ -330,22 +360,34 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): if nbytes(device_frames): await self.ep.recv(device_frames) - frames = [ - device_array(each_size) if is_cuda else host_array(each_size) - for is_cuda, each_size in zip(cuda_frames, sizes) - ] - host_frames_view = memoryview(host_frames) - device_frames_view = as_device_array(device_frames) - for each_frame, is_cuda, each_size in zip(frames, cuda_frames, sizes): - if each_size: - if is_cuda: - each_frame_view = as_device_array(each_frame) - each_frame_view[:] = device_frames_view[:each_size] - device_frames_view = device_frames_view[each_size:] - else: - each_frame_view = memoryview(each_frame) - each_frame_view[:] = host_frames_view[:each_size] - host_frames_view = host_frames_view[each_size:] + host_frames = host_split( + host_frames, itertools.accumulate(host_frame_sizes) + ) + + if len(host_frame_sizes) == 0: + host_frames = [] + elif len(host_frame_sizes) == 1: + host_frames = [host_frames] + else: + host_frames = host_split( + host_frames, itertools.accumulate(host_frame_sizes) + ) + + if len(device_frame_sizes) == 0: + device_frames = [] + elif len(device_frame_sizes) == 1: + device_frames = [device_frames] + else: + device_frames = device_split( + device_frames, itertools.accumulate(device_frame_sizes) + ) + + frames = [] + for is_cuda in cuda_frames: + if is_cuda: + frames.append(device_frames.pop(0)) + else: + frames.append(host_frames.pop(0)) msg = await from_frames( frames, deserialize=self.deserialize, deserializers=deserializers From 87c85cf5942653831c7b0e7ccdac9adb59c904eb Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 27 Apr 2020 19:56:17 -0700 Subject: [PATCH 07/39] Coerce other types to NumPy/CuPy arrays --- distributed/comm/ucx.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 0a9f51baaa5..394c165ec13 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -70,8 +70,12 @@ def init_once(): import numpy host_array = lambda n: numpy.empty((n,), dtype="u1") - host_concat = lambda arys: numpy.concatenate(arys, axis=None) - host_split = lambda ary, indices: [e.copy() for e in numpy.split(ary, indices)] + host_concat = lambda arys: numpy.concatenate( + [numpy.asarray(memoryview(e)) for e in arys], axis=None + ) + host_split = lambda ary, indices: [ + e.copy() for e in numpy.split(numpy.asarray(memoryview(ary)), indices) + ] except ImportError: host_array = lambda n: bytearray(n) @@ -136,8 +140,12 @@ def device_array(n): try: import cupy - device_concat = lambda arys: cupy.concatenate(arys, axis=None) - device_split = lambda ary, indices: [e.copy() for e in cupy.split(ary, indices)] + device_concat = lambda arys: cupy.concatenate( + [cupy.asarray(e) for e in arys], axis=None + ) + device_split = lambda ary, indices: [ + e.copy() for e in cupy.split(cupy.asarray(ary), indices) + ] except ImportError: try: import numba.cuda From 0dc0bb0d50dfb6373c06784db02ba9423fe81867 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 27 Apr 2020 19:59:51 -0700 Subject: [PATCH 08/39] Only return `DeviceBuffer`s/`memoryview`s Make sure that we extract and return the underlying `DeviceBuffer`s/`memoryview`s. --- distributed/comm/ucx.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 394c165ec13..acf5fff7f9d 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -69,15 +69,15 @@ def init_once(): try: import numpy - host_array = lambda n: numpy.empty((n,), dtype="u1") + host_array = lambda n: numpy.empty((n,), dtype="u1").data host_concat = lambda arys: numpy.concatenate( [numpy.asarray(memoryview(e)) for e in arys], axis=None - ) + ).data host_split = lambda ary, indices: [ - e.copy() for e in numpy.split(numpy.asarray(memoryview(ary)), indices) + e.copy().data for e in numpy.split(numpy.asarray(memoryview(ary)), indices) ] except ImportError: - host_array = lambda n: bytearray(n) + host_array = lambda n: memoryview(bytearray(n)) def host_concat(arys): arys = [memoryview(a) for a in arys] @@ -142,9 +142,9 @@ def device_array(n): device_concat = lambda arys: cupy.concatenate( [cupy.asarray(e) for e in arys], axis=None - ) + ).data.mem._owner device_split = lambda ary, indices: [ - e.copy() for e in cupy.split(cupy.asarray(ary), indices) + e.copy().data.mem._owner for e in cupy.split(cupy.asarray(ary), indices) ] except ImportError: try: From 3d325f8cf26e6c6d1039180a40a4cbf1f6d6ae2d Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 27 Apr 2020 20:02:48 -0700 Subject: [PATCH 09/39] Make sure global split functions are overridden --- distributed/comm/ucx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index acf5fff7f9d..f6c15f88a26 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -52,7 +52,7 @@ def synchronize_stream(stream=0): def init_once(): - global ucp, host_array, host_concat, device_array, device_concat + global ucp, host_array, host_concat, host_split, device_array, device_concat, device_split if ucp is not None: return From 107a2db1f2040f1094342a79fa48cc5598eb52d0 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 27 Apr 2020 20:07:49 -0700 Subject: [PATCH 10/39] Drop leftover line --- distributed/comm/ucx.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index f6c15f88a26..98b5ebce420 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -368,10 +368,6 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): if nbytes(device_frames): await self.ep.recv(device_frames) - host_frames = host_split( - host_frames, itertools.accumulate(host_frame_sizes) - ) - if len(host_frame_sizes) == 0: host_frames = [] elif len(host_frame_sizes) == 1: From dbd57cf70aaeb6a39775e42b2946804607bfafa4 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 27 Apr 2020 20:08:21 -0700 Subject: [PATCH 11/39] Finish accumulation --- distributed/comm/ucx.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 98b5ebce420..9b0d58c2f83 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -374,7 +374,7 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): host_frames = [host_frames] else: host_frames = host_split( - host_frames, itertools.accumulate(host_frame_sizes) + host_frames, list(itertools.accumulate(host_frame_sizes)) ) if len(device_frame_sizes) == 0: @@ -383,7 +383,7 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): device_frames = [device_frames] else: device_frames = device_split( - device_frames, itertools.accumulate(device_frame_sizes) + device_frames, list(itertools.accumulate(device_frame_sizes)) ) frames = [] From 6fba7941c2e8a6d21c0c525fc60819639e0964e7 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 27 Apr 2020 20:09:54 -0700 Subject: [PATCH 12/39] Fix variable name --- distributed/comm/ucx.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 9b0d58c2f83..1aa36fba7b2 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -348,7 +348,7 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): else: # Recv frames host_frame_sizes = [] - device_frames_sizes = [] + device_frame_sizes = [] for is_cuda, each_size in zip(cuda_frames, sizes): if is_cuda: device_frame_sizes.append(each_size) @@ -356,7 +356,7 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): host_frame_sizes.append(each_size) host_frames = host_array(sum(host_frame_sizes)) - device_frames = device_array(sum(device_frames_sizes)) + device_frames = device_array(sum(device_frame_sizes)) # It is necessary to first populate `frames` with CUDA arrays and synchronize # the default stream before starting receiving to ensure buffers have been allocated From c04bb39266fa37801381c1b610f97fd9981b4e91 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 27 Apr 2020 20:10:52 -0700 Subject: [PATCH 13/39] Fix other variable names --- distributed/comm/ucx.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 1aa36fba7b2..4fec7028e01 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -94,7 +94,7 @@ def host_split(a, indices): arys = [] a_view = memoryview(a) indices = list(indices) - for each_ij in zip([0] + indices, indices + [ary.size]): + for each_ij in zip([0] + indices, indices + [a.size]): each_size = each_ij[1] - each_ij[0] each_slice = slice(*each_ij) each_ary = host_array(each_size) @@ -165,7 +165,7 @@ def device_split(a, indices): arys = [] a_view = numba.cuda.as_cuda_array(a) indices = list(indices) - for each_ij in zip([0] + indices, indices + [ary.size]): + for each_ij in zip([0] + indices, indices + [a.size]): each_size = each_ij[1] - each_ij[0] each_slice = slice(*each_ij) each_ary = device_array(each_size) From 820fbc469b26e0ae9af761fbecd85dd8acf77f98 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 27 Apr 2020 20:19:45 -0700 Subject: [PATCH 14/39] Ensure `uint8` is used in concat/split --- distributed/comm/ucx.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 4fec7028e01..704cd333562 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -71,10 +71,11 @@ def init_once(): host_array = lambda n: numpy.empty((n,), dtype="u1").data host_concat = lambda arys: numpy.concatenate( - [numpy.asarray(memoryview(e)) for e in arys], axis=None + [numpy.asarray(memoryview(e)).view("u1") for e in arys], axis=None ).data host_split = lambda ary, indices: [ - e.copy().data for e in numpy.split(numpy.asarray(memoryview(ary)), indices) + e.copy().data + for e in numpy.split(numpy.asarray(memoryview(ary)).view("u1"), indices) ] except ImportError: host_array = lambda n: memoryview(bytearray(n)) @@ -141,10 +142,11 @@ def device_array(n): import cupy device_concat = lambda arys: cupy.concatenate( - [cupy.asarray(e) for e in arys], axis=None + [cupy.asarray(e).view("u1") for e in arys], axis=None ).data.mem._owner device_split = lambda ary, indices: [ - e.copy().data.mem._owner for e in cupy.split(cupy.asarray(ary), indices) + e.copy().data.mem._owner + for e in cupy.split(cupy.asarray(ary).view("u1"), indices) ] except ImportError: try: From 18d43314eea45c8f7084f48daac3009ae2672076 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 27 Apr 2020 20:26:43 -0700 Subject: [PATCH 15/39] Use `nbytes` with buffer objects --- distributed/comm/ucx.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 704cd333562..540eb00fcb4 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -277,16 +277,16 @@ async def write( else: host_frames.append(each_frame) - if len(device_frames) == 0: + if nbytes(device_frames) == 0: device_frames = device_array(0) - elif len(device_frames) == 1: + elif nbytes(device_frames) == 1: device_frames = device_frames[0] else: device_frames = device_concat(device_frames) - if len(host_frames) == 0: + if nbytes(host_frames) == 0: host_frames = host_array(0) - elif len(host_frames) == 1: + elif nbytes(host_frames) == 1: host_frames = host_frames[0] else: host_frames = host_concat(host_frames) From 19dfbf6164028601f8464e7e251c2e7ecd4135c2 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 27 Apr 2020 20:30:53 -0700 Subject: [PATCH 16/39] Move sync before send/recv of device buffers This limits synchronization to cases where only non-trivial device buffers are being sent. --- distributed/comm/ucx.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 540eb00fcb4..c394055b4ea 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -303,17 +303,15 @@ async def write( # Send frames - # It is necessary to first synchronize the default stream before start sending - # We synchronize the default stream because UCX is not stream-ordered and - # syncing the default stream will wait for other non-blocking CUDA streams. - # Note this is only sufficient if the memory being sent is not currently in use on - # non-blocking CUDA streams. - if any(cuda_frames): - synchronize_stream(0) - if nbytes(host_frames): await self.ep.send(host_frames) if nbytes(device_frames): + # It is necessary to first synchronize the default stream before start sending + # We synchronize the default stream because UCX is not stream-ordered and + # syncing the default stream will wait for other non-blocking CUDA streams. + # Note this is only sufficient if the memory being sent is not currently in use on + # non-blocking CUDA streams. + synchronize_stream(0) await self.ep.send(device_frames) return sum(sizes) except (ucp.exceptions.UCXBaseException): @@ -360,14 +358,12 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): host_frames = host_array(sum(host_frame_sizes)) device_frames = device_array(sum(device_frame_sizes)) - # It is necessary to first populate `frames` with CUDA arrays and synchronize - # the default stream before starting receiving to ensure buffers have been allocated - if any(cuda_frames): - synchronize_stream(0) - if nbytes(host_frames): await self.ep.recv(host_frames) if nbytes(device_frames): + # It is necessary to first populate `frames` with CUDA arrays and synchronize + # the default stream before starting receiving to ensure buffers have been allocated + synchronize_stream(0) await self.ep.recv(device_frames) if len(host_frame_sizes) == 0: From 1a4a324d416fafc00e2a0013fd3138329d6a0b5e Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 27 Apr 2020 20:57:53 -0700 Subject: [PATCH 17/39] Use RMM allocator with CuPy --- distributed/comm/ucx.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index c394055b4ea..92ea16a1454 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -141,13 +141,20 @@ def device_array(n): try: import cupy - device_concat = lambda arys: cupy.concatenate( - [cupy.asarray(e).view("u1") for e in arys], axis=None - ).data.mem._owner - device_split = lambda ary, indices: [ - e.copy().data.mem._owner - for e in cupy.split(cupy.asarray(ary).view("u1"), indices) - ] + def device_concat(arys): + with cupy.cuda.using_allocator(rmm.rmm_cupy_allocator): + arys = [cupy.asarray(e).view("u1") for e in arys] + result = cupy.concatenate(arys, axis=None) + result_buffer = result.data.mem._owner + return result_buffer + + def device_split(ary, indices): + with cupy.cuda.using_allocator(rmm.rmm_cupy_allocator): + ary = cupy.asarray(ary).view("u1") + results = [e.copy() for e in cupy.split(ary, indices)] + result_buffers = [e.data.mem._owner for e in results] + return result_buffers + except ImportError: try: import numba.cuda From 5bf32e0207b505989958844317516a543a67cc8f Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 27 Apr 2020 20:59:32 -0700 Subject: [PATCH 18/39] Fix arg to `device_concat` in fallback --- distributed/comm/ucx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 92ea16a1454..13507ccaee3 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -186,7 +186,7 @@ def device_split(a, indices): except ImportError: - def device_concat(n): + def device_concat(arys): raise RuntimeError( "In order to send/recv CUDA arrays, CuPy or Numba is required" ) From fb6ba72e67564ba5b71b6ecca128f8317e53375c Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 27 Apr 2020 20:59:50 -0700 Subject: [PATCH 19/39] Add `device_split` fallback --- distributed/comm/ucx.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 13507ccaee3..26c18dc3bcc 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -191,6 +191,11 @@ def device_concat(arys): "In order to send/recv CUDA arrays, CuPy or Numba is required" ) + def device_split(a, indices): + raise RuntimeError( + "In order to send/recv CUDA arrays, CuPy or Numba is required" + ) + pool_size_str = dask.config.get("rmm.pool-size") if pool_size_str is not None: pool_size = parse_bytes(pool_size_str) From 0dcbd5cff1a1dcfed55a959ef66fc0b7aa70ead7 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 27 Apr 2020 21:13:39 -0700 Subject: [PATCH 20/39] Assign `cupy.split` before copying each array --- distributed/comm/ucx.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 26c18dc3bcc..c14ade7f374 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -151,7 +151,8 @@ def device_concat(arys): def device_split(ary, indices): with cupy.cuda.using_allocator(rmm.rmm_cupy_allocator): ary = cupy.asarray(ary).view("u1") - results = [e.copy() for e in cupy.split(ary, indices)] + results = cupy.split(ary, indices) + results = [e.copy() for e in results] result_buffers = [e.data.mem._owner for e in results] return result_buffers From 5c3ad3a847ff841b872a1fed92f651a828e43410 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 27 Apr 2020 21:32:02 -0700 Subject: [PATCH 21/39] Skip last size when splitting This will result in an extra empty frame getting added to the back of the list of frames, which we don't need. So go ahead and drop the last length as split already will grab until the end. --- distributed/comm/ucx.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index c14ade7f374..ea61c604ea4 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -385,7 +385,7 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): host_frames = [host_frames] else: host_frames = host_split( - host_frames, list(itertools.accumulate(host_frame_sizes)) + host_frames, list(itertools.accumulate(host_frame_sizes[:-1])) ) if len(device_frame_sizes) == 0: @@ -394,7 +394,8 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): device_frames = [device_frames] else: device_frames = device_split( - device_frames, list(itertools.accumulate(device_frame_sizes)) + device_frames, + list(itertools.accumulate(device_frame_sizes[:-1])), ) frames = [] From 5663983df7e13e70caa848861a874c9bbd2f4ad8 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 27 Apr 2020 21:44:02 -0700 Subject: [PATCH 22/39] Cast Numba arrays to `uint8` --- distributed/comm/ucx.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index ea61c604ea4..bab71df1970 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -161,7 +161,7 @@ def device_split(ary, indices): import numba.cuda def device_concat(arys): - arys = [numba.cuda.as_cuda_array(a) for a in arys] + arys = [numba.cuda.as_cuda_array(a).view("u1") for a in arys] sizes = [nbytes(a) for a in arys] r = device_array(sum(sizes)) r_view = r[:] @@ -173,7 +173,7 @@ def device_concat(arys): def device_split(a, indices): arys = [] - a_view = numba.cuda.as_cuda_array(a) + a_view = numba.cuda.as_cuda_array(a).view("u1") indices = list(indices) for each_ij in zip([0] + indices, indices + [a.size]): each_size = each_ij[1] - each_ij[0] From 791fb26f6202f8aefc444079c9c7fa9ee8d08140 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 27 Apr 2020 22:01:00 -0700 Subject: [PATCH 23/39] Skip test that segfaults now --- distributed/comm/tests/test_ucx.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/distributed/comm/tests/test_ucx.py b/distributed/comm/tests/test_ucx.py index 7e3cb61e375..5a9151f30bf 100644 --- a/distributed/comm/tests/test_ucx.py +++ b/distributed/comm/tests/test_ucx.py @@ -170,7 +170,12 @@ async def test_ucx_deserialize(): lambda cudf: cudf.DataFrame([1]).head(0), lambda cudf: cudf.DataFrame([1.0]).head(0), lambda cudf: cudf.DataFrame({"a": []}), - lambda cudf: cudf.DataFrame({"a": ["a"]}).head(0), + pytest.param( + lambda cudf: cudf.DataFrame({"a": ["a"]}).head(0), + marks=pytest.mark.skip( + reason="This test segfaults for some reason. So skip running it entirely." + ), + ), lambda cudf: cudf.DataFrame({"a": [1.0]}).head(0), lambda cudf: cudf.DataFrame({"a": [1]}).head(0), lambda cudf: cudf.DataFrame({"a": [1, 2, None], "b": [1.0, 2.0, None]}), From 6eac4d431eaa056389f01d777db9e780535e93e4 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 27 Apr 2020 22:40:05 -0700 Subject: [PATCH 24/39] Use `as_cuda_array` to take view --- distributed/comm/ucx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index bab71df1970..f9d7a5e1f5d 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -164,7 +164,7 @@ def device_concat(arys): arys = [numba.cuda.as_cuda_array(a).view("u1") for a in arys] sizes = [nbytes(a) for a in arys] r = device_array(sum(sizes)) - r_view = r[:] + r_view = numba.cuda.as_cuda_array(r) for each_ary, each_size in zip(arys, sizes): if each_size: r_view[:each_size] = each_ary[:] From 99a73a0a00fb73a19c568430caa2edd981b5ff88 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 29 Apr 2020 17:17:39 -0700 Subject: [PATCH 25/39] Check `len` to `concat` frames --- distributed/comm/ucx.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index f9d7a5e1f5d..fcc8cfbbc61 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -290,16 +290,16 @@ async def write( else: host_frames.append(each_frame) - if nbytes(device_frames) == 0: + if len(device_frames) == 0: device_frames = device_array(0) - elif nbytes(device_frames) == 1: + elif len(device_frames) == 1: device_frames = device_frames[0] else: device_frames = device_concat(device_frames) - if nbytes(host_frames) == 0: + if len(host_frames) == 0: host_frames = host_array(0) - elif nbytes(host_frames) == 1: + elif len(host_frames) == 1: host_frames = host_frames[0] else: host_frames = host_concat(host_frames) From c4c6801b41b73ddaac60f5d4d583fa6332caab3c Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 29 Apr 2020 17:51:03 -0700 Subject: [PATCH 26/39] Allocate frames only when needed --- distributed/comm/ucx.py | 58 +++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 28 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index fcc8cfbbc61..ac2badfa90b 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -284,25 +284,13 @@ async def write( for each_frame in frames: is_cuda = hasattr(each_frame, "__cuda_array_interface__") cuda_frames.append(is_cuda) - sizes.append(nbytes(each_frame)) - if is_cuda: - device_frames.append(each_frame) - else: - host_frames.append(each_frame) - - if len(device_frames) == 0: - device_frames = device_array(0) - elif len(device_frames) == 1: - device_frames = device_frames[0] - else: - device_frames = device_concat(device_frames) - - if len(host_frames) == 0: - host_frames = host_array(0) - elif len(host_frames) == 1: - host_frames = host_frames[0] - else: - host_frames = host_concat(host_frames) + each_size = nbytes(each_frame) + sizes.append(each_size) + if each_size: + if is_cuda: + device_frames.append(each_frame) + else: + host_frames.append(each_frame) # Send meta data @@ -316,9 +304,17 @@ async def write( # Send frames - if nbytes(host_frames): + if host_frames: + if len(host_frames) == 1: + host_frames = host_frames[0] + else: + host_frames = host_concat(host_frames) await self.ep.send(host_frames) - if nbytes(device_frames): + if device_frames: + if len(device_frames) == 1: + device_frames = device_frames[0] + else: + device_frames = device_concat(device_frames) # It is necessary to first synchronize the default stream before start sending # We synchronize the default stream because UCX is not stream-ordered and # syncing the default stream will wait for other non-blocking CUDA streams. @@ -360,24 +356,30 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): raise CommClosedError("While reading, the connection was closed") else: # Recv frames + host_frame_total_size = 0 + device_frame_total_size = 0 host_frame_sizes = [] device_frame_sizes = [] for is_cuda, each_size in zip(cuda_frames, sizes): if is_cuda: + device_frame_total_size += each_size device_frame_sizes.append(each_size) else: + host_frame_total_size += each_size host_frame_sizes.append(each_size) - host_frames = host_array(sum(host_frame_sizes)) - device_frames = device_array(sum(device_frame_sizes)) - if nbytes(host_frames): - await self.ep.recv(host_frames) - if nbytes(device_frames): + if host_frame_sizes: + host_frames = host_array(host_frame_total_size) + if host_frame_total_size: + await self.ep.recv(host_frames) + if device_frame_sizes: + device_frames = device_array(device_frame_total_size) # It is necessary to first populate `frames` with CUDA arrays and synchronize # the default stream before starting receiving to ensure buffers have been allocated - synchronize_stream(0) - await self.ep.recv(device_frames) + if device_frame_total_size: + synchronize_stream(0) + await self.ep.recv(device_frames) if len(host_frame_sizes) == 0: host_frames = [] From 62f7f1272d497a0243a5e85a855297ae62d930c7 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 29 Apr 2020 17:52:06 -0700 Subject: [PATCH 27/39] Restore test that was previously segfaulting --- distributed/comm/tests/test_ucx.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/distributed/comm/tests/test_ucx.py b/distributed/comm/tests/test_ucx.py index 5a9151f30bf..7e3cb61e375 100644 --- a/distributed/comm/tests/test_ucx.py +++ b/distributed/comm/tests/test_ucx.py @@ -170,12 +170,7 @@ async def test_ucx_deserialize(): lambda cudf: cudf.DataFrame([1]).head(0), lambda cudf: cudf.DataFrame([1.0]).head(0), lambda cudf: cudf.DataFrame({"a": []}), - pytest.param( - lambda cudf: cudf.DataFrame({"a": ["a"]}).head(0), - marks=pytest.mark.skip( - reason="This test segfaults for some reason. So skip running it entirely." - ), - ), + lambda cudf: cudf.DataFrame({"a": ["a"]}).head(0), lambda cudf: cudf.DataFrame({"a": [1.0]}).head(0), lambda cudf: cudf.DataFrame({"a": [1]}).head(0), lambda cudf: cudf.DataFrame({"a": [1, 2, None], "b": [1.0, 2.0, None]}), From 877dab4d3b65c0563aab5c680949a9c0cce24a14 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 29 Apr 2020 17:52:42 -0700 Subject: [PATCH 28/39] Run black --- distributed/comm/ucx.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index ac2badfa90b..ad5950a0423 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -368,7 +368,6 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): host_frame_total_size += each_size host_frame_sizes.append(each_size) - if host_frame_sizes: host_frames = host_array(host_frame_total_size) if host_frame_total_size: From 81f718b846478471727e4f83fef9c316de46fc78 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 29 Apr 2020 18:03:00 -0700 Subject: [PATCH 29/39] Compute total frame sizes during allocation --- distributed/comm/ucx.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index ad5950a0423..1982691f389 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -356,27 +356,23 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): raise CommClosedError("While reading, the connection was closed") else: # Recv frames - host_frame_total_size = 0 - device_frame_total_size = 0 host_frame_sizes = [] device_frame_sizes = [] for is_cuda, each_size in zip(cuda_frames, sizes): if is_cuda: - device_frame_total_size += each_size device_frame_sizes.append(each_size) else: - host_frame_total_size += each_size host_frame_sizes.append(each_size) if host_frame_sizes: - host_frames = host_array(host_frame_total_size) - if host_frame_total_size: + host_frames = host_array(sum(host_frame_sizes)) + if host_frames.nbytes: await self.ep.recv(host_frames) if device_frame_sizes: - device_frames = device_array(device_frame_total_size) + device_frames = device_array(sum(device_frame_sizes)) # It is necessary to first populate `frames` with CUDA arrays and synchronize # the default stream before starting receiving to ensure buffers have been allocated - if device_frame_total_size: + if device_frames.nbytes: synchronize_stream(0) await self.ep.recv(device_frames) From ee528d54bfd3df425aa6cda7b1b4db0f3a26a02f Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 29 Apr 2020 18:07:04 -0700 Subject: [PATCH 30/39] Update comments --- distributed/comm/ucx.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 1982691f389..2e88f32d9f9 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -66,6 +66,7 @@ def init_once(): ucp.init(options=ucx_config, env_takes_precedence=True) # Find the function, `host_array()`, to use when allocating new host arrays + # Also find `host_concat()` and `host_split()` to merge/split frames try: import numpy @@ -137,7 +138,7 @@ def device_array(n): "In order to send/recv CUDA arrays, Numba or RMM is required" ) - # Find the function, `as_device_array()` + # Find the functions `device_concat` and `device_split` try: import cupy From af94abb99fdf7ec4237b52d0193822ba9ae64ad9 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 29 Apr 2020 18:38:23 -0700 Subject: [PATCH 31/39] Collect individual frame metadata then collect it --- distributed/comm/ucx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 2e88f32d9f9..aef93acac18 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -284,8 +284,8 @@ async def write( host_frames = [] for each_frame in frames: is_cuda = hasattr(each_frame, "__cuda_array_interface__") - cuda_frames.append(is_cuda) each_size = nbytes(each_frame) + cuda_frames.append(is_cuda) sizes.append(each_size) if each_size: if is_cuda: From 94aee853d50cf280cb4cb9ff67dfeb448aa7f79d Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 29 Apr 2020 19:24:55 -0700 Subject: [PATCH 32/39] Group `nframes` with other data collection --- distributed/comm/ucx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index aef93acac18..e6341589425 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -276,8 +276,8 @@ async def write( frames = await to_frames( msg, serializers=serializers, on_error=on_error ) - nframes = len(frames) + nframes = len(frames) cuda_frames = [] sizes = [] device_frames = [] From a9900a0d23438b0e7dcff01fd85c1b21733f665a Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 29 Apr 2020 19:34:16 -0700 Subject: [PATCH 33/39] Move comment closer to `synchronize_stream` --- distributed/comm/ucx.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index e6341589425..99d5baea4f2 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -371,9 +371,9 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): await self.ep.recv(host_frames) if device_frame_sizes: device_frames = device_array(sum(device_frame_sizes)) - # It is necessary to first populate `frames` with CUDA arrays and synchronize - # the default stream before starting receiving to ensure buffers have been allocated if device_frames.nbytes: + # It is necessary to first populate `frames` with CUDA arrays and synchronize + # the default stream before starting receiving to ensure buffers have been allocated synchronize_stream(0) await self.ep.recv(device_frames) From f37951a63b630f522c962ea052131c8359e919e4 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 29 Apr 2020 20:24:48 -0700 Subject: [PATCH 34/39] Rewrite `device_split` to use `cupy.copyto` As `.copy()` calls `memcpy`, which is synchronous, performance is worse as we synchronize after copying each part of the buffer. To fix this, we switch to `cupy.copyto` with calls `memcpyasync`. This lets us avoid having a synchronize after each copy. --- distributed/comm/ucx.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 99d5baea4f2..eb6f2741fc4 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -152,9 +152,15 @@ def device_concat(arys): def device_split(ary, indices): with cupy.cuda.using_allocator(rmm.rmm_cupy_allocator): ary = cupy.asarray(ary).view("u1") - results = cupy.split(ary, indices) - results = [e.copy() for e in results] - result_buffers = [e.data.mem._owner for e in results] + ary_split = cupy.split(ary, indices) + results = [] + result_buffers = [] + for e in ary_split: + e2 = cupy.empty_like(e) + cupy.copyto(e2, e) + results.append(e2) + result_buffers.append(e2.data.mem._owner) + cupy.cuda.stream.get_current_stream().synchronize() return result_buffers except ImportError: From a23640122a3c27fb756521ebeec1293a014e9cba Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 29 Apr 2020 21:10:37 -0700 Subject: [PATCH 35/39] Drop synchronize call Shouldn't be needed as copying should occur before deletion of the original buffer as it is stream ordered. --- distributed/comm/ucx.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index eb6f2741fc4..5f20faae456 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -160,7 +160,6 @@ def device_split(ary, indices): cupy.copyto(e2, e) results.append(e2) result_buffers.append(e2.data.mem._owner) - cupy.cuda.stream.get_current_stream().synchronize() return result_buffers except ImportError: From b26b58d8680c590e5fb03992fb9e8e5c538dad67 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 29 Apr 2020 21:25:57 -0700 Subject: [PATCH 36/39] Allocate `device_array`s directly in `split` --- distributed/comm/ucx.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 5f20faae456..683dff623d6 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -150,17 +150,17 @@ def device_concat(arys): return result_buffer def device_split(ary, indices): - with cupy.cuda.using_allocator(rmm.rmm_cupy_allocator): - ary = cupy.asarray(ary).view("u1") - ary_split = cupy.split(ary, indices) - results = [] - result_buffers = [] - for e in ary_split: - e2 = cupy.empty_like(e) - cupy.copyto(e2, e) - results.append(e2) - result_buffers.append(e2.data.mem._owner) - return result_buffers + ary = cupy.asarray(ary).view("u1") + ary_split = cupy.split(ary, indices) + results = [] + result_buffers = [] + for e in ary_split: + b2 = device_array(e.nbytes) + e2 = cupy.asarray(b2) + cupy.copyto(e2, e) + results.append(e2) + result_buffers.append(b2) + return result_buffers except ImportError: try: From a05293b1c85277a2d4a698c544b3d490ffc1d41b Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 29 Apr 2020 21:35:38 -0700 Subject: [PATCH 37/39] Use `device_array` to allocate memory in concat --- distributed/comm/ucx.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 683dff623d6..d1cc5cd8259 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -142,8 +142,15 @@ def device_array(n): try: import cupy + def dask_cupy_allocator(nbytes): + a = device_array(nbytes) + ptr = a.__cuda_array_interface__["data"][0] + dev_id = -1 if ptr else cupy.cuda.device.get_device_id() + mem = cupy.cuda.UnownedMemory(ptr=ptr, size=nbytes, owner=a, device_id=dev_id) + return cupy.cuda.memory.MemoryPointer(mem, 0) + def device_concat(arys): - with cupy.cuda.using_allocator(rmm.rmm_cupy_allocator): + with cupy.cuda.using_allocator(dask_cupy_allocator): arys = [cupy.asarray(e).view("u1") for e in arys] result = cupy.concatenate(arys, axis=None) result_buffer = result.data.mem._owner From 0d046d78526e30cff14a8209509d746d4d2c436c Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 29 Apr 2020 22:17:46 -0700 Subject: [PATCH 38/39] Drop unneeded slice --- distributed/comm/ucx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index d1cc5cd8259..64924796548 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -88,7 +88,7 @@ def host_concat(arys): r_view = memoryview(r) for each_ary, each_size in zip(arys, sizes): if each_size: - r_view[:each_size] = each_ary[:] + r_view[:each_size] = each_ary r_view = r_view[each_size:] return r From e4a6d1e01041b13a54a91782f75677b2371d2c43 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 29 Apr 2020 23:33:50 -0700 Subject: [PATCH 39/39] Run black --- distributed/comm/ucx.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 64924796548..d9c610d7854 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -146,7 +146,9 @@ def dask_cupy_allocator(nbytes): a = device_array(nbytes) ptr = a.__cuda_array_interface__["data"][0] dev_id = -1 if ptr else cupy.cuda.device.get_device_id() - mem = cupy.cuda.UnownedMemory(ptr=ptr, size=nbytes, owner=a, device_id=dev_id) + mem = cupy.cuda.UnownedMemory( + ptr=ptr, size=nbytes, owner=a, device_id=dev_id + ) return cupy.cuda.memory.MemoryPointer(mem, 0) def device_concat(arys):