Skip to content

Commit

Permalink
don't update the cursor in read_records unless the sync mode is incre…
Browse files Browse the repository at this point in the history
…mental

(and other formatting changes)
  • Loading branch information
Rohit Chatterjee committed Sep 15, 2024
1 parent 913bad7 commit e51017b
Showing 1 changed file with 29 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from abc import ABC
from urllib.parse import parse_qsl, urlparse
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone

import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import IncrementalMixin, Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.models import SyncMode

stream_json_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
Expand Down Expand Up @@ -86,39 +87,45 @@ def get_json_schema(self):
@property
def state(self) -> Mapping[str, Any]:
"""State will be a dict : {cursor_field: '2023-03-15T00:00:00.000+05:30'}"""
retval = {}

if self._cursor_value:
return {self.cursor_field: self._cursor_value}
retval[self.cursor_field] = self._cursor_value
else:
retval[self.cursor_field] = self.start_time

return {self.cursor_field: self.start_time}
return retval

@state.setter
def state(self, value: Mapping[str, Any]):
"""setter for state"""
if self.cursor_field in value:
self._cursor_value = value[self.cursor_field]

def mk_tzaware_utc(self, dt):
def mk_tzaware_utc(self, dt: datetime):
"""
add a utc-tzinfo object to the dt if it doesn't have tzinfo
if it has a tzinfo, convert to utc
"""
from datetime import timezone

if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)

def mk_query(self):
"""query using endtime"""
retval = {}
if self.cursor_field == "_submission_time":
return {self.cursor_field: {"$gte": self.state[self.cursor_field]}}
retval[self.cursor_field] = {"$gte": self.state[self.cursor_field]}

else:
start_sub_time = datetime.fromisoformat(self.state[self.cursor_field])
start_sub_time -= timedelta(days=self.max_days_to_close)
start_sub_time = self.mk_tzaware_utc(start_sub_time)
tzaware_start_time = self.mk_tzaware_utc(datetime.fromisoformat(self.start_time))
start_sub_time = max(start_sub_time, tzaware_start_time)
return {"_submission_time": {"$gte": start_sub_time.isoformat()}, self.cursor_field: {"$gte": self.state[self.cursor_field]}}
retval[self.cursor_field] = {"$gte": self.state[self.cursor_field]}
retval["_submission_time"] = {"$gte": start_sub_time.isoformat()}
return retval

def request_params(
self,
Expand All @@ -127,7 +134,9 @@ def request_params(
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
"""build the query request params"""
params = {"start": 0, "limit": self.PAGINATION_LIMIT, "sort": json.dumps({self.cursor_field: 1})}
sort_params = {}
sort_params[self.cursor_field] = 1
params = {"start": 0, "limit": self.PAGINATION_LIMIT, "sort": json.dumps(sort_params)}

query = self.mk_query()

Expand Down Expand Up @@ -180,11 +189,19 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
retval["endtime"] = endtime.isoformat()
yield retval

def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] | None = None,
stream_slice: Mapping[str, Any] | None = None,
stream_state: Mapping[str, Any] | None = None,
**kwargs,
) -> Iterable[Mapping[str, Any]]:
"""read the records from the stream"""
for record in super().read_records(*args, **kwargs):
self._cursor_value = record[self.cursor_field]
for record in super().read_records(sync_mode, cursor_field, stream_slice, stream_state, **kwargs):
yield record
if sync_mode == SyncMode.incremental:
self._cursor_value = max(record[self.cursor_field], self._cursor_value) if self._cursor_value else record[self.cursor_field]


class KoboStreamSubmissionTime(KoboToolStream):
Expand Down

0 comments on commit e51017b

Please sign in to comment.