From 0adf4dd514129fc963589195405093b3730027e5 Mon Sep 17 00:00:00 2001 From: akharit <38331238+akharit@users.noreply.github.com> Date: Mon, 5 Nov 2018 11:30:58 -0800 Subject: [PATCH] Fix for external token not having retry options (#256) * Fix for external token not having retry options * Updated history --- HISTORY.rst | 4 +++ azure/datalake/store/__init__.py | 2 +- azure/datalake/store/lib.py | 52 ++++++++++++++++---------------- azure/datalake/store/retry.py | 6 ++-- 4 files changed, 33 insertions(+), 31 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index 0c96aef..26091f4 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,10 @@ Release History =============== +0.0.37 (2018-11-02) ++++++++++++++++++++ +* Reverted some changes introduced in 0.0.35 that didn't work with other tokens + 0.0.36 (2018-10-31) +++++++++++++++++++ * Fixed typo in refresh_token call diff --git a/azure/datalake/store/__init__.py b/azure/datalake/store/__init__.py index 925f81b..e9af45b 100644 --- a/azure/datalake/store/__init__.py +++ b/azure/datalake/store/__init__.py @@ -6,7 +6,7 @@ # license information. # -------------------------------------------------------------------------- -__version__ = "0.0.36" +__version__ = "0.0.37" from .core import AzureDLFileSystem diff --git a/azure/datalake/store/lib.py b/azure/datalake/store/lib.py index 0ea042c..ebc7f01 100644 --- a/azure/datalake/store/lib.py +++ b/azure/datalake/store/lib.py @@ -135,7 +135,6 @@ def get_token_internal(): code = context.acquire_user_code(resource, client_id) print(code['message']) out = context.acquire_token_with_device_code(resource, code, client_id) - elif username and password: out = context.acquire_token_with_username_password(resource, username, password, client_id) @@ -147,19 +146,21 @@ def get_token_internal(): else: raise ValueError("No authentication method found for credentials") return out - out = get_token_internal() + out.update({'access': out['accessToken'], 'resource': resource, 'refresh': out.get('refreshToken', False), 'time': time.time(), 'tenant': tenant_id, 'client': client_id}) return DataLakeCredential(out) + class DataLakeCredential: + # Be careful modifying this. DataLakeCredential is a general class in azure, and we have to maintain parity. def __init__(self, token): self.token = token - def signed_session(self, retry_policy=None): + def signed_session(self): # type: () -> requests.Session """Create requests session with any required auth headers applied. @@ -167,14 +168,14 @@ def signed_session(self, retry_policy=None): """ session = requests.Session() if time.time() - self.token['time'] > self.token['expiresIn'] - 100: - self.refresh_token(retry_policy=retry_policy) + self.refresh_token() scheme, token = self.token['tokenType'], self.token['access'] header = "{} {}".format(scheme, token) session.headers['Authorization'] = header return session - def refresh_token(self, authority=None, retry_policy=None): + def refresh_token(self, authority=None): """ Refresh an expired authorization token Parameters @@ -191,21 +192,15 @@ def refresh_token(self, authority=None, retry_policy=None): context = adal.AuthenticationContext(authority + self.token['tenant']) - @retry_decorator_for_auth(retry_policy=retry_policy) - def get_token_internal(): - # Internal function used so as to use retry decorator - if self.token.get('secret') and self.token.get('client'): - out = context.acquire_token_with_client_credentials(self.token['resource'], - self.token['client'], - self.token['secret']) - out.update({'secret': self.token['secret']}) - else: - out = context.acquire_token_with_refresh_token(self.token['refresh'], - client_id=self.token['client'], - resource=self.token['resource']) - return out - - out = get_token_internal() + if self.token.get('secret') and self.token.get('client'): + out = context.acquire_token_with_client_credentials(self.token['resource'], + self.token['client'], + self.token['secret']) + out.update({'secret': self.token['secret']}) + else: + out = context.acquire_token_with_refresh_token(self.token['refresh'], + client_id=self.token['client'], + resource=self.token['resource']) # common items to update out.update({'access': out['accessToken'], 'time': time.time(), 'tenant': self.token['tenant'], @@ -271,7 +266,9 @@ def __init__(self, store_name=default_store, token=None, # There is a case where the user can opt to exclude an API version, in which case # the service itself decides on the API version to use (it's default). self.api_version = api_version or None - self.head = {'Authorization': token.signed_session(retry_policy=None).headers['Authorization']} + self.head = None + self._check_token() # Retryable method. Will ensure that signed_session token is current when we set it on next line + self.head = {'Authorization': token.signed_session().headers['Authorization']} self.url = 'https://%s.%s/' % (store_name, url_suffix) self.webhdfs = 'webhdfs/v1/' self.extended_operations = 'webhdfsext/' @@ -296,11 +293,14 @@ def session(self): self.local.session = s return s - def _check_token(self, retry_policy=None): - cur_session = self.token.signed_session(retry_policy=retry_policy) - if not self.head or self.head.get('Authorization') != cur_session.headers['Authorization']: - self.head = {'Authorization': cur_session.headers['Authorization']} - self.local.session = None + def _check_token(self, retry_policy= None): + @retry_decorator_for_auth(retry_policy=retry_policy) + def check_token_internal(): + cur_session = self.token.signed_session() + if not self.head or self.head.get('Authorization') != cur_session.headers['Authorization']: + self.head = {'Authorization': cur_session.headers['Authorization']} + self.local.session = None + check_token_internal() def _log_request(self, method, url, op, path, params, headers, retry_count): msg = "HTTP Request\n{} {}\n".format(method.upper(), url) diff --git a/azure/datalake/store/retry.py b/azure/datalake/store/retry.py index 41afbdf..48258df 100644 --- a/azure/datalake/store/retry.py +++ b/azure/datalake/store/retry.py @@ -86,9 +86,8 @@ def deco_retry(func): @wraps(func) def f_retry(*args, **kwargs): retry_count = -1 - last_exception = None - out = None while True: + last_exception = None retry_count += 1 try: out = func(*args, **kwargs) @@ -105,10 +104,9 @@ def f_retry(*args, **kwargs): request_successful = last_exception is None or response.status_code == 401 # 401 = Invalid credentials if request_successful or not retry_policy.should_retry(response, last_exception, retry_count): break - if out is None: + if last_exception is not None: raise last_exception return out - return f_retry return deco_retry