From fd8c84438e7fe30099adde1b3d7c4ba05b133921 Mon Sep 17 00:00:00 2001 From: fahui Date: Wed, 9 Oct 2019 06:42:20 +0800 Subject: [PATCH 1/7] Option to disable token auth of apache frontend --- pandaharvester/commit_timestamp.py | 2 +- .../harvestermessenger/apache_messenger.py | 24 +++++++++++-------- templates/panda_harvester.cfg.rpmnew.template | 3 +++ 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index 8eabed9d..8ffbd205 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "04-10-2019 00:08:20 on contrib_cern (by fahui)" +timestamp = "08-10-2019 22:42:20 on contrib_cern (by fahui)" diff --git a/pandaharvester/harvestermessenger/apache_messenger.py b/pandaharvester/harvestermessenger/apache_messenger.py index e2ad7644..6925fdd9 100644 --- a/pandaharvester/harvestermessenger/apache_messenger.py +++ b/pandaharvester/harvestermessenger/apache_messenger.py @@ -1,5 +1,6 @@ import json from pandaharvester.harvestercore import core_utils +from pandaharvester.harvesterconfig import harvester_config from pandaharvester.harvestermessenger import http_server_messenger from pandaharvester.harvestermisc.frontend_utils import HarvesterToken @@ -49,17 +50,20 @@ def application(environ, start_response): # get params try: request_body_size = int(environ.get('CONTENT_LENGTH', 0)) - except: + except Exception as e: + _logger.warning('Zero request body due to {0}: {1}'.format(e.__class__.__name__, e)) request_body_size = 0 # check token - try: - auth_str = environ.get('HTTP_AUTHORIZATION', '').split()[-1] - token = HarvesterToken() - payload = token.get_payload(auth_str) - except: - errMsg = 'Auth failed: Invalid token' - start_response('403 Forbidden', [('Content-Type', 'text/plain')]) - return [errMsg.encode('ascii')] + if getattr(harvester_config.frontend, 'authEnable', True): + try: + auth_str = environ.get('HTTP_AUTHORIZATION', '').split()[-1] + token = HarvesterToken() + payload = token.get_payload(auth_str) + except Exception as e: + _logger.warning('Invalid token due to {0}: {1}'.format(e.__class__.__name__, e)) + errMsg = 'Auth failed: Invalid token' + start_response('403 Forbidden', [('Content-Type', 'text/plain')]) + return [errMsg.encode('ascii')] request_body = environ['wsgi.input'].read(request_body_size) params = json.loads(request_body) # make handler @@ -71,7 +75,7 @@ def application(environ, start_response): _logger.debug("{0} Phrase".format(handler.responseCode)) start_response("{0} Phrase".format(handler.responseCode), handler.headerList) return [handler.message] - except: + except Exception: errMsg = core_utils.dump_error_message(_logger) start_response('500 Phrase', [('Content-Type', 'text/plain')]) return [errMsg] diff --git a/templates/panda_harvester.cfg.rpmnew.template b/templates/panda_harvester.cfg.rpmnew.template index abeafde3..fce0977a 100644 --- a/templates/panda_harvester.cfg.rpmnew.template +++ b/templates/panda_harvester.cfg.rpmnew.template @@ -684,6 +684,9 @@ verbose = False # type : simple or apache type = simple +# enable token authentication of apache frontend; default is True +authEnable = True + # file of secret used in token signature secretFile = /FIXME From 66f0acfb096abfac7ecf7f7b37c4a6004dd3df43 Mon Sep 17 00:00:00 2001 From: Nicolo Magini Date: Wed, 9 Oct 2019 13:40:01 +0200 Subject: [PATCH 2/7] New dev pilot url --- pandaharvester/harvestersubmitter/htcondor_submitter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py index 063435d4..16179feb 100644 --- a/pandaharvester/harvestersubmitter/htcondor_submitter.py +++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py @@ -198,7 +198,7 @@ def _get_prodsourcelabel_pilotypeopt_piloturlstr(pilot_type, pilot_version='1'): if pilot_version == '2': # pilot 2 pt_psl_map = { - 'RC': ('rc_test2', 'RC', '--piloturl http://project-atlas-gmsb.web.cern.ch/project-atlas-gmsb/pilot2-dev.tar.gz'), + 'RC': ('rc_test2', 'RC', '--piloturl http://cern.ch/atlas-panda-pilot/pilot2-dev.tar.gz'), 'ALRB': ('rc_alrb', 'ALRB', ''), 'PT': ('ptest', 'PR', ''), } From 73860d4c0b233c1debb8d893e105e260d7091431 Mon Sep 17 00:00:00 2001 From: tmaeno Date: Thu, 10 Oct 2019 15:33:45 +0200 Subject: [PATCH 3/7] added ip in logging --- pandaharvester/harvestermessenger/http_server_messenger.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pandaharvester/harvestermessenger/http_server_messenger.py b/pandaharvester/harvestermessenger/http_server_messenger.py index 2b336b6f..20304583 100644 --- a/pandaharvester/harvestermessenger/http_server_messenger.py +++ b/pandaharvester/harvestermessenger/http_server_messenger.py @@ -172,7 +172,8 @@ def do_POST(self): self.send_response(500) message = core_utils.dump_error_message(_logger) if harvester_config.frontend.verbose: - self.tmpLog.debug('method={0} json={1} msg={2}'.format(methodName, dataStr, message)) + self.tmpLog.debug('ip={3} - method={0} json={1} msg={2}'.format(methodName, dataStr, message, + self.client_address[0])) # set the response self.do_postprocessing(message) return From ed191db43a6ca636e0772fe6aa10da63b7e14b33 Mon Sep 17 00:00:00 2001 From: Doug Benjamin Date: Sun, 13 Oct 2019 17:34:43 -0500 Subject: [PATCH 4/7] fix for python 3 --- pandaharvester/harvesterpreparator/go_bulk_preparator.py | 8 ++++++-- pandaharvester/harvesterstager/go_bulk_stager.py | 8 ++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/pandaharvester/harvesterpreparator/go_bulk_preparator.py b/pandaharvester/harvesterpreparator/go_bulk_preparator.py index ac877340..5cd1a4f6 100644 --- a/pandaharvester/harvesterpreparator/go_bulk_preparator.py +++ b/pandaharvester/harvesterpreparator/go_bulk_preparator.py @@ -250,7 +250,11 @@ def check_stage_in_status(self, jobspec): if fileSpec.scope is not None : scope = fileSpec.scope hash = hashlib.md5() - hash.update('%s:%s' % (scope, fileSpec.lfn)) + if sys.version_info.major == 2: + hash.update('%s:%s' % (scope, fileSpec.lfn)) + if sys.version_info.major == 3: + hash_string = "{0}:{1}".format(scope, fileSpec.lfn) + hash.update(bytes(hash_string, 'utf-8')) hash_hex = hash.hexdigest() correctedscope = "/".join(scope.split('.')) #srcURL = fileSpec.path @@ -410,7 +414,7 @@ def trigger_preparation(self, jobspec): tmpLog.debug('Change self.dummy_transfer_id from {0} to {1}'.format(old_dummy_transfer_id,self.dummy_transfer_id)) # set the dummy transfer ID which will be replaced with a real ID in check_stage_in_status() inFiles = jobspec.get_input_file_attributes(skip_ready=True) - lfns = inFiles.keys() + lfns = list(inFiles.keys()) #for inLFN in inFiles.keys(): # lfns.append(inLFN) tmpLog.debug('number of lfns - {0} type(lfns) - {1}'.format(len(lfns),type(lfns))) diff --git a/pandaharvester/harvesterstager/go_bulk_stager.py b/pandaharvester/harvesterstager/go_bulk_stager.py index 68d1c885..07a90383 100644 --- a/pandaharvester/harvesterstager/go_bulk_stager.py +++ b/pandaharvester/harvesterstager/go_bulk_stager.py @@ -170,7 +170,7 @@ def check_stage_out_status(self, jobspec): # get the scope of the log files outfileattrib = jobspec.get_output_file_attributes() scopeLog = 'xxxx' - for key in outfileattrib.keys(): + for key in list(outfileattrib.keys()): if "log.tgz" in key : scopeLog = outfileattrib[key]['scope'] # get transfer groups @@ -274,7 +274,11 @@ def check_stage_out_status(self, jobspec): msgStr = "printed first 25 files skipping the rest".format(fileSpec.lfn, fileSpec.scope) tmpLog.debug(msgStr) hash = hashlib.md5() - hash.update('%s:%s' % (scope, fileSpec.lfn)) + if sys.version_info.major == 2: + hash.update('%s:%s' % (scope, fileSpec.lfn)) + if sys.version_info.major == 3: + hash_string = "{0}:{1}".format(scope, fileSpec.lfn) + hash.update(bytes(hash_string, 'utf-8')) hash_hex = hash.hexdigest() correctedscope = "/".join(scope.split('.')) srcURL = fileSpec.path From dd9d7fea9b69475e49d39fda3b7cec5e944bf517 Mon Sep 17 00:00:00 2001 From: David Cameron Date: Mon, 14 Oct 2019 17:28:19 +0200 Subject: [PATCH 5/7] propagate error message from ARC CE and catch DB connection errors --- pandaharvester/harvestermonitor/act_monitor.py | 18 ++++++++++++++---- pandaharvester/harvestersweeper/act_sweeper.py | 9 +++++++-- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/pandaharvester/harvestermonitor/act_monitor.py b/pandaharvester/harvestermonitor/act_monitor.py index 52a4facc..ee43859c 100644 --- a/pandaharvester/harvestermonitor/act_monitor.py +++ b/pandaharvester/harvestermonitor/act_monitor.py @@ -4,6 +4,7 @@ from pandaharvester.harvestercore import core_utils from pandaharvester.harvestercore.work_spec import WorkSpec from pandaharvester.harvestercore.plugin_base import PluginBase +from pandaharvester.harvestercore.worker_errors import WorkerErrors from pandaharvester.harvesterconfig import harvester_config from act.common.aCTConfig import aCTConfigARC @@ -24,7 +25,11 @@ def __init__(self, **kwarg): # Set up aCT DB connection self.log = core_utils.make_logger(baseLogger, 'aCT submitter', method_name='__init__') - self.actDB = aCTDBPanda(self.log) + try: + self.actDB = aCTDBPanda(self.log) + except Exception as e: + self.log.error('Could not connect to aCT database: {0}'.format(str(e))) + self.actDB = None # get access point def get_access_point(self, workspec, panda_id): @@ -68,10 +73,11 @@ def check_workers(self, workspec_list): method_name='check_workers') try: tmpLog.debug('Querying aCT for id {0}'.format(workSpec.batchID)) - columns = ['actpandastatus', 'pandastatus', 'computingElement', 'node'] + columns = ['actpandastatus', 'pandastatus', 'computingElement', 'node', 'error'] actjobs = self.actDB.getJobs("id={0}".format(workSpec.batchID), columns) except Exception as e: - tmpLog.error("Failed to query aCT DB: {0}".format(str(e))) + if self.actDB: + tmpLog.error("Failed to query aCT DB: {0}".format(str(e))) # send back current status retList.append((workSpec.status, '')) continue @@ -85,12 +91,16 @@ def check_workers(self, workspec_list): actstatus = actjobs[0]['actpandastatus'] workSpec.nativeStatus = actstatus newStatus = WorkSpec.ST_running + errorMsg = '' if actstatus in ['waiting', 'sent', 'starting']: newStatus = WorkSpec.ST_submitted elif actstatus == 'done': newStatus = self.check_pilot_status(workSpec, tmpLog) elif actstatus == 'donefailed': newStatus = WorkSpec.ST_failed + errorMsg = actjobs[0]['error'] or 'Unknown error' + error_code = WorkerErrors.error_codes.get('GENERAL_ERROR') + workSpec.set_supplemental_error(error_code=error_code, error_diag=errorMsg) elif actstatus == 'donecancelled': newStatus = WorkSpec.ST_cancelled @@ -108,6 +118,6 @@ def check_workers(self, workspec_list): except: tmpLog.warning('Could not extract panda ID for worker {0}'.format(workSpec.batchID)) - retList.append((newStatus, '')) + retList.append((newStatus, errorMsg)) return True, retList diff --git a/pandaharvester/harvestersweeper/act_sweeper.py b/pandaharvester/harvestersweeper/act_sweeper.py index 01adca3c..179f93f6 100644 --- a/pandaharvester/harvestersweeper/act_sweeper.py +++ b/pandaharvester/harvestersweeper/act_sweeper.py @@ -18,7 +18,11 @@ def __init__(self, **kwarg): PluginBase.__init__(self, **kwarg) self.log = core_utils.make_logger(baseLogger, 'aCT sweeper', method_name='__init__') - self.actDB = aCTDBPanda(self.log) + try: + self.actDB = aCTDBPanda(self.log) + except Exception as e: + self.log.error('Could not connect to aCT database: {0}'.format(str(e))) + self.actDB = None # kill a worker @@ -44,7 +48,8 @@ def kill_worker(self, workspec): self.actDB.updateJobs("id={0} AND actpandastatus IN ('sent', 'starting', 'running')".format(workspec.batchID), {'actpandastatus': 'tobekilled', 'pandastatus': None}) except Exception as e: - tmpLog.error('Failed to cancel job {0} in aCT: {1}'.format(workspec.batchID, str(e))) + if self.actDB: + tmpLog.error('Failed to cancel job {0} in aCT: {1}'.format(workspec.batchID, str(e))) return False, str(e) tmpLog.info('Job {0} cancelled in aCT'.format(workspec.batchID)) From 3637dade65acf2b17db68b2dee65876b5ad33072 Mon Sep 17 00:00:00 2001 From: tmaeno Date: Tue, 15 Oct 2019 14:19:48 +0200 Subject: [PATCH 6/7] not to set pilot error when worker.errorCode is not None --- pandaharvester/harvesterbody/monitor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pandaharvester/harvesterbody/monitor.py b/pandaharvester/harvesterbody/monitor.py index a6f250ab..1028bc2f 100644 --- a/pandaharvester/harvesterbody/monitor.py +++ b/pandaharvester/harvesterbody/monitor.py @@ -374,10 +374,10 @@ def monitor_agent_core(self, lockedBy, queueName, workSpecsList, from_fifo=False workSpec.checkTime = datetime.datetime.utcnow() isCheckedList.append(isChecked) if monStatus == WorkSpec.ST_failed: - if not workSpec.has_pilot_error(): + if not workSpec.has_pilot_error() and workSpec.errorCode is None: workSpec.set_pilot_error(PilotErrors.ERR_GENERALERROR, diagMessage) elif monStatus == WorkSpec.ST_cancelled: - if not workSpec.has_pilot_error(): + if not workSpec.has_pilot_error() and workSpec.errorCode is None: workSpec.set_pilot_error(PilotErrors.ERR_PANDAKILL, diagMessage) if monStatus in [WorkSpec.ST_finished, WorkSpec.ST_failed, WorkSpec.ST_cancelled]: workSpec.set_work_params({'finalMonStatus': monStatus}) From 0030daa36ed1aa4e61f9c34451069be82fdc69a2 Mon Sep 17 00:00:00 2001 From: tmaeno Date: Thu, 14 Nov 2019 12:50:29 +0100 Subject: [PATCH 7/7] not to skip ready files when checking preparation status --- pandaharvester/harvesterbody/preparator.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pandaharvester/harvesterbody/preparator.py b/pandaharvester/harvesterbody/preparator.py index 618d0f5c..3cc94956 100644 --- a/pandaharvester/harvesterbody/preparator.py +++ b/pandaharvester/harvesterbody/preparator.py @@ -43,8 +43,7 @@ def run(self): harvester_config.preparator.checkInterval, harvester_config.preparator.lockInterval, lockedBy, - max_files_per_job=maxFilesPerJob, - ng_file_status_list=['ready']) + max_files_per_job=maxFilesPerJob) mainLog.debug('got {0} jobs to check'.format(len(jobsToCheck))) # loop over all jobs for jobSpec in jobsToCheck: