diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index 546b40f9..6780dbd4 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "06-07-2020 11:53:08 on release (by fahui)" +timestamp = "14-08-2020 09:04:50 on release (by fahui)" diff --git a/pandaharvester/harvesterbody/event_feeder.py b/pandaharvester/harvesterbody/event_feeder.py index c941b468..e358897f 100644 --- a/pandaharvester/harvesterbody/event_feeder.py +++ b/pandaharvester/harvesterbody/event_feeder.py @@ -58,7 +58,8 @@ def run(self): # get events tmpLog.debug('get events') tmpStat, events = self.communicator.get_event_ranges(workSpec.eventsRequestParams, - scattered) + scattered, + workSpec.get_access_point()) # failed if tmpStat is False: tmpLog.error('failed to get events with {0}'.format(events)) diff --git a/pandaharvester/harvesterbody/propagator.py b/pandaharvester/harvesterbody/propagator.py index 22f7d7b1..2396591d 100644 --- a/pandaharvester/harvesterbody/propagator.py +++ b/pandaharvester/harvesterbody/propagator.py @@ -116,7 +116,8 @@ def run(self): PilotErrors.pilotError[PilotErrors.ERR_PANDAKILL]) tmpJobSpec.stateChangeTime = datetime.datetime.utcnow() tmpJobSpec.trigger_propagation() - self.dbProxy.update_job(tmpJobSpec, {'propagatorLock': self.get_pid()}) + self.dbProxy.update_job(tmpJobSpec, {'propagatorLock': self.get_pid()}, + update_out_file=True) else: mainLog.error('failed to update PandaID={0} status={1}'.format(tmpJobSpec.PandaID, tmpJobSpec.status)) @@ -203,7 +204,8 @@ def run(self): # get latest metrics from DB service_metrics_list = self.dbProxy.get_service_metrics(self._last_metrics_update) if not service_metrics_list: - mainLog.error('failed to get service metrics') + if self._last_metrics_update: + mainLog.error('failed to get service metrics') self._last_metrics_update = datetime.datetime.utcnow() else: tmp_ret, tmp_str = self.communicator.update_service_metrics(service_metrics_list) diff --git a/pandaharvester/harvesterbody/submitter.py b/pandaharvester/harvesterbody/submitter.py index 53bc4719..4f2b3580 100644 --- a/pandaharvester/harvesterbody/submitter.py +++ b/pandaharvester/harvesterbody/submitter.py @@ -251,7 +251,8 @@ def run(self): # events if len(okJobs) > 0 and \ ('eventService' in okJobs[0].jobParams or - 'cloneJob' in okJobs[0].jobParams): + 'cloneJob' in okJobs[0].jobParams or + 'isHPO' in okJobs[0].jobParams): work_spec.eventsRequest = WorkSpec.EV_useEvents work_specList.append(work_spec) if len(work_specList) > 0: @@ -364,6 +365,16 @@ def run(self): 'nRanges': max(int(math.ceil(work_spec.nCore / len(jobList))), job_spec.jobParams['coreCount']), } + if 'isHPO' in job_spec.jobParams: + if 'sourceURL' in job_spec.jobParams: + sourceURL = job_spec.jobParams['sourceURL'] + else: + sourceURL = None + eventsRequestParams[job_spec.PandaID].update( + {'isHPO': True, + 'jobsetID': 0, + 'sourceURL': sourceURL + }) work_spec.eventsRequestParams = eventsRequestParams # register worker tmpStat = self.dbProxy.register_worker(work_spec, jobList, locked_by) diff --git a/pandaharvester/harvestercommunicator/base_communicator.py b/pandaharvester/harvestercommunicator/base_communicator.py index ed77197c..5bca53de 100644 --- a/pandaharvester/harvestercommunicator/base_communicator.py +++ b/pandaharvester/harvestercommunicator/base_communicator.py @@ -32,7 +32,7 @@ def update_jobs(self, jobspec_list, id): return [{'StatusCode': 0, 'ErrorDiag': '', 'command': ''}] * len(jobspec_list) # get events - def 
get_event_ranges(self, data_map, scattered): + def get_event_ranges(self, data_map, scattered, base_path): return True, {} # update events diff --git a/pandaharvester/harvestercommunicator/panda_communicator.py b/pandaharvester/harvestercommunicator/panda_communicator.py index 6b56000d..bfb05aa0 100644 --- a/pandaharvester/harvestercommunicator/panda_communicator.py +++ b/pandaharvester/harvestercommunicator/panda_communicator.py @@ -8,6 +8,7 @@ ssl.HAS_SNI = False except Exception: pass +import os import sys import json import pickle @@ -25,6 +26,7 @@ pass from pandaharvester.harvestercore import core_utils from pandaharvester.harvesterconfig import harvester_config +from pandaharvester.harvestermisc import idds_utils from pandacommon.pandautils.net_utils import get_http_adapter_with_random_dns_resolution from .base_communicator import BaseCommunicator @@ -76,7 +78,7 @@ def post(self, path, data): return False, errMsg # POST with https - def post_ssl(self, path, data, cert=None): + def post_ssl(self, path, data, cert=None, base_url=None): try: tmpLog = None if self.verbose: @@ -85,7 +87,9 @@ def post_ssl(self, path, data, cert=None): tmpExec = inspect.stack()[1][3] tmpExec += '/' tmpExec = str(uuid.uuid4()) - url = '{0}/{1}'.format(harvester_config.pandacon.pandaURLSSL, path) + if base_url is None: + base_url = harvester_config.pandacon.pandaURLSSL + url = '{0}/{1}'.format(base_url, path) if self.verbose: tmpLog.debug('exec={0} URL={1} data={2}'.format(tmpExec, url, str(data))) if cert is None: @@ -115,7 +119,7 @@ def post_ssl(self, path, data, cert=None): return False, errMsg # PUT with https - def put_ssl(self, path, files, cert=None): + def put_ssl(self, path, files, cert=None, base_url=None): try: tmpLog = None tmpExec = None @@ -125,7 +129,9 @@ def put_ssl(self, path, files, cert=None): tmpExec = inspect.stack()[1][3] tmpExec += '/' tmpExec = str(uuid.uuid4()) - url = '{0}/{1}'.format(harvester_config.pandacon.pandaCacheURL_W, path) + if base_url is None: + base_url = harvester_config.pandacon.pandaCacheURL_W + url = '{0}/{1}'.format(base_url, path) if self.verbose: tmpLog.debug('exec={0} URL={1} files={2}'.format(tmpExec, url, files['file'][0])) if cert is None: @@ -203,6 +209,17 @@ def update_jobs(self, jobspec_list, id): tmpLogG = self.make_logger('id={0}'.format(id), method_name='update_jobs') tmpLogG.debug('update {0} jobs'.format(len(jobspec_list))) retList = [] + # upload checkpoints + for jobSpec in jobspec_list: + if jobSpec.outFiles: + tmpLogG.debug('upload {0} checkpoint files for PandaID={1}'.format(len(jobSpec.outFiles), + jobSpec.PandaID)) + for fileSpec in jobSpec.outFiles: + if 'sourceURL' in jobSpec.jobParams: + tmpS = self.upload_checkpoint(jobSpec.jobParams['sourceURL'], jobSpec.taskID, + jobSpec.PandaID, fileSpec.lfn, fileSpec.path) + if tmpS: + fileSpec.status = 'done' # update events for jobSpec in jobspec_list: eventRanges, eventSpecs = jobSpec.to_event_data(max_events=10000) @@ -284,7 +301,7 @@ def update_jobs(self, jobspec_list, id): return retList # get events - def get_event_ranges(self, data_map, scattered): + def get_event_ranges(self, data_map, scattered, base_path): retStat = False retVal = dict() try: @@ -301,6 +318,16 @@ def get_event_ranges(self, data_map, scattered): nRanges = 1 if scattered: data['scattered'] = True + if 'isHPO' in data: + isHPO = data['isHPO'] + del data['isHPO'] + else: + isHPO = False + if 'sourceURL' in data: + sourceURL = data['sourceURL'] + del data['sourceURL'] + else: + sourceURL = None tmpLog.debug('start 
nRanges={0}'.format(nRanges)) while nRanges > 0: # use a small chunk size to avoid timeout @@ -314,14 +341,35 @@ tmpDict = tmpRes.json() if tmpDict['StatusCode'] == 0: retStat = True - if data['pandaID'] not in retVal: - retVal[data['pandaID']] = [] - retVal[data['pandaID']] += tmpDict['eventRanges'] + retVal.setdefault(data['pandaID'], []) + if not isHPO: + retVal[data['pandaID']] += tmpDict['eventRanges'] + else: + for event in tmpDict['eventRanges']: + event_id = event['eventRangeID'] + task_id = event_id.split('-')[0] + point_id = event_id.split('-')[3] + # get HP point + tmpSI, tmpOI = idds_utils.get_hp_point(harvester_config.pandacon.iddsURL, + task_id, point_id, + tmpLog, self.verbose) + if tmpSI: + event['hp_point'] = tmpOI + # get checkpoint + if sourceURL: + tmpSO, tmpOO = self.download_checkpoint(sourceURL, task_id, + data['pandaID'], + point_id, base_path) + if tmpSO: + event['checkpoint'] = tmpOO + retVal[data['pandaID']].append(event) + else: + core_utils.dump_error_message(tmpLog, tmpOI) # got empty if len(tmpDict['eventRanges']) == 0: break except Exception: - core_utils.dump_error_message(tmpLog, tmpRes) + core_utils.dump_error_message(tmpLog) break nRanges -= chunkSize tmpLog.debug('done with {0}'.format(str(retVal))) @@ -330,6 +378,33 @@ # update events def update_event_ranges(self, event_ranges, tmp_log): tmp_log.debug('start update_event_ranges') + + # loop over for HPO + for item in event_ranges: + new_event_ranges = [] + for event in item['eventRanges']: + # report loss to idds + if 'loss' in event: + event_id = event['eventRangeID'] + task_id = event_id.split('-')[0] + point_id = event_id.split('-')[3] + tmpSI, tmpOI = idds_utils.update_hp_point(harvester_config.pandacon.iddsURL, + task_id, point_id, event['loss'], + tmp_log, self.verbose) + if not tmpSI: + core_utils.dump_error_message(tmp_log, tmpOI) + tmp_log.error('skip {0} since cannot update iDDS'.format(event_id)) + continue + else: + # clear checkpoint + if 'sourceURL' in item: + tmpSC, tmpOC = self.clear_checkpoint(item['sourceURL'], task_id, point_id) + if not tmpSC: + core_utils.dump_error_message(tmp_log, tmpOC) + del event['loss'] + new_event_ranges.append(event) + item['eventRanges'] = new_event_ranges + # update in panda data = {} data['eventRanges'] = json.dumps(event_ranges) data['version'] = 1 @@ -711,4 +786,65 @@ def update_service_metrics(self, service_metrics_list): tmp_log.error('conversion failure from {0}'.format(tmp_res.text)) if tmp_stat: tmp_log.debug('done with {0}:{1}'.format(tmp_stat, err_str)) - return tmp_stat, err_str \ No newline at end of file + return tmp_stat, err_str + + # upload checkpoint + def upload_checkpoint(self, base_url, task_id, panda_id, file_name, file_path): + tmp_log = self.make_logger('taskID={0} pandaID={1}'.format(task_id, panda_id), + method_name='upload_checkpoint') + tmp_log.debug('start for {0}'.format(file_name)) + try: + files = {'file': (file_name, open(file_path, 'rb').read())} + tmpStat, tmpRes = self.put_ssl('server/panda/put_checkpoint', files, base_url=base_url) + if tmpStat is False: + core_utils.dump_error_message(tmp_log, tmpRes) + else: + tmp_log.debug('got {0}'.format(tmpRes.text)) + return tmpStat + except Exception: + core_utils.dump_error_message(tmp_log) + return False + + # download checkpoint + def download_checkpoint(self, base_url, task_id, panda_id, point_id, base_path): + tmp_log = self.make_logger('taskID={0} pandaID={1}'.format(task_id,
panda_id), + method_name='download_checkpoint') + tmp_log.debug('start for ID={0}'.format(point_id)) + try: + path = 'cache/hpo_cp_{0}_{1}'.format(task_id, point_id) + tmpStat, tmpRes = self.post_ssl(path, {}, base_url=base_url) + file_name = None + if tmpStat is False: + core_utils.dump_error_message(tmp_log, tmpRes) + else: + file_name = os.path.join(base_path, str(uuid.uuid4())) + with open(file_name, 'wb') as f: + f.write(tmpRes.content) + tmp_log.debug('got {0}'.format(file_name)) + return tmpStat, file_name + except Exception: + core_utils.dump_error_message(tmp_log) + return False, None + + # clear checkpoint + def clear_checkpoint(self, base_url, task_id, point_id): + tmp_log = self.make_logger('taskID={0} pointID={1}'.format(task_id, point_id), + method_name='clear_checkpoints') + data = dict() + data['task_id'] = task_id + data['sub_id'] = point_id + tmp_log.debug('start') + tmpStat, tmpRes = self.post_ssl('server/panda/delete_checkpoint', data, base_url=base_url) + retMap = None + if tmpStat is False: + core_utils.dump_error_message(tmp_log, tmpRes) + else: + try: + retMap = tmpRes.json() + except Exception: + core_utils.dump_error_message(tmp_log) + if retMap is None: + retMap = {} + retMap['StatusCode'] = 999 + tmp_log.debug('done with {0}'.format(str(retMap))) + return retMap \ No newline at end of file
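The three methods above are the new client-side API for HPO checkpoints. A minimal usage sketch, assuming `com` is an instantiated `PandaCommunicator`; the URL, IDs, file name, and paths are made-up values (the real callers are `update_jobs()` and `get_event_ranges()` above):

```python
# Illustrative sketch only, not part of this commit.
import os

source_url = 'https://aipanda000.cern.ch:25443'   # the job's sourceURL (illustrative)
task_id, panda_id, point_id = 678, 12345, 3

# push a checkpoint produced by the payload into the panda cache
ok = com.upload_checkpoint(source_url, task_id, panda_id,
                           'hpo_cp_{0}_{1}'.format(task_id, point_id),
                           '/tmp/checkpoint.tar.gz')

# later, pull it back into a worker's access point before resuming training;
# download_checkpoint() returns (status, local file path)
ok, local_path = com.download_checkpoint(source_url, task_id, panda_id,
                                         point_id, os.getcwd())

# once the point is done and its loss has been reported, drop the cached copy
ret = com.clear_checkpoint(source_url, task_id, point_id)
```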
diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index 84785e01..c11ad9c8 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -690,7 +690,7 @@ def get_jobs(self): return None # update job - def update_job(self, jobspec, criteria=None, update_in_file=False): + def update_job(self, jobspec, criteria=None, update_in_file=False, update_out_file=False): try: # get logger tmpLog = core_utils.make_logger(_logger, 'PandaID={0} subStatus={1}'.format(jobspec.PandaID, @@ -741,6 +741,16 @@ def update_job(self, jobspec, criteria=None, update_in_file=False): sqlF = "UPDATE {0} SET status=:status ".format(fileTableName) sqlF += "WHERE PandaID=:PandaID AND fileType IN (:type1,:type2) " self.execute(sqlF, varMap) + # update output file + if update_out_file: + for fileSpec in jobspec.outFiles: + varMap = fileSpec.values_map(only_changed=True) + if varMap != {}: + sqlF = "UPDATE {0} SET {1} ".format(fileTableName, + fileSpec.bind_update_changes_expression()) + sqlF += "WHERE fileID=:fileID " + varMap[':fileID'] = fileSpec.fileID + self.execute(sqlF, varMap) # set to_delete flag if jobspec.subStatus == 'done': sqlD = "UPDATE {0} SET todelete=:to_delete ".format(fileTableName) @@ -1025,6 +1035,9 @@ def get_jobs_to_propagate(self, max_jobs, lock_interval, update_interval, locked sqlZ = "SELECT e.fileID,f.zipFileID FROM {0} f, {1} e ".format(fileTableName, eventTableName) sqlZ += "WHERE e.PandaID=:PandaID AND e.fileID=f.fileID " sqlZ += "AND e.subStatus IN (:statusFinished,:statusFailed) " + # sql to get checkpoint files + sqlC = "SELECT {0} FROM {1} ".format(FileSpec.column_names(), fileTableName) + sqlC += "WHERE PandaID=:PandaID AND fileType=:type AND status=:status " # get jobs timeNow = datetime.datetime.utcnow() lockTimeLimit = timeNow - datetime.timedelta(seconds=lock_interval) @@ -1112,6 +1125,18 @@ def get_jobs_to_propagate(self, max_jobs, lock_interval, update_interval, locked zipFileSpec = zipFiles[zipFileID] jobSpec.add_event(eventSpec, zipFileSpec) iEvents += 1 + # read checkpoint files + varMap = dict() + varMap[':PandaID'] = pandaID + varMap[':type'] = 'checkpoint' + varMap[':status'] = 'renewed' + self.execute(sqlC, varMap) + resC = self.cur.fetchall() + for resFile in resC: + fileSpec = FileSpec() + fileSpec.pack(resFile) + jobSpec.add_out_file(fileSpec) + # add to job list jobSpecList.append(jobSpec) tmpLog.debug('got {0} jobs'.format(len(jobSpecList))) return jobSpecList @@ -2061,7 +2086,7 @@ def update_jobs_workers(self, jobspec_list, workspec_list, locked_by, panda_ids_ sqlFC = "SELECT {0} FROM {1} ".format(FileSpec.column_names(), fileTableName) sqlFC += "WHERE PandaID=:PandaID AND lfn=:lfn " # sql to get all LFNs - sqlFL = "SELECT lfn FROM {0} ".format(fileTableName) + sqlFL = "SELECT lfn,fileID FROM {0} ".format(fileTableName) sqlFL += "WHERE PandaID=:PandaID AND fileType<>:type " # sql to check file with eventRangeID sqlFE = "SELECT 1 c FROM {0} ".format(fileTableName) @@ -2136,14 +2161,14 @@ def update_jobs_workers(self, jobspec_list, workspec_list, locked_by, panda_ids_ activeWorkers.add(tmpWorkerID) jobSpec.nWorkers = len(activeWorkers) # get all LFNs - allLFNs = set() + allLFNs = dict() varMap = dict() varMap[':PandaID'] = jobSpec.PandaID varMap[':type'] = 'input' self.execute(sqlFL, varMap) resFL = self.cur.fetchall() - for tmpLFN, in resFL: - allLFNs.add(tmpLFN) + for tmpLFN, tmpFileID in resFL: + allLFNs[tmpLFN] = tmpFileID # insert files nFiles = 0 fileIdMap = {} @@ -2152,8 +2177,11 @@ def update_jobs_workers(self, jobspec_list, workspec_list, locked_by, panda_ids_ # insert file if fileSpec.lfn not in allLFNs: if jobSpec.zipPerMB is None or fileSpec.isZip in [0, 1]: - fileSpec.status = 'defined' - jobSpec.hasOutFile = JobSpec.HO_hasOutput + if fileSpec.fileType != 'checkpoint': + fileSpec.status = 'defined' + jobSpec.hasOutFile = JobSpec.HO_hasOutput + else: + fileSpec.status = 'renewed' else: fileSpec.status = 'pending' varMap = fileSpec.values_list() @@ -2198,6 +2226,13 @@ def update_jobs_workers(self, jobspec_list, workspec_list, locked_by, panda_ids_ nFiles += 1 # mapping between event range ID and file ID fileIdMap[fileSpec.eventRangeID] = self.cur.lastrowid + elif fileSpec.fileType == 'checkpoint': + # reset status of checkpoint to be uploaded again + varMap = dict() + varMap[':status'] = 'renewed' + varMap[':fileID'] = allLFNs[fileSpec.lfn] + varMap[':zipFileID'] = None + self.execute(sqlFU, varMap) if nFiles > 0: tmpLog.debug('inserted {0} files'.format(nFiles)) # check pending files @@ -2628,13 +2663,13 @@ def get_jobs_for_stage_out(self, max_jobs, interval_without_lock, interval_with_ sqlJJ += "WHERE PandaID=:PandaID " # sql to get files sqlF = "SELECT {0} FROM {1} ".format(FileSpec.column_names(), fileTableName) - sqlF += "WHERE PandaID=:PandaID AND status=:status AND fileType NOT IN (:type1,:type2) " + sqlF += "WHERE PandaID=:PandaID AND status=:status AND fileType NOT IN (:type1,:type2,:type3) " if max_files_per_job is not None and max_files_per_job > 0: sqlF += "LIMIT {0} ".format(max_files_per_job) # sql to get associated files sqlAF = "SELECT {0} FROM {1} ".format(FileSpec.column_names(), fileTableName) sqlAF += "WHERE PandaID=:PandaID AND zipFileID=:zipFileID " - sqlAF += "AND fileType NOT IN (:type1,:type2) " + sqlAF += "AND fileType NOT IN (:type1,:type2,:type3) " # sql to increment attempt number sqlFU = "UPDATE {0} SET attemptNr=attemptNr+1 WHERE fileID=:fileID ".format(fileTableName) # get jobs @@ -2696,6 +2731,7 @@ def get_jobs_for_stage_out(self, max_jobs, interval_without_lock, interval_with_ varMap[':PandaID'] = jobSpec.PandaID varMap[':type1'] = 'input' varMap[':type2'] = FileSpec.AUX_INPUT + varMap[':type3'] = 'checkpoint' if
has_out_file_flag == JobSpec.HO_hasOutput: varMap[':status'] = 'defined' elif has_out_file_flag == JobSpec.HO_hasZipOutput: @@ -2727,6 +2763,7 @@ def get_jobs_for_stage_out(self, max_jobs, interval_without_lock, interval_with_ varMap[':zipFileID'] = fileSpec.fileID varMap[':type1'] = 'input' varMap[':type2'] = FileSpec.AUX_INPUT + varMap[':type3'] = 'checkpoint' self.execute(sqlAF, varMap) resAFs = self.cur.fetchall() for resAF in resAFs: diff --git a/pandaharvester/harvestercore/event_spec.py b/pandaharvester/harvestercore/event_spec.py index d1f51d8a..e8cc974f 100644 --- a/pandaharvester/harvestercore/event_spec.py +++ b/pandaharvester/harvestercore/event_spec.py @@ -17,7 +17,8 @@ class EventSpec(SpecBase): 'coreCount:integer', 'cpuConsumptionTime:integer', 'subStatus:text / index', - 'fileID:integer' + 'fileID:integer', + 'loss:text' ) # constructor @@ -30,7 +31,7 @@ def to_data(self): for attr in self.attributes: # ignore some attributes if attr not in ['eventRangeID', 'eventStatus', 'coreCount', - 'cpuConsumptionTime']: + 'cpuConsumptionTime', 'loss']: continue val = getattr(self, attr) # don't propagate finished until subStatus is finished diff --git a/pandaharvester/harvestercore/job_spec.py b/pandaharvester/harvestercore/job_spec.py index 65a6d38e..05848a10 100644 --- a/pandaharvester/harvestercore/job_spec.py +++ b/pandaharvester/harvestercore/job_spec.py @@ -259,6 +259,8 @@ def to_event_data(self, max_events=None): iEvents += 1 tmpData = {} tmpData['eventRanges'] = eventRanges + if 'sourceURL' in self.jobParams: + tmpData['sourceURL'] = self.jobParams['sourceURL'] if zipFileID is not None: zipFileSpec = eventsData['zip'] if zipFileSpec.status == 'finished': diff --git a/pandaharvester/harvesterextractor/analysis_extractor.py b/pandaharvester/harvesterextractor/analysis_extractor.py index f37174fd..87ef19c1 100644 --- a/pandaharvester/harvesterextractor/analysis_extractor.py +++ b/pandaharvester/harvesterextractor/analysis_extractor.py @@ -1,37 +1,6 @@ -import re -from .base_extractor import BaseExtractor +from .aux_extractor import AuxExtractor # OBSOLETE - use aux_extractor -class AnalysisExtractor(BaseExtractor): - # constructor - def __init__(self, **kwarg): - BaseExtractor.__init__(self, **kwarg) - - # get auxiliary input files - def get_aux_inputs(self, jobspec): - url_list = [] - jobPars = jobspec.jobParams['jobPars'] - # transformation - trf = jobspec.jobParams['transformation'] - if trf is not None and trf.startswith('http'): - url_list.append(trf) - # extract source URL - tmpM = re.search(' --sourceURL\s+([^\s]+)', jobPars) - if tmpM is not None: - sourceURL = tmpM.group(1) - # extract sandbox - if jobspec.jobParams['prodSourceLabel'] == 'user': - tmpM = re.search('-a\s+([^\s]+)', jobPars) - else: - tmpM = re.search('-i\s+([^\s]+)', jobPars) - if tmpM is not None: - lfn = tmpM.group(1) - url = '{0}/cache/{1}'.format(sourceURL, lfn) - url_list.append(url) - # extract container image - tmpM = re.search(' --containerImage\s+([^\s]+)', jobPars) - if tmpM is not None: - url = tmpM.group(1) - url_list.append(url) - return self.make_aux_inputs(url_list) +class AnalysisExtractor(AuxExtractor): + pass diff --git a/pandaharvester/harvesterextractor/aux_extractor.py b/pandaharvester/harvesterextractor/aux_extractor.py index 67253806..33cbe672 100644 --- a/pandaharvester/harvesterextractor/aux_extractor.py +++ b/pandaharvester/harvesterextractor/aux_extractor.py @@ -21,6 +21,7 @@ def get_aux_inputs(self, jobspec): tmpM = re.search(' --sourceURL\s+([^\s]+)', jobPars) if tmpM is not 
None: sourceURL = tmpM.group(1) + jobspec.jobParams['sourceURL'] = sourceURL # extract sandbox if jobspec.jobParams['prodSourceLabel'] == 'user': tmpM = re.search('-a\s+([^\s]+)', jobPars) diff --git a/pandaharvester/harvestermessenger/shared_file_messenger.py b/pandaharvester/harvestermessenger/shared_file_messenger.py index ef9940be..122eee3c 100644 --- a/pandaharvester/harvestermessenger/shared_file_messenger.py +++ b/pandaharvester/harvestermessenger/shared_file_messenger.py @@ -284,46 +284,47 @@ def get_files_to_stage_out(self, workspec): tmpEventRangeID = tmpEventInfo['eventRangeID'] else: tmpEventRangeID = None - tmpFileDict = dict() - pfn = tmpEventInfo['path'] - lfn = os.path.basename(pfn) - tmpFileDict['path'] = pfn - if pfn not in sizeMap: - if 'fsize' in tmpEventInfo: - sizeMap[pfn] = tmpEventInfo['fsize'] + if 'path' in tmpEventInfo: + tmpFileDict = dict() + pfn = tmpEventInfo['path'] + lfn = os.path.basename(pfn) + tmpFileDict['path'] = pfn + if pfn not in sizeMap: + if 'fsize' in tmpEventInfo: + sizeMap[pfn] = tmpEventInfo['fsize'] + else: + sizeMap[pfn] = os.stat(pfn).st_size + tmpFileDict['fsize'] = sizeMap[pfn] + tmpFileDict['type'] = tmpEventInfo['type'] + if tmpEventInfo['type'] in ['log', 'output', 'checkpoint']: + # disable zipping + tmpFileDict['isZip'] = 0 + elif tmpEventInfo['type'] == 'zip_output': + # already zipped + tmpFileDict['isZip'] = 1 + elif 'isZip' in tmpEventInfo: + tmpFileDict['isZip'] = tmpEventInfo['isZip'] + # guid + if 'guid' in tmpEventInfo: + tmpFileDict['guid'] = tmpEventInfo['guid'] else: - sizeMap[pfn] = os.stat(pfn).st_size - tmpFileDict['fsize'] = sizeMap[pfn] - tmpFileDict['type'] = tmpEventInfo['type'] - if tmpEventInfo['type'] in ['log', 'output']: - # disable zipping - tmpFileDict['isZip'] = 0 - elif tmpEventInfo['type'] == 'zip_output': - # already zipped - tmpFileDict['isZip'] = 1 - elif 'isZip' in tmpEventInfo: - tmpFileDict['isZip'] = tmpEventInfo['isZip'] - # guid - if 'guid' in tmpEventInfo: - tmpFileDict['guid'] = tmpEventInfo['guid'] - else: - tmpFileDict['guid'] = str(uuid.uuid4()) - # get checksum - if pfn not in chksumMap: - if 'chksum' in tmpEventInfo: - chksumMap[pfn] = tmpEventInfo['chksum'] - else: - chksumMap[pfn] = core_utils.calc_adler32(pfn) - tmpFileDict['chksum'] = chksumMap[pfn] - if tmpPandaID not in fileDict: - fileDict[tmpPandaID] = dict() - if lfn not in fileDict[tmpPandaID]: - fileDict[tmpPandaID][lfn] = [] - fileDict[tmpPandaID][lfn].append(tmpFileDict) - # skip if unrelated to events - if tmpFileDict['type'] not in ['es_output', 'zip_output']: - continue - tmpFileDict['eventRangeID'] = tmpEventRangeID + tmpFileDict['guid'] = str(uuid.uuid4()) + # get checksum + if pfn not in chksumMap: + if 'chksum' in tmpEventInfo: + chksumMap[pfn] = tmpEventInfo['chksum'] + else: + chksumMap[pfn] = core_utils.calc_adler32(pfn) + tmpFileDict['chksum'] = chksumMap[pfn] + if tmpPandaID not in fileDict: + fileDict[tmpPandaID] = dict() + if lfn not in fileDict[tmpPandaID]: + fileDict[tmpPandaID][lfn] = [] + fileDict[tmpPandaID][lfn].append(tmpFileDict) + # skip if unrelated to events + if tmpFileDict['type'] not in ['es_output', 'zip_output']: + continue + tmpFileDict['eventRangeID'] = tmpEventRangeID if tmpPandaID not in eventsList: eventsList[tmpPandaID] = list() eventsList[tmpPandaID].append({'eventRangeID': tmpEventRangeID, diff --git a/pandaharvester/harvestermisc/idds_utils.py b/pandaharvester/harvestermisc/idds_utils.py new file mode 100644 index 00000000..9f93a4c5 --- /dev/null +++ 
b/pandaharvester/harvestermisc/idds_utils.py @@ -0,0 +1,47 @@ +import os +import requests +try: + import subprocess32 as subprocess +except ImportError: + import subprocess + + +# get HP point +def get_hp_point(idds_url, task_id, point_id, tmp_log, verbose): + url = os.path.join(idds_url, 'idds', 'hpo', str(task_id), 'null', str(point_id), 'null', 'null') + try: + if verbose: + tmp_log.debug("getting HP point from {0}".format(url)) + r = requests.get(url, verify=False) + if verbose: + tmp_log.debug('status: {0}, body: {1}'.format(r.status_code, r.text)) + if r.status_code != requests.codes.ok: + return False, "bad http status {0} when getting point (ID={1}) : {2}".format(r.status_code, point_id, r.text) + tmp_dict = r.json() + for i in tmp_dict: + if i['id'] == point_id: + return True, i + except Exception as e: + errStr = "failed to get point (ID={0}) : {1}".format(point_id, str(e)) + return False, errStr + return False, "cannot get point (ID={0}) since it is unavailable".format(point_id) + + +# update HP point +def update_hp_point(idds_url, task_id, point_id, loss, tmp_log, verbose): + url = os.path.join(idds_url, 'idds', 'hpo', str(task_id), 'null', str(point_id), str(loss)) + try: + if verbose: + tmp_log.debug("updating HP point at {0}".format(url)) + r = requests.put(url, verify=False) + if verbose: + tmp_log.debug('status: {0}, body: {1}'.format(r.status_code, r.text)) + if r.status_code != requests.codes.ok: + return False, "bad http status {0} when updating point (ID={1}) : {2}".format(r.status_code, point_id, r.text) + tmp_dict = r.json() + if tmp_dict['status'] == 0: + return True, None + except Exception as e: + errStr = "failed to update point (ID={0}) : {1}".format(point_id, str(e)) + return False, errStr + return False, "cannot update point (ID={0}) since status is missing".format(point_id)
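For orientation, a sketch of how the communicator drives these two helpers. The event range ID and iDDS URL below are made-up values; in production the URL comes from `harvester_config.pandacon.iddsURL`, and `get_event_ranges()` derives the task ID from field 0 and the point ID from field 3 of the event range ID:

```python
# Illustrative sketch only, not part of this commit.
import logging

from pandaharvester.harvestermisc import idds_utils

tmp_log = logging.getLogger('idds_utils_demo')
idds_url = 'https://idds-server.example.org:443'  # harvester_config.pandacon.iddsURL in real use

event_id = '678-12345-0-3-5'  # illustrative event range ID
task_id = event_id.split('-')[0]
point_id = event_id.split('-')[3]

ok, point = idds_utils.get_hp_point(idds_url, task_id, point_id, tmp_log, verbose=True)
if ok:
    loss = 0.42  # produced by evaluating the hyperparameter point
    ok, err = idds_utils.update_hp_point(idds_url, task_id, point_id, loss,
                                         tmp_log, verbose=True)
```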
diff --git a/pandaharvester/harvesterpreparator/analysis_aux_preparator.py b/pandaharvester/harvesterpreparator/analysis_aux_preparator.py index 99c177aa..bad5b061 100644 --- a/pandaharvester/harvesterpreparator/analysis_aux_preparator.py +++ b/pandaharvester/harvesterpreparator/analysis_aux_preparator.py @@ -89,6 +89,9 @@ def trigger_preparation(self, jobspec): elif self.containerRuntime == 'singularity': args = ['singularity', 'build', '--sandbox', accPathTmp, url ] return_code = self.make_image(jobspec,args) + elif self.containerRuntime == 'shifter': + args = ['shifterimg', 'pull', url ] + return_code = self.make_image(jobspec,args) else: tmpLog.error('unsupported container runtime : {0}'.format(self.containerRuntime)) elif url.startswith('/'): @@ -185,9 +188,9 @@ def check_stage_in_status(self, jobspec): for tmpGroupID in transferGroups: if tmpGroupID is None: continue - tmpGroupID_parts = tmpGroupID.split(':',maxsplit=2) + tmpGroupID_parts = tmpGroupID.split(':', 2) tmpLog.debug('transfer group ID : {0} components: {1}'.format(tmpGroupID, tmpGroupID_parts)) - protocol, executionID, dst = tmpGroupID.split(':',maxsplit=2) + protocol, executionID, dst = tmpGroupID.split(':', 2) args = [] for arg in self.externalCommand[protocol]['check']['args']: if arg == '{id}': diff --git a/pandaharvester/harvesterpreparator/go_bulk_preparator.py b/pandaharvester/harvesterpreparator/go_bulk_preparator.py index 5baada64..614fb9ac 100644 --- a/pandaharvester/harvesterpreparator/go_bulk_preparator.py +++ b/pandaharvester/harvesterpreparator/go_bulk_preparator.py @@ -120,13 +120,6 @@ def check_stage_in_status(self, jobspec): tmpLog = self.make_logger(_logger, 'PandaID={0} ThreadID={1}'.format(jobspec.PandaID,threading.current_thread().ident), method_name='check_stage_in_status') tmpLog.debug('start') - # show the dummy transfer id and set to a value with the PandaID if needed. - tmpLog.debug('self.dummy_transfer_id = {}'.format(self.dummy_transfer_id)) - - # default return - tmpRetVal = (True, '') - # set flag if have db lock - have_db_lock = False # check that jobspec.computingSite is defined if jobspec.computingSite is None: # not found tmpLog.error('jobspec.computingSite is not defined') return False, 'jobspec.computingSite is not defined' else: tmpLog.debug('jobspec.computingSite : {0}'.format(jobspec.computingSite)) + # show the dummy transfer id and set to a value with the jobspec.computingSite if needed. + tmpLog.debug('self.dummy_transfer_id = {}'.format(self.dummy_transfer_id)) + if self.dummy_transfer_id == '{0}_{1}'.format(dummy_transfer_id_base,'XXXX') : + old_dummy_transfer_id = self.dummy_transfer_id + self.dummy_transfer_id = '{0}_{1}'.format(dummy_transfer_id_base,jobspec.computingSite) + tmpLog.debug('Change self.dummy_transfer_id from {0} to {1}'.format(old_dummy_transfer_id,self.dummy_transfer_id)) + + # default return + tmpRetVal = (True, '') + # set flag if have db lock + have_db_lock = False queueConfigMapper = QueueConfigMapper() queueConfig = queueConfigMapper.get_queue(jobspec.computingSite) # test we have a Globus Transfer Client @@ -147,10 +151,13 @@ def check_stage_in_status(self, jobspec): groups = jobspec.get_groups_of_input_files(skip_ready=True) tmpLog.debug('jobspec.get_groups_of_input_files() = : {0}'.format(groups)) # lock if the dummy transfer ID is used to avoid submitting duplicated transfer requests - if self.dummy_transfer_id in groups: + for dummy_transferID in groups: + # skip if valid transfer ID not dummy one + if validate_transferid(dummy_transferID) : + continue # lock for 120 sec - tmpLog.debug('attempt to set DB lock for self.id - {0} self.dummy_transfer_id - {1}'.format(self.id,self.dummy_transfer_id)) - have_db_lock = self.dbInterface.get_object_lock(self.dummy_transfer_id, lock_interval=120) + tmpLog.debug('attempt to set DB lock for self.id - {0} self.dummy_transfer_id - {1}, dummy_transferID - {2}'.format(self.id,self.dummy_transfer_id,dummy_transferID)) + have_db_lock = self.dbInterface.get_object_lock(dummy_transferID, lock_interval=120) tmpLog.debug(' DB lock result - {0}'.format(have_db_lock)) if not have_db_lock: # escape since locked by another thread @@ -166,11 +173,14 @@ def check_stage_in_status(self, jobspec): groups = jobspec.get_groups_of_input_files(skip_ready=True) tmpLog.debug('after db lock and refresh - jobspec.get_groups_of_input_files(skip_ready=True) = : {0}'.format(groups)) # the dummy transfer ID is still there - if self.dummy_transfer_id in groups: - groupUpdateTime = groups[self.dummy_transfer_id]['groupUpdateTime'] + if dummy_transferID in groups: + groupUpdateTime = groups[dummy_transferID]['groupUpdateTime'] # get files with the dummy transfer ID across jobs - fileSpecs = self.dbInterface.get_files_with_group_id(self.dummy_transfer_id) - msgStr = 'self.dummy_transfer_id = {0} self.dbInterface.get_files_with_group_id(self.dummy_transfer_id) number of files = {1}'.format(self.dummy_transfer_id,len(fileSpecs)) + fileSpecs_allgroups = self.dbInterface.get_files_with_group_id(dummy_transferID) + msgStr = 'dummy_transferID = {0} self.dbInterface.get_files_with_group_id(dummy_transferID) number of files = {1}'.format(dummy_transferID,len(fileSpecs_allgroups)) +
tmpLog.debug(msgStr) + fileSpecs = jobspec.get_input_file_specs(dummy_transferID, skip_ready=True) + msgStr = 'dummy_transferID = {0} jobspec.get_input_file_specs(dummy_transferID,skip_ready=True) number of files = {1}'.format(dummy_transferID,len(fileSpecs)) tmpLog.debug(msgStr) # submit transfer if there are more than 10 files or the group was made before more than 10 min if len(fileSpecs) >= 10 or \ groupUpdateTime < datetime.datetime.utcnow() - datetime.timedelta(minutes=10): @@ -199,10 +209,10 @@ def check_stage_in_status(self, jobspec): if not tmpStatdst : errMsg += ' destination Endpoint not activated ' # release process lock - tmpLog.debug('attempt to release DB lock for self.id - {0} self.dummy_transfer_id - {1}, self.dummy_transfer_id - {2}'.format(self.id,self.dummy_transfer_id,self.dummy_transfer_id)) - have_db_lock = self.dbInterface.release_object_lock(self.dummy_transfer_id) + tmpLog.debug('attempt to release DB lock for self.id - {0} self.dummy_transfer_id - {1}, dummy_transferID - {2}'.format(self.id,self.dummy_transfer_id,dummy_transferID)) + have_db_lock = self.dbInterface.release_object_lock(dummy_transferID) if not have_db_lock: - errMsg += ' - Could not release DB lock for {}'.format(self.dummy_transfer_id) + errMsg += ' - Could not release DB lock for {}'.format(dummy_transferID) tmpLog.error(errMsg) tmpRetVal = (None,errMsg) return tmpRetVal @@ -218,10 +228,10 @@ def check_stage_in_status(self, jobspec): except: errStat, errMsg = globus_utils.handle_globus_exception(tmpLog) # release process lock - tmpLog.debug('attempt to release DB lock for self.id - {0} self.dummy_transfer_id - {1}, self.dummy_transfer_id - {2}'.format(self.id,self.dummy_transfer_id,self.dummy_transfer_id)) - release_db_lock = self.dbInterface.release_object_lock(self.dummy_transfer_id) + tmpLog.debug('attempt to release DB lock for self.id - {0} self.dummy_transfer_id - {1}, dummy_transferID - {2}'.format(self.id,self.dummy_transfer_id,dummy_transferID)) + release_db_lock = self.dbInterface.release_object_lock(dummy_transferID) if not release_db_lock: - errMsg += ' - Could not release DB lock for {}'.format(self.self.dummy_transfer_id) + errMsg += ' - Could not release DB lock for {}'.format(dummy_transferID) tmpLog.error(errMsg) tmpRetVal = (errStat, errMsg) return tmpRetVal @@ -280,51 +290,51 @@ def check_stage_in_status(self, jobspec): tmpLog.debug(msgStr) else: # release process lock - tmpLog.debug('attempt to release DB lock for self.id - {0} self.dummy_transfer_id - {1}'.format(self.id,self.dummy_transfer_id)) - release_db_lock = self.dbInterface.release_object_lock(self.dummy_transfer_id) + tmpLog.debug('attempt to release DB lock for self.id - {0} dummy_transferID - {1}'.format(self.id,dummy_transferID)) + release_db_lock = self.dbInterface.release_object_lock(dummy_transferID) if release_db_lock: - tmpLog.debug('Released DB lock for self.id - {0} self.dummy_transfer_id - {1}'.format(self.id,self.dummy_transfer_id)) + tmpLog.debug('Released DB lock for self.id - {0} dummy_transferID - {1}'.format(self.id,dummy_transferID)) have_db_lock = False else: - errMsg = 'Could not release DB lock for {}'.format(self.dummy_transfer_id) + errMsg = 'Could not release DB lock for {}'.format(dummy_transferID) tmpLog.error(errMsg) tmpRetVal = (None, transfer_result['message']) return tmpRetVal except Exception as e: errStat,errMsg = globus_utils.handle_globus_exception(tmpLog) # release process lock - tmpLog.debug('attempt to release DB lock for self.id - {0} self.dummy_transfer_id - {1}'.format(self.id,self.dummy_transfer_id)) - release_db_lock =
self.dbInterface.release_object_lock(self.dummy_transfer_id) + tmpLog.debug('attempt to release DB lock for self.id - {0} dummy_transferID - {1}'.format(self.id,dummy_transferID)) + release_db_lock = self.dbInterface.release_object_lock(dummy_transferID) if release_db_lock: - tmpLog.debug('Released DB lock for self.id - {0} self.dummy_transfer_id - {1}'.format(self.id,self.dummy_transfer_id)) + tmpLog.debug('Released DB lock for self.id - {0} dummy_transferID - {1}'.format(self.id,dummy_transferID)) have_db_lock = False else : - errMsg += ' - Could not release DB lock for {}'.format(self.dummy_transfer_id) + errMsg += ' - Could not release DB lock for {}'.format(dummy_transferID) tmpLog.error(errMsg) return errStat, errMsg else: msgStr = 'wait until enough files are pooled' tmpLog.debug(msgStr) # release the lock - tmpLog.debug('attempt to release DB lock for self.id - {0} self.dummy_transfer_id - {1}'.format(self.id,self.dummy_transfer_id)) - release_db_lock = self.dbInterface.release_object_lock(self.dummy_transfer_id) + tmpLog.debug('attempt to release DB lock for self.id - {0} dummy_transferID - {1}'.format(self.id,dummy_transferID)) + release_db_lock = self.dbInterface.release_object_lock(dummy_transferID) if release_db_lock: - tmpLog.debug('released DB lock for self.id - {0} self.dummy_transfer_id - {1}'.format(self.id,self.dummy_transfer_id)) + tmpLog.debug('released DB lock for self.id - {0} dummy_transferID - {1}'.format(self.id,dummy_transferID)) have_db_lock = False else: - msgStr += ' - Could not release DB lock for {}'.format(self.dummy_transfer_id) + msgStr += ' - Could not release DB lock for {}'.format(dummy_transferID) tmpLog.error(msgStr) # return None to retry later return None, msgStr # release the db lock if needed if have_db_lock: - tmpLog.debug('attempt to release DB lock for self.id - {0} self.dummy_transfer_id - {1}'.format(self.id,self.dummy_transfer_id)) - release_db_lock = self.dbInterface.release_object_lock(self.dummy_transfer_id) + tmpLog.debug('attempt to release DB lock for self.id - {0} dummy_transferID - {1}'.format(self.id,dummy_transferID)) + release_db_lock = self.dbInterface.release_object_lock(dummy_transferID) if release_db_lock: - tmpLog.debug('released DB lock for self.id - {0} self.dummy_transfer_id - {1}'.format(self.id,self.dummy_transfer_id)) + tmpLog.debug('released DB lock for self.id - {0} dummy_transferID - {1}'.format(self.id,dummy_transferID)) have_db_lock = False else: - msgStr += ' - Could not release DB lock for {}'.format(self.dummy_transfer_id) + msgStr += ' - Could not release DB lock for {}'.format(dummy_transferID) tmpLog.error(msgStr) return None, msgStr # check transfer with real transfer IDs @@ -361,7 +371,6 @@ def check_stage_in_status(self, jobspec): # succeeded in finding a transfer task by tranferID if transferTasks[transferID]['status'] == 'SUCCEEDED': tmpLog.debug('transfer task {} succeeded'.format(transferID)) - jobspec.update_group_status_in_files(transferID, 'done') self.set_FileSpec_status(jobspec,'finished') return True, '' # failed @@ -369,7 +378,6 @@ def check_stage_in_status(self, jobspec): errStr = 'transfer task {} failed'.format(transferID) tmpLog.error(errStr) self.set_FileSpec_status(jobspec,'failed') - jobspec.update_group_status_in_files(transferID, 'failed') return False, errStr # another status tmpStr = 'transfer task {0} status: {1}'.format(transferID,transferTasks[transferID]['status']) @@ -398,8 +406,12 @@ def trigger_preparation(self, jobspec): errStr = 'failed to get Globus Transfer Client' 
tmpLog.error(errStr) return False, errStr - # show the dummy transfer id and set to a value with the PandaID if needed. + # show the dummy transfer id and set to a value with the computingSite if needed. tmpLog.debug('self.dummy_transfer_id = {}'.format(self.dummy_transfer_id)) + if self.dummy_transfer_id == '{0}_{1}'.format(dummy_transfer_id_base,'XXXX') : + old_dummy_transfer_id = self.dummy_transfer_id + self.dummy_transfer_id = '{0}_{1}'.format(dummy_transfer_id_base,jobspec.computingSite) + tmpLog.debug('Change self.dummy_transfer_id from {0} to {1}'.format(old_dummy_transfer_id,self.dummy_transfer_id)) # set the dummy transfer ID which will be replaced with a real ID in check_stage_in_status() inFiles = jobspec.get_input_file_attributes(skip_ready=True) lfns = list(inFiles.keys()) diff --git a/pandaharvester/harvesterstager/go_bulk_stager.py b/pandaharvester/harvesterstager/go_bulk_stager.py index bde1ffd6..7087503d 100644 --- a/pandaharvester/harvesterstager/go_bulk_stager.py +++ b/pandaharvester/harvesterstager/go_bulk_stager.py @@ -62,7 +62,7 @@ def __init__(self, **kwarg): tmpLog = self.make_logger(_logger, 'ThreadID={0}'.format(threading.current_thread().ident), method_name='GlobusBulkStager __init__ ') tmpLog.debug('start') - self.Yodajob = False + self.EventServicejob = False self.pathConvention = None self.id = GlobusBulkStager.next_id self.changeFileStatusOnSuccess = True @@ -126,12 +126,8 @@ def check_stage_out_status(self, jobspec): tmpLog = self.make_logger(_logger, 'PandaID={0} ThreadID={1}'.format(jobspec.PandaID,threading.current_thread().ident), method_name='check_stage_out_status') tmpLog.debug('start') - # show the dummy transfer id and set to a value with the PandaID if needed. - tmpLog.debug('self.dummy_transfer_id = {}'.format(self.dummy_transfer_id)) # default return tmpRetVal = (True, '') - # set flag if have db lock - have_db_lock = False # check that jobspec.computingSite is defined if jobspec.computingSite is None: # not found @@ -139,19 +135,32 @@ def check_stage_out_status(self, jobspec): return False, 'jobspec.computingSite is not defined' else: tmpLog.debug('jobspec.computingSite : {0}'.format(jobspec.computingSite)) + # show the dummy transfer id and set to a value with the PandaID if needed. 
+ tmpLog.debug('self.dummy_transfer_id = {}'.format(self.dummy_transfer_id)) + if self.dummy_transfer_id == '{0}_{1}'.format(dummy_transfer_id_base,'XXXX') : + old_dummy_transfer_id = self.dummy_transfer_id + self.dummy_transfer_id = '{0}_{1}'.format(dummy_transfer_id_base,jobspec.computingSite) + tmpLog.debug('Change self.dummy_transfer_id from {0} to {1}'.format(old_dummy_transfer_id,self.dummy_transfer_id)) + # set flag if have db lock + have_db_lock = False # get the queueConfig and corresponding objStoreID_ES queueConfigMapper = QueueConfigMapper() queueConfig = queueConfigMapper.get_queue(jobspec.computingSite) # check queueConfig stager section to see if jobtype is set if 'jobtype' in queueConfig.stager: + if queueConfig.stager['jobtype'] == "EventService" : + self.EventServicejob = True + tmpLog.debug('Setting job type to EventService') + # guard against old parameter in queue config if queueConfig.stager['jobtype'] == "Yoda" : - self.Yodajob = True + self.EventServicejob = True + tmpLog.debug('Setting job type to EventService') # set the location of the files in fileSpec.objstoreID # see file /cvmfs/atlas.cern.ch/repo/sw/local/etc/agis_ddmendpoints.json self.objstoreID = int(queueConfig.stager['objStoreID_ES']) - if self.Yodajob : + if self.EventServicejob : self.pathConvention = int(queueConfig.stager['pathConvention']) - tmpLog.debug('Yoda Job - PandaID = {0} objstoreID = {1} pathConvention ={2}'.format(jobspec.PandaID,self.objstoreID,self.pathConvention)) + tmpLog.debug('EventService Job - PandaID = {0} objstoreID = {1} pathConvention ={2}'.format(jobspec.PandaID,self.objstoreID,self.pathConvention)) else: self.pathConvention = None tmpLog.debug('PandaID = {0} objstoreID = {1}'.format(jobspec.PandaID,self.objstoreID)) @@ -172,10 +181,12 @@ def check_stage_out_status(self, jobspec): groups = jobspec.get_groups_of_output_files() tmpLog.debug('jobspec.get_groups_of_output_files() = : {0}'.format(groups)) # lock if the dummy transfer ID is used to avoid submitting duplicated transfer requests - if self.dummy_transfer_id in groups: + for dummy_transferID in groups: + if validate_transferid(dummy_transferID): + continue # lock for 120 sec - tmpLog.debug('attempt to set DB lock for self.id - {0} self.dummy_transfer_id - {1}'.format(self.id,self.dummy_transfer_id)) - have_db_lock = self.dbInterface.get_object_lock(self.dummy_transfer_id, lock_interval=120) + tmpLog.debug('attempt to set DB lock for self.id - {0} dummy_transferID - {1}'.format(self.id,dummy_transferID)) + have_db_lock = self.dbInterface.get_object_lock(dummy_transferID, lock_interval=120) if not have_db_lock: # escape since locked by another thread msgStr = 'escape since locked by another thread' @@ -189,12 +200,12 @@ def check_stage_out_status(self, jobspec): groups = jobspec.get_groups_of_output_files() tmpLog.debug('jobspec.get_groups_of_output_files() = : {0}'.format(groups)) # the dummy transfer ID is still there - if self.dummy_transfer_id in groups: - groupUpdateTime = groups[self.dummy_transfer_id]['groupUpdateTime'] + if dummy_transferID in groups: + groupUpdateTime = groups[dummy_transferID]['groupUpdateTime'] # get files with the dummy transfer ID across jobs - fileSpecs = self.dbInterface.get_files_with_group_id(self.dummy_transfer_id) + fileSpecs = self.dbInterface.get_files_with_group_id(dummy_transferID) # submit transfer if there are more than 10 files or the group was made before more than 10 min - msgStr = 'self.dummy_transfer_id = {0} number of files = 
{1}'.format(self.dummy_transfer_id,len(fileSpecs)) + msgStr = 'dummy_transferID = {0} number of files = {1}'.format(dummy_transferID,len(fileSpecs)) tmpLog.debug(msgStr) if len(fileSpecs) >= 10 or \ groupUpdateTime < datetime.datetime.utcnow() - datetime.timedelta(minutes=10): @@ -222,10 +233,10 @@ def check_stage_out_status(self, jobspec): if not tmpStatdst : errMsg += ' destination Endpoint not activated ' # release process lock - tmpLog.debug('attempt to release DB lock for self.id - {0} self.dummy_transfer_id - {1}'.format(self.id,self.dummy_transfer_id)) - self.have_db_lock = self.dbInterface.release_object_lock(self.dummy_transfer_id) + tmpLog.debug('attempt to release DB lock for self.id - {0} dummy_transferID - {1}'.format(self.id,dummy_transferID)) + self.have_db_lock = self.dbInterface.release_object_lock(dummy_transferID) if not self.have_db_lock: - errMsg += ' - Could not release DB lock for {}'.format(self.dummy_transfer_id) + errMsg += ' - Could not release DB lock for {}'.format(dummy_transferID) tmpLog.error(errMsg) tmpRetVal = (None,errMsg) return tmpRetVal @@ -238,22 +249,27 @@ def check_stage_out_status(self, jobspec): except: errStat, errMsg = globus_utils.handle_globus_exception(tmpLog) # release process lock - tmpLog.debug('attempt to release DB lock for self.id - {0} self.dummy_transfer_id - {1}'.format(self.id,self.dummy_transfer_id)) - release_db_lock = self.dbInterface.release_object_lock(self.dummy_transfer_id) + tmpLog.debug('attempt to release DB lock for self.id - {0} dummy_transferID - {1}'.format(self.id,dummy_transferID)) + release_db_lock = self.dbInterface.release_object_lock(dummy_transferID) if not release_db_lock: - errMsg += ' - Could not release DB lock for {}'.format(self.dummy_transfer_id) + errMsg += ' - Could not release DB lock for {}'.format(dummy_transferID) tmpLog.error(errMsg) tmpRetVal = (errStat, errMsg) return tmpRetVal # loop over all files ifile = 0 for fileSpec in fileSpecs: + # protect against blank lfn's + if not fileSpec.lfn : + msgStr = 'fileSpec.lfn is empty' + tmpLog.debug(msgStr) + continue logfile = False scope ='panda' if fileSpec.scope is not None : scope = fileSpec.scope - # for Yoda job set the scope to transient for non log files - if self.Yodajob : + # for EventService job set the scope to transient for non log files + if self.EventServicejob : scope = 'transient' if fileSpec.fileType == "log" : logfile = True @@ -291,10 +307,10 @@ def check_stage_out_status(self, jobspec): else: errMsg = "source file {} does not exist".format(srcURL) # release process lock - tmpLog.debug('attempt to release DB lock for self.id - {0} self.dummy_transfer_id - {1}'.format(self.id,self.dummy_transfer_id)) - release_db_lock = self.dbInterface.release_object_lock(self.dummy_transfer_id) + tmpLog.debug('attempt to release DB lock for self.id - {0} dummy_transferID - {1}'.format(self.id,dummy_transferID)) + release_db_lock = self.dbInterface.release_object_lock(dummy_transferID) if not release_db_lock: - errMsg += ' - Could not release DB lock for {}'.format(self.dummy_transfer_id) + errMsg += ' - Could not release DB lock for {}'.format(dummy_transferID) tmpLog.error(errMsg) tmpRetVal = (False,errMsg) return tmpRetVal @@ -316,45 +332,45 @@ def check_stage_out_status(self, jobspec): tmpLog.debug(msgStr) else: # release process lock - tmpLog.debug('attempt to release DB lock for self.id - {0} self.dummy_transfer_id - {1}'.format(self.id,self.dummy_transfer_id)) - release_db_lock = self.dbInterface.release_object_lock(self.dummy_transfer_id) + 
tmpLog.debug('attempt to release DB lock for self.id - {0} dummy_transferID - {1}'.format(self.id,dummy_transferID)) + release_db_lock = self.dbInterface.release_object_lock(dummy_transferID) if not release_db_lock: - errMsg = 'Could not release DB lock for {}'.format(self.dummy_transfer_id) + errMsg = 'Could not release DB lock for {}'.format(dummy_transferID) tmpLog.error(errMsg) tmpRetVal = (None, transfer_result['message']) return tmpRetVal except Exception as e: errStat,errMsg = globus_utils.handle_globus_exception(tmpLog) # release process lock - tmpLog.debug('attempt to release DB lock for self.id - {0} self.dummy_transfer_id - {1}'.format(self.id,self.dummy_transfer_id)) - release_db_lock = self.dbInterface.release_object_lock(self.dummy_transfer_id) + tmpLog.debug('attempt to release DB lock for self.id - {0} dummy_transferID - {1}'.format(self.id,dummy_transferID)) + release_db_lock = self.dbInterface.release_object_lock(dummy_transferID) if not release_db_lock: - errMsg += ' - Could not release DB lock for {}'.format(self.dummy_transfer_id) + errMsg += ' - Could not release DB lock for {}'.format(dummy_transferID) tmpLog.error(errMsg) return errStat, errMsg else: msgStr = 'wait until enough files are pooled' tmpLog.debug(msgStr) # release the lock - tmpLog.debug('attempt to release DB lock for self.id - {0} self.dummy_transfer_id - {1}'.format(self.id,self.dummy_transfer_id)) - release_db_lock = self.dbInterface.release_object_lock(self.dummy_transfer_id) + tmpLog.debug('attempt to release DB lock for self.id - {0} dummy_transferID - {1}'.format(self.id,dummy_transferID)) + release_db_lock = self.dbInterface.release_object_lock(dummy_transferID) if release_db_lock: - tmpLog.debug('released DB lock for self.id - {0} self.dummy_transfer_id - {1}'.format(self.id,self.dummy_transfer_id)) + tmpLog.debug('released DB lock for self.id - {0} dummy_transferID - {1}'.format(self.id,dummy_transferID)) have_db_lock = False else: - msgStr += ' - Could not release DB lock for {}'.format(self.dummy_transfer_id) + msgStr += ' - Could not release DB lock for {}'.format(dummy_transferID) tmpLog.error(msgStr) # return None to retry later return None, msgStr # release the db lock if needed if have_db_lock: - tmpLog.debug('attempt to release DB lock for self.id - {0} self.dummy_transfer_id - {1}'.format(self.id,self.dummy_transfer_id)) - release_db_lock = self.dbInterface.release_object_lock(self.dummy_transfer_id) + tmpLog.debug('attempt to release DB lock for self.id - {0} dummy_transferID - {1}'.format(self.id,dummy_transferID)) + release_db_lock = self.dbInterface.release_object_lock(dummy_transferID) if release_db_lock: - tmpLog.debug('released DB lock for self.id - {0} self.dummy_transfer_id - {1}'.format(self.id,self.dummy_transfer_id)) + tmpLog.debug('released DB lock for self.id - {0} dummy_transferID - {1}'.format(self.id,dummy_transferID)) have_db_lock = False else: - msgStr += ' - Could not release DB lock for {}'.format(self.dummy_transfer_id) + msgStr += ' - Could not release DB lock for {}'.format(dummy_transferID) tmpLog.error(msgStr) return None, msgStr # check transfer with real transfer IDs @@ -427,10 +443,20 @@ def trigger_stage_out(self, jobspec): return False, errStr # show the dummy transfer id and set to a value with the PandaID if needed. 
tmpLog.debug('self.dummy_transfer_id = {}'.format(self.dummy_transfer_id)) + if self.dummy_transfer_id == '{0}_{1}'.format(dummy_transfer_id_base,'XXXX') : + old_dummy_transfer_id = self.dummy_transfer_id + self.dummy_transfer_id = '{0}_{1}'.format(dummy_transfer_id_base,jobspec.computingSite) + tmpLog.debug('Change self.dummy_transfer_id from {0} to {1}'.format(old_dummy_transfer_id,self.dummy_transfer_id)) # set the dummy transfer ID which will be replaced with a real ID in check_stage_out_status() lfns = [] for fileSpec in jobspec.get_output_file_specs(skip_done=True): - lfns.append(fileSpec.lfn) + # test if fileSpec.lfn is not empty + if not fileSpec.lfn : + msgStr = 'fileSpec.lfn is empty' + else: + msgStr = 'fileSpec.lfn is {0}'.format(fileSpec.lfn) + lfns.append(fileSpec.lfn) + tmpLog.debug(msgStr) jobspec.set_groups_to_files({self.dummy_transfer_id: {'lfns': lfns,'groupStatus': 'pending'}}) msgStr = 'jobspec.set_groups_to_files - self.dummy_tranfer_id - {0}, lfns - {1}, groupStatus - pending'.format(self.dummy_transfer_id,lfns) tmpLog.debug(msgStr) diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py index d3b9c12a..931fcc04 100644 --- a/pandaharvester/harvestersubmitter/htcondor_submitter.py +++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py @@ -713,6 +713,8 @@ def _handle_one_worker(workspec, to_submit=to_submit_any): ce_flavour_str=ce_flavour_str, sdf_suffix_str=sdf_suffix_str) sdf_template_file = os.path.join(self.CEtemplateDir, sdf_template_filename) else: + if self.templateFile: + sdf_template_file = self.templateFile try: # Manually define site condor schedd as ceHostname and central manager as ceEndpoint if self.ceHostname and isinstance(self.ceHostname, list) and len(self.ceHostname) > 0: diff --git a/pandaharvester/harvestersubmitter/slurm_submitter_jinja.py b/pandaharvester/harvestersubmitter/slurm_submitter_jinja.py new file mode 100644 index 00000000..531d7da4 --- /dev/null +++ b/pandaharvester/harvestersubmitter/slurm_submitter_jinja.py @@ -0,0 +1,136 @@ +import tempfile +import re +import jinja2 + +import six + +try: + import subprocess32 as subprocess +except ImportError: + import subprocess + +from pandaharvester.harvestercore import core_utils +from pandaharvester.harvestercore.plugin_base import PluginBase + +# logger +baseLogger = core_utils.setup_logger('slurm_submitter') + + +# submitter for SLURM batch system +class SlurmSubmitterJinja(PluginBase): + # constructor + def __init__(self, **kwarg): + self.uploadLog = False + self.logBaseURL = None + PluginBase.__init__(self, **kwarg) + + # submit workers + def submit_workers(self, workspec_list): + retList = [] + retStrList = [] + for workSpec in workspec_list: + # make logger + tmpLog = self.make_logger(baseLogger, 'workerID={0}'.format(workSpec.workerID), + method_name='submit_workers') + # set nCore + workSpec.nCore = self.nCore + # make batch script + batchFile = self.make_batch_script_jinja(workSpec) + # command + comStr = "sbatch -D {0} {1}".format(workSpec.get_access_point(), batchFile) + # submit + tmpLog.debug('submit with {0}'.format(batchFile)) + p = subprocess.Popen(comStr.split(), + shell=False, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + # check return code + stdOut, stdErr = p.communicate() + retCode = p.returncode + tmpLog.debug('retCode={0}'.format(retCode)) + stdOut_str = stdOut if (isinstance(stdOut, str) or stdOut is None) else stdOut.decode() + stdErr_str = stdErr if (isinstance(stdErr, str) or 
stdErr is None) else stdErr.decode() + if retCode == 0: + # extract batchID + workSpec.batchID = re.search('[^0-9]*([0-9]+)[^0-9]*', '{0}'.format(stdOut_str)).group(1) + tmpLog.debug('batchID={0}'.format(workSpec.batchID)) + # set log files + if self.uploadLog: + if self.logBaseURL is None: + baseDir = workSpec.get_access_point() + else: + baseDir = self.logBaseURL + stdOut, stdErr = self.get_log_file_names(batchFile, workSpec.batchID) + if stdOut is not None: + workSpec.set_log_file('stdout', '{0}/{1}'.format(baseDir, stdOut)) + if stdErr is not None: + workSpec.set_log_file('stderr', '{0}/{1}'.format(baseDir, stdErr)) + tmpRetVal = (True, '') + else: + # failed + errStr = '{0} {1}'.format(stdOut_str, stdErr_str) + tmpLog.error(errStr) + tmpRetVal = (False, errStr) + retList.append(tmpRetVal) + return retList + + # make batch script + def make_batch_script(self, workspec): + # template for batch script + tmpFile = open(self.templateFile) + self.template = tmpFile.read() + tmpFile.close() + del tmpFile + tmpFile = tempfile.NamedTemporaryFile(delete=False, suffix='_submit.sh', dir=workspec.get_access_point()) + tmpFile.write(six.b(self.template.format(nCorePerNode=self.nCorePerNode, + nNode=workspec.nCore // self.nCorePerNode, + accessPoint=workspec.accessPoint, + workerID=workspec.workerID)) + ) + tmpFile.close() + return tmpFile.name + + + # make batch script + def make_batch_script_jinja(self, workspec): + # template for batch script + tmpFile = open(self.templateFile) + self.template = tmpFile.read() + tmpFile.close() + del tmpFile + tmpFile = tempfile.NamedTemporaryFile(delete=False, suffix='_submit.sh', dir=workspec.get_access_point()) + tm = jinja2.Template(self.template) + tmpFile.write(six.b(tm.render(nCorePerNode=self.nCorePerNode, + nNode=workspec.nCore // self.nCorePerNode, + accessPoint=workspec.accessPoint, + workerID=workspec.workerID, + workspec=workspec))) + + #tmpFile.write(six.b(self.template.format(nCorePerNode=self.nCorePerNode, + # nNode=workspec.nCore // self.nCorePerNode, + # accessPoint=workspec.accessPoint, + # workerID=workspec.workerID)) + # ) + #tmpFile.write(six.b(self.template.format(nCorePerNode=self.nCorePerNode, + # nNode=workspec.nCore // self.nCorePerNode, + # worker=workSpec, + # submitter=self)) + # ) + tmpFile.close() + return tmpFile.name + + + # get log file names + def get_log_file_names(self, batch_script, batch_id): + stdOut = None + stdErr = None + with open(batch_script) as f: + for line in f: + if not line.startswith('#SBATCH'): + continue + items = line.split() + if '-o' in items: + stdOut = items[-1].replace('$SLURM_JOB_ID', batch_id) + elif '-e' in items: + stdErr = items[-1].replace('$SLURM_JOB_ID', batch_id) + return stdOut, stdErr diff --git a/pandaharvester/harvestertest/getEventRangesTest.py b/pandaharvester/harvestertest/getEventRangesTest.py index 85f69576..7a5b742f 100644 --- a/pandaharvester/harvestertest/getEventRangesTest.py +++ b/pandaharvester/harvestertest/getEventRangesTest.py @@ -1,3 +1,4 @@ +import os import sys from pandaharvester.harvestercore.communicator_pool import CommunicatorPool @@ -19,4 +20,4 @@ } a = CommunicatorPool() -o = a.get_event_ranges(data) +o = a.get_event_ranges(data, False, os.getcwd()) diff --git a/pandaharvester/harvestertest/submitterTest.py b/pandaharvester/harvestertest/submitterTest.py index 200cd3f7..cc683e35 100644 --- a/pandaharvester/harvestertest/submitterTest.py +++ b/pandaharvester/harvestertest/submitterTest.py @@ -104,7 +104,7 @@ } workSpec.eventsRequestParams = eventsRequestParams - 
tmpStat, events = com.get_event_ranges(workSpec.eventsRequestParams) + tmpStat, events = com.get_event_ranges(workSpec.eventsRequestParams, False, os.getcwd()) # failed if tmpStat is False: print ('failed to get events with {0}'.format(events)) diff --git a/pandaharvester/panda_pkg_info.py b/pandaharvester/panda_pkg_info.py index ae636d83..6d349a0d 100644 --- a/pandaharvester/panda_pkg_info.py +++ b/pandaharvester/panda_pkg_info.py @@ -1 +1 @@ -release_version = "0.2.3" +release_version = "0.2.5" diff --git a/setup.py b/setup.py index 95aebfc7..8ea8ce19 100644 --- a/setup.py +++ b/setup.py @@ -41,7 +41,8 @@ # optional pip dependencies extras_require={ 'kubernetes': ['kubernetes', 'pyyaml'], - 'mysql': ['mysqlclient'] + 'mysql': ['mysqlclient'], + 'atlasgrid': ['uWSGI >= 2.0.0', 'htcondor >= 8.4.0', 'mysqlclient'], }, data_files=[ diff --git a/templates/panda_harvester.cfg.rpmnew.template b/templates/panda_harvester.cfg.rpmnew.template index 7e49c3a4..bcd62498 100644 --- a/templates/panda_harvester.cfg.rpmnew.template +++ b/templates/panda_harvester.cfg.rpmnew.template @@ -407,12 +407,8 @@ eventBasedEnable = False # { # "module": "pandaharvester.harvestermonitor.htcondor_monitor", # "name": "HTCondorMonitor", -# "submissionHost_list": [ -# "aipanda023.cern.ch,aipanda023.cern.ch:19618", -# "aipanda024.cern.ch,aipanda024.cern.ch:19618", -# "aipanda156.cern.ch,aipanda156.cern.ch:19618", -# "aipanda183.cern.ch,aipanda183.cern.ch:19618", -# "aipanda184.cern.ch,aipanda184.cern.ch:19618" +# "condorHostConfig_list": [ +# "/opt/harvester/etc/panda/condor_host_config.json" # ] # } # ] @@ -474,6 +470,28 @@ voms = atlas:/atlas/Role=production atlas:/atlas/Role=pilot +# plugin configs in json +# pluginConfigs = +# [ +# { +# "module": "pandaharvester.harvestercredmanager.no_voms_cred_manager", +# "name": "NoVomsCredManager", +# "configs": { +# "production": { +# "inCertFile": "/data/atlpan/proxy/atlpilo1RFC.plain", +# "outCertFile": "/data/atlpan/proxy/x509up_u25606_prod", +# "voms": "atlas:/atlas/Role=production" +# }, +# "pilot": { +# "inCertFile": "/data/atlpan/proxy/atlpilo1RFC.plain", +# "outCertFile": "/data/atlpan/proxy/x509up_u25606_pilot", +# "voms": "atlas:/atlas/Role=pilot" +# } +# } +# } +# ] + + # sleep interval in sec sleepTime = 1800 @@ -599,9 +617,9 @@ sleepTime = 60 # It also caches proxy files which are renewed by Credential Manager. Access key for BNL object store # is retrieved from panda. data = - ddmendpoints_objectstores.json||http://atlas-agis-api.cern.ch/request/ddmendpoint/query/list/?json&state=ACTIVE&site_state=ACTIVE&preset=dict&json_pretty=1&type[]=OS_LOGS&type[]=OS_ES - panda_queues.json||http://atlas-agis-api.cern.ch/request/pandaqueue/query/list/?json&preset=schedconf.all&vo_name=atlas - agis_ddmendpoints.json||http://atlas-agis-api.cern.ch/request/ddmendpoint/query/list/?json&state=ACTIVE&site_state=ACTIVE&preset=dict&json_pretty=1 + ddmendpoints_objectstores.json||https://atlas-cric.cern.ch/api/atlas/ddmendpoint/query/?json&state=ACTIVE&site_state=ACTIVE&preset=dict&json_pretty=1&type[]=OS_LOGS&type[]=OS_ES + panda_queues.json||https://atlas-cric.cern.ch/api/atlas/pandaqueue/query/?json + agis_ddmendpoints.json||https://atlas-cric.cern.ch/api/atlas/ddmendpoint/query/list/?json&state=ACTIVE&site_state=ACTIVE&preset=dict&json_pretty=1 proxy_pilot||file://path_to/FIXME_proxy_pilot proxy_production||file://path_to/FIXME_proxy_production resource_types.json||panda_server:get_resource_types
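End to end, the HPO changes above hang together as follows: the submitter flags HPO jobs and extends `eventsRequestParams`, and `get_event_ranges()` now takes a base path as a third argument, as the updated tests show. A sketch of the resulting request shape, with made-up values; `communicator` is assumed to be a `CommunicatorPool` or `PandaCommunicator` instance:

```python
# Illustrative sketch only, not part of this commit.
import os

eventsRequestParams = {
    12345: {                     # keyed by PandaID
        'pandaID': 12345,
        'taskID': 678,
        'jobsetID': 0,           # pinned to 0 for HPO jobs by submitter.py
        'nRanges': 1,
        'isHPO': True,           # routes get_event_ranges() into the HPO branch
        'sourceURL': 'https://aipanda000.cern.ch:25443',  # used for checkpoint download
    }
}
# the third argument is the directory where downloaded checkpoints are
# written; event_feeder.py passes workSpec.get_access_point() here
tmpStat, events = communicator.get_event_ranges(eventsRequestParams, False, os.getcwd())
```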