Skip to content

Commit

Permalink
feat: Enable downloading sandbox from S3
Browse files Browse the repository at this point in the history
  • Loading branch information
chaen committed Sep 29, 2023
1 parent 434027d commit 572cb90
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 1 deletion.
76 changes: 76 additions & 0 deletions src/DIRAC/FrameworkSystem/Utilities/diracx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#### FATORIZE WITH PROXY MANAGER

import requests

from cachetools import TTLCache, cached
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any
from DIRAC import gConfig, S_ERROR, S_OK
from DIRAC.ConfigurationSystem.Client.Helpers import Registry


from diracx.core.preferences import DiracxPreferences

from diracx.core.utils import write_credentials

from diracx.core.models import TokenResponse
from diracx.client import DiracClient

# How long tokens are kept
DEFAULT_TOKEN_CACHE_TTL = 5 * 60

# Add a cache not to query the token all the time
_token_cache = TTLCache(maxsize=100, ttl=DEFAULT_TOKEN_CACHE_TTL)


@cached(_token_cache, key=lambda x, y: repr(x))
def _get_token(credDict, diracxUrl, /) -> Path:
"""
Write token to a temporary file and return the path to that file
"""

apiKey = gConfig.getValue("/DiracX/LegacyExchangeApiKey")
if not apiKey:
raise ValueError("Missing mandatory /DiracX/LegacyExchangeApiKey configuration")

vo = Registry.getVOForGroup(credDict["group"])
dirac_properties = list(set(credDict.get("groupProperties", [])) | set(credDict.get("properties", [])))
group = credDict["group"]
scopes = [f"vo:{vo}", f"group:{group}"] + [f"property:{prop}" for prop in dirac_properties]

r = requests.get(
f"{diracxUrl}/auth/legacy-exchange",
params={
"preferred_username": credDict["username"],
"scope": " ".join(scopes),
},
headers={"Authorization": f"Bearer {apiKey}"},
timeout=10,
)

r.raise_for_status()

token_location = Path(NamedTemporaryFile().name)

write_credentials(TokenResponse(**r.json()), location=token_location)

return token_location


def TheImpersonator(credDict: dict[str, Any]) -> DiracClient:
"""
Client to be used by DIRAC server needing to impersonate
a user for diracx.
It queries a token, places it in a file, and returns the `DiracClient`
class
Use as decorator
"""
diracxUrl = gConfig.getValue("/DiracX/URL")

token_location = _get_token(credDict, diracxUrl)
pref = DiracxPreferences(url=diracxUrl, credentials_path=token_location)

return DiracClient(diracx_preferences=pref)
2 changes: 1 addition & 1 deletion src/DIRAC/Resources/Storage/StorageBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ def constructURLFromLFN(self, lfn, withWSUrl=False):
voLFN = lfnSplitList[1]
# TODO comparison to Sandbox below is for backward compatibility, should
# be removed in the next release
if voLFN != self.se.vo and voLFN != "SandBox" and voLFN != "Sandbox":
if voLFN != self.se.vo and voLFN != "SandBox" and voLFN != "Sandbox" and voLFN != "S3":
return S_ERROR(f"LFN ({lfn}) path must start with VO name ({self.se.vo})")

urlDict = dict(self.protocolParameters)
Expand Down
21 changes: 21 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/Service/SandboxStoreHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"""
import hashlib
import os
import requests
import tempfile
import threading
import time
Expand All @@ -26,6 +27,7 @@
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.Resources.Storage.StorageElement import StorageElement
from DIRAC.Core.Utilities.File import getGlobbedTotalSize
from DIRAC.FrameworkSystem.Utilities.diracx import TheImpersonator


class SandboxStoreHandlerMixin:
Expand Down Expand Up @@ -431,6 +433,25 @@ def _sendToClient(self, fileID, token, fileHelper=None, raw=False):
credDict = self.getRemoteCredentials()
serviceURL = self.serviceInfoDict["URL"]
filePath = fileID.replace(serviceURL, "")

# If the PFN starts with S3, we know it has been uploaded to the
# S3 sandbox store, so download it from there before sending it
if filePath.startswith("/S3"):
with TheImpersonator(credDict) as client:
res = client.jobs.get_sandbox_file(filePath)
r = requests.get(res.url)
r.raise_for_status()
sbData = r.content
if fileHelper:
from io import BytesIO

result = fileHelper.FDToNetwork(BytesIO(sbData))
fileHelper.oFile.close()
return result
if raw:
return sbData
return S_OK(sbData)

result = self.sandboxDB.getSandboxId(self.__localSEName, filePath, credDict["username"], credDict["group"])
if not result["OK"]:
return result
Expand Down

0 comments on commit 572cb90

Please sign in to comment.