Skip to content

Commit

Permalink
feat: merge createJobWrapper & createRelocatedJobWrapper + few adjust…
Browse files Browse the repository at this point in the history
…ments
  • Loading branch information
aldbr committed Mar 22, 2024
1 parent 610c3b7 commit 1736b5d
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 125 deletions.
7 changes: 4 additions & 3 deletions src/DIRAC/Resources/Computing/SingularityComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
from DIRAC.Resources.Computing.ComputingElement import ComputingElement
from DIRAC.Resources.Storage.StorageElement import StorageElement
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createRelocatedJobWrapper
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper

# Default container to use if it isn't specified in the CE options
CONTAINER_DEFROOT = "/cvmfs/cernvm-prod.cern.ch/cvm4"
Expand Down Expand Up @@ -255,20 +255,21 @@ def __createWorkArea(self, jobDesc=None, log=None, logLevel="INFO", proxy=None):
self.log.warn("No user proxy")

# Job Wrapper (Standard-ish DIRAC wrapper)
result = createRelocatedJobWrapper(
result = createJobWrapper(
wrapperPath=tmpDir,
rootLocation=self.__innerdir,
jobID=jobDesc.get("jobID", 0),
jobParams=jobDesc.get("jobParams", {}),
resourceParams=jobDesc.get("resourceParams", {}),
optimizerParams=jobDesc.get("optimizerParams", {}),
pythonPath="python",
log=log,
logLevel=logLevel,
extraOptions="" if self.__installDIRACInContainer else "/tmp/pilot.cfg",
)
if not result["OK"]:
return result
wrapperPath = result["Value"]
wrapperPath = result["Value"]["JobExecutableRelocatedPath"]

if self.__installDIRACInContainer:
infoDict = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ def test_submitJob():
resourceParams = {"GridCE": "some_CE"}
optimizerParams = {}

wrapperFile = createJobWrapper(2, jobParams, resourceParams, optimizerParams, logLevel="DEBUG")["Value"][
0
wrapperFile = createJobWrapper(
jobID=2, jobParams=jobParams, resourceParams=resourceParams, optimizerParams=optimizerParams, logLevel="DEBUG"
)["Value"][
"JobExecutablePath"
] # This is not under test, assuming it works fine
res = ce.submitJob(
wrapperFile,
Expand Down
5 changes: 2 additions & 3 deletions src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient
from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient
from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper
Expand Down Expand Up @@ -631,8 +630,8 @@ def _submitJob(
if not result["OK"]:
return result

wrapperFile = result["Value"][0]
inputs = list(result["Value"][1:])
wrapperFile = result["Value"]["JobExecutablePath"]
inputs = [result["Value"]["JobWrapperPath"], result["Value"]["JobWrapperConfigPath"]]
self.jobs[jobID]["JobReport"].setJobStatus(minorStatus="Submitting To CE")

self.log.info("Submitting JobWrapper", f"{os.path.basename(wrapperFile)} to {self.ceName}CE")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
""" Test class for Job Agent
"""
import os
from pathlib import Path
import pytest
import time
from unittest.mock import MagicMock
Expand Down Expand Up @@ -540,6 +541,27 @@ def test__rescheduleFailedJob_multipleJobIDs(mocker):


#############################################################################
@pytest.fixture
def manageJobFiles():
"""Create fake job files and yield their paths."""
jobExecutablePath = "testJob.py"
with open(jobExecutablePath, "w") as execFile:
pass
os.chmod(jobExecutablePath, 0o755)

# Generate fake jobWrapperPath and jobWrapperConfigPath
jobWrapperPath = "Wrapper_123"
with open(jobWrapperPath, "w") as temp:
temp.write("test")
jobWrapperConfigPath = "Wrapper_123.json"
with open(jobWrapperConfigPath, "w") as temp:
temp.write("test")

yield (jobExecutablePath, jobWrapperPath, jobWrapperConfigPath)

Path(jobExecutablePath).unlink(missing_ok=True)
Path(jobWrapperPath).unlink(missing_ok=True)
Path(jobWrapperConfigPath).unlink(missing_ok=True)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -587,22 +609,29 @@ def test_submitJob(mocker, mockJWInput, expected):
("Pool/Singularity", jobScript % "1", (["Failed to find singularity"], []), ([], [])),
],
)
def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult2):
def test_submitAndCheckJob(mocker, manageJobFiles, localCE, job, expectedResult1, expectedResult2):
"""Test the submission and the management of the job status."""
jobName = "testJob.py"
with open(jobName, "w") as execFile:
execFile.write(job)
os.chmod(jobName, 0o755)

jobID = "123"
jobExecutablePath, jobWrapperPath, jobWrapperConfigPath = manageJobFiles
with open(jobExecutablePath, "w") as execFile:
execFile.write(job)

mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__")
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobAgent.am_stopExecution")
mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobMonitoringClient.getJobsStatus",
return_value=S_OK({int(jobID): {"Status": JobStatus.RUNNING}}),
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.createJobWrapper", return_value=S_OK([jobName]))
mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.JobAgent.createJobWrapper",
return_value=S_OK(
{
"JobExecutablePath": jobExecutablePath,
"JobWrapperPath": jobWrapperPath,
"JobWrapperConfigPath": jobWrapperConfigPath,
}
),
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobAgent._sendFailoverRequest", return_value=S_OK())
mocker.patch("DIRAC.Core.Security.X509Chain.X509Chain.dumpAllToString", return_value=S_OK())
mocker.patch(
Expand Down Expand Up @@ -684,7 +713,12 @@ def test_submitAndCheck2Jobs(mocker):
# Mock the JobAgent
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__")
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobAgent.am_stopExecution")
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.createJobWrapper", return_value=S_OK(["jobWrapper.py"]))
mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.JobAgent.createJobWrapper",
return_value=S_OK(
{"JobExecutablePath": "jobName", "JobWrapperPath": "jobName", "JobWrapperConfigPath": "jobName"}
),
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobAgent._sendFailoverRequest", return_value=S_OK())
mocker.patch("DIRAC.Core.Security.X509Chain.X509Chain.dumpAllToString", return_value=S_OK())
mocker.patch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,7 @@ def execute(arguments):
try:
jsonFileName = os.path.realpath(__file__) + ".json"
with open(jsonFileName) as f:
jobArgsFromJSON = json.loads(f.readlines()[0])
jobArgs = ast.literal_eval(jobArgsFromJSON)
jobArgs = json.load(f)
if not isinstance(jobArgs, dict):
raise TypeError(f"jobArgs is of type {type(jobArgs)}")
if "Job" not in jobArgs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
Here we test the creation of a job wrapper and make sure it can be executed without crashing.
We don't test the actual execution of the wrapper or its payload.
"""
import ast
import json
import os
import shutil
Expand All @@ -12,7 +11,7 @@
from diraccfg import CFG
import pytest

from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper, createRelocatedJobWrapper
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper
import subprocess


Expand Down Expand Up @@ -96,31 +95,31 @@ def test_createAndExecuteJobWrapperTemplate_success(extraOptions):
assert res["OK"], res.get("Message")

# Test job wrapper content
jobWrapperPath = res["Value"][2]
jobWrapperPath = res["Value"]["JobWrapperPath"]
assert jobWrapperPath
assert os.path.exists(jobWrapperPath)

with open(jobWrapperPath) as f:
jobWrapperContent = f.read()

assert "@SITEPYTHON@" not in jobWrapperContent
assert f"{os.getcwd()}" in jobWrapperContent
assert f'sys.path.insert(0, "{os.getcwd()}")' in jobWrapperContent

# Test job wrapper configuration path
jobWrapperConfigPath = res["Value"][1]
jobWrapperConfigPath = res["Value"]["JobWrapperConfigPath"]
assert jobWrapperConfigPath
assert os.path.exists(jobWrapperConfigPath)

with open(jobWrapperConfigPath) as f:
jobWrapperConfigContent = ast.literal_eval(json.loads(f.readlines()[0]))
jobWrapperConfigContent = json.load(f)

assert jobWrapperConfigContent["Job"] == jobParams
assert jobWrapperConfigContent["CE"] == resourceParams
assert jobWrapperConfigContent["Optimizer"] == optimizerParams
assert "Payload" not in jobWrapperConfigContent

# Test job executable path
jobExecutablePath = res["Value"][0]
jobExecutablePath = res["Value"]["JobExecutablePath"]
assert jobExecutablePath
assert os.path.exists(jobExecutablePath)

Expand All @@ -133,6 +132,9 @@ def test_createAndExecuteJobWrapperTemplate_success(extraOptions):
assert "-o LogLevel=INFO" in jobExecutableContent
assert "-o /DIRAC/Security/UseServerCertificate=no" in jobExecutableContent

# Test job executable relocated path
assert not res["Value"].get("JobExecutableRelocatedPath")

# Execute wrapperFile in a subprocess
os.chmod(jobExecutablePath, 0o755)
result = subprocess.run(jobExecutablePath, shell=True, capture_output=True)
Expand Down Expand Up @@ -161,31 +163,31 @@ def test_createAndExecuteJobWrapperTemplate_missingExtraOptions():
assert res["OK"], res.get("Message")

# Test job wrapper content
jobWrapperPath = res["Value"][2]
jobWrapperPath = res["Value"]["JobWrapperPath"]
assert jobWrapperPath
assert os.path.exists(jobWrapperPath)

with open(jobWrapperPath) as f:
jobWrapperContent = f.read()

assert "@SITEPYTHON@" not in jobWrapperContent
assert f"{os.getcwd()}" in jobWrapperContent
assert f'sys.path.insert(0, "{os.getcwd()}")' in jobWrapperContent

# Test job wrapper configuration path
jobWrapperConfigPath = res["Value"][1]
jobWrapperConfigPath = res["Value"]["JobWrapperConfigPath"]
assert jobWrapperConfigPath
assert os.path.exists(jobWrapperConfigPath)

with open(jobWrapperConfigPath) as f:
jobWrapperConfigContent = ast.literal_eval(json.loads(f.readlines()[0]))
jobWrapperConfigContent = json.load(f)

assert jobWrapperConfigContent["Job"] == jobParams
assert jobWrapperConfigContent["CE"] == resourceParams
assert jobWrapperConfigContent["Optimizer"] == optimizerParams
assert "Payload" not in jobWrapperConfigContent

# Test job executable path
jobExecutablePath = res["Value"][0]
jobExecutablePath = res["Value"]["JobExecutablePath"]
assert jobExecutablePath
assert os.path.exists(jobExecutablePath)

Expand All @@ -197,6 +199,9 @@ def test_createAndExecuteJobWrapperTemplate_missingExtraOptions():
assert "-o LogLevel=INFO" in jobExecutableContent
assert "-o /DIRAC/Security/UseServerCertificate=no" in jobExecutableContent

# Test job executable relocated path
assert not res["Value"].get("JobExecutableRelocatedPath")

# Execute wrapperFile in a subprocess
os.chmod(jobExecutablePath, 0o755)
result = subprocess.run(jobExecutablePath, shell=True, capture_output=True)
Expand All @@ -221,19 +226,21 @@ def test_createAndExecuteRelocatedJobWrapperTemplate_success(extraOptions):
os.makedirs(rootLocation, exist_ok=True)

# Create relocated job wrapper
res = createRelocatedJobWrapper(
res = createJobWrapper(
jobID=1,
jobParams=jobParams,
resourceParams=resourceParams,
optimizerParams=optimizerParams,
# This is the interesting part
pythonPath="python",
wrapperPath=wrapperPath,
rootLocation=rootLocation,
extraOptions=extraOptions,
)
assert res["OK"], res.get("Message")

# Test job wrapper content
jobWrapperPath = os.path.join(wrapperPath, f"Wrapper_1")
jobWrapperPath = res["Value"]["JobWrapperPath"]
assert jobWrapperPath
assert os.path.exists(jobWrapperPath)
assert os.path.exists(os.path.join(wrapperPath, os.path.basename(jobWrapperPath)))
Expand All @@ -243,25 +250,25 @@ def test_createAndExecuteRelocatedJobWrapperTemplate_success(extraOptions):
jobWrapperContent = f.read()

assert "@SITEPYTHON@" not in jobWrapperContent
assert rootLocation in jobWrapperContent
assert f'sys.path.insert(0, "{rootLocation}")' in jobWrapperContent

# Test job wrapper configuration path
jobWrapperConfigPath = os.path.join(wrapperPath, f"Wrapper_1.json")
jobWrapperConfigPath = res["Value"]["JobWrapperConfigPath"]
assert jobWrapperConfigPath
assert os.path.exists(jobWrapperConfigPath)
assert os.path.exists(os.path.join(wrapperPath, os.path.basename(jobWrapperConfigPath)))
assert not os.path.exists(os.path.join(rootLocation, os.path.basename(jobWrapperConfigPath)))

with open(jobWrapperConfigPath) as f:
jobWrapperConfigContent = ast.literal_eval(json.loads(f.readlines()[0]))
jobWrapperConfigContent = json.load(f)

assert jobWrapperConfigContent["Job"] == jobParams
assert jobWrapperConfigContent["CE"] == resourceParams
assert jobWrapperConfigContent["Optimizer"] == optimizerParams
assert "Payload" not in jobWrapperConfigContent

# Test job executable path
jobExecutablePath = os.path.join(wrapperPath, f"Job1")
jobExecutablePath = res["Value"]["JobExecutablePath"]
assert jobExecutablePath
assert os.path.exists(jobExecutablePath)
assert os.path.exists(os.path.join(wrapperPath, os.path.basename(jobExecutablePath)))
Expand All @@ -280,7 +287,7 @@ def test_createAndExecuteRelocatedJobWrapperTemplate_success(extraOptions):
assert "-o /DIRAC/Security/UseServerCertificate=no" in jobExecutableContent

# Test job executable relocated path
jobExecutableRelocatedPath = res["Value"]
jobExecutableRelocatedPath = res["Value"]["JobExecutableRelocatedPath"]
assert jobExecutableRelocatedPath
assert jobExecutablePath != jobExecutableRelocatedPath
assert os.path.basename(jobExecutablePath) == os.path.basename(jobExecutableRelocatedPath)
Expand Down
Loading

0 comments on commit 1736b5d

Please sign in to comment.