implements backend-specific reduction_(begin|end)
kaushikcfd committed Jul 11, 2022
1 parent 38e2027 commit 1381d88
Showing 4 changed files with 135 additions and 96 deletions.
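
Note: this commit moves the MPI reduction handling for reduced Global
arguments out of the generic AbstractParloop and into each backend, so
that device backends can wrap the reduction in host/device transfers.
A rough sketch of the sequence the diff below assumes (the method names
are taken from the diff; the driver loop itself is illustrative, not
PyOP2's actual __call__):

    parloop._compute(part)                # run the compiled kernel
    requests = parloop.reduction_begin()  # backend hook: post an Iallreduce per Global
    # ... overlap the reduction with other communication ...
    parloop.reduction_end(requests)       # backend hook: Wait(), then copy buffers back
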
70 changes: 41 additions & 29 deletions pyop2/backends/cpu.py
@@ -4,6 +4,7 @@
from pyop2.types.map import Map, MixedMap
from pyop2.parloop import AbstractParloop
from pyop2.global_kernel import AbstractGlobalKernel
+from pyop2.types.access import INC, MIN, MAX
from pyop2.types.mat import Mat
from pyop2.types.glob import Global
from pyop2.backends import AbstractComputeBackend
@@ -23,14 +24,16 @@ class Dat(BaseDat):
@utils.cached_property
def _vec(self):
assert self.dtype == PETSc.ScalarType, \
"Can't create Vec with type %s, must be %s" % (self.dtype, PETSc.ScalarType)
"Can't create Vec with type %s, must be %s" % (self.dtype,
PETSc.ScalarType)
# Can't duplicate layout_vec of dataset, because we then
# carry around extra unnecessary data.
# But use getSizes to save an Allreduce in computing the
# global size.
size = self.dataset.layout_vec.getSizes()
data = self._data[:size[0]]
-        vec = PETSc.Vec().createWithArray(data, size=size, bsize=self.cdim, comm=self.comm)
+        vec = PETSc.Vec().createWithArray(data, size=size,
+                                          bsize=self.cdim, comm=self.comm)
return vec


@@ -46,9 +49,10 @@ def code_to_compile(self):

if self.local_kernel.cpp:
from loopy.codegen.result import process_preambles
preamble = "".join(process_preambles(getattr(code, "device_preambles", [])))
preamble = "".join(
process_preambles(getattr(code, "device_preambles", [])))
device_code = "\n\n".join(str(dp.ast) for dp in code.device_programs)
return preamble + "\nextern \"C\" {\n" + device_code + "\n}\n"
return preamble + '\nextern "C" {\n' + device_code + "\n}\n"
return code.device_code()

@PETSc.Log.EventDecorator()
@@ -81,32 +85,40 @@ def compile(self, comm):

class Parloop(AbstractParloop):

def prepare_arglist(self, iterset, *args):
arglist = iterset._kernel_args_
for arg in args:
arglist += arg._kernel_args_
seen = set()
for arg in args:
maps = arg.map_tuple
for map_ in maps:
if map_ is None:
continue
for k in map_._kernel_args_:
if k in seen:
continue
arglist += (k,)
seen.add(k)
return arglist

@PETSc.Log.EventDecorator("ParLoopRednBegin")
@mpi.collective
def reduction_begin(self):
"""Begin reductions."""
requests = []
for idx in self._reduction_idxs:
glob = self.arguments[idx].data
mpi_op = {INC: mpi.MPI.SUM,
MIN: mpi.MPI.MIN,
MAX: mpi.MPI.MAX}.get(self.accesses[idx])

if mpi.MPI.VERSION >= 3:
requests.append(self.comm.Iallreduce(glob._data,
glob._buf,
op=mpi_op))
else:
self.comm.Allreduce(glob._data, glob._buf, op=mpi_op)
return tuple(requests)

@PETSc.Log.EventDecorator("ParLoopRednEnd")
@mpi.collective
def _compute(self, part):
"""Execute the kernel over all members of a MPI-part of the iteration space.
def reduction_end(self, requests):
"""Finish reductions."""
if mpi.MPI.VERSION >= 3:
for idx, req in zip(self._reduction_idxs, requests):
req.Wait()
glob = self.arguments[idx].data
glob._data[:] = glob._buf
else:
assert len(requests) == 0

:arg part: The :class:`SetPartition` to compute over.
"""
with self._compute_event():
PETSc.Log.logFlops(part.size*self.num_flops)
self.global_kernel(self.comm, part.offset, part.offset+part.size, *self.arglist)
for idx in self._reduction_idxs:
glob = self.arguments[idx].data
glob._data[:] = glob._buf


class CPUBackend(AbstractComputeBackend):
@@ -126,7 +138,7 @@ class CPUBackend(AbstractComputeBackend):
Mat = Mat
Global = Global
GlobalDataSet = GlobalDataSet
-    PETScVecType = 'standard'
+    PETScVecType = "standard"

def turn_on_offloading(self):
pass
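
Note: a standalone sketch of the nonblocking-reduction pattern the CPU
backend's reduction_begin/reduction_end use, written against plain
mpi4py and NumPy (none of this is PyOP2 code; run under mpirun):

    import numpy as np
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    data = np.full(3, comm.rank, dtype=np.float64)  # local contribution
    buf = np.empty_like(data)                       # reduction buffer, like glob._buf

    if MPI.VERSION >= 3:
        # Post the reduction and overlap it with other work, as the
        # reduction_begin/reduction_end split above allows.
        req = comm.Iallreduce(data, buf, op=MPI.SUM)
        # ... other work ...
        req.Wait()
    else:
        comm.Allreduce(data, buf, op=MPI.SUM)       # blocking fallback

    data[:] = buf   # copy back, like glob._data[:] = glob._buf
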
76 changes: 42 additions & 34 deletions pyop2/backends/cuda.py
@@ -10,7 +10,6 @@
AVAILABLE_ON_DEVICE_ONLY,
AVAILABLE_ON_BOTH,
DataAvailability)
-from pyop2.profiling import timed_region
from pyop2.configuration import configuration
from pyop2.types.set import (MixedSet, Subset as BaseSubset,
ExtrudedSet as BaseExtrudedSet,
@@ -20,7 +19,7 @@
from pyop2.types.dataset import DataSet, GlobalDataSet, MixedDataSet
from pyop2.types.mat import Mat
from pyop2.types.glob import Global as BaseGlobal
-from pyop2.types.access import RW, READ, INC
+from pyop2.types.access import RW, READ, INC, MIN, MAX
from pyop2.parloop import AbstractParloop
from pyop2.global_kernel import AbstractGlobalKernel
from pyop2.backends import AbstractComputeBackend, cpu as cpu_backend
@@ -253,6 +252,9 @@ def _kernel_args_(self):
return (self._cuda_data.gpudata,)
else:
self.ensure_availability_on_host()
+                # tell petsc that we have updated the data on the host
+                with self.vec as v:
+                    v.array_w
return (self._data.ctypes.data, )
else:
if cuda_backend.offloading:
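
Note: the three added lines in the hunk above touch Vec.array_w so that
PETSc records a host-side modification. A standalone petsc4py sketch of
that effect (the state-bump reading is an assumption based on
VecGetArray/VecRestoreArray semantics, not something this diff states):

    import numpy as np
    from petsc4py import PETSc

    data = np.zeros(4)
    v = PETSc.Vec().createWithArray(data)
    data[:] = 1.0   # host-side write made behind PETSc's back
    v.array_w       # touching the write-access array marks the Vec as modified
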
@@ -365,7 +367,7 @@ def ensure_availability_on_device(self):

def ensure_availability_on_host(self):
if not self.is_available_on_host():
-            self._cuda.get(ary=self._data)
+            self._cuda_data.get(ary=self._data)
self._availability_flag = AVAILABLE_ON_BOTH

@property
@@ -538,39 +540,45 @@ def compile(self, comm):

class Parloop(AbstractParloop):

-    def prepare_arglist(self, iterset, *args):
-        nbytes = 0
-
-        arglist = iterset._kernel_args_
-        for arg in args:
-            arglist += arg._kernel_args_
-            if arg.access is INC:
-                nbytes += arg.data.nbytes * 2
-            else:
-                nbytes += arg.data.nbytes
-        seen = set()
-        for arg in args:
-            maps = arg.map_tuple
-            for map_ in maps:
-                for k in map_._kernel_args_:
-                    if k in seen:
-                        continue
-                    arglist += map_._kernel_args_
-                    seen.add(k)
-                    nbytes += map_.values.nbytes
-
-        self.nbytes = nbytes
-
-        return arglist
-
-    @mpi.collective
-    def _compute(self, part, fun, *arglist):
-        if part.size == 0:
-            return
-
-        with timed_region("Parloop_{0}_{1}".format(self.iterset.name,
-                                                   self._jitmodule._wrapper_name)):
-            fun(part.offset, part.offset + part.size, *arglist)
+    @PETSc.Log.EventDecorator("ParLoopRednBegin")
+    @mpi.collective
+    def reduction_begin(self):
+        """Begin reductions."""
+        requests = []
+        for idx in self._reduction_idxs:
+            glob = self.arguments[idx].data
+            mpi_op = {INC: mpi.MPI.SUM,
+                      MIN: mpi.MPI.MIN,
+                      MAX: mpi.MPI.MAX}.get(self.accesses[idx])
+
+            if mpi.MPI.VERSION >= 3:
+                glob.ensure_availability_on_host()
+                requests.append(self.comm.Iallreduce(glob._data,
+                                                     glob._buf,
+                                                     op=mpi_op))
+            else:
+                self.comm.Allreduce(glob._data, glob._buf, op=mpi_op)
+        return tuple(requests)
+
+    @PETSc.Log.EventDecorator("ParLoopRednEnd")
+    @mpi.collective
+    def reduction_end(self, requests):
+        """Finish reductions."""
+        if mpi.MPI.VERSION >= 3:
+            for idx, req in zip(self._reduction_idxs, requests):
+                req.Wait()
+                glob = self.arguments[idx].data
+                glob._data[:] = glob._buf
+                glob._availability_flag = AVAILABLE_ON_HOST_ONLY
+                glob.ensure_availability_on_device()
+        else:
+            assert len(requests) == 0
+
+            for idx in self._reduction_idxs:
+                glob = self.arguments[idx].data
+                glob._data[:] = glob._buf
+                glob._availability_flag = AVAILABLE_ON_HOST_ONLY
+                glob.ensure_availability_on_device()


class CUDABackend(AbstractComputeBackend):
Expand Down Expand Up @@ -602,12 +610,12 @@ def __init__(self):

def turn_on_offloading(self):
self.offloading = True
-        self.ParLoop = self.Parloop_offloading
+        self.Parloop = self.Parloop_offloading
self.GlobalKernel = self.GlobalKernel_offloading

def turn_off_offloading(self):
self.offloading = False
-        self.ParLoop = self.Parloop_no_offloading
+        self.Parloop = self.Parloop_no_offloading
self.GlobalKernel = self.GlobalKernel_no_offloading

@property
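
Note: a standalone sketch of the host/device round-trip that the CUDA
reduction_begin/reduction_end pair performs, using pycuda and mpi4py
directly (illustrative glue, not PyOP2 code; assumes MPI >= 3):

    import numpy as np
    from mpi4py import MPI
    import pycuda.autoinit  # noqa: F401, creates a CUDA context
    import pycuda.gpuarray as gpuarray

    comm = MPI.COMM_WORLD
    d_glob = gpuarray.to_gpu(np.full(3, comm.rank, dtype=np.float64))

    h_data = d_glob.get()             # ensure_availability_on_host()
    h_buf = np.empty_like(h_data)
    req = comm.Iallreduce(h_data, h_buf, op=MPI.SUM)   # reduction_begin
    req.Wait()                        # reduction_end waits on the request
    h_data[:] = h_buf                 # glob._data[:] = glob._buf
    d_glob.set(h_data)                # ensure_availability_on_device()
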
50 changes: 40 additions & 10 deletions pyop2/backends/opencl.py
@@ -9,7 +9,6 @@
AVAILABLE_ON_DEVICE_ONLY,
AVAILABLE_ON_BOTH,
DataAvailability)
-from pyop2.profiling import timed_region
from pyop2.configuration import configuration
from pyop2.types.set import (MixedSet, Subset as BaseSubset,
ExtrudedSet as BaseExtrudedSet,
@@ -19,7 +18,7 @@
from pyop2.types.dataset import DataSet, GlobalDataSet, MixedDataSet
from pyop2.types.mat import Mat
from pyop2.types.glob import Global as BaseGlobal
-from pyop2.types.access import RW, READ, INC
+from pyop2.types.access import RW, READ, INC, MIN, MAX
from pyop2.parloop import AbstractParloop
from pyop2.global_kernel import AbstractGlobalKernel
from pyop2.backends import AbstractComputeBackend, cpu as cpu_backend
@@ -582,14 +581,45 @@ def prepare_arglist(self, iterset, *args):

return arglist

@PETSc.Log.EventDecorator("ParLoopRednBegin")
@mpi.collective
def _compute(self, part, fun, *arglist):
if part.size == 0:
return
def reduction_begin(self):
"""Begin reductions."""
requests = []
for idx in self._reduction_idxs:
glob = self.arguments[idx].data
mpi_op = {INC: mpi.MPI.SUM,
MIN: mpi.MPI.MIN,
MAX: mpi.MPI.MAX}.get(self.accesses[idx])

if mpi.MPI.VERSION >= 3:
glob.ensure_availability_on_host()
requests.append(self.comm.Iallreduce(glob._data,
glob._buf,
op=mpi_op))
else:
self.comm.Allreduce(glob._data, glob._buf, op=mpi_op)
return tuple(requests)

@PETSc.Log.EventDecorator("ParLoopRednEnd")
@mpi.collective
def reduction_end(self, requests):
"""Finish reductions."""
if mpi.MPI.VERSION >= 3:
for idx, req in zip(self._reduction_idxs, requests):
req.Wait()
glob = self.arguments[idx].data
glob._data[:] = glob._buf
glob._availability_flag = AVAILABLE_ON_HOST_ONLY
glob.ensure_availability_on_device()
else:
assert len(requests) == 0

with timed_region("Parloop_{0}_{1}".format(self.iterset.name,
self._jitmodule._wrapper_name)):
fun(part.offset, part.offset + part.size, *arglist)
for idx in self._reduction_idxs:
glob = self.arguments[idx].data
glob._data[:] = glob._buf
glob._availability_flag = AVAILABLE_ON_HOST_ONLY
glob.ensure_availability_on_device()


class OpenCLBackend(AbstractComputeBackend):
Expand Down Expand Up @@ -642,12 +672,12 @@ def queue(self):

def turn_on_offloading(self):
self.offloading = True
-        self.ParLoop = self.Parloop_offloading
+        self.Parloop = self.Parloop_offloading
self.GlobalKernel = self.GlobalKernel_offloading

def turn_off_offloading(self):
self.offloading = False
-        self.ParLoop = self.Parloop_no_offloading
+        self.Parloop = self.Parloop_no_offloading
self.GlobalKernel = self.GlobalKernel_no_offloading

@property
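
Note: the OpenCL reduction hooks are line-for-line the same as the CUDA
ones; only the array type behind ensure_availability_on_host/_on_device
differs. A standalone pyopencl analogue of the same round-trip
(illustrative, not PyOP2 code; blocking variant for brevity):

    import numpy as np
    from mpi4py import MPI
    import pyopencl as cl
    import pyopencl.array as cl_array

    comm = MPI.COMM_WORLD
    ctx = cl.create_some_context()
    queue = cl.CommandQueue(ctx)

    d_glob = cl_array.to_device(queue, np.full(3, comm.rank, dtype=np.float64))

    h_data = d_glob.get()                      # ensure_availability_on_host()
    h_buf = np.empty_like(h_data)
    comm.Allreduce(h_data, h_buf, op=MPI.SUM)
    h_data[:] = h_buf
    d_glob.set(h_data, queue=queue)            # ensure_availability_on_device()
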
35 changes: 12 additions & 23 deletions pyop2/parloop.py
@@ -192,6 +192,16 @@ def compute(self):
# Parloop.compute is an alias for Parloop.__call__
self()

+    @mpi.collective
+    def _compute(self, part):
+        """Execute the kernel over all members of a MPI-part of the iteration space.
+
+        :arg part: The :class:`SetPartition` to compute over.
+        """
+        with self._compute_event():
+            PETSc.Log.logFlops(part.size*self.num_flops)
+            self.global_kernel(self.comm, part.offset, part.offset+part.size, *self.arglist)

@PETSc.Log.EventDecorator("ParLoopExecute")
@mpi.collective
def __call__(self):
@@ -341,34 +351,13 @@ def _l2g_idxs(self):
@mpi.collective
def reduction_begin(self):
"""Begin reductions."""
-        requests = []
-        for idx in self._reduction_idxs:
-            glob = self.arguments[idx].data
-            mpi_op = {Access.INC: mpi.MPI.SUM,
-                      Access.MIN: mpi.MPI.MIN,
-                      Access.MAX: mpi.MPI.MAX}.get(self.accesses[idx])
-
-            if mpi.MPI.VERSION >= 3:
-                requests.append(self.comm.Iallreduce(glob._data, glob._buf, op=mpi_op))
-            else:
-                self.comm.Allreduce(glob._data, glob._buf, op=mpi_op)
-        return tuple(requests)
+        raise NotImplementedError("Backend-specific logic not implemented")

@PETSc.Log.EventDecorator("ParLoopRednEnd")
@mpi.collective
def reduction_end(self, requests):
"""Finish reductions."""
-        if mpi.MPI.VERSION >= 3:
-            for idx, req in zip(self._reduction_idxs, requests):
-                req.Wait()
-                glob = self.arguments[idx].data
-                glob.data[:] = glob._buf
-        else:
-            assert len(requests) == 0
-
-            for idx in self._reduction_idxs:
-                glob = self.arguments[idx].data
-                glob.data[:] = glob._buf
+        raise NotImplementedError("Backend-specific logic not implemented")

@cached_property
def _reduction_idxs(self):
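
Note: with both bodies replaced by NotImplementedError, AbstractParloop
now states a contract rather than a policy. A hypothetical backend
(sketch only; MyParloop is not part of this commit) would satisfy it
like so, with reduction_begin returning the requests that reduction_end
later receives:

    from pyop2 import mpi
    from pyop2.parloop import AbstractParloop

    class MyParloop(AbstractParloop):
        """Hypothetical backend Parloop for illustration."""

        @mpi.collective
        def reduction_begin(self):
            # Post one nonblocking allreduce per reduced Global and
            # return the requests; an empty tuple means nothing to wait on.
            return ()

        @mpi.collective
        def reduction_end(self, requests):
            # Wait on the requests from reduction_begin, then copy each
            # reduction buffer back into its Global's data array.
            for req in requests:
                req.Wait()
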
