diff --git a/pilot/user/atlas/common.py b/pilot/user/atlas/common.py index 4f114911..853b90ab 100644 --- a/pilot/user/atlas/common.py +++ b/pilot/user/atlas/common.py @@ -26,6 +26,7 @@ import logging import os import re +import time from collections import defaultdict from functools import reduce @@ -487,14 +488,17 @@ def get_nthreads(catchall: str) -> int: return _nthreads if _nthreads else 1 -def get_payload_command(job: JobData) -> str: +def get_payload_command(job: JobData, args: object = None) -> str: """ Return the full command for executing the payload, including the sourcing of all setup files and setting of environment variables. :param job: job object (JobData) - :return: command (str). + :param args: pilot arguments (object) + :return: command (str) :raises TrfDownloadFailure: in case of download failure. """ + if not args: # bypass pylint complaint + pass # Should the pilot do the setup or does jobPars already contain the information? preparesetup = should_pilot_prepare_setup(job.noexecstrcnv, job.jobparams) @@ -515,6 +519,7 @@ def get_payload_command(job: JobData) -> str: exitcode = 0 diagnostics = "" + t0 = time.time() try: exitcode, diagnostics, not_opened_turls, lsetup_time = open_remote_files(job.indata, job.workdir, get_nthreads(catchall)) except Exception as exc: @@ -532,6 +537,10 @@ def get_payload_command(job: JobData) -> str: else: process_remote_file_traces(path, job, not_opened_turls) # ignore PyCharm warning, path is str + t1 = time.time() + dt = int(t1 - t0) + logger.info(f'remote file verification finished in {dt} s') + # fail the job if the remote files could not be verified if exitcode != 0: job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exitcode, msg=diagnostics) diff --git a/pilot/user/generic/common.py b/pilot/user/generic/common.py index 3e2f312e..4c819e7e 100644 --- a/pilot/user/generic/common.py +++ b/pilot/user/generic/common.py @@ -27,6 +27,7 @@ from signal import SIGTERM from pilot.common.exception import TrfDownloadFailure +from pilot.info.jobdata import JobData from pilot.util.config import config from pilot.util.constants import ( UTILITY_BEFORE_PAYLOAD, @@ -64,7 +65,7 @@ def validate(job: object) -> bool: return True -def get_payload_command(job: object) -> str: +def get_payload_command(job: JobData, args: object = None) -> str: """ Return the full command for executing the payload. @@ -73,12 +74,15 @@ def get_payload_command(job: object) -> str: By default, the full payload command is assumed to be in the job.jobparams. :param job: job object (object) + :param args: pilot arguments (object) :return: command (str). """ # Try to download the trf # if job.imagename != "" or "--containerImage" in job.jobparams: # job.transformation = os.path.join(os.path.dirname(job.transformation), "runcontainer") # logger.warning('overwrote job.transformation, now set to: %s' % job.transformation) + if not args: # bypass pylint complaint + pass ec, diagnostics, trf_name = get_analysis_trf(job.transformation, job.workdir) if ec != 0: raise TrfDownloadFailure(diagnostics) diff --git a/pilot/user/rubin/common.py b/pilot/user/rubin/common.py index d83bed7d..0a853656 100644 --- a/pilot/user/rubin/common.py +++ b/pilot/user/rubin/common.py @@ -28,6 +28,7 @@ from typing import Any from pilot.common.exception import TrfDownloadFailure +from pilot.info.jobdata import JobData from pilot.util.config import config from pilot.util.constants import UTILITY_BEFORE_PAYLOAD, UTILITY_AFTER_PAYLOAD_STARTED from pilot.util.filehandling import read_file @@ -60,7 +61,7 @@ def validate(job: Any) -> bool: return True -def get_payload_command(job: object): +def get_payload_command(job: JobData, args: object = None): """ Return the full command for executing the payload. @@ -68,12 +69,15 @@ def get_payload_command(job: object): By default, the full payload command is assumed to be in the job.jobparams. :param job: job object (object) + :param args: pilot arguments (object) :return: command (str). """ # Try to download the trf # if job.imagename != "" or "--containerImage" in job.jobparams: # job.transformation = os.path.join(os.path.dirname(job.transformation), "runcontainer") # logger.warning('overwrote job.transformation, now set to: %s' % job.transformation) + if not args: # bypass pylint complaint + pass ec, diagnostics, trf_name = get_analysis_trf(job.transformation, job.workdir) if ec != 0: raise TrfDownloadFailure(diagnostics) diff --git a/pilot/user/sphenix/common.py b/pilot/user/sphenix/common.py index b4aa30ee..fff836bc 100644 --- a/pilot/user/sphenix/common.py +++ b/pilot/user/sphenix/common.py @@ -30,6 +30,7 @@ FileHandlingFailure ) from pilot.info import FileSpec +from pilot.info.jobdata import JobData from pilot.util.config import config from pilot.util.constants import ( UTILITY_AFTER_PAYLOAD_FINISHED, @@ -76,7 +77,7 @@ def validate(job: object) -> bool: return True -def get_payload_command(job: object) -> str: +def get_payload_command(job: JobData, args: object = None) -> str: """ Return the full command for executing the payload. @@ -84,12 +85,15 @@ def get_payload_command(job: object) -> str: By default, the full payload command is assumed to be in the job.jobparams. :param job: job object (object) + :param args: pilot arguments (object) :return: command (str). """ # Try to download the trf # if job.imagename != "" or "--containerImage" in job.jobparams: # job.transformation = os.path.join(os.path.dirname(job.transformation), "runcontainer") # logger.warning('overwrote job.transformation, now set to: %s' % job.transformation) + if not args: # to bypass pylint complaint + pass ec, diagnostics, trf_name = get_analysis_trf(job.transformation, job.workdir) if ec != 0: raise TrfDownloadFailure(diagnostics) diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 58835e2c..9a1ed683 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -65,6 +65,8 @@ PILOT_POST_FINAL_UPDATE = 'PILOT_POST_FINAL_UPDATE' PILOT_END_TIME = 'PILOT_END_TIME' PILOT_KILL_SIGNAL = 'PILOT_KILL_SIGNAL' +PILOT_PRE_REMOTEIO = 'PILOT_PRE_REMOTEIO' +PILOT_POST_REMOTEIO = 'PILOT_POST_REMOTEIO' # Keep track of log transfers LOG_TRANSFER_NOT_DONE = 'NOT_DONE'