diff --git a/PILOTVERSION b/PILOTVERSION index de3696629..56c643365 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.2.4.16 \ No newline at end of file +3.3.0.39 \ No newline at end of file diff --git a/pilot.py b/pilot.py index f275b5b5f..886ac75e8 100755 --- a/pilot.py +++ b/pilot.py @@ -464,9 +464,10 @@ def set_environment_variables(): environ['PILOT_OUTPUT_DIR'] = args.output_dir # keep track of the server urls - environ['PANDA_SERVER_URL'] = get_panda_server(args.url, args.port) + environ['PANDA_SERVER_URL'] = get_panda_server(args.url, args.port, update_server=args.update_server) environ['QUEUEDATA_SERVER_URL'] = '%s' % args.queuedata_url - environ['STORAGEDATA_SERVER_URL'] = '%s' % args.storagedata_url + if args.storagedata_url: + environ['STORAGEDATA_SERVER_URL'] = '%s' % args.storagedata_url def wrap_up(): @@ -514,6 +515,13 @@ def wrap_up(): elif trace.pilot['state'] == ERRNO_NOJOBS: logging.critical('pilot did not process any events -- aborting') exitcode = ERRNO_NOJOBS + + try: + exitcode = int(exitcode) + except TypeError as exc: + logging.warning(f'failed to convert exit code to int: {exitcode}, {exc}') + exitcode = 1008 + logging.info('pilot has finished') logging.shutdown() diff --git a/pilot/api/data.py b/pilot/api/data.py index fe0ea8adb..106bb9507 100644 --- a/pilot/api/data.py +++ b/pilot/api/data.py @@ -51,9 +51,9 @@ class StagingClient(object): # list of allowed schemas to be used for direct acccess mode from REMOTE replicas direct_remoteinput_allowed_schemas = ['root', 'https'] # list of schemas to be used for direct acccess mode from LOCAL replicas - direct_localinput_allowed_schemas = ['root', 'dcache', 'dcap', 'file', 'https', 'davs'] + direct_localinput_allowed_schemas = ['root', 'dcache', 'dcap', 'file', 'https'] # list of allowed schemas to be used for transfers from REMOTE sites - remoteinput_allowed_schemas = ['root', 'gsiftp', 'dcap', 'davs', 'srm', 'storm', 'https'] + remoteinput_allowed_schemas = ['root', 'gsiftp', 'dcap', 'srm', 'storm', 'https'] def __init__(self, infosys_instance=None, acopytools=None, logger=None, default_copytools='rucio', trace_report=None): """ diff --git a/pilot/common/errorcodes.py b/pilot/common/errorcodes.py index a41db4089..dcc7d7f22 100644 --- a/pilot/common/errorcodes.py +++ b/pilot/common/errorcodes.py @@ -152,6 +152,7 @@ class ErrorCodes: COMMANDTIMEDOUT = 1367 REMOTEFILEOPENTIMEDOUT = 1368 FRONTIER = 1369 + VOMSPROXYABOUTTOEXPIRE = 1370 # note, not a failure but an internal 'error' code used to download a new proxy _error_messages = { GENERALERROR: "General pilot error, consult batch log", @@ -282,7 +283,8 @@ class ErrorCodes: CHECKSUMCALCFAILURE: "Failure during checksum calculation", COMMANDTIMEDOUT: "Command timed out", REMOTEFILEOPENTIMEDOUT: "Remote file open timed out", - FRONTIER: "Frontier error" + FRONTIER: "Frontier error", + VOMSPROXYABOUTTOEXPIRE: "VOMS proxy is about to expire" } put_error_codes = [1135, 1136, 1137, 1141, 1152, 1181] diff --git a/pilot/control/data.py b/pilot/control/data.py index 5db53a4ae..c7a4b30eb 100644 --- a/pilot/control/data.py +++ b/pilot/control/data.py @@ -477,7 +477,7 @@ def write_utility_output(workdir, step, stdout, stderr): write_output(os.path.join(workdir, step + '_stderr.txt'), stderr) -def copytool_in(queues, traces, args): +def copytool_in(queues, traces, args): # noqa: C901 """ Call the stage-in function and put the job object in the proper queue. 
@@ -504,20 +504,10 @@ def copytool_in(queues, traces, args): user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0) # Python 2/3 cmd = user.get_utility_commands(job=job, order=UTILITY_BEFORE_STAGEIN) if cmd: - # xcache debug - #_, _stdout, _stderr = execute('pgrep -x xrootd | awk \'{print \"ps -p \"$1\" -o args --no-headers --cols 300\"}\' | sh') - #logger.debug('[before xcache start] stdout=%s', _stdout) - #logger.debug('[before xcache start] stderr=%s', _stderr) - _, stdout, stderr = execute(cmd.get('command')) logger.debug('stdout=%s', stdout) logger.debug('stderr=%s', stderr) - # xcache debug - #_, _stdout, _stderr = execute('pgrep -x xrootd | awk \'{print \"ps -p \"$1\" -o args --no-headers --cols 300\"}\' | sh') - #logger.debug('[after xcache start] stdout=%s', _stdout) - #logger.debug('[after xcache start] stderr=%s', _stderr) - # perform any action necessary after command execution (e.g. stdout processing) kwargs = {'label': cmd.get('label', 'utility'), 'output': stdout} user.post_prestagein_utility_command(**kwargs) @@ -562,10 +552,13 @@ def copytool_in(queues, traces, args): # now create input file metadata if required by the payload if os.environ.get('PILOT_ES_EXECUTOR_TYPE', 'generic') == 'generic': pilot_user = os.environ.get('PILOT_USER', 'generic').lower() - user = __import__('pilot.user.%s.metadata' % pilot_user, globals(), locals(), [pilot_user], 0) # Python 2/3 - file_dictionary = get_input_file_dictionary(job.indata) - xml = user.create_input_file_metadata(file_dictionary, job.workdir) - logger.info('created input file metadata:\n%s', xml) + try: + user = __import__('pilot.user.%s.metadata' % pilot_user, globals(), locals(), [pilot_user], 0) + file_dictionary = get_input_file_dictionary(job.indata) + xml = user.create_input_file_metadata(file_dictionary, job.workdir) + logger.info('created input file metadata:\n%s', xml) + except ModuleNotFoundError as exc: + logger.warning(f'no such module: {exc} (will not create input file metadata)') else: # remove the job from the current stage-in queue _job = queues.current_data_in.get(block=True, timeout=1) @@ -748,7 +741,7 @@ def filter_files_for_log(directory): return filtered_files -def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], output_files=[], is_looping=False, debugmode=False): +def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], output_files=[], piloterrors=[], debugmode=False): """ Create the tarball for the job. @@ -758,7 +751,7 @@ def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], out :param cleanup: perform cleanup (Boolean). :param input_files: list of input files to remove (list). :param output_files: list of output files to remove (list). - :param is_looping: True for looping jobs, False by default (Boolean). + :param piloterrors: list of Pilot assigned error codes (list). :param debugmode: True if debug mode has been switched on (Boolean). :raises LogFileCreationFailure: in case of log file creation problem. 
:return: @@ -776,7 +769,7 @@ def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], out if cleanup: pilot_user = os.environ.get('PILOT_USER', 'generic').lower() user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0) # Python 2/3 - user.remove_redundant_files(workdir, islooping=is_looping, debugmode=debugmode) + user.remove_redundant_files(workdir, piloterrors=piloterrors, debugmode=debugmode) # remove any present input/output files before tarring up workdir for fname in input_files + output_files: @@ -840,6 +833,7 @@ def _do_stageout(job, xdata, activity, queue, title, output_dir='', rucio_host=' # should stage-in be done by a script (for containerisation) or by invoking the API (ie classic mode)? use_container = pilot.util.middleware.use_middleware_script(job.infosys.queuedata.container_type.get("middleware")) + if use_container: logger.info('stage-out will be done in a container') try: @@ -938,7 +932,7 @@ def _stage_out_new(job, args): output_files = [fspec.lfn for fspec in job.outdata] create_log(job.workdir, logfile.lfn, tarball_name, args.cleanup, input_files=input_files, output_files=output_files, - is_looping=errors.LOOPINGJOB in job.piloterrorcodes, debugmode=job.debug) + piloterrors=job.piloterrorcodes, debugmode=job.debug) except LogFileCreationFailure as error: logger.warning('failed to create tar file: %s', error) set_pilot_state(job=job, state="failed") diff --git a/pilot/control/job.py b/pilot/control/job.py index 951b5d205..51d3043f6 100644 --- a/pilot/control/job.py +++ b/pilot/control/job.py @@ -309,7 +309,7 @@ def send_state(job, args, state, xml=None, metadata=None, test_tobekilled=False) # will it be the final update? final = is_final_update(job, state, tag='sending' if args.update_server else 'writing') - # build the data structure needed for getJob, updateJob + # build the data structure needed for updateJob data = get_data_structure(job, state, args, xml=xml, metadata=metadata) # write the heartbeat message to file if the server is not to be updated by the pilot (Nordugrid mode) @@ -532,12 +532,14 @@ def handle_backchannel_command(res, job, args, test_tobekilled=False): # job.debug_command = 'gdb --pid % -ex \'generate-core-file\'' -def add_data_structure_ids(data, version_tag): +def add_data_structure_ids(data, version_tag, job): """ Add pilot, batch and scheduler ids to the data structure for getJob, updateJob. :param data: data structure (dict). - :return: updated data structure (dict). + :param version_tag: Pilot version tag (string). + :param job: job object. + :return: updated data structure (dict), batchsystem_id (string|None). 
""" schedulerid = get_job_scheduler_id() @@ -550,26 +552,27 @@ def add_data_structure_ids(data, version_tag): pilotid = user.get_pilot_id(data['jobId']) if pilotid: pilotversion = os.environ.get('PILOT_VERSION') - # report the batch system job id, if available - batchsystem_type, batchsystem_id = get_batchsystem_jobid() - - if batchsystem_type: - data['pilotID'] = "%s|%s|%s|%s" % (pilotid, batchsystem_type, version_tag, pilotversion) - data['batchID'] = batchsystem_id + if not job.batchid: + job.batchtype, job.batchid = get_batchsystem_jobid() + if job.batchtype and job.batchid: + data['pilotID'] = "%s|%s|%s|%s" % (pilotid, job.batchtype, version_tag, pilotversion) + data['batchID'] = job.batchid else: data['pilotID'] = "%s|%s|%s" % (pilotid, version_tag, pilotversion) + else: + logger.debug('no pilotid') return data def get_data_structure(job, state, args, xml=None, metadata=None): """ - Build the data structure needed for getJob, updateJob. + Build the data structure needed for updateJob. :param job: job object. :param state: state of the job (string). - :param args: + :param args: Pilot args object. :param xml: optional XML string. :param metadata: job report metadata read as a string. :return: data structure (dictionary). @@ -583,7 +586,7 @@ def get_data_structure(job, state, args, xml=None, metadata=None): 'attemptNr': job.attemptnr} # add pilot, batch and scheduler ids to the data structure - data = add_data_structure_ids(data, args.version_tag) + data = add_data_structure_ids(data, args.version_tag, job) starttime = get_postgetjob_time(job.jobid, args) if starttime: @@ -1387,7 +1390,7 @@ def proceed_with_getjob(timefloor, starttime, jobnumber, getjob_requests, max_ge userproxy = __import__('pilot.user.%s.proxy' % pilot_user, globals(), locals(), [pilot_user], 0) # is the proxy still valid? 
- exit_code, diagnostics = userproxy.verify_proxy() + exit_code, diagnostics = userproxy.verify_proxy(test=False) if traces.pilot['error_code'] == 0: # careful so we don't overwrite another error code traces.pilot['error_code'] = exit_code if exit_code == errors.NOPROXY or exit_code == errors.NOVOMSPROXY: @@ -1824,7 +1827,8 @@ def retrieve(queues, traces, args): # noqa: C901 # get a job definition from a source (file or server) res = get_job_definition(args) #res['debug'] = True - dump_job_definition(res) + if res: + dump_job_definition(res) if res is None: logger.fatal('fatal error in job download loop - cannot continue') # do not set graceful stop if pilot has not finished sending the final job update @@ -2591,13 +2595,20 @@ def job_monitor(queues, traces, args): # noqa: C901 # perform the monitoring tasks exit_code, diagnostics = job_monitor_tasks(jobs[i], mt, args) if exit_code != 0: - if exit_code == errors.NOVOMSPROXY: - logger.warning('VOMS proxy has expired - keep monitoring job') - elif exit_code == errors.KILLPAYLOAD: + if exit_code == errors.VOMSPROXYABOUTTOEXPIRE: + # attempt to download a new proxy since it is about to expire + ec = download_new_proxy() + exit_code = ec if ec != 0 else 0 # reset the exit_code if success + + if exit_code == errors.KILLPAYLOAD or exit_code == errors.NOVOMSPROXY: jobs[i].piloterrorcodes, jobs[i].piloterrordiags = errors.add_error_code(exit_code) logger.debug('killing payload process') kill_process(jobs[i].pid) break + elif exit_code == 0: + # ie if download of new proxy was successful + diagnostics = "" + break else: try: fail_monitored_job(jobs[i], exit_code, diagnostics, queues, traces) @@ -2651,6 +2662,30 @@ def job_monitor(queues, traces, args): # noqa: C901 logger.debug('[job] job monitor thread has finished') +def download_new_proxy(): + """ + The production proxy is about to expire, so try to download a new one. + + If it fails to download and verify a new proxy, return the NOVOMSPROXY error. + + :return: exit code (int). + """ + + exit_code = 0 + x509 = os.environ.get('X509_USER_PROXY', '') + logger.warning('VOMS proxy is about to expire - attempt to download a new proxy') + + pilot_user = os.environ.get('PILOT_USER', 'generic').lower() + user = __import__('pilot.user.%s.proxy' % pilot_user, globals(), locals(), [pilot_user], 0) + + ec, diagnostics, x509 = user.get_and_verify_proxy(x509, voms_role='atlas:/atlas/Role=production') + if ec != 0: # return the NOVOMSPROXY error if the new proxy could not be downloaded and verified + logger.warning('failed to download/verify new proxy') + exit_code = errors.NOVOMSPROXY + + return exit_code + + def send_heartbeat_if_time(job, args, update_time): """ Send a heartbeat to the server if it is time to do so. 
diff --git a/pilot/control/payload.py b/pilot/control/payload.py index c6358e981..692cdfb9c 100644 --- a/pilot/control/payload.py +++ b/pilot/control/payload.py @@ -140,11 +140,11 @@ def _validate_payload(job): # perform user specific validation pilot_user = os.environ.get('PILOT_USER', 'generic').lower() - user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0) # Python 2/3 + user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0) try: status = user.validate(job) except Exception as error: - logger.fatal('failed to execute user validate() function: %s', error) + logger.fatal(f'failed to execute user validate() function: {error}') status = False return status @@ -362,26 +362,10 @@ def get_logging_info(job, args): print(f'exception caught: {exc}') info_dic = {} else: - path = None - if 'tail' in job.debug_command: - filename = job.debug_command.split(' ')[-1] - logger.debug(f'filename={filename}') - counter = 0 - path = None - maxwait = 5 * 60 - while counter < maxwait and not args.graceful_stop.is_set(): - path = find_file(filename, job.workdir) - if not path: - logger.debug(f'file {filename} not found, waiting for max {maxwait} s') - time.sleep(10) - else: - break - counter += 10 - if not path: - logger.warning(f'file {filename} was not found for {maxwait} s, using default') - logf = path if path else config.Payload.payloadstdout - logger.info(f'using {logf} for real-time logging') - info_dic['logfiles'] = [logf] + # find the log file to tail + path = find_log_to_tail(job.debug_command, job.workdir, args, job.is_analysis()) + logger.info(f'using {path} for real-time logging') + info_dic['logfiles'] = [path] else: items = logserver.split(':') info_dic['logging_type'] = items[0].lower() @@ -409,7 +393,46 @@ def get_logging_info(job, args): return info_dic -def run_realtimelog(queues, traces, args): +def find_log_to_tail(debug_command, workdir, args, is_analysis): + """ + Find the log file to tail in the RT logging. + + :param debug_command: requested debug command (string). + :param workdir: job working directory (string). + :param args: pilot args object. + :param is_analysis: True for user jobs (Bool). + :return: path to log file (string). + """ + + path = "" + filename = "" + counter = 0 + maxwait = 5 * 60 + + if 'tail' in debug_command: + filename = debug_command.split(' ')[-1] + elif is_analysis: + filename = 'tmp.stdout*' + if filename: + logger.debug(f'filename={filename}') + while counter < maxwait and not args.graceful_stop.is_set(): + path = find_file(filename, workdir) + if not path: + logger.debug(f'file {filename} not found, waiting for max {maxwait} s') + time.sleep(10) + else: + break + counter += 10 + + # fallback to known log file if no other file could be found + if not path: + logger.warning(f'file {filename} was not found for {maxwait} s, using default') + logf = path if path else config.Payload.payloadstdout + + return logf + + +def run_realtimelog(queues, traces, args): # noqa: C901 """ Validate finished payloads. If payload finished correctly, add the job to the data_out queue. 
If it failed, add it to the data_out queue as @@ -431,18 +454,21 @@ def run_realtimelog(queues, traces, args): # wait with proceeding until the job is running abort_loops = False + first1 = True + first2 = True while not args.graceful_stop.is_set(): # note: in multi-job mode, the real-time logging will be switched off at the end of the job - first = True while not args.graceful_stop.is_set(): if job.state == 'running': - logger.debug('job is running, check if real-time logger is needed') + if first1: + logger.debug('job is running, check if real-time logger is needed') + first1 = False break if job.state == 'stageout' or job.state == 'failed' or job.state == 'holding': - if first: + if first2: logger.debug(f'job is in state {job.state}, continue to next job or abort (wait for graceful stop)') - first = False + first2 = False time.sleep(10) continue time.sleep(1) @@ -465,7 +491,6 @@ def run_realtimelog(queues, traces, args): # only set info_dic once per job (the info will not change) info_dic = get_logging_info(job, args) - logger.debug(f'info_dic={info_dic}') if info_dic: args.use_realtime_logging = True realtime_logger = get_realtime_logger(args, info_dic, job.workdir, job.pilotsecrets) diff --git a/pilot/copytool/rucio.py b/pilot/copytool/rucio.py index 3d445797e..4d1af5656 100644 --- a/pilot/copytool/rucio.py +++ b/pilot/copytool/rucio.py @@ -49,7 +49,7 @@ def verify_stage_out(fspec): rse_settings = rsemgr.get_rse_info(fspec.ddmendpoint) uploaded_file = {'name': fspec.lfn, 'scope': fspec.scope} logger.info('Checking file: %s', str(fspec.lfn)) - return rsemgr.exists(rse_settings, [uploaded_file]) + return rsemgr.exists(rse_settings, [uploaded_file], domain='lan') #@timeout(seconds=10800) diff --git a/pilot/info/jobdata.py b/pilot/info/jobdata.py index 154e55f29..2c1099962 100644 --- a/pilot/info/jobdata.py +++ b/pilot/info/jobdata.py @@ -50,6 +50,8 @@ class JobData(BaseData): jobid = None # unique Job identifier (forced to be a string) taskid = None # unique Task identifier, the task that this job belongs to (forced to be a string) + batchid = None # batch system job id (should be removed from here) + batchtype = None # batch system type (should be removed from here) jobparams = "" # job parameters defining the execution of the job transformation = "" # script execution name # current job status; format = {key: value, ..} e.g. 
key='LOG_TRANSFER', value='DONE' @@ -107,6 +109,7 @@ class JobData(BaseData): corecounts = [] # keep track of all actual core count measurements looping_check = True # perform looping payload check checkinputsize = True # False when mv copytool is used and input reside on non-local disks + subprocesses = [] # list of PIDs for payload subprocesses # time variable used for on-the-fly cpu consumption time measurements done by job monitoring t0 = None # payload startup time @@ -164,7 +167,7 @@ class JobData(BaseData): 'swrelease', 'zipmap', 'imagename', 'imagename_jobdef', 'accessmode', 'transfertype', 'datasetin', ## TO BE DEPRECATED: moved to FileSpec (job.indata) 'infilesguids', 'memorymonitor', 'allownooutput', 'pandasecrets'], - list: ['piloterrorcodes', 'piloterrordiags', 'workdirsizes', 'zombies', 'corecounts'], + list: ['piloterrorcodes', 'piloterrordiags', 'workdirsizes', 'zombies', 'corecounts', 'subprocesses'], dict: ['status', 'fileinfo', 'metadata', 'utilities', 'overwrite_queuedata', 'sizes', 'preprocess', 'postprocess', 'coprocess', 'containeroptions', 'pilotsecrets'], bool: ['is_eventservice', 'is_eventservicemerge', 'is_hpo', 'noexecstrcnv', 'debug', 'usecontainer', @@ -1013,6 +1016,7 @@ def reset_errors(self): # temporary fix, make sure all queues are empty before self.exitcode = 0 self.exitmsg = "" self.corecounts = [] + self.subprocesses = [] def to_json(self): from json import dumps diff --git a/pilot/scripts/stagein.py b/pilot/scripts/stagein.py index 03ec9a6aa..b9a7be86f 100644 --- a/pilot/scripts/stagein.py +++ b/pilot/scripts/stagein.py @@ -5,7 +5,7 @@ # http://www.apache.org/licenses/LICENSE-2.0 # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2020 +# - Paul Nilsson, paul.nilsson@cern.ch, 2020-2022 import argparse import os diff --git a/pilot/scripts/stageout.py b/pilot/scripts/stageout.py index dfc1145b5..07ea3e5d6 100644 --- a/pilot/scripts/stageout.py +++ b/pilot/scripts/stageout.py @@ -5,7 +5,8 @@ # http://www.apache.org/licenses/LICENSE-2.0 # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2020 +# - Paul Nilsson, paul.nilsson@cern.ch, 2020-2022 + import argparse import os import re diff --git a/pilot/user/atlas/common.py b/pilot/user/atlas/common.py index 3f3fc8c3b..68caec7a1 100644 --- a/pilot/user/atlas/common.py +++ b/pilot/user/atlas/common.py @@ -5,7 +5,7 @@ # http://www.apache.org/licenses/LICENSE-2.0 # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2017-2021 +# - Paul Nilsson, paul.nilsson@cern.ch, 2017-2022 # - Wen Guan, wen.guan@cern.ch, 2018 from collections import defaultdict @@ -292,7 +292,7 @@ def get_file_open_command(script_path, turls, nthreads): :return: comma-separated list of turls (string). 
""" - return "%s --turls=%s -w %s -t %s" % (script_path, turls, os.path.dirname(script_path), str(nthreads)) + return "%s --turls=\'%s\' -w %s -t %s" % (script_path, turls, os.path.dirname(script_path), str(nthreads)) def extract_turls(indata): @@ -431,6 +431,7 @@ def get_payload_command(job): else: logger.debug('no remote file open verification') + os.environ['INDS'] = 'unknown' # reset in case set by earlier job if is_standard_atlas_job(job.swrelease): # Normal setup (production and user jobs) logger.info("preparing normal production/analysis job setup command") @@ -2061,7 +2062,7 @@ def remove_special_files(workdir, dir_list, outputfiles): remove_dir_tree(item) -def remove_redundant_files(workdir, outputfiles=None, islooping=False, debugmode=False): +def remove_redundant_files(workdir, outputfiles=None, piloterrors=[], debugmode=False): """ Remove redundant files and directories prior to creating the log file. @@ -2069,7 +2070,7 @@ def remove_redundant_files(workdir, outputfiles=None, islooping=False, debugmode :param workdir: working directory (string). :param outputfiles: list of protected output files (list). - :param islooping: looping job variable to make sure workDir is not removed in case of looping (Boolean). + :param errors: list of Pilot assigned error codes (list). :param debugmode: True if debug mode has been switched on (Boolean). :return: """ @@ -2109,7 +2110,9 @@ def remove_redundant_files(workdir, outputfiles=None, islooping=False, debugmode if os.path.exists(path): # remove at least root files from workDir (ie also in the case of looping job) cleanup_looping_payload(path) - if not islooping: + islooping = errors.LOOPINGJOB in piloterrors + ismemerror = errors.PAYLOADEXCEEDMAXMEM in piloterrors + if not islooping and not ismemerror: logger.debug('removing \'workDir\' from workdir=%s', workdir) remove_dir_tree(path) diff --git a/pilot/user/atlas/container.py b/pilot/user/atlas/container.py index 96a92ecc2..177d6c1a7 100644 --- a/pilot/user/atlas/container.py +++ b/pilot/user/atlas/container.py @@ -5,69 +5,28 @@ # http://www.apache.org/licenses/LICENSE-2.0 # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2017-2021 +# - Paul Nilsson, paul.nilsson@cern.ch, 2017-2022 # - Alexander Bogdanchikov, Alexander.Bogdanchikov@cern.ch, 2019-2020 import os import pipes import re import logging -import traceback # for user container test: import urllib from pilot.common.errorcodes import ErrorCodes from pilot.common.exception import PilotException, FileHandlingFailure from pilot.user.atlas.setup import get_asetup, get_file_system_root_path -from pilot.user.atlas.proxy import verify_proxy +from pilot.user.atlas.proxy import get_and_verify_proxy from pilot.info import InfoService, infosys from pilot.util.config import config from pilot.util.filehandling import write_file -from pilot.util import https logger = logging.getLogger(__name__) errors = ErrorCodes() -def get_payload_proxy(proxy_outfile_name, voms_role='atlas'): - """ - :param proxy_outfile_name: specify the file to store proxy - :param voms_role: what proxy (role) to request. 
It should exist on Panda node - :return: True on success - """ - try: - # it assumes that https_setup() was done already - url = os.environ.get('PANDA_SERVER_URL', config.Pilot.pandaserver) - res = https.request('{pandaserver}/server/panda/getProxy'.format(pandaserver=url), data={'role': voms_role}) - - if res is None: - logger.error("Unable to get proxy with role '%s' from panda server", voms_role) - return False - - if res['StatusCode'] != 0: - logger.error("When get proxy with role '%s' panda server returned: %s", voms_role, res['errorDialog']) - return False - - proxy_contents = res['userProxy'] - - except Exception as exc: - logger.error("Get proxy from panda server failed: %s, %s", exc, traceback.format_exc()) - return False - - res = False - try: - # pre-create empty proxy file with secure permissions. Prepare it for write_file() which can not - # set file permission mode, it will writes to the existing file with correct permissions. - _file = os.open(proxy_outfile_name, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) - os.close(_file) - res = write_file(proxy_outfile_name, proxy_contents, mute=False) # returns True on success - except (IOError, OSError, FileHandlingFailure) as exc: - logger.error("Exception when try to save proxy to the file '%s': %s, %s", - proxy_outfile_name, exc, traceback.format_exc()) - - return res - - def do_use_container(**kwargs): """ Decide whether to use a container or not. @@ -324,7 +283,7 @@ def update_for_user_proxy(_cmd, cmd, is_analysis=False): # download and verify payload proxy from the server if desired proxy_verification = os.environ.get('PILOT_PROXY_VERIFICATION') == 'True' and os.environ.get('PILOT_PAYLOAD_PROXY_VERIFICATION') == 'True' if proxy_verification and config.Pilot.payload_proxy_from_server and is_analysis: - exit_code, diagnostics, x509 = get_and_verify_payload_proxy_from_server(x509) + exit_code, diagnostics, x509 = get_and_verify_proxy(x509, voms_role='atlas', proxy_type='payload') if exit_code != 0: # do not return non-zero exit code if only download fails logger.warning('payload proxy verification failed') @@ -334,40 +293,6 @@ def update_for_user_proxy(_cmd, cmd, is_analysis=False): return exit_code, diagnostics, _cmd, cmd -def get_and_verify_payload_proxy_from_server(x509): - """ - Download a payload proxy from the server and verify it. - - :param x509: X509_USER_PROXY (string). - :return: exit code (int), diagnostics (string), updated X509_USER_PROXY (string). 
- """ - - exit_code = 0 - diagnostics = "" - - # try to receive payload proxy and update x509 - x509_payload = re.sub('.proxy$', '', x509) + '-payload.proxy' # compose new name to store payload proxy - #x509_payload = re.sub('.proxy$', '', x509) + 'p.proxy' # compose new name to store payload proxy - - logger.info("download payload proxy from server") - if get_payload_proxy(x509_payload): - logger.info("server returned payload proxy (verifying)") - exit_code, diagnostics = verify_proxy(x509=x509_payload, proxy_id=None) - # if all verifications fail, verify_proxy() returns exit_code=0 and last failure in diagnostics - if exit_code != 0 or (exit_code == 0 and diagnostics != ''): - logger.warning(diagnostics) - logger.info("payload proxy verification failed") - else: - logger.info("payload proxy verified") - # is commented: no user proxy should be in the command the container will execute - # cmd = cmd.replace("export X509_USER_PROXY=%s;" % x509, "export X509_USER_PROXY=%s;" % x509_payload) - x509 = x509_payload - else: - logger.warning("get_payload_proxy() failed") - - return exit_code, diagnostics, x509 - - def set_platform(job, alrb_setup): """ Set thePlatform variable and add it to the sub container command. @@ -851,7 +776,11 @@ def get_middleware_container_script(middleware_container, cmd, asetup=False, lab sitename = 'export PILOT_RUCIO_SITENAME=%s; ' % os.environ.get('PILOT_RUCIO_SITENAME') if 'rucio' in middleware_container: - content = sitename + 'python3 %s ' % cmd # only works with python 3 + content = sitename + content += f'export ATLAS_LOCAL_ROOT_BASE={get_file_system_root_path()}/atlas.cern.ch/repo/ATLASLocalRootBase; ' + content += "alias setupATLAS=\'source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh\'; " + content += "setupATLAS -3; " + content = 'lsetup \"python pilot-default\";python3 %s ' % cmd # only works with python 3 else: content = 'export ALRB_LOCAL_PY3=YES; ' if asetup: # export ATLAS_LOCAL_ROOT_BASE=/cvmfs/..;source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh --quiet; diff --git a/pilot/user/atlas/cpu.py b/pilot/user/atlas/cpu.py index 411b97412..6e6642d3e 100644 --- a/pilot/user/atlas/cpu.py +++ b/pilot/user/atlas/cpu.py @@ -11,7 +11,8 @@ import logging # from .utilities import get_memory_values -from pilot.util.container import execute +#from pilot.util.container import execute +from pilot.util.math import float_to_rounded_string from .utilities import get_memory_values logger = logging.getLogger(__name__) @@ -90,7 +91,12 @@ def set_core_counts(**kwargs): walltime = kwargs.get('walltime', None) if job and walltime: - summary_dictionary = get_memory_values(job.workdir, name=job.memorymonitor) + try: + summary_dictionary = get_memory_values(job.workdir, name=job.memorymonitor) + except ValueError as exc: + logger.warning(f'failed to parse memory monitor output: {exc}') + summary_dictionary = None + if summary_dictionary: time_dictionary = summary_dictionary.get('Time', None) if time_dictionary: @@ -102,6 +108,7 @@ def set_core_counts(**kwargs): logger.debug(f'walltime={walltime}') cores = float(stime + utime) / float(walltime) logger.debug(f'number of cores={cores}') + job.actualcorecount = float_to_rounded_string(cores, precision=2) else: logger.debug('no stime/utime') else: @@ -111,25 +118,25 @@ def set_core_counts(**kwargs): else: logger.debug(f'failed to calculate number of cores (walltime={walltime})') - if job and job.pgrp: - # ps axo pgid,psr -> 154628 8 \n 154628 9 \n 1546280 1 .. 
- # sort is redundant; uniq removes any duplicate lines; wc -l gives the final count - # awk is added to get the pgrp list only and then grep -x makes sure that false positives are removed, e.g. 1546280 - cmd = "ps axo pgid,psr | sort | grep %d | uniq | awk '{print $1}' | grep -x %d | wc -l" % (job.pgrp, job.pgrp) - _, stdout, _ = execute(cmd, mute=True) - logger.debug('%s: %s', cmd, stdout) - try: - job.actualcorecount = int(stdout) - except ValueError as exc: - logger.warning('failed to convert number of actual cores to int: %s', exc) - else: - job.corecounts = add_core_count(job.actualcorecount) #, core_counts=job.corecounts) - #logger.debug('current core counts list: %s', str(job.corecounts)) - # check suspicious values - #if job.actualcorecount > 5: - # logger.warning('detected large actualcorecount: %d', job.actualcorecount) - # cmd = "ps axo pgid,stat,euid,ruid,tty,tpgid,sess,pgrp,ppid,pid,pcpu,comm | sort | uniq | grep %d" % job.pgrp - # exit_code, stdout, stderr = execute(cmd, mute=True) - # logger.debug('%s (pgrp=%d): %s', cmd, job.pgrp, stdout) - else: - logger.debug('payload process group not set - cannot check number of cores used by payload') +# if job and job.pgrp: +# # ps axo pgid,psr -> 154628 8 \n 154628 9 \n 1546280 1 .. +# # sort is redundant; uniq removes any duplicate lines; wc -l gives the final count +# # awk is added to get the pgrp list only and then grep -x makes sure that false positives are removed, e.g. 1546280 +# cmd = "ps axo pgid,psr | sort | grep %d | uniq | awk '{print $1}' | grep -x %d | wc -l" % (job.pgrp, job.pgrp) +# _, stdout, _ = execute(cmd, mute=True) +# logger.debug('%s: %s', cmd, stdout) +# try: +# job.actualcorecount = int(stdout) +# except ValueError as exc: +# logger.warning('failed to convert number of actual cores to int: %s', exc) +# else: +# job.corecounts = add_core_count(job.actualcorecount) #, core_counts=job.corecounts) +# #logger.debug('current core counts list: %s', str(job.corecounts)) +# # check suspicious values +# #if job.actualcorecount > 5: +# # logger.warning('detected large actualcorecount: %d', job.actualcorecount) +# # cmd = "ps axo pgid,stat,euid,ruid,tty,tpgid,sess,pgrp,ppid,pid,pcpu,comm | sort | uniq | grep %d" % job.pgrp +# # exit_code, stdout, stderr = execute(cmd, mute=True) +# # logger.debug('%s (pgrp=%d): %s', cmd, job.pgrp, stdout) +# else: +# logger.debug('payload process group not set - cannot check number of cores used by payload') diff --git a/pilot/user/atlas/proxy.py b/pilot/user/atlas/proxy.py index 2aec15b72..4ba9430e1 100644 --- a/pilot/user/atlas/proxy.py +++ b/pilot/user/atlas/proxy.py @@ -12,16 +12,55 @@ import os import logging +import re +from time import time # from pilot.user.atlas.setup import get_file_system_root_path from pilot.util.container import execute from pilot.common.errorcodes import ErrorCodes -from time import time +from pilot.util.proxy import get_proxy logger = logging.getLogger(__name__) errors = ErrorCodes() +def get_and_verify_proxy(x509, voms_role='', proxy_type=''): + """ + Download a payload proxy from the server and verify it. + + :param x509: X509_USER_PROXY (string). + :param voms_role: role, e.g. 'atlas' (string). + :param proxy_type: proxy type ('payload' for user payload proxy, blank for prod/user proxy) (string). + :return: exit code (int), diagnostics (string), updated X509_USER_PROXY (string). 
+ """ + + exit_code = 0 + diagnostics = "" + + # try to receive payload proxy and update x509 + if proxy_type: + x509_payload = re.sub('.proxy$', '', x509) + f'-{proxy_type}.proxy' # compose new name to store payload proxy + else: + x509_payload = x509 + logger.info(f"download proxy from server (type=\'{proxy_type}\')") + if get_proxy(x509_payload, voms_role): + logger.debug("server returned proxy (verifying)") + exit_code, diagnostics = verify_proxy(x509=x509_payload, proxy_id=None, test=False) + # if all verifications fail, verify_proxy() returns exit_code=0 and last failure in diagnostics + if exit_code != 0 or (exit_code == 0 and diagnostics != ''): + logger.warning(diagnostics) + logger.info(f"proxy verification failed (type=\'{proxy_type}\')") + else: + logger.info(f"proxy verified (type=\'{proxy_type}\')") + # is commented: no user proxy should be in the command the container will execute + # cmd = cmd.replace("export X509_USER_PROXY=%s;" % x509, "export X509_USER_PROXY=%s;" % x509_payload) + x509 = x509_payload + else: + logger.warning(f"failed to get proxy for role=\'{voms_role}\'") + + return exit_code, diagnostics, x509 + + def verify_proxy(limit=None, x509=None, proxy_id="pilot", test=False): """ Check for a valid voms/grid proxy longer than N hours. @@ -83,7 +122,8 @@ def verify_arcproxy(envsetup, limit, proxy_id="pilot", test=False): diagnostics = "" if test: - return errors.NOVOMSPROXY, 'dummy test' + return errors.VOMSPROXYABOUTTOEXPIRE, 'dummy test' + #return errors.NOVOMSPROXY, 'dummy test' if proxy_id is not None: if not hasattr(verify_arcproxy, "cache"): @@ -99,10 +139,14 @@ def verify_arcproxy(envsetup, limit, proxy_id="pilot", test=False): seconds_left = validity_end - tnow logger.info("cache: check %s proxy validity: wanted=%dh left=%.2fh (now=%d validity_end=%d left=%d)", proxy_id, limit, float(seconds_left) / 3600, tnow, validity_end, seconds_left) - if seconds_left < limit * 3600: - diagnostics = "%s proxy validity time is too short: %.2fh" % (proxy_id, float(seconds_left) / 3600) + if seconds_left < limit * 3600: # REMOVE THIS, FAVOUR THE NEXT + diagnostics = f"{proxy_id} proxy validity time is too short: %.2fh" % (float(seconds_left) / 3600) logger.warning(diagnostics) exit_code = errors.NOVOMSPROXY + elif seconds_left < limit * 3600 - 20 * 60: # FAVOUR THIS, IE NEVER SET THE PREVIOUS + diagnostics = f'{proxy_id} proxy is about to expire: %.2fh' % (float(seconds_left) / 3600) + logger.warning(diagnostics) + exit_code = errors.VOMSPROXYABOUTTOEXPIRE else: logger.info("%s proxy validity time is verified", proxy_id) return exit_code, diagnostics @@ -253,6 +297,11 @@ def interpret_proxy_info(ec, stdout, stderr, limit): validity_end, stdout = extract_time_left(stdout) if validity_end: return exitcode, diagnostics, validity_end + else: + diagnostics = "arcproxy failed: %s" % stdout + logger.warning(diagnostics) + exitcode = errors.GENERALERROR + return exitcode, diagnostics, validity_end # test for command errors if "arcproxy:" in stdout: @@ -289,6 +338,8 @@ def extract_time_left(stdout): :return: validity_end, stdout (int, string)) """ + validity_end = None + # remove the last \n in case there is one if stdout[-1] == '\n': stdout = stdout[:-1] diff --git a/pilot/user/atlas/setup.py b/pilot/user/atlas/setup.py index e01aeece0..5b532b066 100644 --- a/pilot/user/atlas/setup.py +++ b/pilot/user/atlas/setup.py @@ -200,6 +200,7 @@ def set_inds(dataset): os.environ['INDS'] = inds else: logger.warning("INDS unknown") + os.environ['INDS'] = 'unknown' def 
get_analysis_trf(transform, workdir): diff --git a/pilot/user/generic/common.py b/pilot/user/generic/common.py index 5ad05e116..4d28263ba 100644 --- a/pilot/user/generic/common.py +++ b/pilot/user/generic/common.py @@ -5,7 +5,7 @@ # http://www.apache.org/licenses/LICENSE-2.0 # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2017-2020 +# - Paul Nilsson, paul.nilsson@cern.ch, 2017-2022 import os from signal import SIGTERM @@ -109,13 +109,13 @@ def update_job_data(job): pass -def remove_redundant_files(workdir, outputfiles=[], islooping=False, debugmode=False): +def remove_redundant_files(workdir, outputfiles=[], piloterrors=[], debugmode=False): """ Remove redundant files and directories prior to creating the log file. :param workdir: working directory (string). :param outputfiles: list of output files. - :param islooping: looping job variable to make sure workDir is not removed in case of looping (boolean). + :param piloterrors: list of Pilot assigned error codes (list). :return: """ diff --git a/pilot/user/rubin/common.py b/pilot/user/rubin/common.py index 41fa15ffa..35113dc83 100644 --- a/pilot/user/rubin/common.py +++ b/pilot/user/rubin/common.py @@ -110,13 +110,13 @@ def update_job_data(job): pass -def remove_redundant_files(workdir, outputfiles=[], islooping=False, debugmode=False): +def remove_redundant_files(workdir, outputfiles=[], piloterrors=[], debugmode=False): """ Remove redundant files and directories prior to creating the log file. :param workdir: working directory (string). :param outputfiles: list of output files. - :param islooping: looping job variable to make sure workDir is not removed in case of looping (boolean). + :param piloterrors: list of Pilot assigned error codes (list). :param debugmode: True if debug mode has been switched on (Boolean). :return: """ diff --git a/pilot/user/sphenix/common.py b/pilot/user/sphenix/common.py index 4737ae173..4d28263ba 100644 --- a/pilot/user/sphenix/common.py +++ b/pilot/user/sphenix/common.py @@ -109,13 +109,13 @@ def update_job_data(job): pass -def remove_redundant_files(workdir, outputfiles=[], islooping=False, debugmode=False): +def remove_redundant_files(workdir, outputfiles=[], piloterrors=[], debugmode=False): """ Remove redundant files and directories prior to creating the log file. :param workdir: working directory (string). :param outputfiles: list of output files. - :param islooping: looping job variable to make sure workDir is not removed in case of looping (boolean). + :param piloterrors: list of Pilot assigned error codes (list). 
:return: """ diff --git a/pilot/util/auxiliary.py b/pilot/util/auxiliary.py index d417921fc..a6a96d47b 100644 --- a/pilot/util/auxiliary.py +++ b/pilot/util/auxiliary.py @@ -124,11 +124,15 @@ def get_batchsystem_jobid(): # Condor (get jobid from classad file) if '_CONDOR_JOB_AD' in os.environ: - from subprocess import getoutput - out = getoutput('sed -n "s/^GlobalJobId.*\\"\\(.*\\)\\".*/\\1/p" %s' % os.environ.get("_CONDOR_JOB_AD")) - out = out.split('\n')[-1] if '\n' in out else out - return "Condor", out - + try: + with open(os.environ.get("_CONDOR_JOB_AD"), 'r') as _fp: + for line in _fp: + res = re.search(r'^GlobalJobId\s*=\s*"(.*)"', line) + if res is None: + continue + return "Condor", res.group(1) + except OSError as exc: + logger.warning("failed to read HTCondor job classAd: %s", exc) return None, "" diff --git a/pilot/util/constants.py b/pilot/util/constants.py index ea21201b5..e2e4fdc22 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -12,9 +12,9 @@ # Pilot version RELEASE = '3' # released number should be fixed at 3 for Pilot 3 -VERSION = '2' # version number is '1' for first release, '0' until then, increased for bigger updates -REVISION = '4' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '16' # build number should be reset to '1' for every new development cycle +VERSION = '3' # version number is '1' for first release, '0' until then, increased for bigger updates +REVISION = '0' # revision number should be reset to '0' for every new version release, increased for small updates +BUILD = '39' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/default.cfg b/pilot/util/default.cfg index a34c397bc..01c022f30 100644 --- a/pilot/util/default.cfg +++ b/pilot/util/default.cfg @@ -48,10 +48,10 @@ iddsserver: https://pandaserver.cern.ch:25443 rtlogging:logstash;http://aipanda020.cern.ch:8443 # The heartbeat period in seconds (30*60 = 1800 s in normal mode, 5*60 = 300 s in debug mode) -# A lost heartbeat is 60*60*6 s +# A lost heartbeat is 60*60*3 s, i.e. 3h heartbeat: 1800 debug_heartbeat: 60 -lost_heartbeat = 21600 +lost_heartbeat = 10800 # Heartbeat message file (only used when Pilot is not sending heartbeats to server) heartbeat_message: heartbeat.json @@ -232,6 +232,7 @@ middleware_stageout_stderr: stageout_stderr.txt # This image is used if middleware is not found locally on the worker node. Middleware is expected to be present # in the container image middleware_container: /cvmfs/unpacked.cern.ch/registry.hub.docker.com/atlas/rucio-clients:default +#middleware_container: /cvmfs/unpacked.cern.ch/registry.hub.docker.com/atlas/rucio-clients:release-1.25.3 # On HPC (ALRB will locate the image) middleware_container_no_path: atlas/rucio-clients:default diff --git a/pilot/util/https.py b/pilot/util/https.py index 4b86eeac9..80b846304 100644 --- a/pilot/util/https.py +++ b/pilot/util/https.py @@ -9,7 +9,7 @@ # - Mario Lassnig, mario.lassnig@cern.ch, 2017 # - Paul Nilsson, paul.nilsson@cern.ch, 2017-2022 -import subprocess +#import subprocess import json import os import platform @@ -27,6 +27,7 @@ from .filehandling import write_file from .config import config from .constants import get_pilot_version +from .container import execute import logging logger = logging.getLogger(__name__) @@ -304,7 +305,9 @@ def execute_request(req): :return: status (int), output (string). 
""" - return subprocess.getstatusoutput(req) + exit_code, stdout, _ = execute(req) + return exit_code, stdout +# return subprocess.getstatusoutput(req) def execute_urllib(url, data, plain, secure): @@ -401,19 +404,20 @@ def send_update(update_function, data, url, port, job=None): return res -def get_panda_server(url, port): +def get_panda_server(url, port, update_server=True): """ Get the URL for the PanDA server. + The URL will be randomized if the server can be contacted (otherwise fixed). :param url: URL string, if set in pilot option (port not included). :param port: port number, if set in pilot option (int). - :return: full URL (either from pilot options or from config file) + :param update_server: True if the server can be contacted (Boolean). + :return: full URL (either from pilot options or from config file). """ if url != '': parsedurl = url.split('://') scheme = None - loc = None if len(parsedurl) == 2: scheme = parsedurl[0] loc = parsedurl[1] @@ -436,6 +440,9 @@ def get_panda_server(url, port): if not pandaserver.startswith('http'): pandaserver = 'https://' + pandaserver + if not update_server: + return pandaserver + # add randomization for PanDA server default = 'pandaserver.cern.ch' if default in pandaserver: diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py index 637696eed..6bed91752 100644 --- a/pilot/util/monitoring.py +++ b/pilot/util/monitoring.py @@ -268,9 +268,10 @@ def verify_user_proxy(current_time, mt): userproxy = __import__('pilot.user.%s.proxy' % pilot_user, globals(), locals(), [pilot_user], 0) # Python 2/3 # is it time to verify the proxy? - proxy_verification_time = convert_to_int(config.Pilot.proxy_verification_time, default=600) + proxy_verification_time = 60 # convert_to_int(config.Pilot.proxy_verification_time, default=600) if current_time - mt.get('ct_proxy') > proxy_verification_time: # is the proxy still valid? + logger.debug('calling verify_proxy with test=True') exit_code, diagnostics = userproxy.verify_proxy(test=False) # use test=True to test expired proxy if exit_code != 0: return exit_code, diagnostics diff --git a/pilot/util/proxy.py b/pilot/util/proxy.py index 31c6d2dd9..2b463c48d 100644 --- a/pilot/util/proxy.py +++ b/pilot/util/proxy.py @@ -7,10 +7,16 @@ # Authors: # - Paul Nilsson, paul.nilsson@cern.ch, 2017-2022 +import logging +import os +import traceback + +from pilot.common.exception import FileHandlingFailure +from pilot.util import https +from pilot.util.config import config from pilot.util.container import execute +from pilot.util.filehandling import write_file -import os -import logging logger = logging.getLogger(__name__) @@ -49,15 +55,42 @@ def get_distinguished_name(): return dn -def get_proxy(server, path): +def get_proxy(proxy_outfile_name, voms_role): """ - Download proxy from given server and store it in given path. - - :param server: server URL (string). - :return: + :param proxy_outfile_name: specify the file to store proxy (string). + :param voms_role: what proxy (role) to request, e.g. 'atlas' (string). + :return: True on success (Boolean). 
""" + try: + # it assumes that https_setup() was done already + url = os.environ.get('PANDA_SERVER_URL', config.Pilot.pandaserver) + res = https.request('{pandaserver}/server/panda/getProxy'.format(pandaserver=url), data={'role': voms_role}) - pass + if res is None: + logger.error(f"unable to get proxy with role '{voms_role}' from panda server") + return False + + if res['StatusCode'] != 0: + logger.error(f"panda server returned: \'{res['errorDialog']}\' for proxy role \'{voms_role}\'") + return False + + proxy_contents = res['userProxy'] + + except Exception as exc: + logger.error(f"Get proxy from panda server failed: {exc}, {traceback.format_exc()}") + return False + + res = False + try: + # pre-create empty proxy file with secure permissions. Prepare it for write_file() which can not + # set file permission mode, it will writes to the existing file with correct permissions. + _file = os.open(proxy_outfile_name, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) + os.close(_file) + res = write_file(proxy_outfile_name, proxy_contents, mute=False) # returns True on success + except (IOError, OSError, FileHandlingFailure) as exc: + logger.error(f"exception caught:\n{exc},\ntraceback: {traceback.format_exc()}") + + return res def create_cert_files(from_proxy, workdir): @@ -70,9 +103,6 @@ def create_cert_files(from_proxy, workdir): :return: path to crt.pem (string), path to key.pem (string). """ -# return os.path.join(workdir, os.environ.get("X509_USER_PROXY")),\ -# os.path.join(workdir, os.environ.get("X509_USER_PROXY")) - _files = [os.path.join(workdir, 'crt.pem'), os.path.join(workdir, 'key.pem')] if os.path.exists(_files[0]) and os.path.exists(_files[1]): return _files[0], _files[1] diff --git a/pilot/util/workernode.py b/pilot/util/workernode.py index 737154e29..787f4c80a 100644 --- a/pilot/util/workernode.py +++ b/pilot/util/workernode.py @@ -11,9 +11,10 @@ import re import logging -from subprocess import getoutput +#from subprocess import getoutput from pilot.common.exception import PilotException, ErrorCodes +from pilot.util.container import execute from pilot.info import infosys from pilot.util.disk import disk_usage @@ -32,11 +33,13 @@ def get_local_disk_space(path): # -mP = blocks of 1024*1024 (MB) and POSIX format cmd = f"df -mP {path}" - disks = getoutput(cmd) - if disks: - logger.debug(f'disks={disks}') + #disks = getoutput(cmd) + _, stdout, stderr = execute(cmd) + if stdout: + logger.debug(f'stdout={stdout}') + logger.debug(f'stderr={stderr}') try: - disk = float(disks.splitlines()[1].split()[3]) + disk = float(stdout.splitlines()[1].split()[3]) except (IndexError, ValueError, TypeError, AttributeError) as error: msg = f'exception caught while trying to convert disk info: {error}' logger.warning(msg)