Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into release
Browse files Browse the repository at this point in the history
  • Loading branch information
mightqxc committed Aug 14, 2020
2 parents 157cbdf + 56179a4 commit f308fa0
Show file tree
Hide file tree
Showing 23 changed files with 602 additions and 195 deletions.
2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
timestamp = "06-07-2020 11:53:08 on release (by fahui)"
timestamp = "14-08-2020 09:04:50 on release (by fahui)"
3 changes: 2 additions & 1 deletion pandaharvester/harvesterbody/event_feeder.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ def run(self):
# get events
tmpLog.debug('get events')
tmpStat, events = self.communicator.get_event_ranges(workSpec.eventsRequestParams,
scattered)
scattered,
workSpec.get_access_point())
# failed
if tmpStat is False:
tmpLog.error('failed to get events with {0}'.format(events))
Expand Down
6 changes: 4 additions & 2 deletions pandaharvester/harvesterbody/propagator.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ def run(self):
PilotErrors.pilotError[PilotErrors.ERR_PANDAKILL])
tmpJobSpec.stateChangeTime = datetime.datetime.utcnow()
tmpJobSpec.trigger_propagation()
self.dbProxy.update_job(tmpJobSpec, {'propagatorLock': self.get_pid()})
self.dbProxy.update_job(tmpJobSpec, {'propagatorLock': self.get_pid()},
update_out_file=True)
else:
mainLog.error('failed to update PandaID={0} status={1}'.format(tmpJobSpec.PandaID,
tmpJobSpec.status))
Expand Down Expand Up @@ -203,7 +204,8 @@ def run(self):
# get latest metrics from DB
service_metrics_list = self.dbProxy.get_service_metrics(self._last_metrics_update)
if not service_metrics_list:
mainLog.error('failed to get service metrics')
if self._last_metrics_update:
mainLog.error('failed to get service metrics')
self._last_metrics_update = datetime.datetime.utcnow()
else:
tmp_ret, tmp_str = self.communicator.update_service_metrics(service_metrics_list)
Expand Down
13 changes: 12 additions & 1 deletion pandaharvester/harvesterbody/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ def run(self):
# events
if len(okJobs) > 0 and \
('eventService' in okJobs[0].jobParams or
'cloneJob' in okJobs[0].jobParams):
'cloneJob' in okJobs[0].jobParams or
'isHPO' in okJobs[0].jobParams):
work_spec.eventsRequest = WorkSpec.EV_useEvents
work_specList.append(work_spec)
if len(work_specList) > 0:
Expand Down Expand Up @@ -364,6 +365,16 @@ def run(self):
'nRanges': max(int(math.ceil(work_spec.nCore / len(jobList))),
job_spec.jobParams['coreCount']),
}
if 'isHPO' in job_spec.jobParams:
if 'sourceURL' in job_spec.jobParams:
sourceURL = job_spec.jobParams['sourceURL']
else:
sourceURL = None
eventsRequestParams[job_spec.PandaID].update(
{'isHPO': True,
'jobsetID': 0,
'sourceURL': sourceURL
})
work_spec.eventsRequestParams = eventsRequestParams
# register worker
tmpStat = self.dbProxy.register_worker(work_spec, jobList, locked_by)
Expand Down
2 changes: 1 addition & 1 deletion pandaharvester/harvestercommunicator/base_communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def update_jobs(self, jobspec_list, id):
return [{'StatusCode': 0, 'ErrorDiag': '', 'command': ''}] * len(jobspec_list)

# get events
def get_event_ranges(self, data_map, scattered):
def get_event_ranges(self, data_map, scattered, base_path):
    """Base-class stub for fetching event ranges.

    Concrete communicators override this. The stub reports success
    with no event ranges at all.

    :param data_map: per-job request parameters (unused here)
    :param scattered: whether scattered event retrieval is requested (unused here)
    :param base_path: worker access point for staging files (unused here)
    :return: tuple of (True, empty dict)
    """
    empty_result = dict()
    return True, empty_result

# update events
Expand Down
156 changes: 146 additions & 10 deletions pandaharvester/harvestercommunicator/panda_communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
ssl.HAS_SNI = False
except Exception:
pass
import os
import sys
import json
import pickle
Expand All @@ -25,6 +26,7 @@
pass
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvesterconfig import harvester_config
from pandaharvester.harvestermisc import idds_utils
from pandacommon.pandautils.net_utils import get_http_adapter_with_random_dns_resolution

from .base_communicator import BaseCommunicator
Expand Down Expand Up @@ -76,7 +78,7 @@ def post(self, path, data):
return False, errMsg

# POST with https
def post_ssl(self, path, data, cert=None):
def post_ssl(self, path, data, cert=None, base_url=None):
try:
tmpLog = None
if self.verbose:
Expand All @@ -85,7 +87,9 @@ def post_ssl(self, path, data, cert=None):
tmpExec = inspect.stack()[1][3]
tmpExec += '/'
tmpExec = str(uuid.uuid4())
url = '{0}/{1}'.format(harvester_config.pandacon.pandaURLSSL, path)
if base_url is None:
base_url = harvester_config.pandacon.pandaURLSSL
url = '{0}/{1}'.format(base_url, path)
if self.verbose:
tmpLog.debug('exec={0} URL={1} data={2}'.format(tmpExec, url, str(data)))
if cert is None:
Expand Down Expand Up @@ -115,7 +119,7 @@ def post_ssl(self, path, data, cert=None):
return False, errMsg

# PUT with https
def put_ssl(self, path, files, cert=None):
def put_ssl(self, path, files, cert=None, base_url=None):
try:
tmpLog = None
tmpExec = None
Expand All @@ -125,7 +129,9 @@ def put_ssl(self, path, files, cert=None):
tmpExec = inspect.stack()[1][3]
tmpExec += '/'
tmpExec = str(uuid.uuid4())
url = '{0}/{1}'.format(harvester_config.pandacon.pandaCacheURL_W, path)
if base_url is None:
base_url = harvester_config.pandacon.pandaCacheURL_W
url = '{0}/{1}'.format(base_url, path)
if self.verbose:
tmpLog.debug('exec={0} URL={1} files={2}'.format(tmpExec, url, files['file'][0]))
if cert is None:
Expand Down Expand Up @@ -203,6 +209,17 @@ def update_jobs(self, jobspec_list, id):
tmpLogG = self.make_logger('id={0}'.format(id), method_name='update_jobs')
tmpLogG.debug('update {0} jobs'.format(len(jobspec_list)))
retList = []
# upload checkpoints
for jobSpec in jobspec_list:
if jobSpec.outFiles:
tmpLogG.debug('upload {0} checkpoint files for PandaID={1}'.format(len(jobSpec.outFiles),
jobSpec.PandaID))
for fileSpec in jobSpec.outFiles:
if 'sourceURL' in jobSpec.jobParams:
tmpS = self.upload_checkpoint(jobSpec.jobParams['sourceURL'], jobSpec.taskID,
jobSpec.PandaID, fileSpec.lfn, fileSpec.path)
if tmpS:
fileSpec.status = 'done'
# update events
for jobSpec in jobspec_list:
eventRanges, eventSpecs = jobSpec.to_event_data(max_events=10000)
Expand Down Expand Up @@ -284,7 +301,7 @@ def update_jobs(self, jobspec_list, id):
return retList

# get events
def get_event_ranges(self, data_map, scattered):
def get_event_ranges(self, data_map, scattered, base_path):
retStat = False
retVal = dict()
try:
Expand All @@ -301,6 +318,16 @@ def get_event_ranges(self, data_map, scattered):
nRanges = 1
if scattered:
data['scattered'] = True
if 'isHPO' in data:
isHPO = data['isHPO']
del data['isHPO']
else:
isHPO = False
if 'sourceURL' in data:
sourceURL = data['sourceURL']
del data['sourceURL']
else:
sourceURL = None
tmpLog.debug('start nRanges={0}'.format(nRanges))
while nRanges > 0:
# use a small chunk size to avoid timeout
Expand All @@ -314,14 +341,35 @@ def get_event_ranges(self, data_map, scattered):
tmpDict = tmpRes.json()
if tmpDict['StatusCode'] == 0:
retStat = True
if data['pandaID'] not in retVal:
retVal[data['pandaID']] = []
retVal[data['pandaID']] += tmpDict['eventRanges']
retVal.setdefault(data['pandaID'], [])
if not isHPO:
retVal[data['pandaID']] += tmpDict['eventRanges']
else:
for event in tmpDict['eventRanges']:
event_id = event['eventRangeID']
task_id = event_id.split('-')[0]
point_id = event_id.split('-')[3]
# get HP point
tmpSI, tmpOI = idds_utils.get_hp_point(harvester_config.pandacon.iddsURL,
task_id, point_id,
tmpLog, self.verbose)
if tmpSI:
event['hp_point'] = tmpOI
# get checkpoint
if sourceURL:
tmpSO, tmpOO = self.download_checkpoint(sourceURL, task_id,
data['pandaID'],
point_id, base_path)
if tmpSO:
event['checkpoint'] = tmpOO
retVal[data['pandaID']].append(event)
else:
core_utils.dump_error_message(tmpLog, tmpOI)
# got empty
if len(tmpDict['eventRanges']) == 0:
break
except Exception:
core_utils.dump_error_message(tmpLog, tmpRes)
core_utils.dump_error_message(tmpLog)
break
nRanges -= chunkSize
tmpLog.debug('done with {0}'.format(str(retVal)))
Expand All @@ -330,6 +378,33 @@ def get_event_ranges(self, data_map, scattered):
# update events
def update_event_ranges(self, event_ranges, tmp_log):
tmp_log.debug('start update_event_ranges')

# loop over for HPO
for item in event_ranges:
new_event_ranges = []
for event in item['eventRanges']:
# report loss to idds
if 'loss' in event:
event_id = event['eventRangeID']
task_id = event_id.split('-')[0]
point_id = event_id.split('-')[3]
tmpSI, tmpOI = idds_utils.update_hp_point(harvester_config.pandacon.iddsURL,
task_id, point_id, event['loss'],
tmp_log, self.verbose)
if not tmpSI:
core_utils.dump_error_message(tmp_log, tmpOI)
tmp_log.error('skip {0} since cannot update iDDS'.format(event_id))
continue
else:
# clear checkpoint
if 'sourceURL' in item:
tmpSC, tmpOC = self.clear_checkpoint(item['sourceURL'], task_id, point_id)
if not tmpSC:
core_utils.dump_error_message(tmp_log, tmpOC)
del event['loss']
new_event_ranges.append(event)
item['eventRanges'] = new_event_ranges
# update in panda
data = {}
data['eventRanges'] = json.dumps(event_ranges)
data['version'] = 1
Expand Down Expand Up @@ -711,4 +786,65 @@ def update_service_metrics(self, service_metrics_list):
tmp_log.error('conversion failure from {0}'.format(tmp_res.text))
if tmp_stat:
tmp_log.debug('done with {0}:{1}'.format(tmp_stat, err_str))
return tmp_stat, err_str
return tmp_stat, err_str

# upload checkpoint
def upload_checkpoint(self, base_url, task_id, panda_id, file_name, file_path):
    """Upload one checkpoint file to the checkpoint server via PUT.

    :param base_url: base URL of the checkpoint server (passed to put_ssl)
    :param task_id: task ID, used only for log labeling
    :param panda_id: PanDA job ID, used only for log labeling
    :param file_name: logical file name sent to the server
    :param file_path: local path of the file to upload
    :return: True on success, False otherwise
    """
    tmp_log = self.make_logger('taskID={0} pandaID={1}'.format(task_id, panda_id),
                               method_name='upload_checkpoint')
    tmp_log.debug('start for {0}'.format(file_name))
    try:
        # read in binary mode and close the handle promptly; the original
        # open(file_path).read() leaked the file object and used text mode,
        # which can corrupt or fail on binary checkpoint payloads
        with open(file_path, 'rb') as f:
            files = {'file': (file_name, f.read())}
        tmpStat, tmpRes = self.put_ssl('server/panda/put_checkpoint', files, base_url=base_url)
        if tmpStat is False:
            core_utils.dump_error_message(tmp_log, tmpRes)
        else:
            tmp_log.debug('got {0}'.format(tmpRes.text))
        return tmpStat
    except Exception:
        core_utils.dump_error_message(tmp_log)
        return False

# download checkpoint
def download_checkpoint(self, base_url, task_id, panda_id, point_id, base_path):
    """Download a checkpoint file for an HPO point into the worker access point.

    :param base_url: base URL of the checkpoint server (passed to post_ssl)
    :param task_id: task ID, part of the server-side cache path
    :param panda_id: PanDA job ID, used only for log labeling
    :param point_id: hyperparameter point ID, part of the cache path
    :param base_path: local directory in which to store the downloaded file
    :return: tuple of (status bool, local file path or None)
    """
    tmp_log = self.make_logger('taskID={0} pandaID={1}'.format(task_id, panda_id),
                               method_name='download_checkpoint')
    tmp_log.debug('start for ID={0}'.format(point_id))
    try:
        path = 'cache/hpo_cp_{0}_{1}'.format(task_id, point_id)
        tmpStat, tmpRes = self.post_ssl(path, {}, base_url=base_url)
        file_name = None
        if tmpStat is False:
            core_utils.dump_error_message(tmp_log, tmpRes)
        else:
            # random name to avoid collisions inside the access point
            file_name = os.path.join(base_path, str(uuid.uuid4()))
            # write in binary mode: response .content is bytes, and the
            # original text-mode open('w') raises TypeError in python3;
            # the context manager also guarantees the handle is closed
            with open(file_name, 'wb') as f:
                f.write(tmpRes.content)
            tmp_log.debug('got {0}'.format(file_name))
        return tmpStat, file_name
    except Exception:
        core_utils.dump_error_message(tmp_log)
        return False, None

# clear checkpoint
def clear_checkpoint(self, base_url, task_id, point_id):
    """Ask the checkpoint server to delete the checkpoint of an HPO point.

    :param base_url: base URL of the checkpoint server (passed to post_ssl)
    :param task_id: task ID identifying the checkpoint
    :param point_id: hyperparameter point ID identifying the checkpoint
    :return: dict decoded from the server response; on any failure a dict
             with {'StatusCode': 999}
    """
    tmp_log = self.make_logger('taskID={0} pointID={1}'.format(task_id, point_id),
                               method_name='clear_checkpoints')
    tmp_log.debug('start')
    data = {'task_id': task_id,
            'sub_id': point_id}
    tmpStat, tmpRes = self.post_ssl('server/panda/delete_checkpoint', data, base_url=base_url)
    retMap = None
    if tmpStat is False:
        core_utils.dump_error_message(tmp_log, tmpRes)
    else:
        try:
            retMap = tmpRes.json()
        except Exception:
            core_utils.dump_error_message(tmp_log)
    if retMap is None:
        # normalize every failure mode to a dict with an error status code
        retMap = {'StatusCode': 999}
    tmp_log.debug('done with {0}'.format(str(retMap)))
    return retMap
Loading

0 comments on commit f308fa0

Please sign in to comment.