Skip to content

Commit

Permalink
Merge pull request #220 from bento-platform/feat/drs/async-session-kw…
Browse files Browse the repository at this point in the history
…args

feat(drs): optional session_kwargs for aiohttp.ClientSession
  • Loading branch information
davidlougheed authored Aug 15, 2024
2 parents f65e398 + 19d8a42 commit e8f2e97
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions bento_lib/drs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ def decode_drs_uri(drs_uri: str, internal_drs_base_url: Optional[str] = None) ->
parsed_drs_uri = urlparse(drs_uri)

if parsed_drs_uri.scheme != "drs":
print(f"[Bento Lib] Invalid scheme: '{parsed_drs_uri.scheme}'",
file=sys.stderr, flush=True)
print(f"[Bento Lib] Invalid scheme: '{parsed_drs_uri.scheme}'", file=sys.stderr, flush=True)
raise DrsInvalidScheme(f"Encountered invalid DRS scheme: {parsed_drs_uri.scheme}")

drs_base_path = internal_drs_base_url.rstrip("/") if internal_drs_base_url else f"https://{parsed_drs_uri.netloc}"
Expand Down Expand Up @@ -82,12 +81,17 @@ def fetch_drs_record_by_uri(drs_uri: str, internal_drs_base_url: Optional[str] =
return drs_res.json()


async def fetch_drs_record_by_uri_async(drs_uri: str, internal_drs_base_url: Optional[str] = None) -> Optional[dict]:
async def fetch_drs_record_by_uri_async(
drs_uri: str,
internal_drs_base_url: Optional[str] = None,
session_kwargs: dict | None = None,
) -> Optional[dict]:
"""
Given a URI in the format drs://<hostname>/<object-id>, decodes it into an
HTTP URL and asynchronously fetches the object metadata.
:param drs_uri: The URI of the object to fetch.
:param internal_drs_base_url: An optional override hard-coded DRS base URL to use, for container networking etc.
:param session_kwargs: Optional dictionary of parameters to pass to the aiohttp.ClientSession constructor.
:return: The fetched DRS object metadata.
"""

Expand All @@ -97,7 +101,7 @@ async def fetch_drs_record_by_uri_async(drs_uri: str, internal_drs_base_url: Opt
print(f"[Bento Lib] Attempting to fetch {decoded_object_uri}", flush=True)

params = {"internal_path": "true"} if internal_drs_base_url else {}
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**(session_kwargs or {})) as session:
async with session.get(decoded_object_uri, params=params) as drs_res:
if drs_res.status != 200:
print(f"[Bento Lib] Could not fetch: '{decoded_object_uri}'", file=sys.stderr, flush=True)
Expand Down

0 comments on commit e8f2e97

Please sign in to comment.