Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove flattening logic surveycto #78

Merged
merged 3 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,25 @@
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

from .helpers import Helpers

stream_json_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": True,
"properties": {
"KEY": {
"type": [
"string",
"null",
]
},
"endtime": {"type": ["string", "null"]},
"data": {
"type": "object",
},
"SubmissionDate": {"type": ["string", "null"]},
},
}

class SurveyStream(HttpStream, ABC):
transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
Expand Down Expand Up @@ -94,13 +111,17 @@ def parse_response(
) -> Iterable[Mapping]:
self.response_json = response.json()

for data in self.response_json:
try:
yield data
except Exception as e:
msg = "Encountered an exception parsing schema"
self.logger.exception(msg)
raise e
for record in self.response_json:
# send data, key, submission date and endtime
record_id = record.get("KEY")
submission_date = record.get("SubmissionDate")
endtime = record.get("endtime")

retval = {"KEY": record_id, "data": record}
retval["SubmissionDate"] = submission_date
retval["endtime"] = endtime

yield retval

def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
for record in super().read_records(*args, **kwargs):
Expand All @@ -116,10 +137,7 @@ def check_connection(self, logger, config) -> Tuple[bool, Any]:

try:
for form_id in form_ids:
schema = Helpers.call_survey_cto(config, form_id)
filter_data = Helpers.get_filter_data(schema)
schema_res = Helpers.get_json_schema(filter_data)
stream = SurveyctoStream(config=config, form_id=form_id, schema=schema_res)
stream = SurveyctoStream(config=config, form_id=form_id, schema=stream_json_schema)
next(stream.read_records(sync_mode=SyncMode.full_refresh))

return True, None
Expand All @@ -132,10 +150,7 @@ def generate_streams(self, config: str) -> List[Stream]:
streams = []

for form_id in forms:
schema = Helpers.call_survey_cto(config, form_id)
filter_data = Helpers.get_filter_data(schema)
schema_res = Helpers.get_json_schema(filter_data)
stream = SurveyctoStream(config=config, form_id=form_id, schema=schema_res)
stream = SurveyctoStream(config=config, form_id=form_id, schema=stream_json_schema)
streams.append(stream)
return streams

Expand Down
Loading