diff --git a/airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/source.py b/airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/source.py index a8242f4ebdd2..5d377a5342f0 100644 --- a/airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/source.py +++ b/airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/source.py @@ -8,12 +8,13 @@ from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple from abc import ABC from urllib.parse import parse_qsl, urlparse -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone import requests from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import IncrementalMixin, Stream from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.models import SyncMode stream_json_schema = { "$schema": "http://json-schema.org/draft-07/schema#", @@ -86,10 +87,14 @@ def get_json_schema(self): @property def state(self) -> Mapping[str, Any]: """State will be a dict : {cursor_field: '2023-03-15T00:00:00.000+05:30'}""" + retval = {} + if self._cursor_value: - return {self.cursor_field: self._cursor_value} + retval[self.cursor_field] = self._cursor_value + else: + retval[self.cursor_field] = self.start_time - return {self.cursor_field: self.start_time} + return retval @state.setter def state(self, value: Mapping[str, Any]): @@ -97,28 +102,30 @@ def state(self, value: Mapping[str, Any]): if self.cursor_field in value: self._cursor_value = value[self.cursor_field] - def mk_tzaware_utc(self, dt): + def mk_tzaware_utc(self, dt: datetime): """ add a utc-tzinfo object to the dt if it doesn't have tzinfo if it has a tzinfo, convert to utc """ - from datetime import timezone - if dt.tzinfo is None: return dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) def mk_query(self): """query using endtime""" + retval = {} if self.cursor_field == "_submission_time": - return {self.cursor_field: {"$gte": self.state[self.cursor_field]}} + retval[self.cursor_field] = {"$gte": self.state[self.cursor_field]} + else: start_sub_time = datetime.fromisoformat(self.state[self.cursor_field]) start_sub_time -= timedelta(days=self.max_days_to_close) start_sub_time = self.mk_tzaware_utc(start_sub_time) tzaware_start_time = self.mk_tzaware_utc(datetime.fromisoformat(self.start_time)) start_sub_time = max(start_sub_time, tzaware_start_time) - return {"_submission_time": {"$gte": start_sub_time.isoformat()}, self.cursor_field: {"$gte": self.state[self.cursor_field]}} + retval[self.cursor_field] = {"$gte": self.state[self.cursor_field]} + retval["_submission_time"] = {"$gte": start_sub_time.isoformat()} + return retval def request_params( self, @@ -127,7 +134,9 @@ def request_params( next_page_token: Mapping[str, Any] = None, ) -> MutableMapping[str, Any]: """build the query request params""" - params = {"start": 0, "limit": self.PAGINATION_LIMIT, "sort": json.dumps({self.cursor_field: 1})} + sort_params = {} + sort_params[self.cursor_field] = 1 + params = {"start": 0, "limit": self.PAGINATION_LIMIT, "sort": json.dumps(sort_params)} query = self.mk_query() @@ -180,11 +189,19 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp retval["endtime"] = endtime.isoformat() yield retval - def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]: + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] | None = None, + stream_slice: Mapping[str, Any] | None = None, + stream_state: Mapping[str, Any] | None = None, + **kwargs, + ) -> Iterable[Mapping[str, Any]]: """read the records from the stream""" - for record in super().read_records(*args, **kwargs): - self._cursor_value = record[self.cursor_field] + for record in super().read_records(sync_mode, cursor_field, stream_slice, stream_state, **kwargs): yield record + if sync_mode == SyncMode.incremental: + self._cursor_value = max(record[self.cursor_field], self._cursor_value) if self._cursor_value else record[self.cursor_field] class KoboStreamSubmissionTime(KoboToolStream):