Skip to content

Commit

Permalink
v0.1.6
Browse files: browse the repository at this point in the history
  • Loading branch information
mightqxc committed Nov 14, 2019
2 parents d2f88af + 0030daa commit f43656f
Show file tree
Hide file tree
Showing 12 changed files with 58 additions and 28 deletions.
2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
timestamp = "04-10-2019 11:57:38 on release (by fahui)"
timestamp = "14-11-2019 12:03:49 on release (by fahui)"
4 changes: 2 additions & 2 deletions pandaharvester/harvesterbody/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,10 +374,10 @@ def monitor_agent_core(self, lockedBy, queueName, workSpecsList, from_fifo=False
workSpec.checkTime = datetime.datetime.utcnow()
isCheckedList.append(isChecked)
if monStatus == WorkSpec.ST_failed:
if not workSpec.has_pilot_error():
if not workSpec.has_pilot_error() and workSpec.errorCode is None:
workSpec.set_pilot_error(PilotErrors.ERR_GENERALERROR, diagMessage)
elif monStatus == WorkSpec.ST_cancelled:
if not workSpec.has_pilot_error():
if not workSpec.has_pilot_error() and workSpec.errorCode is None:
workSpec.set_pilot_error(PilotErrors.ERR_PANDAKILL, diagMessage)
if monStatus in [WorkSpec.ST_finished, WorkSpec.ST_failed, WorkSpec.ST_cancelled]:
workSpec.set_work_params({'finalMonStatus': monStatus})
Expand Down
3 changes: 1 addition & 2 deletions pandaharvester/harvesterbody/preparator.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ def run(self):
harvester_config.preparator.checkInterval,
harvester_config.preparator.lockInterval,
lockedBy,
max_files_per_job=maxFilesPerJob,
ng_file_status_list=['ready'])
max_files_per_job=maxFilesPerJob)
mainLog.debug('got {0} jobs to check'.format(len(jobsToCheck)))
# loop over all jobs
for jobSpec in jobsToCheck:
Expand Down
24 changes: 14 additions & 10 deletions pandaharvester/harvestermessenger/apache_messenger.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvesterconfig import harvester_config
from pandaharvester.harvestermessenger import http_server_messenger
from pandaharvester.harvestermisc.frontend_utils import HarvesterToken

Expand Down Expand Up @@ -49,17 +50,20 @@ def application(environ, start_response):
# get params
try:
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
except:
except Exception as e:
_logger.warning('Zero request body due to {0}: {1}'.format(e.__class__.__name__, e))
request_body_size = 0
# check token
try:
auth_str = environ.get('HTTP_AUTHORIZATION', '').split()[-1]
token = HarvesterToken()
payload = token.get_payload(auth_str)
except:
errMsg = 'Auth failed: Invalid token'
start_response('403 Forbidden', [('Content-Type', 'text/plain')])
return [errMsg.encode('ascii')]
if getattr(harvester_config.frontend, 'authEnable', True):
try:
auth_str = environ.get('HTTP_AUTHORIZATION', '').split()[-1]
token = HarvesterToken()
payload = token.get_payload(auth_str)
except Exception as e:
_logger.warning('Invalid token due to {0}: {1}'.format(e.__class__.__name__, e))
errMsg = 'Auth failed: Invalid token'
start_response('403 Forbidden', [('Content-Type', 'text/plain')])
return [errMsg.encode('ascii')]
request_body = environ['wsgi.input'].read(request_body_size)
params = json.loads(request_body)
# make handler
Expand All @@ -71,7 +75,7 @@ def application(environ, start_response):
_logger.debug("{0} Phrase".format(handler.responseCode))
start_response("{0} Phrase".format(handler.responseCode), handler.headerList)
return [handler.message]
except:
except Exception:
errMsg = core_utils.dump_error_message(_logger)
start_response('500 Phrase', [('Content-Type', 'text/plain')])
return [errMsg]
3 changes: 2 additions & 1 deletion pandaharvester/harvestermessenger/http_server_messenger.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ def do_POST(self):
self.send_response(500)
message = core_utils.dump_error_message(_logger)
if harvester_config.frontend.verbose:
self.tmpLog.debug('method={0} json={1} msg={2}'.format(methodName, dataStr, message))
self.tmpLog.debug('ip={3} - method={0} json={1} msg={2}'.format(methodName, dataStr, message,
self.client_address[0]))
# set the response
self.do_postprocessing(message)
return
Expand Down
18 changes: 14 additions & 4 deletions pandaharvester/harvestermonitor/act_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestercore.work_spec import WorkSpec
from pandaharvester.harvestercore.plugin_base import PluginBase
from pandaharvester.harvestercore.worker_errors import WorkerErrors
from pandaharvester.harvesterconfig import harvester_config

from act.common.aCTConfig import aCTConfigARC
Expand All @@ -24,7 +25,11 @@ def __init__(self, **kwarg):

# Set up aCT DB connection
self.log = core_utils.make_logger(baseLogger, 'aCT submitter', method_name='__init__')
self.actDB = aCTDBPanda(self.log)
try:
self.actDB = aCTDBPanda(self.log)
except Exception as e:
self.log.error('Could not connect to aCT database: {0}'.format(str(e)))
self.actDB = None

# get access point
def get_access_point(self, workspec, panda_id):
Expand Down Expand Up @@ -68,10 +73,11 @@ def check_workers(self, workspec_list):
method_name='check_workers')
try:
tmpLog.debug('Querying aCT for id {0}'.format(workSpec.batchID))
columns = ['actpandastatus', 'pandastatus', 'computingElement', 'node']
columns = ['actpandastatus', 'pandastatus', 'computingElement', 'node', 'error']
actjobs = self.actDB.getJobs("id={0}".format(workSpec.batchID), columns)
except Exception as e:
tmpLog.error("Failed to query aCT DB: {0}".format(str(e)))
if self.actDB:
tmpLog.error("Failed to query aCT DB: {0}".format(str(e)))
# send back current status
retList.append((workSpec.status, ''))
continue
Expand All @@ -85,12 +91,16 @@ def check_workers(self, workspec_list):
actstatus = actjobs[0]['actpandastatus']
workSpec.nativeStatus = actstatus
newStatus = WorkSpec.ST_running
errorMsg = ''
if actstatus in ['waiting', 'sent', 'starting']:
newStatus = WorkSpec.ST_submitted
elif actstatus == 'done':
newStatus = self.check_pilot_status(workSpec, tmpLog)
elif actstatus == 'donefailed':
newStatus = WorkSpec.ST_failed
errorMsg = actjobs[0]['error'] or 'Unknown error'
error_code = WorkerErrors.error_codes.get('GENERAL_ERROR')
workSpec.set_supplemental_error(error_code=error_code, error_diag=errorMsg)
elif actstatus == 'donecancelled':
newStatus = WorkSpec.ST_cancelled

Expand All @@ -108,6 +118,6 @@ def check_workers(self, workspec_list):
except:
tmpLog.warning('Could not extract panda ID for worker {0}'.format(workSpec.batchID))

retList.append((newStatus, ''))
retList.append((newStatus, errorMsg))

return True, retList
8 changes: 6 additions & 2 deletions pandaharvester/harvesterpreparator/go_bulk_preparator.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,11 @@ def check_stage_in_status(self, jobspec):
if fileSpec.scope is not None :
scope = fileSpec.scope
hash = hashlib.md5()
hash.update('%s:%s' % (scope, fileSpec.lfn))
if sys.version_info.major == 2:
hash.update('%s:%s' % (scope, fileSpec.lfn))
if sys.version_info.major == 3:
hash_string = "{0}:{1}".format(scope, fileSpec.lfn)
hash.update(bytes(hash_string, 'utf-8'))
hash_hex = hash.hexdigest()
correctedscope = "/".join(scope.split('.'))
#srcURL = fileSpec.path
Expand Down Expand Up @@ -410,7 +414,7 @@ def trigger_preparation(self, jobspec):
tmpLog.debug('Change self.dummy_transfer_id from {0} to {1}'.format(old_dummy_transfer_id,self.dummy_transfer_id))
# set the dummy transfer ID which will be replaced with a real ID in check_stage_in_status()
inFiles = jobspec.get_input_file_attributes(skip_ready=True)
lfns = inFiles.keys()
lfns = list(inFiles.keys())
#for inLFN in inFiles.keys():
# lfns.append(inLFN)
tmpLog.debug('number of lfns - {0} type(lfns) - {1}'.format(len(lfns),type(lfns)))
Expand Down
8 changes: 6 additions & 2 deletions pandaharvester/harvesterstager/go_bulk_stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def check_stage_out_status(self, jobspec):
# get the scope of the log files
outfileattrib = jobspec.get_output_file_attributes()
scopeLog = 'xxxx'
for key in outfileattrib.keys():
for key in list(outfileattrib.keys()):
if "log.tgz" in key :
scopeLog = outfileattrib[key]['scope']
# get transfer groups
Expand Down Expand Up @@ -274,7 +274,11 @@ def check_stage_out_status(self, jobspec):
msgStr = "printed first 25 files skipping the rest".format(fileSpec.lfn, fileSpec.scope)
tmpLog.debug(msgStr)
hash = hashlib.md5()
hash.update('%s:%s' % (scope, fileSpec.lfn))
if sys.version_info.major == 2:
hash.update('%s:%s' % (scope, fileSpec.lfn))
if sys.version_info.major == 3:
hash_string = "{0}:{1}".format(scope, fileSpec.lfn)
hash.update(bytes(hash_string, 'utf-8'))
hash_hex = hash.hexdigest()
correctedscope = "/".join(scope.split('.'))
srcURL = fileSpec.path
Expand Down
2 changes: 1 addition & 1 deletion pandaharvester/harvestersubmitter/htcondor_submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def _get_prodsourcelabel_pilotypeopt_piloturlstr(pilot_type, pilot_version='1'):
if pilot_version == '2':
# pilot 2
pt_psl_map = {
'RC': ('rc_test2', 'RC', '--piloturl http://project-atlas-gmsb.web.cern.ch/project-atlas-gmsb/pilot2-dev.tar.gz'),
'RC': ('rc_test2', 'RC', '--piloturl http://cern.ch/atlas-panda-pilot/pilot2-dev.tar.gz'),
'ALRB': ('rc_alrb', 'ALRB', ''),
'PT': ('ptest', 'PR', ''),
}
Expand Down
9 changes: 7 additions & 2 deletions pandaharvester/harvestersweeper/act_sweeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ def __init__(self, **kwarg):
PluginBase.__init__(self, **kwarg)

self.log = core_utils.make_logger(baseLogger, 'aCT sweeper', method_name='__init__')
self.actDB = aCTDBPanda(self.log)
try:
self.actDB = aCTDBPanda(self.log)
except Exception as e:
self.log.error('Could not connect to aCT database: {0}'.format(str(e)))
self.actDB = None


# kill a worker
Expand All @@ -44,7 +48,8 @@ def kill_worker(self, workspec):
self.actDB.updateJobs("id={0} AND actpandastatus IN ('sent', 'starting', 'running')".format(workspec.batchID),
{'actpandastatus': 'tobekilled', 'pandastatus': None})
except Exception as e:
tmpLog.error('Failed to cancel job {0} in aCT: {1}'.format(workspec.batchID, str(e)))
if self.actDB:
tmpLog.error('Failed to cancel job {0} in aCT: {1}'.format(workspec.batchID, str(e)))
return False, str(e)

tmpLog.info('Job {0} cancelled in aCT'.format(workspec.batchID))
Expand Down
2 changes: 1 addition & 1 deletion pandaharvester/panda_pkg_info.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version = "0.1.5"
release_version = "0.1.6"
3 changes: 3 additions & 0 deletions templates/panda_harvester.cfg.rpmnew.template
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,9 @@ verbose = False
# type : simple or apache
type = simple

# enable token authentication for the apache frontend; default is True
authEnable = True

# file of secret used in token signature
secretFile = /FIXME

Expand Down

0 comments on commit f43656f

Please sign in to comment.