From 46c1949ca1b9cd1608989d5cf4b3dd073c029e11 Mon Sep 17 00:00:00 2001 From: Rohit Chatterjee Date: Tue, 27 Aug 2024 17:21:16 +0530 Subject: [PATCH 1/5] step through tenant expenses month-wise --- .../source_mgramseva/source.py | 88 ++++++++++++++----- 1 file changed, 67 insertions(+), 21 deletions(-) diff --git a/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py b/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py index 6b2f4ebbfdd8..4a3d8d78361a 100644 --- a/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py +++ b/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py @@ -8,7 +8,7 @@ import base64 import hashlib -from datetime import datetime +from datetime import datetime, timedelta from logging import Logger from dateutil.relativedelta import relativedelta import requests @@ -19,6 +19,8 @@ from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.streams.core import StreamData +pytz.IST = pytz.timezone("Asia/Kolkata") + # Basic full refresh stream class MgramsevaStream(HttpStream, ABC): @@ -99,6 +101,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp """ :return an iterable containing each record in the response """ + self.logger.info(response.json()) return map(lambda x: {"data": x, "id": x["id"]}, response.json()[self.response_key]) @@ -106,16 +109,58 @@ class MgramsevaDemands(MgramsevaStream): """object for consumer demands""" def __init__( - self, headers: dict, request_info: dict, user_request: dict, tenantid: str, start_date: datetime, end_date: datetime, **kwargs + self, headers: dict, request_info: dict, user_request: dict, tenantid: str, fromdate: datetime, todate: datetime, **kwargs ): - """specify endpoint for demands and call super""" - params = { - "tenantId": tenantid, - "businessService": "WS", - "periodFrom": int(1000 * start_date.timestamp()), - "periodTo": int(1000 * end_date.timestamp()), - } - super().__init__("billing-service/demand/_search", headers, request_info, user_request, params, "Demands", **kwargs) + """ctor""" + self.headers = headers + self.request_info = request_info + self.user_request = user_request + self.tenantid = tenantid + self.fromdate = fromdate + self.todate = todate + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: Optional[List[str]] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + stream_state: Optional[Mapping[str, Any]] = None, + ) -> Iterable[StreamData]: + """override""" + + # ==================================================================================== + # params = { + # "tenantId": self.tenantid, + # "businessService": "WS", + # "periodFrom": int(1000 * self.fromdate.timestamp()), + # "periodTo": int(1000 * self.todate.timestamp()), + # } + # stream = MgramsevaStream("billing-service/demand/_search", self.headers, self.request_info, self.user_request, params, "Demands") + # yield from stream.read_records(sync_mode, cursor_field, stream_slice, stream_state) + # ==================================================================================== + + month_start = self.fromdate.replace(day=1) + + while month_start < self.todate: + + next_month_start = month_start + relativedelta(months=1) + if next_month_start > self.todate: + next_month_start = self.todate + + params = { + "tenantId": self.tenantid, + "businessService": "WS", + "periodFrom": int(1000 * month_start.timestamp()), + "periodTo": int(1000 * next_month_start.timestamp()), + } + self.logger.info(params) + + stream = MgramsevaStream( + "billing-service/demand/_search", self.headers, self.request_info, self.user_request, params, "Demands" + ) + yield from stream.read_records(sync_mode, cursor_field, stream_slice, stream_state) + + month_start = next_month_start class MgramsevaBills(MgramsevaStream): @@ -160,18 +205,18 @@ def __init__( user_request: dict, tenantid: str, month_start: datetime, - next_month_start: datetime, + month_end: datetime, response_key: str, **kwargs, ): """call super""" self.tenantid = tenantid self.month_start = month_start - self.next_month_start = next_month_start + self.month_end = month_end params = { "tenantId": self.tenantid, "fromDate": int(month_start.timestamp() * 1000), - "toDate": int(next_month_start.timestamp() * 1000), + "toDate": int(month_end.timestamp() * 1000), } super().__init__(endpoint, headers, request_info, user_request, params, response_key, **kwargs) @@ -182,7 +227,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp expenses = response.json()[self.response_key] expenses["tenantId"] = self.tenantid expenses["fromDate"] = self.month_start.strftime("%Y-%m-%d") - expenses["toDate"] = self.next_month_start.strftime("%Y-%m-%d") + expenses["toDate"] = self.month_end.strftime("%Y-%m-%d") combined_string = f"{self.tenantid}{expenses['fromDate']}{expenses['toDate']}" id_hash = hashlib.sha256(combined_string.encode()) return [{"data": expenses, "id": id_hash.hexdigest()}] @@ -219,7 +264,7 @@ def read_records( while month_start < self.todate: - next_month_start = month_start + relativedelta(months=1) + next_month_start = month_start + relativedelta(months=1) - timedelta(milliseconds=1) stream = MgramsevaTenantExpense( "echallan-services/eChallan/v1/_expenseDashboard", @@ -332,22 +377,23 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: # tenant_expenses_from = datetime.strptime(config.get("tenant_expenses_from", "2022-01-01"), "%Y-%m-%d") # tenant_expenses_to = datetime.strptime(config.get("tenant_expenses_to", "2022-01-01"), "%Y-%m-%d") - start_date = datetime.strptime(config.get("start_date", "2022-01-01"), "%Y-%m-%d").replace(tzinfo=pytz.UTC) - end_date = datetime.today().replace(tzinfo=pytz.UTC) + start_date = datetime.strptime(config.get("start_date", "2022-01-01"), "%Y-%m-%d") + start_date = pytz.IST.localize(start_date).astimezone(pytz.utc) + end_date = datetime.today() + end_date = pytz.IST.localize(end_date).astimezone(pytz.utc) for tenantid in self.config["tenantids"]: # Generate streams for each object type streams = [ MgramsevaPayments(self.headers, self.request_info, self.user_request, tenantid), MgramsevaTenantExpenses(self.headers, self.request_info, self.user_request, tenantid, start_date, end_date), + MgramsevaDemands(self.headers, self.request_info, self.user_request, tenantid, start_date, end_date), ] - demand_stream = MgramsevaDemands(self.headers, self.request_info, self.user_request, tenantid, start_date, end_date) - streams.append(demand_stream) - # and now we need bills for each consumer consumer_codes = set() - for demand in demand_stream.read_records(SyncMode.full_refresh): + tmp_demand_stream = MgramsevaDemands(self.headers, self.request_info, self.user_request, tenantid, start_date, end_date) + for demand in tmp_demand_stream.read_records(SyncMode.full_refresh): consumer_codes.add(demand["data"]["consumerCode"]) streams.append(MgramsevaBills(self.headers, self.request_info, self.user_request, tenantid, list(consumer_codes))) From 8973b7485ec5bd44e350711f7f6764de79760859 Mon Sep 17 00:00:00 2001 From: Rohit Chatterjee Date: Tue, 27 Aug 2024 22:58:47 +0530 Subject: [PATCH 2/5] put the demandDate into the demands --- .../source_mgramseva/source.py | 57 ++++++++++++------- 1 file changed, 38 insertions(+), 19 deletions(-) diff --git a/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py b/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py index 4a3d8d78361a..7d83b2d6ca34 100644 --- a/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py +++ b/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py @@ -22,6 +22,11 @@ pytz.IST = pytz.timezone("Asia/Kolkata") +def convert_to_date(x: int) -> datetime: + """convert a timestamp to a date""" + return datetime.fromtimestamp(x / 1000, pytz.UTC).astimezone(pytz.IST) + + # Basic full refresh stream class MgramsevaStream(HttpStream, ABC): """Base for all objects""" @@ -105,6 +110,30 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp return map(lambda x: {"data": x, "id": x["id"]}, response.json()[self.response_key]) +class MgramsevaDemand(MgramsevaStream): + """object for a single demand""" + + def __init__( + self, + endpoint: str, + headers: dict, + request_info: dict, + user_request: dict, + params: dict, + response_key: str, + **kwargs, + ): + """call super""" + super().__init__(endpoint, headers, request_info, user_request, params, response_key, **kwargs) + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + """include the bill date""" + demands = response.json()[self.response_key] + for demand in demands: + demand["demandDate"] = convert_to_date(demand["taxPeriodTo"]).strftime("%Y-%m-%d") + return map(lambda x: {"data": x, "id": x["id"]}, demands) + + class MgramsevaDemands(MgramsevaStream): """object for consumer demands""" @@ -128,34 +157,20 @@ def read_records( ) -> Iterable[StreamData]: """override""" - # ==================================================================================== - # params = { - # "tenantId": self.tenantid, - # "businessService": "WS", - # "periodFrom": int(1000 * self.fromdate.timestamp()), - # "periodTo": int(1000 * self.todate.timestamp()), - # } - # stream = MgramsevaStream("billing-service/demand/_search", self.headers, self.request_info, self.user_request, params, "Demands") - # yield from stream.read_records(sync_mode, cursor_field, stream_slice, stream_state) - # ==================================================================================== - - month_start = self.fromdate.replace(day=1) + month_start = self.fromdate while month_start < self.todate: next_month_start = month_start + relativedelta(months=1) - if next_month_start > self.todate: - next_month_start = self.todate params = { "tenantId": self.tenantid, "businessService": "WS", "periodFrom": int(1000 * month_start.timestamp()), - "periodTo": int(1000 * next_month_start.timestamp()), + "periodTo": int(1000 * (next_month_start - timedelta(milliseconds=1)).timestamp()), } - self.logger.info(params) - stream = MgramsevaStream( + stream = MgramsevaDemand( "billing-service/demand/_search", self.headers, self.request_info, self.user_request, params, "Demands" ) yield from stream.read_records(sync_mode, cursor_field, stream_slice, stream_state) @@ -378,7 +393,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: # tenant_expenses_to = datetime.strptime(config.get("tenant_expenses_to", "2022-01-01"), "%Y-%m-%d") start_date = datetime.strptime(config.get("start_date", "2022-01-01"), "%Y-%m-%d") + start_date_month_start = start_date.replace(day=1) start_date = pytz.IST.localize(start_date).astimezone(pytz.utc) + start_date_month_start = pytz.IST.localize(start_date_month_start).astimezone(pytz.utc) end_date = datetime.today() end_date = pytz.IST.localize(end_date).astimezone(pytz.utc) @@ -387,12 +404,14 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: streams = [ MgramsevaPayments(self.headers, self.request_info, self.user_request, tenantid), MgramsevaTenantExpenses(self.headers, self.request_info, self.user_request, tenantid, start_date, end_date), - MgramsevaDemands(self.headers, self.request_info, self.user_request, tenantid, start_date, end_date), + MgramsevaDemands(self.headers, self.request_info, self.user_request, tenantid, start_date_month_start, end_date), ] # and now we need bills for each consumer consumer_codes = set() - tmp_demand_stream = MgramsevaDemands(self.headers, self.request_info, self.user_request, tenantid, start_date, end_date) + tmp_demand_stream = MgramsevaDemands( + self.headers, self.request_info, self.user_request, tenantid, start_date_month_start, end_date + ) for demand in tmp_demand_stream.read_records(SyncMode.full_refresh): consumer_codes.add(demand["data"]["consumerCode"]) From f0fb7bda6b74f4341260c831a2b4562707047e85 Mon Sep 17 00:00:00 2001 From: Rohit Chatterjee Date: Wed, 28 Aug 2024 20:22:32 +0530 Subject: [PATCH 3/5] rolled back the attempt at fetching demands month-wise --- .../source_mgramseva/source.py | 78 ++++--------------- 1 file changed, 13 insertions(+), 65 deletions(-) diff --git a/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py b/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py index 7d83b2d6ca34..5e5637bb6b81 100644 --- a/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py +++ b/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py @@ -110,74 +110,26 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp return map(lambda x: {"data": x, "id": x["id"]}, response.json()[self.response_key]) -class MgramsevaDemand(MgramsevaStream): - """object for a single demand""" +class MgramsevaDemands(MgramsevaStream): + """object for consumer demands""" - def __init__( - self, - endpoint: str, - headers: dict, - request_info: dict, - user_request: dict, - params: dict, - response_key: str, - **kwargs, - ): - """call super""" - super().__init__(endpoint, headers, request_info, user_request, params, response_key, **kwargs) + def __init__(self, headers: dict, request_info: dict, user_request: dict, tenantid: str, **kwargs): + """ctor""" + params = { + "tenantId": tenantid, + "businessService": "WS", + } + super().__init__("billing-service/demand/_search", headers, request_info, user_request, params, "Demands") def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: """include the bill date""" demands = response.json()[self.response_key] for demand in demands: - demand["demandDate"] = convert_to_date(demand["taxPeriodTo"]).strftime("%Y-%m-%d") + demand["demandFromDate"] = convert_to_date(demand["taxPeriodFrom"]).strftime("%Y-%m-%d") + demand["demandToDate"] = convert_to_date(demand["taxPeriodTo"]).strftime("%Y-%m-%d") return map(lambda x: {"data": x, "id": x["id"]}, demands) -class MgramsevaDemands(MgramsevaStream): - """object for consumer demands""" - - def __init__( - self, headers: dict, request_info: dict, user_request: dict, tenantid: str, fromdate: datetime, todate: datetime, **kwargs - ): - """ctor""" - self.headers = headers - self.request_info = request_info - self.user_request = user_request - self.tenantid = tenantid - self.fromdate = fromdate - self.todate = todate - - def read_records( - self, - sync_mode: SyncMode, - cursor_field: Optional[List[str]] = None, - stream_slice: Optional[Mapping[str, Any]] = None, - stream_state: Optional[Mapping[str, Any]] = None, - ) -> Iterable[StreamData]: - """override""" - - month_start = self.fromdate - - while month_start < self.todate: - - next_month_start = month_start + relativedelta(months=1) - - params = { - "tenantId": self.tenantid, - "businessService": "WS", - "periodFrom": int(1000 * month_start.timestamp()), - "periodTo": int(1000 * (next_month_start - timedelta(milliseconds=1)).timestamp()), - } - - stream = MgramsevaDemand( - "billing-service/demand/_search", self.headers, self.request_info, self.user_request, params, "Demands" - ) - yield from stream.read_records(sync_mode, cursor_field, stream_slice, stream_state) - - month_start = next_month_start - - class MgramsevaBills(MgramsevaStream): """object for consumer bills""" @@ -393,9 +345,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: # tenant_expenses_to = datetime.strptime(config.get("tenant_expenses_to", "2022-01-01"), "%Y-%m-%d") start_date = datetime.strptime(config.get("start_date", "2022-01-01"), "%Y-%m-%d") - start_date_month_start = start_date.replace(day=1) start_date = pytz.IST.localize(start_date).astimezone(pytz.utc) - start_date_month_start = pytz.IST.localize(start_date_month_start).astimezone(pytz.utc) end_date = datetime.today() end_date = pytz.IST.localize(end_date).astimezone(pytz.utc) @@ -404,14 +354,12 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: streams = [ MgramsevaPayments(self.headers, self.request_info, self.user_request, tenantid), MgramsevaTenantExpenses(self.headers, self.request_info, self.user_request, tenantid, start_date, end_date), - MgramsevaDemands(self.headers, self.request_info, self.user_request, tenantid, start_date_month_start, end_date), + MgramsevaDemands(self.headers, self.request_info, self.user_request, tenantid), ] # and now we need bills for each consumer consumer_codes = set() - tmp_demand_stream = MgramsevaDemands( - self.headers, self.request_info, self.user_request, tenantid, start_date_month_start, end_date - ) + tmp_demand_stream = MgramsevaDemands(self.headers, self.request_info, self.user_request, tenantid) for demand in tmp_demand_stream.read_records(SyncMode.full_refresh): consumer_codes.add(demand["data"]["consumerCode"]) From d85eee877dc4a3f6549e2f80485767ad08a297ed Mon Sep 17 00:00:00 2001 From: Rohit Chatterjee Date: Mon, 2 Sep 2024 23:43:37 +0530 Subject: [PATCH 4/5] iterate over all tenants --- .../connectors/source-mgramseva/source_mgramseva/source.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py b/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py index 5e5637bb6b81..6f102cae1af0 100644 --- a/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py +++ b/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py @@ -349,9 +349,11 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: end_date = datetime.today() end_date = pytz.IST.localize(end_date).astimezone(pytz.utc) + streams = [] + for tenantid in self.config["tenantids"]: # Generate streams for each object type - streams = [ + streams += [ MgramsevaPayments(self.headers, self.request_info, self.user_request, tenantid), MgramsevaTenantExpenses(self.headers, self.request_info, self.user_request, tenantid, start_date, end_date), MgramsevaDemands(self.headers, self.request_info, self.user_request, tenantid), @@ -365,4 +367,4 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: streams.append(MgramsevaBills(self.headers, self.request_info, self.user_request, tenantid, list(consumer_codes))) - return streams + return streams From 7e3d049ec2cf4b53d4bb7d18d06de9185bb0ffd2 Mon Sep 17 00:00:00 2001 From: Rohit Chatterjee Date: Tue, 3 Sep 2024 05:56:43 +0530 Subject: [PATCH 5/5] each stream needs to iterate over the list of tenant ids --- .../source_mgramseva/source.py | 148 +++++++++++------- 1 file changed, 95 insertions(+), 53 deletions(-) diff --git a/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py b/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py index 6f102cae1af0..25c9f736ecd7 100644 --- a/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py +++ b/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py @@ -106,20 +106,40 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp """ :return an iterable containing each record in the response """ - self.logger.info(response.json()) + # self.logger.info(response.json()) return map(lambda x: {"data": x, "id": x["id"]}, response.json()[self.response_key]) class MgramsevaDemands(MgramsevaStream): """object for consumer demands""" - def __init__(self, headers: dict, request_info: dict, user_request: dict, tenantid: str, **kwargs): + def __init__( + self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, **kwargs + ): # pylint: disable=super-init-not-called """ctor""" - params = { - "tenantId": tenantid, - "businessService": "WS", - } - super().__init__("billing-service/demand/_search", headers, request_info, user_request, params, "Demands") + self.tenantid_list = tenantid_list + self.headers = headers + self.request_info = request_info + self.user_request = user_request + self.response_key = "Demands" + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: Optional[List[str]] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + stream_state: Optional[Mapping[str, Any]] = None, + ) -> Iterable[StreamData]: + """override""" + for tenantid in self.tenantid_list: + params = { + "tenantId": tenantid, + "businessService": "WS", + } + demandstream = MgramsevaStream( + "billing-service/demand/_search", self.headers, self.request_info, self.user_request, params, self.response_key + ) + yield from demandstream.read_records(sync_mode, cursor_field, stream_slice, stream_state) def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: """include the bill date""" @@ -133,16 +153,15 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp class MgramsevaBills(MgramsevaStream): """object for consumer bills""" - def __init__(self, headers: dict, request_info: dict, user_request: dict, tenantid: str, consumer_codes: list, **kwargs): + def __init__( + self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, consumer_codes: dict, **kwargs + ): # pylint: disable=super-init-not-called """specify endpoint for bills and call super""" self.headers = headers self.request_info = request_info self.user_request = user_request self.consumer_codes = consumer_codes - self.params = { - "tenantId": tenantid, - "businessService": "WS", - } + self.tenantid_list = tenantid_list def read_records( self, @@ -152,13 +171,13 @@ def read_records( stream_state: Optional[Mapping[str, Any]] = None, ) -> Iterable[StreamData]: """override""" - for consumer_code in self.consumer_codes: - params = self.params.copy() - params["consumerCode"] = consumer_code - consumer_code_stream = MgramsevaStream( - "billing-service/bill/v2/_fetchbill", self.headers, self.request_info, self.user_request, params, "Bill" - ) - yield from consumer_code_stream.read_records(sync_mode, cursor_field, stream_slice, stream_state) + for tenantid in self.tenantid_list: + for consumer_code in self.consumer_codes[tenantid]: + params = {"tenantId": tenantid, "businessService": "WS", "consumerCode": consumer_code} + consumer_code_stream = MgramsevaStream( + "billing-service/bill/v2/_fetchbill", self.headers, self.request_info, self.user_request, params, "Bill" + ) + yield from consumer_code_stream.read_records(sync_mode, cursor_field, stream_slice, stream_state) class MgramsevaTenantExpense(MgramsevaStream): @@ -204,8 +223,8 @@ class MgramsevaTenantExpenses(MgramsevaStream): """object for tenant payments""" def __init__( - self, headers: dict, request_info: dict, user_request: dict, tenantid: str, fromdate: datetime, todate: datetime, **kwargs - ): + self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, fromdate: datetime, todate: datetime, **kwargs + ): # pylint: disable=super-init-not-called """ specify endpoint for demands and call super 1672531200000 = 2023-01-01 00:00 @@ -214,7 +233,7 @@ def __init__( self.headers = headers self.request_info = request_info self.user_request = user_request - self.tenantid = tenantid + self.tenantid_list = tenantid_list self.fromdate = fromdate self.todate = todate @@ -227,34 +246,56 @@ def read_records( ) -> Iterable[StreamData]: """override""" - month_start = self.fromdate.replace(day=1) + for tenantid in self.tenantid_list: - while month_start < self.todate: + month_start = self.fromdate.replace(day=1) - next_month_start = month_start + relativedelta(months=1) - timedelta(milliseconds=1) + while month_start < self.todate: - stream = MgramsevaTenantExpense( - "echallan-services/eChallan/v1/_expenseDashboard", - self.headers, - self.request_info, - self.user_request, - self.tenantid, - month_start, - next_month_start, - "ExpenseDashboard", - ) - yield from stream.read_records(sync_mode, cursor_field, stream_slice, stream_state) + next_month_start = month_start + relativedelta(months=1) - timedelta(milliseconds=1) + + stream = MgramsevaTenantExpense( + "echallan-services/eChallan/v1/_expenseDashboard", + self.headers, + self.request_info, + self.user_request, + tenantid, + month_start, + next_month_start, + "ExpenseDashboard", + ) + yield from stream.read_records(sync_mode, cursor_field, stream_slice, stream_state) - month_start = next_month_start + month_start = next_month_start class MgramsevaPayments(MgramsevaStream): """object for consumer payments""" - def __init__(self, headers: dict, request_info: dict, user_request: dict, tenantid: str, **kwargs): + def __init__( + self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, **kwargs + ): # pylint: disable=super-init-not-called """specify endpoint for payments and call super""" - params = {"tenantId": tenantid, "businessService": "WS"} - super().__init__("collection-services/payments/WS/_search", headers, request_info, user_request, params, "Payments", **kwargs) + self.headers = headers + self.request_info = request_info + self.user_request = user_request + self.tenantid_list = tenantid_list + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: Optional[List[str]] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + stream_state: Optional[Mapping[str, Any]] = None, + ) -> Iterable[StreamData]: + """override""" + + for tenantid in self.tenantid_list: + params = {"tenantId": tenantid, "businessService": "WS"} + paymentstream = MgramsevaStream( + "collection-services/payments/WS/_search", self.headers, self.request_info, self.user_request, params, "Payments" + ) + yield from paymentstream.read_records(sync_mode, cursor_field, stream_slice, stream_state) # Source @@ -349,22 +390,23 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: end_date = datetime.today() end_date = pytz.IST.localize(end_date).astimezone(pytz.utc) - streams = [] + # Generate streams for each object type + streams = [ + MgramsevaPayments(self.headers, self.request_info, self.user_request, self.config["tenantids"]), + MgramsevaTenantExpenses(self.headers, self.request_info, self.user_request, self.config["tenantids"], start_date, end_date), + MgramsevaDemands(self.headers, self.request_info, self.user_request, self.config["tenantids"]), + ] + # and now we need bills for each consumer + tenantid_to_consumer_codes = {} for tenantid in self.config["tenantids"]: - # Generate streams for each object type - streams += [ - MgramsevaPayments(self.headers, self.request_info, self.user_request, tenantid), - MgramsevaTenantExpenses(self.headers, self.request_info, self.user_request, tenantid, start_date, end_date), - MgramsevaDemands(self.headers, self.request_info, self.user_request, tenantid), - ] - - # and now we need bills for each consumer - consumer_codes = set() - tmp_demand_stream = MgramsevaDemands(self.headers, self.request_info, self.user_request, tenantid) + tenantid_to_consumer_codes[tenantid] = set() + tmp_demand_stream = MgramsevaDemands(self.headers, self.request_info, self.user_request, [tenantid]) for demand in tmp_demand_stream.read_records(SyncMode.full_refresh): - consumer_codes.add(demand["data"]["consumerCode"]) + tenantid_to_consumer_codes[tenantid].add(demand["data"]["consumerCode"]) - streams.append(MgramsevaBills(self.headers, self.request_info, self.user_request, tenantid, list(consumer_codes))) + streams.append( + MgramsevaBills(self.headers, self.request_info, self.user_request, self.config["tenantids"], tenantid_to_consumer_codes) + ) return streams