diff --git a/airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/source.py b/airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/source.py index bfaf4c5b6d31..a32d0b5746f8 100644 --- a/airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/source.py +++ b/airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/source.py @@ -102,6 +102,16 @@ def mk_query(self): else: start_sub_time = datetime.fromisoformat(self.state[self.cursor_field]) start_sub_time -= timedelta(days=self.max_days_to_close) + from datetime import timezone + + tzaware_start_time = datetime.fromisoformat(self.start_time) + if tzaware_start_time.tzinfo is None: + # interpret as utc + tzaware_start_time = tzaware_start_time.replace(tzinfo=timezone.utc) + else: + # convert to utc if necessary + tzaware_start_time = tzaware_start_time.astimezone(timezone.utc) + start_sub_time = max(start_sub_time, tzaware_start_time) return {"_submission_time": {"$gte": start_sub_time.isoformat()}, self.cursor_field: {"$gte": self.state[self.cursor_field]}} def request_params( @@ -157,6 +167,12 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp retval = {"_id": record["_id"], "data": record} retval["_submission_time"] = record["_submission_time"] retval["endtime"] = record.get("endtime") + if retval["endtime"]: + from datetime import timezone + + # endtime is in utc + endtime = datetime.fromisoformat(retval["endtime"]).astimezone(timezone.utc) + retval["endtime"] = endtime.isoformat() yield retval def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]: