diff --git a/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py b/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py index 7d83b2d6ca34..5e5637bb6b81 100644 --- a/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py +++ b/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py @@ -110,74 +110,26 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp return map(lambda x: {"data": x, "id": x["id"]}, response.json()[self.response_key]) -class MgramsevaDemand(MgramsevaStream): - """object for a single demand""" +class MgramsevaDemands(MgramsevaStream): + """object for consumer demands""" - def __init__( - self, - endpoint: str, - headers: dict, - request_info: dict, - user_request: dict, - params: dict, - response_key: str, - **kwargs, - ): - """call super""" - super().__init__(endpoint, headers, request_info, user_request, params, response_key, **kwargs) + def __init__(self, headers: dict, request_info: dict, user_request: dict, tenantid: str, **kwargs): + """ctor""" + params = { + "tenantId": tenantid, + "businessService": "WS", + } + super().__init__("billing-service/demand/_search", headers, request_info, user_request, params, "Demands") def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: """include the bill date""" demands = response.json()[self.response_key] for demand in demands: - demand["demandDate"] = convert_to_date(demand["taxPeriodTo"]).strftime("%Y-%m-%d") + demand["demandFromDate"] = convert_to_date(demand["taxPeriodFrom"]).strftime("%Y-%m-%d") + demand["demandToDate"] = convert_to_date(demand["taxPeriodTo"]).strftime("%Y-%m-%d") return map(lambda x: {"data": x, "id": x["id"]}, demands) -class MgramsevaDemands(MgramsevaStream): - """object for consumer demands""" - - def __init__( - self, headers: dict, request_info: dict, user_request: dict, tenantid: str, fromdate: datetime, todate: datetime, **kwargs - ): - """ctor""" - self.headers = headers - self.request_info = request_info - self.user_request = user_request - self.tenantid = tenantid - self.fromdate = fromdate - self.todate = todate - - def read_records( - self, - sync_mode: SyncMode, - cursor_field: Optional[List[str]] = None, - stream_slice: Optional[Mapping[str, Any]] = None, - stream_state: Optional[Mapping[str, Any]] = None, - ) -> Iterable[StreamData]: - """override""" - - month_start = self.fromdate - - while month_start < self.todate: - - next_month_start = month_start + relativedelta(months=1) - - params = { - "tenantId": self.tenantid, - "businessService": "WS", - "periodFrom": int(1000 * month_start.timestamp()), - "periodTo": int(1000 * (next_month_start - timedelta(milliseconds=1)).timestamp()), - } - - stream = MgramsevaDemand( - "billing-service/demand/_search", self.headers, self.request_info, self.user_request, params, "Demands" - ) - yield from stream.read_records(sync_mode, cursor_field, stream_slice, stream_state) - - month_start = next_month_start - - class MgramsevaBills(MgramsevaStream): """object for consumer bills""" @@ -393,9 +345,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: # tenant_expenses_to = datetime.strptime(config.get("tenant_expenses_to", "2022-01-01"), "%Y-%m-%d") start_date = datetime.strptime(config.get("start_date", "2022-01-01"), "%Y-%m-%d") - start_date_month_start = start_date.replace(day=1) start_date = pytz.IST.localize(start_date).astimezone(pytz.utc) - start_date_month_start = pytz.IST.localize(start_date_month_start).astimezone(pytz.utc) end_date = datetime.today() end_date = pytz.IST.localize(end_date).astimezone(pytz.utc) @@ -404,14 +354,12 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: streams = [ MgramsevaPayments(self.headers, self.request_info, self.user_request, tenantid), MgramsevaTenantExpenses(self.headers, self.request_info, self.user_request, tenantid, start_date, end_date), - MgramsevaDemands(self.headers, self.request_info, self.user_request, tenantid, start_date_month_start, end_date), + MgramsevaDemands(self.headers, self.request_info, self.user_request, tenantid), ] # and now we need bills for each consumer consumer_codes = set() - tmp_demand_stream = MgramsevaDemands( - self.headers, self.request_info, self.user_request, tenantid, start_date_month_start, end_date - ) + tmp_demand_stream = MgramsevaDemands(self.headers, self.request_info, self.user_request, tenantid) for demand in tmp_demand_stream.read_records(SyncMode.full_refresh): consumer_codes.add(demand["data"]["consumerCode"])