diff --git a/hera_librarian/async_transfers/globus.py b/hera_librarian/async_transfers/globus.py index 3896ed3..c7a3740 100644 --- a/hera_librarian/async_transfers/globus.py +++ b/hera_librarian/async_transfers/globus.py @@ -4,10 +4,8 @@ import os from pathlib import Path -from typing import Union import globus_sdk -from pydantic import ConfigDict from hera_librarian.transfer import TransferStatus @@ -21,10 +19,6 @@ class GlobusAsyncTransferManager(CoreAsyncTransferManager): for authentication. """ - # We need the following to save the `authorizer` attribute without having - # to build our own pydantic model for Globus-provided classes. - model_config = ConfigDict(arbitrary_types_allowed=True) - destination_endpoint: str # The Globus endpoint UUID for the destination, entered in the configuration. @@ -36,13 +30,6 @@ class GlobusAsyncTransferManager(CoreAsyncTransferManager): transfer_complete: bool = False task_id: str = "" - authorizer: Union[ - globus_sdk.RefreshTokenAuthorizer, - globus_sdk.AccessTokenAuthorizer, - None, - ] = None - # Default to `None`, but allow us to save Authorizer objects on the object - def authorize(self, settings: "ServerSettings"): """ Attempt to authorize using the Globus service. @@ -67,34 +54,52 @@ def authorize(self, settings: "ServerSettings"): to specific endpoints. We will do our best to handle this as it comes up to provide the user with nicer error messages, though we may not have caught all possible failure modes. + + Parameters + ---------- + settings : ServerSettings object + The settings for the Librarian server. These settings should include + the Globus login information. + + Returns + ------- + Globus authorizer or None + The object returned will be an instance of + globus_sdk.RefreshTokenAuthorizer (if using the Native App), + globus_sdk.AccessTokenAuthorizer (if using the Confidential App), + or None (if we could not successfully authenticate). """ if settings.globus_enable is False: - return False - - if self.authorizer is None: - if settings.globus_client_native_app: - try: - client = globus_sdk.NativeAppAuthClient(settings.globus_client_id) - self.authorizer = globus_sdk.RefreshTokenAuthorizer( - settings.globus_client_secret, client - ) - except globus_sdk.AuthAPIError as e: - return False - else: - try: - client = globus_sdk.ConfidentialAppAuthClient( - settings.globus_client_id, settings.globus_client_secret - ) - tokens = client.oauth2_client_credentials_tokens() - transfer_tokens_info = tokens.by_resource_server[ - "transfer.api.globus.org" - ] - transfer_token = transfer_tokens_info["access_token"] - self.authorizer = globus_sdk.AccessTokenAuthorizer(transfer_token) - except globus_sdk.AuthAPIError: - return False - - return True + return None + + if settings.globus_client_native_app: + try: + client = globus_sdk.NativeAppAuthClient( + client_id=settings.globus_client_id + ) + authorizer = globus_sdk.RefreshTokenAuthorizer( + refresh_token=settings.globus_client_secret, auth_client=client + ) + except globus_sdk.AuthAPIError as e: + return None + else: + try: + client = globus_sdk.ConfidentialAppAuthClient( + client_id=settings.globus_client_id, + client_secret=settings.globus_client_secret, + ) + tokens = client.oauth2_client_credentials_tokens() + transfer_tokens_info = tokens.by_resource_server[ + "transfer.api.globus.org" + ] + transfer_token = transfer_tokens_info["access_token"] + authorizer = globus_sdk.AccessTokenAuthorizer( + access_token=transfer_token + ) + except globus_sdk.AuthAPIError as e: + return None + + return authorizer def valid(self, settings: "ServerSettings") -> bool: """ @@ -104,13 +109,25 @@ def valid(self, settings: "ServerSettings") -> bool: does not verify that we can copy files between specific endpoints. However, this is an important starting point and can fail for reasons of network connectivity, Globus as a service being down, etc. + + Parameters + ---------- + settings : ServerSettings object + The settings for the Librarian server. These settings should include + the Globus login information. + + Returns + ------- + bool + Whether we can authenticate with Globus (True) or not (False). """ - return self.authorize(settings=settings) + authorizer = self.authorize(settings=settings) + return authorizer is not None def _get_transfer_data(self, label: str, settings: "ServerSettings"): """ - This is a helper function to create a TransferData object, which is needed - both for single-book transfers and batch transfers. + This is a helper function to create a TransferData object, which is + needed both for single-book transfers and batch transfers. """ # create a TransferData object that contains options for the transfer transfer_data = globus_sdk.TransferData( @@ -147,18 +164,27 @@ def transfer( remote_path : Path The remote path for the transfer relative to the root Globus directory, which is generally not the same as /. + settings : ServerSettings object + The settings for the Librarian server. These settings should include + the Globus login information. + + Returns + ------- + bool + Whether we could successfully initiate a transfer (True) or not (False). """ self.transfer_attempted = True # start by authorizing - if not self.authorize(settings=settings): + authorizer = self.authorize(settings=settings) + if authorizer is None: return False # create a label from the name of the book label = os.path.basename(local_path) # create a transfer client to handle the transfer - transfer_client = globus_sdk.TransferClient(authorizer=self.authorizer) + transfer_client = globus_sdk.TransferClient(authorizer=authorizer) # get a TransferData object transfer_data = self._get_transfer_data(label=label, settings=settings) @@ -184,6 +210,30 @@ def batch_transfer( paths: list[tuple[Path]], settings: "ServerSettings", ) -> bool: + """ + Attempt to transfer a series of books using Globus. + + This method will attempt to create a Globus transfer. If successful, we + will have set the task ID of the transfer on the object, which can be + used to query Globus as to its status. If unsuccessful, we will have + gotten nothing but sadness. + + Parameters + ---------- + paths : list of tuples of Paths + A series of length-2 tuples containing pairs of local and remote + Paths to include as part of the transfer. + settings : ServerSettings object + The settings for the Librarian server. These settings should include + the Globus login information. + + Returns + ------- + bool + Whether we could successfully initiate a transfer (True) or not + (False). + + """ self.transfer_attempted = True # We have to do a lot of the same legwork as above for a single @@ -192,14 +242,15 @@ def batch_transfer( # books using Globus. # start by authorizing - if not self.authorize(settings=settings): + authorizer = self.authorize(settings=settings) + if authorizer is None: return False # make a label from the first book label = "batch with " + os.path.basename(paths[0][0]) # create a transfer client to handle the transfer - transfer_client = globus_sdk.TransferClient(authorizer=self.authorizer) + transfer_client = globus_sdk.TransferClient(authorizer=authorizer) # get a TransferData object transfer_data = self._get_transfer_data(label=label, settings=settings) @@ -225,8 +276,24 @@ def batch_transfer( def transfer_status(self, settings: "ServerSettings") -> TransferStatus: """ Query Globus to see if our transfer has finished yet. + + Parameters + ---------- + settings : ServerSettings object + The settings for the Librarian server. These settings should include + the Globus login information. + + Returns + ------- + TransferStatus + The status of the relevant transfer. Should be one of: INITIATED (if + the transfer has not yet been started, or is in-flight), SUCCEEDED + (if the transfer was successful), or FAILED (if the transfer was + unsuccessful, we could not contact Globus, or if the transfer was + attempted but could not be completed). """ - if not self.authorize(settings=settings): + authorizer = self.authorize(settings=settings) + if authorizer is None: # We *should* be able to just assume that we have already # authenticated and should be able to query the status of our # transfer. However, if for whatever reason we're not able to talk @@ -242,7 +309,7 @@ def transfer_status(self, settings: "ServerSettings") -> TransferStatus: return TransferStatus.FAILED else: # start talking to Globus - transfer_client = globus_sdk.TransferClient(authorizer=self.authorizer) + transfer_client = globus_sdk.TransferClient(authorizer=authorizer) task_doc = transfer_client.get_task(self.task_id) if task_doc["status"] == "SUCCEEDED": diff --git a/pyproject.toml b/pyproject.toml index da1111b..5f267a6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,7 @@ dependencies = [ "xxhash >= 0.8.0", "cryptography", "fastapi >= 0.108.0", - "globus-sdk <= 3.40.0", + "globus-sdk", "httpx", "pydantic >= 2", "pydantic-settings >= 2",