diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py
index b49c5a2091a..e13b1ed6c90 100644
--- a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py
+++ b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py
@@ -26,6 +26,7 @@
 from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getQueues
 from DIRAC.Core.Utilities import DErrno
 from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
+from DIRAC.Core.Utilities.Proxy import executeWithUserProxy
 from DIRAC.Core.Utilities.Version import getVersion
 from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
 from DIRAC.Resources.Computing import ComputingElement
@@ -106,9 +107,10 @@ def initialize(self):
         if self.submissionPolicy not in ["Application", "JobWrapper"]:
             return S_ERROR("SubmissionPolicy must be either Application or JobWrapper")
 
-        result = self._initializeComputingElement("Pool")
-        if not result["OK"]:
-            return result
+        if self.submissionPolicy == "Application":
+            result = self._initializeComputingElement("Pool")
+            if not result["OK"]:
+                return result
 
         # on-the-fly imports
         ol = ObjectLoader()
@@ -133,14 +135,15 @@ def beginExecution(self):
             return result
         self.pilotDN, _ = result["Value"]
 
-        # Maximum number of jobs that can be handled at the same time by the agent
+        # (only for Application submission) Maximum number of jobs that can be handled at the same time by the agent
         self.maxJobsToSubmit = self.am_getOption("MaxJobsToSubmit", self.maxJobsToSubmit)
-        self.computingElement.setParameters({"NumberOfProcessors": self.maxJobsToSubmit})
+        if self.submissionPolicy == "Application":
+            self.computingElement.setParameters({"NumberOfProcessors": self.maxJobsToSubmit})
 
         self.failedQueueCycleFactor = self.am_getOption("FailedQueueCycleFactor", self.failedQueueCycleFactor)
         self.cleanTask = self.am_getOption("CleanTask", self.cleanTask)
 
-        # Get the location of the CVMFS installation
+        # (only for JobWrapper submission) Get the location of the CVMFS installation
         if self.submissionPolicy == "JobWrapper":
             self.cvmfsLocation = self.am_getOption("CVMFSLocation", self.cvmfsLocation)
             self.log.info("CVMFS location:", self.cvmfsLocation)
@@ -207,6 +210,15 @@ def execute(self):
         queueDictItems = list(self.queueDict.items())
         random.shuffle(queueDictItems)
 
+        # Get a pilot proxy
+        cpuTime = 86400 * 3
+        self.log.verbose("Getting pilot proxy", "for %s/%s %d long" % (self.pilotDN, self.vo, cpuTime))
+        pilotGroup = Operations(vo=self.vo).getValue("Pilot/GenericPilotGroup")
+        result = gProxyManager.getPilotProxyFromDIRACGroup(self.pilotDN, pilotGroup, cpuTime)
+        if not result["OK"]:
+            return result
+        pilotProxy = result["Value"]
+
         if self.submissionPolicy == "Application":
             # Check that there are enough slots locally
             result = self._checkCEAvailability(self.computingElement)
@@ -223,16 +235,9 @@ def execute(self):
             if not self._allowedToSubmit(queueName):
                 continue
 
-            # Get a working proxy
+            # Get the CE instance
             ce = queueDictionary["CE"]
-            cpuTime = 86400 * 3
-            self.log.verbose("Getting pilot proxy", "for %s/%s %d long" % (self.pilotDN, self.vo, cpuTime))
-            pilotGroup = Operations(vo=self.vo).getValue("Pilot/GenericPilotGroup")
-            result = gProxyManager.getPilotProxyFromDIRACGroup(self.pilotDN, pilotGroup, cpuTime)
-            if not result["OK"]:
-                return result
-            proxy = result["Value"]
-            ce.setProxy(proxy)
+            ce.setProxy(pilotProxy)
 
             if self.submissionPolicy == "JobWrapper":
                 # Check errors that could have occurred during job submission and/or execution
@@ -360,6 +365,8 @@ def execute(self):
                     jobParams=params,
                     resourceParams=ceDict,
                     optimizerParams=optimizerParams,
+                    owner=owner,
+                    jobGroup=jobGroup,
                     processors=submissionParams["processors"],
                 )
                 if not result["OK"]:
@@ -465,6 +472,48 @@ def _checkMatchingIssues(self, jobRequest):
 
         return S_OK()
 
+    #############################################################################
+
+    @executeWithUserProxy
+    def preProcessJob(self, job: JobWrapper):
+        """Preprocess the job before submission: should be executed with the user proxy associated with the payload
+
+        :param JobWrapper job: job to preprocess
+        """
+        if "InputSandbox" in job.jobArgs:
+            self.log.verbose("Getting the inputSandbox of job", job.jobID)
+            if not transferInputSandbox(job, job.jobArgs["InputSandbox"]):
+                return S_ERROR(f"Cannot get input sandbox of job {job.jobID}")
+            job.jobReport.commit()
+
+        if "InputData" in job.jobArgs and job.jobArgs["InputData"]:
+            self.log.verbose("Getting the inputData of job", job.jobID)
+            if not resolveInputData(job):
+                return S_ERROR(f"Cannot get input data of job {job.jobID}")
+            job.jobReport.commit()
+
+        # Preprocess the payload
+        try:
+            self.log.verbose("Pre-processing job", job.jobID)
+            result = job.preProcess()
+            if not result["OK"]:
+                self.log.error("JobWrapper failed the preprocessing phase for", f"{job.jobID}: {result['Message']}")
+                rescheduleResult = rescheduleFailedJob(
+                    jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=job.jobReport
+                )
+                job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION)
+                return S_ERROR(JobMinorStatus.JOB_WRAPPER_EXECUTION)
+        except Exception:
+            self.log.exception("JobWrapper failed the preprocessing phase for", job.jobID)
+            rescheduleResult = rescheduleFailedJob(
+                jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=job.jobReport
+            )
+            job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION)
+            return S_ERROR(f"JobWrapper failed the preprocessing phase for {job.jobID}")
+
+        job.jobReport.commit()
+        return S_OK(result["Value"])
+
     def _submitJobWrapper(
         self,
         jobID: str,
@@ -473,6 +522,8 @@ def _submitJobWrapper(
         jobParams: dict,
         resourceParams: dict,
         optimizerParams: dict,
+        owner: str,
+        jobGroup: str,
         processors: int,
     ):
         """Submit a JobWrapper to the remote site
@@ -482,7 +533,8 @@ def _submitJobWrapper(
         :param jobParams: job parameters
         :param resourceParams: resource parameters
         :param optimizerParams: optimizer parameters
-        :param proxyChain: proxy chain
+        :param owner: owner of the job
+        :param jobGroup: group of the job
         :param processors: number of processors
 
         :return: S_OK
@@ -513,38 +565,10 @@ def _submitJobWrapper(
         if not job:
             return S_ERROR(f"Cannot get a JobWrapper instance for job {jobID}")
 
-        if "InputSandbox" in jobParams:
-            self.log.verbose("Getting the inputSandbox of job", jobID)
-            if not transferInputSandbox(job, jobParams["InputSandbox"], jobReport):
-                return S_ERROR(f"Cannot get input sandbox of job {jobID}")
-            jobReport.commit()
-
-        if "InputData" in jobParams and jobParams["InputData"]:
-            self.log.verbose("Getting the inputData of job", jobID)
-            if not resolveInputData(job, jobReport):
-                return S_ERROR(f"Cannot get input data of job {jobID}")
-            jobReport.commit()
-
-        # Preprocess the payload
-        try:
-            self.log.verbose("Pre-processing job", jobID)
-            result = job.preProcess()
-            if not result["OK"]:
-                self.log.error("JobWrapper failed the preprocessing phase for", f"{jobID}: {result['Message']}")
-                rescheduleResult = rescheduleFailedJob(
-                    jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=jobReport
-                )
-                job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION)
-                return S_ERROR(JobMinorStatus.JOB_WRAPPER_EXECUTION)
-        except Exception:
-            self.log.exception("JobWrapper failed the preprocessing phase for", jobID)
-            rescheduleResult = rescheduleFailedJob(
-                jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=jobReport
-            )
-            job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION)
-            return S_ERROR(f"JobWrapper failed the preprocessing phase for {jobID}")
+        result = self.preProcessJob(job, proxyUserName=job.owner, proxyUserGroup=job.userGroup)
+        if not result["OK"]:
+            return result
         payloadParams = result["Value"]
-        jobReport.commit()
 
         # Dump the remote CFG config into the job directory: it is needed for the JobWrapperTemplate
         cfgFilename = Path(job.jobIDPath) / "dirac.cfg"
@@ -622,6 +646,8 @@ def _submitJobWrapper(
             time.sleep(self.jobSubmissionDelay)
         return S_OK()
 
+    #############################################################################
+
     def _checkOutputIntegrity(self, workingDirectory: str):
         """Make sure that output files are not corrupted.
@@ -650,6 +676,55 @@ def _checkOutputIntegrity(self, workingDirectory: str):
 
         return S_OK()
 
+    @executeWithUserProxy
+    def postProcessJob(self, job: JobWrapper, payloadResults):
+        """Perform post-processing for a job: should be executed with the user proxy associated with the payload.
+
+        :param job: JobWrapper instance
+        :param payloadResults: Payload results
+        """
+        try:
+            result = job.postProcess(**payloadResults)
+            if not result["OK"]:
+                self.log.error("Failed to post process the job", f"{job.jobID}: {result['Message']}")
+                if result["Errno"] == DErrno.EWMSRESC:
+                    self.log.warn("Asked to reschedule job")
+                    rescheduleResult = rescheduleFailedJob(
+                        jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=job.jobReport
+                    )
+                    job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION)
+                    shutil.rmtree(job.jobIDPath)
+                    return
+
+                job.jobReport.setJobParameter("Error Message", result["Message"], sendFlag=False)
+                job.jobReport.setJobStatus(
+                    status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False
+                )
+                job.sendFailoverRequest()
+                job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC)
+                shutil.rmtree(job.jobIDPath)
+                return
+        except Exception as exc:  # pylint: disable=broad-except
+            self.log.exception("Job raised exception during post processing phase")
+            job.jobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
+            job.jobReport.setJobStatus(
+                status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False
+            )
+            job.sendFailoverRequest()
+            job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC)
+            shutil.rmtree(job.jobIDPath)
+            return
+
+        if "OutputSandbox" in job.jobArgs or "OutputData" in job.jobArgs:
+            self.log.verbose("Uploading the outputSandbox/Data of the job")
+            if not processJobOutputs(job):
+                shutil.rmtree(job.jobIDPath)
+                return
+            job.jobReport.commit()
+
+        # Clean job wrapper locally
+        job.finalize()
+
     def _checkSubmittedJobWrappers(self, ce: ComputingElement, site: str):
         """Check the status of the submitted tasks.
 
         If the task is finished, get the output and post process the job.
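
Note, not part of the patch: the new preProcessJob/postProcessJob methods rely on DIRAC's executeWithUserProxy decorator, which consumes the extra proxyUserName/proxyUserGroup keyword arguments, swaps in the matching user proxy for the duration of the call, and restores the environment afterwards. A minimal sketch of that calling convention follows; the Example class, doWork method, and the "alice"/"dirac_user" identity are made-up names for illustration.

    from DIRAC import S_OK
    from DIRAC.Core.Utilities.Proxy import executeWithUserProxy

    class Example:
        @executeWithUserProxy
        def doWork(self, payload):
            # Runs with X509_USER_PROXY pointing at the requested user's
            # proxy; the previous environment is restored on return.
            return S_OK(payload)

    # The decorator strips the proxy-related keyword arguments before
    # invoking the wrapped method, so doWork itself only sees `payload`.
    result = Example().doWork("data", proxyUserName="alice", proxyUserGroup="dirac_user")
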
@@ -771,48 +846,7 @@ def _checkSubmittedJobWrappers(self, ce: ComputingElement, site: str):
                 continue
             payloadResults = result["Value"]
 
-            # Post-process the job
-            try:
-                result = job.postProcess(**payloadResults)
-                if not result["OK"]:
-                    self.log.error("Failed to post process the job", f"{jobID}: {result['Message']}")
-                    if result["Errno"] == DErrno.EWMSRESC:
-                        self.log.warn("Asked to reschedule job")
-                        rescheduleResult = rescheduleFailedJob(
-                            jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=jobReport
-                        )
-                        job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION)
-                        shutil.rmtree(job.jobIDPath)
-                        continue
-
-                    jobReport.setJobParameter("Error Message", result["Message"], sendFlag=False)
-                    jobReport.setJobStatus(
-                        status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False
-                    )
-                    job.sendFailoverRequest()
-                    job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC)
-                    shutil.rmtree(job.jobIDPath)
-                    continue
-            except Exception as exc:  # pylint: disable=broad-except
-                self.log.exception("Job raised exception during post processing phase")
-                jobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
-                jobReport.setJobStatus(
-                    status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False
-                )
-                job.sendFailoverRequest()
-                job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC)
-                shutil.rmtree(job.jobIDPath)
-                continue
-
-            if "OutputSandbox" in job.jobArgs or "OutputData" in job.jobArgs:
-                self.log.verbose("Uploading the outputSandbox/Data of the job")
-                if not processJobOutputs(job, jobReport):
-                    shutil.rmtree(job.jobIDPath)
-                    continue
-                jobReport.commit()
-
-            # Clean job wrapper locally
-            job.finalize()
+            self.postProcessJob(job, payloadResults, proxyUserName=job.owner, proxyUserGroup=job.userGroup)
 
             # Clean job in the remote resource
             if self.cleanTask:
@@ -837,12 +871,5 @@ def finalize(self):
                 self.log.error("Problem while trying to get status of the last submitted jobs")
                 break
             time.sleep(int(self.am_getOption("PollingTime")))
-        else:
-            for _, queueDictionary in self.queueDict.items():
-                ce = queueDictionary["CE"]
-                # Check the submitted JobWrappers a last time
-                result = self._checkSubmittedJobWrappers(ce, queueDictionary["ParametersDict"]["Site"])
-                if not result["OK"]:
-                    self.log.error("Problem while trying to get status of the last submitted JobWrappers")
 
         return S_OK()
diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py
index d8543ca30ba..f40bf3a8db5 100755
--- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py
+++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py
@@ -502,7 +502,7 @@ def process(self, command: str, env: dict):
     #############################################################################
     def postProcess(
         self,
-        payloadStatus: int,
+        payloadStatus: int | None,
         payloadOutput: str,
         payloadExecutorError: str,
         cpuTimeConsumed: list,
diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py
index 350daf4d355..94c9b2b8ae6 100644
--- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py
+++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py
@@ -53,7 +53,7 @@ def execute(jobID: int, arguments: dict, jobReport: JobReport):
     if "InputSandbox" in arguments["Job"]:
arguments["Job"]: jobReport.commit() - if not transferInputSandbox(job, arguments["Job"]["InputSandbox"], jobReport): + if not transferInputSandbox(job, arguments["Job"]["InputSandbox"]): return 1 else: gLogger.verbose("Job has no InputSandbox requirement") @@ -62,7 +62,7 @@ def execute(jobID: int, arguments: dict, jobReport: JobReport): if "InputData" in arguments["Job"]: if arguments["Job"]["InputData"]: - if not resolveInputData(job, jobReport): + if not resolveInputData(job): return 1 else: gLogger.verbose("Job has a null InputData requirement:") @@ -72,11 +72,11 @@ def execute(jobID: int, arguments: dict, jobReport: JobReport): jobReport.commit() - if not executePayload(job, jobReport): + if not executePayload(job): return 1 if "OutputSandbox" in arguments["Job"] or "OutputData" in arguments["Job"]: - if not processJobOutputs(job, jobReport): + if not processJobOutputs(job): return 2 else: gLogger.verbose("Job has no OutputData or OutputSandbox requirement") diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py index a5ef23cf7b2..77eb6d68342 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py @@ -118,7 +118,7 @@ def getJobWrapper(jobID: int, arguments: dict, jobReport: JobReport) -> JobWrapp return job -def transferInputSandbox(job: JobWrapper, inputSandbox: list, jobReport: JobReport) -> bool: +def transferInputSandbox(job: JobWrapper, inputSandbox: list) -> bool: """Transfer the input sandbox""" try: result = job.transferInputSandbox(inputSandbox) @@ -128,21 +128,21 @@ def transferInputSandbox(job: JobWrapper, inputSandbox: list, jobReport: JobRepo except JobWrapperError: gLogger.exception("JobWrapper failed to download input sandbox") rescheduleResult = rescheduleFailedJob( - jobID=job.jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=jobReport + jobID=job.jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=job.jobReport ) job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX) return False except Exception: # pylint: disable=broad-except gLogger.exception("JobWrapper raised exception while downloading input sandbox") rescheduleResult = rescheduleFailedJob( - jobID=job.jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=jobReport + jobID=job.jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=job.jobReport ) job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX) return False return True -def resolveInputData(job: JobWrapper, jobReport: JobReport) -> bool: +def resolveInputData(job: JobWrapper) -> bool: """Resolve the input data""" try: result = job.resolveInputData() @@ -152,21 +152,21 @@ def resolveInputData(job: JobWrapper, jobReport: JobReport) -> bool: except JobWrapperError: gLogger.exception("JobWrapper failed to resolve input data") rescheduleResult = rescheduleFailedJob( - jobID=job.jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=jobReport + jobID=job.jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=job.jobReport ) job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION) return False except Exception: # pylint: disable=broad-except gLogger.exception("JobWrapper raised exception while resolving input data") rescheduleResult = 
-            jobID=job.jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=jobReport
+            jobID=job.jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=job.jobReport
         )
         job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION)
         return False
     return True
 
 
-def processJobOutputs(job: JobWrapper, jobReport: JobReport) -> bool:
+def processJobOutputs(job: JobWrapper) -> bool:
     """Process the job outputs"""
     try:
         result = job.processJobOutputs()
@@ -176,14 +176,14 @@ def processJobOutputs(job: JobWrapper, jobReport: JobReport) -> bool:
     except JobWrapperError:
         gLogger.exception("JobWrapper failed to process output files")
         rescheduleResult = rescheduleFailedJob(
-            jobID=job.jobID, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS, jobReport=jobReport
+            jobID=job.jobID, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS, jobReport=job.jobReport
         )
         job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS)
         return False
     except Exception:  # pylint: disable=broad-except
         gLogger.exception("JobWrapper raised exception while processing output files")
         rescheduleResult = rescheduleFailedJob(
-            jobID=job.jobID, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS, jobReport=jobReport
+            jobID=job.jobID, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS, jobReport=job.jobReport
         )
         job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS)
         return False
@@ -200,7 +200,7 @@ def finalize(job: JobWrapper) -> int:
     return 2
 
 
-def executePayload(job: JobWrapper, jobReport: JobReport) -> bool:
+def executePayload(job: JobWrapper) -> bool:
     """Execute the payload"""
     try:
         result = job.execute()
@@ -213,13 +213,13 @@ def executePayload(job: JobWrapper, jobReport: JobReport) -> bool:
         if exc.value[1] == DErrno.EWMSRESC:
             gLogger.warn("Asked to reschedule job")
             rescheduleResult = rescheduleFailedJob(
-                jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=jobReport
+                jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=job.jobReport
             )
             job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION)
             return False
 
         gLogger.exception("Job failed in execution phase")
-        jobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
-        jobReport.setJobStatus(
+        job.jobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
+        job.jobReport.setJobStatus(
             status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False
         )
         job.sendFailoverRequest()
@@ -227,8 +227,8 @@ def executePayload(job: JobWrapper, jobReport: JobReport) -> bool:
     except Exception as exc:  # pylint: disable=broad-except
         gLogger.exception("Job raised exception during execution phase")
-        jobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
-        jobReport.setJobStatus(
+        job.jobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
+        job.jobReport.setJobStatus(
             status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False
         )
         job.sendFailoverRequest()
diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobPolicy.py b/src/DIRAC/WorkloadManagementSystem/Service/JobPolicy.py
index 96826ecf4ce..caf5f6e0951 100755
--- a/src/DIRAC/WorkloadManagementSystem/Service/JobPolicy.py
+++ b/src/DIRAC/WorkloadManagementSystem/Service/JobPolicy.py
@@ -5,7 +5,6 @@
 from DIRAC.ConfigurationSystem.Client.Helpers.Registry import (
     getGroupsForUser,
     getPropertiesForGroup,
-    getPropertiesForHost,
     getUsersInGroup,
 )
 from DIRAC.Core.Security import Properties
@@ -76,11 +75,7 @@ class JobPolicy:
     def __init__(self, user, userGroup, allInfo=True):
         self.userName = user
         self.userGroup = userGroup
-
-        properties = getPropertiesForGroup(userGroup, [])
-        if not properties:
-            properties = getPropertiesForHost(user, [])
-        self.userProperties = properties
+        self.userProperties = getPropertiesForGroup(userGroup, [])
 
         self.jobDB = None
         self.allInfo = allInfo
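
A minimal usage sketch of the slimmed-down JobWrapperUtilities helpers after this change, assuming a job that already carries its jobReport (as the patch arranges). The runJob function and the jobArgs dict are illustrative names; the control flow mirrors JobWrapperTemplate.execute.

    from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities import (
        executePayload,
        processJobOutputs,
        resolveInputData,
        transferInputSandbox,
    )

    def runJob(job, jobArgs):
        # Each helper reports status and reschedules through job.jobReport
        # internally, so no separate JobReport needs to be threaded through.
        if "InputSandbox" in jobArgs and not transferInputSandbox(job, jobArgs["InputSandbox"]):
            return 1
        if jobArgs.get("InputData") and not resolveInputData(job):
            return 1
        if not executePayload(job):
            return 1
        if ("OutputSandbox" in jobArgs or "OutputData" in jobArgs) and not processJobOutputs(job):
            return 2
        return 0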