Skip to content

Commit

Permalink
Add 'x-api-key' to pass access token and remove params in request bra…
Browse files Browse the repository at this point in the history
  • Loading branch information
AbnerErnaniADSFatec committed Jul 18, 2024
1 parent b92de73 commit 5a82178
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 35 deletions.
58 changes: 29 additions & 29 deletions lccs/lccs.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,20 @@ def __init__(self, url, validate=False, access_token=None, language=None):
self._url = url.rstrip('/')
self._validate = validate
self._classification_systems = {}
self._access_token = f'?access_token={access_token}' if access_token else ''
self._access_token = access_token if access_token else ''
self._support_l = self._support_language()
self._language = self._validate_language(language) if language else ''

def _support_language(self):
"""Get the support language from service."""
import enum
data = Utils._get(f'{self._url}/')
data = Utils._get(f'{self._url}/', access_token=self._access_token)
return enum.Enum('Language', {i['language']: i['language'] for i in data['supported_language']}, type=str)

def _validate_language(self, language):
"""Get the support language from service."""
if language in [e.value for e in self._support_l]:
return f'&language={language}'
return f'?language={language}'
else:
s = ', '.join([e for e in self.allowed_language])
raise KeyError(f'Language not supported! Use: {s}')
Expand All @@ -67,8 +67,8 @@ def _get_format_identifier(self, name):

def _get_classification_systems(self):
"""Return the Classification Systems available in service."""
url = f'{self._url}/classification_systems{self._access_token}{self._language}'
data = Utils._get(url)
url = f'{self._url}/classification_systems{self._language}'
data = Utils._get(url, access_token=self._access_token)
result = list()

[result.append(i['identifier']) for i in data]
Expand Down Expand Up @@ -110,8 +110,8 @@ def classification_system(self, system: str) -> ClassificationSystem:
:rtype: dict
"""
try:
url = f'{self._url}/classification_systems/{system}{self._access_token}{self._language}'
data = Utils._get(url)
url = f'{self._url}/classification_systems/{system}{self._language}'
data = Utils._get(url, access_token=self._access_token)
return ClassificationSystem(data, self._validate)
except Exception:
raise KeyError(f'Could not retrieve information for classification_system: {system}')
Expand All @@ -129,7 +129,7 @@ def available_mappings(self, system_source: str) -> list:
result = list()

try:
data = Utils._get(f'{self._url}/mappings/{system_source}{self._access_token}{self._language}')
data = Utils._get(f'{self._url}/mappings/{system_source}{self._language}', access_token=self._access_token)
except Exception:
raise KeyError(f'Could not retrieve any available mapping for {system_source}')

Expand All @@ -153,7 +153,7 @@ def mappings(self, system_source: str, system_target: str) -> MappingGroup:
:rtype: list
"""
try:
data = Utils._get(f'{self._url}/mappings/{system_source}/{system_target}{self._access_token}')
data = Utils._get(f'{self._url}/mappings/{system_source}/{system_target}', access_token=self._access_token)
except Exception:
raise KeyError(f'Could not retrieve mappings for {system_source} and {system_target}')

Expand All @@ -171,14 +171,14 @@ def available_style_formats(self) -> list:
result = list()

try:
data = Utils._get(f'{self._url}/style_formats{self._access_token}')
data = Utils._get(f'{self._url}/style_formats', access_token=self._access_token)
except Exception:
raise KeyError('Could not retrieve any style format')

for i in data:
for links in i['links']:
if links['rel'] == 'items':
data = Utils._get(f"{links['href']}")
data = Utils._get(f"{links['href']}", access_token=self._access_token)
result.append(StyleFormats(data))

return result
Expand All @@ -195,12 +195,12 @@ def style_formats(self, system) -> List[StyleFormats]:
"""
result = list()
try:
data = Utils._get(f'{self._url}/classification_systems/{system}/style_formats{self._access_token}')
data = Utils._get(f'{self._url}/classification_systems/{system}/style_formats', access_token=self._access_token)
except Exception:
raise KeyError(f'Could not retrieve any style format for {system}')
for i in data:
if i['rel'] == 'style':
data = Utils._get(f'{self._url}/style_formats/{i["href"].split("/")[-1]}')
data = Utils._get(f'{self._url}/style_formats/{i["href"].split("/")[-1]}', access_token=self._access_token)
result.append(StyleFormats(data))

return result
Expand All @@ -222,7 +222,7 @@ def get_style(self, system, style_format, path=None):
:rtype: File
"""
try:
file_name, data = Utils._get(f'{self._url}/classification_systems/{system}/styles/{style_format}{self._access_token}')
file_name, data = Utils._get(f'{self._url}/classification_systems/{system}/styles/{style_format}', access_token=self._access_token)
except Exception:
raise KeyError(f'Could not retrieve any style for {system}')

Expand All @@ -234,7 +234,7 @@ def get_style(self, system, style_format, path=None):
def add_classification_system(self, name: str, authority_name: str, description: dict, title: dict,
version: str) -> dict:
"""Add a new classification system."""
url = f'{self._url}/classification_systems{self._access_token}'
url = f'{self._url}/classification_systems'

data = dict()
data["name"] = name
Expand All @@ -244,22 +244,22 @@ def add_classification_system(self, name: str, authority_name: str, description:
data["title"] = title

try:
retval = Utils._post(url, json=data)
retval = Utils._post(url, access_token=self._access_token, json=data)
except RuntimeError as e:
raise ValueError(f'Could not insert classification system {name}!')

return retval

def add_classes(self, system: str, classes: str) -> List[dict]:
"""Add new classes to an classification system."""
url = f'{self._url}/classification_systems/{system}/classes{self._access_token}'
url = f'{self._url}/classification_systems/{system}/classes'

if type(classes) == str:
with open(classes) as file:
classes = json.load(file)

try:
retval = Utils._post(url, json=classes)
retval = Utils._post(url, access_token=self._access_token, json=classes)
except RuntimeError:
raise ValueError('Could not insert classes!')

Expand All @@ -268,7 +268,7 @@ def add_classes(self, system: str, classes: str) -> List[dict]:
def add_style(self, system: str, style_format: str, style_path: str = None, style_tex: str = None,
style_name: str = None, style_extension: str = None) -> List[dict]:
"""Add a new style to a system."""
url = f'{self._url}/classification_systems/{system}/styles{self._access_token}'
url = f'{self._url}/classification_systems/{system}/styles'

if style_path:
try:
Expand All @@ -283,34 +283,34 @@ def add_style(self, system: str, style_format: str, style_path: str = None, styl
data = dict(style_format=style_format)

try:
retval = Utils._post(url, data=data, files=style)
retval = Utils._post(url, access_token=self._access_token, data=data, files=style)
except RuntimeError:
raise ValueError('Could not insert style!')

return retval

def add_mapping(self, system_source: str, system_target: str, mappings) -> list:
"""Add new classification system mapping."""
url = f'{self._url}/mappings/{system_source}/{system_target}{self._access_token}'
url = f'{self._url}/mappings/{system_source}/{system_target}'

if type(mappings) == str:
with open(mappings) as file:
mappings = json.load(file)
try:
retval = Utils._post(url, json=mappings)
retval = Utils._post(url, access_token=self._access_token, json=mappings)
except RuntimeError:
raise ValueError('Could not insert mappings!')

return retval

def add_style_format(self, name: str) -> dict:
"""Add a new style format."""
url = f'{self._url}/style_formats{self._access_token}'
url = f'{self._url}/style_formats'

data = {"name": name}

try:
retval = Utils._post(url, json=data)
retval = Utils._post(url, access_token=self._access_token, json=data)
except RuntimeError:
raise ValueError(f'Could not insert style format {name}!')

Expand All @@ -319,7 +319,7 @@ def add_style_format(self, name: str) -> dict:
def delete_classification_system(self, system: str) -> int:
"""Delete a specific classification system."""
try:
retval = Utils._delete(f'{self._url}/classification_systems/{system}{self._access_token}')
retval = Utils._delete(f'{self._url}/classification_systems/{system}', access_token=self._access_token)
except RuntimeError:
raise ValueError(f'Could not remove classification system {system}!')

Expand All @@ -328,7 +328,7 @@ def delete_classification_system(self, system: str) -> int:
def delete_class(self, system: str, class_name_or_id: str) -> int:
"""Delete a specific class."""
try:
retval = Utils._delete(f'{self._url}/classification_systems/{system}/classes/{class_name_or_id}{self._access_token}')
retval = Utils._delete(f'{self._url}/classification_systems/{system}/classes/{class_name_or_id}', access_token=self._access_token)
except RuntimeError:
raise ValueError(f'Could not remove class {class_name_or_id} of classification system {system}!')

Expand All @@ -337,7 +337,7 @@ def delete_class(self, system: str, class_name_or_id: str) -> int:
def delete_style_format(self, style_format: str) -> int:
"""Delete a specific style format."""
try:
retval = Utils._delete(f'{self._url}/style_formats/{style_format}{self._access_token}')
retval = Utils._delete(f'{self._url}/style_formats/{style_format}', access_token=self._access_token)
except RuntimeError:
raise ValueError(f'Could not remove style format {style_format} !')

Expand All @@ -346,7 +346,7 @@ def delete_style_format(self, style_format: str) -> int:
def delete_style(self, system: str, style_format: str) -> int:
"""Delete the style of a classification system."""
try:
retval = Utils._delete(f'{self._url}/classification_systems/{system}/styles/{style_format}{self._access_token}')
retval = Utils._delete(f'{self._url}/classification_systems/{system}/styles/{style_format}', access_token=self._access_token)
except RuntimeError:
raise ValueError(f'Could not remove style {style_format} of classification system {system}!')

Expand All @@ -355,7 +355,7 @@ def delete_style(self, system: str, style_format: str) -> int:
def delete_mapping(self, system_source: str, system_target: str) -> int:
"""Delete the mapping."""
try:
retval = Utils._delete(f'{self._url}/mappings/{system_source}/{system_target}{self._access_token}')
retval = Utils._delete(f'{self._url}/mappings/{system_source}/{system_target}', access_token=self._access_token)
except RuntimeError:
raise ValueError(f'Could not remove mapping of {system_source} and {system_target}!')

Expand Down
30 changes: 24 additions & 6 deletions lccs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Utils:
"""Utils class."""

@staticmethod
def _get(url, params=None):
def _get(url, access_token=None, params=None):
"""Query the LCCS-WS using HTTP GET verb and return the result as a JSON document.
:param url: The URL to query must be a valid LCCS-WS endpoint.
Expand All @@ -45,7 +45,13 @@ def _get(url, params=None):
:rtype: dict
:raises ValueError: If the response body does not contain a valid json.
"""
response = requests.get(url, params=params)
_headers = {}
if access_token != None:
_headers = {
"x-api-key": access_token
}

response = requests.get(url, params=params, headers=_headers)

response.raise_for_status()

Expand All @@ -68,18 +74,30 @@ def _get(url, params=None):
return response.json()

@staticmethod
def _post(url, data=None, json=None, files=None):
def _post(url, access_token, data=None, json=None, files=None):
"""Request post method."""
response = requests.post(url, data=data, files=files, json=json)
_headers = {}
if access_token != None:
_headers = {
"x-api-key": access_token
}

response = requests.post(url, headers=_headers, data=data, files=files, json=json)

response.raise_for_status()

return response.json()

@staticmethod
def _delete(url, params=None):
def _delete(url, access_token, params=None):
"""Request delete method."""
response = requests.delete(url, params=params)
_headers = {}
if access_token != None:
_headers = {
"x-api-key": access_token
}

response = requests.delete(url, params=params, headers=_headers)

response.raise_for_status()

Expand Down

0 comments on commit 5a82178

Please sign in to comment.