forked from PanDAWMS/pilot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathEventRanges.py
107 lines (83 loc) · 3.68 KB
/
EventRanges.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#
import json
import os
from pUtil import httpConnect, tolog
def downloadEventRanges(jobId, jobsetID, taskID, numRanges=10):
""" Download event ranges from the Event Server """
# Return the server response (instruction to AthenaMP)
# Note: the returned message is a string (of a list of dictionaries). If it needs to be converted back to a list, use json.loads(message)
tolog("Downloading new event ranges for jobId=%s, taskID=%s and jobsetID=%s" % (jobId, taskID, jobsetID))
# message = "[{u'lastEvent': 2, u'LFN': u'mu_E50_eta0-25.evgen.pool.root',u'eventRangeID': u'130-2068634812-21368-1-1', u'startEvent': 2, u'GUID':u'74DFB3ED-DAA7-E011-8954-001E4F3D9CB1'}]"
message = ""
# url = "https://aipanda007.cern.ch:25443/server/panda"
url = "https://pandaserver.cern.ch:25443/server/panda"
node = {}
node['pandaID'] = jobId
node['jobsetID'] = jobsetID
node['taskID'] = taskID
node['nRanges'] = numRanges
# open connection
ret = httpConnect(node, url, path=os.getcwd(), mode="GETEVENTRANGES")
response = ret[1]
if ret[0]: # non-zero return code
message = "Failed to download event range - error code = %d" % (ret[0])
else:
message = response['eventRanges']
if message == "" or message == "[]":
message = "No more events"
return message
def updateEventRange(event_range_id, eventRangeList, jobId, status='finished', os_bucket_id=-1):
""" Update an list of event ranges on the Event Server """
tolog("Updating an event range..")
# PanDA dev server: url = "https://aipanda007.cern.ch:25443/server/panda"
url = "https://pandaserver.cern.ch:25443/server/panda"
node = {}
node['eventRangeID'] = event_range_id
if os_bucket_id != -1:
node['objstoreID'] = os_bucket_id
if eventRangeList != []:
pass
# node['cpu'] = eventRangeList[1]
# node['wall'] = eventRangeList[2]
node['eventStatus'] = status
# open connection
ret = httpConnect(node, url, path=os.getcwd(), mode="UPDATEEVENTRANGE")
# response = ret[1]
if ret[0]: # non-zero return code
message = "Server responded with error code = %d" % (ret[0])
else:
# is there an instruction in the back channel?
data = ret[1]
tolog("data=%s"%str(data))
from json import loads
try:
d = loads(data['Command'])
except Exception, e:
tolog("No message found in updateEventRange back channel: %s" % (e))
message = ""
else:
# does the returned dictionary contain any kill instructions?
# {PandaID: u'tobekilled'} for normal kill
# {PandaID: u'softkill'} for soft kill
message = d.get(jobId, "")
return message
def updateEventRanges(event_ranges):
""" Update an event range on the Event Server """
tolog("Updating event ranges..")
message = ""
#url = "https://aipanda007.cern.ch:25443/server/panda"
url = "https://pandaserver.cern.ch:25443/server/panda"
# eventRanges = [{'eventRangeID': '4001396-1800223966-4426028-1-2', 'eventStatus':'running'}, {'eventRangeID': '4001396-1800223966-4426028-2-2','eventStatus':'running'}]
node={}
node['eventRanges']=json.dumps(event_ranges)
# open connection
ret = httpConnect(node, url, path='.', mode="UPDATEEVENTRANGES")
# response = json.loads(ret[1])
status = ret[0]
if ret[0]: # non-zero return code
message = "Failed to update event range - error code = %d, error: " % (ret[0], ret[1])
else:
response = json.loads(json.dumps(ret[1]))
status = int(response['StatusCode'])
message = json.dumps(response['Returns'])
return status, message