Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove globus authorizer from async transfer object #85

Merged
merged 4 commits into from
Nov 11, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 50 additions & 46 deletions hera_librarian/async_transfers/globus.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@

import os
from pathlib import Path
from typing import Union

import globus_sdk
from pydantic import ConfigDict

from hera_librarian.transfer import TransferStatus

Expand All @@ -21,10 +19,6 @@ class GlobusAsyncTransferManager(CoreAsyncTransferManager):
for authentication.
"""

# We need the following to save the `authorizer` attribute without having
# to build our own pydantic model for Globus-provided classes.
model_config = ConfigDict(arbitrary_types_allowed=True)

destination_endpoint: str
# The Globus endpoint UUID for the destination, entered in the configuration.

Expand All @@ -36,13 +30,6 @@ class GlobusAsyncTransferManager(CoreAsyncTransferManager):
transfer_complete: bool = False
task_id: str = ""

authorizer: Union[
globus_sdk.RefreshTokenAuthorizer,
globus_sdk.AccessTokenAuthorizer,
None,
] = None
# Default to `None`, but allow us to save Authorizer objects on the object

def authorize(self, settings: "ServerSettings"):
"""
Attempt to authorize using the Globus service.
Expand All @@ -67,34 +54,47 @@ def authorize(self, settings: "ServerSettings"):
to specific endpoints. We will do our best to handle this as it comes up
to provide the user with nicer error messages, though we may not have
caught all possible failure modes.

Parameters
----------
settings : ServerSettings object
The settings for the Librarian server. These settings should include
the Globus login information.

Returns
-------
Globus authorizer or None
The object returned will be an instance of
globus_sdk.RefreshTokenAuthorizer (if using the Native App),
globus_sdk.AccessTokenAuthorizer (if using the Confidential App),
or None (if we could not successfully authenticate).
"""
if settings.globus_enable is False:
return False

if self.authorizer is None:
if settings.globus_client_native_app:
try:
client = globus_sdk.NativeAppAuthClient(settings.globus_client_id)
self.authorizer = globus_sdk.RefreshTokenAuthorizer(
settings.globus_client_secret, client
)
except globus_sdk.AuthAPIError as e:
return False
else:
try:
client = globus_sdk.ConfidentialAppAuthClient(
settings.globus_client_id, settings.globus_client_secret
)
tokens = client.oauth2_client_credentials_tokens()
transfer_tokens_info = tokens.by_resource_server[
"transfer.api.globus.org"
]
transfer_token = transfer_tokens_info["access_token"]
self.authorizer = globus_sdk.AccessTokenAuthorizer(transfer_token)
except globus_sdk.AuthAPIError:
return False

return True
return None

if settings.globus_client_native_app:
try:
client = globus_sdk.NativeAppAuthClient(settings.globus_client_id)
authorizer = globus_sdk.RefreshTokenAuthorizer(
settings.globus_client_secret, client
JBorrow marked this conversation as resolved.
Show resolved Hide resolved
)
except globus_sdk.AuthAPIError as e:
return None
else:
try:
client = globus_sdk.ConfidentialAppAuthClient(
settings.globus_client_id, settings.globus_client_secret
)
tokens = client.oauth2_client_credentials_tokens()
transfer_tokens_info = tokens.by_resource_server[
"transfer.api.globus.org"
]
transfer_token = transfer_tokens_info["access_token"]
authorizer = globus_sdk.AccessTokenAuthorizer(transfer_token)
except globus_sdk.AuthAPIError as e:
return None

return authorizer

def valid(self, settings: "ServerSettings") -> bool:
"""
Expand All @@ -105,7 +105,8 @@ def valid(self, settings: "ServerSettings") -> bool:
However, this is an important starting point and can fail for reasons of
network connectivity, Globus as a service being down, etc.
"""
return self.authorize(settings=settings)
authorizer = self.authorize(settings=settings)
return authorizer is not None

def _get_transfer_data(self, label: str, settings: "ServerSettings"):
"""
Expand Down Expand Up @@ -151,14 +152,15 @@ def transfer(
self.transfer_attempted = True

# start by authorizing
if not self.authorize(settings=settings):
authorizer = self.authorize(settings=settings)
if authorizer is None:
return False

# create a label from the name of the book
label = os.path.basename(local_path)

# create a transfer client to handle the transfer
transfer_client = globus_sdk.TransferClient(authorizer=self.authorizer)
transfer_client = globus_sdk.TransferClient(authorizer=authorizer)

# get a TransferData object
transfer_data = self._get_transfer_data(label=label, settings=settings)
Expand Down Expand Up @@ -192,14 +194,15 @@ def batch_transfer(
# books using Globus.

# start by authorizing
if not self.authorize(settings=settings):
authorizer = self.authorize(settings=settings)
if authorizer is None:
return False

# make a label from the first book
label = "batch with " + os.path.basename(paths[0][0])

# create a transfer client to handle the transfer
transfer_client = globus_sdk.TransferClient(authorizer=self.authorizer)
transfer_client = globus_sdk.TransferClient(authorizer=authorizer)

# get a TransferData object
transfer_data = self._get_transfer_data(label=label, settings=settings)
Expand All @@ -226,7 +229,8 @@ def transfer_status(self, settings: "ServerSettings") -> TransferStatus:
"""
Query Globus to see if our transfer has finished yet.
"""
if not self.authorize(settings=settings):
authorizer = self.authorize(settings=settings)
if authorizer is None:
# We *should* be able to just assume that we have already
# authenticated and should be able to query the status of our
# transfer. However, if for whatever reason we're not able to talk
Expand All @@ -242,7 +246,7 @@ def transfer_status(self, settings: "ServerSettings") -> TransferStatus:
return TransferStatus.FAILED
else:
# start talking to Globus
transfer_client = globus_sdk.TransferClient(authorizer=self.authorizer)
transfer_client = globus_sdk.TransferClient(authorizer=authorizer)
task_doc = transfer_client.get_task(self.task_id)

if task_doc["status"] == "SUCCEEDED":
Expand Down
Loading