Skip to content

Commit

Permalink
change cursor to endtime
Browse files Browse the repository at this point in the history
  • Loading branch information
Rohit Chatterjee committed Sep 28, 2023
1 parent 6095e0b commit b1a024e
Showing 1 changed file with 13 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,31 @@
"null",
]
},
"endtime": {"type": ["string", "null"]}
"endtime": {"type": ["string", "null"]},
},
}


class KoboToolStream(HttpStream, IncrementalMixin):
"""
submission_date_format = "%Y-%m-%dT%H:%M:%S"
end_time_format = "%Y-%m-%dT%H:%M:%S.%.3f%z"
"""

primary_key = "_id"
cursor_field = "_submission_time"
# submission_date_format = "%Y-%m-%dT%H:%M:%S"
# end_time_format = "%Y-%m-%dT%H:%M:%S.%.3f%z"
cursor_field = "endtime"

def __init__(self, config: Mapping[str, Any], form_id, schema, name, pagination_limit, auth_token, **kwargs):
super().__init__()
self.form_id = form_id
self.auth_token = auth_token
self.schema = schema
self.stream_name = name
self.base_url = config['base_url']
self.base_url = config["base_url"]
self.PAGINATION_LIMIT = pagination_limit
self._cursor_value = None
self.start_time = config["start_time"]
self.exclude_fields = config['exclude_fields'] if 'exclude_fields' in config else []
self.exclude_fields = config["exclude_fields"] if "exclude_fields" in config else []

@property
def url_base(self) -> str:
Expand Down Expand Up @@ -77,7 +81,6 @@ def get_json_schema(self):
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:

params = {"start": 0, "limit": self.PAGINATION_LIMIT, "sort": json.dumps({self.cursor_field: 1})}
params["query"] = json.dumps({self.cursor_field: {"$gte": self.state[self.cursor_field]}})

Expand Down Expand Up @@ -134,7 +137,7 @@ def _check_credentials(cls, config: Mapping[str, Any]) -> Tuple[bool, Any]:
return False, "password in credentials is not provided"

return True, None

def get_access_token(self, config) -> Tuple[str, any]:
token_url = f"{config['base_url']}/token/?format=json"

Expand Down Expand Up @@ -162,7 +165,6 @@ def check_connection(self, logger, config) -> Tuple[bool, any]:
return True, None

def streams(self, config: Mapping[str, Any]) -> List[Stream]:

# Fetch all assets(forms)
url = f"{config['base_url']}/api/v2/assets.json"
response = requests.get(url, auth=(config["username"], config["password"]))
Expand All @@ -177,7 +179,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
# Generate array of stream objects
streams = []
for form_dict in key_list:
if form_dict['has_deployment']:
if form_dict["has_deployment"]:
stream = KoboToolStream(
config=config,
form_id=form_dict["uid"],
Expand All @@ -187,5 +189,5 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
auth_token=auth_token,
)
streams.append(stream)

return streams

0 comments on commit b1a024e

Please sign in to comment.