From 4f327005b747204c84f1f7873bfe2d6ccde09fea Mon Sep 17 00:00:00 2001 From: a-rampalli Date: Mon, 9 Oct 2023 13:55:09 +0530 Subject: [PATCH 1/3] fix: fixes empty response from api --- .../connectors/source-hubspot/source_hubspot/streams.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py index a788ff7f9b9d..ad571ce741c2 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py @@ -206,7 +206,13 @@ def get( self, url: str, params: MutableMapping[str, Any] = None ) -> Tuple[Union[MutableMapping[str, Any], List[MutableMapping[str, Any]]], requests.Response]: response = self._session.get(self.BASE_URL + url, params=params) - if any([m in response.json() for m in TOKEN_EXPIRED_ERROR]): + responseJson = None + try: + responseJson = response.json() + except json.decoder.JSONDecodeError as e: + logger.error(f"Failed to parse response: {response.text} with JSONDecodeError") + raise e + if any([m in responseJson for m in TOKEN_EXPIRED_ERROR]): logger.info("Oauth token expired. Re-fetching token") self._session.auth = self.get_authenticator() return self._parse_and_handle_errors(response), response From 69170e1f909e45c30c304e3ea90169d6bb75cb92 Mon Sep 17 00:00:00 2001 From: a-rampalli Date: Tue, 10 Oct 2023 11:52:31 +0530 Subject: [PATCH 2/3] chore: adds retry if we get blank response --- .../source-hubspot/source_hubspot/streams.py | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py index ad571ce741c2..7c6b76878a06 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py @@ -10,6 +10,7 @@ from functools import cached_property, lru_cache from http import HTTPStatus from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Tuple, Union +from json.decoder import JSONDecodeError import backoff import pendulum as pendulum @@ -205,13 +206,23 @@ def _parse_and_handle_errors(response) -> Union[MutableMapping[str, Any], List[M def get( self, url: str, params: MutableMapping[str, Any] = None ) -> Tuple[Union[MutableMapping[str, Any], List[MutableMapping[str, Any]]], requests.Response]: - response = self._session.get(self.BASE_URL + url, params=params) responseJson = None - try: - responseJson = response.json() - except json.decoder.JSONDecodeError as e: - logger.error(f"Failed to parse response: {response.text} with JSONDecodeError") - raise e + MAX_RETRIES = 5 + + for retries in range(MAX_RETRIES): + response = self._session.get(self.BASE_URL + url, params=params) + try: + responseJson = response.json() + break # Success, exit the loop + except JSONDecodeError as e: + err_msg = f"Failed to parse response text: {response.text} with JSONDecodeError. Retrying ({retries+1}/{MAX_RETRIES})..." + logger.warn(err_msg) + retries += 1 + + if retries == MAX_RETRIES: + logger.error(f"Failed to parse response: {response.text} with JSONDecodeError") + raise JSONDecodeError("Unable to parse response", e.doc, e.pos) + if any([m in responseJson for m in TOKEN_EXPIRED_ERROR]): logger.info("Oauth token expired. Re-fetching token") self._session.auth = self.get_authenticator() From 1beebf64c6bf87f57b537b767eeeed9e842d68a7 Mon Sep 17 00:00:00 2001 From: a-rampalli Date: Tue, 10 Oct 2023 14:00:14 +0530 Subject: [PATCH 3/3] chore: refactors code --- .../source-hubspot/source_hubspot/streams.py | 26 +++++++------------ 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py index 7c6b76878a06..1467dfaa6b44 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py @@ -78,6 +78,8 @@ def giveup_handler(exc): if (isinstance(exc, HubspotInvalidAuth) or isinstance(exc, HTTPError)) \ and any([m in exc.response.text.lower() for m in TOKEN_EXPIRED_ERROR]): return False + if isinstance(exc, JSONDecodeError): + return False if TOKEN_REFRESH_RETRIES_EXCEEDED_ERROR.lower() in exc.response.text.lower(): return False if isinstance(exc, (HubspotInvalidAuth, HubspotAccessDenied)): @@ -86,7 +88,7 @@ def giveup_handler(exc): return backoff.on_exception( backoff.expo, - requests.exceptions.RequestException, + (requests.exceptions.RequestException, JSONDecodeError), jitter=None, on_backoff=log_retry_attempt, giveup=giveup_handler, @@ -206,22 +208,14 @@ def _parse_and_handle_errors(response) -> Union[MutableMapping[str, Any], List[M def get( self, url: str, params: MutableMapping[str, Any] = None ) -> Tuple[Union[MutableMapping[str, Any], List[MutableMapping[str, Any]]], requests.Response]: + response = self._session.get(self.BASE_URL + url, params=params) responseJson = None - MAX_RETRIES = 5 - - for retries in range(MAX_RETRIES): - response = self._session.get(self.BASE_URL + url, params=params) - try: - responseJson = response.json() - break # Success, exit the loop - except JSONDecodeError as e: - err_msg = f"Failed to parse response text: {response.text} with JSONDecodeError. Retrying ({retries+1}/{MAX_RETRIES})..." - logger.warn(err_msg) - retries += 1 - - if retries == MAX_RETRIES: - logger.error(f"Failed to parse response: {response.text} with JSONDecodeError") - raise JSONDecodeError("Unable to parse response", e.doc, e.pos) + try: + responseJson = response.json() + except JSONDecodeError as e: + err_msg = f"Failed to parse response text: {response.text} with JSONDecodeError." + logger.warn(err_msg) + raise JSONDecodeError(err_msg, e.doc, e.pos) if any([m in responseJson for m in TOKEN_EXPIRED_ERROR]): logger.info("Oauth token expired. Re-fetching token")