Skip to content

Commit

Permalink
generated with codegen at box/box-codegen@8058767 and spec at box/box…
Browse files Browse the repository at this point in the history
  • Loading branch information
box-sdk-build committed Sep 11, 2023
1 parent 3a01c3c commit b16d187
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 33 deletions.
27 changes: 18 additions & 9 deletions box_sdk_gen/ccg_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Union, Optional

from .auth import Authentication
from .token_storage import TokenStorage, InMemoryTokenStorage
from .auth_schemas import (
TokenRequestBoxSubjectType,
TokenRequest,
Expand All @@ -20,6 +21,7 @@ def __init__(
client_secret: str,
enterprise_id: Union[None, str] = None,
user_id: Union[None, str] = None,
token_storage: TokenStorage = None,
):
"""
:param client_id:
Expand All @@ -45,14 +47,20 @@ def __init__(
<https://developer.box.com/en/guides/applications/>
<https://developer.box.com/en/guides/authentication/select/>
:param token_storage:
Object responsible for storing token. If no custom implementation provided,
the token will be stored in memory.
"""
if token_storage is None:
token_storage = InMemoryTokenStorage()
if not enterprise_id and not user_id:
raise Exception("Enterprise ID or User ID is needed")

self.client_id = client_id
self.client_secret = client_secret
self.enterprise_id = enterprise_id
self.user_id = user_id
self.token_storage = token_storage


class CCGAuth(Authentication):
Expand All @@ -62,7 +70,7 @@ def __init__(self, config: CCGConfig):
Configuration object of Client Credentials Grant auth.
"""
self.config = config
self.token: Union[None, AccessToken] = None
self.token_storage = config.token_storage

if config.user_id:
self.subject_id = self.config.user_id
Expand All @@ -79,9 +87,10 @@ def retrieve_token(
:param network_session: An object to keep network session state
:return: Access token
"""
if self.token is None:
self.refresh_token(network_session=network_session)
return self.token
token = self.token_storage.get()
if token is None:
return self.refresh_token(network_session=network_session)
return token

def refresh_token(
self, network_session: Optional[NetworkSession] = None
Expand Down Expand Up @@ -109,9 +118,9 @@ def refresh_token(
),
)

token_response = AccessToken.from_dict(json.loads(response.text))
self.token = token_response
return token_response
new_token = AccessToken.from_dict(json.loads(response.text))
self.token_storage.store(new_token)
return new_token

def as_user(self, user_id: str):
"""
Expand All @@ -129,7 +138,7 @@ def as_user(self, user_id: str):
"""
self.subject_id = user_id
self.subject_type = TokenRequestBoxSubjectType.USER
self.token = None
self.token_storage.clear()

def as_enterprise(self, enterprise_id: str):
"""
Expand All @@ -140,4 +149,4 @@ def as_enterprise(self, enterprise_id: str):
"""
self.subject_id = enterprise_id
self.subject_type = TokenRequestBoxSubjectType.ENTERPRISE
self.token = None
self.token_storage.clear()
44 changes: 32 additions & 12 deletions box_sdk_gen/jwt_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
jwt, default_backend, serialization = None, None, None

from .auth import Authentication
from .token_storage import TokenStorage, InMemoryTokenStorage
from .auth_schemas import (
TokenRequestBoxSubjectType,
TokenRequest,
Expand All @@ -35,6 +36,7 @@ def __init__(
enterprise_id: Optional[str] = None,
user_id: Optional[str] = None,
jwt_algorithm: str = 'RS256',
token_storage: TokenStorage = None,
**_kwargs
):
"""
Expand Down Expand Up @@ -69,7 +71,12 @@ def __init__(
<https://developer.box.com/en/guides/authentication/select/>
:param jwt_algorithm:
Which algorithm to use for signing the JWT assertion. Must be one of 'RS256', 'RS384', 'RS512'.
:param token_storage:
Object responsible for storing token. If no custom implementation provided,
the token will be stored in memory.
"""
if token_storage is None:
token_storage = InMemoryTokenStorage()
if not enterprise_id and not user_id:
raise Exception("Enterprise ID or User ID is needed")

Expand All @@ -81,17 +88,21 @@ def __init__(
self.private_key = private_key
self.private_key_passphrase = private_key_passphrase
self.jwt_algorithm = jwt_algorithm
self.token_storage = token_storage

@classmethod
def from_config_json_string(
cls, config_json_string: str, **kwargs: Any
cls, config_json_string: str, token_storage: TokenStorage = None, **kwargs: Any
) -> 'JWTConfig':
"""
Create an auth instance as defined by a string content of JSON file downloaded from the Box Developer Console.
See https://developer.box.com/en/guides/authentication/jwt/ for more information.
:param config_json_string:
String content of JSON file containing the configuration.
:param token_storage:
Object responsible for storing token. If no custom implementation provided,
the token will be stored in memory.
:return:
Auth instance configured as specified by the config dictionary.
"""
Expand All @@ -111,22 +122,30 @@ def from_config_json_string(
private_key_passphrase=config_dict['boxAppSettings']['appAuth'].get(
'passphrase', None
),
token_storage=token_storage,
**kwargs
)

@classmethod
def from_config_file(cls, config_file_path: str, **kwargs: Any) -> 'JWTConfig':
def from_config_file(
cls, config_file_path: str, token_storage: TokenStorage = None, **kwargs: Any
) -> 'JWTConfig':
"""
Create an auth instance as defined by a JSON file downloaded from the Box Developer Console.
See https://developer.box.com/en/guides/authentication/jwt/ for more information.
:param config_file_path:
Path to the JSON file containing the configuration.
:param token_storage:
Object responsible for storing token. If no custom implementation provided,
the token will be stored in memory.
:return:
Auth instance configured as specified by the JSON file.
"""
with open(config_file_path, encoding='utf-8') as config_file:
return cls.from_config_json_string(config_file.read(), **kwargs)
return cls.from_config_json_string(
config_file.read(), token_storage, **kwargs
)


class JWTAuth(Authentication):
Expand All @@ -142,7 +161,7 @@ def __init__(self, config: JWTConfig):
)

self.config = config
self.token: Union[None, AccessToken] = None
self.token_storage = config.token_storage

if config.enterprise_id:
self.subject_type = TokenRequestBoxSubjectType.ENTERPRISE
Expand All @@ -163,9 +182,10 @@ def retrieve_token(
:param network_session: An object to keep network session state
:return: Access token
"""
if self.token is None:
self.refresh_token(network_session=network_session)
return self.token
token = self.token_storage.get()
if token is None:
return self.refresh_token(network_session=network_session)
return token

def refresh_token(
self, network_session: Optional[NetworkSession] = None
Expand Down Expand Up @@ -218,9 +238,9 @@ def refresh_token(
),
)

token_response = AccessToken.from_dict(json.loads(response.text))
self.token = token_response
return self.token
new_token = AccessToken.from_dict(json.loads(response.text))
self.token_storage.store(new_token)
return new_token

def as_user(self, user_id: str):
"""
Expand All @@ -238,7 +258,7 @@ def as_user(self, user_id: str):
"""
self.subject_id = user_id
self.subject_type = TokenRequestBoxSubjectType.USER
self.token = None
self.token_storage.clear()

def as_enterprise(self, enterprise_id: str):
"""
Expand All @@ -249,7 +269,7 @@ def as_enterprise(self, enterprise_id: str):
"""
self.subject_id = enterprise_id
self.subject_type = TokenRequestBoxSubjectType.ENTERPRISE
self.token = None
self.token_storage.clear()

@classmethod
def _get_rsa_private_key(
Expand Down
41 changes: 29 additions & 12 deletions box_sdk_gen/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Optional

from .auth import Authentication
from .token_storage import TokenStorage, InMemoryTokenStorage
from .auth_schemas import TokenRequest, TokenRequestGrantType
from .fetch import fetch, FetchResponse, FetchOptions
from .network import NetworkSession
Expand All @@ -11,18 +12,23 @@

class OAuthConfig:
def __init__(
self,
client_id: str,
client_secret: str,
self, client_id: str, client_secret: str, token_storage: TokenStorage = None
):
"""
:param client_id:
Box API key used for identifying the application the user is authenticating with.
:param client_secret:
Box API secret used for making auth requests.
:param token_storage:
Object responsible for storing token. If no custom implementation provided,
the token will be stored in memory.
"""

if token_storage is None:
token_storage = InMemoryTokenStorage()
self.client_id = client_id
self.client_secret = client_secret
self.token_storage = token_storage


class GetAuthorizeUrlOptions:
Expand Down Expand Up @@ -59,7 +65,7 @@ def __init__(self, config: OAuthConfig):
Configuration object of OAuth.
"""
self.config = config
self.token: Optional[AccessToken] = None
self.token_storage = config.token_storage

def get_authorize_url(
self, options: Optional[GetAuthorizeUrlOptions] = None
Expand Down Expand Up @@ -104,7 +110,7 @@ def get_authorize_url(

def get_tokens_authorization_code_grant(
self, authorization_code: str, network_session: Optional[NetworkSession] = None
) -> str:
) -> AccessToken:
"""
Send token request and return the access_token
:param authorization_code: Short-lived authorization code
Expand All @@ -118,8 +124,9 @@ def get_tokens_authorization_code_grant(
code=authorization_code,
)

self.token = self._send_token_request(request_body, network_session)
return self.token.access_token
token: AccessToken = self._send_token_request(request_body, network_session)
self.token_storage.store(token)
return token

def retrieve_token(
self, network_session: Optional[NetworkSession] = None
Expand All @@ -129,12 +136,13 @@ def retrieve_token(
:param network_session: An object to keep network session state
:return: Valid access token
"""
if self.token is None:
token = self.token_storage.get()
if token is None:
raise Exception(
"Access and refresh tokens not available. Authenticate before making"
" any API call first."
)
return self.token
return token

def refresh_token(
self,
Expand All @@ -147,15 +155,24 @@ def refresh_token(
:param refresh_token: Refresh token, which can be used to obtain a new access token
:return: Valid access token
"""
old_token: Optional[AccessToken] = self.token_storage.get()
token_used_for_refresh = (
refresh_token or old_token.refresh_token if old_token else None
)

if token_used_for_refresh is None:
raise Exception("No refresh_token is available.")

request_body = TokenRequest(
grant_type=TokenRequestGrantType.REFRESH_TOKEN,
client_id=self.config.client_id,
client_secret=self.config.client_secret,
refresh_token=refresh_token or self.token.refresh_token,
refresh_token=refresh_token or old_token.refresh_token,
)

self.token = self._send_token_request(request_body, network_session)
return self.token
new_token = self._send_token_request(request_body, network_session)
self.token_storage.store(new_token)
return new_token

@staticmethod
def _send_token_request(
Expand Down
Loading

0 comments on commit b16d187

Please sign in to comment.