
Commit

Merge pull request #35 from PanDAWMS/next
3.3.0.39
PalNilsson authored Apr 28, 2022
2 parents ab2d317 + 7030622 commit c3f18bf
Showing 26 changed files with 331 additions and 225 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
@@ -1 +1 @@
3.2.4.16
3.3.0.39
12 changes: 10 additions & 2 deletions pilot.py
@@ -464,9 +464,10 @@ def set_environment_variables():
environ['PILOT_OUTPUT_DIR'] = args.output_dir

# keep track of the server urls
environ['PANDA_SERVER_URL'] = get_panda_server(args.url, args.port)
environ['PANDA_SERVER_URL'] = get_panda_server(args.url, args.port, update_server=args.update_server)
environ['QUEUEDATA_SERVER_URL'] = '%s' % args.queuedata_url
environ['STORAGEDATA_SERVER_URL'] = '%s' % args.storagedata_url
if args.storagedata_url:
environ['STORAGEDATA_SERVER_URL'] = '%s' % args.storagedata_url


def wrap_up():
@@ -514,6 +515,13 @@ def wrap_up():
elif trace.pilot['state'] == ERRNO_NOJOBS:
logging.critical('pilot did not process any events -- aborting')
exitcode = ERRNO_NOJOBS

try:
exitcode = int(exitcode)
except TypeError as exc:
logging.warning(f'failed to convert exit code to int: {exitcode}, {exc}')
exitcode = 1008

logging.info('pilot has finished')
logging.shutdown()

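
Note on the wrap_up() hunk above: the pilot now coerces the exit code to an integer before shutting down, falling back to 1008 when the conversion fails (for example when exitcode is None). A minimal standalone sketch of the same pattern; the helper name coerce_exit_code is made up, and ValueError is caught here in addition to the TypeError handled in the diff:

import logging

def coerce_exit_code(exitcode, fallback=1008):
    """Return exitcode as an int; fall back to a known error code if conversion fails (sketch)."""
    try:
        return int(exitcode)
    except (TypeError, ValueError) as exc:
        # e.g. exitcode is None or a non-numeric string
        logging.warning(f'failed to convert exit code to int: {exitcode}, {exc}')
        return fallback

# example: coerce_exit_code(None) -> 1008, coerce_exit_code('3') -> 3
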
4 changes: 2 additions & 2 deletions pilot/api/data.py
@@ -51,9 +51,9 @@ class StagingClient(object):
# list of allowed schemas to be used for direct access mode from REMOTE replicas
direct_remoteinput_allowed_schemas = ['root', 'https']
# list of schemas to be used for direct access mode from LOCAL replicas
direct_localinput_allowed_schemas = ['root', 'dcache', 'dcap', 'file', 'https', 'davs']
direct_localinput_allowed_schemas = ['root', 'dcache', 'dcap', 'file', 'https']
# list of allowed schemas to be used for transfers from REMOTE sites
remoteinput_allowed_schemas = ['root', 'gsiftp', 'dcap', 'davs', 'srm', 'storm', 'https']
remoteinput_allowed_schemas = ['root', 'gsiftp', 'dcap', 'srm', 'storm', 'https']

def __init__(self, infosys_instance=None, acopytools=None, logger=None, default_copytools='rucio', trace_report=None):
"""
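
Note on the StagingClient hunk above: the two lists drop the davs scheme from direct local access and from remote transfers. For orientation, such lists are typically matched against the scheme of a replica URL; the helper below only illustrates that check and is not part of the StagingClient API:

from urllib.parse import urlparse

direct_localinput_allowed_schemas = ['root', 'dcache', 'dcap', 'file', 'https']

def scheme_is_allowed(turl, allowed_schemas=direct_localinput_allowed_schemas):
    """Return True if the replica URL uses a scheme accepted for direct local access (sketch)."""
    return urlparse(turl).scheme in allowed_schemas

# example: scheme_is_allowed('root://storage.example.org//path/file.root') -> True
#          scheme_is_allowed('davs://storage.example.org//path/file.root') -> False
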
4 changes: 3 additions & 1 deletion pilot/common/errorcodes.py
@@ -152,6 +152,7 @@ class ErrorCodes:
COMMANDTIMEDOUT = 1367
REMOTEFILEOPENTIMEDOUT = 1368
FRONTIER = 1369
VOMSPROXYABOUTTOEXPIRE = 1370 # note, not a failure but an internal 'error' code used to download a new proxy

_error_messages = {
GENERALERROR: "General pilot error, consult batch log",
@@ -282,7 +283,8 @@ class ErrorCodes:
CHECKSUMCALCFAILURE: "Failure during checksum calculation",
COMMANDTIMEDOUT: "Command timed out",
REMOTEFILEOPENTIMEDOUT: "Remote file open timed out",
FRONTIER: "Frontier error"
FRONTIER: "Frontier error",
VOMSPROXYABOUTTOEXPIRE: "VOMS proxy is about to expire"
}

put_error_codes = [1135, 1136, 1137, 1141, 1152, 1181]
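
Note on the ErrorCodes hunk above: the comment on the new constant matters — 1370 is not reported as a job failure but steers the pilot towards downloading a fresh proxy. A tiny sketch of how a caller might separate such internal codes from real failures; the is_internal_code helper is hypothetical:

VOMSPROXYABOUTTOEXPIRE = 1370  # internal code: request proxy renewal, not a job failure

def is_internal_code(code):
    """Internal codes change pilot behaviour but are not added to the job's error codes (sketch)."""
    return code in (VOMSPROXYABOUTTOEXPIRE,)

# example: is_internal_code(1370) -> True; is_internal_code(1369) -> False (1369 = FRONTIER)
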
32 changes: 13 additions & 19 deletions pilot/control/data.py
@@ -477,7 +477,7 @@ def write_utility_output(workdir, step, stdout, stderr):
write_output(os.path.join(workdir, step + '_stderr.txt'), stderr)


def copytool_in(queues, traces, args):
def copytool_in(queues, traces, args): # noqa: C901
"""
Call the stage-in function and put the job object in the proper queue.
@@ -504,20 +504,10 @@ def copytool_in(queues, traces, args):
user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0) # Python 2/3
cmd = user.get_utility_commands(job=job, order=UTILITY_BEFORE_STAGEIN)
if cmd:
# xcache debug
#_, _stdout, _stderr = execute('pgrep -x xrootd | awk \'{print \"ps -p \"$1\" -o args --no-headers --cols 300\"}\' | sh')
#logger.debug('[before xcache start] stdout=%s', _stdout)
#logger.debug('[before xcache start] stderr=%s', _stderr)

_, stdout, stderr = execute(cmd.get('command'))
logger.debug('stdout=%s', stdout)
logger.debug('stderr=%s', stderr)

# xcache debug
#_, _stdout, _stderr = execute('pgrep -x xrootd | awk \'{print \"ps -p \"$1\" -o args --no-headers --cols 300\"}\' | sh')
#logger.debug('[after xcache start] stdout=%s', _stdout)
#logger.debug('[after xcache start] stderr=%s', _stderr)

# perform any action necessary after command execution (e.g. stdout processing)
kwargs = {'label': cmd.get('label', 'utility'), 'output': stdout}
user.post_prestagein_utility_command(**kwargs)
@@ -562,10 +552,13 @@ def copytool_in(queues, traces, args):
# now create input file metadata if required by the payload
if os.environ.get('PILOT_ES_EXECUTOR_TYPE', 'generic') == 'generic':
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
user = __import__('pilot.user.%s.metadata' % pilot_user, globals(), locals(), [pilot_user], 0) # Python 2/3
file_dictionary = get_input_file_dictionary(job.indata)
xml = user.create_input_file_metadata(file_dictionary, job.workdir)
logger.info('created input file metadata:\n%s', xml)
try:
user = __import__('pilot.user.%s.metadata' % pilot_user, globals(), locals(), [pilot_user], 0)
file_dictionary = get_input_file_dictionary(job.indata)
xml = user.create_input_file_metadata(file_dictionary, job.workdir)
logger.info('created input file metadata:\n%s', xml)
except ModuleNotFoundError as exc:
logger.warning(f'no such module: {exc} (will not create input file metadata)')
else:
# remove the job from the current stage-in queue
_job = queues.current_data_in.get(block=True, timeout=1)
@@ -748,7 +741,7 @@ def filter_files_for_log(directory):
return filtered_files


def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], output_files=[], is_looping=False, debugmode=False):
def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], output_files=[], piloterrors=[], debugmode=False):
"""
Create the tarball for the job.
@@ -758,7 +751,7 @@ def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], out
:param cleanup: perform cleanup (Boolean).
:param input_files: list of input files to remove (list).
:param output_files: list of output files to remove (list).
:param is_looping: True for looping jobs, False by default (Boolean).
:param piloterrors: list of Pilot assigned error codes (list).
:param debugmode: True if debug mode has been switched on (Boolean).
:raises LogFileCreationFailure: in case of log file creation problem.
:return:
@@ -776,7 +769,7 @@ def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], out
if cleanup:
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0) # Python 2/3
user.remove_redundant_files(workdir, islooping=is_looping, debugmode=debugmode)
user.remove_redundant_files(workdir, piloterrors=piloterrors, debugmode=debugmode)

# remove any present input/output files before tarring up workdir
for fname in input_files + output_files:
@@ -840,6 +833,7 @@ def _do_stageout(job, xdata, activity, queue, title, output_dir='', rucio_host='

# should stage-in be done by a script (for containerisation) or by invoking the API (ie classic mode)?
use_container = pilot.util.middleware.use_middleware_script(job.infosys.queuedata.container_type.get("middleware"))

if use_container:
logger.info('stage-out will be done in a container')
try:
@@ -938,7 +932,7 @@ def _stage_out_new(job, args):
output_files = [fspec.lfn for fspec in job.outdata]
create_log(job.workdir, logfile.lfn, tarball_name, args.cleanup,
input_files=input_files, output_files=output_files,
is_looping=errors.LOOPINGJOB in job.piloterrorcodes, debugmode=job.debug)
piloterrors=job.piloterrorcodes, debugmode=job.debug)
except LogFileCreationFailure as error:
logger.warning('failed to create tar file: %s', error)
set_pilot_state(job=job, state="failed")
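
Note on the pilot/control/data.py hunks above: two things changed — the per-experiment metadata module is now imported defensively, and create_log() receives the full list of pilot error codes instead of a single is_looping flag. A condensed sketch of the defensive import, using importlib.import_module in place of the __import__ call in the diff; the wrapper function name is made up:

import importlib
import logging

logger = logging.getLogger(__name__)

def create_metadata_if_available(experiment, file_dictionary, workdir):
    """Create input file metadata only when the experiment ships a metadata module (sketch)."""
    try:
        user = importlib.import_module('pilot.user.%s.metadata' % experiment)
        return user.create_input_file_metadata(file_dictionary, workdir)
    except ModuleNotFoundError as exc:
        logger.warning(f'no such module: {exc} (will not create input file metadata)')
        return None
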
69 changes: 52 additions & 17 deletions pilot/control/job.py
@@ -309,7 +309,7 @@ def send_state(job, args, state, xml=None, metadata=None, test_tobekilled=False)
# will it be the final update?
final = is_final_update(job, state, tag='sending' if args.update_server else 'writing')

# build the data structure needed for getJob, updateJob
# build the data structure needed for updateJob
data = get_data_structure(job, state, args, xml=xml, metadata=metadata)

# write the heartbeat message to file if the server is not to be updated by the pilot (Nordugrid mode)
@@ -532,12 +532,14 @@ def handle_backchannel_command(res, job, args, test_tobekilled=False):
# job.debug_command = 'gdb --pid % -ex \'generate-core-file\''


def add_data_structure_ids(data, version_tag):
def add_data_structure_ids(data, version_tag, job):
"""
Add pilot, batch and scheduler ids to the data structure for getJob, updateJob.
:param data: data structure (dict).
:return: updated data structure (dict).
:param version_tag: Pilot version tag (string).
:param job: job object.
:return: updated data structure (dict), batchsystem_id (string|None).
"""

schedulerid = get_job_scheduler_id()
@@ -550,26 +552,27 @@ def add_data_structure_ids(data, version_tag):
pilotid = user.get_pilot_id(data['jobId'])
if pilotid:
pilotversion = os.environ.get('PILOT_VERSION')

# report the batch system job id, if available
batchsystem_type, batchsystem_id = get_batchsystem_jobid()

if batchsystem_type:
data['pilotID'] = "%s|%s|%s|%s" % (pilotid, batchsystem_type, version_tag, pilotversion)
data['batchID'] = batchsystem_id
if not job.batchid:
job.batchtype, job.batchid = get_batchsystem_jobid()
if job.batchtype and job.batchid:
data['pilotID'] = "%s|%s|%s|%s" % (pilotid, job.batchtype, version_tag, pilotversion)
data['batchID'] = job.batchid
else:
data['pilotID'] = "%s|%s|%s" % (pilotid, version_tag, pilotversion)
else:
logger.debug('no pilotid')

return data


def get_data_structure(job, state, args, xml=None, metadata=None):
"""
Build the data structure needed for getJob, updateJob.
Build the data structure needed for updateJob.
:param job: job object.
:param state: state of the job (string).
:param args:
:param args: Pilot args object.
:param xml: optional XML string.
:param metadata: job report metadata read as a string.
:return: data structure (dictionary).
@@ -583,7 +586,7 @@ def get_data_structure(job, state, args, xml=None, metadata=None):
'attemptNr': job.attemptnr}

# add pilot, batch and scheduler ids to the data structure
data = add_data_structure_ids(data, args.version_tag)
data = add_data_structure_ids(data, args.version_tag, job)

starttime = get_postgetjob_time(job.jobid, args)
if starttime:
@@ -1387,7 +1390,7 @@ def proceed_with_getjob(timefloor, starttime, jobnumber, getjob_requests, max_ge
userproxy = __import__('pilot.user.%s.proxy' % pilot_user, globals(), locals(), [pilot_user], 0)

# is the proxy still valid?
exit_code, diagnostics = userproxy.verify_proxy()
exit_code, diagnostics = userproxy.verify_proxy(test=False)
if traces.pilot['error_code'] == 0: # careful so we don't overwrite another error code
traces.pilot['error_code'] = exit_code
if exit_code == errors.NOPROXY or exit_code == errors.NOVOMSPROXY:
@@ -1824,7 +1827,8 @@ def retrieve(queues, traces, args): # noqa: C901
# get a job definition from a source (file or server)
res = get_job_definition(args)
#res['debug'] = True
dump_job_definition(res)
if res:
dump_job_definition(res)
if res is None:
logger.fatal('fatal error in job download loop - cannot continue')
# do not set graceful stop if pilot has not finished sending the final job update
@@ -2591,13 +2595,20 @@ def job_monitor(queues, traces, args): # noqa: C901
# perform the monitoring tasks
exit_code, diagnostics = job_monitor_tasks(jobs[i], mt, args)
if exit_code != 0:
if exit_code == errors.NOVOMSPROXY:
logger.warning('VOMS proxy has expired - keep monitoring job')
elif exit_code == errors.KILLPAYLOAD:
if exit_code == errors.VOMSPROXYABOUTTOEXPIRE:
# attempt to download a new proxy since it is about to expire
ec = download_new_proxy()
exit_code = ec if ec != 0 else 0 # reset the exit_code if success

if exit_code == errors.KILLPAYLOAD or exit_code == errors.NOVOMSPROXY:
jobs[i].piloterrorcodes, jobs[i].piloterrordiags = errors.add_error_code(exit_code)
logger.debug('killing payload process')
kill_process(jobs[i].pid)
break
elif exit_code == 0:
# ie if download of new proxy was successful
diagnostics = ""
break
else:
try:
fail_monitored_job(jobs[i], exit_code, diagnostics, queues, traces)
@@ -2651,6 +2662,30 @@ def job_monitor(queues, traces, args): # noqa: C901
logger.debug('[job] job monitor thread has finished')


def download_new_proxy():
"""
The production proxy has expired, try to download a new one.
If it fails to download and verify a new proxy, return the NOVOMSPROXY error.
:return: exit code (int).
"""

exit_code = 0
x509 = os.environ.get('X509_USER_PROXY', '')
logger.warning('VOMS proxy is about to expire - attempt to download a new proxy')

pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
user = __import__('pilot.user.%s.proxy' % pilot_user, globals(), locals(), [pilot_user], 0)

ec, diagnostics, x509 = user.get_and_verify_proxy(x509, voms_role='atlas:/atlas/Role=production')
if ec != 0: # do not return non-zero exit code if only download fails
logger.warning('failed to download/verify new proxy')
exit_code == errors.NOVOMSPROXY

return exit_code


def send_heartbeat_if_time(job, args, update_time):
"""
Send a heartbeat to the server if it is time to do so.
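
Note on the pilot/control/job.py hunks above: the job_monitor change works together with the new download_new_proxy() function — when monitoring reports VOMSPROXYABOUTTOEXPIRE, the pilot first tries to fetch a new production proxy and only records an error and kills the payload when the proxy is (or remains) invalid. The sketch below compresses that flow as described by the download_new_proxy docstring; renew_proxy, on_monitor_exit_code and the NOVOMSPROXY placeholder value are illustrative, and get_and_verify_proxy is assumed to behave as in the diff:

import logging
import os

logger = logging.getLogger(__name__)

VOMSPROXYABOUTTOEXPIRE = 1370  # from ErrorCodes
NOVOMSPROXY = -1               # placeholder; the real value is defined in ErrorCodes

def renew_proxy(get_and_verify_proxy):
    """Try to replace a proxy that is about to expire; 0 on success, NOVOMSPROXY on failure (sketch)."""
    x509 = os.environ.get('X509_USER_PROXY', '')
    logger.warning('VOMS proxy is about to expire - attempt to download a new proxy')
    ec, diagnostics, x509 = get_and_verify_proxy(x509, voms_role='atlas:/atlas/Role=production')
    if ec != 0:
        logger.warning('failed to download/verify new proxy: %s', diagnostics)
        return NOVOMSPROXY
    return 0

def on_monitor_exit_code(exit_code, get_and_verify_proxy):
    """Map a job_monitor_tasks exit code to the action taken in job_monitor (sketch)."""
    if exit_code == VOMSPROXYABOUTTOEXPIRE:
        exit_code = renew_proxy(get_and_verify_proxy)
    if exit_code == NOVOMSPROXY:
        return 'record error and kill payload'
    if exit_code == 0:
        return 'keep monitoring'
    return 'fail monitored job'
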
(diffs for the remaining 20 changed files are not shown)
