From 7b241f11b6d9df9936d5f22984c69c00c4292d3d Mon Sep 17 00:00:00 2001 From: Paul La Plante Date: Wed, 31 Jul 2024 14:47:41 -0700 Subject: [PATCH 1/4] Remove globus authorizer from async transfer object --- hera_librarian/async_transfers/globus.py | 96 ++++++++++++------------ 1 file changed, 50 insertions(+), 46 deletions(-) diff --git a/hera_librarian/async_transfers/globus.py b/hera_librarian/async_transfers/globus.py index 3896ed3..398afa2 100644 --- a/hera_librarian/async_transfers/globus.py +++ b/hera_librarian/async_transfers/globus.py @@ -4,10 +4,8 @@ import os from pathlib import Path -from typing import Union import globus_sdk -from pydantic import ConfigDict from hera_librarian.transfer import TransferStatus @@ -21,10 +19,6 @@ class GlobusAsyncTransferManager(CoreAsyncTransferManager): for authentication. """ - # We need the following to save the `authorizer` attribute without having - # to build our own pydantic model for Globus-provided classes. - model_config = ConfigDict(arbitrary_types_allowed=True) - destination_endpoint: str # The Globus endpoint UUID for the destination, entered in the configuration. @@ -36,13 +30,6 @@ class GlobusAsyncTransferManager(CoreAsyncTransferManager): transfer_complete: bool = False task_id: str = "" - authorizer: Union[ - globus_sdk.RefreshTokenAuthorizer, - globus_sdk.AccessTokenAuthorizer, - None, - ] = None - # Default to `None`, but allow us to save Authorizer objects on the object - def authorize(self, settings: "ServerSettings"): """ Attempt to authorize using the Globus service. @@ -67,34 +54,47 @@ def authorize(self, settings: "ServerSettings"): to specific endpoints. We will do our best to handle this as it comes up to provide the user with nicer error messages, though we may not have caught all possible failure modes. + + Parameters + ---------- + settings : ServerSettings object + The settings for the Librarian server. These settings should include + the Globus login information. + + Returns + ------- + Globus authorizer or None + The object returned will be an instance of + globus_sdk.RefreshTokenAuthorizer (if using the Native App), + globus_sdk.AccessTokenAuthorizer (if using the Confidential App), + or None (if we could not successfully authenticate). """ if settings.globus_enable is False: - return False - - if self.authorizer is None: - if settings.globus_client_native_app: - try: - client = globus_sdk.NativeAppAuthClient(settings.globus_client_id) - self.authorizer = globus_sdk.RefreshTokenAuthorizer( - settings.globus_client_secret, client - ) - except globus_sdk.AuthAPIError as e: - return False - else: - try: - client = globus_sdk.ConfidentialAppAuthClient( - settings.globus_client_id, settings.globus_client_secret - ) - tokens = client.oauth2_client_credentials_tokens() - transfer_tokens_info = tokens.by_resource_server[ - "transfer.api.globus.org" - ] - transfer_token = transfer_tokens_info["access_token"] - self.authorizer = globus_sdk.AccessTokenAuthorizer(transfer_token) - except globus_sdk.AuthAPIError: - return False - - return True + return None + + if settings.globus_client_native_app: + try: + client = globus_sdk.NativeAppAuthClient(settings.globus_client_id) + authorizer = globus_sdk.RefreshTokenAuthorizer( + settings.globus_client_secret, client + ) + except globus_sdk.AuthAPIError as e: + return None + else: + try: + client = globus_sdk.ConfidentialAppAuthClient( + settings.globus_client_id, settings.globus_client_secret + ) + tokens = client.oauth2_client_credentials_tokens() + transfer_tokens_info = tokens.by_resource_server[ + "transfer.api.globus.org" + ] + transfer_token = transfer_tokens_info["access_token"] + authorizer = globus_sdk.AccessTokenAuthorizer(transfer_token) + except globus_sdk.AuthAPIError as e: + return None + + return authorizer def valid(self, settings: "ServerSettings") -> bool: """ @@ -105,7 +105,8 @@ def valid(self, settings: "ServerSettings") -> bool: However, this is an important starting point and can fail for reasons of network connectivity, Globus as a service being down, etc. """ - return self.authorize(settings=settings) + authorizer = self.authorize(settings=settings) + return authorizer is not None def _get_transfer_data(self, label: str, settings: "ServerSettings"): """ @@ -151,14 +152,15 @@ def transfer( self.transfer_attempted = True # start by authorizing - if not self.authorize(settings=settings): + authorizer = self.authorize(settings=settings) + if authorizer is None: return False # create a label from the name of the book label = os.path.basename(local_path) # create a transfer client to handle the transfer - transfer_client = globus_sdk.TransferClient(authorizer=self.authorizer) + transfer_client = globus_sdk.TransferClient(authorizer=authorizer) # get a TransferData object transfer_data = self._get_transfer_data(label=label, settings=settings) @@ -192,14 +194,15 @@ def batch_transfer( # books using Globus. # start by authorizing - if not self.authorize(settings=settings): + authorizer = self.authorize(settings=settings) + if authorizer is None: return False # make a label from the first book label = "batch with " + os.path.basename(paths[0][0]) # create a transfer client to handle the transfer - transfer_client = globus_sdk.TransferClient(authorizer=self.authorizer) + transfer_client = globus_sdk.TransferClient(authorizer=authorizer) # get a TransferData object transfer_data = self._get_transfer_data(label=label, settings=settings) @@ -226,7 +229,8 @@ def transfer_status(self, settings: "ServerSettings") -> TransferStatus: """ Query Globus to see if our transfer has finished yet. """ - if not self.authorize(settings=settings): + authorizer = self.authorize(settings=settings) + if authorizer is None: # We *should* be able to just assume that we have already # authenticated and should be able to query the status of our # transfer. However, if for whatever reason we're not able to talk @@ -242,7 +246,7 @@ def transfer_status(self, settings: "ServerSettings") -> TransferStatus: return TransferStatus.FAILED else: # start talking to Globus - transfer_client = globus_sdk.TransferClient(authorizer=self.authorizer) + transfer_client = globus_sdk.TransferClient(authorizer=authorizer) task_doc = transfer_client.get_task(self.task_id) if task_doc["status"] == "SUCCEEDED": From 662ec2238ccafbc89ae2e04177a88f5219d63cf2 Mon Sep 17 00:00:00 2001 From: Paul La Plante Date: Mon, 12 Aug 2024 11:00:18 -0700 Subject: [PATCH 2/4] Change globus_sdk calls to use explicit keyword arguments --- hera_librarian/async_transfers/globus.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/hera_librarian/async_transfers/globus.py b/hera_librarian/async_transfers/globus.py index 398afa2..88dd095 100644 --- a/hera_librarian/async_transfers/globus.py +++ b/hera_librarian/async_transfers/globus.py @@ -74,23 +74,28 @@ def authorize(self, settings: "ServerSettings"): if settings.globus_client_native_app: try: - client = globus_sdk.NativeAppAuthClient(settings.globus_client_id) + client = globus_sdk.NativeAppAuthClient( + client_id=settings.globus_client_id + ) authorizer = globus_sdk.RefreshTokenAuthorizer( - settings.globus_client_secret, client + refresh_token=settings.globus_client_secret, auth_client=client ) except globus_sdk.AuthAPIError as e: return None else: try: client = globus_sdk.ConfidentialAppAuthClient( - settings.globus_client_id, settings.globus_client_secret + client_id=settings.globus_client_id, + client_secret=settings.globus_client_secret, ) tokens = client.oauth2_client_credentials_tokens() transfer_tokens_info = tokens.by_resource_server[ "transfer.api.globus.org" ] transfer_token = transfer_tokens_info["access_token"] - authorizer = globus_sdk.AccessTokenAuthorizer(transfer_token) + authorizer = globus_sdk.AccessTokenAuthorizer( + access_token=transfer_token + ) except globus_sdk.AuthAPIError as e: return None From a0cffb71fd9c96e787abf58e0052c3723dad5b92 Mon Sep 17 00:00:00 2001 From: Paul La Plante Date: Mon, 12 Aug 2024 11:14:43 -0700 Subject: [PATCH 3/4] Improve docstrings --- hera_librarian/async_transfers/globus.py | 62 +++++++++++++++++++++++- 1 file changed, 60 insertions(+), 2 deletions(-) diff --git a/hera_librarian/async_transfers/globus.py b/hera_librarian/async_transfers/globus.py index 88dd095..c7a3740 100644 --- a/hera_librarian/async_transfers/globus.py +++ b/hera_librarian/async_transfers/globus.py @@ -109,14 +109,25 @@ def valid(self, settings: "ServerSettings") -> bool: does not verify that we can copy files between specific endpoints. However, this is an important starting point and can fail for reasons of network connectivity, Globus as a service being down, etc. + + Parameters + ---------- + settings : ServerSettings object + The settings for the Librarian server. These settings should include + the Globus login information. + + Returns + ------- + bool + Whether we can authenticate with Globus (True) or not (False). """ authorizer = self.authorize(settings=settings) return authorizer is not None def _get_transfer_data(self, label: str, settings: "ServerSettings"): """ - This is a helper function to create a TransferData object, which is needed - both for single-book transfers and batch transfers. + This is a helper function to create a TransferData object, which is + needed both for single-book transfers and batch transfers. """ # create a TransferData object that contains options for the transfer transfer_data = globus_sdk.TransferData( @@ -153,6 +164,14 @@ def transfer( remote_path : Path The remote path for the transfer relative to the root Globus directory, which is generally not the same as /. + settings : ServerSettings object + The settings for the Librarian server. These settings should include + the Globus login information. + + Returns + ------- + bool + Whether we could successfully initiate a transfer (True) or not (False). """ self.transfer_attempted = True @@ -191,6 +210,30 @@ def batch_transfer( paths: list[tuple[Path]], settings: "ServerSettings", ) -> bool: + """ + Attempt to transfer a series of books using Globus. + + This method will attempt to create a Globus transfer. If successful, we + will have set the task ID of the transfer on the object, which can be + used to query Globus as to its status. If unsuccessful, we will have + gotten nothing but sadness. + + Parameters + ---------- + paths : list of tuples of Paths + A series of length-2 tuples containing pairs of local and remote + Paths to include as part of the transfer. + settings : ServerSettings object + The settings for the Librarian server. These settings should include + the Globus login information. + + Returns + ------- + bool + Whether we could successfully initiate a transfer (True) or not + (False). + + """ self.transfer_attempted = True # We have to do a lot of the same legwork as above for a single @@ -233,6 +276,21 @@ def batch_transfer( def transfer_status(self, settings: "ServerSettings") -> TransferStatus: """ Query Globus to see if our transfer has finished yet. + + Parameters + ---------- + settings : ServerSettings object + The settings for the Librarian server. These settings should include + the Globus login information. + + Returns + ------- + TransferStatus + The status of the relevant transfer. Should be one of: INITIATED (if + the transfer has not yet been started, or is in-flight), SUCCEEDED + (if the transfer was successful), or FAILED (if the transfer was + unsuccessful, we could not contact Globus, or if the transfer was + attempted but could not be completed). """ authorizer = self.authorize(settings=settings) if authorizer is None: From dc990e4c33a2b3d6cf00dff5e8104d99068c79fa Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Mon, 11 Nov 2024 13:53:26 -0500 Subject: [PATCH 4/4] Remove version pin on globus-sdk --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index da1111b..5f267a6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,7 @@ dependencies = [ "xxhash >= 0.8.0", "cryptography", "fastapi >= 0.108.0", - "globus-sdk <= 3.40.0", + "globus-sdk", "httpx", "pydantic >= 2", "pydantic-settings >= 2",