diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index a7a3cc47..1563b472 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "04-10-2019 11:57:38 on release (by fahui)" +timestamp = "14-11-2019 12:03:49 on release (by fahui)" diff --git a/pandaharvester/harvesterbody/monitor.py b/pandaharvester/harvesterbody/monitor.py index a6f250ab..1028bc2f 100644 --- a/pandaharvester/harvesterbody/monitor.py +++ b/pandaharvester/harvesterbody/monitor.py @@ -374,10 +374,10 @@ def monitor_agent_core(self, lockedBy, queueName, workSpecsList, from_fifo=False workSpec.checkTime = datetime.datetime.utcnow() isCheckedList.append(isChecked) if monStatus == WorkSpec.ST_failed: - if not workSpec.has_pilot_error(): + if not workSpec.has_pilot_error() and workSpec.errorCode is None: workSpec.set_pilot_error(PilotErrors.ERR_GENERALERROR, diagMessage) elif monStatus == WorkSpec.ST_cancelled: - if not workSpec.has_pilot_error(): + if not workSpec.has_pilot_error() and workSpec.errorCode is None: workSpec.set_pilot_error(PilotErrors.ERR_PANDAKILL, diagMessage) if monStatus in [WorkSpec.ST_finished, WorkSpec.ST_failed, WorkSpec.ST_cancelled]: workSpec.set_work_params({'finalMonStatus': monStatus}) diff --git a/pandaharvester/harvesterbody/preparator.py b/pandaharvester/harvesterbody/preparator.py index 618d0f5c..3cc94956 100644 --- a/pandaharvester/harvesterbody/preparator.py +++ b/pandaharvester/harvesterbody/preparator.py @@ -43,8 +43,7 @@ def run(self): harvester_config.preparator.checkInterval, harvester_config.preparator.lockInterval, lockedBy, - max_files_per_job=maxFilesPerJob, - ng_file_status_list=['ready']) + max_files_per_job=maxFilesPerJob) mainLog.debug('got {0} jobs to check'.format(len(jobsToCheck))) # loop over all jobs for jobSpec in jobsToCheck: diff --git a/pandaharvester/harvestermessenger/apache_messenger.py b/pandaharvester/harvestermessenger/apache_messenger.py index e2ad7644..6925fdd9 100644 --- a/pandaharvester/harvestermessenger/apache_messenger.py +++ b/pandaharvester/harvestermessenger/apache_messenger.py @@ -1,5 +1,6 @@ import json from pandaharvester.harvestercore import core_utils +from pandaharvester.harvesterconfig import harvester_config from pandaharvester.harvestermessenger import http_server_messenger from pandaharvester.harvestermisc.frontend_utils import HarvesterToken @@ -49,17 +50,20 @@ def application(environ, start_response): # get params try: request_body_size = int(environ.get('CONTENT_LENGTH', 0)) - except: + except Exception as e: + _logger.warning('Zero request body due to {0}: {1}'.format(e.__class__.__name__, e)) request_body_size = 0 # check token - try: - auth_str = environ.get('HTTP_AUTHORIZATION', '').split()[-1] - token = HarvesterToken() - payload = token.get_payload(auth_str) - except: - errMsg = 'Auth failed: Invalid token' - start_response('403 Forbidden', [('Content-Type', 'text/plain')]) - return [errMsg.encode('ascii')] + if getattr(harvester_config.frontend, 'authEnable', True): + try: + auth_str = environ.get('HTTP_AUTHORIZATION', '').split()[-1] + token = HarvesterToken() + payload = token.get_payload(auth_str) + except Exception as e: + _logger.warning('Invalid token due to {0}: {1}'.format(e.__class__.__name__, e)) + errMsg = 'Auth failed: Invalid token' + start_response('403 Forbidden', [('Content-Type', 'text/plain')]) + return [errMsg.encode('ascii')] request_body = environ['wsgi.input'].read(request_body_size) params = json.loads(request_body) # make handler @@ -71,7 +75,7 @@ def application(environ, start_response): _logger.debug("{0} Phrase".format(handler.responseCode)) start_response("{0} Phrase".format(handler.responseCode), handler.headerList) return [handler.message] - except: + except Exception: errMsg = core_utils.dump_error_message(_logger) start_response('500 Phrase', [('Content-Type', 'text/plain')]) return [errMsg] diff --git a/pandaharvester/harvestermessenger/http_server_messenger.py b/pandaharvester/harvestermessenger/http_server_messenger.py index 2b336b6f..20304583 100644 --- a/pandaharvester/harvestermessenger/http_server_messenger.py +++ b/pandaharvester/harvestermessenger/http_server_messenger.py @@ -172,7 +172,8 @@ def do_POST(self): self.send_response(500) message = core_utils.dump_error_message(_logger) if harvester_config.frontend.verbose: - self.tmpLog.debug('method={0} json={1} msg={2}'.format(methodName, dataStr, message)) + self.tmpLog.debug('ip={3} - method={0} json={1} msg={2}'.format(methodName, dataStr, message, + self.client_address[0])) # set the response self.do_postprocessing(message) return diff --git a/pandaharvester/harvestermonitor/act_monitor.py b/pandaharvester/harvestermonitor/act_monitor.py index 52a4facc..ee43859c 100644 --- a/pandaharvester/harvestermonitor/act_monitor.py +++ b/pandaharvester/harvestermonitor/act_monitor.py @@ -4,6 +4,7 @@ from pandaharvester.harvestercore import core_utils from pandaharvester.harvestercore.work_spec import WorkSpec from pandaharvester.harvestercore.plugin_base import PluginBase +from pandaharvester.harvestercore.worker_errors import WorkerErrors from pandaharvester.harvesterconfig import harvester_config from act.common.aCTConfig import aCTConfigARC @@ -24,7 +25,11 @@ def __init__(self, **kwarg): # Set up aCT DB connection self.log = core_utils.make_logger(baseLogger, 'aCT submitter', method_name='__init__') - self.actDB = aCTDBPanda(self.log) + try: + self.actDB = aCTDBPanda(self.log) + except Exception as e: + self.log.error('Could not connect to aCT database: {0}'.format(str(e))) + self.actDB = None # get access point def get_access_point(self, workspec, panda_id): @@ -68,10 +73,11 @@ def check_workers(self, workspec_list): method_name='check_workers') try: tmpLog.debug('Querying aCT for id {0}'.format(workSpec.batchID)) - columns = ['actpandastatus', 'pandastatus', 'computingElement', 'node'] + columns = ['actpandastatus', 'pandastatus', 'computingElement', 'node', 'error'] actjobs = self.actDB.getJobs("id={0}".format(workSpec.batchID), columns) except Exception as e: - tmpLog.error("Failed to query aCT DB: {0}".format(str(e))) + if self.actDB: + tmpLog.error("Failed to query aCT DB: {0}".format(str(e))) # send back current status retList.append((workSpec.status, '')) continue @@ -85,12 +91,16 @@ def check_workers(self, workspec_list): actstatus = actjobs[0]['actpandastatus'] workSpec.nativeStatus = actstatus newStatus = WorkSpec.ST_running + errorMsg = '' if actstatus in ['waiting', 'sent', 'starting']: newStatus = WorkSpec.ST_submitted elif actstatus == 'done': newStatus = self.check_pilot_status(workSpec, tmpLog) elif actstatus == 'donefailed': newStatus = WorkSpec.ST_failed + errorMsg = actjobs[0]['error'] or 'Unknown error' + error_code = WorkerErrors.error_codes.get('GENERAL_ERROR') + workSpec.set_supplemental_error(error_code=error_code, error_diag=errorMsg) elif actstatus == 'donecancelled': newStatus = WorkSpec.ST_cancelled @@ -108,6 +118,6 @@ def check_workers(self, workspec_list): except: tmpLog.warning('Could not extract panda ID for worker {0}'.format(workSpec.batchID)) - retList.append((newStatus, '')) + retList.append((newStatus, errorMsg)) return True, retList diff --git a/pandaharvester/harvesterpreparator/go_bulk_preparator.py b/pandaharvester/harvesterpreparator/go_bulk_preparator.py index ac877340..5cd1a4f6 100644 --- a/pandaharvester/harvesterpreparator/go_bulk_preparator.py +++ b/pandaharvester/harvesterpreparator/go_bulk_preparator.py @@ -250,7 +250,11 @@ def check_stage_in_status(self, jobspec): if fileSpec.scope is not None : scope = fileSpec.scope hash = hashlib.md5() - hash.update('%s:%s' % (scope, fileSpec.lfn)) + if sys.version_info.major == 2: + hash.update('%s:%s' % (scope, fileSpec.lfn)) + if sys.version_info.major == 3: + hash_string = "{0}:{1}".format(scope, fileSpec.lfn) + hash.update(bytes(hash_string, 'utf-8')) hash_hex = hash.hexdigest() correctedscope = "/".join(scope.split('.')) #srcURL = fileSpec.path @@ -410,7 +414,7 @@ def trigger_preparation(self, jobspec): tmpLog.debug('Change self.dummy_transfer_id from {0} to {1}'.format(old_dummy_transfer_id,self.dummy_transfer_id)) # set the dummy transfer ID which will be replaced with a real ID in check_stage_in_status() inFiles = jobspec.get_input_file_attributes(skip_ready=True) - lfns = inFiles.keys() + lfns = list(inFiles.keys()) #for inLFN in inFiles.keys(): # lfns.append(inLFN) tmpLog.debug('number of lfns - {0} type(lfns) - {1}'.format(len(lfns),type(lfns))) diff --git a/pandaharvester/harvesterstager/go_bulk_stager.py b/pandaharvester/harvesterstager/go_bulk_stager.py index 68d1c885..07a90383 100644 --- a/pandaharvester/harvesterstager/go_bulk_stager.py +++ b/pandaharvester/harvesterstager/go_bulk_stager.py @@ -170,7 +170,7 @@ def check_stage_out_status(self, jobspec): # get the scope of the log files outfileattrib = jobspec.get_output_file_attributes() scopeLog = 'xxxx' - for key in outfileattrib.keys(): + for key in list(outfileattrib.keys()): if "log.tgz" in key : scopeLog = outfileattrib[key]['scope'] # get transfer groups @@ -274,7 +274,11 @@ def check_stage_out_status(self, jobspec): msgStr = "printed first 25 files skipping the rest".format(fileSpec.lfn, fileSpec.scope) tmpLog.debug(msgStr) hash = hashlib.md5() - hash.update('%s:%s' % (scope, fileSpec.lfn)) + if sys.version_info.major == 2: + hash.update('%s:%s' % (scope, fileSpec.lfn)) + if sys.version_info.major == 3: + hash_string = "{0}:{1}".format(scope, fileSpec.lfn) + hash.update(bytes(hash_string, 'utf-8')) hash_hex = hash.hexdigest() correctedscope = "/".join(scope.split('.')) srcURL = fileSpec.path diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py index 063435d4..16179feb 100644 --- a/pandaharvester/harvestersubmitter/htcondor_submitter.py +++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py @@ -198,7 +198,7 @@ def _get_prodsourcelabel_pilotypeopt_piloturlstr(pilot_type, pilot_version='1'): if pilot_version == '2': # pilot 2 pt_psl_map = { - 'RC': ('rc_test2', 'RC', '--piloturl http://project-atlas-gmsb.web.cern.ch/project-atlas-gmsb/pilot2-dev.tar.gz'), + 'RC': ('rc_test2', 'RC', '--piloturl http://cern.ch/atlas-panda-pilot/pilot2-dev.tar.gz'), 'ALRB': ('rc_alrb', 'ALRB', ''), 'PT': ('ptest', 'PR', ''), } diff --git a/pandaharvester/harvestersweeper/act_sweeper.py b/pandaharvester/harvestersweeper/act_sweeper.py index 01adca3c..179f93f6 100644 --- a/pandaharvester/harvestersweeper/act_sweeper.py +++ b/pandaharvester/harvestersweeper/act_sweeper.py @@ -18,7 +18,11 @@ def __init__(self, **kwarg): PluginBase.__init__(self, **kwarg) self.log = core_utils.make_logger(baseLogger, 'aCT sweeper', method_name='__init__') - self.actDB = aCTDBPanda(self.log) + try: + self.actDB = aCTDBPanda(self.log) + except Exception as e: + self.log.error('Could not connect to aCT database: {0}'.format(str(e))) + self.actDB = None # kill a worker @@ -44,7 +48,8 @@ def kill_worker(self, workspec): self.actDB.updateJobs("id={0} AND actpandastatus IN ('sent', 'starting', 'running')".format(workspec.batchID), {'actpandastatus': 'tobekilled', 'pandastatus': None}) except Exception as e: - tmpLog.error('Failed to cancel job {0} in aCT: {1}'.format(workspec.batchID, str(e))) + if self.actDB: + tmpLog.error('Failed to cancel job {0} in aCT: {1}'.format(workspec.batchID, str(e))) return False, str(e) tmpLog.info('Job {0} cancelled in aCT'.format(workspec.batchID)) diff --git a/pandaharvester/panda_pkg_info.py b/pandaharvester/panda_pkg_info.py index d0efaf92..69176cfa 100644 --- a/pandaharvester/panda_pkg_info.py +++ b/pandaharvester/panda_pkg_info.py @@ -1 +1 @@ -release_version = "0.1.5" +release_version = "0.1.6" diff --git a/templates/panda_harvester.cfg.rpmnew.template b/templates/panda_harvester.cfg.rpmnew.template index abeafde3..fce0977a 100644 --- a/templates/panda_harvester.cfg.rpmnew.template +++ b/templates/panda_harvester.cfg.rpmnew.template @@ -684,6 +684,9 @@ verbose = False # type : simple or apache type = simple +# enable token authentication of apache frontend; default is True +authEnable = True + # file of secret used in token signature secretFile = /FIXME