
datetime format handling
Rohit Chatterjee committed May 27, 2024
1 parent 4dcc137 commit 0fd9431
Showing 1 changed file with 33 additions and 34 deletions.
@@ -15,6 +15,23 @@
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator


+def ensure_single_trailing_Z(dtstr: str):
+    """return the dtstr with a trailing Z, appending one if it's missing"""
+    if dtstr.endswith("Z"):
+        return dtstr
+    return dtstr + "Z"
+
+
+def parse_datetime_with_microseconds(dtstr: str):
+    """parse a datetime string with or without microseconds"""
+    for date_format in ["%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S.%f"]:
+        try:
+            return datetime.strptime(dtstr, date_format)
+        except ValueError:
+            pass
+    raise ValueError(f"Could not parse datetime string {dtstr}")
+
+
# Basic full refresh stream
class CommcareStream(HttpStream, ABC):
def __init__(self, project_space, form_fields_to_exclude, **kwargs):
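A quick doctest-style sketch of how the two new helpers behave; the sample timestamps are made up, not taken from the CommCare API:

>>> parse_datetime_with_microseconds("2021-01-01T12:00:00.123456Z")
datetime.datetime(2021, 1, 1, 12, 0, 0, 123456)
>>> parse_datetime_with_microseconds("2021-01-01T12:00:00Z")
datetime.datetime(2021, 1, 1, 12, 0)
>>> ensure_single_trailing_Z("2021-01-01T12:00:00.123456")
'2021-01-01T12:00:00.123456Z'
>>> ensure_single_trailing_Z("2021-01-01T12:00:00Z")
'2021-01-01T12:00:00Z'

Note that both "Z" and non-"Z" inputs parse to naive datetimes; the trailing Z is only re-attached when records are emitted.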
@@ -34,10 +51,10 @@ def url_base(self) -> str:
schemas = {}

@property
-def dateformat(self):
+def dateformat_for_query(self) -> str:
return "%Y-%m-%dT%H:%M:%S.%f"

-def scrubUnwantedFields(self, form):
+def scrubUnwantedFields(self, form: dict[str, str]) -> dict:
new_dict = {}
for key, value in form.items():
if key in self.form_fields_to_exclude:
@@ -103,11 +120,7 @@ def state(self) -> Mapping[str, Any]:
@state.setter
def state(self, value: Mapping[str, Any]):
if self.cursor_field in value:
if "Z" in value[self.cursor_field]:
date_format = "%Y-%m-%dT%H:%M:%S.%fZ"
else:
date_format = "%Y-%m-%dT%H:%M:%S.%f"
self._cursor_value = datetime.strptime(value[self.cursor_field], date_format)
self._cursor_value = parse_datetime_with_microseconds(value[self.cursor_field])

@property
def sync_mode(self):
@@ -153,7 +166,7 @@ class Case(IncrementalStream):

def __init__(self, start_date, schema, app_id, **kwargs):
super().__init__(**kwargs)
-self._cursor_value = datetime.strptime(start_date, "%Y-%m-%dT%H:%M:%SZ")
+self._cursor_value = parse_datetime_with_microseconds(start_date)
self.schema = schema

def get_json_schema(self):
@@ -176,35 +189,25 @@ def request_params(
) -> MutableMapping[str, Any]:
# start date is what we saved for forms
# if self.cursor_field in self.state else (CommcareStream.last_form_date or self.initial_date)
-ix = self.state[self.cursor_field]
-params = {"format": "json", "indexed_on_start": ix.strftime(self.dateformat), "order_by": "indexed_on", "limit": "5000"}
+ix: datetime = self.state[self.cursor_field]
+params = {"format": "json", "indexed_on_start": ix.strftime(self.dateformat_for_query), "order_by": "indexed_on", "limit": "5000"}
if next_page_token:
params.update(next_page_token)
return params

def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
for record in super().read_records(*args, **kwargs):
-date_string = record[self.cursor_field]
-if "Z" in date_string:
-date_format = "%Y-%m-%dT%H:%M:%S.%fZ"
-else:
-date_format = "%Y-%m-%dT%H:%M:%S.%f"
-found = False
-for f in record["xform_ids"]:
-if f in CommcareStream.forms:
-found = True
-break
-if found:
-self._cursor_value = datetime.strptime(date_string, date_format)
+if any(f in CommcareStream.forms for f in record["xform_ids"]):
+self._cursor_value = parse_datetime_with_microseconds(record[self.cursor_field])
# Make indexed_on tz aware
record.update({"streamname": "case", "indexed_on": record["indexed_on"] + "Z"})
record.update({"streamname": "case", "indexed_on": ensure_single_trailing_Z(record["indexed_on"])})
# convert xform_ids field from array to comma separated list so flattening won't create
# one field per item. This is because some cases have up to 2000 xform_ids and we don't want 2000 extra
# fields in the schema
record["xform_ids"] = ",".join(record["xform_ids"])
retval = {}
retval["id"] = record["id"]
retval["indexed_on"] = record["indexed_on"]
retval["indexed_on"] = ensure_single_trailing_Z(record["indexed_on"])
retval["data"] = record
yield retval
if self._cursor_value.microsecond == 0:
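As a rough illustration of what the Case record normalization above produces (the record below is invented for the example):

# hypothetical case record, before normalization
record = {"id": "case-1", "indexed_on": "2024-05-20T08:30:15.250000", "xform_ids": ["form-1", "form-2"]}
record.update({"streamname": "case", "indexed_on": ensure_single_trailing_Z(record["indexed_on"])})
record["xform_ids"] = ",".join(record["xform_ids"])  # "form-1,form-2", keeps the schema flat
retval = {"id": record["id"], "indexed_on": record["indexed_on"], "data": record}
# retval["indexed_on"] == "2024-05-20T08:30:15.250000Z"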
@@ -228,7 +231,7 @@ class Form(IncrementalStream):
def __init__(self, start_date, app_id, name, xmlns, schema, **kwargs):
super().__init__(**kwargs)
self.app_id = app_id
-self._cursor_value = datetime.strptime(start_date, "%Y-%m-%dT%H:%M:%SZ")
+self._cursor_value = parse_datetime_with_microseconds(start_date)
self.streamname = name
self.xmlns = xmlns
self.schema = schema
@@ -249,11 +252,11 @@ def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
# if self.cursor_field in self.state else self.initial_date
-ix = self.state[self.cursor_field]
+ix: datetime = self.state[self.cursor_field]
params = {
"format": "json",
"app_id": self.app_id,
"indexed_on_start": ix.strftime(self.dateformat),
"indexed_on_start": ix.strftime(self.dateformat_for_query),
"order_by": "indexed_on",
"limit": "1000",
"xmlns": self.xmlns,
@@ -264,16 +267,12 @@ def request_params(

def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
for record in super().read_records(*args, **kwargs):
-date_string = record[self.cursor_field]
-if "Z" in date_string:
-date_format = "%Y-%m-%dT%H:%M:%S.%fZ"
-else:
-date_format = "%Y-%m-%dT%H:%M:%S.%f"
-self._cursor_value = datetime.strptime(date_string, date_format)
+self._cursor_value = parse_datetime_with_microseconds(record[self.cursor_field])
CommcareStream.forms.add(record["id"])
newform = self.scrubUnwantedFields(record)
retval = {}
retval["id"] = newform["id"]
+newform[self.cursor_field] = ensure_single_trailing_Z(newform[self.cursor_field])
retval[self.cursor_field] = newform[self.cursor_field]
retval["data"] = newform
yield retval
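And on the request side, the cursor is serialized back with dateformat_for_query, so indexed_on_start always carries microseconds regardless of how the state string was formatted (values below are illustrative):

cursor = parse_datetime_with_microseconds("2024-05-20T08:30:15Z")
params = {"format": "json", "indexed_on_start": cursor.strftime("%Y-%m-%dT%H:%M:%S.%f"), "order_by": "indexed_on", "limit": "1000"}
# params["indexed_on_start"] == "2024-05-20T08:30:15.000000"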
@@ -293,7 +292,7 @@ def check_connection(self, logger, config) -> Tuple[bool, any]:
args = {
"authenticator": auth,
}
-stream = Application(
+Application(
**{
**args,
"app_id": config["app_id"],
