Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix full-refresh syncs #107

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from abc import ABC
from urllib.parse import parse_qsl, urlparse
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone

import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import IncrementalMixin, Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.models import SyncMode

stream_json_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
Expand Down Expand Up @@ -86,39 +87,45 @@ def get_json_schema(self):
@property
def state(self) -> Mapping[str, Any]:
"""State will be a dict : {cursor_field: '2023-03-15T00:00:00.000+05:30'}"""
retval = {}

if self._cursor_value:
return {self.cursor_field: self._cursor_value}
retval[self.cursor_field] = self._cursor_value
else:
retval[self.cursor_field] = self.start_time

return {self.cursor_field: self.start_time}
return retval

@state.setter
def state(self, value: Mapping[str, Any]):
"""setter for state"""
if self.cursor_field in value:
self._cursor_value = value[self.cursor_field]

def mk_tzaware_utc(self, dt):
def mk_tzaware_utc(self, dt: datetime):
"""
add a utc-tzinfo object to the dt if it doesn't have tzinfo
if it has a tzinfo, convert to utc
"""
from datetime import timezone

if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)

def mk_query(self):
"""query using endtime"""
retval = {}
if self.cursor_field == "_submission_time":
return {self.cursor_field: {"$gte": self.state[self.cursor_field]}}
retval[self.cursor_field] = {"$gte": self.state[self.cursor_field]}

else:
start_sub_time = datetime.fromisoformat(self.state[self.cursor_field])
start_sub_time -= timedelta(days=self.max_days_to_close)
start_sub_time = self.mk_tzaware_utc(start_sub_time)
tzaware_start_time = self.mk_tzaware_utc(datetime.fromisoformat(self.start_time))
start_sub_time = max(start_sub_time, tzaware_start_time)
return {"_submission_time": {"$gte": start_sub_time.isoformat()}, self.cursor_field: {"$gte": self.state[self.cursor_field]}}
retval[self.cursor_field] = {"$gte": self.state[self.cursor_field]}
retval["_submission_time"] = {"$gte": start_sub_time.isoformat()}
return retval

def request_params(
self,
Expand All @@ -127,7 +134,9 @@ def request_params(
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
"""build the query request params"""
params = {"start": 0, "limit": self.PAGINATION_LIMIT, "sort": json.dumps({self.cursor_field: 1})}
sort_params = {}
sort_params[self.cursor_field] = 1
params = {"start": 0, "limit": self.PAGINATION_LIMIT, "sort": json.dumps(sort_params)}

query = self.mk_query()

Expand Down Expand Up @@ -180,11 +189,19 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
retval["endtime"] = endtime.isoformat()
yield retval

def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] | None = None,
stream_slice: Mapping[str, Any] | None = None,
stream_state: Mapping[str, Any] | None = None,
**kwargs,
) -> Iterable[Mapping[str, Any]]:
"""read the records from the stream"""
for record in super().read_records(*args, **kwargs):
self._cursor_value = record[self.cursor_field]
for record in super().read_records(sync_mode, cursor_field, stream_slice, stream_state, **kwargs):
yield record
if sync_mode == SyncMode.incremental:
self._cursor_value = max(record[self.cursor_field], self._cursor_value) if self._cursor_value else record[self.cursor_field]


class KoboStreamSubmissionTime(KoboToolStream):
Expand Down
Loading