From a50137f3bc971fce51be83ad3280d529409aab0e Mon Sep 17 00:00:00 2001 From: Thomas Date: Fri, 8 Dec 2023 12:31:56 +0100 Subject: [PATCH 01/18] Implemented CuPy version of `DistArray` --- mpi4py_fft/distarray.py | 473 +++++++++++++++++++++++++++------------- 1 file changed, 316 insertions(+), 157 deletions(-) diff --git a/mpi4py_fft/distarray.py b/mpi4py_fft/distarray.py index 3a69e5c..e2275f1 100644 --- a/mpi4py_fft/distarray.py +++ b/mpi4py_fft/distarray.py @@ -5,106 +5,24 @@ from .pencil import Pencil, Subcomm from .io import HDF5File, NCFile, FileBase -comm = MPI.COMM_WORLD - -class DistArray(np.ndarray): - """Distributed Numpy array - - This Numpy array is part of a larger global array. Information about the - distribution is contained in the attributes. - - Parameters - ---------- - global_shape : sequence of ints - Shape of non-distributed global array - subcomm : None, :class:`.Subcomm` object or sequence of ints, optional - Describes how to distribute the array - val : Number or None, optional - Initialize array with this number if buffer is not given - dtype : np.dtype, optional - Type of array - buffer : Numpy array, optional - Array of correct shape. The buffer owns the memory that is used for - this array. - alignment : None or int, optional - Make sure array is aligned in this direction. Note that alignment does - not take rank into consideration. - rank : int, optional - Rank of tensor (number of free indices, a scalar is zero, vector one, - matrix two) - - - For more information, see `numpy.ndarray `_ - - Note - ---- - Tensors of rank higher than 0 are not distributed in the first ``rank`` - indices. For example, - - >>> from mpi4py_fft import DistArray - >>> a = DistArray((3, 8, 8, 8), rank=1) - >>> print(a.pencil.shape) - (8, 8, 8) +try: + import cupy as cp +except ImportError: + # TODO: move cupy stuff to seperate file as to avoid confusion! + import numpy as cp - The array ``a`` cannot be distributed in the first axis of length 3 since - rank is 1 and this first index represent the vector component. The ``pencil`` - attribute of ``a`` thus only considers the last three axes. +comm = MPI.COMM_WORLD - Also note that the ``alignment`` keyword does not take rank into - consideration. Setting alignment=2 for the array above means that the last - axis will be aligned, also when rank>0. +class DistArrayBase: """ - def __new__(cls, global_shape, subcomm=None, val=None, dtype=float, - buffer=None, strides=None, alignment=None, rank=0): - if len(global_shape[rank:]) < 2: # 1D case - obj = np.ndarray.__new__(cls, global_shape, dtype=dtype, buffer=buffer, strides=strides) - if buffer is None and isinstance(val, Number): - obj.fill(val) - obj._rank = rank - obj._p0 = None - return obj + Abstract class for distributed arrays in numpy or cupy implementation - if isinstance(subcomm, Subcomm): - pass - else: - if isinstance(subcomm, (tuple, list)): - assert len(subcomm) == len(global_shape[rank:]) - # Do nothing if already containing communicators. A tuple of subcommunicators is not necessarily a Subcomm - if not np.all([isinstance(s, MPI.Comm) for s in subcomm]): - subcomm = Subcomm(comm, subcomm) - else: - assert subcomm is None - subcomm = [0] * len(global_shape[rank:]) - if alignment is not None: - subcomm[alignment] = 1 - else: - subcomm[-1] = 1 - alignment = len(subcomm)-1 - subcomm = Subcomm(comm, subcomm) - sizes = [s.Get_size() for s in subcomm] - if alignment is not None: - assert isinstance(alignment, (int, np.integer)) - assert sizes[alignment] == 1 - else: - # Decide that alignment is the last axis with size 1 - alignment = np.flatnonzero(np.array(sizes) == 1)[-1] - p0 = Pencil(subcomm, global_shape[rank:], axis=alignment) - subshape = p0.subshape - if rank > 0: - subshape = global_shape[:rank] + subshape - obj = np.ndarray.__new__(cls, subshape, dtype=dtype, buffer=buffer) - if buffer is None and isinstance(val, Number): - obj.fill(val) - obj._p0 = p0 - obj._rank = rank - return obj + Attributes: + - xp: Numerical library, a.k.a. numpy or cupy as class attribute + """ - def __array_finalize__(self, obj): - if obj is None: - return - self._p0 = getattr(obj, '_p0', None) - self._rank = getattr(obj, '_rank', None) + xp = None @property def alignment(self): @@ -120,17 +38,17 @@ def alignment(self): @property def global_shape(self): """Return global shape of ``self``""" - return self.shape[:self.rank] + self._p0.shape + return self.shape[: self.rank] + self._p0.shape @property def substart(self): """Return starting indices of local ``self`` array""" - return (0,)*self.rank + self._p0.substart + return (0,) * self.rank + self._p0.substart @property def subcomm(self): """Return tuple of subcommunicators for all axes of ``self``""" - return (MPI.COMM_SELF,)*self.rank + self._p0.subcomm + return (MPI.COMM_SELF,) * self.rank + self._p0.subcomm @property def commsizes(self): @@ -152,32 +70,78 @@ def dimensions(self): """Return dimensions of array not including rank""" return len(self._p0.shape) + @property + def v(self): + """Return local ``self`` array as an ``ndarray`` object""" + return self.__array__() + + @staticmethod + def getSubcomm(subcomm, global_shape, rank, alignment): + if isinstance(subcomm, Subcomm): + pass + else: + if isinstance(subcomm, (tuple, list)): + assert len(subcomm) == len(global_shape[rank:]) + # Do nothing if already containing communicators. A tuple of subcommunicators is not necessarily a Subcomm + if not np.all([isinstance(s, MPI.Comm) for s in subcomm]): + subcomm = Subcomm(comm, subcomm) + else: + assert subcomm is None + subcomm = [0] * len(global_shape[rank:]) + if alignment is not None: + subcomm[alignment] = 1 + else: + subcomm[-1] = 1 + alignment = len(subcomm) - 1 + subcomm = Subcomm(comm, subcomm) + return subcomm + + @classmethod + def getPencil(cls, subcomm, rank, global_shape, alignment): + sizes = [s.Get_size() for s in subcomm] + if alignment is not None: + assert isinstance(alignment, (int, cls.xp.integer)) + assert sizes[alignment] == 1 + else: + # Decide that alignment is the last axis with size 1 + alignment = cls.xp.flatnonzero(cls.xp.array(sizes) == 1)[-1] + + p0 = Pencil(subcomm, global_shape[rank:], axis=alignment) + + if rank > 0: + subshape = global_shape[:rank] + subshape + else: + subshape = p0.subshape + + return p0, subshape + + def __array_finalize__(self, obj): + if obj is None: + return + self._p0 = getattr(obj, "_p0", None) + self._rank = getattr(obj, "_rank", None) + def __getitem__(self, i): # Return DistArray if the result is a component of a tensor # Otherwise return ndarray view if self.ndim == 1: - return np.ndarray.__getitem__(self, i) + return self.xp.ndarray.__getitem__(self, i) if isinstance(i, (Integral, slice)) and self.rank > 0: - v0 = np.ndarray.__getitem__(self, i) + v0 = self.xp.ndarray.__getitem__(self, i) v0._rank = self.rank - (self.ndim - v0.ndim) return v0 if isinstance(i, (Integral, slice)) and self.rank == 0: - return np.ndarray.__getitem__(self.v, i) + return self.xp.ndarray.__getitem__(self.v, i) assert isinstance(i, tuple) if len(i) <= self.rank: - v0 = np.ndarray.__getitem__(self, i) + v0 = self.xp.ndarray.__getitem__(self, i) v0._rank = self.rank - (self.ndim - v0.ndim) return v0 - return np.ndarray.__getitem__(self.v, i) - - @property - def v(self): - """ Return local ``self`` array as an ``ndarray`` object""" - return self.__array__() + return self.xp.ndarray.__getitem__(self.v, i) def get(self, gslice): """Return global slice of ``self`` @@ -215,14 +179,19 @@ def get(self, gslice): # global MPI. We create a global file with MPI, but then open it without # MPI and only on rank 0. import h5py - f = h5py.File('tmp.h5', 'w', driver="mpio", comm=comm) + + f = h5py.File("tmp.h5", "w", driver="mpio", comm=comm) s = self.local_slice() - sp = np.nonzero([isinstance(x, slice) for x in gslice])[0] - sf = tuple(np.take(s, sp)) - f.require_dataset('data', shape=tuple(np.take(self.global_shape, sp)), dtype=self.dtype) + sp = self.xp.nonzero([isinstance(x, slice) for x in gslice])[0] + sf = tuple(self.xp.take(s, sp)) + f.require_dataset( + "data", shape=tuple(self.xp.take(self.global_shape, sp)), dtype=self.dtype + ) gslice = list(gslice) # We are required to check if the indices in si are on this processor - si = np.nonzero([isinstance(x, int) and not z == slice(None) for x, z in zip(gslice, s)])[0] + si = self.xp.nonzero( + [isinstance(x, int) and not z == slice(None) for x, z in zip(gslice, s)] + )[0] on_this_proc = True for i in si: if gslice[i] >= s[i].start and gslice[i] < s[i].stop: @@ -234,10 +203,10 @@ def get(self, gslice): f.close() c = None if comm.Get_rank() == 0: - h = h5py.File('tmp.h5', 'r') - c = h['data'].__array__() + h = h5py.File("tmp.h5", "r") + c = h["data"].__array__() h.close() - os.remove('tmp.h5') + os.remove("tmp.h5") return c def local_slice(self): @@ -273,27 +242,11 @@ def local_slice(self): (slice(0, 16, None), slice(7, 14, None), slice(0, 6, None)) (slice(0, 16, None), slice(7, 14, None), slice(6, 12, None)) """ - v = [slice(start, start+shape) for start, shape in zip(self._p0.substart, - self._p0.subshape)] - return tuple([slice(0, s) for s in self.shape[:self.rank]] + v) - - def get_pencil_and_transfer(self, axis): - """Return pencil and transfer objects for alignment along ``axis`` - - Parameters - ---------- - axis : int - The new axis to align data with - - Returns - ------- - 2-tuple - 2-tuple where first item is a :class:`.Pencil` aligned in ``axis``. - Second item is a :class:`.Transfer` object for executing the - redistribution of data - """ - p1 = self._p0.pencil(axis) - return p1, self._p0.transfer(p1, self.dtype) + v = [ + slice(start, start + shape) + for start, shape in zip(self._p0.substart, self._p0.subshape) + ] + return tuple([slice(0, s) for s in self.shape[: self.rank]] + v) def redistribute(self, axis=None, out=None): """Global redistribution of local ``self`` array @@ -322,7 +275,7 @@ def redistribute(self, axis=None, out=None): # Check if self is already aligned along axis. In that case just switch # axis of pencil (both axes are undivided) and return if axis is not None: - if self.commsizes[self.rank+axis] == 1: + if self.commsizes[self.rank + axis] == 1: self.pencil.axis = axis return self @@ -343,11 +296,13 @@ def redistribute(self, axis=None, out=None): p1, transfer = self.get_pencil_and_transfer(axis) if out is None: - out = DistArray(self.global_shape, - subcomm=p1.subcomm, - dtype=self.dtype, - alignment=axis, - rank=self.rank) + out = type(self)( + self.global_shape, + subcomm=p1.subcomm, + dtype=self.dtype, + alignment=axis, + rank=self.rank, + ) if self.rank == 0: transfer.forward(self, out) @@ -362,8 +317,33 @@ def redistribute(self, axis=None, out=None): transfer.destroy() return out - def write(self, filename, name='darray', step=0, global_slice=None, - domain=None, as_scalar=False): + def get_pencil_and_transfer(self, axis): + """Return pencil and transfer objects for alignment along ``axis`` + + Parameters + ---------- + axis : int + The new axis to align data with + + Returns + ------- + 2-tuple + 2-tuple where first item is a :class:`.Pencil` aligned in ``axis``. + Second item is a :class:`.Transfer` object for executing the + redistribution of data + """ + p1 = self._p0.pencil(axis) + return p1, self._p0.transfer(p1, self.dtype) + + def write( + self, + filename, + name="darray", + step=0, + global_slice=None, + domain=None, + as_scalar=False, + ): """Write snapshot ``step`` of ``self`` to file ``filename`` Parameters @@ -396,14 +376,14 @@ def write(self, filename, name='darray', step=0, global_slice=None, >>> u.write('h5file.h5', 'u', (slice(None), 4)) """ if isinstance(filename, str): - writer = HDF5File if filename.endswith('.h5') else NCFile - f = writer(filename, domain=domain, mode='a') + writer = HDF5File if filename.endswith(".h5") else NCFile + f = writer(filename, domain=domain, mode="a") elif isinstance(filename, FileBase): f = filename field = [self] if global_slice is None else [(self, global_slice)] f.write(step, {name: field}, as_scalar=as_scalar) - def read(self, filename, name='darray', step=0): + def read(self, filename, name="darray", step=0): """Read data ``name`` at index ``step``from file ``filename`` into ``self`` @@ -432,14 +412,181 @@ def read(self, filename, name='darray', step=0): """ if isinstance(filename, str): - writer = HDF5File if filename.endswith('.h5') else NCFile - f = writer(filename, mode='r') + writer = HDF5File if filename.endswith(".h5") else NCFile + f = writer(filename, mode="r") elif isinstance(filename, FileBase): f = filename f.read(self, name, step=step) -def newDistArray(pfft, forward_output=True, val=0, rank=0, view=False): +class DistArray(DistArrayBase, np.ndarray): + """Distributed Numpy array + + This Numpy array is part of a larger global array. Information about the + distribution is contained in the attributes. + + Parameters + ---------- + global_shape : sequence of ints + Shape of non-distributed global array + subcomm : None, :class:`.Subcomm` object or sequence of ints, optional + Describes how to distribute the array + val : Number or None, optional + Initialize array with this number if buffer is not given + dtype : np.dtype, optional + Type of array + buffer : Numpy array, optional + Array of correct shape. The buffer owns the memory that is used for + this array. + alignment : None or int, optional + Make sure array is aligned in this direction. Note that alignment does + not take rank into consideration. + rank : int, optional + Rank of tensor (number of free indices, a scalar is zero, vector one, + matrix two) + + + For more information, see `numpy.ndarray `_ + + Note + ---- + Tensors of rank higher than 0 are not distributed in the first ``rank`` + indices. For example, + + >>> from mpi4py_fft import DistArray + >>> a = DistArray((3, 8, 8, 8), rank=1) + >>> print(a.pencil.shape) + (8, 8, 8) + + The array ``a`` cannot be distributed in the first axis of length 3 since + rank is 1 and this first index represent the vector component. The ``pencil`` + attribute of ``a`` thus only considers the last three axes. + + Also note that the ``alignment`` keyword does not take rank into + consideration. Setting alignment=2 for the array above means that the last + axis will be aligned, also when rank>0. + + """ + + xp = np + + def __new__( + cls, + global_shape, + subcomm=None, + val=None, + dtype=float, + buffer=None, + strides=None, + alignment=None, + rank=0, + ): + if len(global_shape[rank:]) < 2: # 1D case + obj = cls.xp.ndarray.__new__( + cls, global_shape, dtype=dtype, buffer=buffer, strides=strides + ) + if buffer is None and isinstance(val, Number): + obj.fill(val) + obj._rank = rank + obj._p0 = None + return obj + + subcomm = cls.getSubcomm(subcomm, global_shape, rank, alignment) + p0, subshape = cls.getPencil(subcomm, rank, global_shape, alignment) + + obj = cls.xp.ndarray.__new__(cls, subshape, dtype=dtype, buffer=buffer) + if buffer is None and isinstance(val, Number): + obj.fill(val) + obj._p0 = p0 + obj._rank = rank + return obj + + +class DistArrayCuPy(DistArrayBase, cp.ndarray): + """Distributed CuPy array + + This Numpy array is part of a larger global array. Information about the + distribution is contained in the attributes. + + Parameters + ---------- + global_shape : sequence of ints + Shape of non-distributed global array + subcomm : None, :class:`.Subcomm` object or sequence of ints, optional + Describes how to distribute the array + val : Number or None, optional + Initialize array with this number if buffer is not given + dtype : np.dtype, optional + Type of array + buffer : Numpy array, optional + Array of correct shape. The buffer owns the memory that is used for + this array. + alignment : None or int, optional + Make sure array is aligned in this direction. Note that alignment does + not take rank into consideration. + rank : int, optional + Rank of tensor (number of free indices, a scalar is zero, vector one, + matrix two) + + + For more information, see `numpy.ndarray `_ + + Note + ---- + Tensors of rank higher than 0 are not distributed in the first ``rank`` + indices. For example, + + >>> from mpi4py_fft import DistArray + >>> a = DistArray((3, 8, 8, 8), rank=1) + >>> print(a.pencil.shape) + (8, 8, 8) + + The array ``a`` cannot be distributed in the first axis of length 3 since + rank is 1 and this first index represent the vector component. The ``pencil`` + attribute of ``a`` thus only considers the last three axes. + + Also note that the ``alignment`` keyword does not take rank into + consideration. Setting alignment=2 for the array above means that the last + axis will be aligned, also when rank>0. + + """ + + # TODO: docs + xp = cp + + def __new__( + cls, + global_shape, + subcomm=None, + val=None, + dtype=float, + memptr=None, + strides=None, + alignment=None, + rank=0, + ): + if len(global_shape[rank:]) < 2: # 1D case + obj = cls.xp.ndarray.__new__( + cls, global_shape, dtype=dtype, memptr=memptr, strides=strides + ) + if memptr is None and isinstance(val, Number): + obj.fill(val) + obj._rank = rank + obj._p0 = None + return obj + + subcomm = cls.getSubcomm(subcomm, global_shape, rank, alignment) + p0, subshape = cls.getPencil(subcomm, rank, global_shape, alignment) + + obj = cls.xp.ndarray.__new__(cls, subshape, dtype=dtype, memptr=memptr) + if memptr is None and isinstance(val, Number): + obj.fill(val) + obj._p0 = p0 + obj._rank = rank + return obj + + +def newDistArray(pfft, forward_output=True, val=0, rank=0, view=False, useCuPy=False): """Return a new :class:`.DistArray` object for provided :class:`.PFFT` object Parameters @@ -457,6 +604,8 @@ def newDistArray(pfft, forward_output=True, val=0, rank=0, view=False): If True return view of the underlying Numpy array, i.e., return cls.view(np.ndarray). Note that the DistArray still will be accessible through the base attribute of the view. + useCuPy : bool, optional + If True, returns DistArrayCuPy instead of DistArray Returns ------- @@ -479,15 +628,25 @@ def newDistArray(pfft, forward_output=True, val=0, rank=0, view=False): dtype = pfft.forward.output_array.dtype else: dtype = pfft.forward.input_array.dtype - global_shape = (len(global_shape),)*rank + global_shape - z = DistArray(global_shape, subcomm=p0.subcomm, val=val, dtype=dtype, - alignment=p0.axis, rank=rank) + global_shape = (len(global_shape),) * rank + global_shape + + darraycls = DistArrayCuPy if useCuPy else DistArray + z = darraycls( + global_shape, + subcomm=p0.subcomm, + val=val, + dtype=dtype, + alignment=p0.axis, + rank=rank, + ) return z.v if view else z -def Function(*args, **kwargs): #pragma: no cover + +def Function(*args, **kwargs): # pragma: no cover import warnings + warnings.warn("Function() is deprecated; use newDistArray().", FutureWarning) - if 'tensor' in kwargs: - kwargs['rank'] = 1 - del kwargs['tensor'] + if "tensor" in kwargs: + kwargs["rank"] = 1 + del kwargs["tensor"] return newDistArray(*args, **kwargs) From e770b5680e19e7544050b6b410d8f027cec8e02a Mon Sep 17 00:00:00 2001 From: Thomas Date: Fri, 8 Dec 2023 16:46:50 +0100 Subject: [PATCH 02/18] Cleanup and tests --- mpi4py_fft/__init__.py | 1 + mpi4py_fft/distarray.py | 161 +++++++++--------------------------- mpi4py_fft/distarrayCuPy.py | 87 +++++++++++++++++++ tests/test_darrayCuPy.py | 156 ++++++++++++++++++++++++++++++++++ 4 files changed, 284 insertions(+), 121 deletions(-) create mode 100644 mpi4py_fft/distarrayCuPy.py create mode 100644 tests/test_darrayCuPy.py diff --git a/mpi4py_fft/__init__.py b/mpi4py_fft/__init__.py index 7450fae..e2ee0b4 100644 --- a/mpi4py_fft/__init__.py +++ b/mpi4py_fft/__init__.py @@ -20,6 +20,7 @@ __author__ = 'Lisandro Dalcin and Mikael Mortensen' from .distarray import DistArray, newDistArray, Function +from .distarrayCuPy import DistArrayCuPy from .mpifft import PFFT from . import fftw from .fftw import fftlib diff --git a/mpi4py_fft/distarray.py b/mpi4py_fft/distarray.py index e2275f1..c20e5a6 100644 --- a/mpi4py_fft/distarray.py +++ b/mpi4py_fft/distarray.py @@ -5,12 +5,6 @@ from .pencil import Pencil, Subcomm from .io import HDF5File, NCFile, FileBase -try: - import cupy as cp -except ImportError: - # TODO: move cupy stuff to seperate file as to avoid confusion! - import numpy as cp - comm = MPI.COMM_WORLD @@ -20,6 +14,24 @@ class DistArrayBase: Attributes: - xp: Numerical library, a.k.a. numpy or cupy as class attribute + + Note + ---- + Tensors of rank higher than 0 are not distributed in the first ``rank`` + indices. For example, + + >>> from mpi4py_fft import DistArray + >>> a = DistArray((3, 8, 8, 8), rank=1) + >>> print(a.pencil.shape) + (8, 8, 8) + + The array ``a`` cannot be distributed in the first axis of length 3 since + rank is 1 and this first index represent the vector component. The ``pencil`` + attribute of ``a`` thus only considers the last three axes. + + Also note that the ``alignment`` keyword does not take rank into + consideration. Setting alignment=2 for the array above means that the last + axis will be aligned, also when rank>0. """ xp = None @@ -70,11 +82,6 @@ def dimensions(self): """Return dimensions of array not including rank""" return len(self._p0.shape) - @property - def v(self): - """Return local ``self`` array as an ``ndarray`` object""" - return self.__array__() - @staticmethod def getSubcomm(subcomm, global_shape, rank, alignment): if isinstance(subcomm, Subcomm): @@ -104,14 +111,13 @@ def getPencil(cls, subcomm, rank, global_shape, alignment): assert sizes[alignment] == 1 else: # Decide that alignment is the last axis with size 1 - alignment = cls.xp.flatnonzero(cls.xp.array(sizes) == 1)[-1] + alignment = np.flatnonzero(np.array(sizes) == 1)[-1] p0 = Pencil(subcomm, global_shape[rank:], axis=alignment) + subshape = p0.subshape if rank > 0: subshape = global_shape[:rank] + subshape - else: - subshape = p0.subshape return p0, subshape @@ -180,16 +186,17 @@ def get(self, gslice): # MPI and only on rank 0. import h5py + # TODO: can we use h5py to communicate the data without copying to cpu first when using cupy? f = h5py.File("tmp.h5", "w", driver="mpio", comm=comm) s = self.local_slice() - sp = self.xp.nonzero([isinstance(x, slice) for x in gslice])[0] - sf = tuple(self.xp.take(s, sp)) + sp = np.nonzero([isinstance(x, slice) for x in gslice])[0] + sf = tuple(np.take(s, sp)) f.require_dataset( - "data", shape=tuple(self.xp.take(self.global_shape, sp)), dtype=self.dtype + "data", shape=tuple(np.take(self.global_shape, sp)), dtype=self.dtype ) gslice = list(gslice) # We are required to check if the indices in si are on this processor - si = self.xp.nonzero( + si = np.nonzero( [isinstance(x, int) and not z == slice(None) for x, z in zip(gslice, s)] )[0] on_this_proc = True @@ -199,7 +206,8 @@ def get(self, gslice): else: on_this_proc = False if on_this_proc: - f["data"][sf] = self[tuple(gslice)] + data = self.asnumpy + f["data"][sf] = data[tuple(gslice)] f.close() c = None if comm.Get_rank() == 0: @@ -280,7 +288,7 @@ def redistribute(self, axis=None, out=None): return self if out is not None: - assert isinstance(out, DistArray) + assert issubclass(type(out), DistArrayBase) assert self.global_shape == out.global_shape axis = out.alignment if self.commsizes == out.commsizes: @@ -448,24 +456,6 @@ class DistArray(DistArrayBase, np.ndarray): For more information, see `numpy.ndarray `_ - Note - ---- - Tensors of rank higher than 0 are not distributed in the first ``rank`` - indices. For example, - - >>> from mpi4py_fft import DistArray - >>> a = DistArray((3, 8, 8, 8), rank=1) - >>> print(a.pencil.shape) - (8, 8, 8) - - The array ``a`` cannot be distributed in the first axis of length 3 since - rank is 1 and this first index represent the vector component. The ``pencil`` - attribute of ``a`` thus only considers the last three axes. - - Also note that the ``alignment`` keyword does not take rank into - consideration. Setting alignment=2 for the array above means that the last - axis will be aligned, also when rank>0. - """ xp = np @@ -501,89 +491,14 @@ def __new__( obj._rank = rank return obj + @property + def v(self): + """Return local ``self`` array as an ``ndarray`` object""" + return self.__array__() -class DistArrayCuPy(DistArrayBase, cp.ndarray): - """Distributed CuPy array - - This Numpy array is part of a larger global array. Information about the - distribution is contained in the attributes. - - Parameters - ---------- - global_shape : sequence of ints - Shape of non-distributed global array - subcomm : None, :class:`.Subcomm` object or sequence of ints, optional - Describes how to distribute the array - val : Number or None, optional - Initialize array with this number if buffer is not given - dtype : np.dtype, optional - Type of array - buffer : Numpy array, optional - Array of correct shape. The buffer owns the memory that is used for - this array. - alignment : None or int, optional - Make sure array is aligned in this direction. Note that alignment does - not take rank into consideration. - rank : int, optional - Rank of tensor (number of free indices, a scalar is zero, vector one, - matrix two) - - - For more information, see `numpy.ndarray `_ - - Note - ---- - Tensors of rank higher than 0 are not distributed in the first ``rank`` - indices. For example, - - >>> from mpi4py_fft import DistArray - >>> a = DistArray((3, 8, 8, 8), rank=1) - >>> print(a.pencil.shape) - (8, 8, 8) - - The array ``a`` cannot be distributed in the first axis of length 3 since - rank is 1 and this first index represent the vector component. The ``pencil`` - attribute of ``a`` thus only considers the last three axes. - - Also note that the ``alignment`` keyword does not take rank into - consideration. Setting alignment=2 for the array above means that the last - axis will be aligned, also when rank>0. - - """ - - # TODO: docs - xp = cp - - def __new__( - cls, - global_shape, - subcomm=None, - val=None, - dtype=float, - memptr=None, - strides=None, - alignment=None, - rank=0, - ): - if len(global_shape[rank:]) < 2: # 1D case - obj = cls.xp.ndarray.__new__( - cls, global_shape, dtype=dtype, memptr=memptr, strides=strides - ) - if memptr is None and isinstance(val, Number): - obj.fill(val) - obj._rank = rank - obj._p0 = None - return obj - - subcomm = cls.getSubcomm(subcomm, global_shape, rank, alignment) - p0, subshape = cls.getPencil(subcomm, rank, global_shape, alignment) - - obj = cls.xp.ndarray.__new__(cls, subshape, dtype=dtype, memptr=memptr) - if memptr is None and isinstance(val, Number): - obj.fill(val) - obj._p0 = p0 - obj._rank = rank - return obj + @property + def asnumpy(self): + return self def newDistArray(pfft, forward_output=True, val=0, rank=0, view=False, useCuPy=False): @@ -630,7 +545,11 @@ def newDistArray(pfft, forward_output=True, val=0, rank=0, view=False, useCuPy=F dtype = pfft.forward.input_array.dtype global_shape = (len(global_shape),) * rank + global_shape - darraycls = DistArrayCuPy if useCuPy else DistArray + if useCuPy: + from mpi4py_fft.distarrayCuPy import DistArrayCuPy as darraycls + else: + darraycls = DistArray + z = darraycls( global_shape, subcomm=p0.subcomm, diff --git a/mpi4py_fft/distarrayCuPy.py b/mpi4py_fft/distarrayCuPy.py new file mode 100644 index 0000000..d3ead89 --- /dev/null +++ b/mpi4py_fft/distarrayCuPy.py @@ -0,0 +1,87 @@ +from mpi4py_fft.distarray import DistArrayBase +import cupy as cp +from mpi4py import MPI +from numbers import Number + +comm = MPI.COMM_WORLD + + +class DistArrayCuPy(DistArrayBase, cp.ndarray): + """Distributed CuPy array + + This CuPy array is part of a larger global array. Information about the + distribution is contained in the attributes. + + Parameters + ---------- + global_shape : sequence of ints + Shape of non-distributed global array + subcomm : None, :class:`.Subcomm` object or sequence of ints, optional + Describes how to distribute the array + val : Number or None, optional + Initialize array with this number if buffer is not given + dtype : cp.dtype, optional + Type of array + memptr : cupy.cuda.MemoryPointer, optional + Pointer to the array content head. + alignment : None or int, optional + Make sure array is aligned in this direction. Note that alignment does + not take rank into consideration. + rank : int, optional + Rank of tensor (number of free indices, a scalar is zero, vector one, + matrix two) + + + For more information, see `cupy.ndarray `_ + + """ + + xp = cp + + def __new__( + cls, + global_shape, + subcomm=None, + val=None, + dtype=float, + memptr=None, + strides=None, + alignment=None, + rank=0, + ): + if len(global_shape[rank:]) < 2: # 1D case + obj = cls.xp.ndarray.__new__( + cls, global_shape, dtype=dtype, memptr=memptr, strides=strides + ) + if memptr is None and isinstance(val, Number): + obj.fill(val) + obj._rank = rank + obj._p0 = None + return obj + + subcomm = cls.getSubcomm(subcomm, global_shape, rank, alignment) + p0, subshape = cls.getPencil(subcomm, rank, global_shape, alignment) + + obj = cls.xp.ndarray.__new__(cls, subshape, dtype=dtype, memptr=memptr) + if memptr is None and isinstance(val, Number): + obj.fill(val) + obj._p0 = p0 + obj._rank = rank + return obj + + def get(self, *args, **kwargs): + """Untangle inheritance conflicts""" + if any(isinstance(me, tuple) for me in args): + return DistArrayBase.get(self, *args, **kwargs) + else: + return cp.ndarray.get(self, *args, **kwargs) + + @property + def asnumpy(self): + """Copy the array to CPU""" + return self.get() + + @property + def v(self): + """Return local ``self`` array as an ``ndarray`` object""" + return cp.ndarray.__getitem__(self, slice(0, -1, 1)) diff --git a/tests/test_darrayCuPy.py b/tests/test_darrayCuPy.py new file mode 100644 index 0000000..0fd8d08 --- /dev/null +++ b/tests/test_darrayCuPy.py @@ -0,0 +1,156 @@ +import cupy as cp +from mpi4py import MPI +from mpi4py_fft import DistArrayCuPy, newDistArray, PFFT +from mpi4py_fft.pencil import Subcomm + +comm = MPI.COMM_WORLD + + +def test_1Darray(): + N = (8,) + z = DistArrayCuPy(N, val=2) + assert z[0] == 2 + assert z.shape == N + + +def test_2Darray(): + N = (8, 8) + for subcomm in ((0, 1), (1, 0), None, Subcomm(comm, (0, 1))): + for rank in (0, 1, 2): + M = (2,) * rank + N + alignment = None + if subcomm is None and rank == 1: + alignment = 1 + a = DistArrayCuPy(M, subcomm=subcomm, val=1, rank=rank, alignment=alignment) + assert a.rank == rank + assert a.global_shape == M + _ = a.substart + c = a.subcomm + z = a.commsizes + _ = a.pencil + assert cp.prod(cp.array(z)) == comm.Get_size() + if rank > 0: + a0 = a[0] + assert isinstance(a0, DistArrayCuPy) + assert a0.rank == rank - 1 + aa = a.v + assert isinstance(aa, cp.ndarray) + try: + k = a.get((0,) * rank + (0, slice(None))) + if comm.Get_rank() == 0: + assert len(k) == N[1] + assert cp.sum(k) == N[1] + k = a.get((0,) * rank + (slice(None), 0)) + if comm.Get_rank() == 0: + assert len(k) == N[0] + assert cp.sum(k) == N[0] + except ModuleNotFoundError: + pass + _ = a.local_slice() + newaxis = (a.alignment + 1) % 2 + _ = a.get_pencil_and_transfer(newaxis) + a[:] = MPI.COMM_WORLD.Get_rank() + b = a.redistribute(newaxis) + a = b.redistribute(out=a) + a = b.redistribute(a.alignment, out=a) + s0 = MPI.COMM_WORLD.reduce(cp.linalg.norm(a) ** 2) + s1 = MPI.COMM_WORLD.reduce(cp.linalg.norm(b) ** 2) + if MPI.COMM_WORLD.Get_rank() == 0: + assert abs(s0 - s1) < 1e-1 + c = a.redistribute(a.alignment) + assert c is a + + +def test_3Darray(): + N = (8, 8, 8) + for subcomm in ( + (0, 0, 1), + (0, 1, 0), + (1, 0, 0), + (0, 1, 1), + (1, 0, 1), + (1, 1, 0), + None, + Subcomm(comm, (0, 0, 1)), + ): + for rank in (0, 1, 2): + M = (3,) * rank + N + alignment = None + if subcomm is None and rank == 1: + alignment = 2 + a = DistArrayCuPy(M, subcomm=subcomm, val=1, rank=rank, alignment=alignment) + assert a.rank == rank + assert a.global_shape == M + _ = a.substart + _ = a.subcomm + z = a.commsizes + _ = a.pencil + assert cp.prod(cp.array(z)) == comm.Get_size() + if rank > 0: + a0 = a[0] + assert isinstance(a0, DistArrayCuPy) + assert a0.rank == rank - 1 + if rank == 2: + a0 = a[0, 1] + assert isinstance(a0, DistArrayCuPy) + assert a0.rank == 0 + aa = a.v + assert isinstance(aa, cp.ndarray) + try: + k = a.get((0,) * rank + (0, 0, slice(None))) + if comm.Get_rank() == 0: + assert len(k) == N[2] + assert cp.sum(k) == N[2] + k = a.get((0,) * rank + (slice(None), 0, 0)) + if comm.Get_rank() == 0: + assert len(k) == N[0] + assert cp.sum(k) == N[0] + except ModuleNotFoundError: + pass + _ = a.local_slice() + newaxis = (a.alignment + 1) % 3 + _ = a.get_pencil_and_transfer(newaxis) + a[:] = MPI.COMM_WORLD.Get_rank() + b = a.redistribute(newaxis) + a = b.redistribute(out=a) + s0 = MPI.COMM_WORLD.reduce(cp.linalg.norm(a) ** 2) + s1 = MPI.COMM_WORLD.reduce(cp.linalg.norm(b) ** 2) + if MPI.COMM_WORLD.Get_rank() == 0: + assert abs(s0 - s1) < 1e-1 + + +def test_newDistArray(): + N = (8, 8, 8) + pfft = PFFT(MPI.COMM_WORLD, N) + for forward_output in (True, False): + for view in (True, False): + for rank in (0, 1, 2): + a = newDistArray( + pfft, + forward_output=forward_output, + rank=rank, + view=view, + useCuPy=True, + ) + if view is False: + assert isinstance(a, DistArrayCuPy) + assert a.rank == rank + if rank == 0: + qfft = PFFT(MPI.COMM_WORLD, darray=a) + elif rank == 1: + qfft = PFFT(MPI.COMM_WORLD, darray=a[0]) + else: + qfft = PFFT(MPI.COMM_WORLD, darray=a[0, 0]) + qfft.destroy() + + else: + assert isinstance(a, cp.ndarray) + assert a.base.rank == rank + pfft.destroy() + + +if __name__ == "__main__": + test_1Darray() + test_2Darray() + test_3Darray() + test_newDistArray() From 424fdb1b4dabfb1f1dec094644647c8c3873f17b Mon Sep 17 00:00:00 2001 From: Thomas Date: Tue, 12 Dec 2023 12:57:45 +0100 Subject: [PATCH 03/18] Added `cupy` backend for FFTs --- mpi4py_fft/distarray.py | 6 ++---- mpi4py_fft/libfft.py | 33 ++++++++++++++++++++++++++++++++- tests/test_libfft.py | 29 +++++++++++++++++++---------- tests/test_mpifft.py | 6 ++++++ 4 files changed, 59 insertions(+), 15 deletions(-) diff --git a/mpi4py_fft/distarray.py b/mpi4py_fft/distarray.py index c20e5a6..3f4ce5b 100644 --- a/mpi4py_fft/distarray.py +++ b/mpi4py_fft/distarray.py @@ -501,7 +501,7 @@ def asnumpy(self): return self -def newDistArray(pfft, forward_output=True, val=0, rank=0, view=False, useCuPy=False): +def newDistArray(pfft, forward_output=True, val=0, rank=0, view=False): """Return a new :class:`.DistArray` object for provided :class:`.PFFT` object Parameters @@ -519,8 +519,6 @@ def newDistArray(pfft, forward_output=True, val=0, rank=0, view=False, useCuPy=F If True return view of the underlying Numpy array, i.e., return cls.view(np.ndarray). Note that the DistArray still will be accessible through the base attribute of the view. - useCuPy : bool, optional - If True, returns DistArrayCuPy instead of DistArray Returns ------- @@ -545,7 +543,7 @@ def newDistArray(pfft, forward_output=True, val=0, rank=0, view=False, useCuPy=F dtype = pfft.forward.input_array.dtype global_shape = (len(global_shape),) * rank + global_shape - if useCuPy: + if pfft.xfftn[0].backend in ["cupy"]: from mpi4py_fft.distarrayCuPy import DistArrayCuPy as darraycls else: darraycls = DistArray diff --git a/mpi4py_fft/libfft.py b/mpi4py_fft/libfft.py index 580bb9e..3f74b4d 100644 --- a/mpi4py_fft/libfft.py +++ b/mpi4py_fft/libfft.py @@ -78,6 +78,32 @@ def _Xfftn_plan_fftw(shape, axes, dtype, transforms, options): xfftn_bck = plan_bck(V, s=s, axes=axes, threads=threads, flags=flags, output_array=U) return (xfftn_fwd, xfftn_bck) +def _Xfftn_plan_cupy(shape, axes, dtype, transforms, options): + import cupy as cp + + transforms = {} if transforms is None else transforms + if tuple(axes) in transforms: + plan_fwd, plan_bck = transforms[tuple(axes)] + else: + if cp.issubdtype(dtype, cp.floating): + plan_fwd = cp.fft.rfftn + plan_bck = cp.fft.irfftn + else: + plan_fwd = cp.fft.fftn + plan_bck = cp.fft.ifftn + + s = tuple(np.take(shape, axes)) + U = cp.array(fftw.aligned(shape, dtype=dtype)) # TODO: avoid going via CPU + _V = plan_fwd(U, s=s, axes=axes) + V = cp.array(fftw.aligned_like(_V.get())) # TODO: avoid going via CPU + M = np.prod(s) + + # CuPy has forward transform unscaled and backward scaled with 1/N + return ( + _Yfftn_wrap(plan_fwd, U, V, 1, {'s': s, 'axes': axes}), + _Yfftn_wrap(plan_bck, V, U, M, {'s': s, 'axes': axes}), + ) + def _Xfftn_plan_numpy(shape, axes, dtype, transforms, options): transforms = {} if transforms is None else transforms @@ -310,7 +336,6 @@ def _padding_backward(self, trunc_array, padded_array): su[axis] = -(N//2) padded_array[tuple(su)] *= 0.5 - class FFT(FFTBase): """Class for serial FFT transforms @@ -373,6 +398,7 @@ class FFT(FFTBase): output_array : array """ + def __init__(self, shape, axes=None, dtype=float, padding=False, backend='fftw', transforms=None, **kw): FFTBase.__init__(self, shape, axes, dtype, padding) @@ -380,6 +406,7 @@ def __init__(self, shape, axes=None, dtype=float, padding=False, 'pyfftw': _Xfftn_plan_pyfftw, 'fftw': _Xfftn_plan_fftw, 'numpy': _Xfftn_plan_numpy, + 'cupy': _Xfftn_plan_cupy, 'mkl_fft': _Xfftn_plan_mkl, 'scipy': _Xfftn_plan_scipy, }[backend] @@ -393,12 +420,16 @@ def __init__(self, shape, axes=None, dtype=float, padding=False, self.M = self.fwd.get_normalization() if backend == 'scipy': self.real_transform = False # No rfftn/irfftn methods + self.padding_factor = 1.0 if padding is not False: self.padding_factor = padding[self.axes[-1]] if np.ndim(padding) else padding if abs(self.padding_factor-1.0) > 1e-8: assert len(self.axes) == 1 trunc_array = self._get_truncarray(shape, V.dtype) + if self.backend in ['cupy']: # TODO: Skip detour via CPU + import cupy as cp + trunc_array = cp.array(trunc_array) self.forward = _Xfftn_wrap(self._forward, U, trunc_array) self.backward = _Xfftn_wrap(self._backward, trunc_array, U) else: diff --git a/tests/test_libfft.py b/tests/test_libfft.py index 23c6c8e..d4523a6 100644 --- a/tests/test_libfft.py +++ b/tests/test_libfft.py @@ -7,18 +7,22 @@ from mpi4py_fft.libfft import FFT has_backend = {'fftw': True} -for backend in ('pyfftw', 'mkl_fft', 'scipy', 'numpy'): +for backend in ('pyfftw', 'mkl_fft', 'scipy', 'numpy', 'cupy'): has_backend[backend] = True try: importlib.import_module(backend) except ImportError: has_backend[backend] = False +if has_backend['cupy']: + import cupy as cp + abstol = dict(f=5e-5, d=1e-14, g=1e-14) def allclose(a, b): atol = abstol[a.dtype.char.lower()] - return np.allclose(a, b, rtol=0, atol=atol) + xp = cp if type(a) == cp.ndarray else np + return xp.allclose(a, b, rtol=0, atol=atol) def test_libfft(): from itertools import product @@ -30,7 +34,7 @@ def test_libfft(): if fftw.get_fftw_lib(t): types += t+t.upper() - for backend in ('pyfftw', 'mkl_fft', 'scipy', 'numpy', 'fftw'): + for backend in has_backend.keys(): if has_backend[backend] is False: continue t0 = 0 @@ -47,7 +51,7 @@ def test_libfft(): A = fft.forward.input_array B = fft.forward.output_array - A[...] = np.random.random(A.shape).astype(typecode) + A[...] = (cp if backend in ['cupy'] else np).random.random(A.shape).astype(typecode) X = A.copy() B.fill(0) @@ -59,14 +63,15 @@ def test_libfft(): t0 -= time() A = fft.backward(B, A) t0 += time() - assert allclose(A, X) + assert allclose(A, X), f'Failed with {backend} backend with type \"{typecode}\" in {dim} dims!' print('backend: ', backend, t0) # Padding is different because the physical space is padded and as such # difficult to initialize. We solve this problem by making one extra # transform - for backend in ('pyfftw', 'mkl_fft', 'scipy', 'numpy', 'fftw'): + for backend in has_backend.keys(): if has_backend[backend] is False: continue + xp = cp if backend in ['cupy'] else np for padding in (1.5, 2.0): for typecode in types: for dim in dims: @@ -84,7 +89,7 @@ def test_libfft(): A = fft.forward.input_array B = fft.forward.output_array - A[...] = np.random.random(A.shape).astype(typecode) + A[...] = xp.random.random(A.shape).astype(typecode) B.fill(0) B = fft.forward(A, B) @@ -95,11 +100,12 @@ def test_libfft(): B.fill(0) B = fft.forward(A, B) - assert allclose(B, X), np.linalg.norm(B-X) + assert allclose(B, X), xp.linalg.norm(B-X) - for backend in ('pyfftw', 'mkl_fft', 'scipy', 'numpy', 'fftw'): + for backend in has_backend.keys(): if has_backend[backend] is False: continue + xp = cp if backend in ['cupy'] else np if backend == 'fftw': dctn = functools.partial(fftw.dctn, type=3) @@ -113,6 +119,9 @@ def test_libfft(): elif backend == 'numpy': transforms = {(1,): (np.fft.rfftn, np.fft.irfftn), (0, 1): (np.fft.rfftn, np.fft.irfftn)} + elif backend == 'cupy': + transforms = {(1,): (cp.fft.rfftn, cp.fft.irfftn), + (0, 1): (cp.fft.rfftn, cp.fft.irfftn)} elif backend == 'mkl_fft': import mkl_fft transforms = {(1,): (mkl_fft._numpy_fft.rfftn, mkl_fft._numpy_fft.irfftn), @@ -126,7 +135,7 @@ def test_libfft(): fft = FFT(shape, axis, backend=backend, transforms=transforms) A = fft.forward.input_array B = fft.forward.output_array - A[...] = np.random.random(A.shape) + A[...] = xp.random.random(A.shape) X = A.copy() B.fill(0) B = fft.forward(A, B) diff --git a/tests/test_mpifft.py b/tests/test_mpifft.py index 2ea851a..c739c21 100644 --- a/tests/test_mpifft.py +++ b/tests/test_mpifft.py @@ -14,6 +14,12 @@ except ImportError: pass +try: + import cupy + backends.append('cupy') +except ImportError: + pass + abstol = dict(f=0.1, d=2e-10, g=1e-10) def allclose(a, b): From 44be7bce8b8661adbd583c44805ada708b33e278 Mon Sep 17 00:00:00 2001 From: Thomas Date: Tue, 12 Dec 2023 12:59:37 +0100 Subject: [PATCH 04/18] Removed `useCuPy` argument from `NewDistArray` --- tests/test_darrayCuPy.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/test_darrayCuPy.py b/tests/test_darrayCuPy.py index 0fd8d08..fb0e1b0 100644 --- a/tests/test_darrayCuPy.py +++ b/tests/test_darrayCuPy.py @@ -121,7 +121,7 @@ def test_3Darray(): def test_newDistArray(): N = (8, 8, 8) - pfft = PFFT(MPI.COMM_WORLD, N) + pfft = PFFT(MPI.COMM_WORLD, N, backend='cupy') for forward_output in (True, False): for view in (True, False): for rank in (0, 1, 2): @@ -130,17 +130,16 @@ def test_newDistArray(): forward_output=forward_output, rank=rank, view=view, - useCuPy=True, ) if view is False: assert isinstance(a, DistArrayCuPy) assert a.rank == rank if rank == 0: - qfft = PFFT(MPI.COMM_WORLD, darray=a) + qfft = PFFT(MPI.COMM_WORLD, darray=a, backend='cupy') elif rank == 1: - qfft = PFFT(MPI.COMM_WORLD, darray=a[0]) + qfft = PFFT(MPI.COMM_WORLD, darray=a[0], backend='cupy') else: - qfft = PFFT(MPI.COMM_WORLD, darray=a[0, 0]) + qfft = PFFT(MPI.COMM_WORLD, darray=a[0, 0], backend='cupy') qfft.destroy() else: From 90551b4a5b832cbdb84fd0943b5f956533efa5b0 Mon Sep 17 00:00:00 2001 From: Thomas Date: Tue, 12 Dec 2023 13:04:50 +0100 Subject: [PATCH 05/18] Fixed tests --- tests/test_PFFT_cupy_backend.py | 36 +++++++++++++++++++++++++++++++++ tests/test_mpifft.py | 6 ------ 2 files changed, 36 insertions(+), 6 deletions(-) create mode 100644 tests/test_PFFT_cupy_backend.py diff --git a/tests/test_PFFT_cupy_backend.py b/tests/test_PFFT_cupy_backend.py new file mode 100644 index 0000000..73a742f --- /dev/null +++ b/tests/test_PFFT_cupy_backend.py @@ -0,0 +1,36 @@ +def test_PFFT_cupy_backend(): + import numpy as np + import cupy as cp + from mpi4py import MPI + from mpi4py_fft import PFFT, newDistArray + + comm = MPI.COMM_WORLD + + # Set global size of the computational box + N = np.array([comm.size * 8] * 3, dtype=int) + expected_shape = (N[0] // comm.size, N[1], N[2]) + axes = ((0,), (1, 2)) + + backends = ['numpy', 'cupy'] + FFTs = {backend: PFFT(comm, N, axes=axes, grid=(-1,), backend=backend) for backend in backends} + assert FFTs['numpy'].axes == FFTs['cupy'].axes + + us = {backend: newDistArray(FFTs[backend], forward_output=False) for backend in backends} + us['numpy'][:] = np.random.random(us['numpy'].shape).astype(us['numpy'].dtype) + us['cupy'][:] = cp.array(us['numpy']) + + + + for backend, xp in zip(backends, [np, cp]): + us['hat_' + backend] = newDistArray(FFTs[backend], forward_output=True) + us['hat_' + backend] = FFTs[backend].forward(us[backend], us['hat_' + backend]) + us['back_and_forth_' + backend] = xp.zeros_like(us[backend]) + us['back_and_forth_' + backend] = FFTs[backend].backward(us['hat_' + backend], us['back_and_forth_' + backend]) + + assert xp.allclose(us[backend], us['back_and_forth_' + backend]), f'Got different values after back and forth transformation with {backend} backend' + assert np.allclose(us['back_and_forth_' + backend].shape, expected_shape), f"Got unexpected shape {us['back_and_forth_' + backend].shape} when expecting {expected_shape} with {backend} backend" + assert np.allclose(us['hat_cupy'].get(), us['hat_numpy']), 'Got different values in frequency space' + + +if __name__ == '__main__': + test_PFFT_cupy_backend() diff --git a/tests/test_mpifft.py b/tests/test_mpifft.py index c739c21..2ea851a 100644 --- a/tests/test_mpifft.py +++ b/tests/test_mpifft.py @@ -14,12 +14,6 @@ except ImportError: pass -try: - import cupy - backends.append('cupy') -except ImportError: - pass - abstol = dict(f=0.1, d=2e-10, g=1e-10) def allclose(a, b): From 202f30cf1839dbd450a2b434f8817991460cf1a3 Mon Sep 17 00:00:00 2001 From: Thomas Date: Thu, 14 Dec 2023 08:59:09 +0100 Subject: [PATCH 06/18] Added cupyx-scipy backend --- mpi4py_fft/distarray.py | 2 +- mpi4py_fft/libfft.py | 39 ++++++++++++++++++++++++++++++++++++++- tests/test_libfft.py | 24 ++++++++++++++++++++---- 3 files changed, 59 insertions(+), 6 deletions(-) diff --git a/mpi4py_fft/distarray.py b/mpi4py_fft/distarray.py index 3f4ce5b..9416856 100644 --- a/mpi4py_fft/distarray.py +++ b/mpi4py_fft/distarray.py @@ -543,7 +543,7 @@ def newDistArray(pfft, forward_output=True, val=0, rank=0, view=False): dtype = pfft.forward.input_array.dtype global_shape = (len(global_shape),) * rank + global_shape - if pfft.xfftn[0].backend in ["cupy"]: + if pfft.xfftn[0].backend in ["cupy", "cupyx-scipy"]: from mpi4py_fft.distarrayCuPy import DistArrayCuPy as darraycls else: darraycls = DistArray diff --git a/mpi4py_fft/libfft.py b/mpi4py_fft/libfft.py index 3f74b4d..16feeda 100644 --- a/mpi4py_fft/libfft.py +++ b/mpi4py_fft/libfft.py @@ -151,6 +151,42 @@ def _Xfftn_plan_mkl(shape, axes, dtype, transforms, options): #pragma: no cover return (_Yfftn_wrap(plan_fwd, U, V, 1, {'s': s, 'axes': axes}), _Yfftn_wrap(plan_bck, V, U, M, {'s': s, 'axes': axes})) +def _Xfftn_plan_cupyx_scipy(shape, axes, dtype, transforms, options): + import cupy as cp + import cupyx.scipy.fft as fft_lib + + transforms = {} if transforms is None else transforms + if tuple(axes) in transforms: + _plan_fwd, _plan_bck = transforms[tuple(axes)] + else: + if cp.issubdtype(dtype, cp.floating): + _plan_fwd = fft_lib.rfftn + _plan_bck = fft_lib.irfftn + else: + _plan_fwd = fft_lib.fftn + _plan_bck = fft_lib.ifftn + + def swap_shape_for_s(kwargs): + _kwargs = { + 's': kwargs.pop('shape', None), + **kwargs, + } + return _kwargs + + def plan_fwd(*args, **kwargs): + return _plan_fwd(*args, **swap_shape_for_s(kwargs)) + + def plan_bck(*args, **kwargs): + return _plan_bck(*args, **swap_shape_for_s(kwargs)) + + s = tuple(np.take(shape, axes)) + U = cp.array(fftw.aligned(shape, dtype=dtype)) # TODO: Skip CPU detour + V = plan_fwd(U, s=s, axes=axes) + V = cp.array(fftw.aligned_like(V.get())) # TODO: skip CPU detour + M = np.prod(s) + return (_Yfftn_wrap(plan_fwd, U, V, 1, {'shape': s, 'axes': axes}), + _Yfftn_wrap(plan_bck, V, U, M, {'shape': s, 'axes': axes})) + def _Xfftn_plan_scipy(shape, axes, dtype, transforms, options): transforms = {} if transforms is None else transforms @@ -409,6 +445,7 @@ def __init__(self, shape, axes=None, dtype=float, padding=False, 'cupy': _Xfftn_plan_cupy, 'mkl_fft': _Xfftn_plan_mkl, 'scipy': _Xfftn_plan_scipy, + 'cupyx-scipy': _Xfftn_plan_cupyx_scipy, }[backend] self.backend = backend self.fwd, self.bck = plan(self.shape, self.axes, self.dtype, transforms, kw) @@ -427,7 +464,7 @@ def __init__(self, shape, axes=None, dtype=float, padding=False, if abs(self.padding_factor-1.0) > 1e-8: assert len(self.axes) == 1 trunc_array = self._get_truncarray(shape, V.dtype) - if self.backend in ['cupy']: # TODO: Skip detour via CPU + if 'cupy' in self.backend: # TODO: Skip detour via CPU import cupy as cp trunc_array = cp.array(trunc_array) self.forward = _Xfftn_wrap(self._forward, U, trunc_array) diff --git a/tests/test_libfft.py b/tests/test_libfft.py index d4523a6..180fc11 100644 --- a/tests/test_libfft.py +++ b/tests/test_libfft.py @@ -16,12 +16,22 @@ if has_backend['cupy']: import cupy as cp + has_backend['cupyx-scipy'] = True abstol = dict(f=5e-5, d=1e-14, g=1e-14) +def get_xpy(backend=None, array=None): + if backend in ['cupy', 'cupyx-scipy']: + return cp + if has_backend['cupy'] and array is not None: + if type(array) == cp.ndarray: + return cp + return np + + def allclose(a, b): atol = abstol[a.dtype.char.lower()] - xp = cp if type(a) == cp.ndarray else np + xp = get_xpy(array=a) return xp.allclose(a, b, rtol=0, atol=atol) def test_libfft(): @@ -37,6 +47,7 @@ def test_libfft(): for backend in has_backend.keys(): if has_backend[backend] is False: continue + xp = get_xpy(backend=backend) t0 = 0 for typecode in types: for dim in dims: @@ -51,7 +62,7 @@ def test_libfft(): A = fft.forward.input_array B = fft.forward.output_array - A[...] = (cp if backend in ['cupy'] else np).random.random(A.shape).astype(typecode) + A[...] = xp.random.random(A.shape).astype(typecode) X = A.copy() B.fill(0) @@ -71,7 +82,7 @@ def test_libfft(): for backend in has_backend.keys(): if has_backend[backend] is False: continue - xp = cp if backend in ['cupy'] else np + xp = get_xpy(backend=backend) for padding in (1.5, 2.0): for typecode in types: for dim in dims: @@ -105,7 +116,7 @@ def test_libfft(): for backend in has_backend.keys(): if has_backend[backend] is False: continue - xp = cp if backend in ['cupy'] else np + xp = get_xpy(backend=backend) if backend == 'fftw': dctn = functools.partial(fftw.dctn, type=3) @@ -130,6 +141,11 @@ def test_libfft(): from scipy.fftpack import fftn, ifftn transforms = {(1,): (fftn, ifftn), (0, 1): (fftn, ifftn)} + elif backend == 'cupyx-scipy': + from scipy.fftpack import fftn, ifftn + import cupyx.scipy.fft as fftlib + transforms = {(1,): (fftlib.fftn, fftlib.ifftn), + (0, 1): (fftlib.fftn, fftlib.ifftn)} for axis in ((1,), (0, 1)): fft = FFT(shape, axis, backend=backend, transforms=transforms) From e5d4b562085628b3a2e510d8b6547fca755a3054 Mon Sep 17 00:00:00 2001 From: Thomas Date: Mon, 18 Dec 2023 17:13:19 +0100 Subject: [PATCH 07/18] Implemented NCCL for Alltoallw operations --- mpi4py_fft/mpifft.py | 8 ++- mpi4py_fft/pencil.py | 153 +++++++++++++++++++++++++++++++++++++++++-- tests/test_pencil.py | 84 +++++++++++++++--------- 3 files changed, 207 insertions(+), 38 deletions(-) diff --git a/mpi4py_fft/mpifft.py b/mpi4py_fft/mpifft.py index 4abe5d8..509a7c4 100644 --- a/mpi4py_fft/mpifft.py +++ b/mpi4py_fft/mpifft.py @@ -128,6 +128,11 @@ class PFFT(object): fftn/ifftn for complex input arrays. Real-to-real transforms can be configured using this dictionary and real-to-real transforms from the :mod:`.fftw.xfftn` module. See Examples. + comm_backend : str, optional + Choose backend for communication. When using GPU based serial backends, + the "NCCL" backend can be be used in `Alltoallw` operations to speedup + GPU to GPU transfer. Keep in mind that this is used alongside MPI and + assumes one GPU per MPI rankMPI is used. Other Parameters ---------------- @@ -201,7 +206,7 @@ class PFFT(object): """ def __init__(self, comm, shape=None, axes=None, dtype=float, grid=None, padding=False, collapse=False, backend='fftw', - transforms=None, darray=None, **kw): + transforms=None, darray=None, comm_backend='MPI', **kw): # pylint: disable=too-many-locals # pylint: disable=too-many-branches # pylint: disable=too-many-statements @@ -311,6 +316,7 @@ def __init__(self, comm, shape=None, axes=None, dtype=float, grid=None, self.pencil = [None, None] axes = self.axes[-1] + Pencil.backend = comm_backend pencil = Pencil(self.subcomm, shape, axes[-1]) xfftn = FFT(pencil.subshape, axes, dtype, padding, backend=backend, transforms=transforms, **kw) diff --git a/mpi4py_fft/pencil.py b/mpi4py_fft/pencil.py index 6671d8c..3a8a7b2 100644 --- a/mpi4py_fft/pencil.py +++ b/mpi4py_fft/pencil.py @@ -179,8 +179,15 @@ def forward(self, arrayA, arrayB): assert self.subshapeB == arrayB.shape assert self.dtype == arrayA.dtype assert self.dtype == arrayB.dtype - self.comm.Alltoallw([arrayA, self._counts_displs, self._subtypesA], - [arrayB, self._counts_displs, self._subtypesB]) + self.Alltoallw(arrayA, self._subtypesA, arrayB, self._subtypesB) + + def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB): + """ + Redistribute arrayA to arrayB + """ + self.comm.Alltoallw([arrayA, self._counts_displs, subtypesA], + [arrayB, self._counts_displs, subtypesB]) + def backward(self, arrayB, arrayA): """Global redistribution from arrayB to arrayA @@ -197,8 +204,7 @@ def backward(self, arrayB, arrayA): assert self.subshapeB == arrayB.shape assert self.dtype == arrayA.dtype assert self.dtype == arrayB.dtype - self.comm.Alltoallw([arrayB, self._counts_displs, self._subtypesB], - [arrayA, self._counts_displs, self._subtypesA]) + self.Alltoallw(arrayB, self._subtypesB, arrayA, self._subtypesA) def destroy(self): for datatype in self._subtypesA: @@ -209,6 +215,134 @@ def destroy(self): datatype.Free() +class NCCLTransfer(Transfer): + """ + Transfer class which uses NCCL for `Alltoallw` operations. The NCCL + communicator will share rank and size attributes with the MPI communicator. + In particular, this assumes one GPU per MPI rank. + """ + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + from cupy.cuda import nccl + self.comm_nccl = nccl.NcclCommunicator(self.comm.size, self.comm.bcast(nccl.get_unique_id(), root=0), self.comm.rank) + + def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB): + """ + Redistribute arrayA to arrayB. + + As NCCL does not have all to all, we replicate it by a bunch of individual send and receives. + As NCCL also does not have complex datatypes, we have to send real and imaginary parts separately. + """ + import cupy as cp + from cupy.cuda import nccl + assert type(arrayA) == cp.ndarray + assert type(arrayB) == cp.ndarray + assert arrayA.dtype == arrayB.dtype + assert self.comm.rank == self.comm_nccl.rank_id(), f'The structure of the communicator has changed unexpectedly' + + rank, size, comm = self.comm.rank, self.comm.size, self.comm_nccl + stream = cp.cuda.Stream.null.ptr + iscomplex = cp.iscomplexobj(arrayA) + NCCL_dtype, real_dtype = self.get_nccl_and_real_dtypes(arrayA) + + for i in range(size): + for j in range(size): + + if rank == i: + local_slice, shape = self.get_slice_and_shape(subtypesB[j]) + buff = self.get_buffer(shape, iscomplex, real_dtype) + + if i == j: + send_slice, _ = self.get_slice_and_shape(subtypesA[i]) + self.fill_buffer(buff, arrayA, send_slice, iscomplex) + else: + comm.recv(buff.data.ptr, buff.size, NCCL_dtype, j, stream) + + self.unpack_buffer(buff, arrayB, local_slice, iscomplex) + + elif rank == j: + local_slice, shape = self.get_slice_and_shape(subtypesA[i]) + buff = self.get_buffer(shape, iscomplex, real_dtype) + self.fill_buffer(buff, arrayA, local_slice, iscomplex) + + comm.send(buff.data.ptr, buff.size, NCCL_dtype, i, stream) + + + @staticmethod + def get_slice_and_shape(subtype): + """ + Extract from the subtype object generated for MPI what shape the buffer + should have and what part of the array we want to send / receive. + """ + decoded = subtype.decode() + subsizes = decoded[2]['subsizes'] + starts = decoded[2]['starts'] + return tuple(slice(starts[i], starts[i] + subsizes[i]) for i in range(len(starts))), subsizes + + @staticmethod + def get_nccl_and_real_dtypes(array): + """ + Translate the datatype of the array to a NCCL type for sending with NCCL. + As NCCL does not support complex types, we have to translate them to two + real values. + """ + from cupy.cuda import nccl + nccl_dtypes = { + np.dtype('float32'): nccl.NCCL_FLOAT32, + np.dtype('float64'): nccl.NCCL_FLOAT64, + np.dtype('complex64'): nccl.NCCL_FLOAT32, + np.dtype('complex128'): nccl.NCCL_FLOAT64, + } + real_dtypes = { + np.dtype('float32'): np.dtype('float32'), + np.dtype('float64'): np.dtype('float64'), + np.dtype('complex64'): np.dtype('float32'), + np.dtype('complex128'): np.dtype('float64'), + } + return nccl_dtypes[array.dtype], real_dtypes[array.dtype] + + @staticmethod + def get_buffer(shape, iscomplex, real_dtype): + """ + Get a buffer for communication. If complex numbers are used, we send + two real values instead. + """ + import cupy as cp + if iscomplex: + return cp.empty(shape=[2,] + shape, dtype=real_dtype) + else: + return cp.empty(shape=shape, dtype=real_dtype) + + @staticmethod + def fill_buffer(buff, array, local_slice, iscomplex): + """ + Fill buffer for communication. If complex numbers are used, we send + two real values instead. + """ + if iscomplex: + buff[0][:] = array[local_slice].real + buff[1][:] = array[local_slice].imag + else: + buff[:] = array[local_slice][:] + + @staticmethod + def unpack_buffer(buff, array, local_slice, iscomplex): + """ + Unpack buffer for communication. If complex numbers are used, we get + two real values instead. + """ + if iscomplex: + array[local_slice].real = buff[0][:] + array[local_slice].imag = buff[1][:] + else: + array[local_slice][:] = buff[:] + + def destroy(self): + super().destroy() + self.comm_nccl.destroy() + + class Pencil(object): """Class to represent a distributed array (pencil) @@ -274,6 +408,8 @@ class Pencil(object): aligned in axis 1. """ + backend = 'MPI' + def __init__(self, subcomm, shape, axis=-1): assert len(shape) >= 2 assert min(shape) >= 1 @@ -349,6 +485,13 @@ def transfer(self, pencil, dtype): shape = list(penA.subshape) shape[axis] = penA.shape[axis] - return Transfer(comm, shape, dtype, + if self.backend == 'MPI': + transfer_class = Transfer + elif self.backend == 'NCCL': + transfer_class = NCCLTransfer + else: + raise NotImplementedError(f'Don\'t have a transfer class for backend \"{self.backend}\"!') + + return transfer_class(comm, shape, dtype, penA.subshape, penA.axis, penB.subshape, penB.axis) diff --git a/tests/test_pencil.py b/tests/test_pencil.py index 9a74c40..c921706 100644 --- a/tests/test_pencil.py +++ b/tests/test_pencil.py @@ -11,53 +11,73 @@ def test_pencil(): sizes = (7, 8, 9) types = 'fdFD' #'hilfdgFDG' + backends = ['MPI'] + + xp = { + 'MPI': np, + } + + try: + import cupy as cp + from cupy.cuda import nccl + backends += ['NCCL'] + xp['NCCL'] = cp + except ImportError: + pass + for typecode in types: for dim in dims: for shape in product(*([sizes]*dim)): - axes = list(range(dim)) - for axis1, axis2, axis3 in product(axes, axes, axes): + for backend in backends: + + Pencil.backend = backend + axes = list(range(dim)) + + for axis1, axis2, axis3 in product(axes, axes, axes): + + if axis1 == axis2: continue + if axis2 == axis3: continue + axis3 -= len(shape) + #if comm.rank == 0: + # print(shape, axis1, axis2, axis3, typecode) - if axis1 == axis2: continue - if axis2 == axis3: continue - axis3 -= len(shape) - #if comm.rank == 0: - # print(shape, axis1, axis2, axis3, typecode) + for pdim in [None] + list(range(1, dim-1)): - for pdim in [None] + list(range(1, dim-1)): + subcomm = Subcomm(comm, pdim) + pencil0 = Pencil(subcomm, shape) - subcomm = Subcomm(comm, pdim) - pencil0 = Pencil(subcomm, shape) + pencilA = pencil0.pencil(axis1) + pencilB = pencilA.pencil(axis2) + pencilC = pencilB.pencil(axis3) - pencilA = pencil0.pencil(axis1) - pencilB = pencilA.pencil(axis2) - pencilC = pencilB.pencil(axis3) + assert pencilC.backend == backend - trans1 = Pencil.transfer(pencilA, pencilB, typecode) - trans2 = Pencil.transfer(pencilB, pencilC, typecode) + trans1 = Pencil.transfer(pencilA, pencilB, typecode) + trans2 = Pencil.transfer(pencilB, pencilC, typecode) - X = np.random.random(pencilA.subshape).astype(typecode) + X = xp[backend].random.random(pencilA.subshape).astype(typecode) - A = np.empty(pencilA.subshape, dtype=typecode) - B = np.empty(pencilB.subshape, dtype=typecode) - C = np.empty(pencilC.subshape, dtype=typecode) + A = xp[backend].empty(pencilA.subshape, dtype=typecode) + B = xp[backend].empty(pencilB.subshape, dtype=typecode) + C = xp[backend].empty(pencilC.subshape, dtype=typecode) - A[...] = X + A[...] = X - B.fill(0) - trans1.forward(A, B) - C.fill(0) - trans2.forward(B, C) + B.fill(0) + trans1.forward(A, B) + C.fill(0) + trans2.forward(B, C) - B.fill(0) - trans2.backward(C, B) - A.fill(0) - trans1.backward(B, A) + B.fill(0) + trans2.backward(C, B) + A.fill(0) + trans1.backward(B, A) - assert np.allclose(A, X) + assert xp[backend].allclose(A, X) - trans1.destroy() - trans2.destroy() - subcomm.destroy() + trans1.destroy() + trans2.destroy() + subcomm.destroy() if __name__ == '__main__': From d0a493e7b266ffad7d37f1f4ea617df3647f9ae6 Mon Sep 17 00:00:00 2001 From: Thomas Date: Mon, 18 Dec 2023 19:36:16 +0100 Subject: [PATCH 08/18] Minor refactor --- mpi4py_fft/distarray.py | 113 ++++++++++++------------------------ mpi4py_fft/distarrayCuPy.py | 5 +- mpi4py_fft/pencil.py | 20 +++---- 3 files changed, 50 insertions(+), 88 deletions(-) diff --git a/mpi4py_fft/distarray.py b/mpi4py_fft/distarray.py index 9416856..a8815a5 100644 --- a/mpi4py_fft/distarray.py +++ b/mpi4py_fft/distarray.py @@ -50,17 +50,17 @@ def alignment(self): @property def global_shape(self): """Return global shape of ``self``""" - return self.shape[: self.rank] + self._p0.shape + return self.shape[:self.rank] + self._p0.shape @property def substart(self): """Return starting indices of local ``self`` array""" - return (0,) * self.rank + self._p0.substart + return (0,)*self.rank + self._p0.substart @property def subcomm(self): """Return tuple of subcommunicators for all axes of ``self``""" - return (MPI.COMM_SELF,) * self.rank + self._p0.subcomm + return (MPI.COMM_SELF,)*self.rank + self._p0.subcomm @property def commsizes(self): @@ -83,7 +83,7 @@ def dimensions(self): return len(self._p0.shape) @staticmethod - def getSubcomm(subcomm, global_shape, rank, alignment): + def get_subcomm(subcomm, global_shape, rank, alignment): if isinstance(subcomm, Subcomm): pass else: @@ -104,7 +104,7 @@ def getSubcomm(subcomm, global_shape, rank, alignment): return subcomm @classmethod - def getPencil(cls, subcomm, rank, global_shape, alignment): + def setup_pencil(cls, subcomm, rank, global_shape, alignment): sizes = [s.Get_size() for s in subcomm] if alignment is not None: assert isinstance(alignment, (int, cls.xp.integer)) @@ -185,20 +185,15 @@ def get(self, gslice): # global MPI. We create a global file with MPI, but then open it without # MPI and only on rank 0. import h5py - # TODO: can we use h5py to communicate the data without copying to cpu first when using cupy? - f = h5py.File("tmp.h5", "w", driver="mpio", comm=comm) + f = h5py.File('tmp.h5', 'w', driver="mpio", comm=comm) s = self.local_slice() sp = np.nonzero([isinstance(x, slice) for x in gslice])[0] sf = tuple(np.take(s, sp)) - f.require_dataset( - "data", shape=tuple(np.take(self.global_shape, sp)), dtype=self.dtype - ) + f.require_dataset('data', shape=tuple(np.take(self.global_shape, sp)), dtype=self.dtype) gslice = list(gslice) # We are required to check if the indices in si are on this processor - si = np.nonzero( - [isinstance(x, int) and not z == slice(None) for x, z in zip(gslice, s)] - )[0] + si = np.nonzero([isinstance(x, int) and not z == slice(None) for x, z in zip(gslice, s)])[0] on_this_proc = True for i in si: if gslice[i] >= s[i].start and gslice[i] < s[i].stop: @@ -206,15 +201,15 @@ def get(self, gslice): else: on_this_proc = False if on_this_proc: - data = self.asnumpy + data = self.asnumpy() f["data"][sf] = data[tuple(gslice)] f.close() c = None if comm.Get_rank() == 0: - h = h5py.File("tmp.h5", "r") - c = h["data"].__array__() + h = h5py.File('tmp.h5', 'r') + c = h['data'].__array__() h.close() - os.remove("tmp.h5") + os.remove('tmp.h5') return c def local_slice(self): @@ -250,11 +245,9 @@ def local_slice(self): (slice(0, 16, None), slice(7, 14, None), slice(0, 6, None)) (slice(0, 16, None), slice(7, 14, None), slice(6, 12, None)) """ - v = [ - slice(start, start + shape) - for start, shape in zip(self._p0.substart, self._p0.subshape) - ] - return tuple([slice(0, s) for s in self.shape[: self.rank]] + v) + v = [slice(start, start+shape) for start, shape in zip(self._p0.substart, + self._p0.subshape)] + return tuple([slice(0, s) for s in self.shape[:self.rank]] + v) def redistribute(self, axis=None, out=None): """Global redistribution of local ``self`` array @@ -283,7 +276,7 @@ def redistribute(self, axis=None, out=None): # Check if self is already aligned along axis. In that case just switch # axis of pencil (both axes are undivided) and return if axis is not None: - if self.commsizes[self.rank + axis] == 1: + if self.commsizes[self.rank+axis] == 1: self.pencil.axis = axis return self @@ -304,13 +297,11 @@ def redistribute(self, axis=None, out=None): p1, transfer = self.get_pencil_and_transfer(axis) if out is None: - out = type(self)( - self.global_shape, + out = type(self)(self.global_shape, subcomm=p1.subcomm, dtype=self.dtype, alignment=axis, - rank=self.rank, - ) + rank=self.rank) if self.rank == 0: transfer.forward(self, out) @@ -343,15 +334,8 @@ def get_pencil_and_transfer(self, axis): p1 = self._p0.pencil(axis) return p1, self._p0.transfer(p1, self.dtype) - def write( - self, - filename, - name="darray", - step=0, - global_slice=None, - domain=None, - as_scalar=False, - ): + def write(self, filename, name='darray', step=0, global_slice=None, + domain=None, as_scalar=False): """Write snapshot ``step`` of ``self`` to file ``filename`` Parameters @@ -384,14 +368,14 @@ def write( >>> u.write('h5file.h5', 'u', (slice(None), 4)) """ if isinstance(filename, str): - writer = HDF5File if filename.endswith(".h5") else NCFile - f = writer(filename, domain=domain, mode="a") + writer = HDF5File if filename.endswith('.h5') else NCFile + f = writer(filename, domain=domain, mode='a') elif isinstance(filename, FileBase): f = filename field = [self] if global_slice is None else [(self, global_slice)] f.write(step, {name: field}, as_scalar=as_scalar) - def read(self, filename, name="darray", step=0): + def read(self, filename, name='darray', step=0): """Read data ``name`` at index ``step``from file ``filename`` into ``self`` @@ -420,8 +404,8 @@ def read(self, filename, name="darray", step=0): """ if isinstance(filename, str): - writer = HDF5File if filename.endswith(".h5") else NCFile - f = writer(filename, mode="r") + writer = HDF5File if filename.endswith('.h5') else NCFile + f = writer(filename, mode='r') elif isinstance(filename, FileBase): f = filename f.read(self, name, step=step) @@ -460,29 +444,18 @@ class DistArray(DistArrayBase, np.ndarray): xp = np - def __new__( - cls, - global_shape, - subcomm=None, - val=None, - dtype=float, - buffer=None, - strides=None, - alignment=None, - rank=0, - ): - if len(global_shape[rank:]) < 2: # 1D case - obj = cls.xp.ndarray.__new__( - cls, global_shape, dtype=dtype, buffer=buffer, strides=strides - ) + def __new__(cls, global_shape, subcomm=None, val=None, dtype=float, + buffer=None, strides=None, alignment=None, rank=0): + if len(global_shape[rank:]) < 2: # 1D case + obj = cls.xp.ndarray.__new__(cls, global_shape, dtype=dtype, buffer=buffer, strides=strides) if buffer is None and isinstance(val, Number): obj.fill(val) obj._rank = rank obj._p0 = None return obj - subcomm = cls.getSubcomm(subcomm, global_shape, rank, alignment) - p0, subshape = cls.getPencil(subcomm, rank, global_shape, alignment) + subcomm = cls.get_subcomm(subcomm, global_shape, rank, alignment) + p0, subshape = cls.setup_pencil(subcomm, rank, global_shape, alignment) obj = cls.xp.ndarray.__new__(cls, subshape, dtype=dtype, buffer=buffer) if buffer is None and isinstance(val, Number): @@ -496,11 +469,9 @@ def v(self): """Return local ``self`` array as an ``ndarray`` object""" return self.__array__() - @property def asnumpy(self): return self - def newDistArray(pfft, forward_output=True, val=0, rank=0, view=False): """Return a new :class:`.DistArray` object for provided :class:`.PFFT` object @@ -541,29 +512,21 @@ def newDistArray(pfft, forward_output=True, val=0, rank=0, view=False): dtype = pfft.forward.output_array.dtype else: dtype = pfft.forward.input_array.dtype - global_shape = (len(global_shape),) * rank + global_shape + global_shape = (len(global_shape),)*rank + global_shape if pfft.xfftn[0].backend in ["cupy", "cupyx-scipy"]: from mpi4py_fft.distarrayCuPy import DistArrayCuPy as darraycls else: darraycls = DistArray - z = darraycls( - global_shape, - subcomm=p0.subcomm, - val=val, - dtype=dtype, - alignment=p0.axis, - rank=rank, - ) + z = darraycls(global_shape, subcomm=p0.subcomm, val=val, dtype=dtype, + alignment=p0.axis, rank=rank) return z.v if view else z - -def Function(*args, **kwargs): # pragma: no cover +def Function(*args, **kwargs): #pragma: no cover import warnings - warnings.warn("Function() is deprecated; use newDistArray().", FutureWarning) - if "tensor" in kwargs: - kwargs["rank"] = 1 - del kwargs["tensor"] + if 'tensor' in kwargs: + kwargs['rank'] = 1 + del kwargs['tensor'] return newDistArray(*args, **kwargs) diff --git a/mpi4py_fft/distarrayCuPy.py b/mpi4py_fft/distarrayCuPy.py index d3ead89..7374043 100644 --- a/mpi4py_fft/distarrayCuPy.py +++ b/mpi4py_fft/distarrayCuPy.py @@ -59,8 +59,8 @@ def __new__( obj._p0 = None return obj - subcomm = cls.getSubcomm(subcomm, global_shape, rank, alignment) - p0, subshape = cls.getPencil(subcomm, rank, global_shape, alignment) + subcomm = cls.get_subcomm(subcomm, global_shape, rank, alignment) + p0, subshape = cls.setup_pencil(subcomm, rank, global_shape, alignment) obj = cls.xp.ndarray.__new__(cls, subshape, dtype=dtype, memptr=memptr) if memptr is None and isinstance(val, Number): @@ -76,7 +76,6 @@ def get(self, *args, **kwargs): else: return cp.ndarray.get(self, *args, **kwargs) - @property def asnumpy(self): """Copy the array to CPU""" return self.get() diff --git a/mpi4py_fft/pencil.py b/mpi4py_fft/pencil.py index 3a8a7b2..4cf3ca1 100644 --- a/mpi4py_fft/pencil.py +++ b/mpi4py_fft/pencil.py @@ -246,27 +246,27 @@ def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB): iscomplex = cp.iscomplexobj(arrayA) NCCL_dtype, real_dtype = self.get_nccl_and_real_dtypes(arrayA) - for i in range(size): - for j in range(size): + for recv_rank in range(size): + for send_rank in range(size): - if rank == i: - local_slice, shape = self.get_slice_and_shape(subtypesB[j]) + if rank == recv_rank: + local_slice, shape = self.get_slice_and_shape(subtypesB[send_rank]) buff = self.get_buffer(shape, iscomplex, real_dtype) - if i == j: - send_slice, _ = self.get_slice_and_shape(subtypesA[i]) + if recv_rank == send_rank: + send_slice, _ = self.get_slice_and_shape(subtypesA[recv_rank]) self.fill_buffer(buff, arrayA, send_slice, iscomplex) else: - comm.recv(buff.data.ptr, buff.size, NCCL_dtype, j, stream) + comm.recv(buff.data.ptr, buff.size, NCCL_dtype, send_rank, stream) self.unpack_buffer(buff, arrayB, local_slice, iscomplex) - elif rank == j: - local_slice, shape = self.get_slice_and_shape(subtypesA[i]) + elif rank == send_rank: + local_slice, shape = self.get_slice_and_shape(subtypesA[recv_rank]) buff = self.get_buffer(shape, iscomplex, real_dtype) self.fill_buffer(buff, arrayA, local_slice, iscomplex) - comm.send(buff.data.ptr, buff.size, NCCL_dtype, i, stream) + comm.send(buff.data.ptr, buff.size, NCCL_dtype, recv_rank, stream) @staticmethod From 6b47e36be3c3c190825e4962374ef233efbbbe84 Mon Sep 17 00:00:00 2001 From: Thomas Date: Wed, 20 Dec 2023 09:05:31 +0100 Subject: [PATCH 09/18] Changed `Alltoallw` communication pattern --- mpi4py_fft/libfft.py | 21 ++++++++++++++++---- mpi4py_fft/pencil.py | 46 ++++++++++++++++++++++++++++---------------- 2 files changed, 46 insertions(+), 21 deletions(-) diff --git a/mpi4py_fft/libfft.py b/mpi4py_fft/libfft.py index 16feeda..2a503fd 100644 --- a/mpi4py_fft/libfft.py +++ b/mpi4py_fft/libfft.py @@ -86,11 +86,24 @@ def _Xfftn_plan_cupy(shape, axes, dtype, transforms, options): plan_fwd, plan_bck = transforms[tuple(axes)] else: if cp.issubdtype(dtype, cp.floating): - plan_fwd = cp.fft.rfftn - plan_bck = cp.fft.irfftn + _plan_fwd = cp.fft.rfftn + _plan_bck = cp.fft.irfftn else: - plan_fwd = cp.fft.fftn - plan_bck = cp.fft.ifftn + _plan_fwd = cp.fft.fftn + _plan_bck = cp.fft.ifftn + + stream = cp.cuda.stream.Stream() + def execute_in_stream(function, *args, **kwargs): + with stream: + result = function(*args, **kwargs) + stream.synchronize() + return result + + def plan_fwd(*args, **kwargs): + return execute_in_stream(_plan_fwd, *args, **kwargs) + + def plan_bck(*args, **kwargs): + return execute_in_stream(_plan_bck, *args, **kwargs) s = tuple(np.take(shape, axes)) U = cp.array(fftw.aligned(shape, dtype=dtype)) # TODO: avoid going via CPU diff --git a/mpi4py_fft/pencil.py b/mpi4py_fft/pencil.py index 4cf3ca1..f97b28b 100644 --- a/mpi4py_fft/pencil.py +++ b/mpi4py_fft/pencil.py @@ -242,32 +242,44 @@ def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB): assert self.comm.rank == self.comm_nccl.rank_id(), f'The structure of the communicator has changed unexpectedly' rank, size, comm = self.comm.rank, self.comm.size, self.comm_nccl - stream = cp.cuda.Stream.null.ptr iscomplex = cp.iscomplexobj(arrayA) NCCL_dtype, real_dtype = self.get_nccl_and_real_dtypes(arrayA) - for recv_rank in range(size): - for send_rank in range(size): + send_stream = cp.cuda.Stream(non_blocking=False) + recv_stream = cp.cuda.Stream(non_blocking=False) - if rank == recv_rank: - local_slice, shape = self.get_slice_and_shape(subtypesB[send_rank]) - buff = self.get_buffer(shape, iscomplex, real_dtype) + def send(array, subtype, send_to, iscomplex, stream): + local_slice, shape = self.get_slice_and_shape(subtype) + buff = self.get_buffer(shape, iscomplex, real_dtype) + self.fill_buffer(buff, array, local_slice, iscomplex) + comm.send(buff.data.ptr, buff.size, NCCL_dtype, send_to, stream.ptr) - if recv_rank == send_rank: - send_slice, _ = self.get_slice_and_shape(subtypesA[recv_rank]) - self.fill_buffer(buff, arrayA, send_slice, iscomplex) - else: - comm.recv(buff.data.ptr, buff.size, NCCL_dtype, send_rank, stream) + for i in range(size): + send_to = (rank + i) % size + recv_from = (rank -i + size) % size - self.unpack_buffer(buff, arrayB, local_slice, iscomplex) + if send_to > rank: + with send_stream: + send(arrayA, subtypesA[send_to], send_to, iscomplex, send_stream) - elif rank == send_rank: - local_slice, shape = self.get_slice_and_shape(subtypesA[recv_rank]) - buff = self.get_buffer(shape, iscomplex, real_dtype) - self.fill_buffer(buff, arrayA, local_slice, iscomplex) + with recv_stream: + local_slice, shape = self.get_slice_and_shape(subtypesB[recv_from]) + buff = self.get_buffer(shape, iscomplex, real_dtype) - comm.send(buff.data.ptr, buff.size, NCCL_dtype, recv_rank, stream) + if recv_from == rank: + send_slice, _ = self.get_slice_and_shape(subtypesA[send_to]) + self.fill_buffer(buff, arrayA, send_slice, iscomplex) + else: + comm.recv(buff.data.ptr, buff.size, NCCL_dtype, recv_from, recv_stream.ptr) + self.unpack_buffer(buff, arrayB, local_slice, iscomplex) + + if send_to < rank: + with send_stream: + send(arrayA, subtypesA[send_to], send_to, iscomplex, send_stream) + + send_stream.synchronize() + recv_stream.synchronize() @staticmethod def get_slice_and_shape(subtype): From 21805dea9a4cad8ca4d66b2d4367699a4a3a8b12 Mon Sep 17 00:00:00 2001 From: Thomas Date: Thu, 4 Jan 2024 13:12:57 +0100 Subject: [PATCH 10/18] Added custom MPI based Alltoallw implementation --- mpi4py_fft/pencil.py | 108 ++++++++++++++++++++++++++------- tests/test_pencil.py | 4 +- tests/test_transfer_classes.py | 81 +++++++++++++++++++++++++ 3 files changed, 171 insertions(+), 22 deletions(-) create mode 100644 tests/test_transfer_classes.py diff --git a/mpi4py_fft/pencil.py b/mpi4py_fft/pencil.py index f97b28b..52a2b7d 100644 --- a/mpi4py_fft/pencil.py +++ b/mpi4py_fft/pencil.py @@ -155,6 +155,7 @@ def __init__(self, comm, shape, dtype, subshapeA, axisA, subshapeB, axisB): + self.comm = comm self.shape = tuple(shape) self.dtype = dtype = np.dtype(dtype) @@ -215,6 +216,62 @@ def destroy(self): datatype.Free() +class CustomMPITransfer(Transfer): + + def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB): + """ + Redistribute arrayA to arrayB. + """ + if type(arrayA) == np.ndarray: + xp = np + def synchronize_stream(): + pass + else: + import cupy as xp + synchronize_stream = xp.cuda.get_current_stream().synchronize + + rank, size, comm = self.comm.rank, self.comm.size, self.comm + + for i in range(size): + send_to = (rank + i) % size + recv_from = (rank -i + size) % size + + sliceA, shapeA = self.get_slice_and_shape(subtypesA[send_to]) + sliceB, shapeB = self.get_slice_and_shape(subtypesB[recv_from]) + + if send_to == rank: + arrayB[sliceB][:] = arrayA[sliceA][:] + else: + # send asynchronously + sendbuff = xp.empty(shapeA, dtype=self.dtype) + sendbuff[:] = arrayA[sliceA][:] + synchronize_stream() + req = comm.Isend(sendbuff, dest=send_to) + + # receive + recvbuff = xp.empty(shapeB, dtype=self.dtype) + comm.Recv(recvbuff, source=recv_from) + synchronize_stream() + arrayB[sliceB][:] = recvbuff[:] + + # finish send and clean up + req.wait() + del sendbuff + del recvbuff + + @staticmethod + def get_slice_and_shape(subtype): + """ + Extract from the subtype object generated for MPI what shape the buffer + should have and what part of the array we want to send / receive. + """ + decoded = subtype.decode() + subsizes = decoded[2]['subsizes'] + starts = decoded[2]['starts'] + return tuple(slice(starts[i], starts[i] + subsizes[i]) for i in range(len(starts))), subsizes + + + class NCCLTransfer(Transfer): """ Transfer class which uses NCCL for `Alltoallw` operations. The NCCL @@ -245,41 +302,45 @@ def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB): iscomplex = cp.iscomplexobj(arrayA) NCCL_dtype, real_dtype = self.get_nccl_and_real_dtypes(arrayA) - send_stream = cp.cuda.Stream(non_blocking=False) - recv_stream = cp.cuda.Stream(non_blocking=False) - def send(array, subtype, send_to, iscomplex, stream): local_slice, shape = self.get_slice_and_shape(subtype) - buff = self.get_buffer(shape, iscomplex, real_dtype) + buff = self.get_buffer(shape, iscomplex, real_dtype, stream) self.fill_buffer(buff, array, local_slice, iscomplex) comm.send(buff.data.ptr, buff.size, NCCL_dtype, send_to, stream.ptr) - for i in range(size): - send_to = (rank + i) % size - recv_from = (rank -i + size) % size + events = [] + streams = [cp.cuda.Stream(null=False) for _ in range(size)] + for i, stream in zip(range(size), streams): + with stream: - if send_to > rank: - with send_stream: - send(arrayA, subtypesA[send_to], send_to, iscomplex, send_stream) + send_to = (rank + i) % size + recv_from = (rank -i + size) % size + + if send_to > rank: + send(arrayA, subtypesA[send_to], send_to, iscomplex, stream) - with recv_stream: local_slice, shape = self.get_slice_and_shape(subtypesB[recv_from]) - buff = self.get_buffer(shape, iscomplex, real_dtype) + buff = self.get_buffer(shape, iscomplex, real_dtype, stream) if recv_from == rank: send_slice, _ = self.get_slice_and_shape(subtypesA[send_to]) self.fill_buffer(buff, arrayA, send_slice, iscomplex) else: - comm.recv(buff.data.ptr, buff.size, NCCL_dtype, recv_from, recv_stream.ptr) + comm.recv(buff.data.ptr, buff.size, NCCL_dtype, recv_from, stream.ptr) self.unpack_buffer(buff, arrayB, local_slice, iscomplex) - if send_to < rank: - with send_stream: - send(arrayA, subtypesA[send_to], send_to, iscomplex, send_stream) + if send_to < rank: + send(arrayA, subtypesA[send_to], send_to, iscomplex, stream) + + events += [stream.record()] - send_stream.synchronize() - recv_stream.synchronize() + null_stream = cp.cuda.Stream(null=True) + null_stream.use() + for event in events: + null_stream.wait_event(event) + + cp.cuda.Device(0).synchronize() @staticmethod def get_slice_and_shape(subtype): @@ -315,16 +376,17 @@ def get_nccl_and_real_dtypes(array): return nccl_dtypes[array.dtype], real_dtypes[array.dtype] @staticmethod - def get_buffer(shape, iscomplex, real_dtype): + def get_buffer(shape, iscomplex, real_dtype, stream): """ Get a buffer for communication. If complex numbers are used, we send two real values instead. """ import cupy as cp + if iscomplex: - return cp.empty(shape=[2,] + shape, dtype=real_dtype) + return cp.ndarray(shape=[2,] + shape, dtype=real_dtype) else: - return cp.empty(shape=shape, dtype=real_dtype) + return cp.ndarray(shape=shape, dtype=real_dtype) @staticmethod def fill_buffer(buff, array, local_slice, iscomplex): @@ -501,6 +563,10 @@ def transfer(self, pencil, dtype): transfer_class = Transfer elif self.backend == 'NCCL': transfer_class = NCCLTransfer + elif self.backend == 'CUDAMemCpy': + transfer_class = CUDAMemCpy + elif self.backend == 'customMPI': + transfer_class = CustomMPITransfer else: raise NotImplementedError(f'Don\'t have a transfer class for backend \"{self.backend}\"!') diff --git a/tests/test_pencil.py b/tests/test_pencil.py index c921706..1675e0f 100644 --- a/tests/test_pencil.py +++ b/tests/test_pencil.py @@ -11,10 +11,11 @@ def test_pencil(): sizes = (7, 8, 9) types = 'fdFD' #'hilfdgFDG' - backends = ['MPI'] + backends = ['MPI', 'customMPI'] xp = { 'MPI': np, + 'customMPI': np, } try: @@ -22,6 +23,7 @@ def test_pencil(): from cupy.cuda import nccl backends += ['NCCL'] xp['NCCL'] = cp + cp.cuda.set_allocator(cp.cuda.MemoryAsyncPool().malloc) except ImportError: pass diff --git a/tests/test_transfer_classes.py b/tests/test_transfer_classes.py new file mode 100644 index 0000000..6534807 --- /dev/null +++ b/tests/test_transfer_classes.py @@ -0,0 +1,81 @@ +from mpi4py import MPI +from mpi4py_fft.pencil import Transfer, CustomMPITransfer, Pencil, Subcomm +import numpy as np + + +def get_args(comm, shape, dtype): + subcomm = Subcomm(comm=comm, dims=None) + pencilA = Pencil(subcomm, shape, 0) + pencilB = Pencil(subcomm, shape, 1) + + kwargs = { + 'comm': comm, + 'shape': shape, + 'subshapeA': pencilA.subshape, + 'subshapeB': pencilB.subshape, + 'axisA': 0, + 'axisB': 1, + 'dtype': dtype, + } + return kwargs + + +def get_arrays(kwargs): + arrayA = np.zeros(shape=kwargs['subshapeA'], dtype=kwargs['dtype']) + arrayB = np.zeros(shape=kwargs['subshapeB'], dtype=kwargs['dtype']) + + arrayA[:] = np.random.random(arrayA.shape).astype(arrayA.dtype) + return arrayA, arrayB + + +def single_test_all_to_allw(transfer_class, shape, dtype, comm=None): + comm = comm if comm else MPI.COMM_WORLD + kwargs = get_args(comm, shape, dtype) + arrayA, arrayB = get_arrays(kwargs) + arrayB_ref = arrayB.copy() + + transfer = transfer_class(**kwargs) + reference_transfer = Transfer(**kwargs) + + transfer.Alltoallw(arrayA, transfer._subtypesA, arrayB, transfer._subtypesB) + reference_transfer.Alltoallw(arrayA, transfer._subtypesA, arrayB_ref, transfer._subtypesB) + assert np.allclose(arrayB, arrayB_ref), f'Did not get the same result from `alltoallw` with {transfer_class.__name__} transfer class as MPI implementation on rank {comm.rank}!' + + comm.Barrier() + if comm.rank == 0: + print(f'{transfer_class.__name__} passed alltoallw test with shape {shape} and dtype {dtype}') + + +def single_test_forward_backward(transfer_class, shape, dtype, comm=None): + comm = comm if comm else MPI.COMM_WORLD + kwargs = get_args(comm, shape, dtype) + arrayA, arrayB = get_arrays(kwargs) + arrayA_ref = arrayA.copy() + + transfer = transfer_class(**kwargs) + + transfer.forward(arrayA, arrayB) + transfer.backward(arrayB, arrayA) + assert np.allclose(arrayA, arrayA_ref), f'Did not get the same result when transferring back and forth with {transfer_class.__name__} transfer class on rank {comm.rank}!' + + comm.Barrier() + if comm.rank == 0: + print(f'{transfer_class.__name__} passed forward/backward test with shape {shape} and dtype {dtype}') + + +def test_transfer_class(): + dims = (2, 3) + sizes = (7, 8, 9, 128) + dtypes = 'fFdD' + transfer_class = CustomMPITransfer + + shapes = [[size] * dim for size in sizes for dim in dims] + [[32, 256, 129]] + + for shape in shapes: + for dtype in dtypes: + single_test_all_to_allw(transfer_class, shape, dtype) + single_test_forward_backward(transfer_class, shape, dtype) + + +if __name__ == '__main__': + test_transfer_class() From ceccc7d9c7c83a5069570975625f03d8dad2484a Mon Sep 17 00:00:00 2001 From: Thomas Date: Fri, 12 Jan 2024 17:28:34 +0100 Subject: [PATCH 11/18] Cleanup --- mpi4py_fft/distarrayCuPy.py | 2 +- mpi4py_fft/libfft.py | 55 +++++++------------------- mpi4py_fft/mpifft.py | 6 +-- mpi4py_fft/pencil.py | 78 ++++++++++++++----------------------- 4 files changed, 47 insertions(+), 94 deletions(-) diff --git a/mpi4py_fft/distarrayCuPy.py b/mpi4py_fft/distarrayCuPy.py index 7374043..2b4afd4 100644 --- a/mpi4py_fft/distarrayCuPy.py +++ b/mpi4py_fft/distarrayCuPy.py @@ -62,7 +62,7 @@ def __new__( subcomm = cls.get_subcomm(subcomm, global_shape, rank, alignment) p0, subshape = cls.setup_pencil(subcomm, rank, global_shape, alignment) - obj = cls.xp.ndarray.__new__(cls, subshape, dtype=dtype, memptr=memptr) + obj = cls.xp.ndarray.__new__(cls, subshape, dtype=dtype, memptr=memptr, strides=strides) if memptr is None and isinstance(val, Number): obj.fill(val) obj._p0 = p0 diff --git a/mpi4py_fft/libfft.py b/mpi4py_fft/libfft.py index 2a503fd..24cae8a 100644 --- a/mpi4py_fft/libfft.py +++ b/mpi4py_fft/libfft.py @@ -80,30 +80,18 @@ def _Xfftn_plan_fftw(shape, axes, dtype, transforms, options): def _Xfftn_plan_cupy(shape, axes, dtype, transforms, options): import cupy as cp + cp.fft.config.enable_nd_planning = True transforms = {} if transforms is None else transforms if tuple(axes) in transforms: plan_fwd, plan_bck = transforms[tuple(axes)] else: if cp.issubdtype(dtype, cp.floating): - _plan_fwd = cp.fft.rfftn - _plan_bck = cp.fft.irfftn + plan_fwd = cp.fft.rfftn + plan_bck = cp.fft.irfftn else: - _plan_fwd = cp.fft.fftn - _plan_bck = cp.fft.ifftn - - stream = cp.cuda.stream.Stream() - def execute_in_stream(function, *args, **kwargs): - with stream: - result = function(*args, **kwargs) - stream.synchronize() - return result - - def plan_fwd(*args, **kwargs): - return execute_in_stream(_plan_fwd, *args, **kwargs) - - def plan_bck(*args, **kwargs): - return execute_in_stream(_plan_bck, *args, **kwargs) + plan_fwd = cp.fft.fftn + plan_bck = cp.fft.ifftn s = tuple(np.take(shape, axes)) U = cp.array(fftw.aligned(shape, dtype=dtype)) # TODO: avoid going via CPU @@ -166,39 +154,22 @@ def _Xfftn_plan_mkl(shape, axes, dtype, transforms, options): #pragma: no cover def _Xfftn_plan_cupyx_scipy(shape, axes, dtype, transforms, options): import cupy as cp - import cupyx.scipy.fft as fft_lib + import cupyx.scipy.fftpack as cufft transforms = {} if transforms is None else transforms if tuple(axes) in transforms: - _plan_fwd, _plan_bck = transforms[tuple(axes)] + plan_fwd, plan_bck = transforms[tuple(axes)] else: - if cp.issubdtype(dtype, cp.floating): - _plan_fwd = fft_lib.rfftn - _plan_bck = fft_lib.irfftn - else: - _plan_fwd = fft_lib.fftn - _plan_bck = fft_lib.ifftn - - def swap_shape_for_s(kwargs): - _kwargs = { - 's': kwargs.pop('shape', None), - **kwargs, - } - return _kwargs - - def plan_fwd(*args, **kwargs): - return _plan_fwd(*args, **swap_shape_for_s(kwargs)) - - def plan_bck(*args, **kwargs): - return _plan_bck(*args, **swap_shape_for_s(kwargs)) + plan_fwd = cufft.fftn + plan_bck = cufft.ifftn s = tuple(np.take(shape, axes)) U = cp.array(fftw.aligned(shape, dtype=dtype)) # TODO: Skip CPU detour - V = plan_fwd(U, s=s, axes=axes) + V = plan_fwd(U, shape=s, axes=axes) V = cp.array(fftw.aligned_like(V.get())) # TODO: skip CPU detour M = np.prod(s) - return (_Yfftn_wrap(plan_fwd, U, V, 1, {'shape': s, 'axes': axes}), - _Yfftn_wrap(plan_bck, V, U, M, {'shape': s, 'axes': axes})) + return (_Yfftn_wrap(plan_fwd, U, V, 1, {'shape': s, 'axes': axes, 'overwrite_x': True}), + _Yfftn_wrap(plan_bck, V, U, M, {'shape': s, 'axes': axes, 'overwrite_x': True})) def _Xfftn_plan_scipy(shape, axes, dtype, transforms, options): @@ -468,7 +439,7 @@ def __init__(self, shape, axes=None, dtype=float, padding=False, self.M = 1./np.prod(np.take(self.shape, self.axes)) else: self.M = self.fwd.get_normalization() - if backend == 'scipy': + if backend in ['scipy', 'cupyx-scipy']: self.real_transform = False # No rfftn/irfftn methods self.padding_factor = 1.0 diff --git a/mpi4py_fft/mpifft.py b/mpi4py_fft/mpifft.py index 509a7c4..c32d9ab 100644 --- a/mpi4py_fft/mpifft.py +++ b/mpi4py_fft/mpifft.py @@ -130,9 +130,9 @@ class PFFT(object): :mod:`.fftw.xfftn` module. See Examples. comm_backend : str, optional Choose backend for communication. When using GPU based serial backends, - the "NCCL" backend can be be used in `Alltoallw` operations to speedup - GPU to GPU transfer. Keep in mind that this is used alongside MPI and - assumes one GPU per MPI rankMPI is used. + the "NCCL" backend or a "customMPI" backend can be be used in `Alltoallw` + operations to speedup GPU to GPU transfer. Keep in mind that this is used + alongside MPI and assumes one GPU per MPI rank is used. Other Parameters ---------------- diff --git a/mpi4py_fft/pencil.py b/mpi4py_fft/pencil.py index 52a2b7d..c3bddfb 100644 --- a/mpi4py_fft/pencil.py +++ b/mpi4py_fft/pencil.py @@ -232,7 +232,7 @@ def synchronize_stream(): rank, size, comm = self.comm.rank, self.comm.size, self.comm - for i in range(size): + for i in range(1, size + 1): send_to = (rank + i) % size recv_from = (rank -i + size) % size @@ -242,22 +242,13 @@ def synchronize_stream(): if send_to == rank: arrayB[sliceB][:] = arrayA[sliceA][:] else: - # send asynchronously - sendbuff = xp.empty(shapeA, dtype=self.dtype) - sendbuff[:] = arrayA[sliceA][:] - synchronize_stream() - req = comm.Isend(sendbuff, dest=send_to) + recvbuf = xp.empty(shapeB, dtype=self.dtype) + sendbuf = xp.empty(shapeA, dtype=self.dtype) + sendbuf[:] = arrayA[sliceA][:] - # receive - recvbuff = xp.empty(shapeB, dtype=self.dtype) - comm.Recv(recvbuff, source=recv_from) synchronize_stream() - arrayB[sliceB][:] = recvbuff[:] - - # finish send and clean up - req.wait() - del sendbuff - del recvbuff + comm.Sendrecv(sendbuf, send_to, recvbuf=recvbuf, source=recv_from) + arrayB[sliceB][:] = recvbuf[:] @staticmethod def get_slice_and_shape(subtype): @@ -302,45 +293,36 @@ def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB): iscomplex = cp.iscomplexobj(arrayA) NCCL_dtype, real_dtype = self.get_nccl_and_real_dtypes(arrayA) - def send(array, subtype, send_to, iscomplex, stream): - local_slice, shape = self.get_slice_and_shape(subtype) - buff = self.get_buffer(shape, iscomplex, real_dtype, stream) - self.fill_buffer(buff, array, local_slice, iscomplex) - comm.send(buff.data.ptr, buff.size, NCCL_dtype, send_to, stream.ptr) - - events = [] - streams = [cp.cuda.Stream(null=False) for _ in range(size)] - for i, stream in zip(range(size), streams): - with stream: - - send_to = (rank + i) % size - recv_from = (rank -i + size) % size - - if send_to > rank: - send(arrayA, subtypesA[send_to], send_to, iscomplex, stream) + stream = cp.cuda.Stream(null=True) + stream.use() - local_slice, shape = self.get_slice_and_shape(subtypesB[recv_from]) - buff = self.get_buffer(shape, iscomplex, real_dtype, stream) + for i in range(size): + send_to = (rank + i) % size + recv_from = (rank -i + size) % size - if recv_from == rank: - send_slice, _ = self.get_slice_and_shape(subtypesA[send_to]) - self.fill_buffer(buff, arrayA, send_slice, iscomplex) - else: - comm.recv(buff.data.ptr, buff.size, NCCL_dtype, recv_from, stream.ptr) + # prepare receive buffer + local_slice, shape = self.get_slice_and_shape(subtypesB[recv_from]) + recv_buff = self.get_buffer(shape, iscomplex, real_dtype) - self.unpack_buffer(buff, arrayB, local_slice, iscomplex) + # prepare send buffer + send_slice, send_shape = self.get_slice_and_shape(subtypesA[send_to]) - if send_to < rank: - send(arrayA, subtypesA[send_to], send_to, iscomplex, stream) + # send / receive + if send_to == rank: + self.fill_buffer(recv_buff, arrayA, send_slice, iscomplex) + else: + send_buff = self.get_buffer(send_shape, iscomplex, real_dtype) + self.fill_buffer(send_buff, arrayA, send_slice, iscomplex) - events += [stream.record()] + # perform all sends and receives in a single kernel to allow overlap + cp.cuda.nccl.groupStart() + comm.recv(recv_buff.data.ptr, recv_buff.size, NCCL_dtype, recv_from, stream.ptr) + comm.send(send_buff.data.ptr, send_buff.size, NCCL_dtype, send_to, stream.ptr) + cp.cuda.nccl.groupEnd() - null_stream = cp.cuda.Stream(null=True) - null_stream.use() - for event in events: - null_stream.wait_event(event) + self.unpack_buffer(recv_buff, arrayB, local_slice, iscomplex) - cp.cuda.Device(0).synchronize() + cp.cuda.Stream(null=True).use() @staticmethod def get_slice_and_shape(subtype): @@ -376,7 +358,7 @@ def get_nccl_and_real_dtypes(array): return nccl_dtypes[array.dtype], real_dtypes[array.dtype] @staticmethod - def get_buffer(shape, iscomplex, real_dtype, stream): + def get_buffer(shape, iscomplex, real_dtype): """ Get a buffer for communication. If complex numbers are used, we send two real values instead. From c3c20ffc366e26c4e5d5a896c625917466f04c64 Mon Sep 17 00:00:00 2001 From: Thomas Date: Tue, 13 Feb 2024 17:27:12 +0100 Subject: [PATCH 12/18] Overlapped all communication within a single Alltoallw in NCCL backend --- mpi4py_fft/libfft.py | 12 ++-- mpi4py_fft/pencil.py | 168 ++++++++++++------------------------------- tests/test_pencil.py | 1 - 3 files changed, 53 insertions(+), 128 deletions(-) diff --git a/mpi4py_fft/libfft.py b/mpi4py_fft/libfft.py index 24cae8a..59f054f 100644 --- a/mpi4py_fft/libfft.py +++ b/mpi4py_fft/libfft.py @@ -94,9 +94,9 @@ def _Xfftn_plan_cupy(shape, axes, dtype, transforms, options): plan_bck = cp.fft.ifftn s = tuple(np.take(shape, axes)) - U = cp.array(fftw.aligned(shape, dtype=dtype)) # TODO: avoid going via CPU - _V = plan_fwd(U, s=s, axes=axes) - V = cp.array(fftw.aligned_like(_V.get())) # TODO: avoid going via CPU + U = cp.empty(shape=shape, dtype=dtype) + V = plan_fwd(U, s=s, axes=axes) + V = cp.array(V) M = np.prod(s) # CuPy has forward transform unscaled and backward scaled with 1/N @@ -164,9 +164,9 @@ def _Xfftn_plan_cupyx_scipy(shape, axes, dtype, transforms, options): plan_bck = cufft.ifftn s = tuple(np.take(shape, axes)) - U = cp.array(fftw.aligned(shape, dtype=dtype)) # TODO: Skip CPU detour - V = plan_fwd(U, shape=s, axes=axes) - V = cp.array(fftw.aligned_like(V.get())) # TODO: skip CPU detour + U = cp.empty(shape=shape, dtype=dtype) + V = plan_fwd(U, s=s, axes=axes) + V = cp.array(V) M = np.prod(s) return (_Yfftn_wrap(plan_fwd, U, V, 1, {'shape': s, 'axes': axes, 'overwrite_x': True}), _Yfftn_wrap(plan_bck, V, U, M, {'shape': s, 'axes': axes, 'overwrite_x': True})) diff --git a/mpi4py_fft/pencil.py b/mpi4py_fft/pencil.py index c3bddfb..d8f3408 100644 --- a/mpi4py_fft/pencil.py +++ b/mpi4py_fft/pencil.py @@ -29,6 +29,17 @@ def _subarraytypes(comm, shape, axis, subshape, dtype): return tuple(datatypes) +def get_slice(subtype): + """ + Extract from the subtype object generated for MPI what shape the buffer + should have and what part of the array we want to send / receive. + """ + decoded = subtype.decode() + subsizes = decoded[2]['subsizes'] + starts = decoded[2]['starts'] + return tuple(slice(starts[i], starts[i] + subsizes[i]) for i in range(len(starts))) + + class Subcomm(tuple): r"""Class returning a tuple of subcommunicators of any dimensionality @@ -236,32 +247,19 @@ def synchronize_stream(): send_to = (rank + i) % size recv_from = (rank -i + size) % size - sliceA, shapeA = self.get_slice_and_shape(subtypesA[send_to]) - sliceB, shapeB = self.get_slice_and_shape(subtypesB[recv_from]) + sliceA = get_slice(subtypesA[send_to]) + sliceB = get_slice(subtypesB[recv_from]) if send_to == rank: arrayB[sliceB][:] = arrayA[sliceA][:] else: - recvbuf = xp.empty(shapeB, dtype=self.dtype) - sendbuf = xp.empty(shapeA, dtype=self.dtype) - sendbuf[:] = arrayA[sliceA][:] + recvbuf = xp.ascontiguousarray(arrayB[sliceB]) + sendbuf = xp.ascontiguousarray(arrayA[sliceA]) synchronize_stream() comm.Sendrecv(sendbuf, send_to, recvbuf=recvbuf, source=recv_from) arrayB[sliceB][:] = recvbuf[:] - @staticmethod - def get_slice_and_shape(subtype): - """ - Extract from the subtype object generated for MPI what shape the buffer - should have and what part of the array we want to send / receive. - """ - decoded = subtype.decode() - subsizes = decoded[2]['subsizes'] - starts = decoded[2]['starts'] - return tuple(slice(starts[i], starts[i] + subsizes[i]) for i in range(len(starts))), subsizes - - class NCCLTransfer(Transfer): """ @@ -275,124 +273,52 @@ def __init__(self, *args, **kwargs): from cupy.cuda import nccl self.comm_nccl = nccl.NcclCommunicator(self.comm.size, self.comm.bcast(nccl.get_unique_id(), root=0), self.comm.rank) + # determine how to send the data. If we have complex numbers, we need to send two real numbers. + if self.dtype in [np.dtype('float32'), np.dtype('complex64')]: + self.NCCL_dtype = nccl.NCCL_FLOAT32 + elif self.dtype in [np.dtype('float64'), np.dtype('complex128')]: + self.NCCL_dtype = nccl.NCCL_FLOAT64 + elif self.dtype in [np.dtype('int32')]: + self.NCCL_dtype = nccl.NCCL_INT32 + elif self.dtype in [np.dtype('int64')]: + self.NCCL_dtype = nccl.NCCL_INT64 + else: + raise NotImplementedError(f'Don\'t know what NCCL dtype to use to send data of dtype {self.dtype}!') + self.count_modifier = 2 if 'complex' in str(self.dtype) else 1 + def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB): """ Redistribute arrayA to arrayB. - - As NCCL does not have all to all, we replicate it by a bunch of individual send and receives. - As NCCL also does not have complex datatypes, we have to send real and imaginary parts separately. """ import cupy as cp - from cupy.cuda import nccl - assert type(arrayA) == cp.ndarray - assert type(arrayB) == cp.ndarray - assert arrayA.dtype == arrayB.dtype - assert self.comm.rank == self.comm_nccl.rank_id(), f'The structure of the communicator has changed unexpectedly' - rank, size, comm = self.comm.rank, self.comm.size, self.comm_nccl - iscomplex = cp.iscomplexobj(arrayA) - NCCL_dtype, real_dtype = self.get_nccl_and_real_dtypes(arrayA) + stream = cp.cuda.get_current_stream() - stream = cp.cuda.Stream(null=True) - stream.use() + # initialize dictionaries required to overlap sends + recvbufs = {} + sendbufs = {} + sliceBs = {} - for i in range(size): + # perform all sends and receives in a single kernel to allow overlap + cp.cuda.nccl.groupStart() + for i in range(1, size + 1): send_to = (rank + i) % size recv_from = (rank -i + size) % size - # prepare receive buffer - local_slice, shape = self.get_slice_and_shape(subtypesB[recv_from]) - recv_buff = self.get_buffer(shape, iscomplex, real_dtype) - - # prepare send buffer - send_slice, send_shape = self.get_slice_and_shape(subtypesA[send_to]) - - # send / receive - if send_to == rank: - self.fill_buffer(recv_buff, arrayA, send_slice, iscomplex) - else: - send_buff = self.get_buffer(send_shape, iscomplex, real_dtype) - self.fill_buffer(send_buff, arrayA, send_slice, iscomplex) - - # perform all sends and receives in a single kernel to allow overlap - cp.cuda.nccl.groupStart() - comm.recv(recv_buff.data.ptr, recv_buff.size, NCCL_dtype, recv_from, stream.ptr) - comm.send(send_buff.data.ptr, send_buff.size, NCCL_dtype, send_to, stream.ptr) - cp.cuda.nccl.groupEnd() + sliceA = get_slice(subtypesA[send_to]) + sliceBs[i] = get_slice(subtypesB[recv_from]) - self.unpack_buffer(recv_buff, arrayB, local_slice, iscomplex) + recvbufs[i] = cp.ascontiguousarray(arrayB[sliceBs[i]]) + sendbufs[i] = cp.ascontiguousarray(arrayA[sliceA]) - cp.cuda.Stream(null=True).use() + comm.recv(recvbufs[i].data.ptr, recvbufs[i].size * self.count_modifier, self.NCCL_dtype, recv_from, stream.ptr) + comm.send(sendbufs[i].data.ptr, sendbufs[i].size * self.count_modifier, self.NCCL_dtype, send_to, stream.ptr) + cp.cuda.nccl.groupEnd() - @staticmethod - def get_slice_and_shape(subtype): - """ - Extract from the subtype object generated for MPI what shape the buffer - should have and what part of the array we want to send / receive. - """ - decoded = subtype.decode() - subsizes = decoded[2]['subsizes'] - starts = decoded[2]['starts'] - return tuple(slice(starts[i], starts[i] + subsizes[i]) for i in range(len(starts))), subsizes - - @staticmethod - def get_nccl_and_real_dtypes(array): - """ - Translate the datatype of the array to a NCCL type for sending with NCCL. - As NCCL does not support complex types, we have to translate them to two - real values. - """ - from cupy.cuda import nccl - nccl_dtypes = { - np.dtype('float32'): nccl.NCCL_FLOAT32, - np.dtype('float64'): nccl.NCCL_FLOAT64, - np.dtype('complex64'): nccl.NCCL_FLOAT32, - np.dtype('complex128'): nccl.NCCL_FLOAT64, - } - real_dtypes = { - np.dtype('float32'): np.dtype('float32'), - np.dtype('float64'): np.dtype('float64'), - np.dtype('complex64'): np.dtype('float32'), - np.dtype('complex128'): np.dtype('float64'), - } - return nccl_dtypes[array.dtype], real_dtypes[array.dtype] - - @staticmethod - def get_buffer(shape, iscomplex, real_dtype): - """ - Get a buffer for communication. If complex numbers are used, we send - two real values instead. - """ - import cupy as cp - - if iscomplex: - return cp.ndarray(shape=[2,] + shape, dtype=real_dtype) - else: - return cp.ndarray(shape=shape, dtype=real_dtype) - - @staticmethod - def fill_buffer(buff, array, local_slice, iscomplex): - """ - Fill buffer for communication. If complex numbers are used, we send - two real values instead. - """ - if iscomplex: - buff[0][:] = array[local_slice].real - buff[1][:] = array[local_slice].imag - else: - buff[:] = array[local_slice][:] - - @staticmethod - def unpack_buffer(buff, array, local_slice, iscomplex): - """ - Unpack buffer for communication. If complex numbers are used, we get - two real values instead. - """ - if iscomplex: - array[local_slice].real = buff[0][:] - array[local_slice].imag = buff[1][:] - else: - array[local_slice][:] = buff[:] + # distribute sent data from buffers + for i in recvbufs.keys(): + if arrayB[sliceBs[i]] is not recvbufs[i]: + arrayB[sliceBs[i]][:] = recvbufs[i][:] def destroy(self): super().destroy() diff --git a/tests/test_pencil.py b/tests/test_pencil.py index 1675e0f..6985e46 100644 --- a/tests/test_pencil.py +++ b/tests/test_pencil.py @@ -23,7 +23,6 @@ def test_pencil(): from cupy.cuda import nccl backends += ['NCCL'] xp['NCCL'] = cp - cp.cuda.set_allocator(cp.cuda.MemoryAsyncPool().malloc) except ImportError: pass From 405fb4d1ebb603507a28db03a0b81ba792599f1e Mon Sep 17 00:00:00 2001 From: Thomas Date: Tue, 13 Feb 2024 18:04:57 +0100 Subject: [PATCH 13/18] Cosmetic changes --- mpi4py_fft/distarray.py | 10 ++++----- mpi4py_fft/libfft.py | 6 ++--- mpi4py_fft/pencil.py | 2 -- tests/test_transfer_classes.py | 41 +++++++++++++++++++++------------- 4 files changed, 34 insertions(+), 25 deletions(-) diff --git a/mpi4py_fft/distarray.py b/mpi4py_fft/distarray.py index a8815a5..ff92b5c 100644 --- a/mpi4py_fft/distarray.py +++ b/mpi4py_fft/distarray.py @@ -246,7 +246,7 @@ def local_slice(self): (slice(0, 16, None), slice(7, 14, None), slice(6, 12, None)) """ v = [slice(start, start+shape) for start, shape in zip(self._p0.substart, - self._p0.subshape)] + self._p0.subshape)] return tuple([slice(0, s) for s in self.shape[:self.rank]] + v) def redistribute(self, axis=None, out=None): @@ -298,10 +298,10 @@ def redistribute(self, axis=None, out=None): p1, transfer = self.get_pencil_and_transfer(axis) if out is None: out = type(self)(self.global_shape, - subcomm=p1.subcomm, - dtype=self.dtype, - alignment=axis, - rank=self.rank) + subcomm=p1.subcomm, + dtype=self.dtype, + alignment=axis, + rank=self.rank) if self.rank == 0: transfer.forward(self, out) diff --git a/mpi4py_fft/libfft.py b/mpi4py_fft/libfft.py index 59f054f..7fd9ac4 100644 --- a/mpi4py_fft/libfft.py +++ b/mpi4py_fft/libfft.py @@ -154,7 +154,7 @@ def _Xfftn_plan_mkl(shape, axes, dtype, transforms, options): #pragma: no cover def _Xfftn_plan_cupyx_scipy(shape, axes, dtype, transforms, options): import cupy as cp - import cupyx.scipy.fftpack as cufft + import cupyx.scipy.fft as cufft transforms = {} if transforms is None else transforms if tuple(axes) in transforms: @@ -168,8 +168,8 @@ def _Xfftn_plan_cupyx_scipy(shape, axes, dtype, transforms, options): V = plan_fwd(U, s=s, axes=axes) V = cp.array(V) M = np.prod(s) - return (_Yfftn_wrap(plan_fwd, U, V, 1, {'shape': s, 'axes': axes, 'overwrite_x': True}), - _Yfftn_wrap(plan_bck, V, U, M, {'shape': s, 'axes': axes, 'overwrite_x': True})) + return (_Yfftn_wrap(plan_fwd, U, V, 1, {'s': s, 'axes': axes, 'overwrite_x': True}), + _Yfftn_wrap(plan_bck, V, U, M, {'s': s, 'axes': axes, 'overwrite_x': True})) def _Xfftn_plan_scipy(shape, axes, dtype, transforms, options): diff --git a/mpi4py_fft/pencil.py b/mpi4py_fft/pencil.py index d8f3408..bcaebd8 100644 --- a/mpi4py_fft/pencil.py +++ b/mpi4py_fft/pencil.py @@ -471,8 +471,6 @@ def transfer(self, pencil, dtype): transfer_class = Transfer elif self.backend == 'NCCL': transfer_class = NCCLTransfer - elif self.backend == 'CUDAMemCpy': - transfer_class = CUDAMemCpy elif self.backend == 'customMPI': transfer_class = CustomMPITransfer else: diff --git a/tests/test_transfer_classes.py b/tests/test_transfer_classes.py index 6534807..f979730 100644 --- a/tests/test_transfer_classes.py +++ b/tests/test_transfer_classes.py @@ -2,6 +2,17 @@ from mpi4py_fft.pencil import Transfer, CustomMPITransfer, Pencil, Subcomm import numpy as np +transfer_classes = [CustomMPITransfer] +xps = {CustomMPITransfer: np} + +try: + import cupy as cp + from mpi4py_fft.pencil import NCCLTransfer + transfer_classes += [NCCLTransfer] + xps[NCCLTransfer] = cp +except ModuleNotFoundError: + pass + def get_args(comm, shape, dtype): subcomm = Subcomm(comm=comm, dims=None) @@ -20,18 +31,18 @@ def get_args(comm, shape, dtype): return kwargs -def get_arrays(kwargs): - arrayA = np.zeros(shape=kwargs['subshapeA'], dtype=kwargs['dtype']) - arrayB = np.zeros(shape=kwargs['subshapeB'], dtype=kwargs['dtype']) +def get_arrays(kwargs, xp): + arrayA = xp.zeros(shape=kwargs['subshapeA'], dtype=kwargs['dtype']) + arrayB = xp.zeros(shape=kwargs['subshapeB'], dtype=kwargs['dtype']) - arrayA[:] = np.random.random(arrayA.shape).astype(arrayA.dtype) + arrayA[:] = xp.random.random(arrayA.shape).astype(arrayA.dtype) return arrayA, arrayB -def single_test_all_to_allw(transfer_class, shape, dtype, comm=None): +def single_test_all_to_allw(transfer_class, shape, dtype, comm=None, xp=None): comm = comm if comm else MPI.COMM_WORLD kwargs = get_args(comm, shape, dtype) - arrayA, arrayB = get_arrays(kwargs) + arrayA, arrayB = get_arrays(kwargs, xp) arrayB_ref = arrayB.copy() transfer = transfer_class(**kwargs) @@ -39,24 +50,24 @@ def single_test_all_to_allw(transfer_class, shape, dtype, comm=None): transfer.Alltoallw(arrayA, transfer._subtypesA, arrayB, transfer._subtypesB) reference_transfer.Alltoallw(arrayA, transfer._subtypesA, arrayB_ref, transfer._subtypesB) - assert np.allclose(arrayB, arrayB_ref), f'Did not get the same result from `alltoallw` with {transfer_class.__name__} transfer class as MPI implementation on rank {comm.rank}!' + assert xp.allclose(arrayB, arrayB_ref), f'Did not get the same result from `alltoallw` with {transfer_class.__name__} transfer class as MPI implementation on rank {comm.rank}!' comm.Barrier() if comm.rank == 0: print(f'{transfer_class.__name__} passed alltoallw test with shape {shape} and dtype {dtype}') -def single_test_forward_backward(transfer_class, shape, dtype, comm=None): +def single_test_forward_backward(transfer_class, shape, dtype, comm=None, xp=None): comm = comm if comm else MPI.COMM_WORLD kwargs = get_args(comm, shape, dtype) - arrayA, arrayB = get_arrays(kwargs) + arrayA, arrayB = get_arrays(kwargs, xp) arrayA_ref = arrayA.copy() transfer = transfer_class(**kwargs) transfer.forward(arrayA, arrayB) transfer.backward(arrayB, arrayA) - assert np.allclose(arrayA, arrayA_ref), f'Did not get the same result when transferring back and forth with {transfer_class.__name__} transfer class on rank {comm.rank}!' + assert xp.allclose(arrayA, arrayA_ref), f'Did not get the same result when transferring back and forth with {transfer_class.__name__} transfer class on rank {comm.rank}!' comm.Barrier() if comm.rank == 0: @@ -67,14 +78,14 @@ def test_transfer_class(): dims = (2, 3) sizes = (7, 8, 9, 128) dtypes = 'fFdD' - transfer_class = CustomMPITransfer shapes = [[size] * dim for size in sizes for dim in dims] + [[32, 256, 129]] - for shape in shapes: - for dtype in dtypes: - single_test_all_to_allw(transfer_class, shape, dtype) - single_test_forward_backward(transfer_class, shape, dtype) + for transfer_class in transfer_classes: + for shape in shapes: + for dtype in dtypes: + single_test_all_to_allw(transfer_class, shape, dtype, xp=xps[transfer_class]) + single_test_forward_backward(transfer_class, shape, dtype, xp=xps[transfer_class]) if __name__ == '__main__': From b69a303a1de9a8a8c2fd7e16b6f0a269ecd5db6d Mon Sep 17 00:00:00 2001 From: Thomas Date: Fri, 23 Feb 2024 09:03:08 +0100 Subject: [PATCH 14/18] Swapped `array[...]=data` for `cupy.copyto` --- mpi4py_fft/libfft.py | 37 ++++++++++++++++++++++++------------- mpi4py_fft/mpifft.py | 5 +++-- mpi4py_fft/pencil.py | 14 +++++++++++--- 3 files changed, 38 insertions(+), 18 deletions(-) diff --git a/mpi4py_fft/libfft.py b/mpi4py_fft/libfft.py index 7fd9ac4..d271432 100644 --- a/mpi4py_fft/libfft.py +++ b/mpi4py_fft/libfft.py @@ -101,8 +101,8 @@ def _Xfftn_plan_cupy(shape, axes, dtype, transforms, options): # CuPy has forward transform unscaled and backward scaled with 1/N return ( - _Yfftn_wrap(plan_fwd, U, V, 1, {'s': s, 'axes': axes}), - _Yfftn_wrap(plan_bck, V, U, M, {'s': s, 'axes': axes}), + _Yfftn_wrap(plan_fwd, U, V, 1, {'s': s, 'axes': axes}, xp=cp), + _Yfftn_wrap(plan_bck, V, U, M, {'s': s, 'axes': axes}, xp=cp), ) def _Xfftn_plan_numpy(shape, axes, dtype, transforms, options): @@ -168,8 +168,8 @@ def _Xfftn_plan_cupyx_scipy(shape, axes, dtype, transforms, options): V = plan_fwd(U, s=s, axes=axes) V = cp.array(V) M = np.prod(s) - return (_Yfftn_wrap(plan_fwd, U, V, 1, {'s': s, 'axes': axes, 'overwrite_x': True}), - _Yfftn_wrap(plan_bck, V, U, M, {'s': s, 'axes': axes, 'overwrite_x': True})) + return (_Yfftn_wrap(plan_fwd, U, V, 1, {'s': s, 'axes': axes, 'overwrite_x': True}, xp=cp), + _Yfftn_wrap(plan_bck, V, U, M, {'s': s, 'axes': axes, 'overwrite_x': True}, xp=cp)) def _Xfftn_plan_scipy(shape, axes, dtype, transforms, options): @@ -193,20 +193,24 @@ class _Yfftn_wrap(object): #Wraps numpy/scipy/mkl transforms to FFTW style # pylint: disable=too-few-public-methods - __slots__ = ('_xfftn', '_M', '_opt', '__doc__', '_input_array', '_output_array') + __slots__ = ('_xfftn', '_M', '_opt', '__doc__', '_input_array', '_output_array', 'xp') - def __init__(self, xfftn_obj, input_array, output_array, M, opt): + def __init__(self, xfftn_obj, input_array, output_array, M, opt, xp=np): object.__setattr__(self, '_xfftn', xfftn_obj) object.__setattr__(self, '_opt', opt) object.__setattr__(self, '_M', M) object.__setattr__(self, '_input_array', input_array) object.__setattr__(self, '_output_array', output_array) object.__setattr__(self, '__doc__', xfftn_obj.__doc__) + object.__setattr__(self, 'xp', xp) @property def input_array(self): return object.__getattribute__(self, '_input_array') + def copyto(self, dst, src): + self.xp.copyto(dst, src, casting='unsafe') + @property def output_array(self): return object.__getattribute__(self, '_output_array') @@ -225,22 +229,24 @@ def M(self): def __call__(self, *args, **kwargs): self.opt.update(kwargs) - self.output_array[...] = self.xfftn(self.input_array, **self.opt) + self.copyto(self._output_array, self.xfftn(self.input_array, **self.opt)) if abs(self.M-1) > 1e-8: self._output_array *= self.M return self.output_array + class _Xfftn_wrap(object): #Common interface for all serial transforms # pylint: disable=too-few-public-methods - __slots__ = ('_xfftn', '__doc__', '_input_array', '_output_array') + __slots__ = ('_xfftn', '__doc__', '_input_array', '_output_array', 'xp') - def __init__(self, xfftn_obj, input_array, output_array): + def __init__(self, xfftn_obj, input_array, output_array, xp=np): object.__setattr__(self, '_xfftn', xfftn_obj) object.__setattr__(self, '_input_array', input_array) object.__setattr__(self, '_output_array', output_array) object.__setattr__(self, '__doc__', xfftn_obj.__doc__) + object.__setattr__(self, 'xp', xp) @property def input_array(self): @@ -254,12 +260,15 @@ def output_array(self): def xfftn(self): return object.__getattribute__(self, '_xfftn') + def copyto(self, dst, src): + self.xp.copyto(dst, src, casting='unsafe') + def __call__(self, input_array=None, output_array=None, **options): if input_array is not None: - self.input_array[...] = input_array + self.copyto(self.input_array, input_array) self.xfftn(**options) if output_array is not None: - output_array[...] = self.output_array + self.copyto(output_array, self.output_array) return output_array else: return self.output_array @@ -448,11 +457,13 @@ def __init__(self, shape, axes=None, dtype=float, padding=False, if abs(self.padding_factor-1.0) > 1e-8: assert len(self.axes) == 1 trunc_array = self._get_truncarray(shape, V.dtype) + xp = np if 'cupy' in self.backend: # TODO: Skip detour via CPU import cupy as cp trunc_array = cp.array(trunc_array) - self.forward = _Xfftn_wrap(self._forward, U, trunc_array) - self.backward = _Xfftn_wrap(self._backward, trunc_array, U) + xp = cp + self.forward = _Xfftn_wrap(self._forward, U, trunc_array, xp=xp) + self.backward = _Xfftn_wrap(self._backward, trunc_array, U, xp=xp) else: self.forward = _Xfftn_wrap(self._forward, U, V) self.backward = _Xfftn_wrap(self._backward, V, U) diff --git a/mpi4py_fft/mpifft.py b/mpi4py_fft/mpifft.py index c32d9ab..5b9a04b 100644 --- a/mpi4py_fft/mpifft.py +++ b/mpi4py_fft/mpifft.py @@ -63,17 +63,18 @@ def __call__(self, input_array=None, output_array=None, **kw): """ if input_array is not None: - self.input_array[...] = input_array + self._xfftn[0].copyto(self.input_array, input_array) for i in range(len(self._transfer)): self._xfftn[i](**kw) arrayA = self._xfftn[i].output_array arrayB = self._xfftn[i+1].input_array self._transfer[i](arrayA, arrayB) + self._xfftn[-1](**kw) if output_array is not None: - output_array[...] = self.output_array + self._xfftn[-1].copyto(output_array, self.output_array) return output_array else: return self.output_array diff --git a/mpi4py_fft/pencil.py b/mpi4py_fft/pencil.py index bcaebd8..8d225f6 100644 --- a/mpi4py_fft/pencil.py +++ b/mpi4py_fft/pencil.py @@ -302,6 +302,7 @@ def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB): # perform all sends and receives in a single kernel to allow overlap cp.cuda.nccl.groupStart() for i in range(1, size + 1): + send_to = (rank + i) % size recv_from = (rank -i + size) % size @@ -315,10 +316,17 @@ def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB): comm.send(sendbufs[i].data.ptr, sendbufs[i].size * self.count_modifier, self.NCCL_dtype, send_to, stream.ptr) cp.cuda.nccl.groupEnd() - # distribute sent data from buffers + # unpack the buffers concurrently in different streams + streams = {key: cp.cuda.Stream() for key in recvbufs.keys()} + events = {} for i in recvbufs.keys(): - if arrayB[sliceBs[i]] is not recvbufs[i]: - arrayB[sliceBs[i]][:] = recvbufs[i][:] + with streams[i]: + cp.copyto(arrayB[sliceBs[i]], recvbufs[i][:]) + events[i] = streams[i].record() + + for i in events.keys(): + stream.wait_event(events[i]) + def destroy(self): super().destroy() From b9ac0f71b336c7d421217de915e713276b9062c1 Mon Sep 17 00:00:00 2001 From: Thomas Date: Tue, 27 Feb 2024 16:23:54 +0100 Subject: [PATCH 15/18] Removed unnecessary rescaling in CuPy backend --- mpi4py_fft/libfft.py | 33 +++++++++++++++++---------------- mpi4py_fft/pencil.py | 2 +- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/mpi4py_fft/libfft.py b/mpi4py_fft/libfft.py index d271432..aca5265 100644 --- a/mpi4py_fft/libfft.py +++ b/mpi4py_fft/libfft.py @@ -95,14 +95,11 @@ def _Xfftn_plan_cupy(shape, axes, dtype, transforms, options): s = tuple(np.take(shape, axes)) U = cp.empty(shape=shape, dtype=dtype) - V = plan_fwd(U, s=s, axes=axes) - V = cp.array(V) - M = np.prod(s) + V = cp.empty_like(plan_fwd(U, s=s, axes=axes)) - # CuPy has forward transform unscaled and backward scaled with 1/N return ( - _Yfftn_wrap(plan_fwd, U, V, 1, {'s': s, 'axes': axes}, xp=cp), - _Yfftn_wrap(plan_bck, V, U, M, {'s': s, 'axes': axes}, xp=cp), + _Yfftn_wrap(plan_fwd, U, V, 1, {'s': s, 'axes': axes, 'norm': 'backward',}, xp=cp), + _Yfftn_wrap(plan_bck, V, U, 1, {'s': s, 'axes': axes, 'norm': 'forward',}, xp=cp), ) def _Xfftn_plan_numpy(shape, axes, dtype, transforms, options): @@ -189,6 +186,9 @@ def _Xfftn_plan_scipy(shape, axes, dtype, transforms, options): return (_Yfftn_wrap(plan_fwd, U, V, 1, {'shape': s, 'axes': axes}), _Yfftn_wrap(plan_bck, V, U, M, {'shape': s, 'axes': axes})) +def _copyto(dst, src, xp): + xp.copyto(dst, src, casting='unsafe') + class _Yfftn_wrap(object): #Wraps numpy/scipy/mkl transforms to FFTW style # pylint: disable=too-few-public-methods @@ -208,9 +208,6 @@ def __init__(self, xfftn_obj, input_array, output_array, M, opt, xp=np): def input_array(self): return object.__getattribute__(self, '_input_array') - def copyto(self, dst, src): - self.xp.copyto(dst, src, casting='unsafe') - @property def output_array(self): return object.__getattribute__(self, '_output_array') @@ -227,6 +224,9 @@ def opt(self): def M(self): return object.__getattribute__(self, '_M') + def copyto(self, dst, src): + _copyto(dst, src, self.xp) + def __call__(self, *args, **kwargs): self.opt.update(kwargs) self.copyto(self._output_array, self.xfftn(self.input_array, **self.opt)) @@ -234,7 +234,6 @@ def __call__(self, *args, **kwargs): self._output_array *= self.M return self.output_array - class _Xfftn_wrap(object): #Common interface for all serial transforms # pylint: disable=too-few-public-methods @@ -261,7 +260,7 @@ def xfftn(self): return object.__getattribute__(self, '_xfftn') def copyto(self, dst, src): - self.xp.copyto(dst, src, casting='unsafe') + _copyto(dst, src, self.xp) def __call__(self, input_array=None, output_array=None, **options): if input_array is not None: @@ -454,19 +453,21 @@ def __init__(self, shape, axes=None, dtype=float, padding=False, self.padding_factor = 1.0 if padding is not False: self.padding_factor = padding[self.axes[-1]] if np.ndim(padding) else padding + xp = np + if 'cupy' in self.backend: + import cupy as cp + xp = cp + if abs(self.padding_factor-1.0) > 1e-8: assert len(self.axes) == 1 trunc_array = self._get_truncarray(shape, V.dtype) - xp = np if 'cupy' in self.backend: # TODO: Skip detour via CPU - import cupy as cp trunc_array = cp.array(trunc_array) - xp = cp self.forward = _Xfftn_wrap(self._forward, U, trunc_array, xp=xp) self.backward = _Xfftn_wrap(self._backward, trunc_array, U, xp=xp) else: - self.forward = _Xfftn_wrap(self._forward, U, V) - self.backward = _Xfftn_wrap(self._backward, V, U) + self.forward = _Xfftn_wrap(self._forward, U, V, xp=xp) + self.backward = _Xfftn_wrap(self._backward, V, U, xp=xp) def _forward(self, **kw): normalize = kw.pop('normalize', True) diff --git a/mpi4py_fft/pencil.py b/mpi4py_fft/pencil.py index 8d225f6..2c4737e 100644 --- a/mpi4py_fft/pencil.py +++ b/mpi4py_fft/pencil.py @@ -321,7 +321,7 @@ def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB): events = {} for i in recvbufs.keys(): with streams[i]: - cp.copyto(arrayB[sliceBs[i]], recvbufs[i][:]) + cp.copyto(arrayB[sliceBs[i]], recvbufs[i]) events[i] = streams[i].record() for i in events.keys(): From 77b96f30945acabeab22e3b16f8c6c87f35707c8 Mon Sep 17 00:00:00 2001 From: Thomas Date: Thu, 29 Feb 2024 10:01:03 +0100 Subject: [PATCH 16/18] Use CUDA graphs in NCCL Alltoallw --- mpi4py_fft/pencil.py | 86 +++++++++++++++++++++++++++++--------------- 1 file changed, 58 insertions(+), 28 deletions(-) diff --git a/mpi4py_fft/pencil.py b/mpi4py_fft/pencil.py index 2c4737e..3cf5c16 100644 --- a/mpi4py_fft/pencil.py +++ b/mpi4py_fft/pencil.py @@ -269,6 +269,8 @@ class NCCLTransfer(Transfer): """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + self.forward_graph = None + self.backward_graph = None from cupy.cuda import nccl self.comm_nccl = nccl.NcclCommunicator(self.comm.size, self.comm.bcast(nccl.get_unique_id(), root=0), self.comm.rank) @@ -286,51 +288,79 @@ def __init__(self, *args, **kwargs): raise NotImplementedError(f'Don\'t know what NCCL dtype to use to send data of dtype {self.dtype}!') self.count_modifier = 2 if 'complex' in str(self.dtype) else 1 - def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB): + def backward(self, arrayB, arrayA): + """Global redistribution from arrayB to arrayA + + Parameters + ---------- + arrayB : array + Array of shape subshapeB, containing data to be redistributed + arrayA : array + Array of shape subshapeA, for receiving data + + """ + self.backward_graph = self.Alltoallw(arrayB, self._subtypesB, arrayA, self._subtypesA, graph=self.backward_graph) + + def forward(self, arrayA, arrayB): + """Global redistribution from arrayA to arrayB + + Parameters + ---------- + arrayA : array + Array of shape subshapeA, containing data to be redistributed + arrayB : array + Array of shape subshapeB, for receiving data + """ + self.forward_graph = self.Alltoallw(arrayA, self._subtypesA, arrayB, self._subtypesB, graph=self.forward_graph) + + def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB, graph=None): """ Redistribute arrayA to arrayB. """ import cupy as cp rank, size, comm = self.comm.rank, self.comm.size, self.comm_nccl - stream = cp.cuda.get_current_stream() - # initialize dictionaries required to overlap sends - recvbufs = {} - sendbufs = {} - sliceBs = {} + # record to a graph if we haven't already done so + if graph is None: + stream = cp.cuda.Stream(non_blocking=True) + with stream: + stream.begin_capture() - # perform all sends and receives in a single kernel to allow overlap - cp.cuda.nccl.groupStart() - for i in range(1, size + 1): + # initialize dictionaries required to overlap sends + recvbufs = {} + sendbufs = {} + sliceBs = {} - send_to = (rank + i) % size - recv_from = (rank -i + size) % size + # perform all sends and receives in a single kernel to allow overlap + cp.cuda.nccl.groupStart() + for i in range(1, size + 1): - sliceA = get_slice(subtypesA[send_to]) - sliceBs[i] = get_slice(subtypesB[recv_from]) + send_to = (rank + i) % size + recv_from = (rank -i + size) % size - recvbufs[i] = cp.ascontiguousarray(arrayB[sliceBs[i]]) - sendbufs[i] = cp.ascontiguousarray(arrayA[sliceA]) + sliceA = get_slice(subtypesA[send_to]) + sliceBs[i] = get_slice(subtypesB[recv_from]) - comm.recv(recvbufs[i].data.ptr, recvbufs[i].size * self.count_modifier, self.NCCL_dtype, recv_from, stream.ptr) - comm.send(sendbufs[i].data.ptr, sendbufs[i].size * self.count_modifier, self.NCCL_dtype, send_to, stream.ptr) - cp.cuda.nccl.groupEnd() + recvbufs[i] = cp.ascontiguousarray(arrayB[sliceBs[i]]) + sendbufs[i] = cp.ascontiguousarray(arrayA[sliceA]) - # unpack the buffers concurrently in different streams - streams = {key: cp.cuda.Stream() for key in recvbufs.keys()} - events = {} - for i in recvbufs.keys(): - with streams[i]: - cp.copyto(arrayB[sliceBs[i]], recvbufs[i]) - events[i] = streams[i].record() + comm.recv(recvbufs[i].data.ptr, recvbufs[i].size * self.count_modifier, self.NCCL_dtype, recv_from, stream.ptr) + comm.send(sendbufs[i].data.ptr, sendbufs[i].size * self.count_modifier, self.NCCL_dtype, send_to, stream.ptr) + cp.cuda.nccl.groupEnd() - for i in events.keys(): - stream.wait_event(events[i]) + for i in recvbufs.keys(): + cp.copyto(arrayB[sliceBs[i]], recvbufs[i]) + graph = stream.end_capture() + + graph.launch(stream=cp.cuda.get_current_stream()) + return graph def destroy(self): - super().destroy() + del self.forward_graph + del self.backward_graph self.comm_nccl.destroy() + super().destroy() class Pencil(object): From 75f03437a312f65a89de7387c7c6c23d7720a7dc Mon Sep 17 00:00:00 2001 From: Thomas Date: Tue, 16 Apr 2024 07:06:51 +0200 Subject: [PATCH 17/18] Removed dependence on CuPy --- mpi4py_fft/__init__.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/mpi4py_fft/__init__.py b/mpi4py_fft/__init__.py index e2ee0b4..10664fe 100644 --- a/mpi4py_fft/__init__.py +++ b/mpi4py_fft/__init__.py @@ -20,8 +20,12 @@ __author__ = 'Lisandro Dalcin and Mikael Mortensen' from .distarray import DistArray, newDistArray, Function -from .distarrayCuPy import DistArrayCuPy from .mpifft import PFFT from . import fftw from .fftw import fftlib from .io import HDF5File, NCFile, generate_xdmf + +try: + from .distarrayCuPy import DistArrayCuPy +except ModuleNotFoundError: + pass From 7330b6bddd315d878880cc4b854151f4b987b41c Mon Sep 17 00:00:00 2001 From: Thomas Date: Sat, 12 Oct 2024 13:31:26 +0200 Subject: [PATCH 18/18] Fixes --- mpi4py_fft/distarrayCuPy.py | 2 +- mpi4py_fft/pencil.py | 58 +++++++++++++++++++++++++++++++++++-- 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/mpi4py_fft/distarrayCuPy.py b/mpi4py_fft/distarrayCuPy.py index 2b4afd4..2c243bb 100644 --- a/mpi4py_fft/distarrayCuPy.py +++ b/mpi4py_fft/distarrayCuPy.py @@ -83,4 +83,4 @@ def asnumpy(self): @property def v(self): """Return local ``self`` array as an ``ndarray`` object""" - return cp.ndarray.__getitem__(self, slice(0, -1, 1)) + return cp.ndarray.__getitem__(self, slice(None, None, None)) diff --git a/mpi4py_fft/pencil.py b/mpi4py_fft/pencil.py index 3cf5c16..637646d 100644 --- a/mpi4py_fft/pencil.py +++ b/mpi4py_fft/pencil.py @@ -267,6 +267,7 @@ class NCCLTransfer(Transfer): communicator will share rank and size attributes with the MPI communicator. In particular, this assumes one GPU per MPI rank. """ + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.forward_graph = None @@ -288,6 +289,55 @@ def __init__(self, *args, **kwargs): raise NotImplementedError(f'Don\'t know what NCCL dtype to use to send data of dtype {self.dtype}!') self.count_modifier = 2 if 'complex' in str(self.dtype) else 1 + self.arrayA = None + self.arrayB = None + + def _copy_to_input_arrays(self, arrayA, arrayB): + """ + This is needed to make the CUDA graph work with arbitrary input data. + + Parameters + ---------- + arrayA : array + Data to be copied to the graph input + arrayB : array + Data to be copied to the graph output + """ + if self.arrayA is None: + self.arrayA = arrayA + elif self.arrayA.data.ptr == arrayA.data.ptr: + pass + else: + self.arrayA[...] = arrayA + + if self.arrayB is None: + self.arrayB = arrayB + elif self.arrayB.data.ptr == arrayB.data.ptr: + pass + else: + self.arrayB[...] = arrayB + + def _retrieve_from_input_arrays(self, arrayA, arrayB): + """ + This is needed to make the CUDA graph work with arbitrary input data. + + Parameters + ---------- + arrayA : array + Data to be copied from the graph input + arrayB : array + Data to be copied from the graph output + """ + if self.arrayA.data.ptr == arrayA.data.ptr: + pass + else: + arrayA[...] = self.arrayA + + if self.arrayB.data.ptr == arrayB.data.ptr: + pass + else: + arrayB[...] = self.arrayB + def backward(self, arrayB, arrayA): """Global redistribution from arrayB to arrayA @@ -299,7 +349,9 @@ def backward(self, arrayB, arrayA): Array of shape subshapeA, for receiving data """ - self.backward_graph = self.Alltoallw(arrayB, self._subtypesB, arrayA, self._subtypesA, graph=self.backward_graph) + self._copy_to_input_arrays(arrayA, arrayB) + self.backward_graph = self.Alltoallw(self.arrayB, self._subtypesB, self.arrayA, self._subtypesA, graph=self.backward_graph) + self._retrieve_from_input_arrays(arrayA, arrayB) def forward(self, arrayA, arrayB): """Global redistribution from arrayA to arrayB @@ -311,7 +363,9 @@ def forward(self, arrayA, arrayB): arrayB : array Array of shape subshapeB, for receiving data """ - self.forward_graph = self.Alltoallw(arrayA, self._subtypesA, arrayB, self._subtypesB, graph=self.forward_graph) + self._copy_to_input_arrays(arrayA, arrayB) + self.forward_graph = self.Alltoallw(self.arrayA, self._subtypesA, self.arrayB, self._subtypesB, graph=self.forward_graph) + self._retrieve_from_input_arrays(arrayA, arrayB) def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB, graph=None): """