diff --git a/src/DIRAC/Resources/Computing/AREXComputingElement.py b/src/DIRAC/Resources/Computing/AREXComputingElement.py index fb91f4a7877..761ee8da012 100755 --- a/src/DIRAC/Resources/Computing/AREXComputingElement.py +++ b/src/DIRAC/Resources/Computing/AREXComputingElement.py @@ -174,7 +174,7 @@ def _request(self, method, query, params=None, data=None, headers=None, timeout= except requests.RequestException as e: return S_ERROR(f"Request exception: {e}") - def _checkSession(self, mandatoryProxy: bool = False): + def _checkSession(self): """Check that the session exists and carries a valid proxy.""" if not self.session: return S_ERROR("REST interface not initialised.") @@ -188,13 +188,12 @@ def _checkSession(self, mandatoryProxy: bool = False): self.log.error("Proxy or token not set") return S_ERROR("Proxy or token not set") - # A proxy might be required: in this case, it should be present - if mandatoryProxy and not self.proxy: - self.log.error("Proxy is mandatory but not set") - return S_ERROR("Proxy is mandatory but not set") - - # If a proxy is required or if no token is set - if mandatoryProxy or not self.token: + # If a token is set, we use it + if self.token: + # Attach the token to the headers if present + self.headers["Authorization"] = f"Bearer {self.token['access_token']}" + self.log.verbose("A token is attached to the header of the request(s)") + else: # Prepare the proxy in X509_USER_PROXY if not (result := self._prepareProxy())["OK"]: self.log.error("Failed to set up proxy", result["Message"]) @@ -204,12 +203,6 @@ def _checkSession(self, mandatoryProxy: bool = False): self.session.cert = os.environ.get("X509_USER_PROXY") self.log.verbose("A proxy is attached to the session") - # If a token is set, we use it - if self.token: - # Attach the token to the headers if present - self.headers["Authorization"] = f"Bearer {self.token['access_token']}" - self.log.verbose("A token is attached to the header of the request(s)") - return S_OK() ############################################################################# @@ -426,63 +419,66 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs= Assume that the ARC queues are always of the format nordugrid-- And none of our supported batch systems have a "-" in their name """ - result = self._checkSession(mandatoryProxy=True) + result = self._checkSession() if not result["OK"]: self.log.error("Cannot submit jobs", result["Message"]) return result self.log.verbose(f"Executable file path: {executableFile}") - # Get existing delegations - result = self._getDelegationIDs() - if not result["OK"]: - self.log.error("Could not get delegation IDs.", result["Message"]) - return S_ERROR("Could not get delegation IDs") - delegationIDs = result["Value"] - - # Get the delegationID which corresponds to the DIRAC group of the proxy if it exists - currentDelegationID = None - proxyGroup = self.proxy.getDIRACGroup() - for delegationID in delegationIDs: - # Get the proxy attached to the delegationID - result = self._getProxyFromDelegationID(delegationID) - - # Bug in AREX, sometimes delegationID does not exist anymore, - # but still appears in the list of delegationIDs. - # Issue submitted here: https://bugzilla.nordugrid.org/show_bug.cgi?id=4133 - # In this case, we just try with the next one - if not result["OK"] and "404" in result["Message"]: - continue - - # Else, it means there was an issue with the CE, - # we stop the execution - if not result["OK"]: - return result - - proxy = result["Value"] - - if proxy.getDIRACGroup() != proxyGroup: - continue - - # If we are here, we have found the right delegationID to use - currentDelegationID = delegationID - break - - if not currentDelegationID: - # No existing delegation, we need to prepare one - result = self._prepareDelegation() - if not result["OK"]: - self.log.warn("Could not get a new delegation", f"for CE {self.ceHost}") - return S_ERROR("Could not get a new delegation") - currentDelegationID = result["Value"] - - delegation = f"\n(delegationid={currentDelegationID})" - if not inputs: inputs = [] if not outputs: outputs = [] + # Delegation cannot be used with a token + delegation = "" + if not self.token: + # Get existing delegations + result = self._getDelegationIDs() + if not result["OK"]: + self.log.error("Could not get delegation IDs.", result["Message"]) + return S_ERROR("Could not get delegation IDs") + delegationIDs = result["Value"] + + # Get the delegationID which corresponds to the DIRAC group of the proxy if it exists + currentDelegationID = None + proxyGroup = self.proxy.getDIRACGroup() + for delegationID in delegationIDs: + # Get the proxy attached to the delegationID + result = self._getProxyFromDelegationID(delegationID) + + # Bug in AREX, sometimes delegationID does not exist anymore, + # but still appears in the list of delegationIDs. + # Issue submitted here: https://bugzilla.nordugrid.org/show_bug.cgi?id=4133 + # In this case, we just try with the next one + if not result["OK"] and "404" in result["Message"]: + continue + + # Else, it means there was an issue with the CE, + # we stop the execution + if not result["OK"]: + return result + + proxy = result["Value"] + + if proxy.getDIRACGroup() != proxyGroup: + continue + + # If we are here, we have found the right delegationID to use + currentDelegationID = delegationID + break + + if not currentDelegationID: + # No existing delegation, we need to prepare one + result = self._prepareDelegation() + if not result["OK"]: + self.log.warn("Could not get a new delegation", f"for CE {self.ceHost}") + return S_ERROR("Could not get a new delegation") + currentDelegationID = result["Value"] + + delegation = f"\n(delegationid={currentDelegationID})" + # If there is a preamble, then we bundle it in an executable file if self.preamble: inputs.append(executableFile) @@ -723,14 +719,12 @@ def _renewDelegation(self, delegationID): return S_OK() - ############################################################################# - def getJobStatus(self, jobIDList): """Get the status information for the given list of jobs. :param list jobIDList: list of job references, followed by the DIRAC stamp. """ - result = self._checkSession(mandatoryProxy=True) + result = self._checkSession() if not result["OK"]: self.log.error("Cannot get status of the jobs", result["Message"]) return result @@ -776,15 +770,16 @@ def getJobStatus(self, jobIDList): self.log.debug(f"Killing held job {jobReference}") # Renew delegations to renew the proxies of the jobs - result = self._getDelegationIDs() - if not result["OK"]: - return result - delegationIDs = result["Value"] - for delegationID in delegationIDs: - result = self._renewDelegation(delegationID) + if not self.token: + result = self._getDelegationIDs() if not result["OK"]: - # Only log here as we still want to return statuses - self.log.warn("Failed to renew delegation", f"{delegationID}: {result['Message']}") + return result + delegationIDs = result["Value"] + for delegationID in delegationIDs: + result = self._renewDelegation(delegationID) + if not result["OK"]: + # Only log here as we still want to return statuses + self.log.warn("Failed to renew delegation", f"{delegationID}: {result['Message']}") # Kill held jobs if jobsToCancel: