Commit

feat: adds ability to fetch data in slices and checkpoint progress
a-rampalli committed Dec 19, 2023
1 parent a84cc99 commit 2da69f0
Showing 1 changed file with 41 additions and 9 deletions.
@@ -5,7 +5,7 @@
 import datetime
 import urllib
 from abc import ABC, abstractmethod
-from typing import Any, Iterable, Mapping, MutableMapping, Optional, Union
+from typing import Any, Iterable, Mapping, MutableMapping, Optional, Union, List
 
 import pendulum
 import requests
@@ -111,13 +111,47 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
 
 class SendgridStreamIncrementalMixin(HttpStream, ABC):
     cursor_field = "created"
+    # The Sendgrid API returns data with the latest cursor value first,
+    # so we fetch data in slices and checkpoint state after each slice.
+    # Setting the interval too low means more API calls and a higher chance of breaching the rate limit,
+    # which in turn means more waiting for rate-limit resets.
+    slice_interval_in_minutes = 60
 
     def __init__(self, start_time: Optional[Union[int, str]], **kwargs):
         super().__init__(**kwargs)
         self._start_time = start_time or 0
         if isinstance(self._start_time, str):
             self._start_time = int(pendulum.parse(self._start_time).timestamp())

+    def stream_slices(
+        self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
+    ) -> Iterable[Optional[Mapping[str, Any]]]:
+        stream_slices: list = []
+
+        # Use the later of self._start_time and the cursor value in the stream state
+        start_date = self._start_time
+
+        if stream_state and self.cursor_field and self.cursor_field in stream_state:
+            # Some streams have a cursor field that is not a timestamp, so convert it to a timestamp before comparing
+            state_timestamp = stream_state[self.cursor_field]
+            if not isinstance(state_timestamp, int):
+                state_timestamp = pendulum.parse(state_timestamp).int_timestamp
+            start_date = max(start_date, state_timestamp)
+
+        end_date = int(pendulum.now().timestamp())
+
+        while start_date <= end_date:
+            current_end_date = int(pendulum.from_timestamp(start_date).add(minutes=self.slice_interval_in_minutes).timestamp())
+            stream_slices.append(
+                {
+                    "start_date": start_date,
+                    "end_date": min(current_end_date, end_date),
+                }
+            )
+            # +1 because both boundaries are inclusive; see the review discussion below
+            start_date = current_end_date + 1

nidhilashkari17 commented Dec 26, 2023:
@a-rampalli we are already advancing the time by 60 minutes in every slice; why do we need to do +1 after each slice?

a-rampalli (author) commented Dec 26, 2023:
Because both the start and end timestamps are inclusive, a record whose cursor value equals an end_date X would be fetched twice: once by the slice ending at X and once by the slice starting at X. We add +1 so that the next slice's start_date is never the same as the previous slice's end_date.
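
A minimal standalone sketch of that boundary behavior (illustrative values only; build_slices is a hypothetical helper mirroring the loop in stream_slices above, not code from the connector):

import pendulum

SLICE_INTERVAL_IN_MINUTES = 60  # hypothetical stand-in for slice_interval_in_minutes

def build_slices(start_date: int, end_date: int) -> list:
    """Build inclusive [start_date, end_date] slices, stepping past each slice boundary."""
    slices = []
    while start_date <= end_date:
        current_end_date = int(pendulum.from_timestamp(start_date).add(minutes=SLICE_INTERVAL_IN_MINUTES).timestamp())
        slices.append({"start_date": start_date, "end_date": min(current_end_date, end_date)})
        # Without the +1 the next slice would start exactly at current_end_date,
        # so a record created at that exact second would be fetched by both slices.
        start_date = current_end_date + 1
    return slices

# Two hours starting at epoch 0 -> [0, 3600] and [3601, 7200]:
# a record created at exactly t=3600 lands only in the first slice.
print(build_slices(0, 7200))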


+
+        return stream_slices

     def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
         """
         Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object
@@ -128,12 +162,10 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
             return {self.cursor_field: max(latest_benchmark, current_stream_state[self.cursor_field])}
         return {self.cursor_field: latest_benchmark}

-    def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
-        params = super().request_params(stream_state=stream_state)
-        start_time = self._start_time
-        if stream_state.get(self.cursor_field):
-            start_time = stream_state[self.cursor_field]
-        params.update({"start_time": start_time, "end_time": pendulum.now().int_timestamp})
+    def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None) -> MutableMapping[str, Any]:
+        params = super().request_params(stream_state, stream_slice, next_page_token)
+        # Query data for one stream slice at a time
+        params.update({"start_time": stream_slice["start_date"], "end_time": stream_slice["end_date"]})
         return params


@@ -250,9 +282,9 @@ class Messages(SendgridStreamOffsetPagination, SendgridStreamIncrementalMixin):
     primary_key = "msg_id"
     limit = 1000

-    def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
+    def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None) -> str:
         time_filter_template = "%Y-%m-%dT%H:%M:%SZ"
-        params = super().request_params(stream_state=stream_state, **kwargs)
+        params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
         if isinstance(params["start_time"], int):
             date_start = datetime.datetime.utcfromtimestamp(params["start_time"]).strftime(time_filter_template)
         else:
