This repository has been archived by the owner on Jan 30, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 20
/
Experiment.py
903 lines (708 loc) · 36.3 KB
/
Experiment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
# Class definition:
# Experiment
# This class is the main experiment class; ATLAS etc will inherit from this class
# Instances are generated with ExperimentFactory
# Subclasses should implement all needed methods prototyped in this class
# Note: not compatible with Singleton Design Pattern due to the subclassing
import os
import re
import time
import commands
from subprocess import Popen, PIPE
from PilotErrors import PilotErrors
from pUtil import tolog # Dump to pilot log
from pUtil import readpar # Used to read values from the schedconfig DB (queuedata)
from pUtil import getCmtconfig # cmtconfig (move to subclass)
from pUtil import getDirectAccessDic # Get the direct access dictionary
from pUtil import isBuildJob # Is the current job a build job?
from pUtil import remove # Used to remove redundant file before log file creation
from pUtil import getPilotlogFilename # Used in the subprocess arguments method
from pUtil import extractHPCInfo # Used by getSubprocessName() to determine HPC plug-in if necessary
class Experiment(object):
# experiment = "generic" # String defining the experiment
# private data members
__experiment = "generic" # String defining the experiment
__instance = None # Boolean used by subclasses to become a Singleton
__error = PilotErrors() # PilotErrors object
__doFileLookups = False # True for LFC based file lookups (basically a dummy data member here since singleton object is static)
__cache = "" # Cache URL used e.g. by LSST
# Required methods
def __init__(self, *args, **kwargs):
""" Default initialization """
# e.g. self.__errorLabel = errorLabel
# self.experiment = kwargs.get('experiment')
pass
def getExperiment(self):
""" Return a string with the experiment name """
# return self.experiment
return self.__experiment
def getJobExecutionCommand(self):
""" Define and test the command(s) that will be used to execute the payload """
# E.g. cmd = "source <path>/setup.sh; <path>/python <script>"
cmd = ""
return cmd
def getFileLookups(self):
""" Return the file lookup boolean """
return self.__doFileLookups
def doFileLookups(self, doFileLookups):
""" Update the file lookups boolean """
# Only implement this method if class really wants to update the __doFileLookups boolean
# ATLAS wants to implement this, but not CMS
# Method is used by Mover
# self.__doFileLookups = doFileLookups
pass
def willDoAlternativeFileLookups(self):
""" Should file lookups be done using alternative methods? """
# E.g. in the migration period where LFC lookups are halted in favour of other methods in the Rucio API
# (for ATLAS), this method could be useful. See the usage in Mover::getReplicaDictionary() which is called
# after Experiment::willDoFileLookups() defined above. The motivation is that direct LFC calls are not to be
# used any longer by the pilot, and in the migration period the actual LFC calls will be done in the Rucio
# API. Eventually this API will switch to alternative file lookups.
return False
def willDoFileLookups(self):
""" Should (LFC) file lookups be done by the pilot or not? """
return self.__doFileLookups
def willDoFileRegistration(self):
""" Should (LFC) file registration be done by the pilot or not? """
return False
def getFileCatalog(self):
""" Return the default file catalog to use (e.g. for replica lookups) """
# See usage in Mover.py
# e.g. 'lfc://prod-lfc-atlas.cern.ch:/grid/atlas'
return ""
# Additional optional methods
# These methods are optional and can be left as they are here, or modified according to special needs
def verifyProxy(self, envsetup="", limit=None):
""" Check for a valid voms/grid proxy longer than N hours """
# Use 'limit' to set required length
tolog("(verifyProxy() is not implemented)")
exitcode = 0
pilotErrorDiag = ""
return exitcode, pilotErrorDiag
def removeRedundantFiles(self, workdir):
""" Remove redundant files and directories """
# List of files and directories to be removed from work directory prior to log file creation
# Make sure that any large files or directories that are not wanted in the log file are included in this list
dir_list = [
"buildJob*",
"external",
"fort.*",
"home",
"python",
"share",
"workdir",
"*.py",
"*.pyc",
"*.root*",
"JEM",
"tmp*",
"*.tmp",
"*.TMP",
"scratch",
]
for _dir in dir_list:
files = glob(os.path.join(workdir, _dir))
rc = remove(files)
if not rc:
tolog("IGNORE: Failed to remove redundant file(s): %s" % (files))
def getPayloadName(self, job):
""" Set a suitable name for the payload stdout """
# The payload <name> gets translated into <name>_stdout.txt
# which is the name of the stdout file produced by the payload execution
# (essentially commands.getoutput("<setup>; <payload executable> [options] > <name>_stdout.txt"))
# The job object can be used to create more precise stdout names (see e.g. the ATLASExperiment implementation)
return "payload"
def isOutOfMemory(self, **kwargs):
""" Try to identify out of memory errors in the stderr/out """
return False
def getNumberOfEvents(self, **kwargs):
""" Return the number of events """
return 0
def specialChecks(self, **kwargs):
""" Implement special checks here """
# Return False if fatal failure, otherwise return True
# The pilot will abort if this method returns a False
# On an HPC system, it might be good to skip certain checks (e.g. CVMFS, LFC, etc). Refer to schedconfig.resourcetype, set to 'hpc' on an HPC queue
status = False
tolog("No special checks for \'%s\'" % (self.experiment))
return True # obviously change this to 'status' once implemented
def checkSpecialEnvVars(self, sitename):
""" Check special environment variables """
ec = 0
tolog("No special env var checks for site %s" % (sitename))
return ec
def setINDS(self, realDatasetsIn):
""" Extract the dataset as set by pathena option --inDS and set the INDS environmental variable """
# Needed by pathena (move to ATLASExperiment later)
inDS = ""
for ds in realDatasetsIn:
if "DBRelease" not in ds and ".lib." not in ds:
inDS = ds
break
if inDS != "":
tolog("Setting INDS env variable to: %s" % (inDS))
os.environ['INDS'] = inDS
else:
tolog("INDS unknown")
def getValidBaseURLs(self, order=None):
""" Return list of valid base URLs """
# if order is defined, return given item first
# e.g. order=http://atlpan.web.cern.ch/atlpan -> ['http://atlpan.web.cern.ch/atlpan', ...]
validBaseURLs = []
_validBaseURLs = ["http://www.usatlas.bnl.gov",\
"https://www.usatlas.bnl.gov",\
"http://pandaserver.cern.ch",\
"http://atlpan.web.cern.ch/atlpan",\
"https://atlpan.web.cern.ch/atlpan",\
"http://classis01.roma1.infn.it",\
"http://atlas-install.roma1.infn.it"]
if order:
validBaseURLs.append(order)
for url in _validBaseURLs:
if url != order:
validBaseURLs.append(url)
else:
validBaseURLs = _validBaseURLs
tolog("getValidBaseURLs will return: %s" % str(validBaseURLs))
return validBaseURLs
def downloadTrf(self, wgetCommand, jobTrf):
""" Download the trf """
status = False
pilotErrorDiag = ""
cmd = "%s %s" % (wgetCommand, jobTrf)
trial = 1
max_trials = 3
# try to download the trf a maximum of 3 times
while trial <= max_trials:
tolog("Executing command [Trial %d/%d]: %s" % (trial, max_trials, cmd))
ec, rets = commands.getstatusoutput(cmd)
if not rets:
rets = "(None)"
if ec != 0:
# Analyze exit code / output
from futil import check_syserr
check_syserr(ec, rets)
pilotErrorDiag = "wget command failed: %d, %s" % (ec, rets)
tolog("!!WARNING!!3000!! %s" % (pilotErrorDiag))
if trial == max_trials:
tolog("!!FAILED!!3000!! Could not download trf: %s" % (rets))
status = False
break
else:
tolog("Will try again after 60s..")
from time import sleep
sleep(60)
else:
pilotErrorDiag = ""
tolog("wget command returned: %s" % (rets))
status = True
break
trial += 1
return status, pilotErrorDiag
def getAnalysisTrf(self, wgetCommand, origTRF, pilot_initdir):
""" Get the trf to be used for analysis jobs """
pilotErrorDiag = ""
trfName = origTRF.split('/')[-1]
tolog("trfName = %s" % (trfName))
origBaseURL = ""
# Copy trf from pilot init dir if distributed with pilot code
fname = os.path.join(pilot_initdir, trfName)
status = False
if os.path.exists(fname):
from shutil import copy2
try:
copy2(fname, os.getcwd())
except Exception, e:
tolog("!!WARNING!!2999!! Could not copy trf from pilot init dir: %s" % str(e))
else:
tolog("Copied trf (%s) from pilot init dir" % (fname))
status = True
# Download trf
if not status:
# verify the base URL
for baseURL in self.getValidBaseURLs():
if origTRF.startswith(baseURL):
origBaseURL = baseURL
break
if origBaseURL == "":
pilotErrorDiag = "Invalid base URL: %s" % (origTRF)
return self.__error.ERR_TRFDOWNLOAD, pilotErrorDiag, ""
else:
tolog("Verified the trf base URL: %s" % (origBaseURL))
# try to download from the required location, if not - switch to backup
for baseURL in self.getValidBaseURLs(order=origBaseURL):
trf = re.sub(origBaseURL, baseURL, origTRF)
tolog("Attempting to download trf: %s" % (trf))
status, pilotErrorDiag = self.downloadTrf(wgetCommand, trf)
if status:
break
if not status:
return self.__error.ERR_TRFDOWNLOAD, pilotErrorDiag, ""
tolog("Successfully downloaded trf")
tolog("Changing permission of %s to 0755" % (trfName))
try:
os.chmod(trfName, 0755)
except Exception, e:
pilotErrorDiag = "Failed to chmod %s: %s" % (trfName, str(e))
return self.__error.ERR_CHMODTRF, pilotErrorDiag, ""
return 0, pilotErrorDiag, trfName
def getAnalysisRunCommand(self, job, jobSite, trfName):
""" Get the run command for analysis jobs """
# The run command is used to setup up the user job transform
ec = 0
pilotErrorDiag = ""
run_command = ""
return ec, pilotErrorDiag, run_command
def getFileTransferInfo(self, transferType, buildJob):
""" Get all relevant fields related to file transfer """
copysetup = readpar('copysetupin')
# create the direct access dictionary
fileTransferInfo = getDirectAccessDic(copysetup)
# if copysetupin did not contain direct access info, try the copysetup instead
if not fileTransferInfo:
copysetup = readpar('copysetup')
fileTransferInfo = getDirectAccessDic(copysetup)
# should the copytool be used?
useCopyTool = False
useFileStager = False
useDirectAccess = False
oldPrefix = ""
newPrefix = ""
dInfo = None
if fileTransferInfo:
dInfo = True
# no direct access / remote I/O, use standard copytool (copy-to-scratch)
if fileTransferInfo['useCopyTool']:
useCopyTool = True
# do not set the LFC host for file stager
if fileTransferInfo['useFileStager']:
useFileStager = True
if fileTransferInfo['directIn']:
useDirectAccess = True
oldPrefix = fileTransferInfo['oldPrefix']
newPrefix = fileTransferInfo['newPrefix']
# override settings for transferType direct
if transferType == 'direct':
useCopyTool = False
useFileStager = False
useDirectAccess = True
# should pilot create TURL based PFC? (not done here, but setup needs to be aware of it)
# if dInfo and useDirectAccess and oldPrefix == "" and newPrefix == "":
if (transferType == 'direct' or (useFileStager and useDirectAccess)) and (oldPrefix == "" and newPrefix == "") and not buildJob:
# if (transferType == 'direct' or (not useFileStager and useDirectAccess)) and (oldPrefix == "" and newPrefix == ""):
usePFCTurl = True
else:
usePFCTurl = False
# force usePFCTurl for all jobs
if not buildJob and useDirectAccess:
tolog("Forced usePFCTurl (reset old/newPrefix)")
usePFCTurl = True
oldPrefix = ""
newPrefix = ""
if os.environ.get("TestXRootD", 'False') == 'True':
import re
re.sub(r'\/xrootdsetup\.sh', '/xrootdsetup-dev.sh', copysetup)
return dInfo, useCopyTool, useDirectAccess, useFileStager, oldPrefix, newPrefix, copysetup, usePFCTurl
def getGuidsFromJobPars(self, jobPars, inputFiles, inFilesGuids):
""" Extract the correct guid from the input file list """
# the guids list is used for direct reading in an LFC environment
# 1. extract input file list for direct reading from jobPars
# 2. for each input file in this list, find the corresponding guid from the input file guid list
# since jobPars is entered by a human, the order of the input files might not be the same
guidList = []
jobPars = jobPars.replace("'","")
jobPars = jobPars.replace(", ",",")
pattern = re.compile(r'\-i \"\[([A-Za-z0-9.,_-]+)\]\"')
directReadingInputFiles = re.findall(pattern, jobPars)
inFiles = []
if directReadingInputFiles != []:
inFiles = directReadingInputFiles[0].split(",")
else:
match = re.search("-i ([A-Za-z0-9.\[\],_-]+) ", jobPars)
if match != None:
compactInFiles = match.group(1)
match = re.search('(.*)\[(.+)\](.*)\[(.+)\]', compactInFiles)
if match != None:
inputFiles = []
head = match.group(1)
tail = match.group(3)
body = match.group(2).split(',')
attr = match.group(4).split(',')
for idx in range(len(body)):
lfn = '%s%s%s%s' % (head, body[idx], tail, attr[idx])
inputFiles.append(lfn)
else:
inputFiles = [compactInFiles]
if inFiles != []:
for inFile in inFiles:
# get the corresponding index from the inputFiles list, which has the same order as inFilesGuids
try:
index = inputFiles.index(inFile)
except Exception, e:
tolog("!!WARNING!!2999!! Exception caught: %s (direct reading will fail)" % str(e))
else:
# add the corresponding guid to the list
guidList.append(inFilesGuids[index])
return guidList
def getMetadataForRegistration(self, guid):
""" Return metadata for [LFC] file registration """
# This method can insert special metadata into the metadata.xml file
# E.g. it can add preliminary XML tags for info that will only be known
# at a later time, such as "<metadata att_name="surl" att_value="%s-surltobeset"/>\n' % (guid)"
# The <guid>-surltobeset will be replaced by the pilot by the appropriate value once it is known
# Inputs:
# guid = file guid
# Returns:
# metadata string
# See e.g. the CMSExperiment implementation
# The method is called from pUtil::PFCxml() during metadata file creation
return ""
def getAttrForRegistration(self):
""" Return the attribute of the metadata XML to be updated with surl value """
# Used in combination with Experiment::getMetadataForRegistration()
# The attribute (default 'surl') will be copied into the metadata string used for pattern matching
# E.g. re.compile('\<metadata att\_name\=\"%s\" att\_value\=\"([a-zA-Z0-9-]+)\-surltobeset\"\/\>' % (attribute))
return 'surl'
def getExpSpecificMetadata(self, job, workdir):
""" Return experiment specific metadata """
# Inputs:
# job = PanDA pilot job object (see Job class)
# workdir = relevant work directory where the metadata is located
# Returns:
# metadata xml string
# See e.g. implementation in CMSExperiment
return ""
def getFileCatalogHosts(self):
""" Return a list of file catalog hosts """
# The method is used in combination with federated xrootd (FAX).
# In case FAX is allowed on a given site, the pilot might need to lookup
# replica information in more than one LFC catalog. Normally a site has only
# one LFC (as set in schedconfig.lfchost). Providing a list of hosts will increase
# the probability that FAX will succeed
# See e.g. ATLASExperiment implementation
return []
def verifySwbase(self, appdir):
""" Confirm existence of appdir/swbase """
# appdir/swbase is a queuedata parameter specifying the base location of physics analysis / release software
# This method will simply verify that the corresponding directory exists
#
# Input:
# appdir = application/software/release directory (e.g. /cvmfs/atlas.cern.ch/repo/sw)
# Return:
# error code (0 for success)
return 0
def interpretPayloadStdout(self, job, res, getstatusoutput_was_interrupted, current_job_number, runCommandList, failureCode):
""" Payload error interpretation and handling """
# NOTE: TODO, hide argument complexity with kwargs**
# This method can be used to interpret special errors that only occur in actual payload stdout, e.g. memory errors that have
# caused the payload to crash
#
# Inputs:
# job = PanDA pilot job object (see Job class)
# res =
# getstatusoutput_was_interrupted = True in case the payload execution command was aborted (e.g. keyboard CTRL-C)
# current_job_number = current job number, in case of multi-trf (ATLAS)
# runCommandList = list of payload execution commands (e.g. used by ATLAS to get to a setup file)
# failureCode = signal error code
# Returns:
# Updated PanDA pilot job objectwith proper payload error information, if needed
#
# The following Job attributes can be updated here
# result = tuple of size 3 that contain the standard error info: result[0] = current job status (e.g. failed, finished, holding),
# result[1] = payload error code, result[2] = PanDA pilot error code
# pilotErrorDiag = error diagnostics (string of up to 256 characters that will appear on the PanDA monitor job web page for a failed job)
# exeError
return job
def getSubprocessName(self, eventService):
""" Select which subprocess is to be run by the Monitor """
# The default subprocess is RunJob (name='Normal', which performs payload setup, stage-in, payload execution and stage-out).
# An alternative subprocess is the runEvent module which downloads events from an Event Server, executes a payload
# and stages ou output files asynchronously as they are ready.
# Note: send the entire job object to this method since there might be other subprocesses created at a later time which
# will be identified by this method using some other job data member
# Default subprocess name
name = "RunJob"
# Select alternative subprocess names for HPCs
isHPC, _name = extractHPCInfo(readpar('catchall'))
if isHPC:
name = "RunJob" + _name # e.g. "RunJobTitan" is the proper subprocess name for the Titan plug-in
# for es merge jobs
if _name and _name.startswith("Hpc"):
name = "RunJob"
# Are we going to run an event service job?
if eventService:
tolog("Encountered an event service job")
if isHPC:
name = "RunJob%sEvent" % (_name)
else:
name = "RunJobEvent"
tolog("Selected subprocess: %s" % (name))
return name
def getSubprocessArguments(self, env, port, subprocessName="RunJob"):
""" Argument list needed to launch the subprocess by the pilot/Monitor """
# The pilot/Monitor is forking a subprocess which will be monitored for work dir size, hanging processes etc
# This method returns the arguments needed to execute the subprocess (python <subprocess name> <arguments>)
# By default the pilot has implementations for RunJob.py (standard job) and RunJobEvent.py (event server job)
# If a new subprocess module is added, it startup arguments need to be specified here
jobargs = None
tolog("Will set up subprocess arguments for type: %s" % (subprocessName))
url = '%s:%s/server/panda' % (env['pshttpurl'], str(env['psport']))
if subprocessName == "RunJobEvent":
jobargs = [env['pyexe'], "RunJobEvent.py",
"-a", env['thisSite'].appdir,
"-b", env['queuename'],
"-d", env['jobDic']["prod"][1].workdir,
"-g", env['inputDir'],
"-i", env['jobDic']["prod"][1].tarFileGuid,
"-k", getPilotlogFilename(),
"-l", env['pilot_initdir'],
"-m", env['outputDir'],
"-o", env['thisSite'].workdir,
"-p", str(port),
"-s", env['thisSite'].sitename,
"-t", str(env['proxycheckFlag']),
"-x", str(env['stageinretry']),
"-E", str(env['stageoutretry']),
"-F", env['experiment'],
"-H", env['cache'],
"-W", url]
else:
jobargs = [env['pyexe'], "%s.py" % (subprocessName),
"-a", env['thisSite'].appdir,
"-b", env['queuename'],
"-d", env['jobDic']["prod"][1].workdir,
"-g", env['inputDir'],
"-i", env['jobDic']["prod"][1].tarFileGuid,
"-k", getPilotlogFilename(),
"-l", env['pilot_initdir'],
"-m", env['outputDir'],
"-o", env['thisSite'].workdir,
"-p", str(port),
"-s", env['thisSite'].sitename,
"-t", str(env['proxycheckFlag']),
"-x", str(env['stageinretry']),
"-E", str(env['stageoutretry']),
"-F", env['experiment'],
"-H", env['cache'],
"-W", url]
if 'yodaNodes' in env and subprocessName == "RunJobHpcEvent":
jobargs.append("-N")
jobargs.append(str(env['yodaNodes']))
if 'yodaQueue' in env and subprocessName == "RunJobHpcEvent":
jobargs.append("-Q")
jobargs.append(str(env['yodaQueue']))
tolog("Will use arguments: %s" % str(jobargs))
return jobargs
# Optional
def doSpecialLogFileTransfer(self, **argdict):
""" Should the log file be transfered to a special SE? """
# The log file can at the end of the job be stored in a special SE - in addition to the normal stage-out of the log file
# If this method returns True, the JobLog class will attempt to store the log file in a secondary SE after the transfer of
# the log to the primary/normal SE. Additional information about the secondary SE is required and can be specified in
# another optional method defined in the *Experiment classes
# eventService = argdict.get('eventService', False)
return False
# Optional
def getSchedconfigURL(self, protocol="http://"):
""" Define the URL for the schedconfig / PanDA server"""
# This method gets called from SiteInformation in case the URL is not set (by the wrapper)
return protocol + "pandaserver.cern.ch"
# Optional
def getSubprocess(self, cmd, stdout=None, stderr=None):
""" Execute and return a subprocess """
process = None
try:
tolog("Executing command: %s" % (cmd))
if stdout and stderr:
# use stdout/stdout file objects to redirect the stdout/stderr streams
process = Popen(cmd, shell=True, stdout=stdout, stderr=stderr, preexec_fn=os.setsid)
else:
process = Popen(cmd, shell=True)
except Exception, e:
tolog("!!WARNING!!2344!! Caught exception: %s" % (e))
else:
tolog("Subprocess is running")
return process
# Optional
def getJobExecutionCommand4EventService(self):
""" Define and test the command(s) that will be used to execute the payload for the event service """
# E.g. cmd = ["source <path>/setup.sh; <path>/python <script>"]
# The command returned from this method is executed using subprocess.Popen() from the runEvent module
# Note: this optional method only need to be defined in case the event service is to be used
# As of March 2014, this is not yet functional or documented.
# The actual command must be declared as a list since that is expected by Popen()
cmd = [""]
return cmd
# Optional
def postGetJobActions(self, job):
""" Perform any special post-job definition download actions here """
# This method is called after the getJob() method has successfully downloaded a new job (job definition) from
# the server. If the job definition e.g. contains information that contradicts WN specifics, this method can
# be used to fail the job
# Return any error code using ec, and any error message using pilotErrorDiag
ec = 0
pilotErrorDiag = ""
return ec, pilotErrorDiag
# Optional
def useTracingService(self):
return False
# Optional
def updateJobSetupScript(self, workdir, create=False, to_script=None):
""" Create or update the job setup script (used to recreate the job locally if needed) """
# If create=True, this step will only create the file with the script header (bash info)
if create:
filename = os.path.basename(self.getJobSetupScriptName(workdir))
tolog("Creating job setup script with stage-in and payload execution commands: %s" % (filename))
to_script = "#!/bin/bash\n# %s %s\n\n" % (filename, time.strftime("%d %b %Y %H:%M:%S", time.gmtime(time.time())))
# Add the string to the setup script
if to_script:
self.addToJobSetupScript(to_script, workdir)
# Optional
def getJobSetupScriptName(self, workdir):
""" return the name of the job setup file """
return os.path.join(workdir, "job_setup.sh")
# Optional
def addToJobSetupScript(self, cmd, workdir):
""" add/append command to job setup file """
filename = self.getJobSetupScriptName(workdir)
if not os.path.exists(filename):
try:
fp = open(filename, "w")
except OSError, e:
tolog("!!WARNING!!1880!! Could not open job setup file for writing: %s" % str(e))
else:
try:
fp = open(filename, "a")
except OSError, e:
tolog("!!WARNING!!1880!! Could not open job setup file for appending: %s" % str(e))
if fp:
fp.write(cmd)
fp.write("\n\n")
fp.close()
tolog("Updated %s: %s" % (filename, cmd))
# Optional
def getRelease(self, release):
""" Return a list of the software release id's """
# Assuming 'release' is a string that separates release id's with '\n'
# Used in the case of payload using multiple steps with different release versions
# E.g. release = "19.0.0\n19.1.0" -> ['19.0.0', '19.1.0']
return release.split("\n")
# Optional
def formatReleaseString(release):
""" Return a special formatted release string """
# E.g. release = "Atlas-19.0.0" -> "19.0.0"
# This method is required for ATLAS but is probably of no interest for any other PanDA user
return release
# Optional
def setCache(self, cache):
""" Cache URL """
# Used e.g. by LSST
self.__cache = cache
# Optional
def getCache(self):
""" Return the cache URL """
# Used e.g. by LSST
return self.__cache
# Optional
def useTracingService(self):
""" Use the Rucio Tracing Service """
# A service provided by the Rucio system that allows for file transfer tracking; all file transfers
# are reported by the pilot to the Rucio Tracing Service if this method returns True
return False
# Optional
def updateJobDefinition(self, job, filename):
""" Update the job definition file and object before using it in RunJob """
# This method is called from Monitor, before RunJob is launched, which allows to make changes to the job object after it was downloaded from the job dispatcher
# (used within Monitor) and the job definition file (which is used from RunJob to recreate the same job object as is used in Monitor).
# 'job' is the job object, defined in Job.py, while 'filename' is the name of the file containing the job definition information.
return job
# Optional
def shouldExecuteUtility(self):
""" Determine whether a special utility should be executed """
# The RunJob class has the possibility to execute a special utility, e.g. a memory monitor, that runs in parallel
# to the payload (launched after the main payload process).
# The utility is executed if this method returns True. The utility is currently expected to produce
# a summary JSON file whose name is defined by the getUtilityJSONFilename() method. The contents of
# this file (ie. the full JSON dictionary) will be added to the job update.
#
# Example of summary JSON file (ATLAS case):
# {"Max":{"maxVMEM":40058624,"maxPSS":10340177,"maxRSS":16342012,"maxSwap":16235568},
# "Avg":{"avgVMEM":19384236,"avgPSS":5023500,"avgRSS":6501489,"avgSwap":5964997}}
#
# While running, the MemoryMonitor also produces a regularly updated text file with the following format: (tab separated)
# Time VMEM PSS RSS Swap (first line in file)
# 1447960494 16099644 3971809 6578312 1978060
return False
# Optional
def getUtilityOutputFilename(self):
""" Return the filename of a utility output file """
# For explanation, see shouldExecuteUtility()
return "memory_monitor_output.txt"
# Optional
def getUtilityJSONFilename(self):
""" Return the filename of a utility JSON file """
# For explanation, see shouldExecuteUtility()
return "utility_summary.json"
# Optional
def getUtilityInfo(self, workdir, pilot_initdir, allowTxtFile=False):
""" Add the utility info to the node structure if available """
# Extract the relevant information from the utility tool output and add it to the dictionary
# returned by this method. The dictionary will be merged with the node dictionary in
# PandaServerClient::getNodeStructure() and sent to the PanDA server
return {}
# Optional
def getUtilityCommand(self, **argdict):
""" Prepare a utility command string """
# This method can be used to prepare a setup string for an optional utility tool, e.g. a memory monitor,
# that will be executed by the pilot in parallel with the payload.
# The pilot will look for an output JSON file (summary.json) and will extract pre-determined fields
# from it and report them with the job updates. Currently the pilot expects to find fields related
# to memory information.
# pid = argdict.get('pid', 0)
return ""
# Optional
def getGUIDSourceFilename(self):
""" Return the filename of the file containing the GUIDs for the output files """
# In the case of ATLAS, Athena produces an XML file containing the GUIDs of the output files. The name of this
# file is PoolFileCatalog.xml. If this method returns an empty string (ie the default), the GUID generation will
# be done by the pilot in RunJobUtilities::getOutFilesGuids()
return ""
# Optional
def buildFAXPath(self, **argdict):
""" Build a proper FAX path """
# This method builds proper FAX paths and is used in pure FAX mode (i.e. when FAX is used in forced mode),
# particularly when the PoolFileCatalog.xml is built prior to stage-in
# Only needed if FAX mechanism is used in forced mode (i.e. when copytoolin='fax')
lfn = argdict.get('lfn', 'default_lfn')
scope = argdict.get('scope', 'default_scope')
subpath = argdict.get('subpath', 'atlas/rucio/')
pandaID = argdict.get('pandaID', '')
sourceSite = argdict.get('sourceSite', 'default_sourcesite')
computingSite = argdict.get('computingSite', 'default_computingsite')
# Get the proper FAX redirector (default ATLAS implementation)
from FAXTools import getFAXRedirectors
# First get the global redirectors (several, since the lib file might not be at the same place for overflow jobs)
fax_redirectors_dictionary = getFAXRedirectors(computingSite, sourceSite, pandaID)
tolog("fax_redirectors_dictionary=%s"%str(fax_redirectors_dictionary))
# select the proper fax redirector
if ".lib." in lfn:
redirector = fax_redirectors_dictionary['computingsite']
else:
redirector = fax_redirectors_dictionary['sourcesite']
# Make sure the redirector ends with a double slash
if not redirector.endswith('//'):
if redirector.endswith('/'):
redirector += "/"
else:
redirector += "//"
# Make sure that the subpath does not begin with a slash
if subpath.startswith('/') and len(subpath) > 1:
subpath = subpath[1:]
tolog("redirector=%s"%(redirector))
tolog("subpath=%s"%(subpath))
tolog("scope=%s"%(scope))
tolog("lfn=%s"%(lfn))
return redirector + subpath + scope + ":" + lfn
if __name__ == "__main__":
a=Experiment()
print a.getSubprocessName(False)