Skip to content

Commit

Permalink
Merge pull request #93 from DalgoT4D/feature/source-surveycto
Browse files Browse the repository at this point in the history
Feature/source surveycto
  • Loading branch information
fatchat authored Aug 14, 2024
2 parents 0066e5c + f26e029 commit 90ee811
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 82 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,25 @@
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

from .helpers import Helpers

stream_json_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": True,
"properties": {
"KEY": {
"type": [
"string",
"null",
]
},
"endtime": {"type": ["string", "null"]},
"data": {
"type": "object",
},
"SubmissionDate": {"type": ["string", "null"]},
},
}

class SurveyStream(HttpStream, ABC):
transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
Expand Down Expand Up @@ -94,13 +111,17 @@ def parse_response(
) -> Iterable[Mapping]:
self.response_json = response.json()

for data in self.response_json:
try:
yield data
except Exception as e:
msg = "Encountered an exception parsing schema"
self.logger.exception(msg)
raise e
for record in self.response_json:
# send data, key, submission date and endtime
record_id = record.get("KEY")
submission_date = record.get("SubmissionDate")
endtime = record.get("endtime")

retval = {"KEY": record_id, "data": record}
retval["SubmissionDate"] = submission_date
retval["endtime"] = endtime

yield retval

def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
for record in super().read_records(*args, **kwargs):
Expand All @@ -116,10 +137,7 @@ def check_connection(self, logger, config) -> Tuple[bool, Any]:

try:
for form_id in form_ids:
schema = Helpers.call_survey_cto(config, form_id)
filter_data = Helpers.get_filter_data(schema)
schema_res = Helpers.get_json_schema(filter_data)
stream = SurveyctoStream(config=config, form_id=form_id, schema=schema_res)
stream = SurveyctoStream(config=config, form_id=form_id, schema=stream_json_schema)
next(stream.read_records(sync_mode=SyncMode.full_refresh))

return True, None
Expand All @@ -132,10 +150,7 @@ def generate_streams(self, config: str) -> List[Stream]:
streams = []

for form_id in forms:
schema = Helpers.call_survey_cto(config, form_id)
filter_data = Helpers.get_filter_data(schema)
schema_res = Helpers.get_json_schema(filter_data)
stream = SurveyctoStream(config=config, form_id=form_id, schema=schema_res)
stream = SurveyctoStream(config=config, form_id=form_id, schema=stream_json_schema)
streams.append(stream)
return streams

Expand Down

0 comments on commit 90ee811

Please sign in to comment.