Skip to content

Commit

Permalink
Merge pull request DIRACGrid#7440 from chaen/v8.0_FEAT_fts3TokenSubmit
Browse files Browse the repository at this point in the history
[8.0] Support token transfers in FTS
  • Loading branch information
fstagni authored Oct 7, 2024
2 parents 447be15 + ba770f8 commit 4057543
Show file tree
Hide file tree
Showing 10 changed files with 257 additions and 26 deletions.
1 change: 1 addition & 0 deletions dirac.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,7 @@ Resources
SpaceReservation = LHCb-EOS # Space reservation name if any. Concept like SpaceToken
ArchiveTimeout = 84600 # Timeout for the FTS archiving
BringOnlineTimeout = 84600 # Timeout for the bring online operation used by FTS
WLCGTokenBasePath = /eos/lhcb # EXPERIMENTAL Base path that the paths in the tokens are relative to
# Protocol section, see http://dirac.readthedocs.io/en/latest/AdministratorGuide/Resources/Storages/index.html#available-protocol-plugins
GFAL2_SRM2
{
Expand Down
1 change: 1 addition & 0 deletions docs/source/AdministratorGuide/Resources/storage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ Configuration options are:
* ``SpaceReservation``: just a name of a zone of the physical storage which can have some space reserved. Extends the SRM ``SpaceToken`` concept.
* ``ArchiveTimeout``: for tape SE only. If set to a value in seconds, enables the `FTS Archive Monitoring feature <https://fts3-docs.web.cern.ch/fts3-docs/docs/archive_monitoring.html>`_
* ``BringOnlineTimeout``: for tape SE only. If set to a value in seconds, specify the BringOnline parameter for FTS transfers. Otherwise, the default is whatever is in the ``FTS3Job`` class.
* ``WLCGTokenBasePath``: EXPERIMENTAL. Base path that the paths in the tokens are relative to.

VO specific paths
-----------------
Expand Down
24 changes: 24 additions & 0 deletions docs/source/AdministratorGuide/Systems/DataManagement/fts3.rst
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,27 @@ More details on how the intermediate SE selection is done and how the matrix is
Work in FTS has a `task <https://its.cern.ch/jira/projects/FTS/issues/FTS-1755>`_ to try and bring that feature in.
A future solution may come from DIRAC.
In the meantime, the best solution is to ask the site to either cleanup themselves (some storages like EOS have that built in) or to give you a dump of the namespace, and then do the cleaning yourself.


Token support
----------------

.. versionadded:: v8.0.51

.. warning::
Very experimental feature


The current state is the one in which LHCb ran the DC24 challenge. It only worked for dCache sites, as there is still no uniform way for storages to interpret permissions...
A transfer will use tokens if:

* ``UseTokens`` is true in the ``FTS3Agent`` configuration
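* ``WLCGTokenBasePath`` is set for both the source and the destination
* ``TokenSupport`` is set to ``True`` (or ``yes``) for both the source and the destination storage elements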

The tokens are scoped to specific file paths, not to generic wildcard permissions.

.. warning::
Token support is as experimental as can be in any layer of the stack (DIRAC, storage, FTS... even the model is experimental)

.. warning::
The FTS3Agent occasionally got stuck when tokens were used
21 changes: 19 additions & 2 deletions src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
:caption: FTS3Agent options
"""

import datetime
import errno
import os
Expand All @@ -38,6 +39,7 @@
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername
from DIRAC.FrameworkSystem.Client.Logger import gLogger
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager
from DIRAC.DataManagementSystem.private import FTS3Utilities
from DIRAC.DataManagementSystem.DB.FTS3DB import FTS3DB
from DIRAC.DataManagementSystem.Client.FTS3Job import FTS3Job
Expand Down Expand Up @@ -105,8 +107,8 @@ def __readConf(self):
self.maxDelete = self.am_getOption("DeleteLimitPerCycle", 100)
# lifetime of the proxy we download to delegate to FTS
self.proxyLifetime = self.am_getOption("ProxyLifetime", PROXY_LIFETIME)

self.jobMonitoringBatchSize = self.am_getOption("JobMonitoringBatchSize", JOB_MONITORING_BATCH_SIZE)
self.useTokens = self.am_getOption("UseTokens", False)

return S_OK()

Expand Down Expand Up @@ -497,7 +499,22 @@ def _treatOperation(self, operation):
log.error("Could not select TPC list", repr(e))
continue

res = ftsJob.submit(context=context, protocols=tpcProtocols)
# If we use token, get an access token with the
# fts scope in it
# The FTS3Job will decide to use it or not
fts_access_token = None
if self.useTokens:
res = gTokenManager.getToken(
userGroup=ftsJob.userGroup,
requiredTimeLeft=3600,
scope=["fts"],
)
if not res["OK"]:
return res

fts_access_token = res["Value"]["access_token"]

res = ftsJob.submit(context=context, protocols=tpcProtocols, fts_access_token=fts_access_token)

if not res["OK"]:
log.error("Could not submit FTS3Job", f"FTS3Operation {operation.operationID} : {res}")
Expand Down
1 change: 1 addition & 0 deletions src/DIRAC/DataManagementSystem/Client/FTS3File.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class FTS3File(JSerializable):
"Started", # From FTS: File transfer has started
"Not_used", # From FTS: Transfer not being considered yet, waiting for another one (multihop)
"Archiving", # From FTS: file not yet migrated to tape
"Token_prep", # From FTS: When using token, used before Submitted until FTS fetched a refresh token
]

# These are the states that we consider final.
Expand Down
129 changes: 107 additions & 22 deletions src/DIRAC/DataManagementSystem/Client/FTS3Job.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from DIRAC.Resources.Storage.StorageElement import StorageElement

from DIRAC.FrameworkSystem.Client.Logger import gLogger
from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager

from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR
from DIRAC.Core.Utilities.DErrno import cmpError
Expand Down Expand Up @@ -301,7 +302,18 @@ def __isTapeSE(seName, vo):

return isTape

def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=None):
@staticmethod
def __seTokenSupport(seObj):
    """Check whether a given SE is flagged as supporting tokens

    The ``TokenSupport`` option of the SE is compared, case-insensitively,
    against ``true`` and ``yes``.

    :param seObj: StorageElement object (only its ``options`` mapping is read)

    :returns: True if ``TokenSupport`` is ``true``/``yes``, False otherwise
        (including when the option is not defined)
    """
    # Coerce to str: a non-string value (e.g. a boolean True) would
    # otherwise raise AttributeError on .lower() instead of being evaluated
    tokenSupport = str(seObj.options.get("TokenSupport", ""))
    return tokenSupport.lower() in ("true", "yes")

def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=None, tokensEnabled=False):
"""Build a job for transfer
Some attributes of the job are expected to be set
Expand Down Expand Up @@ -329,6 +341,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
log = gLogger.getSubLogger(f"constructTransferJob/{self.operationID}/{self.sourceSE}_{self.targetSE}")

isMultiHop = False
useTokens = False

# Check if it is a multiHop transfer
if self.multiHopSE:
Expand Down Expand Up @@ -429,6 +442,9 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
log.debug(f"Not preparing transfer for file {ftsFile.lfn}")
continue

srcToken = None
dstToken = None

sourceSURL, targetSURL = allSrcDstSURLs[ftsFile.lfn]
stageURL = allStageURLs.get(ftsFile.lfn)

Expand Down Expand Up @@ -485,6 +501,44 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
if self.activity:
trans_metadata["activity"] = self.activity

# Add tokens if both storages support it and if tokens were requested
if tokensEnabled and self.__seTokenSupport(srcSE) and self.__seTokenSupport(dstSE):
# We get a read token for the source
# offline_access is to allow FTS to refresh it
res = srcSE.getWLCGTokenPath(ftsFile.lfn)
if not res["OK"]:
return res
srcTokenPath = res["Value"]
res = gTokenManager.getToken(
userGroup=self.userGroup,
requiredTimeLeft=3600,
scope=[f"storage.read:/{srcTokenPath}", "offline_access"],
useCache=False,
)
if not res["OK"]:
return res
srcToken = res["Value"]["access_token"]

# We get a token with modify and read for the destination
# We need the read to be able to stat
# CAUTION: only works with dcache for now, other storages
# interpret permissions differently
# offline_access is to allow FTS to refresh it
res = dstSE.getWLCGTokenPath(ftsFile.lfn)
if not res["OK"]:
return res
dstTokenPath = res["Value"]
res = gTokenManager.getToken(
userGroup=self.userGroup,
requiredTimeLeft=3600,
scope=[f"storage.modify:/{dstTokenPath}", f"storage.read:/{dstTokenPath}", "offline_access"],
useCache=False,
)
if not res["OK"]:
return res
dstToken = res["Value"]["access_token"]
useTokens = True

# because of an xroot bug (https://github.com/xrootd/xrootd/issues/1433)
# the checksum needs to be lowercase. It does not impact the other
# protocol, so it's fine to put it here.
Expand All @@ -497,6 +551,8 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
filesize=ftsFile.size,
metadata=trans_metadata,
activity=self.activity,
source_token=srcToken,
destination_token=dstToken,
)

transfers.append(trans)
Expand All @@ -514,6 +570,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
"rmsReqID": self.rmsReqID,
"sourceSE": self.sourceSE,
"targetSE": self.targetSE,
"useTokens": useTokens, # Store the information here to propagate it to submission
}

if self.activity:
Expand Down Expand Up @@ -676,7 +733,7 @@ def _constructStagingJob(self, pinTime, allLFNs, target_spacetoken):

return S_OK((job, fileIDsInTheJob))

def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protocols=None):
def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protocols=None, fts_access_token=None):
"""submit the job to the FTS server
Some attributes are expected to be defined for the submission to work:
Expand All @@ -700,17 +757,13 @@ def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protoc
:param ucert: path to the user certificate/proxy. Might be inferred by the fts cli (see its doc)
:param protocols: list of protocols from which we should choose the protocol to use
:param fts_access_token: token to be used to talk to FTS and to be passed when creating a context
:returns: S_OK([FTSFiles ids of files submitted])
"""

log = gLogger.getLocalSubLogger(f"submit/{self.operationID}/{self.sourceSE}_{self.targetSE}")

if not context:
if not ftsServer:
ftsServer = self.ftsServer
context = fts3.Context(endpoint=ftsServer, ucert=ucert, request_class=ftsSSLRequest, verify=False)

# Construct the target SURL
res = self.__fetchSpaceToken(self.targetSE, self.vo)
if not res["OK"]:
Expand All @@ -720,7 +773,10 @@ def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protoc
allLFNs = [ftsFile.lfn for ftsFile in self.filesToSubmit]

if self.type == "Transfer":
res = self._constructTransferJob(pinTime, allLFNs, target_spacetoken, protocols=protocols)
res = self._constructTransferJob(
pinTime, allLFNs, target_spacetoken, protocols=protocols, tokensEnabled=bool(fts_access_token)
)

elif self.type == "Staging":
res = self._constructStagingJob(pinTime, allLFNs, target_spacetoken)
# elif self.type == 'Removal':
Expand All @@ -731,6 +787,21 @@ def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protoc

job, fileIDsInTheJob = res["Value"]

# If we need a token, don't use the context given in parameter
# because the one given in parameter is only with X509 creds
if job["params"].get("job_metadata", {}).get("useTokens"):
if not fts_access_token:
return S_ERROR("Job needs token support but no FTS token was supplied")
context = None

if not context:
if not ftsServer:
ftsServer = self.ftsServer
res = self.generateContext(ftsServer, ucert, fts_access_token)
if not res["OK"]:
return res
context = res["Value"]

try:
self.ftsGUID = fts3.submit(context, job)
log.info(f"Got GUID {self.ftsGUID}")
Expand Down Expand Up @@ -766,31 +837,45 @@ def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protoc
return S_OK(fileIDsInTheJob)

@staticmethod
def generateContext(ftsServer, ucert, lifetime=25200):
def generateContext(ftsServer, ucert, fts_access_token=None, lifetime=25200):
"""This method generates an fts3 context
Only a certificate or an fts token can be given
:param ftsServer: address of the fts3 server
:param ucert: the path to the certificate to be used
:param fts_access_token: token to access FTS
:param lifetime: duration (in sec) of the delegation to the FTS3 server
(default is 7h, like FTS3 default)
:returns: an fts3 context
"""
if fts_access_token and ucert:
return S_ERROR("fts_access_token and ucert cannot be both set")

try:
context = fts3.Context(endpoint=ftsServer, ucert=ucert, request_class=ftsSSLRequest, verify=False)
context = fts3.Context(
endpoint=ftsServer,
ucert=ucert,
request_class=ftsSSLRequest,
verify=False,
fts_access_token=fts_access_token,
)

# Explicitely delegate to be sure we have the lifetime we want
# Note: the delegation will re-happen only when the FTS server
# decides that there is not enough timeleft.
# At the moment, this is 1 hour, which effectively means that if you do
# not submit a job for more than 1h, you have no valid proxy in FTS servers
# anymore, and all the jobs failed. So we force it when
# one third of the lifetime will be left.
# Also, the proxy given as parameter might have less than "lifetime" left
# since it is cached, but it does not matter, because in the FTS3Agent
# we make sure that we renew it often enough
td_lifetime = datetime.timedelta(seconds=lifetime)
fts3.delegate(context, lifetime=td_lifetime, delegate_when_lifetime_lt=td_lifetime // 3)
# The delegation only makes sense for X509 auth
if ucert:
# Explicitly delegate to be sure we have the lifetime we want
# Note: the delegation will re-happen only when the FTS server
# decides that there is not enough timeleft.
# At the moment, this is 1 hour, which effectively means that if you do
# not submit a job for more than 1h, you have no valid proxy in FTS servers
# anymore, and all the jobs failed. So we force it when
# one third of the lifetime will be left.
# Also, the proxy given as parameter might have less than "lifetime" left
# since it is cached, but it does not matter, because in the FTS3Agent
# we make sure that we renew it often enough
td_lifetime = datetime.timedelta(seconds=lifetime)
fts3.delegate(context, lifetime=td_lifetime, delegate_when_lifetime_lt=td_lifetime // 3)

return S_OK(context)
except FTS3ClientException as e:
Expand Down
3 changes: 3 additions & 0 deletions src/DIRAC/DataManagementSystem/ConfigTemplate.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ Agents
KickLimitPerCycle = 100
# Lifetime in sec of the Proxy we download to delegate to FTS3 (default 36h)
ProxyLifetime = 129600
# Whether we use tokens to submit jobs to FTS3
# VERY EXPERIMENTAL
UseTokens = False
}
##END FTS3Agent
}
Loading

0 comments on commit 4057543

Please sign in to comment.