diff --git a/examples/htcondor_submit_doma_pilot.sdf b/examples/htcondor_submit_doma_pilot.sdf new file mode 100644 index 00000000..6f76fc6b --- /dev/null +++ b/examples/htcondor_submit_doma_pilot.sdf @@ -0,0 +1,49 @@ +executable = /data/idds/harvester_common/runpilot2-wrapper.sh +arguments = -s {computingSite} -r {computingSite} -q {pandaQueueName} -j {prodSourceLabel} -i {pilotType} -t -w generic --pilot-user generic --url https://ai-idds-01.cern.ch -d --harvester-submit-mode PULL --allow-same-user=False --job-type={jobType} {pilotResourceTypeOption} {pilotUrlOption} +initialdir = {accessPoint} + +log = {logDir}/{logSubdir}/grid.$(Cluster).$(Process).log +output = {logDir}/{logSubdir}/grid.$(Cluster).$(Process).out +error = {logDir}/{logSubdir}/grid.$(Cluster).$(Process).err +transfer_executable = True + +environment = "PANDA_JSID=harvester-{harvesterID} HARVESTER_ID={harvesterID} HARVESTER_WORKER_ID={workerID} GTAG={gtag}" ++harvesterID = "{harvesterID}" ++harvesterWorkerID = "{workerID}" + +universe = grid +grid_resource = condor gridtest01.racf.bnl.gov gridtest01.racf.bnl.gov:9619 + +X509UserProxy = {x509UserProxy} +ShouldTransferFiles = YES +WhenToTransferOutput = ON_EXIT +use_x509userproxy = true + ++remote_jobuniverse = 5 ++remote_ShouldTransferFiles = "YES" ++remote_WhenToTransferOutput = "ON_EXIT_OR_EVICT" ++remote_TransferOutput = "" +#+remote_RequestCpus = {nCoreTotal} +#+remote_RequestMemory = {requestRam} +#+remote_RequestDisk = {requestDisk} +#+remote_JobMaxVacateTime = {requestWalltime} ++ioIntensity = {ioIntensity} ++xcount = {nCoreTotal} ++maxMemory = {requestRam} ++remote_queue = "{ceQueueName}" ++maxWallTime = {requestWalltimeMinute} + +delegate_job_GSI_credentials_lifetime = 0 + +#+remote_Requirements = JobRunCount == 0 +periodic_remove = (JobStatus == 2 && (CurrentTime - EnteredCurrentStatus) > 604800) +#+remote_PeriodicHold = ( JobStatus==1 && gridjobstatus=?=UNDEFINED && CurrentTime-EnteredCurrentStatus>3600 ) || ( (JobRunCount =!= UNDEFINED && JobRunCount > 0) ) || ( JobStatus == 2 && CurrentTime-EnteredCurrentStatus>604800 ) ++remote_PeriodicRemove = (JobStatus == 5 && (CurrentTime - EnteredCurrentStatus) > 3600) || (JobStatus == 1 && globusstatus =!= 1 && (CurrentTime - EnteredCurrentStatus) > 86400) + ++sdfPath = "{sdfPath}" + ++ProjectName="EIC" ++remote_queue = "osg" + +queue 1 + diff --git a/examples/k8s/job_cern.yaml b/examples/k8s/job_cern.yaml index c5a08fe1..3981c4ee 100644 --- a/examples/k8s/job_cern.yaml +++ b/examples/k8s/job_cern.yaml @@ -19,8 +19,6 @@ spec: value: "$pandaQueueName" - name: proxySecretPath value: "$proxySecretPath" - - name: proxyContent - value: "$proxyContent" - name: workerID value: "$workerID" - name: logs_frontend_w diff --git a/examples/panda_queueconfig_doma.json b/examples/panda_queueconfig_doma.json new file mode 100644 index 00000000..b564d9cf --- /dev/null +++ b/examples/panda_queueconfig_doma.json @@ -0,0 +1,274 @@ +{ + +"production.pull": { + "isTemplateQueue": true, + "prodSourceLabel": "managed", + "nQueueLimitWorkerRatio": 50, + "nQueueLimitWorkerMin": 100, + "nQueueLimitWorkerMax": 10000, + "maxWorkers": 10, + "maxNewWorkersPerCycle": 100, + "mapType": "NoJob", + "truePilot": true, + "maxSubmissionAttempts": 3, + "walltimeLimit": 1209600, + "prefetchEvents": false, + "preparator": { + "name": "DummyPreparator", + "module": "pandaharvester.harvesterpreparator.dummy_preparator" + }, + "submitter": { + "name": "HTCondorSubmitter", + "module": "pandaharvester.harvestersubmitter.htcondor_submitter", + "useSpool": false, + 
"useAtlasGridCE": false, + "useAtlasAGIS": true, + "templateFile": "/cephfs/atlpan/harvester/harvester_common/CERN_central_1/cloudscheduler-pilot2.sdf", + "executableFile": "/cephfs/atlpan/harvester/harvester_common/CERN_central_1/runpilot2-wrapper.sh", + "x509UserProxy": "/data/idds/x509up_u25606", + "logDir": "/data/idds/condor_logs", + "logBaseURL": "https://ai-idds-02.cern.ch/condor_logs", + "nProcesses": 8 + }, + "workerMaker": { + "name": "SimpleWorkerMaker", + "module": "pandaharvester.harvesterworkermaker.simple_worker_maker", + "jobAttributesToUse": [ + "nCore" + ], + "pilotTypeRandomWeightsPermille": { + "RC": 10, + "ALRB": 10, + "PT": 10 + } + }, + "messenger": { + "name": "SharedFileMessenger", + "module": "pandaharvester.harvestermessenger.shared_file_messenger", + "jobSpecFileFormat": "cgi", + "accessPoint": "/data/idds/harvester_wdirs/${harvesterID}/${_workerID_3.2}/${_workerID_1.0}/${workerID}" + }, + "stager": { + "name": "DummyStager", + "module": "pandaharvester.harvesterstager.dummy_stager" + }, + "monitor": { + "name": "HTCondorMonitor", + "module": "pandaharvester.harvestermonitor.htcondor_monitor", + "cancelUnknown": false + }, + "sweeper": { + "name": "HTCondorSweeper", + "module": "pandaharvester.harvestersweeper.htcondor_sweeper" + } + }, + +"production.push": { + "isTemplateQueue": true, + "prodSourceLabel": "managed", + "nQueueLimitWorker": 10000, + "nQueueLimitJobRatio":40, + "nQueueLimitJobMax": 1000, + "nQueueLimitJobMin":3, + "maxWorkers": 10, + "maxNewWorkersPerCycle": 100, + "mapType": "OneToOne", + "truePilot": true, + "maxSubmissionAttempts": 3, + "walltimeLimit": 1209600, + "prefetchEvents": false, + "preparator": { + "name": "DummyPreparator", + "module": "pandaharvester.harvesterpreparator.dummy_preparator" + }, + "submitter": { + "name": "HTCondorSubmitter", + "module": "pandaharvester.harvestersubmitter.htcondor_submitter", + "useSpool": false, + "useAtlasGridCE": false, + "useAtlasAGIS": true, + "templateFile": "/cephfs/atlpan/harvester/harvester_common/CERN_central_1/cloudscheduler-pilot2.sdf", + "executableFile": "/cephfs/atlpan/harvester/harvester_common/CERN_central_1/runpilot2-wrapper.sh", + "x509UserProxy": "/data/idds/x509up_u25606", + "logDir": "/data/idds/condor_logs", + "logBaseURL": "https://ai-idds-02.cern.ch/condor_logs", + "nProcesses": 8 + }, + "workerMaker": { + "name": "SimpleWorkerMaker", + "module": "pandaharvester.harvesterworkermaker.simple_worker_maker", + "jobAttributesToUse": [ + "nCore" + ], + "pilotTypeRandomWeightsPermille": { + "RC": 10, + "ALRB": 10, + "PT": 10 + } + }, + "messenger": { + "name": "SharedFileMessenger", + "module": "pandaharvester.harvestermessenger.shared_file_messenger", + "jobSpecFileFormat": "cgi", + "accessPoint": "/data/idds/harvester_wdirs/${harvesterID}/${_workerID_3.2}/${_workerID_1.0}/${workerID}" + }, + "stager": { + "name": "DummyStager", + "module": "pandaharvester.harvesterstager.dummy_stager" + }, + "monitor": { + "name": "HTCondorMonitor", + "module": "pandaharvester.harvestermonitor.htcondor_monitor", + "cancelUnknown": false + }, + "sweeper": { + "name": "HTCondorSweeper", + "module": "pandaharvester.harvestersweeper.htcondor_sweeper" + } + }, + + + "production_k8s.pull":{ + "isTemplateQueue": true, + "prodSourceLabel":"managed", + "prodSourceLabelRandomWeightsPermille": {"ptest":10, "rc_test":10, "rc_test2":10, "rc_alrb":10}, + "nQueueLimitWorker":5000, + "nQueueLimitWorkerRatio":40, + "nQueueLimitWorkerMin":100, + "maxWorkers":100000, + "maxNewWorkersPerCycle":50, + "mapType":"NoJob", 
+ "truePilot":true, + "maxSubmissionAttempts":3, + "walltimeLimit":1209600, + "prefetchEvents":false, + "preparator":{ + "name":"DummyPreparator", + "module":"pandaharvester.harvesterpreparator.dummy_preparator" + }, + "workerMaker":{ + "name":"SimpleWorkerMaker", + "module":"pandaharvester.harvesterworkermaker.simple_worker_maker", + "jobAttributesToUse":[ + "nCore" + ], + "pilotTypeRandomWeightsPermille": {"RC": 10, "ALRB": 10, "PT": 10} + }, + "messenger":{ + "name":"SharedFileMessenger", + "module":"pandaharvester.harvestermessenger.shared_file_messenger", + "jobSpecFileFormat":"cgi", + "accessPoint":"/data/idds/harvester_wdirs/${harvesterID}/${_workerID_3.2}/${_workerID_1.0}/${workerID}" + }, + "stager":{ + "name":"DummyStager", + "module":"pandaharvester.harvesterstager.dummy_stager" + }, + "submitter":{ + "name": "K8sSubmitter", + "module": "pandaharvester.harvestersubmitter.k8s_submitter", + "x509UserProxy": "/data/idds/x509up_u25606", + "proxySecretPath": "/proxy/x509up_u25606", + "logDir": "/var/cache/pandaserver/", + "logBaseURL": "https://ai-idds-01.cern.ch:25443/cache", + "nProcesses": 4 + }, + "monitor":{ + "name": "K8sMonitor", + "module": "pandaharvester.harvestermonitor.k8s_monitor" + }, + "sweeper":{ + "name": "K8sSweeper", + "module": "pandaharvester.harvestersweeper.k8s_sweeper" + }, + "credmanagers": [ + { + "module": "pandaharvester.harvestercredmanager.k8s_secret_cred_manager", + "name": "K8sSecretCredManager", + "k8s_namespace": "${common.k8s_namespace}", + "k8s_config_file": "${common.k8s_config_file}", + "proxy_files": ["/data/idds/x509up_u25606"] + } + ], + "common": { + "k8s_yaml_file": "/opt/harvester/etc/k8s/job_cvmfs_prp_driver.yaml", + "k8s_config_file": "/data/idds/gcloud_config/.kube", + "k8s_namespace": "default" + } + }, + +"DOMA_LSST_GOOGLE_TEST_HIMEM": { + "queueStatus": "offline", + "prodSourceLabelRandomWeightsPermille": {"rc_test":0, "rc_test2":0, "rc_alrb":0}, + "maxWorkers": 10000, + "nQueueLimitWorkerRatio": 30, + "nQueueLimitWorkerMin": 1, + "nQueueLimitWorkerMax": 100, + "maxNewWorkersPerCycle":200, + "templateQueueName": "production_k8s.pull", + "common": { + "k8s_yaml_file": "/opt/harvester/etc/k8s/job_cvmfs_prp_driver.yaml", + "k8s_config_file": "/data/idds/gcloud_config_rubin/kube_high_mem", + "k8s_namespace": "default" + } + }, + + +"DOMA_LSST_GOOGLE_TEST": { + "queueStatus": "offline", + "prodSourceLabelRandomWeightsPermille": {"rc_test":0, "rc_test2":0, "rc_alrb":0}, + "maxWorkers": 10000, + "nQueueLimitWorkerRatio": 30, + "nQueueLimitWorkerMin": 1, + "nQueueLimitWorkerMax": 100, + "maxNewWorkersPerCycle":200, + "templateQueueName": "production_k8s.pull", + "common": { + "k8s_yaml_file": "/opt/harvester/etc/k8s/job_cvmfs_prp_driver.yaml", + "k8s_config_file": "/data/idds/gcloud_config_rubin/kube_moderate_mem", + "k8s_namespace": "default" + } +}, + +"TEST_SITE": { + "queueStatus": "online", + "prodSourceLabel": "manage", + "prodSourceLabelRandomWeightsPermille": {"rc_test":0, "rc_test2":0, "rc_alrb":0}, + "templateQueueName": "production.pull", + "maxWorkers": 1, + "nQueueLimitWorkerMin": 1, + "nQueueLimitWorkerMax": 2, + "submitter": { + "templateFile": "/opt/condor_test/grid_submit_pilot.sdf" + } + }, + +"BNL_OSG_1": { + "queueStatus": "offline", + "prodSourceLabel": "managed", + "prodSourceLabelRandomWeightsPermille": {"rc_test":0, "rc_test2":0, "rc_alrb":0}, + "templateQueueName": "production.push", + "maxWorkers": 1000, + "nQueueLimitWorkerRatio": 200, + "nQueueLimitWorkerMin":1, + "nQueueLimitWorkerMax": 60, + "submitter": { + 
"templateFile": "/opt/condor_test/grid_submit_pilot_push.sdf" + } +}, + +"BNL_OSG_SPHENIX": { + "queueStatus": "offline", + "prodSourceLabel": "managed", + "prodSourceLabelRandomWeightsPermille": {"rc_test":0, "rc_test2":0, "rc_alrb":0}, + "templateQueueName": "production.pull", + "maxWorkers": 10000, + "nQueueLimitWorkerRatio": 200, + "nQueueLimitWorkerMin": 2, + "nQueueLimitWorkerMax": 60, + "submitter": { + "templateFile": "/opt/condor_test/grid_submit_pilot_pull_sphenix.sdf", + "x509UserProxy": "/data/idds/sphenix_voms/hcvoms.sdcc.bnl.gov.short.proxy" + } + } +} diff --git a/images/Horovod/1_overview.png b/images/Horovod/1_overview.png new file mode 100644 index 00000000..13b2d208 Binary files /dev/null and b/images/Horovod/1_overview.png differ diff --git a/images/Horovod/2_resources.png b/images/Horovod/2_resources.png new file mode 100644 index 00000000..17931c41 Binary files /dev/null and b/images/Horovod/2_resources.png differ diff --git a/images/Horovod/3_formation.png b/images/Horovod/3_formation.png new file mode 100644 index 00000000..779a3962 Binary files /dev/null and b/images/Horovod/3_formation.png differ diff --git a/images/Horovod/4_issues.png b/images/Horovod/4_issues.png new file mode 100644 index 00000000..cc5587c2 Binary files /dev/null and b/images/Horovod/4_issues.png differ diff --git a/images/Horovod/5 - communications.png b/images/Horovod/5 - communications.png new file mode 100644 index 00000000..ec4c4565 Binary files /dev/null and b/images/Horovod/5 - communications.png differ diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index fad0b97f..067b92a7 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "15-01-2021 05:19:17 on release (by fahui)" +timestamp = "17-09-2021 07:05:31 on release (by mightqxc)" diff --git a/pandaharvester/harvesterbody/cred_manager.py b/pandaharvester/harvesterbody/cred_manager.py index 4be86a38..8b4b4078 100644 --- a/pandaharvester/harvesterbody/cred_manager.py +++ b/pandaharvester/harvesterbody/cred_manager.py @@ -21,8 +21,8 @@ def __init__(self, queue_config_mapper, single_mode=False): self.pluginFactory = PluginFactory() self.dbProxy = DBProxy() # plugin cores - self.exeCores = [] - self.queue_exe_cores = [] + self.exe_cores = [] # general cred managers that are not queue based, e.g. VOMS renewal on harvester instance + self.queue_exe_cores = [] # cred manages that are queue based, e.g. 
VOMS refresh on K8S clusters # get plugin from harvester config self.get_cores_from_harvester_config() # update plugin cores from queue config @@ -72,17 +72,17 @@ def get_cores_from_harvester_config(self): # from traditional attributes for moduleName, className, inCertFile, outCertFile, voms in \ zip(moduleNames, classNames, inCertFiles, outCertFiles, vomses): - pluginPar = {} - pluginPar['module'] = moduleName - pluginPar['name'] = className - pluginPar['inCertFile'] = inCertFile - pluginPar['outCertFile'] = outCertFile - pluginPar['voms'] = voms + plugin_params = {} + plugin_params['module'] = moduleName + plugin_params['name'] = className + plugin_params['inCertFile'] = inCertFile + plugin_params['outCertFile'] = outCertFile + plugin_params['voms'] = voms try: - exeCore = self.pluginFactory.get_plugin(pluginPar) - self.exeCores.append(exeCore) + exe_core = self.pluginFactory.get_plugin(plugin_params) + self.exe_cores.append(exe_core) except Exception: - _logger.error('failed to launch credmanager with traditional attributes for {0}'.format(pluginPar)) + _logger.error('failed to launch credmanager with traditional attributes for {0}'.format(plugin_params)) core_utils.dump_error_message(_logger) # from pluginConfigs for pc in pluginConfigs: @@ -90,15 +90,14 @@ def get_cores_from_harvester_config(self): setup_maps = pc['configs'] for setup_name, setup_map in setup_maps.items(): try: - pluginPar = {} - pluginPar['module'] = pc['module'] - pluginPar['name'] = pc['name'] - pluginPar['setup_name'] = setup_name - pluginPar.update(setup_map) - exeCore = self.pluginFactory.get_plugin(pluginPar) - self.exeCores.append(exeCore) + plugin_params = {'module': pc['module'], + 'name': pc['name'], + 'setup_name': setup_name} + plugin_params.update(setup_map) + exe_core = self.pluginFactory.get_plugin(plugin_params) + self.exe_cores.append(exe_core) except Exception: - _logger.error('failed to launch credmanager in pluginConfigs for {0}'.format(pluginPar)) + _logger.error('failed to launch credmanager in pluginConfigs for {0}'.format(plugin_params)) core_utils.dump_error_message(_logger) except Exception: _logger.error('failed to parse pluginConfigs {0}'.format(pc)) @@ -109,15 +108,15 @@ def update_cores_from_queue_config(self): self.queue_exe_cores = [] for queue_name, queue_config in self.queue_config_mapper.get_all_queues().items(): if queue_config.queueStatus == 'offline' \ - or not hasattr(queue_config, 'credmanagers') \ - or not isinstance(queue_config.credmanagers, list): + or not hasattr(queue_config, 'credmanagers') \ + or not isinstance(queue_config.credmanagers, list): continue for cm_setup in queue_config.credmanagers: try: - pluginPar = {} - pluginPar['module'] = cm_setup['module'] - pluginPar['name'] = cm_setup['name'] - pluginPar['setup_name'] = queue_name + plugin_params = {'module': cm_setup['module'], + 'name': cm_setup['name'], + 'setup_name': queue_name, + 'queueName': queue_name} for k, v in cm_setup.items(): if k in ('module', 'name'): pass @@ -140,14 +139,14 @@ def update_cores_from_queue_config(self): if tmp_val is not None: value = value.replace(tmp_ph, tmp_val) # fill in - pluginPar[k] = value + plugin_params[k] = value else: # fill in - pluginPar[k] = v - exe_core = self.pluginFactory.get_plugin(pluginPar) + plugin_params[k] = v + exe_core = self.pluginFactory.get_plugin(plugin_params) self.queue_exe_cores.append(exe_core) except Exception: - _logger.error('failed to launch about queue={0} for {1}'.format(queue_name, pluginPar)) + _logger.error('failed to launch plugin for 
queue={0} and {1}'.format(queue_name, plugin_params)) core_utils.dump_error_message(_logger) # main loop @@ -171,30 +170,29 @@ def execute(self): if not locked: return # loop over all plugins - for exeCore in itertools.chain(self.exeCores, self.queue_exe_cores): + for exe_core in itertools.chain(self.exe_cores, self.queue_exe_cores): # do nothing - if exeCore is None: + if exe_core is None: continue # make logger - credmanager_name = '' - if hasattr(exeCore, 'setup_name'): - credmanager_name = exeCore.setup_name + if hasattr(exe_core, 'setup_name'): + credmanager_name = exe_core.setup_name else: - credmanager_name = '{0} {1}'.format(exeCore.inCertFile, exeCore.outCertFile) + credmanager_name = '{0} {1}'.format(exe_core.inCertFile, exe_core.outCertFile) mainLog = self.make_logger(_logger, - '{0} {1}'.format(exeCore.__class__.__name__, credmanager_name), + '{0} {1}'.format(exe_core.__class__.__name__, credmanager_name), method_name='execute') try: # check credential mainLog.debug('check credential') - isValid = exeCore.check_credential() + isValid = exe_core.check_credential() if isValid: mainLog.debug('valid') elif not isValid: # renew it if necessary mainLog.debug('invalid') mainLog.debug('renew credential') - tmpStat, tmpOut = exeCore.renew_credential() + tmpStat, tmpOut = exe_core.renew_credential() if not tmpStat: mainLog.error('failed : {0}'.format(tmpOut)) continue @@ -208,28 +206,28 @@ def execute_monit(self): metrics = {} # loop over all plugins - for exeCore in itertools.chain(self.exeCores, self.queue_exe_cores): + for exe_core in itertools.chain(self.exe_cores, self.queue_exe_cores): # do nothing - if exeCore is None: + if exe_core is None: continue # make logger - if hasattr(exeCore, 'setup_name'): - credmanager_name = exeCore.setup_name + if hasattr(exe_core, 'setup_name'): + credmanager_name = exe_core.setup_name else: - credmanager_name = '{0} {1}'.format(exeCore.inCertFile, exeCore.outCertFile) + credmanager_name = '{0} {1}'.format(exe_core.inCertFile, exe_core.outCertFile) - subLog = self.make_logger(_logger, '{0} {1}'.format(exeCore.__class__.__name__, credmanager_name), + sub_log = self.make_logger(_logger, '{0} {1}'.format(exe_core.__class__.__name__, credmanager_name), method_name='execute_monit') try: # check credential - subLog.debug('check credential lifetime') - lifetime = exeCore.check_credential_lifetime() + sub_log.debug('check credential lifetime') + lifetime = exe_core.check_credential_lifetime() if lifetime is not None: - metrics[exeCore.outCertFile] = lifetime + metrics[exe_core.outCertFile] = lifetime except Exception: - core_utils.dump_error_message(subLog) + core_utils.dump_error_message(sub_log) - subLog.debug('done') + sub_log.debug('done') return metrics diff --git a/pandaharvester/harvesterbody/service_monitor.py b/pandaharvester/harvesterbody/service_monitor.py index 214f299c..22aa2c0c 100644 --- a/pandaharvester/harvesterbody/service_monitor.py +++ b/pandaharvester/harvesterbody/service_monitor.py @@ -19,6 +19,11 @@ _logger = core_utils.setup_logger('service_monitor') +def round_floats(value): + # Will round floats to 2 decimals. If the value is not a float, it will return the value unchanged + return round(value, 2) if isinstance(value, float) else value + + # class to monitor the service, e.g. 
memory usage class ServiceMonitor(AgentBase): # constructor @@ -143,10 +148,12 @@ def run(self): # get memory usage rss_mib, memory_pc, cpu_pc = self.get_memory_n_cpu() - service_metrics['rss_mib'] = rss_mib - service_metrics['memory_pc'] = memory_pc - service_metrics['cpu_pc'] = cpu_pc - _logger.debug('Memory usage: {0} MiB/{1}%, CPU usage: {2}'.format(rss_mib, memory_pc, cpu_pc)) + service_metrics['rss_mib'] = round_floats(rss_mib) + service_metrics['memory_pc'] = round_floats(memory_pc) + service_metrics['cpu_pc'] = round_floats(cpu_pc) + _logger.debug('Memory usage: {0} MiB/{1}%, CPU usage: {2}'.format(service_metrics['rss_mib'], + service_metrics['memory_pc'], + service_metrics['cpu_pc'])) # get volume usage try: @@ -155,12 +162,14 @@ def run(self): volumes = [] for volume in volumes: volume_use = self.volume_use(volume) - _logger.debug('Disk usage of {0}: {1} %'.format(volume, volume_use)) - service_metrics['volume_{0}_pc'.format(volume)] = volume_use + service_metrics['volume_{0}_pc'.format(volume)] = round_floats(volume_use) + _logger.debug('Disk usage of {0}: {1} %'.format(volume, + service_metrics['volume_{0}_pc'.format(volume)])) + + # get certificate lifetimes. Not all plugins have implemented it + _logger.debug('Getting cert lifetimes') + service_metrics['cert_lifetime'] = {cert: round(value) for (cert, value) in self.cert_validities().items()} - # get certificate validities. Not all plugins have implemented it - _logger.debug('Getting cert validities') - service_metrics['cert_lifetime'] = self.cert_validities() _logger.debug('Got cert validities: {0}'.format(service_metrics['cert_lifetime'])) service_metrics_spec = ServiceMetricSpec(service_metrics) diff --git a/pandaharvester/harvestercloud/aws_unhealthy_nodes.py b/pandaharvester/harvestercloud/aws_unhealthy_nodes.py new file mode 100644 index 00000000..d6f5aecb --- /dev/null +++ b/pandaharvester/harvestercloud/aws_unhealthy_nodes.py @@ -0,0 +1,36 @@ +# Detect and delete nodes that are stuck + +from kubernetes import client, config +from subprocess import Popen, PIPE + +config.load_kube_config(config_file='YOUR KUBECONFIG FILE') +apis_api = client.CoreV1Api() + +# get running nodes +running_nodes = {} +nodes = apis_api.list_node() +for node in nodes.items: + running_nodes[node.metadata.name] = node.spec.provider_id.split('/')[-1] + +# get events with FailedMounts and filter them by known error message +failed_mount_events = apis_api.list_namespaced_event(namespace='default', field_selector='reason=FailedMount') + +unhealthy_node_ids = set() +for event in failed_mount_events.items: + node_name = event.source.host + if 'Argument list too long' in event.message and node_name in running_nodes: + unhealthy_node_ids.add(running_nodes[node_name]) + +# set the node as unhealthy using the AWS CLI +command = '/usr/local/bin/aws autoscaling set-instance-health --instance-id {0} --health-status Unhealthy' +for id in unhealthy_node_ids: + command_with_id = command.format(id) + command_list = command_with_id.split(' ') + p = Popen(command_list, stdin=PIPE, stdout=PIPE, stderr=PIPE) + output, err = p.communicate() + print('------------------------------------') + print(command_with_id) + print('return code: {0}'.format(p.returncode)) + print('output: {0}'.format(output)) + print('err: {0}'.format(err)) + print('------------------------------------') \ No newline at end of file
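The script above shells out to the AWS CLI once per unhealthy instance. The same health-marking call is also available through boto3; a minimal sketch, assuming the standard AWS credential chain is configured and reusing the unhealthy_node_ids set collected above:

import boto3

autoscaling = boto3.client('autoscaling')
for instance_id in unhealthy_node_ids:
    # mark the instance unhealthy so its Auto Scaling group replaces it
    autoscaling.set_instance_health(InstanceId=instance_id, HealthStatus='Unhealthy')

diff --git a/pandaharvester/harvestercloud/k8s_startup_script.py 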
b/pandaharvester/harvestercloud/k8s_startup_script.py deleted file mode 100644 index be5bd8cd..00000000 --- a/pandaharvester/harvestercloud/k8s_startup_script.py +++ /dev/null @@ -1,192 +0,0 @@ -#!/usr/bin/env python - - -######################################################## -# OBSOLETE!!! USE PILOTS_STARTER.PY -######################################################## - -""" -This script will be executed at container startup -- It will retrieve the proxy and panda queue from the environment -- It will download the pilot wrapper from github and execute it -- It will upload the pilot logs to panda cache at the end - -post-multipart code was taken from: https://github.com/haiwen/webapi-examples/blob/master/python/upload-file.py -""" - -try: - import subprocess32 as subprocess -except Exception: - import subprocess -import os -import sys -import logging -import traceback -import httplib -import mimetypes -import ssl -import urlparse -import urllib2 - -logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(message)s', - filename='/tmp/vm_script.log', filemode='w') - - -def post_multipart(host, port, selector, files, proxy_cert): - """ - Post files to an http host as multipart/form-data. - files is a sequence of (name, filename, value) elements for data to be uploaded as files - Return the server's response page. - """ - content_type, body = encode_multipart_formdata(files) - - context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) - context.load_cert_chain(certfile=proxy_cert, keyfile=proxy_cert) - - h = httplib.HTTPSConnection(host, port, context=context, timeout=180) - - h.putrequest('POST', selector) - h.putheader('content-type', content_type) - h.putheader('content-length', str(len(body))) - h.endheaders() - h.send(body) - response = h.getresponse() - return response.status, response.reason - - -def encode_multipart_formdata(files): - """ - files is a sequence of (name, filename, value) elements for data to be uploaded as files - Return (content_type, body) ready for httplib.HTTP instance - """ - BOUNDARY = '----------ThIs_Is_tHe_bouNdaRY_$' - CRLF = '\r\n' - L = [] - for (key, filename, value) in files: - L.append('--' + BOUNDARY) - L.append('Content-Disposition: form-data; name="%s"; filename="%s"' % (key, filename)) - L.append('Content-Type: %s' % get_content_type(filename)) - L.append('') - L.append(value) - L.append('--' + BOUNDARY + '--') - L.append('') - body = CRLF.join(L) - content_type = 'multipart/form-data; boundary=%s' % BOUNDARY - return content_type, body - - -def get_content_type(filename): - return mimetypes.guess_type(filename)[0] or 'application/octet-stream' - - -def upload_logs(url, log_file_name, destination_name, proxy_cert): - try: - full_url = url + '/putFile' - urlparts = urlparse.urlsplit(full_url) - - logging.debug('[upload_logs] start') - files = [('file', destination_name, open(log_file_name).read())] - status, reason = post_multipart(urlparts.hostname, urlparts.port, urlparts.path, files, proxy_cert) - logging.debug('[upload_logs] finished with code={0} msg={1}'.format(status, reason)) - if status == 200: - return True - except Exception: - err_type, err_value = sys.exc_info()[:2] - err_messsage = "failed to put with {0}:{1} ".format(err_type, err_value) - err_messsage += traceback.format_exc() - logging.debug('[upload_logs] excepted with:\n {0}'.format(err_messsage)) - - return False - - -def get_url(url, headers=None): - """ - get content from specified URL - TODO: error handling - """ - response = urllib2.urlopen(wrapper_url) 
- content = response.read() - return content - -def get_configuration(): - - # get the proxy certificate and save it - if os.environ.get('proxySecretPath'): - # os.symlink(os.environ.get('proxySecretPath'), proxy_path) - proxy_path = os.environ.get('proxySecretPath') - elif os.environ.get('proxyContent'): - proxy_path = "/tmp/x509up" - proxy_string = os.environ.get('proxyContent').replace(",", "\n") - with open(proxy_path, "w") as proxy_file: - proxy_file.write(proxy_string) - del os.environ['proxyContent'] - os.chmod(proxy_path, 0o600) - else: - logging.debug('[main] no proxy specified in env var $proxySecretPath nor $proxyContent') - raise Exception('Found no voms proxy specified') - os.environ['X509_USER_PROXY'] = proxy_path - logging.debug('[main] initialized proxy') - - # get the panda site name - panda_site = os.environ.get('computingSite') - logging.debug('[main] got panda site: {0}'.format(panda_site)) - - # get the panda queue name - panda_queue = os.environ.get('pandaQueueName') - logging.debug('[main] got panda queue: {0}'.format(panda_queue)) - - # get the resource type of the worker - resource_type = os.environ.get('resourceType') - logging.debug('[main] got resource type: {0}'.format(resource_type)) - - # get the worker id - worker_id = os.environ.get('workerID') - logging.debug('[main] got worker id: {0}'.format(worker_id)) - - # get the URL (e.g. panda cache) to upload logs - logs_frontend_w = os.environ.get('logs_frontend_w') - logging.debug('[main] got url to upload logs') - - # get the URL (e.g. panda cache) where the logs can be downloaded afterwards - logs_frontend_r = os.environ.get('logs_frontend_r') - logging.debug('[main] got url to download logs') - - return proxy_path, panda_site, panda_queue, resource_type, worker_id, logs_frontend_w, logs_frontend_r - - -if __name__ == "__main__": - - # get all the configuration from the environment - proxy_path, panda_site, panda_queue, resource_type, worker_id, logs_frontend_w, logs_frontend_r = get_configuration() - - # the pilot should propagate the download link via the pilotId field in the job table - destination_name = '{0}.out'.format(worker_id) - log_download_url = '{0}/{1}'.format(logs_frontend_r, destination_name) - os.environ['GTAG'] = log_download_url # GTAG env variable is read by pilot - - # get the pilot wrapper - wrapper_path = "/tmp/runpilot2-wrapper.sh" - wrapper_url = "https://raw.githubusercontent.com/PanDAWMS/pilot-wrapper/master/runpilot2-wrapper.sh" - wrapper_string = get_url(wrapper_url) - with open(wrapper_path, "w") as wrapper_file: - wrapper_file.write(wrapper_string) - os.chmod(wrapper_path, 0o544) # make pilot wrapper executable - logging.debug('[main] downloaded pilot wrapper') - - # execute the pilot wrapper - logging.debug('[main] starting pilot wrapper...') - resource_type_option = '' - if resource_type: - resource_type_option = '--resource-type {0}'.format(resource_type) - wrapper_params = '-s {0} -r {1} -q {2} {3}'.format(panda_site, panda_queue, panda_queue, resource_type_option) - if 'ANALY' in panda_queue: - wrapper_params = '{0} -j user'.format(wrapper_params) - else: - wrapper_params = '{0} -j managed'.format(wrapper_params) - command = "/tmp/runpilot2-wrapper.sh {0} -i PR -w generic --pilot-user=ATLAS --url=https://pandaserver.cern.ch -d --harvester-submit-mode=PULL --allow-same-user=False >& /tmp/wrapper-wid.log".\ - format(wrapper_params, worker_id) - subprocess.call(command, shell=True) - logging.debug('[main] pilot wrapper done...') - - # upload logs to e.g. 
panda cache or similar - upload_logs(logs_frontend_w, '/tmp/wrapper-wid.log', destination_name, proxy_path) diff --git a/pandaharvester/harvestercloud/pilots_starter.py b/pandaharvester/harvestercloud/pilots_starter.py index bae2f3cb..87ef8226 100644 --- a/pandaharvester/harvestercloud/pilots_starter.py +++ b/pandaharvester/harvestercloud/pilots_starter.py @@ -21,7 +21,6 @@ import mimetypes import ssl import urlparse -import urllib2 import traceback WORK_DIR = '/scratch' @@ -104,16 +103,6 @@ def upload_logs(url, log_file_name, destination_name, proxy_cert): return False -def get_url(url, headers=None): - """ - get content from specified URL - TODO: error handling - """ - response = urllib2.urlopen(wrapper_url) - content = response.read() - return content - - def copy_files_in_dir(src_dir, dst_dir): # src_files = os.listdir(src_dir) for file_name in CONFIG_FILES: @@ -124,17 +113,9 @@ def copy_files_in_dir(src_dir, dst_dir): def get_configuration(): # get the proxy certificate and save it if os.environ.get('proxySecretPath'): - # os.symlink(os.environ.get('proxySecretPath'), proxy_path) proxy_path = os.environ.get('proxySecretPath') - elif os.environ.get('proxyContent'): - proxy_path = "/tmp/x509up" - proxy_string = os.environ.get('proxyContent').replace(",", "\n") - with open(proxy_path, "w") as proxy_file: - proxy_file.write(proxy_string) - del os.environ['proxyContent'] - os.chmod(proxy_path, 0o600) else: - logging.debug('[main] no proxy specified in env var $proxySecretPath nor $proxyContent') + logging.debug('[main] no proxy specified in env var $proxySecretPath') raise Exception('Found no voms proxy specified') os.environ['X509_USER_PROXY'] = proxy_path logging.debug('[main] initialized proxy') @@ -157,6 +138,15 @@ def get_configuration(): job_type = os.environ.get('jobType') logging.debug('[main] got job type: {0}'.format(job_type)) + pilot_type = os.environ.get('pilotType', '') + logging.debug('[main] got pilotType: {0}'.format(pilot_type)) + + pilot_url_option = os.environ.get('pilotUrlOpt', '') + logging.debug('[main] got pilotUrlOpt: {0}'.format(pilot_url_option)) + + python_option = os.environ.get('pythonOption', '') + logging.debug('[main] got pythonOption: {0}'.format(python_option)) + # get the Harvester ID harvester_id = os.environ.get('HARVESTER_ID') logging.debug('[main] got Harvester ID: {0}'.format(harvester_id)) @@ -190,32 +180,23 @@ def get_configuration(): if tmpdir: global WORK_DIR WORK_DIR = tmpdir - global CONFIG_DIR - CONFIG_DIR = tmpdir + '/jobconfig' - return proxy_path, panda_site, panda_queue, resource_type, prodSourceLabel, job_type, harvester_id, \ - worker_id, logs_frontend_w, logs_frontend_r, stdout_name, submit_mode + return proxy_path, panda_site, panda_queue, resource_type, prodSourceLabel, job_type, pilot_type, \ + pilot_url_option, python_option, harvester_id, worker_id, logs_frontend_w, logs_frontend_r, stdout_name, \ + submit_mode if __name__ == "__main__": # get all the configuration from environment - proxy_path, panda_site, panda_queue, resource_type, prodSourceLabel, job_type, harvester_id, worker_id, \ - logs_frontend_w, logs_frontend_r, destination_name, submit_mode = get_configuration() + proxy_path, panda_site, panda_queue, resource_type, prodSourceLabel, job_type, pilot_type, pilot_url_opt, \ + python_option, harvester_id, worker_id, logs_frontend_w, logs_frontend_r, destination_name, submit_mode \ + = get_configuration() # the pilot should propagate the download link via the pilotId field in the job table log_download_url = 
'{0}/{1}'.format(logs_frontend_r, destination_name) os.environ['GTAG'] = log_download_url # GTAG env variable is read by pilot - # get the pilot wrapper - wrapper_path = "/tmp/runpilot2-wrapper.sh" - wrapper_url = "https://raw.githubusercontent.com/PanDAWMS/pilot-wrapper/master/runpilot2-wrapper.sh" - wrapper_string = get_url(wrapper_url) - with open(wrapper_path, "w") as wrapper_file: - wrapper_file.write(wrapper_string) - os.chmod(wrapper_path, 0o544) # make pilot wrapper executable - logging.debug('[main] downloaded pilot wrapper') - # execute the pilot wrapper logging.debug('[main] starting pilot wrapper...') resource_type_option = '' @@ -229,18 +210,27 @@ def get_configuration(): job_type_option = '' if job_type: - job_type_option = '-i {0}'.format(job_type) + job_type_option = '--job-type {0}'.format(job_type) - wrapper_params = '-a {0} -s {1} -r {2} -q {3} {4} {5} {6}'.format(WORK_DIR, panda_site, panda_queue, panda_queue, - resource_type_option, psl_option, job_type_option) + pilot_type_option = '-i PR' + if pilot_type: + pilot_type_option = '-i {0}'.format(pilot_type) + + wrapper_params = '-a {0} -s {1} -r {2} -q {3} {4} {5} {6} {7} {8} {9}'.format(WORK_DIR, panda_site, panda_queue, + panda_queue, resource_type_option, + psl_option, pilot_type_option, + job_type_option, pilot_url_opt, + python_option) if submit_mode == 'PUSH': # job configuration files need to be copied, because k8s configmap mounts as read-only file system # and therefore the pilot cannot execute in the same directory copy_files_in_dir(CONFIG_DIR, WORK_DIR) - command = "/tmp/runpilot2-wrapper.sh {0} -i PR -w generic --pilot-user=ATLAS --url=https://pandaserver.cern.ch -d --harvester-submit-mode={1} --allow-same-user=False -t | tee /tmp/wrapper-wid.log". \ - format(wrapper_params, submit_mode) + wrapper_executable = "/cvmfs/atlas.cern.ch/repo/sw/PandaPilotWrapper/latest/runpilot2-wrapper.sh" + command = "sh {0} {1} -w generic --pilot-user=ATLAS --url=https://pandaserver.cern.ch -d --harvester-submit-mode={2} --allow-same-user=False -t | tee /tmp/wrapper-wid.log". 
\ + format(wrapper_executable, wrapper_params, submit_mode) + try: subprocess.call(command, shell=True) except: diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index 8f788a5a..b038e7ab 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -523,7 +523,8 @@ def check_table(self, cls, table_name, get_missing=False): varMap = dict() if harvester_config.db.engine == 'mariadb': varMap[':name'] = table_name - sqlC = 'SELECT column_name,column_type FROM information_schema.columns WHERE table_name=:name ' + varMap[':schema'] = harvester_config.db.schema + sqlC = 'SELECT column_name,column_type FROM information_schema.columns WHERE table_schema=:schema AND table_name=:name ' else: sqlC = 'PRAGMA table_info({0}) '.format(table_name) self.execute(sqlC, varMap) @@ -533,7 +534,10 @@ def check_table(self, cls, table_name, get_missing=False): if harvester_config.db.engine == 'mariadb': if hasattr(tmpItem, '_asdict'): tmpItem = tmpItem._asdict() - columnName, columnType = tmpItem['column_name'], tmpItem['column_type'] + try: + columnName, columnType = tmpItem['column_name'], tmpItem['column_type'] + except KeyError: + columnName, columnType = tmpItem['COLUMN_NAME'], tmpItem['COLUMN_TYPE'] else: columnName, columnType = tmpItem[1], tmpItem[2] colMap[columnName] = columnType diff --git a/pandaharvester/harvestercredmanager/arcproxy_cred_manager.py b/pandaharvester/harvestercredmanager/arcproxy_cred_manager.py index 99467926..e206c81a 100644 --- a/pandaharvester/harvestercredmanager/arcproxy_cred_manager.py +++ b/pandaharvester/harvestercredmanager/arcproxy_cred_manager.py @@ -1,7 +1,7 @@ import re import subprocess -from pandaharvester.harvestercore.plugin_base import PluginBase +from .base_cred_manager import BaseCredManager from pandaharvester.harvestercore import core_utils # logger @@ -9,10 +9,10 @@ # credential manager with no-voms proxy using arcproxy -class ArcproxyCredManager(PluginBase): +class ArcproxyCredManager(BaseCredManager): # constructor def __init__(self, **kwarg): - PluginBase.__init__(self, **kwarg) + BaseCredManager.__init__(self, **kwarg) # check proxy def check_credential(self): @@ -29,7 +29,7 @@ def check_credential(self): stdOut = p.stdout.strip() stdErr = p.stderr retCode = p.returncode - except: + except Exception: core_utils.dump_error_message(mainLog) return False mainLog.debug('retCode={0} stdOut={1} stdErr={2}'.format(retCode, stdOut, stdErr)) @@ -56,9 +56,8 @@ def renew_credential(self): stdErr = p.stderr retCode = p.returncode mainLog.debug('retCode={0} stdOut={1} stdErr={2}'.format(retCode, stdOut, stdErr)) - except: + except Exception: stdOut = '' stdErr = core_utils.dump_error_message(mainLog) retCode = -1 return retCode == 0, "{0} {1}".format(stdOut, stdErr) - diff --git a/pandaharvester/harvestercredmanager/grid_cred_manager.py b/pandaharvester/harvestercredmanager/grid_cred_manager.py index 98e5bea1..512768ed 100644 --- a/pandaharvester/harvestercredmanager/grid_cred_manager.py +++ b/pandaharvester/harvestercredmanager/grid_cred_manager.py @@ -1,20 +1,21 @@ try: import subprocess32 as subprocess -except: +except Exception: import subprocess +from .base_cred_manager import BaseCredManager from pandaharvester.harvestercore import core_utils -from pandaharvester.harvestercore.plugin_base import PluginBase + # logger _logger = core_utils.setup_logger('grid_cred_manager') # credential manager using grid-proxy -class GridCredManager(PluginBase): +class 
GridCredManager(BaseCredManager): # constructor def __init__(self, **kwarg): - PluginBase.__init__(self, **kwarg) + BaseCredManager.__init__(self, **kwarg) # check proxy def check_credential(self): @@ -29,7 +30,7 @@ def check_credential(self): stderr=subprocess.PIPE) stdOut, stdErr = p.communicate() retCode = p.returncode - except: + except Exception: core_utils.dump_error_message(mainLog) return False mainLog.debug('retCode={0} stdOut={1} stdErr={2}'.format(retCode, stdOut, stdErr)) @@ -50,7 +51,7 @@ def renew_credential(self): stdOut, stdErr = p.communicate() retCode = p.returncode mainLog.debug('retCode={0} stdOut={1} stdErr={2}'.format(retCode, stdOut, stdErr)) - except: + except Exception: stdOut = '' stdErr = core_utils.dump_error_message(mainLog) retCode = -1 diff --git a/pandaharvester/harvestercredmanager/k8s_secret_cred_manager.py b/pandaharvester/harvestercredmanager/k8s_secret_cred_manager.py index ecb1e2bc..a2e6da10 100644 --- a/pandaharvester/harvestercredmanager/k8s_secret_cred_manager.py +++ b/pandaharvester/harvestercredmanager/k8s_secret_cred_manager.py @@ -1,14 +1,10 @@ -import os -import time import json - -from kubernetes import client, config -from kubernetes.client.rest import ApiException +import traceback from .base_cred_manager import BaseCredManager from pandaharvester.harvestercore import core_utils from pandaharvester.harvestermisc.k8s_utils import k8s_Client - +from pandaharvester.harvestermisc.info_utils import PandaQueuesDict # logger _logger = core_utils.setup_logger('k8s_secret_cred_manager') @@ -20,7 +16,7 @@ class K8sSecretCredManager(BaseCredManager): def __init__(self, **kwarg): BaseCredManager.__init__(self, **kwarg) # make logger - mainLog = self.make_logger(_logger, method_name='__init__') + tmp_log = self.make_logger(_logger, method_name='__init__') # attributes if hasattr(self, 'inFile') or hasattr(self, 'inCertFile'): # set up with json in inFile @@ -33,8 +29,7 @@ def __init__(self, **kwarg): with open(self.inFile) as f: self.setupMap = json.load(f) except Exception as e: - mainLog.error('Error with inFile/inCertFile . {0}: {1}'.format( - e.__class__.__name__, e)) + tmp_log.error('Error with inFile/inCertFile . {0}: {1}'.format(e.__class__.__name__, e)) self.setupMap = {} raise else: @@ -42,40 +37,43 @@ def __init__(self, **kwarg): self.setupMap = dict(vars(self)) # validate setupMap try: - self.k8s_namespace = self.setupMap['k8s_namespace'] self.k8s_config_file = self.setupMap['k8s_config_file'] self.proxy_files = self.setupMap['proxy_files'] self.secret_name = self.setupMap.get('secret_name', 'proxy-secret') except KeyError as e: - mainLog.error('Missing attributes in setup . {0}: {1}'.format( - e.__class__.__name__, e)) + tmp_log.error('Missing attributes in setup. {0}: {1}'.format(e.__class__.__name__, e)) raise - # k8s client + try: - self.k8s_client = k8s_Client(namespace=self.k8s_namespace, config_file=self.k8s_config_file) + # retrieve the k8s namespace from CRIC + self.panda_queues_dict = PandaQueuesDict() + self.namespace = self.panda_queues_dict.get_k8s_namespace(self.queueName) + # k8s client + self.k8s_client = k8s_Client(namespace=self.namespace, queue_name=self.queueName, + config_file=self.k8s_config_file) except Exception as e: - mainLog.error('Problem instantiating k8s client for {0}'.format(self.k8s_config_file)) + tmp_log.error('Problem instantiating k8s client for {0}. 
{1}'.format(self.k8s_config_file, + traceback.format_exc())) + raise # check proxy def check_credential(self): # make logger - mainLog = self.make_logger(_logger, method_name='check_credential') # same update period as credmanager agent return False # renew proxy def renew_credential(self): # make logger - mainLog = self.make_logger(_logger, method_name='renew_credential') + tmp_log = self.make_logger(_logger, 'queueName={0}'.format(self.queueName), method_name='renew_credential') # go try: rsp = self.k8s_client.create_or_patch_secret( file_list=self.proxy_files, secret_name=self.secret_name) - mainLog.debug('done') + tmp_log.debug('done') except KeyError as e: errStr = 'Error when renew proxy secret . {0}: {1}'.format( e.__class__.__name__, e) - return (False, errStr) + return False, errStr else: - return (True, '') + return True, '' diff --git a/pandaharvester/harvestercredmanager/no_voms_cred_manager.py b/pandaharvester/harvestercredmanager/no_voms_cred_manager.py index cd22abb2..648e1782 100644 --- a/pandaharvester/harvestercredmanager/no_voms_cred_manager.py +++ b/pandaharvester/harvestercredmanager/no_voms_cred_manager.py @@ -23,6 +23,8 @@ def __init__(self, **kwarg): self.genFromKeyCert = self.setupMap.get('genFromKeyCert') self.key = self.setupMap.get('key') self.cert = self.setupMap.get('cert') + self.checkPeriod = self.setupMap.get('checkPeriod', 1) + self.lifetime = self.setupMap.get('lifetime', 96) # check proxy lifetime for monitoring/alerting purposes def check_credential_lifetime(self): @@ -48,7 +50,9 @@ def check_credential(self): # make logger main_log = self.make_logger(_logger, method_name='check_credential') - comStr = "voms-proxy-info -exists -hours 72 -file {0}".format(self.outCertFile) + # lifetime threshold to trigger renewal, in hours + threshold = max(self.lifetime - self.checkPeriod, 0) + comStr = "voms-proxy-info -exists -hours {0} -file {1}".format(threshold, self.outCertFile) main_log.debug(comStr) try: p = subprocess.Popen(comStr.split(), @@ -81,10 +85,11 @@ def renew_credential(self): usercert_value = self.inCertFile userkey_value = self.inCertFile # command - comStr = "voms-proxy-init -rfc {noregen_option} {voms_option} -out {out} -valid 96:00 -cert={cert} -key={key}".format( + comStr = "voms-proxy-init -rfc {noregen_option} {voms_option} -out {out} -valid {lifetime}:00 -cert={cert} -key={key}".format( noregen_option=noregen_option, voms_option=voms_option, out=self.outCertFile, + lifetime=self.lifetime, cert=usercert_value, key=userkey_value) main_log.debug(comStr)
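With the configurable lifetime and checkPeriod introduced above, check_credential now fails, and thus triggers renewal, as soon as the proxy has fewer than lifetime minus checkPeriod hours left, so a proxy can no longer expire between two checks. A worked instance with the defaults:

# defaults: lifetime=96, checkPeriod=1 -> renew once fewer than 95 hours remain,
# i.e. when "voms-proxy-info -exists -hours 95 -file <proxy>" returns non-zero
lifetime, check_period = 96, 1
threshold = max(lifetime - check_period, 0)
assert threshold == 95

diff --git a/pandaharvester/harvestercredmanager/proxy_cache_cred_manager.py b/pandaharvester/harvestercredmanager/proxy_cache_cred_manager.py index 805bd2fc..16fb868d 100644 --- a/pandaharvester/harvestercredmanager/proxy_cache_cred_manager.py +++ b/pandaharvester/harvestercredmanager/proxy_cache_cred_manager.py @@ -1,9 +1,9 @@ try: import subprocess32 as subprocess -except: +except Exception: import subprocess -from pandaharvester.harvestercore.plugin_base import PluginBase +from .base_cred_manager import BaseCredManager from pandaharvester.harvestercore import core_utils from pandaharvester.harvestercore.communicator_pool import CommunicatorPool @@ -12,10 +12,10 @@ # credential manager with proxy cache -class ProxyCacheCredManager(PluginBase): +class ProxyCacheCredManager(BaseCredManager): # constructor def __init__(self, **kwarg): - PluginBase.__init__(self, **kwarg) + BaseCredManager.__init__(self, **kwarg) # check proxy def check_credential(self): @@ -30,7 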
+30,7 @@ def check_credential(self): stderr=subprocess.PIPE) stdOut, stdErr = p.communicate() retCode = p.returncode - except: + except Exception: core_utils.dump_error_message(mainLog) return False mainLog.debug('retCode={0} stdOut={1} stdErr={2}'.format(retCode, stdOut, stdErr)) diff --git a/pandaharvester/harvesterfifo/mysql_fifo.py b/pandaharvester/harvesterfifo/mysql_fifo.py index e53976c4..f763059e 100644 --- a/pandaharvester/harvesterfifo/mysql_fifo.py +++ b/pandaharvester/harvesterfifo/mysql_fifo.py @@ -1,9 +1,8 @@ -import os import time -import re import functools import warnings +from pandaharvester.harvestercore import core_utils from pandaharvester.harvestercore.plugin_base import PluginBase from pandaharvester.harvesterconfig import harvester_config diff --git a/pandaharvester/harvestermessenger/act_messenger.py b/pandaharvester/harvestermessenger/act_messenger.py index 04137957..0d964fe9 100644 --- a/pandaharvester/harvestermessenger/act_messenger.py +++ b/pandaharvester/harvestermessenger/act_messenger.py @@ -9,9 +9,6 @@ from act.atlas.aCTDBPanda import aCTDBPanda -# json for job report -jsonJobReport = harvester_config.payload_interaction.jobReportFile - # json for outputs jsonOutputsFileName = harvester_config.payload_interaction.eventStatusDumpJsonFile @@ -43,34 +40,7 @@ def get_access_point(self, workspec, panda_id): return accessPoint def post_processing(self, workspec, jobspec_list, map_type): - ''' - Take the jobReport placed by aCT in the access point and fill metadata - attributes of the workspec. - ''' - - # get logger - tmpLog = core_utils.make_logger(baseLogger, 'workerID={0}'.format(workspec.workerID), - method_name='post_processing') - if not workspec.workAttributes: - workspec.workAttributes = {} - - for pandaID in workspec.pandaid_list: - workspec.workAttributes[pandaID] = {} - # look for job report - accessPoint = self.get_access_point(workspec, pandaID) - jsonFilePath = os.path.join(accessPoint, jsonJobReport) - tmpLog.debug('looking for job report file {0}'.format(jsonFilePath)) - if not os.path.exists(jsonFilePath): - # not found - tmpLog.debug('not found') - else: - try: - with open(jsonFilePath) as jsonFile: - workspec.workAttributes[pandaID] = json.load(jsonFile) - tmpLog.debug('got {0} kB of job report'.format(os.stat(jsonFilePath).st_size / 1024)) - except: - tmpLog.debug('failed to load {0}'.format(jsonFilePath)) - tmpLog.debug("pilot info for {0}: {1}".format(pandaID, workspec.workAttributes[pandaID])) + '''Now done in stager''' return True def get_work_attributes(self, workspec): diff --git a/pandaharvester/harvestermessenger/k8s_messenger.py b/pandaharvester/harvestermessenger/k8s_messenger.py index 232f756d..5b57810c 100644 --- a/pandaharvester/harvestermessenger/k8s_messenger.py +++ b/pandaharvester/harvestermessenger/k8s_messenger.py @@ -2,11 +2,8 @@ from pandaharvester.harvestercore import core_utils from .base_messenger import BaseMessenger -from pandaharvester.harvesterconfig import harvester_config from pandaharvester.harvestermisc.k8s_utils import k8s_Client -# from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper -# from pandaharvester.harvestercore.work_spec import WorkSpec - +from pandaharvester.harvestermisc.info_utils import PandaQueuesDict # logger _logger = core_utils.setup_logger('k8s_messenger') @@ -22,7 +19,12 @@ def __init__(self, **kwargs): except AttributeError: print('K8sMessenger: Missing attribute logDir') raise - self.k8s_client = k8s_Client(namespace=self.k8s_namespace, 
config_file=self.k8s_config_file) + + # retrieve the k8s namespace from CRIC + self.panda_queues_dict = PandaQueuesDict() + namespace = self.panda_queues_dict.get_k8s_namespace(self.queueName) + + self.k8s_client = k8s_Client(namespace=namespace, queue_name=self.queueName, config_file=self.k8s_config_file) self._all_pods_list = self.k8s_client.get_pods_info() def post_processing(self, workspec, jobspec_list, map_type): @@ -32,14 +34,20 @@ def post_processing(self, workspec, jobspec_list, map_type): - Store or upload logs """ # get logger - tmpLog = core_utils.make_logger(_logger, 'workerID={0}'.format(workspec.workerID), - method_name='post_processing') - tmpLog.debug('start') + tmp_log = core_utils.make_logger(_logger, 'queueName={0} workerID={1}'.format(self.queueName, workspec.workerID), + method_name='post_processing') + tmp_log.debug('start') + + if self._all_pods_list is None: + tmp_log.error('No pod information') + tmp_log.debug('done') + return None + try: # fetch and store logs job_id = workspec.batchID pods_list = self.k8s_client.filter_pods_info(self._all_pods_list, job_name=job_id) - pod_name_list = [ pods_info['name'] for pods_info in pods_list ] + pod_name_list = [pods_info['name'] for pods_info in pods_list] outlog_filename = os.path.join(self.logDir, 'gridK8S.{0}.{1}.out'.format(workspec.workerID, workspec.batchID)) with open(outlog_filename, 'w') as f: for pod_name in pod_name_list: @@ -48,8 +56,8 @@ def post_processing(self, workspec, jobspec_list, map_type): # upload logs pass # return - tmpLog.debug('done') + tmp_log.debug('done') return True except Exception: - core_utils.dump_error_message(tmpLog) + core_utils.dump_error_message(tmp_log) return None diff --git a/pandaharvester/harvestermessenger/shared_file_messenger.py b/pandaharvester/harvestermessenger/shared_file_messenger.py index 164276b9..73a19a3f 100644 --- a/pandaharvester/harvestermessenger/shared_file_messenger.py +++ b/pandaharvester/harvestermessenger/shared_file_messenger.py @@ -1,7 +1,10 @@ import json import os +import copy import shutil +import tarfile import datetime +import itertools try: from urllib.parse import urlencode @@ -99,23 +102,31 @@ def filter_log_tgz(extra=None): # tar a single directory -def tar_directory(dir_name, tar_name=None, max_depth=None, extra_files=None): +def tar_directory(dir_name, tar_name=None, max_depth=None, extra_files=None, sub_tarball_name=None): if tar_name is None: tarFilePath = os.path.join(os.path.dirname(dir_name), '{0}.subdir.tar.gz'.format(os.path.basename(dir_name))) else: tarFilePath = tar_name - com = 'cd {0}; '.format(dir_name) - com += 'find . ' - if max_depth is not None: - com += '-maxdepth {0} '.format(max_depth) - com += r'-type f \( ' + filter_log_tgz(extra_files) + r'\) -print0 ' - com += '| ' - com += 'tar ' - if distutils.spawn.find_executable('pigz') is None: - com += '-z ' - else: - com += '-I pigz ' - com += '-c -f {0} --null -T -'.format(tarFilePath) + # check if sub-tarball already exists + com = None + if sub_tarball_name is not None: + subTarballPath = os.path.join(dir_name, sub_tarball_name) + if os.path.exists(subTarballPath): + com = 'mv {} {}'.format(subTarballPath, tarFilePath) + # make sub-tarball + if com is None: + com = 'cd {0}; '.format(dir_name) + com += 'find . 
' + if max_depth is not None: + com += '-maxdepth {0} '.format(max_depth) + com += r'-type f \( ' + filter_log_tgz(extra_files) + r'\) -print0 ' + com += '| ' + com += 'tar ' + if distutils.spawn.find_executable('pigz') is None: + com += '-z ' + else: + com += '-I pigz ' + com += '-c -f {0} --null -T -'.format(tarFilePath) p = subprocess.Popen(com, shell=True, stdout=subprocess.PIPE, @@ -126,12 +137,22 @@ def tar_directory(dir_name, tar_name=None, max_depth=None, extra_files=None): # scan files in a directory -def scan_files_in_dir(dir_name, patterns=None): +def scan_files_in_dir(dir_name, patterns=None, zip_patterns=None): fileList = [] for root, dirs, filenames in walk(dir_name): for filename in filenames: + # check if zipped + is_zipped = False + if zip_patterns: + matched = False + for pattern in zip_patterns: + if re.search(pattern, filename) is not None: + matched = True + break + if matched: + is_zipped = True # check filename - if patterns is not None: + if not is_zipped and patterns: matched = False for pattern in patterns: if re.search(pattern, filename) is not None: @@ -142,15 +163,26 @@ def scan_files_in_dir(dir_name, patterns=None): # make dict tmpFileDict = dict() pfn = os.path.join(root, filename) - lfn = os.path.basename(pfn) tmpFileDict['path'] = pfn tmpFileDict['fsize'] = os.stat(pfn).st_size - tmpFileDict['type'] = 'es_output' tmpFileDict['guid'] = str(uuid.uuid4()) tmpFileDict['chksum'] = core_utils.calc_adler32(pfn) - tmpFileDict['eventRangeID'] = lfn.split('.')[-1] tmpFileDict['eventStatus'] = "finished" - fileList.append(tmpFileDict) + if is_zipped: + lfns = [] + # extract actual event filenames from zip + with tarfile.open(pfn) as f: + for tar_info in f.getmembers(): + lfns.append(os.path.basename(tar_info.name)) + tmpFileDict['type'] = 'zip_output' + else: + lfns = [os.path.basename(pfn)] + tmpFileDict['type'] = 'es_output' + for lfn in lfns: + tmpDict = copy.copy(tmpFileDict) + tmpDict['eventRangeID'] = lfn.split('.')[-1] + + fileList.append(tmpDict) return fileList @@ -162,7 +194,10 @@ def __init__(self, **kwarg): self.stripJobParams = False self.scanInPostProcess = False self.leftOverPatterns = None + self.leftOverZipPatterns = None self.postProcessInSubDir = False + self.outputSubDir = None + self.subTarballName = None BaseMessenger.__init__(self, **kwarg) # get access point @@ -545,14 +580,14 @@ def acknowledge_events_files(self, workspec): try: jsonFilePath = os.path.join(accessPoint, jsonEventsUpdateFileName) jsonFilePath += suffixReadJson - jsonFilePath_rename = jsonFilePath + '.' + str(datetime.datetime.utcnow()) + jsonFilePath_rename = jsonFilePath + '.' + datetime.datetime.now(tz=datetime.timezone.utc).strftime('%Y-%m-%d_%H_%M_%S.%f') os.rename(jsonFilePath, jsonFilePath_rename) except Exception: pass try: jsonFilePath = os.path.join(accessPoint, jsonOutputsFileName) jsonFilePath += suffixReadJson - jsonFilePath_rename = jsonFilePath + '.' + str(datetime.datetime.utcnow()) + jsonFilePath_rename = jsonFilePath + '.' 
+ datetime.datetime.now(tz=datetime.timezone.utc).strftime('%Y-%m-%d_%H_%M_%S.%f') os.rename(jsonFilePath, jsonFilePath_rename) except Exception: pass @@ -608,7 +643,7 @@ def post_processing(self, workspec, jobspec_list, map_type): accessPoint = self.get_access_point(workspec, jobSpec.PandaID) origAccessPoint = accessPoint if self.postProcessInSubDir: - accessPoint = os.path.join(accessPoint, jobSpec.PandaID) + accessPoint = os.path.join(accessPoint, str(jobSpec.PandaID)) # make log if not hasLog: logFileInfo = jobSpec.get_logfile_info() @@ -623,7 +658,8 @@ def post_processing(self, workspec, jobspec_list, map_type): # tar sub dirs tmpLog.debug('tar for {0} sub dirs'.format(len(dirs))) with Pool(max_workers=multiprocessing.cpu_count()) as pool: - retValList = pool.map(tar_directory, dirs) + retValList = pool.map(lambda x, y: tar_directory(x, sub_tarball_name=y), + dirs, itertools.repeat(self.subTarballName)) for dirName, (comStr, retCode, stdOut, stdErr) in zip(dirs, retValList): if retCode != 0: tmpLog.warning('failed to sub-tar {0} with {1} -> {2}:{3}'.format( @@ -646,22 +682,21 @@ def post_processing(self, workspec, jobspec_list, map_type): # set the directory paths to scan for left over files dirs = [] if self.outputSubDir is None: - #tmpLog.debug('self.outputSubDir not set dirs- {0}'.format(dirs)) dirs = [os.path.join(accessPoint, name) for name in os.listdir(accessPoint) if os.path.isdir(os.path.join(accessPoint, name))] else: # loop over directories first level from accessPoint and then add subdirectory name. - upperdirs=[] upperdirs = [os.path.join(accessPoint, name) for name in os.listdir(accessPoint) if os.path.isdir(os.path.join(accessPoint, name))] dirs = [os.path.join(dirname, self.outputSubDir) for dirname in upperdirs if os.path.isdir(os.path.join(dirname, self.outputSubDir))] - #tmpLog.debug('self.outputSubDir = {0} upperdirs - {1} dirs -{2}'.format(self.outputSubDir,upperdirs,dirs)) - if self.leftOverPatterns is None: - patterns = None - else: - patterns = [] - for scanPat in self.leftOverPatterns: + patterns = [] + patterns_zip = [] + for tmp_patterns, tmp_left_over_patterns in \ + [[patterns, self.leftOverPatterns], [patterns_zip, self.leftOverZipPatterns]]: + if tmp_left_over_patterns is None: + continue + for scanPat in tmp_left_over_patterns: # replace placeholders if '%PANDAID' in scanPat: scanPat = scanPat.replace('%PANDAID', str(jobSpec.PandaID)) @@ -672,13 +707,14 @@ def post_processing(self, workspec, jobspec_list, map_type): for outputName in jobSpec.get_output_file_attributes().keys(): if outputName == logFileName: continue - patterns.append(scanPat.replace('%OUTPUT_FILE', outputName)) + tmp_patterns.append(scanPat.replace('%OUTPUT_FILE', outputName)) else: - patterns.append(scanPat) + tmp_patterns.append(scanPat) # scan files nLeftOvers = 0 with Pool(max_workers=multiprocessing.cpu_count()) as pool: - retValList = pool.map(scan_files_in_dir, dirs, [patterns] * len(dirs)) + retValList = pool.map(scan_files_in_dir, dirs, [patterns] * len(dirs), + [patterns_zip] * len(dirs)) for retVal in retValList: fileDict.setdefault(jobSpec.PandaID, []) fileDict[jobSpec.PandaID] += retVal diff --git a/pandaharvester/harvestermisc/info_utils.py b/pandaharvester/harvestermisc/info_utils.py index 10dc0803..f92760b7 100644 --- a/pandaharvester/harvestermisc/info_utils.py +++ b/pandaharvester/harvestermisc/info_utils.py @@ -1,35 +1,61 @@ +import time +import threading from future.utils import iteritems +import six + from pandaharvester.harvesterconfig import harvester_config from 
diff --git a/pandaharvester/harvestermisc/info_utils.py b/pandaharvester/harvestermisc/info_utils.py
index 10dc0803..f92760b7 100644
--- a/pandaharvester/harvestermisc/info_utils.py
+++ b/pandaharvester/harvestermisc/info_utils.py
@@ -1,35 +1,61 @@
+import time
+import threading
 from future.utils import iteritems
+import six
+
 from pandaharvester.harvesterconfig import harvester_config
 from pandaharvester.harvestercore.plugin_base import PluginBase
+from pandaharvester.harvestercore.core_utils import SingletonWithID
 from pandaharvester.harvestercore.db_interface import DBInterface
+
 harvesterID = harvester_config.master.harvester_id
+resolver_config = getattr(harvester_config.qconf, 'resolverConfig', {})
+

-class PandaQueuesDict(dict, PluginBase):
+class PandaQueuesDict(six.with_metaclass(SingletonWithID, dict, PluginBase)):
     """
     Dictionary of PanDA queue info from DB by cacher
     Key is PanDA Resource name (rather than PanDA Queue name)
     Able to query with either PanDA Queue name or PanDA Resource name
     """
-    def __init__(self, **kwarg):
+    def __init__(self, **kwargs):
         dict.__init__(self)
-        PluginBase.__init__(self, **kwarg)
-        dbInterface = DBInterface()
-        cacher_key = kwarg.get('cacher_key', 'panda_queues.json')
-        panda_queues_cache = dbInterface.get_cache(cacher_key)
-        if panda_queues_cache and isinstance(panda_queues_cache.data, dict):
-            panda_queues_dict = panda_queues_cache.data
-            for (k, v) in iteritems(panda_queues_dict):
-                try:
-                    panda_resource = v['panda_resource']
-                    assert k == v['nickname']
-                except Exception:
-                    pass
-                else:
-                    self[panda_resource] = v
+        PluginBase.__init__(self, **kwargs)
+        self.lock = threading.Lock()
+        self.dbInterface = DBInterface()
+        self.cacher_key = kwargs.get('cacher_key', 'panda_queues.json')
+        self.refresh_period = resolver_config.get('refreshPeriod', 300)
+        self.last_refresh_ts = 0
+        self._refresh()
+
+    def _is_fresh(self):
+        now_ts = time.time()
+        if self.last_refresh_ts + self.refresh_period > now_ts:
+            return True
+        return False
+
+    def _refresh(self):
+        with self.lock:
+            if self._is_fresh():
+                return
+            panda_queues_cache = self.dbInterface.get_cache(self.cacher_key)
+            self.last_refresh_ts = time.time()
+            if panda_queues_cache and isinstance(panda_queues_cache.data, dict):
+                panda_queues_dict = panda_queues_cache.data
+                for (k, v) in iteritems(panda_queues_dict):
+                    try:
+                        panda_resource = v['panda_resource']
+                        assert k == v['nickname']
+                    except Exception:
+                        pass
+                    else:
+                        self[panda_resource] = v

     def __getitem__(self, panda_resource):
+        if not self._is_fresh():
+            self._refresh()
         if panda_resource in self:
             return dict.__getitem__(self, panda_resource)
         else:
@@ -37,6 +63,8 @@ def __getitem__(self, panda_resource):
             return dict.__getitem__(self, panda_queue)

     def get(self, panda_resource, default=None):
+        if not self._is_fresh():
+            self._refresh()
         if panda_resource in self:
             return dict.get(self, panda_resource, default)
         else:
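Editor's note: the class above turns a one-shot cache read into a per-ID singleton that transparently re-reads the cacher every refreshPeriod seconds. A stripped-down sketch of the same refresh-on-access idea, independent of harvester internals; RefreshingDict and the loader lambda are hypothetical:

    import time
    import threading

    class RefreshingDict(dict):
        """Dict that re-populates itself when its contents grow stale."""
        def __init__(self, loader, refresh_period=300):
            super(RefreshingDict, self).__init__()
            self.loader = loader              # callable returning a plain dict
            self.refresh_period = refresh_period
            self.last_refresh_ts = 0
            self.lock = threading.Lock()

        def _is_fresh(self):
            return self.last_refresh_ts + self.refresh_period > time.time()

        def _refresh(self):
            with self.lock:
                if self._is_fresh():          # another thread already refreshed
                    return
                self.clear()
                self.update(self.loader())
                self.last_refresh_ts = time.time()

        def __getitem__(self, key):
            if not self._is_fresh():
                self._refresh()
            return dict.__getitem__(self, key)

    queues = RefreshingDict(lambda: {'CERN': {'corecount': 8}}, refresh_period=300)
    print(queues['CERN'])

One design difference: the sketch clears stale entries on refresh, whereas PandaQueuesDict only overwrites or adds keys.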
@@ -48,7 +76,7 @@ def get_panda_queue_name(self, panda_resource):
         Return PanDA Queue name with specified PanDA Resource name
         """
         try:
-            panda_queue = self.get(panda_resource).get('nickname') 
+            panda_queue = self.get(panda_resource).get('nickname')
             return panda_queue
         except Exception:
             return None
@@ -125,10 +153,10 @@ def get_type_workflow(self, panda_resource):
         workflow = panda_queue_dict.get('workflow')
         return pq_type, workflow

-    def get_prorated_maxwdir_GB(self, panda_resource, worker_corecount):
+    def get_prorated_maxwdir_GiB(self, panda_resource, worker_corecount):
         try:
             panda_queue_dict = self.get(panda_resource)
-            maxwdir = panda_queue_dict.get('maxwdir') / 1000  # convert to GB
+            maxwdir = panda_queue_dict.get('maxwdir') / 1024  # convert to GiB
             corecount = panda_queue_dict.get('corecount')
             if panda_queue_dict.get('capability') == 'ucore':
                 maxwdir_prorated = maxwdir * worker_corecount / corecount
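Editor's note: get_prorated_maxwdir_GiB scales a unified (ucore) queue's scratch allowance by the share of the queue's cores a worker actually takes. A worked sketch with invented numbers, assuming maxwdir is published in MiB (which is what the /1024 conversion implies):

    def prorated_maxwdir_gib(maxwdir_mib, queue_corecount, worker_corecount, unified=True):
        # maxwdir is assumed to be published in MiB; convert to GiB first
        maxwdir_gib = maxwdir_mib / 1024.0
        if unified and queue_corecount:
            # a ucore queue publishes maxwdir for the full slot, so an
            # n-core worker only gets the proportional share
            return maxwdir_gib * worker_corecount / queue_corecount
        return maxwdir_gib

    # an 8-core worker on a ucore queue publishing 163840 MiB for 16 cores
    print(prorated_maxwdir_gib(163840, 16, 8))  # -> 80.0 GiB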
@@ -139,3 +167,145 @@ def get_prorated_maxwdir_GB(self, panda_resource, worker_corecount):

         return maxwdir_prorated

+    def get_k8s_scheduler_settings(self, panda_resource):
+        # this is how the affinity settings are declared in CRIC
+        key_affinity = 'k8s.scheduler.use_score_affinity'
+        key_anti_affinity = 'k8s.scheduler.use_score_mcore_anti_affinity'
+
+        params = self.get_harvester_params(panda_resource)
+        ret_map = {}
+
+        try:
+            ret_map['use_affinity'] = params[key_affinity]
+        except KeyError:
+            # return default value
+            ret_map['use_affinity'] = True
+
+        try:
+            ret_map['use_anti_affinity'] = params[key_anti_affinity]
+        except KeyError:
+            # return default value
+            ret_map['use_anti_affinity'] = True
+
+        # this is how the priority classes are declared in CRIC
+        key_priority_class_score = 'k8s.scheduler.priorityClassName.score'
+        key_priority_class_score_himem = 'k8s.scheduler.priorityClassName.score_himem'
+        key_priority_class_mcore = 'k8s.scheduler.priorityClassName.mcore'
+        key_priority_class_mcore_himem = 'k8s.scheduler.priorityClassName.mcore_himem'
+
+        try:
+            ret_map['priority_class_score'] = params[key_priority_class_score]
+        except KeyError:
+            # return default value
+            ret_map['priority_class_score'] = None
+
+        try:
+            ret_map['priority_class_score_himem'] = params[key_priority_class_score_himem]
+        except KeyError:
+            # return default value
+            ret_map['priority_class_score_himem'] = None
+
+        try:
+            ret_map['priority_class_mcore'] = params[key_priority_class_mcore]
+        except KeyError:
+            # return default value
+            ret_map['priority_class_mcore'] = None
+
+        try:
+            ret_map['priority_class_mcore_himem'] = params[key_priority_class_mcore_himem]
+        except KeyError:
+            # return default value
+            ret_map['priority_class_mcore_himem'] = None
+
+        return ret_map
+
+    def get_k8s_resource_settings(self, panda_resource):
+        params = self.get_harvester_params(panda_resource)
+        ret_map = {}
+
+        # this is how the CPU parameters are declared in CRIC
+        key_cpu_scheduling_ratio = 'k8s.resources.requests.cpu_scheduling_ratio'
+
+        try:
+            cpu_scheduling_ratio = params[key_cpu_scheduling_ratio]
+        except KeyError:
+            # return default value
+            cpu_scheduling_ratio = 90
+        ret_map['cpu_scheduling_ratio'] = cpu_scheduling_ratio
+
+        # this is how the memory parameters are declared in CRIC
+        key_memory_limit = 'k8s.resources.limits.use_memory_limit'
+        key_memory_limit_safety_factor = 'k8s.resources.limits.memory_limit_safety_factor'
+        key_memory_limit_min_offset = 'k8s.resources.limits.memory_limit_min_offset'
+
+        try:
+            use_memory_limit = params[key_memory_limit]
+        except KeyError:
+            # return default value
+            use_memory_limit = False
+        ret_map['use_memory_limit'] = use_memory_limit
+
+        try:
+            memory_limit_safety_factor = params[key_memory_limit_safety_factor]
+        except KeyError:
+            # return default value
+            memory_limit_safety_factor = 100
+        ret_map['memory_limit_safety_factor'] = memory_limit_safety_factor
+
+        try:
+            memory_limit_min_offset = params[key_memory_limit_min_offset]  # in MiB to be consistent with minRamCount
+        except KeyError:
+            # return default value
+            memory_limit_min_offset = 0
+        ret_map['memory_limit_min_offset'] = memory_limit_min_offset
+
+        # this is how the ephemeral storage parameters are declared in CRIC
+        key_ephemeral_storage = 'k8s.resources.use_ephemeral_storage_resource_specs'
+        key_ephemeral_storage_resources_offset = 'k8s.resources.ephemeral_storage_offset'
+        key_ephemeral_storage_limit_safety_factor = 'k8s.resources.limits.ephemeral_storage_limit_safety_factor'
+
+        try:
+            use_ephemeral_storage = params[key_ephemeral_storage]
+        except KeyError:
+            # return default value
+            use_ephemeral_storage = False
+        ret_map['use_ephemeral_storage'] = use_ephemeral_storage
+
+        try:
+            ephemeral_storage_limit_safety_factor = params[key_ephemeral_storage_limit_safety_factor]
+        except KeyError:
+            # return default value
+            ephemeral_storage_limit_safety_factor = 100
+        ret_map['ephemeral_storage_limit_safety_factor'] = ephemeral_storage_limit_safety_factor
+
+        try:
+            ephemeral_storage_offset = params[key_ephemeral_storage_resources_offset]
+        except KeyError:
+            # return default value
+            ephemeral_storage_offset = 0  # should come in MiB
+        ret_map['ephemeral_storage_offset'] = ephemeral_storage_offset
+
+        return ret_map
+
+    def get_k8s_namespace(self, panda_resource):
+        default_namespace = 'default'
+
+        # 1. check if there is an associated CE and use the queue name as namespace
+        panda_queue_dict = self.get(panda_resource, {})
+        try:
+            namespace = panda_queue_dict['queues'][0]['ce_queue_name']
+            return namespace
+        except (KeyError, TypeError, IndexError, ValueError):
+            pass
+
+        # 2. alternatively, check if the namespace is defined in the associated parameter section
+        key_namespace = 'k8s.namespace'
+        params = self.get_harvester_params(panda_resource)
+
+        try:
+            namespace = params[key_namespace]
+        except KeyError:
+            # return default value
+            namespace = default_namespace
+
+        return namespace
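Editor's note: every try/except KeyError block above implements "parameter with default"; the same lookups can be written as a table plus dict.get, which may be easier to extend. A hedged sketch of that alternative (key names copied from the code, the function shape is illustrative):

    def get_k8s_scheduler_settings(params):
        """params: the CRIC 'harvester params' dict for the queue."""
        defaults = {
            'use_affinity': ('k8s.scheduler.use_score_affinity', True),
            'use_anti_affinity': ('k8s.scheduler.use_score_mcore_anti_affinity', True),
            'priority_class_score': ('k8s.scheduler.priorityClassName.score', None),
            'priority_class_score_himem': ('k8s.scheduler.priorityClassName.score_himem', None),
            'priority_class_mcore': ('k8s.scheduler.priorityClassName.mcore', None),
            'priority_class_mcore_himem': ('k8s.scheduler.priorityClassName.mcore_himem', None),
        }
        # fall back to the default whenever CRIC does not declare the key
        return {ret_key: params.get(cric_key, default)
                for ret_key, (cric_key, default) in defaults.items()}

    print(get_k8s_scheduler_settings({'k8s.scheduler.use_score_affinity': False}))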
diff --git a/pandaharvester/harvestermisc/k8s_utils.py b/pandaharvester/harvestermisc/k8s_utils.py
index 2213e81a..744d031e 100644
--- a/pandaharvester/harvestermisc/k8s_utils.py
+++ b/pandaharvester/harvestermisc/k8s_utils.py
@@ -17,29 +17,35 @@ base_logger = core_utils.setup_logger('k8s_utils')

 CONFIG_DIR = '/scratch/jobconfig'
-
+EXEC_DIR = '/scratch/executables'
+GiB_TO_GB = 2 ** 30 / 10.0 ** 9


 class k8s_Client(object):

-    def __init__(self, namespace, config_file=None):
+    def __init__(self, namespace, config_file=None, queue_name=None):
         if not os.path.isfile(config_file):
             raise RuntimeError('Cannot find k8s config file: {0}'.format(config_file))
         config.load_kube_config(config_file=config_file)
-        self.namespace = namespace if namespace else 'default'
         self.corev1 = client.CoreV1Api()
         self.batchv1 = client.BatchV1Api()
         self.deletev1 = client.V1DeleteOptions(propagation_policy='Background')

+        self.panda_queues_dict = PandaQueuesDict()
+        self.namespace = namespace
+        self.queue_name = queue_name
+
     def read_yaml_file(self, yaml_file):
         with open(yaml_file) as f:
             yaml_content = yaml.load(f, Loader=yaml.FullLoader)
         return yaml_content

-    def create_job_from_yaml(self, yaml_content, work_spec, prod_source_label, container_image, executable, args,
-                             cert, cert_in_secret=True, cpu_adjust_ratio=100, memory_adjust_ratio=100, max_time=None):
+    def create_job_from_yaml(self, yaml_content, work_spec, prod_source_label, pilot_type, pilot_url_str,
+                             pilot_python_option, container_image, executable, args,
+                             cert, max_time=None):

-        tmp_log = core_utils.make_logger(base_logger, method_name='create_job_from_yaml')
+        tmp_log = core_utils.make_logger(base_logger, 'queue_name={0}'.format(self.queue_name),
+                                         method_name='create_job_from_yaml')

         # consider PULL mode as default, unless specified
         submit_mode = 'PULL'
@@ -54,8 +60,7 @@ def create_job_from_yaml(self, yaml_content, work_spec, prod_source_label, conta
             return res, 'Failed to create a configmap'

         # retrieve panda queue information
-        panda_queues_dict = PandaQueuesDict()
-        queue_name = panda_queues_dict.get_panda_queue_name(work_spec.computingSite)
+        queue_name = self.panda_queues_dict.get_panda_queue_name(work_spec.computingSite)

         # set the worker name
         yaml_content['metadata']['name'] = yaml_content['metadata']['name'] + "-" + str(work_spec.workerID)
@@ -89,27 +94,60 @@ def create_job_from_yaml(self, yaml_content, work_spec, prod_source_label, conta
         # Be familiar with QoS classes: https://kubernetes.io/docs/tasks/configure-pod-container/quality-service-pod
         # The CPU & memory settings will affect the QoS for the pod
         container_env.setdefault('resources', {})
-        if work_spec.nCore > 0:
+        resource_settings = self.panda_queues_dict.get_k8s_resource_settings(work_spec.computingSite)

+        # CPU resources
+        cpu_scheduling_ratio = resource_settings['cpu_scheduling_ratio']
+        if work_spec.nCore > 0:
+            # CPU requests
+            container_env['resources'].setdefault('requests', {})
+            if 'cpu' not in container_env['resources']['requests']:
+                container_env['resources']['requests']['cpu'] = str(work_spec.nCore * cpu_scheduling_ratio / 100.0)
             # CPU limits
             container_env['resources'].setdefault('limits', {})
             if 'cpu' not in container_env['resources']['limits']:
                 container_env['resources']['limits']['cpu'] = str(work_spec.nCore)
-            # CPU requests
-            container_env['resources'].setdefault('requests', {})
-            if 'cpu' not in container_env['resources']['requests']:
-                container_env['resources']['requests']['cpu'] = str(work_spec.nCore * cpu_adjust_ratio / 100.0)
+
+        # Memory resources
+        use_memory_limit = resource_settings['use_memory_limit']
+        memory_limit_safety_factor = resource_settings['memory_limit_safety_factor']
+        memory_limit_min_offset = resource_settings['memory_limit_min_offset']
         if work_spec.minRamCount > 4:  # K8S minimum memory limit = 4 MB
-            # memory limits
-            # container_env['resources'].setdefault('limits', {})
-            # if 'memory' not in container_env['resources']['limits']:
-            #     container_env['resources']['limits']['memory'] = str(work_spec.minRamCount) + 'M'
             # memory requests
             container_env['resources'].setdefault('requests', {})
             if 'memory' not in container_env['resources']['requests']:
-                container_env['resources']['requests']['memory'] = str(
-                    work_spec.minRamCount * memory_adjust_ratio / 100.0) + 'M'
+                container_env['resources']['requests']['memory'] = str(work_spec.minRamCount) + 'Mi'
+            # memory limits: kubernetes is very aggressive killing jobs due to memory, hence making this field optional
+            # and adding configuration possibilities to add a safety factor
+            if use_memory_limit:
+                container_env['resources'].setdefault('limits', {})
+                if 'memory' not in container_env['resources']['limits']:
+                    mem_limit = max(work_spec.minRamCount + memory_limit_min_offset,
+                                    work_spec.minRamCount * memory_limit_safety_factor / 100.0)
+                    container_env['resources']['limits']['memory'] = str(mem_limit) + 'Mi'
+
+        # Ephemeral storage resources
+        use_ephemeral_storage = resource_settings['use_ephemeral_storage']
+        ephemeral_storage_offset_GiB = resource_settings['ephemeral_storage_offset'] / 1024
+        ephemeral_storage_limit_safety_factor = resource_settings['ephemeral_storage_limit_safety_factor']
+
+        if use_ephemeral_storage:
+            maxwdir_prorated_GiB = self.panda_queues_dict.get_prorated_maxwdir_GiB(work_spec.computingSite,
+                                                                                   work_spec.nCore)
+            # ephemeral storage requests
+            container_env['resources'].setdefault('requests', {})
+            if 'ephemeral-storage' not in container_env['resources']['requests']:
+                eph_storage_request_GiB = maxwdir_prorated_GiB + ephemeral_storage_offset_GiB
+                eph_storage_request_MiB = round(eph_storage_request_GiB * 1024, 2)
+                container_env['resources']['requests']['ephemeral-storage'] = str(eph_storage_request_MiB) + 'Mi'
+            # ephemeral storage limits
+            container_env['resources'].setdefault('limits', {})
+            if 'ephemeral-storage' not in container_env['resources']['limits']:
+                eph_storage_limit_GiB = (maxwdir_prorated_GiB + ephemeral_storage_offset_GiB) \
+                    * ephemeral_storage_limit_safety_factor / 100.0
+                eph_storage_limit_MiB = round(eph_storage_limit_GiB * 1024, 2)
+                container_env['resources']['limits']['ephemeral-storage'] = str(eph_storage_limit_MiB) + 'Mi'

         container_env.setdefault('env', [])

         # try to retrieve the stdout log file name
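Editor's note: the ephemeral-storage arithmetic above mixes GiB (prorated workdir) and MiB (CRIC offset and the final kubernetes strings), so a small worked example helps; all numbers are invented:

    def ephemeral_storage_mib(maxwdir_prorated_gib, offset_mib, safety_factor_pct):
        # the offset is assumed to come from CRIC in MiB; the prorated workdir is in GiB
        offset_gib = offset_mib / 1024.0
        request_gib = maxwdir_prorated_gib + offset_gib
        limit_gib = request_gib * safety_factor_pct / 100.0
        # kubernetes resource strings are written here with the 'Mi' suffix
        return round(request_gib * 1024, 2), round(limit_gib * 1024, 2)

    # 80 GiB prorated scratch, 2048 MiB offset, 120% safety factor
    print(ephemeral_storage_mib(80, 2048, 120))  # -> (83968.0, 100761.6)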
@@ -124,9 +162,11 @@ def create_job_from_yaml(self, yaml_content, work_spec, prod_source_label, conta
                              {'name': 'pandaQueueName', 'value': queue_name},
                              {'name': 'resourceType', 'value': work_spec.resourceType},
                              {'name': 'prodSourceLabel', 'value': prod_source_label},
+                             {'name': 'pilotType', 'value': pilot_type},
+                             {'name': 'pilotUrlOpt', 'value': pilot_url_str},
+                             {'name': 'pythonOption', 'value': pilot_python_option},
                              {'name': 'jobType', 'value': work_spec.jobType},
-                             {'name': 'proxySecretPath', 'value': cert if cert_in_secret else None},
-                             {'name': 'proxyContent', 'value': None if cert_in_secret else self.set_proxy(cert)},
+                             {'name': 'proxySecretPath', 'value': cert},
                              {'name': 'workerID', 'value': str(work_spec.workerID)},
                              {'name': 'logs_frontend_w', 'value': harvester_config.pandacon.pandaCacheURL_W},
                              {'name': 'logs_frontend_r', 'value': harvester_config.pandacon.pandaCacheURL_R},
@@ -134,9 +174,18 @@ def create_job_from_yaml(self, yaml_content, work_spec, prod_source_label, conta
                              {'name': 'PANDA_JSID', 'value': 'harvester-' + harvester_config.master.harvester_id},
                              {'name': 'HARVESTER_WORKER_ID', 'value': str(work_spec.workerID)},
                              {'name': 'HARVESTER_ID', 'value': harvester_config.master.harvester_id},
-                             {'name': 'submit_mode', 'value': submit_mode}
+                             {'name': 'submit_mode', 'value': submit_mode},
+                             {'name': 'EXEC_DIR', 'value': EXEC_DIR},
                              ])

+        # add the pilots starter configmap
+        yaml_content['spec']['template']['spec'].setdefault('volumes', [])
+        yaml_volumes = yaml_content['spec']['template']['spec']['volumes']
+        yaml_volumes.append({'name': 'pilots-starter', 'configMap': {'name': 'pilots-starter'}})
+        # mount the volume to the filesystem
+        container_env.setdefault('volumeMounts', [])
+        container_env['volumeMounts'].append({'name': 'pilots-starter', 'mountPath': EXEC_DIR})
+
         # in push mode, add the configmap as a volume to the pod
         if submit_mode == 'PUSH' and worker_id:
             yaml_content['spec']['template']['spec'].setdefault('volumes', [])
@@ -146,19 +195,22 @@ def create_job_from_yaml(self, yaml_content, work_spec, prod_source_label, conta
             container_env.setdefault('volumeMounts', [])
             container_env['volumeMounts'].append({'name': 'job-config', 'mountPath': CONFIG_DIR})

-        # if we are running the pilot in a emptyDir with "pilot-dir" name, then set the max size
-        if 'volumes' in yaml_content['spec']['template']['spec']:
-            yaml_volumes = yaml_content['spec']['template']['spec']['volumes']
-            for volume in yaml_volumes:
-                # do not overwrite any hardcoded sizeLimit value
-                if volume['name'] == 'pilot-dir' and 'emptyDir' in volume and 'sizeLimit' not in volume['emptyDir']:
-                    maxwdir_prorated_GB = panda_queues_dict.get_prorated_maxwdir_GB(work_spec.computingSite, work_spec.nCore)
-                    if maxwdir_prorated_GB:
-                        volume['emptyDir']['sizeLimit'] = '{0}G'.format(maxwdir_prorated_GB)
-
         # set the affinity
-        if 'affinity' not in yaml_content['spec']['template']['spec']:
-            yaml_content = self.set_affinity(yaml_content)
+        scheduling_settings = self.panda_queues_dict.get_k8s_scheduler_settings(work_spec.computingSite)
+
+        use_affinity = scheduling_settings['use_affinity']
+        use_anti_affinity = scheduling_settings['use_anti_affinity']
+        if (use_affinity or use_anti_affinity) and 'affinity' not in yaml_content['spec']['template']['spec']:
+            yaml_content = self.set_affinity(yaml_content, use_affinity, use_anti_affinity)
+
+        # set the priority classes
+        priority_class_key = 'priority_class_{0}'.format(work_spec.resourceType.lower())
+        try:
+            priority_class = scheduling_settings[priority_class_key]
+        except KeyError:
+            priority_class = None
+        if priority_class and 'priorityClassName' not in yaml_content['spec']['template']['spec']:
+            yaml_content['spec']['template']['spec']['priorityClassName'] = priority_class

         # set max_time to avoid having a pod running forever
         if 'activeDeadlineSeconds' not in yaml_content['spec']['template']['spec']:
@@ -183,7 +235,8 @@ def generate_ls_from_wsl(self, workspec_list=[]):

     def get_pods_info(self, workspec_list=[]):

-        tmp_log = core_utils.make_logger(base_logger, method_name='get_pods_info')
+        tmp_log = core_utils.make_logger(base_logger, 'queue_name={0}'.format(self.queue_name),
+                                         method_name='get_pods_info')
         pods_list = list()

         label_selector = self.generate_ls_from_wsl(workspec_list)
@@ -193,21 +246,22 @@ def get_pods_info(self, workspec_list=[]):
             ret = self.corev1.list_namespaced_pod(namespace=self.namespace, label_selector=label_selector)
         except Exception as _e:
             tmp_log.error('Failed call to list_namespaced_pod with: {0}'.format(_e))
-        else:
-            for i in ret.items:
-                pod_info = {
-                    'name': i.metadata.name,
-                    'start_time': i.status.start_time.replace(tzinfo=None) if i.status.start_time else i.status.start_time,
-                    'status': i.status.phase,
-                    'status_conditions': i.status.conditions,
-                    'job_name': i.metadata.labels['job-name'] if i.metadata.labels and 'job-name' in i.metadata.labels else None,
-                    'containers_state': []
-                }
-                if i.status.container_statuses:
-                    for cs in i.status.container_statuses:
-                        if cs.state:
-                            pod_info['containers_state'].append(cs.state)
-                pods_list.append(pod_info)
+            return None  # None needs to be treated differently than [] by the caller
+
+        for i in ret.items:
+            pod_info = {
+                'name': i.metadata.name,
+                'start_time': i.status.start_time.replace(tzinfo=None) if i.status.start_time else i.status.start_time,
+                'status': i.status.phase,
+                'status_conditions': i.status.conditions,
+                'job_name': i.metadata.labels['job-name'] if i.metadata.labels and 'job-name' in i.metadata.labels else None,
+                'containers_state': []
+            }
+            if i.status.container_statuses:
+                for cs in i.status.container_statuses:
+                    if cs.state:
+                        pod_info['containers_state'].append(cs.state)
+            pods_list.append(pod_info)

         return pods_list
@@ -218,7 +272,8 @@ def filter_pods_info(self, pods_list, job_name=None):

     def get_jobs_info(self, workspec_list=[]):

-        tmp_log = core_utils.make_logger(base_logger, method_name='get_jobs_info')
+        tmp_log = core_utils.make_logger(base_logger, 'queue_name={0}'.format(self.queue_name),
+                                         method_name='get_jobs_info')

         jobs_list = list()
@@ -242,6 +297,11 @@ def get_jobs_info(self, workspec_list=[]):
         return jobs_list

     def delete_pods(self, pod_name_list):
+        tmp_log = core_utils.make_logger(base_logger, 'queue_name={0}'.format(self.queue_name),
+                                         method_name='delete_pods')
+
+        tmp_log.debug('Going to delete {0} PODs: {1}'.format(len(pod_name_list), pod_name_list))
+
         ret_list = list()

         for pod_name in pod_name_list:
@@ -257,48 +317,71 @@ def delete_pods(self, pod_name_list):
                 rsp['errMsg'] = ''
             ret_list.append(rsp)

+        tmp_log.debug('Done with: {0}'.format(ret_list))
         return ret_list

     def delete_job(self, job_name):
-        tmp_log = core_utils.make_logger(base_logger, 'job_name={0}'.format(job_name), method_name='delete_job')
+        tmp_log = core_utils.make_logger(base_logger, 'queue_name={0} job_name={1}'.format(self.queue_name, job_name),
+                                         method_name='delete_job')
+        tmp_log.debug('Going to delete JOB {0}'.format(job_name))
         try:
             self.batchv1.delete_namespaced_job(name=job_name, namespace=self.namespace, body=self.deletev1,
                                                grace_period_seconds=0)
+            tmp_log.debug('Deleted JOB {0}'.format(job_name))
         except Exception as _e:
-            tmp_log.error('Failed call to delete_namespaced_job with: {0}'.format(_e))
+            tmp_log.error('Failed to delete JOB {0} with: {1}'.format(job_name, _e))

     def delete_config_map(self, config_map_name):
         self.corev1.delete_namespaced_config_map(name=config_map_name, namespace=self.namespace, body=self.deletev1,
                                                  grace_period_seconds=0)

-    def set_proxy(self, proxy_path):
-        with open(proxy_path) as f:
-            content = f.read()
-        content = content.replace("\n", ",")
-        return content
+    def set_affinity(self, yaml_content, use_affinity, use_anti_affinity):
+
+        if not use_affinity and not use_anti_affinity:
+            # we are not supposed to use any affinity setting for this queue
+            return yaml_content

-    def set_affinity(self, yaml_content):
         yaml_content['spec']['template']['spec']['affinity'] = {}
         yaml_affinity = yaml_content['spec']['template']['spec']['affinity']
-        res_element = {'SCORE', 'MCORE'}
+
+        res_element = {'SCORE', 'SCORE_HIMEM', 'MCORE', 'MCORE_HIMEM'}
+        scores = ['SCORE', 'SCORE_HIMEM']
+        mcores = ['MCORE', 'MCORE_HIMEM']
+
+        anti_affinity_matrix = {'SCORE': mcores,
+                                'SCORE_HIMEM': mcores,
+                                'MCORE': scores,
+                                'MCORE_HIMEM': scores}
+
         affinity_spec = {
             'preferredDuringSchedulingIgnoredDuringExecution': [
                 {'weight': 100, 'podAffinityTerm': {
-                    'labelSelector': {'matchExpressions': [
-                        {'key': 'resourceType', 'operator': 'In', 'values': ['SCORE']}]},
+                    'labelSelector': {
+                        'matchExpressions': [
+                            {
+                                'key': 'resourceType',
+                                'operator': 'In',
+                                'values': ['SCORE', 'SCORE_HIMEM']
+                            }
+                        ]
+                    },
                     'topologyKey': 'kubernetes.io/hostname'}
-                }]}
+                 }
+            ]
+        }

         resource_type = yaml_content['spec']['template']['metadata']['labels']['resourceType']

-        if resource_type == 'SCORE':
+        if use_affinity and resource_type in scores:
+            # resource type SCORE* should attract each other instead of spreading across the nodes
             yaml_affinity['podAffinity'] = copy.deepcopy(affinity_spec)
-            yaml_affinity['podAffinity']['preferredDuringSchedulingIgnoredDuringExecution'][0]['podAffinityTerm'][
-                'labelSelector']['matchExpressions'][0]['values'][0] = resource_type

-        yaml_affinity['podAntiAffinity'] = copy.deepcopy(affinity_spec)
-        yaml_affinity['podAntiAffinity']['preferredDuringSchedulingIgnoredDuringExecution'][0]['podAffinityTerm'][
-            'labelSelector']['matchExpressions'][0]['values'][0] = res_element.difference({resource_type}).pop()
+        if use_anti_affinity:
+            # SCORE* will repel MCORE* and vice versa. The main rationale is to keep whole nodes free for MCORE.
+            # This setting depends on the size of the node vs the MCORE job
+            yaml_affinity['podAntiAffinity'] = copy.deepcopy(affinity_spec)
+            yaml_affinity['podAntiAffinity']['preferredDuringSchedulingIgnoredDuringExecution'][0]['podAffinityTerm'][
+                'labelSelector']['matchExpressions'][0]['values'] = anti_affinity_matrix[resource_type]

         return yaml_content
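Editor's note: to see what set_affinity produces, here is a hedged sketch that builds the equivalent preferred pod(Anti)Affinity terms for a SCORE worker; the label key and resource-type values mirror the code, everything else is illustrative:

    import json

    scores = ['SCORE', 'SCORE_HIMEM']
    mcores = ['MCORE', 'MCORE_HIMEM']
    anti_affinity_matrix = {'SCORE': mcores, 'SCORE_HIMEM': mcores,
                            'MCORE': scores, 'MCORE_HIMEM': scores}

    def affinity_spec(values):
        # one weighted term keyed on the resourceType pod label
        return {'preferredDuringSchedulingIgnoredDuringExecution': [
            {'weight': 100,
             'podAffinityTerm': {
                 'labelSelector': {'matchExpressions': [
                     {'key': 'resourceType', 'operator': 'In', 'values': values}]},
                 'topologyKey': 'kubernetes.io/hostname'}}]}

    resource_type = 'SCORE'
    affinity = {
        # SCORE* pods pack together on the same node...
        'podAffinity': affinity_spec(scores),
        # ...and keep away from nodes running MCORE* pods
        'podAntiAffinity': affinity_spec(anti_affinity_matrix[resource_type]),
    }
    print(json.dumps(affinity, indent=2))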
@@ -307,7 +390,8 @@ def create_or_patch_secret(self, file_list, secret_name):
         # kind = 'Secret'
         # type='kubernetes.io/tls'
         rsp = None
-        tmp_log = core_utils.make_logger(base_logger, method_name='create_or_patch_secret')
+        tmp_log = core_utils.make_logger(base_logger, 'queue_name={0}'.format(self.queue_name),
+                                         method_name='create_or_patch_secret')

         metadata = {'name': secret_name, 'namespace': self.namespace}
         data = {}
@@ -332,7 +416,8 @@ def create_or_patch_secret(self, file_list, secret_name):

     def create_configmap(self, work_spec):
         # useful guide: https://matthewpalmer.net/kubernetes-app-developer/articles/ultimate-configmap-guide-kubernetes.html
-        tmp_log = core_utils.make_logger(base_logger, method_name='create_configmap')
+        tmp_log = core_utils.make_logger(base_logger, 'queue_name={0}'.format(self.queue_name),
+                                         method_name='create_configmap')

         try:
             worker_id = str(work_spec.workerID)
@@ -361,12 +446,47 @@ def create_configmap(self, work_spec):
             tmp_log.debug('Created configmap for worker id: {0}'.format(worker_id))
             return True

-        except (ApiException, TypeError) as e:
+        except Exception as e:
+            tmp_log.error('Could not create configmap with: {0}'.format(e))
+            return False
+
+    def create_or_patch_configmap_starter(self):
+        # useful guide: https://matthewpalmer.net/kubernetes-app-developer/articles/ultimate-configmap-guide-kubernetes.html
+
+        tmp_log = core_utils.make_logger(base_logger, 'queue_name={0}'.format(self.queue_name),
+                                         method_name='create_or_patch_configmap_starter')
+
+        try:
+            fn = 'pilots_starter.py'
+            dirname = os.path.dirname(__file__)
+            pilots_starter_file = os.path.join(dirname, '../harvestercloud/{0}'.format(fn))
+            with open(pilots_starter_file) as f:
+                pilots_starter_contents = f.read()
+
+            data = {fn: pilots_starter_contents}
+            name = 'pilots-starter'
+
+            # instantiate the configmap object
+            metadata = {'name': name, 'namespace': self.namespace}
+            config_map = client.V1ConfigMap(api_version="v1", kind="ConfigMap", data=data, metadata=metadata)
+
+            try:
+                api_response = self.corev1.patch_namespaced_config_map(name=name, body=config_map, namespace=self.namespace)
+                tmp_log.debug('Patched pilots-starter config_map')
+            except ApiException as e:
+                tmp_log.debug('Exception when patching pilots-starter config_map: {0}. Trying to create it instead...'
+                              .format(e))
+                api_response = self.corev1.create_namespaced_config_map(namespace=self.namespace, body=config_map)
+                tmp_log.debug('Created pilots-starter config_map')
+            return True
+
+        except Exception as e:
             tmp_log.error('Could not create configmap with: {0}'.format(e))
             return False

     def get_pod_logs(self, pod_name, previous=False):
-        tmp_log = core_utils.make_logger(base_logger, method_name='get_pod_logs')
+        tmp_log = core_utils.make_logger(base_logger, 'queue_name={0}'.format(self.queue_name),
+                                         method_name='get_pod_logs')
         try:
             rsp = self.corev1.read_namespaced_pod_log(name=pod_name, namespace=self.namespace, previous=previous)
             tmp_log.debug('Log file retrieved for {0}'.format(pod_name))
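Editor's note: create_or_patch_configmap_starter uses the common "patch first, create on failure" idiom, since patching a ConfigMap that does not exist yet raises ApiException. A minimal standalone sketch of the idiom with the kubernetes Python client; the namespace and data values are placeholders:

    from kubernetes import client, config
    from kubernetes.client.rest import ApiException

    def upsert_configmap(corev1, namespace, name, data):
        body = client.V1ConfigMap(api_version='v1', kind='ConfigMap',
                                  metadata={'name': name, 'namespace': namespace},
                                  data=data)
        try:
            # patch succeeds when the object already exists
            corev1.patch_namespaced_config_map(name=name, namespace=namespace, body=body)
        except ApiException:
            # first deployment: the object is not there yet, so create it
            corev1.create_namespaced_config_map(namespace=namespace, body=body)

    config.load_kube_config()  # or load_incluster_config() inside a pod
    upsert_configmap(client.CoreV1Api(), 'default', 'pilots-starter',
                     {'pilots_starter.py': '...'})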
diff --git a/pandaharvester/harvestermonitor/act_monitor.py b/pandaharvester/harvestermonitor/act_monitor.py
index ee43859c..a20b6431 100644
--- a/pandaharvester/harvestermonitor/act_monitor.py
+++ b/pandaharvester/harvestermonitor/act_monitor.py
@@ -1,13 +1,10 @@
-import os
-import json
-
 from pandaharvester.harvestercore import core_utils
 from pandaharvester.harvestercore.work_spec import WorkSpec
 from pandaharvester.harvestercore.plugin_base import PluginBase
+from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
 from pandaharvester.harvestercore.worker_errors import WorkerErrors
 from pandaharvester.harvesterconfig import harvester_config

-from act.common.aCTConfig import aCTConfigARC
 from act.atlas.aCTDBPanda import aCTDBPanda

 # json for job report
@@ -31,39 +28,6 @@ def __init__(self, **kwarg):
             self.log.error('Could not connect to aCT database: {0}'.format(str(e)))
             self.actDB = None

-    # get access point
-    def get_access_point(self, workspec, panda_id):
-        if workspec.mapType == WorkSpec.MT_MultiJobs:
-            accessPoint = os.path.join(workspec.get_access_point(), str(panda_id))
-        else:
-            accessPoint = workspec.get_access_point()
-        return accessPoint
-
-    # Check for pilot errors
-    def check_pilot_status(self, workspec, tmpLog):
-        for pandaID in workspec.pandaid_list:
-            # look for job report
-            accessPoint = self.get_access_point(workspec, pandaID)
-            jsonFilePath = os.path.join(accessPoint, jsonJobReport)
-            tmpLog.debug('looking for job report file {0}'.format(jsonFilePath))
-            try:
-                with open(jsonFilePath) as jsonFile:
-                    jobreport = json.load(jsonFile)
-            except:
-                # Assume no job report available means true pilot or push mode
-                # If job report is not available in full push mode aCT would have failed the job
-                tmpLog.debug('no job report at {0}'.format(jsonFilePath))
-                return WorkSpec.ST_finished
-            tmpLog.debug("pilot info for {0}: {1}".format(pandaID, jobreport))
-            # Check for pilot errors
-            if jobreport.get('pilotErrorCode', 0):
-                workspec.set_pilot_error(jobreport.get('pilotErrorCode'), jobreport.get('pilotErrorDiag', ''))
-                return WorkSpec.ST_failed
-            if jobreport.get('exeErrorCode', 0):
-                workspec.set_pilot_error(jobreport.get('exeErrorCode'), jobreport.get('exeErrorDiag', ''))
-                return WorkSpec.ST_failed
-        return WorkSpec.ST_finished
-
     # check workers
     def check_workers(self, workspec_list):
         retList = []
@@ -71,6 +35,9 @@ def check_workers(self, workspec_list):
             # make logger
             tmpLog = core_utils.make_logger(baseLogger, 'workerID={0}'.format(workSpec.workerID),
                                             method_name='check_workers')
+
+            queueconfigmapper = QueueConfigMapper()
+            queueconfig = queueconfigmapper.get_queue(workSpec.computingSite)
             try:
                 tmpLog.debug('Querying aCT for id {0}'.format(workSpec.batchID))
                 columns = ['actpandastatus', 'pandastatus', 'computingElement', 'node', 'error']
@@ -94,15 +61,22 @@ def check_workers(self, workspec_list):
                 errorMsg = ''
                 if actstatus in ['waiting', 'sent', 'starting']:
                     newStatus = WorkSpec.ST_submitted
-                elif actstatus == 'done':
-                    newStatus = self.check_pilot_status(workSpec, tmpLog)
-                elif actstatus == 'donefailed':
-                    newStatus = WorkSpec.ST_failed
-                    errorMsg = actjobs[0]['error'] or 'Unknown error'
-                    error_code = WorkerErrors.error_codes.get('GENERAL_ERROR')
-                    workSpec.set_supplemental_error(error_code=error_code, error_diag=errorMsg)
-                elif actstatus == 'donecancelled':
-                    newStatus = WorkSpec.ST_cancelled
+
+                # Handle post running states
+                if queueconfig.truePilot:
+                    # True pilot: keep in running until really done
+                    if actstatus in ['done', 'donecancelled']:
+                        newStatus = WorkSpec.ST_finished
+                    elif actstatus == 'donefailed':
+                        # set failed here with workspec sup error
+                        errorMsg = actjobs[0]['error'] or 'Unknown error'
+                        error_code = WorkerErrors.error_codes.get('GENERAL_ERROR')
+                        workSpec.set_supplemental_error(error_code=error_code, error_diag=errorMsg)
+                        newStatus = WorkSpec.ST_failed
+                        tmpLog.info('ID {0} failed with error {1}'.format(workSpec.batchID, errorMsg))
+                elif actstatus in ['done', 'donefailed', 'donecancelled', 'transferring', 'tovalidate']:
+                    # NG mode: all post processing is now done in the stager
+                    newStatus = WorkSpec.ST_finished

                 if newStatus != workSpec.status:
                     tmpLog.info('ID {0} updated status {1} -> {2} ({3})'.format(workSpec.batchID, workSpec.status,
                                                                                 newStatus, actstatus))
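Editor's note: the branch above encodes two lifecycles. With a true pilot, the pilot itself reports to PanDA, so harvester only flips the worker at terminal aCT states; in NG (aCT-managed stage-out) mode, every post-running state hands over to the stager plugin. A table-driven sketch of the same decision; the status strings come from the code, the function shape is illustrative:

    TERMINAL_TRUE_PILOT = {'done': 'finished', 'donecancelled': 'finished', 'donefailed': 'failed'}
    POST_RUNNING_NG = ['done', 'donefailed', 'donecancelled', 'transferring', 'tovalidate']

    def map_act_status(actstatus, true_pilot):
        if actstatus in ['waiting', 'sent', 'starting']:
            return 'submitted'
        if true_pilot:
            # only terminal aCT states end the worker; the pilot did the stage-out
            return TERMINAL_TRUE_PILOT.get(actstatus)  # None -> leave status unchanged
        if actstatus in POST_RUNNING_NG:
            # NG mode: the stager plugin does all post processing from here on
            return 'finished'
        return None

    print(map_act_status('transferring', true_pilot=False))  # -> finished
    print(map_act_status('transferring', true_pilot=True))   # -> None (still running)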
diff --git a/pandaharvester/harvestermonitor/k8s_monitor.py b/pandaharvester/harvestermonitor/k8s_monitor.py
index 31162711..d6739e3e 100644
--- a/pandaharvester/harvestermonitor/k8s_monitor.py
+++ b/pandaharvester/harvestermonitor/k8s_monitor.py
@@ -7,11 +7,12 @@ from pandaharvester.harvestercore.worker_errors import WorkerErrors
 from pandaharvester.harvestercore.plugin_base import PluginBase
 from pandaharvester.harvestermisc.k8s_utils import k8s_Client
-
+from pandaharvester.harvestermisc.info_utils import PandaQueuesDict

 # logger
 base_logger = core_utils.setup_logger('k8s_monitor')

+BAD_CONTAINER_STATES = ['CreateContainerError', 'CrashLoopBackOff', 'FailedMount']

 # monitor for K8S
 class K8sMonitor(PluginBase):
@@ -19,7 +20,12 @@ class K8sMonitor(PluginBase):
     def __init__(self, **kwarg):
         PluginBase.__init__(self, **kwarg)

-        self.k8s_client = k8s_Client(namespace=self.k8s_namespace, config_file=self.k8s_config_file)
+        self.panda_queues_dict = PandaQueuesDict()
+
+        # retrieve the k8s namespace from CRIC
+        namespace = self.panda_queues_dict.get_k8s_namespace(self.queueName)
+
+        self.k8s_client = k8s_Client(namespace=namespace, queue_name=self.queueName, config_file=self.k8s_config_file)

         try:
             self.nProcesses
@@ -48,11 +54,16 @@ def check_pods_status(self, pods_status_list, containers_state_list):
                 new_status = WorkSpec.ST_running
             else:
                 new_status = WorkSpec.ST_idle
+        else:
+            # Pod in Pending status
             if all(item == 'Pending' for item in pods_status_list):
-                new_status = WorkSpec.ST_submitted
-            # elif all(item == 'Succeeded' for item in pods_status_list):
-            #     new_status = WorkSpec.ST_finished
+                new_status = WorkSpec.ST_submitted  # default is submitted, but consider certain cases
+                for item in containers_state_list:
+                    if item.waiting and item.waiting.reason in BAD_CONTAINER_STATES:
+                        new_status = WorkSpec.ST_failed  # change state to failed
+
+            # Pod in Succeeded status
             elif 'Succeeded' in pods_status_list:
                 if all((item.terminated is not None and item.terminated.reason == 'Completed') for item in containers_state_list):
                     new_status = WorkSpec.ST_finished
@@ -68,14 +79,19 @@ def check_pods_status(self, pods_status_list, containers_state_list):
                             state = 'waiting'
                         msg_str = 'container not terminated yet ({0}) while pod Succeeded'.format(state)
                     elif item.terminated.reason != 'Completed':
-                        msg_str = 'container termiated by k8s for reason {0}'.format(item.terminated.reason)
+                        msg_str = 'container terminated by k8s for reason {0}'.format(item.terminated.reason)
                     sub_mesg_list.append(msg_str)
                 sub_msg = ';'.join(sub_mesg_list)
                 new_status = WorkSpec.ST_cancelled
+
+            # Pod in Running status
             elif 'Running' in pods_status_list:
                 new_status = WorkSpec.ST_running
+
+            # Pod in Failed status
             elif 'Failed' in pods_status_list:
                 new_status = WorkSpec.ST_failed
+
             else:
                 new_status = WorkSpec.ST_idle

@@ -83,7 +99,8 @@ def check_pods_status(self, pods_status_list, containers_state_list):

     def check_a_worker(self, workspec):
         # set logger
-        tmp_log = self.make_logger(base_logger, 'workerID={0} batchID={1}'.format(workspec.workerID, workspec.batchID),
+        tmp_log = self.make_logger(base_logger, 'queueName={0} workerID={1} batchID={2}'.
+                                   format(self.queueName, workspec.workerID, workspec.batchID),
                                    method_name='check_a_worker')

         # initialization
@@ -97,16 +114,23 @@ def check_a_worker(self, workspec):
             pods_list = self.k8s_client.filter_pods_info(self._all_pods_list, job_name=job_id)
             containers_state_list = []
             pods_sup_diag_list = []
-            for pods_info in pods_list:
-                # make a list of pods that have been queued too long
-                if pods_info['status'] in ['Pending', 'Unknown'] and pods_info['start_time'] \
-                    and time_now - pods_info['start_time'] > datetime.timedelta(seconds=self.podQueueTimeLimit):
-                    # fetch queuing too long pods
-                    pods_name_to_delete_list.append(pods_info['name'])
+            for pod_info in pods_list:
                 # make list of status of the pods belonging to our job
-                pods_status_list.append(pods_info['status'])
-                containers_state_list.extend(pods_info['containers_state'])
-                pods_sup_diag_list.append(pods_info['name'])
+                pods_status_list.append(pod_info['status'])
+                containers_state_list.extend(pod_info['containers_state'])
+                pods_sup_diag_list.append(pod_info['name'])
+
+                # make a list of pods that should be removed
+                # 1. pods being queued too long
+                if pod_info['status'] in ['Pending', 'Unknown'] and pod_info['start_time'] \
+                        and time_now - pod_info['start_time'] > datetime.timedelta(seconds=self.podQueueTimeLimit):
+                    pods_name_to_delete_list.append(pod_info['name'])
+                # 2. pods with containers in bad states
+                if pod_info['status'] in ['Pending', 'Unknown']:
+                    for item in pod_info['containers_state']:
+                        if item.waiting and item.waiting.reason in BAD_CONTAINER_STATES:
+                            pods_name_to_delete_list.append(pod_info['name'])
+
         except Exception as _e:
             err_str = 'Failed to get POD status of JOB id={0} ; {1}'.format(job_id, _e)
             tmp_log.error(err_str)
@@ -144,7 +168,7 @@ def check_a_worker(self, workspec):
         return new_status, err_str

     def check_workers(self, workspec_list):
-        tmp_log = self.make_logger(base_logger, 'k8s query', method_name='check_workers')
+        tmp_log = self.make_logger(base_logger, 'queueName={0}'.format(self.queueName), method_name='check_workers')
         tmp_log.debug('start')

         ret_list = list()
@@ -154,7 +178,11 @@ def check_workers(self, workspec_list):
             ret_list.append(('', err_str))
             return False, ret_list

-        self._all_pods_list = self.k8s_client.get_pods_info(workspec_list=workspec_list)
+        pods_info = self.k8s_client.get_pods_info(workspec_list=workspec_list)
+        if pods_info is None:  # there was a communication issue to the K8S cluster
+            return False, ret_list
+
+        self._all_pods_list = pods_info

         # resolve status requested workers
         with ThreadPoolExecutor(self.nProcesses) as thread_pool:
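Editor's note: check_pods_status reduces a list of pod phases plus container states to one worker status, and BAD_CONTAINER_STATES short-circuits Pending pods that will never start. A condensed, self-contained sketch of that reduction; a namedtuple stands in for the kubernetes container-state object:

    from collections import namedtuple

    Waiting = namedtuple('Waiting', 'reason')
    ContainerState = namedtuple('ContainerState', 'waiting terminated')

    BAD_CONTAINER_STATES = ['CreateContainerError', 'CrashLoopBackOff', 'FailedMount']

    def reduce_status(pod_phases, container_states):
        if all(phase == 'Pending' for phase in pod_phases):
            # normally still submitted, unless a container can never start
            for state in container_states:
                if state.waiting and state.waiting.reason in BAD_CONTAINER_STATES:
                    return 'failed'
            return 'submitted'
        if 'Running' in pod_phases:
            return 'running'
        if 'Failed' in pod_phases:
            return 'failed'
        return 'idle'

    states = [ContainerState(Waiting('CrashLoopBackOff'), None)]
    print(reduce_status(['Pending'], states))  # -> failed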
diff --git a/pandaharvester/harvesterstager/act_stager.py b/pandaharvester/harvesterstager/act_stager.py
new file mode 100644
index 00000000..17a2aad4
--- /dev/null
+++ b/pandaharvester/harvesterstager/act_stager.py
@@ -0,0 +1,149 @@
+import json
+import os
+
+from pandaharvester.harvestercore import core_utils
+from pandaharvester.harvestercore.work_spec import WorkSpec
+from pandaharvester.harvestercore.worker_errors import WorkerErrors
+from pandaharvester.harvestercore.file_spec import FileSpec
+from pandaharvester.harvesterconfig import harvester_config
+from .base_stager import BaseStager
+
+from act.atlas.aCTDBPanda import aCTDBPanda
+
+# logger
+baseLogger = core_utils.setup_logger('act_stager')
+
+# json for job report
+jsonJobReport = harvester_config.payload_interaction.jobReportFile
+
+
+# aCT stager plugin
+class ACTStager(BaseStager):
+    # constructor
+    def __init__(self, **kwarg):
+        BaseStager.__init__(self, **kwarg)
+
+        # Set up aCT DB connection
+        self.log = core_utils.make_logger(baseLogger, 'aCT stager', method_name='__init__')
+        try:
+            self.actDB = aCTDBPanda(self.log)
+        except Exception as e:
+            self.log.error('Could not connect to aCT database: {0}'.format(str(e)))
+            self.actDB = None
+
+    # check status
+    def check_stage_out_status(self, jobspec):
+        """Check the status of the stage-out procedure.
+        Checks the aCT job status and sets the output file status to finished or failed
+        once the aCT job is done. All error handling and post-processing needs to
+        be done here.
+
+        :param jobspec: job specifications
+        :type jobspec: JobSpec
+        :return: A tuple of return code (True: transfer success, False: fatal transfer failure,
+                 None: on-going or temporary failure) and error dialog
+        :rtype: (bool, string)
+        """
+        workSpec = jobspec.get_workspec_list()[0]
+        # make logger
+        tmpLog = core_utils.make_logger(baseLogger, 'workerID={0}'.format(workSpec.workerID),
+                                        method_name='check_workers')
+        try:
+            tmpLog.debug('Querying aCT for id {0}'.format(workSpec.batchID))
+            columns = ['actpandastatus', 'error']
+            actjobs = self.actDB.getJobs("id={0}".format(workSpec.batchID), columns)
+        except Exception as e:
+            if self.actDB:
+                tmpLog.error("Failed to query aCT DB: {0}".format(str(e)))
+            # try again later
+            return None, "Failed to query aCT DB"
+
+        if not actjobs:
+            tmpLog.error("Job with id {0} not found in aCT".format(workSpec.batchID))
+            return False, "Job not found in aCT"
+
+        actstatus = actjobs[0]['actpandastatus']
+        # Only check for final states
+        if actstatus == 'done':
+            # Do post processing
+            self.post_processing(workSpec, jobspec)
+        elif actstatus == 'donefailed':
+            # Call post processing to collect attributes set by aCT for failed jobs
+            self.post_processing(workSpec, jobspec)
+            # Set error reported by aCT
+            errorMsg = actjobs[0]['error'] or 'Unknown error'
+            error_code = WorkerErrors.error_codes.get('GENERAL_ERROR')
+            jobspec.status = 'failed'
+            # No way to update workspec here
+            # workSpec.set_supplemental_error(error_code=error_code, error_diag=errorMsg)
+            jobspec.set_pilot_error(error_code, errorMsg)
+            tmpLog.info('Job {0} failed with error {1}'.format(jobspec.PandaID, errorMsg))
+        elif actstatus == 'donecancelled':
+            # Nothing to do
+            pass
+        else:
+            # Still staging
+            return None, 'still staging'
+
+        tmpLog.info('ID {0} completed in state {1}'.format(workSpec.batchID, actstatus))
+
+        # Set dummy output file to finished
+        for fileSpec in jobspec.get_output_file_specs(skip_done=True):
+            fileSpec.status = 'finished'
+
+        return True, ''
+
+    # trigger stage out
+    def trigger_stage_out(self, jobspec):
+        """Trigger the stage-out procedure for the job.
+        Create a dummy output file to force harvester to wait until the aCT
+        job is done.
+
+        :param jobspec: job specifications
+        :type jobspec: JobSpec
+        :return: A tuple of return code (True: success, False: fatal failure, None: temporary failure)
+                 and error dialog
+        :rtype: (bool, string)
+        """
+        fileSpec = FileSpec()
+        fileSpec.PandaID = jobspec.PandaID
+        fileSpec.taskID = jobspec.taskID
+        fileSpec.lfn = 'dummy.{0}'.format(jobspec.PandaID)
+        fileSpec.scope = 'dummy'
+        fileSpec.fileType = 'output'
+        jobspec.add_in_file(fileSpec)
+
+        return True, ''
+
+    # zip output files
+    def zip_output(self, jobspec):
+        """Dummy"""
+        return True, ''
+
+    def post_processing(self, workspec, jobspec):
+        '''
+        Take the jobReport placed by aCT in the access point and fill metadata
+        attributes of the jobspec.
+        '''
+        # get logger
+        tmpLog = core_utils.make_logger(baseLogger, 'workerID={0}'.format(workspec.workerID),
+                                        method_name='post_processing')
+        # look for job report
+        jsonFilePath = os.path.join(workspec.get_access_point(), jsonJobReport)
+        tmpLog.debug('looking for job report file {0}'.format(jsonFilePath))
+        try:
+            with open(jsonFilePath) as jsonFile:
+                jobreport = json.load(jsonFile)
+        except Exception:
+            # Assume no job report available means true pilot or push mode
+            # If job report is not available in full push mode aCT would have failed the job
+            tmpLog.debug('no job report at {0}'.format(jsonFilePath))
+            return
+
+        tmpLog.debug('got {0} kB of job report'.format(os.stat(jsonFilePath).st_size / 1024))
+        tmpLog.debug("pilot info for {0}: {1}".format(jobspec.PandaID, jobreport))
+
+        # Set info for final heartbeat and final status
+        jobspec.set_attributes({jobspec.PandaID: jobreport})
+        jobspec.set_one_attribute('jobStatus', jobreport.get('state', 'failed'))
+        jobspec.status = jobreport.get('state', 'failed')
diff --git a/pandaharvester/harvestersubmitter/cobalt_submitter.py b/pandaharvester/harvestersubmitter/cobalt_submitter.py
index 615539cb..e609bc15 100644
--- a/pandaharvester/harvestersubmitter/cobalt_submitter.py
+++ b/pandaharvester/harvestersubmitter/cobalt_submitter.py
@@ -88,7 +88,7 @@ def make_batch_script(self, workspec):

         # set execution bit on the temp file
         st = os.stat(tmpFile.name)
-        os.chmod(tmpFile.name, st.st_mode | stat.S_IEXEC)
+        os.chmod(tmpFile.name, st.st_mode | stat.S_IEXEC | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH)

         return tmpFile.name
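Editor's note: both batch-script submitters in this diff widen the temp file's permissions the same way; a quick sketch shows which mode bits the OR'ed flags add. The file content is illustrative:

    import os
    import stat
    import tempfile

    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(b'#!/bin/bash\necho hello\n')

    st = os.stat(tmp.name)
    # add user-execute plus group read/write and world read on top of the current mode
    os.chmod(tmp.name, st.st_mode | stat.S_IEXEC | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH)
    print(oct(os.stat(tmp.name).st_mode & 0o777))  # e.g. 0o764 if the file started as 0o600
    os.unlink(tmp.name)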
diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py
index 287ae646..49c1917c 100644
--- a/pandaharvester/harvestersubmitter/htcondor_submitter.py
+++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py
@@ -17,7 +17,7 @@ from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
 from pandaharvester.harvestermisc.htcondor_utils import get_job_id_tuple_from_batchid
 from pandaharvester.harvestermisc.htcondor_utils import CondorJobSubmit
-
+from pandaharvester.harvestersubmitter import submitter_common

 # logger
 baseLogger = core_utils.setup_logger('htcondor_submitter')
@@ -174,7 +174,7 @@ def _get_ce_stats_weighting_display(ce_list, worker_ce_all_tuple, ce_weighting):
     return stats_weighting_display_str


-# Replace condor Marco from SDF file, return string
+# Replace condor Macro from SDF file, return string
 def _condor_macro_replace(string, **kwarg):
     new_string = string
     macro_map = {
@@ -201,43 +201,6 @@ def _get_resource_type(string, is_unified_queue, is_pilot_option=False):
     return ret


-# Map "pilotType" (defined in harvester) to prodSourceLabel and pilotType option (defined in pilot, -i option)
-# and piloturl (pilot option --piloturl) for pilot 2
-def _get_complicated_pilot_options(pilot_type, pilot_url=None):
-    pt_psl_map = {
-        'RC': {
-            'prod_source_label': 'rc_test2',
-            'pilot_type_opt': 'RC',
-            'pilot_url_str': '--piloturl http://cern.ch/atlas-panda-pilot/pilot2-dev.tar.gz',
-        },
-        'ALRB': {
-            'prod_source_label': 'rc_alrb',
-            'pilot_type_opt': 'ALRB',
-            'pilot_url_str': '',
-        },
-        'PT': {
-            'prod_source_label': 'ptest',
-            'pilot_type_opt': 'PR',
-            'pilot_url_str': '',
-        },
-    }
-    pilot_opt_dict = pt_psl_map.get(pilot_type, None)
-    if pilot_url and pilot_opt_dict:
-        pilot_opt_dict['pilot_url_str'] = '--piloturl {0}'.format(pilot_url)
-    return pilot_opt_dict
-
-
-# get special flag of pilot wrapper about python version of pilot, and whehter to run with python 3 if python version is "3"
-# FIXME: during pilot testing phase, only prodsourcelabel ptest and rc_test2 should run python3
-# This constraint will be removed when pilot is ready
-def _get_python_version_option(python_version, prod_source_label):
-    option = ''
-    if python_version.startswith('3'):
-        if prod_source_label in ['rc_test2', 'ptest']:
-            option = '--pythonversion 3'
-    return option
-
-
 # submit a bag of workers
 def submit_bag_of_workers(data_list):
     # make logger
@@ -387,7 +350,7 @@ def make_a_jdl(workspec, template, n_core_per_node, log_dir, panda_queue_name, e
     request_walltime_minute = _div_round_up(request_walltime, 60)
     request_cputime_minute = _div_round_up(request_cputime, 60)
     # decide prodSourceLabel
-    pilot_opt_dict = _get_complicated_pilot_options(workspec.pilotType, pilot_url=pilot_url)
+    pilot_opt_dict = submitter_common.get_complicated_pilot_options(workspec.pilotType, pilot_url=pilot_url)
     if pilot_opt_dict is None:
         prod_source_label = harvester_queue_config.get_source_label(workspec.jobType)
         pilot_type_opt = workspec.pilotType
@@ -435,9 +398,10 @@ def make_a_jdl(workspec, template, n_core_per_node, log_dir, panda_queue_name, e
         'pilotType': pilot_type_opt,
         'pilotUrlOption': pilot_url_str,
         'pilotVersion': pilot_version,
-        'pilotPythonOption': _get_python_version_option(python_version, prod_source_label),
+        'pilotPythonOption': submitter_common.get_python_version_option(python_version, prod_source_label),
         'submissionHost': workspec.submissionHost,
        'submissionHostShort': workspec.submissionHost.split('.')[0],
+        'ceARCGridType': ce_info_dict.get('ce_arc_grid_type', 'nordugrid'),
     }
     # fill in template string
     jdl_str = template.format(**placeholder_map)
@@ -732,12 +696,24 @@ def _choose_proxy(workspec):
             except KeyError:
                 tmpLog.info('Problem choosing CE with weighting. Choose an arbitrary CE endpoint')
                 ce_info_dict = random.choice(list(ce_auxilary_dict.values())).copy()
-            # go on info of the CE; ignore protocol prefix in ce_endpoint
+            # go on info of the CE
+            # ignore protocol prefix in ce_endpoint for cream and condor CE
+            # check protocol prefix for ARC CE (gridftp or REST)
+            _match_ce_endpoint = re.match('^(\w+)://(\w+)', ce_info_dict.get('ce_endpoint', ''))
+            ce_endpoint_prefix = ''
+            if _match_ce_endpoint:
+                ce_endpoint_prefix = _match_ce_endpoint.group(1)
             ce_endpoint_from_queue = re.sub('^\w+://', '', ce_info_dict.get('ce_endpoint', ''))
             ce_flavour_str = str(ce_info_dict.get('ce_flavour', '')).lower()
             ce_version_str = str(ce_info_dict.get('ce_version', '')).lower()
+            if ce_flavour_str == 'arc-ce' and ce_endpoint_prefix in ['https', 'http']:
+                # new ARC REST interface
+                ce_info_dict['ce_arc_grid_type'] = 'arc'
+            else:
+                ce_info_dict['ce_arc_grid_type'] = 'nordugrid'
             ce_info_dict['ce_hostname'] = re.sub(':\w*', '', ce_endpoint_from_queue)
-            if ce_info_dict['ce_hostname'] == ce_endpoint_from_queue:
+            if ce_info_dict['ce_hostname'] == ce_endpoint_from_queue \
+                    and ce_info_dict['ce_arc_grid_type'] != 'arc':
                 # add default port to ce_endpoint if missing
                 default_port_map = {
                     'cream-ce': 8443,
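Editor's note: the new branch distinguishes ARC REST CEs from legacy gridftp ones purely by the URL scheme in the queue's ce_endpoint, as the sketch below shows; the endpoint strings are invented:

    import re

    def arc_grid_type(ce_endpoint, ce_flavour):
        # the scheme (e.g. 'https') decides between ARC REST and legacy nordugrid submission
        match = re.match(r'^(\w+)://(\w+)', ce_endpoint)
        prefix = match.group(1) if match else ''
        if ce_flavour.lower() == 'arc-ce' and prefix in ['https', 'http']:
            return 'arc'        # new ARC REST interface
        return 'nordugrid'      # gridftp-based submission

    print(arc_grid_type('https://arc01.example.org:443/arex', 'ARC-CE'))              # -> arc
    print(arc_grid_type('arc01.example.org:2811/nordugrid-Condor-grid', 'ARC-CE'))    # -> nordugrid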
diff --git a/pandaharvester/harvestersubmitter/k8s_submitter.py b/pandaharvester/harvestersubmitter/k8s_submitter.py
index a36997e1..dc4629fa 100644
--- a/pandaharvester/harvestersubmitter/k8s_submitter.py
+++ b/pandaharvester/harvestersubmitter/k8s_submitter.py
@@ -14,6 +14,7 @@ from pandaharvester.harvesterconfig import harvester_config
 from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
+from pandaharvester.harvestersubmitter import submitter_common

 # logger
 base_logger = core_utils.setup_logger('k8s_submitter')
@@ -25,7 +26,7 @@

 # command defaults
 DEF_COMMAND = ["/usr/bin/bash"]
-DEF_ARGS = ["-c", "cd; wget https://raw.githubusercontent.com/HSF/harvester/master/pandaharvester/harvestercloud/pilots_starter.py; chmod 755 pilots_starter.py; ./pilots_starter.py || true"]
+DEF_ARGS = ["-c", "cd; python $EXEC_DIR/pilots_starter.py || true"]


 # submitter for K8S
@@ -35,13 +36,26 @@ def __init__(self, **kwarg):
         self.logBaseURL = None
         PluginBase.__init__(self, **kwarg)

-        self.k8s_client = k8s_Client(namespace=self.k8s_namespace, config_file=self.k8s_config_file)
+        self.panda_queues_dict = PandaQueuesDict()
+
+        # retrieve the k8s namespace from CRIC
+        namespace = self.panda_queues_dict.get_k8s_namespace(self.queueName)
+
+        self.k8s_client = k8s_Client(namespace=namespace, queue_name=self.queueName, config_file=self.k8s_config_file)
+
+        # update or create the pilot starter executable
+        self.k8s_client.create_or_patch_configmap_starter()

         # required for parsing jobParams
         self.parser = argparse.ArgumentParser()
         self.parser.add_argument('-p', dest='executable', type=unquote)
         self.parser.add_argument('--containerImage', dest='container_image')

+        # allowed associated parameters from AGIS
+        self._allowed_agis_attrs = (
+            'pilot_url',
+        )
+
         # number of processes
         try:
             self.nProcesses
@@ -50,18 +64,6 @@ def __init__(self, **kwarg):
         else:
             if (not self.nProcesses) or (self.nProcesses < 1):
                 self.nProcesses = 1
-        # x509 proxy: obsolete mode
-        try:
-            self.x509UserProxy
-        except AttributeError:
-            if os.getenv('X509_USER_PROXY'):
-                self.x509UserProxy = os.getenv('X509_USER_PROXY')
-
-        # x509 proxy for analysis jobs in grandly unified queues
-        try:
-            self.x509UserProxyAnalysis
-        except AttributeError:
-            self.x509UserProxyAnalysis = os.getenv('X509_USER_PROXY_ANAL')

         # x509 proxy through k8s secrets: preferred way
         try:
@@ -77,20 +79,8 @@ def __init__(self, **kwarg):
             if os.getenv('PROXY_SECRET_PATH_ANAL'):
                 self.proxySecretPath = os.getenv('PROXY_SECRET_PATH_ANAL')

-        # CPU adjust ratio
-        try:
-            self.cpuAdjustRatio
-        except AttributeError:
-            self.cpuAdjustRatio = 100
-
-        # Memory adjust ratio
-        try:
-            self.memoryAdjustRatio
-        except AttributeError:
-            self.memoryAdjustRatio = 100
-
     def parse_params(self, job_params):
-        tmp_log = self.make_logger(base_logger, method_name='parse_params')
+        tmp_log = self.make_logger(base_logger, 'queueName={0}'.format(self.queueName), method_name='parse_params')

         job_params_list = job_params.split(' ')
         args, unknown = self.parser.parse_known_args(job_params_list)
@@ -119,7 +109,7 @@ def decide_container_image(self, job_fields, job_pars_parsed):
         - production images: take SLC6 or CentOS7
         - otherwise take default image specified for the queue
         """
-        tmp_log = self.make_logger(base_logger, method_name='decide_container_image')
+        tmp_log = self.make_logger(base_logger, 'queueName={0}'.format(self.queueName), method_name='decide_container_image')
         try:
             container_image = job_pars_parsed.container_image
             if container_image:
@@ -166,39 +156,25 @@ def _choose_proxy(self, workspec, is_grandly_unified_queue):
         Choose the proxy based on the job type and whether k8s secrets are enabled
         """
         cert = None
-        use_secret = False
         job_type = workspec.jobType

         if is_grandly_unified_queue and job_type in ('user', 'panda', 'analysis'):
             if self.proxySecretPathAnalysis:
                 cert = self.proxySecretPathAnalysis
-                use_secret = True
             elif self.proxySecretPath:
                 cert = self.proxySecretPath
-                use_secret = True
-            elif self.x509UserProxyAnalysis:
-                cert = self.x509UserProxyAnalysis
-                use_secret = False
-            elif self.x509UserProxy:
-                cert = self.x509UserProxy
-                use_secret = False
         else:
             if self.proxySecretPath:
                 cert = self.proxySecretPath
-                use_secret = True
-            elif self.x509UserProxy:
-                cert = self.x509UserProxy
-                use_secret = False

-        return cert, use_secret
+        return cert

     def submit_k8s_worker(self, work_spec):
-        tmp_log = self.make_logger(base_logger, method_name='submit_k8s_worker')
+        tmp_log = self.make_logger(base_logger, 'queueName={0}'.format(self.queueName), method_name='submit_k8s_worker')

         # get info from harvester queue config
         _queueConfigMapper = QueueConfigMapper()
         harvester_queue_config = _queueConfigMapper.get_queue(self.queueName)
-        prod_source_label = harvester_queue_config.get_source_label(work_spec.jobType)

         # set the stdout log file
         log_file_name = '{0}_{1}.out'.format(harvester_config.master.harvester_id, work_spec.workerID)
@@ -218,27 +194,49 @@ def submit_k8s_worker(self, work_spec):
                                                           args))

             # choose the appropriate proxy
-            panda_queues_dict = PandaQueuesDict()
-            is_grandly_unified_queue = panda_queues_dict.is_grandly_unified_queue(self.queueName)
-            cert, use_secret = self._choose_proxy(work_spec, is_grandly_unified_queue)
+            this_panda_queue_dict = self.panda_queues_dict.get(self.queueName, dict())
+
+            is_grandly_unified_queue = self.panda_queues_dict.is_grandly_unified_queue(self.queueName)
+            cert = self._choose_proxy(work_spec, is_grandly_unified_queue)
             if not cert:
-                err_str = 'No proxy specified in proxySecretPath or x509UserProxy. Not submitted'
+                err_str = 'No proxy specified in proxySecretPath. Not submitted'
                 tmp_return_value = (False, err_str)
                 return tmp_return_value

             # get the walltime limit
             try:
-                max_time = panda_queues_dict.get(self.queueName)['maxtime']
+                max_time = this_panda_queue_dict['maxtime']
             except Exception as e:
                 tmp_log.warning('Could not retrieve maxtime field for queue {0}'.format(self.queueName))
                 max_time = None

+            associated_params_dict = {}
+            for key, val in self.panda_queues_dict.get_harvester_params(self.queueName).items():
+                if key in self._allowed_agis_attrs:
+                    associated_params_dict[key] = val
+
+            pilot_url = associated_params_dict.get('pilot_url')
+            pilot_version = str(this_panda_queue_dict.get('pilot_version', 'current'))
+            python_version = str(this_panda_queue_dict.get('python_version', '2'))
+
+            # prod_source_label = harvester_queue_config.get_source_label(work_spec.jobType)
+            pilot_opt_dict = submitter_common.get_complicated_pilot_options(work_spec.pilotType)
+            if pilot_opt_dict is None:
+                prod_source_label = harvester_queue_config.get_source_label(work_spec.jobType)
+                pilot_type = work_spec.pilotType
+                pilot_url_str = '--piloturl {0}'.format(pilot_url) if pilot_url else ''
+            else:
+                prod_source_label = pilot_opt_dict['prod_source_label']
+                pilot_type = pilot_opt_dict['pilot_type_opt']
+                pilot_url_str = pilot_opt_dict['pilot_url_str']
+
+            pilot_python_option = submitter_common.get_python_version_option(python_version, prod_source_label)
+
             # submit the worker
             rsp, yaml_content_final = self.k8s_client.create_job_from_yaml(yaml_content, work_spec, prod_source_label,
-                                                                           container_image, executable, args,
-                                                                           cert, cert_in_secret=use_secret,
-                                                                           cpu_adjust_ratio=self.cpuAdjustRatio,
-                                                                           memory_adjust_ratio=self.memoryAdjustRatio,
+                                                                           pilot_type, pilot_url_str,
+                                                                           pilot_python_option,
+                                                                           container_image, executable, args, cert,
                                                                            max_time=max_time)
         except Exception as _e:
             tmp_log.error(traceback.format_exc())
@@ -253,7 +251,7 @@ def submit_k8s_worker(self, work_spec):

     # submit workers
     def submit_workers(self, workspec_list):
-        tmp_log = self.make_logger(base_logger, method_name='submit_workers')
+        tmp_log = self.make_logger(base_logger, 'queueName={0}'.format(self.queueName), method_name='submit_workers')

         n_workers = len(workspec_list)
         tmp_log.debug('start, n_workers={0}'.format(n_workers))
diff --git a/pandaharvester/harvestersubmitter/slurm_submitter.py b/pandaharvester/harvestersubmitter/slurm_submitter.py
index feddc8ea..d6f764dc 100644
--- a/pandaharvester/harvestersubmitter/slurm_submitter.py
+++ b/pandaharvester/harvestersubmitter/slurm_submitter.py
@@ -2,6 +2,8 @@
 import re
 import six
+import os
+import stat

 try:
     import subprocess32 as subprocess
@@ -87,6 +89,11 @@ def make_batch_script(self, workspec):
                                    workerID=workspec.workerID))
         )
         tmpFile.close()
+
+        # set execution bit and group/other access bits on the temp file
+        st = os.stat(tmpFile.name)
+        os.chmod(tmpFile.name, st.st_mode | stat.S_IEXEC | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH)
+
         return tmpFile.name

     # get log file names
diff --git a/pandaharvester/harvestersubmitter/submitter_common.py b/pandaharvester/harvestersubmitter/submitter_common.py
new file mode 100644
index 00000000..fd0da984
--- /dev/null
+++ b/pandaharvester/harvestersubmitter/submitter_common.py
@@ -0,0 +1,32 @@
+# Map "pilotType" (defined in harvester) to prodSourceLabel and pilotType option (defined in pilot, -i option)
+# and piloturl (pilot option --piloturl) for pilot 2
+def get_complicated_pilot_options(pilot_type, pilot_url=None):
+    pt_psl_map = {
+        'RC': {
+            'prod_source_label': 'rc_test2',
+            'pilot_type_opt': 'RC',
+            'pilot_url_str': '--piloturl http://cern.ch/atlas-panda-pilot/pilot2-dev.tar.gz',
+        },
+        'ALRB': {
+            'prod_source_label': 'rc_alrb',
+            'pilot_type_opt': 'ALRB',
+            'pilot_url_str': '',
+        },
+        'PT': {
+            'prod_source_label': 'ptest',
+            'pilot_type_opt': 'PR',
+            'pilot_url_str': '--piloturl http://cern.ch/atlas-panda-pilot/pilot2-dev2.tar.gz',
+        },
+    }
+    pilot_opt_dict = pt_psl_map.get(pilot_type, None)
+    if pilot_url and pilot_opt_dict:
+        pilot_opt_dict['pilot_url_str'] = '--piloturl {0}'.format(pilot_url)
+    return pilot_opt_dict
+
+
+# Get the special pilot-wrapper flag for the pilot python version, i.e. whether to run the pilot
+# with python 3 when the queue declares python version "3"
+def get_python_version_option(python_version, prod_source_label):
+    option = ''
+    if python_version.startswith('3'):
+        option = '--pythonversion 3'
+    return option
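Editor's note: both the HTCondor and the k8s submitters now share this mapping; calling it shows the fall-through behaviour for ordinary pilots versus special test flavours:

    from pandaharvester.harvestersubmitter import submitter_common

    # special flavour: the mapping overrides prodSourceLabel, pilot type and pilot URL
    print(submitter_common.get_complicated_pilot_options('RC'))
    # -> {'prod_source_label': 'rc_test2', 'pilot_type_opt': 'RC',
    #     'pilot_url_str': '--piloturl http://cern.ch/atlas-panda-pilot/pilot2-dev.tar.gz'}

    # ordinary pilot: None, so the caller falls back to the queue configuration
    print(submitter_common.get_complicated_pilot_options('PR'))  # -> None

    # an explicit pilot_url (e.g. from CRIC) overrides the default URL
    print(submitter_common.get_complicated_pilot_options('RC', pilot_url='http://example.org/pilot.tar.gz'))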
diff --git a/pandaharvester/harvestersweeper/arc_sweeper.py b/pandaharvester/harvestersweeper/arc_sweeper.py
index ac53acf5..3efb72d9 100644
--- a/pandaharvester/harvestersweeper/arc_sweeper.py
+++ b/pandaharvester/harvestersweeper/arc_sweeper.py
@@ -134,14 +134,14 @@ def test(jobid):
     workAttributes["arcjob"]["JobManagementInterfaceName"] = "org.nordugrid.gridftpjob"
 
     wspec.workAttributes = workAttributes
-    print wspec.workAttributes
+    print(wspec.workAttributes)
 
     sweeper = ARCSweeper()
-    print sweeper.kill_worker(wspec)
+    print(sweeper.kill_worker(wspec))
 
 if __name__ == "__main__":
     import time, sys, urlparse
     if len(sys.argv) != 2:
-        print "Please give ARC job id"
+        print("Please give ARC job id")
         sys.exit(1)
     test(sys.argv[1])
diff --git a/pandaharvester/harvestersweeper/k8s_sweeper.py b/pandaharvester/harvestersweeper/k8s_sweeper.py
index 9486364c..2b233971 100644
--- a/pandaharvester/harvestersweeper/k8s_sweeper.py
+++ b/pandaharvester/harvestersweeper/k8s_sweeper.py
@@ -1,6 +1,7 @@
 from pandaharvester.harvestercore import core_utils
 from pandaharvester.harvestersweeper.base_sweeper import BaseSweeper
 from pandaharvester.harvestermisc.k8s_utils import k8s_Client
+from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
 
 # logger
 base_logger = core_utils.setup_logger('k8s_sweeper')
@@ -12,52 +13,15 @@ class K8sSweeper(BaseSweeper):
     def __init__(self, **kwarg):
         BaseSweeper.__init__(self, **kwarg)
-        self.k8s_client = k8s_Client(self.k8s_namespace, config_file=self.k8s_config_file)
-
-        self._all_pods_list = []
-
-    # # kill a worker
-    # def kill_worker(self, work_spec):
-    #     tmp_log = self.make_logger(base_logger, 'workerID={0}'.format(work_spec.workerID),
-    #                                method_name='kill_worker')
-    #
-    #     tmp_ret_val = (None, 'Nothing done')
-    #
-    #     batch_id = work_spec.batchID
-    #     try:
-    #         self.k8s_client.delete_job(batch_id)
-    #     except Exception as _e:
-    #         err_str = 'Failed to delete a JOB with id={0} ; {1}'.format(batch_id, _e)
-    #         tmp_log.error(err_str)
-    #         tmp_ret_val = (False, err_str)
-    #
-    #     self._all_pods_list = self.k8s_client.get_pods_info()
-    #     pods_list = self.k8s_client.filter_pods_info(self._all_pods_list, job_name=batch_id)
-    #     pods_name = [ pods_info['name'] for pods_info in pods_list ]
-    #     job_info = self.k8s_client.get_jobs_info(batch_id)
-    #
-    #     if not job_info:
-    #         ret_list = self.k8s_client.delete_pods(pods_name)
-    #         if all(item['errMsg'] == '' for item in ret_list):
-    #             tmp_log.info('Deleted a JOB & POD with id={0}'.format(batch_id))
-    #             tmp_ret_val = (True, '')
-    #         else:
-    #             err_str_list = list()
-    #             for item in ret_list:
-    #                 if item['errMsg']:
-    #                     err_str = 'Failed to delete a POD with id={0} ; {1}'.format(item['name'], item['errMsg'])
-    #                     tmp_log.error(err_str)
-    #                     err_str_list.append(err_str)
-    #             tmp_ret_val = (False, ','.join(err_str_list))
-    #
-    #     return tmp_ret_val
+        # retrieve the k8s namespace from CRIC
+        self.panda_queues_dict = PandaQueuesDict()
+        namespace = self.panda_queues_dict.get_k8s_namespace(self.queueName)
+        self.k8s_client = k8s_Client(namespace, queue_name=self.queueName, config_file=self.k8s_config_file)
 
     # kill workers
     def kill_workers(self, work_spec_list):
         tmp_log = self.make_logger(base_logger, method_name='kill_workers')
-        self._all_pods_list = self.k8s_client.get_pods_info(workspec_list=work_spec_list)
-
         ret_list = []
         for work_spec in work_spec_list:
             tmp_ret_val = (None, 'Nothing done')
@@ -87,27 +51,7 @@ def kill_workers(self, work_spec_list):
                 tmp_log.error(err_str)
                 tmp_ret_val = (False, err_str)
-            """
-            # retrieve the associated pods
-            pods_list = self.k8s_client.filter_pods_info(self._all_pods_list, job_name=batch_id)
-            pods_name = [pods_info['name'] for pods_info in pods_list]
-            job_info = self.k8s_client.get_jobs_info(workspec_list=[work_spec])
-            # retrieve the associated pods
-            if not job_info:
-                ret_list = self.k8s_client.delete_pods(pods_name)
-                if all(item['errMsg'] == '' for item in ret_list):
-                    tmp_log.info('Deleted a JOB & POD with id={0}'.format(batch_id))
-                    tmp_ret_val = (True, '')
-                else:
-                    err_str_list = list()
-                    for item in ret_list:
-                        if item['errMsg']:
-                            err_str = 'Failed to delete a POD with id={0} ; {1}'.format(item['name'], item['errMsg'])
-                            tmp_log.error(err_str)
-                            err_str_list.append(err_str)
-                    tmp_ret_val = (False, ','.join(err_str_list))
-            """
-            else:  # the worker cannot be cleaned
+            else:  # the worker does not need to be cleaned
                 tmp_ret_val = (True, '')
 
             ret_list.append(tmp_ret_val)
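The constructor above replaces the per-plugin k8s_namespace setting with a CRIC lookup, matching the submitter. A minimal standalone sketch of the same lookup; the queue name and kubeconfig path are hypothetical:

from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
from pandaharvester.harvestermisc.k8s_utils import k8s_Client

queue_name = 'CERN-EXTENSION_K8S'            # hypothetical queue name
panda_queues_dict = PandaQueuesDict()
namespace = panda_queues_dict.get_k8s_namespace(queue_name)
k8s_client = k8s_Client(namespace, queue_name=queue_name,
                        config_file='/opt/harvester/etc/kubeconfig')  # hypothetical path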
diff --git a/pandaharvester/harvesterworkermaker/simple_worker_maker.py b/pandaharvester/harvesterworkermaker/simple_worker_maker.py
index 4a1088d4..c82e2b09 100644
--- a/pandaharvester/harvesterworkermaker/simple_worker_maker.py
+++ b/pandaharvester/harvesterworkermaker/simple_worker_maker.py
@@ -70,6 +70,18 @@ def get_job_type(self, job_spec, job_type, queue_dict, tmp_prodsourcelabel=None)
 
         return job_type_final
 
+    def capability_to_rtype(self, capability):
+        if capability == 'score':
+            return 'SCORE'
+        elif capability == 'himem':
+            return 'SCORE_HIMEM'
+        elif capability == 'mcore':
+            return 'MCORE'
+        elif capability == 'mcorehimem':
+            return 'MCORE_HIMEM'
+        else:
+            return None
+
     # make a worker from jobs
     def make_worker(self, jobspec_list, queue_config, job_type, resource_type):
         tmpLog = self.make_logger(_logger, 'queue={0}:{1}:{2}'.format(queue_config.queueName, job_type, resource_type),
@@ -174,8 +186,14 @@ def make_worker(self, jobspec_list, queue_config, job_type, resource_type):
             tmpLog.debug('get_job_type decided for job_type: {0} (input job_type: {1}, queue_type: {2}, tmp_prodsourcelabel: {3})'
                          .format(workSpec.jobType, job_type, queue_dict.get('type', None), tmp_prodsourcelabel))
 
+            # retrieve queue resource type
+            capability = queue_dict.get('capability', '')
+            queue_rtype = self.capability_to_rtype(capability)
+
             if resource_type and resource_type != 'ANY':
                 workSpec.resourceType = resource_type
+            elif queue_rtype:
+                workSpec.resourceType = queue_rtype
             elif workSpec.nCore == 1:
                 workSpec.resourceType = 'SCORE'
             else:
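With this change the queue's CRIC capability field takes precedence over the nCore heuristic whenever no explicit resource type is requested. A standalone restatement of the decision order, with illustrative inputs; the mapping function mirrors capability_to_rtype above:

def capability_to_rtype(capability):
    return {'score': 'SCORE', 'himem': 'SCORE_HIMEM',
            'mcore': 'MCORE', 'mcorehimem': 'MCORE_HIMEM'}.get(capability)

queue_dict = {'capability': 'mcore'}   # illustrative CRIC payload
resource_type = 'ANY'                  # no explicit resource type requested
n_core = 8

queue_rtype = capability_to_rtype(queue_dict.get('capability', ''))
if resource_type and resource_type != 'ANY':   # an explicit request wins
    chosen = resource_type
elif queue_rtype:                              # then the queue capability
    chosen = queue_rtype
elif n_core == 1:                              # finally the core-count heuristic
    chosen = 'SCORE'
else:
    chosen = 'MCORE'
print(chosen)  # MCORE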
diff --git a/pandaharvester/panda_pkg_info.py b/pandaharvester/panda_pkg_info.py
index 60b33611..5b79ccf1 100644
--- a/pandaharvester/panda_pkg_info.py
+++ b/pandaharvester/panda_pkg_info.py
@@ -1 +1 @@
-release_version = "0.2.6"
+release_version = "0.2.8"
diff --git a/templates/panda_harvester.cfg.rpmnew.template b/templates/panda_harvester.cfg.rpmnew.template
index abe00f36..8b5ee369 100644
--- a/templates/panda_harvester.cfg.rpmnew.template
+++ b/templates/panda_harvester.cfg.rpmnew.template
@@ -180,6 +180,12 @@ queueList =
 resolverModule = pandaharvester.harvestermisc.info_utils
 resolverClass = PandaQueuesDict
 
+# configuration of the resolver in JSON
+#resolverConfig =
+#  {
+#    "refreshPeriod": 300
+#  }
+
 # enable auto-blacklisting of resolver which returns status='offline' to blacklist the queue
 autoBlacklist = False
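When resolverConfig is uncommented, the JSON object spans indented continuation lines, which a standard config parser returns as a single string. A sketch of reading such a value back with the standard library; the section name and file path are placeholders, not the actual harvester internals:

import json
try:
    import configparser                  # python 3
except ImportError:
    import ConfigParser as configparser  # python 2

parser = configparser.ConfigParser()
parser.read('/etc/panda/panda_harvester.cfg')           # hypothetical path
raw = parser.get('qconf', 'resolverConfig')             # 'qconf' is a placeholder section
resolver_config = json.loads(raw)
refresh_period = resolver_config.get('refreshPeriod')   # 300 in the template example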