From 7e3d049ec2cf4b53d4bb7d18d06de9185bb0ffd2 Mon Sep 17 00:00:00 2001 From: Rohit Chatterjee Date: Tue, 3 Sep 2024 05:56:43 +0530 Subject: [PATCH] each stream needs to iterate over the list of tenant ids --- .../source_mgramseva/source.py | 148 +++++++++++------- 1 file changed, 95 insertions(+), 53 deletions(-) diff --git a/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py b/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py index 6f102cae1af0..25c9f736ecd7 100644 --- a/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py +++ b/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py @@ -106,20 +106,40 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp """ :return an iterable containing each record in the response """ - self.logger.info(response.json()) + # self.logger.info(response.json()) return map(lambda x: {"data": x, "id": x["id"]}, response.json()[self.response_key]) class MgramsevaDemands(MgramsevaStream): """object for consumer demands""" - def __init__(self, headers: dict, request_info: dict, user_request: dict, tenantid: str, **kwargs): + def __init__( + self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, **kwargs + ): # pylint: disable=super-init-not-called """ctor""" - params = { - "tenantId": tenantid, - "businessService": "WS", - } - super().__init__("billing-service/demand/_search", headers, request_info, user_request, params, "Demands") + self.tenantid_list = tenantid_list + self.headers = headers + self.request_info = request_info + self.user_request = user_request + self.response_key = "Demands" + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: Optional[List[str]] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + stream_state: Optional[Mapping[str, Any]] = None, + ) -> Iterable[StreamData]: + """override""" + for tenantid in self.tenantid_list: + params = { + "tenantId": tenantid, + "businessService": "WS", + } + demandstream = MgramsevaStream( + "billing-service/demand/_search", self.headers, self.request_info, self.user_request, params, self.response_key + ) + yield from demandstream.read_records(sync_mode, cursor_field, stream_slice, stream_state) def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: """include the bill date""" @@ -133,16 +153,15 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp class MgramsevaBills(MgramsevaStream): """object for consumer bills""" - def __init__(self, headers: dict, request_info: dict, user_request: dict, tenantid: str, consumer_codes: list, **kwargs): + def __init__( + self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, consumer_codes: dict, **kwargs + ): # pylint: disable=super-init-not-called """specify endpoint for bills and call super""" self.headers = headers self.request_info = request_info self.user_request = user_request self.consumer_codes = consumer_codes - self.params = { - "tenantId": tenantid, - "businessService": "WS", - } + self.tenantid_list = tenantid_list def read_records( self, @@ -152,13 +171,13 @@ def read_records( stream_state: Optional[Mapping[str, Any]] = None, ) -> Iterable[StreamData]: """override""" - for consumer_code in self.consumer_codes: - params = self.params.copy() - params["consumerCode"] = consumer_code - consumer_code_stream = MgramsevaStream( - "billing-service/bill/v2/_fetchbill", self.headers, self.request_info, self.user_request, params, "Bill" - ) - yield from consumer_code_stream.read_records(sync_mode, cursor_field, stream_slice, stream_state) + for tenantid in self.tenantid_list: + for consumer_code in self.consumer_codes[tenantid]: + params = {"tenantId": tenantid, "businessService": "WS", "consumerCode": consumer_code} + consumer_code_stream = MgramsevaStream( + "billing-service/bill/v2/_fetchbill", self.headers, self.request_info, self.user_request, params, "Bill" + ) + yield from consumer_code_stream.read_records(sync_mode, cursor_field, stream_slice, stream_state) class MgramsevaTenantExpense(MgramsevaStream): @@ -204,8 +223,8 @@ class MgramsevaTenantExpenses(MgramsevaStream): """object for tenant payments""" def __init__( - self, headers: dict, request_info: dict, user_request: dict, tenantid: str, fromdate: datetime, todate: datetime, **kwargs - ): + self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, fromdate: datetime, todate: datetime, **kwargs + ): # pylint: disable=super-init-not-called """ specify endpoint for demands and call super 1672531200000 = 2023-01-01 00:00 @@ -214,7 +233,7 @@ def __init__( self.headers = headers self.request_info = request_info self.user_request = user_request - self.tenantid = tenantid + self.tenantid_list = tenantid_list self.fromdate = fromdate self.todate = todate @@ -227,34 +246,56 @@ def read_records( ) -> Iterable[StreamData]: """override""" - month_start = self.fromdate.replace(day=1) + for tenantid in self.tenantid_list: - while month_start < self.todate: + month_start = self.fromdate.replace(day=1) - next_month_start = month_start + relativedelta(months=1) - timedelta(milliseconds=1) + while month_start < self.todate: - stream = MgramsevaTenantExpense( - "echallan-services/eChallan/v1/_expenseDashboard", - self.headers, - self.request_info, - self.user_request, - self.tenantid, - month_start, - next_month_start, - "ExpenseDashboard", - ) - yield from stream.read_records(sync_mode, cursor_field, stream_slice, stream_state) + next_month_start = month_start + relativedelta(months=1) - timedelta(milliseconds=1) + + stream = MgramsevaTenantExpense( + "echallan-services/eChallan/v1/_expenseDashboard", + self.headers, + self.request_info, + self.user_request, + tenantid, + month_start, + next_month_start, + "ExpenseDashboard", + ) + yield from stream.read_records(sync_mode, cursor_field, stream_slice, stream_state) - month_start = next_month_start + month_start = next_month_start class MgramsevaPayments(MgramsevaStream): """object for consumer payments""" - def __init__(self, headers: dict, request_info: dict, user_request: dict, tenantid: str, **kwargs): + def __init__( + self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, **kwargs + ): # pylint: disable=super-init-not-called """specify endpoint for payments and call super""" - params = {"tenantId": tenantid, "businessService": "WS"} - super().__init__("collection-services/payments/WS/_search", headers, request_info, user_request, params, "Payments", **kwargs) + self.headers = headers + self.request_info = request_info + self.user_request = user_request + self.tenantid_list = tenantid_list + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: Optional[List[str]] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + stream_state: Optional[Mapping[str, Any]] = None, + ) -> Iterable[StreamData]: + """override""" + + for tenantid in self.tenantid_list: + params = {"tenantId": tenantid, "businessService": "WS"} + paymentstream = MgramsevaStream( + "collection-services/payments/WS/_search", self.headers, self.request_info, self.user_request, params, "Payments" + ) + yield from paymentstream.read_records(sync_mode, cursor_field, stream_slice, stream_state) # Source @@ -349,22 +390,23 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: end_date = datetime.today() end_date = pytz.IST.localize(end_date).astimezone(pytz.utc) - streams = [] + # Generate streams for each object type + streams = [ + MgramsevaPayments(self.headers, self.request_info, self.user_request, self.config["tenantids"]), + MgramsevaTenantExpenses(self.headers, self.request_info, self.user_request, self.config["tenantids"], start_date, end_date), + MgramsevaDemands(self.headers, self.request_info, self.user_request, self.config["tenantids"]), + ] + # and now we need bills for each consumer + tenantid_to_consumer_codes = {} for tenantid in self.config["tenantids"]: - # Generate streams for each object type - streams += [ - MgramsevaPayments(self.headers, self.request_info, self.user_request, tenantid), - MgramsevaTenantExpenses(self.headers, self.request_info, self.user_request, tenantid, start_date, end_date), - MgramsevaDemands(self.headers, self.request_info, self.user_request, tenantid), - ] - - # and now we need bills for each consumer - consumer_codes = set() - tmp_demand_stream = MgramsevaDemands(self.headers, self.request_info, self.user_request, tenantid) + tenantid_to_consumer_codes[tenantid] = set() + tmp_demand_stream = MgramsevaDemands(self.headers, self.request_info, self.user_request, [tenantid]) for demand in tmp_demand_stream.read_records(SyncMode.full_refresh): - consumer_codes.add(demand["data"]["consumerCode"]) + tenantid_to_consumer_codes[tenantid].add(demand["data"]["consumerCode"]) - streams.append(MgramsevaBills(self.headers, self.request_info, self.user_request, tenantid, list(consumer_codes))) + streams.append( + MgramsevaBills(self.headers, self.request_info, self.user_request, self.config["tenantids"], tenantid_to_consumer_codes) + ) return streams