diff --git a/airbyte-integrations/connectors/source-surveycto/source_surveycto/helpers.py b/airbyte-integrations/connectors/source-surveycto/source_surveycto/helpers.py deleted file mode 100644 index c027f5f0ae5e..000000000000 --- a/airbyte-integrations/connectors/source-surveycto/source_surveycto/helpers.py +++ /dev/null @@ -1,66 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -import base64 - -import requests -from bigquery_schema_generator.generate_schema import SchemaGenerator -from gbqschema_converter.gbqschema_to_jsonschema import json_representation as converter -from requests.adapters import HTTPAdapter -from requests.packages.urllib3.util.retry import Retry - - -class Helpers(object): - @staticmethod - def _base64_encode(string: str) -> str: - return base64.b64encode(string.encode("ascii")).decode("ascii") - - @staticmethod - def call_survey_cto(config, form_id): - server_name = config["server_name"] - start_date = config["start_date"] - user_name_password = f"{config['username']}:{config['password']}" - auth_token = Helpers._base64_encode(user_name_password) - - url = f"https://{server_name}.surveycto.com/" + f"api/v2/forms/data/wide/json/{form_id}?date={start_date}" - - retry_strategy = Retry(total=3, status_forcelist=[429, 409], method_whitelist=["HEAD", "GET", "OPTIONS"]) - adapter = HTTPAdapter(max_retries=retry_strategy) - http = requests.Session() - http.mount("https://", adapter) - http.mount("http://", adapter) - - response = http.get(url, headers={"Authorization": "Basic " + auth_token}) - response_json = response.json() - - if response.status_code != 200 and response_json["error"]: - message = response_json["error"]["message"] - raise Exception(message) - - for data in response_json: - try: - yield data - except Exception as e: - raise e - - return data - - @staticmethod - def get_filter_data(data): - generator = SchemaGenerator(input_format="dict", infer_mode="NULLABLE", preserve_input_sort_order="true") - - schema_map, error_logs = generator.deduce_schema(input_data=data) - schema = generator.flatten_schema(schema_map) - schema_json = converter(schema) - schema = schema_json["definitions"]["element"]["properties"] - return schema - - @staticmethod - def get_json_schema(schema): - json_schema = { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "object", - "properties": schema, - } - return json_schema diff --git a/airbyte-integrations/connectors/source-surveycto/source_surveycto/source.py b/airbyte-integrations/connectors/source-surveycto/source_surveycto/source.py index 816b0dad755e..67d27eec9900 100644 --- a/airbyte-integrations/connectors/source-surveycto/source_surveycto/source.py +++ b/airbyte-integrations/connectors/source-surveycto/source_surveycto/source.py @@ -14,8 +14,25 @@ from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer -from .helpers import Helpers +stream_json_schema = { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "additionalProperties": True, + "properties": { + "KEY": { + "type": [ + "string", + "null", + ] + }, + "endtime": {"type": ["string", "null"]}, + "data": { + "type": "object", + }, + "SubmissionDate": {"type": ["string", "null"]}, + }, +} class SurveyStream(HttpStream, ABC): transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) @@ -94,13 +111,17 @@ def parse_response( ) -> Iterable[Mapping]: self.response_json = response.json() - for data in self.response_json: - try: - yield data - except Exception as e: - msg = "Encountered an exception parsing schema" - self.logger.exception(msg) - raise e + for record in self.response_json: + # send data, key, submission date and endtime + record_id = record.get("KEY") + submission_date = record.get("SubmissionDate") + endtime = record.get("endtime") + + retval = {"KEY": record_id, "data": record} + retval["SubmissionDate"] = submission_date + retval["endtime"] = endtime + + yield retval def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]: for record in super().read_records(*args, **kwargs): @@ -116,10 +137,7 @@ def check_connection(self, logger, config) -> Tuple[bool, Any]: try: for form_id in form_ids: - schema = Helpers.call_survey_cto(config, form_id) - filter_data = Helpers.get_filter_data(schema) - schema_res = Helpers.get_json_schema(filter_data) - stream = SurveyctoStream(config=config, form_id=form_id, schema=schema_res) + stream = SurveyctoStream(config=config, form_id=form_id, schema=stream_json_schema) next(stream.read_records(sync_mode=SyncMode.full_refresh)) return True, None @@ -132,10 +150,7 @@ def generate_streams(self, config: str) -> List[Stream]: streams = [] for form_id in forms: - schema = Helpers.call_survey_cto(config, form_id) - filter_data = Helpers.get_filter_data(schema) - schema_res = Helpers.get_json_schema(filter_data) - stream = SurveyctoStream(config=config, form_id=form_id, schema=schema_res) + stream = SurveyctoStream(config=config, form_id=form_id, schema=stream_json_schema) streams.append(stream) return streams