From ddd710525194d3cf37844d9b17f17faf0d4e2a8d Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Thu, 19 Dec 2024 11:59:12 +0530 Subject: [PATCH] kobotoolbox connector --- .../source-kobotoolbox/.dockerignore | 6 + .../connectors/source-kobotoolbox/Dockerfile | 35 ++ .../connectors/source-kobotoolbox/README.md | 129 ++++++++ .../acceptance-test-config.yml | 37 +++ .../acceptance-test-docker.sh | 16 + .../source-kobotoolbox/build.gradle | 9 + .../integration_tests/__init__.py | 3 + .../integration_tests/abnormal_state.json | 5 + .../integration_tests/acceptance.py | 16 + .../integration_tests/configured_catalog.json | 20 ++ .../integration_tests/invalid_config.json | 4 + .../connectors/source-kobotoolbox/main.py | 13 + .../source-kobotoolbox/requirements.txt | 3 + .../sample_files/configured_catalog.json | 16 + .../connectors/source-kobotoolbox/setup.py | 23 ++ .../source_kobotoolbox/__init__.py | 8 + .../source_kobotoolbox/source.py | 307 ++++++++++++++++++ .../source_kobotoolbox/spec.yaml | 59 ++++ .../unit_tests/test_source.py | 35 ++ .../unit_tests/test_stream.py | 103 ++++++ 20 files changed, 847 insertions(+) create mode 100644 airbyte-integrations/connectors/source-kobotoolbox/.dockerignore create mode 100644 airbyte-integrations/connectors/source-kobotoolbox/Dockerfile create mode 100644 airbyte-integrations/connectors/source-kobotoolbox/README.md create mode 100644 airbyte-integrations/connectors/source-kobotoolbox/acceptance-test-config.yml create mode 100755 airbyte-integrations/connectors/source-kobotoolbox/acceptance-test-docker.sh create mode 100644 airbyte-integrations/connectors/source-kobotoolbox/build.gradle create mode 100644 airbyte-integrations/connectors/source-kobotoolbox/integration_tests/__init__.py create mode 100644 airbyte-integrations/connectors/source-kobotoolbox/integration_tests/abnormal_state.json create mode 100644 airbyte-integrations/connectors/source-kobotoolbox/integration_tests/acceptance.py create mode 100644 
airbyte-integrations/connectors/source-kobotoolbox/integration_tests/configured_catalog.json create mode 100644 airbyte-integrations/connectors/source-kobotoolbox/integration_tests/invalid_config.json create mode 100644 airbyte-integrations/connectors/source-kobotoolbox/main.py create mode 100644 airbyte-integrations/connectors/source-kobotoolbox/requirements.txt create mode 100644 airbyte-integrations/connectors/source-kobotoolbox/sample_files/configured_catalog.json create mode 100644 airbyte-integrations/connectors/source-kobotoolbox/setup.py create mode 100644 airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/__init__.py create mode 100644 airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/source.py create mode 100644 airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/spec.yaml create mode 100644 airbyte-integrations/connectors/source-kobotoolbox/unit_tests/test_source.py create mode 100644 airbyte-integrations/connectors/source-kobotoolbox/unit_tests/test_stream.py diff --git a/airbyte-integrations/connectors/source-kobotoolbox/.dockerignore b/airbyte-integrations/connectors/source-kobotoolbox/.dockerignore new file mode 100644 index 000000000000..39a284db1c82 --- /dev/null +++ b/airbyte-integrations/connectors/source-kobotoolbox/.dockerignore @@ -0,0 +1,6 @@ +* +!Dockerfile +!main.py +!source_kobotoolbox +!setup.py +!secrets diff --git a/airbyte-integrations/connectors/source-kobotoolbox/Dockerfile b/airbyte-integrations/connectors/source-kobotoolbox/Dockerfile new file mode 100644 index 000000000000..3178d789c975 --- /dev/null +++ b/airbyte-integrations/connectors/source-kobotoolbox/Dockerfile @@ -0,0 +1,35 @@ +FROM python:3.10.8-slim-bullseye as base + +# build and load all requirements +FROM base as builder +WORKDIR /airbyte/integration_code + +# upgrade pip to the latest version +RUN apt-get update && apt-get install -y && rm -rf /var/lib/apt/lists/* \ + # apk --no-cache upgrade \ + && pip install 
--upgrade pip \ + && python3 -m pip install --upgrade setuptools + +COPY setup.py ./ +# install necessary packages to a temporary folder +RUN pip install --prefix=/install . + +# build a clean environment +FROM base +WORKDIR /airbyte/integration_code + +# copy all loaded and built libraries to a pure basic image +COPY --from=builder /install /usr/local +# add default timezone settings +COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime +RUN echo "Etc/UTC" > /etc/timezone + +# copy payload code only +COPY main.py ./ +COPY source_kobotoolbox ./source_kobotoolbox + +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] + +LABEL io.airbyte.version=0.2.0 +LABEL io.airbyte.name=tech4dev/source-kobotoolbox diff --git a/airbyte-integrations/connectors/source-kobotoolbox/README.md b/airbyte-integrations/connectors/source-kobotoolbox/README.md new file mode 100644 index 000000000000..60d9a6a32da3 --- /dev/null +++ b/airbyte-integrations/connectors/source-kobotoolbox/README.md @@ -0,0 +1,129 @@ +# Kobotoolbox Source + +This is the repository for the Kobotoolbox source connector, written in Python. +For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.com/integrations/sources/kobotoolbox). + +## Local development + +### Prerequisites +**To iterate on this connector, make sure to complete this prerequisites section.** + +#### Minimum Python version required `= 3.9.0` + +#### Build & Activate Virtual Environment and install dependencies +From this connector directory, create a virtual environment: +``` +python -m venv .venv +``` + +This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your +development environment of choice. 
To activate it from the terminal, run: +``` +source .venv/bin/activate +pip install -r requirements.txt +``` +If you are in an IDE, follow your IDE's instructions to activate the virtualenv. + +Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is +used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. +If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything +should work as you expect. + +#### Building via Gradle +From the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:source-kobotoolbox:build +``` + +#### Create credentials +**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.com/integrations/sources/kobotoolbox) +to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_kobotoolbox/spec.yaml` file. +Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information. +See `integration_tests/sample_config.json` for a sample config file. + +**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source kobotoolbox test creds` +and place them into `secrets/config.json`. + +### Locally running the connector +``` +python main.py spec +python main.py check --config secrets/config.json +python main.py discover --config secrets/config.json +python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json +``` + +### Locally running the connector docker image + +#### Build +First, make sure you build the latest Docker image: +``` +docker build . 
-t airbyte/source-kobotoolbox:dev +``` + +You can also build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:source-kobotoolbox:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/source-kobotoolbox:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-kobotoolbox:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-kobotoolbox:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-kobotoolbox:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` +## Testing + Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. +First install test dependencies into your virtual environment: +``` +pip install .[tests] +``` +### Unit Tests +To run unit tests locally, from the connector directory run: +``` +python -m pytest unit_tests +``` + +### Integration Tests +There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all source connectors) and custom integration tests (which are specific to this connector). +#### Custom Integration tests +Place custom tests inside `integration_tests/` folder, then, from the connector root, run +``` +python -m pytest integration_tests +``` +#### Acceptance Tests +Customize `acceptance-test-config.yml` file to configure tests. See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference) for more information. 
+If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py. +To run your integration tests with acceptance tests, from the connector root, run +``` +python -m pytest integration_tests -p integration_tests.acceptance +``` +To run your integration tests with docker + +### Using gradle to run tests +All commands should be run from airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:source-kobotoolbox:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:source-kobotoolbox:integrationTest +``` + +## Dependency Management +All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development. +We split dependencies between two groups, dependencies that are: +* required for your connector to work need to go to `MAIN_REQUIREMENTS` list. +* required for the testing need to go to `TEST_REQUIREMENTS` list + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. 
diff --git a/airbyte-integrations/connectors/source-kobotoolbox/acceptance-test-config.yml b/airbyte-integrations/connectors/source-kobotoolbox/acceptance-test-config.yml new file mode 100644 index 000000000000..c583f1f8522c --- /dev/null +++ b/airbyte-integrations/connectors/source-kobotoolbox/acceptance-test-config.yml @@ -0,0 +1,37 @@ +# See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference) +# for more information about how to configure these tests +connector_image: airbyte/source-kobotoolbox:dev +acceptance_tests: + spec: + tests: + - spec_path: "source_kobotoolbox/spec.yaml" + connection: + tests: + - config_path: "secrets/config.json" + status: "succeed" + - config_path: "integration_tests/invalid_config.json" + status: "failed" + discovery: + tests: + - config_path: "secrets/config.json" + basic_read: + tests: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + empty_streams: [] + # TODO uncomment this block to specify that the tests should assert the connector outputs the records provided in the input file a file + # expect_records: + # path: "integration_tests/expected_records.jsonl" + # extra_fields: no + # exact_order: no + # extra_records: yes + incremental: + tests: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + future_state: + future_state_path: "integration_tests/abnormal_state.json" + full_refresh: + tests: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-kobotoolbox/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-kobotoolbox/acceptance-test-docker.sh new file mode 100755 index 000000000000..a8d6ac4bb608 --- /dev/null +++ b/airbyte-integrations/connectors/source-kobotoolbox/acceptance-test-docker.sh @@ -0,0 +1,16 @@ 
+#!/usr/bin/env sh + +# Build latest connector image +docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-) + +# Pull latest acctest image +docker pull airbyte/connector-acceptance-test:latest + +# Run +docker run --rm -it \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v /tmp:/tmp \ + -v $(pwd):/test_input \ + airbyte/connector-acceptance-test \ + --acceptance-test-config /test_input + diff --git a/airbyte-integrations/connectors/source-kobotoolbox/build.gradle b/airbyte-integrations/connectors/source-kobotoolbox/build.gradle new file mode 100644 index 000000000000..6145c717e597 --- /dev/null +++ b/airbyte-integrations/connectors/source-kobotoolbox/build.gradle @@ -0,0 +1,9 @@ +plugins { + id 'airbyte-python' + id 'airbyte-docker' + id 'airbyte-connector-acceptance-test' +} + +airbytePython { + moduleDirectory 'source_kobotoolbox_singer' +} diff --git a/airbyte-integrations/connectors/source-kobotoolbox/integration_tests/__init__.py b/airbyte-integrations/connectors/source-kobotoolbox/integration_tests/__init__.py new file mode 100644 index 000000000000..c941b3045795 --- /dev/null +++ b/airbyte-integrations/connectors/source-kobotoolbox/integration_tests/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+# diff --git a/airbyte-integrations/connectors/source-kobotoolbox/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-kobotoolbox/integration_tests/abnormal_state.json new file mode 100644 index 000000000000..7d8191917b77 --- /dev/null +++ b/airbyte-integrations/connectors/source-kobotoolbox/integration_tests/abnormal_state.json @@ -0,0 +1,5 @@ +{ + "Daily Issue Form": { + "endtime": "2024-01-01T00:00:00" + } +} diff --git a/airbyte-integrations/connectors/source-kobotoolbox/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-kobotoolbox/integration_tests/acceptance.py new file mode 100644 index 000000000000..43ce950d77ca --- /dev/null +++ b/airbyte-integrations/connectors/source-kobotoolbox/integration_tests/acceptance.py @@ -0,0 +1,16 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +import pytest + +pytest_plugins = ("connector_acceptance_test.plugin",) + + +@pytest.fixture(scope="session", autouse=True) +def connector_setup(): + """This fixture is a placeholder for external resources that acceptance test might require.""" + # TODO: setup test dependencies + yield + # TODO: clean up test dependencies diff --git a/airbyte-integrations/connectors/source-kobotoolbox/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-kobotoolbox/integration_tests/configured_catalog.json new file mode 100644 index 000000000000..03016b67466f --- /dev/null +++ b/airbyte-integrations/connectors/source-kobotoolbox/integration_tests/configured_catalog.json @@ -0,0 +1,20 @@ +{ + "streams": [ + { + "stream": { + "name": "Employability Skill Assessment Registration", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": {} + }, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": false, + "default_cursor_field": ["_submission_time"] + }, + "sync_mode": "incremental", + "cursor_field": 
["_submission_time"], + "destination_sync_mode": "append_dedup" + } + ] +} diff --git a/airbyte-integrations/connectors/source-kobotoolbox/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-kobotoolbox/integration_tests/invalid_config.json new file mode 100644 index 000000000000..ce8bb8d0c83b --- /dev/null +++ b/airbyte-integrations/connectors/source-kobotoolbox/integration_tests/invalid_config.json @@ -0,0 +1,4 @@ +{ + "username": "thisisarandomusername", + "password": "thisisarandompassword" +} diff --git a/airbyte-integrations/connectors/source-kobotoolbox/main.py b/airbyte-integrations/connectors/source-kobotoolbox/main.py new file mode 100644 index 000000000000..a8d216dc7d6d --- /dev/null +++ b/airbyte-integrations/connectors/source-kobotoolbox/main.py @@ -0,0 +1,13 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +import sys + +from airbyte_cdk.entrypoint import launch +from source_kobotoolbox import SourceKobotoolbox + +if __name__ == "__main__": + source = SourceKobotoolbox() + launch(source, sys.argv[1:]) diff --git a/airbyte-integrations/connectors/source-kobotoolbox/requirements.txt b/airbyte-integrations/connectors/source-kobotoolbox/requirements.txt new file mode 100644 index 000000000000..9ce85523c234 --- /dev/null +++ b/airbyte-integrations/connectors/source-kobotoolbox/requirements.txt @@ -0,0 +1,3 @@ +# This file is autogenerated -- only edit if you know what you are doing. Use setup.py for declaring dependencies. +-e ../../bases/connector-acceptance-test +-e . 
diff --git a/airbyte-integrations/connectors/source-kobotoolbox/sample_files/configured_catalog.json b/airbyte-integrations/connectors/source-kobotoolbox/sample_files/configured_catalog.json new file mode 100644 index 000000000000..881eba93c7bb --- /dev/null +++ b/airbyte-integrations/connectors/source-kobotoolbox/sample_files/configured_catalog.json @@ -0,0 +1,16 @@ +{ + "streams": [ + { + "stream": { + "name": "kobo_tool_stream", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["_submission_time"] + }, + "sync_mode": "incremental", + "cursor_field": ["_submission_time"], + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors/source-kobotoolbox/setup.py b/airbyte-integrations/connectors/source-kobotoolbox/setup.py new file mode 100644 index 000000000000..7a577f54d4df --- /dev/null +++ b/airbyte-integrations/connectors/source-kobotoolbox/setup.py @@ -0,0 +1,23 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +from setuptools import find_packages, setup + +MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2"] + +TEST_REQUIREMENTS = ["pytest~=6.2", "connector-acceptance-test", "requests_mock"] + +setup( + name="source_kobotoolbox", + description="Source implementation for Kobotoolbox.", + author="Airbyte", + author_email="contact@airbyte.io", + packages=find_packages(), + install_requires=MAIN_REQUIREMENTS, + package_data={"": ["*.json", "*.yaml"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, +) diff --git a/airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/__init__.py b/airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/__init__.py new file mode 100644 index 000000000000..e689e3311aa1 --- /dev/null +++ b/airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/__init__.py @@ -0,0 +1,8 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
"""Airbyte source for KoboToolbox.

Every deployed Kobo form (asset) is exposed as one incremental stream whose
records are read from ``{base_url}/api/v2/assets/{form_id}/data.json``.
"""

import json
import logging
import re
from abc import ABC
from datetime import datetime, timedelta, timezone
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from urllib.parse import parse_qsl, urlparse

import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import IncrementalMixin, Stream
from airbyte_cdk.sources.streams.http import HttpStream

logger = logging.getLogger("airbyte")

# Fallbacks used when the optional config keys are absent; the values mirror
# the `default:` entries declared for `start_time` / `max_days_to_close` in spec.yaml.
DEFAULT_START_TIME = "2023-03-15T00:00:00"
DEFAULT_MAX_DAYS_TO_CLOSE = 30

# Timeout (seconds) applied to the plain `requests` calls made by the source.
REQUEST_TIMEOUT_SECONDS = 30

# Anything that is not an ASCII letter or a space is stripped from form names.
# Compiled once instead of on every access of `KoboToolStream.name`.
_NON_NAME_CHARS = re.compile("[^a-zA-Z ]")

stream_json_schema = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "additionalProperties": True,
    "properties": {
        "_id": {
            "type": [
                "number",
                "null",
            ]
        },
        "data": {
            "type": "object",
        },
        "endtime": {"type": ["string", "null"]},
        "end": {"type": ["string", "null"]},
        "_submission_time": {"type": ["string", "null"]},
    },
}


# pylint:disable=too-many-instance-attributes
class KoboToolStream(HttpStream, IncrementalMixin, ABC):
    """Base stream for one Kobo form.

    Subclasses only choose the ``cursor_field`` (``_submission_time``,
    ``endtime`` or ``end``); everything else is shared.
    """

    primary_key = "_id"

    def __init__(
        self,
        config: Mapping[str, Any],
        form_id: str,
        schema: dict,
        name: str,
        pagination_limit: int,
        auth_token: str,
        **kwargs,
    ):
        """Create the stream for a single form.

        Args:
            config: connector configuration (see spec.yaml). ``base_url`` is
                required; ``start_time``, ``max_days_to_close`` and
                ``exclude_fields`` are optional.
            form_id: uid of the Kobo asset.
            schema: JSON schema returned by ``get_json_schema``.
            name: human readable form name, used to derive the stream name.
            pagination_limit: page size sent as the ``limit`` query parameter.
            auth_token: API token sent in the ``Authorization`` header.
            **kwargs: accepted for call-site compatibility and ignored.
        """
        super().__init__()
        self.form_id = form_id
        self.auth_token = auth_token
        self.schema = schema
        self.stream_name = name
        self.base_url = config["base_url"]
        # pylint:disable=invalid-name
        self.PAGINATION_LIMIT = pagination_limit
        self._cursor_value = None
        # `start_time` is not listed under `required` in spec.yaml, so it may be
        # missing from the config; fall back to the documented default.
        self.start_time = config.get("start_time", DEFAULT_START_TIME)
        self.max_days_to_close = config.get("max_days_to_close", DEFAULT_MAX_DAYS_TO_CLOSE)
        self.exclude_fields = config.get("exclude_fields") or []

    @property
    def url_base(self) -> str:
        """Base url for all http requests for this form."""
        return f"{self.base_url}/api/v2/assets/{self.form_id}/"

    @property
    def name(self) -> str:
        """Return the form name reduced to ASCII letters and spaces.

        Falls back to the form uid when nothing is left after stripping.
        """
        cleaned = _NON_NAME_CHARS.sub("", self.stream_name).strip()
        return cleaned if cleaned else self.form_id

    def get_json_schema(self):
        """Return the schema handed to the constructor."""
        return self.schema

    @property
    def state(self) -> Mapping[str, Any]:
        """Return ``{cursor_field: value}``.

        The value is the last cursor seen, or ``start_time`` when no cursor
        has been recorded yet.
        """
        return {self.cursor_field: self._cursor_value if self._cursor_value else self.start_time}

    @state.setter
    def state(self, value: Mapping[str, Any]):
        """Restore the cursor from a previously emitted state, if it has one."""
        if self.cursor_field in value:
            self._cursor_value = value[self.cursor_field]

    def mk_tzaware_utc(self, dt: datetime):
        """Return ``dt`` as a timezone-aware UTC datetime.

        A naive datetime is assumed to already be in UTC and only gets the
        tzinfo attached; an aware datetime is converted to UTC.
        """
        if dt.tzinfo is None:
            return dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc)

    def mk_query(self):
        """Build the filter sent as the ``query`` request parameter.

        The cursor field is always filtered with ``$gte`` on the current
        state. For streams whose cursor is not ``_submission_time`` an extra
        ``_submission_time`` lower bound is added: the cursor value minus
        ``max_days_to_close`` days, but never earlier than ``start_time``.
        """
        cursor_value = self.state[self.cursor_field]
        query = {self.cursor_field: {"$gte": cursor_value}}

        if self.cursor_field != "_submission_time":
            window_start = datetime.fromisoformat(cursor_value) - timedelta(days=self.max_days_to_close)
            window_start = self.mk_tzaware_utc(window_start)
            earliest_allowed = self.mk_tzaware_utc(datetime.fromisoformat(self.start_time))
            query["_submission_time"] = {"$gte": max(window_start, earliest_allowed).isoformat()}

        return query

    def request_params(
        self,
        stream_state: Mapping[str, Any],  # pylint:disable=unused-argument
        stream_slice: Mapping[str, Any] = None,  # pylint:disable=unused-argument
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """Build the query string: paging, ascending sort on the cursor and the filter.

        When ``next_page_token`` is given its entries override the defaults.
        """
        params = {
            "start": 0,
            "limit": self.PAGINATION_LIMIT,
            "sort": json.dumps({self.cursor_field: 1}),
        }
        params["query"] = json.dumps(self.mk_query())

        if next_page_token:
            params.update(next_page_token)

        return params

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Return the query parameters of the response's ``next`` url, or None on the last page."""
        next_url = response.json().get("next")
        if next_url is None:
            return None
        return dict(parse_qsl(urlparse(next_url).query))

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:  # pylint:disable=unused-argument
        """Path of the submissions endpoint, relative to ``url_base``."""
        return "data.json"

    def request_headers(
        self,
        stream_state: Mapping[str, Any],  # pylint:disable=unused-argument
        stream_slice: Mapping[str, Any] = None,  # pylint:disable=unused-argument
        next_page_token: Mapping[str, Any] = None,  # pylint:disable=unused-argument
    ) -> Mapping[str, Any]:
        """Authenticate every request with the API token."""
        return {"Authorization": "Token " + self.auth_token}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield one record per submission found under ``results``.

        Each yielded record has the keys ``_id``, ``data`` (the submission
        with ``exclude_fields`` removed), ``_submission_time``, ``endtime``
        and ``end``. A non-empty ``endtime`` is re-emitted as a UTC ISO string.

        Raises:
            KeyError: if a submission has no ``_id`` or ``_submission_time``.
        """
        # A payload without "results" is treated as an empty page instead of
        # failing with "NoneType is not iterable".
        submissions = response.json().get("results") or []

        for submission in submissions:
            # Read the mandatory keys *before* dropping excluded fields, so that
            # listing `_id` / `_submission_time` in `exclude_fields` no longer
            # raises KeyError.
            submission_id = submission["_id"]
            submission_time = submission["_submission_time"]

            for excluded in self.exclude_fields:
                submission.pop(excluded, None)

            record = {"_id": submission_id, "data": submission}
            record["_submission_time"] = submission_time
            # `endtime` / `end` are read after the exclusion step, as before:
            # excluding them keeps yielding None for the top-level value.
            record["endtime"] = submission.get("endtime")
            record["end"] = submission.get("end")
            if record["endtime"]:
                # endtime is in utc
                record["endtime"] = self.mk_tzaware_utc(datetime.fromisoformat(record["endtime"])).isoformat()
            yield record

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
        **kwargs,
    ) -> Iterable[Mapping[str, Any]]:
        """Yield the records of the parent implementation and track the cursor.

        In incremental mode the cursor is advanced to the largest cursor value
        seen so far. Records whose cursor value is None are still yielded but
        do not touch the cursor.
        """
        for record in super().read_records(sync_mode, cursor_field, stream_slice, stream_state, **kwargs):
            yield record
            if sync_mode != SyncMode.incremental:
                continue
            seen = record.get(self.cursor_field)
            if seen is None:
                # `endtime` / `end` may be absent on a submission; comparing None
                # with a string would raise and storing it would reset the state.
                continue
            self._cursor_value = max(seen, self._cursor_value) if self._cursor_value else seen


class KoboStreamSubmissionTime(KoboToolStream):
    """Form stream whose cursor is ``_submission_time``."""

    cursor_field = "_submission_time"


class KoboStreamEndTime(KoboToolStream):
    """Form stream whose cursor is ``endtime``."""

    cursor_field = "endtime"


class KoboStreamEnd(KoboToolStream):
    """Form stream whose cursor is ``end``."""

    cursor_field = "end"


class SourceKobotoolbox(AbstractSource):
    """One instance per sync"""

    # API_URL = "https://kf.kobotoolbox.org/api/v2"
    # TOKEN_URL = "https://kf.kobotoolbox.org/token/?format=json"
    PAGINATION_LIMIT = 30000

    def get_access_token(self, config) -> Tuple[Optional[str], Any]:
        """Exchange username/password for an API token.

        Returns:
            ``(token, None)`` on success, ``(None, "error")`` when the request
            fails, returns an error status or does not return JSON.
        """
        token_url = f"{config['base_url']}/token/?format=json"
        auth = (config["username"], config["password"])
        try:
            response = requests.post(token_url, auth=auth, timeout=REQUEST_TIMEOUT_SECONDS)
            response.raise_for_status()
            json_response = response.json()
        except (requests.exceptions.RequestException, ValueError):
            # ValueError covers a non-JSON body on requests versions whose
            # JSONDecodeError is not a RequestException.
            return None, "error"

        if json_response is not None:
            return json_response.get("token"), None

        return None, "error"

    def check_connection(self, logger, config) -> Tuple[bool, Any]:  # pylint:disable=unused-argument,redefined-outer-name
        """Check that the asset listing can be read with the given credentials.

        Returns:
            ``(True, None)`` when the listing request succeeds, otherwise
            ``(False, message)``. Missing config values, network failures and
            HTTP error statuses are all reported through the return value
            rather than raised.
        """
        for required_key in ("username", "password", "base_url"):
            if not config.get(required_key):
                return False, f"{required_key} in credentials is not provided"

        url = f"{config['base_url']}/api/v2/assets.json"
        auth = (config["username"], config["password"])
        try:
            # The request itself is inside the try: connection errors and
            # timeouts must yield (False, msg) just like a 4xx/5xx status.
            response = requests.get(url, auth=auth, timeout=REQUEST_TIMEOUT_SECONDS)
            response.raise_for_status()
        except requests.exceptions.RequestException:
            return False, "Something went wrong. Please check your credentials"

        return True, None

    def _fetch_forms(self, config: Mapping[str, Any]) -> List[Mapping[str, Any]]:
        """Return the ``results`` entries of the asset listing, following ``next`` links.

        A page without ``results`` contributes nothing, and a page without
        ``next`` ends the loop.
        """
        auth = (config["username"], config["password"])
        url = f"{config['base_url']}/api/v2/assets.json"
        forms: List[Mapping[str, Any]] = []
        while url:
            response = requests.get(url, auth=auth, timeout=REQUEST_TIMEOUT_SECONDS)
            if not response.ok:
                logger.warning("Listing Kobo assets returned HTTP %s", response.status_code)
            payload = response.json()
            forms.extend(payload.get("results") or [])
            # NOTE(review): assumes the listing is paginated with an absolute
            # `next` url, like the data endpoint handled in `next_page_token` — confirm.
            url = payload.get("next")
        return forms

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Build one stream per deployed form.

        The cursor of each stream is chosen from the config: forms named in
        ``forms_using_endtime`` use ``endtime``, forms named in
        ``forms_using_end`` use ``end``, all others use ``_submission_time``.
        Returns an empty list when no API token can be obtained.
        """
        forms = self._fetch_forms(config)

        # Generate a auth token for all streams
        auth_token, _ = self.get_access_token(config)
        if auth_token is None:
            logger.warning("Could not obtain a Kobo API token; no streams are available")
            return []

        endtime_forms = config.get("forms_using_endtime") or []
        end_forms = config.get("forms_using_end") or []

        streams = []
        for form in forms:
            if not form["has_deployment"]:
                continue

            if form["name"] in endtime_forms:
                stream_class = KoboStreamEndTime
            elif form["name"] in end_forms:
                stream_class = KoboStreamEnd
            else:
                stream_class = KoboStreamSubmissionTime

            streams.append(
                stream_class(
                    config=config,
                    form_id=form["uid"],
                    schema=stream_json_schema,
                    name=form["name"],
                    pagination_limit=self.PAGINATION_LIMIT,
                    auth_token=auth_token,
                )
            )

        return streams
000000000000..a0404d2e3cf8
--- /dev/null
+++ b/airbyte-integrations/connectors/source-kobotoolbox/source_kobotoolbox/spec.yaml
@@ -0,0 +1,59 @@
+documentationUrl: https://docs.airbyte.com/integrations/sources/kobotoolbox
+connectionSpecification:
+  $schema: http://json-schema.org/draft-07/schema#
+  title: Kobotoolbox Spec
+  type: object
+  additionalProperties: true
+  required:
+    - username
+    - password
+    - base_url
+  properties:
+    username:
+      type: string
+      title: Username
+      description: Username to authenticate into the KoboToolBox server
+      order: 1
+    password:
+      type: string
+      title: Password
+      description: Password to authenticate into the KoboToolBox server
+      airbyte_secret: true
+      order: 2
+    base_url:
+      type: string
+      title: Base Url
+      description: Base url for the kobo server
+      enum:
+        - https://kf.kobotoolbox.org
+        - https://kobo.humanitarianresponse.info
+        - https://eu.kobotoolbox.org
+      order: 3
+    start_time:
+      type: string
+      title: Start Time
+      description: Any data before this date will not be replicated.
+      default: "2023-03-15T00:00:00"
+      order: 4
+      pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}$
+    exclude_fields:
+      type: array
+      title: Exclude Fields
+      description: Column names not to sync
+      order: 5
+    forms_using_endtime:
+      type: array
+      title: Forms Using Endtime
+      description: List of forms that use endtime instead of submission time
+      order: 6
+    forms_using_end:
+      type: array
+      title: Forms Using End
+      description: List of forms that use end instead of submission time
+      order: 7
+    max_days_to_close:
+      type: integer
+      title: Max Days To Close
+      description: The maximum number of days between a form's submission date and end date, for those forms listed above
+      default: 30
+      order: 8
diff --git a/airbyte-integrations/connectors/source-kobotoolbox/unit_tests/test_source.py b/airbyte-integrations/connectors/source-kobotoolbox/unit_tests/test_source.py
new file mode 100644
index 000000000000..6227aa07be94
--- /dev/null
+++ b/airbyte-integrations/connectors/source-kobotoolbox/unit_tests/test_source.py
@@ -0,0 +1,35 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+import pytest
+from source_kobotoolbox.source import SourceKobotoolbox
+
+
+@pytest.mark.parametrize('config, err_msg', [
+    (
+        {"password": "some_password"},  # username missing
+        "username in credentials is not provided"
+    ),
+    (
+        {"username": "username"},  # password missing
+        "password in credentials is not provided"
+    ),
+    (
+        {"username": "username", "password": "some_password"},  # both present, no base_url
+        'Something went wrong. Please check your credentials'
+    )
+])
+def test_check_connection(config, err_msg):
+    response = SourceKobotoolbox().check_connection(logger=None, config=config)
+    assert response == (False, err_msg)  # every case above must be rejected with its message
+
+
+@pytest.mark.parametrize('config', [
+    (
+        {"username": "username", "password": "some_password"}
+    )
+])
+def test_streams(config):
+    response = SourceKobotoolbox().streams(config)
+    assert response == []
diff --git a/airbyte-integrations/connectors/source-kobotoolbox/unit_tests/test_stream.py b/airbyte-integrations/connectors/source-kobotoolbox/unit_tests/test_stream.py
new file mode 100644
index 000000000000..ce55843404cf
--- /dev/null
+++ b/airbyte-integrations/connectors/source-kobotoolbox/unit_tests/test_stream.py
@@ -0,0 +1,103 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+import json
+from unittest.mock import Mock
+
+import pytest
+import requests
+from source_kobotoolbox.source import KoboToolStream
+
+API_URL = "https://kf.kobotoolbox.org/api/v2"
+PAGINATION_LIMIT = 30000
+
+stream_config = {
+    "config": {"username": "username", "password": "my_password", "start_time": "2023-03-15T00:00:00.000+05:30"},
+    "form_id": "my_form_id",
+    "schema": {},
+    "name": "my_form",
+    "api_url": API_URL,
+    "pagination_limit": PAGINATION_LIMIT,
+    "auth_token": "my_token_123"
+}
+
+CURSOR = 'endtime'
+
+
+@pytest.mark.parametrize('config', [(stream_config)])
+def test_stream_base_url(config):
+    stream = KoboToolStream(**config)
+    assert stream.url_base == f"{config['api_url']}/assets/{config['form_id']}/"
+
+
+@pytest.mark.parametrize('config', [(stream_config)])
+def test_json_schema(config):
+    stream = KoboToolStream(**config)
+    assert stream.get_json_schema() == {}  # schema is returned exactly as passed to the constructor
+
+
+@pytest.mark.parametrize('config, next_page_token', [(stream_config, None)])
+def test_request_params(config, next_page_token):
+    stream = KoboToolStream(**config)
+    assert stream.request_params({}, None, next_page_token) == {
+        'start': 0,  # no page token supplied, so the first page is expected
+        'limit': config['pagination_limit'],
+        "sort": json.dumps({CURSOR: 1}),
+        "query": json.dumps({CURSOR: {"$gte": config['config']['start_time']}})
+    }
+
+
+@pytest.mark.parametrize('config, total_records, params, next_page_token', [
+    (
+        stream_config,
+        50000,  # total_records
+        {'start': 100, 'limit': 100},  # params of the page just fetched
+        {'start': '200', 'limit': '100'}  # expected token; values are compared as strings
+    ),
+    (
+        stream_config,
+        1729,  # total_records
+        {'start': 1700, 'limit': 100},  # 1700 + 100 >= 1729, so this is the last page
+        None
+    )
+])
+def test_next_page_token(config, params, next_page_token, total_records):
+    stream = KoboToolStream(**config)
+    response = Mock(spec=requests.Response)
+
+    def fetch_next_page(params, total_records=total_records):  # -> (previous params, next params)
+        prev = None
+        next1 = None
+        if params['limit'] + params['start'] < total_records:  # records remain past this page
+            next1 = {'limit': params['limit'], 'start': params['limit'] + params['start']}
+
+        if params['start'] > 0:
+            prev = params
+
+        return (prev, next1)
+
+    def fetch_request(response, params, url, total_records=total_records):
+        """Fill the mocked response with a paginated payload for the requested page."""
+        (prev, next1) = fetch_next_page(params, total_records)
+
+        if prev is not None:
+            prev = f"{url}?limit={prev['limit']}&start={prev['start']}"
+
+        if next1 is not None:
+            next1 = f"{url}?limit={next1['limit']}&start={next1['start']}"
+
+        response.json.return_value = {
+            "count": total_records,
+            "next": next1,
+            "previous": prev,
+            "results": []
+        }
+
+        return response
+
+    url = stream.url_base + stream.path()
+
+    response = fetch_request(response, params, url)
+
+    assert next_page_token == stream.next_page_token(response)