Skip to content

Commit

Permalink
each stream needs to iterate over the list of tenant ids
Browse files Browse the repository at this point in the history
  • Loading branch information
Rohit Chatterjee committed Sep 3, 2024
1 parent d85eee8 commit 7e3d049
Showing 1 changed file with 95 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,20 +106,40 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
"""
:return an iterable containing each record in the response
"""
self.logger.info(response.json())
# self.logger.info(response.json())
return map(lambda x: {"data": x, "id": x["id"]}, response.json()[self.response_key])


class MgramsevaDemands(MgramsevaStream):
"""object for consumer demands"""

def __init__(self, headers: dict, request_info: dict, user_request: dict, tenantid: str, **kwargs):
def __init__(
self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, **kwargs
): # pylint: disable=super-init-not-called
"""ctor"""
params = {
"tenantId": tenantid,
"businessService": "WS",
}
super().__init__("billing-service/demand/_search", headers, request_info, user_request, params, "Demands")
self.tenantid_list = tenantid_list
self.headers = headers
self.request_info = request_info
self.user_request = user_request
self.response_key = "Demands"

def read_records(
self,
sync_mode: SyncMode,
cursor_field: Optional[List[str]] = None,
stream_slice: Optional[Mapping[str, Any]] = None,
stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[StreamData]:
"""override"""
for tenantid in self.tenantid_list:
params = {
"tenantId": tenantid,
"businessService": "WS",
}
demandstream = MgramsevaStream(
"billing-service/demand/_search", self.headers, self.request_info, self.user_request, params, self.response_key
)
yield from demandstream.read_records(sync_mode, cursor_field, stream_slice, stream_state)

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""include the bill date"""
Expand All @@ -133,16 +153,15 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
class MgramsevaBills(MgramsevaStream):
"""object for consumer bills"""

def __init__(self, headers: dict, request_info: dict, user_request: dict, tenantid: str, consumer_codes: list, **kwargs):
def __init__(
self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, consumer_codes: dict, **kwargs
): # pylint: disable=super-init-not-called
"""specify endpoint for bills and call super"""
self.headers = headers
self.request_info = request_info
self.user_request = user_request
self.consumer_codes = consumer_codes
self.params = {
"tenantId": tenantid,
"businessService": "WS",
}
self.tenantid_list = tenantid_list

def read_records(
self,
Expand All @@ -152,13 +171,13 @@ def read_records(
stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[StreamData]:
"""override"""
for consumer_code in self.consumer_codes:
params = self.params.copy()
params["consumerCode"] = consumer_code
consumer_code_stream = MgramsevaStream(
"billing-service/bill/v2/_fetchbill", self.headers, self.request_info, self.user_request, params, "Bill"
)
yield from consumer_code_stream.read_records(sync_mode, cursor_field, stream_slice, stream_state)
for tenantid in self.tenantid_list:
for consumer_code in self.consumer_codes[tenantid]:
params = {"tenantId": tenantid, "businessService": "WS", "consumerCode": consumer_code}
consumer_code_stream = MgramsevaStream(
"billing-service/bill/v2/_fetchbill", self.headers, self.request_info, self.user_request, params, "Bill"
)
yield from consumer_code_stream.read_records(sync_mode, cursor_field, stream_slice, stream_state)


class MgramsevaTenantExpense(MgramsevaStream):
Expand Down Expand Up @@ -204,8 +223,8 @@ class MgramsevaTenantExpenses(MgramsevaStream):
"""object for tenant payments"""

def __init__(
self, headers: dict, request_info: dict, user_request: dict, tenantid: str, fromdate: datetime, todate: datetime, **kwargs
):
self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, fromdate: datetime, todate: datetime, **kwargs
): # pylint: disable=super-init-not-called
"""
specify endpoint for demands and call super
1672531200000 = 2023-01-01 00:00
Expand All @@ -214,7 +233,7 @@ def __init__(
self.headers = headers
self.request_info = request_info
self.user_request = user_request
self.tenantid = tenantid
self.tenantid_list = tenantid_list
self.fromdate = fromdate
self.todate = todate

Expand All @@ -227,34 +246,56 @@ def read_records(
) -> Iterable[StreamData]:
"""override"""

month_start = self.fromdate.replace(day=1)
for tenantid in self.tenantid_list:

while month_start < self.todate:
month_start = self.fromdate.replace(day=1)

next_month_start = month_start + relativedelta(months=1) - timedelta(milliseconds=1)
while month_start < self.todate:

stream = MgramsevaTenantExpense(
"echallan-services/eChallan/v1/_expenseDashboard",
self.headers,
self.request_info,
self.user_request,
self.tenantid,
month_start,
next_month_start,
"ExpenseDashboard",
)
yield from stream.read_records(sync_mode, cursor_field, stream_slice, stream_state)
next_month_start = month_start + relativedelta(months=1) - timedelta(milliseconds=1)

stream = MgramsevaTenantExpense(
"echallan-services/eChallan/v1/_expenseDashboard",
self.headers,
self.request_info,
self.user_request,
tenantid,
month_start,
next_month_start,
"ExpenseDashboard",
)
yield from stream.read_records(sync_mode, cursor_field, stream_slice, stream_state)

month_start = next_month_start
month_start = next_month_start


class MgramsevaPayments(MgramsevaStream):
"""object for consumer payments"""

def __init__(self, headers: dict, request_info: dict, user_request: dict, tenantid: str, **kwargs):
def __init__(
self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, **kwargs
): # pylint: disable=super-init-not-called
"""specify endpoint for payments and call super"""
params = {"tenantId": tenantid, "businessService": "WS"}
super().__init__("collection-services/payments/WS/_search", headers, request_info, user_request, params, "Payments", **kwargs)
self.headers = headers
self.request_info = request_info
self.user_request = user_request
self.tenantid_list = tenantid_list

def read_records(
self,
sync_mode: SyncMode,
cursor_field: Optional[List[str]] = None,
stream_slice: Optional[Mapping[str, Any]] = None,
stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[StreamData]:
"""override"""

for tenantid in self.tenantid_list:
params = {"tenantId": tenantid, "businessService": "WS"}
paymentstream = MgramsevaStream(
"collection-services/payments/WS/_search", self.headers, self.request_info, self.user_request, params, "Payments"
)
yield from paymentstream.read_records(sync_mode, cursor_field, stream_slice, stream_state)


# Source
Expand Down Expand Up @@ -349,22 +390,23 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
end_date = datetime.today()
end_date = pytz.IST.localize(end_date).astimezone(pytz.utc)

streams = []
# Generate streams for each object type
streams = [
MgramsevaPayments(self.headers, self.request_info, self.user_request, self.config["tenantids"]),
MgramsevaTenantExpenses(self.headers, self.request_info, self.user_request, self.config["tenantids"], start_date, end_date),
MgramsevaDemands(self.headers, self.request_info, self.user_request, self.config["tenantids"]),
]

# and now we need bills for each consumer
tenantid_to_consumer_codes = {}
for tenantid in self.config["tenantids"]:
# Generate streams for each object type
streams += [
MgramsevaPayments(self.headers, self.request_info, self.user_request, tenantid),
MgramsevaTenantExpenses(self.headers, self.request_info, self.user_request, tenantid, start_date, end_date),
MgramsevaDemands(self.headers, self.request_info, self.user_request, tenantid),
]

# and now we need bills for each consumer
consumer_codes = set()
tmp_demand_stream = MgramsevaDemands(self.headers, self.request_info, self.user_request, tenantid)
tenantid_to_consumer_codes[tenantid] = set()
tmp_demand_stream = MgramsevaDemands(self.headers, self.request_info, self.user_request, [tenantid])
for demand in tmp_demand_stream.read_records(SyncMode.full_refresh):
consumer_codes.add(demand["data"]["consumerCode"])
tenantid_to_consumer_codes[tenantid].add(demand["data"]["consumerCode"])

streams.append(MgramsevaBills(self.headers, self.request_info, self.user_request, tenantid, list(consumer_codes)))
streams.append(
MgramsevaBills(self.headers, self.request_info, self.user_request, self.config["tenantids"], tenantid_to_consumer_codes)
)

return streams

0 comments on commit 7e3d049

Please sign in to comment.