Skip to content

Commit

Permalink
fix: fixes messages api pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
a-rampalli committed Jan 16, 2024
1 parent 2d5567b commit 5373456
Showing 1 changed file with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ def backoff_time(self, response: requests.Response) -> float:
# x-ratelimit-remaining: number of requests remaining
# x-ratelimit-reset: time in seconds after which rate limit resets (Seems to be < 1 minute)
# x-ratelimit-limit: total number of requests allowed in the current window
"""
Waiting for the time given in the response header after a rate limit error does not guarantee that the rate limit will have been reset!
Sometimes we see multiple rate limit errors even after waiting for the time given in the response header.
"""
response_headers = response.headers
RATE_LIMIT_RESET_HEADER = "x-ratelimit-reset"
if RATE_LIMIT_RESET_HEADER in response_headers:
Expand Down Expand Up @@ -120,8 +124,11 @@ class SendgridStreamIncrementalMixin(HttpStream, ABC):
cursor_field = "created"
# Sendgrid API is returning data with latest cursor value first
# So we are fetching data in slices and setting checkpoint after each slice
# Setting it too low would mean more API calls and possibility of rate limit breach => more wait time for rate limit resets
slice_interval_in_minutes = 60
# Setting it too low means more API calls and a higher chance of breaching the rate limit,
# but if a slice contains more than 1000 records, the API does not provide a way to paginate through them.
# Hence the slice interval is set to a small value.
# TODO: Improve this to reduce number of API calls
slice_interval_in_minutes = 5

def __init__(self, start_time: Optional[Union[int, str]], **kwargs):
super().__init__(**kwargs)
Expand Down Expand Up @@ -279,7 +286,7 @@ def initial_path() -> str:
return "templates"


class Messages(SendgridStreamOffsetPagination, SendgridStreamIncrementalMixin):
class Messages(SendgridStream, SendgridStreamIncrementalMixin):
"""
https://docs.sendgrid.com/api-reference/e-mail-activity/filter-all-messages
"""
Expand All @@ -299,6 +306,7 @@ def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[
date_end = datetime.datetime.utcfromtimestamp(int(params["end_time"])).strftime(time_filter_template)
queryapi = f'last_event_time BETWEEN TIMESTAMP "{date_start}" AND TIMESTAMP "{date_end}"'
params["query"] = urllib.parse.quote(queryapi)
# limit is a mandatory param for this API, but the API does not support offset pagination!
params["limit"] = self.limit
payload_str = "&".join("%s=%s" % (k, v) for k, v in params.items() if k not in ["start_time", "end_time"])
return payload_str
Expand Down

0 comments on commit 5373456

Please sign in to comment.