Skip to content

Commit

Permalink
Create function to allocate arrays on host
Browse files Browse the repository at this point in the history
To relax the NumPy requirement completely, add a function to allocate
arrays on host. If NumPy is not present, this falls back to just
allocating `bytearray` objects, which work just as well.
  • Loading branch information
jakirkham committed Apr 20, 2020
1 parent fe7018c commit 78ba385
Showing 1 changed file with 12 additions and 4 deletions.
16 changes: 12 additions & 4 deletions distributed/comm/ucx.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import weakref

import dask
import numpy as np

from .addressing import parse_host_port, unparse_host_port
from .core import Comm, Connector, Listener, CommClosedError
Expand All @@ -34,6 +33,7 @@
# required to ensure Dask configuration gets propagated to UCX, which needs
# variables to be set before being imported.
ucp = None
host_array = None
device_array = None


Expand All @@ -47,7 +47,7 @@ def synchronize_stream(stream=0):


def init_once():
global ucp, device_array
global ucp, host_array, device_array
if ucp is not None:
return

Expand All @@ -60,7 +60,15 @@ def init_once():

ucp.init(options=ucx_config, env_takes_precedence=True)

# Find the function, `device_array()`, to use when allocating new CUDA arrays
# Find the function, `host_array()`, to use when allocating new host arrays
try:
import numpy

host_array = lambda n: numpy.empty((n,), dtype="u1")
except ImportError:
host_array = lambda n: bytearray(n)

# Find the function, `cuda_array()`, to use when allocating new CUDA arrays
try:
import rmm

Expand Down Expand Up @@ -233,7 +241,7 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")):
frames = [
device_array(each_size)
if is_cuda
else np.empty(each_size, dtype="u1")
else host_array(each_size)
for is_cuda, each_size in zip(cuda_frames, sizes)
]
recv_frames = [
Expand Down

0 comments on commit 78ba385

Please sign in to comment.