From 4dcc1374add527deccb90816a9be081f7ba154bd Mon Sep 17 00:00:00 2001 From: Rohit Chatterjee Date: Mon, 27 May 2024 16:48:03 +0530 Subject: [PATCH 1/3] state getter and setter for incremental-stream --- .../source-commcare/source_commcare/source.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-commcare/source_commcare/source.py b/airbyte-integrations/connectors/source-commcare/source_commcare/source.py index 5ade905c2618..f82445973c37 100644 --- a/airbyte-integrations/connectors/source-commcare/source_commcare/source.py +++ b/airbyte-integrations/connectors/source-commcare/source_commcare/source.py @@ -98,12 +98,16 @@ class IncrementalStream(CommcareStream, IncrementalMixin): @property def state(self) -> Mapping[str, Any]: - if self._cursor_value: - return {self.cursor_field: self._cursor_value} + return {self.cursor_field: self._cursor_value} @state.setter def state(self, value: Mapping[str, Any]): - self._cursor_value = datetime.strptime(value[self.cursor_field], self.dateformat) + if self.cursor_field in value: + if "Z" in value[self.cursor_field]: + date_format = "%Y-%m-%dT%H:%M:%S.%fZ" + else: + date_format = "%Y-%m-%dT%H:%M:%S.%f" + self._cursor_value = datetime.strptime(value[self.cursor_field], date_format) @property def sync_mode(self): @@ -140,7 +144,6 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp class Case(IncrementalStream): - """ docs: https://www.commcarehq.org/a/[domain]/api/[version]/case/ """ From 0fd94315c449c26cd2ba11e8ba7f60518e7c06db Mon Sep 17 00:00:00 2001 From: Rohit Chatterjee Date: Mon, 27 May 2024 23:09:37 +0530 Subject: [PATCH 2/3] datetime format handling --- .../source-commcare/source_commcare/source.py | 67 +++++++++---------- 1 file changed, 33 insertions(+), 34 deletions(-) diff --git a/airbyte-integrations/connectors/source-commcare/source_commcare/source.py b/airbyte-integrations/connectors/source-commcare/source_commcare/source.py index f82445973c37..e45693ff6e73 100644 --- a/airbyte-integrations/connectors/source-commcare/source_commcare/source.py +++ b/airbyte-integrations/connectors/source-commcare/source_commcare/source.py @@ -15,6 +15,23 @@ from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator +def ensure_single_trailing_Z(dtstr: str): + """return the dtstr with a trailing Z, appending one if it's missing""" + if dtstr.endswith("Z"): + return dtstr + return dtstr + "Z" + + +def parse_datetime_with_microseconds(dtstr: str): + """parse a datetime string with or without microseconds""" + for date_format in ["%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S.%f"]: + try: + return datetime.strptime(dtstr, date_format) + except ValueError: + pass + raise ValueError(f"Could not parse datetime string {dtstr}") + + # Basic full refresh stream class CommcareStream(HttpStream, ABC): def __init__(self, project_space, form_fields_to_exclude, **kwargs): @@ -34,10 +51,10 @@ def url_base(self) -> str: schemas = {} @property - def dateformat(self): + def dateformat_for_query(self) -> str: return "%Y-%m-%dT%H:%M:%S.%f" - def scrubUnwantedFields(self, form): + def scrubUnwantedFields(self, form: dict[str, str]) -> dict: new_dict = {} for key, value in form.items(): if key in self.form_fields_to_exclude: @@ -103,11 +120,7 @@ def state(self) -> Mapping[str, Any]: @state.setter def state(self, value: Mapping[str, Any]): if self.cursor_field in value: - if "Z" in value[self.cursor_field]: - date_format = "%Y-%m-%dT%H:%M:%S.%fZ" - else: - date_format = "%Y-%m-%dT%H:%M:%S.%f" - self._cursor_value = datetime.strptime(value[self.cursor_field], date_format) + self._cursor_value = parse_datetime_with_microseconds(value[self.cursor_field]) @property def sync_mode(self): @@ -153,7 +166,7 @@ class Case(IncrementalStream): def __init__(self, start_date, schema, app_id, **kwargs): super().__init__(**kwargs) - self._cursor_value = datetime.strptime(start_date, "%Y-%m-%dT%H:%M:%SZ") + self._cursor_value = parse_datetime_with_microseconds(start_date) self.schema = schema def get_json_schema(self): @@ -176,35 +189,25 @@ def request_params( ) -> MutableMapping[str, Any]: # start date is what we saved for forms # if self.cursor_field in self.state else (CommcareStream.last_form_date or self.initial_date) - ix = self.state[self.cursor_field] - params = {"format": "json", "indexed_on_start": ix.strftime(self.dateformat), "order_by": "indexed_on", "limit": "5000"} + ix: datetime = self.state[self.cursor_field] + params = {"format": "json", "indexed_on_start": ix.strftime(self.dateformat_for_query), "order_by": "indexed_on", "limit": "5000"} if next_page_token: params.update(next_page_token) return params def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]: for record in super().read_records(*args, **kwargs): - date_string = record[self.cursor_field] - if "Z" in date_string: - date_format = "%Y-%m-%dT%H:%M:%S.%fZ" - else: - date_format = "%Y-%m-%dT%H:%M:%S.%f" - found = False - for f in record["xform_ids"]: - if f in CommcareStream.forms: - found = True - break - if found: - self._cursor_value = datetime.strptime(date_string, date_format) + if any(f in CommcareStream.forms for f in record["xform_ids"]): + self._cursor_value = parse_datetime_with_microseconds(record[self.cursor_field]) # Make indexed_on tz aware - record.update({"streamname": "case", "indexed_on": record["indexed_on"] + "Z"}) + record.update({"streamname": "case", "indexed_on": ensure_single_trailing_Z(record["indexed_on"])}) # convert xform_ids field from array to comma separated list so flattening won't create # one field per item. This is because some cases have up to 2000 xform_ids and we don't want 2000 extra # fields in the schema record["xform_ids"] = ",".join(record["xform_ids"]) retval = {} retval["id"] = record["id"] - retval["indexed_on"] = record["indexed_on"] + retval["indexed_on"] = ensure_single_trailing_Z(record["indexed_on"]) retval["data"] = record yield retval if self._cursor_value.microsecond == 0: @@ -228,7 +231,7 @@ class Form(IncrementalStream): def __init__(self, start_date, app_id, name, xmlns, schema, **kwargs): super().__init__(**kwargs) self.app_id = app_id - self._cursor_value = datetime.strptime(start_date, "%Y-%m-%dT%H:%M:%SZ") + self._cursor_value = parse_datetime_with_microseconds(start_date) self.streamname = name self.xmlns = xmlns self.schema = schema @@ -249,11 +252,11 @@ def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: # if self.cursor_field in self.state else self.initial_date - ix = self.state[self.cursor_field] + ix: datetime = self.state[self.cursor_field] params = { "format": "json", "app_id": self.app_id, - "indexed_on_start": ix.strftime(self.dateformat), + "indexed_on_start": ix.strftime(self.dateformat_for_query), "order_by": "indexed_on", "limit": "1000", "xmlns": self.xmlns, @@ -264,16 +267,12 @@ def request_params( def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]: for record in super().read_records(*args, **kwargs): - date_string = record[self.cursor_field] - if "Z" in date_string: - date_format = "%Y-%m-%dT%H:%M:%S.%fZ" - else: - date_format = "%Y-%m-%dT%H:%M:%S.%f" - self._cursor_value = datetime.strptime(date_string, date_format) + self._cursor_value = parse_datetime_with_microseconds(record[self.cursor_field]) CommcareStream.forms.add(record["id"]) newform = self.scrubUnwantedFields(record) retval = {} retval["id"] = newform["id"] + newform[self.cursor_field] = ensure_single_trailing_Z(newform[self.cursor_field]) retval[self.cursor_field] = newform[self.cursor_field] retval["data"] = newform yield retval @@ -293,7 +292,7 @@ def check_connection(self, logger, config) -> Tuple[bool, any]: args = { "authenticator": auth, } - stream = Application( + Application( **{ **args, "app_id": config["app_id"], From 3b01d0695ab5d619a549f1dcb21ee77e2328f06c Mon Sep 17 00:00:00 2001 From: Rohit Chatterjee Date: Tue, 28 May 2024 15:39:25 +0530 Subject: [PATCH 3/3] indexed_on is a `timestamp_with_timezone`, this is the default we are only making it explicit --- .../connectors/source-commcare/source_commcare/source.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-commcare/source_commcare/source.py b/airbyte-integrations/connectors/source-commcare/source_commcare/source.py index e45693ff6e73..f06d7422772b 100644 --- a/airbyte-integrations/connectors/source-commcare/source_commcare/source.py +++ b/airbyte-integrations/connectors/source-commcare/source_commcare/source.py @@ -308,7 +308,11 @@ def base_schema(self): return { "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", - "properties": {"id": {"type": "string"}, "indexed_on": {"type": "string", "format": "date-time"}, "data": {"type": "object"}}, + "properties": { + "id": {"type": "string"}, + "indexed_on": {"type": "string", "format": "date-time", "airbyte_type": "timestamp_with_timezone"}, + "data": {"type": "object"}, + }, } def streams(self, config: Mapping[str, Any]) -> List[Stream]: