From 3a0c1b2fbb1639f096e3c0de74ba55ed36503a24 Mon Sep 17 00:00:00 2001
From: aldbr
Date: Fri, 19 Apr 2024 18:21:38 +0200
Subject: [PATCH] feat: add a new job management mechanism in the PushJobAgent

---
 .../Resources/supercomputers.rst              |   4 +
 .../Computing/AREXComputingElement.py         |   4 +
 src/DIRAC/Workflow/Modules/Script.py          |  24 +-
 src/DIRAC/Workflow/Modules/UploadOutputs.py   |  81 ---
 .../Agent/PushJobAgent.py                     | 580 ++++++++++++++++--
 .../Agent/test/Test_Agent_PushJobAgent.py     | 313 +++++++++-
 .../ConfigTemplate.cfg                        |   4 +
 .../JobWrapper/JobWrapper.py                  | 218 +++----
 .../JobWrapper/JobWrapperOfflineTemplate.py   |  14 +-
 .../JobWrapper/JobWrapperTemplate.py          | 216 +------
 .../JobWrapper/JobWrapperUtilities.py         | 237 +++++++
 .../JobWrapper/test/Test_JobWrapper.py        |   7 +-
 .../test/Test_JobWrapperTemplate.py           |  31 +-
 .../Service/JobPolicy.py                      |   8 +-
 .../Utilities/Utils.py                        |   6 +-
 .../scripts/dirac_jobexec.py                  |   4 -
 16 files changed, 1255 insertions(+), 496 deletions(-)
 delete mode 100644 src/DIRAC/Workflow/Modules/UploadOutputs.py
 create mode 100644 src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py

diff --git a/docs/source/AdministratorGuide/Resources/supercomputers.rst b/docs/source/AdministratorGuide/Resources/supercomputers.rst
index def98694c1d..33d2d95d294 100644
--- a/docs/source/AdministratorGuide/Resources/supercomputers.rst
+++ b/docs/source/AdministratorGuide/Resources/supercomputers.rst
@@ -126,6 +126,10 @@ To leverage the Push model, one has to add the :mod:`~DIRAC.WorkloadManagementSy
       # Control the number of jobs handled on the machine
       MaxJobsToSubmit = 100
       Module = PushJobAgent
+      # SubmissionPolicy can be "Application" or "JobWrapper"
+      # - Application (soon deprecated): the agent will submit a workflow to a PoolCE, the workflow is responsible for interacting with the remote site
+      # - JobWrapper: the agent will submit a JobWrapper directly to the remote site, it is responsible for the remote execution
+      SubmissionPolicy =
     }

 One has also to authorize the machine hosting the :mod:`~DIRAC.WorkloadManagementSystem.Agent.PushJobAgent` to process jobs via the ``Registry/Hosts/`` CS section::

diff --git a/src/DIRAC/Resources/Computing/AREXComputingElement.py b/src/DIRAC/Resources/Computing/AREXComputingElement.py
index 0ba9d3b68f0..4b8f99c3088 100755
--- a/src/DIRAC/Resources/Computing/AREXComputingElement.py
+++ b/src/DIRAC/Resources/Computing/AREXComputingElement.py
@@ -654,6 +654,10 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
                 f"{jobReference} to CE {self.ceName}",
             )

+        # Remove the bundled preamble
+        if self.preamble:
+            os.remove(executableFile)
+
         if batchIDList:
             result = S_OK(batchIDList)
             result["PilotStampDict"] = stampDict
diff --git a/src/DIRAC/Workflow/Modules/Script.py b/src/DIRAC/Workflow/Modules/Script.py
index 9954ea2fe7e..651b942ef30 100644
--- a/src/DIRAC/Workflow/Modules/Script.py
+++ b/src/DIRAC/Workflow/Modules/Script.py
@@ -10,7 +10,6 @@

 from DIRAC import gLogger, gConfig
 from DIRAC.Core.Utilities.Subprocess import systemCall
-from DIRAC.WorkloadManagementSystem.Utilities.RemoteRunner import RemoteRunner
 from DIRAC.Workflow.Modules.ModuleBase import ModuleBase


@@ -83,22 +82,13 @@ def _executeCommand(self):
         """execute the self.command (uses systemCall)"""
         failed = False

-        # Check whether the execution should be done remotely
-        if gConfig.getValue("/LocalSite/RemoteExecution", False):
-            remoteRunner = RemoteRunner(
-                gConfig.getValue("/LocalSite/Site"),
-                gConfig.getValue("/LocalSite/GridCE"),
-                
gConfig.getValue("/LocalSite/CEQueue"), - ) - retVal = remoteRunner.execute(self.command) - else: - retVal = systemCall( - timeout=0, - cmdSeq=shlex.split(self.command), - env=self.environment, - callbackFunction=self.callbackFunction, - bufferLimit=self.bufferLimit, - ) + retVal = systemCall( + timeout=0, + cmdSeq=shlex.split(self.command), + env=self.environment, + callbackFunction=self.callbackFunction, + bufferLimit=self.bufferLimit, + ) if not retVal["OK"]: failed = True diff --git a/src/DIRAC/Workflow/Modules/UploadOutputs.py b/src/DIRAC/Workflow/Modules/UploadOutputs.py deleted file mode 100644 index a9d1dc199c1..00000000000 --- a/src/DIRAC/Workflow/Modules/UploadOutputs.py +++ /dev/null @@ -1,81 +0,0 @@ -# ##WARNING###WARNING###WARNING###WARNING###WARNING###WARNING###WARNING###WARNING###WARNING###WARNING###WARNING # -# Under development # -# ##WARNING###WARNING###WARNING###WARNING###WARNING###WARNING###WARNING###WARNING###WARNING###WARNING###WARNING # - -""" Module to upload specified job output files according to the parameters - defined in the production workflow. -""" -from DIRAC import gLogger -from DIRAC.Workflow.Modules.ModuleBase import ModuleBase, GracefulTermination - - -class UploadOutputs(ModuleBase): - ############################################################################# - - def __init__(self): - """c'tor""" - self.log = gLogger.getSubLogger(self.__class__.__name__) - super().__init__(self.log) - - self.outputDataStep = "" - self.outputData = None - self.outputList = [] - self.defaultOutputSE = [] - self.outputSE = [] - self.outputPath = "" - - ############################################################################# - - def _resolveInputVariables(self): - """The module parameters are resolved here.""" - super()._resolveInputVariables() - - # this comes from Job().setOutputData(). Typical for user jobs - if "OutputData" in self.workflow_commons: - self.outputData = self.workflow_commons["OutputData"] - if isinstance(self.outputData, str): - self.outputData = [i.strip() for i in self.outputData.split(";")] - # if not present, we use the outputList, which is instead incrementally created based on the single step outputs - # This is more typical for production jobs, that can have many steps linked one after the other - elif "outputList" in self.workflow_commons: - self.outputList = self.workflow_commons["outputList"] - else: - raise GracefulTermination("Nothing to upload") - - # in case you want to put a mask on the steps - # TODO: add it to the DIRAC API - if "outputDataStep" in self.workflow_commons: - self.outputDataStep = self.workflow_commons["outputDataStep"] - - # this comes from Job().setOutputData(). Typical for user jobs - if "OutputSE" in self.workflow_commons: - specifiedSE = self.workflow_commons["OutputSE"] - if not isinstance(specifiedSE, list): - self.outputSE = [i.strip() for i in specifiedSE.split(";")] - else: - self.log.verbose(f"No OutputSE specified, using default value: {', '.join(self.defaultOutputSE)}") - - # this comes from Job().setOutputData(). Typical for user jobs - if "OutputPath" in self.workflow_commons: - self.outputPath = self.workflow_commons["OutputPath"] - - def _initialize(self): - """gets the files to upload, check if to upload""" - # lfnsList = self.__getOutputLFNs( self.outputData ) or outputList? - - if not self._checkWFAndStepStatus(): - raise GracefulTermination("No output data upload attempted") - - def __getOuputLFNs(self, outputList, *args): - """This is really VO-specific. - It should be replaced by each VO. 
Setting an LFN here just as an idea, and for testing purposes. - """ - lfnList = [] - for outputFile in outputList: - lfnList.append("/".join([str(x) for x in args]) + outputFile) - - return lfnList - - def _execute(self): - """uploads the files""" - pass diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py index 98d88b4a193..8909293322c 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py @@ -9,19 +9,40 @@ """ +import hashlib +import json +import os +from pathlib import Path import random +import shutil import sys from collections import defaultdict +import time -from DIRAC import S_OK +from diraccfg import CFG + +from DIRAC import gConfig, S_OK, S_ERROR from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations -from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername +from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getQueues from DIRAC.Core.Utilities import DErrno from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader +from DIRAC.Core.Utilities.Version import getVersion from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager -from DIRAC.WorkloadManagementSystem.Agent.JobAgent import JobAgent -from DIRAC.WorkloadManagementSystem.Client import JobStatus +from DIRAC.Resources.Computing import ComputingElement +from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus, PilotStatus +from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport +from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import JobWrapper +from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities import ( + getJobWrapper, + processJobOutputs, + rescheduleFailedJob, + resolveInputData, + transferInputSandbox, +) +from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved +from DIRAC.WorkloadManagementSystem.Agent.JobAgent import JobAgent +from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved @@ -48,10 +69,40 @@ def __init__(self, agentName, loadName, baseAgentName=False, properties=None): self.failedQueueCycleFactor = 10 self.failedQueues = defaultdict(int) + # Clients to interact with DIRAC services + self.jobMonitoring = None + self.resourcesModule = None + self.opsHelper = None + + # Choose the submission policy + # - Application (deprecated): the agent will submit a workflow to a PoolCE, the workflow is responsible for interacting with the remote site + # - JobWrapper: the agent will submit a JobWrapper directly to the remote site, it is responsible of the remote execution + self.submissionPolicy = "JobWrapper" + + # Options related to the "JobWrapper" submission policy + # The location of the Dirac environment on CVMFS + self.cvmfsLocation = "/cvmfs/dirac.egi.eu" + self.version = None + + # cleanTask is used to clean the task in the remote site + self.cleanTask = True + # The results of the payload execution will be stored in this file + self.payloadResultFile = "payloadResult.json" + # The results of the checksums will be stored in this file + self.checkSumResultsFile = "checksums.json" + def initialize(self): """Sets default 
parameters and creates CE instance"""
         super().initialize()

+        self.jobMonitoring = JobMonitoringClient()
+
+        # Get the submission policy
+        # Initialized here because it cannot be dynamically modified during the execution
+        self.submissionPolicy = self.am_getOption("SubmissionPolicy", self.submissionPolicy)
+        if self.submissionPolicy not in ["Application", "JobWrapper"]:
+            return S_ERROR("SubmissionPolicy must be either Application or JobWrapper")
+
         result = self._initializeComputingElement("Pool")
         if not result["OK"]:
             return result
@@ -84,6 +135,14 @@ def beginExecution(self):
         self.computingElement.setParameters({"NumberOfProcessors": self.maxJobsToSubmit})

         self.failedQueueCycleFactor = self.am_getOption("FailedQueueCycleFactor", self.failedQueueCycleFactor)
+        self.cleanTask = self.am_getOption("CleanTask", self.cleanTask)
+
+        # Get the location of the CVMFS installation
+        if self.submissionPolicy == "JobWrapper":
+            CVMFS_locations = self.opsHelper.getValue("Pilot/CVMFS_locations", "")
+            if CVMFS_locations and isinstance(CVMFS_locations, list):
+                self.cvmfsLocation = CVMFS_locations[0]
+            self.log.info("CVMFS location:", self.cvmfsLocation)

         # Get target queues from the configuration
         siteNames = None
@@ -111,14 +170,32 @@ def beginExecution(self):

         self.queueDict = result["Value"]

-        if self.firstPass:
-            if self.queueDict:
-                self.log.always("Agent will serve queues:")
-                for queue in self.queueDict:
-                    self.log.always(
-                        "Site: %s, CE: %s, Queue: %s"
-                        % (self.queueDict[queue]["Site"], self.queueDict[queue]["CEName"], queue)
-                    )
+        # Get the version:
+        # If the webapp is running on the same machine, the version is the one of the webapp so we should ignore it
+        versions = getVersion()["Value"]["Extensions"]
+        for extension, version in versions.items():
+            if extension not in ["WebAppDIRAC", "DIRACWebAppResources"]:
+                self.version = version
+                break
+
+        if not self.version:
+            self.log.error("Cannot get the version of the agent")
+            return S_ERROR("Cannot get the version of the agent")
+
+        for queue in self.queueDict:
+            ce = self.queueDict[queue]["CE"]
+            architecture = f"Linux-{ce.ceParameters.get('architecture', 'x86_64')}"
+            diracInstallLocation = os.path.join(self.cvmfsLocation, "dirac", f"v{self.version}", architecture)
+            self.queueDict[queue]["ParametersDict"]["DIRACInstallLocation"] = diracInstallLocation
+
+            if self.firstPass:
+                self.log.always(
+                    "Will serve Site: %s, CE: %s, Queue: %s"
+                    % (self.queueDict[queue]["Site"], self.queueDict[queue]["CEName"], queue)
+                )
+            # Add preamble based on the CVMFS location to the CE
+            ce.preamble = f"source {os.path.join(diracInstallLocation, 'diracosrc')}"
+
         self.firstPass = False

         return S_OK()
@@ -129,16 +206,16 @@ def execute(self):
         queueDictItems = list(self.queueDict.items())
         random.shuffle(queueDictItems)

-        # Check that there is enough slots locally
-        result = self._checkCEAvailability(self.computingElement)
-        if not result["OK"] or result["Value"]:
-            return result
-
-        # Check errors that could have occurred during job submission and/or execution
-        # Status are handled internally, and therefore, not checked outside of the method
-        result = self._checkSubmittedJobs()
-        if not result["OK"]:
-            return result
+        if self.submissionPolicy == "Application":
+            # Check that there is enough slots locally
+            result = self._checkCEAvailability(self.computingElement)
+            if not result["OK"] or result["Value"]:
+                return result
+            # Check errors that could have occurred during job submission and/or execution
+            # Status are handled internally, and therefore, not checked outside 
of the method + result = self._checkSubmittedJobs() + if not result["OK"]: + return result for queueName, queueDictionary in queueDictItems: # Make sure there is no problem with the queue before trying to submit @@ -156,6 +233,12 @@ def execute(self): proxy = result["Value"] ce.setProxy(proxy) + if self.submissionPolicy == "JobWrapper": + # Check errors that could have occurred during job submission and/or execution + result = self._checkSubmittedJobWrappers(ce, queueDictionary["ParametersDict"]["Site"]) + if not result["OK"]: + self.failedQueues[queueName] += 1 + # Check that there is enough slots in the remote CE to match a job result = self._checkCEAvailability(ce) if not result["OK"] or result["Value"]: @@ -227,14 +310,6 @@ def execute(self): status=JobStatus.MATCHED, minorStatus="Job Received by Agent", sendFlag=False ) - # Setup proxy - result_setupProxy = self._setupProxy(owner, jobGroup) - if not result_setupProxy["OK"]: - result = self._rescheduleFailedJob(jobID, result_setupProxy["Message"]) - self.failedQueues[queueName] += 1 - break - proxyChain = result_setupProxy.get("Value") - # Check software and install them if required self.jobs[jobID]["JobReport"].setJobStatus(minorStatus="Installing Software", sendFlag=False) software = self._checkInstallSoftware(params, ceDict) @@ -247,29 +322,56 @@ def execute(self): self.failedQueues[queueName] += 1 break + self.jobs[jobID]["JobReport"].setJobParameter(par_name="GridCE", par_value=ce.ceName, sendFlag=False) + self.jobs[jobID]["JobReport"].setJobParameter(par_name="CEQueue", par_value=queueName, sendFlag=False) + # Submit the job to the CE self.log.debug(f"Before self._submitJob() ({self.ceName}CE)") - resultSubmission = self._submitJob( - jobID=jobID, - jobParams=params, - resourceParams=ceDict, - optimizerParams=optimizerParams, - proxyChain=proxyChain, - processors=submissionParams["processors"], - wholeNode=submissionParams["wholeNode"], - maxNumberOfProcessors=submissionParams["maxNumberOfProcessors"], - mpTag=submissionParams["mpTag"], - ) - if not result["OK"]: - result = self._rescheduleFailedJob(jobID, resultSubmission["Message"]) - self.failedQueues[queueName] += 1 - break + if self.submissionPolicy == "Application": + # Setup proxy + result_setupProxy = self._setupProxy(owner, jobGroup) + if not result_setupProxy["OK"]: + result = self._rescheduleFailedJob(jobID, result_setupProxy["Message"]) + self.failedQueues[queueName] += 1 + break + proxyChain = result_setupProxy.get("Value") + + resultSubmission = self._submitJob( + jobID=jobID, + jobParams=params, + resourceParams=ceDict, + optimizerParams=optimizerParams, + proxyChain=proxyChain, + processors=submissionParams["processors"], + wholeNode=submissionParams["wholeNode"], + maxNumberOfProcessors=submissionParams["maxNumberOfProcessors"], + mpTag=submissionParams["mpTag"], + ) + if not result["OK"]: + self._rescheduleFailedJob(jobID, resultSubmission["Message"]) + self.failedQueues[queueName] += 1 + break + else: + resultSubmission = self._submitJobWrapper( + jobID=jobID, + ce=ce, + diracInstallLocation=queueDictionary["ParametersDict"]["DIRACInstallLocation"], + jobParams=params, + resourceParams=ceDict, + optimizerParams=optimizerParams, + processors=submissionParams["processors"], + ) + if not result["OK"]: + self.failedQueues[queueName] += 1 + break + self.log.debug(f"After {self.ceName}CE submitJob()") - # Check that there is enough slots locally - result = self._checkCEAvailability(self.computingElement) - if not result["OK"] or result["Value"]: - return result + if 
self.submissionPolicy == "Application": + # Check that there is enough slots locally + result = self._checkCEAvailability(self.computingElement) + if not result["OK"] or result["Value"]: + return result # Check that there is enough slots in the remote CE to match a new job result = self._checkCEAvailability(ce) @@ -297,7 +399,7 @@ def _buildQueueDict(self, siteNames, ces, ceTypes): :return: dictionary of queue parameters """ - result = self.resourcesModule.getQueues(community="", siteList=siteNames, ceList=ces, ceTypeList=ceTypes) + result = getQueues(community="", siteList=siteNames, ceList=ces, ceTypeList=ceTypes) if not result["OK"]: return result @@ -345,8 +447,7 @@ def _setCEDict(self, ceDict): if project: ceDict["ReleaseProject"] = project - # Add a RemoteExecution entry, which can be used in the next stages - ceDict["RemoteExecution"] = True + ceDict["SubmissionPolicy"] = self.submissionPolicy def _checkMatchingIssues(self, jobRequest): """Check the source of the matching issue @@ -362,3 +463,378 @@ def _checkMatchingIssues(self, jobRequest): self.log.notice("Failed to get jobs", jobRequest["Message"]) return S_OK() + + def _submitJobWrapper( + self, + jobID: str, + ce: ComputingElement, + diracInstallLocation: str, + jobParams: dict, + resourceParams: dict, + optimizerParams: dict, + processors: int, + ): + """Submit a JobWrapper to the remote site + + :param jobID: job ID + :param ce: ComputingElement instance + :param jobParams: job parameters + :param resourceParams: resource parameters + :param optimizerParams: optimizer parameters + :param proxyChain: proxy chain + :param processors: number of processors + + :return: S_OK + """ + jobReport = self.jobs[jobID]["JobReport"] + jobReport.commit() + + # Add the number of requested processors to the job environment + if "ExecutionEnvironment" in jobParams: + if isinstance(jobParams["ExecutionEnvironment"], str): + jobParams["ExecutionEnvironment"] = jobParams["ExecutionEnvironment"].split(";") + jobParams.setdefault("ExecutionEnvironment", []).append("DIRAC_JOB_PROCESSORS=%d" % processors) + + # Add necessary parameters to get the payload result and analyze its integrity + jobParams["PayloadResults"] = self.payloadResultFile + jobParams["Checksum"] = self.checkSumResultsFile + # The dirac-jobexec executable is available through CVMFS in the remote site + # So we need to change the path to the executable + if "Executable" in jobParams and jobParams["Executable"] == "dirac-jobexec": + jobParams["Executable"] = os.path.join(diracInstallLocation, "bin", "dirac-jobexec") + jobParams["Arguments"] += " --cfg=dirac.cfg" + + # Prepare the job for submission + self.log.verbose("Getting a JobWrapper") + arguments = {"Job": jobParams, "CE": resourceParams, "Optimizer": optimizerParams} + + job = getJobWrapper(jobID, arguments, jobReport) + if not job: + return S_ERROR(f"Cannot get a JobWrapper instance for job {jobID}") + + if "InputSandbox" in jobParams: + self.log.verbose("Getting the inputSandbox of job", jobID) + if not transferInputSandbox(job, jobParams["InputSandbox"], jobReport): + return S_ERROR(f"Cannot get input sandbox of job {jobID}") + jobReport.commit() + + if "InputData" in jobParams and jobParams["InputData"]: + self.log.verbose("Getting the inputData of job", jobID) + if not resolveInputData(job, jobReport): + return S_ERROR(f"Cannot get input data of job {jobID}") + jobReport.commit() + + # Preprocess the payload + try: + self.log.verbose("Pre-processing job", jobID) + result = job.preProcess() + if not result["OK"]: + 
self.log.error("JobWrapper failed the preprocessing phase for", f"{jobID}: {result['Message']}") + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) + return S_ERROR(JobMinorStatus.JOB_WRAPPER_EXECUTION) + except Exception: + self.log.exception("JobWrapper failed the preprocessing phase for", jobID) + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) + return S_ERROR(f"JobWrapper failed the preprocessing phase for {jobID}") + payloadParams = result["Value"] + jobReport.commit() + + # Dump the remote CFG config into the job directory: it is needed for the JobWrapperTemplate + cfgFilename = Path(job.jobIDPath) / "dirac.cfg" + gConfig.dumpRemoteCFGToFile(cfgFilename) + + # Generate a light JobWrapper executor script + jobDesc = { + "jobID": jobID, + "jobParams": jobParams, + "resourceParams": resourceParams, + "optimizerParams": optimizerParams, + "payloadParams": payloadParams, + "extraOptions": self.extraOptions, + } + result = createJobWrapper( + log=self.log, + logLevel=self.logLevel, + cfgPath=cfgFilename.name, + defaultWrapperLocation="DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperOfflineTemplate.py", + pythonPath="python", + rootLocation=".", + **jobDesc, + ) + if not result["OK"]: + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) + return result + jobWrapperRunner = result["Value"]["JobExecutablePath"] + jobWrapperCode = result["Value"]["JobWrapperPath"] + jobWrapperConfig = result["Value"]["JobWrapperConfigPath"] + + # Get inputs from the JobWrapper working directory and add the JobWrapper deps + jobInputs = os.listdir(job.jobIDPath) + inputs = [os.path.join(job.jobIDPath, input) for input in jobInputs] + inputs.append(jobWrapperCode) + inputs.append(jobWrapperConfig) + self.log.verbose("The executable will be sent along with the following inputs:", ",".join(inputs)) + + # Request the whole directory as output + outputs = ["/"] + + self.log.info("Submitting JobWrapper", f"{os.path.basename(jobWrapperRunner)} to {ce.ceName}CE") + + # Submit the job + result = ce.submitJob( + executableFile=jobWrapperRunner, + proxy=None, + inputs=inputs, + outputs=outputs, + ) + if not result["OK"]: + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) + return result + + taskID = result["Value"][0] + stamp = result["PilotStampDict"][taskID] + self.log.info("Job being submitted", f"(DIRAC JobID: {jobID}; Task ID: {taskID})") + + jobReport.setJobParameter(par_name="TaskID", par_value=taskID, sendFlag=False) + jobReport.setJobParameter(par_name="Stamp", par_value=stamp, sendFlag=False) + jobReport.commit() + + time.sleep(self.jobSubmissionDelay) + return S_OK() + + def _checkOutputIntegrity(self, workingDirectory: str): + """Make sure that output files are not corrupted. 
+ + :param workingDirectory: path of the outputs + """ + checkSumOutput = os.path.join(workingDirectory, self.checkSumResultsFile) + if not os.path.exists(checkSumOutput): + return S_ERROR( + DErrno.EWMSRESC, f"Cannot guarantee the integrity of the outputs: {checkSumOutput} unavailable" + ) + + with open(checkSumOutput) as f: + checksums = json.load(f) + + # for each output file, compute the md5 checksum + for output, checksum in checksums.items(): + localOutput = os.path.join(workingDirectory, output) + if not os.path.exists(localOutput): + return S_ERROR(f"{localOutput} was expected but not found") + + with open(localOutput, "rb") as f: + digest = hashlib.file_digest(f, "sha256") + + if checksum != digest.hexdigest(): + return S_ERROR(f"{localOutput} is corrupted") + + return S_OK() + + def _checkSubmittedJobWrappers(self, ce: ComputingElement, site: str): + """Check the status of the submitted tasks. + If the task is finished, get the output and post process the job. + Finally, remove from the submission dictionary. + + :return: S_OK/S_ERROR + """ + # Get the list of running jobs + if not (result := self.jobMonitoring.getJobs({"Status": JobStatus.RUNNING, "Site": site}))["OK"]: + self.log.error("Failed to get the list of running jobs", result["Message"]) + return result + + jobs = result["Value"] + if not jobs: + self.log.info("No running jobs") + return S_OK() + + # Get their parameters + if not (result := self.jobMonitoring.getJobParameters(jobs, ["GridCE", "TaskID", "Stamp"]))["OK"]: + self.log.error("Failed to get the list of taskIDs", result["Message"]) + return result + + # Filter the jobs that are running on the current CE + taskIDs = {} + for jobID, values in result["Value"].items(): + if "GridCE" not in values or values["GridCE"] != ce.ceName: + continue + if "TaskID" not in values: + continue + if "Stamp" not in values: + continue + taskIDs[values["TaskID"]] = {"JobID": jobID, "Stamp": values["Stamp"]} + + if not taskIDs: + self.log.info("No running jobs on the current CE") + return S_OK() + + # Get the status of the jobs + if not (result := ce.getJobStatus(list(taskIDs.keys())))["OK"]: + self.log.error("Failed to get job status", result["Message"]) + return result + + statusDict = result["Value"] + for taskID, status in statusDict.items(): + # Get the jobID and the stamp + jobID = taskIDs[taskID]["JobID"] + stamp = taskIDs[taskID]["Stamp"] + + # Check if the job is still running + if status not in PilotStatus.PILOT_FINAL_STATES: + self.log.info("Job not finished", f"(JobID: {jobID}, DIRAC taskID: {taskID}; Status: {status})") + continue + self.log.info("Job execution finished", f"(JobID: {jobID}, DIRAC taskID: {taskID}; Status: {status})") + + # Get the JDL of the job + if not (result := self.jobMonitoring.getJobJDL(int(jobID), False))["OK"]: + self.log.error("Failed to get the JDL of job", f"{jobID}: {result['Message']}") + continue + + if not (result := self._getJDLParameters(result["Value"]))["OK"]: + self.log.error("Failed to extract the JDL parameters of job", f"{jobID}: {result['Message']}") + continue + + # Get the job and ce parameters + jobParams = result["Value"] + + result = self._getCEDict(ce) + if not (result := self._getCEDict(ce))["OK"]: + self.log.error("Failed to get the CE parameters of job", f"{jobID}: {result['Message']}") + continue + ceDict = result["Value"][0] + ceDict["NumberOfProcessors"] = ce.ceParameters.get("NumberOfProcessors") + + self.log.verbose(f"Restoring the JobWrapper of job {jobID}") + arguments = {"Job": jobParams, "CE": ceDict, 
"Optimizer": {}} + + # Get the job wrapper + jobReport = JobReport(jobID, f"{self.__class__.__name__}@{self.siteName}") + try: + job = JobWrapper(jobID, jobReport) + job.initialize(arguments) + except Exception: + self.log.exception("JobWrapper failed the initialization phase", jobID) + continue + + # Get the output of the job + self.log.info(f"Getting the outputs of taskID {taskID} for {jobID}") + if not (result := ce.getJobOutput(f"{taskID}:::{stamp}", job.jobIDPath))["OK"]: + self.log.error("Failed to get the output of taskID", f"{taskID}: {result['Message']}") + continue + + # Make sure the output is correct + self.log.info(f"Checking the integrity of the outputs of {taskID} for {jobID}") + if not (result := self._checkOutputIntegrity(job.jobIDPath))["OK"]: + self.log.error( + "Failed to check the integrity of the output of taskID", f"{taskID}: {result['Message']}" + ) + if result["Errno"] == DErrno.EWMSRESC: + self.log.warn("Asked to reschedule job") + rescheduleResult = rescheduleFailedJob( + jobID=jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) + shutil.rmtree(job.jobIDPath) + continue + self.log.info("The output has been retrieved and declared complete") + + with open(os.path.join(job.jobIDPath, self.payloadResultFile)) as f: + result = json.load(f) + + if not result["OK"]: + self.log.error("Failed to get the payload results of job", f"{jobID}: {result['Message']}") + self.log.warn("Asked to reschedule job") + rescheduleResult = rescheduleFailedJob( + jobID=jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) + shutil.rmtree(job.jobIDPath) + continue + + payloadResults = result["Value"] + # Post-process the job + try: + result = job.postProcess(**payloadResults) + if not result["OK"]: + self.log.error("Failed to post process the job", f"{jobID}: {result['Message']}") + if result["Errno"] == DErrno.EWMSRESC: + self.log.warn("Asked to reschedule job") + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) + shutil.rmtree(job.jobIDPath) + continue + + jobReport.setJobParameter("Error Message", result["Message"], sendFlag=False) + jobReport.setJobStatus( + status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False + ) + job.sendFailoverRequest() + job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC) + shutil.rmtree(job.jobIDPath) + continue + except Exception as exc: # pylint: disable=broad-except + self.log.exception("Job raised exception during post processing phase") + jobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) + jobReport.setJobStatus( + status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False + ) + job.sendFailoverRequest() + job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC) + shutil.rmtree(job.jobIDPath) + continue + + if "OutputSandbox" in job.jobArgs or "OutputData" in job.jobArgs: + self.log.verbose("Uploading the outputSandbox/Data of the job") + if not processJobOutputs(job, jobReport): + shutil.rmtree(job.jobIDPath) + continue + jobReport.commit() + + # Clean job 
wrapper locally + job.finalize() + + # Clean job in the remote resource + if self.cleanTask: + if not (result := ce.cleanJob(taskID))["OK"]: + self.log.warn("Failed to clean the output remotely", result["Message"]) + self.log.info(f"TaskID {taskID} has been remotely removed") + + return S_OK() + + def finalize(self): + """PushJob Agent finalization method""" + if self.submissionPolicy == "Application": + # wait for all jobs to be completed + res = self.computingElement.shutdown() + if not res["OK"]: + self.log.error("CE could not be properly shut down", res["Message"]) + + # Check the latest submitted jobs + while self.jobs: + result = self._checkSubmittedJobs() + if not result["OK"]: + self.log.error("Problem while trying to get status of the last submitted jobs") + break + time.sleep(int(self.am_getOption("PollingTime"))) + else: + for _, queueDictionary in self.queueDict.items(): + ce = queueDictionary["CE"] + # Check the submitted JobWrappers a last time + result = self._checkSubmittedJobWrappers(ce, queueDictionary["ParametersDict"]["Site"]) + if not result["OK"]: + self.log.error("Problem while trying to get status of the last submitted JobWrappers") + + return S_OK() diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PushJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PushJobAgent.py index e1de80a21cd..2afe368250d 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PushJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PushJobAgent.py @@ -2,14 +2,21 @@ """ # imports +import os +from pathlib import Path +import shutil +from unittest.mock import Mock import pytest from collections import defaultdict # DIRAC Components from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.WorkloadManagementSystem.Agent.PushJobAgent import PushJobAgent +from DIRAC.WorkloadManagementSystem.Agent.test.Test_Agent_SiteDirector import config -from DIRAC import gLogger, S_ERROR +from DIRAC import gLogger, S_OK, S_ERROR +from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus +from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport gLogger.setLevel("DEBUG") @@ -49,15 +56,25 @@ def test__allowedToSubmit(mocker, queue, failedQueues, failedQueueCycleFactor, e @pytest.mark.parametrize( "ceDict, pilotVersion, pilotProject, expected", [ - ({}, None, None, {"RemoteExecution": True}), - ({}, "8.0.0", None, {"DIRACVersion": "8.0.0", "ReleaseVersion": "8.0.0", "RemoteExecution": True}), - ({}, ["8.0.0", "7.3.7"], None, {"DIRACVersion": "8.0.0", "ReleaseVersion": "8.0.0", "RemoteExecution": True}), - ({}, None, "Project", {"ReleaseProject": "Project", "RemoteExecution": True}), + ({}, None, None, {"SubmissionPolicy": "JobWrapper"}), + ({}, "8.0.0", None, {"DIRACVersion": "8.0.0", "ReleaseVersion": "8.0.0", "SubmissionPolicy": "JobWrapper"}), + ( + {}, + ["8.0.0", "7.3.7"], + None, + {"DIRACVersion": "8.0.0", "ReleaseVersion": "8.0.0", "SubmissionPolicy": "JobWrapper"}, + ), + ({}, None, "Project", {"ReleaseProject": "Project", "SubmissionPolicy": "JobWrapper"}), ( {}, "8.0.0", "Project", - {"DIRACVersion": "8.0.0", "ReleaseVersion": "8.0.0", "ReleaseProject": "Project", "RemoteExecution": True}, + { + "DIRACVersion": "8.0.0", + "ReleaseVersion": "8.0.0", + "ReleaseProject": "Project", + "SubmissionPolicy": "JobWrapper", + }, ), ], ) @@ -104,3 +121,287 @@ def test__checkMatchingIssues(mocker, issueMessage, expectedResult): result = jobAgent._checkMatchingIssues(S_ERROR(issueMessage)) 
assert result["OK"] == expectedResult + + +def test_execute_application_localCENotAvailable(config, mocker): + """Test when local CE is not available: it should not check submitted jobs and return an error message""" + + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__") + mocker.patch( + "DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule._AgentModule__moduleProperties", + side_effect=lambda x, y=None: y, + create=True, + ) + + jobAgent = PushJobAgent("Test", "Test1") + jobAgent.submissionPolicy = "Application" + jobAgent.queueDict = jobAgent._buildQueueDict( + siteNames=["LCG.Site1.com", "LCG.Site2.site2"], ces=None, ceTypes=None + )["Value"] + + # Mock the CE availability + errorMessage = "CE Not Available" + mocker.patch.object(jobAgent, "_checkCEAvailability", return_value=S_ERROR(errorMessage)) + + mocker.patch.object(jobAgent, "_checkSubmittedJobs") + mocker.patch.object(jobAgent, "_checkSubmittedJobWrappers") + mocker.patch.object(jobAgent, "_allowedToSubmit") + mocker.patch.object(jobAgent, "_matchAJob") + + # Initialize logger + jobAgent.log = gLogger + jobAgent.log.setLevel("DEBUG") + # Initialize inner CE + jobAgent._initializeComputingElement("Pool") + + result = jobAgent.execute() + # Check the CE availability and submitted jobs + jobAgent._checkCEAvailability.assert_called() + jobAgent._checkSubmittedJobs.assert_not_called() + # Does not check if allowed to submit and does not match a job + jobAgent._allowedToSubmit.assert_not_called() + jobAgent._matchAJob.assert_not_called() + # This is not called because submission policy is Application + jobAgent._checkSubmittedJobWrappers.assert_not_called() + # Result should not be OK + assert not result["OK"], result + assert result["Message"] == errorMessage + + +@pytest.fixture +def jobID(): + jobID = "123" + Path(jobID).mkdir(parents=True, exist_ok=True) + + yield jobID + + shutil.rmtree(jobID) + + +def test_submitJobWrapper(mocker, jobID): + """Test JobAgent._submitJobWrapper()""" + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__") + mocker.patch( + "DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule._AgentModule__moduleProperties", + side_effect=lambda x, y=None: y, + create=True, + ) + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", return_value="Value") + + # Initialize PJA + jobAgent = PushJobAgent("Test", "Test1") + jobAgent.submissionPolicy = "JobWrapper" + jobAgent.queueDict = jobAgent._buildQueueDict( + siteNames=["LCG.Site1.com", "LCG.Site2.site2"], ces=None, ceTypes=None + )["Value"] + jobAgent.log = gLogger + jobAgent.log.setLevel("DEBUG") + + jobAgent.jobs[jobID] = {"JobReport": JobReport(jobID)} + jobParams = {} + + # Current working directory: it should not change + cwd = os.getcwd() + + # Mock the JobWrapper + # Create a mock JobWrapper instance + job = Mock() + job.sendJobAccounting = Mock() + + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities.JobWrapper", return_value=job) + + rescheduleValue = "valueProvingRescheduling" + mocker.patch( + "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities.rescheduleFailedJob", + return_value=rescheduleValue, + ) + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PushJobAgent.rescheduleFailedJob", return_value=rescheduleValue) + + # 1. 
getJobWrapper returns an error + job.initialize = Mock(side_effect=Exception("Error initializing JobWrapper")) + result = jobAgent._submitJobWrapper( + jobID=jobID, + ce=jobAgent.queueDict["ce1.site2.com_condor"]["CE"], + diracInstallLocation="diracInstallLocation", + jobParams=jobParams, + resourceParams={}, + optimizerParams={}, + processors=1, + ) + + assert "PayloadResults" in jobParams + assert jobParams["PayloadResults"] == jobAgent.payloadResultFile + assert "Checksum" in jobParams + assert jobParams["Checksum"] == jobAgent.checkSumResultsFile + + assert not result["OK"], result + assert result["Message"] == f"Cannot get a JobWrapper instance for job {jobID}" + + assert os.getcwd() == cwd + + job.sendJobAccounting.assert_called_with( + status=rescheduleValue, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION + ) + + # 2. getJobWrapper returns a JobWrapper instance but fails to process input sandbox + jobParams = {"InputSandbox": True} + job.initialize = Mock() + job.sendJobAccounting.reset_mock() + job.transferInputSandbox = Mock(side_effect=Exception("Error transferring input sandbox")) + result = jobAgent._submitJobWrapper( + jobID=jobID, + ce=jobAgent.queueDict["ce1.site2.com_condor"]["CE"], + diracInstallLocation="diracInstallLocation", + jobParams=jobParams, + resourceParams={}, + optimizerParams={}, + processors=1, + ) + + assert "PayloadResults" in jobParams + assert jobParams["PayloadResults"] == jobAgent.payloadResultFile + assert "Checksum" in jobParams + assert jobParams["Checksum"] == jobAgent.checkSumResultsFile + + assert not result["OK"], result + assert result["Message"] == f"Cannot get input sandbox of job {jobID}" + + assert os.getcwd() == cwd + + job.sendJobAccounting.assert_called_with( + status=rescheduleValue, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX + ) + + # 3. getJobWrapper returns a JobWrapper instance but fails to process input data + jobParams = {"InputSandbox": True, "InputData": True} + job.initialize = Mock() + job.sendJobAccounting.reset_mock() + job.transferInputSandbox = Mock(return_value=S_OK()) + job.resolveInputData = Mock(side_effect=Exception("Error resolving input data")) + result = jobAgent._submitJobWrapper( + jobID=jobID, + ce=jobAgent.queueDict["ce1.site2.com_condor"]["CE"], + diracInstallLocation="diracInstallLocation", + jobParams=jobParams, + resourceParams={}, + optimizerParams={}, + processors=1, + ) + + assert "PayloadResults" in jobParams + assert jobParams["PayloadResults"] == jobAgent.payloadResultFile + assert "Checksum" in jobParams + assert jobParams["Checksum"] == jobAgent.checkSumResultsFile + + assert not result["OK"], result + assert result["Message"] == f"Cannot get input data of job {jobID}" + + assert os.getcwd() == cwd + + job.sendJobAccounting.assert_called_with(status=rescheduleValue, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION) + + # 4. 
getJobWrapper returns a JobWrapper instance but fails to pre-process payload + jobParams = {"InputSandbox": True, "InputData": True, "Payload": True} + job.initialize = Mock() + job.sendJobAccounting.reset_mock() + job.transferInputSandbox = Mock(return_value=S_OK()) + job.resolveInputData = Mock(return_value=S_OK()) + job.preProcess = Mock(side_effect=S_ERROR("Error pre-processing payload")) + + result = jobAgent._submitJobWrapper( + jobID=jobID, + ce=jobAgent.queueDict["ce1.site2.com_condor"]["CE"], + diracInstallLocation="diracInstallLocation", + jobParams=jobParams, + resourceParams={}, + optimizerParams={}, + processors=1, + ) + + assert "PayloadResults" in jobParams + assert jobParams["PayloadResults"] == jobAgent.payloadResultFile + assert "Checksum" in jobParams + assert jobParams["Checksum"] == jobAgent.checkSumResultsFile + + assert not result["OK"], result + assert result["Message"] == f"JobWrapper failed the preprocessing phase for {jobID}" + + assert os.getcwd() == cwd + + job.sendJobAccounting.assert_called_with(status=rescheduleValue, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) + + # 5. getJobWrapper returns a JobWrapper instance but fails to submit the job + mocker.patch("DIRAC.WorkloadManagementSystem.Utilities.Utils.createJobWrapper", return_value=S_OK({})) + jobParams = {"InputSandbox": True, "InputData": True, "Payload": True} + job.initialize = Mock() + + jobID = jobID + job.jobIDPath = Path(jobID) + + job.sendJobAccounting.reset_mock() + job.transferInputSandbox = Mock(return_value=S_OK()) + job.resolveInputData = Mock(return_value=S_OK()) + job.preProcess = Mock(return_value=S_OK()) + + ce = Mock() + ce.submitJob = Mock(return_value=S_ERROR("Error submitting job")) + + result = jobAgent._submitJobWrapper( + jobID=jobID, + ce=ce, + diracInstallLocation="diracInstallLocation", + jobParams=jobParams, + resourceParams={}, + optimizerParams={}, + processors=1, + ) + + assert "PayloadResults" in jobParams + assert jobParams["PayloadResults"] == jobAgent.payloadResultFile + assert "Checksum" in jobParams + assert jobParams["Checksum"] == jobAgent.checkSumResultsFile + + assert not result["OK"], result + assert result["Message"] == "Error submitting job" + + assert os.getcwd() == cwd + + job.sendJobAccounting.assert_called_with(status=rescheduleValue, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) + + # 6. 
getJobWrapper returns a JobWrapper instance and submits it successfully + mocker.patch("DIRAC.WorkloadManagementSystem.Utilities.Utils.createJobWrapper", return_value=S_OK({})) + jobParams = {"InputSandbox": True, "InputData": True, "Payload": True} + job.initialize = Mock() + + jobID = jobID + job.jobIDPath = Path(jobID) + + job.sendJobAccounting.reset_mock() + job.transferInputSandbox = Mock(return_value=S_OK()) + job.resolveInputData = Mock(return_value=S_OK()) + job.preProcess = Mock(return_value=S_OK()) + + ce = Mock() + ce.submitJob = Mock(return_value={"OK": True, "Value": ["456"], "PilotStampDict": {"456": "abcdef"}}) + + result = jobAgent._submitJobWrapper( + jobID=jobID, + ce=ce, + diracInstallLocation="diracInstallLocation", + jobParams=jobParams, + resourceParams={}, + optimizerParams={}, + processors=1, + ) + + assert "PayloadResults" in jobParams + assert jobParams["PayloadResults"] == jobAgent.payloadResultFile + assert "Checksum" in jobParams + assert jobParams["Checksum"] == jobAgent.checkSumResultsFile + + assert result["OK"], result + + assert os.getcwd() == cwd + + job.sendJobAccounting.assert_not_called() + shutil.rmtree("job") diff --git a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg index 36bf72b3339..86f22acc4f3 100644 --- a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg +++ b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg @@ -321,6 +321,10 @@ Agents MaxJobsToSubmit = 100 # How many cycels to skip if queue is not working FailedQueueCycleFactor = 10 + # How the agent manages the submission of the jobs + SubmissionPolicy = JobWrapper + # Clean the task after the job is done + CleanTask = True } ##END ##BEGIN StatesAccountingAgent diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py index b4eebaae229..0a003b10ad6 100755 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py @@ -10,10 +10,12 @@ :caption: JobWrapper options """ +import contextlib import datetime import glob import json import os +from pathlib import Path import re import shutil import stat @@ -47,7 +49,6 @@ from DIRAC.Resources.Catalog.FileCatalog import FileCatalog from DIRAC.Resources.Catalog.PoolXMLFile import getGUID from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus -from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient @@ -62,7 +63,7 @@ class JobWrapper: def __init__(self, jobID=None, jobReport=None): """Standard constructor""" self.initialTiming = os.times() - self.section = os.path.join(getSystemSection("WorkloadManagement/JobWrapper"), "JobWrapper") + self.section = "/Systems/WorkloadManagement/JobWrapper" # Create the accounting report self.accountingReport = AccountingJob() # Initialize for accounting @@ -85,7 +86,8 @@ def __init__(self, jobID=None, jobReport=None): self.log.showHeaders(True) # self.root is the path the Wrapper is running at - self.root = os.getcwd() + self.root = Path.cwd() + self.jobIDPath = self.root result = getCurrentVersion() if result["OK"]: self.diracVersion = result["Value"] @@ -209,19 +211,21 @@ def initialize(self, arguments): # 
Prepare the working directory, cd to there, and copying eventual extra arguments in it if self.jobID: - if os.path.exists(str(self.jobID)): - shutil.rmtree(str(self.jobID)) - os.mkdir(str(self.jobID)) - os.chdir(str(self.jobID)) + self.jobIDPath = self.root / str(self.jobID) + if self.jobIDPath.exists(): + shutil.rmtree(self.jobIDPath) + self.jobIDPath.mkdir() + extraOpts = self.jobArgs.get("ExtraOptions", "") if extraOpts and "dirac-jobexec" in self.jobArgs.get("Executable", "").strip(): - if os.path.exists(f"{self.root}/{extraOpts}"): - shutil.copyfile(f"{self.root}/{extraOpts}", extraOpts) + src = self.root / extraOpts + if os.path.exists(src): + shutil.copyfile(src, self.jobIDPath / extraOpts) else: self.log.info("JobID is not defined, running in current directory") - with open("job.info", "w") as infoFile: + with open(self.jobIDPath / "job.info", "w") as infoFile: infoFile.write(self.__dictAsInfoString(self.jobArgs, "/Job")) self.log.debug("Environment used") @@ -263,7 +267,7 @@ def __dictAsInfoString(self, dData, infoString="", currentBase=""): ############################################################################# def __prepareCommand(self): """Prepare the command to be executed.""" - if not "Executable" in self.jobArgs: + if "Executable" not in self.jobArgs: self.__report(status=JobStatus.FAILED, minorStatus=JobMinorStatus.APP_NOT_FOUND, sendFlag=True) return S_ERROR(f"Job {self.jobID} has no specified executable") @@ -302,9 +306,9 @@ def __prepareCommand(self): configOptions += "-o /LocalSite/CEQueue=%s " % self.ceArgs.get( "Queue", gConfig.getValue("/LocalSite/CEQueue", "") ) - configOptions += "-o /LocalSite/RemoteExecution=%s " % self.ceArgs.get( - "RemoteExecution", gConfig.getValue("/LocalSite/RemoteExecution", False) - ) + submissionPolicy = self.ceArgs.get("SubmissionPolicy", gConfig.getValue("/LocalSite/SubmissionPolicy", "")) + if submissionPolicy == "Application": + configOptions += "-o /LocalSite/RemoteExecution=True " command = executable if jobArguments: @@ -393,59 +397,60 @@ def process(self, command: str, output: str, error: str, env: dict): jobMemory = int(self.jobArgs["Memory"]) * 1024.0 * 1024.0 spObject = Subprocess(timeout=False, bufferLimit=int(self.bufferLimit)) - exeThread = ExecutionThread(spObject, command, self.maxPeekLines, output, error, env, self.executionResults) - exeThread.start() - payloadPID = None - for seconds in range(5, 40, 5): - time.sleep(seconds) - payloadPID = spObject.getChildPID() - if payloadPID: - self.__setJobParam("PayloadPID", payloadPID) - break - if not payloadPID: - return S_ERROR("Payload process could not start after 140 seconds") - - watchdog = Watchdog( - pid=self.currentPID, - exeThread=exeThread, - spObject=spObject, - jobCPUTime=jobCPUTime, - memoryLimit=jobMemory, - processors=self.numberOfProcessors, - jobArgs=self.jobArgs, - ) + with contextlib.chdir(self.jobIDPath): + exeThread = ExecutionThread(spObject, command, self.maxPeekLines, output, error, env, self.executionResults) + exeThread.start() + payloadPID = None + for seconds in range(5, 40, 5): + time.sleep(seconds) + payloadPID = spObject.getChildPID() + if payloadPID: + self.__setJobParam("PayloadPID", payloadPID) + break + if not payloadPID: + return S_ERROR("Payload process could not start after 140 seconds") + + watchdog = Watchdog( + pid=self.currentPID, + exeThread=exeThread, + spObject=spObject, + jobCPUTime=jobCPUTime, + memoryLimit=jobMemory, + processors=self.numberOfProcessors, + jobArgs=self.jobArgs, + ) - self.log.verbose("Initializing Watchdog 
instance") - watchdog.initialize() - self.log.verbose("Calibrating Watchdog instance") - watchdog.calibrate() - # Do not kill Test jobs by CPU time - if self.jobArgs.get("JobType", "") == "Test": - watchdog.testCPUConsumed = False - - if "DisableCPUCheck" in self.jobArgs: - watchdog.testCPUConsumed = False - - # Disable checks if remote execution: do not need it as pre/post processing occurs locally - if self.ceArgs.get("RemoteExecution", False): - watchdog.testWallClock = False - watchdog.testDiskSpace = False - watchdog.testLoadAvg = False - watchdog.testCPUConsumed = False - watchdog.testCPULimit = False - watchdog.testMemoryLimit = False - watchdog.testTimeLeft = False - - if exeThread.is_alive(): - self.log.info("Application thread is started in Job Wrapper") - watchdog.run() - else: - self.log.warn("Application thread stopped very quickly...") + self.log.verbose("Initializing Watchdog instance") + watchdog.initialize() + self.log.verbose("Calibrating Watchdog instance") + watchdog.calibrate() + # Do not kill Test jobs by CPU time + if self.jobArgs.get("JobType", "") == "Test": + watchdog.testCPUConsumed = False + + if "DisableCPUCheck" in self.jobArgs: + watchdog.testCPUConsumed = False + + # Disable checks if remote execution: do not need it as pre/post processing occurs locally + if self.ceArgs.get("RemoteExecution", False): + watchdog.testWallClock = False + watchdog.testDiskSpace = False + watchdog.testLoadAvg = False + watchdog.testCPUConsumed = False + watchdog.testCPULimit = False + watchdog.testMemoryLimit = False + watchdog.testTimeLeft = False + + if exeThread.is_alive(): + self.log.info("Application thread is started in Job Wrapper") + watchdog.run() + else: + self.log.warn("Application thread stopped very quickly...") - if exeThread.is_alive(): - self.log.warn("Watchdog exited before completion of execution thread") - while exeThread.is_alive(): - time.sleep(5) + if exeThread.is_alive(): + self.log.warn("Watchdog exited before completion of execution thread") + while exeThread.is_alive(): + time.sleep(5) payloadResult = { "payloadStatus": None, @@ -514,9 +519,9 @@ def postProcess( applicationErrorStatus = payloadStatus self.__setJobParam("ApplicationError", applicationErrorStatus, sendFlag=True) - if cpuTimeConsumed: - cpuString = " ".join([f"{x:.2f}" for x in cpuTimeConsumed]) - self.log.info("CPU time consumed in JobWrapper process", cpuString) + # This might happen if process() and postProcess() are called on different machines + if cpuTimeConsumed and not self.executionResults.get("CPU"): + self.executionResults["CPU"] = cpuTimeConsumed if watchdogError: self.__report(status=JobStatus.FAILED, minorStatus=watchdogError, sendFlag=True) @@ -536,7 +541,7 @@ def postProcess( self.log.verbose(f"Execution thread status = {payloadStatus}") self.log.info("Checking directory contents after execution:") - res = systemCall(5, ["ls", "-al"]) + res = systemCall(5, ["ls", "-al", str(self.jobIDPath)]) if not res["OK"]: self.log.error("Failed to list the current directory", res["Message"]) elif res["Value"][0]: @@ -596,7 +601,7 @@ def execute(self): return S_OK() ############################################################################# - def __sendFinalStdOut(self, payloadOutput): + def __sendFinalStdOut(self, payloadOutput: str): """After the Watchdog process has finished, this function sends a final report to be presented in the StdOut in the web page via the heartbeat mechanism. 
@@ -960,7 +965,7 @@ def __resolveOutputSandboxFiles(self, outputSandbox): okFiles = [] for i in outputSandbox: self.log.verbose(f"Looking at OutputSandbox file/directory/wildcard: {i}") - globList = glob.glob(i) + globList = glob.glob(self.jobIDPath / i) for check in globList: if os.path.isfile(check): self.log.verbose(f"Found locally existing OutputSandbox file: {check}") @@ -1009,6 +1014,7 @@ def __transferOutputDataFiles(self, outputData, outputSE, outputPath): nonlfnList.append(out) # Check whether list of outputData has a globbable pattern + nonlfnList = [self.jobIDPath / x for x in nonlfnList] globbedOutputList = List.uniqueElements(getGlobbedFiles(nonlfnList)) if globbedOutputList != nonlfnList and globbedOutputList: self.log.info( @@ -1035,9 +1041,9 @@ def __transferOutputDataFiles(self, outputData, outputSE, outputPath): # # file size localfileSize = getGlobbedTotalSize(localfile) - self.outputDataSize += getGlobbedTotalSize(localfile) + self.outputDataSize += localfileSize - outputFilePath = os.path.join(os.getcwd(), localfile) + outputFilePath = os.path.abspath(localfile) # # file GUID fileGUID = pfnGUID[localfile] if localfile in pfnGUID else None @@ -1177,7 +1183,7 @@ def __getLFNfromOutputFile(self, outputFile, outputPath=""): lfn = os.path.join(basePath, outputPath, os.path.basename(localfile)) else: # if LFN is given, take it as it is - localfile = os.path.basename(outputFile.replace("LFN:", "")) + localfile = self.jobIDPath / outputFile.replace("LFN:", "") lfn = outputFile.replace("LFN:", "") return (lfn, localfile) @@ -1201,19 +1207,19 @@ def transferInputSandbox(self, inputSandbox): sandboxFiles.append(os.path.basename(isb)) self.log.info(f"Downloading InputSandbox for job {self.jobID}: {', '.join(sandboxFiles)}") - if os.path.exists(f"{self.root}/inputsandbox"): + if (self.root / "inputsandbox").exists(): # This is a debugging tool, get the file from local storage to debug Job Wrapper sandboxFiles.append("jobDescription.xml") for inputFile in sandboxFiles: - if os.path.exists(f"{self.root}/inputsandbox/{inputFile}"): + if (self.root / "inputsandbox" / inputFile).exists(): self.log.info(f"Getting InputSandbox file {inputFile} from local directory for testing") - shutil.copy(self.root + "/inputsandbox/" + inputFile, inputFile) + shutil.copy(self.root / "inputsandbox" / inputFile, self.jobIDPath / inputFile) result = S_OK(sandboxFiles) else: if registeredISB: for isb in registeredISB: self.log.info(f"Downloading Input SandBox {isb}") - result = SandboxStoreClient().downloadSandbox(isb) + result = SandboxStoreClient().downloadSandbox(isb, destinationDir=str(self.jobIDPath)) if not result["OK"]: self.__report(minorStatus=JobMinorStatus.FAILED_DOWNLOADING_INPUT_SANDBOX) return S_ERROR(f"Cannot download Input sandbox {isb}: {result['Message']}") @@ -1224,7 +1230,7 @@ def transferInputSandbox(self, inputSandbox): self.log.info("Downloading Input SandBox LFNs, number of files to get", len(lfns)) self.__report(minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX_LFN) lfns = [fname.replace("LFN:", "").replace("lfn:", "") for fname in lfns] - download = self.dm.getFile(lfns) + download = self.dm.getFile(lfns, destinationDir=str(self.jobIDPath)) if not download["OK"]: self.log.warn(download) self.__report(minorStatus=JobMinorStatus.FAILED_DOWNLOADING_INPUT_SANDBOX_LFN) @@ -1235,10 +1241,12 @@ def transferInputSandbox(self, inputSandbox): self.log.warn(failed) return S_ERROR(str(failed)) for lfn in lfns: - if 
os.path.exists(f"{self.root}/{os.path.basename(download['Value']['Successful'][lfn])}"): + if (self.root / os.path.basename(download["Value"]["Successful"][lfn])).exists(): sandboxFiles.append(os.path.basename(download["Value"]["Successful"][lfn])) - userFiles = sandboxFiles + [os.path.basename(lfn) for lfn in lfns] + userFiles = [str(self.jobIDPath / file) for file in sandboxFiles] + [ + self.jobIDPath / os.path.basename(lfn) for lfn in lfns + ] for possibleTarFile in userFiles: if not os.path.exists(possibleTarFile): continue @@ -1247,7 +1255,7 @@ def transferInputSandbox(self, inputSandbox): self.log.info(f"Unpacking input sandbox file {possibleTarFile}") with tarfile.open(possibleTarFile, "r") as tarFile: for member in tarFile.getmembers(): - tarFile.extract(member, os.getcwd()) + tarFile.extract(member, self.jobIDPath) except Exception as x: return S_ERROR(f"Could not untar {possibleTarFile} with exception {str(x)}") @@ -1354,7 +1362,7 @@ def sendJobAccounting(self, status="", minorStatus=""): except ValueError: execTime = 0 - diskSpaceConsumed = getGlobbedTotalSize(os.path.join(self.root, str(self.jobID))) + diskSpaceConsumed = getGlobbedTotalSize(str(self.jobIDPath)) # Fill the data acData = { "User": self.owner, @@ -1469,7 +1477,7 @@ def sendFailoverRequest(self): ############################################################################# def __getRequestFiles(self): """Simple wrapper to return the list of request files.""" - return glob.glob("*_request.json") + return glob.glob(str(self.jobIDPath / "*_request.json")) ############################################################################# def __cleanUp(self): @@ -1482,11 +1490,10 @@ def __cleanUp(self): else: cleanUp = True - os.chdir(self.root) if cleanUp: self.log.verbose("Cleaning up job working directory") - if os.path.exists(str(self.jobID)): - shutil.rmtree(str(self.jobID)) + if self.root != self.jobIDPath and self.jobIDPath.exists(): + shutil.rmtree(self.jobIDPath) ############################################################################# def __report(self, status="", minorStatus="", sendFlag=False): @@ -1596,40 +1603,3 @@ def getOutput(self, lines=0): self.outputLines = self.outputLines[cut:] return S_OK(self.outputLines) return S_ERROR("No Job output found") - - -def rescheduleFailedJob(jobID, minorStatus, jobReport=None): - """Function for rescheduling a jobID, setting a minorStatus""" - - rescheduleResult = JobStatus.RESCHEDULED - - try: - gLogger.warn("Failure during", minorStatus) - - # Setting a job parameter does not help since the job will be rescheduled, - # instead set the status with the cause and then another status showing the - # reschedule operation. 
- - if not jobReport: - gLogger.info("Creating a new JobReport Object") - jobReport = JobReport(int(jobID), "JobWrapper") - - jobReport.setApplicationStatus(f"Failed {minorStatus} ", sendFlag=False) - jobReport.setJobStatus(status=JobStatus.RESCHEDULED, minorStatus=minorStatus, sendFlag=False) - - # We must send Job States and Parameters before it gets reschedule - jobReport.sendStoredStatusInfo() - jobReport.sendStoredJobParameters() - - gLogger.info("Job will be rescheduled after exception during execution of the JobWrapper") - - result = JobManagerClient().rescheduleJob(int(jobID)) - if not result["OK"]: - gLogger.warn(result["Message"]) - if "Maximum number of reschedulings is reached" in result["Message"]: - rescheduleResult = JobStatus.FAILED - - return rescheduleResult - except Exception: - gLogger.exception("JobWrapperTemplate failed to reschedule Job") - return JobStatus.FAILED diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperOfflineTemplate.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperOfflineTemplate.py index 4037f782cd9..bd2a5d297be 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperOfflineTemplate.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperOfflineTemplate.py @@ -24,17 +24,17 @@ os.umask(0o22) -def execute(jobID: str, arguments: dict): +def execute(arguments: dict): """The only real function executed here""" payloadParams = arguments.pop("Payload", {}) if not payloadParams: return 1 - if not "PayloadResults" in arguments["Job"] or not "Checksum" in arguments["Job"]: + if "PayloadResults" not in arguments["Job"] or "Checksum" not in arguments["Job"]: return 1 try: - job = JobWrapper(jobID) + job = JobWrapper() job.initialize(arguments) # initialize doesn't return S_OK/S_ERROR except Exception as exc: # pylint: disable=broad-except gLogger.exception("JobWrapper failed the initialization phase", lException=exc) @@ -76,11 +76,9 @@ def execute(jobID: str, arguments: dict): if "Job" not in jobArgs: raise ValueError(f"jobArgs does not contain 'Job' key: {str(jobArgs)}") - jobID = jobArgs["Job"].get("JobID", 0) - jobID = int(jobID) - - ret = execute(jobID, jobArgs) -except Exception as exc: # pylint: disable=broad-except + ret = execute(jobArgs) +except Exception: # pylint: disable=broad-except gLogger.exception("JobWrapperTemplate exception") + ret = -1 sys.exit(ret) diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py index 3198a879a65..921e5d94fd3 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py @@ -14,153 +14,55 @@ import sys import json import os -import errno -import time -import signal -sitePython = "@SITEPYTHON@" +from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities import ( + createAndEnterWorkingDirectory, + executePayload, + finalize, + getJobWrapper, + processJobOutputs, + resolveInputData, + transferInputSandbox, +) + +sitePython = os.path.realpath("@SITEPYTHON@") if sitePython: - sys.path.insert(0, "@SITEPYTHON@") + sys.path.insert(0, sitePython) from DIRAC.Core.Base.Script import Script Script.parseCommandLine() from DIRAC import gLogger -from DIRAC.Core.Utilities import DErrno - -from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import JobWrapper, rescheduleFailedJob from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport -from 
DIRAC.WorkloadManagementSystem.Client import JobStatus -from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus - - -gJobReport = None os.umask(0o22) -class JobWrapperError(Exception): - """Custom exception for handling JobWrapper "genuine" errors""" - - def __init__(self, value): - self.value = value - super().__init__() - - def __str__(self): - return str(self.value) - - -def killJobWrapper(job): - """Function that stops and ultimately kills the JobWrapper""" - # Giving the JobWrapper some time to complete possible tasks, then trying to kill the process - time.sleep(60) - os.kill(job.currentPID, signal.SIGTERM) - # wait for half a minute and if worker is still alive use REAL silencer - time.sleep(30) - # now you're dead - os.kill(job.currentPID, signal.SIGKILL) - return 1 - - -def sendJobAccounting(job, status, minorStatus): - """safe sending job accounting (always catching exceptions)""" - try: - job.sendJobAccounting(status, minorStatus) - except Exception as exc: # pylint: disable=broad-except - gLogger.exception( - f"JobWrapper failed sending job accounting for [status:minorStatus] [{status}:{minorStatus}]", - lException=exc, - ) - - -def execute(arguments): +def execute(jobID: str, arguments: dict, jobReport: JobReport): """The only real function executed here""" - global gJobReport - - jobID = arguments["Job"].get("JobID", 0) - os.environ["JOBID"] = str(jobID) - jobID = int(jobID) - if "WorkingDirectory" in arguments: - wdir = os.path.expandvars(arguments["WorkingDirectory"]) - if os.path.isdir(wdir): - os.chdir(wdir) - else: - try: - os.makedirs(wdir) # this will raise an exception if wdir already exists (which is ~OK) - if os.path.isdir(wdir): - os.chdir(wdir) - except OSError as osError: - if osError.errno == errno.EEXIST and os.path.isdir(wdir): - gLogger.exception("JobWrapperTemplate found that the working directory already exists") - rescheduleResult = rescheduleFailedJob(jobID, "Working Directory already exists") - else: - gLogger.exception("JobWrapperTemplate could not create working directory") - rescheduleResult = rescheduleFailedJob(jobID, "Could Not Create Working Directory") - return 1 - - gJobReport = JobReport(jobID, "JobWrapper") + if not createAndEnterWorkingDirectory(jobID, arguments["WorkingDirectory"], jobReport): + return 1 - try: - job = JobWrapper(jobID, gJobReport) - job.initialize(arguments) # initialize doesn't return S_OK/S_ERROR - except Exception as exc: # pylint: disable=broad-except - gLogger.exception("JobWrapper failed the initialization phase", lException=exc) - rescheduleResult = rescheduleFailedJob( - jobID=jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION, jobReport=gJobReport - ) - job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION) + job = getJobWrapper(jobID, arguments, jobReport) + if not job: return 1 if "InputSandbox" in arguments["Job"]: - gJobReport.commit() - try: - result = job.transferInputSandbox(arguments["Job"]["InputSandbox"]) - if not result["OK"]: - gLogger.warn(result["Message"]) - raise JobWrapperError(result["Message"]) - except JobWrapperError: - gLogger.exception("JobWrapper failed to download input sandbox") - rescheduleResult = rescheduleFailedJob( - jobID=jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=gJobReport - ) - job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX) - return 1 - except Exception as exc: # pylint: disable=broad-except - gLogger.exception("JobWrapper raised 
exception while downloading input sandbox", lException=exc) - rescheduleResult = rescheduleFailedJob( - jobID=jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=gJobReport - ) - job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX) + jobReport.commit() + if not transferInputSandbox(job, arguments["Job"]["InputSandbox"], jobReport): return 1 else: gLogger.verbose("Job has no InputSandbox requirement") - gJobReport.commit() + jobReport.commit() if "InputData" in arguments["Job"]: if arguments["Job"]["InputData"]: - try: - result = job.resolveInputData() - if not result["OK"]: - gLogger.warn(result["Message"]) - raise JobWrapperError(result["Message"]) - except JobWrapperError: - gLogger.exception("JobWrapper failed to resolve input data") - rescheduleResult = rescheduleFailedJob( - jobID=jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=gJobReport - ) - job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION) - return 1 - except Exception as exc: # pylint: disable=broad-except - gLogger.exception("JobWrapper raised exception while resolving input data", lException=exc) - rescheduleResult = rescheduleFailedJob( - jobID=jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=gJobReport - ) - job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION) + if not resolveInputData(job, jobReport): return 1 else: gLogger.verbose("Job has a null InputData requirement:") @@ -168,75 +70,18 @@ def execute(arguments): else: gLogger.verbose("Job has no InputData requirement") - gJobReport.commit() + jobReport.commit() - try: - result = job.execute() - if not result["OK"]: - gLogger.error("Failed to execute job", result["Message"]) - raise JobWrapperError((result["Message"], result["Errno"])) - except JobWrapperError as exc: - if exc.value[1] == 0 or str(exc.value[0]) == "0": - gLogger.verbose("JobWrapper exited with status=0 after execution") - if exc.value[1] == DErrno.EWMSRESC: - gLogger.warn("Asked to reschedule job") - rescheduleResult = rescheduleFailedJob( - jobID=jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=gJobReport - ) - job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) - return 1 - gLogger.exception("Job failed in execution phase") - gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) - gJobReport.setJobStatus( - status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False - ) - job.sendFailoverRequest() - job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC) - return 1 - except Exception as exc: # pylint: disable=broad-except - gLogger.exception("Job raised exception during execution phase", lException=exc) - gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) - gJobReport.setJobStatus( - status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False - ) - job.sendFailoverRequest() - job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC) + if not executePayload(job, jobReport): return 1 if "OutputSandbox" in arguments["Job"] or "OutputData" in arguments["Job"]: - try: - result = job.processJobOutputs() - if not result["OK"]: - gLogger.warn(result["Message"]) - raise JobWrapperError(result["Message"]) - except JobWrapperError as exc: - gLogger.exception("JobWrapper failed to process 
output files") - gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) - gJobReport.setJobStatus( - status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS, sendFlag=False - ) - job.sendFailoverRequest() - job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS) - - return 2 - except Exception as exc: # pylint: disable=broad-except - gLogger.exception("JobWrapper raised exception while processing output files", lException=exc) - gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) - gJobReport.setJobStatus( - status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS, sendFlag=False - ) - job.sendFailoverRequest() - job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS) + if not processJobOutputs(job, jobReport): return 2 else: gLogger.verbose("Job has no OutputData or OutputSandbox requirement") - try: - # Failed jobs will return !=0 / successful jobs will return 0 - return job.finalize() - except Exception as exc: # pylint: disable=broad-except - gLogger.exception("JobWrapper raised exception during the finalization phase", lException=exc) - return 2 + return finalize(job) ########################################################## @@ -251,12 +96,17 @@ def execute(arguments): raise TypeError(f"jobArgs is of type {type(jobArgs)}") if "Job" not in jobArgs: raise ValueError(f"jobArgs does not contain 'Job' key: {str(jobArgs)}") - ret = execute(jobArgs) - gJobReport.commit() + + jobID = jobArgs["Job"].get("JobID", 0) + jobID = int(jobID) + jobReport = JobReport(jobID, "JobWrapper") + + ret = execute(jobID, jobArgs, jobReport) + jobReport.commit() except Exception as exc: # pylint: disable=broad-except gLogger.exception("JobWrapperTemplate exception") try: - gJobReport.commit() + jobReport.commit() ret = -1 except Exception as exc: # pylint: disable=broad-except gLogger.exception("Could not commit the job report") diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py new file mode 100644 index 00000000000..cbb3df14052 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py @@ -0,0 +1,237 @@ +"""JobWrapperUtilities + +This module contains the functions that are used by the JobWrapperTemplate to execute the JobWrapper. 
+""" +import errno +import os +import signal +import time + +from DIRAC import gLogger +from DIRAC.Core.Utilities import DErrno +from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus +from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient +from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport +from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import JobWrapper + + +class JobWrapperError(Exception): + """Custom exception for handling JobWrapper "genuine" errors""" + + def __init__(self, value): + self.value = value + super().__init__() + + def __str__(self): + return str(self.value) + + +def killJobWrapper(job: JobWrapper) -> int: + """Function that stops and ultimately kills the JobWrapper""" + # Giving the JobWrapper some time to complete possible tasks, then trying to kill the process + time.sleep(60) + os.kill(job.currentPID, signal.SIGTERM) + # wait for half a minute and if worker is still alive use REAL silencer + time.sleep(30) + # now you're dead + os.kill(job.currentPID, signal.SIGKILL) + return 1 + + +def rescheduleFailedJob(jobID, minorStatus, jobReport: JobReport): + """Function for rescheduling a jobID, setting a minorStatus""" + + rescheduleResult = JobStatus.RESCHEDULED + + try: + gLogger.warn("Failure during", minorStatus) + + # Setting a job parameter does not help since the job will be rescheduled, + # instead set the status with the cause and then another status showing the + # reschedule operation. + + jobReport.setApplicationStatus(f"Failed {minorStatus} ", sendFlag=False) + jobReport.setJobStatus(status=JobStatus.RESCHEDULED, minorStatus=minorStatus, sendFlag=False) + + # We must send Job States and Parameters before it gets reschedule + jobReport.sendStoredStatusInfo() + jobReport.sendStoredJobParameters() + + gLogger.info("Job will be rescheduled after exception during execution of the JobWrapper") + + result = JobManagerClient().rescheduleJob(int(jobID)) + if not result["OK"]: + gLogger.warn(result["Message"]) + if "Maximum number of reschedulings is reached" in result["Message"]: + rescheduleResult = JobStatus.FAILED + + return rescheduleResult + except Exception: + gLogger.exception("JobWrapperTemplate failed to reschedule Job") + return JobStatus.FAILED + + +def sendJobAccounting(job: JobWrapper, status: str, minorStatus: str): + """safe sending job accounting (always catching exceptions)""" + try: + job.sendJobAccounting(status, minorStatus) + except Exception: # pylint: disable=broad-except + gLogger.exception( + f"JobWrapper failed sending job accounting for [status:minorStatus] [{status}:{minorStatus}]", + ) + + +def createAndEnterWorkingDirectory(jobID: str, workingDirectory: str, jobReport: JobReport) -> bool: + """Create the working directory and change to it""" + wdir = os.path.expandvars(workingDirectory) + if os.path.isdir(wdir): + os.chdir(wdir) + return True + + try: + os.makedirs(wdir) # this will raise an exception if wdir already exists (which is ~OK) + if os.path.isdir(wdir): + os.chdir(wdir) + except OSError as osError: + if osError.errno == errno.EEXIST and os.path.isdir(wdir): + gLogger.exception("JobWrapperTemplate found that the working directory already exists") + rescheduleFailedJob(jobID, "Working Directory already exists", jobReport) + else: + gLogger.exception("JobWrapperTemplate could not create working directory") + rescheduleFailedJob(jobID, "Could Not Create Working Directory", jobReport) + return False + return True + + +def getJobWrapper(jobID: str, 
arguments: dict, jobReport: JobReport) -> JobWrapper: + """Create a JobWrapper instance""" + try: + job = JobWrapper(jobID, jobReport) + job.initialize(arguments) + except Exception: # pylint: disable=broad-except + gLogger.exception("JobWrapper failed the initialization phase") + rescheduleResult = rescheduleFailedJob( + jobID=jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION, jobReport=jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION) + return None + return job + + +def transferInputSandbox(job: JobWrapper, inputSandbox: list, jobReport: JobReport) -> bool: + """Transfer the input sandbox""" + try: + result = job.transferInputSandbox(inputSandbox) + if not result["OK"]: + gLogger.warn(result["Message"]) + raise JobWrapperError(result["Message"]) + except JobWrapperError: + gLogger.exception("JobWrapper failed to download input sandbox") + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX) + return False + except Exception: # pylint: disable=broad-except + gLogger.exception("JobWrapper raised exception while downloading input sandbox") + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX) + return False + return True + + +def resolveInputData(job: JobWrapper, jobReport: JobReport) -> bool: + """Resolve the input data""" + try: + result = job.resolveInputData() + if not result["OK"]: + gLogger.warn(result["Message"]) + raise JobWrapperError(result["Message"]) + except JobWrapperError: + gLogger.exception("JobWrapper failed to resolve input data") + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION) + return False + except Exception: # pylint: disable=broad-except + gLogger.exception("JobWrapper raised exception while resolving input data") + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION) + return False + return True + + +def processJobOutputs(job: JobWrapper, jobReport: JobReport) -> bool: + """Process the job outputs""" + try: + result = job.processJobOutputs() + if not result["OK"]: + gLogger.warn(result["Message"]) + raise JobWrapperError(result["Message"]) + except JobWrapperError: + gLogger.exception("JobWrapper failed to process output files") + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS, jobReport=jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS) + return False + except Exception: # pylint: disable=broad-except + gLogger.exception("JobWrapper raised exception while processing output files") + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS, jobReport=jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS) + return False + return True + 
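
Taken together, the helpers above let the template express the whole job life cycle as a short pipeline of boolean checks. The sketch below is illustrative only: it mirrors the call sequence used by JobWrapperTemplate.execute() earlier in this patch (the intermediate jobReport.commit() calls are omitted for brevity), and it assumes the caller has already parsed jobID, arguments and built a JobReport; it is not part of the module itself.

    # Illustrative sketch (not part of the patch): the same sequence that
    # JobWrapperTemplate.execute() follows, expressed with the helpers of this module.
    from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
    from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities import (
        createAndEnterWorkingDirectory,
        executePayload,
        finalize,
        getJobWrapper,
        processJobOutputs,
        resolveInputData,
        transferInputSandbox,
    )

    def runJob(jobID: str, arguments: dict, jobReport: JobReport) -> int:
        """Return 0 on success, a non-zero code on failure (mirrors the template)."""
        if not createAndEnterWorkingDirectory(jobID, arguments["WorkingDirectory"], jobReport):
            return 1
        job = getJobWrapper(jobID, arguments, jobReport)
        if not job:
            return 1
        if "InputSandbox" in arguments["Job"]:
            if not transferInputSandbox(job, arguments["Job"]["InputSandbox"], jobReport):
                return 1
        if arguments["Job"].get("InputData"):
            if not resolveInputData(job, jobReport):
                return 1
        if not executePayload(job, jobReport):
            return 1
        if "OutputSandbox" in arguments["Job"] or "OutputData" in arguments["Job"]:
            if not processJobOutputs(job, jobReport):
                return 2
        return finalize(job)
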
+ +def finalize(job: JobWrapper) -> int: + """Finalize the job""" + try: + # Failed jobs will return !=0 / successful jobs will return 0 + return job.finalize() + except Exception: # pylint: disable=broad-except + gLogger.exception("JobWrapper raised exception during the finalization phase") + return 2 + + +def executePayload(job: JobWrapper, jobReport: JobReport) -> bool: + """Execute the payload""" + try: + result = job.execute() + if not result["OK"]: + gLogger.error("Failed to execute job", result["Message"]) + raise JobWrapperError((result["Message"], result["Errno"])) + except JobWrapperError as exc: + if exc.value[1] == 0 or str(exc.value[0]) == "0": + gLogger.verbose("JobWrapper exited with status=0 after execution") + if exc.value[1] == DErrno.EWMSRESC: + gLogger.warn("Asked to reschedule job") + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) + return False + gLogger.exception("Job failed in execution phase") + jobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) + jobReport.setJobStatus( + status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False + ) + job.sendFailoverRequest() + job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC) + return False + except Exception as exc: # pylint: disable=broad-except + gLogger.exception("Job raised exception during execution phase") + jobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) + jobReport.setJobStatus( + status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False + ) + job.sendFailoverRequest() + job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC) + return False + return True diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py index 4f292797a4f..f999baf6826 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py @@ -132,7 +132,6 @@ def test_preProcess(mocker): f"-o /LocalSite/Site={DIRAC.siteName()}", "-o /LocalSite/GridCE=", "-o /LocalSite/CEQueue=", - "-o /LocalSite/RemoteExecution=False", ] assert ( result["Value"]["command"].strip() == f"{diracJobExecLocation} jobDescription.xml {' '.join(expectedOptions)}" @@ -145,7 +144,7 @@ def test_preProcess(mocker): # Test dirac-jobexec with arguments jw = JobWrapper() jw.jobArgs = {"Executable": "dirac-jobexec", "Arguments": "jobDescription.xml"} - jw.ceArgs = {"GridCE": "CE", "Queue": "Queue", "RemoteExecution": True} + jw.ceArgs = {"GridCE": "CE", "Queue": "Queue", "SubmissionPolicy": "Application"} result = jw.preProcess() assert result["OK"] @@ -278,7 +277,7 @@ def test_processFailedSubprocess(mocker): assert not result["Value"]["payloadStatus"] assert not result["Value"]["payloadOutput"] assert result["Value"]["payloadExecutorError"] == "Any problem" - assert result["Value"]["cpuTimeConsumed"][0] == 0.0 + assert round(result["Value"]["cpuTimeConsumed"][0], 1) == 0.0 assert not result["Value"]["watchdogError"] assert not result["Value"]["watchdogStats"] @@ -301,7 +300,7 @@ def test_processQuickExecutionNoWatchdog(mocker): assert result["Value"]["payloadStatus"] == 0 assert result["Value"]["payloadOutput"] == "hello" 
assert not result["Value"]["payloadExecutorError"] - assert result["Value"]["cpuTimeConsumed"][0] == 0.0 + assert round(result["Value"]["cpuTimeConsumed"][0], 1) == 0.0 assert not result["Value"]["watchdogError"] assert not result["Value"]["watchdogStats"] diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapperTemplate.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapperTemplate.py index 9a5e5a21175..7aab4789bdf 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapperTemplate.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapperTemplate.py @@ -104,7 +104,7 @@ def test_createAndExecuteJobWrapperTemplate_success(extraOptions): jobWrapperContent = f.read() assert "@SITEPYTHON@" not in jobWrapperContent - assert f'sys.path.insert(0, "{os.getcwd()}")' in jobWrapperContent + assert f'os.path.realpath("{os.getcwd()}")' in jobWrapperContent # Test job wrapper configuration path jobWrapperConfigPath = res["Value"]["JobWrapperConfigPath"] @@ -172,7 +172,7 @@ def test_createAndExecuteJobWrapperTemplate_missingExtraOptions(): jobWrapperContent = f.read() assert "@SITEPYTHON@" not in jobWrapperContent - assert f'sys.path.insert(0, "{os.getcwd()}")' in jobWrapperContent + assert f'os.path.realpath("{os.getcwd()}")' in jobWrapperContent # Test job wrapper configuration path jobWrapperConfigPath = res["Value"]["JobWrapperConfigPath"] @@ -251,7 +251,7 @@ def test_createAndExecuteRelocatedJobWrapperTemplate_success(extraOptions): jobWrapperContent = f.read() assert "@SITEPYTHON@" not in jobWrapperContent - assert f'sys.path.insert(0, "{rootLocation}")' in jobWrapperContent + assert f'os.path.realpath("{rootLocation}")' in jobWrapperContent # Test job wrapper configuration path jobWrapperConfigPath = res["Value"]["JobWrapperConfigPath"] @@ -368,7 +368,7 @@ def test_createAndExecuteJobWrapperOfflineTemplate_success(extraOptions): jobWrapperContent = f.read() assert "@SITEPYTHON@" not in jobWrapperContent - assert f"sys.path.insert(0, sitePython)" in jobWrapperContent + assert 'os.path.realpath(".")' in jobWrapperContent # Test job wrapper configuration path jobWrapperConfigPath = res["Value"].get("JobWrapperConfigPath") @@ -437,7 +437,7 @@ def test_createAndExecuteJobWrapperOfflineTemplate_success(extraOptions): result = subprocess.run(jobExecutableRelocatedPath, shell=True, capture_output=True) assert result.returncode == 1, result.stderr - assert b"Starting Job Wrapper Initialization for Job 1" not in result.stdout, result.stdout + assert b"JobID is not defined, running in current directory" not in result.stdout, result.stdout assert result.stderr == b"", result.stderr # 4. 
We recreate the job wrapper offline template with the payload params now @@ -467,7 +467,7 @@ def test_createAndExecuteJobWrapperOfflineTemplate_success(extraOptions): result = subprocess.run(jobExecutableRelocatedPath, shell=True, capture_output=True) assert result.returncode == 1, result.stderr - assert b"Starting Job Wrapper Initialization for Job 1" not in result.stdout, result.stdout + assert b"JobID is not defined, running in current directory" not in result.stdout, result.stdout assert result.stderr == b"", result.stderr # The root location should contain: @@ -508,7 +508,7 @@ def test_createAndExecuteJobWrapperOfflineTemplate_success(extraOptions): result = subprocess.run(jobExecutableRelocatedPath, shell=True, capture_output=True) assert result.returncode == 0, result.stderr - assert b"Starting Job Wrapper Initialization for Job 1" in result.stdout, result.stdout + assert b"JobID is not defined, running in current directory" in result.stdout, result.stdout assert b"Job Wrapper is starting the processing phase for job" in result.stdout, result.stdout assert result.stderr == b"", result.stderr @@ -517,13 +517,11 @@ def test_createAndExecuteJobWrapperOfflineTemplate_success(extraOptions): # - the job wrapper configuration # - the job executable # - the job/Wrapper directory - # - the directory - assert len(os.listdir(rootLocation)) == numberOfFiles + 5 - assert os.path.exists(os.path.join(rootLocation, "1")) - assert os.path.exists(os.path.join(rootLocation, "1", "payloadResults.json")) - assert os.path.exists(os.path.join(rootLocation, "1", "checksum.json")) + assert len(os.listdir(rootLocation)) == numberOfFiles + 8 + assert os.path.exists(os.path.join(rootLocation, "payloadResults.json")) + assert os.path.exists(os.path.join(rootLocation, "checksum.json")) - with open(os.path.join(rootLocation, "1", "payloadResults.json")) as f: + with open(os.path.join(rootLocation, "payloadResults.json")) as f: payloadResults = json.load(f) assert payloadResults["OK"] @@ -532,7 +530,7 @@ def test_createAndExecuteJobWrapperOfflineTemplate_success(extraOptions): assert "payloadOutput" in payloadResults["Value"] assert "payloadStatus" in payloadResults["Value"] - with open(os.path.join(rootLocation, "1", "checksum.json")) as f: + with open(os.path.join(rootLocation, "checksum.json")) as f: checksums = json.load(f) assert jobParams["PayloadResults"] in checksums @@ -540,5 +538,8 @@ def test_createAndExecuteJobWrapperOfflineTemplate_success(extraOptions): os.unlink(os.path.join(rootLocation, os.path.basename(jobWrapperPath))) os.unlink(os.path.join(rootLocation, os.path.basename(jobWrapperConfigPath))) os.unlink(os.path.join(rootLocation, os.path.basename(jobExecutablePath))) - shutil.rmtree(os.path.join(rootLocation, "1")) + os.unlink(os.path.join(rootLocation, "payloadResults.json")) + os.unlink(os.path.join(rootLocation, "checksum.json")) + os.unlink(os.path.join(rootLocation, "std.err")) + os.unlink(os.path.join(rootLocation, "job.info")) shutil.rmtree(os.path.join(os.getcwd(), "job")) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobPolicy.py b/src/DIRAC/WorkloadManagementSystem/Service/JobPolicy.py index c200bc834de..96826ecf4ce 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobPolicy.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobPolicy.py @@ -5,6 +5,7 @@ from DIRAC.ConfigurationSystem.Client.Helpers.Registry import ( getGroupsForUser, getPropertiesForGroup, + getPropertiesForHost, getUsersInGroup, ) from DIRAC.Core.Security import Properties @@ -75,7 +76,12 @@ class 
JobPolicy: def __init__(self, user, userGroup, allInfo=True): self.userName = user self.userGroup = userGroup - self.userProperties = getPropertiesForGroup(userGroup, []) + + properties = getPropertiesForGroup(userGroup, []) + if not properties: + properties = getPropertiesForHost(user, []) + self.userProperties = properties + self.jobDB = None self.allInfo = allInfo self._permissions = {} diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py b/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py index 2655697dbaa..d20476f768c 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py @@ -22,6 +22,7 @@ def createJobWrapper( defaultWrapperLocation: str | None = "DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py", log: Logging | None = gLogger, logLevel: str | None = "INFO", + cfgPath: str | None = None, ): """This method creates a job wrapper filled with the CE and Job parameters to execute the job. Main user is the JobAgent. @@ -38,6 +39,7 @@ def createJobWrapper( :param defaultWrapperLocation: Location of the default job wrapper template :param log: Logger :param logLevel: Log level + :param cfgPath: Path to a specific configuration file :return: S_OK with the path to the job wrapper and the path to the job wrapper json file """ if isinstance(extraOptions, str) and extraOptions.endswith(".cfg"): @@ -91,13 +93,15 @@ def createJobWrapper( jobWrapperDirect = os.path.join(rootLocation, f"Wrapper_{jobID}") jobExeFile = os.path.join(wrapperPath, f"Job{jobID}") jobFileContents = """#!/bin/sh -{} {} {} -o LogLevel={} -o /DIRAC/Security/UseServerCertificate=no +{} {} {} -o LogLevel={} -o /DIRAC/Security/UseServerCertificate=no {} """.format( pythonPath, jobWrapperDirect, extraOptions if extraOptions else "", logLevel, + cfgPath if cfgPath else "", ) + with open(jobExeFile, "w") as jobFile: jobFile.write(jobFileContents) diff --git a/src/DIRAC/WorkloadManagementSystem/scripts/dirac_jobexec.py b/src/DIRAC/WorkloadManagementSystem/scripts/dirac_jobexec.py index acc4d0e66ab..e4d7ae198df 100755 --- a/src/DIRAC/WorkloadManagementSystem/scripts/dirac_jobexec.py +++ b/src/DIRAC/WorkloadManagementSystem/scripts/dirac_jobexec.py @@ -1,8 +1,4 @@ #!/usr/bin/env python -######################################################################## -# File : dirac-jobexec -# Author : Stuart Paterson -######################################################################## """ The dirac-jobexec script is equipped to execute workflows that are specified via their XML description. The main client of this script is the Job Wrapper.
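
For context on the JobPolicy change above: with the host-property fallback, a machine credential (for instance the host running the PushJobAgent) is authorized through the properties declared for that host when no group properties are found. The minimal sketch below uses only the Registry helpers imported in this patch; it assumes, as implied by the call to getPropertiesForHost(user, []) in the hunk, that `user` carries the host identity when the caller authenticates with a host certificate.

    # Sketch of the lookup order introduced in JobPolicy.__init__ above.
    # Group properties win; host properties are only consulted when the group lookup is empty.
    from DIRAC.ConfigurationSystem.Client.Helpers.Registry import (
        getPropertiesForGroup,
        getPropertiesForHost,
    )

    def resolveCallerProperties(user: str, userGroup: str) -> list:
        properties = getPropertiesForGroup(userGroup, [])
        if not properties:
            # Assumption: "user" holds the registered host name for host credentials
            properties = getPropertiesForHost(user, [])
        return properties
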