From af7e9b5bd9155c51b58fd8ce5f132f94766e99ee Mon Sep 17 00:00:00 2001 From: fbarreir Date: Tue, 7 May 2019 16:14:43 +0200 Subject: [PATCH 01/41] checkpoint on adding jobtype queues --- __init__.py | 0 pandaharvester/harvesterbody/submitter.py | 698 +++++++++--------- .../harvesterbody/worker_adjuster.py | 352 ++++----- pandaharvester/harvestercore/core_utils.py | 2 +- pandaharvester/harvestercore/db_proxy.py | 154 ++-- .../harvestercore/panda_queue_spec.py | 3 +- pandaharvester/harvestercore/work_spec.py | 2 + .../dummy_dynamic_worker_maker.py | 2 +- .../multijob_worker_maker.py | 2 +- .../multinode_worker_maker.py | 2 +- .../simple_bf_es_worker_maker.py | 4 +- .../simple_worker_maker.py | 39 +- 12 files changed, 663 insertions(+), 597 deletions(-) create mode 100644 __init__.py diff --git a/__init__.py b/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pandaharvester/harvesterbody/submitter.py b/pandaharvester/harvesterbody/submitter.py index 7c2df6b8..a7970cc4 100644 --- a/pandaharvester/harvesterbody/submitter.py +++ b/pandaharvester/harvesterbody/submitter.py @@ -26,386 +26,390 @@ class Submitter(AgentBase): # constructor def __init__(self, queue_config_mapper, single_mode=False): AgentBase.__init__(self, single_mode) - self.queueConfigMapper = queue_config_mapper + self.queue_configMapper = queue_config_mapper self.dbProxy = DBProxy() self.workerMaker = WorkerMaker() self.workerAdjuster = WorkerAdjuster(queue_config_mapper) self.pluginFactory = PluginFactory() self.monitor_fifo = MonitorFIFO() - self.apfmon = Apfmon(self.queueConfigMapper) + self.apfmon = Apfmon(self.queue_configMapper) # main loop def run(self): - lockedBy = 'submitter-{0}'.format(self.get_pid()) + locked_by = 'submitter-{0}'.format(self.get_pid()) monitor_fifo = self.monitor_fifo - queueLockInterval = getattr(harvester_config.submitter, 'queueLockInterval', + queue_lock_interval = getattr(harvester_config.submitter, 'queue_lock_interval', harvester_config.submitter.lockInterval) while True: sw_main = core_utils.get_stopwatch() - mainLog = self.make_logger(_logger, 'id={0}'.format(lockedBy), method_name='run') - mainLog.debug('getting queues to submit workers') + main_log = self.make_logger(_logger, 'id={0}'.format(locked_by), method_name='run') + main_log.debug('getting queues to submit workers') # get queues associated to a site to submit workers - curWorkers, siteName, resMap = self.dbProxy.get_queues_to_submit(harvester_config.submitter.nQueues, + current_workers, site_name, res_map = self.dbProxy.get_queues_to_submit(harvester_config.submitter.nQueues, harvester_config.submitter.lookupTime, harvester_config.submitter.lockInterval, - lockedBy, queueLockInterval) + locked_by, queue_lock_interval) submitted = False - if siteName is not None: - mainLog.debug('got {0} queues for site {1}'.format(len(curWorkers), siteName)) + if site_name is not None: + main_log.debug('got {0} queues for site {1}'.format(len(current_workers), site_name)) - # get commands - comStr = '{0}:{1}'.format(CommandSpec.COM_setNWorkers, siteName) - commandSpecs = self.dbProxy.get_commands_for_receiver('submitter', comStr) - mainLog.debug('got {0} {1} commands'.format(commandSpecs, comStr)) - for commandSpec in commandSpecs: - newLimits = self.dbProxy.set_queue_limit(siteName, commandSpec.params) - for tmpResource, tmpNewVal in iteritems(newLimits): - # if available, overwrite new worker value with the command from panda server - if tmpResource in resMap: - tmpQueueName = resMap[tmpResource] - if tmpQueueName in 
curWorkers: - curWorkers[tmpQueueName][tmpResource]['nNewWorkers'] = tmpNewVal + # get commands from panda server + com_str = '{0}:{1}'.format(CommandSpec.COM_setNWorkers, site_name) + command_specs = self.dbProxy.get_commands_for_receiver('submitter', com_str) + main_log.debug('got {0} {1} commands'.format(command_specs, com_str)) + for command_spec in command_specs: + new_limits = self.dbProxy.set_queue_limit(site_name, command_spec.params) + for tmp_job_type, tmp_jt_vals in iteritems(new_limits): + res_map.setdefault(tmp_job_type, {}) + for tmp_resource_type, tmp_new_val in iteritems(tmp_jt_vals): + # if available, overwrite new worker value with the command from panda server + if tmp_resource_type in res_map[tmp_job_type]: + tmp_queue_name = res_map[tmp_job_type][tmp_resource_type] + if tmp_queue_name in current_workers: + current_workers[tmp_queue_name][tmp_job_type][tmp_resource_type]['nNewWorkers'] = tmp_new_val # define number of new workers - if len(curWorkers) == 0: - n_workers_per_queue_and_rt = dict() + if len(current_workers) == 0: + n_workers_per_queue_jt_rt = dict() else: - n_workers_per_queue_and_rt = self.workerAdjuster.define_num_workers(curWorkers, siteName) + n_workers_per_queue_jt_rt = self.workerAdjuster.define_num_workers(current_workers, site_name) - if n_workers_per_queue_and_rt is None: - mainLog.error('WorkerAdjuster failed to define the number of workers') - elif len(n_workers_per_queue_and_rt) == 0: + if n_workers_per_queue_jt_rt is None: + main_log.error('WorkerAdjuster failed to define the number of workers') + elif len(n_workers_per_queue_jt_rt) == 0: pass else: # loop over all queues and resource types - for queueName in n_workers_per_queue_and_rt: - for resource_type, tmpVal in iteritems(n_workers_per_queue_and_rt[queueName]): + for queue_name in n_workers_per_queue_jt_rt: + for job_type, tmp_job_vals in iteritems(n_workers_per_queue_jt_rt[queue_name]): + for resource_type, tmp_val in iteritems(tmp_job_vals): - tmpLog = self.make_logger(_logger, 'id={0} queue={1} rtype={2}'.format(lockedBy, - queueName, - resource_type), - method_name='run') - try: - tmpLog.debug('start') - tmpLog.debug('workers status: %s' % tmpVal) - nWorkers = tmpVal['nNewWorkers'] + tmpVal['nReady'] - nReady = tmpVal['nReady'] + tmp_log = self.make_logger(_logger, 'id={0} queue={1} jtype={2} rtype={3}'.format( + locked_by, queue_name, job_type, resource_type), method_name='run') + try: + tmp_log.debug('start') + tmp_log.debug('workers status: %s' % tmp_val) + nWorkers = tmp_val['nNewWorkers'] + tmp_val['nReady'] + nReady = tmp_val['nReady'] - # check queue - if not self.queueConfigMapper.has_queue(queueName): - tmpLog.error('config not found') - continue + # check queue + if not self.queue_configMapper.has_queue(queue_name): + tmp_log.error('config not found') + continue - # no new workers - if nWorkers == 0: - tmpLog.debug('skipped since no new worker is needed based on current stats') - continue - # get queue - queueConfig = self.queueConfigMapper.get_queue(queueName) - workerMakerCore = self.workerMaker.get_plugin(queueConfig) - # check if resource is ready - if hasattr(workerMakerCore, 'dynamicSizing') and workerMakerCore.dynamicSizing is True: - numReadyResources = self.workerMaker.num_ready_resources(queueConfig, - resource_type, - workerMakerCore) - tmpLog.debug('numReadyResources: %s' % numReadyResources) - if not numReadyResources: - if hasattr(workerMakerCore, 'staticWorkers'): - nQRWorkers = tmpVal['nQueue'] + tmpVal['nRunning'] - tmpLog.debug('staticWorkers: %s, 
nQRWorkers(Queue+Running): %s' % - (workerMakerCore.staticWorkers, nQRWorkers)) - if nQRWorkers >= workerMakerCore.staticWorkers: - tmpLog.debug('No left static workers, skip') - continue + # no new workers + if nWorkers == 0: + tmp_log.debug('skipped since no new worker is needed based on current stats') + continue + # get queue + queue_config = self.queue_configMapper.get_queue(queue_name) + workerMakerCore = self.workerMaker.get_plugin(queue_config) + # check if resource is ready + if hasattr(workerMakerCore, 'dynamicSizing') and workerMakerCore.dynamicSizing is True: + numReadyResources = self.workerMaker.num_ready_resources(queue_config, + job_type, + resource_type, + workerMakerCore) + tmp_log.debug('numReadyResources: %s' % numReadyResources) + if not numReadyResources: + if hasattr(workerMakerCore, 'staticWorkers'): + nQRWorkers = tmp_val['nQueue'] + tmp_val['nRunning'] + tmp_log.debug('staticWorkers: %s, nQRWorkers(Queue+Running): %s' % + (workerMakerCore.staticWorkers, nQRWorkers)) + if nQRWorkers >= workerMakerCore.staticWorkers: + tmp_log.debug('No left static workers, skip') + continue + else: + nWorkers = min(workerMakerCore.staticWorkers - nQRWorkers, nWorkers) + tmp_log.debug('staticWorkers: %s, nWorkers: %s' % + (workerMakerCore.staticWorkers, nWorkers)) else: - nWorkers = min(workerMakerCore.staticWorkers - nQRWorkers, nWorkers) - tmpLog.debug('staticWorkers: %s, nWorkers: %s' % - (workerMakerCore.staticWorkers, nWorkers)) + tmp_log.debug('skip since no resources are ready') + continue else: - tmpLog.debug('skip since no resources are ready') - continue + nWorkers = min(nWorkers, numReadyResources) + # post action of worker maker + if hasattr(workerMakerCore, 'skipOnFail') and workerMakerCore.skipOnFail is True: + skipOnFail = True else: - nWorkers = min(nWorkers, numReadyResources) - # post action of worker maker - if hasattr(workerMakerCore, 'skipOnFail') and workerMakerCore.skipOnFail is True: - skipOnFail = True - else: - skipOnFail = False - # actions based on mapping type - if queueConfig.mapType == WorkSpec.MT_NoJob: - # workers without jobs - jobChunks = [] - for i in range(nWorkers): - jobChunks.append([]) - elif queueConfig.mapType == WorkSpec.MT_OneToOne: - # one worker per one job - jobChunks = self.dbProxy.get_job_chunks_for_workers( - queueName, - nWorkers, nReady, 1, None, - queueConfig.useJobLateBinding, - harvester_config.submitter.checkInterval, - harvester_config.submitter.lockInterval, - lockedBy) - elif queueConfig.mapType == WorkSpec.MT_MultiJobs: - # one worker for multiple jobs - nJobsPerWorker = self.workerMaker.get_num_jobs_per_worker(queueConfig, - nWorkers, - resource_type, - maker=workerMakerCore) - tmpLog.debug('nJobsPerWorker={0}'.format(nJobsPerWorker)) - jobChunks = self.dbProxy.get_job_chunks_for_workers( - queueName, - nWorkers, nReady, nJobsPerWorker, None, - queueConfig.useJobLateBinding, - harvester_config.submitter.checkInterval, - harvester_config.submitter.lockInterval, - lockedBy, - queueConfig.allowJobMixture) - elif queueConfig.mapType == WorkSpec.MT_MultiWorkers: - # multiple workers for one job - nWorkersPerJob = self.workerMaker.get_num_workers_per_job(queueConfig, - nWorkers, - resource_type, - maker=workerMakerCore) - maxWorkersPerJob = self.workerMaker.get_max_workers_per_job_in_total( - queueConfig, resource_type, maker=workerMakerCore) - maxWorkersPerJobPerCycle = self.workerMaker.get_max_workers_per_job_per_cycle( - queueConfig, resource_type, maker=workerMakerCore) - 
tmpLog.debug('nWorkersPerJob={0}'.format(nWorkersPerJob)) - jobChunks = self.dbProxy.get_job_chunks_for_workers( - queueName, - nWorkers, nReady, None, nWorkersPerJob, - queueConfig.useJobLateBinding, - harvester_config.submitter.checkInterval, - harvester_config.submitter.lockInterval, - lockedBy, max_workers_per_job_in_total=maxWorkersPerJob, - max_workers_per_job_per_cycle=maxWorkersPerJobPerCycle) - else: - tmpLog.error('unknown mapType={0}'.format(queueConfig.mapType)) - continue - - tmpLog.debug('got {0} job chunks'.format(len(jobChunks))) - if len(jobChunks) == 0: - continue - # make workers - okChunks, ngChunks = self.workerMaker.make_workers(jobChunks, queueConfig, - nReady, resource_type, - maker=workerMakerCore) - if len(ngChunks) == 0: - tmpLog.debug('successfully made {0} workers'.format(len(okChunks))) - else: - tmpLog.debug('made {0} workers, while {1} workers failed'.format(len(okChunks), - len(ngChunks))) - timeNow = datetime.datetime.utcnow() - timeNow_timestamp = time.time() - pandaIDs = set() - # NG (=not good) - for ngJobs in ngChunks: - for jobSpec in ngJobs: - if skipOnFail: - # release jobs when workers are not made - pandaIDs.add(jobSpec.PandaID) - else: - jobSpec.status = 'failed' - jobSpec.subStatus = 'failed_to_make' - jobSpec.stateChangeTime = timeNow - jobSpec.lockedBy = None - errStr = 'failed to make a worker' - jobSpec.set_pilot_error(PilotErrors.ERR_SETUPFAILURE, errStr) - jobSpec.trigger_propagation() - self.dbProxy.update_job(jobSpec, {'lockedBy': lockedBy, - 'subStatus': 'prepared'}) - # OK - workSpecList = [] - if len(okChunks) > 0: - for workSpec, okJobs in okChunks: - # has job - if (queueConfig.useJobLateBinding and workSpec.workerID is None) \ - or queueConfig.mapType == WorkSpec.MT_NoJob: - workSpec.hasJob = 0 - else: - workSpec.hasJob = 1 - if workSpec.nJobsToReFill in [None, 0]: - workSpec.set_jobspec_list(okJobs) - else: - # refill free slots during the worker is running - workSpec.set_jobspec_list(okJobs[:workSpec.nJobsToReFill]) - workSpec.nJobsToReFill = None - for jobSpec in okJobs[workSpec.nJobsToReFill:]: - pandaIDs.add(jobSpec.PandaID) - workSpec.set_num_jobs_with_list() - # map type - workSpec.mapType = queueConfig.mapType - # queue name - workSpec.computingSite = queueConfig.queueName - # set access point - workSpec.accessPoint = queueConfig.messenger['accessPoint'] - # sync level - workSpec.syncLevel = queueConfig.get_synchronization_level() - # events - if len(okJobs) > 0 and \ - ('eventService' in okJobs[0].jobParams or - 'cloneJob' in okJobs[0].jobParams): - workSpec.eventsRequest = WorkSpec.EV_useEvents - workSpecList.append(workSpec) - if len(workSpecList) > 0: - sw = core_utils.get_stopwatch() - # get plugin for submitter - submitterCore = self.pluginFactory.get_plugin(queueConfig.submitter) - if submitterCore is None: - # not found - tmpLog.error( - 'submitter plugin for {0} not found'.format(jobSpec.computingSite)) + skipOnFail = False + # actions based on mapping type + if queue_config.mapType == WorkSpec.MT_NoJob: + # workers without jobs + jobChunks = [] + for i in range(nWorkers): + jobChunks.append([]) + elif queue_config.mapType == WorkSpec.MT_OneToOne: + # one worker per one job + jobChunks = self.dbProxy.get_job_chunks_for_workers( + queue_name, + nWorkers, nReady, 1, None, + queue_config.useJobLateBinding, + harvester_config.submitter.checkInterval, + harvester_config.submitter.lockInterval, + locked_by) + elif queue_config.mapType == WorkSpec.MT_MultiJobs: + # one worker for multiple jobs + nJobsPerWorker = 
self.workerMaker.get_num_jobs_per_worker(queue_config, + nWorkers, + job_type, + resource_type, + maker=workerMakerCore) + tmp_log.debug('nJobsPerWorker={0}'.format(nJobsPerWorker)) + jobChunks = self.dbProxy.get_job_chunks_for_workers( + queue_name, + nWorkers, nReady, nJobsPerWorker, None, + queue_config.useJobLateBinding, + harvester_config.submitter.checkInterval, + harvester_config.submitter.lockInterval, + locked_by, + queue_config.allowJobMixture) + elif queue_config.mapType == WorkSpec.MT_MultiWorkers: + # multiple workers for one job + nWorkersPerJob = self.workerMaker.get_num_workers_per_job(queue_config, + nWorkers, + job_type, + resource_type, + maker=workerMakerCore) + maxWorkersPerJob = self.workerMaker.get_max_workers_per_job_in_total( + queue_config, job_type, resource_type, maker=workerMakerCore) + maxWorkersPerJobPerCycle = self.workerMaker.get_max_workers_per_job_per_cycle( + queue_config, job_type, resource_type, maker=workerMakerCore) + tmp_log.debug('nWorkersPerJob={0}'.format(nWorkersPerJob)) + jobChunks = self.dbProxy.get_job_chunks_for_workers( + queue_name, + nWorkers, nReady, None, nWorkersPerJob, + queue_config.useJobLateBinding, + harvester_config.submitter.checkInterval, + harvester_config.submitter.lockInterval, + locked_by, max_workers_per_job_in_total=maxWorkersPerJob, + max_workers_per_job_per_cycle=maxWorkersPerJobPerCycle) + else: + tmp_log.error('unknown mapType={0}'.format(queue_config.mapType)) continue - # get plugin for messenger - messenger = self.pluginFactory.get_plugin(queueConfig.messenger) - if messenger is None: - # not found - tmpLog.error( - 'messenger plugin for {0} not found'.format(jobSpec.computingSite)) + + tmp_log.debug('got {0} job chunks'.format(len(jobChunks))) + if len(jobChunks) == 0: continue - # setup access points - messenger.setup_access_points(workSpecList) - # feed jobs - for workSpec in workSpecList: - if workSpec.hasJob == 1: - tmpStat = messenger.feed_jobs(workSpec, workSpec.get_jobspec_list()) - if tmpStat is False: - tmpLog.error( - 'failed to send jobs to workerID={0}'.format(workSpec.workerID)) + # make workers + okChunks, ngChunks = self.workerMaker.make_workers(jobChunks, queue_config, + nReady, job_type, resource_type, + maker=workerMakerCore) + if len(ngChunks) == 0: + tmp_log.debug('successfully made {0} workers'.format(len(okChunks))) + else: + tmp_log.debug('made {0} workers, while {1} workers failed'.format(len(okChunks), + len(ngChunks))) + timeNow = datetime.datetime.utcnow() + timeNow_timestamp = time.time() + pandaIDs = set() + # NG (=not good) + for ngJobs in ngChunks: + for job_spec in ngJobs: + if skipOnFail: + # release jobs when workers are not made + pandaIDs.add(job_spec.PandaID) else: - tmpLog.debug( - 'sent jobs to workerID={0} with {1}'.format(workSpec.workerID, - tmpStat)) - # insert workers - self.dbProxy.insert_workers(workSpecList, lockedBy) - # submit - sw.reset() - tmpLog.info('submitting {0} workers'.format(len(workSpecList))) - workSpecList, tmpRetList, tmpStrList = self.submit_workers(submitterCore, - workSpecList) - tmpLog.debug('done submitting {0} workers'.format(len(workSpecList)) - + sw.get_elapsed_time()) - # collect successful jobs - okPandaIDs = set() - for iWorker, (tmpRet, tmpStr) in enumerate(zip(tmpRetList, tmpStrList)): - if tmpRet: - workSpec, jobList = okChunks[iWorker] - jobList = workSpec.get_jobspec_list() - if jobList is not None: - for jobSpec in jobList: - okPandaIDs.add(jobSpec.PandaID) - # loop over all workers - for iWorker, (tmpRet, tmpStr) in 
enumerate(zip(tmpRetList, tmpStrList)): - workSpec, jobList = okChunks[iWorker] - # set harvesterHost - workSpec.harvesterHost = socket.gethostname() - # use associated job list since it can be truncated for re-filling - jobList = workSpec.get_jobspec_list() - # set status - if not tmpRet: - # failed submission - errStr = 'failed to submit a workerID={0} with {1}'.format( - workSpec.workerID, - tmpStr) - tmpLog.error(errStr) - workSpec.set_status(WorkSpec.ST_missed) - workSpec.set_dialog_message(tmpStr) - workSpec.set_pilot_error(PilotErrors.ERR_SETUPFAILURE, errStr) + job_spec.status = 'failed' + job_spec.subStatus = 'failed_to_make' + job_spec.stateChangeTime = timeNow + job_spec.locked_by = None + errStr = 'failed to make a worker' + job_spec.set_pilot_error(PilotErrors.ERR_SETUPFAILURE, errStr) + job_spec.trigger_propagation() + self.dbProxy.update_job(job_spec, {'locked_by': locked_by, + 'subStatus': 'prepared'}) + # OK + work_specList = [] + if len(okChunks) > 0: + for work_spec, okJobs in okChunks: + # has job + if (queue_config.useJobLateBinding and work_spec.workerID is None) \ + or queue_config.mapType == WorkSpec.MT_NoJob: + work_spec.hasJob = 0 + else: + work_spec.hasJob = 1 + if work_spec.nJobsToReFill in [None, 0]: + work_spec.set_jobspec_list(okJobs) + else: + # refill free slots during the worker is running + work_spec.set_jobspec_list(okJobs[:work_spec.nJobsToReFill]) + work_spec.nJobsToReFill = None + for job_spec in okJobs[work_spec.nJobsToReFill:]: + pandaIDs.add(job_spec.PandaID) + work_spec.set_num_jobs_with_list() + # map type + work_spec.mapType = queue_config.mapType + # queue name + work_spec.computingSite = queue_config.queue_name + # set access point + work_spec.accessPoint = queue_config.messenger['accessPoint'] + # sync level + work_spec.syncLevel = queue_config.get_synchronization_level() + # events + if len(okJobs) > 0 and \ + ('eventService' in okJobs[0].jobParams or + 'cloneJob' in okJobs[0].jobParams): + work_spec.eventsRequest = WorkSpec.EV_useEvents + work_specList.append(work_spec) + if len(work_specList) > 0: + sw = core_utils.get_stopwatch() + # get plugin for submitter + submitterCore = self.pluginFactory.get_plugin(queue_config.submitter) + if submitterCore is None: + # not found + tmp_log.error( + 'submitter plugin for {0} not found'.format(job_spec.computingSite)) + continue + # get plugin for messenger + messenger = self.pluginFactory.get_plugin(queue_config.messenger) + if messenger is None: + # not found + tmp_log.error( + 'messenger plugin for {0} not found'.format(job_spec.computingSite)) + continue + # setup access points + messenger.setup_access_points(work_specList) + # feed jobs + for work_spec in work_specList: + if work_spec.hasJob == 1: + tmpStat = messenger.feed_jobs(work_spec, work_spec.get_jobspec_list()) + if tmpStat is False: + tmp_log.error( + 'failed to send jobs to workerID={0}'.format(work_spec.workerID)) + else: + tmp_log.debug( + 'sent jobs to workerID={0} with {1}'.format(work_spec.workerID, + tmpStat)) + # insert workers + self.dbProxy.insert_workers(work_specList, locked_by) + # submit + sw.reset() + tmp_log.info('submitting {0} workers'.format(len(work_specList))) + work_specList, tmpRetList, tmpStrList = self.submit_workers(submitterCore, + work_specList) + tmp_log.debug('done submitting {0} workers'.format(len(work_specList)) + + sw.get_elapsed_time()) + # collect successful jobs + okPandaIDs = set() + for iWorker, (tmpRet, tmpStr) in enumerate(zip(tmpRetList, tmpStrList)): + if tmpRet: + work_spec, jobList = 
okChunks[iWorker] + jobList = work_spec.get_jobspec_list() + if jobList is not None: + for job_spec in jobList: + okPandaIDs.add(job_spec.PandaID) + # loop over all workers + for iWorker, (tmpRet, tmpStr) in enumerate(zip(tmpRetList, tmpStrList)): + work_spec, jobList = okChunks[iWorker] + # set harvesterHost + work_spec.harvesterHost = socket.gethostname() + # use associated job list since it can be truncated for re-filling + jobList = work_spec.get_jobspec_list() + # set status + if not tmpRet: + # failed submission + errStr = 'failed to submit a workerID={0} with {1}'.format( + work_spec.workerID, + tmpStr) + tmp_log.error(errStr) + work_spec.set_status(WorkSpec.ST_missed) + work_spec.set_dialog_message(tmpStr) + work_spec.set_pilot_error(PilotErrors.ERR_SETUPFAILURE, errStr) + if jobList is not None: + # increment attempt number + newJobList = [] + for job_spec in jobList: + # skip if successful with another worker + if job_spec.PandaID in okPandaIDs: + continue + if job_spec.submissionAttempts is None: + job_spec.submissionAttempts = 0 + job_spec.submissionAttempts += 1 + # max attempt or permanent error + if tmpRet is False or \ + job_spec.submissionAttempts >= \ + queue_config.maxSubmissionAttempts: + newJobList.append(job_spec) + else: + self.dbProxy.increment_submission_attempt( + job_spec.PandaID, + job_spec.submissionAttempts) + jobList = newJobList + elif queue_config.useJobLateBinding and work_spec.hasJob == 1: + # directly go to running after feeding jobs for late biding + work_spec.set_status(WorkSpec.ST_running) + else: + # normal successful submission + work_spec.set_status(WorkSpec.ST_submitted) + work_spec.submitTime = timeNow + work_spec.modificationTime = timeNow + work_spec.checkTime = timeNow + if self.monitor_fifo.enabled: + work_spec.set_work_params({'lastCheckAt': timeNow_timestamp}) + # prefetch events + if tmpRet and work_spec.hasJob == 1 and \ + work_spec.eventsRequest == WorkSpec.EV_useEvents and \ + queue_config.prefetchEvents: + work_spec.eventsRequest = WorkSpec.EV_requestEvents + eventsRequestParams = dict() + for job_spec in jobList: + eventsRequestParams[job_spec.PandaID] = \ + {'pandaID': job_spec.PandaID, + 'taskID': job_spec.taskID, + 'jobsetID': job_spec.jobParams['jobsetID'], + 'nRanges': max(int(math.ceil(work_spec.nCore / len(jobList))), + job_spec.jobParams['coreCount']), + } + work_spec.eventsRequestParams = eventsRequestParams + # register worker + tmpStat = self.dbProxy.register_worker(work_spec, jobList, locked_by) if jobList is not None: - # increment attempt number - newJobList = [] - for jobSpec in jobList: - # skip if successful with another worker - if jobSpec.PandaID in okPandaIDs: - continue - if jobSpec.submissionAttempts is None: - jobSpec.submissionAttempts = 0 - jobSpec.submissionAttempts += 1 - # max attempt or permanent error - if tmpRet is False or \ - jobSpec.submissionAttempts >= \ - queueConfig.maxSubmissionAttempts: - newJobList.append(jobSpec) + for job_spec in jobList: + pandaIDs.add(job_spec.PandaID) + if tmpStat: + if tmpRet: + tmpStr = \ + 'submitted a workerID={0} for PandaID={1} with batchID={2}' + tmp_log.info(tmpStr.format(work_spec.workerID, + job_spec.PandaID, + work_spec.batchID)) + else: + tmpStr = 'failed to submit a workerID={0} for PandaID={1}' + tmp_log.error(tmpStr.format(work_spec.workerID, + job_spec.PandaID)) else: - self.dbProxy.increment_submission_attempt( - jobSpec.PandaID, - jobSpec.submissionAttempts) - jobList = newJobList - elif queueConfig.useJobLateBinding and workSpec.hasJob == 1: - # directly 
go to running after feeding jobs for late biding - workSpec.set_status(WorkSpec.ST_running) - else: - # normal successful submission - workSpec.set_status(WorkSpec.ST_submitted) - workSpec.submitTime = timeNow - workSpec.modificationTime = timeNow - workSpec.checkTime = timeNow - if self.monitor_fifo.enabled: - workSpec.set_work_params({'lastCheckAt': timeNow_timestamp}) - # prefetch events - if tmpRet and workSpec.hasJob == 1 and \ - workSpec.eventsRequest == WorkSpec.EV_useEvents and \ - queueConfig.prefetchEvents: - workSpec.eventsRequest = WorkSpec.EV_requestEvents - eventsRequestParams = dict() - for jobSpec in jobList: - eventsRequestParams[jobSpec.PandaID] = \ - {'pandaID': jobSpec.PandaID, - 'taskID': jobSpec.taskID, - 'jobsetID': jobSpec.jobParams['jobsetID'], - 'nRanges': max(int(math.ceil(workSpec.nCore / len(jobList))), - jobSpec.jobParams['coreCount']), - } - workSpec.eventsRequestParams = eventsRequestParams - # register worker - tmpStat = self.dbProxy.register_worker(workSpec, jobList, lockedBy) - if jobList is not None: - for jobSpec in jobList: - pandaIDs.add(jobSpec.PandaID) - if tmpStat: - if tmpRet: tmpStr = \ - 'submitted a workerID={0} for PandaID={1} with batchID={2}' - tmpLog.info(tmpStr.format(workSpec.workerID, - jobSpec.PandaID, - workSpec.batchID)) - else: - tmpStr = 'failed to submit a workerID={0} for PandaID={1}' - tmpLog.error(tmpStr.format(workSpec.workerID, - jobSpec.PandaID)) - else: - tmpStr = \ - 'failed to register a worker for PandaID={0} with batchID={1}' - tmpLog.error(tmpStr.format(jobSpec.PandaID, workSpec.batchID)) - # enqueue to monitor fifo - if self.monitor_fifo.enabled \ - and queueConfig.mapType != WorkSpec.MT_MultiWorkers: - workSpecsToEnqueue = \ - [[w] for w in workSpecList if w.status - in (WorkSpec.ST_submitted, WorkSpec.ST_running)] - check_delay = min( - getattr(harvester_config.monitor, 'eventBasedCheckInterval', - harvester_config.monitor.checkInterval), - getattr(harvester_config.monitor, 'fifoCheckInterval', - harvester_config.monitor.checkInterval)) - monitor_fifo.put((queueName, workSpecsToEnqueue), time.time() + check_delay) - mainLog.debug('put workers to monitor FIFO') - submitted = True - # release jobs - self.dbProxy.release_jobs(pandaIDs, lockedBy) - tmpLog.info('done') - except Exception: - core_utils.dump_error_message(tmpLog) + 'failed to register a worker for PandaID={0} with batchID={1}' + tmp_log.error(tmpStr.format(job_spec.PandaID, work_spec.batchID)) + # enqueue to monitor fifo + if self.monitor_fifo.enabled \ + and queue_config.mapType != WorkSpec.MT_MultiWorkers: + work_specsToEnqueue = \ + [[w] for w in work_specList if w.status + in (WorkSpec.ST_submitted, WorkSpec.ST_running)] + check_delay = min( + getattr(harvester_config.monitor, 'eventBasedCheckInterval', + harvester_config.monitor.checkInterval), + getattr(harvester_config.monitor, 'fifoCheckInterval', + harvester_config.monitor.checkInterval)) + monitor_fifo.put((queue_name, work_specsToEnqueue), time.time() + check_delay) + main_log.debug('put workers to monitor FIFO') + submitted = True + # release jobs + self.dbProxy.release_jobs(pandaIDs, locked_by) + tmp_log.info('done') + except Exception: + core_utils.dump_error_message(tmp_log) # release the site - self.dbProxy.release_site(siteName, lockedBy) - if sw_main.get_elapsed_time_in_sec() > queueLockInterval: - mainLog.warning('a submitter cycle was longer than queueLockInterval {0} sec'.format(queueLockInterval) + self.dbProxy.release_site(site_name, locked_by) + if sw_main.get_elapsed_time_in_sec() > 
queue_lock_interval: + main_log.warning('a submitter cycle was longer than queue_lock_interval {0} sec'.format(queue_lock_interval) + sw_main.get_elapsed_time()) - mainLog.debug('done') + main_log.debug('done') # define sleep interval - if siteName is None: + if site_name is None: sleepTime = harvester_config.submitter.sleepTime else: sleepTime = 0 @@ -413,13 +417,13 @@ def run(self): interval = harvester_config.submitter.minSubmissionInterval if interval > 0: newTime = datetime.datetime.utcnow() + datetime.timedelta(seconds=interval) - self.dbProxy.update_panda_queue_attribute('submitTime', newTime, site_name=siteName) + self.dbProxy.update_panda_queue_attribute('submitTime', newTime, site_name=site_name) # time the cycle - mainLog.debug('done a submitter cycle' + sw_main.get_elapsed_time()) + main_log.debug('done a submitter cycle' + sw_main.get_elapsed_time()) # check if being terminated if self.terminated(sleepTime): - mainLog.debug('terminated') + main_log.debug('terminated') return # wrapper for submitWorkers to skip ready workers @@ -428,13 +432,13 @@ def submit_workers(self, submitter_core, workspec_list): strList = [] newSpecList = [] workersToSubmit = [] - for workSpec in workspec_list: - if workSpec.status in [WorkSpec.ST_ready, WorkSpec.ST_running]: - newSpecList.append(workSpec) + for work_spec in workspec_list: + if work_spec.status in [WorkSpec.ST_ready, WorkSpec.ST_running]: + newSpecList.append(work_spec) retList.append(True) strList.append('') else: - workersToSubmit.append(workSpec) + workersToSubmit.append(work_spec) tmpRetList = submitter_core.submit_workers(workersToSubmit) # submit the workers to the monitoring diff --git a/pandaharvester/harvesterbody/worker_adjuster.py b/pandaharvester/harvesterbody/worker_adjuster.py index 22fd057e..2737a97d 100644 --- a/pandaharvester/harvesterbody/worker_adjuster.py +++ b/pandaharvester/harvesterbody/worker_adjuster.py @@ -15,11 +15,11 @@ class WorkerAdjuster(object): # constructor def __init__(self, queue_config_mapper): - self.queueConfigMapper = queue_config_mapper + self.queue_configMapper = queue_config_mapper self.pluginFactory = PluginFactory() self.dbProxy = DBProxy() self.throttlerMap = dict() - self.apf_mon = Apfmon(self.queueConfigMapper) + self.apf_mon = Apfmon(self.queue_configMapper) try: self.maxNewWorkers = harvester_config.submitter.maxNewWorkers except AttributeError: @@ -27,17 +27,17 @@ def __init__(self, queue_config_mapper): # define number of workers to submit based on various information def define_num_workers(self, static_num_workers, site_name): - tmpLog = core_utils.make_logger(_logger, 'site={0}'.format(site_name), method_name='define_num_workers') - tmpLog.debug('start') - tmpLog.debug('static_num_workers: {0}'.format(static_num_workers)) + tmp_log = core_utils.make_logger(_logger, 'site={0}'.format(site_name), method_name='define_num_workers') + tmp_log.debug('start') + tmp_log.debug('static_num_workers: {0}'.format(static_num_workers)) dyn_num_workers = copy.deepcopy(static_num_workers) try: # get queue status - queueStat = self.dbProxy.get_cache("panda_queues.json", None) - if queueStat is None: - queueStat = dict() + queue_stat = self.dbProxy.get_cache("panda_queues.json", None) + if queue_stat is None: + queue_stat = dict() else: - queueStat = queueStat.data + queue_stat = queue_stat.data # get job statistics job_stats = self.dbProxy.get_cache("job_statistics.json", None) @@ -47,196 +47,206 @@ def define_num_workers(self, static_num_workers, site_name): job_stats = job_stats.data # define num of 
new workers - for queueName in static_num_workers: + for queue_name in static_num_workers: # get queue - queueConfig = self.queueConfigMapper.get_queue(queueName) - workerLimits_dict = self.dbProxy.get_worker_limits(queueName) - maxWorkers = workerLimits_dict.get('maxWorkers', 0) - nQueueLimit = workerLimits_dict.get('nQueueLimitWorker', 0) - nQueueLimitPerRT = workerLimits_dict['nQueueLimitWorkerPerRT'] - nQueue_total, nReady_total, nRunning_total = 0, 0, 0 + queue_config = self.queue_configMapper.get_queue(queue_name) + worker_limits_dict = self.dbProxy.get_worker_limits(queue_name) + max_workers = worker_limits_dict.get('maxWorkers', 0) + n_queue_limit = worker_limits_dict.get('nQueueLimitWorker', 0) + n_queue_limit_per_rt = worker_limits_dict['nQueueLimitWorkerPerRT'] + n_queue_total, n_ready_total, n_running_total = 0, 0, 0 apf_msg = None apf_data = None - for resource_type, tmpVal in iteritems(static_num_workers[queueName]): - tmpLog.debug('Processing queue {0} resource {1} with static_num_workers {2}'. - format(queueName, resource_type, tmpVal)) - - # set 0 to num of new workers when the queue is disabled - if queueName in queueStat and queueStat[queueName]['status'] in ['offline', 'standby', - 'maintenance']: - dyn_num_workers[queueName][resource_type]['nNewWorkers'] = 0 - retMsg = 'set nNewWorkers=0 since status={0}'.format(queueStat[queueName]['status']) - tmpLog.debug(retMsg) - apf_msg = 'Not submitting workers since queue status = {0}'.format(queueStat[queueName]['status']) - continue - - # protection against not-up-to-date queue config - if queueConfig is None: - dyn_num_workers[queueName][resource_type]['nNewWorkers'] = 0 - retMsg = 'set nNewWorkers=0 due to missing queueConfig' - tmpLog.debug(retMsg) - apf_msg = 'Not submitting workers because of missing queueConfig' - continue - - # get throttler - if queueName not in self.throttlerMap: - if hasattr(queueConfig, 'throttler'): - throttler = self.pluginFactory.get_plugin(queueConfig.throttler) - else: - throttler = None - self.throttlerMap[queueName] = throttler - - # check throttler - throttler = self.throttlerMap[queueName] - if throttler is not None: - toThrottle, tmpMsg = throttler.to_be_throttled(queueConfig) - if toThrottle: - dyn_num_workers[queueName][resource_type]['nNewWorkers'] = 0 - retMsg = 'set nNewWorkers=0 by {0}:{1}'.format(throttler.__class__.__name__, tmpMsg) - tmpLog.debug(retMsg) + for job_type, jt_values in iteritems(static_num_workers[queue_name]): + for resource_type, tmp_val in iteritems(jt_values): + tmp_log.debug('Processing queue {0} job_type {1} resource_type {2} with static_num_workers {3}'. 
+ format(queue_name, job_type, resource_type, tmp_val)) + + # set 0 to num of new workers when the queue is disabled + if queue_name in queue_stat and queue_stat[queue_name]['status'] in ['offline', 'standby', + 'maintenance']: + dyn_num_workers[queue_name][job_type][resource_type]['nNewWorkers'] = 0 + ret_msg = 'set n_new_workers=0 since status={0}'.format(queue_stat[queue_name]['status']) + tmp_log.debug(ret_msg) + apf_msg = 'Not submitting workers since queue status = {0}'.format(queue_stat[queue_name]['status']) continue - # check stats - nQueue = tmpVal['nQueue'] - nReady = tmpVal['nReady'] - nRunning = tmpVal['nRunning'] - if resource_type != 'ANY': - nQueue_total += nQueue - nReady_total += nReady - nRunning_total += nRunning - if queueConfig.runMode == 'slave': - nNewWorkersDef = tmpVal['nNewWorkers'] - if nNewWorkersDef == 0: - dyn_num_workers[queueName][resource_type]['nNewWorkers'] = 0 - retMsg = 'set nNewWorkers=0 by panda in slave mode' - tmpLog.debug(retMsg) + # protection against not-up-to-date queue config + if queue_config is None: + dyn_num_workers[queue_name][job_type][resource_type]['nNewWorkers'] = 0 + ret_msg = 'set n_new_workers=0 due to missing queue_config' + tmp_log.debug(ret_msg) + apf_msg = 'Not submitting workers because of missing queue_config' continue - else: - nNewWorkersDef = None - - # define num of new workers based on static site config - nNewWorkers = 0 - if nQueue >= nQueueLimitPerRT > 0: - # enough queued workers - retMsg = 'No nNewWorkers since nQueue({0})>=nQueueLimitPerRT({1})'.format(nQueue, nQueueLimitPerRT) - tmpLog.debug(retMsg) - pass - elif (nQueue + nReady + nRunning) >= maxWorkers > 0: - # enough workers in the system - retMsg = 'No nNewWorkers since nQueue({0}) + nReady({1}) + nRunning({2}) '.format(nQueue, - nReady, - nRunning) - retMsg += '>= maxWorkers({0})'.format(maxWorkers) - tmpLog.debug(retMsg) - pass - else: - - maxQueuedWorkers = None - - if nQueueLimitPerRT > 0: # there is a limit set for the queue - maxQueuedWorkers = nQueueLimitPerRT - - # Reset the maxQueueWorkers according to particular - if nNewWorkersDef is not None: # don't surpass limits given centrally - maxQueuedWorkers_slave = nNewWorkersDef + nQueue - if maxQueuedWorkers is not None: - maxQueuedWorkers = min(maxQueuedWorkers_slave, maxQueuedWorkers) + + # get throttler + if queue_name not in self.throttlerMap: + if hasattr(queue_config, 'throttler'): + throttler = self.pluginFactory.get_plugin(queue_config.throttler) else: - maxQueuedWorkers = maxQueuedWorkers_slave - - elif queueConfig.mapType == 'NoJob': # for pull mode, limit to activated jobs - # limit the queue to the number of activated jobs to avoid empty pilots - try: - n_activated = max(job_stats[queueName]['activated'], 1) # avoid no activity queues - queue_limit = maxQueuedWorkers - maxQueuedWorkers = min(n_activated, maxQueuedWorkers) - tmpLog.debug('limiting maxQueuedWorkers to min(n_activated={0}, queue_limit={1})'. 
- format(n_activated, queue_limit)) - except KeyError: - tmpLog.warning('n_activated not defined, defaulting to configured queue limits') - pass - - if maxQueuedWorkers is None: # no value found, use default value - maxQueuedWorkers = 1 - - # new workers - nNewWorkers = max(maxQueuedWorkers - nQueue, 0) - tmpLog.debug('setting nNewWorkers to {0} in maxQueuedWorkers calculation' - .format(nNewWorkers)) - if maxWorkers > 0: - nNewWorkers = min(nNewWorkers, max(maxWorkers - nQueue - nReady - nRunning, 0)) - tmpLog.debug('setting nNewWorkers to {0} to respect maxWorkers' - .format(nNewWorkers)) - if queueConfig.maxNewWorkersPerCycle > 0: - nNewWorkers = min(nNewWorkers, queueConfig.maxNewWorkersPerCycle) - tmpLog.debug('setting nNewWorkers to {0} in order to respect maxNewWorkersPerCycle' - .format(nNewWorkers)) - if self.maxNewWorkers is not None and self.maxNewWorkers > 0: - nNewWorkers = min(nNewWorkers, self.maxNewWorkers) - tmpLog.debug('setting nNewWorkers to {0} in order to respect universal maxNewWorkers' - .format(nNewWorkers)) - dyn_num_workers[queueName][resource_type]['nNewWorkers'] = nNewWorkers - - # adjust nNewWorkers for UCORE to let aggregations over RT respect nQueueLimitWorker and maxWorkers - if queueConfig is None: - maxNewWorkersPerCycle = 0 - retMsg = 'set maxNewWorkersPerCycle=0 in UCORE aggregation due to missing queueConfig' - tmpLog.debug(retMsg) + throttler = None + self.throttlerMap[queue_name] = throttler + + # check throttler + throttler = self.throttlerMap[queue_name] + if throttler is not None: + to_throttle, tmp_msg = throttler.to_be_throttled(queue_config) + if to_throttle: + dyn_num_workers[queue_name][job_type][resource_type]['nNewWorkers'] = 0 + ret_msg = 'set n_new_workers=0 by {0}:{1}'.format(throttler.__class__.__name__, tmp_msg) + tmp_log.debug(ret_msg) + continue + + # check stats + n_queue = tmp_val['nQueue'] + n_ready = tmp_val['nReady'] + n_running = tmp_val['nRunning'] + if resource_type != 'ANY' and job_type != 'ANY' and job_type is not None: + n_queue_total += n_queue + n_ready_total += n_ready + n_running_total += n_running + if queue_config.runMode == 'slave': + n_new_workers_def = tmp_val['nNewWorkers'] + if n_new_workers_def == 0: + dyn_num_workers[queue_name][job_type][resource_type]['nNewWorkers'] = 0 + ret_msg = 'set n_new_workers=0 by panda in slave mode' + tmp_log.debug(ret_msg) + continue + else: + n_new_workers_def = None + + # define num of new workers based on static site config + n_new_workers = 0 + if n_queue >= n_queue_limit_per_rt > 0: + # enough queued workers + ret_msg = 'No n_new_workers since n_queue({0})>=n_queue_limit_per_rt({1})'.format(n_queue, + n_queue_limit_per_rt) + tmp_log.debug(ret_msg) + pass + elif (n_queue + n_ready + n_running) >= max_workers > 0: + # enough workers in the system + ret_msg = 'No n_new_workers since n_queue({0}) + n_ready({1}) + n_running({2}) '.format(n_queue, + n_ready, + n_running) + ret_msg += '>= max_workers({0})'.format(max_workers) + tmp_log.debug(ret_msg) + pass + else: + + max_queued_workers = None + + if n_queue_limit_per_rt > 0: # there is a limit set for the queue + max_queued_workers = n_queue_limit_per_rt + + # Reset the maxQueueWorkers according to particular + if n_new_workers_def is not None: # don't surpass limits given centrally + maxQueuedWorkers_slave = n_new_workers_def + n_queue + if max_queued_workers is not None: + max_queued_workers = min(maxQueuedWorkers_slave, max_queued_workers) + else: + max_queued_workers = maxQueuedWorkers_slave + + elif queue_config.mapType == 
'NoJob':  # for pull mode, limit to activated jobs
+                                # limit the queue to the number of activated jobs to avoid empty pilots
+                                try:
+                                    n_activated = max(job_stats[queue_name]['activated'], 1)  # avoid no activity queues
+                                    queue_limit = max_queued_workers
+                                    max_queued_workers = min(n_activated, max_queued_workers)
+                                    tmp_log.debug('limiting max_queued_workers to min(n_activated={0}, queue_limit={1})'.
+                                                  format(n_activated, queue_limit))
+                                except KeyError:
+                                    tmp_log.warning('n_activated not defined, defaulting to configured queue limits')
+                                    pass
+
+                            if max_queued_workers is None:  # no value found, use default value
+                                max_queued_workers = 1
+
+                            # new workers
+                            n_new_workers = max(max_queued_workers - n_queue, 0)
+                            tmp_log.debug('setting n_new_workers to {0} in max_queued_workers calculation'
+                                          .format(n_new_workers))
+                            if max_workers > 0:
+                                n_new_workers = min(n_new_workers, max(max_workers - n_queue - n_ready - n_running, 0))
+                                tmp_log.debug('setting n_new_workers to {0} to respect max_workers'
+                                              .format(n_new_workers))
+                            if queue_config.maxNewWorkersPerCycle > 0:
+                                n_new_workers = min(n_new_workers, queue_config.maxNewWorkersPerCycle)
+                                tmp_log.debug('setting n_new_workers to {0} in order to respect maxNewWorkersPerCycle'
+                                              .format(n_new_workers))
+                            if self.maxNewWorkers is not None and self.maxNewWorkers > 0:
+                                n_new_workers = min(n_new_workers, self.maxNewWorkers)
+                                tmp_log.debug('setting n_new_workers to {0} in order to respect universal maxNewWorkers'
+                                              .format(n_new_workers))
+                        dyn_num_workers[queue_name][job_type][resource_type]['nNewWorkers'] = n_new_workers
+
+                # adjust n_new_workers for UCORE to let aggregations over RT respect nQueueLimitWorker and max_workers
+                if queue_config is None:
+                    max_new_workers_per_cycle = 0
+                    ret_msg = 'set max_new_workers_per_cycle=0 in UCORE aggregation due to missing queue_config'
+                    tmp_log.debug(ret_msg)
                 else:
-                    maxNewWorkersPerCycle = queueConfig.maxNewWorkersPerCycle
-                    if len(dyn_num_workers[queueName]) > 1:
-                        total_new_workers_rts = sum( dyn_num_workers[queueName][_rt]['nNewWorkers']
-                                                 if _rt != 'ANY' else 0
-                                                 for _rt in dyn_num_workers[queueName] )
-                        nNewWorkers_max_agg = min(
-                            max(nQueueLimit - nQueue_total, 0),
-                            max(maxWorkers - nQueue_total - nReady_total - nRunning_total, 0),
-                            )
-                        if maxNewWorkersPerCycle >= 0:
-                            nNewWorkers_max_agg = min(nNewWorkers_max_agg, maxNewWorkersPerCycle)
+                    max_new_workers_per_cycle = queue_config.maxNewWorkersPerCycle
+                    if len(dyn_num_workers[queue_name]) > 1:
+                        total_new_workers_rts = sum(dyn_num_workers[queue_name][_jt][_rt]['nNewWorkers']
+                                                    if _jt != 'ANY' and _rt != 'ANY' else 0
+                                                    for _jt in dyn_num_workers[queue_name]
+                                                    for _rt in dyn_num_workers[queue_name][_jt])
+                        n_new_workers_max_agg = min(max(n_queue_limit - n_queue_total, 0),
+                                                    max(max_workers - n_queue_total - n_ready_total - n_running_total, 0))
+                        if max_new_workers_per_cycle >= 0:
+                            n_new_workers_max_agg = min(n_new_workers_max_agg, max_new_workers_per_cycle)
                         if self.maxNewWorkers is not None and self.maxNewWorkers > 0:
-                            nNewWorkers_max_agg = min(nNewWorkers_max_agg, self.maxNewWorkers)
+                            n_new_workers_max_agg = min(n_new_workers_max_agg, self.maxNewWorkers)
+
                         # exceeded max, to adjust
-                        if total_new_workers_rts > nNewWorkers_max_agg:
-                            if nNewWorkers_max_agg == 0:
-                                for resource_type in dyn_num_workers[queueName]:
-                                    dyn_num_workers[queueName][resource_type]['nNewWorkers'] = 0
-                                tmpLog.debug('No nNewWorkers since nNewWorkers_max_agg=0 for UCORE')
+                        if total_new_workers_rts > n_new_workers_max_agg:
+                            if n_new_workers_max_agg == 0:
+                                for job_type in dyn_num_workers[queue_name]:
+                                    for resource_type in dyn_num_workers[queue_name][job_type]:
+                                        dyn_num_workers[queue_name][job_type][resource_type]['nNewWorkers'] = 0
+                                tmp_log.debug('No n_new_workers since n_new_workers_max_agg=0 for UCORE')
                             else:
-                                tmpLog.debug('nNewWorkers_max_agg={0} for UCORE'.format(nNewWorkers_max_agg))
-                                _d = dyn_num_workers[queueName].copy()
+                                tmp_log.debug('n_new_workers_max_agg={0} for UCORE'.format(n_new_workers_max_agg))
+                                _d = dyn_num_workers[queue_name].copy()
                                 del _d['ANY']
-                                simple_rt_nw_list = [ [_rt, _d[_rt].get('nNewWorkers', 0), 0] for _rt in _d ]
-                                _countdown = nNewWorkers_max_agg
+
+                                # flatten to [(job_type, resource_type), nNewWorkers, remainder] triplets
+                                simple_rt_nw_list = []
+                                for _jt in _d:  # jt: job type
+                                    for _rt in _d[_jt]:  # rt: resource type
+                                        simple_rt_nw_list.append([(_jt, _rt), _d[_jt][_rt].get('nNewWorkers', 0), 0])
+
+                                _countdown = n_new_workers_max_agg
                                 for _rt_list in simple_rt_nw_list:
-                                    resource_type, nNewWorkers_orig, _r = _rt_list
-                                    nNewWorkers, remainder = divmod(nNewWorkers_orig*nNewWorkers_max_agg, total_new_workers_rts)
-                                    dyn_num_workers[queueName][resource_type]['nNewWorkers'] = nNewWorkers
+                                    (_jt, _rt), n_new_workers_orig, _r = _rt_list
+                                    n_new_workers, remainder = divmod(n_new_workers_orig * n_new_workers_max_agg,
+                                                                      total_new_workers_rts)
+                                    dyn_num_workers[queue_name][_jt][_rt]['nNewWorkers'] = n_new_workers
                                     _rt_list[2] = remainder
-                                    _countdown -= nNewWorkers
+                                    _countdown -= n_new_workers
                                 _s_list = sorted(simple_rt_nw_list, key=(lambda x: x[1]))
                                 sorted_rt_nw_list = sorted(_s_list, key=(lambda x: x[2]), reverse=True)
-                                for resource_type, nNewWorkers_orig, remainder in sorted_rt_nw_list:
+                                for (_jt, _rt), n_new_workers_orig, remainder in sorted_rt_nw_list:
                                     if _countdown <= 0:
                                         break
-                                    dyn_num_workers[queueName][resource_type]['nNewWorkers'] += 1
+                                    dyn_num_workers[queue_name][_jt][_rt]['nNewWorkers'] += 1
                                     _countdown -= 1
-                    for resource_type in dyn_num_workers[queueName]:
-                        if resource_type == 'ANY':
-                            continue
-                        nNewWorkers = dyn_num_workers[queueName][resource_type]['nNewWorkers']
-                        tmpLog.debug('setting nNewWorkers to {0} of type {1} in order to respect RT aggregations for UCORE'
-                                     .format(nNewWorkers, resource_type))
+
+                    for job_type in dyn_num_workers[queue_name]:
+                        for resource_type in dyn_num_workers[queue_name][job_type]:
+                            if job_type == 'ANY' or resource_type == 'ANY':
+                                continue
+                            n_new_workers = dyn_num_workers[queue_name][job_type][resource_type]['nNewWorkers']
+                            tmp_log.debug('setting n_new_workers to {0} of job_type {1} resource_type {2} in order to respect RT aggregations for UCORE'
+                                          .format(n_new_workers, job_type, resource_type))
                 if not apf_msg:
-                    apf_data = copy.deepcopy(dyn_num_workers[queueName])
+                    apf_data = copy.deepcopy(dyn_num_workers[queue_name])

-                self.apf_mon.update_label(queueName, apf_msg, apf_data)
+                self.apf_mon.update_label(queue_name, apf_msg, apf_data)
             # dump
-            tmpLog.debug('defined {0}'.format(str(dyn_num_workers)))
+            tmp_log.debug('defined {0}'.format(str(dyn_num_workers)))
             return dyn_num_workers
         except Exception:
             # dump error
-            errMsg = core_utils.dump_error_message(tmpLog)
+            err_msg = core_utils.dump_error_message(tmp_log)
             return None
diff --git a/pandaharvester/harvestercore/core_utils.py b/pandaharvester/harvestercore/core_utils.py
index 5358d92a..5a478446 100644
--- a/pandaharvester/harvestercore/core_utils.py
+++ b/pandaharvester/harvestercore/core_utils.py
@@ -587,7 +587,7 @@ def get_queues_config_url():
 # get unique queue name
-def get_unique_queue_name(queue_name, resource_type):
-    return '{0}:{1}'.format(queue_name, resource_type)
+def get_unique_queue_name(queue_name, resource_type, job_type):
+    return '{0}:{1}:{2}'.format(queue_name, resource_type, job_type)
 # capability to dynamically change plugins
diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py
index 4b58f4b4..2f9f7b02 100644
--- a/pandaharvester/harvestercore/db_proxy.py
+++ b/pandaharvester/harvestercore/db_proxy.py
@@ -1398,7 +1398,7 @@ def get_queues_to_submit(self, n_queues, lookup_interval, lock_interval, locked_
         sqlS += "OR (submitTime<:lookupTimeLimit AND lockedBy IS NULL) "
         sqlS += "ORDER BY submitTime "
         # sql to get queues
-        sqlQ = "SELECT queueName,resourceType,nNewWorkers FROM {0} ".format(pandaQueueTableName)
+        sqlQ = "SELECT queueName, jobType, resourceType, nNewWorkers FROM {0} ".format(pandaQueueTableName)
         sqlQ += "WHERE siteName=:siteName "
         # sql to get orphaned workers
         sqlO = "SELECT workerID FROM {0} ".format(workTableName)
@@ -1408,7 +1408,7 @@ def get_queues_to_submit(self, n_queues, lookup_interval, lock_interval, locked_
         sqlD = "DELETE FROM {0} ".format(workTableName)
         sqlD += "WHERE workerID=:workerID "
         # sql to count nQueue
-        sqlN = "SELECT status,COUNT(*) cnt FROM {0} ".format(workTableName)
+        sqlN = "SELECT status, COUNT(*) cnt FROM {0} ".format(workTableName)
         sqlN += "WHERE computingSite=:computingSite "
         # sql to count re-fillers
         sqlR = "SELECT COUNT(*) cnt FROM {0} ".format(workTableName)
@@ -1444,13 +1444,17 @@ def get_queues_to_submit(self, n_queues, lookup_interval, lock_interval, locked_
                 varMap[':siteName'] = siteName
                 self.execute(sqlQ, varMap)
                 resQ = self.cur.fetchall()
-                for queueName, resourceType, nNewWorkers in resQ:
+                for queueName, jobType, resourceType, nNewWorkers in resQ:
+                    # delete orphaned workers
                     varMap = dict()
                     varMap[':computingSite'] = queueName
                     varMap[':status'] = WorkSpec.ST_pending
                     varMap[':timeLimit'] = timeNow - datetime.timedelta(seconds=lock_interval)
                     sqlO_tmp = sqlO
+                    if jobType != 'ANY':
+                        varMap[':jobType'] = jobType
+                        sqlO_tmp += "AND jobType=:jobType "
                     if resourceType != 'ANY':
                         varMap[':resourceType'] = resourceType
                         sqlO_tmp += "AND resourceType=:resourceType "
@@ -1462,11 +1466,15 @@ def get_queues_to_submit(self, n_queues, lookup_interval, lock_interval, locked_
                     self.execute(sqlD, varMap)
                     # commit
                     self.commit()
+
                     # count nQueue
                     varMap = dict()
                     varMap[':computingSite'] = queueName
                     varMap[':resourceType'] = resourceType
                     sqlN_tmp = sqlN
+                    if jobType != 'ANY':
+                        varMap[':jobType'] = jobType
+                        sqlN_tmp += "AND jobType=:jobType "
                     if resourceType != 'ANY':
                         varMap[':resourceType'] = resourceType
                         sqlN_tmp += "AND resourceType=:resourceType "
@@ -1482,11 +1490,15 @@ def get_queues_to_submit(self, n_queues, lookup_interval, lock_interval, locked_
                         nReady += tmpNum
                     elif workerStatus in [WorkSpec.ST_running]:
                         nRunning += tmpNum
+
                     # count nFillers
                     varMap = dict()
                     varMap[':computingSite'] = queueName
                     varMap[':status'] = WorkSpec.ST_running
                     sqlR_tmp = sqlR
+                    if jobType != 'ANY':
+                        varMap[':jobType'] = jobType
+                        sqlR_tmp += "AND jobType=:jobType "
                     if resourceType != 'ANY':
                         varMap[':resourceType'] = resourceType
                         sqlR_tmp += "AND resourceType=:resourceType "
@@ -1494,12 +1506,13 @@ def get_queues_to_submit(self, n_queues, lookup_interval, lock_interval, locked_
                     nReFill, = self.cur.fetchone()
                     nReady += nReFill
                     # add
-                    retMap.setdefault(queueName, {})
-                    retMap[queueName][resourceType] = {'nReady': nReady,
-                                                       'nRunning': nRunning,
-                                                       'nQueue': nQueue,
-                                                       'nNewWorkers': nNewWorkers}
-                    resourceMap[resourceType] = queueName
+                    retMap.setdefault(queueName, {}).setdefault(jobType, {})
+                    retMap[queueName][jobType][resourceType] = {'nReady': nReady,
+                                                                'nRunning': nRunning,
+                                                                'nQueue': nQueue,
+                                                                'nNewWorkers': nNewWorkers}
+                    resourceMap.setdefault(jobType, {})
+                    resourceMap[jobType][resourceType] = queueName
                 # enough queues
                 if len(retMap) >= 0:
                     break
@@ -3651,11 +3664,11 @@ def release_jobs(self, panda_ids, locked_by):
             return False

     # clone queue
-    def clone_queue_with_new_resource_type(self, site_name, queue_name, resource_type, new_workers):
+    def clone_queue_with_new_job_and_resource_type(self, site_name, queue_name, job_type, resource_type, new_workers):
         try:
             # get logger
             tmpLog = core_utils.make_logger(_logger, 'site_name={0} queue_name={1}'.format(site_name, queue_name),
-                                            method_name='clone_queue_with_new_resource_type')
+                                            method_name='clone_queue_with_new_job_and_resource_type')
             tmpLog.debug('start')

             # get the values from one of the existing queues
@@ -3674,6 +3687,8 @@ def clone_queue_with_new_resource_type(self, site_name, queue_name, resource_typ
                 attr_binding = ':{0}'.format(attribute)
                 if attribute == 'resourceType':
                     var_map[attr_binding] = resource_type
+                elif attribute == 'jobType':
+                    var_map[attr_binding] = job_type
                 elif attribute == 'nNewWorkers':
                     var_map[attr_binding] = new_workers
                 elif attribute == 'uniqueName':
@@ -3707,85 +3722,87 @@ def set_queue_limit(self, site_name, params):
         sql_reset += "SET nNewWorkers=:zero WHERE siteName=:siteName "

         # sql to get resource types
-        sql_get_resource = "SELECT resourceType FROM {0} ".format(pandaQueueTableName)
-        sql_get_resource += "WHERE siteName=:siteName "
-        sql_get_resource += "FOR UPDATE "
+        sql_get_job_resource = "SELECT jobType, resourceType FROM {0} ".format(pandaQueueTableName)
+        sql_get_job_resource += "WHERE siteName=:siteName "
+        sql_get_job_resource += "FOR UPDATE "

         # sql to update nQueueLimit
         sql_update_queue = "UPDATE {0} ".format(pandaQueueTableName)
-        sql_update_queue += "SET nNewWorkers=:nQueue WHERE siteName=:siteName AND resourceType=:resourceType "
+        sql_update_queue += "SET nNewWorkers=:nQueue "
+        sql_update_queue += "WHERE siteName=:siteName AND jobType=:jobType AND resourceType=:resourceType "

         # sql to get num of submitted workers
         sql_count_workers = "SELECT COUNT(*) cnt "
         sql_count_workers += "FROM {0} wt, {1} pq ".format(workTableName, pandaQueueTableName)
-        sql_count_workers += "WHERE pq.siteName=:siteName AND wt.computingSite=pq.queueName AND wt.status=:status "
-        sql_count_workers += "ANd pq.resourceType=:resourceType "
+        sql_count_workers += "WHERE pq.siteName=:siteName AND wt.computingSite=pq.queueName AND wt.status=:status "
+        sql_count_workers += "AND pq.jobType=:jobType AND pq.resourceType=:resourceType "

-        # reset nqueued for all resource types
+        # reset nqueued for all job & resource types
         varMap = dict()
         varMap[':zero'] = 0
         varMap[':siteName'] = site_name
         self.execute(sql_reset, varMap)

-        # get resource types
+        # get job & resource types
         varMap = dict()
         varMap[':siteName'] = site_name
-        self.execute(sql_get_resource, varMap)
-        resRes = self.cur.fetchall()
-        resource_type_list = set()
-        for tmpRes, in resRes:
-            resource_type_list.add(tmpRes)
+        self.execute(sql_get_job_resource, varMap)
+        res_res = self.cur.fetchall()
+        job_resource_type_list = set()
+        for tmp_job_type, tmp_resource_type in res_res:
+            job_resource_type_list.add((tmp_job_type, tmp_resource_type))

         # set all queues
         nUp = 0
-        retMap = dict()
+        ret_map = dict()
         queue_name = site_name

-        for resource_type, value in iteritems(params):
-            tmpLog.debug('Processing rt {0} -> {1}'.format(resource_type, value))
+        for job_type, job_values in iteritems(params):
+            ret_map.setdefault(job_type, {})
+            for resource_type, value in iteritems(job_values):
+                tmpLog.debug('Processing rt {0} -> {1}'.format(resource_type, value))

-            # get num of submitted workers
-            varMap = dict()
-            varMap[':siteName'] = site_name
-            varMap[':resourceType'] = resource_type
-            varMap[':status'] = 'submitted'
-            self.execute(sql_count_workers, varMap)
-            res = self.cur.fetchone()
-            tmpLog.debug('{0} has {1} submitted workers'.format(resource_type, res))
-            nSubmittedWorkers = 0
-            if res is not None:
-                nSubmittedWorkers, = res
-
-            # set new value
-            # value = max(value - nSubmittedWorkers, 0)
-            if value is None:
-                value = 0
-            varMap = dict()
-            varMap[':nQueue'] = value
-            varMap[':siteName'] = site_name
-            varMap[':resourceType'] = resource_type
-            self.execute(sql_update_queue, varMap)
-            iUp = self.cur.rowcount
-
-            # iUp is 0 when nQueue is not changed
-            if iUp > 0 or resource_type in resource_type_list:
-                # a queue was updated, add the values to the map
-                retMap[resource_type] = value
-            else:
-                # no queue was updated, we need to create a new one for the resource type
-                cloned = self.clone_queue_with_new_resource_type(site_name, queue_name, resource_type, value)
-                if cloned:
-                    retMap[resource_type] = value
-                    iUp = 1
+                # get num of submitted workers
+                varMap = dict()
+                varMap[':siteName'] = site_name
+                varMap[':jobType'] = job_type
+                varMap[':resourceType'] = resource_type
+                varMap[':status'] = 'submitted'
+                self.execute(sql_count_workers, varMap)
+                res = self.cur.fetchone()
+                tmpLog.debug('{0} has {1} submitted workers'.format(resource_type, res))
+
+                if value is None:
+                    value = 0
+                varMap = dict()
+                varMap[':nQueue'] = value
+                varMap[':siteName'] = site_name
+                varMap[':jobType'] = job_type
+                varMap[':resourceType'] = resource_type
+                self.execute(sql_update_queue, varMap)
+                iUp = self.cur.rowcount
+
+                # iUp is 0 when nQueue is not changed
+                if iUp > 0 or (job_type, resource_type) in job_resource_type_list:
+                    # a queue was updated, add the values to the map
+                    ret_map[job_type][resource_type] = value
+                else:
+                    # no queue was updated, we need to create a new one for the resource type
+                    cloned = self.clone_queue_with_new_job_and_resource_type(site_name, queue_name, job_type,
+                                                                             resource_type, value)
+                    if cloned:
+                        ret_map[job_type][resource_type] = value
+                        iUp = 1

-            nUp += iUp
-            tmpLog.debug('set nNewWorkers={0} to {1}:{2} with {3}'.format(value, queue_name, resource_type, iUp))
+                nUp += iUp
+                tmpLog.debug('set nNewWorkers={0} to {1}:{2}:{3} with {4}'.format(value, queue_name, job_type,
+                                                                                  resource_type, iUp))

         # commit
         self.commit()
         tmpLog.debug('updated {0} queues'.format(nUp))
-        return retMap
+        return ret_map
     except Exception:
         # roll back
         self.rollback()
@@ -4338,16 +4355,20 @@ def get_worker_limits(self, site_name):
         # get logger
         tmpLog = core_utils.make_logger(_logger, method_name='get_worker_limits')
         tmpLog.debug('start')
-        # sql to get
-        sqlQ = "SELECT maxWorkers,nQueueLimitWorker,nQueueLimitWorkerRatio,"
+
+        # sql to get queue limits
+        sqlQ = "SELECT maxWorkers, nQueueLimitWorker, nQueueLimitWorkerRatio,"
         sqlQ += "nQueueLimitWorkerMax,nQueueLimitWorkerMin FROM {0} ".format(pandaQueueTableName)
-        sqlQ += "WHERE siteName=:siteName AND resourceType='ANY'"
+        sqlQ += "WHERE siteName=:siteName AND jobType='ANY' AND resourceType='ANY'"
+
         # sql to count resource types
         sqlNT = "SELECT COUNT(*) cnt FROM {0} ".format(pandaQueueTableName)
         sqlNT += "WHERE siteName=:siteName AND resourceType!='ANY'"
+
        # sql to count running workers
         sqlNR = "SELECT COUNT(*) cnt FROM {0} ".format(workTableName)
         sqlNR += "WHERE computingSite=:computingSite AND status IN (:status1)"
+
         # get
         varMap = dict()
         varMap[':siteName'] = site_name
@@ -4365,6 +4386,7 @@
         varMap[':status1'] = 
'running' self.execute(sqlNR, varMap) resNR = self.cur.fetchall() + # dynamic nQueueLimitWorker retMap = dict() nRunning = 0 diff --git a/pandaharvester/harvestercore/panda_queue_spec.py b/pandaharvester/harvestercore/panda_queue_spec.py index 1ba06b5d..210e29d7 100644 --- a/pandaharvester/harvestercore/panda_queue_spec.py +++ b/pandaharvester/harvestercore/panda_queue_spec.py @@ -16,6 +16,7 @@ class PandaQueueSpec(SpecBase): 'submitTime:timestamp / index', 'lockedBy:text', 'siteName:text / index', + 'jobType:text', 'resourceType:text', 'nNewWorkers:integer', 'uniqueName:text / unique', @@ -29,7 +30,7 @@ class PandaQueueSpec(SpecBase): # catchall resource type RT_catchall = 'ANY' - + JT_catchall = 'ANY' # constructor def __init__(self): SpecBase.__init__(self) diff --git a/pandaharvester/harvestercore/work_spec.py b/pandaharvester/harvestercore/work_spec.py index e049ad93..63e1869f 100644 --- a/pandaharvester/harvestercore/work_spec.py +++ b/pandaharvester/harvestercore/work_spec.py @@ -79,6 +79,7 @@ class WorkSpec(SpecBase): 'computingElement:text', 'nJobsToReFill:integer / index', 'logFilesToUpload:blob', + 'jobType:text', 'resourceType:text', 'nativeExitCode:integer', 'nativeStatus:text', @@ -236,6 +237,7 @@ def convert_to_propagate(self): 'submitTime', 'startTime', 'endTime', + 'jobType', 'resourceType', 'nativeExitCode', 'nativeStatus', diff --git a/pandaharvester/harvesterworkermaker/dummy_dynamic_worker_maker.py b/pandaharvester/harvesterworkermaker/dummy_dynamic_worker_maker.py index f202eece..beb29294 100644 --- a/pandaharvester/harvesterworkermaker/dummy_dynamic_worker_maker.py +++ b/pandaharvester/harvesterworkermaker/dummy_dynamic_worker_maker.py @@ -11,7 +11,7 @@ def __init__(self, **kwarg): BaseWorkerMaker.__init__(self, **kwarg) # make a worker from jobs - def make_worker(self, jobspec_list, queue_config, resource_type): + def make_worker(self, jobspec_list, queue_config, job_type, resource_type): workSpec = WorkSpec() workSpec.resourceType = resource_type if len(jobspec_list) > 0: diff --git a/pandaharvester/harvesterworkermaker/multijob_worker_maker.py b/pandaharvester/harvesterworkermaker/multijob_worker_maker.py index 7b56c235..7de1f4f5 100644 --- a/pandaharvester/harvesterworkermaker/multijob_worker_maker.py +++ b/pandaharvester/harvesterworkermaker/multijob_worker_maker.py @@ -45,7 +45,7 @@ def _get_executable(self, queue_config): return exe_str # make a worker from a job with a disk access point - def make_worker(self, jobspec_list, queue_config, resource_type): + def make_worker(self, jobspec_list, queue_config, job_type, resource_type): tmpLog = self.make_logger(baseLogger, method_name='make_worker') workSpec = WorkSpec() self.nJobsPerWorker = len(jobspec_list) diff --git a/pandaharvester/harvesterworkermaker/multinode_worker_maker.py b/pandaharvester/harvesterworkermaker/multinode_worker_maker.py index c59b81a5..c03c44cf 100644 --- a/pandaharvester/harvesterworkermaker/multinode_worker_maker.py +++ b/pandaharvester/harvesterworkermaker/multinode_worker_maker.py @@ -58,7 +58,7 @@ def _get_executable(self): return exe_str # make a worker from jobs - def make_worker(self, jobspec_list, queue_config, resource_type): + def make_worker(self, jobspec_list, queue_config, job_type, resource_type): tmpLog = core_utils.make_logger(baseLogger, 'queue={0}'.format(queue_config.queueName), method_name='make_worker') diff --git a/pandaharvester/harvesterworkermaker/simple_bf_es_worker_maker.py b/pandaharvester/harvesterworkermaker/simple_bf_es_worker_maker.py index 
2ac53375..bc2d7dad 100644 --- a/pandaharvester/harvesterworkermaker/simple_bf_es_worker_maker.py +++ b/pandaharvester/harvesterworkermaker/simple_bf_es_worker_maker.py @@ -28,7 +28,7 @@ def __init__(self, **kwarg): self.dyn_resources = None # make a worker from jobs - def make_worker(self, jobspec_list, queue_config, resource_type): + def make_worker(self, jobspec_list, queue_config, job_type, resource_type): tmpLog = self.make_logger(_logger, 'queue={0}'.format(queue_config.queueName), method_name='make_worker') @@ -201,7 +201,7 @@ def adjust_resources(self, resources): tmpLog.info("Available backfill resources after adjusting: %s" % ret_resources) return ret_resources - def get_dynamic_resource(self, queue_name, resource_type): + def get_dynamic_resource(self, queue_name, job_type, resource_type): resources = self.get_bf_resources() if resources: resources = self.adjust_resources(resources) diff --git a/pandaharvester/harvesterworkermaker/simple_worker_maker.py b/pandaharvester/harvesterworkermaker/simple_worker_maker.py index b925028d..fb7a8454 100644 --- a/pandaharvester/harvesterworkermaker/simple_worker_maker.py +++ b/pandaharvester/harvesterworkermaker/simple_worker_maker.py @@ -56,8 +56,29 @@ def get_job_core_and_memory(self, queue_dict, job_spec): return job_corecount, job_memory + def get_job_type(self, job_spec, job_type, queue_dict): + + # 1. get prodSourceLabel from job (push) + if job_spec and 'prodSourceLabel' in job_spec.jobParams: + job_type_final = job_spec.jobParams['prodSourceLabel'] + # 2. get prodSourceLabel from the specified job_type (pull UPS) + elif job_type: + job_type_final = job_type + # 3. convert the prodSourcelabel from the queue configuration or leave it empty + else: # 3. get prodSourceLabel from the queue definition (pull) + queue_type = queue_dict.get('type', None) + # map AGIS types to PanDA types + if queue_type == 'analysis': + job_type_final = 'user' + elif queue_type == 'production': + job_type_final = 'managed' + else: + job_type_final = None + + return job_type_final + # make a worker from jobs - def make_worker(self, jobspec_list, queue_config, resource_type): + def make_worker(self, jobspec_list, queue_config, job_type, resource_type): tmpLog = self.make_logger(_logger, 'queue={0}'.format(queue_config.queueName), method_name='make_worker') @@ -76,7 +97,7 @@ def make_worker(self, jobspec_list, queue_config, resource_type): workSpec.nCore = queue_dict.get('corecount', 1) or 1 workSpec.minRamCount = queue_dict.get('maxrss', 1) or 1 - # case of unified queue: look at the resource type and queue configuration + # case of unified queue: look at the job & resource type and queue configuration else: catchall = queue_dict.get('catchall', '') if 'useMaxRam' in catchall or queue_config.queueName in ('Taiwan-LCG2-HPC2_Unified', @@ -128,11 +149,9 @@ def make_worker(self, jobspec_list, queue_config, resource_type): except Exception: pass - if (nCore > 0 and 'nCore' in self.jobAttributesToUse) \ - or unified_queue: + if (nCore > 0 and 'nCore' in self.jobAttributesToUse) or unified_queue: workSpec.nCore = nCore - if (minRamCount > 0 and 'minRamCount' in self.jobAttributesToUse) \ - or unified_queue: + if (minRamCount > 0 and 'minRamCount' in self.jobAttributesToUse) or unified_queue: workSpec.minRamCount = minRamCount if maxDiskCount > 0 and 'maxDiskCount' in self.jobAttributesToUse: workSpec.maxDiskCount = maxDiskCount @@ -140,13 +159,19 @@ def make_worker(self, jobspec_list, queue_config, resource_type): workSpec.maxWalltime = maxWalltime if ioIntensity > 0 
and 'ioIntensity' in self.jobAttributesToUse: workSpec.ioIntensity = ioIntensity + workSpec.pilotType = jobspec_list[0].get_pilot_type() + workSpec.jobType = self.get_job_type(jobspec_list[0], job_type, queue_dict) + else: # when no job # randomize pilot type with weighting workSpec.pilotType = random.choice(self.pilotTypeRandomList) if workSpec.pilotType in ['RC', 'ALRB', 'PT']: tmpLog.info('a worker has pilotType={0}'.format(workSpec.pilotType)) + + workSpec.jobType = self.get_job_type(None, job_type, queue_dict) + # TODO: this needs to be improved with real resource types if resource_type and resource_type != 'ANY': workSpec.resourceType = resource_type @@ -155,4 +180,6 @@ def make_worker(self, jobspec_list, queue_config, resource_type): else: workSpec.resourceType = 'MCORE' + + return workSpec From 76bdf64283d396a9f72a30d0dedd05c1c3fbedc5 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Tue, 7 May 2019 17:16:13 +0200 Subject: [PATCH 02/41] checkpoint --- pandaharvester/harvestercore/db_proxy.py | 60 +++++++++++++----------- 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index 2f9f7b02..1cbb27b7 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -3264,7 +3264,7 @@ def get_worker_stats(self, site_name): tmpLog = core_utils.make_logger(_logger, method_name='get_worker_stats') tmpLog.debug('start') # sql to get nQueueLimit - sqlQ = "SELECT queueName,resourceType,nNewWorkers FROM {0} ".format(pandaQueueTableName) + sqlQ = "SELECT queueName, jobType, resourceType, nNewWorkers FROM {0} ".format(pandaQueueTableName) sqlQ += "WHERE siteName=:siteName " # get nQueueLimit varMap = dict() @@ -3272,18 +3272,18 @@ def get_worker_stats(self, site_name): self.execute(sqlQ, varMap) resQ = self.cur.fetchall() retMap = dict() - for computingSite, resourceType, nNewWorkers in resQ: - if resourceType not in retMap: - retMap[resourceType] = { - 'running': 0, - 'submitted': 0, - 'to_submit': nNewWorkers - } + for computingSite, jobType, resourceType, nNewWorkers in resQ: + retMap.setdefault(jobType, {}) + if resourceType not in retMap[jobType]: + retMap[jobType][resourceType] = {'running': 0, + 'submitted': 0, + 'to_submit': nNewWorkers} + # get worker stats - sqlW = "SELECT wt.status, wt.computingSite, pq.resourceType, COUNT(*) cnt " + sqlW = "SELECT wt.status, wt.computingSite, pq.jobType, pq.resourceType, COUNT(*) cnt " sqlW += "FROM {0} wt, {1} pq ".format(workTableName, pandaQueueTableName) sqlW += "WHERE pq.siteName=:siteName AND wt.computingSite=pq.queueName AND wt.status IN (:st1,:st2) " - sqlW += "GROUP BY wt.status, wt.computingSite, pq.resourceType " + sqlW += "GROUP BY wt.status, wt.computingSite, pq.jobType, pq.resourceType " # get worker stats varMap = dict() varMap[':siteName'] = site_name @@ -3291,14 +3291,14 @@ def get_worker_stats(self, site_name): varMap[':st2'] = 'submitted' self.execute(sqlW, varMap) resW = self.cur.fetchall() - for workerStatus, computingSite, resourceType, cnt in resW: + for workerStatus, computingSite, jobType, resourceType, cnt in resW: + retMap.setdefault(jobType, {}) if resourceType not in retMap: - retMap[resourceType] = { - 'running': 0, - 'submitted': 0, - 'to_submit': 0 - } - retMap[resourceType][workerStatus] = cnt + retMap[jobType][resourceType] = {'running': 0, + 'submitted': 0, + 'to_submit': 0 + } + retMap[jobType][resourceType][workerStatus] = cnt # commit self.commit() tmpLog.debug('got 
{0}'.format(str(retMap))) @@ -3318,40 +3318,46 @@ def get_worker_stats_bulk(self, active_ups_queues): tmpLog = core_utils.make_logger(_logger, method_name='get_worker_stats_bulk') tmpLog.debug('start') # sql to get nQueueLimit - sqlQ = "SELECT queueName, resourceType, nNewWorkers FROM {0} ".format(pandaQueueTableName) + sqlQ = "SELECT queueName, jobType, resourceType, nNewWorkers FROM {0} ".format(pandaQueueTableName) # get nQueueLimit self.execute(sqlQ) resQ = self.cur.fetchall() retMap = dict() - for computingSite, resourceType, nNewWorkers in resQ: + for computingSite, jobType, resourceType, nNewWorkers in resQ: retMap.setdefault(computingSite, {}) - if resourceType and resourceType != 'ANY' and resourceType not in retMap[computingSite]: - retMap[computingSite][resourceType] = {'running': 0, 'submitted': 0, 'to_submit': nNewWorkers} + retMap[computingSite].setdefault(jobType, {}) + if resourceType and resourceType != 'ANY' and resourceType not in retMap[computingSite][jobType]: + retMap[computingSite][jobType][resourceType] = {'running': 0, + 'submitted': 0, + 'to_submit': nNewWorkers} # get worker stats - sqlW = "SELECT wt.status, wt.computingSite, wt.resourceType, COUNT(*) cnt " + sqlW = "SELECT wt.status, wt.computingSite, wt.jobType, wt.resourceType, COUNT(*) cnt " sqlW += "FROM {0} wt ".format(workTableName) sqlW += "WHERE wt.status IN (:st1,:st2) " - sqlW += "GROUP BY wt.status,wt.computingSite, wt.resourceType " + sqlW += "GROUP BY wt.status,wt.computingSite, wt.jobType, wt.resourceType " # get worker stats varMap = dict() varMap[':st1'] = 'running' varMap[':st2'] = 'submitted' self.execute(sqlW, varMap) resW = self.cur.fetchall() - for workerStatus, computingSite, resourceType, cnt in resW: + for workerStatus, computingSite, jobType, resourceType, cnt in resW: if resourceType and resourceType != 'ANY': retMap.setdefault(computingSite, {}) - retMap[computingSite].setdefault(resourceType, {'running': 0, 'submitted': 0, 'to_submit': 0}) - retMap[computingSite][resourceType][workerStatus] = cnt + retMap[computingSite].setdefault(jobType, {}) + retMap[computingSite][jobType].setdefault(resourceType, {'running': 0, + 'submitted': 0, + 'to_submit': 0}) + retMap[computingSite][jobType][resourceType][workerStatus] = cnt # if there are no jobs for an active UPS queue, it needs to be initialized so that the pilot streaming # on panda server starts processing the queue if active_ups_queues: for ups_queue in active_ups_queues: if ups_queue not in retMap or not retMap[ups_queue]: - retMap[ups_queue] = {'SCORE': {'running': 0, 'submitted': 0, 'to_submit': 0}} + retMap[ups_queue] = {'managed': {'SCORE': {'running': 0, 'submitted': 0, 'to_submit': 0}}} # commit self.commit() From 3fcfec664506b0220d8c3711b5612ce9ccbdaf22 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Wed, 8 May 2019 10:43:50 +0200 Subject: [PATCH 03/41] typo --- pandaharvester/harvesterbody/submitter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandaharvester/harvesterbody/submitter.py b/pandaharvester/harvesterbody/submitter.py index a7970cc4..53c3b0f1 100644 --- a/pandaharvester/harvesterbody/submitter.py +++ b/pandaharvester/harvesterbody/submitter.py @@ -38,7 +38,7 @@ def __init__(self, queue_config_mapper, single_mode=False): def run(self): locked_by = 'submitter-{0}'.format(self.get_pid()) monitor_fifo = self.monitor_fifo - queue_lock_interval = getattr(harvester_config.submitter, 'queue_lock_interval', + queue_lock_interval = getattr(harvester_config.submitter, 'queueLockInterval', 
harvester_config.submitter.lockInterval) while True: sw_main = core_utils.get_stopwatch() From 98875e5ae9f5c8737e145bf714033ee3ce6b35fb Mon Sep 17 00:00:00 2001 From: fbarreir Date: Fri, 30 Aug 2019 17:28:23 +0200 Subject: [PATCH 04/41] Prodanaly: correction on unique queue name --- pandaharvester/harvestercore/core_utils.py | 4 ++-- pandaharvester/harvestercore/db_proxy.py | 2 +- pandaharvester/harvestercore/queue_config_mapper.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pandaharvester/harvestercore/core_utils.py b/pandaharvester/harvestercore/core_utils.py index 5a478446..560fd77a 100644 --- a/pandaharvester/harvestercore/core_utils.py +++ b/pandaharvester/harvestercore/core_utils.py @@ -586,8 +586,8 @@ def get_queues_config_url(): # get unique queue name -def get_unique_queue_name(queue_name, resource_type): - return '{0}:{1}:{2}'.format(queue_name, resource_type) +def get_unique_queue_name(queue_name, resource_type, job_type): + return '{0}:{1}:{2}'.format(queue_name, resource_type, job_type) # capability to dynamically change plugins diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index 1cbb27b7..c0344be6 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -3698,7 +3698,7 @@ def clone_queue_with_new_job_and_resource_type(self, site_name, queue_name, job_ elif attribute == 'nNewWorkers': var_map[attr_binding] = new_workers elif attribute == 'uniqueName': - var_map[attr_binding] = core_utils.get_unique_queue_name(queue_name, resource_type) + var_map[attr_binding] = core_utils.get_unique_queue_name(queue_name, resource_type, job_type) else: var_map[attr_binding] = value attribute_list.append(attribute) diff --git a/pandaharvester/harvestercore/queue_config_mapper.py b/pandaharvester/harvestercore/queue_config_mapper.py index 0471c898..4e6f9b48 100644 --- a/pandaharvester/harvestercore/queue_config_mapper.py +++ b/pandaharvester/harvestercore/queue_config_mapper.py @@ -82,7 +82,7 @@ def get_source_label(self): # set unique name def set_unique_name(self): - self.uniqueName = core_utils.get_unique_queue_name(self.queueName, self.resourceType) + self.uniqueName = core_utils.get_unique_queue_name(self.queueName, self.resourceType, self.prodSourceLabel) # update attributes def update_attributes(self, data): From 7aa3ce3cfa9b679efc83a8e150444798702509e3 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Mon, 2 Sep 2019 14:10:19 +0200 Subject: [PATCH 05/41] Prodanaly: initialize jobtype in panda queue --- pandaharvester/harvestercore/queue_config_mapper.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pandaharvester/harvestercore/queue_config_mapper.py b/pandaharvester/harvestercore/queue_config_mapper.py index 4e6f9b48..c7bb7dd7 100644 --- a/pandaharvester/harvestercore/queue_config_mapper.py +++ b/pandaharvester/harvestercore/queue_config_mapper.py @@ -56,6 +56,7 @@ def __init__(self, queue_name): self.noHeartbeat = '' self.runMode = 'self' self.resourceType = PandaQueueSpec.RT_catchall + self.jobType = PandaQueueSpec.JT_catchall self.getJobCriteria = None self.ddmEndpointIn = None self.allowJobMixture = False From 59127ee44236fb1ed03ab8e633a96552af27844e Mon Sep 17 00:00:00 2001 From: fbarreir Date: Tue, 3 Sep 2019 14:19:35 +0200 Subject: [PATCH 06/41] Prodanaly: PEP8 convention issues --- pandaharvester/harvesterbody/worker_adjuster.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pandaharvester/harvesterbody/worker_adjuster.py 
b/pandaharvester/harvesterbody/worker_adjuster.py index 2737a97d..32e13ac7 100644 --- a/pandaharvester/harvesterbody/worker_adjuster.py +++ b/pandaharvester/harvesterbody/worker_adjuster.py @@ -169,9 +169,9 @@ def define_num_workers(self, static_num_workers, site_name): n_new_workers = min(n_new_workers, max(max_workers - n_queue - n_ready - n_running, 0)) tmp_log.debug('setting n_new_workers to {0} to respect max_workers' .format(n_new_workers)) - if queue_config.max_new_workers_per_cycle > 0: - n_new_workers = min(n_new_workers, queue_config.max_new_workers_per_cycle) - tmp_log.debug('setting n_new_workers to {0} in order to respect max_new_workers_per_cycle' + if queue_config.maxNewWorkersPerCycle > 0: + n_new_workers = min(n_new_workers, queue_config.maxNewWorkersPerCycle) + tmp_log.debug('setting n_new_workers to {0} in order to respect maxNewWorkersPerCycle' .format(n_new_workers)) if self.maxNewWorkers is not None and self.maxNewWorkers > 0: n_new_workers = min(n_new_workers, self.maxNewWorkers) @@ -185,7 +185,7 @@ def define_num_workers(self, static_num_workers, site_name): ret_msg = 'set max_new_workers_per_cycle=0 in UCORE aggregation due to missing queue_config' tmp_log.debug(ret_msg) else: - max_new_workers_per_cycle = queue_config.max_new_workers_per_cycle + max_new_workers_per_cycle = queue_config.maxNewWorkersPerCycle if len(dyn_num_workers[queue_name]) > 1: total_new_workers_rts = sum(dyn_num_workers[queue_name][_rt]['n_new_workers'] if _rt != 'ANY' else 0 From 21029d8a505464aa2bde5a42624bb5f7e09dd774 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Tue, 3 Sep 2019 15:10:08 +0200 Subject: [PATCH 07/41] Prodanaly: PEP8 conventions related issues --- pandaharvester/harvesterbody/worker_adjuster.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pandaharvester/harvesterbody/worker_adjuster.py b/pandaharvester/harvesterbody/worker_adjuster.py index 32e13ac7..ff0127d5 100644 --- a/pandaharvester/harvesterbody/worker_adjuster.py +++ b/pandaharvester/harvesterbody/worker_adjuster.py @@ -177,7 +177,7 @@ def define_num_workers(self, static_num_workers, site_name): n_new_workers = min(n_new_workers, self.maxNewWorkers) tmp_log.debug('setting n_new_workers to {0} in order to respect universal maxNewWorkers' .format(n_new_workers)) - dyn_num_workers[queue_name][job_type][resource_type]['n_new_workers'] = n_new_workers + dyn_num_workers[queue_name][job_type][resource_type]['nNewWorkers'] = n_new_workers # adjust n_new_workers for UCORE to let aggregations over RT respect nQueueLimitWorker and max_workers if queue_config is None: @@ -187,7 +187,7 @@ def define_num_workers(self, static_num_workers, site_name): else: max_new_workers_per_cycle = queue_config.maxNewWorkersPerCycle if len(dyn_num_workers[queue_name]) > 1: - total_new_workers_rts = sum(dyn_num_workers[queue_name][_rt]['n_new_workers'] + total_new_workers_rts = sum(dyn_num_workers[queue_name][_rt]['nNewWorkers'] if _rt != 'ANY' else 0 for _rt in dyn_num_workers[queue_name]) n_new_workers_max_agg = min(max(n_queue_limit - n_queue_total, 0), @@ -213,7 +213,7 @@ def define_num_workers(self, static_num_workers, site_name): simple_rt_nw_list = [] for _jt in _d: # jt: job type for _rt in _d[_jt]: # rt: resource type - simple_rt_nw_list.append = [_d[_jt][_rt].get('n_new_workers', 0), 0] + simple_rt_nw_list.append = [_d[_jt][_rt].get('nNewWorkers', 0), 0] _countdown = n_new_workers_max_agg for _rt_list in simple_rt_nw_list: From 8144f70b338ae969e0ea257c1ae3f57acbcf6cc3 Mon Sep 17 00:00:00 2001 From: fbarreir Date: 
Tue, 3 Sep 2019 15:31:11 +0200 Subject: [PATCH 08/41] Prodanaly: make_workers missing job_type argument --- pandaharvester/harvesterbody/worker_maker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pandaharvester/harvesterbody/worker_maker.py b/pandaharvester/harvesterbody/worker_maker.py index 7557ea5f..3ca7d6a8 100644 --- a/pandaharvester/harvesterbody/worker_maker.py +++ b/pandaharvester/harvesterbody/worker_maker.py @@ -18,7 +18,7 @@ def get_plugin(self, queue_config): return self.pluginFactory.get_plugin(queue_config.workerMaker) # make workers - def make_workers(self, jobchunk_list, queue_config, n_ready, resource_type, maker=None): + def make_workers(self, jobchunk_list, queue_config, n_ready, job_type, resource_type, maker=None): tmpLog = core_utils.make_logger(_logger, 'queue={0} rtype={1}'.format(queue_config.queueName, resource_type), method_name='make_workers') tmpLog.debug('start') @@ -38,7 +38,7 @@ def make_workers(self, jobchunk_list, queue_config, n_ready, resource_type, make for iChunk, jobChunk in enumerate(jobchunk_list): # make a worker if iChunk >= n_ready: - workSpec = maker.make_worker(jobChunk, queue_config, resource_type) + workSpec = maker.make_worker(jobChunk, queue_config, job_type, resource_type) else: # use ready worker if iChunk < len(readyWorkers): From ba8d3a87d0f361ae55feb1e86378a369c02d6535 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Tue, 3 Sep 2019 15:36:12 +0200 Subject: [PATCH 09/41] Prodanaly: PEP8 related typo --- pandaharvester/harvesterbody/submitter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandaharvester/harvesterbody/submitter.py b/pandaharvester/harvesterbody/submitter.py index 53c3b0f1..ff7fbee1 100644 --- a/pandaharvester/harvesterbody/submitter.py +++ b/pandaharvester/harvesterbody/submitter.py @@ -242,7 +242,7 @@ def run(self): # map type work_spec.mapType = queue_config.mapType # queue name - work_spec.computingSite = queue_config.queue_name + work_spec.computingSite = queue_config.queueName # set access point work_spec.accessPoint = queue_config.messenger['accessPoint'] # sync level From d11c767db8dde6ae7e4a78c4737c26c0c704724f Mon Sep 17 00:00:00 2001 From: fbarreir Date: Wed, 4 Sep 2019 16:28:47 +0200 Subject: [PATCH 10/41] Prodanaly: small cleanup of unused import, empty lines, unused variables --- pandaharvester/harvestersubmitter/htcondor_submitter.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py index b2468de6..366b9cdf 100644 --- a/pandaharvester/harvestersubmitter/htcondor_submitter.py +++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py @@ -7,7 +7,7 @@ from concurrent.futures import ThreadPoolExecutor import re -from math import sqrt, log1p +from math import log1p from pandaharvester.harvesterconfig import harvester_config from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper @@ -227,8 +227,6 @@ def submit_bag_of_workers(data_list): worker_retval_map[workerID] = (tmpRetVal, workspec.get_changed_attributes()) # attributes try: - ce_info_dict = data['ce_info_dict'] - batch_log_dict = data['batch_log_dict'] use_spool = data['use_spool'] except KeyError: errStr = '{0} not submitted due to incomplete data of the worker'.format(workerID) @@ -592,8 +590,6 @@ def submit_workers(self, workspec_list): tmpLog.error('No valid CE endpoint found') to_submit_any = False - - def _handle_one_worker(workspec, 
to_submit=to_submit_any): # make logger tmpLog = core_utils.make_logger(baseLogger, 'workerID={0}'.format(workspec.workerID), From 7688a0232b9c533647d2689ad3bb3a6c01af68f7 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Fri, 6 Sep 2019 18:21:42 +0200 Subject: [PATCH 11/41] Prodanaly: choose proxy depending on the job_type for the worker --- .../harvestersubmitter/htcondor_submitter.py | 36 ++++++++++++++++--- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py index fbd43bc1..0ef4d81b 100644 --- a/pandaharvester/harvestersubmitter/htcondor_submitter.py +++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py @@ -43,7 +43,8 @@ def _get_ce_weighting(ce_endpoint_list=[], worker_ce_all_tuple=None): for _ce in worker_ce_backend_throughput_dict)) thruput_avg = (log1p(Q_good_init) - log1p(Q_good_fin)) n_new_workers = float(n_new_workers) - def _get_thruput(_ce_endpoint): + + def _get_thruput(_ce_endpoint): # inner function if _ce_endpoint not in worker_ce_backend_throughput_dict: q_good_init = 0. q_good_fin = 0. @@ -54,7 +55,8 @@ def _get_thruput(_ce_endpoint): for _st in ('submitted',))) thruput = (log1p(q_good_init) - log1p(q_good_fin)) return thruput - def _get_thruput_adj_ratio(thruput): + + def _get_thruput_adj_ratio(thruput): # inner function try: thruput_adj_ratio = thruput/thruput_avg + 1/N except ZeroDivisionError: @@ -65,7 +67,8 @@ def _get_thruput_adj_ratio(thruput): return thruput_adj_ratio ce_base_weight_sum = sum((_get_thruput_adj_ratio(_get_thruput(_ce)) for _ce in ce_endpoint_list)) - def _get_init_weight(_ce_endpoint): + + def _get_init_weight(_ce_endpoint): # inner function if _ce_endpoint not in worker_ce_stats_dict: q = 0. r = 0. 
@@ -454,11 +457,16 @@ def __init__(self, **kwarg): self.logDir except AttributeError: self.logDir = os.getenv('TMPDIR') or '/tmp' - # x509 proxy + # Default x509 proxy for a queue try: self.x509UserProxy except AttributeError: self.x509UserProxy = os.getenv('X509_USER_PROXY') + # x509 proxy for analysis jobs in grandly unified queues + try: + self.x509UserProxyAnalysis + except AttributeError: + self.x509UserProxyAnalysis = os.getenv('X509_USER_PROXY_ANAL') # ATLAS AGIS try: self.useAtlasAGIS = bool(self.useAtlasAGIS) @@ -732,6 +740,10 @@ def _handle_one_worker(workspec, to_submit=to_submit_any): batch_log_dict['gtag'] = workspec.workAttributes['stdOut'] tmpLog.debug('Done set_log_file before submission') tmpLog.debug('Done jobspec attribute setting') + + # choose the x509 certificate based on the type of job (analysis or production) + proxy = _choose_proxy(workspec) + # set data dict data.update({ 'workspec': workspec, @@ -742,7 +754,7 @@ def _handle_one_worker(workspec, to_submit=to_submit_any): 'log_subdir': log_subdir, 'n_core_per_node': n_core_per_node, 'panda_queue_name': panda_queue_name, - 'x509_user_proxy': self.x509UserProxy, + 'x509_user_proxy': proxy, 'ce_info_dict': ce_info_dict, 'batch_log_dict': batch_log_dict, 'special_par': special_par, @@ -755,6 +767,20 @@ def _handle_one_worker(workspec, to_submit=to_submit_any): }) return data + def _choose_proxy(workspec): + """ + Choose the proxy based on the job type + """ + job_type = workspec.job_type + proxy = self.x509UserProxy + if (job_type == 'user' or job_type == 'analysis') and self.x509UserProxyAnalysis: + tmpLog.debug('Taking analysis proxy') + proxy = self.x509UserProxyAnalysis + else: + tmpLog.debug('Taking default proxy') + + return proxy + def _propagate_attributes(workspec, tmpVal): # make logger tmpLog = core_utils.make_logger(baseLogger, 'workerID={0}'.format(workspec.workerID), From a12c51db40af7b0e17979902ecce55d2e5dfe8c0 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Mon, 9 Sep 2019 14:20:26 +0200 Subject: [PATCH 12/41] Prodanaly: choose prod vs pilot proxy and small details --- pandaharvester/harvesterbody/worker_maker.py | 2 +- pandaharvester/harvestermisc/htcondor_utils.py | 2 -- pandaharvester/harvestermisc/info_utils.py | 16 +++++++++++++++- .../harvestersubmitter/htcondor_submitter.py | 4 +++- 4 files changed, 19 insertions(+), 5 deletions(-) diff --git a/pandaharvester/harvesterbody/worker_maker.py b/pandaharvester/harvesterbody/worker_maker.py index 3ca7d6a8..076367f1 100644 --- a/pandaharvester/harvesterbody/worker_maker.py +++ b/pandaharvester/harvesterbody/worker_maker.py @@ -19,7 +19,7 @@ def get_plugin(self, queue_config): # make workers def make_workers(self, jobchunk_list, queue_config, n_ready, job_type, resource_type, maker=None): - tmpLog = core_utils.make_logger(_logger, 'queue={0} rtype={1}'.format(queue_config.queueName, resource_type), + tmpLog = core_utils.make_logger(_logger, 'queue={0} rtype={1} jtype={2}'.format(queue_config.queueName, resource_type, job_type), method_name='make_workers') tmpLog.debug('start') try: diff --git a/pandaharvester/harvestermisc/htcondor_utils.py b/pandaharvester/harvestermisc/htcondor_utils.py index ed37ca03..15ad7aed 100644 --- a/pandaharvester/harvestermisc/htcondor_utils.py +++ b/pandaharvester/harvestermisc/htcondor_utils.py @@ -3,7 +3,6 @@ import re import time -import datetime import threading import random import multiprocessing @@ -27,7 +26,6 @@ from pandaharvester.harvestercore import core_utils from pandaharvester.harvesterconfig import 
harvester_config from pandaharvester.harvestercore.core_utils import SingletonWithID -from pandaharvester.harvestercore.work_spec import WorkSpec from pandaharvester.harvestercore.fifos import SpecialFIFOBase # condor python or command api diff --git a/pandaharvester/harvestermisc/info_utils.py b/pandaharvester/harvestermisc/info_utils.py index cb22af4c..289a9a9c 100644 --- a/pandaharvester/harvestermisc/info_utils.py +++ b/pandaharvester/harvestermisc/info_utils.py @@ -69,7 +69,7 @@ def get_all_queue_names(self): names = set() for queue_name, queue_dict in iteritems(self): if queue_dict.get('pilot_manager') in ['Harvester'] \ - and queue_dict.get('harvester') == harvesterID: + and queue_dict.get('harvester') == harvesterID: names.add(queue_name) return names @@ -83,6 +83,20 @@ def is_ups_queue(self, panda_resource): return True return False + # is grandly unified queue, i.e. runs analysis and production + def is_grandly_unified_queue(self, panda_resource): + panda_queue_dict = self.get(panda_resource) + if panda_queue_dict is None: + return False + + # initial, temporary nomenclature + if 'grandly_unified' in panda_queue_dict.get('catchall'): + return True + + # TODO: implement the final nomenclature + + return False + # get harvester params def get_harvester_params(self, panda_resource): panda_queue_dict = self.get(panda_resource) diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py index 0ef4d81b..81f6ea17 100644 --- a/pandaharvester/harvestersubmitter/htcondor_submitter.py +++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py @@ -547,11 +547,13 @@ def submit_workers(self, workspec_list): _queueConfigMapper = QueueConfigMapper() harvester_queue_config = _queueConfigMapper.get_queue(self.queueName) + is_grandly_unified_queue = False # get queue info from AGIS by cacher in db if self.useAtlasAGIS: panda_queues_dict = PandaQueuesDict() panda_queue_name = panda_queues_dict.get_panda_queue_name(self.queueName) this_panda_queue_dict = panda_queues_dict.get(self.queueName, dict()) + is_grandly_unified_queue = panda_queues_dict.is_grandly_unified_queue(self.queueName) # tmpLog.debug('panda_queues_name and queue_info: {0}, {1}'.format(self.queueName, panda_queues_dict[self.queueName])) else: panda_queues_dict = dict() @@ -773,7 +775,7 @@ def _choose_proxy(workspec): """ job_type = workspec.job_type proxy = self.x509UserProxy - if (job_type == 'user' or job_type == 'analysis') and self.x509UserProxyAnalysis: + if is_grandly_unified_queue and (job_type == 'user' or job_type == 'analysis') and self.x509UserProxyAnalysis: tmpLog.debug('Taking analysis proxy') proxy = self.x509UserProxyAnalysis else: From 6eda97ad68bfc85bbc2eb505262d2c189ec76448 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Tue, 10 Sep 2019 10:44:34 +0200 Subject: [PATCH 13/41] typo --- pandaharvester/harvestersubmitter/htcondor_submitter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py index 81f6ea17..b5a8a57f 100644 --- a/pandaharvester/harvestersubmitter/htcondor_submitter.py +++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py @@ -773,7 +773,7 @@ def _choose_proxy(workspec): """ Choose the proxy based on the job type """ - job_type = workspec.job_type + job_type = workspec.jobType proxy = self.x509UserProxy if is_grandly_unified_queue and (job_type == 'user' or job_type == 'analysis') and 
self.x509UserProxyAnalysis: tmpLog.debug('Taking analysis proxy') From 92f37b84bcf9e41e0e07206215df688391cf527f Mon Sep 17 00:00:00 2001 From: fbarreir Date: Tue, 10 Sep 2019 11:19:07 +0200 Subject: [PATCH 14/41] typos --- pandaharvester/harvestercore/core_utils.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pandaharvester/harvestercore/core_utils.py b/pandaharvester/harvestercore/core_utils.py index e53efe15..caa216b3 100644 --- a/pandaharvester/harvestercore/core_utils.py +++ b/pandaharvester/harvestercore/core_utils.py @@ -613,10 +613,10 @@ def _asdict(self): return dict(zip(self.attributes, self)) -# Make a list of choice candidates accroding to permille weight +# Make a list of choice candidates according to permille weight def make_choice_list(pdpm={}, default=None): weight_sum = sum(pdpm.values()) - weight_defualt = 1000 + weight_default = 1000 ret_list = [] for candidate, weight in iteritems(pdpm): if weight_sum > 1000: @@ -624,8 +624,8 @@ def make_choice_list(pdpm={}, default=None): else: real_weight = int(weight) ret_list.extend([candidate]*real_weight) - weight_defualt -= real_weight - ret_list.extend([default]*weight_defualt) + weight_default -= real_weight + ret_list.extend([default]*weight_default) return ret_list From e470afc4f30e511da73dc61bad6ab94a6104e6a2 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Thu, 12 Sep 2019 11:29:00 +0200 Subject: [PATCH 15/41] prodanaly: set_queue_limit correction --- pandaharvester/harvestercore/db_proxy.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index 51a10c05..0076822e 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -3800,10 +3800,10 @@ def set_queue_limit(self, site_name, params): varMap = dict() varMap[':siteName'] = site_name self.execute(sql_get_job_resource, varMap) - res_res = self.cur.fetchall() + results = self.cur.fetchall() job_resource_type_list = set() - for tmp_res, in res_res: - job_resource_type_list.add(tmp_res) + for tmp_job_type, tmp_resource_type in results: + job_resource_type_list.add(tmp_job_type, tmp_resource_type) # set all queues nUp = 0 From 0f35dfd52af51b6d00ea7e90991745790dfc41aa Mon Sep 17 00:00:00 2001 From: fbarreir Date: Thu, 12 Sep 2019 13:37:02 +0200 Subject: [PATCH 16/41] prodanaly: set_queue_limit correction --- pandaharvester/harvestercore/db_proxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index 0076822e..79ac5187 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -3803,7 +3803,7 @@ def set_queue_limit(self, site_name, params): results = self.cur.fetchall() job_resource_type_list = set() for tmp_job_type, tmp_resource_type in results: - job_resource_type_list.add(tmp_job_type, tmp_resource_type) + job_resource_type_list.add((tmp_job_type, tmp_resource_type)) # set all queues nUp = 0 From 5095dc5d3546880c8d4c00289cc665c7cf3b6241 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Thu, 12 Sep 2019 13:40:42 +0200 Subject: [PATCH 17/41] prodanaly: set_queue_limit correction --- pandaharvester/harvestercore/db_proxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index 79ac5187..1b21ece9 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ 
b/pandaharvester/harvestercore/db_proxy.py @@ -3787,7 +3787,7 @@ def set_queue_limit(self, site_name, params): # sql to get num of submitted workers sql_count_workers = "SELECT COUNT(*) cnt " sql_count_workers += "FROM {0} wt, {1} pq ".format(workTableName, pandaQueueTableName) - sql_count_workers += "WHERE pq.siteName=:siteName AND wt.computingSite=pq.queueName AND wt.status=:status" + sql_count_workers += "WHERE pq.siteName=:siteName AND wt.computingSite=pq.queueName AND wt.status=:status " sql_count_workers += "AND pq.jobType=:jobType AND pq.resourceType=:resourceType " # reset nqueued for all job & resource types From e52ea97900e7137c3affb9b617b0047f15448996 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Thu, 12 Sep 2019 14:18:36 +0200 Subject: [PATCH 18/41] prodanaly: set_queue_limit correction --- pandaharvester/harvestercore/db_proxy.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index 1b21ece9..9fd8938d 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -3841,8 +3841,8 @@ def set_queue_limit(self, site_name, params): ret_map[job_type][resource_type] = value else: # no queue was updated, we need to create a new one for the resource type - cloned = self.clone_queue_with_new_resource_type(site_name, queue_name, job_type, - resource_type, value) + cloned = self.clone_queue_with_new_job_and_resource_type(site_name, queue_name, job_type, + resource_type, value) if cloned: ret_map[job_type][resource_type] = value iUp = 1 From c95a59dc908178f4f8ac74834addb9df47ef7811 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Thu, 12 Sep 2019 15:13:56 +0200 Subject: [PATCH 19/41] prodanaly: get_queues_to_submit correction --- pandaharvester/harvestercore/db_proxy.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index 9fd8938d..49e587ba 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -1524,7 +1524,8 @@ def get_queues_to_submit(self, n_queues, lookup_interval, lock_interval, locked_ nReFill, = self.cur.fetchone() nReady += nReFill # add - retMap.setdefault(queueName, {jobType: {}}) + retMap.setdefault(queueName, {}) + retMap[queueName].setdefault(jobType, {}) retMap[queueName][jobType][resourceType] = {'nReady': nReady, 'nRunning': nRunning, 'nQueue': nQueue, From 71c25be6c34e4c2e59a133479de7f157a5ae7da6 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Thu, 12 Sep 2019 17:40:57 +0200 Subject: [PATCH 20/41] prodanaly: worker_adjuster correction --- pandaharvester/harvesterbody/worker_adjuster.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pandaharvester/harvesterbody/worker_adjuster.py b/pandaharvester/harvesterbody/worker_adjuster.py index ff0127d5..82cc0869 100644 --- a/pandaharvester/harvesterbody/worker_adjuster.py +++ b/pandaharvester/harvesterbody/worker_adjuster.py @@ -187,9 +187,11 @@ def define_num_workers(self, static_num_workers, site_name): else: max_new_workers_per_cycle = queue_config.maxNewWorkersPerCycle if len(dyn_num_workers[queue_name]) > 1: - total_new_workers_rts = sum(dyn_num_workers[queue_name][_rt]['nNewWorkers'] - if _rt != 'ANY' else 0 - for _rt in dyn_num_workers[queue_name]) + total_new_workers_rts = 0 + for _jt in dyn_num_workers[queue_name]; + for _rt in dyn_num_workers[queue_name][_jt]: + if _jt != 'ANY' and _rt != 'ANY': + 
total_new_workers_rts = total_new_workers_rts + dyn_num_workers[queue_name][_jt][_rt]['nNewWorkers'] n_new_workers_max_agg = min(max(n_queue_limit - n_queue_total, 0), max(max_workers - n_queue_total - n_ready_total - n_running_total, 0)) if max_new_workers_per_cycle >= 0: From e4ed699019f023da4ebf946ff891a45b94b3fd8c Mon Sep 17 00:00:00 2001 From: fbarreir Date: Thu, 12 Sep 2019 17:49:45 +0200 Subject: [PATCH 21/41] typo --- pandaharvester/harvesterbody/worker_adjuster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandaharvester/harvesterbody/worker_adjuster.py b/pandaharvester/harvesterbody/worker_adjuster.py index 82cc0869..470b6a87 100644 --- a/pandaharvester/harvesterbody/worker_adjuster.py +++ b/pandaharvester/harvesterbody/worker_adjuster.py @@ -188,7 +188,7 @@ def define_num_workers(self, static_num_workers, site_name): max_new_workers_per_cycle = queue_config.maxNewWorkersPerCycle if len(dyn_num_workers[queue_name]) > 1: total_new_workers_rts = 0 - for _jt in dyn_num_workers[queue_name]; + for _jt in dyn_num_workers[queue_name]: for _rt in dyn_num_workers[queue_name][_jt]: if _jt != 'ANY' and _rt != 'ANY': total_new_workers_rts = total_new_workers_rts + dyn_num_workers[queue_name][_jt][_rt]['nNewWorkers'] From a13c53209dbb1961ecce464eff1608cae0d48678 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Thu, 12 Sep 2019 17:53:53 +0200 Subject: [PATCH 22/41] prodanaly: worker_adjuster correction --- pandaharvester/harvesterbody/worker_adjuster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandaharvester/harvesterbody/worker_adjuster.py b/pandaharvester/harvesterbody/worker_adjuster.py index 470b6a87..e8ff97ff 100644 --- a/pandaharvester/harvesterbody/worker_adjuster.py +++ b/pandaharvester/harvesterbody/worker_adjuster.py @@ -215,7 +215,7 @@ def define_num_workers(self, static_num_workers, site_name): simple_rt_nw_list = [] for _jt in _d: # jt: job type for _rt in _d[_jt]: # rt: resource type - simple_rt_nw_list.append = [_d[_jt][_rt].get('nNewWorkers', 0), 0] + simple_rt_nw_list.append([_d[_jt][_rt].get('nNewWorkers', 0), 0]) _countdown = n_new_workers_max_agg for _rt_list in simple_rt_nw_list: From da46134eaf110edbaea907a9abaf5955074d2b8f Mon Sep 17 00:00:00 2001 From: fbarreir Date: Thu, 12 Sep 2019 17:57:50 +0200 Subject: [PATCH 23/41] prodanaly: typo --- pandaharvester/harvesterbody/worker_adjuster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandaharvester/harvesterbody/worker_adjuster.py b/pandaharvester/harvesterbody/worker_adjuster.py index e8ff97ff..56a4dc5f 100644 --- a/pandaharvester/harvesterbody/worker_adjuster.py +++ b/pandaharvester/harvesterbody/worker_adjuster.py @@ -215,7 +215,7 @@ def define_num_workers(self, static_num_workers, site_name): simple_rt_nw_list = [] for _jt in _d: # jt: job type for _rt in _d[_jt]: # rt: resource type - simple_rt_nw_list.append([_d[_jt][_rt].get('nNewWorkers', 0), 0]) + simple_rt_nw_list.append([_rt, _d[_jt][_rt].get('nNewWorkers', 0), 0]) _countdown = n_new_workers_max_agg for _rt_list in simple_rt_nw_list: From 4a6e2dded603b8a49ebdf002c9135a94f3b6b718 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Thu, 12 Sep 2019 18:48:35 +0200 Subject: [PATCH 24/41] prodanaly: corrected query column order in get_queues_to_submit --- pandaharvester/harvestercore/db_proxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index 49e587ba..73c50c67 100644 --- 
a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -1416,7 +1416,7 @@ def get_queues_to_submit(self, n_queues, lookup_interval, lock_interval, locked_ sqlS += "OR (submitTime<:lookupTimeLimit AND lockedBy IS NULL) " sqlS += "ORDER BY submitTime " # sql to get queues - sqlQ = "SELECT queueName, resourceType, jobType, nNewWorkers FROM {0} ".format(pandaQueueTableName) + sqlQ = "SELECT queueName, jobType, resourceType, nNewWorkers FROM {0} ".format(pandaQueueTableName) sqlQ += "WHERE siteName=:siteName " # sql to get orphaned workers sqlO = "SELECT workerID FROM {0} ".format(workTableName) From 32a226146746a9b7df58f2b30a1bc0dd65e9cdba Mon Sep 17 00:00:00 2001 From: fbarreir Date: Thu, 12 Sep 2019 19:06:28 +0200 Subject: [PATCH 25/41] prodanaly: worker_adjuster correction --- .../harvesterbody/worker_adjuster.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pandaharvester/harvesterbody/worker_adjuster.py b/pandaharvester/harvesterbody/worker_adjuster.py index 56a4dc5f..b5cd1576 100644 --- a/pandaharvester/harvesterbody/worker_adjuster.py +++ b/pandaharvester/harvesterbody/worker_adjuster.py @@ -213,9 +213,9 @@ def define_num_workers(self, static_num_workers, site_name): # TODO: needs to be recalculated simple_rt_nw_list = [] - for _jt in _d: # jt: job type - for _rt in _d[_jt]: # rt: resource type - simple_rt_nw_list.append([_rt, _d[_jt][_rt].get('nNewWorkers', 0), 0]) + for job_type in _d: # jt: job type + for resource_type in _d[job_type]: # rt: resource type + simple_rt_nw_list.append([resource_type, _d[job_type][resource_type].get('nNewWorkers', 0), 0]) _countdown = n_new_workers_max_agg for _rt_list in simple_rt_nw_list: @@ -232,13 +232,13 @@ def define_num_workers(self, static_num_workers, site_name): break dyn_num_workers[queue_name][job_type][resource_type]['nNewWorkers'] += 1 _countdown -= 1 - - for resource_type in dyn_num_workers[queue_name]: - if resource_type == 'ANY': - continue - n_new_workers = dyn_num_workers[queue_name][job_type][resource_type]['nNewWorkers'] - tmp_log.debug('setting n_new_workers to {0} of job_type {1} resource_type {2} in order to respect RT aggregations for UCORE' - .format(n_new_workers, job_type, resource_type)) + for job_type in dyn_num_workers[queue_name]: + for resource_type in dyn_num_workers[queue_name][job_type]: + if job_type == 'ANY' or resource_type == 'ANY': + continue + n_new_workers = dyn_num_workers[queue_name][job_type][resource_type]['nNewWorkers'] + tmp_log.debug('setting n_new_workers to {0} of job_type {1} resource_type {2} in order to respect RT aggregations for UCORE' + .format(n_new_workers, job_type, resource_type)) if not apf_msg: apf_data = copy.deepcopy(dyn_num_workers[queue_name]) From 85629e6647d534180fe5163394694123cae3fbcc Mon Sep 17 00:00:00 2001 From: fbarreir Date: Fri, 13 Sep 2019 16:26:29 +0200 Subject: [PATCH 26/41] prodanaly: various fixes --- pandaharvester/harvesterbody/submitter.py | 7 ++++--- pandaharvester/harvesterbody/worker_maker.py | 12 ++++++------ .../harvesterworkermaker/simple_worker_maker.py | 6 ++---- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/pandaharvester/harvesterbody/submitter.py b/pandaharvester/harvesterbody/submitter.py index ff7fbee1..e2f15928 100644 --- a/pandaharvester/harvesterbody/submitter.py +++ b/pandaharvester/harvesterbody/submitter.py @@ -82,9 +82,9 @@ def run(self): else: # loop over all queues and resource types for queue_name in n_workers_per_queue_jt_rt: - for job_type, 
tmp_job_vals in iteritems(n_workers_per_queue_jt_rt[queue_name]): - for resource_type, tmp_val in iteritems(tmp_job_vals): - + for job_type in n_workers_per_queue_jt_rt[queue_name]: + for resource_type in n_workers_per_queue_jt_rt[queue_name][job_type]: + tmp_val = n_workers_per_queue_jt_rt[queue_name][job_type][resource_type] tmp_log = self.make_logger(_logger, 'id={0} queue={1} jtype={2} rtype={3}'.format( locked_by, queue_name, job_type, resource_type), method_name='run') try: @@ -196,6 +196,7 @@ def run(self): okChunks, ngChunks = self.workerMaker.make_workers(jobChunks, queue_config, nReady, job_type, resource_type, maker=workerMakerCore) + if len(ngChunks) == 0: tmp_log.debug('successfully made {0} workers'.format(len(okChunks))) else: diff --git a/pandaharvester/harvesterbody/worker_maker.py b/pandaharvester/harvesterbody/worker_maker.py index 076367f1..8aae3440 100644 --- a/pandaharvester/harvesterbody/worker_maker.py +++ b/pandaharvester/harvesterbody/worker_maker.py @@ -19,7 +19,7 @@ def get_plugin(self, queue_config): # make workers def make_workers(self, jobchunk_list, queue_config, n_ready, job_type, resource_type, maker=None): - tmpLog = core_utils.make_logger(_logger, 'queue={0} rtype={1} jtype={2}'.format(queue_config.queueName, resource_type, job_type), + tmpLog = core_utils.make_logger(_logger, 'queue={0} jtype={1} rtype={2}'.format(queue_config.queueName, job_type, resource_type), method_name='make_workers') tmpLog.debug('start') try: @@ -65,35 +65,35 @@ def make_workers(self, jobchunk_list, queue_config, n_ready, job_type, resource_ return [], jobchunk_list # get number of jobs per worker - def get_num_jobs_per_worker(self, queue_config, n_workers, resource_type, maker=None): + def get_num_jobs_per_worker(self, queue_config, n_workers, job_type, resource_type, maker=None): # get plugin if maker is None: maker = self.pluginFactory.get_plugin(queue_config.workerMaker) return maker.get_num_jobs_per_worker(n_workers) # get number of workers per job - def get_num_workers_per_job(self, queue_config, n_workers, resource_type, maker=None): + def get_num_workers_per_job(self, queue_config, n_workers, job_type, resource_type, maker=None): # get plugin if maker is None: maker = self.pluginFactory.get_plugin(queue_config.workerMaker) return maker.get_num_workers_per_job(n_workers) # check number of ready resources - def num_ready_resources(self, queue_config, resource_type, maker=None): + def num_ready_resources(self, queue_config, job_type, resource_type, maker=None): # get plugin if maker is None: maker = self.pluginFactory.get_plugin(queue_config.workerMaker) return maker.num_ready_resources() # get upper limit on the cumulative total of workers per job - def get_max_workers_per_job_in_total(self, queue_config, resource_type, maker=None): + def get_max_workers_per_job_in_total(self, queue_config, job_type, resource_type, maker=None): # get plugin if maker is None: maker = self.pluginFactory.get_plugin(queue_config.workerMaker) return maker.get_max_workers_per_job_in_total() # get upper limit on the number of new workers per job in a cycle - def get_max_workers_per_job_per_cycle(self, queue_config, resource_type, maker=None): + def get_max_workers_per_job_per_cycle(self, queue_config, job_type, resource_type, maker=None): # get plugin if maker is None: maker = self.pluginFactory.get_plugin(queue_config.workerMaker) diff --git a/pandaharvester/harvesterworkermaker/simple_worker_maker.py b/pandaharvester/harvesterworkermaker/simple_worker_maker.py index fcab5b62..2433dd2c 100644 
--- a/pandaharvester/harvesterworkermaker/simple_worker_maker.py +++ b/pandaharvester/harvesterworkermaker/simple_worker_maker.py @@ -61,7 +61,7 @@ def get_job_type(self, job_spec, job_type, queue_dict): # make a worker from jobs def make_worker(self, jobspec_list, queue_config, job_type, resource_type): - tmpLog = self.make_logger(_logger, 'queue={0}'.format(queue_config.queueName), + tmpLog = self.make_logger(_logger, 'queue={0}:{1}:{2}'.format(queue_config.queueName, job_type, resource_type), method_name='make_worker') tmpLog.debug('jobspec_list: {0}'.format(jobspec_list)) @@ -160,8 +160,8 @@ def make_worker(self, jobspec_list, queue_config, job_type, resource_type): tmpLog.info('a worker has pilotType={0}'.format(workSpec.pilotType)) workSpec.jobType = self.get_job_type(None, job_type, queue_dict) + tmpLog.debug('get_job_type decided for job_type: {0}'.format(workSpec.jobType)) - # TODO: this needs to be improved with real resource types if resource_type and resource_type != 'ANY': workSpec.resourceType = resource_type elif workSpec.nCore == 1: @@ -169,6 +169,4 @@ def make_worker(self, jobspec_list, queue_config, job_type, resource_type): else: workSpec.resourceType = 'MCORE' - - return workSpec From 8065256cb74eff39409003561b5ed55cca67464d Mon Sep 17 00:00:00 2001 From: fbarreir Date: Thu, 10 Oct 2019 09:52:28 +0200 Subject: [PATCH 27/41] prodanaly: added sitename to get_worker_limits log messages --- pandaharvester/harvestercore/db_proxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index 73c50c67..0f1df9c4 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -4407,7 +4407,7 @@ def increment_submission_attempt(self, panda_id, new_number): def get_worker_limits(self, site_name): try: # get logger - tmpLog = core_utils.make_logger(_logger, method_name='get_worker_limits') + tmpLog = core_utils.make_logger(_logger, token='site_name={0}'.format(site_name), method_name='get_worker_limits') tmpLog.debug('start') # sql to get queue limits From deee7bbecea4c9b60e542d4c4ca2dfa820976cf3 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Thu, 10 Oct 2019 10:58:06 +0200 Subject: [PATCH 28/41] prodanaly: protection against uninitialized jobType in getWorkerLimits --- pandaharvester/harvestercore/db_proxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index 187c79bb..d6fa87ac 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -4413,7 +4413,7 @@ def get_worker_limits(self, site_name): # sql to get queue limits sqlQ = "SELECT maxWorkers, nQueueLimitWorker, nQueueLimitWorkerRatio," sqlQ += "nQueueLimitWorkerMax,nQueueLimitWorkerMin FROM {0} ".format(pandaQueueTableName) - sqlQ += "WHERE siteName=:siteName AND jobType='ANY' AND resourceType='ANY'" + sqlQ += "WHERE siteName=:siteName AND resourceType='ANY' AND (jobType='ANY' OR jobType IS NULL) " # sql to count resource types sqlNT = "SELECT COUNT(*) cnt FROM {0} ".format(pandaQueueTableName) From ddd2eff37369951b0e44002c9a0e330f2d376e1e Mon Sep 17 00:00:00 2001 From: fbarreir Date: Fri, 18 Oct 2019 11:00:48 +0200 Subject: [PATCH 29/41] prodanaly: backwards compatibility for reporting worker stats --- pandaharvester/harvestercommunicator/panda_communicator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/pandaharvester/harvestercommunicator/panda_communicator.py b/pandaharvester/harvestercommunicator/panda_communicator.py index c1402971..5385cff3 100644 --- a/pandaharvester/harvestercommunicator/panda_communicator.py +++ b/pandaharvester/harvestercommunicator/panda_communicator.py @@ -530,7 +530,7 @@ def update_worker_stats(self, site_name, stats): data['siteName'] = site_name data['paramsList'] = json.dumps(stats) tmpLog.debug('update stats for {0}, stats: {1}'.format(site_name, stats)) - tmpStat, tmpRes = self.post_ssl('reportWorkerStats', data) + tmpStat, tmpRes = self.post_ssl('reportWorkerStats_jobtype', data) errStr = 'OK' if tmpStat is False: errStr = core_utils.dump_error_message(tmpLog, tmpRes) From b61359846e8f3423ee6a8634e711cbc0889e0247 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Fri, 18 Oct 2019 11:37:28 +0200 Subject: [PATCH 30/41] prodanaly: UPS command name change for migration --- pandaharvester/harvestercore/command_spec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandaharvester/harvestercore/command_spec.py b/pandaharvester/harvestercore/command_spec.py index a1e674f5..ac18f30a 100644 --- a/pandaharvester/harvestercore/command_spec.py +++ b/pandaharvester/harvestercore/command_spec.py @@ -16,7 +16,7 @@ class CommandSpec(SpecBase): ) # commands COM_reportWorkerStats = 'REPORT_WORKER_STATS' - COM_setNWorkers = 'SET_N_WORKERS' + COM_setNWorkers = 'SET_N_WORKERS_JOBTYPE' COM_killWorkers = 'KILL_WORKERS' # mapping between command and receiver receiver_map = { From a6912cbf267bad8eeb074ecabdf2b0833a46c9b5 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Wed, 13 Nov 2019 14:11:05 +0100 Subject: [PATCH 31/41] prodanaly: initialize jobType --- pandaharvester/harvestercore/db_proxy.py | 24 +++++++++++++++++++++++ pandaharvester/harvestermisc/k8s_utils.py | 6 ++---- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index d6fa87ac..eca8868f 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -361,6 +361,20 @@ def need_index(self, attr): isUnique = True return isIndex, isUnique + def initialize_jobType(self, table_name): + # initialize old NULL entries to ANY in pq_table and work_table + # get logger + tmp_log = core_utils.make_logger(_logger, method_name='initialize_jobType') + + sql_update = "UPDATE {0} SET jobType = 'ANY' WHERE jobType is NULL ".format(table_name) + try: + self.execute(sql_update) + # commit + self.commit() + tmp_log.debug('initialized entries in {0}'.format(table_name)) + except Exception: + core_utils.dump_error_message(tmp_log) + # make table def make_table(self, cls, table_name): try: @@ -430,6 +444,12 @@ def make_table(self, cls, table_name): tmpLog.debug('added {0} to {1}'.format(attr, table_name)) except Exception: core_utils.dump_error_message(tmpLog) + + # if we just added the jobType, old entries need to be initialized + if (table_name == pandaQueueTableName and attrName == 'jobType') \ + or (table_name == pandaQueueTableName and attrName == 'jobType'): + self.initialize_jobType(table_name) + # make indexes for index in indexes: indexName = 'idx_{0}_{1}'.format(index, table_name) @@ -479,6 +499,10 @@ def make_tables(self, queue_config_mapper): for outStr in outStrs: print (outStr) sys.exit(1) + + # initialize the job types to ANY when NULL + self.initialize_jobType() + # add sequential numbers self.add_seq_number('SEQ_workerID', 1) self.add_seq_number('SEQ_configID', 1) diff --git 
a/pandaharvester/harvestermisc/k8s_utils.py b/pandaharvester/harvestermisc/k8s_utils.py index b197f5d9..be2e6cc4 100644 --- a/pandaharvester/harvestermisc/k8s_utils.py +++ b/pandaharvester/harvestermisc/k8s_utils.py @@ -48,10 +48,8 @@ def create_job_from_yaml(self, yaml_content, work_spec, cert, cert_in_secret=Tru # note that predefined values in the yaml template will NOT be overwritten if work_spec.nCore > 0: - container_env['resources'].setdefault('limits', { - 'cpu': str(work_spec.nCore)}) - container_env['resources'].setdefault('requests', { - 'cpu': str(work_spec.nCore*cpuadjustratio/100.0)}) + container_env['resources'].setdefault('limits', {'cpu': str(work_spec.nCore)}) + container_env['resources'].setdefault('requests', {'cpu': str(work_spec.nCore * cpuadjustratio / 100.0)}) if work_spec.minRamCount > 4: # K8S minimum memory limit = 4 MB From 1b1cbe0b29ca08ce1cb9b5286681e4369752f7e9 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Wed, 13 Nov 2019 14:18:52 +0100 Subject: [PATCH 32/41] prodanaly: removed unnecessary call to initialize_jobType --- pandaharvester/harvestercore/db_proxy.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index eca8868f..69a8d6b6 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -500,9 +500,6 @@ def make_tables(self, queue_config_mapper): print (outStr) sys.exit(1) - # initialize the job types to ANY when NULL - self.initialize_jobType() - # add sequential numbers self.add_seq_number('SEQ_workerID', 1) self.add_seq_number('SEQ_configID', 1) From c97061d54425bf77b60ab2fe7cd898bac141b99e Mon Sep 17 00:00:00 2001 From: fbarreir Date: Thu, 14 Nov 2019 13:09:47 +0100 Subject: [PATCH 33/41] update version --- pandaharvester/panda_pkg_info.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandaharvester/panda_pkg_info.py b/pandaharvester/panda_pkg_info.py index 8b3a7c96..a4389a0c 100644 --- a/pandaharvester/panda_pkg_info.py +++ b/pandaharvester/panda_pkg_info.py @@ -1 +1 @@ -release_version = "0.1.5-rc" +release_version = "0.2.0-rc" From 55b527ed73154808d4c4d373774da9e105255ae5 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Mon, 18 Nov 2019 11:18:41 +0100 Subject: [PATCH 34/41] prodanaly: missing dict initialization --- pandaharvester/harvesterbody/worker_adjuster.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pandaharvester/harvesterbody/worker_adjuster.py b/pandaharvester/harvesterbody/worker_adjuster.py index b5cd1576..0f62b656 100644 --- a/pandaharvester/harvesterbody/worker_adjuster.py +++ b/pandaharvester/harvesterbody/worker_adjuster.py @@ -222,6 +222,7 @@ def define_num_workers(self, static_num_workers, site_name): resource_type, n_new_workers_orig, _r = _rt_list n_new_workers, remainder = divmod(n_new_workers_orig * n_new_workers_max_agg, total_new_workers_rts) + dyn_num_workers[queue_name][job_type].setdefault(resource_type, {}) dyn_num_workers[queue_name][job_type][resource_type]['nNewWorkers'] = n_new_workers _rt_list[2] = remainder _countdown -= n_new_workers From 1a7cd927fa4e66216ef332700d2da3f795258bf1 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Mon, 18 Nov 2019 11:29:18 +0100 Subject: [PATCH 35/41] prodanaly: fix --- pandaharvester/harvesterbody/worker_adjuster.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pandaharvester/harvesterbody/worker_adjuster.py b/pandaharvester/harvesterbody/worker_adjuster.py index 0f62b656..93e0fd03 100644 --- 
a/pandaharvester/harvesterbody/worker_adjuster.py +++ b/pandaharvester/harvesterbody/worker_adjuster.py @@ -222,7 +222,9 @@ def define_num_workers(self, static_num_workers, site_name): resource_type, n_new_workers_orig, _r = _rt_list n_new_workers, remainder = divmod(n_new_workers_orig * n_new_workers_max_agg, total_new_workers_rts) - dyn_num_workers[queue_name][job_type].setdefault(resource_type, {}) + dyn_num_workers[queue_name][job_type].setdefault(resource_type, + {'nReady': 0, 'nRunning': 0, + 'nQueue': 0, 'nNewWorkers': 0}) dyn_num_workers[queue_name][job_type][resource_type]['nNewWorkers'] = n_new_workers _rt_list[2] = remainder _countdown -= n_new_workers From 4502c21e7201832b8ba3a9a15b560a46e1106169 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Wed, 27 Nov 2019 17:42:55 +0100 Subject: [PATCH 36/41] prodanaly: issue with UPS queue initialization from scratch --- pandaharvester/harvestercore/db_proxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index 69a8d6b6..c2635af6 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -3424,7 +3424,7 @@ def get_worker_stats_bulk(self, active_ups_queues): # on panda server starts processing the queue if active_ups_queues: for ups_queue in active_ups_queues: - if ups_queue not in retMap or not retMap[ups_queue]: + if ups_queue not in retMap or not retMap[ups_queue] or retMap[ups_queue] == {'ANY': {}}: retMap[ups_queue] = {'managed': {'SCORE': {'running': 0, 'submitted': 0, 'to_submit': 0}}} # commit From 15942a7d2cf488bea9cb9e82065d8e2d7a9e88f4 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Thu, 28 Nov 2019 10:21:42 +0100 Subject: [PATCH 37/41] k8s: CVMFS CSI ATLAS configuration for different kubernetes versions --- .../{k8s_cvmfs.yaml => k8s_cvmfs_1.13.yaml} | 0 examples/k8s/k8s_cvmfs_1.15.yaml | 101 ++++++++++++++++++ 2 files changed, 101 insertions(+) rename examples/k8s/{k8s_cvmfs.yaml => k8s_cvmfs_1.13.yaml} (100%) create mode 100644 examples/k8s/k8s_cvmfs_1.15.yaml diff --git a/examples/k8s/k8s_cvmfs.yaml b/examples/k8s/k8s_cvmfs_1.13.yaml similarity index 100% rename from examples/k8s/k8s_cvmfs.yaml rename to examples/k8s/k8s_cvmfs_1.13.yaml diff --git a/examples/k8s/k8s_cvmfs_1.15.yaml b/examples/k8s/k8s_cvmfs_1.15.yaml new file mode 100644 index 00000000..64fe7690 --- /dev/null +++ b/examples/k8s/k8s_cvmfs_1.15.yaml @@ -0,0 +1,101 @@ +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + name: csi-cvmfs-atlas +provisioner: cvmfs.csi.cern.ch +parameters: + repository: atlas.cern.ch +--- +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + name: csi-cvmfs-sft +provisioner: cvmfs.csi.cern.ch +parameters: + repository: sft.cern.ch +--- +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + name: csi-cvmfs-grid +provisioner: cvmfs.csi.cern.ch +parameters: + repository: grid.cern.ch +--- +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + name: csi-cvmfs-atlas-condb +provisioner: cvmfs.csi.cern.ch +parameters: + repository: atlas-condb.cern.ch +--- +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + name: csi-cvmfs-atlas-nightlies +provisioner: cvmfs.csi.cern.ch +parameters: + repository: atlas-nightlies.cern.ch +--- +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: csi-cvmfs-atlas-pvc +spec: + accessModes: + - ReadOnlyMany + resources: + requests: + storage: 1Gi + storageClassName: csi-cvmfs-atlas +--- +apiVersion: v1 
+kind: PersistentVolumeClaim +metadata: + name: csi-cvmfs-sft-pvc +spec: + accessModes: + - ReadOnlyMany + resources: + requests: + storage: 1Gi + storageClassName: csi-cvmfs-sft +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: csi-cvmfs-grid-pvc +spec: + accessModes: + - ReadOnlyMany + resources: + requests: + storage: 1Gi + storageClassName: csi-cvmfs-grid + +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: csi-cvmfs-atlas-condb-pvc +spec: + accessModes: + - ReadOnlyMany + resources: + requests: + storage: 1Gi + storageClassName: csi-cvmfs-atlas-condb +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: csi-cvmfs-atlas-nightlies-pvc +spec: + accessModes: + - ReadOnlyMany + resources: + requests: + storage: 1Gi + storageClassName: csi-cvmfs-atlas-nightlies \ No newline at end of file From 054179cc5e051490d7d7670a6855f0b141e3cf82 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Mon, 2 Dec 2019 21:42:28 +0100 Subject: [PATCH 38/41] prodanaly: use production templates for unified queues --- pandaharvester/harvestermisc/info_utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pandaharvester/harvestermisc/info_utils.py b/pandaharvester/harvestermisc/info_utils.py index 289a9a9c..d1452e47 100644 --- a/pandaharvester/harvestermisc/info_utils.py +++ b/pandaharvester/harvestermisc/info_utils.py @@ -121,5 +121,7 @@ def get_type_workflow(self, panda_resource): workflow = None else: pq_type = panda_queue_dict.get('type') + if pq_type == 'unified': # use production templates + pq_type = 'production' workflow = panda_queue_dict.get('workflow') return pq_type, workflow From fae333b4a92259dc17041b3e1f6a08e5f864b57c Mon Sep 17 00:00:00 2001 From: fbarreir Date: Tue, 3 Dec 2019 14:15:44 +0100 Subject: [PATCH 39/41] prodanaly: job prodsourcelabel precedence over queue prodsourcelabel --- pandaharvester/harvestercore/queue_config_mapper.py | 8 +++++++- pandaharvester/harvestersubmitter/htcondor_submitter.py | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pandaharvester/harvestercore/queue_config_mapper.py b/pandaharvester/harvestercore/queue_config_mapper.py index dca5b9ab..36fd1033 100644 --- a/pandaharvester/harvestercore/queue_config_mapper.py +++ b/pandaharvester/harvestercore/queue_config_mapper.py @@ -77,9 +77,15 @@ def is_no_heartbeat_status(self, status): return status in self.get_no_heartbeat_status() # get prodSourceLabel - def get_source_label(self): + def get_source_label(self, job_type=None): + # if queue is in test status, only submit workers for HC jobs if self.queueStatus == 'test': return 'test' + + # grandly unified queues: prodsourcelabel in job has precedence over queue prodsourcelabel + if job_type == 'user': + return job_type + return self.prodSourceLabel # set unique name diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py index 41ffe03a..fb9baa63 100644 --- a/pandaharvester/harvestersubmitter/htcondor_submitter.py +++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py @@ -365,7 +365,7 @@ def make_a_jdl(workspec, template, n_core_per_node, log_dir, panda_queue_name, e # decide prodSourceLabel pilot_opt_tuple = _get_prodsourcelabel_pilotypeopt_piloturlstr(workspec.pilotType, pilot_version) if pilot_opt_tuple is None: - prod_source_label = harvester_queue_config.get_source_label() + prod_source_label = harvester_queue_config.get_source_label(workspec.jobType) pilot_type_opt = workspec.pilotType pilot_url_str = '' else: From 
f128e23416b34d5021dae234e11c7a22d99ec592 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Tue, 3 Dec 2019 16:30:52 +0100 Subject: [PATCH 40/41] prodanaly: treat prodsourcelabel=panda as analysis jobs --- pandaharvester/harvestercore/queue_config_mapper.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pandaharvester/harvestercore/queue_config_mapper.py b/pandaharvester/harvestercore/queue_config_mapper.py index 36fd1033..ee83827a 100644 --- a/pandaharvester/harvestercore/queue_config_mapper.py +++ b/pandaharvester/harvestercore/queue_config_mapper.py @@ -83,8 +83,8 @@ def get_source_label(self, job_type=None): return 'test' # grandly unified queues: prodsourcelabel in job has precedence over queue prodsourcelabel - if job_type == 'user': - return job_type + if job_type in ('user', 'panda'): + return 'user' return self.prodSourceLabel From a6a97592db822bae7e774a5d04c0713550837082 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Tue, 3 Dec 2019 16:50:13 +0100 Subject: [PATCH 41/41] prodanaly: more prodsourcelabel=panda cases --- pandaharvester/harvestermisc/info_utils.py | 5 ++--- pandaharvester/harvestersubmitter/htcondor_submitter.py | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pandaharvester/harvestermisc/info_utils.py b/pandaharvester/harvestermisc/info_utils.py index d1452e47..dca3615b 100644 --- a/pandaharvester/harvestermisc/info_utils.py +++ b/pandaharvester/harvestermisc/info_utils.py @@ -90,11 +90,10 @@ def is_grandly_unified_queue(self, panda_resource): return False # initial, temporary nomenclature - if 'grandly_unified' in panda_queue_dict.get('catchall'): + if 'grandly_unified' in panda_queue_dict.get('catchall') \ + or panda_queue_dict.get('type') == 'unified': return True - # TODO: implement the final nomenclature - return False # get harvester params diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py index fb9baa63..feb30ab5 100644 --- a/pandaharvester/harvestersubmitter/htcondor_submitter.py +++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py @@ -775,7 +775,7 @@ def _choose_proxy(workspec): """ job_type = workspec.jobType proxy = self.x509UserProxy - if is_grandly_unified_queue and (job_type == 'user' or job_type == 'analysis') and self.x509UserProxyAnalysis: + if is_grandly_unified_queue and job_type in ('user', 'panda', 'analysis') and self.x509UserProxyAnalysis: tmpLog.debug('Taking analysis proxy') proxy = self.x509UserProxyAnalysis else:
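
For reference, the sketch below summarizes how the grandly unified queue changes in patches 39-41 fit together: the job's prodSourceLabel takes precedence over the queue's, prodsourcelabel=panda is treated as analysis, and analysis-type jobs on a grandly unified queue pick the analysis proxy when one is configured. It is a minimal standalone illustration, not part of the patch series; the simplified QueueConfig class, the choose_proxy helper and the proxy paths are placeholders rather than the actual Harvester classes or configuration.

class QueueConfig(object):
    """Illustrative stand-in for the Harvester queue configuration (placeholder, not the real class)."""

    def __init__(self, queue_status, prod_source_label):
        self.queueStatus = queue_status
        self.prodSourceLabel = prod_source_label

    def get_source_label(self, job_type=None):
        # queues in test status only submit workers for HammerCloud jobs
        if self.queueStatus == 'test':
            return 'test'
        # grandly unified queues: the job's prodSourceLabel has precedence over the queue's;
        # 'panda' (analysis service) jobs are treated as analysis
        if job_type in ('user', 'panda'):
            return 'user'
        return self.prodSourceLabel


def choose_proxy(job_type, is_grandly_unified_queue,
                 production_proxy='/cert/prod.proxy', analysis_proxy='/cert/anal.proxy'):
    # analysis-type jobs on a grandly unified queue use the analysis proxy when one is configured
    if is_grandly_unified_queue and job_type in ('user', 'panda', 'analysis') and analysis_proxy:
        return analysis_proxy
    return production_proxy


if __name__ == '__main__':
    qc = QueueConfig(queue_status='online', prod_source_label='managed')
    assert qc.get_source_label('managed') == 'managed'    # production job keeps the queue label
    assert qc.get_source_label('panda') == 'user'         # panda jobs are treated as analysis
    assert choose_proxy('user', True) == '/cert/anal.proxy'
    assert choose_proxy('managed', True) == '/cert/prod.proxy'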