Skip to content

Commit

Permalink
don't filter cases for form ids
Browse files Browse the repository at this point in the history
  • Loading branch information
Rohit Chatterjee committed Mar 14, 2024
1 parent 24b4208 commit 5d1db55
Showing 1 changed file with 15 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp


class Case(IncrementalStream):

"""
docs: https://www.commcarehq.org/a/[domain]/api/[version]/case/
"""
Expand Down Expand Up @@ -181,37 +180,30 @@ def request_params(

def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
for record in super().read_records(*args, **kwargs):
date_string = record[self.cursor_field]
if "Z" in date_string:
date_format = "%Y-%m-%dT%H:%M:%S.%fZ"
else:
date_format = "%Y-%m-%dT%H:%M:%S.%f"
found = False
for f in record["xform_ids"]:
if f in CommcareStream.forms:
found = True
break
if found:
# Initialize date_string with a default or ensure it exists in the record
date_string = record.get(self.cursor_field, "")
if date_string: # Proceed only if date_string is not empty
if "Z" in date_string:
date_format = "%Y-%m-%dT%H:%M:%S.%fZ"
else:
date_format = "%Y-%m-%dT%H:%M:%S.%f"
self._cursor_value = datetime.strptime(date_string, date_format)

# Make indexed_on tz aware
record.update({"streamname": "case", "indexed_on": record["indexed_on"] + "Z"})
# convert xform_ids field from array to comma separated list so flattening won't create
# one field per item. This is because some cases have up to 2000 xform_ids and we don't want 2000 extra
# fields in the schema
# Convert xform_ids field from array to comma separated list
record["xform_ids"] = ",".join(record["xform_ids"])
retval = {}
retval["id"] = record["id"]
retval["indexed_on"] = record["indexed_on"]
retval["data"] = record
yield retval
if self._cursor_value.microsecond == 0:
# Airbyte converts the cursor_field value (datetime) to string when it saves the state and
# our state setter parses the saved state with a format that contains microseconds
# self._cursor_value must have non-zero microseconds for the formatting and parsing to work correctly.
# This issue would also occur if an incoming record had a timestamp with zero microseconds
self._cursor_value = self._cursor_value.replace(microsecond=10)
# This cycle of pull is complete so clear out the form ids we saved for this cycle
CommcareStream.forms.clear()

if self._cursor_value and self._cursor_value.microsecond == 0:
# Airbyte state handling adjustment
self._cursor_value = self._cursor_value.replace(
microsecond=10
) # This cycle of pull is complete so clear out the form ids we saved for this cycle


class Form(IncrementalStream):
Expand Down

0 comments on commit 5d1db55

Please sign in to comment.