From dd469d1a71db3285990f91b8c47ab23deb754c91 Mon Sep 17 00:00:00 2001 From: Christophe Haen Date: Tue, 6 Feb 2024 09:11:33 +0100 Subject: [PATCH] tmp fts token cont --- .../DataManagementSystem/Client/FTS3Job.py | 52 ++++++++++++------- src/DIRAC/Resources/Storage/StorageBase.py | 15 ++++++ src/DIRAC/Resources/Storage/StorageElement.py | 16 +++++- .../TransformationSystem/Client/Utilities.py | 1 + 4 files changed, 63 insertions(+), 21 deletions(-) diff --git a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py index dbfea2c5395..ab1f6cc56a0 100644 --- a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py +++ b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py @@ -13,6 +13,7 @@ from DIRAC.Resources.Storage.StorageElement import StorageElement from DIRAC.FrameworkSystem.Client.Logger import gLogger +from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR from DIRAC.Core.Utilities.DErrno import cmpError @@ -422,6 +423,8 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N log.debug(f"Not preparing transfer for file {ftsFile.lfn}") continue + srcToken, dstToken = None + sourceSURL, targetSURL = allSrcDstSURLs[ftsFile.lfn] stageURL = allStageURLs.get(ftsFile.lfn) @@ -478,26 +481,33 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N if self.activity: trans_metadata["activity"] = self.activity - # will need https://gitlab.cern.ch/fts/fts-rest-flask/-/merge_requests/116 - # If both supports tokens - # get an access token - # In [26]: pfnparse(url) - # Out[26]: - # {'OK': True, - # 'Value': {'Protocol': 'root', - # 'Host': 'x509up_u1000@eoslhcb.cern.ch', - # 'Port': '', - # 'WSUrl': '', - # 'Path': '//eos/lhcb/grid/user/lhcb/tata', - # 'FileName': 'yoyo.txt'}} - # In [24]: fullPath - # Out[24]: PosixPath('//eos/lhcb/grid/user/lhcb/tata') - - # In [25]: fullPath.resolve() - # Out[25]: PosixPath('/eos/lhcb/grid/user/lhcb/tata') - - # In [19]: fullPath.relative_to('//eos/lhcb') - # Out[19]: PosixPath('grid/user/lhcb/tata') + # Add tokens if both storages support it + if self.__seTokenSupport(hopSrcSEName) and self.__seTokenSupport(hopDstSEName): + res = srcSE.getWLCGTokenPath(ftsFile.lfn) + if not res["OK"]: + return res + srcTokenPath = res["Value"] + res = gTokenManager.getToken( + userGroup="lhcb_data", + requiredTimeLeft=3600, + scope=[f"storage.read:/{srcTokenPath}", "offline_access"], + ) + if not res["OK"]: + return res + srcToken = res["Value"] + + res = dstSE.getWLCGTokenPath(ftsFile.lfn) + if not res["OK"]: + return res + dstTokenPath = res["Value"] + res = gTokenManager.getToken( + userGroup="lhcb_data", + requiredTimeLeft=3600, + scope=[f"storage.create:/{dstTokenPath}", "offline_access"], + ) + if not res["OK"]: + return res + dstToken = res["Value"] # because of an xroot bug (https://github.com/xrootd/xrootd/issues/1433) # the checksum needs to be lowercase. It does not impact the other @@ -511,6 +521,8 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N filesize=ftsFile.size, metadata=trans_metadata, activity=self.activity, + source_token=srcToken, + destination_token=dstToken, ) transfers.append(trans) diff --git a/src/DIRAC/Resources/Storage/StorageBase.py b/src/DIRAC/Resources/Storage/StorageBase.py index 0aef091039b..d8da149ced3 100755 --- a/src/DIRAC/Resources/Storage/StorageBase.py +++ b/src/DIRAC/Resources/Storage/StorageBase.py @@ -39,6 +39,8 @@ import shutil import tempfile +from pathlib import Path + from DIRAC import S_OK, S_ERROR from DIRAC.Core.Utilities.Pfn import pfnparse, pfnunparse from DIRAC.Core.Utilities.ReturnValues import returnSingleResult @@ -462,3 +464,16 @@ def getOccupancy(self, **kwargs): finally: # Clean the temporary dir shutil.rmtree(tmpDirName) + + def getWLCGTokenPath(self, lfn: str, wlcgTokenBasePath: str) -> str: + """ + Returns the path expected to be in a WLCG token + It basically consists of `basepath - tokenBasePath + LFN + The tokenBasePath is a configuration on the storage side. + + """ + + allDict = dict.fromkeys(["Protocol", "Host", "Port", "Path", "FileName", "Options"], "") + allDict.update({"Path": self.protocolParameters["Path"], "FileName": lfn.lstrip("/")}) + fullPath = pfnunparse(allDict)["Value"] + return Path(fullPath).relative_to(Path(wlcgTokenBasePath)) diff --git a/src/DIRAC/Resources/Storage/StorageElement.py b/src/DIRAC/Resources/Storage/StorageElement.py index 2c2a57f244d..8df2a253cc8 100755 --- a/src/DIRAC/Resources/Storage/StorageElement.py +++ b/src/DIRAC/Resources/Storage/StorageElement.py @@ -20,7 +20,7 @@ from DIRAC.Core.Utilities import DErrno from DIRAC.Core.Utilities.File import convertSizeUnits from DIRAC.Core.Utilities.List import getIndexInList -from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR, returnSingleResult +from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR, returnSingleResult, convertToReturnValue from DIRAC.Core.Utilities.TimeUtilities import toEpochMilliSeconds from DIRAC.Resources.Storage.StorageFactory import StorageFactory from DIRAC.Core.Utilities.Pfn import pfnparse @@ -291,6 +291,7 @@ def __init__(self, name, protocolSections=None, vo=None, hideExceptions=False): "isDirectory", "isFile", "getOccupancy", + "getWLCGTokenPath", ] self.okMethods = [ @@ -952,6 +953,19 @@ def getLFNFromURL(self, urls): # This is the generic wrapper for file operations # + @convertToReturnValue + def getWLCGTokenPath(self, lfn: str): + wlcgTokenBasePath = self.options.get("WLCGTokenBasePath") + if not wlcgTokenBasePath: + raise ValueError("WLCGTokenBasePath not configured") + + for storage in self.storages.values(): + try: + return storage.getWLCGTokenPath(lfn, wlcgTokenBasePath) + except: + continue + raise RuntimeError("Could not get WLCGTokenPath") + def getURL(self, lfn, protocol=False, replicaDict=None): """execute 'getTransportURL' operation. diff --git a/src/DIRAC/TransformationSystem/Client/Utilities.py b/src/DIRAC/TransformationSystem/Client/Utilities.py index dac37258af3..e03431742cf 100644 --- a/src/DIRAC/TransformationSystem/Client/Utilities.py +++ b/src/DIRAC/TransformationSystem/Client/Utilities.py @@ -338,6 +338,7 @@ def getPluginParam(self, name, default=None): # First look at a generic value... optionPath = f"TransformationPlugins/{name}" value = Operations().getValue(optionPath, None) + self.logVerbose(f"Default plugin param {optionPath}: '{value}'") # Then look at a plugin-specific value optionPath = f"TransformationPlugins/{self.plugin}/{name}"