-
Notifications
You must be signed in to change notification settings - Fork 2
/
pmi.py
1213 lines (1039 loc) · 40.9 KB
/
pmi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# PMI - Parallel Method Invocation
# Copyright (C) 2009,2010 Olaf Lenz
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
************************************
**PMI** - Parallel Method Invocation
************************************
PMI allows users to write serial Python scripts that use functions and
classes that are executed in parallel.
PMI is intended to be used in data-parallel environments, where
several threads run in parallel and can communicate via MPI.
In PMI mode, a single thread of control (a python script that runs on
the *controller*, i.e. the MPI root task) can invoke arbitrary
functions on all other threads (the *workers*) in parallel via
`call()`, `invoke()` and `reduce()`. When the functions on the workers
return, control is returned to the controller.
This model is equivalent to the \"Fork-Join execution model\" used
e.g. in OpenMP.
PMI also allows to create parallel instances of object classes via
`create()`, i.e. instances that have a corresponding object instance
on all workers. `call()`, `invoke()` and `reduce()` can be used to
call arbitrary methods of these instances.
To execute arbitrary code on all workers, `exec_()` can be used, and
to import python modules on all workers, use `import_()`.
Main program
------------
On the workers, the main program of a PMI script usually consists of a
single call to the function `startWorkerLoop()`. On the workers, this
starts an infinite loop that waits to receive the next PMI command,
while on the controller it returns immediately. On the workers, the
loop ends only when one of the commands `finalizeWorkers()` or
`stopWorkerLoop()` is issued on the controller. A typical PMI main
program looks like this:
>>> # compute 2*factorial(42) in parallel
>>> import pmi
>>>
>>> # start the worker loop
>>> # on the controller, this function returns immediately
>>> pmi.startWorkerLoop()
>>>
>>> # Do the parallel computation
>>> pmi.import_('math')
>>> pmi.reduce('lambda a,b: a+b', 'math.factorial', 42)
>>>
>>> # exit all workers
>>> pmi.finalizeWorkers()
Instead of using `finalizeWorkers()` at the end of the script, you can
call `registerAtExit()` anywhere else, which will cause
`finalizeWorkers()` to be called when the python interpreter exits.
Alternatively, it is possible to use PMI in an SPMD-like fashion,
where each call to a PMI command on the controller must be accompanied
by a corresponding call on the worker. This can be either a simple
call to `receive()` that accepts any PMI command, or a call to the
identical PMI command. In that case, the arguments of the call to the
PMI command on the workers are ignored. In this way, it is possible to
write SPMD scripts that profit from the PMI communication patterns.
>>> # compute 2*factorial(42) in parallel
>>> import pmi
>>>
>>> pmi.exec_('import math')
>>> pmi.reduce('lambda a,b: a+b', 'math.factorial', 42)
To start the worker loop, the command `startWorkerLoop()` can be
issued on the workers. To stop the worker loop, `stopWorkerLoop()` can
be issued on the controller, which will end the worker loop without
exiting the workers.
Controller commands
-------------------
These commands can be called in the controller script. When any of
these commands is issued on a worker during the worker loop, a
`UserError` is raised.
* `call()`, `invoke()`, `reduce()` to call functions and methods in parallel
* `create()` to create parallel object instances
* `exec_()` and `import_()` to execute arbitrary python code in
parallel and to import classes and functions into the global
namespace of pmi.
* `sync()` to make sure that all deleted PMI objects have been deleted.
* `finalizeWorkers()` to stop and exit all workers
* `registerAtExit()` to make sure that finalizeWorkers() is called when
python exits on the controller
* `stopWorkerLoop()` to interrupt the worker loop on all workers and to
return control to the individual workers
Worker commands
---------------
These commands can be called on a worker.
* `startWorkerLoop()` to start the worker loop
* `receive()` to receive a single PMI command
* `call()`, `invoke()`, `reduce()`, `create()` and `exec_()` to
receive a single corresponding PMI command. Note that these commands
will ignore any arguments when called on a worker.
PMI Proxy metaclass
-------------------
The `Proxy` metaclass can be used to easily generate front-end classes
to distributed PMI classes.
.
.
.
Useful constants and variables
------------------------------
The pmi module defines the following useful constants and variables:
* `isController` is True when used on the controller, False otherwise
* `isWorker` = not isController
* `ID` is the rank of the MPI task
* `CONTROLLER` is the rank of the Controller (normally the MPI root)
* `workerStr` is a string describing the thread ('Worker #' or 'Controller')
* `inWorkerLoop` is True, if PMI currently executes the worker loop on
the workers.
"""
import logging, types, sys, inspect, os
# Module metadata.
__author__ = 'Olaf Lenz'
__email__ = 'olaf at lenz dot name'
__version__ = '1.0'
# Names exported by ``from pmi import *``.
# NOTE(review): 'rank' and 'size' are exported here while the module
# docstring documents `ID`; their definitions are outside this part of the
# file -- confirm they exist.  `dump()` is defined in this module but is not
# exported.
__all__ = [
'exec_', 'import_', 'execfile_',
'create', 'call', 'invoke', 'reduce', 'localcall',
'sync', 'receive',
'startWorkerLoop',
'finalizeWorkers', 'stopWorkerLoop', 'registerAtExit',
'Proxy',
'rank', 'size', 'CONTROLLER',
'isController', 'isWorker',
'workerStr', 'inWorkerLoop',
'UserError'
]
##################################################
## IMPORT
##################################################
def import_(*args):
    """Controller command that imports python modules on all workers.

    Each element of args should be a module name.  On the controller the
    names are broadcast to the workers and the modules are then imported
    locally as well.

    On a worker that is not in the worker loop, the matching import
    command is received instead and args are ignored.  On a worker that
    is inside the worker loop, the call does nothing and returns None
    (unlike the other controller commands, it does not raise).

    Raises:
        UserError: if called on the controller without any module name.

    Example:

    >>> pmi.import_('hello')
    >>> hw = pmi.create('hello.HelloWorld')
    """
    if isController:
        if len(args) == 0:
            # Any number of module names >= 1 is accepted, so the message
            # must not claim "exactly 1".
            raise UserError('pmi.import_ expects at least 1 argument on controller!')
        # broadcast the module names, then import locally
        _broadcast(_IMPORT, *args)
        return __workerImport_(*args)
    if not inWorkerLoop:
        return receive(_IMPORT)
    return None
def __workerImport_(*modules):
    """Import all given modules into the global namespace of this module.

    The module names are joined into one ``import a, b, ...`` statement
    that is executed with this module's globals.
    """
    log.info("Importing modules: %s", modules)
    module_list = ', '.join(modules)
    exec('import ' + module_list, globals())
##################################################
## EXEC
##################################################
def exec_(*args):
    """Controller command that executes python code on all workers.

    Each element of args should be a string containing python code.  On
    the controller, the statements are broadcast to the workers and then
    executed locally, in the global namespace of this module.  This can
    be used to define classes and functions on all workers.  To import
    modules, `import_()` is the intended command.

    When called on a worker outside of the worker loop, a single exec
    command is received and args are ignored.

    Raises:
        UserError: if called on the controller without arguments, or on a
            worker while it is in the worker loop.

    Example:

    >>> pmi.exec_('answer = 42')
    """
    if not __checkController(exec_):
        return receive(_EXEC)
    if not args:
        raise UserError('pmi.exec_ expects at least one argument(s) on controller!')
    # tell the workers first, then run the same statements here
    _broadcast(_EXEC, *args)
    return __workerExec_(*args)
def __workerExec_(*statements):
    """Execute the statements, in order, in this module's global namespace."""
    namespace = globals()
    for code in statements:
        log.info("Executing '%s'", code)
        exec(code, namespace)
##################################################
## EXECFILE
##################################################
def execfile_(file):
    """Controller command that executes a python file on all workers.

    On the controller, the file name is broadcast and the file is then
    executed locally.  On a worker outside of the worker loop, a single
    execfile command is received and the argument is ignored.
    """
    if not __checkController(execfile_):
        return receive(_EXECFILE)
    _broadcast(_EXECFILE, file)
    return __workerExecfile_(file)
def __workerExecfile_(file):
    """Execute the python source file `file` in this module's globals.

    The file is read, compiled and executed explicitly instead of using
    the ``execfile`` builtin, which exists on Python 2 only.  Compiling
    with the file name keeps tracebacks pointing at the right file.
    """
    log.info("Executing file '%s'", file)
    with open(file) as source:
        code = compile(source.read(), file, 'exec')
    exec(code, globals())
##################################################
## CREATE
##################################################
def create(cls=None, *args, **kwds):
    """Controller command that creates an object on all workers.

    cls describes the class to instantiate, either as a class object or
    as a string naming it (see `_translateClass()`).  args and kwds are
    passed to the constructor.  Keyword arguments prefixed with
    '__pmictr_' are stripped of the prefix and passed to the controller
    instance only.

    On the controller, the creation is broadcast to the workers, the
    class is instantiated locally, and the local instance is returned.
    The instance carries the object id (attribute ``__pmioid``) and a
    destroyer object (attribute ``__pmidestroyer``) that records the
    object id for deletion on the workers once it is garbage collected.

    On a worker outside of the worker loop, a single create command is
    received and all arguments are ignored.

    Raises:
        UserError: if cls is missing on the controller.

    Example:

    >>> pmi.exec_('import hello')
    >>> hw = pmi.create('hello.HelloWorld')

    Alternatively, pass the class itself; it then has to be imported in
    the calling module *and* via PMI:

    >>> import hello
    >>> pmi.exec_('import hello')
    >>> hw = pmi.create(hello.HelloWorld)
    """
    if not __checkController(create):
        return receive(_CREATE)
    if cls is None:
        raise UserError('pmi.create expects at least 1 argument on controller!')
    klass = _translateClass(cls)
    # every PMI object is identified by a fresh object id
    new_oid = __OID()
    ctrl_args, ctrl_kwds, worker_args, worker_kwds = __translateArgs(args, kwds)
    # the workers create their instances ...
    _broadcast(_CREATE, klass, new_oid, *worker_args, **worker_kwds)
    # ... and the controller creates its own
    instance = __workerCreate(klass, new_oid, *ctrl_args, **ctrl_kwds)
    instance.__pmioid = new_oid
    # when the destroyer is collected, the oid is queued for deletion
    instance.__pmidestroyer = __Destroyer(new_oid)
    return instance
def __workerCreate(cls, oid, *targs, **tkwds):
    """Instantiate cls and, on a worker, register the instance under oid.

    Object ids in targs/tkwds are replaced by the cached objects before
    the constructor is called.

    Raises:
        InternalError: if oid is already present in OBJECT_CACHE.
    """
    args, kwds = __backtranslateOIDs(targs, tkwds)
    description = __formatCall(cls.__name__, args, kwds)
    log.info('Creating: %s [%s]'
             % (description, oid))
    instance = cls(*args, **kwds)
    if not isWorker:
        return instance
    # only workers keep the instance in the object cache
    if oid in OBJECT_CACHE:
        raise InternalError("Object [%s] is already in OBJECT_CACHE!" % oid)
    OBJECT_CACHE[oid] = instance
    return instance
##################################################
## CLONE
##################################################
# If the class is picklable, a living instance can be converted into a pmi object
##################################################
## CALL (INVOKE WITHOUT RESULT)
##################################################
def call(*args, **kwds):
    """Call a function on all workers; return the controller's result.

    The leading argument(s) denote the function to call; the remaining
    args and kwds are its arguments.  Keyword arguments whose name
    starts with '__pmictr_' are stripped of the prefix and passed to the
    controller only.

    On the controller, the call is broadcast to the workers, the
    function is executed locally, and its local return value is
    returned.  On a worker outside of the worker loop, a single call
    command is received and all arguments are ignored.

    Only functions known to PMI can be used, i.e. functions that have
    been made available via `exec_()` or `import_()`.

    Raises:
        UserError: if called on the controller without arguments.

    Example:

    >>> pmi.exec_('import hello')
    >>> hw = pmi.create('hello.HelloWorld')
    >>> pmi.call(hw.hello)
    """
    if not __checkController(call):
        return receive(_CALL)
    if not args:
        raise UserError('pmi.call expects at least 1 argument on controller!')
    ctrl_func, worker_func, rest = __translateFunctionArgs(*args)
    ctrl_args, ctrl_kwds, worker_args, worker_kwds = __translateArgs(rest, kwds)
    _broadcast(_CALL, worker_func, *worker_args, **worker_kwds)
    log.info("Calling: %s", __formatCall(ctrl_func, ctrl_args, ctrl_kwds))
    return ctrl_func(*ctrl_args, **ctrl_kwds)
def __workerCall(function, *targs, **tkwds):
    """Resolve the function and its arguments, call it, return its result."""
    target = __backtranslateFunctionArg(function)
    real_args, real_kwds = __backtranslateOIDs(targs, tkwds)
    log.info("Calling: %s", __formatCall(target, real_args, real_kwds))
    return target(*real_args, **real_kwds)
##################################################
## LOCAL CALL
##################################################
# provided for convenience
# you can just use: pmiobject.function(*args, **kwds)
def localcall(*args, **kwds):
    """Call a function on the controller only; nothing is broadcast.

    The leading argument(s) denote the function to call; the remaining
    args and kwds are its arguments.  Proxy objects among the arguments
    are replaced by their PMI objects before the call.

    Returns:
        The return value of the function.

    Raises:
        UserError: if called on a worker, or on the controller without
            arguments (same validation as `call()` and `invoke()`).
    """
    if not __checkController(localcall):
        raise UserError('Cannot call localcall on worker!')
    if len(args) == 0:
        raise UserError('pmi.localcall expects at least 1 argument on controller!')
    # the worker-side form of the function is not needed for a local call
    cfunction, _, args = __translateFunctionArgs(*args)
    args, kwds = __translateProxies(args, kwds)
    log.info("Calling locally: %s", __formatCall(cfunction, args, kwds))
    return cfunction(*args, **kwds)
##################################################
## INVOKE
##################################################
def invoke(*args, **kwds):
    """Invoke a function on all workers and gather the return values.

    The leading argument(s) denote the function to call; the remaining
    args and kwds are its arguments.  Keyword arguments whose name
    starts with '__pmictr_' are stripped of the prefix and passed to the
    controller only.

    On the controller, the invocation is broadcast, the function is
    executed locally and the local result is handed to the MPI gather
    that collects the results of all tasks.  On a worker outside of the
    worker loop, a single invoke command is received and all arguments
    are ignored.

    Only functions known to PMI can be used, i.e. functions that have
    been made available via `exec_()` or `import_()`.

    Raises:
        UserError: if called on the controller without arguments.

    Example:

    >>> pmi.exec_('import hello')
    >>> hw = pmi.create('hello.HelloWorld')
    >>> messages = pmi.invoke(hw.hello)
    >>> # alternative:
    >>> messages = pmi.invoke('hello.HelloWorld.hello', hw)
    """
    if not __checkController(invoke):
        return receive(_INVOKE)
    if not args:
        raise UserError('pmi.invoke expects at least 1 argument on controller!')
    ctrl_func, worker_func, rest = __translateFunctionArgs(*args)
    ctrl_args, ctrl_kwds, worker_args, worker_kwds = __translateArgs(rest, kwds)
    _broadcast(_INVOKE, worker_func, *worker_args, **worker_kwds)
    log.info("Invoking: %s", __formatCall(ctrl_func, ctrl_args, ctrl_kwds))
    local_result = ctrl_func(*ctrl_args, **ctrl_kwds)
    return _MPIGather(local_result)
def __workerInvoke(function, *targs, **tkwds):
    """Call the function locally and contribute the result to the gather."""
    target = __backtranslateFunctionArg(function)
    real_args, real_kwds = __backtranslateOIDs(targs, tkwds)
    log.info("Invoking: %s", __formatCall(target, real_args, real_kwds))
    local_result = target(*real_args, **real_kwds)
    return _MPIGather(local_result)
##################################################
## REDUCE (INVOKE WITH REDUCED RESULT)
##################################################
def reduce(*args, **kwds):
    """Invoke a function on all workers, reducing the return values to
    a single value.

    The first argument(s) denote the (associative) reduce operator, the
    following ones the function to call; the remaining args and kwds are
    the arguments of the function.  Keyword arguments whose name starts
    with '__pmictr_' are stripped of the prefix and passed to the
    controller only.

    On the controller, the command is broadcast, the function is
    executed locally and the local result is handed to the MPI reduction
    together with the reduce operator.  On a worker outside of the worker
    loop, a single reduce command is received and all arguments are
    ignored.

    Both the reduce operator and the function have to be known to PMI,
    i.e. they must have been made available via `exec_()` or
    `import_()`.

    Raises:
        UserError: if called on the controller with fewer than two
            arguments.

    Example:

    >>> pmi.import_('math')
    >>> pmi.reduce('lambda a,b: a+b', 'math.factorial', 42)
    """
    if __checkController(reduce):
        if len(args) <= 1:
            raise UserError('pmi.reduce expects at least 2 argument on controller!')
        # handle reduceOp argument
        creduceOp, treduceOp, args = __translateReduceOpArgs(*args)
        cfunction, tfunction, args = __translateFunctionArgs(*args)
        cargs, ckwds, targs, tkwds = __translateArgs(args, kwds)
        _broadcast(_REDUCE, treduceOp, tfunction, *targs, **tkwds)
        log.info("Reducing: %s", __formatCall(cfunction, cargs, ckwds))
        # Use the translated controller arguments (cargs), exactly as call()
        # and invoke() do; passing the raw args would hand untranslated
        # proxy objects to the function.
        value = cfunction(*cargs, **ckwds)
        log.info("Reducing results via %s", creduceOp)
        return _MPIReduce(op=creduceOp, value=value)
    else:
        return receive(_REDUCE)
def __workerReduce(reduceOp, function, *targs, **tkwds):
    """Call the function locally and feed the result into the reduction."""
    operator_ = __backtranslateReduceOpArg(reduceOp)
    target = __backtranslateFunctionArg(function)
    real_args, real_kwds = __backtranslateOIDs(targs, tkwds)
    log.info("Reducing: %s", __formatCall(target, real_args, real_kwds))
    local_result = target(*real_args, **real_kwds)
    log.info("Reducing results via %s", operator_)
    return _MPIReduce(op=operator_, value=local_result)
##################################################
## SYNC
##################################################
def sync():
    """Controller command that makes the workers delete the PMI objects
    that have already been deleted on the controller.

    The pending deletions are sent by `_broadcast()` before the sync
    command itself goes out.  On a worker outside of the worker loop, a
    single sync command is received instead.
    """
    on_controller = __checkController(sync)
    if on_controller:
        _broadcast(_SYNC)
        return
    receive(_SYNC)
def __workerSync():
    """Do nothing.

    The sync command needs no work of its own on the worker; it exists
    only so that pending object deletions can be delivered.
    """
    return None
##################################################
## DUMP
##################################################
def dump():
    """Controller function that makes the workers print their PMI object
    cache.  Intended for debugging.

    On a worker outside of the worker loop, a single dump command is
    received instead.
    """
    on_controller = __checkController(dump)
    if on_controller:
        _broadcast(_DUMP)
        return
    receive(_DUMP)
def __workerDump():
    """Print the pretty-formatted contents of OBJECT_CACHE."""
    import pprint
    # Interpolate with '%'; passing the value as a second print() argument
    # would leave the literal '%s' in the output.
    print("OBJECT_CACHE=%s" % pprint.pformat(OBJECT_CACHE))
##################################################
## AUTOMATIC OBJECT DELETION
##################################################
def __delete():
    """Broadcast the object ids collected in DELETED_OIDS, then reset the list.

    Does nothing when no deletions are pending.
    """
    global DELETED_OIDS
    if not len(DELETED_OIDS) > 0:
        return
    log.debug("Got %d objects in DELETED_OIDS.", len(DELETED_OIDS))
    __broadcastCmd(_DELETE, *DELETED_OIDS)
    # start over with a fresh list
    DELETED_OIDS = []
def __workerDelete(*args):
    """Remove the OBJECT_CACHE entries of the given object ids.

    Raises KeyError if one of the object ids is not in the cache.
    """
    if not args:
        return
    log.info("Deleting oids: %s", args)
    for oid in args:
        cached = OBJECT_CACHE[oid]
        log.debug(" %s [%s]" % (cached, oid))
        # drop the cache's reference to the object
        del OBJECT_CACHE[oid]
##################################################
## WORKER LOOP CONTROL
##################################################
def startWorkerLoop():
    """Worker command that starts the main worker loop.

    On a worker, PMI commands are received and handled one after the
    other until a StopIteration is raised (as done by the handler of the
    stop command when the worker is not asked to exit).  `inWorkerLoop`
    is True while the loop runs and is reset to False when the loop ends
    that way.

    On the controller, the function returns immediately.
    """
    global inWorkerLoop
    if not isController:
        log.info('Entering the worker loop.')
        inWorkerLoop = True
        try:
            while True:
                receive()
        except StopIteration:
            inWorkerLoop = False
        return None
    # the controller never loops
    log.info('Entering and leaving the worker loop')
    return None
def finalizeWorkers():
    """Controller command that stops the worker loop on all workers and
    makes them exit.
    """
    stopWorkerLoop(True)
def stopWorkerLoop(doExit=False):
    """Controller command that stops the worker loop on all workers.

    If doExit is true, the workers exit afterwards.

    Raises:
        UserError: if called on a worker.
    """
    if not __checkController(stopWorkerLoop):
        raise UserError('Cannot call stopWorkerLoop on worker!')
    log.info('Calling all workers to stop.')
    _broadcast(_STOP, doExit)
def __workerStop(doExit):
    """Leave the worker loop, or exit the interpreter if doExit is true.

    Leaving the loop is signalled by raising StopIteration.
    """
    if not doExit:
        log.info('Stopping worker loop.')
        raise StopIteration()
    log.info('Stopping worker loop and exiting worker thread.')
    sys.exit()
def registerAtExit():
    """Controller command that registers `finalizeWorkers()` via atexit.

    Raises:
        UserError: if called on a worker.
    """
    if not __checkController(registerAtExit):
        raise UserError('Cannot call registerAtExit on worker!')
    import atexit
    atexit.register(finalizeWorkers)
##################################################
## PROXY METACLASS
##################################################
class Proxy(type):
    """A metaclass to be used to create frontend serial objects.

    A class that uses this metaclass may define a dict ``pmiproxydefs``
    in its body.  The following keys are evaluated at class creation:

    * ``cls``: the PMI class definition (as accepted by
      `_translateClass()`).  Adds the method ``pmiinit`` that creates the
      PMI object and stores it in ``self.pmiobject``.  ``__init__`` is
      redirected to ``pmiinit`` unless the class already has an
      ``__init__`` that is implemented in Python.
    * ``localcall``: method names forwarded via `localcall()`.
    * ``pmicall``: method names forwarded via `call()`.
    * ``pmiinvoke``: method names forwarded via `invoke()`.
    * ``pmiproperty``: property names; reading uses the property getter
      of the local PMI object, writing goes through `call()`.

    Return values of forwarded calls are translated back to their proxy
    objects where such a proxy exists.
    """

    class _Initializer(object):
        """Callable that implements ``pmiinit`` for one PMI class."""

        def __init__(self, pmiobjectclassdef):
            self.pmiobjectclassdef = pmiobjectclassdef

        def __call__(self, method_self, *args, **kwds):
            # create the pmi object
            log.info('PMI.Proxy of type %s is creating pmi object of type %s',
                     method_self.__class__.__name__,
                     self.pmiobjectclassdef)
            # Nothing to do if the proxy already owns a pmi object.
            if not _isProxy(method_self):
                method_self.pmiobjectclassdef = self.pmiobjectclassdef
                pmiobjectclass = _translateClass(self.pmiobjectclassdef)
                method_self.pmiobject = create(pmiobjectclass, *args, **kwds)
                # back reference used by _backtranslateProxy()
                method_self.pmiobject._pmiproxy = method_self

    class _LocalCaller(object):
        """Callable that forwards a method to the local PMI object only."""

        def __init__(self, methodName):
            self.methodName = methodName

        def __call__(self, method_self, *args, **kwds):
            method = getattr(method_self.pmiobject, self.methodName)
            return _backtranslateProxy(localcall(method, *args, **kwds))

    class _PMICaller(object):
        """Callable that forwards a method via `call()`."""

        def __init__(self, methodName):
            self.methodName = methodName

        def __call__(self, method_self, *args, **kwds):
            method = getattr(method_self.pmiobject, self.methodName)
            return _backtranslateProxy(call(method, *args, **kwds))

    class _PMIInvoker(object):
        """Callable that forwards a method via `invoke()`."""

        def __init__(self, methodName):
            self.methodName = methodName

        def __call__(self, method_self, *args, **kwds):
            method = getattr(method_self.pmiobject, self.methodName)
            # Build a real list: map() is lazy on Python 3.
            return [_backtranslateProxy(result)
                    for result in invoke(method, *args, **kwds)]

    class _PropertyLocalGetter(object):
        """Callable that reads a property from the local PMI object."""

        def __init__(self, propName):
            self.propName = propName

        def __call__(self, method_self):
            prop = getattr(method_self.pmiobject.__class__, self.propName)
            getter = prop.fget
            return _backtranslateProxy(getter(method_self.pmiobject))

    class _PropertyPMISetter(object):
        """Callable that sets a property on all tasks via `call()`."""

        def __init__(self, propName):
            self.propName = propName

        def __call__(self, method_self, val):
            # The setter is addressed by its dotted name
            # '<class definition>.<property>.fset' instead of by object.
            setter = '.'.join(
                (method_self.pmiobjectclassdef,
                 self.propName,
                 'fset'))
            return _backtranslateProxy(call(setter, method_self, val))

    def __addMethod(cls, methodName, caller):
        """Attach caller to cls as an instance method named methodName.

        caller is invoked as ``caller(instance, *args, **kwds)``.
        """
        # A plain function is used as carrier, because functions bind to
        # the instance on attribute access on Python 2 and 3 alike.  The
        # three-argument form types.MethodType(caller, None, cls) exists
        # on Python 2 only.
        def method(self, *args, **kwds):
            return caller(self, *args, **kwds)
        setattr(cls, methodName, method)

    def __init__(cls, name, bases, dict):
        if 'pmiproxydefs' not in dict:
            return
        defs = dict['pmiproxydefs']

        # now generate the methods of the Proxy object
        if 'cls' in defs:
            pmiobjectclassdef = defs['cls']
            log.info('Defining PMI proxy class %s for pmi object class %s.'
                     % (name, pmiobjectclassdef))
            # define cls.pmiinit
            cls.__addMethod('pmiinit', Proxy._Initializer(pmiobjectclassdef))
            # A Python-level __init__ shows up as a method on Python 2 and
            # as a plain function on Python 3; only redirect when neither
            # is present (i.e. __init__ is the builtin one).
            if not isinstance(cls.__init__,
                              (types.MethodType, types.FunctionType)):
                log.debug(' redirecting __init__ to pmiinit')
                cls.__init__ = cls.pmiinit
        else:
            log.info('Defining abstract PMI proxy class %s.' % name)

        # (key in pmiproxydefs, forwarding callable, debug message)
        forwarders = (
            ('localcall', Proxy._LocalCaller, ' adding local call to %s'),
            ('pmicall', Proxy._PMICaller, ' adding pmi call to %s'),
            ('pmiinvoke', Proxy._PMIInvoker, ' adding pmi invoke of %s'),
        )
        for key, factory, message in forwarders:
            if key in defs:
                for methodName in defs[key]:
                    log.debug(message % methodName)
                    cls.__addMethod(methodName, factory(methodName))

        if 'pmiproperty' in defs:
            for propname in defs['pmiproperty']:
                log.debug(' adding pmi property %s' % propname)
                newprop = property(
                    Proxy._PropertyLocalGetter(propname),
                    Proxy._PropertyPMISetter(propname))
                setattr(cls, propname, newprop)
##################################################
## CONSTANTS AND EXCEPTIONS
##################################################
class InternalError(Exception):
    """Raised when PMI has encountered an internal error.

    Hopefully, this exception is never raised.

    The message is available as attribute ``msg``.  ``str()`` and
    ``repr()`` prefix it with the description of the current task
    (`workerStr`).
    """

    def __init__(self, msg):
        # Hand the message to Exception as well, so that ``args`` is
        # filled and the exception can be pickled and copied.
        Exception.__init__(self, msg)
        self.msg = msg

    def __str__(self):
        return workerStr + ': ' + self.msg

    def __repr__(self):
        return str(self)
class UserError(Exception):
    """Raised when PMI has encountered a user error.

    The message is available as attribute ``msg``.  ``str()`` and
    ``repr()`` prefix it with the description of the current task
    (`workerStr`).
    """

    def __init__(self, msg):
        # Hand the message to Exception as well, so that ``args`` is
        # filled and the exception can be pickled and copied.
        Exception.__init__(self, msg)
        self.msg = msg

    def __str__(self):
        return workerStr + ': ' + self.msg

    def __repr__(self):
        return str(self)
##################################################
## BROADCAST AND RECEIVE
##################################################
def _broadcast(cmd, *args, **kwds):
    """Internal controller command that broadcasts a PMI command.

    Pending object deletions are sent out first (see `__delete()`), then
    the command itself is handed to `__broadcastCmd()`, which validates
    and sends it.
    """
    __delete()
    command_name = _CMD[cmd][0]
    log.debug("Broadcasting command: %s", command_name)
    __broadcastCmd(cmd, *args, **kwds)
def __broadcastCmd(cmd, *args, **kwds):
    """Wrap the command and its arguments into a picklable __CMD object
    and broadcast it via MPI.

    Raises:
        InternalError: if cmd is not a valid PMI command.
    """
    if _checkCommand(cmd):
        _MPIBroadcast(__CMD(cmd, args, kwds))
        return
    raise InternalError('_broadcast needs a PMI command as first argument. Got %s instead!' % cmd)
def receive(expected=None):
    """Worker command that receives and handles the next PMI command.

    Delete commands are handled transparently: the listed objects are
    removed from the cache and the function keeps waiting for the next
    command.  Any other command is dispatched to its handler and the
    handler's return value is returned.

    Raises:
        UserError: if called on the controller, if the received message
            is not a PMI command, or if expected is not None and the
            received command differs from it.
    """
    while True:
        __checkWorker(receive)
        if expected is not None:
            log.debug('Waiting for PMI command %s.', _CMD[expected][0])
        else:
            log.debug('Waiting for next PMI command.')
        message = _MPIBroadcast()
        log.debug("Received message: %s", message)
        if type(message) is not __CMD:
            raise UserError("Received an MPI message that is not a PMI command: '%s'" % str(message))
        cmd, args, kwds = message.cmd, message.args, message.kwds
        if cmd != _DELETE:
            break
        # a delete command: remove the objects and wait once more
        __workerDelete(*args)
    if expected is not None and cmd != expected:
        raise UserError("Received PMI command %s but expected %s" % (_CMD[cmd][0], _CMD[expected][0]))
    # look up and run the handler of the command
    handler = _CMD[cmd][1]
    log.debug("Calling function %s", __formatCall(handler.__name__, args, kwds))
    return handler(*args, **kwds)
##################################################
## INTERNAL FUNTIONS
##################################################
class __OID(object):
    """Internal class that represents a PMI object id.

    An instance can be pickled, so that it can be sent via MPI, and it
    is hashable, so that it can be used as a key (for OBJECT_CACHE).
    The id value is the ``id()`` of the instance at creation time; it is
    all that is pickled, so unpickled copies compare and hash equal to
    the original.
    """

    def __init__(self):
        self.id = id(self)

    def __str__(self):
        return 'oid=0x%x' % self.id

    def __hash__(self):
        return self.id

    def __eq__(self, obj):
        # Comparing with an unrelated object must not raise.
        if not hasattr(obj, 'id'):
            return NotImplemented
        return self.id == obj.id

    def __ne__(self, obj):
        # Python 2 does not derive != from ==, so define it explicitly.
        result = self.__eq__(obj)
        if result is NotImplemented:
            return result
        return not result

    def __getstate__(self):
        return self.id

    def __setstate__(self, state):
        self.id = state
class __Destroyer(object):
    """Helper whose garbage collection queues an object id for deletion.

    When an instance is finalized, its object id is appended to
    DELETED_OIDS.
    """

    def __init__(self, oid):
        object.__init__(self)
        self.oid = oid

    def __del__(self):
        doomed = self.oid
        log.info("Adding OID to DELETED_OIDS: [%s]", doomed)
        DELETED_OIDS.append(doomed)
class __CMD(object):
    """Internal, picklable class that represents a PMI
    command. Intended to be sent via MPI.

    Attributes:
        cmd: the PMI command code.
        args: positional arguments of the command (or None).
        kwds: keyword arguments of the command (or None).
    """

    def __init__(self, cmd, args=None, kwds=None):
        if not _checkCommand(cmd):
            raise InternalError('Created __CMD object with invalid PMI command %s' % cmd)
        self.cmd = cmd
        self.args = args
        self.kwds = kwds

    def __str__(self):
        sargs = [_CMD[self.cmd][0]]
        # args defaults to None, which cannot be iterated; skip it then.
        args = getattr(self, 'args', None)
        if args is not None:
            # a list comprehension yields the same text on Python 2 and 3
            # (map() would print as a map object on Python 3)
            sargs.append(str([str(arg) for arg in args]))
        if hasattr(self, 'kwds'):
            sargs.append(str(self.kwds))
        return 'PMICMD(%s)' % (', '.join(sargs))

    def __getstate__(self):
        state = (self.cmd, self.args, self.kwds)
        return state

    def __setstate__(self, state):
        self.cmd, self.args, self.kwds = state
def _isProxy(obj):
    """Return True if obj has an attribute ``pmiobject``, i.e. is a proxy."""
    is_proxy = hasattr(obj, 'pmiobject')
    return is_proxy
def _checkCommand(cmd):
    """Return True if cmd lies in the range of PMI commands, 0 <= cmd < _MAXCMD."""
    return cmd >= 0 and cmd < _MAXCMD
def __checkController(func):
    """Return True on the controller and False on a worker that is not
    in the worker loop.

    Raises:
        UserError: on a worker that is in the worker loop; the message
            names func.
    """
    if isController:
        return True
    if inWorkerLoop:
        raise UserError("Cannot call %s on worker while in worker loop!" % func.__name__)
    return False
def __checkWorker(func):
    """Make sure that we are on a worker.

    Raises:
        UserError: on the controller; the message names func.
    """
    if not isController:
        return
    raise UserError("Cannot call %s on the controller!" % func.__name__)
def _translateClass(cls):
    """Return the class object of the class described by cls.

    cls may be a class object, which is returned unchanged, or a string
    that is evaluated to obtain the class.

    Raises:
        UserError: if cls is None.
        TypeError: if cls is an old-style class (Python 2 only).
        ValueError: if cls is neither a string nor a class.
    """
    if cls is None:
        raise UserError("pmi.create expects at least 1 argument on controller")
    # types.StringTypes and types.ClassType exist on Python 2 only; fall
    # back to the Python 3 equivalents so that this works on both.
    if isinstance(cls, getattr(types, 'StringTypes', (str,))):
        # NOTE(review): eval() executes arbitrary code; cls must come from
        # a trusted script.
        return eval(cls)
    if isinstance(cls, type):
        return cls
    old_style_class = getattr(types, 'ClassType', None)
    if old_style_class is not None and isinstance(cls, old_style_class):
        raise TypeError("""PMI cannot use old-style classes.
Please create old style classes via their names.
""")
    raise ValueError("__translateClass expects class as argument, but got %s" % cls)
def __mapArgs(func, args, kwds):
    """Internal function that applies func to every element of the
    sequence args and to every value of the dict kwds.

    Used by __translateProxies, __translateOIDs and __backtranslateOIDs.

    Returns:
        A tuple ``(targs, tkwds)``: the list of mapped arguments and a
        new dict with the same keys and the mapped values.
    """
    # Build a real list (map() is lazy on Python 3) and use items()
    # (dict.iteritems() exists on Python 2 only).
    targs = [func(arg) for arg in args]
    tkwds = {key: func(value) for key, value in kwds.items()}
    return targs, tkwds
def _translateOID(obj):
    """Internal function that replaces a PMI object by its object id.

    An object that carries an attribute ``__pmioid`` is a PMI object;
    its object id is returned.  Any other object is returned untouched.
    """
    return obj.__pmioid if hasattr(obj, '__pmioid') else obj
def _backtranslateProxy(obj):
    """Return the proxy of obj (its attribute ``_pmiproxy``) if it has
    one; otherwise return obj itself.
    """
    return obj._pmiproxy if hasattr(obj, '_pmiproxy') else obj
def _translateProxy(obj):
    """Return the PMI object behind obj if obj is a proxy; otherwise
    return obj itself.
    """
    return obj.pmiobject if _isProxy(obj) else obj
def __translateProxies(args, kwds):
    """Replace all proxies in args and in the values of kwds by their
    PMI objects.  Returns the new ``(args, kwds)`` pair.
    """
    translated = __mapArgs(_translateProxy, args, kwds)
    return translated
def __translateOIDs(args, kwds):
    """Internal function that replaces all PMI objects in args and in
    the values of kwds by their object ids, so that they can be sent to
    the workers.  Returns the new ``(args, kwds)`` pair.
    """
    translated = __mapArgs(_translateOID, args, kwds)
    return translated
def __translateArgs(args, kwds):
    """Split the arguments of a PMI command into controller and worker form.

    Proxies are replaced by their PMI objects first.  Keyword arguments
    whose name starts with '__pmictr_' go to the controller only, with
    the prefix removed.  All other keyword arguments go to the workers,
    and to the controller too, unless the controller already has a value
    under that name.  The worker arguments have PMI objects replaced by
    their object ids.

    Returns:
        ``(args, controllerKwds, targs, tWorkerKwds)``.
    """
    prefix = '__pmictr_'
    args, kwds = __translateProxies(args, kwds)
    controllerKwds = {}
    workerKwds = {}
    for key in kwds.keys():
        value = kwds[key]
        if key.startswith(prefix):
            # controller-only argument: strip the prefix
            controllerKwds[key[len(prefix):]] = value
            continue
        workerKwds[key] = value
        # do not override a controller-only value that is already set
        controllerKwds.setdefault(key, value)
    targs, tWorkerKwds = __translateOIDs(args, workerKwds)
    return args, controllerKwds, targs, tWorkerKwds
def _backtranslateOID(obj):
    """Internal worker function that replaces an object id by the
    cached PMI object.

    Objects that are not of type __OID are returned untouched.  For an
    object id that is not in OBJECT_CACHE, None is returned on the
    controller.

    Raises:
        InternalError: on a worker, if the object id is not in
            OBJECT_CACHE.
    """
    if type(obj) is not __OID:
        return obj
    if obj in OBJECT_CACHE:
        return OBJECT_CACHE[obj]
    if isController:
        # TODO: Not nice! Is this just so that broadcast can
        # return anything on the controller?
        return None
    raise InternalError("Object [%s] is not in OBJECT_CACHE" % obj)
def __backtranslateOIDs(targs, tkwds):
    """Internal function that replaces all object ids in targs and in
    the values of tkwds by the cached PMI objects.  Returns the new
    ``(args, kwds)`` pair.
    """
    resolved = __mapArgs(_backtranslateOID, targs, tkwds)
    return resolved
# Wrapper that allows to pickle a method
class __Method(object):
    """Picklable stand-in for a method bound to an object.

    The pickled state consists of the method name, the object (replaced
    by its object id if it is a PMI object) and the class.  After
    unpickling, the object id is replaced by the cached object and the
    method is looked up again.
    """

    def __init__(self, funcname, im_self, im_class=None):
        self.__name__ = funcname
        target = _translateProxy(im_self)
        self.im_self = target
        # default to the class of the (translated) object
        self.im_class = target.__class__ if im_class is None else im_class
        self.__determineMethod()

    def __getstate__(self):
        oid_or_self = _translateOID(self.im_self)
        return (self.__name__, oid_or_self, self.im_class)

    def __setstate__(self, state):
        name, oid_or_self, im_class = state
        self.__name__ = name
        self.im_class = im_class
        self.im_self = _backtranslateOID(oid_or_self)
        self.__determineMethod()

    def __determineMethod(self):
        # Bind the attribute of the first class in the MRO that has it;
        # self.method stays unset if no class has it.
        name = self.__name__
        owner = next(
            (klass for klass in self.im_class.mro() if hasattr(klass, name)),
            None)
        if owner is not None:
            self.method = getattr(owner, name).__get__(self.im_self, owner)

    def __call__(self, *args, **kwds):
        return self.method(*args, **kwds)
# translate arguments to invoke