From 1ad81931fbaea4b7f1d98766eadc450c4792233a Mon Sep 17 00:00:00 2001
From: Christophe Haen
Date: Thu, 7 Mar 2024 14:56:06 +0100
Subject: [PATCH 1/3] feat (TokenManager): add a way to get token without cache

---
 .../Client/TokenManagerClient.py | 66 ++++++++++++++++++-
 1 file changed, 65 insertions(+), 1 deletion(-)

diff --git a/src/DIRAC/FrameworkSystem/Client/TokenManagerClient.py b/src/DIRAC/FrameworkSystem/Client/TokenManagerClient.py
index 74324bcbddc..8e4a1c9670f 100644
--- a/src/DIRAC/FrameworkSystem/Client/TokenManagerClient.py
+++ b/src/DIRAC/FrameworkSystem/Client/TokenManagerClient.py
@@ -1,6 +1,7 @@
 """ The TokenManagerClient is a class representing the client of the DIRAC
 :py:mod:`TokenManager ` service.
 """
+
 import time
 
 from DIRAC import S_OK, S_ERROR
@@ -31,7 +32,6 @@ def __init__(self, **kwargs):
         self.__tokensCache = DictCache()
         self.idps = IdProviderFactory()
 
-    @gTokensSync
     def getToken(
         self,
         username: str = None,
@@ -40,6 +40,70 @@ def getToken(
         audience: str = None,
         identityProvider: str = None,
         requiredTimeLeft: int = 0,
+        useCache: bool = True,
+    ):
+        """Get an access token for a user/group
+
+        :param username: user name
+        :param userGroup: group name
+        :param scope: scope
+        :param audience: audience
+        :param identityProvider: identity Provider
+        :param requiredTimeLeft: required time
+        :param useCache: if True (default), use and fill the local token cache.
+                         Otherwise the token is not cached, which also avoids taking the lock
+
+        :return: S_OK(dict)/S_ERROR()
+        """
+        meth = self.getTokenWithCache if useCache else self.getTokenWithoutCache
+
+        return meth(
+            username=username,
+            userGroup=userGroup,
+            scope=scope,
+            audience=audience,
+            identityProvider=identityProvider,
+            requiredTimeLeft=requiredTimeLeft,
+        )
+
+    def getTokenWithoutCache(
+        self,
+        username: str = None,
+        userGroup: str = None,
+        scope: list[str] = None,
+        audience: str = None,
+        identityProvider: str = None,
+        requiredTimeLeft: int = 0,
+    ):
+        """Get an access token for a user/group without caching it
+
+        :param username: user name
+        :param userGroup: group name
+        :param scope: scope
+        :param audience: audience
+        :param identityProvider: identity Provider
+        :param requiredTimeLeft: required time
+
+        :return: S_OK(dict)/S_ERROR()
+        """
+        # Get an IdProvider Client instance
+        result = getIdProviderClient(userGroup, identityProvider)
+        if not result["OK"]:
+            return result
+        idpObj = result["Value"]
+
+        # Get a token from the server, bypassing the local cache
+        return self.executeRPC(username, userGroup, scope, audience, idpObj.name, requiredTimeLeft, call="getToken")
+
+    @gTokensSync
+    def getTokenWithCache(
+        self,
+        username: str = None,
+        userGroup: str = None,
+        scope: list[str] = None,
+        audience: str = None,
+        identityProvider: str = None,
+        requiredTimeLeft: int = 0,
     ):
         """Get an access token for a user/group keeping the local cache
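For illustration only, and not part of the patch: a minimal sketch of how a caller could use the new ``useCache`` switch, assuming an initialised DIRAC client installation (the group name and scope below are invented examples)::

    from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager

    # Cached path (default): goes through getTokenWithCache and the gTokensSync lock
    res = gTokenManager.getToken(userGroup="lhcb_user", scope=["fts"], requiredTimeLeft=3600)

    # Uncached path: goes through getTokenWithoutCache, no lock is taken and nothing is stored
    res = gTokenManager.getToken(userGroup="lhcb_user", scope=["fts"], requiredTimeLeft=3600, useCache=False)

    if res["OK"]:
        access_token = res["Value"]["access_token"]
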
From 8df421a0dec5c690a16dcf3e61c4b4c8b2085faa Mon Sep 17 00:00:00 2001
From: Christophe Haen
Date: Thu, 7 Mar 2024 15:05:29 +0100
Subject: [PATCH 2/3] feat (SE): add getWLCGTokenPath methods

---
 dirac.cfg                                      |  1 +
 .../AdministratorGuide/Resources/storage.rst   |  1 +
 src/DIRAC/Resources/Storage/StorageBase.py     | 15 +++++++++++++
 src/DIRAC/Resources/Storage/StorageElement.py  | 22 ++++++++++++++++++-
 4 files changed, 38 insertions(+), 1 deletion(-)

diff --git a/dirac.cfg b/dirac.cfg
index 209667aba79..bb68a53af1c 100644
--- a/dirac.cfg
+++ b/dirac.cfg
@@ -801,6 +801,7 @@ Resources
       SpaceReservation = LHCb-EOS # Space reservation name if any. Concept like SpaceToken
       ArchiveTimeout = 84600 # Timeout for the FTS archiving
       BringOnlineTimeout = 84600 # Timeout for the bring online operation used by FTS
+      WLCGTokenBasePath = /eos/lhcb # EXPERIMENTAL base path relative to which the path put in the token is expressed
      # Protocol section, see http://dirac.readthedocs.io/en/latest/AdministratorGuide/Resources/Storages/index.html#available-protocol-plugins
      GFAL2_SRM2
      {
diff --git a/docs/source/AdministratorGuide/Resources/storage.rst b/docs/source/AdministratorGuide/Resources/storage.rst
index 263a35a739b..eadf9e90738 100644
--- a/docs/source/AdministratorGuide/Resources/storage.rst
+++ b/docs/source/AdministratorGuide/Resources/storage.rst
@@ -59,6 +59,7 @@ Configuration options are:
 * ``SpaceReservation``: just a name of a zone of the physical storage which can have some space reserved. Extends the SRM ``SpaceToken`` concept.
 * ``ArchiveTimeout``: for tape SE only. If set to a value in seconds, enables the `FTS Archive Monitoring feature `_
 * ``BringOnlineTimeout``: for tape SE only. If set to a value in seconds, specify the BringOnline parameter for FTS transfers. Otherwise, the default is whatever is in the ``FTS3Job`` class.
+* ``WLCGTokenBasePath``: EXPERIMENTAL. Base path relative to which the path embedded in a WLCG token is expressed.
 
 VO specific paths
 -----------------
diff --git a/src/DIRAC/Resources/Storage/StorageBase.py b/src/DIRAC/Resources/Storage/StorageBase.py
index 0aef091039b..270b3c79903 100755
--- a/src/DIRAC/Resources/Storage/StorageBase.py
+++ b/src/DIRAC/Resources/Storage/StorageBase.py
@@ -39,6 +39,8 @@
 import shutil
 import tempfile
 
+from pathlib import Path
+
 from DIRAC import S_OK, S_ERROR
 from DIRAC.Core.Utilities.Pfn import pfnparse, pfnunparse
 from DIRAC.Core.Utilities.ReturnValues import returnSingleResult
@@ -462,3 +464,16 @@ def getOccupancy(self, **kwargs):
         finally:
             # Clean the temporary dir
             shutil.rmtree(tmpDirName)
+
+    def getWLCGTokenPath(self, lfn: str, wlcgTokenBasePath: str) -> str:
+        """
+        Returns the path expected to be in a WLCG token.
+        It basically consists of ``basepath - tokenBasePath + LFN``.
+        The tokenBasePath is a configuration on the storage side.
+
+        """
+
+        allDict = dict.fromkeys(["Protocol", "Host", "Port", "Path", "FileName", "Options"], "")
+        allDict.update({"Path": self.protocolParameters["Path"], "FileName": lfn.lstrip("/")})
+        fullPath = pfnunparse(allDict)["Value"]
+        return Path(fullPath).relative_to(Path(wlcgTokenBasePath))
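As a standalone illustration of the path arithmetic performed by ``getWLCGTokenPath`` above (the base path, LFN and ``WLCGTokenBasePath`` values below are invented; in DIRAC they come from the storage configuration)::

    from pathlib import Path

    se_base_path = "/eos/lhcb/grid/prod"       # the plugin "Path" parameter (assumed value)
    lfn = "/lhcb/data/2024/RAW/some_file.raw"  # an LFN (assumed value)
    wlcg_token_base_path = "/eos/lhcb"         # WLCGTokenBasePath (assumed value)

    # basepath + LFN, then expressed relative to the token base path
    full_path = Path(se_base_path) / lfn.lstrip("/")
    token_path = full_path.relative_to(wlcg_token_base_path)

    print(token_path)  # grid/prod/lhcb/data/2024/RAW/some_file.raw
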
diff --git a/src/DIRAC/Resources/Storage/StorageElement.py b/src/DIRAC/Resources/Storage/StorageElement.py
index b11e6382f2c..e7e8e1424dc 100755
--- a/src/DIRAC/Resources/Storage/StorageElement.py
+++ b/src/DIRAC/Resources/Storage/StorageElement.py
@@ -21,7 +21,7 @@
 from DIRAC.Core.Utilities import DErrno
 from DIRAC.Core.Utilities.File import convertSizeUnits
 from DIRAC.Core.Utilities.List import getIndexInList
-from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR, returnSingleResult
+from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR, returnSingleResult, convertToReturnValue
 from DIRAC.Core.Utilities.TimeUtilities import toEpochMilliSeconds
 from DIRAC.Resources.Storage.StorageFactory import StorageFactory
 from DIRAC.Core.Utilities.Pfn import pfnparse
@@ -294,6 +294,7 @@ def __init__(self, name, protocolSections=None, vo=None, hideExceptions=False):
             "isDirectory",
             "isFile",
             "getOccupancy",
+            "getWLCGTokenPath",
         ]
 
         self.okMethods = [
@@ -964,6 +965,25 @@ def getLFNFromURL(self, urls):
     # This is the generic wrapper for file operations
     #
 
+    @convertToReturnValue
+    def getWLCGTokenPath(self, lfn: str):
+        """
+        EXPERIMENTAL
+        Return the path to put in the token, relative to the VO path
+        configured on the storage.
+
+        """
+        wlcgTokenBasePath = self.options.get("WLCGTokenBasePath")
+        if not wlcgTokenBasePath:
+            raise ValueError("WLCGTokenBasePath not configured")
+
+        for storage in self.storages.values():
+            try:
+                return storage.getWLCGTokenPath(lfn, wlcgTokenBasePath)
+            except Exception:
+                continue
+        raise RuntimeError("Could not get WLCGTokenPath")
+
     def getURL(self, lfn, protocol=False, replicaDict=None):
         """execute 'getTransportURL' operation.

From 985b3c8fe82e6d53f7641d79a8812ab9ee45f5d3 Mon Sep 17 00:00:00 2001
From: Federico Stagni
Date: Wed, 11 Sep 2024 16:34:00 +0200
Subject: [PATCH 3/3] feat (FTS3): add experimental token support

---
 .../Systems/DataManagement/fts3.rst           |  24 ++++
 .../DataManagementSystem/Agent/FTS3Agent.py   |  21 ++-
 .../DataManagementSystem/Client/FTS3File.py   |   1 +
 .../DataManagementSystem/Client/FTS3Job.py    | 129 +++++++++++++++---
 .../DataManagementSystem/ConfigTemplate.cfg   |   3 +
 5 files changed, 155 insertions(+), 23 deletions(-)

diff --git a/docs/source/AdministratorGuide/Systems/DataManagement/fts3.rst b/docs/source/AdministratorGuide/Systems/DataManagement/fts3.rst
index 0566058e3ea..9403936398e 100644
--- a/docs/source/AdministratorGuide/Systems/DataManagement/fts3.rst
+++ b/docs/source/AdministratorGuide/Systems/DataManagement/fts3.rst
@@ -200,3 +200,27 @@ More details on how the intermediate SE selection is done and how the matrix is
 Work in FTS has a `task `_ to try and bring that feature in. A future solution may come from DIRAC.
 In the meantime, the best solution is to ask the site to either cleanup themselves (some storages like EOS have that built in)
 or to give you a dump of the namespace, and then do the cleaning yourself.
+
+
+Token support
+----------------
+
+.. versionadded:: v8.0.51
+
+.. warning::
+   Very experimental feature
+
+
+The current state is the one with which LHCb ran the DC24 challenge. It only worked for dCache sites, as there is still no uniform way for storages to interpret permissions.
+A transfer will be performed with tokens if:
+
+* ``UseTokens`` is true in the FTS3Agent configuration
+* ``WLCGTokenBasePath`` is set for both the source and the destination storage elements
+
+The tokens use specific file paths, not generic wildcard permissions.
+
+.. warning::
+   Token support is as experimental as can be at every layer of the stack (DIRAC, storage, FTS... even the model is experimental)
+
+.. warning::
+   The FTS3Agent occasionally got stuck when tokens were used
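For illustration only (not part of the patch): the two documented conditions, written as a hypothetical helper; the real decision is taken inside ``FTS3Agent`` and ``FTS3Job``, which additionally check the ``TokenSupport`` option of each storage::

    from DIRAC.Resources.Storage.StorageElement import StorageElement


    def wouldUseTokens(useTokensOption: bool, sourceSE: str, targetSE: str) -> bool:
        """Rough sketch: the agent option must be enabled and both endpoints
        must define WLCGTokenBasePath."""
        if not useTokensOption:
            return False
        for seName in (sourceSE, targetSE):
            if not StorageElement(seName).options.get("WLCGTokenBasePath"):
                return False
        return True
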
diff --git a/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py b/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py
index 4151a3fffb8..9e05e14fe4e 100644
--- a/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py
+++ b/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py
@@ -12,6 +12,7 @@
   :caption: FTS3Agent options
 
 """
+
 import datetime
 import errno
 import os
@@ -38,6 +39,7 @@
 from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername
 from DIRAC.FrameworkSystem.Client.Logger import gLogger
 from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
+from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager
 from DIRAC.DataManagementSystem.private import FTS3Utilities
 from DIRAC.DataManagementSystem.DB.FTS3DB import FTS3DB
 from DIRAC.DataManagementSystem.Client.FTS3Job import FTS3Job
@@ -106,6 +108,8 @@ def __readConf(self):
         # lifetime of the proxy we download to delegate to FTS
         self.proxyLifetime = self.am_getOption("ProxyLifetime", PROXY_LIFETIME)
 
+        self.useTokens = self.am_getOption("UseTokens", False)
+
         return S_OK()
 
     def initialize(self):
@@ -495,7 +499,22 @@ def _treatOperation(self, operation):
                     log.error("Could not select TPC list", repr(e))
                     continue
 
-            res = ftsJob.submit(context=context, protocols=tpcProtocols)
+            # If we use tokens, get an access token with the
+            # fts scope in it.
+            # The FTS3Job will decide whether to use it or not
+            fts_access_token = None
+            if self.useTokens:
+                res = gTokenManager.getToken(
+                    userGroup=ftsJob.userGroup,
+                    requiredTimeLeft=3600,
+                    scope=["fts"],
+                )
+                if not res["OK"]:
+                    return res
+
+                fts_access_token = res["Value"]["access_token"]
+
+            res = ftsJob.submit(context=context, protocols=tpcProtocols, fts_access_token=fts_access_token)
 
             if not res["OK"]:
                 log.error("Could not submit FTS3Job", f"FTS3Operation {operation.operationID} : {res}")
diff --git a/src/DIRAC/DataManagementSystem/Client/FTS3File.py b/src/DIRAC/DataManagementSystem/Client/FTS3File.py
index 0b687300af3..bcd4e34a55f 100644
--- a/src/DIRAC/DataManagementSystem/Client/FTS3File.py
+++ b/src/DIRAC/DataManagementSystem/Client/FTS3File.py
@@ -21,6 +21,7 @@ class FTS3File(JSerializable):
         "Started",  # From FTS: File transfer has started
         "Not_used",  # From FTS: Transfer not being considered yet, waiting for another one (multihop)
         "Archiving",  # From FTS: file not yet migrated to tape
+        "Token_prep",  # From FTS: when using tokens, state used before Submitted, until FTS has fetched a refresh token
     ]
 
     # These are the states that we consider final.
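For illustration only, the agent-side flow added above, condensed into a hypothetical helper (``ftsJob`` is an ``FTS3Job`` instance; ``context`` and ``tpcProtocols`` are whatever the agent already prepared)::

    from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager


    def submitWithOptionalToken(ftsJob, context, tpcProtocols, useTokens):
        """Fetch an fts-scoped token if requested, then submit the job."""
        fts_access_token = None
        if useTokens:  # mirrors the new UseTokens agent option
            res = gTokenManager.getToken(userGroup=ftsJob.userGroup, requiredTimeLeft=3600, scope=["fts"])
            if not res["OK"]:
                return res
            fts_access_token = res["Value"]["access_token"]

        # FTS3Job.submit decides internally whether the token is actually used
        return ftsJob.submit(context=context, protocols=tpcProtocols, fts_access_token=fts_access_token)
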
diff --git a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py
index 708edfd9d30..62279376be9 100755
--- a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py
+++ b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py
@@ -25,6 +25,7 @@
 from DIRAC.Resources.Storage.StorageElement import StorageElement
 
 from DIRAC.FrameworkSystem.Client.Logger import gLogger
+from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager
 
 from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR
 from DIRAC.Core.Utilities.DErrno import cmpError
@@ -296,7 +297,18 @@ def __isTapeSE(seName, vo):
 
         return isTape
 
-    def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=None):
+    @staticmethod
+    def __seTokenSupport(seObj):
+        """Check whether a given SE supports tokens
+
+        :param seObj: StorageElement object
+
+        :returns: True/False
+                  In case of error, returns False
+        """
+        return seObj.options.get("TokenSupport", "").lower() in ("true", "yes")
+
+    def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=None, tokensEnabled=False):
         """Build a job for transfer
 
         Some attributes of the job are expected to be set
@@ -324,6 +336,7 @@
         log = gLogger.getSubLogger(f"constructTransferJob/{self.operationID}/{self.sourceSE}_{self.targetSE}")
 
         isMultiHop = False
+        useTokens = False
 
         # Check if it is a multiHop transfer
         if self.multiHopSE:
@@ -424,6 +437,9 @@
                 log.debug(f"Not preparing transfer for file {ftsFile.lfn}")
                 continue
 
+            srcToken = None
+            dstToken = None
+
             sourceSURL, targetSURL = allSrcDstSURLs[ftsFile.lfn]
 
             stageURL = allStageURLs.get(ftsFile.lfn)
@@ -480,6 +496,44 @@
             if self.activity:
                 trans_metadata["activity"] = self.activity
 
+            # Add tokens if both storages support it and if requested
+            if tokensEnabled and self.__seTokenSupport(srcSE) and self.__seTokenSupport(dstSE):
+                # We get a read token for the source
+                # offline_access is to allow FTS to refresh it
+                res = srcSE.getWLCGTokenPath(ftsFile.lfn)
+                if not res["OK"]:
+                    return res
+                srcTokenPath = res["Value"]
+                res = gTokenManager.getToken(
+                    userGroup=self.userGroup,
+                    requiredTimeLeft=3600,
+                    scope=[f"storage.read:/{srcTokenPath}", "offline_access"],
+                    useCache=False,
+                )
+                if not res["OK"]:
+                    return res
+                srcToken = res["Value"]["access_token"]
+
+                # We get a token with modify and read for the destination
+                # We need the read to be able to stat
+                # CAUTION: only works with dcache for now, other storages
+                # interpret permissions differently
+                # offline_access is to allow FTS to refresh it
+                res = dstSE.getWLCGTokenPath(ftsFile.lfn)
+                if not res["OK"]:
+                    return res
+                dstTokenPath = res["Value"]
+                res = gTokenManager.getToken(
+                    userGroup=self.userGroup,
+                    requiredTimeLeft=3600,
+                    scope=[f"storage.modify:/{dstTokenPath}", f"storage.read:/{dstTokenPath}", "offline_access"],
+                    useCache=False,
+                )
+                if not res["OK"]:
+                    return res
+                dstToken = res["Value"]["access_token"]
+                useTokens = True
+
             # because of an xroot bug (https://github.com/xrootd/xrootd/issues/1433)
             # the checksum needs to be lowercase. It does not impact the other
             # protocol, so it's fine to put it here.
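For illustration only, the scope strings requested above, built in isolation (the token paths are invented; in the patch they come from ``getWLCGTokenPath`` and the tokens themselves are obtained through ``gTokenManager.getToken`` with ``useCache=False``)::

    def buildStorageScopes(srcTokenPath: str, dstTokenPath: str) -> tuple:
        """Hypothetical helper: scopes for the source (read) and destination (modify + read).

        offline_access is included so that FTS can refresh the tokens itself.
        """
        srcScope = [f"storage.read:/{srcTokenPath}", "offline_access"]
        dstScope = [f"storage.modify:/{dstTokenPath}", f"storage.read:/{dstTokenPath}", "offline_access"]
        return srcScope, dstScope


    srcScope, dstScope = buildStorageScopes("grid/prod/lhcb/file.raw", "grid/buffer/lhcb/file.raw")
    # srcScope -> ['storage.read:/grid/prod/lhcb/file.raw', 'offline_access']
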
@@ -492,6 +546,8 @@
                 filesize=ftsFile.size,
                 metadata=trans_metadata,
                 activity=self.activity,
+                source_token=srcToken,
+                destination_token=dstToken,
             )
 
             transfers.append(trans)
@@ -509,6 +565,7 @@
             "rmsReqID": self.rmsReqID,
             "sourceSE": self.sourceSE,
             "targetSE": self.targetSE,
+            "useTokens": useTokens,  # Store the information here to propagate it to submission
         }
 
         if self.activity:
@@ -671,7 +728,7 @@
 
         return S_OK((job, fileIDsInTheJob))
 
-    def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protocols=None):
+    def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protocols=None, fts_access_token=None):
         """submit the job to the FTS server
 
         Some attributes are expected to be defined for the submission to work:
@@ -695,17 +752,13 @@
         :param ucert: path to the user certificate/proxy. Might be inferred by the fts cli (see its doc)
         :param protocols: list of protocols from which we should choose the protocol to use
+        :param fts_access_token: token to be used to talk to FTS and to be passed when creating a context
 
         :returns: S_OK([FTSFiles ids of files submitted])
         """
 
         log = gLogger.getLocalSubLogger(f"submit/{self.operationID}/{self.sourceSE}_{self.targetSE}")
 
-        if not context:
-            if not ftsServer:
-                ftsServer = self.ftsServer
-            context = fts3.Context(endpoint=ftsServer, ucert=ucert, request_class=ftsSSLRequest, verify=False)
-
         # Construct the target SURL
         res = self.__fetchSpaceToken(self.targetSE, self.vo)
         if not res["OK"]:
@@ -715,7 +768,10 @@
         allLFNs = [ftsFile.lfn for ftsFile in self.filesToSubmit]
 
         if self.type == "Transfer":
-            res = self._constructTransferJob(pinTime, allLFNs, target_spacetoken, protocols=protocols)
+            res = self._constructTransferJob(
+                pinTime, allLFNs, target_spacetoken, protocols=protocols, tokensEnabled=bool(fts_access_token)
+            )
+
         elif self.type == "Staging":
             res = self._constructStagingJob(pinTime, allLFNs, target_spacetoken)
         # elif self.type == 'Removal':
@@ -726,6 +782,21 @@
 
         job, fileIDsInTheJob = res["Value"]
 
+        # If we need a token, do not use the context given as parameter,
+        # because that one only carries X509 credentials
+        if job["params"].get("job_metadata", {}).get("useTokens"):
+            if not fts_access_token:
+                return S_ERROR("Job needs token support but no FTS token was supplied")
+            context = None
+
+        if not context:
+            if not ftsServer:
+                ftsServer = self.ftsServer
+            res = self.generateContext(ftsServer, ucert, fts_access_token)
+            if not res["OK"]:
+                return res
+            context = res["Value"]
+
         try:
             self.ftsGUID = fts3.submit(context, job)
             log.info(f"Got GUID {self.ftsGUID}")
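For illustration only, the submission-time decision above reduced to a sketch (``job`` stands for the dictionary built by ``_constructTransferJob``; the helper name is hypothetical)::

    from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR


    def chooseSubmissionContext(job, context, ftsServer, ucert, fts_access_token, generateContext):
        """Pick (or build) the FTS3 context to submit with."""
        # A job flagged as using tokens must not reuse an X509-only context
        if job["params"].get("job_metadata", {}).get("useTokens"):
            if not fts_access_token:
                return S_ERROR("Job needs token support but no FTS token was supplied")
            context = None

        if not context:
            res = generateContext(ftsServer, ucert, fts_access_token)
            if not res["OK"]:
                return res
            context = res["Value"]
        return S_OK(context)
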
@@ -761,31 +832,45 @@
 
         return S_OK(fileIDsInTheJob)
 
     @staticmethod
-    def generateContext(ftsServer, ucert, lifetime=25200):
+    def generateContext(ftsServer, ucert, fts_access_token=None, lifetime=25200):
         """This method generates an fts3 context
 
+        Only a certificate or an fts token can be given
+
         :param ftsServer: address of the fts3 server
         :param ucert: the path to the certificate to be used
+        :param fts_access_token: token to access FTS
         :param lifetime: duration (in sec) of the delegation to the FTS3 server
                          (default is 7h, like FTS3 default)
 
         :returns: an fts3 context
         """
+        if fts_access_token and ucert:
+            return S_ERROR("fts_access_token and ucert cannot be both set")
+
         try:
-            context = fts3.Context(endpoint=ftsServer, ucert=ucert, request_class=ftsSSLRequest, verify=False)
+            context = fts3.Context(
+                endpoint=ftsServer,
+                ucert=ucert,
+                request_class=ftsSSLRequest,
+                verify=False,
+                fts_access_token=fts_access_token,
+            )
 
-            # Explicitely delegate to be sure we have the lifetime we want
-            # Note: the delegation will re-happen only when the FTS server
-            # decides that there is not enough timeleft.
-            # At the moment, this is 1 hour, which effectively means that if you do
-            # not submit a job for more than 1h, you have no valid proxy in FTS servers
-            # anymore, and all the jobs failed. So we force it when
-            # one third of the lifetime will be left.
-            # Also, the proxy given as parameter might have less than "lifetime" left
-            # since it is cached, but it does not matter, because in the FTS3Agent
-            # we make sure that we renew it often enough
-            td_lifetime = datetime.timedelta(seconds=lifetime)
-            fts3.delegate(context, lifetime=td_lifetime, delegate_when_lifetime_lt=td_lifetime // 3)
+            # The delegation only makes sense for X509 auth
+            if ucert:
+                # Explicitely delegate to be sure we have the lifetime we want
+                # Note: the delegation will re-happen only when the FTS server
+                # decides that there is not enough timeleft.
+                # At the moment, this is 1 hour, which effectively means that if you do
+                # not submit a job for more than 1h, you have no valid proxy in FTS servers
+                # anymore, and all the jobs failed. So we force it when
+                # one third of the lifetime will be left.
+                # Also, the proxy given as parameter might have less than "lifetime" left
+                # since it is cached, but it does not matter, because in the FTS3Agent
+                # we make sure that we renew it often enough
+                td_lifetime = datetime.timedelta(seconds=lifetime)
+                fts3.delegate(context, lifetime=td_lifetime, delegate_when_lifetime_lt=td_lifetime // 3)
 
             return S_OK(context)
         except FTS3ClientException as e:
diff --git a/src/DIRAC/DataManagementSystem/ConfigTemplate.cfg b/src/DIRAC/DataManagementSystem/ConfigTemplate.cfg
index 406f9d90bc7..9ac459bccd5 100644
--- a/src/DIRAC/DataManagementSystem/ConfigTemplate.cfg
+++ b/src/DIRAC/DataManagementSystem/ConfigTemplate.cfg
@@ -157,6 +157,9 @@ Agents
       KickLimitPerCycle = 100
       # Lifetime in sec of the Proxy we download to delegate to FTS3 (default 36h)
       ProxyLifetime = 129600
+      # Whether we use tokens to submit jobs to FTS3
+      # VERY EXPERIMENTAL
+      UseTokens = False
     }
     ##END FTS3Agent
 }
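For illustration only, the two mutually exclusive ways of building a context with the new ``generateContext`` signature (the endpoint URL and token value are placeholders, not real configuration values)::

    from DIRAC.DataManagementSystem.Client.FTS3Job import FTS3Job

    ftsServer = "https://fts3-example.cern.ch:8446"  # example endpoint (assumed)

    # X509 flavour: a proxy is delegated to the FTS3 server
    res = FTS3Job.generateContext(ftsServer, "/tmp/x509up_u1000")

    # Token flavour: no proxy delegation is performed
    res = FTS3Job.generateContext(ftsServer, None, fts_access_token="eyJ...")  # placeholder token

    if res["OK"]:
        context = res["Value"]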