Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove globus authorizer from async transfer object #85

Merged
merged 4 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 115 additions & 48 deletions hera_librarian/async_transfers/globus.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@

import os
from pathlib import Path
from typing import Union

import globus_sdk
from pydantic import ConfigDict

from hera_librarian.transfer import TransferStatus

Expand All @@ -21,10 +19,6 @@ class GlobusAsyncTransferManager(CoreAsyncTransferManager):
for authentication.
"""

# We need the following to save the `authorizer` attribute without having
# to build our own pydantic model for Globus-provided classes.
model_config = ConfigDict(arbitrary_types_allowed=True)

destination_endpoint: str
# The Globus endpoint UUID for the destination, entered in the configuration.

Expand All @@ -36,13 +30,6 @@ class GlobusAsyncTransferManager(CoreAsyncTransferManager):
transfer_complete: bool = False
task_id: str = ""

authorizer: Union[
globus_sdk.RefreshTokenAuthorizer,
globus_sdk.AccessTokenAuthorizer,
None,
] = None
# Default to `None`, but allow us to save Authorizer objects on the object

def authorize(self, settings: "ServerSettings"):
"""
Attempt to authorize using the Globus service.
Expand All @@ -67,34 +54,52 @@ def authorize(self, settings: "ServerSettings"):
to specific endpoints. We will do our best to handle this as it comes up
to provide the user with nicer error messages, though we may not have
caught all possible failure modes.

Parameters
----------
settings : ServerSettings object
The settings for the Librarian server. These settings should include
the Globus login information.

Returns
-------
Globus authorizer or None
The object returned will be an instance of
globus_sdk.RefreshTokenAuthorizer (if using the Native App),
globus_sdk.AccessTokenAuthorizer (if using the Confidential App),
or None (if we could not successfully authenticate).
"""
if settings.globus_enable is False:
return False

if self.authorizer is None:
if settings.globus_client_native_app:
try:
client = globus_sdk.NativeAppAuthClient(settings.globus_client_id)
self.authorizer = globus_sdk.RefreshTokenAuthorizer(
settings.globus_client_secret, client
)
except globus_sdk.AuthAPIError as e:
return False
else:
try:
client = globus_sdk.ConfidentialAppAuthClient(
settings.globus_client_id, settings.globus_client_secret
)
tokens = client.oauth2_client_credentials_tokens()
transfer_tokens_info = tokens.by_resource_server[
"transfer.api.globus.org"
]
transfer_token = transfer_tokens_info["access_token"]
self.authorizer = globus_sdk.AccessTokenAuthorizer(transfer_token)
except globus_sdk.AuthAPIError:
return False

return True
return None

if settings.globus_client_native_app:
try:
client = globus_sdk.NativeAppAuthClient(
client_id=settings.globus_client_id
)
authorizer = globus_sdk.RefreshTokenAuthorizer(
refresh_token=settings.globus_client_secret, auth_client=client
)
except globus_sdk.AuthAPIError as e:
return None
else:
try:
client = globus_sdk.ConfidentialAppAuthClient(
client_id=settings.globus_client_id,
client_secret=settings.globus_client_secret,
)
tokens = client.oauth2_client_credentials_tokens()
transfer_tokens_info = tokens.by_resource_server[
"transfer.api.globus.org"
]
transfer_token = transfer_tokens_info["access_token"]
authorizer = globus_sdk.AccessTokenAuthorizer(
access_token=transfer_token
)
except globus_sdk.AuthAPIError as e:
return None

return authorizer

def valid(self, settings: "ServerSettings") -> bool:
"""
Expand All @@ -104,13 +109,25 @@ def valid(self, settings: "ServerSettings") -> bool:
does not verify that we can copy files between specific endpoints.
However, this is an important starting point and can fail for reasons of
network connectivity, Globus as a service being down, etc.

Parameters
----------
settings : ServerSettings object
The settings for the Librarian server. These settings should include
the Globus login information.

Returns
-------
bool
Whether we can authenticate with Globus (True) or not (False).
"""
return self.authorize(settings=settings)
authorizer = self.authorize(settings=settings)
return authorizer is not None

def _get_transfer_data(self, label: str, settings: "ServerSettings"):
"""
This is a helper function to create a TransferData object, which is needed
both for single-book transfers and batch transfers.
This is a helper function to create a TransferData object, which is
needed both for single-book transfers and batch transfers.
"""
# create a TransferData object that contains options for the transfer
transfer_data = globus_sdk.TransferData(
Expand Down Expand Up @@ -147,18 +164,27 @@ def transfer(
remote_path : Path
The remote path for the transfer relative to the root Globus
directory, which is generally not the same as /.
settings : ServerSettings object
The settings for the Librarian server. These settings should include
the Globus login information.

Returns
-------
bool
Whether we could successfully initiate a transfer (True) or not (False).
"""
self.transfer_attempted = True

# start by authorizing
if not self.authorize(settings=settings):
authorizer = self.authorize(settings=settings)
if authorizer is None:
return False

# create a label from the name of the book
label = os.path.basename(local_path)

# create a transfer client to handle the transfer
transfer_client = globus_sdk.TransferClient(authorizer=self.authorizer)
transfer_client = globus_sdk.TransferClient(authorizer=authorizer)

# get a TransferData object
transfer_data = self._get_transfer_data(label=label, settings=settings)
Expand All @@ -184,6 +210,30 @@ def batch_transfer(
paths: list[tuple[Path]],
settings: "ServerSettings",
) -> bool:
"""
Attempt to transfer a series of books using Globus.

This method will attempt to create a Globus transfer. If successful, we
will have set the task ID of the transfer on the object, which can be
used to query Globus as to its status. If unsuccessful, we will have
gotten nothing but sadness.

Parameters
----------
paths : list of tuples of Paths
A series of length-2 tuples containing pairs of local and remote
Paths to include as part of the transfer.
settings : ServerSettings object
The settings for the Librarian server. These settings should include
the Globus login information.

Returns
-------
bool
Whether we could successfully initiate a transfer (True) or not
(False).

"""
self.transfer_attempted = True

# We have to do a lot of the same legwork as above for a single
Expand All @@ -192,14 +242,15 @@ def batch_transfer(
# books using Globus.

# start by authorizing
if not self.authorize(settings=settings):
authorizer = self.authorize(settings=settings)
if authorizer is None:
return False

# make a label from the first book
label = "batch with " + os.path.basename(paths[0][0])

# create a transfer client to handle the transfer
transfer_client = globus_sdk.TransferClient(authorizer=self.authorizer)
transfer_client = globus_sdk.TransferClient(authorizer=authorizer)

# get a TransferData object
transfer_data = self._get_transfer_data(label=label, settings=settings)
Expand All @@ -225,8 +276,24 @@ def batch_transfer(
def transfer_status(self, settings: "ServerSettings") -> TransferStatus:
"""
Query Globus to see if our transfer has finished yet.

Parameters
----------
settings : ServerSettings object
The settings for the Librarian server. These settings should include
the Globus login information.

Returns
-------
TransferStatus
The status of the relevant transfer. Should be one of: INITIATED (if
the transfer has not yet been started, or is in-flight), SUCCEEDED
(if the transfer was successful), or FAILED (if the transfer was
unsuccessful, we could not contact Globus, or if the transfer was
attempted but could not be completed).
"""
if not self.authorize(settings=settings):
authorizer = self.authorize(settings=settings)
if authorizer is None:
# We *should* be able to just assume that we have already
# authenticated and should be able to query the status of our
# transfer. However, if for whatever reason we're not able to talk
Expand All @@ -242,7 +309,7 @@ def transfer_status(self, settings: "ServerSettings") -> TransferStatus:
return TransferStatus.FAILED
else:
# start talking to Globus
transfer_client = globus_sdk.TransferClient(authorizer=self.authorizer)
transfer_client = globus_sdk.TransferClient(authorizer=authorizer)
task_doc = transfer_client.get_task(self.task_id)

if task_doc["status"] == "SUCCEEDED":
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ dependencies = [
"xxhash >= 0.8.0",
"cryptography",
"fastapi >= 0.108.0",
"globus-sdk <= 3.40.0",
"globus-sdk",
"httpx",
"pydantic >= 2",
"pydantic-settings >= 2",
Expand Down
Loading