diff --git a/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/streams.py b/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/streams.py index 37abb2ed4fe7..867c2219d96c 100644 --- a/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/streams.py +++ b/airbyte-integrations/connectors/source-sendgrid/source_sendgrid/streams.py @@ -5,7 +5,7 @@ import datetime import urllib from abc import ABC, abstractmethod -from typing import Any, Iterable, Mapping, MutableMapping, Optional, Union +from typing import Any, Iterable, Mapping, MutableMapping, Optional, Union, List import pendulum import requests @@ -111,6 +111,10 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, class SendgridStreamIncrementalMixin(HttpStream, ABC): cursor_field = "created" + # Sendgrid API is returning data with latest cursor value first + # So we are fetching data in slices and setting checkpoint after each slice + # Setting it too low would mean more API calls and possibility of rate limit breach => more wait time for rate limit resets + slice_interval_in_minutes = 60 def __init__(self, start_time: Optional[Union[int, str]], **kwargs): super().__init__(**kwargs) @@ -118,6 +122,36 @@ def __init__(self, start_time: Optional[Union[int, str]], **kwargs): if isinstance(self._start_time, str): self._start_time = int(pendulum.parse(self._start_time).timestamp()) + def stream_slices( + self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + ) -> Iterable[Optional[Mapping[str, Any]]]: + stream_slices: list = [] + + # use the latest date between self.start_date and stream_state + start_date = self._start_time + + if stream_state and self.cursor_field and self.cursor_field in stream_state: + # Some streams have a cursor field that is not a timestamp. So convert it to timestamp before comparing + state_timestamp = stream_state[self.cursor_field] + if not isinstance(state_timestamp, int): + state_timestamp = pendulum.parse(state_timestamp).int_timestamp + start_date = max(start_date, state_timestamp) + + end_date = int(pendulum.now().timestamp()) + + while start_date <= end_date: + current_end_date = int(pendulum.from_timestamp(start_date).add(minutes=self.slice_interval_in_minutes).timestamp()) + stream_slices.append( + { + "start_date": start_date, + "end_date": min(current_end_date, end_date), + } + ) + # +1 because both dates are inclusive + start_date = current_end_date + 1 + + return stream_slices + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: """ Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object @@ -128,12 +162,10 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late return {self.cursor_field: max(latest_benchmark, current_stream_state[self.cursor_field])} return {self.cursor_field: latest_benchmark} - def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]: - params = super().request_params(stream_state=stream_state) - start_time = self._start_time - if stream_state.get(self.cursor_field): - start_time = stream_state[self.cursor_field] - params.update({"start_time": start_time, "end_time": pendulum.now().int_timestamp}) + def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None) -> MutableMapping[str, Any]: + params = super().request_params(stream_state, stream_slice, next_page_token) + # To query data for one stream slice at a time + params.update({"start_time": stream_slice["start_date"], "end_time": stream_slice["end_date"]}) return params @@ -250,9 +282,9 @@ class Messages(SendgridStreamOffsetPagination, SendgridStreamIncrementalMixin): primary_key = "msg_id" limit = 1000 - def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]: + def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None) -> str: time_filter_template = "%Y-%m-%dT%H:%M:%SZ" - params = super().request_params(stream_state=stream_state, **kwargs) + params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) if isinstance(params["start_time"], int): date_start = datetime.datetime.utcfromtimestamp(params["start_time"]).strftime(time_filter_template) else: