Skip to content

Commit

Permalink
Merge pull request #103 from DalgoT4D/mgramseva-monthwise-demands
Browse files Browse the repository at this point in the history
step through demands month-wise
  • Loading branch information
fatchat authored Sep 4, 2024
2 parents 0e663f5 + 7e3d049 commit 27884c8
Showing 1 changed file with 122 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import base64
import hashlib
from datetime import datetime
from datetime import datetime, timedelta
from logging import Logger
from dateutil.relativedelta import relativedelta
import requests
Expand All @@ -19,6 +19,13 @@
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.core import StreamData

pytz.IST = pytz.timezone("Asia/Kolkata")


def convert_to_date(x: int) -> datetime:
"""convert a timestamp to a date"""
return datetime.fromtimestamp(x / 1000, pytz.UTC).astimezone(pytz.IST)


# Basic full refresh stream
class MgramsevaStream(HttpStream, ABC):
Expand Down Expand Up @@ -99,38 +106,62 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
"""
:return an iterable containing each record in the response
"""
# self.logger.info(response.json())
return map(lambda x: {"data": x, "id": x["id"]}, response.json()[self.response_key])


class MgramsevaDemands(MgramsevaStream):
"""object for consumer demands"""

def __init__(
self, headers: dict, request_info: dict, user_request: dict, tenantid: str, start_date: datetime, end_date: datetime, **kwargs
):
"""specify endpoint for demands and call super"""
params = {
"tenantId": tenantid,
"businessService": "WS",
"periodFrom": int(1000 * start_date.timestamp()),
"periodTo": int(1000 * end_date.timestamp()),
}
super().__init__("billing-service/demand/_search", headers, request_info, user_request, params, "Demands", **kwargs)
self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, **kwargs
): # pylint: disable=super-init-not-called
"""ctor"""
self.tenantid_list = tenantid_list
self.headers = headers
self.request_info = request_info
self.user_request = user_request
self.response_key = "Demands"

def read_records(
self,
sync_mode: SyncMode,
cursor_field: Optional[List[str]] = None,
stream_slice: Optional[Mapping[str, Any]] = None,
stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[StreamData]:
"""override"""
for tenantid in self.tenantid_list:
params = {
"tenantId": tenantid,
"businessService": "WS",
}
demandstream = MgramsevaStream(
"billing-service/demand/_search", self.headers, self.request_info, self.user_request, params, self.response_key
)
yield from demandstream.read_records(sync_mode, cursor_field, stream_slice, stream_state)

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""include the bill date"""
demands = response.json()[self.response_key]
for demand in demands:
demand["demandFromDate"] = convert_to_date(demand["taxPeriodFrom"]).strftime("%Y-%m-%d")
demand["demandToDate"] = convert_to_date(demand["taxPeriodTo"]).strftime("%Y-%m-%d")
return map(lambda x: {"data": x, "id": x["id"]}, demands)


class MgramsevaBills(MgramsevaStream):
"""object for consumer bills"""

def __init__(self, headers: dict, request_info: dict, user_request: dict, tenantid: str, consumer_codes: list, **kwargs):
def __init__(
self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, consumer_codes: dict, **kwargs
): # pylint: disable=super-init-not-called
"""specify endpoint for bills and call super"""
self.headers = headers
self.request_info = request_info
self.user_request = user_request
self.consumer_codes = consumer_codes
self.params = {
"tenantId": tenantid,
"businessService": "WS",
}
self.tenantid_list = tenantid_list

def read_records(
self,
Expand All @@ -140,13 +171,13 @@ def read_records(
stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[StreamData]:
"""override"""
for consumer_code in self.consumer_codes:
params = self.params.copy()
params["consumerCode"] = consumer_code
consumer_code_stream = MgramsevaStream(
"billing-service/bill/v2/_fetchbill", self.headers, self.request_info, self.user_request, params, "Bill"
)
yield from consumer_code_stream.read_records(sync_mode, cursor_field, stream_slice, stream_state)
for tenantid in self.tenantid_list:
for consumer_code in self.consumer_codes[tenantid]:
params = {"tenantId": tenantid, "businessService": "WS", "consumerCode": consumer_code}
consumer_code_stream = MgramsevaStream(
"billing-service/bill/v2/_fetchbill", self.headers, self.request_info, self.user_request, params, "Bill"
)
yield from consumer_code_stream.read_records(sync_mode, cursor_field, stream_slice, stream_state)


class MgramsevaTenantExpense(MgramsevaStream):
Expand All @@ -160,18 +191,18 @@ def __init__(
user_request: dict,
tenantid: str,
month_start: datetime,
next_month_start: datetime,
month_end: datetime,
response_key: str,
**kwargs,
):
"""call super"""
self.tenantid = tenantid
self.month_start = month_start
self.next_month_start = next_month_start
self.month_end = month_end
params = {
"tenantId": self.tenantid,
"fromDate": int(month_start.timestamp() * 1000),
"toDate": int(next_month_start.timestamp() * 1000),
"toDate": int(month_end.timestamp() * 1000),
}
super().__init__(endpoint, headers, request_info, user_request, params, response_key, **kwargs)

Expand All @@ -182,7 +213,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
expenses = response.json()[self.response_key]
expenses["tenantId"] = self.tenantid
expenses["fromDate"] = self.month_start.strftime("%Y-%m-%d")
expenses["toDate"] = self.next_month_start.strftime("%Y-%m-%d")
expenses["toDate"] = self.month_end.strftime("%Y-%m-%d")
combined_string = f"{self.tenantid}{expenses['fromDate']}{expenses['toDate']}"
id_hash = hashlib.sha256(combined_string.encode())
return [{"data": expenses, "id": id_hash.hexdigest()}]
Expand All @@ -192,8 +223,8 @@ class MgramsevaTenantExpenses(MgramsevaStream):
"""object for tenant payments"""

def __init__(
self, headers: dict, request_info: dict, user_request: dict, tenantid: str, fromdate: datetime, todate: datetime, **kwargs
):
self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, fromdate: datetime, todate: datetime, **kwargs
): # pylint: disable=super-init-not-called
"""
specify endpoint for demands and call super
1672531200000 = 2023-01-01 00:00
Expand All @@ -202,7 +233,7 @@ def __init__(
self.headers = headers
self.request_info = request_info
self.user_request = user_request
self.tenantid = tenantid
self.tenantid_list = tenantid_list
self.fromdate = fromdate
self.todate = todate

Expand All @@ -215,34 +246,56 @@ def read_records(
) -> Iterable[StreamData]:
"""override"""

month_start = self.fromdate.replace(day=1)
for tenantid in self.tenantid_list:

while month_start < self.todate:
month_start = self.fromdate.replace(day=1)

next_month_start = month_start + relativedelta(months=1)
while month_start < self.todate:

stream = MgramsevaTenantExpense(
"echallan-services/eChallan/v1/_expenseDashboard",
self.headers,
self.request_info,
self.user_request,
self.tenantid,
month_start,
next_month_start,
"ExpenseDashboard",
)
yield from stream.read_records(sync_mode, cursor_field, stream_slice, stream_state)
next_month_start = month_start + relativedelta(months=1) - timedelta(milliseconds=1)

month_start = next_month_start
stream = MgramsevaTenantExpense(
"echallan-services/eChallan/v1/_expenseDashboard",
self.headers,
self.request_info,
self.user_request,
tenantid,
month_start,
next_month_start,
"ExpenseDashboard",
)
yield from stream.read_records(sync_mode, cursor_field, stream_slice, stream_state)

month_start = next_month_start


class MgramsevaPayments(MgramsevaStream):
"""object for consumer payments"""

def __init__(self, headers: dict, request_info: dict, user_request: dict, tenantid: str, **kwargs):
def __init__(
self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, **kwargs
): # pylint: disable=super-init-not-called
"""specify endpoint for payments and call super"""
params = {"tenantId": tenantid, "businessService": "WS"}
super().__init__("collection-services/payments/WS/_search", headers, request_info, user_request, params, "Payments", **kwargs)
self.headers = headers
self.request_info = request_info
self.user_request = user_request
self.tenantid_list = tenantid_list

def read_records(
self,
sync_mode: SyncMode,
cursor_field: Optional[List[str]] = None,
stream_slice: Optional[Mapping[str, Any]] = None,
stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[StreamData]:
"""override"""

for tenantid in self.tenantid_list:
params = {"tenantId": tenantid, "businessService": "WS"}
paymentstream = MgramsevaStream(
"collection-services/payments/WS/_search", self.headers, self.request_info, self.user_request, params, "Payments"
)
yield from paymentstream.read_records(sync_mode, cursor_field, stream_slice, stream_state)


# Source
Expand Down Expand Up @@ -332,24 +385,28 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
# tenant_expenses_from = datetime.strptime(config.get("tenant_expenses_from", "2022-01-01"), "%Y-%m-%d")
# tenant_expenses_to = datetime.strptime(config.get("tenant_expenses_to", "2022-01-01"), "%Y-%m-%d")

start_date = datetime.strptime(config.get("start_date", "2022-01-01"), "%Y-%m-%d").replace(tzinfo=pytz.UTC)
end_date = datetime.today().replace(tzinfo=pytz.UTC)

for tenantid in self.config["tenantids"]:
# Generate streams for each object type
streams = [
MgramsevaPayments(self.headers, self.request_info, self.user_request, tenantid),
MgramsevaTenantExpenses(self.headers, self.request_info, self.user_request, tenantid, start_date, end_date),
]
start_date = datetime.strptime(config.get("start_date", "2022-01-01"), "%Y-%m-%d")
start_date = pytz.IST.localize(start_date).astimezone(pytz.utc)
end_date = datetime.today()
end_date = pytz.IST.localize(end_date).astimezone(pytz.utc)

demand_stream = MgramsevaDemands(self.headers, self.request_info, self.user_request, tenantid, start_date, end_date)
streams.append(demand_stream)
# Generate streams for each object type
streams = [
MgramsevaPayments(self.headers, self.request_info, self.user_request, self.config["tenantids"]),
MgramsevaTenantExpenses(self.headers, self.request_info, self.user_request, self.config["tenantids"], start_date, end_date),
MgramsevaDemands(self.headers, self.request_info, self.user_request, self.config["tenantids"]),
]

# and now we need bills for each consumer
consumer_codes = set()
for demand in demand_stream.read_records(SyncMode.full_refresh):
consumer_codes.add(demand["data"]["consumerCode"])
# and now we need bills for each consumer
tenantid_to_consumer_codes = {}
for tenantid in self.config["tenantids"]:
tenantid_to_consumer_codes[tenantid] = set()
tmp_demand_stream = MgramsevaDemands(self.headers, self.request_info, self.user_request, [tenantid])
for demand in tmp_demand_stream.read_records(SyncMode.full_refresh):
tenantid_to_consumer_codes[tenantid].add(demand["data"]["consumerCode"])

streams.append(MgramsevaBills(self.headers, self.request_info, self.user_request, tenantid, list(consumer_codes)))
streams.append(
MgramsevaBills(self.headers, self.request_info, self.user_request, self.config["tenantids"], tenantid_to_consumer_codes)
)

return streams
return streams

0 comments on commit 27884c8

Please sign in to comment.