Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add 'expedited' decorator for urgent messages #212

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion auto_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def searchForPython(python_implementations):
python_implementations = set() # python implementations can also be added here manually
searchForPython(python_implementations)

interfaces = ['ctypes', 'cffi', 'cython']
interfaces = ['cython']

with open('test_config.json', 'r') as infile:
tests = json.load(infile)
Expand Down
2 changes: 1 addition & 1 deletion charm4py/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
Reducer = charm.reducers
Future = charm.createFuture

from .entry_method import when, coro, coro_ext, coro as threaded
from .entry_method import when, coro, coro_ext, coro as threaded, expedited, EntryMethodOptions

from .chare import Chare, Group, Array, ArrayMap
from .channel import Channel
Expand Down
9 changes: 6 additions & 3 deletions charm4py/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@

class Channel(object):

def __new__(cls, chare, remote, local=None):
def __new__(cls, chare, remote, local=None, options=None):
if not hasattr(chare, '__channels__'):
chare.__initchannelattrs__()
ch = chare.__findPendingChannel__(remote, False)
if ch is None:
local_port = len(chare.__channels__)
ch = _Channel(local_port, remote, True)
ch = _Channel(local_port, remote, True, options)
chare.__channels__.append(ch)
chare.__pendingChannels__.append(ch)
else:
Expand All @@ -28,7 +28,7 @@ def __new__(cls, chare, remote, local=None):

class _Channel(object):

def __init__(self, port, remote, locally_initiated):
def __init__(self, port, remote, locally_initiated, opts):
self.port = port
self.remote = remote
self.remote_port = -1
Expand All @@ -41,6 +41,9 @@ def __init__(self, port, remote, locally_initiated):
self.established_fut = None
self.locally_initiated = locally_initiated

if opts:
self.remote._channelRecv__.set_options(opts)

def setEstablished(self):
self.established = True
del self.established_fut
Expand Down
19 changes: 17 additions & 2 deletions charm4py/chare.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def _channelConnect__(self, remote_proxy, remote_port): # entry method
else:
from .channel import _Channel
local_port = len(self.__channels__)
ch = _Channel(local_port, remote_proxy, False)
ch = _Channel(local_port, remote_proxy, False, None)
self.__channels__.append(ch)
self.__pendingChannels__.append(ch)
ch.remote_port = remote_port
Expand Down Expand Up @@ -713,7 +713,19 @@ def array_proxy_elem(proxy, idx): # array proxy [] overload method
assert _slice.start is not None and _slice.stop is not None, 'Must specify start and stop indexes for array slicing'
return charm.split(proxy, 1, slicing=idx)[0]

class EntryMethodOptions:
def __init__(self):
self.value = 0
def set_option(self, val_identifier):
self.value |= val_identifier
def get(self):
return self.value

def array_proxy_method_gen(ep, argcount, argnames, defaults): # decorator, generates proxy entry methods
msg_opts = EntryMethodOptions()
def set_options(options):
nonlocal msg_opts
msg_opts = options
def proxy_entry_method(proxy, *args, **kwargs):
num_args = len(args)
if num_args < argcount and len(kwargs) > 0:
Expand Down Expand Up @@ -748,7 +760,7 @@ def proxy_entry_method(proxy, *args, **kwargs):
if elemIdx in array:
destObj = array[elemIdx]
msg = charm.packMsg(destObj, args, header)
charm.CkArraySend(aid, elemIdx, ep, msg)
charm.CkArraySend(aid, elemIdx, ep, msg, msg_opts.get())
else:
root, sid = proxy.section
header[b'sid'] = sid
Expand All @@ -758,6 +770,7 @@ def proxy_entry_method(proxy, *args, **kwargs):
charm.sectionMgr.thisProxy[root].sendToSection(sid, ep, header, *args)
return blockFuture
proxy_entry_method.ep = ep
proxy_entry_method.set_options = set_options
return proxy_entry_method

def array_ckNew_gen(C, epIdx):
Expand Down Expand Up @@ -871,6 +884,8 @@ def __getProxyClass__(C, cls, sectionProxy=False):
f = profile_send_function(array_proxy_method_gen(m.epIdx, argcount, argnames, defaults))
else:
f = array_proxy_method_gen(m.epIdx, argcount, argnames, defaults)
if m._msg_opts is not None:
f.set_options(m._msg_opts)
f.__qualname__ = proxyClassName + '.' + m.name
f.__name__ = m.name
M[m.name] = f
Expand Down
6 changes: 5 additions & 1 deletion charm4py/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,15 @@ def check_deprecated(self):
if len(old_options.intersection(set(dir(self.__class__)))) != 0:
raise Charm4PyError('Options API has changed. Use charm.options instead')


class Charm4PyError(Exception):
def __init__(self, msg):
super(Charm4PyError, self).__init__(msg)
self.message = msg

class EntryMethodAttributes:
def __init__(self,
expedited):
CK_EXPEDITED = expedited

# Acts as the Charm runtime at the Python level (there is one instance of this class
# per process)
Expand Down Expand Up @@ -120,6 +123,7 @@ def __init__(self):
self.CkChareSend = self.lib.CkChareSend
self.CkGroupSend = self.lib.CkGroupSend
self.CkArraySend = self.lib.CkArraySend
self.em_options = self.lib.em_options
self.reducers = reduction.ReducerContainer(self)
self.redMgr = reduction.ReductionManager(self, self.reducers)
self.mainchareRegistered = False
Expand Down
5 changes: 2 additions & 3 deletions charm4py/charmlib/ccharm.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ cdef extern from "charm.h":
void CkChareExtSend_multi(int onPE, void *objPtr, int epIdx, int num_bufs, char **bufs, int *buf_sizes);
void CkGroupExtSend(int gid, int npes, int *pes, int epIdx, char *msg, int msgSize);
void CkGroupExtSend_multi(int gid, int npes, int *pes, int epIdx, int num_bufs, char **bufs, int *buf_sizes);
void CkArrayExtSend(int aid, int *idx, int ndims, int epIdx, char *msg, int msgSize);
void CkArrayExtSend_multi(int aid, int *idx, int ndims, int epIdx, int num_bufs, char **bufs, int *buf_sizes);
void CkArrayExtSend(int aid, int *idx, int ndims, int epIdx, char *msg, int msgSize, int opts);
void CkArrayExtSend_multi(int aid, int *idx, int ndims, int epIdx, int num_bufs, char **bufs, int *buf_sizes, int opts);
void CkForwardMulticastMsg(int gid, int num_children, int *children);

int CkGroupGetReductionNumber(int gid);
Expand Down Expand Up @@ -70,7 +70,6 @@ cdef extern from "charm.h":
void CkStartQDExt_SectionCallback(int sid_pe, int sid_cnt, int rootPE, int ep);
void CcdCallFnAfter(void (*CcdVoidFn)(void *userParam,double curWallTime), void *arg, double msecs);


cdef extern from "spanningTree.h":
void getPETopoTreeEdges(int pe, int rootPE, int *pes, int numpes, unsigned int bfactor,
int *parent, int *child_count, int **children);
Expand Down
11 changes: 7 additions & 4 deletions charm4py/charmlib/charmlib_cython.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ cdef object times = [0.0] * 3 # track time in [charm reduction callbacks, custom
cdef bytes localMsg = b'L:' + (b' ' * sizeof(int))
cdef char* localMsg_ptr = <char*>localMsg

class EntryMethodOptions(object):
EXPEDITED = 0x4

class CharmLib(object):

Expand All @@ -322,6 +324,7 @@ class CharmLib(object):
self.chareNames = []
self.init()
self.ReducerType = CkReductionTypesExt_Wrapper()
self.em_options = EntryMethodOptions()
#print(charm_reducers.sum_long, charm_reducers.product_ushort, charm_reducers.max_char, charm_reducers.max_float, charm_reducers.min_char)
#print(ReducerType.sum_long, ReducerType.product_ushort, ReducerType.max_char, ReducerType.max_float, ReducerType.min_char)

Expand Down Expand Up @@ -449,18 +452,18 @@ class CharmLib(object):
CkGroupExtSend_multi(group_id, num_pes, section_children, ep, cur_buf, send_bufs, send_buf_sizes)
cur_buf = 1

def CkArraySend(self, int array_id, index not None, int ep, msg not None):
def CkArraySend(self, int array_id, index not None, int ep, msg not None, int opts=0):
global cur_buf
msg0, dcopy = msg
cdef int ndims = len(index)
cdef int i = 0
for i in range(ndims): c_index[i] = index[i]
if cur_buf <= 1:
CkArrayExtSend(array_id, c_index, ndims, ep, msg0, len(msg0))
CkArrayExtSend(array_id, c_index, ndims, ep, msg0, len(msg0), opts)
else:
send_bufs[0] = <char*>msg0
send_buf_sizes[0] = <int>len(msg0)
CkArrayExtSend_multi(array_id, c_index, ndims, ep, cur_buf, send_bufs, send_buf_sizes)
CkArrayExtSend_multi(array_id, c_index, ndims, ep, cur_buf, send_bufs, send_buf_sizes, opts)
cur_buf = 1

def sendToSection(self, int gid, list children):
Expand Down Expand Up @@ -930,7 +933,7 @@ cdef void resumeFromSync(int aid, int ndims, int *arrayIndex):
try:
index = array_index_to_tuple(ndims, arrayIndex)
CkArrayExtSend(aid, arrayIndex, ndims, charm.arrays[aid][index].thisProxy.resumeFromSync.ep,
emptyMsg, len(emptyMsg))
emptyMsg, len(emptyMsg), 0)
except:
charm.handleGeneralError()

Expand Down
24 changes: 23 additions & 1 deletion charm4py/entry_method.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,17 @@
from time import time
import sys
from greenlet import greenlet, getcurrent

from . import charm

class EntryMethodOptions:
def __init__(self):
self.value = 0
def set_option(self, val_identifier):
self.value |= val_identifier
def unset_option(self, val_identifier):
raise NotImplementedError("Options are currently permanent")
def get(self):
return self.value

class EntryMethod(object):

Expand All @@ -28,6 +38,10 @@ def __init__(self, C, name, profile=False):
else:
self.run = self._run_prof

self._msg_opts = None
if hasattr(method, '_msg_opts'):
self._msg_opts = method._msg_opts

self.when_cond = None
if hasattr(method, 'when_cond'):
# template object specifying the 'when' condition clause
Expand Down Expand Up @@ -176,6 +190,14 @@ def coro(func):
func._ck_coro = True
return func

def expedited(func):
options = EntryMethodOptions()
# TODO: get this value from charm
# options.set_value(charm.em_options.CK_EXPEDITED)
options.set_option(0x4)
func._msg_opts = options
return func


def coro_ext(event_notify=False):
def _coro(func):
Expand Down
43 changes: 43 additions & 0 deletions tests/entry_methods/decorated.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from charm4py import charm, Chare, Array, expedited, Channel, EntryMethodOptions, coro


class Test(Chare):
"""
A chare array to test the element proxy.
"""

@coro
def __init__(self):
self.count = 0
opts = EntryMethodOptions()
opts.set_option(0x4)
self.partner = Channel(self, self.thisProxy[(self.thisIndex[0] + 1) % 6], options=opts)
self.partner.send(1)
self.partner.recv()

@expedited
def say(self, msg):
"""
Helper method which is called by invoking the element proxy.
This method is expected to be called on only the chare for
which the proxy is created.
"""

self.count += 1
print("Say", msg, "called on", self.thisIndex, "on PE", charm.myPe())
if self.count == 2:
assert self.thisIndex == (3,)
exit()

def start(self):
proxy = self.thisProxy[3]
proxy.say("bye")
proxy.say("bye")


def main(args):
arr_proxy = Array(Test, 6)
arr_proxy[0].start()


charm.start(main)