Skip to content

Commit

Permalink
feat: introduce the JobExecutionCoordinator
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr committed Jul 8, 2024
1 parent f8d8003 commit b6cef9b
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from DIRAC import S_OK


class JobExecutionCoordinator:
    """
    Base class for job execution coordinators.

    A coordinator is responsible for pre-processing and post-processing jobs
    before and after execution. It is used by the JobWrapper to handle the
    execution of jobs.

    This default implementation is a pass-through: it leaves the command and
    the environment untouched and performs no post-processing. Communities are
    expected to provide their own coordinator overriding these hooks.
    """

    def __init__(self, jobArgs: dict, ceArgs: dict) -> None:
        """
        Initialize the job execution coordinator.

        :param jobArgs: The job arguments
        :param ceArgs: The environment arguments
        """
        # Kept as-is (no copy) so that subclasses can read them from the hooks.
        self.jobArgs = jobArgs
        self.ceArgs = ceArgs

    def preProcess(self, command: str, exeEnv: dict):
        """
        Pre-process a job before executing it.

        This should handle tasks like downloading inputs, preparing commands, etc.
        The default implementation changes nothing and hands back its inputs.

        :param command: The command prepared for the job execution
        :param exeEnv: The environment prepared for the job execution
        :return: S_OK wrapping a dict with the keys "command" and "env"
        """
        return S_OK({"command": command, "env": exeEnv})

    def postProcess(self):
        """
        Post-process a job after executing it.

        This should handle tasks like uploading outputs, checking results, etc.
        The default implementation does nothing.

        :return: S_OK without any value
        """
        return S_OK()
49 changes: 30 additions & 19 deletions src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ def __init__(self, jobID=None, jobReport=None):
if self.maxPeekLines < 0:
self.maxPeekLines = 0
self.defaultCPUTime = gConfig.getValue(self.section + "/DefaultCPUTime", 600)
self.defaultOutputFile = gConfig.getValue(self.section + "/DefaultOutputFile", "std.out")
self.defaultErrorFile = gConfig.getValue(self.section + "/DefaultErrorFile", "std.err")
self.outputFile = gConfig.getValue(self.section + "/DefaultOutputFile", "std.out")
self.errorFile = gConfig.getValue(self.section + "/DefaultErrorFile", "std.err")
self.diskSE = gConfig.getValue(self.section + "/DiskSE", ["-disk", "-DST", "-USER"])
self.tapeSE = gConfig.getValue(self.section + "/TapeSE", ["-tape", "-RDST", "-RAW"])
self.failoverRequestDelay = gConfig.getValue(self.section + "/FailoverRequestDelay", 45)
Expand Down Expand Up @@ -175,6 +175,8 @@ def __init__(self, jobID=None, jobReport=None):
self.optArgs = {}
self.ceArgs = {}

self.jobExecutionCoordinator = None

# Store the result of the payload execution
self.executionResults = {}

Expand All @@ -194,6 +196,10 @@ def initialize(self, arguments):
self.jobGroup = self.jobArgs.get("JobGroup", self.jobGroup)
self.jobName = self.jobArgs.get("JobName", self.jobName)
self.jobType = self.jobArgs.get("JobType", self.jobType)
# Prepare outputs
self.errorFile = self.jobArgs.get("StdError", self.errorFile)
self.outputFile = self.jobArgs.get("StdOutput", self.outputFile)

dataParam = self.jobArgs.get("InputData", [])
if dataParam and not isinstance(dataParam, list):
dataParam = [dataParam]
Expand Down Expand Up @@ -232,6 +238,14 @@ def initialize(self, arguments):
self.log.debug("================")
self.log.debug(json.dumps(dict(os.environ), indent=4))

# Load the Job Execution Coordinator: can be overridden by a specific implementation
result = ObjectLoader().loadObject(
"WorkloadManagementSystem.JobWrapper.JobExecutionCoordinator", "JobExecutionCoordinator"
)
if not result["OK"]:
return result
self.jobExecutionCoordinator = result["Value"](jobArgs=self.jobArgs, ceArgs=self.ceArgs)

#############################################################################
def __setInitialJobParameters(self):
"""Sets some initial job parameters"""
Expand Down Expand Up @@ -358,26 +372,19 @@ def preProcess(self):
command = result["Value"]
self.log.verbose(f"Execution command: {command}")

# Prepare outputs
errorFile = self.jobArgs.get("StdError", self.defaultErrorFile)
outputFile = self.jobArgs.get("StdOutput", self.defaultOutputFile)

result = self.__prepareEnvironment()
if not result["OK"]:
return result
exeEnv = result["Value"]

return S_OK(
{
"command": command,
"error": errorFile,
"output": outputFile,
"env": exeEnv,
}
)
if not (result := self.jobExecutionCoordinator.preProcess(command, exeEnv))["OK"]:
self.log.error("Failed to pre-process the job", result["Message"])
return result

return result

#############################################################################
def process(self, command: str, output: str, error: str, env: dict):
def process(self, command: str, env: dict):
"""This method calls the payload."""
self.log.info(f"Job Wrapper is starting the processing phase for job {self.jobID}")

Expand All @@ -398,7 +405,9 @@ def process(self, command: str, output: str, error: str, env: dict):

spObject = Subprocess(timeout=False, bufferLimit=int(self.bufferLimit))
with contextlib.chdir(self.jobIDPath):
exeThread = ExecutionThread(spObject, command, self.maxPeekLines, output, error, env, self.executionResults)
exeThread = ExecutionThread(
spObject, command, self.maxPeekLines, self.outputFile, self.errorFile, env, self.executionResults
)
exeThread.start()
payloadPID = None
for seconds in range(5, 40, 5):
Expand Down Expand Up @@ -565,7 +574,11 @@ def postProcess(
self.failedFlag = False
self.__report(status=JobStatus.COMPLETING, minorStatus=JobMinorStatus.APP_SUCCESS, sendFlag=True)

return S_OK()
if not (result := self.jobExecutionCoordinator.postProcess())["OK"]:
self.log.error("Failed to post-process the job", result["Message"])
return result

return result

#############################################################################
def execute(self):
Expand All @@ -580,8 +593,6 @@ def execute(self):

result = self.process(
command=payloadParams["command"],
output=payloadParams["output"],
error=payloadParams["error"],
env=payloadParams["env"],
)
if not result["OK"]:
Expand Down
Loading

0 comments on commit b6cef9b

Please sign in to comment.