Skip to content

Commit

Permalink
Pagination fix for fetching all of the data. (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
xacadil authored Apr 18, 2024
1 parent ec816e1 commit 8e6399c
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions tap_restaurant365/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,31 @@ def get_next_page_token(
self, response: requests.Response, previous_token: t.Optional[t.Any]
) -> t.Optional[t.Any]:
"""Return a token for identifying next page or None if no more pages."""
# Check if pagination is enabled
if self.paginate == True:
data = response.json()
# Check for the presence of a next page link in the response data. nextLink is only present if there are more than 5000 records in filter response.
if "@odata.nextLink" in data:
# Increment the skip counter for pagination
self.skip += 5000
# Update the previous token if it exists
if previous_token:
previous_token = previous_token['token']
# Return the next page token and the updated skip value
return {"token":previous_token,"skip":self.skip}
else:
# Reset skip value for a new pagination sequence
self.skip = 0
#Pick starting value just incase progress marker is not present.
# Determine the starting replication value for data extraction
replication_key_value = self.tap_state["bookmarks"][self.name]['starting_replication_value']
# Update the replication key value if progress markers are present
if "progress_markers" in self.tap_state["bookmarks"][self.name]:
replication_key_value = self.tap_state["bookmarks"][self.name]['progress_markers']["replication_key_value"]

# Calculate the start date for data extraction
start_date = (parser.parse(replication_key_value) + timedelta(seconds=1)) or parser.parse(self.config.get("start_date"))
today = datetime.today()
# Adjust the start date based on the previous token if applicable (will occur if progress marker is unable to find a value in empty data response)
if (
previous_token
and "token" in previous_token
Expand All @@ -50,10 +59,13 @@ def get_next_page_token(
start_date = previous_token["token"] + timedelta(days=self.days_delta)
next_token = start_date.replace(tzinfo=None)

if (today - next_token).days < self.days_delta:
# Disable pagination if the next token's date is in the future
if (today - next_token).days < 0:
self.paginate = False
# Return the next token and the current skip value
return {"token":next_token,'skip':self.skip}
else:
# Return None if pagination is not enabled
return None

def get_url_params(
Expand All @@ -73,6 +85,7 @@ def get_url_params(
params["$filter"] = (
f"{self.replication_key} ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')} and {self.replication_key} lt {end_date.strftime('%Y-%m-%dT23:59:59Z')}"
)
#Order by replication key so the response is consistent
params['$orderby'] = f"{self.replication_key}"
if self.name == "journal_entries":
#set a date in the stream to check later to see if we need to keep calling to the stream
Expand Down

0 comments on commit 8e6399c

Please sign in to comment.