From 5a82178d8f424839e56bc14a944305807cb49ab6 Mon Sep 17 00:00:00 2001 From: AbnerErnaniADSFatec Date: Thu, 18 Jul 2024 17:17:15 -0300 Subject: [PATCH] Add 'x-api-key' to pass access token and remove params in request #69 --- lccs/lccs.py | 58 +++++++++++++++++++++++++-------------------------- lccs/utils.py | 30 ++++++++++++++++++++------ 2 files changed, 53 insertions(+), 35 deletions(-) diff --git a/lccs/lccs.py b/lccs/lccs.py index c60fe1e..0deeb4e 100644 --- a/lccs/lccs.py +++ b/lccs/lccs.py @@ -42,20 +42,20 @@ def __init__(self, url, validate=False, access_token=None, language=None): self._url = url.rstrip('/') self._validate = validate self._classification_systems = {} - self._access_token = f'?access_token={access_token}' if access_token else '' + self._access_token = access_token if access_token else '' self._support_l = self._support_language() self._language = self._validate_language(language) if language else '' def _support_language(self): """Get the support language from service.""" import enum - data = Utils._get(f'{self._url}/') + data = Utils._get(f'{self._url}/', access_token=self._access_token) return enum.Enum('Language', {i['language']: i['language'] for i in data['supported_language']}, type=str) def _validate_language(self, language): """Get the support language from service.""" if language in [e.value for e in self._support_l]: - return f'&language={language}' + return f'?language={language}' else: s = ', '.join([e for e in self.allowed_language]) raise KeyError(f'Language not supported! Use: {s}') @@ -67,8 +67,8 @@ def _get_format_identifier(self, name): def _get_classification_systems(self): """Return the Classification Systems available in service.""" - url = f'{self._url}/classification_systems{self._access_token}{self._language}' - data = Utils._get(url) + url = f'{self._url}/classification_systems{self._language}' + data = Utils._get(url, access_token=self._access_token) result = list() [result.append(i['identifier']) for i in data] @@ -110,8 +110,8 @@ def classification_system(self, system: str) -> ClassificationSystem: :rtype: dict """ try: - url = f'{self._url}/classification_systems/{system}{self._access_token}{self._language}' - data = Utils._get(url) + url = f'{self._url}/classification_systems/{system}{self._language}' + data = Utils._get(url, access_token=self._access_token) return ClassificationSystem(data, self._validate) except Exception: raise KeyError(f'Could not retrieve information for classification_system: {system}') @@ -129,7 +129,7 @@ def available_mappings(self, system_source: str) -> list: result = list() try: - data = Utils._get(f'{self._url}/mappings/{system_source}{self._access_token}{self._language}') + data = Utils._get(f'{self._url}/mappings/{system_source}{self._language}', access_token=self._access_token) except Exception: raise KeyError(f'Could not retrieve any available mapping for {system_source}') @@ -153,7 +153,7 @@ def mappings(self, system_source: str, system_target: str) -> MappingGroup: :rtype: list """ try: - data = Utils._get(f'{self._url}/mappings/{system_source}/{system_target}{self._access_token}') + data = Utils._get(f'{self._url}/mappings/{system_source}/{system_target}', access_token=self._access_token) except Exception: raise KeyError(f'Could not retrieve mappings for {system_source} and {system_target}') @@ -171,14 +171,14 @@ def available_style_formats(self) -> list: result = list() try: - data = Utils._get(f'{self._url}/style_formats{self._access_token}') + data = Utils._get(f'{self._url}/style_formats', access_token=self._access_token) except Exception: raise KeyError('Could not retrieve any style format') for i in data: for links in i['links']: if links['rel'] == 'items': - data = Utils._get(f"{links['href']}") + data = Utils._get(f"{links['href']}", access_token=self._access_token) result.append(StyleFormats(data)) return result @@ -195,12 +195,12 @@ def style_formats(self, system) -> List[StyleFormats]: """ result = list() try: - data = Utils._get(f'{self._url}/classification_systems/{system}/style_formats{self._access_token}') + data = Utils._get(f'{self._url}/classification_systems/{system}/style_formats', access_token=self._access_token) except Exception: raise KeyError(f'Could not retrieve any style format for {system}') for i in data: if i['rel'] == 'style': - data = Utils._get(f'{self._url}/style_formats/{i["href"].split("/")[-1]}') + data = Utils._get(f'{self._url}/style_formats/{i["href"].split("/")[-1]}', access_token=self._access_token) result.append(StyleFormats(data)) return result @@ -222,7 +222,7 @@ def get_style(self, system, style_format, path=None): :rtype: File """ try: - file_name, data = Utils._get(f'{self._url}/classification_systems/{system}/styles/{style_format}{self._access_token}') + file_name, data = Utils._get(f'{self._url}/classification_systems/{system}/styles/{style_format}', access_token=self._access_token) except Exception: raise KeyError(f'Could not retrieve any style for {system}') @@ -234,7 +234,7 @@ def get_style(self, system, style_format, path=None): def add_classification_system(self, name: str, authority_name: str, description: dict, title: dict, version: str) -> dict: """Add a new classification system.""" - url = f'{self._url}/classification_systems{self._access_token}' + url = f'{self._url}/classification_systems' data = dict() data["name"] = name @@ -244,7 +244,7 @@ def add_classification_system(self, name: str, authority_name: str, description: data["title"] = title try: - retval = Utils._post(url, json=data) + retval = Utils._post(url, access_token=self._access_token, json=data) except RuntimeError as e: raise ValueError(f'Could not insert classification system {name}!') @@ -252,14 +252,14 @@ def add_classification_system(self, name: str, authority_name: str, description: def add_classes(self, system: str, classes: str) -> List[dict]: """Add new classes to an classification system.""" - url = f'{self._url}/classification_systems/{system}/classes{self._access_token}' + url = f'{self._url}/classification_systems/{system}/classes' if type(classes) == str: with open(classes) as file: classes = json.load(file) try: - retval = Utils._post(url, json=classes) + retval = Utils._post(url, access_token=self._access_token, json=classes) except RuntimeError: raise ValueError('Could not insert classes!') @@ -268,7 +268,7 @@ def add_classes(self, system: str, classes: str) -> List[dict]: def add_style(self, system: str, style_format: str, style_path: str = None, style_tex: str = None, style_name: str = None, style_extension: str = None) -> List[dict]: """Add a new style to a system.""" - url = f'{self._url}/classification_systems/{system}/styles{self._access_token}' + url = f'{self._url}/classification_systems/{system}/styles' if style_path: try: @@ -283,7 +283,7 @@ def add_style(self, system: str, style_format: str, style_path: str = None, styl data = dict(style_format=style_format) try: - retval = Utils._post(url, data=data, files=style) + retval = Utils._post(url, access_token=self._access_token, data=data, files=style) except RuntimeError: raise ValueError('Could not insert style!') @@ -291,13 +291,13 @@ def add_style(self, system: str, style_format: str, style_path: str = None, styl def add_mapping(self, system_source: str, system_target: str, mappings) -> list: """Add new classification system mapping.""" - url = f'{self._url}/mappings/{system_source}/{system_target}{self._access_token}' + url = f'{self._url}/mappings/{system_source}/{system_target}' if type(mappings) == str: with open(mappings) as file: mappings = json.load(file) try: - retval = Utils._post(url, json=mappings) + retval = Utils._post(url, access_token=self._access_token, json=mappings) except RuntimeError: raise ValueError('Could not insert mappings!') @@ -305,12 +305,12 @@ def add_mapping(self, system_source: str, system_target: str, mappings) -> list: def add_style_format(self, name: str) -> dict: """Add a new style format.""" - url = f'{self._url}/style_formats{self._access_token}' + url = f'{self._url}/style_formats' data = {"name": name} try: - retval = Utils._post(url, json=data) + retval = Utils._post(url, access_token=self._access_token, json=data) except RuntimeError: raise ValueError(f'Could not insert style format {name}!') @@ -319,7 +319,7 @@ def add_style_format(self, name: str) -> dict: def delete_classification_system(self, system: str) -> int: """Delete a specific classification system.""" try: - retval = Utils._delete(f'{self._url}/classification_systems/{system}{self._access_token}') + retval = Utils._delete(f'{self._url}/classification_systems/{system}', access_token=self._access_token) except RuntimeError: raise ValueError(f'Could not remove classification system {system}!') @@ -328,7 +328,7 @@ def delete_classification_system(self, system: str) -> int: def delete_class(self, system: str, class_name_or_id: str) -> int: """Delete a specific class.""" try: - retval = Utils._delete(f'{self._url}/classification_systems/{system}/classes/{class_name_or_id}{self._access_token}') + retval = Utils._delete(f'{self._url}/classification_systems/{system}/classes/{class_name_or_id}', access_token=self._access_token) except RuntimeError: raise ValueError(f'Could not remove class {class_name_or_id} of classification system {system}!') @@ -337,7 +337,7 @@ def delete_class(self, system: str, class_name_or_id: str) -> int: def delete_style_format(self, style_format: str) -> int: """Delete a specific style format.""" try: - retval = Utils._delete(f'{self._url}/style_formats/{style_format}{self._access_token}') + retval = Utils._delete(f'{self._url}/style_formats/{style_format}', access_token=self._access_token) except RuntimeError: raise ValueError(f'Could not remove style format {style_format} !') @@ -346,7 +346,7 @@ def delete_style_format(self, style_format: str) -> int: def delete_style(self, system: str, style_format: str) -> int: """Delete the style of a classification system.""" try: - retval = Utils._delete(f'{self._url}/classification_systems/{system}/styles/{style_format}{self._access_token}') + retval = Utils._delete(f'{self._url}/classification_systems/{system}/styles/{style_format}', access_token=self._access_token) except RuntimeError: raise ValueError(f'Could not remove style {style_format} of classification system {system}!') @@ -355,7 +355,7 @@ def delete_style(self, system: str, style_format: str) -> int: def delete_mapping(self, system_source: str, system_target: str) -> int: """Delete the mapping.""" try: - retval = Utils._delete(f'{self._url}/mappings/{system_source}/{system_target}{self._access_token}') + retval = Utils._delete(f'{self._url}/mappings/{system_source}/{system_target}', access_token=self._access_token) except RuntimeError: raise ValueError(f'Could not remove mapping of {system_source} and {system_target}!') diff --git a/lccs/utils.py b/lccs/utils.py index cadde84..b233eff 100644 --- a/lccs/utils.py +++ b/lccs/utils.py @@ -32,7 +32,7 @@ class Utils: """Utils class.""" @staticmethod - def _get(url, params=None): + def _get(url, access_token=None, params=None): """Query the LCCS-WS using HTTP GET verb and return the result as a JSON document. :param url: The URL to query must be a valid LCCS-WS endpoint. @@ -45,7 +45,13 @@ def _get(url, params=None): :rtype: dict :raises ValueError: If the response body does not contain a valid json. """ - response = requests.get(url, params=params) + _headers = {} + if access_token != None: + _headers = { + "x-api-key": access_token + } + + response = requests.get(url, params=params, headers=_headers) response.raise_for_status() @@ -68,18 +74,30 @@ def _get(url, params=None): return response.json() @staticmethod - def _post(url, data=None, json=None, files=None): + def _post(url, access_token, data=None, json=None, files=None): """Request post method.""" - response = requests.post(url, data=data, files=files, json=json) + _headers = {} + if access_token != None: + _headers = { + "x-api-key": access_token + } + + response = requests.post(url, headers=_headers, data=data, files=files, json=json) response.raise_for_status() return response.json() @staticmethod - def _delete(url, params=None): + def _delete(url, access_token, params=None): """Request delete method.""" - response = requests.delete(url, params=params) + _headers = {} + if access_token != None: + _headers = { + "x-api-key": access_token + } + + response = requests.delete(url, params=params, headers=_headers) response.raise_for_status()