From bc8707548844518e50e2ac7510f4e5a540df7b2e Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 20 Apr 2020 13:22:50 -0700 Subject: [PATCH 01/15] Make `device_array`'s shape a `tuple` While it works to have this be a single `int` (as it will be coerced to a `tuple`), go ahead and make it a `tuple` for clarity and to more closely match the Numba case. --- distributed/comm/ucx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 4e6ca8116c..99ca468fce 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -69,7 +69,7 @@ def init_once(): import numba.cuda def rmm_cuda_array(n): - a = rmm.device_array(n, dtype=np.uint8) + a = rmm.device_array((n,), dtype=np.uint8) weakref.finalize(a, numba.cuda.current_context) return a From 8e1e1c07886ea4f29c2aca091eab0fde70be2611 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 20 Apr 2020 13:22:51 -0700 Subject: [PATCH 02/15] Use `"u1"` to specify `uint8` typed arrays This is equivalent to using NumPy's `uint8`, but has the added benefit of not requiring NumPy to be imported. 
--- distributed/comm/ucx.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 99ca468fce..a8e7d81c40 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -69,7 +69,7 @@ def init_once(): import numba.cuda def rmm_cuda_array(n): - a = rmm.device_array((n,), dtype=np.uint8) + a = rmm.device_array(n, dtype="u1") weakref.finalize(a, numba.cuda.current_context) return a @@ -79,7 +79,7 @@ def rmm_cuda_array(n): import numba.cuda def numba_cuda_array(n): - a = numba.cuda.device_array((n,), dtype=np.uint8) + a = numba.cuda.device_array((n,), dtype="u1") weakref.finalize(a, numba.cuda.current_context) return a @@ -225,7 +225,7 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): frames = [ cuda_array(each_size) if is_cuda - else np.empty(each_size, dtype=np.uint8) + else np.empty(each_size, dtype="u1") for is_cuda, each_size in zip(is_cudas.tolist(), sizes.tolist()) ] recv_frames = [ From 4fc853fba1083f2bc1ebfe23796f1d55e5ecb2f9 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 20 Apr 2020 13:22:52 -0700 Subject: [PATCH 03/15] Rename `is_cudas` to `cuda_frames` Matches the variable name in the `send` case to make things easier to follow. 
--- distributed/comm/ucx.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index a8e7d81c40..a4d82859e7 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -213,8 +213,8 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): # Recv meta data nframes = np.empty(1, dtype=np.uint64) await self.ep.recv(nframes) - is_cudas = np.empty(nframes[0], dtype=np.bool) - await self.ep.recv(is_cudas) + cuda_frames = np.empty(nframes[0], dtype=np.bool) + await self.ep.recv(cuda_frames) sizes = np.empty(nframes[0], dtype=np.uint64) await self.ep.recv(sizes) except (ucp.exceptions.UCXBaseException, CancelledError): @@ -226,7 +226,7 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): cuda_array(each_size) if is_cuda else np.empty(each_size, dtype="u1") - for is_cuda, each_size in zip(is_cudas.tolist(), sizes.tolist()) + for is_cuda, each_size in zip(cuda_frames.tolist(), sizes.tolist()) ] recv_frames = [ each_frame for each_frame in frames if len(each_frame) > 0 @@ -234,7 +234,7 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): # It is necessary to first populate `frames` with CUDA arrays and synchronize # the default stream before starting receiving to ensure buffers have been allocated - if is_cudas.any(): + if cuda_frames.any(): synchronize_stream(0) for each_frame in recv_frames: From 12f4d479c4315eb6fd2cf3aaab003837673857fb Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 20 Apr 2020 13:23:19 -0700 Subject: [PATCH 04/15] Use `pack`/`unpack` for UCX frame metadata As `struct.pack` and `struct.unpack` are able to build `bytes` objects containing the frame metadata needed by UCX easily, just use these functions instead of creating NumPy arrays each time. Helps soften the NumPy requirement a bit. 
--- distributed/comm/ucx.py | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index a4d82859e7..9abecfff33 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -6,6 +6,7 @@ .. _UCX: https://github.com/openucx/ucx """ import logging +import struct import weakref import dask @@ -174,14 +175,13 @@ async def write( ] # Send meta data - cuda_frames = np.array( - [hasattr(f, "__cuda_array_interface__") for f in frames], - dtype=np.bool, + cuda_frames = tuple( + hasattr(f, "__cuda_array_interface__") for f in frames ) - await self.ep.send(np.array([len(frames)], dtype=np.uint64)) - await self.ep.send(cuda_frames) + await self.ep.send(struct.pack("Q", len(frames))) + await self.ep.send(struct.pack(len(cuda_frames) * "?", *cuda_frames)) await self.ep.send( - np.array([nbytes(f) for f in frames], dtype=np.uint64) + struct.pack(len(frames) * "Q", *(nbytes(f) for f in frames)) ) # Send frames @@ -191,7 +191,7 @@ async def write( # syncing the default stream will wait for other non-blocking CUDA streams. # Note this is only sufficient if the memory being sent is not currently in use on # non-blocking CUDA streams. - if cuda_frames.any(): + if any(cuda_frames): synchronize_stream(0) for each_frame in send_frames: @@ -211,12 +211,20 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): try: # Recv meta data - nframes = np.empty(1, dtype=np.uint64) + nframes_fmt = "Q" + nframes = bytearray(struct.calcsize(nframes_fmt)) await self.ep.recv(nframes) - cuda_frames = np.empty(nframes[0], dtype=np.bool) + (nframes,) = struct.unpack(nframes_fmt, nframes) + + cuda_frames_fmt = nframes * "?" 
+ cuda_frames = bytearray(struct.calcsize(cuda_frames_fmt)) await self.ep.recv(cuda_frames) - sizes = np.empty(nframes[0], dtype=np.uint64) + cuda_frames = struct.unpack(cuda_frames_fmt, cuda_frames) + + sizes_fmt = nframes * "Q" + sizes = bytearray(struct.calcsize(sizes_fmt)) await self.ep.recv(sizes) + sizes = struct.unpack(sizes_fmt, sizes) except (ucp.exceptions.UCXBaseException, CancelledError): self.abort() raise CommClosedError("While reading, the connection was closed") @@ -226,7 +234,7 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): cuda_array(each_size) if is_cuda else np.empty(each_size, dtype="u1") - for is_cuda, each_size in zip(cuda_frames.tolist(), sizes.tolist()) + for is_cuda, each_size in zip(cuda_frames, sizes) ] recv_frames = [ each_frame for each_frame in frames if len(each_frame) > 0 @@ -234,7 +242,7 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): # It is necessary to first populate `frames` with CUDA arrays and synchronize # the default stream before starting receiving to ensure buffers have been allocated - if cuda_frames.any(): + if any(cuda_frames): synchronize_stream(0) for each_frame in recv_frames: From fe7018c3231a60d906e05792e3c6a25d62283be4 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 20 Apr 2020 13:23:22 -0700 Subject: [PATCH 05/15] Rename `cuda_array` to `device_array` Matches more closely to the name used by RMM and Numba. --- distributed/comm/ucx.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 9abecfff33..45450d0ef4 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -34,7 +34,7 @@ # required to ensure Dask configuration gets propagated to UCX, which needs # variables to be set before being imported. 
ucp = None -cuda_array = None +device_array = None def synchronize_stream(stream=0): @@ -47,7 +47,7 @@ def synchronize_stream(stream=0): def init_once(): - global ucp, cuda_array + global ucp, device_array if ucp is not None: return @@ -60,34 +60,34 @@ def init_once(): ucp.init(options=ucx_config, env_takes_precedence=True) - # Find the function, `cuda_array()`, to use when allocating new CUDA arrays + # Find the function, `device_array()`, to use when allocating new CUDA arrays try: import rmm if hasattr(rmm, "DeviceBuffer"): - cuda_array = lambda n: rmm.DeviceBuffer(size=n) + device_array = lambda n: rmm.DeviceBuffer(size=n) else: # pre-0.11.0 import numba.cuda - def rmm_cuda_array(n): + def rmm_device_array(n): a = rmm.device_array(n, dtype="u1") weakref.finalize(a, numba.cuda.current_context) return a - cuda_array = rmm_cuda_array + device_array = rmm_device_array except ImportError: try: import numba.cuda - def numba_cuda_array(n): + def numba_device_array(n): a = numba.cuda.device_array((n,), dtype="u1") weakref.finalize(a, numba.cuda.current_context) return a - cuda_array = numba_cuda_array + device_array = numba_device_array except ImportError: - def cuda_array(n): + def device_array(n): raise RuntimeError( "In order to send/recv CUDA arrays, Numba or RMM is required" ) @@ -231,7 +231,7 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): else: # Recv frames frames = [ - cuda_array(each_size) + device_array(each_size) if is_cuda else np.empty(each_size, dtype="u1") for is_cuda, each_size in zip(cuda_frames, sizes) From 78ba3856ccd4d376737c12ac4f8baa7cad779036 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 20 Apr 2020 13:23:24 -0700 Subject: [PATCH 06/15] Create function to allocate arrays on host To relax the NumPy requirement completely, add a function to allocate arrays on host. If NumPy is not present, this falls back to just allocating `bytearray` objects, which work just as well. 
--- distributed/comm/ucx.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 45450d0ef4..72c3c59a63 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -10,7 +10,6 @@ import weakref import dask -import numpy as np from .addressing import parse_host_port, unparse_host_port from .core import Comm, Connector, Listener, CommClosedError @@ -34,6 +33,7 @@ # required to ensure Dask configuration gets propagated to UCX, which needs # variables to be set before being imported. ucp = None +host_array = None device_array = None @@ -47,7 +47,7 @@ def synchronize_stream(stream=0): def init_once(): - global ucp, device_array + global ucp, host_array, device_array if ucp is not None: return @@ -60,7 +60,15 @@ def init_once(): ucp.init(options=ucx_config, env_takes_precedence=True) - # Find the function, `device_array()`, to use when allocating new CUDA arrays + # Find the function, `host_array()`, to use when allocating new host arrays + try: + import numpy + + host_array = lambda n: numpy.empty((n,), dtype="u1") + except ImportError: + host_array = lambda n: bytearray(n) + + # Find the function, `cuda_array()`, to use when allocating new CUDA arrays try: import rmm @@ -233,7 +241,7 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): frames = [ device_array(each_size) if is_cuda - else np.empty(each_size, dtype="u1") + else host_array(each_size) for is_cuda, each_size in zip(cuda_frames, sizes) ] recv_frames = [ From 258bdca1c0471d97a2d1491b8f750402ace61c88 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 20 Apr 2020 13:23:25 -0700 Subject: [PATCH 07/15] Fix formatting with black --- distributed/comm/ucx.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 72c3c59a63..386c181017 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -239,9 +239,7 @@ async 
def read(self, deserializers=("cuda", "dask", "pickle", "error")): else: # Recv frames frames = [ - device_array(each_size) - if is_cuda - else host_array(each_size) + device_array(each_size) if is_cuda else host_array(each_size) for is_cuda, each_size in zip(cuda_frames, sizes) ] recv_frames = [ From 6aad1aefb2ee7e7b3361a32908a5ac364b62367f Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 20 Apr 2020 17:01:05 -0700 Subject: [PATCH 08/15] Define `cuda_frames` with other frame definitions --- distributed/comm/ucx.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 386c181017..04f56614f0 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -178,14 +178,14 @@ async def write( frames = await to_frames( msg, serializers=serializers, on_error=on_error ) + cuda_frames = tuple( + hasattr(f, "__cuda_array_interface__") for f in frames + ) send_frames = [ each_frame for each_frame in frames if len(each_frame) > 0 ] # Send meta data - cuda_frames = tuple( - hasattr(f, "__cuda_array_interface__") for f in frames - ) await self.ep.send(struct.pack("Q", len(frames))) await self.ep.send(struct.pack(len(cuda_frames) * "?", *cuda_frames)) await self.ep.send( From 59811004a06113fe4a9874bbebf2b983ea996767 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 20 Apr 2020 17:28:48 -0700 Subject: [PATCH 09/15] Store `nframes` for simplicity Avoids multiple calls to `len(frames)`, is a bit easier to read, and matches the receive code path more closely. 
--- distributed/comm/ucx.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 04f56614f0..f060385691 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -178,6 +178,7 @@ async def write( frames = await to_frames( msg, serializers=serializers, on_error=on_error ) + nframes = len(frames) cuda_frames = tuple( hasattr(f, "__cuda_array_interface__") for f in frames ) @@ -186,10 +187,10 @@ async def write( ] # Send meta data - await self.ep.send(struct.pack("Q", len(frames))) - await self.ep.send(struct.pack(len(cuda_frames) * "?", *cuda_frames)) + await self.ep.send(struct.pack("Q", nframes)) + await self.ep.send(struct.pack(nframes * "?", *cuda_frames)) await self.ep.send( - struct.pack(len(frames) * "Q", *(nbytes(f) for f in frames)) + struct.pack(nframes * "Q", *(nbytes(f) for f in frames)) ) # Send frames From b950a86dcd70876787514a3a33903a9717602dcd Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 20 Apr 2020 17:28:49 -0700 Subject: [PATCH 10/15] Collect sizes along with other frame info --- distributed/comm/ucx.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index f060385691..240749d268 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -182,6 +182,7 @@ async def write( cuda_frames = tuple( hasattr(f, "__cuda_array_interface__") for f in frames ) + sizes = tuple(nbytes(f) for f in frames) send_frames = [ each_frame for each_frame in frames if len(each_frame) > 0 ] @@ -189,9 +190,7 @@ async def write( # Send meta data await self.ep.send(struct.pack("Q", nframes)) await self.ep.send(struct.pack(nframes * "?", *cuda_frames)) - await self.ep.send( - struct.pack(nframes * "Q", *(nbytes(f) for f in frames)) - ) + await self.ep.send(struct.pack(nframes * "Q", *sizes)) # Send frames From 249c84a933e978f4ea06d2467ad5e99d53123058 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: 
Mon, 20 Apr 2020 17:33:06 -0700 Subject: [PATCH 11/15] Use `sizes` to pick out non-trivial frames to send --- distributed/comm/ucx.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 240749d268..3386b7324e 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -184,7 +184,9 @@ async def write( ) sizes = tuple(nbytes(f) for f in frames) send_frames = [ - each_frame for each_frame in frames if len(each_frame) > 0 + each_frame + for each_frame, each_size in zip(frames, sizes) + if each_size ] # Send meta data From 4f1f4935ee93e75cb42bfa8865c07e8ecc47d29c Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 20 Apr 2020 17:37:32 -0700 Subject: [PATCH 12/15] Simply call `sum` on `sizes` for bytes sent --- distributed/comm/ucx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 3386b7324e..724ee50c98 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -206,7 +206,7 @@ async def write( for each_frame in send_frames: await self.ep.send(each_frame) - return sum(map(nbytes, send_frames)) + return sum(sizes) except (ucp.exceptions.UCXBaseException): self.abort() raise CommClosedError("While writing, the connection was closed") From 7b3cecd8f52b16a959e3098031809514bd75c733 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 20 Apr 2020 17:44:48 -0700 Subject: [PATCH 13/15] Use `host_array` to make buffers to receive into --- distributed/comm/ucx.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 724ee50c98..3e2c868e3d 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -222,17 +222,17 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): try: # Recv meta data nframes_fmt = "Q" - nframes = bytearray(struct.calcsize(nframes_fmt)) + nframes = 
host_array(struct.calcsize(nframes_fmt)) await self.ep.recv(nframes) (nframes,) = struct.unpack(nframes_fmt, nframes) cuda_frames_fmt = nframes * "?" - cuda_frames = bytearray(struct.calcsize(cuda_frames_fmt)) + cuda_frames = host_array(struct.calcsize(cuda_frames_fmt)) await self.ep.recv(cuda_frames) cuda_frames = struct.unpack(cuda_frames_fmt, cuda_frames) sizes_fmt = nframes * "Q" - sizes = bytearray(struct.calcsize(sizes_fmt)) + sizes = host_array(struct.calcsize(sizes_fmt)) await self.ep.recv(sizes) sizes = struct.unpack(sizes_fmt, sizes) except (ucp.exceptions.UCXBaseException, CancelledError): From 98d82ddb65382d66b6ed7c5218c9189543077e42 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Mon, 20 Apr 2020 18:08:05 -0700 Subject: [PATCH 14/15] Pack per frame metadata into one message To send fewer and larger messages, pack both which frames are on device and how large each frame is into one message. --- distributed/comm/ucx.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 3e2c868e3d..6979cdd934 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -191,8 +191,9 @@ async def write( # Send meta data await self.ep.send(struct.pack("Q", nframes)) - await self.ep.send(struct.pack(nframes * "?", *cuda_frames)) - await self.ep.send(struct.pack(nframes * "Q", *sizes)) + await self.ep.send( + struct.pack(nframes * "?" + nframes * "Q", *cuda_frames, *sizes) + ) # Send frames @@ -226,15 +227,11 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): await self.ep.recv(nframes) (nframes,) = struct.unpack(nframes_fmt, nframes) - cuda_frames_fmt = nframes * "?" 
- cuda_frames = host_array(struct.calcsize(cuda_frames_fmt)) - await self.ep.recv(cuda_frames) - cuda_frames = struct.unpack(cuda_frames_fmt, cuda_frames) - - sizes_fmt = nframes * "Q" - sizes = host_array(struct.calcsize(sizes_fmt)) - await self.ep.recv(sizes) - sizes = struct.unpack(sizes_fmt, sizes) + header_fmt = nframes * "?" + nframes * "Q" + header = host_array(struct.calcsize(header_fmt)) + await self.ep.recv(header) + header = struct.unpack(header_fmt, header) + cuda_frames, sizes = header[:nframes], header[nframes:] except (ucp.exceptions.UCXBaseException, CancelledError): self.abort() raise CommClosedError("While reading, the connection was closed") From c59f95dffe3b24d219283f0c5e04b7cb04e8ff0c Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Tue, 21 Apr 2020 13:24:28 -0700 Subject: [PATCH 15/15] Note what `struct` lines are packing/unpacking --- distributed/comm/ucx.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 6979cdd934..7761afef7a 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -190,7 +190,11 @@ async def write( ] # Send meta data + + # Send # of frames (uint64) await self.ep.send(struct.pack("Q", nframes)) + # Send which frames are CUDA (bool) and + # how large each frame is (uint64) await self.ep.send( struct.pack(nframes * "?" + nframes * "Q", *cuda_frames, *sizes) ) @@ -222,11 +226,15 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): try: # Recv meta data + + # Recv # of frames (uint64) nframes_fmt = "Q" nframes = host_array(struct.calcsize(nframes_fmt)) await self.ep.recv(nframes) (nframes,) = struct.unpack(nframes_fmt, nframes) + # Recv which frames are CUDA (bool) and + # how large each frame is (uint64) header_fmt = nframes * "?" + nframes * "Q" header = host_array(struct.calcsize(header_fmt)) await self.ep.recv(header)