Skip to content

Commit

Permalink
feat: introduce the JobWrapperOfflineTemplate
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr committed Apr 17, 2024
1 parent 79c82ad commit 46adcc0
Show file tree
Hide file tree
Showing 6 changed files with 306 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ def __createWorkArea(self, jobDesc=None, log=None, logLevel="INFO", proxy=None):
)
if not result["OK"]:
return result
wrapperPath = result["Value"]["JobExecutableRelocatedPath"]
wrapperPath = result["Value"].get("JobExecutableRelocatedPath")

if self.__installDIRACInContainer:
infoDict = None
Expand Down
5 changes: 3 additions & 2 deletions src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,6 @@ def process(self, command: str, output: str, error: str, env: dict):
# Job specifies memory in GB, internally use KB
jobMemory = int(self.jobArgs["Memory"]) * 1024.0 * 1024.0

# The actual executable is not yet running: it will be in few lines
self.__report(minorStatus=JobMinorStatus.APPLICATION, sendFlag=True)
spObject = Subprocess(timeout=False, bufferLimit=int(self.bufferLimit))
exeThread = ExecutionThread(spObject, command, self.maxPeekLines, output, error, env, self.executionResults)
exeThread.start()
Expand Down Expand Up @@ -574,6 +572,9 @@ def execute(self):
return result
payloadParams = result["Value"]

# The actual executable is not yet running: it will be in few lines
self.__report(minorStatus=JobMinorStatus.APPLICATION, sendFlag=True)

result = self.process(
command=payloadParams["command"],
output=payloadParams["output"],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#!/usr/bin/env python
""" This template will become the job wrapper that's actually executed.
The JobWrapperOfflineTemplate is completed and invoked by the PushJobAgent and uses functionalities from JobWrapper module.
It is executed in environment where external connections are not allowed.
We assume this script is executed in a specific environment where DIRAC is available.
"""
import hashlib
import sys
import json
import os

sitePython = os.path.realpath("@SITEPYTHON@")
if sitePython:
sys.path.insert(0, sitePython)

from DIRAC.Core.Base.Script import Script

Script.parseCommandLine()

from DIRAC import gLogger
from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import JobWrapper

os.umask(0o22)


def execute(jobID: str, arguments: dict):
"""The only real function executed here"""
payloadParams = arguments.pop("Payload", {})
if not payloadParams:
return 1

if not "PayloadResults" in arguments["Job"] or not "Checksum" in arguments["Job"]:
return 1

try:
job = JobWrapper(jobID)
job.initialize(arguments) # initialize doesn't return S_OK/S_ERROR
except Exception as exc: # pylint: disable=broad-except
gLogger.exception("JobWrapper failed the initialization phase", lException=exc)
return 1

payloadResult = job.process(**payloadParams)
if not payloadResult["OK"]:
return 1

# Store the payload result
with open(arguments["Job"]["PayloadResults"], "w") as f:
json.dump(payloadResult, f)

# Generate the checksum of the files present in the current directory
checksums = {}
for file in os.listdir("."):
if not os.path.isfile(file):
continue
with open(file, "rb") as f:
digest = hashlib.file_digest(f, "sha256")
checksums[file] = digest.hexdigest()

with open(arguments["Job"]["Checksum"], "w") as f:
json.dump(checksums, f)

return 0


##########################################################


ret = -3
try:
jsonFileName = os.path.realpath(__file__) + ".json"
with open(jsonFileName) as f:
jobArgs = json.load(f)
if not isinstance(jobArgs, dict):
raise TypeError(f"jobArgs is of type {type(jobArgs)}")
if "Job" not in jobArgs:
raise ValueError(f"jobArgs does not contain 'Job' key: {str(jobArgs)}")

jobID = jobArgs["Job"].get("JobID", 0)
jobID = int(jobID)

ret = execute(jobID, jobArgs)
except Exception as exc: # pylint: disable=broad-except
gLogger.exception("JobWrapperTemplate exception")

sys.exit(ret)
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
"""
import sys
import json
import ast
import os
import errno
import time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def test_createAndExecuteJobWrapperTemplate_success(extraOptions):

# This is the default wrapper path
assert os.path.exists(os.path.join(os.getcwd(), "job/Wrapper"))
shutil.rmtree(os.path.join(os.getcwd(), "job/Wrapper"))
shutil.rmtree(os.path.join(os.getcwd(), "job"))


def test_createAndExecuteJobWrapperTemplate_missingExtraOptions():
Expand Down Expand Up @@ -211,7 +211,7 @@ def test_createAndExecuteJobWrapperTemplate_missingExtraOptions():

# This is the default wrapper path
assert os.path.exists(os.path.join(os.getcwd(), "job/Wrapper"))
shutil.rmtree(os.path.join(os.getcwd(), "job/Wrapper"))
shutil.rmtree(os.path.join(os.getcwd(), "job"))


def test_createAndExecuteRelocatedJobWrapperTemplate_success(extraOptions):
Expand Down Expand Up @@ -332,3 +332,212 @@ def test_createAndExecuteRelocatedJobWrapperTemplate_success(extraOptions):

shutil.rmtree(rootLocation)
shutil.rmtree(wrapperPath)


def test_createAndExecuteJobWrapperOfflineTemplate_success(extraOptions):
"""Test the creation of an offline job wrapper and its execution:
This is generally used when pre/post processing operations are executed locally,
while the workflow itself is executed on a remote computing resource (PushJobAgent).
"""
# Working directory on the remote resource
rootLocation = "."
numberOfFiles = len(os.listdir(rootLocation))

# Create relocated job wrapper
res = createJobWrapper(
jobID=1,
jobParams=jobParams,
resourceParams=resourceParams,
optimizerParams=optimizerParams,
# This is the interesting part
defaultWrapperLocation="DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperOfflineTemplate.py",
pythonPath="python",
rootLocation=rootLocation,
extraOptions=extraOptions,
)
assert res["OK"], res.get("Message")

# Test job wrapper content
jobWrapperPath = res["Value"].get("JobWrapperPath")
assert jobWrapperPath
assert os.path.exists(jobWrapperPath)
assert not os.path.exists(os.path.join(rootLocation, os.path.basename(jobWrapperPath)))

with open(jobWrapperPath) as f:
jobWrapperContent = f.read()

assert "@SITEPYTHON@" not in jobWrapperContent
assert f"sys.path.insert(0, sitePython)" in jobWrapperContent

# Test job wrapper configuration path
jobWrapperConfigPath = res["Value"].get("JobWrapperConfigPath")
assert jobWrapperConfigPath
assert os.path.exists(jobWrapperConfigPath)
assert not os.path.exists(os.path.join(rootLocation, os.path.basename(jobWrapperConfigPath)))

with open(jobWrapperConfigPath) as f:
jobWrapperConfigContent = json.load(f)

assert jobWrapperConfigContent["Job"] == jobParams
assert jobWrapperConfigContent["CE"] == resourceParams
assert jobWrapperConfigContent["Optimizer"] == optimizerParams
assert "Payload" not in jobWrapperConfigContent

# Test job executable path
jobExecutablePath = res["Value"].get("JobExecutablePath")
assert jobExecutablePath
assert os.path.exists(jobExecutablePath)
assert not os.path.exists(os.path.join(rootLocation, os.path.basename(jobExecutablePath)))

with open(jobExecutablePath) as f:
jobExecutableContent = f.read()

assert os.path.realpath(sys.executable) not in jobExecutableContent
assert "python" in jobExecutableContent

assert jobWrapperPath not in jobExecutableContent
assert os.path.join(rootLocation, os.path.basename(jobWrapperPath)) in jobExecutableContent
assert extraOptions in jobExecutableContent
assert "-o LogLevel=INFO" in jobExecutableContent
assert "-o /DIRAC/Security/UseServerCertificate=no" in jobExecutableContent

# Test job executable relocated path
jobExecutableRelocatedPath = res["Value"].get("JobExecutableRelocatedPath")
assert jobExecutableRelocatedPath
assert jobExecutablePath != jobExecutableRelocatedPath
assert os.path.basename(jobExecutablePath) == os.path.basename(jobExecutableRelocatedPath)
assert not os.path.exists(jobExecutableRelocatedPath)

# 1. Execute the executable file in a subprocess without relocating the files as if they were on the remote resource
# We expect it to fail because the job wrapper is not in the expected location
os.chmod(jobExecutablePath, 0o755)
result = subprocess.run(jobExecutablePath, shell=True, capture_output=True)

assert result.returncode == 2, result.stderr
assert result.stdout == b"", result.stdout
assert b"can't open file" in result.stderr, result.stderr

# 2. Execute the relocated executable file in a subprocess without relocating the files as they would be on the remote resource
# We expect it to fail because the relocated executable should not exist
os.chmod(jobExecutablePath, 0o755)
result = subprocess.run(jobExecutableRelocatedPath, shell=True, capture_output=True)

assert result.returncode == 127, result.stderr
assert result.stdout == b"", result.stdout
assert f"{jobExecutableRelocatedPath}: not found".encode() in result.stderr, result.stderr

# 3. Now we relocate the files as if they were on a remote resource and execute the relocated executable file in a subprocess
# We expect it to fail because the payload parameters are not available
shutil.copy(jobWrapperPath, rootLocation)
shutil.copy(jobWrapperConfigPath, rootLocation)
shutil.copy(jobExecutablePath, rootLocation)
os.chmod(jobExecutablePath, 0o755)

result = subprocess.run(jobExecutableRelocatedPath, shell=True, capture_output=True)

assert result.returncode == 1, result.stderr
assert b"Starting Job Wrapper Initialization for Job 1" not in result.stdout, result.stdout
assert result.stderr == b"", result.stderr

# 4. We recreate the job wrapper offline template with the payload params now
# We did not specify where the results and checksum should be stored, so we expect it to fail
res = createJobWrapper(
jobID=1,
jobParams=jobParams,
resourceParams=resourceParams,
optimizerParams=optimizerParams,
# This is the interesting part
defaultWrapperLocation="DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperOfflineTemplate.py",
pythonPath="python",
rootLocation=rootLocation,
extraOptions=extraOptions,
payloadParams=payloadParams,
)
assert res["OK"], res.get("Message")
jobWrapperPath = res["Value"].get("JobWrapperPath")
jobWrapperConfigPath = res["Value"].get("JobWrapperConfigPath")
jobExecutablePath = res["Value"].get("JobExecutablePath")

shutil.copy(jobWrapperPath, rootLocation)
shutil.copy(jobWrapperConfigPath, rootLocation)
shutil.copy(jobExecutablePath, rootLocation)
os.chmod(jobExecutablePath, 0o755)

result = subprocess.run(jobExecutableRelocatedPath, shell=True, capture_output=True)

assert result.returncode == 1, result.stderr
assert b"Starting Job Wrapper Initialization for Job 1" not in result.stdout, result.stdout
assert result.stderr == b"", result.stderr

# The root location should contain:
# - the job wrapper
# - the job wrapper configuration
# - the job executable
# - the job/Wrapper directory
print(os.listdir(rootLocation))
assert len(os.listdir(rootLocation)) == numberOfFiles + 4

# 5. We recreate the job wrapper offline template with the payload params and the additional job params
# It should work fine now
jobParams["PayloadResults"] = "payloadResults.json"
jobParams["Checksum"] = "checksum.json"

res = createJobWrapper(
jobID=1,
jobParams=jobParams,
resourceParams=resourceParams,
optimizerParams=optimizerParams,
# This is the interesting part
defaultWrapperLocation="DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperOfflineTemplate.py",
pythonPath="python",
rootLocation=rootLocation,
extraOptions=extraOptions,
payloadParams=payloadParams,
)
assert res["OK"], res.get("Message")
jobWrapperPath = res["Value"].get("JobWrapperPath")
jobWrapperConfigPath = res["Value"].get("JobWrapperConfigPath")
jobExecutablePath = res["Value"].get("JobExecutablePath")

shutil.copy(jobWrapperPath, rootLocation)
shutil.copy(jobWrapperConfigPath, rootLocation)
shutil.copy(jobExecutablePath, rootLocation)
os.chmod(jobExecutablePath, 0o755)

result = subprocess.run(jobExecutableRelocatedPath, shell=True, capture_output=True)

assert result.returncode == 0, result.stderr
assert b"Starting Job Wrapper Initialization for Job 1" in result.stdout, result.stdout
assert b"Job Wrapper is starting the processing phase for job" in result.stdout, result.stdout
assert result.stderr == b"", result.stderr

# The root location should contain:
# - the job wrapper
# - the job wrapper configuration
# - the job executable
# - the job/Wrapper directory
# - the <jobID> directory
assert len(os.listdir(rootLocation)) == numberOfFiles + 5
assert os.path.exists(os.path.join(rootLocation, "1"))
assert os.path.exists(os.path.join(rootLocation, "1", "payloadResults.json"))
assert os.path.exists(os.path.join(rootLocation, "1", "checksum.json"))

with open(os.path.join(rootLocation, "1", "payloadResults.json")) as f:
payloadResults = json.load(f)

assert payloadResults["OK"]
assert "cpuTimeConsumed" in payloadResults["Value"]
assert "payloadExecutorError" in payloadResults["Value"]
assert "payloadOutput" in payloadResults["Value"]
assert "payloadStatus" in payloadResults["Value"]

with open(os.path.join(rootLocation, "1", "checksum.json")) as f:
checksums = json.load(f)

assert jobParams["PayloadResults"] in checksums

os.unlink(os.path.join(rootLocation, os.path.basename(jobWrapperPath)))
os.unlink(os.path.join(rootLocation, os.path.basename(jobWrapperConfigPath)))
os.unlink(os.path.join(rootLocation, os.path.basename(jobExecutablePath)))
shutil.rmtree(os.path.join(rootLocation, "1"))
shutil.rmtree(os.path.join(os.getcwd(), "job"))
6 changes: 5 additions & 1 deletion src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def createJobWrapper(
jobParams: dict,
resourceParams: dict,
optimizerParams: dict,
payloadParams: dict | None = None,
extraOptions: str | None = None,
wrapperPath: str | None = None,
rootLocation: str | None = None,
Expand All @@ -29,11 +30,12 @@ def createJobWrapper(
:param jobParams: Job parameters
:param resourceParams: CE parameters
:param optimizerParams: Optimizer parameters
:param payloadParams: Payload parameters
:param extraOptions: Extra options to be passed to the job wrapper
:param wrapperPath: Path where the job wrapper will be created
:param rootLocation: Location where the job wrapper will be executed
:param defaultWrapperLocation: Location of the default job wrapper template
:param pythonPath: Path to the python executable
:param defaultWrapperLocation: Location of the default job wrapper template
:param log: Logger
:param logLevel: Log level
:return: S_OK with the path to the job wrapper and the path to the job wrapper json file
Expand All @@ -42,6 +44,8 @@ def createJobWrapper(
extraOptions = f"--cfg {extraOptions}"

arguments = {"Job": jobParams, "CE": resourceParams, "Optimizer": optimizerParams}
if payloadParams:
arguments["Payload"] = payloadParams
log.verbose(f"Job arguments are: \n {arguments}")

if not wrapperPath:
Expand Down

0 comments on commit 46adcc0

Please sign in to comment.