Skip to content

Commit

Permalink
Merge pull request #69 from DalgoT4D/58-users-should-be-able-to-selec…
Browse files Browse the repository at this point in the history
…t-a-cursor-field-in-kobotoolbox-connector

KoboStreamEndTime and KoboStreamSubmissionTime classes
  • Loading branch information
fatchat authored Dec 7, 2023
2 parents 7b86e7d + a613ac8 commit 3700641
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import json
import re
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from abc import ABC
from urllib.parse import parse_qsl, urlparse
from datetime import datetime, timedelta

import requests
from airbyte_cdk.sources import AbstractSource
Expand All @@ -27,69 +29,116 @@
"data": {
"type": "object",
},
"endtime": {"type": ["string", "null"]},
"end": {"type": ["string", "null"]},
"_submission_time": {"type": ["string", "null"]},
},
}


class KoboToolStream(HttpStream, IncrementalMixin):
# pylint:disable=too-many-instance-attributes
class KoboToolStream(HttpStream, IncrementalMixin, ABC):
"""Each Kobo form is a stream"""

primary_key = "_id"
cursor_field = "_submission_time"
# submission_date_format = "%Y-%m-%dT%H:%M:%S"
# end_time_format = "%Y-%m-%dT%H:%M:%S.%.3f%z"

def __init__(self, config: Mapping[str, Any], form_id, schema, name, pagination_limit, auth_token, **kwargs):
def __init__(
self,
config: Mapping[str, Any],
form_id: str,
schema: dict,
name: str,
pagination_limit: int,
auth_token: str,
**kwargs,
):
"""constructor"""
super().__init__()
self.form_id = form_id
self.auth_token = auth_token
self.schema = schema
self.stream_name = name
self.base_url = config["base_url"]
# pylint:disable=invalid-name
self.PAGINATION_LIMIT = pagination_limit
self._cursor_value = None
self.start_time = config["start_time"]
self.max_days_to_close = config.get("max_days_to_close", 30)
self.exclude_fields = config["exclude_fields"] if "exclude_fields" in config else []

@property
def url_base(self) -> str:
    """Base URL for every HTTP request made against this Kobo form.

    A trailing slash on the configured ``base_url`` is dropped first so the
    result never contains ``//api``.
    """
    return f"{self.base_url.rstrip('/')}/api/v2/assets/{self.form_id}/"

@property
def name(self) -> str:
    """Stream name: the form's name reduced to ASCII letters and spaces.

    Every other character (digits, punctuation, non-English text) is removed
    and surrounding whitespace is trimmed. If nothing is left, the form uid
    is returned instead.
    """
    cleaned = re.sub("[^a-zA-Z ]", "", self.stream_name).strip()
    return cleaned if cleaned else self.form_id

# State will be a dict : {'_submission_time': '2023-03-15T00:00:00.000+05:30'}
def get_json_schema(self) -> dict:
    """Return the JSON schema that was handed to the constructor.

    Airbyte requires this method on every stream.
    """
    return self.schema

@property
def state(self) -> Mapping[str, Any]:
    """Current checkpoint as ``{cursor_field: value}``.

    Example: ``{'_submission_time': '2023-03-15T00:00:00.000+05:30'}``.
    Until a cursor value has been recorded, the configured ``start_time``
    is reported instead.
    """
    return {self.cursor_field: self._cursor_value or self.start_time}

@state.setter
def state(self, value: Mapping[str, Any]):
    """Restore the cursor from a previously saved state mapping."""
    self._cursor_value = value[self.cursor_field]

def get_json_schema(self):
return self.schema
def mk_tzaware_utc(self, dt: datetime) -> datetime:
    """Return ``dt`` as a timezone-aware datetime expressed in UTC.

    A naive ``dt`` is taken to already be in UTC and is only tagged with
    ``timezone.utc``; an aware ``dt`` is converted to UTC.
    """
    # local import: the module-level import only brings in datetime and timedelta
    from datetime import timezone

    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)

def mk_query(self) -> dict:
    """Build the filter that is sent to Kobo as the ``query`` request parameter.

    For the ``_submission_time`` cursor the filter is simply
    ``cursor >= current state``.

    For any other cursor the same condition is applied to the cursor field,
    and ``_submission_time`` is additionally bounded from below by
    ``cursor - max_days_to_close`` days, but never earlier than the
    configured ``start_time``. Both timestamps are normalised to UTC before
    they are compared.
    """
    cursor_value = self.state[self.cursor_field]
    cursor_filter = {"$gte": cursor_value}

    if self.cursor_field == "_submission_time":
        return {self.cursor_field: cursor_filter}

    # look back max_days_to_close days from the cursor to get the earliest
    # submission time that could still be relevant
    lookback_start = datetime.fromisoformat(cursor_value) - timedelta(days=self.max_days_to_close)
    lookback_start = self.mk_tzaware_utc(lookback_start)

    # never query for submissions older than the configured start_time
    configured_start = self.mk_tzaware_utc(datetime.fromisoformat(self.start_time))
    earliest_submission = max(lookback_start, configured_start)

    return {
        "_submission_time": {"$gte": earliest_submission.isoformat()},
        self.cursor_field: cursor_filter,
    }

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
self,
stream_state: Mapping[str, Any], # pylint:disable=unused-argument
stream_slice: Mapping[str, any] = None, # pylint:disable=unused-argument
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
"""build the query request params"""
params = {"start": 0, "limit": self.PAGINATION_LIMIT, "sort": json.dumps({self.cursor_field: 1})}
params["query"] = json.dumps({self.cursor_field: {"$gte": self.state[self.cursor_field]}})

query = self.mk_query()

params["query"] = json.dumps(query)

if next_page_token:
params.update(next_page_token)

return params

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""pagination"""
json_response: Mapping[str, str] = response.json()
next_url = json_response.get("next")
params = None
Expand All @@ -98,15 +147,21 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
params = dict(parse_qsl(parsed_url.query))
return params

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: # pylint:disable=unused-argument
"""airbyte needs this function"""
return "data.json"

def request_headers(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
self,
stream_state: Mapping[str, Any], # pylint:disable=unused-argument
stream_slice: Mapping[str, Any] = None, # pylint:disable=unused-argument
next_page_token: Mapping[str, Any] = None, # pylint:disable=unused-argument
) -> Mapping[str, Any]:
"""build the request headers"""
return {"Authorization": "Token " + self.auth_token}

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""parse the response and yield the records"""
json_response = response.json()
result = json_response.get("results")

Expand All @@ -115,49 +170,68 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
if to_remove_field in record:
record.pop(to_remove_field)
retval = {"_id": record["_id"], "data": record}
retval[self.cursor_field] = record[self.cursor_field]
retval["_submission_time"] = record["_submission_time"]
retval["endtime"] = record.get("endtime")
retval["end"] = record.get("end")
if retval["endtime"]:
# endtime is in utc
endtime = self.mk_tzaware_utc(datetime.fromisoformat(retval["endtime"]))
retval["endtime"] = endtime.isoformat()
yield retval

def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
    """Yield every record from the parent stream, advancing the cursor as it goes.

    The cursor is set to the cursor-field value of the most recently yielded
    record. Records whose cursor field is empty (``endtime`` / ``end`` may be
    ``None``) leave the cursor untouched; otherwise the saved state would
    fall back to ``start_time``.
    """
    for record in super().read_records(*args, **kwargs):
        cursor_value = record[self.cursor_field]
        if cursor_value:
            self._cursor_value = cursor_value
        yield record


class KoboStreamSubmissionTime(KoboToolStream):
    """Kobo form stream whose incremental cursor is ``_submission_time``."""

    cursor_field = "_submission_time"


class KoboStreamEndTime(KoboToolStream):
    """Kobo form stream whose incremental cursor is the ``endtime`` field."""

    cursor_field = "endtime"


class KoboStreamEnd(KoboToolStream):
    """Kobo form stream whose incremental cursor is the ``end`` field."""

    cursor_field = "end"


class SourceKobotoolbox(AbstractSource):
"""One instance per sync"""

# API_URL = "https://kf.kobotoolbox.org/api/v2"
# TOKEN_URL = "https://kf.kobotoolbox.org/token/?format=json"
PAGINATION_LIMIT = 30000

@classmethod
def _check_credentials(cls, config: Mapping[str, Any]) -> Tuple[bool, Any]:
# check if the credentials are provided correctly, because for now these value are not required in spec
if not config.get("username"):
return False, "username in credentials is not provided"

if not config.get("password"):
return False, "password in credentials is not provided"

return True, None

def get_access_token(self, config) -> Tuple[str, any]:
"""get the access token for the given credentials"""
token_url = f"{config['base_url']}/token/?format=json"

auth = (config["username"], config["password"])
try:
response = requests.post(token_url, auth=(config["username"], config["password"]))
response = requests.post(token_url, auth=auth, timeout=30)
response.raise_for_status()
json_response = response.json()
return (json_response.get("token", None), None) if json_response is not None else (None, None)
except requests.exceptions.RequestException as e:
return None, e
except requests.exceptions.RequestException:
return None, "error"

json_response = response.json()
if json_response is not None:
return json_response.get("token"), None

def check_connection(self, logger, config) -> Tuple[bool, any]:
is_valid_credentials, msg = self._check_credentials(config)
if not is_valid_credentials:
return is_valid_credentials, msg
return None, "error"

def check_connection(self, logger, config) -> Tuple[bool, any]: # pylint:disable=unused-argument
"""check the connection with the credentials provided"""
url = f"{config['base_url']}/api/v2/assets.json"
response = requests.get(url, auth=(config["username"], config["password"]))
auth = (config["username"], config["password"])
response = requests.get(url, auth=auth, timeout=30)

try:
response.raise_for_status()
Expand All @@ -167,29 +241,49 @@ def check_connection(self, logger, config) -> Tuple[bool, any]:
return True, None

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
# Fetch all assets(forms)
"""Fetch all assets(forms)"""
url = f"{config['base_url']}/api/v2/assets.json"
response = requests.get(url, auth=(config["username"], config["password"]))
auth = (config["username"], config["password"])
response = requests.get(url, auth=auth, timeout=30)
json_response = response.json()
key_list = json_response.get("results")

# Generate a auth token for all streams
auth_token, msg = self.get_access_token(config)
auth_token, msg = self.get_access_token(config) # pylint:disable=unused-variable
if auth_token is None:
return []

# Generate array of stream objects
streams = []
for form_dict in key_list:
if form_dict["has_deployment"]:
stream = KoboToolStream(
config=config,
form_id=form_dict["uid"],
schema=stream_json_schema,
name=form_dict["name"],
pagination_limit=self.PAGINATION_LIMIT,
auth_token=auth_token,
)
if "forms_using_endtime" in config and form_dict["name"] in config["forms_using_endtime"]:
stream = KoboStreamEndTime(
config=config,
form_id=form_dict["uid"],
schema=stream_json_schema,
name=form_dict["name"],
pagination_limit=self.PAGINATION_LIMIT,
auth_token=auth_token,
)
elif "forms_using_end" in config and form_dict["name"] in config["forms_using_end"]:
stream = KoboStreamEnd(
config=config,
form_id=form_dict["uid"],
schema=stream_json_schema,
name=form_dict["name"],
pagination_limit=self.PAGINATION_LIMIT,
auth_token=auth_token,
)
else:
stream = KoboStreamSubmissionTime(
config=config,
form_id=form_dict["uid"],
schema=stream_json_schema,
name=form_dict["name"],
pagination_limit=self.PAGINATION_LIMIT,
auth_token=auth_token,
)
streams.append(stream)

return streams
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,21 @@ connectionSpecification:
exclude_fields:
type: array
title: Exclude Fields
description: Column names that you dont want to sync
description: Column names not to sync
order: 5
forms_using_endtime:
type: array
title: Forms Using Endtime
description: List of forms that use endtime instead of submission time
order: 6
forms_using_end:
type: array
title: Forms Using End
description: List of forms that use end instead of submission time
order: 6
max_days_to_close:
type: integer
title: Max Days To Close
description: The maximum number of days between a form's submission date and its end date; applies only to forms listed in forms_using_endtime or forms_using_end
default: 30
order: 7

0 comments on commit 3700641

Please sign in to comment.