From 2d5567b0b769aeab14817e27dcd5d793295aa283 Mon Sep 17 00:00:00 2001 From: a-rampalli Date: Fri, 5 Jan 2024 12:06:21 +0530 Subject: [PATCH] fix: fixes sync failing with key error if stream not available --- .../source-sendgrid/source_sendgrid/streams.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/streams.py b/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/streams.py index 0ce4856e65e4..0ed1774d9764 100644 --- a/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/streams.py +++ b/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/streams.py @@ -36,7 +36,7 @@ def parse_response( json_response = response.json() records = json_response.get(self.data_field, []) if self.data_field is not None else json_response - if records is not None: + if records: for record in records: yield record else: @@ -52,6 +52,9 @@ def parse_response( ) # do NOT print request headers as it contains auth token self.logger.info(err_msg) + if response.status_code in self.permission_error_codes.keys(): + err_msg = f"Stream `{self.name}` is not available, due to subscription plan limitations or permission issues. Skipping." + raise PermissionError(err_msg) def should_retry(self, response: requests.Response) -> bool: """Override to provide skip the stream possibility""" @@ -61,7 +64,7 @@ def should_retry(self, response: requests.Response) -> bool: for message in response.json().get("errors", []): if message.get("message") == self.permission_error_codes.get(status): self.logger.error( - f"Stream `{self.name}` is not available, due to subscription plan limitations or perrmission issues. Skipping." + f"Stream `{self.name}` is not available, due to subscription plan limitations or permission issues. Skipping." ) setattr(self, "raise_on_http_errors", False) return False @@ -104,8 +107,9 @@ def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: stream_data = response.json() - if self.data_field: - stream_data = stream_data[self.data_field] + if not self.data_field or self.data_field not in stream_data: + return + stream_data = stream_data[self.data_field] if len(stream_data) < self.limit: return self.offset += self.limit