Skip to content

Commit

Permalink
refactored
Browse files Browse the repository at this point in the history
  • Loading branch information
Rohit Chatterjee committed Dec 2, 2023
1 parent cc1c3b6 commit 25e3d5a
Showing 1 changed file with 20 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ class KoboToolStream(HttpStream, IncrementalMixin, ABC):
"""Each Kobo form is a stream"""

primary_key = "_id"
# cursor_field = "_submission_time"
# submission_date_format = "%Y-%m-%dT%H:%M:%S"
# end_time_format = "%Y-%m-%dT%H:%M:%S.%.3f%z"

def __init__(
self,
Expand Down Expand Up @@ -85,9 +82,27 @@ def get_json_schema(self):
"""airbyte needs this function"""
return self.schema

@abstractmethod
@property
def state(self) -> Mapping[str, Any]:
"""State will be a dict : {cursor_field: '2023-03-15T00:00:00.000+05:30'}"""
if self._cursor_value:
return {self.cursor_field: self._cursor_value}

return {self.cursor_field: self.start_time}

@state.setter
def state(self, value: Mapping[str, Any]):
"""setter for state"""
self._cursor_value = value[self.cursor_field]

def mk_query(self):
"""abstract method"""
"""query using endtime"""
if self.cursor_field == "_submission_time":
return {self.cursor_field: {"$gte": self.state[self.cursor_field]}}
else:
start_sub_time = datetime.fromisoformat(self.state[self.cursor_field])
start_sub_time -= timedelta(days=self.max_days_to_close)
return {"_submission_time": {"$gte": start_sub_time.isoformat()}, self.cursor_field: {"$gte": self.state[self.cursor_field]}}

def request_params(
self,
Expand Down Expand Up @@ -156,48 +171,12 @@ class KoboStreamSubmissionTime(KoboToolStream):

cursor_field = "_submission_time"

@property
def state(self) -> Mapping[str, Any]:
"""State will be a dict : {'_submission_time': '2023-03-15T00:00:00.000+05:30'}"""
if self._cursor_value:
return {"_submission_time": self._cursor_value}

return {"_submission_time": self.start_time}

@state.setter
def state(self, value: Mapping[str, Any]):
"""setter for state"""
self._cursor_value = value["_submission_time"]

def mk_query(self):
"""query using _submittion_time"""
return {"_submission_time": {"$gte": self.state["_submission_time"]}}


class KoboStreamEndTime(KoboToolStream):
"""KoboStreamEndTime"""

cursor_field = "endtime"

@property
def state(self) -> Mapping[str, Any]:
"""State will be a dict : {'endtime': '2023-03-15T00:00:00.000+05:30'}"""
if self._cursor_value:
return {"endtime": self._cursor_value}

return {"endtime": self.start_time}

@state.setter
def state(self, value: Mapping[str, Any]):
"""setter for state"""
self._cursor_value = value["endtime"]

def mk_query(self):
"""query using endtime"""
start_sub_time = datetime.fromisoformat(self.state["endtime"])
start_sub_time -= timedelta(days=self.max_days_to_close)
return {"_submission_time": {"$gte": start_sub_time.isoformat()}, "endtime": {"$gte": self.state["endtime"]}}


class SourceKobotoolbox(AbstractSource):
"""One instance per sync"""
Expand Down

0 comments on commit 25e3d5a

Please sign in to comment.