From e9f0408316b5d12ef1484aa658e2d5e877293c97 Mon Sep 17 00:00:00 2001
From: mightqxc
Date: Sat, 17 Jul 2021 12:40:18 +0800
Subject: [PATCH 01/33] condor submitter: support memory in bytes

---
 pandaharvester/commit_timestamp.py                      | 2 +-
 pandaharvester/harvestersubmitter/htcondor_submitter.py | 4 ++++
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py
index b100971d..97263f50 100644
--- a/pandaharvester/commit_timestamp.py
+++ b/pandaharvester/commit_timestamp.py
@@ -1 +1 @@
-timestamp = "12-07-2021 07:38:50 on master (by tmaeno)"
+timestamp = "17-07-2021 04:40:18 on flin (by mightqxc)"
diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py
index 49c1917c..f53faa84 100644
--- a/pandaharvester/harvestersubmitter/htcondor_submitter.py
+++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py
@@ -345,7 +345,9 @@ def make_a_jdl(workspec, template, n_core_per_node, log_dir, panda_queue_name, e
             tmpLog.debug('job attributes override by AGIS special_par: {0}={1}'.format(attr, str(_match.group(1))))
     # derived job attributes
     n_node = _div_round_up(n_core_total, n_core_per_node)
+    request_ram_bytes = request_ram * 2**20
     request_ram_per_core = _div_round_up(request_ram * n_node, n_core_total)
+    request_ram_bytes_per_core = div_round_up(request_ram_bytes * n_node, n_core_total)
     request_cputime = request_walltime * n_core_total
     request_walltime_minute = _div_round_up(request_walltime, 60)
     request_cputime_minute = _div_round_up(request_cputime, 60)
@@ -369,7 +371,9 @@ def make_a_jdl(workspec, template, n_core_per_node, log_dir, panda_queue_name, e
         'nCoreTotal': n_core_total,
         'nNode': n_node,
         'requestRam': request_ram,
+        'requestRamBytes': request_ram_bytes,
         'requestRamPerCore': request_ram_per_core,
+        'requestRamBytesPerCore': request_ram_bytes_per_core,
         'requestDisk': request_disk,
         'requestWalltime': request_walltime,
         'requestWalltimeMinute': request_walltime_minute,

From d39cbaba457f9ff1c683cc333e974f1e29ef2002 Mon Sep 17 00:00:00 2001
From: mightqxc
Date: Sat, 17 Jul 2021 13:31:09 +0800
Subject: [PATCH 02/33] fix

---
 pandaharvester/commit_timestamp.py                      | 2 +-
 pandaharvester/harvestersubmitter/htcondor_submitter.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py
index 97263f50..df5099e9 100644
--- a/pandaharvester/commit_timestamp.py
+++ b/pandaharvester/commit_timestamp.py
@@ -1 +1 @@
-timestamp = "17-07-2021 04:40:18 on flin (by mightqxc)"
+timestamp = "17-07-2021 05:31:09 on flin (by mightqxc)"
diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py
index f53faa84..7affa659 100644
--- a/pandaharvester/harvestersubmitter/htcondor_submitter.py
+++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py
@@ -347,7 +347,7 @@ def make_a_jdl(workspec, template, n_core_per_node, log_dir, panda_queue_name, e
     n_node = _div_round_up(n_core_total, n_core_per_node)
     request_ram_bytes = request_ram * 2**20
     request_ram_per_core = _div_round_up(request_ram * n_node, n_core_total)
-    request_ram_bytes_per_core = div_round_up(request_ram_bytes * n_node, n_core_total)
+    request_ram_bytes_per_core = _div_round_up(request_ram_bytes * n_node, n_core_total)
     request_cputime = request_walltime * n_core_total
     request_walltime_minute = _div_round_up(request_walltime, 60)
     request_cputime_minute = _div_round_up(request_cputime, 60)
From 6ae80e6876830d84f172d037675af8dafe9ad19a Mon Sep 17 00:00:00 2001
From: mightqxc
Date: Mon, 13 Sep 2021 21:26:37 +0800
Subject: [PATCH 03/33] token_utils

---
 pandaharvester/commit_timestamp.py          |  2 +-
 pandaharvester/harvestermisc/token_utils.py | 21 +++++++++++++++++++++
 2 files changed, 22 insertions(+), 1 deletion(-)
 create mode 100644 pandaharvester/harvestermisc/token_utils.py

diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py
index df5099e9..6406282e 100644
--- a/pandaharvester/commit_timestamp.py
+++ b/pandaharvester/commit_timestamp.py
@@ -1 +1 @@
-timestamp = "17-07-2021 05:31:09 on flin (by mightqxc)"
+timestamp = "13-09-2021 13:26:37 on flin (by mightqxc)"
diff --git a/pandaharvester/harvestermisc/token_utils.py b/pandaharvester/harvestermisc/token_utils.py
new file mode 100644
index 00000000..2c886672
--- /dev/null
+++ b/pandaharvester/harvestermisc/token_utils.py
@@ -0,0 +1,21 @@
+import hashlib
+
+import six
+
+
+def _md5sum(data):
+    """
+    get md5sum hexadecimal string of data
+    """
+    hash = hashlib.md5()
+    hash.update(six.b(data))
+    hash_hex = hash.hexdigest()
+    return hash_hex
+
+def endpoint_to_filename(endpoint):
+    """
+    get token file name according to service (CE, storage, etc.) endpoint
+    currently directly take its md5sum hash as file name
+    """
+    filename = _md5sum(endpoint)
+    return filename
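
A minimal usage sketch of the helper added above (the endpoint string is a made-up example):

    from pandaharvester.harvestermisc.token_utils import endpoint_to_filename

    # deterministic mapping: the same endpoint always yields the same file name,
    # so different harvester components can derive it independently
    filename = endpoint_to_filename('ce01.example.org:9619')  # 32-character md5 hex string
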
From 68c426aa467b5297c80987c7f383319dde8c80ca Mon Sep 17 00:00:00 2001
From: mightqxc
Date: Thu, 16 Sep 2021 17:36:21 +0800
Subject: [PATCH 04/33] htcondor submitter: token support

---
 pandaharvester/commit_timestamp.py            |  2 +-
 .../harvestersubmitter/htcondor_submitter.py  | 45 +++++++++++++++----
 2 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py
index 6406282e..6cb002bb 100644
--- a/pandaharvester/commit_timestamp.py
+++ b/pandaharvester/commit_timestamp.py
@@ -1 +1 @@
-timestamp = "13-09-2021 13:26:37 on flin (by mightqxc)"
+timestamp = "16-09-2021 09:36:21 on flin (by mightqxc)"
diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py
index 7affa659..3945a2eb 100644
--- a/pandaharvester/harvestersubmitter/htcondor_submitter.py
+++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py
@@ -15,6 +15,7 @@
 from pandaharvester.harvestercore import core_utils
 from pandaharvester.harvestercore.plugin_base import PluginBase
 from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
+from pandaharvester.harvestermisc.token_utils import endpoint_to_filename
 from pandaharvester.harvestermisc.htcondor_utils import get_job_id_tuple_from_batchid
 from pandaharvester.harvestermisc.htcondor_utils import CondorJobSubmit
 from pandaharvester.harvestersubmitter import submitter_common
@@ -315,7 +316,7 @@ def submit_bag_of_workers(data_list):
 def make_a_jdl(workspec, template, n_core_per_node, log_dir, panda_queue_name, executable_file,
                x509_user_proxy, log_subdir=None, ce_info_dict=dict(), batch_log_dict=dict(),
                pilot_url=None, special_par='', harvester_queue_config=None, is_unified_queue=False,
-               pilot_version='unknown', python_version='unknown', **kwarg):
+               pilot_version='unknown', python_version='unknown', token_dir=None, **kwarg):
     # make logger
     tmpLog = core_utils.make_logger(baseLogger, 'workerID={0}'.format(workspec.workerID),
                                     method_name='make_a_jdl')
@@ -361,6 +362,13 @@ def make_a_jdl(workspec, template, n_core_per_node, log_dir, panda_queue_name, e
         prod_source_label = pilot_opt_dict['prod_source_label']
         pilot_type_opt = pilot_opt_dict['pilot_type_opt']
         pilot_url_str = pilot_opt_dict['pilot_url_str']
+    # get token filename according to CE
+    token_filename = None
+    if token_dir is not None and ce_info_dict.get('ce_endpoint'):
+        token_filename = endpoint_to_filename(ce_info_dict['ce_endpoint'])
+    token_path = None
+    if token_dir is not None and token_filename is not None:
+        token_path = os.path.join(token_dir, token_filename)
     # open tmpfile as submit description file
     tmpFile = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='_submit.sdf', dir=workspec.get_access_point())
     # placeholder map
@@ -406,6 +414,9 @@ def make_a_jdl(workspec, template, n_core_per_node, log_dir, panda_queue_name, e
         'submissionHost': workspec.submissionHost,
         'submissionHostShort': workspec.submissionHost.split('.')[0],
         'ceARCGridType': ce_info_dict.get('ce_arc_grid_type', 'nordugrid'),
+        'tokenDir': token_dir,
+        'tokenFilename': token_filename,
+        'tokenPath': token_path,
         }
     # fill in template string
     jdl_str = template.format(**placeholder_map)
@@ -470,6 +481,16 @@ def __init__(self, **kwarg):
             self.x509UserProxyAnalysis
         except AttributeError:
            self.x509UserProxyAnalysis = os.getenv('X509_USER_PROXY_ANAL')
+        # Default token directory for a queue
+        try:
+            self.tokenDir
+        except AttributeError:
+            self.tokenDir = None
+        # token directory for analysis jobs in grandly unified queues
+        try:
+            self.tokenDirAnalysis
+        except AttributeError:
+            self.tokenDirAnalysis = None
         # ATLAS AGIS
         try:
             self.useAtlasAGIS = bool(self.useAtlasAGIS)
@@ -672,18 +693,25 @@ def _handle_one_worker(workspec, to_submit=to_submit_any):
             # make logger
             tmpLog = core_utils.make_logger(baseLogger, 'site={0} workerID={1}'.format(self.queueName, workspec.workerID),
                                             method_name='_handle_one_worker')
-            def _choose_proxy(workspec):
+            def _choose_credential(workspec):
                 """
-                Choose the proxy based on the job type
+                Choose the credential based on the job type
                 """
                 job_type = workspec.jobType
                 proxy = self.x509UserProxy
+                token_dir = self.tokenDir
-                if is_grandly_unified_queue and job_type in ('user', 'panda', 'analysis') and self.x509UserProxyAnalysis:
-                    tmpLog.debug('Taking analysis proxy')
-                    proxy = self.x509UserProxyAnalysis
+                if is_grandly_unified_queue and job_type in ('user', 'panda', 'analysis'):
+                    if self.x509UserProxyAnalysis:
+                        tmpLog.debug('Taking analysis proxy')
+                        proxy = self.x509UserProxyAnalysis
+                    if self.tokenDirAnalysis:
+                        tmpLog.debug('Taking analysis token_dir')
+                        token_dir = self.tokenDirAnalysis
                 else:
                     tmpLog.debug('Taking default proxy')
-                return proxy
+                    if self.tokenDir:
+                        tmpLog.debug('Taking default token_dir')
+                return proxy, token_dir
             # initialize
             ce_info_dict = dict()
             batch_log_dict = dict()
@@ -827,7 +855,7 @@ def _choose_proxy(workspec):
                 tmpLog.debug('Done jobspec attribute setting')
             # choose the x509 certificate based on the type of job (analysis or production)
-            proxy = _choose_proxy(workspec)
+            proxy, token_dir = _choose_credential(workspec)
             # set data dict
             data.update({
@@ -851,6 +879,7 @@ def _choose_proxy(workspec):
                 'pilot_url': pilot_url,
                 'pilot_version': pilot_version,
                 'python_version': python_version,
+                'token_dir': token_dir,
                 })
             return data
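
The tokenDir/tokenFilename/tokenPath placeholders above are substituted into the submit description file through placeholder_map. A purely hypothetical SDF template fragment showing how a site might consume them (real templates are site-defined; the choice of submit attributes here is illustrative only):

    # ship the per-CE token with the job and point the payload at it
    transfer_input_files = {tokenPath}
    environment = "BEARER_TOKEN_FILE={tokenDir}/{tokenFilename}"
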
From 26fb984a4b7707143064b189dd82b6d723f02932 Mon Sep 17 00:00:00 2001
From: mightqxc
Date: Fri, 17 Sep 2021 15:02:31 +0800
Subject: [PATCH 05/33] 0.2.9rc0

---
 pandaharvester/commit_timestamp.py | 2 +-
 pandaharvester/panda_pkg_info.py   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py
index 6cb002bb..b86e902d 100644
--- a/pandaharvester/commit_timestamp.py
+++ b/pandaharvester/commit_timestamp.py
@@ -1 +1 @@
-timestamp = "16-09-2021 09:36:21 on flin (by mightqxc)"
+timestamp = "17-09-2021 07:02:31 on flin (by mightqxc)"
diff --git a/pandaharvester/panda_pkg_info.py b/pandaharvester/panda_pkg_info.py
index 9d833022..6fb0f8b1 100644
--- a/pandaharvester/panda_pkg_info.py
+++ b/pandaharvester/panda_pkg_info.py
@@ -1 +1 @@
-release_version = "0.2.8rc0"
+release_version = "0.2.9rc0"

From 4366ee74ee35dbd915a0b95bb4e814c4aa2f4656 Mon Sep 17 00:00:00 2001
From: mightqxc
Date: Wed, 29 Sep 2021 17:10:15 +0800
Subject: [PATCH 06/33] pilot3 on rc_test2

---
 pandaharvester/commit_timestamp.py                    |  2 +-
 .../harvestersubmitter/htcondor_submitter.py          |  2 +-
 pandaharvester/harvestersubmitter/submitter_common.py | 10 +++++++---
 3 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py
index b86e902d..d7387183 100644
--- a/pandaharvester/commit_timestamp.py
+++ b/pandaharvester/commit_timestamp.py
@@ -1 +1 @@
-timestamp = "17-09-2021 07:02:31 on flin (by mightqxc)"
+timestamp = "29-09-2021 09:10:15 on pilot3 (by mightqxc)"
diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py
index 3945a2eb..66ed0c8e 100644
--- a/pandaharvester/harvestersubmitter/htcondor_submitter.py
+++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py
@@ -353,7 +353,7 @@ def make_a_jdl(workspec, template, n_core_per_node, log_dir, panda_queue_name, e
     request_walltime_minute = _div_round_up(request_walltime, 60)
     request_cputime_minute = _div_round_up(request_cputime, 60)
     # decide prodSourceLabel
-    pilot_opt_dict = submitter_common.get_complicated_pilot_options(workspec.pilotType, pilot_url=pilot_url)
+    pilot_opt_dict = submitter_common.get_complicated_pilot_options(workspec.pilotType, pilot_url, pilot_version)
     if pilot_opt_dict is None:
         prod_source_label = harvester_queue_config.get_source_label(workspec.jobType)
         pilot_type_opt = workspec.pilotType
diff --git a/pandaharvester/harvestersubmitter/submitter_common.py b/pandaharvester/harvestersubmitter/submitter_common.py
index fd0da984..90826562 100644
--- a/pandaharvester/harvestersubmitter/submitter_common.py
+++ b/pandaharvester/harvestersubmitter/submitter_common.py
@@ -1,11 +1,16 @@
 # Map "pilotType" (defined in harvester) to prodSourceLabel and pilotType option (defined in pilot, -i option)
 # and piloturl (pilot option --piloturl) for pilot 2
-def get_complicated_pilot_options(pilot_type, pilot_url=None):
+def get_complicated_pilot_options(pilot_type, pilot_url=None, pilot_version=None):
+    # for pilot 3
+    is_pilot3 = True if pilot_version.startswith('3') else False
+    # map
+    # 210929 currently only RC may run pilot 3
     pt_psl_map = {
             'RC': {
                 'prod_source_label': 'rc_test2',
                 'pilot_type_opt': 'RC',
-                'pilot_url_str': '--piloturl http://cern.ch/atlas-panda-pilot/pilot2-dev.tar.gz',
+                'pilot_url_str': 'http://cern.ch/atlas-panda-pilot/pilot3-dev.tar.gz' if is_pilot3 \
+                                 else '--piloturl http://cern.ch/atlas-panda-pilot/pilot2-dev.tar.gz',
                 },
             'ALRB': {
                 'prod_source_label': 'rc_alrb',
@@ -23,7 +28,6 @@ def get_complicated_pilot_options(pilot_type, pilot_url=None, pilot_version=None
         pilot_opt_dict['pilot_url_str'] = '--piloturl {0}'.format(pilot_url)
     return pilot_opt_dict
-
 # get special flag of pilot wrapper about python version of pilot, and whether to run with python 3 if python version is "3"
 def get_python_version_option(python_version, prod_source_label):
     option = ''

From 439007981fc377fcf56d2e59304bd3fbad4668ac Mon Sep 17 00:00:00 2001
From: mightqxc
Date: Wed, 29 Sep 2021 17:38:45 +0800
Subject: [PATCH 07/33] fix

---
 pandaharvester/commit_timestamp.py                    | 2 +-
 pandaharvester/harvestersubmitter/submitter_common.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py
index d7387183..606c264f 100644
--- a/pandaharvester/commit_timestamp.py
+++ b/pandaharvester/commit_timestamp.py
@@ -1 +1 @@
-timestamp = "29-09-2021 09:10:15 on pilot3 (by mightqxc)"
+timestamp = "29-09-2021 09:38:45 on pilot3 (by mightqxc)"
diff --git a/pandaharvester/harvestersubmitter/submitter_common.py b/pandaharvester/harvestersubmitter/submitter_common.py
index 90826562..3cad835c 100644
--- a/pandaharvester/harvestersubmitter/submitter_common.py
+++ b/pandaharvester/harvestersubmitter/submitter_common.py
@@ -9,7 +9,7 @@ def get_complicated_pilot_options(pilot_type, pilot_url=None, pilot_version=None
             'RC': {
                 'prod_source_label': 'rc_test2',
                 'pilot_type_opt': 'RC',
-                'pilot_url_str': 'http://cern.ch/atlas-panda-pilot/pilot3-dev.tar.gz' if is_pilot3 \
+                'pilot_url_str': '--piloturl http://cern.ch/atlas-panda-pilot/pilot3-dev.tar.gz' if is_pilot3 \
                                  else '--piloturl http://cern.ch/atlas-panda-pilot/pilot2-dev.tar.gz',
                 },
             'ALRB': {

From 8f17424ec783bbb5b537e2002bba2593f7c592cb Mon Sep 17 00:00:00 2001
From: mightqxc
Date: Wed, 29 Sep 2021 23:13:40 +0800
Subject: [PATCH 08/33] iam_token_cred_manager

---
 pandaharvester/commit_timestamp.py            |   2 +-
 .../iam_token_cred_manager.py                 | 131 ++++++++++++++++++
 pandaharvester/harvestermisc/token_utils.py   |  44 ++++++
 3 files changed, 176 insertions(+), 1 deletion(-)
 create mode 100644 pandaharvester/harvestercredmanager/iam_token_cred_manager.py

diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py
index 606c264f..f90f0a90 100644
--- a/pandaharvester/commit_timestamp.py
+++ b/pandaharvester/commit_timestamp.py
@@ -1 +1 @@
-timestamp = "29-09-2021 09:38:45 on pilot3 (by mightqxc)"
+timestamp = "29-09-2021 15:13:40 on flin (by mightqxc)"
diff --git a/pandaharvester/harvestercredmanager/iam_token_cred_manager.py b/pandaharvester/harvestercredmanager/iam_token_cred_manager.py
new file mode 100644
index 00000000..d751563c
--- /dev/null
+++ b/pandaharvester/harvestercredmanager/iam_token_cred_manager.py
@@ -0,0 +1,131 @@
+import os
+import json
+import traceback
+
+from .base_cred_manager import BaseCredManager
+from pandaharvester.harvestercore import core_utils
+from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
+from pandaharvester.harvestermisc.token_utils import endpoint_to_filename, WLCG_scopes, IssuerBroker
+
+# logger
+_logger = core_utils.setup_logger('iam_token_cred_manager')
+
+# allowed target types
+ALL_TARGET_TYPES = ['ce']
+
+# credential manager with IAM token
+class IamTokenCredManager(BaseCredManager):
+    # constructor
+    def __init__(self, **kwarg):
+        BaseCredManager.__init__(self, **kwarg)
+        # make logger
+        tmp_log = self.make_logger(_logger, method_name='__init__')
+        # attributes
+        if hasattr(self, 'inFile'):
+            # parse inFile setup configuration
+            try:
+                with open(self.inFile) as f:
+                    self.setupMap = json.load(f)
+            except Exception as e:
+                tmp_log.error('Error with inFile. {0}: {1}'.format(e.__class__.__name__, e))
+                self.setupMap = {}
+                raise
+        else:
+            # set up with direct attributes
+            self.setupMap = dict(vars(self))
+        # validate setupMap
+        try:
+            self.client_cred_file = self.setupMap['client_cred_file']
+            with open(self.inFile) as f:
+                client_cred_dict = json.load(f)
+                self.issuer = client_cred_dict['issuer']
+                self.client_id = client_cred_dict['client_id']
+                self.client_secret = client_cred_dict['client_secret']
+            self.target_type = self.setupMap['target_type']
+            self.out_dir = self.setupMap['out_dir']
+            self.lifetime = self.setupMap('lifetime')
+        except KeyError as e:
+            tmp_log.error('Missing attributes in setup. {0}'.format(traceback.format_exc()))
+            raise
+        else:
+            if self.target_type not in ALL_TARGET_TYPES:
+                tmp_log.error('Unsupported target_type: {0}'.format(self.target_type))
+                raise Exception('Unsupported target_type')
+        # initialize
+        self.targets_dict = dict()
+        # handle targets
+        self._handle_target_types()
+        # issuer broker
+        self.issuer_broker = IssuerBroker(self.issuer, self.client_id, self.client_secret,
+                                          name=self.setup_name)
+
+    def _handle_target_types(self):
+        try:
+            self.panda_queues_dict = PandaQueuesDict()
+        except Exception as e:
+            tmp_log.error('Problem calling PandaQueuesDict. {0}'.format(traceback.format_exc()))
+            raise
+        if self.target_type == 'ce':
+            try:
+                # retrieve CEs from CRIC
+                for site, val in self.panda_queues_dict:
+                    if val.get('status') == 'offline':
+                        # do not generate token for offline PQs, but for online, brokeroff, pause, ...
+                        continue
+                    ce_q_list = val.get('queues')
+                    if queues_list:
+                        # loop over all ce queues
+                        for ce_q in ce_q_list:
+                            ce_status = ce_q.get('ce_status')
+                            if not ce_status or ce_status == 'DISABLED':
+                                # skip disabled ce queues
+                                continue
+                            ce_endpoint = ce_q.get('ce_endpoint')
+                            ce_flavour = ce_q.get('ce_flavour')
+                            if ce_endpoint and ce_flavour:
+                                target_attr_dict = {
+                                        'ce_flavour': ce_flavour,
+                                    }
+                                self.targets_dict[ce_endpoint] = target_attr_dict
+                    else:
+                        # do not generate token if no queues of CE
+                        continue
+            except Exception as e:
+                tmp_log.error('Problem retrieving CEs. {0}'.format(traceback.format_exc()))
+                raise
+            # scope for CE
+            self.scope = WLCG_scopes.COMPUTE_ALL
+
+    # check proxy
+    def check_credential(self):
+        # make logger
+        # same update period as credmanager agent
+        return False
+
+    # renew proxy
+    def renew_credential(self):
+        # make logger
+        tmp_log = self.make_logger(_logger, 'config={0}'.format(self.setup_name), method_name='renew_credential')
+        # go
+        all_ok = True
+        all_err_str = ''
+        for target in self.targets_dict:
+            try:
+                # get access token of target
+                access_token = self.issuer_broker.get_access_token(self, aud=target, scope=self.scope)
+                # write to file
+                token_filename = endpoint_to_filename(target)
+                token_path = os.path.join(self.out_dir, token_filename)
+                with open(token_path, 'w') as f:
+                    f.write(access_token)
+            except Exception as e:
+                err_str = 'Problem getting token for {0}. {1}'.format(target, traceback.format_exc())
+                tmp_log.error(err_str)
+                all_ok = False
+                all_err_str = 'failed to get some tokens. Check the plugin log for details '
+                continue
+            else:
+                tmp_log.debug('got token for {0} at {1}'.format(target, token_path))
+        tmp_log.debug('done')
+        # return
+        return all_ok, all_err_str
diff --git a/pandaharvester/harvestermisc/token_utils.py b/pandaharvester/harvestermisc/token_utils.py
index 2c886672..6c050f63 100644
--- a/pandaharvester/harvestermisc/token_utils.py
+++ b/pandaharvester/harvestermisc/token_utils.py
@@ -1,6 +1,10 @@
+import copy
 import hashlib
+import json
+import time
 
 import six
+import requests
 
 
 def _md5sum(data):
@@ -19,3 +23,43 @@ def endpoint_to_filename(endpoint):
     """
     filename = _md5sum(endpoint)
     return filename
+
+
+class WLCG_scopes(object):
+    COMPUTE_ALL = 'compute.read compute.modify compute.create compute.cancel'
+
+
+class IssuerBroker(object):
+    """
+    Talk to token issuer with client credentials flow
+    """
+
+    def __init__(self, issuer, client_id, client_secret, name='unknown'):
+        self.issuer = issuer
+        self.client_id = client_id
+        self.client_secret = client_secret
+        self.name = name
+        self.timeout = 3
+        # derived attributes
+        self.token_request_url = '{0}/token'.format(self.issuer)
+        self._base_post_data = {
+                'grant_type': 'client_credentials',
+                'client_id': self.client_id,
+                'client_secret': self.client_secret,
+            }
+
+    def _post(self, **kwarg):
+        data_dict = copy.deepcopy(self._base_post_data)
+        data_dict.update(kwarg)
+        data = copy.deepcopy(data_dict)
+        resp = requests.post(self.issuer, data=data, timeout=self.timeout)
+        return resp
+
+    def get_access_token(self, aud=None, scope=None):
+        resp = self._post(audience=aud, scope=scope)
+        if resp.status_code == requests.codes.ok:
+            resp_dict = json.loads(resp.text)
+            token = resp_dict['access_token']
+            return token
+        else:
+            resp.raise_for_status()

From 7277a94a825f88894e435d9453305c21a4dd36ad Mon Sep 17 00:00:00 2001
From: mightqxc
Date: Thu, 30 Sep 2021 23:11:31 +0800
Subject: [PATCH 09/33] fixes

---
 pandaharvester/commit_timestamp.py                 |  2 +-
 .../harvestercredmanager/iam_token_cred_manager.py | 12 +++++++-----
 pandaharvester/harvestermisc/token_utils.py        |  7 +++++--
 3 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py
index f90f0a90..13face89 100644
--- a/pandaharvester/commit_timestamp.py
+++ b/pandaharvester/commit_timestamp.py
@@ -1 +1 @@
-timestamp = "29-09-2021 15:13:40 on flin (by mightqxc)"
+timestamp = "30-09-2021 15:11:31 on flin (by mightqxc)"
diff --git a/pandaharvester/harvestercredmanager/iam_token_cred_manager.py b/pandaharvester/harvestercredmanager/iam_token_cred_manager.py
index d751563c..e442a6e5 100644
--- a/pandaharvester/harvestercredmanager/iam_token_cred_manager.py
+++ b/pandaharvester/harvestercredmanager/iam_token_cred_manager.py
@@ -36,14 +36,14 @@ def __init__(self, **kwarg):
         # validate setupMap
         try:
             self.client_cred_file = self.setupMap['client_cred_file']
-            with open(self.inFile) as f:
+            with open(self.client_cred_file) as f:
                 client_cred_dict = json.load(f)
                 self.issuer = client_cred_dict['issuer']
                 self.client_id = client_cred_dict['client_id']
                 self.client_secret = client_cred_dict['client_secret']
             self.target_type = self.setupMap['target_type']
             self.out_dir = self.setupMap['out_dir']
-            self.lifetime = self.setupMap('lifetime')
+            self.lifetime = self.setupMap.get('lifetime')
         except KeyError as e:
             tmp_log.error('Missing attributes in setup. {0}'.format(traceback.format_exc()))
             raise
@@ -60,6 +60,8 @@ def __init__(self, **kwarg):
                                           name=self.setup_name)
 
     def _handle_target_types(self):
+        # make logger
+        tmp_log = self.make_logger(_logger, method_name='_handle_target_types')
         try:
             self.panda_queues_dict = PandaQueuesDict()
         except Exception as e:
             tmp_log.error('Problem calling PandaQueuesDict. {0}'.format(traceback.format_exc()))
             raise
@@ -68,12 +70,12 @@ def _handle_target_types(self):
         if self.target_type == 'ce':
             try:
                 # retrieve CEs from CRIC
-                for site, val in self.panda_queues_dict:
+                for site, val in self.panda_queues_dict.items():
                     if val.get('status') == 'offline':
                         # do not generate token for offline PQs, but for online, brokeroff, pause, ...
                         continue
                     ce_q_list = val.get('queues')
-                    if queues_list:
+                    if ce_q_list:
                         # loop over all ce queues
                         for ce_q in ce_q_list:
                             ce_status = ce_q.get('ce_status')
@@ -112,7 +114,7 @@ def renew_credential(self):
         for target in self.targets_dict:
             try:
                 # get access token of target
-                access_token = self.issuer_broker.get_access_token(self, aud=target, scope=self.scope)
+                access_token = self.issuer_broker.get_access_token(aud=target, scope=self.scope)
                 # write to file
                 token_filename = endpoint_to_filename(target)
                 token_path = os.path.join(self.out_dir, token_filename)
diff --git a/pandaharvester/harvestermisc/token_utils.py b/pandaharvester/harvestermisc/token_utils.py
index 6c050f63..7ac11ff3 100644
--- a/pandaharvester/harvestermisc/token_utils.py
+++ b/pandaharvester/harvestermisc/token_utils.py
@@ -52,13 +52,16 @@ def _post(self, **kwarg):
         data_dict = copy.deepcopy(self._base_post_data)
         data_dict.update(kwarg)
         data = copy.deepcopy(data_dict)
-        resp = requests.post(self.issuer, data=data, timeout=self.timeout)
+        resp = requests.post(self.token_request_url, data=data, timeout=self.timeout)
         return resp
 
     def get_access_token(self, aud=None, scope=None):
         resp = self._post(audience=aud, scope=scope)
         if resp.status_code == requests.codes.ok:
-            resp_dict = json.loads(resp.text)
+            try:
+                resp_dict = json.loads(resp.text)
+            except Exception as e:
+                raise
             token = resp_dict['access_token']
             return token
         else:
             resp.raise_for_status()
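
For context, the client_cred_file consumed above is a small JSON file describing the client registered at the IAM issuer, and IssuerBroker can also be driven directly. A sketch with placeholder values:

    {
        "issuer": "https://iam.example.org",
        "client_id": "abc123",
        "client_secret": "s3cr3t"
    }

    from pandaharvester.harvestermisc.token_utils import IssuerBroker, WLCG_scopes

    broker = IssuerBroker('https://iam.example.org', 'abc123', 's3cr3t', name='demo')
    # one audience-restricted access token per CE endpoint (this performs a real HTTP request)
    token = broker.get_access_token(aud='ce01.example.org:9619', scope=WLCG_scopes.COMPUTE_ALL)
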
From fde95ed992f01f0eb98ebaa070c57829f02e738e Mon Sep 17 00:00:00 2001
From: mightqxc
Date: Thu, 30 Sep 2021 23:16:39 +0800
Subject: [PATCH 10/33] fix not to use pilot 3 for production workers

---
 pandaharvester/commit_timestamp.py                      | 2 +-
 pandaharvester/harvestersubmitter/htcondor_submitter.py | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py
index 606c264f..9f12d2b5 100644
--- a/pandaharvester/commit_timestamp.py
+++ b/pandaharvester/commit_timestamp.py
@@ -1 +1 @@
-timestamp = "29-09-2021 09:38:45 on pilot3 (by mightqxc)"
+timestamp = "30-09-2021 15:16:40 on pilot3 (by mightqxc)"
diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py
index 66ed0c8e..c4af4309 100644
--- a/pandaharvester/harvestersubmitter/htcondor_submitter.py
+++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py
@@ -626,6 +626,8 @@ def submit_workers(self, workspec_list):
         is_unified_queue = this_panda_queue_dict.get('capability', '') == 'ucore'
         pilot_url = associated_params_dict.get('pilot_url')
         pilot_version = str(this_panda_queue_dict.get('pilot_version', 'current'))
+        # intentionally omit pilot_version=3 during test phase (already controlled by piloturl)
+        pilot_version = 'current' if pilot_version.startswith('3') else pilot_version
         python_version = str(this_panda_queue_dict.get('python_version', '2'))
         sdf_suffix_str = '_pilot2'

From 03ff028836ce2db0b43ce0bbc326a6046b7fbfb8 Mon Sep 17 00:00:00 2001
From: mightqxc
Date: Fri, 1 Oct 2021 15:35:49 +0800
Subject: [PATCH 11/33] fix not to use pilot 3 for production workers

---
 pandaharvester/commit_timestamp.py                      | 2 +-
 pandaharvester/harvestersubmitter/htcondor_submitter.py | 4 +---
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py
index 9f12d2b5..84896ac9 100644
--- a/pandaharvester/commit_timestamp.py
+++ b/pandaharvester/commit_timestamp.py
@@ -1 +1 @@
-timestamp = "30-09-2021 15:16:40 on pilot3 (by mightqxc)"
+timestamp = "01-10-2021 07:35:49 on pilot3 (by mightqxc)"
diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py
index c4af4309..662b8262 100644
--- a/pandaharvester/harvestersubmitter/htcondor_submitter.py
+++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py
@@ -409,7 +409,7 @@ def make_a_jdl(workspec, template, n_core_per_node, log_dir, panda_queue_name, e
         'ioIntensity': io_intensity,
         'pilotType': pilot_type_opt,
         'pilotUrlOption': pilot_url_str,
-        'pilotVersion': pilot_version,
+        'pilotVersion': 'current' if pilot_version.startswith('3') else pilot_version,
         'pilotPythonOption': submitter_common.get_python_version_option(python_version, prod_source_label),
         'submissionHost': workspec.submissionHost,
         'submissionHostShort': workspec.submissionHost.split('.')[0],
@@ -626,8 +626,6 @@ def submit_workers(self, workspec_list):
         is_unified_queue = this_panda_queue_dict.get('capability', '') == 'ucore'
         pilot_url = associated_params_dict.get('pilot_url')
         pilot_version = str(this_panda_queue_dict.get('pilot_version', 'current'))
-        # intentionally omit pilot_version=3 during test phase (already controlled by piloturl)
-        pilot_version = 'current' if pilot_version.startswith('3') else pilot_version
         python_version = str(this_panda_queue_dict.get('python_version', '2'))
         sdf_suffix_str = '_pilot2'

From 9189bc15161c00ef24ace76f52136fdab88df053 Mon Sep 17 00:00:00 2001
From: mightqxc
Date: Fri, 8 Oct 2021 06:45:16 +0800
Subject: [PATCH 12/33] iam_token_cred_manager: support CE list in local file

---
 pandaharvester/commit_timestamp.py                 |  2 +-
 .../iam_token_cred_manager.py                      | 17 ++++++++++++++++-
 2 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py
index b9fb1972..5186bd9b 100644
--- a/pandaharvester/commit_timestamp.py
+++ b/pandaharvester/commit_timestamp.py
@@ -1 +1 @@
-timestamp = "07-10-2021 21:27:58 on flin (by mightqxc)"
+timestamp = "07-10-2021 22:45:16 on flin (by mightqxc)"
diff --git a/pandaharvester/harvestercredmanager/iam_token_cred_manager.py b/pandaharvester/harvestercredmanager/iam_token_cred_manager.py
index e442a6e5..00689031 100644
--- a/pandaharvester/harvestercredmanager/iam_token_cred_manager.py
+++ b/pandaharvester/harvestercredmanager/iam_token_cred_manager.py
@@ -44,6 +44,7 @@ def __init__(self, **kwarg):
             self.target_type = self.setupMap['target_type']
             self.out_dir = self.setupMap['out_dir']
             self.lifetime = self.setupMap.get('lifetime')
+            self.target_list_file = self.setupMap.get('target_list_file')
         except KeyError as e:
             tmp_log.error('Missing attributes in setup. {0}'.format(traceback.format_exc()))
             raise
@@ -93,8 +94,22 @@ def _handle_target_types(self):
                         # do not generate token if no queues of CE
                         continue
             except Exception as e:
-                tmp_log.error('Problem retrieving CEs. {0}'.format(traceback.format_exc()))
+                tmp_log.error('Problem retrieving CEs from CRIC. {0}'.format(traceback.format_exc()))
                 raise
+            # retrieve CEs from local file
+            if self.target_list_file:
+                try:
+                    with open(self.target_list_file) as f:
+                        for target_str in f.readlines():
+                            if target_str:
+                                target = target_str.rstrip()
+                                target_attr_dict = {
+                                        'ce_flavour': None,
+                                    }
+                                self.targets_dict[target] = target_attr_dict
+                except Exception as e:
+                    tmp_log.error('Problem retrieving CEs from local file. {0}'.format(traceback.format_exc()))
+                    raise
             # scope for CE
             self.scope = WLCG_scopes.COMPUTE_ALL
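
The target_list_file read above is a plain text file with one CE endpoint per line, e.g. (hypothetical hosts):

    ce01.example.org:9619
    ce02.example.org:2811

Endpoints picked up this way are registered with ce_flavour unset, in addition to the CEs discovered from CRIC.
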
From c28650c5e390fe8df4429931c1caaa05efa9503f Mon Sep 17 00:00:00 2001
From: mightqxc
Date: Fri, 8 Oct 2021 21:02:30 +0800
Subject: [PATCH 13/33] htcondor_submitter: support analysis credential for
 non-GU queues

---
 pandaharvester/commit_timestamp.py                      | 2 +-
 pandaharvester/harvestersubmitter/htcondor_submitter.py | 7 ++++++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py
index 5186bd9b..dbe544fa 100644
--- a/pandaharvester/commit_timestamp.py
+++ b/pandaharvester/commit_timestamp.py
@@ -1 +1 @@
-timestamp = "07-10-2021 22:45:16 on flin (by mightqxc)"
+timestamp = "08-10-2021 13:02:30 on flin (by mightqxc)"
diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py
index 662b8262..16def709 100644
--- a/pandaharvester/harvestersubmitter/htcondor_submitter.py
+++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py
@@ -553,6 +553,11 @@ def __init__(self, **kwarg):
             self.minBulkToRamdomizedSchedd
         except AttributeError:
             self.minBulkToRamdomizedSchedd = 20
+        # try to use analysis credentials first
+        try:
+            self.useAnalysisCredentials
+        except AttributeError:
+            self.useAnalysisCredentials = False
         # record of information of CE statistics
         self.ceStatsLock = threading.Lock()
         self.ceStats = dict()
@@ -700,7 +705,7 @@ def _choose_credential(workspec):
                 job_type = workspec.jobType
                 proxy = self.x509UserProxy
                 token_dir = self.tokenDir
-                if is_grandly_unified_queue and job_type in ('user', 'panda', 'analysis'):
+                if (is_grandly_unified_queue and job_type in ('user', 'panda', 'analysis')) or self.useAnalysisCredentials:
                     if self.x509UserProxyAnalysis:
                         tmpLog.debug('Taking analysis proxy')
                         proxy = self.x509UserProxyAnalysis
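
With useAnalysisCredentials, even a non-grandly-unified queue can be pointed at the analysis proxy and token directory. A hypothetical queue configuration snippet (paths and values are illustrative):

    "submitter": {
        "name": "HTCondorSubmitter",
        "module": "pandaharvester.harvestersubmitter.htcondor_submitter",
        "useAnalysisCredentials": true,
        "x509UserProxyAnalysis": "/data/harvester/proxy_anal",
        "tokenDirAnalysis": "/data/harvester/tokens_anal"
    }
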
From b73aa9c0c672b1bd6102ed4b88ed34fef6d42f75 Mon Sep 17 00:00:00 2001
From: mightqxc
Date: Tue, 12 Oct 2021 00:26:37 +0800
Subject: [PATCH 14/33] pilot 3: allow ptest

---
 pandaharvester/commit_timestamp.py                    | 2 +-
 pandaharvester/harvestersubmitter/submitter_common.py | 5 +++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py
index dbe544fa..42dcb638 100644
--- a/pandaharvester/commit_timestamp.py
+++ b/pandaharvester/commit_timestamp.py
@@ -1 +1 @@
-timestamp = "08-10-2021 13:02:30 on flin (by mightqxc)"
+timestamp = "11-10-2021 16:26:37 on flin (by mightqxc)"
diff --git a/pandaharvester/harvestersubmitter/submitter_common.py b/pandaharvester/harvestersubmitter/submitter_common.py
index 3cad835c..b3d7f8b1 100644
--- a/pandaharvester/harvestersubmitter/submitter_common.py
+++ b/pandaharvester/harvestersubmitter/submitter_common.py
@@ -4,7 +4,7 @@ def get_complicated_pilot_options(pilot_type, pilot_url=None, pilot_version=None
     # for pilot 3
     is_pilot3 = True if pilot_version.startswith('3') else False
     # map
-    # 210929 currently only RC may run pilot 3
+    # 211012 currently only RC and PT may run pilot 3
     pt_psl_map = {
             'RC': {
                 'prod_source_label': 'rc_test2',
@@ -20,7 +20,8 @@ def get_complicated_pilot_options(pilot_type, pilot_url=None, pilot_version=None
             'PT': {
                 'prod_source_label': 'ptest',
                 'pilot_type_opt': 'PR',
-                'pilot_url_str': '--piloturl http://cern.ch/atlas-panda-pilot/pilot2-dev2.tar.gz',
+                'pilot_url_str': '--piloturl http://cern.ch/atlas-panda-pilot/pilot3-dev2.tar.gz' if is_pilot3 \
+                                 else '--piloturl http://cern.ch/atlas-panda-pilot/pilot2-dev.tar.gz',
                 },
             }
     pilot_opt_dict = pt_psl_map.get(pilot_type, None)
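
To illustrate the mapping above, a sketch of the expected behaviour (version strings are arbitrary examples):

    from pandaharvester.harvestersubmitter import submitter_common

    opts = submitter_common.get_complicated_pilot_options('PT', pilot_version='3.1.2.7')
    # opts['prod_source_label'] == 'ptest', opts['pilot_type_opt'] == 'PR',
    # and opts['pilot_url_str'] points at the pilot3-dev2 tarball
    opts = submitter_common.get_complicated_pilot_options('PR', pilot_version='2.12.3.8')
    # returns None: 'PR' is not in the map, so the submitter falls back to queue defaults;
    # an explicit pilot_url argument, when given, overrides pilot_url_str
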
connection: {0}'.format(e)) + # restart the proxy instance try: self.__init__() return From 82302671a2adf91993d9a900c7efe7a53e302cf1 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Mon, 25 Oct 2021 20:33:54 +0800 Subject: [PATCH 17/33] mysql: fix --- pandaharvester/commit_timestamp.py | 2 +- pandaharvester/harvestercore/db_proxy.py | 19 +++-- pandaharvester/harvesterfifo/mysql_fifo.py | 98 +++++++++++++--------- 3 files changed, 71 insertions(+), 48 deletions(-) diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index c4b88b40..42e92f94 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "19-10-2021 13:52:21 on flin (by mightqxc)" +timestamp = "25-10-2021 12:33:54 on flin (by mightqxc)" diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index 8ffbf8fa..147ec9be 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -71,6 +71,17 @@ def __init__(self, thr_name=None, read_only=False): self.thrName = currentThr.ident if hasattr(harvester_config.db, 'useInspect') and harvester_config.db.useInspect is True: self.useInspect = True + # connect DB + self._connect_db() + self.lockDB = False + # using application side lock if DB doesn't have a mechanism for exclusive access + if harvester_config.db.engine == 'mariadb': + self.usingAppLock = False + else: + self.usingAppLock = True + + # connect DB + def _connect_db(self): if harvester_config.db.engine == 'mariadb': if hasattr(harvester_config.db, 'host'): host = harvester_config.db.host @@ -134,12 +145,6 @@ def fetchall(self): self.cur.execute('PRAGMA journal_mode = WAL') # read to avoid database lock self.cur.fetchone() - self.lockDB = False - # using application side lock if DB doesn't have a mechanism for exclusive access - if harvester_config.db.engine == 'mariadb': - self.usingAppLock = False - else: - self.usingAppLock = True # exception handler for type of DBs def _handle_exception(self, exc): @@ -172,7 +177,7 @@ def _handle_exception(self, exc): tmpLog.error('failed to close connection: {0}'.format(e)) # restart the proxy instance try: - self.__init__() + self._connect_db() tmpLog.info('renewed connection') break except Exception as e: diff --git a/pandaharvester/harvesterfifo/mysql_fifo.py b/pandaharvester/harvesterfifo/mysql_fifo.py index 6145c3f6..dcba48ad 100644 --- a/pandaharvester/harvesterfifo/mysql_fifo.py +++ b/pandaharvester/harvesterfifo/mysql_fifo.py @@ -19,6 +19,19 @@ def __init__(self, **kwarg): self.reconnectTimeout = harvester_config.db.reconnectTimeout PluginBase.__init__(self, **kwarg) self.tableName = '{title}_FIFO'.format(title=self.titleName) + # get connection, cursor and error types + self._connect_db() + # create table for fifo + try: + self._make_table() + # self._make_index() + self.commit() + except Exception as _e: + self.rollback() + raise _e + + # get connection, cursor and error types + def _connect_db(self): # DB access attribues if hasattr(self, 'db_host'): db_host = self.db_host @@ -46,7 +59,6 @@ def __init__(self, **kwarg): db_schema = self.db_schema else: db_schema = harvester_config.fifo.db_schema - # get connection, cursor and error types try: import MySQLdb import MySQLdb.cursors @@ -61,28 +73,10 @@ def __init__(self, **kwarg): self.cur = self.con.cursor(buffered=True) self.OperationalError = mysql.connector.errors.OperationalError else: - class MyCursor (MySQLdb.cursors.Cursor): - def fetchone(self): - tmpRet = 
MySQLdb.cursors.Cursor.fetchone(self) - if tmpRet is None: - return None - return tmpRet - def fetchall(self): - tmpRets = MySQLdb.cursors.Cursor.fetchall(self) - return tmpRets self.con = MySQLdb.connect(user=db_user, passwd=db_password, - db=db_schema, host=db_host, port=db_port, - cursorclass=MyCursor) + db=db_schema, host=db_host, port=db_port) self.cur = self.con.cursor() self.OperationalError = MySQLdb.OperationalError - # create table for fifo - try: - self._make_table() - # self._make_index() - self.commit() - except Exception as _e: - self.rollback() - raise _e # decorator exception handler for type of DBs def _handle_exception(method): @@ -104,15 +98,15 @@ def _wrapped_method(self, *args, **kwargs): try: self.cur.close() except Exception as e: - tmpLog.error('failed to close cursor: {0}'.format(e)) + pass # close DB connection try: self.con.close() except Exception as e: - tmpLog.error('failed to close connection: {0}'.format(e)) + pass # restart the proxy instance try: - self.__init__() + self._connect_db() return except Exception as _e: exc = _e @@ -289,13 +283,17 @@ def _peek(self, mode='first', id=None, skip_item=False): 'idtemp': sql_peek_by_id_temp, } sql_peek = mode_sql_map[mode] - if mode in ('id', 'idtemp'): - params = (id,) - self.execute(sql_peek, params) - else: - self.execute(sql_peek) - res = self.cur.fetchall() - self.commit() + try: + if mode in ('id', 'idtemp'): + params = (id,) + self.execute(sql_peek, params) + else: + self.execute(sql_peek) + res = self.cur.fetchall() + self.commit() + except Exception as _e: + self.rollback() + raise _e if len(res) > 0: if skip_item: id, score = res[0] @@ -337,7 +335,11 @@ def _update(self, id, item=None, score=None, temporary=None, cond_score=None): params.append(id) if cond_score_str: params.append(score) - self.execute(sql_update, params) + try: + self.execute(sql_update, params) + except Exception as _e: + self.rollback() + raise _e n_row = self.cur.rowcount if n_row == 1: return True @@ -349,8 +351,12 @@ def size(self): sql_size = ( 'SELECT COUNT(id) FROM {table_name}' ).format(table_name=self.tableName) - self.execute(sql_size) - res = self.cur.fetchall() + try: + self.execute(sql_size) + res = self.cur.fetchall() + except Exception as _e: + self.rollback() + raise _e if len(res) > 0: return res[0][0] return None @@ -474,9 +480,13 @@ def peekmany(self, mode='first', minscore=None, maxscore=None, count=None, skip_ ).format(columns=columns_str, table_name=self.tableName, minscore_str=minscore_str, maxscore_str=maxscore_str, rank=mode_rank_map[mode], count_str=count_str) - self.execute(sql_peek_many) - res = self.cur.fetchall() - self.commit() + try: + self.execute(sql_peek_many) + res = self.cur.fetchall() + self.commit() + except Exception as _e: + self.rollback() + raise _e ret_list = [] for _rec in res: if skip_item: @@ -496,7 +506,11 @@ def clear(self): 'DROP TABLE IF EXISTS {table_name} ' ).format(table_name=self.tableName) # self.execute(sql_clear_index) - self.execute(sql_clear_table) + try: + self.execute(sql_clear_table) + except Exception as _e: + self.rollback() + raise _e self.__init__() # delete objects by list of id @@ -506,9 +520,13 @@ def delete(self, ids): placeholders_str = ','.join([' %s'] * len(ids)) sql_delete = sql_delete_template.format( table_name=self.tableName, placeholders=placeholders_str) - self.execute(sql_delete, ids) - n_row = self.cur.rowcount - self.commit() + try: + self.execute(sql_delete, ids) + n_row = self.cur.rowcount + self.commit() + except Exception as _e: + self.rollback() + 
raise _e return n_row else: raise TypeError('ids should be list or tuple') From 071fbdea33a440787feb7a95d8fd11a24fc246d8 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Mon, 25 Oct 2021 21:03:51 +0800 Subject: [PATCH 18/33] monitor: handle exception about fifo --- pandaharvester/commit_timestamp.py | 2 +- pandaharvester/harvesterbody/monitor.py | 40 ++++++++++++++++++++----- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index 42e92f94..14c81acf 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "25-10-2021 12:33:54 on flin (by mightqxc)" +timestamp = "25-10-2021 13:03:51 on flin (by mightqxc)" diff --git a/pandaharvester/harvesterbody/monitor.py b/pandaharvester/harvesterbody/monitor.py index d4750199..39ee57fd 100644 --- a/pandaharvester/harvesterbody/monitor.py +++ b/pandaharvester/harvesterbody/monitor.py @@ -182,19 +182,29 @@ def run(self): while time.time() < last_fifo_cycle_timestamp + fifoCheckDuration: sw.reset() n_loops += 1 - retVal, overhead_time = monitor_fifo.to_check_workers() + try: + retVal, overhead_time = monitor_fifo.to_check_workers() + except Exception as e: + mainLog.error('failed to check workers from FIFO: {0}'.format(e)) if overhead_time is not None: n_chunk_peeked_stat += 1 sum_overhead_time_stat += overhead_time if retVal: # check fifo size - fifo_size = monitor_fifo.size() - mainLog.debug('FIFO size is {0}'.format(fifo_size)) + try: + fifo_size = monitor_fifo.size() + mainLog.debug('FIFO size is {0}'.format(fifo_size)) + except Exception as e: + mainLog.error('failed to get size of FIFO: {0}'.format(e)) + time.sleep(2) + continue mainLog.debug('starting run with FIFO') try: obj_gotten = monitor_fifo.get(timeout=1, protective=fifoProtectiveDequeue) except Exception as errStr: mainLog.error('failed to get object from FIFO: {0}'.format(errStr)) + time.sleep(2) + continue else: if obj_gotten is not None: sw_fifo = core_utils.get_stopwatch() @@ -299,7 +309,10 @@ def run(self): mainLog.error('failed to put object from FIFO head: {0}'.format(errStr)) # delete protective dequeued objects if fifoProtectiveDequeue and len(obj_dequeued_id_list) > 0: - monitor_fifo.delete(ids=obj_dequeued_id_list) + try: + monitor_fifo.delete(ids=obj_dequeued_id_list) + except Exception as e: + mainLog.error('failed to delete object from FIFO: {0}'.format(e)) mainLog.debug('put {0} worker chunks into FIFO'.format(n_chunk_put) + sw.get_elapsed_time()) # adjust adjusted_sleepTime if n_chunk_peeked_stat > 0 and sum_overhead_time_stat > sleepTime: @@ -785,7 +798,11 @@ def monitor_event_digester(self, locked_by, max_events): tmpLog.debug('start') timeNow_timestamp = time.time() retMap = {} - obj_gotten_list = self.monitor_event_fifo.getmany(mode='first', count=max_events, protective=True) + try: + obj_gotten_list = self.monitor_event_fifo.getmany(mode='first', count=max_events, protective=True) + except Exception as e: + obj_gotten_list = [] + tmpLog.error('monitor_event_fifo excepted with {0}'.format(e)) workerID_list = [ obj_gotten.id for obj_gotten in obj_gotten_list ] tmpLog.debug('got {0} worker events'.format(len(workerID_list))) if len(workerID_list) > 0: @@ -813,10 +830,17 @@ def monitor_event_disposer(self, event_lifetime, max_events): tmpLog = self.make_logger(_logger, 'id=monitor-{0}'.format(self.get_pid()), method_name='monitor_event_disposer') tmpLog.debug('start') timeNow_timestamp = time.time() - obj_gotten_list = 
self.monitor_event_fifo.getmany(mode='first', + try: + obj_gotten_list = self.monitor_event_fifo.getmany(mode='first', maxscore=(timeNow_timestamp-event_lifetime), count=max_events, temporary=True) + except Exception as e: + obj_gotten_list = [] + tmpLog.error('monitor_event_fifo excepted with {0}'.format(e)) tmpLog.debug('removed {0} events'.format(len(obj_gotten_list))) - n_events = self.monitor_event_fifo.size() - tmpLog.debug('now {0} events in monitor-event fifo'.format(n_events)) + try: + n_events = self.monitor_event_fifo.size() + tmpLog.debug('now {0} events in monitor-event fifo'.format(n_events)) + except Exception as e: + tmpLog.error('failed to get size of monitor-event fifo: {0}'.format(e)) tmpLog.debug('done') From 80625d93be16a23704ad3ad536a449bdfbf0ec19 Mon Sep 17 00:00:00 2001 From: fbarreir Date: Thu, 28 Oct 2021 19:42:06 +0200 Subject: [PATCH 19/33] K8S: GKE script to find nodes and delete nodes with stuck pods in ContainerCreating state --- .../harvestercloud/gke_unhealthy_nodes.py | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 pandaharvester/harvestercloud/gke_unhealthy_nodes.py diff --git a/pandaharvester/harvestercloud/gke_unhealthy_nodes.py b/pandaharvester/harvestercloud/gke_unhealthy_nodes.py new file mode 100644 index 00000000..efe091be --- /dev/null +++ b/pandaharvester/harvestercloud/gke_unhealthy_nodes.py @@ -0,0 +1,42 @@ +from kubernetes import client, config +import datetime +from subprocess import Popen, PIPE +config.load_kube_config(config_file='PATH TO YOUR CONFIG') +namespace = 'default' + +nodes = [] +current_time = datetime.datetime.now().astimezone() + +corev1 = client.CoreV1Api() +aux = corev1.list_namespaced_pod(namespace=namespace, field_selector='status.phase=Pending') +for item in aux.items: + try: + if item.status.container_statuses[0].state.waiting.reason == 'ContainerCreating' and current_time - item.metadata.creation_timestamp > datetime.timedelta(minutes=30): + if item.spec.node_name not in nodes: + nodes.append(item.spec.node_name) + except Exception: + continue + +# delete the node +command_desc = '/bin/gcloud compute instances describe --format=value[](metadata.items.created-by) {0} --zone={1}' +command_del = "/bin/gcloud compute instance-groups managed delete-instances --instances={0} {1} --zone={2}" + +zones = ['europe-west1-b', 'europe-west1-c', 'europe-west1-d'] + +for node in nodes: + for zone in zones: + command_with_node = command_desc.format(node, zone) + command_list = command_with_node.split(' ') + p = Popen(command_list, stdin=PIPE, stdout=PIPE, stderr=PIPE) + output, err = p.communicate() + if output: + output_str = output[:-1].decode() + command_del_with_vars = command_del.format(node, output_str, zone) + command_del_list = command_del_with_vars.split(' ') + p = Popen(command_del_list, stdin=PIPE, stdout=PIPE, stderr=PIPE) + output, err = p.communicate() + print(command_del_with_vars) + print(output) + print(err) + print("--------------------") + From 5be616bf4bb7db74cdd2cb1c0f8410dc51814b12 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Thu, 18 Nov 2021 16:19:33 +0800 Subject: [PATCH 20/33] iam_token_cred_manager: fix --- pandaharvester/commit_timestamp.py | 2 +- .../iam_token_cred_manager.py | 26 ++++++++++++++----- .../harvestersubmitter/htcondor_submitter.py | 2 ++ 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index 14c81acf..092863e4 100644 --- a/pandaharvester/commit_timestamp.py +++ 
b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "25-10-2021 13:03:51 on flin (by mightqxc)" +timestamp = "18-11-2021 08:19:33 on flin (by mightqxc)" diff --git a/pandaharvester/harvestercredmanager/iam_token_cred_manager.py b/pandaharvester/harvestercredmanager/iam_token_cred_manager.py index 00689031..1102bb90 100644 --- a/pandaharvester/harvestercredmanager/iam_token_cred_manager.py +++ b/pandaharvester/harvestercredmanager/iam_token_cred_manager.py @@ -1,5 +1,6 @@ import os import json +import re import traceback from .base_cred_manager import BaseCredManager @@ -13,6 +14,11 @@ # allowed target types ALL_TARGET_TYPES = ['ce'] +# default port for CEs +default_port_map = { + 'htcondor-ce': 9619, + } + # credential manager with IAM token class IamTokenCredManager(BaseCredManager): # constructor @@ -79,17 +85,25 @@ def _handle_target_types(self): if ce_q_list: # loop over all ce queues for ce_q in ce_q_list: - ce_status = ce_q.get('ce_status') - if not ce_status or ce_status == 'DISABLED': - # skip disabled ce queues - continue + # ce_status = ce_q.get('ce_status') + # if not ce_status or ce_status == 'DISABLED': + # # skip disabled ce queues + # continue ce_endpoint = ce_q.get('ce_endpoint') + ce_hostname = re.sub(':\w*', '', ce_endpoint) ce_flavour = ce_q.get('ce_flavour') - if ce_endpoint and ce_flavour: + ce_flavour_str = str(ce_flavour).lower() + ce_endpoint_modified = ce_endpoint + if ce_endpoint == ce_hostname: + # no port, add default port + if ce_flavour_str in default_port_map: + default_port = default_port_map[ce_flavour_str] + ce_endpoint_modified = '{0}:{1}'.format(ce_hostname, default_port) + if ce_endpoint_modified and ce_flavour: target_attr_dict = { 'ce_flavour': ce_flavour, } - self.targets_dict[ce_endpoint] = target_attr_dict + self.targets_dict[ce_endpoint_modified] = target_attr_dict else: # do not generate token if no queues of CE continue diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py index 16def709..110b3cc0 100644 --- a/pandaharvester/harvestersubmitter/htcondor_submitter.py +++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py @@ -369,6 +369,8 @@ def make_a_jdl(workspec, template, n_core_per_node, log_dir, panda_queue_name, e token_path = None if token_dir is not None and token_filename is not None: token_path = os.path.join(token_dir, token_filename) + else: + tmpLog.warning('token_path is None: site={0}, token_dir={1} , token_filename={2}'.format(panda_queue_name, token_dir, token_filename)) # open tmpfile as submit description file tmpFile = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='_submit.sdf', dir=workspec.get_access_point()) # placeholder map From 5ad1ef722011c0695f12611eb45712574709bb83 Mon Sep 17 00:00:00 2001 From: Fernando Barreiro Date: Mon, 6 Dec 2021 09:34:06 +0100 Subject: [PATCH 21/33] Update workers with pilot status information (#128) * starting pilot worker status synchronization * worker sync kill implementation --- pandaharvester/harvesterbody/monitor.py | 18 +- pandaharvester/harvesterbody/propagator.py | 2 +- pandaharvester/harvesterbody/sweeper.py | 190 ++++++++++-------- .../harvestercloud/aws_unhealthy_nodes.py | 1 - pandaharvester/harvestercore/command_spec.py | 3 + pandaharvester/harvestercore/db_proxy.py | 49 +++-- .../harvestersubmitter/k8s_submitter.py | 3 +- .../harvestersubmitter/submitter_common.py | 2 +- .../harvestersweeper/htcondor_sweeper.py | 4 +- 9 files changed, 152 insertions(+), 120 deletions(-) diff 
--git a/pandaharvester/harvesterbody/monitor.py b/pandaharvester/harvesterbody/monitor.py index 39ee57fd..cb300106 100644 --- a/pandaharvester/harvesterbody/monitor.py +++ b/pandaharvester/harvesterbody/monitor.py @@ -668,7 +668,7 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log, timeNow - workSpec.checkTime > datetime.timedelta(seconds=checkTimeout): # kill due to timeout tmp_log.debug('kill workerID={0} due to consecutive check failures'.format(workerID)) - self.dbProxy.kill_worker(workSpec.workerID) + self.dbProxy.mark_workers_to_kill_by_workerids([workSpec.workerID]) newStatus = WorkSpec.ST_cancelled diagMessage = 'Killed by Harvester due to consecutive worker check failures. ' + diagMessage workSpec.set_pilot_error(PilotErrors.FAILEDBYSERVER, diagMessage) @@ -678,13 +678,13 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log, # request kill if messenger.kill_requested(workSpec): tmp_log.debug('kill workerID={0} as requested'.format(workerID)) - self.dbProxy.kill_worker(workSpec.workerID) + self.dbProxy.mark_workers_to_kill_by_workerids([workSpec.workerID]) # stuck queuing for too long if workSpec.status == WorkSpec.ST_submitted \ and timeNow > workSpec.submitTime + datetime.timedelta(seconds=workerQueueTimeLimit): tmp_log.debug('kill workerID={0} due to queuing longer than {1} seconds'.format( workerID, workerQueueTimeLimit)) - self.dbProxy.kill_worker(workSpec.workerID) + self.dbProxy.mark_workers_to_kill_by_workerids([workSpec.workerID]) diagMessage = 'Killed by Harvester due to worker queuing too long. ' + diagMessage workSpec.set_pilot_error(PilotErrors.FAILEDBYSERVER, diagMessage) # set closed @@ -702,9 +702,8 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log, if messenger.is_alive(workSpec, worker_heartbeat_limit): tmp_log.debug('heartbeat for workerID={0} is valid'.format(workerID)) else: - tmp_log.debug('heartbeat for workerID={0} expired: sending kill request'.format( - workerID)) - self.dbProxy.kill_worker(workSpec.workerID) + tmp_log.debug('heartbeat for workerID={0} expired: sending kill request'.format(workerID)) + self.dbProxy.mark_workers_to_kill_by_workerids([workSpec.workerID]) diagMessage = 'Killed by Harvester due to worker heartbeat expired. 
' + diagMessage workSpec.set_pilot_error(PilotErrors.FAILEDBYSERVER, diagMessage) # get work attributes @@ -738,7 +737,7 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log, newStatus = WorkSpec.ST_idle elif not workSpec.is_post_processed(): if (not queue_config.is_no_heartbeat_status(newStatus) and not queue_config.truePilot) \ - or (hasattr(messenger, 'forcePostProcessing') and messenger.forcePostProcessing): + or (hasattr(messenger, 'forcePostProcessing') and messenger.forcePostProcessing): # post processing unless heartbeat is suppressed jobSpecs = self.dbProxy.get_jobs_with_worker_id(workSpec.workerID, None, True, @@ -796,7 +795,6 @@ def monitor_event_deliverer(self, time_window): def monitor_event_digester(self, locked_by, max_events): tmpLog = self.make_logger(_logger, 'id=monitor-{0}'.format(self.get_pid()), method_name='monitor_event_digester') tmpLog.debug('start') - timeNow_timestamp = time.time() retMap = {} try: obj_gotten_list = self.monitor_event_fifo.getmany(mode='first', count=max_events, protective=True) @@ -814,8 +812,8 @@ def monitor_event_digester(self, locked_by, max_events): tmpLog.debug('checking workers of queueName={0} configID={1}'.format(*qc_key)) try: retVal = self.monitor_agent_core(locked_by, queueName, workSpecsList, - from_fifo=True, config_id=configID, - check_source='Event') + from_fifo=True, config_id=configID, + check_source='Event') except Exception as e: tmpLog.error('monitor_agent_core excepted with {0}'.format(e)) retVal = None # skip the loop diff --git a/pandaharvester/harvesterbody/propagator.py b/pandaharvester/harvesterbody/propagator.py index df118a52..cca25093 100644 --- a/pandaharvester/harvesterbody/propagator.py +++ b/pandaharvester/harvesterbody/propagator.py @@ -107,7 +107,7 @@ def run(self): tmpRet['command'] = 'tobekilled' # got kill command if 'command' in tmpRet and tmpRet['command'] in ['tobekilled']: - nWorkers = self.dbProxy.kill_workers_with_job(tmpJobSpec.PandaID) + nWorkers = self.dbProxy.mark_workers_to_kill_by_pandaid(tmpJobSpec.PandaID) if nWorkers == 0: # no workers tmpJobSpec.status = 'cancelled' diff --git a/pandaharvester/harvesterbody/sweeper.py b/pandaharvester/harvesterbody/sweeper.py index a96e440d..1aaa912c 100644 --- a/pandaharvester/harvesterbody/sweeper.py +++ b/pandaharvester/harvesterbody/sweeper.py @@ -24,153 +24,179 @@ def __init__(self, queue_config_mapper, single_mode=False): self.dbProxy = DBProxy() self.queueConfigMapper = queue_config_mapper self.pluginFactory = PluginFactory() + self.lockedBy = None + def process_kill_commands(self): + # process commands for marking workers that need to be killed + + tmp_log = self.make_logger(_logger, 'id={0}'.format(self.lockedBy), method_name='process_kill_commands') + + # 1. KILL_WORKERS commands that were sent to panda server and forwarded to harvester + stopwatch = core_utils.get_stopwatch() + command_string = CommandSpec.COM_killWorkers + tmp_log.debug('try to get {0} commands'.format(command_string)) + command_specs = self.dbProxy.get_commands_for_receiver('sweeper', command_string) + tmp_log.debug('got {0} {1} commands'.format(len(command_specs), command_string)) + for command_spec in command_specs: + n_to_kill = self.dbProxy.mark_workers_to_kill_by_query(command_spec.params) + tmp_log.debug('will kill {0} workers with {1}'.format(n_to_kill, command_spec.params)) + tmp_log.debug('done handling {0} commands'.format(command_string) + stopwatch.get_elapsed_time()) + + # 2. SYNC_WORKERS_KILL commands from comparing worker status provided by pilot and harvester + stopwatch = core_utils.get_stopwatch() + command_string = CommandSpec.COM_syncWorkersKill + tmp_log.debug('try to get {0} commands'.format(command_string)) + command_specs = self.dbProxy.get_commands_for_receiver('sweeper', command_string) + tmp_log.debug('got {0} {1} commands'.format(len(command_specs), command_string)) + for command_spec in command_specs: + n_to_kill = self.dbProxy.mark_workers_to_kill_by_workerids(command_spec.params) + tmp_log.debug('will kill {0} workers with {1}'.format(n_to_kill, command_spec.params)) + tmp_log.debug('done handling {0} commands'.format(command_string) + stopwatch.get_elapsed_time()) # main loop def run(self): - lockedBy = 'sweeper-{0}'.format(self.get_pid()) + self.lockedBy = 'sweeper-{0}'.format(self.get_pid()) while True: sw_main = core_utils.get_stopwatch() - mainLog = self.make_logger(_logger, 'id={0}'.format(lockedBy), method_name='run') - # get commands to kill - sw_getcomm = core_utils.get_stopwatch() - mainLog.debug('try to get commands') - comStr = CommandSpec.COM_killWorkers - commandSpecs = self.dbProxy.get_commands_for_receiver('sweeper', comStr) - mainLog.debug('got {0} {1} commands'.format(len(commandSpecs), comStr)) - for commandSpec in commandSpecs: - n_to_kill = self.dbProxy.kill_workers_by_query(commandSpec.params) - mainLog.debug('will kill {0} workers with {1}'.format(n_to_kill, commandSpec.params)) - mainLog.debug('done handling commands' + sw_getcomm.get_elapsed_time()) - # killing stage + main_log = self.make_logger(_logger, 'id={0}'.format(self.lockedBy), method_name='run') + + # process commands that mark workers to be killed + try: + self.process_kill_commands() + except Exception: + core_utils.dump_error_message(main_log) + + # actual killing stage sw_kill = core_utils.get_stopwatch() - mainLog.debug('try to get workers to kill') + main_log.debug('try to get workers to kill') # get workers to kill - workersToKill = self.dbProxy.get_workers_to_kill(harvester_config.sweeper.maxWorkers, + workers_to_kill = self.dbProxy.get_workers_to_kill(harvester_config.sweeper.maxWorkers, harvester_config.sweeper.checkInterval) - mainLog.debug('got {0} queues to kill workers'.format(len(workersToKill))) + main_log.debug('got {0} queues to kill workers'.format(len(workers_to_kill))) # loop over all workers sw = core_utils.get_stopwatch() - for queueName, configIdWorkSpecList in iteritems(workersToKill): + for queue_name, configIdWorkSpecList in iteritems(workers_to_kill): for configID, workspec_list in iteritems(configIdWorkSpecList): # get sweeper - if not self.queueConfigMapper.has_queue(queueName, configID): - mainLog.error('queue config for {0}/{1} not found'.format(queueName, configID)) + if not self.queueConfigMapper.has_queue(queue_name, configID): + main_log.error('queue config for {0}/{1} not found'.format(queue_name, configID)) + continue - queueConfig = self.queueConfigMapper.get_queue(queueName, configID) + queue_config = self.queueConfigMapper.get_queue(queue_name, configID) try: - sweeperCore = self.pluginFactory.get_plugin(queueConfig.sweeper) + sweeper_core = self.pluginFactory.get_plugin(queue_config.sweeper) except Exception: - mainLog.error('failed to launch sweeper plugin for {0}/{1}'.format(queueName, configID)) - core_utils.dump_error_message(mainLog) + main_log.error('failed to launch sweeper plugin for {0}/{1}'.format(queue_name, configID)) + core_utils.dump_error_message(main_log) continue sw.reset() n_workers =
len(workspec_list) try: # try bulk method - tmpLog = self.make_logger(_logger, 'id={0}'.format(lockedBy), method_name='run') - tmpLog.debug('start killing') - tmpList = sweeperCore.kill_workers(workspec_list) + tmp_log = self.make_logger(_logger, 'id={0}'.format(self.lockedBy), method_name='run') + tmp_log.debug('start killing') + tmp_list = sweeper_core.kill_workers(workspec_list) except AttributeError: # fall back to single-worker method for workspec in workspec_list: - tmpLog = self.make_logger(_logger, 'workerID={0}'.format(workspec.workerID), - method_name='run') + tmp_log = self.make_logger(_logger, 'workerID={0}'.format(workspec.workerID), + method_name='run') try: - tmpLog.debug('start killing one worker') - tmpStat, tmpOut = sweeperCore.kill_worker(workspec) - tmpLog.debug('done killing with status={0} diag={1}'.format(tmpStat, tmpOut)) + tmp_log.debug('start killing one worker') + tmp_stat, tmp_out = sweeper_core.kill_worker(workspec) + tmp_log.debug('done killing with status={0} diag={1}'.format(tmp_stat, tmp_out)) except Exception: - core_utils.dump_error_message(tmpLog) + core_utils.dump_error_message(tmp_log) except Exception: - core_utils.dump_error_message(mainLog) + core_utils.dump_error_message(main_log) else: # bulk method n_killed = 0 - for workspec, (tmpStat, tmpOut) in zip(workspec_list, tmpList): - tmpLog.debug('done killing workerID={0} with status={1} diag={2}'.format( - workspec.workerID, tmpStat, tmpOut)) - if tmpStat: + for workspec, (tmp_stat, tmp_out) in zip(workspec_list, tmp_list): + tmp_log.debug('done killing workerID={0} with status={1} diag={2}'.format( + workspec.workerID, tmp_stat, tmp_out)) + if tmp_stat: n_killed += 1 - tmpLog.debug('killed {0}/{1} workers'.format(n_killed, n_workers)) - mainLog.debug('done killing {0} workers'.format(n_workers) + sw.get_elapsed_time()) - mainLog.debug('done all killing' + sw_kill.get_elapsed_time()) + tmp_log.debug('killed {0}/{1} workers'.format(n_killed, n_workers)) + main_log.debug('done killing {0} workers'.format(n_workers) + sw.get_elapsed_time()) + main_log.debug('done all killing' + sw_kill.get_elapsed_time()) + # cleanup stage sw_cleanup = core_utils.get_stopwatch() # timeout for missed try: - keepMissed = harvester_config.sweeper.keepMissed + keep_missed = harvester_config.sweeper.keepMissed except Exception: - keepMissed = 24 + keep_missed = 24 try: - keepPending = harvester_config.sweeper.keepPending + keep_pending = harvester_config.sweeper.keepPending except Exception: - keepPending = 24 + keep_pending = 24 # get workers for cleanup statusTimeoutMap = {'finished': harvester_config.sweeper.keepFinished, 'failed': harvester_config.sweeper.keepFailed, 'cancelled': harvester_config.sweeper.keepCancelled, - 'missed': keepMissed, - 'pending': keepPending + 'missed': keep_missed, + 'pending': keep_pending } workersForCleanup = self.dbProxy.get_workers_for_cleanup(harvester_config.sweeper.maxWorkers, statusTimeoutMap) - mainLog.debug('got {0} queues for workers cleanup'.format(len(workersForCleanup))) + main_log.debug('got {0} queues for workers cleanup'.format(len(workersForCleanup))) sw = core_utils.get_stopwatch() - for queueName, configIdWorkSpecList in iteritems(workersForCleanup): + for queue_name, configIdWorkSpecList in iteritems(workersForCleanup): for configID, workspec_list in iteritems(configIdWorkSpecList): # get sweeper - if not self.queueConfigMapper.has_queue(queueName, configID): - mainLog.error('queue config for {0}/{1} not found'.format(queueName, configID)) + if not 
self.queueConfigMapper.has_queue(queue_name, configID): + main_log.error('queue config for {0}/{1} not found'.format(queue_name, configID)) continue - queueConfig = self.queueConfigMapper.get_queue(queueName, configID) - sweeperCore = self.pluginFactory.get_plugin(queueConfig.sweeper) - messenger = self.pluginFactory.get_plugin(queueConfig.messenger) + queue_config = self.queueConfigMapper.get_queue(queue_name, configID) + sweeper_core = self.pluginFactory.get_plugin(queue_config.sweeper) + messenger = self.pluginFactory.get_plugin(queue_config.messenger) sw.reset() n_workers = len(workspec_list) # make sure workers to clean up are all terminated - mainLog.debug('making sure workers to clean up are all terminated') + main_log.debug('making sure workers to clean up are all terminated') try: # try bulk method - tmpList = sweeperCore.kill_workers(workspec_list) + tmp_list = sweeper_core.kill_workers(workspec_list) except AttributeError: # fall back to single-worker method for workspec in workspec_list: - tmpLog = self.make_logger(_logger, 'workerID={0}'.format(workspec.workerID), - method_name='run') + tmp_log = self.make_logger(_logger, 'workerID={0}'.format(workspec.workerID), + method_name='run') try: - tmpStat, tmpOut = sweeperCore.kill_worker(workspec) + tmp_stat, tmp_out = sweeper_core.kill_worker(workspec) except Exception: - core_utils.dump_error_message(tmpLog) + core_utils.dump_error_message(tmp_log) except Exception: - core_utils.dump_error_message(mainLog) - mainLog.debug('made sure workers to clean up are all terminated') + core_utils.dump_error_message(main_log) + main_log.debug('made sure workers to clean up are all terminated') # start cleanup for workspec in workspec_list: - tmpLog = self.make_logger(_logger, 'workerID={0}'.format(workspec.workerID), - method_name='run') + tmp_log = self.make_logger(_logger, 'workerID={0}'.format(workspec.workerID), + method_name='run') try: - tmpLog.debug('start cleaning up one worker') + tmp_log.debug('start cleaning up one worker') # sweep worker - tmpStat, tmpOut = sweeperCore.sweep_worker(workspec) - tmpLog.debug('swept_worker with status={0} diag={1}'.format(tmpStat, tmpOut)) - tmpLog.debug('start messenger cleanup') - mc_tmpStat, mc_tmpOut = messenger.clean_up(workspec) - tmpLog.debug('messenger cleaned up with status={0} diag={1}'.format(mc_tmpStat, mc_tmpOut)) - if tmpStat: + tmp_stat, tmp_out = sweeper_core.sweep_worker(workspec) + tmp_log.debug('swept_worker with status={0} diag={1}'.format(tmp_stat, tmp_out)) + tmp_log.debug('start messenger cleanup') + mc_tmp_stat, mc_tmp_out = messenger.clean_up(workspec) + tmp_log.debug('messenger cleaned up with status={0} diag={1}'.format(mc_tmp_stat, mc_tmp_out)) + if tmp_stat: self.dbProxy.delete_worker(workspec.workerID) except Exception: - core_utils.dump_error_message(tmpLog) - mainLog.debug('done cleaning up {0} workers'.format(n_workers) + sw.get_elapsed_time()) - mainLog.debug('done all cleanup' + sw_cleanup.get_elapsed_time()) + core_utils.dump_error_message(tmp_log) + main_log.debug('done cleaning up {0} workers'.format(n_workers) + sw.get_elapsed_time()) + main_log.debug('done all cleanup' + sw_cleanup.get_elapsed_time()) + # old-job-deletion stage sw_delete = core_utils.get_stopwatch() - mainLog.debug('delete old jobs') + main_log.debug('delete old jobs') jobTimeout = max(statusTimeoutMap.values()) + 1 self.dbProxy.delete_old_jobs(jobTimeout) # delete orphaned job info self.dbProxy.delete_orphaned_job_info() - mainLog.debug('done deletion of old jobs' + 
sw_delete.get_elapsed_time()) + main_log.debug('done deletion of old jobs' + sw_delete.get_elapsed_time()) # disk cleanup if hasattr(harvester_config.sweeper, 'diskCleanUpInterval') and \ hasattr(harvester_config.sweeper, 'diskHighWatermark'): @@ -182,7 +208,7 @@ def run(self): for item in harvester_config.sweeper.diskHighWatermark.split(','): # dir name and watermark in GB dir_name, watermark = item.split('|') - mainLog.debug('checking {0} for cleanup with watermark {1} GB'.format(dir_name, watermark)) + main_log.debug('checking {0} for cleanup with watermark {1} GB'.format(dir_name, watermark)) watermark = int(watermark) * 10**9 total_size = 0 file_dict = {} @@ -197,11 +223,11 @@ def run(self): file_dict[mtime].add((base_name, full_name, f_size)) # delete if necessary if total_size < watermark: - mainLog.debug( + main_log.debug( 'skip cleanup {0} due to total_size {1} GB < watermark {2} GB'.format( dir_name, total_size//(10**9), watermark//(10**9))) else: - mainLog.debug( + main_log.debug( 'cleanup {0} due to total_size {1} GB >= watermark {2} GB'.format( dir_name, total_size//(10**9), watermark//(10**9))) # get active input files @@ -217,17 +243,17 @@ def run(self): try: os.remove(full_name) except Exception: - core_utils.dump_error_message(mainLog) + core_utils.dump_error_message(main_log) deleted_size += f_size if total_size - deleted_size < watermark: break if total_size - deleted_size < watermark: break except Exception: - core_utils.dump_error_message(mainLog) + core_utils.dump_error_message(main_log) # time the cycle - mainLog.debug('done a sweeper cycle' + sw_main.get_elapsed_time()) + main_log.debug('done a sweeper cycle' + sw_main.get_elapsed_time()) # check if being terminated if self.terminated(harvester_config.sweeper.sleepTime): - mainLog.debug('terminated') + main_log.debug('terminated') return diff --git a/pandaharvester/harvestercloud/aws_unhealthy_nodes.py b/pandaharvester/harvestercloud/aws_unhealthy_nodes.py index d6f5aecb..d03e9d1e 100644 --- a/pandaharvester/harvestercloud/aws_unhealthy_nodes.py +++ b/pandaharvester/harvestercloud/aws_unhealthy_nodes.py @@ -31,7 +31,6 @@ print('------------------------------------') print(command_with_id) print('return code: {0}'.format(p.returncode)) - print('return code: {0}'.format(p.returncode)) print('output: {0}'.format(output)) print('err: {0}'.format(err)) print('------------------------------------') \ No newline at end of file diff --git a/pandaharvester/harvestercore/command_spec.py b/pandaharvester/harvestercore/command_spec.py index ac18f30a..fe6253d0 100644 --- a/pandaharvester/harvestercore/command_spec.py +++ b/pandaharvester/harvestercore/command_spec.py @@ -18,11 +18,14 @@ class CommandSpec(SpecBase): COM_reportWorkerStats = 'REPORT_WORKER_STATS' COM_setNWorkers = 'SET_N_WORKERS_JOBTYPE' COM_killWorkers = 'KILL_WORKERS' + COM_syncWorkersKill = 'SYNC_WORKERS_KILL' + # mapping between command and receiver receiver_map = { COM_reportWorkerStats: 'propagator', COM_setNWorkers: 'submitter', COM_killWorkers: 'sweeper', + COM_syncWorkersKill: 'sweeper' } # constructor diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index 147ec9be..60aba3fb 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -3515,11 +3515,11 @@ def get_worker_stats_bulk(self, active_ups_queues): return {} # send kill command to workers associated to a job - def kill_workers_with_job(self, panda_id): + def mark_workers_to_kill_by_pandaid(self, panda_id): 
try: # get logger tmpLog = core_utils.make_logger(_logger, 'PandaID={0}'.format(panda_id), - method_name='kill_workers_with_job') + method_name='mark_workers_to_kill_by_pandaid') tmpLog.debug('start') # sql to set killTime sqlL = "UPDATE {0} SET killTime=:setTime ".format(workTableName) @@ -3557,26 +3557,27 @@ def kill_workers_with_job(self, panda_id): # return return None - # send kill command to a worker - def kill_worker(self, worker_id): + # send kill command to workers + def mark_workers_to_kill_by_workerids(self, worker_ids): try: # get logger - tmpLog = core_utils.make_logger(_logger, 'workerID={0}'.format(worker_id), - method_name='kill_worker') + tmpLog = core_utils.make_logger(_logger, method_name='mark_workers_to_kill_by_workerids') tmpLog.debug('start') # sql to set killTime sqlL = "UPDATE {0} SET killTime=:setTime ".format(workTableName) sqlL += "WHERE workerID=:workerID AND killTime IS NULL AND NOT status IN (:st1,:st2,:st3) " # set an older time to trigger sweeper setTime = datetime.datetime.utcnow() - datetime.timedelta(hours=6) - # set killTime - varMap = dict() - varMap[':workerID'] = worker_id - varMap[':setTime'] = setTime - varMap[':st1'] = WorkSpec.ST_finished - varMap[':st2'] = WorkSpec.ST_failed - varMap[':st3'] = WorkSpec.ST_cancelled - self.execute(sqlL, varMap) + varMaps = [] + for worker_id in worker_ids: + varMap = dict() + varMap[':workerID'] = worker_id + varMap[':setTime'] = setTime + varMap[':st1'] = WorkSpec.ST_finished + varMap[':st2'] = WorkSpec.ST_failed + varMap[':st3'] = WorkSpec.ST_cancelled + varMaps.append(varMap) + self.executemany(sqlL, varMaps) nRow = self.cur.rowcount # commit self.commit() @@ -5514,21 +5515,23 @@ def get_workers_from_ids(self, ids): return {} # send kill command to workers by query - def kill_workers_by_query(self, params): + def mark_workers_to_kill_by_query(self, params): try: # get logger - tmpLog = core_utils.make_logger(_logger, method_name='kill_workers_by_query') + tmpLog = core_utils.make_logger(_logger, method_name='mark_workers_to_kill_by_query') tmpLog.debug('start') + # sql to set killTime sqlL = "UPDATE {0} SET killTime=:setTime ".format(workTableName) sqlL += "WHERE workerID=:workerID AND killTime IS NULL AND NOT status IN (:st1,:st2,:st3) " + # sql to get workers constraints_query_string_list = [] tmp_varMap = {} constraint_map = {'status': params.get('status', [WorkSpec.ST_submitted]), - 'computingSite': params.get('computingSite', []), - 'computingElement': params.get('computingElement', []), - 'submissionHost': params.get('submissionHost', [])} + 'computingSite': params.get('computingSite', []), + 'computingElement': params.get('computingElement', []), + 'submissionHost': params.get('submissionHost', [])} tmpLog.debug('query {0}'.format(constraint_map)) for attribute, match_list in iteritems(constraint_map): if match_list == 'ALL': @@ -5537,13 +5540,14 @@ def kill_workers_by_query(self, params): tmpLog.debug('{0} constraint is not specified in the query. 
Skipped'.format(attribute)) return 0 else: - one_param_list = [ ':param_{0}_{1}'.format(attribute, v_i) for v_i in range(len(match_list)) ] + one_param_list = [':param_{0}_{1}'.format(attribute, v_i) for v_i in range(len(match_list))] tmp_varMap.update(zip(one_param_list, match_list)) params_string = '(' + ','.join(one_param_list) + ')' constraints_query_string_list.append('{0} IN {1}'.format(attribute, params_string)) - constranits_query_string = ' AND '.join(constraints_query_string_list) + constraints_query_string = ' AND '.join(constraints_query_string_list) sqlW = "SELECT workerID FROM {0} ".format(workTableName) - sqlW += "WHERE {0} ".format(constranits_query_string) + sqlW += "WHERE {0} ".format(constraints_query_string) + # set an older time to trigger sweeper setTime = datetime.datetime.utcnow() - datetime.timedelta(hours=6) # get workers @@ -5562,6 +5566,7 @@ def kill_workers_by_query(self, params): varMap[':st3'] = WorkSpec.ST_cancelled self.execute(sqlL, varMap) nRow += self.cur.rowcount + # commit self.commit() tmpLog.debug('set killTime to {0} workers'.format(nRow)) diff --git a/pandaharvester/harvestersubmitter/k8s_submitter.py b/pandaharvester/harvestersubmitter/k8s_submitter.py index dc4629fa..e134e7a2 100644 --- a/pandaharvester/harvestersubmitter/k8s_submitter.py +++ b/pandaharvester/harvestersubmitter/k8s_submitter.py @@ -220,7 +220,8 @@ def submit_k8s_worker(self, work_spec): python_version = str(this_panda_queue_dict.get('python_version', '2')) # prod_source_label = harvester_queue_config.get_source_label(work_spec.jobType) - pilot_opt_dict = submitter_common.get_complicated_pilot_options(work_spec.pilotType) + pilot_opt_dict = submitter_common.get_complicated_pilot_options(work_spec.pilotType, pilot_url, + pilot_version) if pilot_opt_dict is None: prod_source_label = harvester_queue_config.get_source_label(work_spec.jobType) pilot_type = work_spec.pilotType diff --git a/pandaharvester/harvestersubmitter/submitter_common.py b/pandaharvester/harvestersubmitter/submitter_common.py index b3d7f8b1..f81431c6 100644 --- a/pandaharvester/harvestersubmitter/submitter_common.py +++ b/pandaharvester/harvestersubmitter/submitter_common.py @@ -1,6 +1,6 @@ # Map "pilotType" (defined in harvester) to prodSourceLabel and pilotType option (defined in pilot, -i option) # and piloturl (pilot option --piloturl) for pilot 2 -def get_complicated_pilot_options(pilot_type, pilot_url=None, pilot_version=None): +def get_complicated_pilot_options(pilot_type, pilot_url=None, pilot_version=""): # for pilot 3 is_pilot3 = True if pilot_version.startswith('3') else False # map diff --git a/pandaharvester/harvestersweeper/htcondor_sweeper.py b/pandaharvester/harvestersweeper/htcondor_sweeper.py index 7cc30daa..30e0f710 100644 --- a/pandaharvester/harvestersweeper/htcondor_sweeper.py +++ b/pandaharvester/harvestersweeper/htcondor_sweeper.py @@ -103,10 +103,10 @@ def kill_workers(self, workspec_list): # Fill return list for workspec in workspec_list: if workspec.batchID is None: - ret = (True, 'worker withoug batchID; skipped') + ret = (True, 'worker without batchID; skipped') else: ret = all_job_ret_map.get(condor_job_id_from_workspec(workspec), - (False, 'batch job unfound in return map')) + (False, 'batch job not found in return map')) retList.append(ret) tmpLog.debug('done') # Return From d227266e83124c9a588355f6e2ee93ee70bfd526 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Wed, 8 Dec 2021 23:18:49 +0800 Subject: [PATCH 22/33] htcondor submitter: allow pilot 3 for all jobs --- 
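Editor's note (placed after the "---" separator, so it is ignored by git am): a minimal sketch of what this patch changes for the 'pilotVersion' JDL placeholder, based on the diff below; the version string '3.2.0.32' is a hypothetical queue setting, not from the patch.

    # before this patch: any pilot 3 version was coerced to 'current'
    pilot_version = '3.2.0.32'  # hypothetical queue configuration value
    before = 'current' if pilot_version.startswith('3') else pilot_version
    # after this patch: the configured version string reaches the JDL as-is
    after = pilot_version
    assert (before, after) == ('current', '3.2.0.32')
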
pandaharvester/commit_timestamp.py | 2 +- pandaharvester/harvestersubmitter/htcondor_submitter.py | 2 +- pandaharvester/harvestersubmitter/submitter_common.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index 092863e4..125b2264 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "18-11-2021 08:19:33 on flin (by mightqxc)" +timestamp = "08-12-2021 15:18:49 on flin (by mightqxc)" diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py index 110b3cc0..97b240f6 100644 --- a/pandaharvester/harvestersubmitter/htcondor_submitter.py +++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py @@ -411,7 +411,7 @@ def make_a_jdl(workspec, template, n_core_per_node, log_dir, panda_queue_name, e 'ioIntensity': io_intensity, 'pilotType': pilot_type_opt, 'pilotUrlOption': pilot_url_str, - 'pilotVersion': 'current' if pilot_version.startswith('3') else pilot_version, + 'pilotVersion': pilot_version, 'pilotPythonOption': submitter_common.get_python_version_option(python_version, prod_source_label), 'submissionHost': workspec.submissionHost, 'submissionHostShort': workspec.submissionHost.split('.')[0], diff --git a/pandaharvester/harvestersubmitter/submitter_common.py b/pandaharvester/harvestersubmitter/submitter_common.py index f81431c6..8acc3059 100644 --- a/pandaharvester/harvestersubmitter/submitter_common.py +++ b/pandaharvester/harvestersubmitter/submitter_common.py @@ -21,7 +21,7 @@ def get_complicated_pilot_options(pilot_type, pilot_url=None, pilot_version=""): 'prod_source_label': 'ptest', 'pilot_type_opt': 'PR', 'pilot_url_str': '--piloturl http://cern.ch/atlas-panda-pilot/pilot3-dev2.tar.gz' if is_pilot3 \ - else '--piloturl http://cern.ch/atlas-panda-pilot/pilot2-dev.tar.gz', + else '--piloturl http://cern.ch/atlas-panda-pilot/pilot2-dev2.tar.gz', }, } pilot_opt_dict = pt_psl_map.get(pilot_type, None) From d5519ee5efa3313668d0be239b98a3e79adc983b Mon Sep 17 00:00:00 2001 From: fbarreir Date: Mon, 24 Jan 2022 09:43:25 +0100 Subject: [PATCH 23/33] Added pilotVersion argument --- .../harvestercloud/pilots_starter.py | 24 ++++++++++++------- pandaharvester/harvestermisc/k8s_utils.py | 3 ++- .../harvestersubmitter/k8s_submitter.py | 2 +- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/pandaharvester/harvestercloud/pilots_starter.py b/pandaharvester/harvestercloud/pilots_starter.py index 87ef8226..9366e855 100644 --- a/pandaharvester/harvestercloud/pilots_starter.py +++ b/pandaharvester/harvestercloud/pilots_starter.py @@ -147,6 +147,9 @@ def get_configuration(): python_option = os.environ.get('pythonOption', '') logging.debug('[main] got pythonOption: {0}'.format(python_option)) + pilot_version = os.environ.get('pilotVersion', '') + logging.debug('[main] got pilotVersion: {0}'.format(pilot_version)) + # get the Harvester ID harvester_id = os.environ.get('HARVESTER_ID') logging.debug('[main] got Harvester ID: {0}'.format(harvester_id)) @@ -182,15 +185,16 @@ def get_configuration(): WORK_DIR = tmpdir return proxy_path, panda_site, panda_queue, resource_type, prodSourceLabel, job_type, pilot_type, \ - pilot_url_option, python_option, harvester_id, worker_id, logs_frontend_w, logs_frontend_r, stdout_name, \ - submit_mode + pilot_url_option, python_option, pilot_version, harvester_id, worker_id, logs_frontend_w, \ + logs_frontend_r, stdout_name, submit_mode if 
__name__ == "__main__": # get all the configuration from environment proxy_path, panda_site, panda_queue, resource_type, prodSourceLabel, job_type, pilot_type, pilot_url_opt, \ - python_option, harvester_id, worker_id, logs_frontend_w, logs_frontend_r, destination_name, submit_mode \ + python_option, pilot_version, harvester_id, worker_id, logs_frontend_w, logs_frontend_r, \ + destination_name, submit_mode = get_configuration() # the pilot should propagate the download link via the pilotId field in the job table @@ -216,11 +220,15 @@ def get_configuration(): if pilot_type: pilot_type_option = '-i {0}'.format(pilot_type) - wrapper_params = '-a {0} -s {1} -r {2} -q {3} {4} {5} {6} {7} {8} {9}'.format(WORK_DIR, panda_site, panda_queue, - panda_queue, resource_type_option, - psl_option, pilot_type_option, - job_type_option, pilot_url_opt, - python_option) + pilot_version_option = '--pilotversion 2' + if pilot_version: + pilot_version_option = '--pilotversion {0}'.format(pilot_version) + + wrapper_params = '-a {0} -s {1} -r {2} -q {3} {4} {5} {6} {7} {8} {9} {10}'.format(WORK_DIR, panda_site, panda_queue, + panda_queue, resource_type_option, + psl_option, pilot_type_option, + job_type_option, pilot_url_opt, + python_option, pilot_version_option) if submit_mode == 'PUSH': # job configuration files need to be copied, because k8s configmap mounts as read-only file system diff --git a/pandaharvester/harvestermisc/k8s_utils.py b/pandaharvester/harvestermisc/k8s_utils.py index 744d031e..9e6b92eb 100644 --- a/pandaharvester/harvestermisc/k8s_utils.py +++ b/pandaharvester/harvestermisc/k8s_utils.py @@ -41,7 +41,7 @@ def read_yaml_file(self, yaml_file): return yaml_content def create_job_from_yaml(self, yaml_content, work_spec, prod_source_label, pilot_type, pilot_url_str, - pilot_python_option, container_image, executable, args, + pilot_python_option, pilot_version, container_image, executable, args, cert, max_time=None): tmp_log = core_utils.make_logger(base_logger, 'queue_name={0}'.format(self.queue_name), @@ -165,6 +165,7 @@ def create_job_from_yaml(self, yaml_content, work_spec, prod_source_label, pilot {'name': 'pilotType', 'value': pilot_type}, {'name': 'pilotUrlOpt', 'value': pilot_url_str}, {'name': 'pythonOption', 'value': pilot_python_option}, + {'name': 'pilotVersion', 'value': pilot_version}, {'name': 'jobType', 'value': work_spec.jobType}, {'name': 'proxySecretPath', 'value': cert}, {'name': 'workerID', 'value': str(work_spec.workerID)}, diff --git a/pandaharvester/harvestersubmitter/k8s_submitter.py b/pandaharvester/harvestersubmitter/k8s_submitter.py index e134e7a2..8afcf0f9 100644 --- a/pandaharvester/harvestersubmitter/k8s_submitter.py +++ b/pandaharvester/harvestersubmitter/k8s_submitter.py @@ -236,7 +236,7 @@ def submit_k8s_worker(self, work_spec): # submit the worker rsp, yaml_content_final = self.k8s_client.create_job_from_yaml(yaml_content, work_spec, prod_source_label, pilot_type, pilot_url_str, - pilot_python_option, + pilot_python_option, pilot_version, container_image, executable, args, cert, max_time=max_time) except Exception as _e: From 35ae7b0b4a46d79710880fe23c7a16c97a3d8f3b Mon Sep 17 00:00:00 2001 From: fbarreir Date: Mon, 24 Jan 2022 09:56:48 +0100 Subject: [PATCH 24/33] Typo --- pandaharvester/harvestercloud/pilots_starter.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pandaharvester/harvestercloud/pilots_starter.py b/pandaharvester/harvestercloud/pilots_starter.py index 9366e855..313a95b8 100644 --- 
a/pandaharvester/harvestercloud/pilots_starter.py +++ b/pandaharvester/harvestercloud/pilots_starter.py @@ -194,8 +194,7 @@ def get_configuration(): # get all the configuration from environment proxy_path, panda_site, panda_queue, resource_type, prodSourceLabel, job_type, pilot_type, pilot_url_opt, \ python_option, pilot_version, harvester_id, worker_id, logs_frontend_w, logs_frontend_r, \ - destination_name, submit_mode - = get_configuration() + destination_name, submit_mode = get_configuration() # the pilot should propagate the download link via the pilotId field in the job table log_download_url = '{0}/{1}'.format(logs_frontend_r, destination_name) From 4a40a0df069dc31acc9d48e0034ab1250c9abad7 Mon Sep 17 00:00:00 2001 From: tmaeno Date: Fri, 28 Jan 2022 10:22:55 +0100 Subject: [PATCH 25/33] configmap --- Dockerfile.txt | 26 +++++++++++++++++++ .../harvesterconfig/harvester_config.py | 12 +++++++++ 2 files changed, 38 insertions(+) create mode 100644 Dockerfile.txt diff --git a/Dockerfile.txt b/Dockerfile.txt new file mode 100644 index 00000000..45349845 --- /dev/null +++ b/Dockerfile.txt @@ -0,0 +1,26 @@ +FROM docker.io/centos:7 + +RUN yum update -y +RUN yum install -y epel-release +RUN yum install -y python3 python3-devel gcc less git mysql-devel +RUN python3 -m venv /opt/harvester +RUN /opt/harvester/bin/pip install -U pip +RUN /opt/harvester/bin/pip install -U setuptools +RUN /opt/harvester/bin/pip install -U mysqlclient uWSGI +RUN /opt/harvester/bin/pip install git+git://github.com/HSF/harvester.git + +RUN mv /opt/harvester/etc/sysconfig/panda_harvester.rpmnew.template /opt/harvester/etc/sysconfig/panda_harvester +RUN mv /opt/harvester/etc/panda/panda_common.cfg.rpmnew /opt/harvester/etc/panda/panda_common.cfg +RUN mv /opt/harvester/etc/panda/panda_harvester.cfg.rpmnew.template /opt/harvester/etc/panda/panda_harvester.cfg +RUN mv /opt/harvester/etc/panda/panda_harvester-uwsgi.ini.rpmnew.template /opt/harvester/etc/panda/panda_harvester-uwsgi.ini + +RUN ln -fs /opt/harvester/etc/queue_config/panda_queueconfig.json /opt/harvester/etc/panda/panda_queueconfig.json + +RUN adduser atlpan +RUN groupadd zp +RUN usermod -a -G zp atlpan + +RUN mkdir -p /var/log/panda +RUN chown -R atlpan:zp /var/log/panda + +CMD exec /bin/bash -c "trap : TERM INT; sleep infinity & wait" diff --git a/pandaharvester/harvesterconfig/harvester_config.py b/pandaharvester/harvesterconfig/harvester_config.py index f27ad5ed..09cf5872 100644 --- a/pandaharvester/harvesterconfig/harvester_config.py +++ b/pandaharvester/harvesterconfig/harvester_config.py @@ -26,10 +26,22 @@ def __init__(self): pass +# load configmap +config_map_data = {} +if 'PANDA_HOME' in os.environ: + config_map_name = 'panda_harvester_configmap.json' + config_map_path = os.path.join(os.environ['PANDA_HOME'], 'etc/configmap', config_map_name) + if os.path.exists(config_map_path): + with open(config_map_path) as f: + config_map_data = json.load(f) + # loop over all sections for tmpSection in tmpConf.sections(): # read section tmpDict = getattr(tmpConf, tmpSection) + # load configmap + if tmpSection in config_map_data: + tmpDict.update(config_map_data[tmpSection]) # make section class tmpSelf = _SectionClass() # update module dict From 1114d93b0889640ae1466a5a525d99f677aaf9e1 Mon Sep 17 00:00:00 2001 From: tmaeno Date: Fri, 28 Jan 2022 10:24:44 +0100 Subject: [PATCH 26/33] configmap --- Dockerfile.txt => Dockerfile | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename Dockerfile.txt => Dockerfile (100%) diff --git a/Dockerfile.txt b/Dockerfile 
similarity index 100% rename from Dockerfile.txt rename to Dockerfile From 362e467ed5fd48950a0db9a3633daf3d5303bd75 Mon Sep 17 00:00:00 2001 From: tmaeno Date: Fri, 28 Jan 2022 12:39:05 +0100 Subject: [PATCH 27/33] configmap --- Dockerfile | 4 ++++ pandaharvester/harvesterconfig/harvester_config.py | 6 ++++-- pandaharvester/harvestercore/db_proxy.py | 3 ++- .../init.d/panda_harvester-uwsgi.rpmnew.template | 13 ++++++++----- 4 files changed, 18 insertions(+), 8 deletions(-) diff --git a/Dockerfile b/Dockerfile index 45349845..1e5f339b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,6 +13,7 @@ RUN mv /opt/harvester/etc/sysconfig/panda_harvester.rpmnew.template /opt/harves RUN mv /opt/harvester/etc/panda/panda_common.cfg.rpmnew /opt/harvester/etc/panda/panda_common.cfg RUN mv /opt/harvester/etc/panda/panda_harvester.cfg.rpmnew.template /opt/harvester/etc/panda/panda_harvester.cfg RUN mv /opt/harvester/etc/panda/panda_harvester-uwsgi.ini.rpmnew.template /opt/harvester/etc/panda/panda_harvester-uwsgi.ini +RUN mv /opt/harvester/etc/rc.d/init.d/panda_harvester-uwsgi.rpmnew.template /opt/harvester/etc/rc.d/init.d/panda_harvester-uwsgi RUN ln -fs /opt/harvester/etc/queue_config/panda_queueconfig.json /opt/harvester/etc/panda/panda_queueconfig.json @@ -23,4 +24,7 @@ RUN usermod -a -G zp atlpan RUN mkdir -p /var/log/panda RUN chown -R atlpan:zp /var/log/panda +RUN mkdir -p /data/harvester +RUN chown -R atlpan:zp /data/harvester + CMD exec /bin/bash -c "trap : TERM INT; sleep infinity & wait" diff --git a/pandaharvester/harvesterconfig/harvester_config.py b/pandaharvester/harvesterconfig/harvester_config.py index 09cf5872..0718e512 100644 --- a/pandaharvester/harvesterconfig/harvester_config.py +++ b/pandaharvester/harvesterconfig/harvester_config.py @@ -49,14 +49,16 @@ def __init__(self): # expand all values for tmpKey, tmpVal in iteritems(tmpDict): # use env vars - if tmpVal.startswith('$'): + if isinstance(tmpVal, str) and tmpVal.startswith('$'): tmpMatch = re.search('\$\{*([^\}]+)\}*', tmpVal) envName = tmpMatch.group(1) if envName not in os.environ: raise KeyError('{0} in the cfg is an undefined environment variable.'.format(envName)) tmpVal = os.environ[envName] # convert string to bool/int - if tmpVal == 'True': + if not isinstance(tmpVal, str): + pass + elif tmpVal == 'True': tmpVal = True elif tmpVal == 'False': tmpVal = False diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index 60aba3fb..1f6f0439 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -61,6 +61,7 @@ def __init__(self, thr_name=None, read_only=False): self.verbLog = None self.useInspect = False self.reconnectTimeout = 300 + self.read_only = read_only if hasattr(harvester_config.db, 'reconnectTimeout'): self.reconnectTimeout = harvester_config.db.reconnectTimeout if harvester_config.db.verbose: @@ -127,7 +128,7 @@ def fetchall(self): self.cur = self.con.cursor(named_tuple=True, buffered=True) else: import sqlite3 - if read_only: + if self.read_only: fd = os.open(harvester_config.db.database_filename, os.O_RDONLY) database_filename = '/dev/fd/{0}'.format(fd) else: diff --git a/templates/init.d/panda_harvester-uwsgi.rpmnew.template b/templates/init.d/panda_harvester-uwsgi.rpmnew.template index 16b476b7..b2aab55b 100755 --- a/templates/init.d/panda_harvester-uwsgi.rpmnew.template +++ b/templates/init.d/panda_harvester-uwsgi.rpmnew.template @@ -16,15 +16,18 @@ ARGV="$@" set -a #======= START CONFIGURATION SECTION 
========================== # user and group to run harvester uWSGI -userName="#FIXME" -groupName="#FIXME" +# FIXME +userName=atlpan +# FIXME +groupName=zp # setup python and virtual env -VIRTUAL_ENV=/#FIXME +# FIXME +VIRTUAL_ENV=/opt/harvester # set log directory -#LOG_DIR=${VIRTUAL_ENV}/var/log/panda -LOG_DIR=/#FIXME +# FIXME +LOG_DIR=/var/log/panda # pid and lock files PIDFILE=${LOG_DIR}/panda_harvester.pid From f852c996991e70a3e051072f6b7244e3fee2f3e9 Mon Sep 17 00:00:00 2001 From: Tadashi Maeno Date: Fri, 28 Jan 2022 13:16:08 +0100 Subject: [PATCH 28/33] Create docker-publish.yml --- .github/workflows/docker-publish.yml | 89 ++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 .github/workflows/docker-publish.yml diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml new file mode 100644 index 00000000..72eb7a00 --- /dev/null +++ b/.github/workflows/docker-publish.yml @@ -0,0 +1,89 @@ +name: Docker + +# This workflow uses actions that are not certified by GitHub. +# They are provided by a third-party and are governed by +# separate terms of service, privacy policy, and support +# documentation. + +on: + release: + types: [published] + + workflow_dispatch: + +env: + # Use docker.io for Docker Hub if empty + REGISTRY: ghcr.io + # github.repository as / + IMAGE_NAME: ${{ github.repository }} + + +jobs: + build: + + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + # This is used to complete the identity challenge + # with sigstore/fulcio when running outside of PRs. + id-token: write + + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + # Install the cosign tool except on PR + # https://github.com/sigstore/cosign-installer + - name: Install cosign + if: github.event_name != 'pull_request' + uses: sigstore/cosign-installer@1e95c1de343b5b0c23352d6417ee3e48d5bcd422 + with: + cosign-release: 'v1.4.0' + + + # Workaround: https://github.com/docker/build-push-action/issues/461 + - name: Setup Docker buildx + uses: docker/setup-buildx-action@79abd3f86f79a9d68a23c75a09a9a85889262adf + + # Login against a Docker registry except on PR + # https://github.com/docker/login-action + - name: Log into registry ${{ env.REGISTRY }} + if: github.event_name != 'pull_request' + uses: docker/login-action@28218f9b04b4f3f62068d7b6ce6ca5b26e35336c + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + # Extract metadata (tags, labels) for Docker + # https://github.com/docker/metadata-action + - name: Extract Docker metadata + id: meta + uses: docker/metadata-action@98669ae865ea3cffbcbaa878cf57c20bbf1c6c38 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + + # Build and push Docker image with Buildx (don't push on PR) + # https://github.com/docker/build-push-action + - name: Build and push Docker image + id: build-and-push + uses: docker/build-push-action@ad44023a93711e3deb337508980b4b5e9bcdc5dc + with: + context: . + push: ${{ github.event_name != 'pull_request' }} + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + + # Sign the resulting Docker image digest except on PRs. + # This will only write to the public Rekor transparency log when the Docker + # repository is public to avoid leaking data. If you would like to publish + # transparency data even for private images, pass --force to cosign below. 
+ # https://github.com/sigstore/cosign + - name: Sign the published Docker image + if: ${{ github.event_name != 'pull_request' }} + env: + COSIGN_EXPERIMENTAL: "true" + # This step uses the identity token to provision an ephemeral certificate + # against the sigstore community Fulcio instance. + run: cosign sign ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}@${{ steps.build-and-push.outputs.digest }} From 78fadae7a1064396763780575bf8184c78921a6b Mon Sep 17 00:00:00 2001 From: tmaeno Date: Fri, 28 Jan 2022 13:33:44 +0100 Subject: [PATCH 29/33] docker --- .github/workflows/docker-publish.yml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml index 72eb7a00..379354dd 100644 --- a/.github/workflows/docker-publish.yml +++ b/.github/workflows/docker-publish.yml @@ -80,10 +80,10 @@ jobs: # repository is public to avoid leaking data. If you would like to publish # transparency data even for private images, pass --force to cosign below. # https://github.com/sigstore/cosign - - name: Sign the published Docker image - if: ${{ github.event_name != 'pull_request' }} - env: - COSIGN_EXPERIMENTAL: "true" - # This step uses the identity token to provision an ephemeral certificate - # against the sigstore community Fulcio instance. - run: cosign sign ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}@${{ steps.build-and-push.outputs.digest }} + #- name: Sign the published Docker image + # if: ${{ github.event_name != 'pull_request' }} + # env: + # COSIGN_EXPERIMENTAL: "true" + # # This step uses the identity token to provision an ephemeral certificate + # # against the sigstore community Fulcio instance. + # run: cosign sign ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}@${{ steps.build-and-push.outputs.digest }} From b2cbc6ce6411247acfaa9b643dd227b8a085a4d5 Mon Sep 17 00:00:00 2001 From: tmaeno Date: Wed, 2 Feb 2022 15:02:48 +0100 Subject: [PATCH 30/33] oidc in panda_communicator --- .../panda_communicator.py | 53 ++++++++++++++++--- .../harvesterconfig/harvester_config.py | 3 +- setup.py | 1 + templates/panda_harvester.cfg.rpmnew.template | 13 ++++- 4 files changed, 59 insertions(+), 11 deletions(-) diff --git a/pandaharvester/harvestercommunicator/panda_communicator.py b/pandaharvester/harvestercommunicator/panda_communicator.py index b61d5bb5..9a8d0541 100644 --- a/pandaharvester/harvestercommunicator/panda_communicator.py +++ b/pandaharvester/harvestercommunicator/panda_communicator.py @@ -44,6 +44,27 @@ def __init__(self): self.useInspect = True else: self.verbose = False + if hasattr(harvester_config.pandacon, 'auth_type'): + self.auth_type = harvester_config.pandacon.auth_type + else: + self.auth_type = 'x509' + self.auth_token = None + self.auth_token_last_update = None + + # renew token + def renew_token(self): + if hasattr(harvester_config.pandacon, 'auth_token'): + if harvester_config.pandacon.auth_token.startswith('file:'): + if self.auth_token_last_update is not None and \ + datetime.datetime.utcnow() - self.auth_token_last_update < datetime.timedelta(minutes=60): + return + with open(harvester_config.pandacon.auth_token.split(':')[-1]) as f: + self.auth_token = f.read() + self.auth_token_last_update = datetime.datetime.utcnow() + else: + if self.auth_token_last_update is None: + self.auth_token = harvester_config.pandacon.auth_token + self.auth_token_last_update = datetime.datetime.utcnow() # POST with http def post(self, path, data): @@ -92,15 +113,22 @@ def post_ssl(self, path, data, cert=None, 
base_url=None): url = '{0}/{1}'.format(base_url, path) if self.verbose: tmpLog.debug('exec={0} URL={1} data={2}'.format(tmpExec, url, str(data))) - if cert is None: - cert = (harvester_config.pandacon.cert_file, - harvester_config.pandacon.key_file) + headers = {"Accept": "application/json", + "Connection": "close"} + if self.auth_type == 'oidc': + self.renew_token() + cert = None + headers['Authorization'] = 'Bearer {0}'.format(self.auth_token) + headers['Origin'] = harvester_config.pandacon.auth_origin + else: + if cert is None: + cert = (harvester_config.pandacon.cert_file, + harvester_config.pandacon.key_file) session = get_http_adapter_with_random_dns_resolution() sw = core_utils.get_stopwatch() res = session.post(url, data=data, - headers={"Accept": "application/json", - "Connection": "close"}, + headers=headers, timeout=harvester_config.pandacon.timeout, verify=harvester_config.pandacon.ca_cert, cert=cert) @@ -134,12 +162,21 @@ def put_ssl(self, path, files, cert=None, base_url=None): url = '{0}/{1}'.format(base_url, path) if self.verbose: tmpLog.debug('exec={0} URL={1} files={2}'.format(tmpExec, url, files['file'][0])) - if cert is None: - cert = (harvester_config.pandacon.cert_file, - harvester_config.pandacon.key_file) + if self.auth_type == 'oidc': + self.renew_token() + cert = None + headers = dict() + headers['Authorization'] = 'Bearer {0}'.format(self.auth_token) + headers['Origin'] = harvester_config.pandacon.auth_origin + else: + headers = None + if cert is None: + cert = (harvester_config.pandacon.cert_file, + harvester_config.pandacon.key_file) session = get_http_adapter_with_random_dns_resolution() res = session.post(url, files=files, + headers=headers, timeout=harvester_config.pandacon.timeout, verify=harvester_config.pandacon.ca_cert, cert=cert) diff --git a/pandaharvester/harvesterconfig/harvester_config.py b/pandaharvester/harvesterconfig/harvester_config.py index 0718e512..5dc3daf8 100644 --- a/pandaharvester/harvesterconfig/harvester_config.py +++ b/pandaharvester/harvesterconfig/harvester_config.py @@ -1,6 +1,7 @@ import re import os import sys +import six import json from future.utils import iteritems @@ -56,7 +57,7 @@ def __init__(self): raise KeyError('{0} in the cfg is an undefined environment variable.'.format(envName)) tmpVal = os.environ[envName] # convert string to bool/int - if not isinstance(tmpVal, str): + if not isinstance(tmpVal, six.string_types): pass elif tmpVal == 'True': tmpVal = True diff --git a/setup.py b/setup.py index 92ea5f28..08d9ea54 100644 --- a/setup.py +++ b/setup.py @@ -37,6 +37,7 @@ 'psutil >= 5.4.8', 'scandir; python_version < "3.5"', 'panda-pilot >= 2.7.2.1', + 'six', ], # optional pip dependencies diff --git a/templates/panda_harvester.cfg.rpmnew.template b/templates/panda_harvester.cfg.rpmnew.template index 8b5ee369..c8fc1197 100644 --- a/templates/panda_harvester.cfg.rpmnew.template +++ b/templates/panda_harvester.cfg.rpmnew.template @@ -122,15 +122,24 @@ nConnections = 5 # timeout timeout = 180 +# auth type x509 (default) or oidc +auth_type = x509 + # CA file ca_cert = /etc/pki/tls/certs/CERN-bundle.pem -# certificate +# certificate for x509 cert_file = FIXME -# key +# key for x509 key_file = FIXME +# token for oidc +auth_token = FIXME + +# origin for oidc +auth_origin = FIXME + # base URL via http pandaURL = http://pandaserver.cern.ch:25080/server/panda From 6c496fd976a7b4ddbadc4f2731a204501347d85e Mon Sep 17 00:00:00 2001 From: tmaeno Date: Wed, 2 Feb 2022 17:35:22 +0100 Subject: [PATCH 31/33] oidc in panda_communicator --- 
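Editor's note (placed after the "---" separator, so it is ignored by git am): a minimal sketch of how the auth_token setting documented below is resolved, mirroring the renew_token() logic added to PandaCommunicator in the previous patch; the token file path is hypothetical.

    # bare string form: the configured value is used as the bearer token directly;
    # filename form ("file:/path"): the token is re-read from the file on renewal
    auth_token_setting = 'file:/data/harvester/panda.token'  # hypothetical path
    if auth_token_setting.startswith('file:'):
        with open(auth_token_setting.split(':')[-1]) as f:
            auth_token = f.read()
    else:
        auth_token = auth_token_setting
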
templates/panda_harvester.cfg.rpmnew.template | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/templates/panda_harvester.cfg.rpmnew.template b/templates/panda_harvester.cfg.rpmnew.template index c8fc1197..ee976da2 100644 --- a/templates/panda_harvester.cfg.rpmnew.template +++ b/templates/panda_harvester.cfg.rpmnew.template @@ -134,7 +134,7 @@ cert_file = FIXME # key for x509 key_file = FIXME -# token for oidc +# token for oidc. bare string or filename (file:/path) auth_token = FIXME # origin for oidc From 75064741fcd9f7e92c700d2a6b86133d48615cd2 Mon Sep 17 00:00:00 2001 From: tmaeno Date: Mon, 7 Feb 2022 10:09:45 +0100 Subject: [PATCH 32/33] added condor to Dockerfile --- Dockerfile | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 1e5f339b..b1bad2e1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,7 +2,10 @@ FROM docker.io/centos:7 RUN yum update -y RUN yum install -y epel-release -RUN yum install -y python3 python3-devel gcc less git mysql-devel +RUN yum install -y python3 python3-devel gcc less git mysql-devel curl + +curl -fsSL https://get.htcondor.org | /bin/bash -s -- --no-dry-run + RUN python3 -m venv /opt/harvester RUN /opt/harvester/bin/pip install -U pip RUN /opt/harvester/bin/pip install -U setuptools From 05f0e95156180f5e1b7afb823151539d43c120b3 Mon Sep 17 00:00:00 2001 From: tmaeno Date: Mon, 7 Feb 2022 10:15:27 +0100 Subject: [PATCH 33/33] added condor to Dockerfile --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index b1bad2e1..4a4c4014 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,7 +4,7 @@ RUN yum update -y RUN yum install -y epel-release RUN yum install -y python3 python3-devel gcc less git mysql-devel curl -curl -fsSL https://get.htcondor.org | /bin/bash -s -- --no-dry-run +RUN curl -fsSL https://get.htcondor.org | /bin/bash -s -- --no-dry-run RUN python3 -m venv /opt/harvester RUN /opt/harvester/bin/pip install -U pip