Skip to content

Commit

Permalink
fix(Resources): AREX can submit jobs without proxy (token only)
Browse files (browse the repository at this point in the history)
  • Loading branch information
aldbr committed Oct 10, 2024
1 parent fbfad32 commit 1f7eb96
Showing 1 changed file with 66 additions and 71 deletions.
137 changes: 66 additions & 71 deletions src/DIRAC/Resources/Computing/AREXComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def _request(self, method, query, params=None, data=None, headers=None, timeout=
except requests.RequestException as e:
return S_ERROR(f"Request exception: {e}")

def _checkSession(self, mandatoryProxy: bool = False):
def _checkSession(self):
"""Check that the session exists and carries a valid proxy."""
if not self.session:
return S_ERROR("REST interface not initialised.")
Expand All @@ -188,13 +188,12 @@ def _checkSession(self, mandatoryProxy: bool = False):
self.log.error("Proxy or token not set")
return S_ERROR("Proxy or token not set")

# A proxy might be required: in this case, it should be present
if mandatoryProxy and not self.proxy:
self.log.error("Proxy is mandatory but not set")
return S_ERROR("Proxy is mandatory but not set")

# If a proxy is required or if no token is set
if mandatoryProxy or not self.token:
# If a token is set, we use it
if self.token:
# Attach the token to the headers if present
self.headers["Authorization"] = f"Bearer {self.token['access_token']}"
self.log.verbose("A token is attached to the header of the request(s)")
else:
# Prepare the proxy in X509_USER_PROXY
if not (result := self._prepareProxy())["OK"]:
self.log.error("Failed to set up proxy", result["Message"])
Expand All @@ -204,12 +203,6 @@ def _checkSession(self, mandatoryProxy: bool = False):
self.session.cert = os.environ.get("X509_USER_PROXY")
self.log.verbose("A proxy is attached to the session")

# If a token is set, we use it
if self.token:
# Attach the token to the headers if present
self.headers["Authorization"] = f"Bearer {self.token['access_token']}"
self.log.verbose("A token is attached to the header of the request(s)")

return S_OK()

#############################################################################
Expand Down Expand Up @@ -426,63 +419,66 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
Assume that the ARC queues are always of the format nordugrid-<batchSystem>-<queue>
And none of our supported batch systems have a "-" in their name
"""
result = self._checkSession(mandatoryProxy=True)
result = self._checkSession()
if not result["OK"]:
self.log.error("Cannot submit jobs", result["Message"])
return result

self.log.verbose(f"Executable file path: {executableFile}")

# Get existing delegations
result = self._getDelegationIDs()
if not result["OK"]:
self.log.error("Could not get delegation IDs.", result["Message"])
return S_ERROR("Could not get delegation IDs")
delegationIDs = result["Value"]

# Get the delegationID which corresponds to the DIRAC group of the proxy if it exists
currentDelegationID = None
proxyGroup = self.proxy.getDIRACGroup()
for delegationID in delegationIDs:
# Get the proxy attached to the delegationID
result = self._getProxyFromDelegationID(delegationID)

# Bug in AREX, sometimes delegationID does not exist anymore,
# but still appears in the list of delegationIDs.
# Issue submitted here: https://bugzilla.nordugrid.org/show_bug.cgi?id=4133
# In this case, we just try with the next one
if not result["OK"] and "404" in result["Message"]:
continue

# Else, it means there was an issue with the CE,
# we stop the execution
if not result["OK"]:
return result

proxy = result["Value"]

if proxy.getDIRACGroup() != proxyGroup:
continue

# If we are here, we have found the right delegationID to use
currentDelegationID = delegationID
break

if not currentDelegationID:
# No existing delegation, we need to prepare one
result = self._prepareDelegation()
if not result["OK"]:
self.log.warn("Could not get a new delegation", f"for CE {self.ceHost}")
return S_ERROR("Could not get a new delegation")
currentDelegationID = result["Value"]

delegation = f"\n(delegationid={currentDelegationID})"

if not inputs:
inputs = []
if not outputs:
outputs = []

# Delegation cannot be used with a token
delegation = ""
if not self.token:
# Get existing delegations
result = self._getDelegationIDs()
if not result["OK"]:
self.log.error("Could not get delegation IDs.", result["Message"])
return S_ERROR("Could not get delegation IDs")
delegationIDs = result["Value"]

# Get the delegationID which corresponds to the DIRAC group of the proxy if it exists
currentDelegationID = None
proxyGroup = self.proxy.getDIRACGroup()
for delegationID in delegationIDs:
# Get the proxy attached to the delegationID
result = self._getProxyFromDelegationID(delegationID)

# Bug in AREX, sometimes delegationID does not exist anymore,
# but still appears in the list of delegationIDs.
# Issue submitted here: https://bugzilla.nordugrid.org/show_bug.cgi?id=4133
# In this case, we just try with the next one
if not result["OK"] and "404" in result["Message"]:
continue

# Else, it means there was an issue with the CE,
# we stop the execution
if not result["OK"]:
return result

proxy = result["Value"]

if proxy.getDIRACGroup() != proxyGroup:
continue

# If we are here, we have found the right delegationID to use
currentDelegationID = delegationID
break

if not currentDelegationID:
# No existing delegation, we need to prepare one
result = self._prepareDelegation()
if not result["OK"]:
self.log.warn("Could not get a new delegation", f"for CE {self.ceHost}")
return S_ERROR("Could not get a new delegation")
currentDelegationID = result["Value"]

delegation = f"\n(delegationid={currentDelegationID})"

# If there is a preamble, then we bundle it in an executable file
if self.preamble:
inputs.append(executableFile)
Expand Down Expand Up @@ -723,14 +719,12 @@ def _renewDelegation(self, delegationID):

return S_OK()

#############################################################################

def getJobStatus(self, jobIDList):
"""Get the status information for the given list of jobs.
:param list jobIDList: list of job references, followed by the DIRAC stamp.
"""
result = self._checkSession(mandatoryProxy=True)
result = self._checkSession()
if not result["OK"]:
self.log.error("Cannot get status of the jobs", result["Message"])
return result
Expand Down Expand Up @@ -776,15 +770,16 @@ def getJobStatus(self, jobIDList):
self.log.debug(f"Killing held job {jobReference}")

# Renew delegations to renew the proxies of the jobs
result = self._getDelegationIDs()
if not result["OK"]:
return result
delegationIDs = result["Value"]
for delegationID in delegationIDs:
result = self._renewDelegation(delegationID)
if not self.token:
result = self._getDelegationIDs()
if not result["OK"]:
# Only log here as we still want to return statuses
self.log.warn("Failed to renew delegation", f"{delegationID}: {result['Message']}")
return result
delegationIDs = result["Value"]
for delegationID in delegationIDs:
result = self._renewDelegation(delegationID)
if not result["OK"]:
# Only log here as we still want to return statuses
self.log.warn("Failed to renew delegation", f"{delegationID}: {result['Message']}")

# Kill held jobs
if jobsToCancel:
Expand Down

0 comments on commit 1f7eb96

Please sign in to comment.