diff --git a/airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/source.py b/airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/source.py
index 096a67181e59..4d590bc988b7 100644
--- a/airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/source.py
+++ b/airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/source.py
@@ -6,7 +6,9 @@
 import json
 import re
 from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
+from abc import ABC
 from urllib.parse import parse_qsl, urlparse
+from datetime import datetime, timedelta, timezone
 
 import requests
 from airbyte_cdk.sources import AbstractSource
@@ -27,62 +29,106 @@
         "data": {
             "type": "object",
         },
+        "endtime": {"type": ["string", "null"]},
+        "end": {"type": ["string", "null"]},
         "_submission_time": {"type": ["string", "null"]},
     },
 }
 
 
-class KoboToolStream(HttpStream, IncrementalMixin):
+# pylint:disable=too-many-instance-attributes
+class KoboToolStream(HttpStream, IncrementalMixin, ABC):
+    """Each Kobo form is a stream"""
+
     primary_key = "_id"
-    cursor_field = "_submission_time"
-    # submission_date_format = "%Y-%m-%dT%H:%M:%S"
-    # end_time_format = "%Y-%m-%dT%H:%M:%S.%.3f%z"
 
-    def __init__(self, config: Mapping[str, Any], form_id, schema, name, pagination_limit, auth_token, **kwargs):
+    def __init__(
+        self,
+        config: Mapping[str, Any],
+        form_id: str,
+        schema: dict,
+        name: str,
+        pagination_limit: int,
+        auth_token: str,
+        **kwargs,
+    ):
+        """Keep the form id, schema, auth token and the sync settings read from config"""
         super().__init__()
         self.form_id = form_id
         self.auth_token = auth_token
         self.schema = schema
         self.stream_name = name
         self.base_url = config["base_url"]
+        # pylint:disable=invalid-name
         self.PAGINATION_LIMIT = pagination_limit
         self._cursor_value = None
         self.start_time = config["start_time"]
+        self.max_days_to_close = config.get("max_days_to_close", 30)
         self.exclude_fields = config["exclude_fields"] if "exclude_fields" in config else []
 
     @property
     def url_base(self) -> str:
+        """base url for all http requests for kobo forms"""
         return f"{self.base_url}/api/v2/assets/{self.form_id}/"
 
     @property
     def name(self) -> str:
-        # Return the english substring as stream name. If not found return form uid
+        """Return the english substring as stream name. If not found return form uid"""
         regex = re.compile("[^a-zA-Z ]")
         s = regex.sub("", self.stream_name)
         s = s.strip()
         return s if len(s) > 0 else self.form_id
 
-    # State will be a dict : {'_submission_time': '2023-03-15T00:00:00.000+05:30'}
+    def get_json_schema(self):
+        """Return the schema that was passed to the constructor"""
+        return self.schema
 
     @property
     def state(self) -> Mapping[str, Any]:
+        """State will be a dict : {cursor_field: '2023-03-15T00:00:00.000+05:30'}"""
         if self._cursor_value:
             return {self.cursor_field: self._cursor_value}
-        else:
-            return {self.cursor_field: self.start_time}
+
+        return {self.cursor_field: self.start_time}
 
     @state.setter
     def state(self, value: Mapping[str, Any]):
+        """setter for state"""
         self._cursor_value = value[self.cursor_field]
 
-    def get_json_schema(self):
-        return self.schema
+    def mk_tzaware_utc(self, dt):
+        """
+        add a utc-tzinfo object to the dt if it doesn't have tzinfo
+        if it has a tzinfo, convert to utc
+        """
+        if dt.tzinfo is None:
+            return dt.replace(tzinfo=timezone.utc)
+        return dt.astimezone(timezone.utc)
+
+    def mk_query(self):
+        """Filter on cursor >= state; end/endtime cursors also get a _submission_time floor"""
+        if self.cursor_field == "_submission_time":
+            return {self.cursor_field: {"$gte": self.state[self.cursor_field]}}
+        else:
+            start_sub_time = datetime.fromisoformat(self.state[self.cursor_field])
+            start_sub_time -= timedelta(days=self.max_days_to_close)
+            start_sub_time = self.mk_tzaware_utc(start_sub_time)
+            tzaware_start_time = self.mk_tzaware_utc(datetime.fromisoformat(self.start_time))
+            start_sub_time = max(start_sub_time, tzaware_start_time)
+            return {"_submission_time": {"$gte": start_sub_time.isoformat()}, self.cursor_field: {"$gte": self.state[self.cursor_field]}}
 
     def request_params(
-        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
+        self,
+        stream_state: Mapping[str, Any],  # pylint:disable=unused-argument
+        stream_slice: Mapping[str, Any] = None,  # pylint:disable=unused-argument
+        next_page_token: Mapping[str, Any] = None,
     ) -> MutableMapping[str, Any]:
+        """build the query request params"""
         params = {"start": 0, "limit": self.PAGINATION_LIMIT, "sort": json.dumps({self.cursor_field: 1})}
-        params["query"] = json.dumps({self.cursor_field: {"$gte": self.state[self.cursor_field]}})
+
+        query = self.mk_query()
+
+        params["query"] = json.dumps(query)
 
         if next_page_token:
             params.update(next_page_token)
@@ -90,6 +136,7 @@ def request_params(
         return params
 
     def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
+        """Return the query params parsed from the "next" url of the response, if any"""
         json_response: Mapping[str, str] = response.json()
         next_url = json_response.get("next")
         params = None
@@ -98,15 +145,21 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
             params = dict(parse_qsl(parsed_url.query))
         return params
 
-    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
+    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:  # pylint:disable=unused-argument
+        """Return the fixed request path "data.json" (airbyte needs this function)"""
         return "data.json"
 
     def request_headers(
-        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
+        self,
+        stream_state: Mapping[str, Any],  # pylint:disable=unused-argument
+        stream_slice: Mapping[str, Any] = None,  # pylint:disable=unused-argument
+        next_page_token: Mapping[str, Any] = None,  # pylint:disable=unused-argument
     ) -> Mapping[str, Any]:
+        """build the request headers"""
         return {"Authorization": "Token " + self.auth_token}
 
     def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
+        """parse the response and yield the records"""
         json_response = response.json()
         result = json_response.get("results")
 
@@ -115,49 +168,68 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
                 if to_remove_field in record:
                     record.pop(to_remove_field)
             retval = {"_id": record["_id"], "data": record}
-            retval[self.cursor_field] = record[self.cursor_field]
+            retval["_submission_time"] = record["_submission_time"]
+            retval["endtime"] = record.get("endtime")
+            retval["end"] = record.get("end")
+            if retval["endtime"]:
+                # endtime is in utc
+                endtime = self.mk_tzaware_utc(datetime.fromisoformat(retval["endtime"]))
+                retval["endtime"] = endtime.isoformat()
             yield retval
 
     def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
+        """Yield the records of the stream, remembering the cursor value of the last one"""
         for record in super().read_records(*args, **kwargs):
             self._cursor_value = record[self.cursor_field]
             yield record
 
 
+class KoboStreamSubmissionTime(KoboToolStream):
+    """Form stream that uses _submission_time as its cursor"""
+
+    cursor_field = "_submission_time"
+
+
+class KoboStreamEndTime(KoboToolStream):
+    """Form stream that uses endtime as its cursor"""
+
+    cursor_field = "endtime"
+
+
+class KoboStreamEnd(KoboToolStream):
+    """Form stream that uses end as its cursor"""
+
+    cursor_field = "end"
+
+
 class SourceKobotoolbox(AbstractSource):
+    """One instance per sync"""
+
     # API_URL = "https://kf.kobotoolbox.org/api/v2"
     # TOKEN_URL = "https://kf.kobotoolbox.org/token/?format=json"
     PAGINATION_LIMIT = 30000
 
-    @classmethod
-    def _check_credentials(cls, config: Mapping[str, Any]) -> Tuple[bool, Any]:
-        # check if the credentials are provided correctly, because for now these value are not required in spec
-        if not config.get("username"):
-            return False, "username in credentials is not provided"
-
-        if not config.get("password"):
-            return False, "password in credentials is not provided"
-
-        return True, None
-
     def get_access_token(self, config) -> Tuple[str, any]:
+        """get the access token for the given credentials"""
         token_url = f"{config['base_url']}/token/?format=json"
-
+        auth = (config["username"], config["password"])
         try:
-            response = requests.post(token_url, auth=(config["username"], config["password"]))
+            response = requests.post(token_url, auth=auth, timeout=30)
             response.raise_for_status()
-            json_response = response.json()
-            return (json_response.get("token", None), None) if json_response is not None else (None, None)
-        except requests.exceptions.RequestException as e:
-            return None, e
+        except requests.exceptions.RequestException as err:
+            return None, err
+
+        json_response = response.json()
+        if json_response is not None:
+            return json_response.get("token"), None
 
-    def check_connection(self, logger, config) -> Tuple[bool, any]:
-        is_valid_credentials, msg = self._check_credentials(config)
-        if not is_valid_credentials:
-            return is_valid_credentials, msg
+        return None, "error"
 
+    def check_connection(self, logger, config) -> Tuple[bool, Any]:  # pylint:disable=unused-argument
+        """check the connection with the credentials provided"""
         url = f"{config['base_url']}/api/v2/assets.json"
-        response = requests.get(url, auth=(config["username"], config["password"]))
+        auth = (config["username"], config["password"])
+        response = requests.get(url, auth=auth, timeout=30)
 
         try:
             response.raise_for_status()
@@ -167,14 +239,15 @@ def check_connection(self, logger, config) -> Tuple[bool, any]:
         return True, None
 
     def streams(self, config: Mapping[str, Any]) -> List[Stream]:
-        # Fetch all assets(forms)
+        """Fetch all assets(forms)"""
         url = f"{config['base_url']}/api/v2/assets.json"
-        response = requests.get(url, auth=(config["username"], config["password"]))
+        auth = (config["username"], config["password"])
+        response = requests.get(url, auth=auth, timeout=30)
         json_response = response.json()
         key_list = json_response.get("results")
 
         # Generate a auth token for all streams
-        auth_token, msg = self.get_access_token(config)
+        auth_token, msg = self.get_access_token(config)  # pylint:disable=unused-variable
         if auth_token is None:
             return []
 
@@ -182,14 +255,20 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
         streams = []
         for form_dict in key_list:
             if form_dict["has_deployment"]:
-                stream = KoboToolStream(
+                if form_dict["name"] in (config.get("forms_using_endtime") or []):
+                    stream_class = KoboStreamEndTime
+                elif form_dict["name"] in (config.get("forms_using_end") or []):
+                    stream_class = KoboStreamEnd
+                else:
+                    stream_class = KoboStreamSubmissionTime
+                stream = stream_class(
                     config=config,
                     form_id=form_dict["uid"],
                     schema=stream_json_schema,
                     name=form_dict["name"],
                     pagination_limit=self.PAGINATION_LIMIT,
                     auth_token=auth_token,
                 )
                 streams.append(stream)
 
         return streams
diff --git a/airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/spec.yaml b/airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/spec.yaml
index b2878a1f5271..a0404d2e3cf8 100644
--- a/airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/spec.yaml
+++ b/airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/spec.yaml
@@ -39,5 +39,21 @@ connectionSpecification:
     exclude_fields:
       type: array
       title: Exclude Fields
-      description: Column names that you dont want to sync
+      description: Column names not to sync
       order: 5
+    forms_using_endtime:
+      type: array
+      title: Forms Using Endtime
+      description: List of forms that use endtime instead of submission time
+      order: 6
+    forms_using_end:
+      type: array
+      title: Forms Using End
+      description: List of forms that use end instead of submission time
+      order: 7
+    max_days_to_close:
+      type: integer
+      title: Max Days To Close
+      description: The maximum number of days between a form's submission date and end date, for those forms listed above
+      default: 30
+      order: 8
\ No newline at end of file