diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py
index 6780dbd4..fad0b97f 100644
--- a/pandaharvester/commit_timestamp.py
+++ b/pandaharvester/commit_timestamp.py
@@ -1 +1 @@
-timestamp = "14-08-2020 09:04:50 on release (by fahui)"
+timestamp = "15-01-2021 05:19:17 on release (by fahui)"
diff --git a/pandaharvester/harvesterbody/cred_manager.py b/pandaharvester/harvesterbody/cred_manager.py
index 4b1bf187..4be86a38 100644
--- a/pandaharvester/harvesterbody/cred_manager.py
+++ b/pandaharvester/harvesterbody/cred_manager.py
@@ -155,8 +155,10 @@ def run(self):
         while True:
             # update plugin cores from queue config
             self.update_cores_from_queue_config()
+
             # execute
-            self.execute()
+            self.execute()  # this is the main run
+
             # check if being terminated
             if self.terminated(harvester_config.credmanager.sleepTime, randomize=False):
                 return
@@ -199,3 +201,35 @@ def execute(self):
             except Exception:
                 core_utils.dump_error_message(mainLog)
             mainLog.debug('done')
+
+    # monit main
+    def execute_monit(self):
+        self.update_cores_from_queue_config()
+
+        metrics = {}
+        # loop over all plugins
+        for exeCore in itertools.chain(self.exeCores, self.queue_exe_cores):
+            # do nothing
+            if exeCore is None:
+                continue
+
+            # make logger
+            if hasattr(exeCore, 'setup_name'):
+                credmanager_name = exeCore.setup_name
+            else:
+                credmanager_name = '{0} {1}'.format(exeCore.inCertFile, exeCore.outCertFile)
+
+            subLog = self.make_logger(_logger, '{0} {1}'.format(exeCore.__class__.__name__, credmanager_name),
+                                      method_name='execute_monit')
+            try:
+                # check credential
+                subLog.debug('check credential lifetime')
+                lifetime = exeCore.check_credential_lifetime()
+                if lifetime is not None:
+                    metrics[exeCore.outCertFile] = lifetime
+            except Exception:
+                core_utils.dump_error_message(subLog)
+
+            subLog.debug('done')
+
+        return metrics
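Editor's note on the new `execute_monit` entry point above: it aggregates per-plugin proxy lifetimes into a plain dict keyed by the output certificate path. A minimal usage sketch (the printout is illustrative, not part of the patch; the constructor call mirrors the one added to service_monitor.py below):

```python
# minimal sketch of a caller consuming execute_monit
from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
from pandaharvester.harvesterbody.cred_manager import CredManager

queue_config_mapper = QueueConfigMapper()
cred_manager = CredManager(queue_config_mapper, single_mode=True)

# {outCertFile: hours of lifetime left, ...}; plugins whose
# check_credential_lifetime() returns None are simply skipped
metrics = cred_manager.execute_monit()
for cert_file, hours_left in metrics.items():
    print('{0}: {1:.1f} h left'.format(cert_file, hours_left))
```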
diff --git a/pandaharvester/harvesterbody/monitor.py b/pandaharvester/harvesterbody/monitor.py
index fb41b22b..d4750199 100644
--- a/pandaharvester/harvesterbody/monitor.py
+++ b/pandaharvester/harvesterbody/monitor.py
@@ -403,10 +403,10 @@ def monitor_agent_core(self, lockedBy, queueName, workSpecsList, from_fifo=False
                     isCheckedList.append(isChecked)
                 if monStatus == WorkSpec.ST_failed:
                     if not workSpec.has_pilot_error() and workSpec.errorCode is None:
-                        workSpec.set_pilot_error(PilotErrors.ERR_GENERALERROR, diagMessage)
+                        workSpec.set_pilot_error(PilotErrors.GENERALERROR, diagMessage)
                 elif monStatus == WorkSpec.ST_cancelled:
                     if not workSpec.has_pilot_error() and workSpec.errorCode is None:
-                        workSpec.set_pilot_error(PilotErrors.ERR_PANDAKILL, diagMessage)
+                        workSpec.set_pilot_error(PilotErrors.PANDAKILL, diagMessage)
                 if monStatus in [WorkSpec.ST_finished, WorkSpec.ST_failed, WorkSpec.ST_cancelled]:
                     workSpec.set_work_params({'finalMonStatus': monStatus})
             # request events
@@ -658,7 +658,7 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log,
                         self.dbProxy.kill_worker(workSpec.workerID)
                         newStatus = WorkSpec.ST_cancelled
                         diagMessage = 'Killed by Harvester due to consecutive worker check failures. ' + diagMessage
-                        workSpec.set_pilot_error(PilotErrors.ERR_FAILEDBYSERVER, diagMessage)
+                        workSpec.set_pilot_error(PilotErrors.FAILEDBYSERVER, diagMessage)
                 else:
                     # use original status
                     newStatus = workSpec.status
@@ -673,7 +673,7 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log,
                                 workerID, workerQueueTimeLimit))
                         self.dbProxy.kill_worker(workSpec.workerID)
                         diagMessage = 'Killed by Harvester due to worker queuing too long. ' + diagMessage
-                        workSpec.set_pilot_error(PilotErrors.ERR_FAILEDBYSERVER, diagMessage)
+                        workSpec.set_pilot_error(PilotErrors.FAILEDBYSERVER, diagMessage)
                         # set closed
                         workSpec.set_pilot_closed()
                 # expired heartbeat - only when requested in the configuration
@@ -693,7 +693,7 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log,
                                 workerID))
                         self.dbProxy.kill_worker(workSpec.workerID)
                         diagMessage = 'Killed by Harvester due to worker heartbeat expired. ' + diagMessage
-                        workSpec.set_pilot_error(PilotErrors.ERR_FAILEDBYSERVER, diagMessage)
+                        workSpec.set_pilot_error(PilotErrors.FAILEDBYSERVER, diagMessage)
                 # get work attributes
                 workAttributes = messenger.get_work_attributes(workSpec)
                 retMap[workerID]['workAttributes'] = workAttributes
diff --git a/pandaharvester/harvesterbody/preparator.py b/pandaharvester/harvesterbody/preparator.py
index b49c27b5..58386fa4 100644
--- a/pandaharvester/harvesterbody/preparator.py
+++ b/pandaharvester/harvesterbody/preparator.py
@@ -98,6 +98,8 @@ def run(self):
                                                            'subStatus': oldSubStatus})
                     tmpLog.error('failed to resolve input file paths : {0}'.format(tmpStr))
                     continue
+                # manipulate container-related job params
+                jobSpec.manipulate_job_params_for_container()
                 # update job
                 jobSpec.lockedBy = None
                 jobSpec.set_all_input_ready()
@@ -131,7 +133,7 @@ def run(self):
                     jobSpec.preparatorTime = None
                     jobSpec.stateChangeTime = datetime.datetime.utcnow()
                     errStr = 'stage-in failed with {0}'.format(tmpStr)
-                    jobSpec.set_pilot_error(PilotErrors.ERR_STAGEINFAILED, errStr)
+                    jobSpec.set_pilot_error(PilotErrors.STAGEINFAILED, errStr)
                     jobSpec.trigger_propagation()
                     self.dbProxy.update_job(jobSpec, {'lockedBy': lockedBy,
                                                       'subStatus': oldSubStatus})
@@ -294,7 +296,7 @@ def run(self):
                     jobSpec.preparatorTime = None
                     jobSpec.stateChangeTime = datetime.datetime.utcnow()
                     errStr = 'stage-in failed with {0}'.format(tmpStr)
-                    jobSpec.set_pilot_error(PilotErrors.ERR_STAGEINFAILED, errStr)
+                    jobSpec.set_pilot_error(PilotErrors.STAGEINFAILED, errStr)
                     jobSpec.trigger_propagation()
                     self.dbProxy.update_job(jobSpec, {'lockedBy': lockedBy,
                                                       'subStatus': oldSubStatus})
diff --git a/pandaharvester/harvesterbody/propagator.py b/pandaharvester/harvesterbody/propagator.py
index 2396591d..df118a52 100644
--- a/pandaharvester/harvesterbody/propagator.py
+++ b/pandaharvester/harvesterbody/propagator.py
@@ -112,8 +112,8 @@ def run(self):
                         # no workers
                         tmpJobSpec.status = 'cancelled'
                         tmpJobSpec.subStatus = 'killed'
-                        tmpJobSpec.set_pilot_error(PilotErrors.ERR_PANDAKILL,
-                                                   PilotErrors.pilotError[PilotErrors.ERR_PANDAKILL])
+                        tmpJobSpec.set_pilot_error(PilotErrors.PANDAKILL,
+                                                   PilotErrors.pilot_error_msg[PilotErrors.PANDAKILL])
                         tmpJobSpec.stateChangeTime = datetime.datetime.utcnow()
                         tmpJobSpec.trigger_propagation()
                         self.dbProxy.update_job(tmpJobSpec, {'propagatorLock': self.get_pid()},
diff --git a/pandaharvester/harvesterbody/service_monitor.py b/pandaharvester/harvesterbody/service_monitor.py
index 658607a5..214f299c 100644
--- a/pandaharvester/harvesterbody/service_monitor.py
+++ b/pandaharvester/harvesterbody/service_monitor.py
@@ -12,6 +12,8 @@
 from pandaharvester.harvestercore import core_utils
 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
 from pandaharvester.harvestercore.service_metrics_spec import ServiceMetricSpec
+from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
+from pandaharvester.harvesterbody.cred_manager import CredManager
 
 # logger
 _logger = core_utils.setup_logger('service_monitor')
@@ -37,6 +39,8 @@ def __init__(self, pid_file, single_mode=False):
 
         self.children = self.master_process.children(recursive=True)
         self.cpu_count = multiprocessing.cpu_count()
+        self.queue_config_mapper = QueueConfigMapper()
+        self.cred_manager = CredManager(self.queue_config_mapper, single_mode=True)
 
     def get_master_pid(self):
         """
@@ -122,6 +126,14 @@ def volume_use(self, volume_name):
 
         return used_amount_float
 
+    def cert_validities(self):
+        try:
+            cert_validities = self.cred_manager.execute_monit()
+            return cert_validities
+        except Exception:
+            _logger.error('Could not extract certificate validities')
+            return {}
+
     # main loop
     def run(self):
         while True:
@@ -146,6 +158,11 @@ def run(self):
                 _logger.debug('Disk usage of {0}: {1} %'.format(volume, volume_use))
                 service_metrics['volume_{0}_pc'.format(volume)] = volume_use
 
+            # get certificate validities. Not all plugins have implemented it
+            _logger.debug('Getting cert validities')
+            service_metrics['cert_lifetime'] = self.cert_validities()
+            _logger.debug('Got cert validities: {0}'.format(service_metrics['cert_lifetime']))
+
             service_metrics_spec = ServiceMetricSpec(service_metrics)
             self.db_proxy.insert_service_metrics(service_metrics_spec)
diff --git a/pandaharvester/harvesterbody/stager.py b/pandaharvester/harvesterbody/stager.py
index bc3b248a..c333c480 100644
--- a/pandaharvester/harvesterbody/stager.py
+++ b/pandaharvester/harvesterbody/stager.py
@@ -80,7 +80,7 @@ def run(self):
                             if fileSpec.status != 'finished':
                                 fileSpec.status = 'failed'
                         errStr = 'stage-out failed with {0}'.format(tmpStr)
-                        jobSpec.set_pilot_error(PilotErrors.ERR_STAGEOUTFAILED, errStr)
+                        jobSpec.set_pilot_error(PilotErrors.STAGEOUTFAILED, errStr)
                         jobSpec.trigger_propagation()
                     newSubStatus = self.dbProxy.update_job_for_stage_out(jobSpec, True, lockedBy)
                     tmpLog.debug('updated new subStatus={0}'.format(newSubStatus))
@@ -147,7 +147,7 @@ def run(self):
                             if fileSpec.status != 'finished':
                                 fileSpec.status = 'failed'
                         errStr = 'stage-out failed with {0}'.format(tmpStr)
-                        jobSpec.set_pilot_error(PilotErrors.ERR_STAGEOUTFAILED, errStr)
+                        jobSpec.set_pilot_error(PilotErrors.STAGEOUTFAILED, errStr)
                         jobSpec.trigger_propagation()
                     newSubStatus = self.dbProxy.update_job_for_stage_out(jobSpec, True, lockedBy)
                     tmpLog.debug('updated new subStatus={0}'.format(newSubStatus))
@@ -237,7 +237,7 @@ def run(self):
                             if fileSpec.status == 'zipping':
                                 fileSpec.status = 'failed'
                         errStr = 'zip-output failed with {0}'.format(tmpStr)
-                        jobSpec.set_pilot_error(PilotErrors.ERR_STAGEOUTFAILED, errStr)
+                        jobSpec.set_pilot_error(PilotErrors.STAGEOUTFAILED, errStr)
                         jobSpec.trigger_propagation()
                     newSubStatus = self.dbProxy.update_job_for_stage_out(jobSpec, True, lockedBy)
                     tmpLog.debug('updated new subStatus={0}'.format(newSubStatus))
@@ -304,7 +304,7 @@ def run(self):
                             if fileSpec.status == 'post_zipping':
                                 fileSpec.status = 'failed'
                         errStr = 'post-zipping failed with {0}'.format(tmpStr)
-                        jobSpec.set_pilot_error(PilotErrors.ERR_STAGEOUTFAILED, errStr)
+                        jobSpec.set_pilot_error(PilotErrors.STAGEOUTFAILED, errStr)
                         jobSpec.trigger_propagation()
                     newSubStatus = self.dbProxy.update_job_for_stage_out(jobSpec, True, lockedBy)
                     tmpLog.debug('updated new subStatus={0}'.format(newSubStatus))
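For reference, the `service_metrics` payload now carries a nested dict under `cert_lifetime`. Roughly (the non-cert keys and the path below are made up for illustration):

```python
# illustrative shape of one service_metrics entry after this patch
service_metrics = {
    # ... other metrics collected earlier in run(), e.g. disk usage per volume
    'volume_data_pc': 71,
    # new: proxy lifetimes in hours, keyed by output certificate path
    'cert_lifetime': {'/cephfs/atlpan/harvester/proxy/x509up_prod': 83.2},
}
```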
job_spec.set_pilot_error(PilotErrors.ERR_SETUPFAILURE, errStr) + job_spec.set_pilot_error(PilotErrors.SETUPFAILURE, errStr) job_spec.trigger_propagation() self.dbProxy.update_job(job_spec, {'locked_by': locked_by, 'subStatus': 'prepared'}) @@ -318,7 +318,7 @@ def run(self): tmp_log.error(errStr) work_spec.set_status(WorkSpec.ST_missed) work_spec.set_dialog_message(tmpStr) - work_spec.set_pilot_error(PilotErrors.ERR_SETUPFAILURE, errStr) + work_spec.set_pilot_error(PilotErrors.SETUPFAILURE, errStr) work_spec.set_pilot_closed() if jobList is not None: # increment attempt number @@ -363,7 +363,8 @@ def run(self): 'taskID': job_spec.taskID, 'jobsetID': job_spec.jobParams['jobsetID'], 'nRanges': max(int(math.ceil(work_spec.nCore / len(jobList))), - job_spec.jobParams['coreCount']), + job_spec.jobParams['coreCount']) * \ + queue_config.initEventsMultipler, } if 'isHPO' in job_spec.jobParams: if 'sourceURL' in job_spec.jobParams: @@ -422,7 +423,9 @@ def run(self): + sw_main.get_elapsed_time()) main_log.debug('done') # define sleep interval - if site_name is None: + if site_name is None or \ + (hasattr(harvester_config.submitter, 'respectSleepTime') and + harvester_config.submitter.respectSleepTime): sleepTime = harvester_config.submitter.sleepTime else: sleepTime = 0 diff --git a/pandaharvester/harvesterbody/sweeper.py b/pandaharvester/harvesterbody/sweeper.py index fa442441..a96e440d 100644 --- a/pandaharvester/harvesterbody/sweeper.py +++ b/pandaharvester/harvesterbody/sweeper.py @@ -1,3 +1,9 @@ +import os +try: + from os import walk +except ImportError: + from scandir import walk + from future.utils import iteritems from pandaharvester.harvesterconfig import harvester_config from pandaharvester.harvestercore import core_utils @@ -165,6 +171,60 @@ def run(self): # delete orphaned job info self.dbProxy.delete_orphaned_job_info() mainLog.debug('done deletion of old jobs' + sw_delete.get_elapsed_time()) + # disk cleanup + if hasattr(harvester_config.sweeper, 'diskCleanUpInterval') and \ + hasattr(harvester_config.sweeper, 'diskHighWatermark'): + locked = self.dbProxy.get_process_lock('sweeper', self.get_pid(), + harvester_config.sweeper.diskCleanUpInterval*60*60) + if locked: + try: + all_active_files = None + for item in harvester_config.sweeper.diskHighWatermark.split(','): + # dir name and watermark in GB + dir_name, watermark = item.split('|') + mainLog.debug('checking {0} for cleanup with watermark {1} GB'.format(dir_name, watermark)) + watermark = int(watermark) * 10**9 + total_size = 0 + file_dict = {} + # scan dir + for root, dirs, filenames in walk(dir_name): + for base_name in filenames: + full_name = os.path.join(root, base_name) + f_size = os.path.getsize(full_name) + total_size += f_size + mtime = os.path.getmtime(full_name) + file_dict.setdefault(mtime, set()) + file_dict[mtime].add((base_name, full_name, f_size)) + # delete if necessary + if total_size < watermark: + mainLog.debug( + 'skip cleanup {0} due to total_size {1} GB < watermark {2} GB'.format( + dir_name, total_size//(10**9), watermark//(10**9))) + else: + mainLog.debug( + 'cleanup {0} due to total_size {1} GB >= watermark {2} GB'.format( + dir_name, total_size//(10**9), watermark//(10**9))) + # get active input files + if all_active_files is None: + all_active_files = self.dbProxy.get_all_active_input_files() + deleted_size = 0 + mtimes = sorted(file_dict.keys()) + for mtime in mtimes: + for base_name, full_name, f_size in file_dict[mtime]: + # keep if active + if base_name in all_active_files: + continue + try: + 
diff --git a/pandaharvester/harvesterbody/sweeper.py b/pandaharvester/harvesterbody/sweeper.py
index fa442441..a96e440d 100644
--- a/pandaharvester/harvesterbody/sweeper.py
+++ b/pandaharvester/harvesterbody/sweeper.py
@@ -1,3 +1,9 @@
+import os
+try:
+    from os import walk
+except ImportError:
+    from scandir import walk
+
 from future.utils import iteritems
 from pandaharvester.harvesterconfig import harvester_config
 from pandaharvester.harvestercore import core_utils
@@ -165,6 +171,60 @@ def run(self):
             # delete orphaned job info
             self.dbProxy.delete_orphaned_job_info()
             mainLog.debug('done deletion of old jobs' + sw_delete.get_elapsed_time())
+            # disk cleanup
+            if hasattr(harvester_config.sweeper, 'diskCleanUpInterval') and \
+                    hasattr(harvester_config.sweeper, 'diskHighWatermark'):
+                locked = self.dbProxy.get_process_lock('sweeper', self.get_pid(),
+                                                       harvester_config.sweeper.diskCleanUpInterval*60*60)
+                if locked:
+                    try:
+                        all_active_files = None
+                        for item in harvester_config.sweeper.diskHighWatermark.split(','):
+                            # dir name and watermark in GB
+                            dir_name, watermark = item.split('|')
+                            mainLog.debug('checking {0} for cleanup with watermark {1} GB'.format(dir_name, watermark))
+                            watermark = int(watermark) * 10**9
+                            total_size = 0
+                            file_dict = {}
+                            # scan dir
+                            for root, dirs, filenames in walk(dir_name):
+                                for base_name in filenames:
+                                    full_name = os.path.join(root, base_name)
+                                    f_size = os.path.getsize(full_name)
+                                    total_size += f_size
+                                    mtime = os.path.getmtime(full_name)
+                                    file_dict.setdefault(mtime, set())
+                                    file_dict[mtime].add((base_name, full_name, f_size))
+                            # delete if necessary
+                            if total_size < watermark:
+                                mainLog.debug(
+                                    'skip cleanup {0} due to total_size {1} GB < watermark {2} GB'.format(
+                                        dir_name, total_size//(10**9), watermark//(10**9)))
+                            else:
+                                mainLog.debug(
+                                    'cleanup {0} due to total_size {1} GB >= watermark {2} GB'.format(
+                                        dir_name, total_size//(10**9), watermark//(10**9)))
+                                # get active input files
+                                if all_active_files is None:
+                                    all_active_files = self.dbProxy.get_all_active_input_files()
+                                deleted_size = 0
+                                mtimes = sorted(file_dict.keys())
+                                for mtime in mtimes:
+                                    for base_name, full_name, f_size in file_dict[mtime]:
+                                        # keep if active
+                                        if base_name in all_active_files:
+                                            continue
+                                        try:
+                                            os.remove(full_name)
+                                        except Exception:
+                                            core_utils.dump_error_message(mainLog)
+                                        deleted_size += f_size
+                                        if total_size - deleted_size < watermark:
+                                            break
+                                    if total_size - deleted_size < watermark:
+                                        break
+                    except Exception:
+                        core_utils.dump_error_message(mainLog)
         # time the cycle
         mainLog.debug('done a sweeper cycle' + sw_main.get_elapsed_time())
         # check if being terminated
diff --git a/pandaharvester/harvesterbody/worker_adjuster.py b/pandaharvester/harvesterbody/worker_adjuster.py
index bc27a116..1cf9dcef 100644
--- a/pandaharvester/harvesterbody/worker_adjuster.py
+++ b/pandaharvester/harvesterbody/worker_adjuster.py
@@ -215,11 +215,11 @@ def define_num_workers(self, static_num_workers, site_name):
                     simple_rt_nw_list = []
                     for job_type in _d:  # jt: job type
                         for resource_type in _d[job_type]:  # rt: resource type
-                            simple_rt_nw_list.append([resource_type, _d[job_type][resource_type].get('nNewWorkers', 0), 0])
+                            simple_rt_nw_list.append([(resource_type, job_type), _d[job_type][resource_type].get('nNewWorkers', 0), 0])
 
                     _countdown = n_new_workers_max_agg
                     for _rt_list in simple_rt_nw_list:
-                        resource_type, n_new_workers_orig, _r = _rt_list
+                        (resource_type, job_type), n_new_workers_orig, _r = _rt_list
                         n_new_workers, remainder = divmod(n_new_workers_orig * n_new_workers_max_agg,
                                                           total_new_workers_rts)
                         dyn_num_workers[queue_name][job_type].setdefault(resource_type,
@@ -230,7 +230,7 @@ def define_num_workers(self, static_num_workers, site_name):
                         _countdown -= n_new_workers
                     _s_list = sorted(simple_rt_nw_list, key=(lambda x: x[1]))
                     sorted_rt_nw_list = sorted(_s_list, key=(lambda x: x[2]), reverse=True)
-                    for resource_type, n_new_workers_orig, remainder in sorted_rt_nw_list:
+                    for (resource_type, job_type), n_new_workers_orig, remainder in sorted_rt_nw_list:
                         if _countdown <= 0:
                             break
                         dyn_num_workers[queue_name][job_type][resource_type]['nNewWorkers'] += 1
diff --git a/pandaharvester/harvestercommunicator/panda_communicator.py b/pandaharvester/harvestercommunicator/panda_communicator.py
index bfb05aa0..b61d5bb5 100644
--- a/pandaharvester/harvestercommunicator/panda_communicator.py
+++ b/pandaharvester/harvestercommunicator/panda_communicator.py
@@ -289,7 +289,7 @@ def update_jobs(self, jobspec_list, id):
                 try:
                     retMap = json.loads(retMap['content'])
                 except Exception:
-                    errStr = 'falied to load json'
+                    errStr = 'failed to json_load {}'.format(str(retMap))
                     retMap = {}
                     retMap['StatusCode'] = 999
                     retMap['ErrorDiag'] = errStr
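The sweeper's disk-cleanup hunk above boils down to an oldest-first deletion until the directory is back under the watermark. A standalone sketch of the same policy (function name and arguments are illustrative, not from the patch):

```python
import os

def cleanup_dir(dir_name, watermark_gb, active_basenames):
    """Delete oldest files first until dir_name drops below the watermark."""
    watermark = watermark_gb * 10**9
    total = 0
    entries = []
    for root, _dirs, files in os.walk(dir_name):
        for name in files:
            path = os.path.join(root, name)
            size = os.path.getsize(path)
            total += size
            entries.append((os.path.getmtime(path), name, path, size))
    if total < watermark:
        return
    deleted = 0
    for _mtime, name, path, size in sorted(entries):
        if name in active_basenames:  # never delete files still referenced in the DB
            continue
        os.remove(path)
        deleted += size
        if total - deleted < watermark:
            break
```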
diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py
index c11ad9c8..8f788a5a 100644
--- a/pandaharvester/harvestercore/db_proxy.py
+++ b/pandaharvester/harvestercore/db_proxy.py
@@ -840,11 +840,11 @@ def update_worker(self, workspec, criteria=None):
             return None
 
     # fill panda queue table
-    def fill_panda_queue_table(self, panda_queue_list, queue_config_mapper):
+    def fill_panda_queue_table(self, panda_queue_list, queue_config_mapper, refill_table=False):
         try:
             # get logger
             tmpLog = core_utils.make_logger(_logger, method_name='fill_panda_queue_table')
-            tmpLog.debug('start')
+            tmpLog.debug('start, refill={0}'.format(refill_table))
             # get existing queues
             sqlE = "SELECT queueName FROM {0} ".format(pandaQueueTableName)
             varMap = dict()
@@ -867,11 +867,20 @@ def fill_panda_queue_table(self, panda_queue_list, queue_config_mapper):
                 # check if already exist
                 sqlC = "SELECT * FROM {0} ".format(pandaQueueTableName)
                 sqlC += "WHERE queueName=:queueName "
+                sqlC += " AND resourceType=:resourceType AND jobType=:jobType "
                 varMap = dict()
                 varMap[':queueName'] = queueName
+                varMap[':resourceType'] = PandaQueueSpec.RT_catchall
+                varMap[':jobType'] = PandaQueueSpec.JT_catchall
                 self.execute(sqlC, varMap)
                 resC = self.cur.fetchone()
-                if resC is not None:
+                if refill_table:
+                    sqlD = "DELETE FROM {0} ".format(pandaQueueTableName)
+                    sqlD += "WHERE queueName=:queueName "
+                    varMap = dict()
+                    varMap[':queueName'] = queueName
+                    self.execute(sqlD, varMap)
+                if resC is not None and not refill_table:
                     # update limits just in case
                     varMap = dict()
                     sqlU = "UPDATE {0} SET ".format(pandaQueueTableName)
@@ -5544,3 +5553,33 @@ def kill_workers_by_query(self, params):
             core_utils.dump_error_message(_logger)
             # return
             return None
+
+    # get all active input files
+    def get_all_active_input_files(self):
+        try:
+            # get logger
+            tmpLog = core_utils.make_logger(_logger,
+                                            method_name='get_all_active_input_files')
+            tmpLog.debug('start')
+            # sql to get files
+            sqlF = "SELECT lfn FROM {0} ".format(fileTableName)
+            sqlF += "WHERE fileType IN (:type1,:type2) "
+            # get files
+            varMap = dict()
+            varMap[':type1'] = 'input'
+            varMap[':type2'] = FileSpec.AUX_INPUT
+            self.execute(sqlF, varMap)
+            ret = set()
+            for lfn, in self.cur.fetchall():
+                ret.add(lfn)
+            # commit
+            self.commit()
+            tmpLog.debug('got {0} files'.format(len(ret)))
+            return ret
+        except Exception:
+            # roll back
+            self.rollback()
+            # dump error
+            core_utils.dump_error_message(_logger)
+            # return
+            return set()
diff --git a/pandaharvester/harvestercore/fifos.py b/pandaharvester/harvestercore/fifos.py
index a6557e3a..d90da70a 100644
--- a/pandaharvester/harvestercore/fifos.py
+++ b/pandaharvester/harvestercore/fifos.py
@@ -378,10 +378,10 @@ def to_check_workers(self, check_interval=harvester_config.monitor.checkInterval
                     overhead_time = None
                 else:
                     mainLog.debug('True')
-                    mainLog.info('Overhead time is {0} sec'.format(overhead_time))
+                    mainLog.info('Overhead time is {0:.3f} sec'.format(overhead_time))
             else:
                 mainLog.debug('False. Workers too young to check')
-                mainLog.debug('Overhead time is {0} sec'.format(overhead_time))
+                mainLog.debug('Overhead time is {0:.3f} sec'.format(overhead_time))
         else:
             mainLog.debug('False. Got nothing in FIFO')
         return retVal, overhead_time
diff --git a/pandaharvester/harvestercore/job_spec.py b/pandaharvester/harvestercore/job_spec.py
index 05848a10..c00ef346 100644
--- a/pandaharvester/harvestercore/job_spec.py
+++ b/pandaharvester/harvestercore/job_spec.py
@@ -550,3 +550,24 @@ def get_pilot_type(self):
                 return 'PR'
         else:
             return None
+
+    # manipulate job parameters related to container
+    def manipulate_job_params_for_container(self):
+        updated = False
+        for fileSpec in self.inFiles:
+            for k, v in iteritems(self.jobParams):
+                # only container image
+                if k == 'container_name':
+                    if v == fileSpec.url:
+                        self.jobParams[k] = fileSpec.path
+                        updated = True
+                elif k == 'containerOptions':
+                    for kk, vv in iteritems(v):
+                        if kk == "containerImage":
+                            if vv == fileSpec.url:
+                                self.jobParams[k][kk] = fileSpec.path
+                                updated = True
+                            continue
+        # trigger updating
+        if updated:
+            self.force_update('jobParams')
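Before/after view of `manipulate_job_params_for_container` above (URL and path invented for illustration): when an input FileSpec's `url` matches the container image reference in the job parameters, the reference is rewritten to the locally staged `path`:

```python
# before: jobParams still point at the remote image
jobParams = {'container_name': 'docker://registry.example.org/atlas/athena:21.0.15'}
# matching input file, already staged in by the preparator:
#   fileSpec.url  = 'docker://registry.example.org/atlas/athena:21.0.15'
#   fileSpec.path = '/data/harvester/images/athena_21.0.15.sif'

# after jobSpec.manipulate_job_params_for_container():
jobParams = {'container_name': '/data/harvester/images/athena_21.0.15.sif'}
```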
diff --git a/pandaharvester/harvestercore/pilot_errors.py b/pandaharvester/harvestercore/pilot_errors.py
index fc13ff80..5bf3ef91 100644
--- a/pandaharvester/harvestercore/pilot_errors.py
+++ b/pandaharvester/harvestercore/pilot_errors.py
@@ -1,296 +1,14 @@
-# Error codes : Taken from pilot1. To be removed once pilot2 API is ready
+from pilot.common.errorcodes import ErrorCodes as PilotErrorCodesObj
+from pilot.util import auxiliary as PilotAux
 
-class PilotErrors(object):
-    """ Pilot error handling """
-
-    # error codes
-    ERR_UNKNOWNERROR = 0
-    ERR_GENERALERROR = 1008
-    ERR_DIRECTIOFILE = 1009  # harmless, just means that copy-to-scratch was skipped in favor or direct i/o access
-    ERR_GETDATAEXC = 1097
-    ERR_NOLOCALSPACE = 1098
-    ERR_STAGEINFAILED = 1099
-    ERR_REPNOTFOUND = 1100
-    ERR_LRCREGCONNREF = 1101
-    ERR_NOSUCHFILE = 1103
-    ERR_USERDIRTOOLARGE = 1104
-    ERR_LFCADDCSUMFAILED = 1105
-    ERR_STDOUTTOOBIG = 1106
-    ERR_MISSDBREL = 1107
-    ERR_FAILEDLCGREG = 1108
-    ERR_CMTCONFIG = 1109
-    ERR_SETUPFAILURE = 1110
-    ERR_RUNJOBEXC = 1111
-    ERR_PILOTEXC = 1112
-    ERR_GETLFCIMPORT = 1113
-    ERR_PUTLFCIMPORT = 1114
-    ERR_NFSSQLITE = 1115
-    ERR_QUEUEDATA = 1116
-    ERR_QUEUEDATANOTOK = 1117
-    ERR_CURLSPACE = 1118
-    ERR_DDMSPACE = 1119
-    ERR_NOSTMATCHDEST = 1120  # not used
-    ERR_NOLFCSFN = 1122
-    ERR_MISSINGGUID = 1123
-    ERR_OUTPUTFILETOOLARGE = 1124
-    ERR_NOPFC = 1130
-    ERR_PUTFUNCNOCALL = 1131
-    ERR_LRCREG = 1132
-    ERR_NOSTORAGE = 1133
-    ERR_MKDIR = 1134
-    ERR_FAILEDSIZELOCAL = 1135
-    ERR_FAILEDMD5LOCAL = 1136
-    ERR_STAGEOUTFAILED = 1137
-    ERR_FAILEDSIZE = 1138
-    ERR_PUTWRONGSIZE = 1139
-    ERR_FAILEDMD5 = 1140
-    ERR_PUTMD5MISMATCH = 1141
-    ERR_CHMODTRF = 1143
-    ERR_PANDAKILL = 1144
-    ERR_GETMD5MISMATCH = 1145
-    ERR_DYNTRFINST = 1146
-    ERR_FAILEDRM = 1148
-    ERR_TRFDOWNLOAD = 1149
-    ERR_LOOPINGJOB = 1150
-    ERR_GETTIMEOUT = 1151
-    ERR_PUTTIMEOUT = 1152
-    ERR_LOSTJOBNOTFINISHED = 1153
-    ERR_LOSTJOBLOGREG = 1154
-    ERR_LOSTJOBFILETRANSFER = 1155
-    ERR_LOSTJOBRECOVERY = 1156
-    ERR_LOSTJOBMAXEDOUT = 1158
-    ERR_LOSTJOBPFC = 1159
-    ERR_LRCREGSTRSIZE = 1160
-    ERR_LOSTJOBXML = 1161
-    ERR_LRCREGDUP = 1162
-    ERR_NOPROXY = 1163
-    ERR_MISSINGLOCALFILE = 1164
-    ERR_MISSINGOUTPUTFILE = 1165
-    ERR_SIGPIPE = 1166
-    ERR_MISSFILEXML = 1167
-    ERR_SIZETOOLARGE = 1168
-    ERR_FAILEDLFCREG = 1169
-    ERR_FAILEDADLOCAL = 1170
-    ERR_GETADMISMATCH = 1171
-    ERR_PUTADMISMATCH = 1172
-    ERR_PANDAMOVERFILENOTCACHED = 1173
-    ERR_PANDAMOVERTRANSFER = 1174
-    ERR_GETWRONGSIZE = 1175
-    ERR_NOCHILDPROCESSES = 1176
-    ERR_NOVOMSPROXY = 1177
-    ERR_NOSTAGEDFILES = 1178
-    ERR_FAILEDLFCGETREPS = 1179
-    ERR_GETGLOBUSSYSERR = 1180
-    ERR_PUTGLOBUSSYSERR = 1181
-    ERR_FAILEDLFCGETREP = 1182
-    ERR_GUIDSEXISTSINLRC = 1183
-    ERR_MISSINGPFC = 1184
-    ERR_NOSOFTWAREDIR = 1186
-    ERR_NOPAYLOADMETADATA = 1187
-    ERR_LCGGETTURLS = 1188
-    ERR_LCGGETTURLSTIMEOUT = 1189
-    ERR_LFNTOOLONG = 1190
-    ERR_ZEROFILESIZE = 1191
-    ERR_DBRELNOTYETTRANSFERRED = 1192
-    ERR_SEPROBLEM = 1193
-    ERR_NOFILEVERIFICATION = 1194
-    ERR_COMMANDTIMEOUT = 1195
-    ERR_GETFAILEDTOMOUNTNFS4 = 1196
-    ERR_GETPNFSSYSTEMERROR = 1197
-    ERR_MKDIRWORKDIR = 1199
-    ERR_KILLSIGNAL = 1200
-    ERR_SIGTERM = 1201
-    ERR_SIGQUIT = 1202
-    ERR_SIGSEGV = 1203
-    ERR_SIGXCPU = 1204
-#    ERR_USERKILL = 1205  # not used by pilot
-    ERR_SIGBUS = 1206
-    ERR_SIGUSR1 = 1207
-    ERR_NOPAYLOADOUTPUT = 1210
-    ERR_MISSINGINSTALLATION = 1211
-    ERR_PAYLOADOUTOFMEMORY = 1212
-    ERR_REACHEDMAXTIME = 1213
-    ERR_DAFSNOTALLOWED = 1214
-    ERR_NOTCPCONNECTION = 1215
-    ERR_NOPILOTTCPSERVER = 1216
-    ERR_CORECOUNTMISMATCH = 1217
-    ERR_RUNEVENTEXC = 1218
-    ERR_UUIDGEN = 1219
-    ERR_UNKNOWN = 1220
-    ERR_FILEEXIST = 1221
-    ERR_GETKEYPAIR = 1222
-    ERR_BADALLOC = 1223
-    ERR_ESRECOVERABLE = 1224
-    ERR_ESMERGERECOVERABLE = 1225
-    ERR_GLEXEC = 1226
-    ERR_ESATHENAMPDIED = 1227
-    ERR_ESFATAL = 1228
-    ERR_TEFATAL = 1229
-    ERR_TEBADURL = 1230
-    ERR_TEINVALIDGUID = 1231
-    ERR_TEWRONGGUID = 1232
-    ERR_TEHOSTNAME = 1233
-    ERR_EXECUTEDCLONEJOB = 1234
-    ERR_PAYLOADEXCEEDMAXMEM = 1235
-    ERR_FAILEDBYSERVER = 1236
-    ERR_ESKILLEDBYSERVER = 1237
-    ERR_NOEVENTS = 1238
-    ERR_OVERSUBSCRIBEDEVENTS = 1239
-    ERR_ESMESSAGESERVER = 1240
-    ERR_ESOBJECTSTORESETUP = 1241
-    ERR_CHKSUMNOTSUP = 1242
-    ERR_ESPREFETCHERDIED = 1243
-    ERR_NORELEASEFOUND = 1244
-    ERR_TOOFEWEVENTS = 1245
-    # internal error codes
-    ERR_DDMREG = 1
-    ERR_FILEONTAPE = 2
 
+class PilotErrors(PilotErrorCodesObj):
+    """ Pilot error handling """
 
-    pilotError = {
-        ERR_UNKNOWNERROR : "",
-        ERR_GENERALERROR : "General pilot error, consult batch log",
-        ERR_GETDATAEXC : "Get function can not be called for staging input file",
-        ERR_NOLOCALSPACE : "No space left on local disk",
-        ERR_STAGEINFAILED : "Get error: Staging input file failed",
-        ERR_REPNOTFOUND : "Get error: Replica not found",
-        ERR_LRCREGCONNREF : "LRC registration error: Connection refused",
-        # 1102 : "Expected output file does not exist", # not used, see ERR_MISSINGOUTPUTFILE below
-        ERR_NOSUCHFILE : "No such file or directory",
-        ERR_USERDIRTOOLARGE : "User work directory too large",
-        ERR_LFCADDCSUMFAILED : "Put error: Failed to add file size and checksum to LFC",
-        ERR_STDOUTTOOBIG : "Payload stdout file too big",
-        ERR_MISSDBREL : "Get error: Missing DBRelease file",
-        ERR_FAILEDLCGREG : "Put error: LCG registration failed",
-        ERR_CMTCONFIG : "Required CMTCONFIG incompatible with WN",
-        ERR_SETUPFAILURE : "Failed during setup",
-        ERR_RUNJOBEXC : "Exception caught by RunJob*",
-        ERR_PILOTEXC : "Exception caught by pilot",
-        ERR_GETLFCIMPORT : "Get error: Failed to import LFC python module",
-        ERR_PUTLFCIMPORT : "Put error: Failed to import LFC python module",
-        ERR_NFSSQLITE : "NFS SQLite locking problems",
-        ERR_QUEUEDATA : "Pilot could not download queuedata",
-        ERR_QUEUEDATANOTOK : "Pilot found non-valid queuedata",
-        ERR_CURLSPACE : "Pilot could not curl space report",
-        ERR_DDMSPACE : "Pilot aborted due to DDM space shortage",
-        ERR_NOSTMATCHDEST : "Space token descriptor does not match destination path",
-        # 1121 : "Can not read the xml file for registering output files to dispatcher", # not used
-        ERR_NOLFCSFN : "Bad replica entry returned by lfc_getreplicas(): SFN not set in LFC for this guid",
-        ERR_MISSINGGUID : "Missing guid in output file list",
-        ERR_OUTPUTFILETOOLARGE : "Output file too large",
-        ERR_NOPFC : "Get error: Failed to get PoolFileCatalog",
-        ERR_PUTFUNCNOCALL : "Put function can not be called for staging out",
-        ERR_LRCREG : "LRC registration error (consult log file)",
-        ERR_NOSTORAGE : "Put error: Fetching default storage URL failed",
-        ERR_MKDIR : "Put error: Error in mkdir on localSE, not allowed or no available space",
-        ERR_FAILEDSIZELOCAL : "Could not get file size in job workdir",
-        ERR_FAILEDMD5LOCAL : "Error running md5sum on the file in job workdir",
-        ERR_STAGEOUTFAILED : "Put error: Error in copying the file from job workdir to localSE",
-        ERR_FAILEDSIZE : "Put error: could not get the file size on localSE",
-        ERR_PUTWRONGSIZE : "Put error: Problem with copying from job workdir to local SE: size mismatch",
-        ERR_FAILEDMD5 : "Put error: Error running md5sum on the file on local SE",
-        ERR_PUTMD5MISMATCH : "Put error: Problem with copying from job workdir to local SE: md5sum mismatch",
-        # 1142 : "Put error: failed to register the file on local SE", # not used
-        ERR_CHMODTRF : "Failed to chmod trf",
-        ERR_PANDAKILL : "This job was killed by panda server",
-        ERR_GETMD5MISMATCH : "Get error: md5sum mismatch on input file",
-        ERR_DYNTRFINST : "Trf installation dir does not exist and could not be installed",
-        # 1147 : "Put error: dccp returned readOnly", # not used
-        ERR_FAILEDRM : "Put error: Failed to remove readOnly file in dCache",
-        ERR_TRFDOWNLOAD : "wget command failed to download trf",
-        ERR_LOOPINGJOB : "Looping job killed by pilot",
-        ERR_GETTIMEOUT : "Get error: Input file staging timed out",
-        ERR_PUTTIMEOUT : "Put error: File copy timed out",
-        ERR_LOSTJOBNOTFINISHED : "Lost job was not finished",
-        ERR_LOSTJOBLOGREG : "Failed to register log file",
-        ERR_LOSTJOBFILETRANSFER : "Failed to move output files for lost job",
-        ERR_LOSTJOBRECOVERY : "Pilot could not recover job",
-        # 1157 : "Could not create log file", # not used
-        ERR_LOSTJOBMAXEDOUT : "Reached maximum number of recovery attempts",
-        ERR_LOSTJOBPFC : "Job recovery could not read PoolFileCatalog.xml file (guids lost)",
-        ERR_LRCREGSTRSIZE : "LRC registration error: file name string size exceeded limit of 250",
-        ERR_LOSTJOBXML : "Job recovery could not generate xml for remaining output files",
-        ERR_LRCREGDUP : "LRC registration error: Non-unique LFN",
-        ERR_NOPROXY : "Grid proxy not valid",
-        ERR_MISSINGLOCALFILE : "Get error: Local input file missing",
-        ERR_MISSINGOUTPUTFILE : "Put error: Local output file missing",
-        ERR_SIGPIPE : "Put error: File copy broken by SIGPIPE",
-        ERR_MISSFILEXML : "Get error: Input file missing in PoolFileCatalog.xml",
-        ERR_SIZETOOLARGE : "Get error: Total file size too large",
-        ERR_FAILEDLFCREG : "Put error: File registration failed",
-        ERR_FAILEDADLOCAL : "Error running adler32 on the file in job workdir",
-        ERR_GETADMISMATCH : "Get error: adler32 mismatch on input file",
-        ERR_PUTADMISMATCH : "Put error: adler32 mismatch on output file",
-        ERR_PANDAMOVERFILENOTCACHED : "PandaMover staging error: File is not cached",
-        ERR_PANDAMOVERTRANSFER : "PandaMover transfer failure",
-        ERR_GETWRONGSIZE : "Get error: Problem with copying from local SE to job workdir: size mismatch",
-        ERR_NOCHILDPROCESSES : "Pilot has no child processes (job wrapper has either crashed or did not send final status)",
-        ERR_NOVOMSPROXY : "Voms proxy not valid",
-        ERR_NOSTAGEDFILES : "Get error: No input files are staged",
-        ERR_FAILEDLFCGETREPS : "Get error: Failed to get replicas",
-        ERR_GETGLOBUSSYSERR : "Get error: Globus system error",
-        ERR_PUTGLOBUSSYSERR : "Put error: Globus system error",
-        ERR_FAILEDLFCGETREP : "Get error: Failed to get replica",
-        ERR_GUIDSEXISTSINLRC : "LRC registration error: Guid-metadata entry already exists",
-        ERR_MISSINGPFC : "Put error: PoolFileCatalog could not be found in workdir",
-        # 1185 : "Put error: Error running adler32 on the file in job workdir", # not used
-        ERR_NOSOFTWAREDIR : "Software directory does not exist",
-        ERR_NOPAYLOADMETADATA : "Payload metadata is not available",
-        ERR_LCGGETTURLS : "lcg-getturls failed",
-        ERR_LCGGETTURLSTIMEOUT : "lcg-getturls was timed-out",
-        ERR_LFNTOOLONG : "LFN too long (exceeding limit of 150 characters)",
-        ERR_ZEROFILESIZE : "Illegal zero file size",
-        ERR_DBRELNOTYETTRANSFERRED : "DBRelease file has not been transferred yet",
-        ERR_NOFILEVERIFICATION : "File verification failed",
-        ERR_COMMANDTIMEOUT : "Command timed out",
-        ERR_GETFAILEDTOMOUNTNFS4 : "Get error: Failed to mount NSF4",
-        ERR_GETPNFSSYSTEMERROR : "Get error: PNFS system error",
-        # 1198 : "Can not check the child process status from the heartbeat process", # not used
-        ERR_MKDIRWORKDIR : "Could not create directory",
-        ERR_KILLSIGNAL : "Job terminated by unknown kill signal",
-        ERR_SIGTERM : "Job killed by signal: SIGTERM",
-        ERR_SIGQUIT : "Job killed by signal: SIGQUIT",
-        ERR_SIGSEGV : "Job killed by signal: SIGSEGV",
-        ERR_SIGXCPU : "Job killed by signal: SIGXCPU",
-        ERR_SIGUSR1 : "Job killed by signal: SIGUSR1",
-        ERR_SIGBUS : "Job killed by signal: SIGBUS",
-        ERR_NOPAYLOADOUTPUT : "No payload output",
-        ERR_MISSINGINSTALLATION : "Missing installation",
-        ERR_PAYLOADOUTOFMEMORY : "Payload ran out of memory",
-        ERR_REACHEDMAXTIME : "Reached batch system time limit",
-        ERR_DAFSNOTALLOWED : "Site does not allow requested direct access or file stager",
-        ERR_NOTCPCONNECTION : "Failed to open TCP connection to localhost (worker node network problem)",
-        ERR_NOPILOTTCPSERVER : "Pilot TCP server has died",
-        ERR_CORECOUNTMISMATCH : "Mismatch between core count in job and queue definition",
-        ERR_RUNEVENTEXC : "Exception caught by runEvent",
-        ERR_UUIDGEN : "uuidgen failed to produce a guid",
-        ERR_UNKNOWN : "Job failed due to unknown reason (consult log file)",
-        ERR_FILEEXIST : "File already exist",
-        ERR_GETKEYPAIR : "Failed to get security key pair",
-        ERR_BADALLOC : "TRF failed due to bad_alloc",
-        ERR_ESMERGERECOVERABLE : "Recoverable Event Service Merge error",
-        ERR_ESRECOVERABLE: "Recoverable Event Service error",
-        ERR_GLEXEC: "gLExec related error",
-        ERR_ESATHENAMPDIED: "AthenaMP ended Event Service job prematurely",
-        ERR_ESFATAL: "Fatal Event Service error",
-        ERR_TEFATAL: "Fatal Token Extractor error",
-        ERR_TEHOSTNAME: "Token Extractor error: Host name could not be resolved",
-        ERR_TEBADURL: "Token Extractor error: Bad URL",
-        ERR_TEINVALIDGUID: "Token Extractor error: Invalid GUID length",
-        ERR_TEWRONGGUID: "Token Extractor error: No tokens for this GUID",
-        ERR_EXECUTEDCLONEJOB: "Already executed clone job",
-        ERR_PAYLOADEXCEEDMAXMEM: "Payload exceeded maximum allowed memory",
-        ERR_FAILEDBYSERVER: "Failed by server",
-        ERR_ESKILLEDBYSERVER: "Event Service job killed by server",
-        ERR_NOEVENTS: "Event Service no available events",
-        ERR_OVERSUBSCRIBEDEVENTS: "Event Service over subscribed events",
-        ERR_ESMESSAGESERVER: "Event service message server error",
-        ERR_ESOBJECTSTORESETUP: "Event service objectstore setup",
-        ERR_CHKSUMNOTSUP: "Mover error: query checksum is not supported",
-        ERR_ESPREFETCHERDIED: "Prefetcher ended Event Service job prematurely",
-        ERR_NORELEASEFOUND: "No release candidates found",
-        ERR_TOOFEWEVENTS: "Too few events, less events than minimal requirement",
-        }
+    pilot_error_msg = PilotErrorCodesObj._error_messages
+    pilot_error_msg.update({
+        # can have additional error codes here
+    })
 
     getErrorCodes = [1097, 1099, 1100, 1103, 1107, 1113, 1130, 1145, 1151, 1164, 1167, 1168, 1171, 1175, 1178, 1179, 1180, 1182]
     putErrorCodes = [1101, 1114, 1122, 1131, 1132, 1133, 1134, 1135, 1136, 1137, 1138, 1140, 1141, 1152, 1154, 1155, 1181]
@@ -302,56 +20,50 @@ class PilotErrors(object):
     # Error codes used with FAX fail-over (only an error code in this list will allow FAX fail-over)
     PilotFAXErrorCodes = [1103] + PilotResubmissionErrorCodes
 
+    # Mapping between payload exit code and pilot errors
+    pilot_code_dict = PilotAux.get_error_code_translation_dictionary()
+    avail_exit_codes = [ value[0] for value in pilot_code_dict.values() ]
+
     def getPilotErrorDiag(self, code=0):
         """ Return text corresponding to error code """
-        pilotErrorDiag = ""
-        if code in self.pilotError.keys():
-            pilotErrorDiag = self.pilotError[code]
+        if code in self.pilot_error_msg.keys():
+            pilotErrorDiag = self.pilot_error_msg[code]
         else:
             pilotErrorDiag = "Unknown pilot error code"
         return pilotErrorDiag
 
     def isGetErrorCode(self, code=0):
         """ Determine whether code is in the put error list or not """
-
         state = False
         if code in self.getErrorCodes:
             state = True
-
         return state
 
     def isPutErrorCode(self, code=0):
         """ Determine whether code is in the put error list or not """
-
        state = False
        if code in self.putErrorCodes:
            state = True
-
        return state
 
     @classmethod
     def isRecoverableErrorCode(self, code=0):
         """ Determine whether code is a recoverable error code or not """
-
         return code in self.recoverableErrorCodes
 
     def isPilotResubmissionErrorCode(self, code=0):
         """ Determine whether code issues a Pilot-controlled resubmission """
-
         state = False
         if code in self.PilotResubmissionErrorCodes:
             state = True
-
         return state
 
     def isPilotFAXErrorCode(self, code=0):
         """ Determine whether code allows for a FAX fail-over """
-
         state = False
         if code in self.PilotFAXErrorCodes:
             state = True
-
         return state
 
     @classmethod
@@ -360,7 +72,7 @@ def getErrorStr(self, code):
         Avoids exception if an error is not in the dictionary.
         An empty string is returned if the error is not in the dictionary.
         """
-        return self.pilotError.get(code, '')
+        return self.pilot_error_msg.get(code, '')
 
     def getErrorName(self, code):
         """ From the error code to get the error name"""
@@ -369,14 +81,23 @@ def getErrorName(self, code):
                 return k
         return None
 
+    def convertToPilotErrors(self, exit_code):
+        """
+        Convert payload exit code to pilot error code and error dialogue message
+        """
+        pilot_error_code, pilot_error_diag = None, ''
+        if exit_code in self.avail_exit_codes:
+            pilot_error_code = PilotAux.convert_to_pilot_error_code(exit_code)
+            pilot_error_diag = self.getPilotErrorDiag(pilot_error_code)
+        return pilot_error_code, pilot_error_diag
+
 
 class PilotException(Exception):
 
-    def __init__(self, message, code=PilotErrors.ERR_GENERALERROR, state='', *args):
+    def __init__(self, message, code=PilotErrors.GENERALERROR, state='', *args):
         self.code = code
         self.state = state
         self.message = message
-
         super(PilotException, self).__init__(*args)
 
     @property
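Usage sketch for the new `convertToPilotErrors` helper (the exit code below is arbitrary; the actual mapping comes from pilot's own translation dictionary):

```python
from pandaharvester.harvestercore.pilot_errors import PilotErrors

pilot_errors = PilotErrors()
code, diag = pilot_errors.convertToPilotErrors(65)  # arbitrary payload exit code
if code is not None:
    print('mapped to pilot error {0}: {1}'.format(code, diag))
else:
    print('exit code has no pilot translation; caller keeps its own error handling')
```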
diff --git a/pandaharvester/harvestercore/queue_config_mapper.py b/pandaharvester/harvestercore/queue_config_mapper.py
index 7466d7a1..3b37996f 100644
--- a/pandaharvester/harvestercore/queue_config_mapper.py
+++ b/pandaharvester/harvestercore/queue_config_mapper.py
@@ -67,6 +67,7 @@ def __init__(self, queue_name):
         self.prefetchEvents = True
         self.uniqueName = None
         self.configID = None
+        self.initEventsMultipler = 2
 
     # get list of status without heartbeat
     def get_no_heartbeat_status(self):
@@ -269,7 +270,7 @@ def _get_last_reload_time(self):
         return timestamp
 
     # load data
-    def load_data(self):
+    def load_data(self, refill_table=False):
         mainLog = _make_logger(method_name='QueueConfigMapper.load_data')
         with self.lock:
             # check if to update
@@ -607,7 +608,7 @@ def load_data(self):
             self.lastUpdate = datetime.datetime.utcnow()
             # update database
             if self.toUpdateDB:
-                self.dbProxy.fill_panda_queue_table(self.activeQueues.keys(), self)
+                self.dbProxy.fill_panda_queue_table(self.activeQueues.keys(), self, refill_table=refill_table)
                 mainLog.debug('updated to DB')
             # done
             mainLog.debug('done')
diff --git a/pandaharvester/harvestercore/service_metrics_spec.py b/pandaharvester/harvestercore/service_metrics_spec.py
index 7ca4f8ad..e4c31d81 100644
--- a/pandaharvester/harvestercore/service_metrics_spec.py
+++ b/pandaharvester/harvestercore/service_metrics_spec.py
@@ -12,7 +12,7 @@ class ServiceMetricSpec(SpecBase):
     # attributes
     attributesWithTypes = ('creationTime:timestamp / index',
                            'hostName:text',
-                           'metrics:text',
+                           'metrics:blob',
                            )
 
     # constructor
@@ -21,4 +21,4 @@ def __init__(self, service_metrics):
 
         self.creationTime = datetime.datetime.utcnow()
         self.hostName = socket.getfqdn()
-        self.metrics = json.dumps(service_metrics)
\ No newline at end of file
+        self.metrics = service_metrics  # blobs are automatically translated to json
\ No newline at end of file
diff --git a/pandaharvester/harvestercredmanager/base_cred_manager.py b/pandaharvester/harvestercredmanager/base_cred_manager.py
new file mode 100644
index 00000000..e11d5329
--- /dev/null
+++ b/pandaharvester/harvestercredmanager/base_cred_manager.py
@@ -0,0 +1,21 @@
+from pandaharvester.harvestercore.plugin_base import PluginBase
+
+
+# base credential manager
+class BaseCredManager(PluginBase):
+
+    # constructor
+    def __init__(self, **kwarg):
+        PluginBase.__init__(self, **kwarg)
+
+    # check credential lifetime for monitoring/alerting purposes
+    def check_credential_lifetime(self):
+        return
+
+    # check proxy
+    def check_credential(self):
+        return True
+
+    # renew proxy
+    def renew_credential(self):
+        return True, ''
diff --git a/pandaharvester/harvestercredmanager/dummy_cred_manager.py b/pandaharvester/harvestercredmanager/dummy_cred_manager.py
index c4657d61..8145bedd 100644
--- a/pandaharvester/harvestercredmanager/dummy_cred_manager.py
+++ b/pandaharvester/harvestercredmanager/dummy_cred_manager.py
@@ -1,12 +1,12 @@
-from pandaharvester.harvestercore.plugin_base import PluginBase
+from .base_cred_manager import BaseCredManager
 
 
 # dummy credential manager
-class DummyCredManager(PluginBase):
+class DummyCredManager(BaseCredManager):
 
     # constructor
     def __init__(self, **kwarg):
-        PluginBase.__init__(self, **kwarg)
+        BaseCredManager.__init__(self, **kwarg)
 
     # check proxy
     def check_credential(self):
diff --git a/pandaharvester/harvestercredmanager/k8s_secret_cred_manager.py b/pandaharvester/harvestercredmanager/k8s_secret_cred_manager.py
index e91d1295..ecb1e2bc 100644
--- a/pandaharvester/harvestercredmanager/k8s_secret_cred_manager.py
+++ b/pandaharvester/harvestercredmanager/k8s_secret_cred_manager.py
@@ -5,7 +5,7 @@
 from kubernetes import client, config
 from kubernetes.client.rest import ApiException
 
-from pandaharvester.harvestercore.plugin_base import PluginBase
+from .base_cred_manager import BaseCredManager
 from pandaharvester.harvestercore import core_utils
 from pandaharvester.harvestermisc.k8s_utils import k8s_Client
 
@@ -15,10 +15,10 @@
 
 
 # credential manager with k8s secret
-class K8sSecretCredManager(PluginBase):
+class K8sSecretCredManager(BaseCredManager):
     # constructor
     def __init__(self, **kwarg):
-        PluginBase.__init__(self, **kwarg)
+        BaseCredManager.__init__(self, **kwarg)
         # make logger
         mainLog = self.make_logger(_logger, method_name='__init__')
         # attributes
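With the new base class, a plugin overrides only what it supports; inheriting the default `check_credential_lifetime` (which returns None) opts the plugin out of the monitoring metric. A minimal hypothetical plugin:

```python
from pandaharvester.harvestercredmanager.base_cred_manager import BaseCredManager


# hypothetical example plugin, not part of this patch
class FixedLifetimeCredManager(BaseCredManager):
    # constructor
    def __init__(self, **kwarg):
        BaseCredManager.__init__(self, **kwarg)

    # report a constant 96 h so the credential shows up in service metrics
    def check_credential_lifetime(self):
        return 96.0
```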
diff --git a/pandaharvester/harvestercredmanager/no_voms_cred_manager.py b/pandaharvester/harvestercredmanager/no_voms_cred_manager.py
index 879d0585..cd22abb2 100644
--- a/pandaharvester/harvestercredmanager/no_voms_cred_manager.py
+++ b/pandaharvester/harvestercredmanager/no_voms_cred_manager.py
@@ -3,7 +3,7 @@
 except Exception:
     import subprocess
 
-from pandaharvester.harvestercore.plugin_base import PluginBase
+from .base_cred_manager import BaseCredManager
 from pandaharvester.harvestercore import core_utils
 
 # logger
@@ -11,17 +11,45 @@
 
 
 # credential manager with no-voms proxy
-class NoVomsCredManager(PluginBase):
+class NoVomsCredManager(BaseCredManager):
     # constructor
     def __init__(self, **kwarg):
-        PluginBase.__init__(self, **kwarg)
+        BaseCredManager.__init__(self, **kwarg)
+        # make logger
+        main_log = self.make_logger(_logger, method_name='__init__')
+        # set up with direct attributes
+        self.setupMap = dict(vars(self))
+        # setupMap
+        self.genFromKeyCert = self.setupMap.get('genFromKeyCert')
+        self.key = self.setupMap.get('key')
+        self.cert = self.setupMap.get('cert')
+
+    # check proxy lifetime for monitoring/alerting purposes
+    def check_credential_lifetime(self):
+        main_log = self.make_logger(_logger, method_name='check_credential_lifetime')
+        lifetime = None
+        try:
+            command_str = "voms-proxy-info -timeleft -file {0}".format(self.outCertFile)
+            p = subprocess.Popen(command_str.split(), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            stdout, stderr = p.communicate()
+            return_code = p.returncode
+            main_log.debug('retCode={0} stdout={1} stderr={2}'.format(return_code, stdout, stderr))
+            if return_code == 0:  # OK
+                lifetime = int(stdout) / 3600
+        except Exception:
+            core_utils.dump_error_message(main_log)
+        if isinstance(lifetime, float):
+            main_log.debug('returning lifetime {0:.3f}'.format(lifetime))
+        else:
+            main_log.debug('returning lifetime {0}'.format(lifetime))
+        return lifetime
 
     # check proxy
     def check_credential(self):
         # make logger
-        mainLog = self.make_logger(_logger, method_name='check_credential')
+        main_log = self.make_logger(_logger, method_name='check_credential')
         comStr = "voms-proxy-info -exists -hours 72 -file {0}".format(self.outCertFile)
-        mainLog.debug(comStr)
+        main_log.debug(comStr)
         try:
             p = subprocess.Popen(comStr.split(),
                                  shell=False,
@@ -30,19 +58,36 @@ def check_credential(self):
             stdOut, stdErr = p.communicate()
             retCode = p.returncode
         except Exception:
-            core_utils.dump_error_message(mainLog)
+            core_utils.dump_error_message(main_log)
             return False
-        mainLog.debug('retCode={0} stdOut={1} stdErr={2}'.format(retCode, stdOut, stdErr))
+        main_log.debug('retCode={0} stdOut={1} stdErr={2}'.format(retCode, stdOut, stdErr))
         return retCode == 0
 
     # renew proxy
     def renew_credential(self):
         # make logger
-        mainLog = self.make_logger(_logger, method_name='renew_credential')
-        comStr = "voms-proxy-init -rfc -noregen -voms {0} -out {1} -valid 96:00 -cert={2} -key={2}".format(self.voms,
-                                                                                                           self.outCertFile,
-                                                                                                           self.inCertFile)
-        mainLog.debug(comStr)
+        main_log = self.make_logger(_logger, method_name='renew_credential')
+        # voms or no-voms
+        voms_option = ''
+        if self.voms is not None:
+            voms_option = '-voms {0}'.format(self.voms)
+        # generate proxy with a long lifetime proxy (default) or from key/cert pair
+        if self.genFromKeyCert:
+            noregen_option = ''
+            usercert_value = self.cert
+            userkey_value = self.key
+        else:
+            noregen_option = '-noregen'
+            usercert_value = self.inCertFile
+            userkey_value = self.inCertFile
+        # command
+        comStr = "voms-proxy-init -rfc {noregen_option} {voms_option} -out {out} -valid 96:00 -cert={cert} -key={key}".format(
+            noregen_option=noregen_option,
+            voms_option=voms_option,
+            out=self.outCertFile,
+            cert=usercert_value,
+            key=userkey_value)
+        main_log.debug(comStr)
         try:
             p = subprocess.Popen(comStr.split(),
                                  shell=False,
@@ -50,9 +95,9 @@ def renew_credential(self):
                                  stderr=subprocess.PIPE)
             stdOut, stdErr = p.communicate()
             retCode = p.returncode
-            mainLog.debug('retCode={0} stdOut={1} stdErr={2}'.format(retCode, stdOut, stdErr))
+            main_log.debug('retCode={0} stdOut={1} stdErr={2}'.format(retCode, stdOut, stdErr))
         except Exception:
             stdOut = ''
-            stdErr = core_utils.dump_error_message(mainLog)
+            stdErr = core_utils.dump_error_message(main_log)
             retCode = -1
         return retCode == 0, "{0} {1}".format(stdOut, stdErr)
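A plausible configuration for the extended NoVomsCredManager, expressed as the attribute set the constructor reads via `setupMap` (paths are made up; how the plugin loader names its module/class keys depends on the deployment):

```python
# illustrative plugin attributes for NoVomsCredManager
credmanager_attributes = {
    'module': 'pandaharvester.harvestercredmanager.no_voms_cred_manager',
    'name': 'NoVomsCredManager',
    'voms': 'atlas',
    'genFromKeyCert': True,                     # generate from key/cert pair
    'key': '/path/to/hostkey.pem',
    'cert': '/path/to/hostcert.pem',
    'inCertFile': '/path/to/long_lived_proxy',  # used when genFromKeyCert is false
    'outCertFile': '/path/to/generated_proxy',
}
```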
diff --git a/pandaharvester/harvestermessenger/shared_file_messenger.py b/pandaharvester/harvestermessenger/shared_file_messenger.py
index 122eee3c..164276b9 100644
--- a/pandaharvester/harvestermessenger/shared_file_messenger.py
+++ b/pandaharvester/harvestermessenger/shared_file_messenger.py
@@ -162,6 +162,7 @@ def __init__(self, **kwarg):
         self.stripJobParams = False
         self.scanInPostProcess = False
         self.leftOverPatterns = None
+        self.postProcessInSubDir = False
         BaseMessenger.__init__(self, **kwarg)
         # get access point
@@ -563,6 +564,9 @@ def setup_access_points(self, workspec_list):
         try:
             for workSpec in workspec_list:
                 accessPoint = workSpec.get_access_point()
+                # delete leftover
+                if os.path.exists(accessPoint) and workSpec.isNew:
+                    shutil.rmtree(accessPoint, ignore_errors=True)
                 # make the dir if missing
                 if not os.path.exists(accessPoint):
                     os.makedirs(accessPoint)
@@ -602,6 +606,9 @@ def post_processing(self, workspec, jobspec_list, map_type):
                     break
             fileDict = dict()
             accessPoint = self.get_access_point(workspec, jobSpec.PandaID)
+            origAccessPoint = accessPoint
+            if self.postProcessInSubDir:
+                accessPoint = os.path.join(accessPoint, jobSpec.PandaID)
             # make log
             if not hasLog:
                 logFileInfo = jobSpec.get_logfile_info()
@@ -679,7 +686,7 @@ def post_processing(self, workspec, jobspec_list, map_type):
             tmpLog.debug('got {0} leftovers'.format(nLeftOvers))
             # make json to stage-out
             if len(fileDict) > 0:
-                jsonFilePath = os.path.join(accessPoint, jsonOutputsFileName)
+                jsonFilePath = os.path.join(origAccessPoint, jsonOutputsFileName)
                 with open(jsonFilePath, 'w') as jsonFile:
                     json.dump(fileDict, jsonFile)
             tmpLog.debug('done')
diff --git a/pandaharvester/harvestermisc/htcondor_utils.py b/pandaharvester/harvestermisc/htcondor_utils.py
index 4871a77c..969f3c84 100644
--- a/pandaharvester/harvestermisc/htcondor_utils.py
+++ b/pandaharvester/harvestermisc/htcondor_utils.py
@@ -738,7 +738,8 @@ def submit_with_command(self, jdl_list, use_spool=False, tmp_str='', keep_temp_s
                 errStr = 'no job submitted: {0}'.format(errStr)
                 tmpLog.error(errStr)
             else:
-                tmpLog.error('submission failed: {0} ; {1}'.format(stdErr, errStr))
+                errStr = '{0} ; {1}'.format(stdErr, errStr)
+                tmpLog.error('submission failed: {0}'.format(errStr))
         # Return
         return (batchIDs_list, errStr)
diff --git a/pandaharvester/harvestermisc/info_utils.py b/pandaharvester/harvestermisc/info_utils.py
index dca3615b..10dc0803 100644
--- a/pandaharvester/harvestermisc/info_utils.py
+++ b/pandaharvester/harvestermisc/info_utils.py
@@ -9,7 +9,7 @@
 class PandaQueuesDict(dict, PluginBase):
     """
     Dictionary of PanDA queue info from DB by cacher
-    Key is PanDA Resouce name (rather than PanDA Queue name)
+    Key is PanDA Resource name (rather than PanDA Queue name)
     Able to query with either PanDA Queue name or PanDA Resource name
     """
     def __init__(self, **kwarg):
@@ -124,3 +124,18 @@ def get_type_workflow(self, panda_resource):
             pq_type = 'production'
         workflow = panda_queue_dict.get('workflow')
         return pq_type, workflow
+
+    def get_prorated_maxwdir_GB(self, panda_resource, worker_corecount):
+        try:
+            panda_queue_dict = self.get(panda_resource)
+            maxwdir = panda_queue_dict.get('maxwdir') / 1000  # convert to GB
+            corecount = panda_queue_dict.get('corecount')
+            if panda_queue_dict.get('capability') == 'ucore':
+                maxwdir_prorated = maxwdir * worker_corecount / corecount
+            else:
+                maxwdir_prorated = maxwdir
+        except Exception:
+            maxwdir_prorated = 0
+
+        return maxwdir_prorated
+
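Worked example of `get_prorated_maxwdir_GB` above (queue numbers assumed; `maxwdir` comes from the queue definition in MB, hence the division by 1000):

```python
maxwdir = 300000 / 1000   # 300000 MB in the queue definition -> 300 GB (assumed)
corecount = 96            # full-node core count of a hypothetical ucore queue
worker_corecount = 8      # cores requested by this worker

maxwdir_prorated = maxwdir * worker_corecount / corecount
print(maxwdir_prorated)   # 25.0 GB for the 8-core worker
```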
diff --git a/pandaharvester/harvestermisc/k8s_utils.py b/pandaharvester/harvestermisc/k8s_utils.py
index 2dfbeb8e..2213e81a 100644
--- a/pandaharvester/harvestermisc/k8s_utils.py
+++ b/pandaharvester/harvestermisc/k8s_utils.py
@@ -146,6 +146,16 @@ def create_job_from_yaml(self, yaml_content, work_spec, prod_source_label, conta
             container_env.setdefault('volumeMounts', [])
             container_env['volumeMounts'].append({'name': 'job-config', 'mountPath': CONFIG_DIR})
 
+        # if we are running the pilot in an emptyDir with "pilot-dir" name, then set the max size
+        if 'volumes' in yaml_content['spec']['template']['spec']:
+            yaml_volumes = yaml_content['spec']['template']['spec']['volumes']
+            for volume in yaml_volumes:
+                # do not overwrite any hardcoded sizeLimit value
+                if volume['name'] == 'pilot-dir' and 'emptyDir' in volume and 'sizeLimit' not in volume['emptyDir']:
+                    maxwdir_prorated_GB = panda_queues_dict.get_prorated_maxwdir_GB(work_spec.computingSite, work_spec.nCore)
+                    if maxwdir_prorated_GB:
+                        volume['emptyDir']['sizeLimit'] = '{0}G'.format(maxwdir_prorated_GB)
+
         # set the affinity
         if 'affinity' not in yaml_content['spec']['template']['spec']:
             yaml_content = self.set_affinity(yaml_content)
diff --git a/pandaharvester/harvestermonitor/cobalt_monitor.py b/pandaharvester/harvestermonitor/cobalt_monitor.py
index a49cc578..ff57ccdf 100644
--- a/pandaharvester/harvestermonitor/cobalt_monitor.py
+++ b/pandaharvester/harvestermonitor/cobalt_monitor.py
@@ -44,7 +44,8 @@ def check_workers(self, workspec_list):
             p = subprocess.Popen(comStr.split(),
                                  shell=False,
                                  stdout=subprocess.PIPE,
-                                 stderr=subprocess.PIPE)
+                                 stderr=subprocess.PIPE,
+                                 text=True)
             oldStatus = workSpec.status
             newStatus = None
             # first check return code
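Effect of the k8s_utils hunk above on a pilot-dir volume, shown on the dict that the yaml parses into (sizes follow the hypothetical 8-core example):

```python
# before, as defined in the job template yaml
volume = {'name': 'pilot-dir', 'emptyDir': {}}

# after create_job_from_yaml, when get_prorated_maxwdir_GB returns 25
volume = {'name': 'pilot-dir', 'emptyDir': {'sizeLimit': '25G'}}
# a template with a hardcoded sizeLimit is left untouched
```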
diff --git a/pandaharvester/harvestermonitor/htcondor_monitor.py b/pandaharvester/harvestermonitor/htcondor_monitor.py
index 0dcd2120..ab558117 100644
--- a/pandaharvester/harvestermonitor/htcondor_monitor.py
+++ b/pandaharvester/harvestermonitor/htcondor_monitor.py
@@ -10,6 +10,7 @@
 from pandaharvester.harvestercore.work_spec import WorkSpec
 from pandaharvester.harvestercore.worker_errors import WorkerErrors
 from pandaharvester.harvestercore.plugin_base import PluginBase
+from pandaharvester.harvestercore.pilot_errors import PilotErrors
 from pandaharvester.harvestermisc.htcondor_utils import condor_job_id_from_workspec, get_host_batchid_map
 from pandaharvester.harvestermisc.htcondor_utils import CondorJobQuery, CondorJobManage
 
@@ -30,6 +31,10 @@
 }
 
 
+# pilot error object
+PILOT_ERRORS = PilotErrors()
+
+
 # Check one worker
 def _check_one_worker(workspec, job_ads_all_dict, cancel_unknown=False, held_timeout=3600):
     # Make logger for one single worker
@@ -124,22 +129,32 @@ def _check_one_worker(workspec, job_ads_all_dict, cancel_unknown=False, held_tim
         elif batchStatus in ['4']:
             # 4 completed
             try:
-                payloadExitCode = str(job_ads_dict['ExitCode'])
+                payloadExitCode_str = str(job_ads_dict['ExitCode'])
+                payloadExitCode = int(payloadExitCode_str)
             except KeyError:
                 errStr = 'cannot get ExitCode of job submissionHost={0} batchID={1}. Regard the worker as failed'.format(workspec.submissionHost, workspec.batchID)
                 tmpLog.warning(errStr)
                 newStatus = WorkSpec.ST_failed
+            except ValueError:
+                errStr = 'got invalid ExitCode {0} of job submissionHost={1} batchID={2}. Regard the worker as failed'.format(payloadExitCode_str, workspec.submissionHost, workspec.batchID)
+                tmpLog.warning(errStr)
+                newStatus = WorkSpec.ST_failed
             else:
                 # Propagate condor return code
                 workspec.nativeExitCode = payloadExitCode
-                if payloadExitCode in ['0']:
+                if payloadExitCode == 0:
                     # Payload should return 0 after successful run
                     newStatus = WorkSpec.ST_finished
                 else:
                     # Other return codes are considered failed
                     newStatus = WorkSpec.ST_failed
-                    errStr = 'Payload execution error: returned non-zero'
+                    errStr = 'Payload execution error: returned non-zero {0}'.format(payloadExitCode)
                     tmpLog.debug(errStr)
+                    # Map return code to Pilot error code
+                    reduced_exit_code = payloadExitCode // 256 if (payloadExitCode % 256 == 0) else payloadExitCode
+                    pilot_error_code, pilot_error_diag = PILOT_ERRORS.convertToPilotErrors(reduced_exit_code)
+                    if pilot_error_code is not None:
+                        workspec.set_pilot_error(pilot_error_code, pilot_error_diag)
                 tmpLog.info('Payload return code = {0}'.format(payloadExitCode))
         else:
             errStr = 'cannot get reasonable JobStatus of job submissionHost={0} batchID={1}. Regard the worker as failed by default'.format(
diff --git a/pandaharvester/harvesterscripts/harvester_admin.py b/pandaharvester/harvesterscripts/harvester_admin.py
index 94158280..8b75427b 100644
--- a/pandaharvester/harvesterscripts/harvester_admin.py
+++ b/pandaharvester/harvesterscripts/harvester_admin.py
@@ -198,7 +198,7 @@ def qconf_refresh(arguments):
     qcm = QueueConfigMapper()
     qcm._update_last_reload_time()
     qcm.lastUpdate = None
-    qcm.load_data()
+    qcm.load_data(refill_table=arguments.refill)
 
 def qconf_dump(arguments):
     from pandaharvester.harvesterscripts import queue_config_tool
@@ -327,6 +327,7 @@ def main():
     # qconf refresh command
     qconf_refresh_parser = qconf_subparsers.add_parser('refresh', help='refresh queue configuration immediately')
     qconf_refresh_parser.set_defaults(which='qconf_refresh')
+    qconf_refresh_parser.add_argument('-R', '--refill', dest='refill', action='store_true', help='Refill pq_table before refresh (cleaner)')
     # qconf purge command
     qconf_purge_parser = qconf_subparsers.add_parser('purge', help='Purge the queue thoroughly from harvester DB (Be careful !!)')
     qconf_purge_parser.set_defaults(which='qconf_purge')
diff --git a/pandaharvester/harvestersubmitter/cobalt_submitter.py b/pandaharvester/harvestersubmitter/cobalt_submitter.py
index ce821a0e..615539cb 100644
--- a/pandaharvester/harvestersubmitter/cobalt_submitter.py
+++ b/pandaharvester/harvestersubmitter/cobalt_submitter.py
@@ -45,7 +45,8 @@ def submit_workers(self, workspec_list):
             p = subprocess.Popen(comStr.split(),
                                  shell=False,
                                  stdout=subprocess.PIPE,
-                                 stderr=subprocess.PIPE)
+                                 stderr=subprocess.PIPE,
+                                 text=True)
             # check return code
             stdOut, stdErr = p.communicate()
             retCode = p.returncode
@@ -78,8 +79,8 @@ def submit_workers(self, workspec_list):
 
     # make batch script
     def make_batch_script(self, workspec):
-        tmpFile = tempfile.NamedTemporaryFile(delete=False, suffix='_submit.sh', dir=workspec.get_access_point())
-        tmpFile.write(self.template.format(nNode=workspec.nCore / self.nCorePerNode,
+        tmpFile = tempfile.NamedTemporaryFile(mode='w+t', delete=False, suffix='_submit.sh', dir=workspec.get_access_point())
+        tmpFile.write(self.template.format(nNode=int(workspec.nCore / self.nCorePerNode),
                                            accessPoint=workspec.accessPoint,
                                            workerID=workspec.workerID)
                       )
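The HTCondor monitor hunk above reduces a wait()-style status to a plain exit code before translating it. Worked examples (not patch code):

```python
# reduction used before mapping the code to a pilot error
for payload_exit_code in (65, 16640, 0):
    reduced = payload_exit_code // 256 if (payload_exit_code % 256 == 0) else payload_exit_code
    print(payload_exit_code, '->', reduced)
# 65    -> 65   already a plain exit code
# 16640 -> 65   256 * 65, i.e. an unshifted wait() status
# 0     -> 0    (a zero code never reaches the mapping branch anyway)
```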
diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py
index 931fcc04..287ae646 100644
--- a/pandaharvester/harvestersubmitter/htcondor_submitter.py
+++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py
@@ -29,7 +29,7 @@ def _div_round_up(a, b):
 
 
 # Compute weight of each CE according to worker stat, return tuple(dict, total weight score)
-def _get_ce_weighting(ce_endpoint_list=[], worker_ce_all_tuple=None):
+def _get_ce_weighting(ce_endpoint_list=[], worker_ce_all_tuple=None, is_slave_queue=False):
     multiplier = 1000.
     n_ce = len(ce_endpoint_list)
     worker_limits_dict, worker_ce_stats_dict, worker_ce_backend_throughput_dict, time_window, n_new_workers = worker_ce_all_tuple
@@ -44,6 +44,12 @@ def _get_ce_weighting(ce_endpoint_list=[], worker_ce_all_tuple=None):
                                 for _ce in worker_ce_backend_throughput_dict))
     thruput_avg = (log1p(Q_good_init) - log1p(Q_good_fin))
     n_new_workers = float(n_new_workers)
+    # target number of queuing
+    target_Q = Q + n_new_workers
+    if is_slave_queue:
+        # take total number of current queuing if slave queue
+        total_Q = sum(( float(worker_ce_stats_dict[_k]['submitted']) for _k in worker_ce_stats_dict ))
+        target_Q = min(total_Q, Q) + n_new_workers
 
     def _get_thruput(_ce_endpoint):  # inner function
         if _ce_endpoint not in worker_ce_backend_throughput_dict:
@@ -81,7 +87,8 @@ def _get_init_weight(_ce_endpoint):  # inner function
         if ( _ce_endpoint in worker_ce_stats_dict and q > Q ):
             return float(0)
         ce_base_weight_normalized = _get_thruput_adj_ratio(_get_thruput(_ce_endpoint))/ce_base_weight_sum
-        q_expected = (Q + n_new_workers) * ce_base_weight_normalized
+        # target number of queuing of the CE
+        q_expected = target_Q * ce_base_weight_normalized
         # weight by difference
         ret = max((q_expected - q), 2**-10)
         # # Weight by running ratio
@@ -99,12 +106,12 @@ def _get_init_weight(_ce_endpoint):  # inner function
         regulator = 1.
     ce_weight_dict = {_ce: _get_init_weight(_ce) * regulator for _ce in ce_endpoint_list}
     ce_thruput_dict = {_ce: _get_thruput(_ce) * 86400. / time_window for _ce in ce_endpoint_list}
-    return total_score, ce_weight_dict, ce_thruput_dict
+    return total_score, ce_weight_dict, ce_thruput_dict, target_Q
 
 
 # Choose a CE accroding to weighting
 def _choose_ce(weighting):
-    total_score, ce_weight_dict, ce_thruput_dict = weighting
+    total_score, ce_weight_dict, ce_thruput_dict, target_Q = weighting
     lucky_number = random.random() * total_score
     cur = 0.
     ce_now = None
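Worked example of the new `target_Q` for a slave queue (all numbers assumed):

```python
Q = 50                 # queue limit derived from worker_limits_dict (assumed)
n_new_workers = 10.0
worker_ce_stats_dict = {                        # assumed per-CE stats
    'ce1.example.org': {'submitted': 20, 'running': 100},
    'ce2.example.org': {'submitted': 15, 'running': 80},
}

is_slave_queue = True
target_Q = Q + n_new_workers                    # 60.0 for a normal queue
if is_slave_queue:
    total_Q = sum(float(v['submitted']) for v in worker_ce_stats_dict.values())  # 35.0
    target_Q = min(total_Q, Q) + n_new_workers  # min(35, 50) + 10 = 45.0
```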
@@ -124,19 +131,21 @@ def _choose_ce(weighting):
 # Get better string to display the statistics and weighting of CEs
 def _get_ce_stats_weighting_display(ce_list, worker_ce_all_tuple, ce_weighting):
     worker_limits_dict, worker_ce_stats_dict, worker_ce_backend_throughput_dict, time_window, n_new_workers = worker_ce_all_tuple
-    total_score, ce_weight_dict, ce_thruput_dict = ce_weighting
+    total_score, ce_weight_dict, ce_thruput_dict, target_Q = ce_weighting
     worker_ce_stats_dict_sub_default = {'submitted': 0, 'running': 0}
     worker_ce_backend_throughput_dict_sub_default = {'submitted': 0, 'running': 0, 'finished': 0}
     general_dict = {
         'maxWorkers': int(worker_limits_dict.get('maxWorkers')),
         'nQueueLimitWorker': int(worker_limits_dict.get('nQueueLimitWorker')),
         'nNewWorkers': int(n_new_workers),
+        'target_Q': int(target_Q),
         'history_time_window': int(time_window),
     }
     general_str = (
         'maxWorkers={maxWorkers} '
         'nQueueLimitWorker={nQueueLimitWorker} '
         'nNewWorkers={nNewWorkers} '
+        'target_Q={target_Q} '
         'hist_timeWindow={history_time_window} '
     ).format(**general_dict)
     ce_str_list = []
@@ -218,6 +227,17 @@ def _get_complicated_pilot_options(pilot_type, pilot_url=None):
     return pilot_opt_dict
 
 
+# get special flag of pilot wrapper about python version of pilot, and whether to run with python 3 if python version is "3"
+# FIXME: during pilot testing phase, only prodsourcelabel ptest and rc_test2 should run python3
+# This constraint will be removed when pilot is ready
+def _get_python_version_option(python_version, prod_source_label):
+    option = ''
+    if python_version.startswith('3'):
+        if prod_source_label in ['rc_test2', 'ptest']:
+            option = '--pythonversion 3'
+    return option
+
+
 # submit a bag of workers
 def submit_bag_of_workers(data_list):
     # make logger
@@ -253,8 +273,8 @@ def submit_bag_of_workers(data_list):
         else:
             workspec.reset_changed_list()
         # fill in host_jdl_list_workerid_map
-        a_jdl = make_a_jdl(**data)
-        val = (workspec, a_jdl)
+        a_jdl, placeholder_map = make_a_jdl(**data)
+        val = (workspec, a_jdl, placeholder_map)
         try:
             host_jdl_list_workerid_map[workspec.submissionHost].append(val)
         except KeyError:
@@ -280,6 +300,7 @@ def submit_bag_of_workers(data_list):
                 for val_i in range(n_workers):
                     val = val_list[val_i]
                     workspec = val[0]
+                    placeholder_map = val[2]
                     # got batchID
                     workspec.batchID = batchIDs_list[val_i]
                     tmpLog.debug('workerID={0} submissionHost={1} batchID={2}'.format(
@@ -292,9 +313,9 @@ def submit_bag_of_workers(data_list):
                     # set log
                     batch_log_dict = data['batch_log_dict']
                     (clusterid, procid) = get_job_id_tuple_from_batchid(workspec.batchID)
-                    batch_log = _condor_macro_replace(batch_log_dict['batch_log'], ClusterId=clusterid, ProcId=procid)
-                    batch_stdout = _condor_macro_replace(batch_log_dict['batch_stdout'], ClusterId=clusterid, ProcId=procid)
-                    batch_stderr = _condor_macro_replace(batch_log_dict['batch_stderr'], ClusterId=clusterid, ProcId=procid)
+                    batch_log = _condor_macro_replace(batch_log_dict['batch_log'], ClusterId=clusterid, ProcId=procid).format(**placeholder_map)
+                    batch_stdout = _condor_macro_replace(batch_log_dict['batch_stdout'], ClusterId=clusterid, ProcId=procid).format(**placeholder_map)
+                    batch_stderr = _condor_macro_replace(batch_log_dict['batch_stderr'], ClusterId=clusterid, ProcId=procid).format(**placeholder_map)
                     try:
                         batch_jdl = '{0}.jdl'.format(batch_stderr[:-4])
                     except Exception:
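# --- illustrative sketch (not part of the patch) ---
# What the extra .format(**placeholder_map) pass above achieves: after the condor
# $(ClusterId)/$(ProcId) macros are substituted, harvester placeholders embedded in
# batch_log/batch_stdout/batch_stderr templates are resolved too. The template text
# and values are hypothetical, and str.replace() stands in for _condor_macro_replace().
batch_log_template = '/var/log/condor/{submissionHostShort}/grid.$(ClusterId).$(ProcId).log'
macro_replaced = batch_log_template.replace('$(ClusterId)', '1234').replace('$(ProcId)', '0')
placeholder_map = {'submissionHostShort': 'mysub01'}
print(macro_replaced.format(**placeholder_map))
# -> /var/log/condor/mysub01/grid.1234.0.log
# --- end sketch ---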
@@ -330,7 +351,8 @@ def submit_bag_of_workers(data_list):
 
 # make a condor jdl for a worker
 def make_a_jdl(workspec, template, n_core_per_node, log_dir, panda_queue_name,
                executable_file, x509_user_proxy, log_subdir=None, ce_info_dict=dict(), batch_log_dict=dict(), pilot_url=None,
-               special_par='', harvester_queue_config=None, is_unified_queue=False, pilot_version='unknown', **kwarg):
+               special_par='', harvester_queue_config=None, is_unified_queue=False,
+               pilot_version='unknown', python_version='unknown', **kwarg):
     # make logger
     tmpLog = core_utils.make_logger(baseLogger, 'workerID={0}'.format(workspec.workerID),
                                     method_name='make_a_jdl')
@@ -376,50 +398,55 @@ def make_a_jdl(workspec, template, n_core_per_node, log_dir, panda_queue_name, e
     pilot_url_str = pilot_opt_dict['pilot_url_str']
     # open tmpfile as submit description file
     tmpFile = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='_submit.sdf', dir=workspec.get_access_point())
+    # placeholder map
+    placeholder_map = {
+        'sdfPath': tmpFile.name,
+        'executableFile': executable_file,
+        'nCorePerNode': n_core_per_node,
+        'nCoreTotal': n_core_total,
+        'nNode': n_node,
+        'requestRam': request_ram,
+        'requestRamPerCore': request_ram_per_core,
+        'requestDisk': request_disk,
+        'requestWalltime': request_walltime,
+        'requestWalltimeMinute': request_walltime_minute,
+        'requestCputime': request_cputime,
+        'requestCputimeMinute': request_cputime_minute,
+        'accessPoint': workspec.accessPoint,
+        'harvesterID': harvester_config.master.harvester_id,
+        'workerID': workspec.workerID,
+        'computingSite': workspec.computingSite,
+        'pandaQueueName': panda_queue_name,
+        'x509UserProxy': x509_user_proxy,
+        'ceEndpoint': ce_info_dict.get('ce_endpoint', ''),
+        'ceHostname': ce_info_dict.get('ce_hostname', ''),
+        'ceFlavour': ce_info_dict.get('ce_flavour', ''),
+        'ceJobmanager': ce_info_dict.get('ce_jobmanager', ''),
+        'ceQueueName': ce_info_dict.get('ce_queue_name', ''),
+        'ceVersion': ce_info_dict.get('ce_version', ''),
+        'logDir': log_dir,
+        'logSubdir': log_subdir,
+        'gtag': batch_log_dict.get('gtag', 'fake_GTAG_string'),
+        'prodSourceLabel': prod_source_label,
+        'jobType': workspec.jobType,
+        'resourceType': _get_resource_type(workspec.resourceType, is_unified_queue),
+        'pilotResourceTypeOption': _get_resource_type(workspec.resourceType, is_unified_queue, True),
+        'ioIntensity': io_intensity,
+        'pilotType': pilot_type_opt,
+        'pilotUrlOption': pilot_url_str,
+        'pilotVersion': pilot_version,
+        'pilotPythonOption': _get_python_version_option(python_version, prod_source_label),
+        'submissionHost': workspec.submissionHost,
+        'submissionHostShort': workspec.submissionHost.split('.')[0],
+        }
     # fill in template string
-    jdl_str = template.format(
-        sdfPath=tmpFile.name,
-        executableFile=executable_file,
-        nCorePerNode=n_core_per_node,
-        nCoreTotal=n_core_total,
-        nNode=n_node,
-        requestRam=request_ram,
-        requestRamPerCore=request_ram_per_core,
-        requestDisk=request_disk,
-        requestWalltime=request_walltime,
-        requestWalltimeMinute=request_walltime_minute,
-        requestCputime=request_cputime,
-        requestCputimeMinute=request_cputime_minute,
-        accessPoint=workspec.accessPoint,
-        harvesterID=harvester_config.master.harvester_id,
-        workerID=workspec.workerID,
-        computingSite=workspec.computingSite,
-        pandaQueueName=panda_queue_name,
-        x509UserProxy=x509_user_proxy,
-        ceEndpoint=ce_info_dict.get('ce_endpoint', ''),
-        ceHostname=ce_info_dict.get('ce_hostname', ''),
-        ceFlavour=ce_info_dict.get('ce_flavour', ''),
-        ceJobmanager=ce_info_dict.get('ce_jobmanager', ''),
-        ceQueueName=ce_info_dict.get('ce_queue_name', ''),
-        ceVersion=ce_info_dict.get('ce_version', ''),
-        logDir=log_dir,
-        logSubdir=log_subdir,
-        gtag=batch_log_dict.get('gtag', 'fake_GTAG_string'),
-        prodSourceLabel=prod_source_label,
-        jobType=workspec.jobType,
-        resourceType=_get_resource_type(workspec.resourceType, is_unified_queue),
-        pilotResourceTypeOption=_get_resource_type(workspec.resourceType, is_unified_queue, True),
-        ioIntensity=io_intensity,
-        pilotType=pilot_type_opt,
-        pilotUrlOption=pilot_url_str,
-        pilotVersion=pilot_version,
-        )
+    jdl_str = template.format(**placeholder_map)
     # save jdl to submit description file
     tmpFile.write(jdl_str)
     tmpFile.close()
     tmpLog.debug('saved sdf at {0}'.format(tmpFile.name))
     tmpLog.debug('done')
-    return jdl_str
+    return jdl_str, placeholder_map
 
 
 # parse log, stdout, stderr filename
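# --- illustrative sketch (not part of the patch) ---
# With make_a_jdl() now building a single placeholder_map, a CE-specific sdf template
# can reference any subset of its keys. Template text and values below are hypothetical,
# not the shipped templates.
sdf_template = (
    'executable = {executableFile}\n'
    'arguments = -q {pandaQueueName} -j {prodSourceLabel} {pilotUrlOption} {pilotPythonOption}\n'
    'request_cpus = {nCoreTotal}\n'
)
placeholder_map = {
    'executableFile': '/usr/local/bin/runpilot2-wrapper.sh',  # hypothetical path
    'pandaQueueName': 'MY_PANDA_QUEUE',
    'prodSourceLabel': 'managed',
    'pilotUrlOption': '',
    'pilotPythonOption': '--pythonversion 3',
    'nCoreTotal': 8,
}
print(sdf_template.format(**placeholder_map))
# --- end sketch ---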
@@ -560,7 +587,7 @@ def get_ce_statistics(self, site_name, n_new_workers, time_window=21600):
 
     # submit workers
     def submit_workers(self, workspec_list):
-        tmpLog = self.make_logger(baseLogger, method_name='submit_workers')
+        tmpLog = self.make_logger(baseLogger, 'site={0}'.format(self.queueName), method_name='submit_workers')
 
         nWorkers = len(workspec_list)
         tmpLog.debug('start nWorkers={0}'.format(nWorkers))
@@ -610,6 +637,7 @@ def submit_workers(self, workspec_list):
         is_unified_queue = this_panda_queue_dict.get('capability', '') == 'ucore'
         pilot_url = associated_params_dict.get('pilot_url')
         pilot_version = str(this_panda_queue_dict.get('pilot_version', 'current'))
+        python_version = str(this_panda_queue_dict.get('python_version', '2'))
         sdf_suffix_str = '_pilot2'
 
         # get override requirements from queue configured
@@ -661,8 +689,10 @@ def submit_workers(self, workspec_list):
             # Get CE weighting
             tmpLog.debug('Get CE weighting')
             worker_ce_all_tuple = self.get_ce_statistics(self.queueName, nWorkers)
+            is_slave_queue = (harvester_queue_config.runMode == 'slave')
             ce_weighting = _get_ce_weighting(ce_endpoint_list=list(ce_auxilary_dict.keys()),
-                                             worker_ce_all_tuple=worker_ce_all_tuple)
+                                             worker_ce_all_tuple=worker_ce_all_tuple,
+                                             is_slave_queue=is_slave_queue)
             stats_weighting_display_str = _get_ce_stats_weighting_display(
                 ce_auxilary_dict.keys(), worker_ce_all_tuple, ce_weighting)
             tmpLog.debug('CE stats and weighting: {0}'.format(stats_weighting_display_str))
@@ -672,8 +702,21 @@ def submit_workers(self, workspec_list):
 
         def _handle_one_worker(workspec, to_submit=to_submit_any):
             # make logger
-            tmpLog = core_utils.make_logger(baseLogger, 'workerID={0}'.format(workspec.workerID),
+            tmpLog = core_utils.make_logger(baseLogger, 'site={0} workerID={1}'.format(self.queueName, workspec.workerID),
                                             method_name='_handle_one_worker')
+            def _choose_proxy(workspec):
+                """
+                Choose the proxy based on the job type
+                """
+                job_type = workspec.jobType
+                proxy = self.x509UserProxy
+                if is_grandly_unified_queue and job_type in ('user', 'panda', 'analysis') and self.x509UserProxyAnalysis:
+                    tmpLog.debug('Taking analysis proxy')
+                    proxy = self.x509UserProxyAnalysis
+                else:
+                    tmpLog.debug('Taking default proxy')
+                return proxy
+
             # initialize
             ce_info_dict = dict()
             batch_log_dict = dict()
             data = {'workspec': workspec,
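# --- illustrative sketch (not part of the patch) ---
# The proxy-selection rule of the relocated _choose_proxy() above, extracted as a
# pure function; proxy paths are hypothetical stand-ins for the x509UserProxy* settings.
def choose_proxy(job_type, is_grandly_unified_queue,
                 default_proxy='/data/proxy/prod.pem',
                 analysis_proxy='/data/proxy/anal.pem'):
    if is_grandly_unified_queue and job_type in ('user', 'panda', 'analysis') and analysis_proxy:
        return analysis_proxy  # analysis-type job on a grandly unified queue
    return default_proxy

assert choose_proxy('user', True) == '/data/proxy/anal.pem'
assert choose_proxy('managed', True) == '/data/proxy/prod.pem'
# --- end sketch ---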
@@ -704,8 +747,8 @@ def _handle_one_worker(workspec, to_submit=to_submit_any):
                 if ce_flavour_str in default_port_map:
                     default_port = default_port_map[ce_flavour_str]
                     ce_info_dict['ce_endpoint'] = '{0}:{1}'.format(ce_endpoint_from_queue, default_port)
-                tmpLog.debug('For site {0} got pilot version: "{1}"; CE endpoint: "{2}", flavour: "{3}"'.format(
-                    self.queueName, pilot_version, ce_endpoint_from_queue, ce_flavour_str))
+                tmpLog.debug('Got pilot version: "{0}"; CE endpoint: "{1}", flavour: "{2}"'.format(
+                    pilot_version, ce_endpoint_from_queue, ce_flavour_str))
                 if self.templateFile:
                     sdf_template_file = self.templateFile
                 elif os.path.isdir(self.CEtemplateDir) and ce_flavour_str:
@@ -827,6 +870,7 @@ def _handle_one_worker(workspec, to_submit=to_submit_any):
                          'use_spool': self.useSpool,
                          'pilot_url': pilot_url,
                          'pilot_version': pilot_version,
+                         'python_version': python_version,
                          })
             return data
@@ -839,20 +883,6 @@ def _propagate_attributes(workspec, tmpVal):
             tmpLog.debug('Done workspec attributes propagation')
             return retVal
 
-        def _choose_proxy(workspec):
-            """
-            Choose the proxy based on the job type
-            """
-            job_type = workspec.jobType
-            proxy = self.x509UserProxy
-            if is_grandly_unified_queue and job_type in ('user', 'panda', 'analysis') and self.x509UserProxyAnalysis:
-                tmpLog.debug('Taking analysis proxy')
-                proxy = self.x509UserProxyAnalysis
-            else:
-                tmpLog.debug('Taking default proxy')
-
-            return proxy
-
         tmpLog.debug('finished preparing worker attributes')
 
         # map(_handle_one_worker, workspec_list)
diff --git a/pandaharvester/panda_pkg_info.py b/pandaharvester/panda_pkg_info.py
index 6d349a0d..60b33611 100644
--- a/pandaharvester/panda_pkg_info.py
+++ b/pandaharvester/panda_pkg_info.py
@@ -1 +1 @@
-release_version = "0.2.5"
+release_version = "0.2.6"
diff --git a/setup.py b/setup.py
index 8ea8ce19..92ea5f28 100644
--- a/setup.py
+++ b/setup.py
@@ -35,14 +35,15 @@
         'paramiko',
         'pexpect',
         'psutil >= 5.4.8',
-        'scandir; python_version < "3.5"'
+        'scandir; python_version < "3.5"',
+        'panda-pilot >= 2.7.2.1',
         ],
     # optional pip dependencies
     extras_require={
         'kubernetes': ['kubernetes', 'pyyaml'],
         'mysql': ['mysqlclient'],
-        'atlasgrid': ['uWSGI >= 2.0.0', 'htcondor >= 8.4.0', 'mysqlclient'],
+        'atlasgrid': ['uWSGI >= 2.0.0', 'htcondor >= 8.6.0', 'mysqlclient'],
         },
     data_files=[
diff --git a/templates/panda_harvester.cfg.rpmnew.template b/templates/panda_harvester.cfg.rpmnew.template
index bcd62498..abe00f36 100644
--- a/templates/panda_harvester.cfg.rpmnew.template
+++ b/templates/panda_harvester.cfg.rpmnew.template
@@ -339,7 +339,8 @@ sleepTime = 60
 # max number of workers per queue to try in one cycle
 maxNewWorkers = 1000
-
+# respect sleep time
+respectSleepTime = False
 
 
 ##########################
@@ -603,7 +604,6 @@ sleepTime = 60
 
 
 
-
 ##########################
 #
 # Cacher parameters
@@ -747,7 +747,11 @@ keepCancelled = 72
 # duration in hours to keep missed workers
 keepMissed = 24
 
+# disk cleaning interval in hours
+#diskCleanUpInterval = 1
+# comma-concatenated list of directory_name|high_watermark_in_GB to be cleaned up
+#diskHighWatermark = /dir1/subdir1|1000,/dir2/subdir2|5000
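# --- illustrative sketch (not part of the patch) ---
# One way the documented diskHighWatermark format could be consumed; the semantics
# (per-directory high watermark in GB) are assumed from the template comment above.
def parse_disk_high_watermark(value):
    result = {}
    for item in value.split(','):
        directory, gb_str = item.split('|')
        result[directory] = int(gb_str)
    return result

print(parse_disk_high_watermark('/dir1/subdir1|1000,/dir2/subdir2|5000'))
# -> {'/dir1/subdir1': 1000, '/dir2/subdir2': 5000}
# --- end sketch ---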