From 537345624b3d4ce88ff8f5481942e16cfa389e47 Mon Sep 17 00:00:00 2001 From: a-rampalli Date: Tue, 16 Jan 2024 16:10:37 +0530 Subject: [PATCH] fix: fixes messages api pagination --- .../source-sendgrid/source_sendgrid/streams.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/streams.py b/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/streams.py index 0ed1774d9764..a645a4fbedac 100644 --- a/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/streams.py +++ b/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/streams.py @@ -81,6 +81,10 @@ def backoff_time(self, response: requests.Response) -> float: # x-ratelimit-remaining: number of requests remaining # x-ratelimit-reset: time in seconds after which rate limit resets (Seems to be < 1 minute) # x-ratelimit-limit: total number of requests allowed in the current window + """ + Waiting for time mentioned in response header when we encounter rate limit error does not guarantee that the rate limit will be reset! + Sometimes we are seeing multiple rate limit errors even after waiting for the time mentioned in the response header. + """ response_headers = response.headers RATE_LIMIT_RESET_HEADER = "x-ratelimit-reset" if RATE_LIMIT_RESET_HEADER in response_headers: @@ -120,8 +124,11 @@ class SendgridStreamIncrementalMixin(HttpStream, ABC): cursor_field = "created" # Sendgrid API is returning data with latest cursor value first # So we are fetching data in slices and setting checkpoint after each slice - # Setting it too low would mean more API calls and possibility of rate limit breach => more wait time for rate limit resets - slice_interval_in_minutes = 60 + # Setting it too low would mean more API calls and possibility of rate limit breach + # but if api response has more than 1000 records in a slice, api does not provide a way to do pagination. 
+ # Hence setting slice interval to a small value. + # TODO: Improve this to reduce number of API calls + slice_interval_in_minutes = 5 def __init__(self, start_time: Optional[Union[int, str]], **kwargs): super().__init__(**kwargs) @@ -279,7 +286,7 @@ def initial_path() -> str: return "templates" -class Messages(SendgridStreamOffsetPagination, SendgridStreamIncrementalMixin): +class Messages(SendgridStream, SendgridStreamIncrementalMixin): """ https://docs.sendgrid.com/api-reference/e-mail-activity/filter-all-messages """ @@ -299,6 +306,7 @@ def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[ date_end = datetime.datetime.utcfromtimestamp(int(params["end_time"])).strftime(time_filter_template) queryapi = f'last_event_time BETWEEN TIMESTAMP "{date_start}" AND TIMESTAMP "{date_end}"' params["query"] = urllib.parse.quote(queryapi) + # limit is mandatory param for this api but it does not support offset pagination! params["limit"] = self.limit payload_str = "&".join("%s=%s" % (k, v) for k, v in params.items() if k not in ["start_time", "end_time"]) return payload_str