diff --git a/azure-pipelines.yml b/azure-pipelines.yml index ab636d61..706038ba 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -55,7 +55,7 @@ jobs: steps: - task: UsePythonVersion@0 inputs: - versionSpec: '3.9' + versionSpec: '3.11' architecture: 'x64' - script: | @@ -85,7 +85,7 @@ jobs: steps: - task: UsePythonVersion@0 inputs: - versionSpec: '3.9' + versionSpec: '3.11' architecture: 'x64' - script: | diff --git a/pylops/waveeqprocessing/_kirchhoff_cuda.py b/pylops/waveeqprocessing/_kirchhoff_cuda.py new file mode 100644 index 00000000..c320468c --- /dev/null +++ b/pylops/waveeqprocessing/_kirchhoff_cuda.py @@ -0,0 +1,590 @@ +from math import cos + +import numpy as np +from numba import cuda + +from pylops.utils.backend import to_cupy + + +class _KirchhoffCudaHelper: + """A helper class for performing Kirchhoff migration or modeling using Numba CUDA. + + This class provides methods to compute the forward and adjoint operations for the + Kirchhoff operator, utilizing GPU acceleration through Numba's CUDA capabilities. + + Parameters + ---------- + ns : :obj:`int` + Number of sources. + nr : :obj:`int` + Number of receivers. + nt : :obj:`int` + Number of time samples. + ni : :obj:`int` + Number of image points. + dynamic : :obj:`bool`, optional + Flag indicating whether to use dynamic computation (default is `False``). + nstreams : :obj:`int`, optional + Number of streams used in case of ``dynamic=True``. + + """ + + def __init__(self, ns, nr, nt, ni, dynamic=False, nstreams=1): + self.ns, self.nr, self.nt, self.ni = ns, nr, nt, ni + self.dynamic = dynamic + self.nstreams = nstreams + + self._grid_setup() + + def _grid_setup(self): + """Set up the CUDA grid and block dimensions based on the current device and computation flags. + + This method configures the number of blocks and threads per block for CUDA kernels, + depending on whether dynamic weights are ued in computations or not. + """ + current_device = cuda.get_current_device() + if self.dynamic: + num_sources = self.ns + num_streams = self.nstreams + streams = [cuda.stream() for _ in range(num_streams)] + sources_per_stream = num_sources // num_streams + remainder = num_sources % num_streams + source_counter = 0 + self.sources_per_streams = {} + for i, stream in enumerate(streams): + num_sources_for_stream = sources_per_stream + ( + 1 if i < remainder else 0 + ) + sources_for_stream = list( + range(source_counter, source_counter + num_sources_for_stream) + ) + self.sources_per_streams[stream] = sources_for_stream + source_counter += num_sources_for_stream + n_runs = ( + num_streams * self.ns * self.ni * self.nr + ) # number_of_times_to_run_kernel + self.num_threads_per_blocks = current_device.WARP_SIZE * 8 + self.num_blocks = ( + n_runs + (self.num_threads_per_blocks - 1) + ) // self.num_threads_per_blocks + else: + warp = current_device.WARP_SIZE + self.num_threads_per_blocks = (warp, warp) + self.num_blocks = ( + (self.ns + self.num_threads_per_blocks[0] - 1) + // self.num_threads_per_blocks[0], + (self.nr + self.num_threads_per_blocks[1] - 1) + // self.num_threads_per_blocks[1], + ) + + def _data_prep_dynamic( + self, + ns, + nr, + nt, + ni, + nz, + dt, + aperture, + angleaperture, + aperturetap, + vel, + six, + rix, + trav_recs, + angle_recs, + trav_srcs, + angle_srcs, + amp_srcs, + amp_recs, + ): + """Data preparation for dynamic computation + + Prepare data for dynamic computation by transfering some variables to device + memory in advance once. + """ + ns_d = np.int32(ns) + nr_d = np.int32(nr) + nt_d = np.int32(nt) + ni_d = np.int32(ni) + nz_d = np.int32(nz) + dt_d = np.float32(dt) + aperturemin_d = np.float32(aperture[0]) + aperturemax_d = np.float32(aperture[1]) + angleaperturemin_d = np.float32(angleaperture[0]) + angleaperturemax_d = np.float32(angleaperture[1]) + aperturetap_d = cuda.to_device(aperturetap) + vel_d = cuda.to_device(vel) + six_d = cuda.to_device(six) + rix_d = cuda.to_device(rix) + self.const_inputs = ( + ns_d, + nr_d, + nt_d, + ni_d, + nz_d, + dt_d, + aperturemin_d, + aperturemax_d, + angleaperturemin_d, + angleaperturemax_d, + vel_d, + aperturetap_d, + six_d, + rix_d, + ) + + self.trav_recs_d_global = cuda.to_device(trav_recs) + self.angles_recs_d_global = cuda.to_device(angle_recs) + self.trav_srcs_d_global = cuda.to_device(trav_srcs) + self.angles_srcs_d_global = cuda.to_device(angle_srcs) + self.amp_srcs_d_global = cuda.to_device(amp_srcs) + self.amp_recs_d_global = cuda.to_device(amp_recs) + + def _call_nondynamic(self, opt, *inputs): + """Process data using CUDA single stream for non-dynamic computation. + + This method handles data preparation and execution of CUDA kernels + for both forward and adjoint operations of non-dynamic case. + + Parameters + ---------- + opt : :obj:`str` + Operation type, either '_matvec' for forward or '_rmatvec' for adjoint. + *inputs : :obj:`list` + List of input parameters required by the kernels. + + Returns + ------- + y_d : :obj:`numpy.ndarray` + Output data after processing. + + """ + + if opt == "_matvec": + x_d = inputs[0] + y_d = inputs[1] + ns_d = np.int32(inputs[2]) + nr_d = np.int32(inputs[3]) + nt_d = np.int32(inputs[4]) + ni_d = np.int32(inputs[5]) + dt_d = np.float32(inputs[6]) + trav_srcs_d = to_cupy(inputs[7]) + trav_recs_d = to_cupy(inputs[8]) + self._travsrcrec_kirch_matvec_cuda[ + self.num_blocks, self.num_threads_per_blocks + ](x_d, y_d, ns_d, nr_d, nt_d, ni_d, dt_d, trav_srcs_d, trav_recs_d) + cuda.synchronize() + + elif opt == "_rmatvec": + x_d = inputs[0] + y_d = inputs[1] + ns_d = np.int32(inputs[2]) + nr_d = np.int32(inputs[3]) + nt_d = np.int32(inputs[4]) + ni_d = np.int32(inputs[5]) + dt_d = np.float32(inputs[6]) + trav_srcs_d = to_cupy(inputs[7]) + trav_recs_d = to_cupy(inputs[8]) + self._travsrcrec_kirch_rmatvec_cuda[ + self.num_blocks, self.num_threads_per_blocks + ](x_d, y_d, ns_d, nr_d, nt_d, ni_d, dt_d, trav_srcs_d, trav_recs_d) + cuda.synchronize() + + return y_d + + def _call_dynamic(self, x, opt): + """Process data using CUDA streams for dynamic computation. + + This method handles data preparation and execution of CUDA kernels using streams, + for both forward and adjoint operations of dynamic case. + + Parameters + ---------- + x : :obj:`numpy.ndarray` + Input data (image or seismic data). + opt : :obj:`str` + Operation type, either '_matvec' for forward or '_rmatvec' for adjoint. + + Returns + ------- + y : :obj:`numpy.ndarray` + Output data after processing. + + """ + + if opt == "_matvec": + x = x.ravel() + y = np.zeros((self.ns * self.nr, self.nt)) + elif opt == "_rmatvec": + y = np.zeros(self.ni) + + y_d_dict = {} + isrc_list_d_dict = {} + for stream, isrc_list in self.sources_per_streams.items(): + x_d = cuda.to_device(x, stream=stream) + y_d_dict[stream] = cuda.to_device(y, stream=stream) + isrc_list_d_dict[stream] = cuda.to_device(isrc_list, stream=stream) + + for stream in self.sources_per_streams.keys(): + stream.synchronize() + for irec in range(self.nr): + for stream, isrc_list in self.sources_per_streams.items(): + if opt == "_matvec": + self._ampsrcrec_kirch_matvec_cuda_streams[ + self.num_blocks, self.num_threads_per_blocks, stream + ]( + *self.const_inputs, + self.trav_srcs_d_global, + self.amp_srcs_d_global, + self.angles_srcs_d_global, + self.trav_recs_d_global, + self.amp_recs_d_global, + self.angles_recs_d_global, + y_d_dict[stream], + isrc_list_d_dict[stream], + irec, + x_d + ) + elif opt == "_rmatvec": + self._ampsrcrec_kirch_rmatvec_cuda_streams[ + self.num_blocks, self.num_threads_per_blocks, stream + ]( + *self.const_inputs, + self.trav_srcs_d_global, + self.amp_srcs_d_global, + self.angles_srcs_d_global, + self.trav_recs_d_global, + self.amp_recs_d_global, + self.angles_recs_d_global, + y_d_dict[stream], + isrc_list_d_dict[stream], + irec, + x_d + ) + # Synchronize the streams to ensure all operations have been completed + for stream in self.sources_per_streams.keys(): + stream.synchronize() + y_streams = [] + for stream, y_dev in y_d_dict.items(): + y_streams.append(y_dev.copy_to_host(stream=stream)) + y_total = np.sum(y_streams, axis=0) + return y_total + + @staticmethod + @cuda.jit + def _travsrcrec_kirch_matvec_cuda(x, y, ns, nr, nt, ni, dt, trav_srcs, trav_recs): + isrc, irec = cuda.grid(2) + if ns > isrc and nr > irec: + for ii in range(ni): + travisrc = trav_srcs[ii, isrc] + travirec = trav_recs[ii, irec] + trav = travisrc + travirec + itravii = int(trav / dt) + travdii = trav / dt - itravii + if 0 <= itravii < nt - 1: + ind1 = (isrc * nr + irec, itravii) + val1 = x[ii] * (1 - travdii) + ind2 = (isrc * nr + irec, itravii + 1) + val2 = x[ii] * travdii + cuda.atomic.add(y, ind1, val1) + cuda.atomic.add(y, ind2, val2) + + @staticmethod + @cuda.jit + def _travsrcrec_kirch_rmatvec_cuda(x, y, ns, nr, nt, ni, dt, trav_srcs, trav_recs): + isrc, irec = cuda.grid(2) + if ns > isrc and nr > irec: + for ii in range(ni): + trav_srcsii = trav_srcs[ii] + trav_recsii = trav_recs[ii] + trav_srcii = trav_srcsii[isrc] + trav_recii = trav_recsii[irec] + travii = trav_srcii + trav_recii + itravii = int(travii / dt) + travdii = travii / dt - itravii + if 0 <= itravii < nt - 1: + vii = ( + x[isrc * nr + irec, itravii] * (1 - travdii) + + x[isrc * nr + irec, itravii + 1] * travdii + ) + cuda.atomic.add(y, ii, vii) + + @staticmethod + @cuda.jit + def _ampsrcrec_kirch_matvec_cuda_streams( + ns, + nr, + nt, + ni, + nz, + dt, + aperturemin, + aperturemax, + angleaperturemin, + angleaperturemax, + vel, + aperturetap, + six_d, + rix_d, + travsrc, + ampsrc, + anglesrc, + travi, + ampi, + anglei, + y, + isrc_list, + irec, + x, + ): + ii = cuda.grid(1) + if ni > ii: + index_isrc = -1 + for isrc in isrc_list: + index_isrc = index_isrc + 1 + sixisrcrec = six_d[isrc * nr + irec] + rixisrcrec = rix_d[isrc * nr + irec] + travirec = travi[:, irec] + ampirec = ampi[:, irec] + angleirec = anglei[:, irec] + travisrc = travsrc[:, isrc] + ampisrc = ampsrc[:, isrc] + angleisrc = anglesrc[:, isrc] + daperture = aperturemax - aperturemin + dangleaperture = angleaperturemax - angleaperturemin + trav = travisrc[ii] + travirec[ii] + itravii = int(trav / dt) + travdii = trav / dt - itravii + # compute cosine of half opening angle and total amplitude scaling + cosangle = cos((angleisrc[ii] - angleirec[ii]) / 2.0) + damp = 2.0 * cosangle * ampisrc[ii] * ampirec[ii] / vel[ii] + # extract source and receiver angle at given image point + angle_src = angleisrc[ii] + angle_rec = angleirec[ii] + abs_angle_src = abs(angle_src) + abs_angle_rec = abs(angle_rec) + aptap = 1.0 + # angle apertures checks + if ( + abs_angle_src < angleaperturemax + and abs_angle_rec < angleaperturemax + ): + if abs_angle_src >= angleaperturemin: + # extract source angle aperture taper value + aptap = ( + aptap + * aperturetap[ + int( + 20 + * (abs_angle_src - angleaperturemin) + // dangleaperture + ) + ] + ) + if abs_angle_rec >= angleaperturemin: + # extract receiver angle aperture taper value + aptap = ( + aptap + * aperturetap[ + int( + 20 + * (abs_angle_rec - angleaperturemin) + // dangleaperture + ) + ] + ) + # identify x-index of image point + iz = ii % nz + # aperture check + aperture = abs(sixisrcrec - rixisrcrec) / iz + if aperture < aperturemax: + if aperture >= aperturemin: + # extract aperture taper value + aptap = ( + aptap + * aperturetap[ + int(20 * ((aperture - aperturemin) // daperture)) + ] + ) + # time limit check + if 0 <= itravii < nt - 1: + ind1 = (isrc * nr + irec, itravii) + val1 = x[ii] * (1 - travdii) * damp * aptap + ind2 = (isrc * nr + irec, itravii + 1) + val2 = x[ii] * travdii * damp * aptap + cuda.atomic.add(y, ind1, val1) + cuda.atomic.add(y, ind2, val2) + + @staticmethod + @cuda.jit + def _ampsrcrec_kirch_rmatvec_cuda_streams( + ns, + nr, + nt, + ni, + nz, + dt, + aperturemin, + aperturemax, + angleaperturemin, + angleaperturemax, + vel, + aperturetap, + six_d, + rix_d, + travsrc, + ampsrc, + anglesrc, + travi, + ampi, + anglei, + y, + isrc_list, + irec, + x, + ): + ii = cuda.grid(1) + if ni > ii: + index_isrc = -1 + for isrc in isrc_list: + index_isrc = index_isrc + 1 + sixisrcrec = six_d[isrc * nr + irec] + rixisrcrec = rix_d[isrc * nr + irec] + travirec = travi[:, irec] + ampirec = ampi[:, irec] + angleirec = anglei[:, irec] + travisrc = travsrc[:, isrc] + ampisrc = ampsrc[:, isrc] + angleisrc = anglesrc[:, isrc] + daperture = aperturemax - aperturemin + dangleaperture = angleaperturemax - angleaperturemin + trav = travisrc[ii] + travirec[ii] + itravii = int(trav / dt) + travdii = trav / dt - itravii + # extract source and receiver angle at given image point + angle_src = angleisrc[ii] + angle_rec = angleirec[ii] + abs_angle_src = abs(angle_src) + abs_angle_rec = abs(angle_rec) + aptap = 1.0 + + cosangle = cos((angle_src - angle_rec) / 2.0) + damp = 2.0 * cosangle * ampisrc[ii] * ampirec[ii] / vel[ii] + # angle apertures checks + if ( + abs_angle_src < angleaperturemax + and abs_angle_rec < angleaperturemax + ): + if abs_angle_src >= angleaperturemin: + # extract source angle aperture taper value + aptap = ( + aptap + * aperturetap[ + int( + 20 + * (abs_angle_src - angleaperturemin) + // dangleaperture + ) + ] + ) + if abs_angle_rec >= angleaperturemin: + # extract receiver angle aperture taper value + aptap = ( + aptap + * aperturetap[ + int( + 20 + * (abs_angle_rec - angleaperturemin) + // dangleaperture + ) + ] + ) + # identify x-index of image point + iz = ii % nz + # aperture check + aperture = abs(sixisrcrec - rixisrcrec) / iz + if aperture < aperturemax: + if aperture >= aperturemin: + # extract aperture taper value + aptap = ( + aptap + * aperturetap[ + int(20 * ((aperture - aperturemin) // daperture)) + ] + ) + # time limit check + if 0 <= itravii < nt - 1: + ind1 = ii + val1 = ( + ( + x[isrc * nr + irec, itravii] * (1 - travdii) + + x[isrc * nr + irec, itravii + 1] * travdii + ) + * damp + * aptap + ) + cuda.atomic.add(y, ind1, val1) + + def _matvec_call(self, *inputs): + """Handle the forward operation call, dispatching to appropriate CUDA kernels. + + This method selects the appropriate kernel to execute based on the computation flags, + and performs the forward operation. + + Parameters + ---------- + *inputs : :obj:`list` + List of input parameters required by the kernels. + + Returns + ------- + y : :obj:`numpy.ndarray` + Output data (seismic data) of forward operation. + + """ + if self.dynamic: + y_d = self._call_dynamic(inputs[0], "_matvec") + else: + y_d = self._call_nondynamic("_matvec", *inputs) + """ + else: + x_d = inputs[0] + y_d = inputs[1] + ns_d = np.int32(inputs[2]) + nr_d = np.int32(inputs[3]) + nt_d = np.int32(inputs[4]) + ni_d = np.int32(inputs[5]) + dt_d = np.float32(inputs[6]) + trav_srcs_d = to_cupy(inputs[7]) + trav_recs_d = to_cupy(inputs[8]) + self._travsrcrec_kirch_matvec_cuda[ + self.num_blocks, self.num_threads_per_blocks + ](x_d, y_d, ns_d, nr_d, nt_d, ni_d, dt_d, trav_srcs_d, trav_recs_d) + cuda.synchronize() + """ + return y_d + + def _rmatvec_call(self, *inputs): + """Handle the adjoint operation call, dispatching to appropriate CUDA kernels. + + This method selects the appropriate kernel to execute based on the computation flags, + and performs the adjoint operation. + + Parameters + ---------- + *inputs : :obj:`list` + List of input parameters required by the kernels. + + Returns + ------- + y : :obj:`numpy.ndarray` + Output data (image) of adjoint operation. + + """ + if self.dynamic: + y_d = self._call_dynamic(inputs[0], "_rmatvec") + else: + y_d = self._call_nondynamic("_rmatvec", *inputs) + + return y_d diff --git a/pylops/waveeqprocessing/kirchhoff.py b/pylops/waveeqprocessing/kirchhoff.py index bdd5be87..59453320 100644 --- a/pylops/waveeqprocessing/kirchhoff.py +++ b/pylops/waveeqprocessing/kirchhoff.py @@ -12,6 +12,7 @@ from pylops.signalprocessing import Convolve1D from pylops.utils import deps from pylops.utils._internal import _value_or_sized_to_array +from pylops.utils.backend import get_array_module from pylops.utils.decorators import reshaped from pylops.utils.tapers import taper from pylops.utils.typing import DTypeLike, NDArray @@ -25,6 +26,8 @@ if jit_message is None: from numba import jit, prange + from ._kirchhoff_cuda import _KirchhoffCudaHelper + # detect whether to use parallel or not numba_threads = int(os.getenv("NUMBA_NUM_THREADS", "1")) parallel = True if numba_threads != 1 else False @@ -82,8 +85,8 @@ class Kirchhoff(LinearOperator): :math:`\lbrack (n_y) n_x n_z \times n_s n_r \rbrack` or pair of traveltime tables of size :math:`\lbrack (n_y) n_x n_z \times n_s \rbrack` and :math:`\lbrack (n_y) n_x n_z \times n_r \rbrack` (to be provided if ``mode='byot'``). Note that the latter approach is recommended as less memory demanding - than the former. Moreover, only ``mode='dynamic'`` is only possible when traveltimes are provided in - the latter form. + than the former. Moreover, ``mode='dynamic'`` and ``engine='cuda'`` are only possible when traveltimes are + provided in the latter form. amp : :obj:`numpy.ndarray`, optional .. versionadded:: 2.0.0 @@ -108,7 +111,7 @@ class Kirchhoff(LinearOperator): Deprecated, will be removed in v3.0.0. Simply kept for back-compatibility with previous implementation, but effectively not affecting the behaviour of the operator. engine : :obj:`str`, optional - Engine used for computations (``numpy`` or ``numba``). + Engine used for computations (``numpy``, ``numba`` or ``cuda``). dtype : :obj:`str`, optional Type of elements in input array. name : :obj:`str`, optional @@ -995,10 +998,9 @@ def _ampsrcrec_kirch_rmatvec( return y def _register_multiplications(self, engine: str) -> None: - if engine not in ["numpy", "numba"]: - raise KeyError("engine must be numpy or numba") + if engine not in ["numpy", "numba", "cuda"]: + raise KeyError("engine must be numpy or numba or cuda") if engine == "numba" and jit_message is None: - # numba numba_opts = dict( nopython=True, nogil=True, parallel=parallel ) # fastmath=True, @@ -1011,7 +1013,37 @@ def _register_multiplications(self, engine: str) -> None: elif not self.travsrcrec: self._kirch_matvec = jit(**numba_opts)(self._trav_kirch_matvec) self._kirch_rmatvec = jit(**numba_opts)(self._trav_kirch_rmatvec) - + elif engine == "cuda": + if self.dynamic and self.travsrcrec: + self.cuda_helper = _KirchhoffCudaHelper( + self.ns, self.nr, self.nt, self.ni, True + ) + self.cuda_helper._data_prep_dynamic( + self.ns, + self.nr, + self.nt, + self.ni, + self.nz, + self.dt, + self.aperture, + self.angleaperture, + self.aperturetap, + self.vel, + self.six, + self.rix, + self.trav_recs, + self.angle_recs, + self.trav_srcs, + self.angle_srcs, + self.amp_srcs, + self.amp_recs, + ) + elif self.travsrcrec: + self.cuda_helper = _KirchhoffCudaHelper( + self.ns, self.nr, self.nt, self.ni, False + ) + self._kirch_matvec = self.cuda_helper._matvec_call + self._kirch_rmatvec = self.cuda_helper._rmatvec_call else: if engine == "numba" and jit_message is not None: logging.warning(jit_message) @@ -1027,7 +1059,8 @@ def _register_multiplications(self, engine: str) -> None: @reshaped def _matvec(self, x: NDArray) -> NDArray: - y = np.zeros((self.nsnr, self.nt), dtype=self.dtype) + ncp = get_array_module(x) + y = ncp.zeros((self.nsnr, self.nt), dtype=self.dtype) if self.dynamic and self.travsrcrec: inputs = ( x.ravel(), @@ -1074,9 +1107,10 @@ def _matvec(self, x: NDArray) -> NDArray: @reshaped def _rmatvec(self, x: NDArray) -> NDArray: + ncp = get_array_module(x) x = self.cop._rmatvec(x.ravel()) x = x.reshape(self.nsnr, self.nt) - y = np.zeros(self.ni, dtype=self.dtype) + y = ncp.zeros(self.ni, dtype=self.dtype) if self.dynamic and self.travsrcrec: inputs = ( x,