-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbmac_client.py
95 lines (82 loc) · 3.97 KB
/
bmac_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from typing import List
import requests
import constants
import env_parser
from datetime import datetime, timedelta
from BmacDonor import BmacDonor, BmacDonors
from BmacException import BmacException
from SubscriptionData import SubscriptionData
from UserData import UserData
# First page of the Buy Me a Coffee supporters listing; get_donors() starts
# here and then follows the 'next_page_url' value returned with each page.
donors_endpoint_url = "https://developers.buymeacoffee.com/api/v1/supporters?page=1"
# API token obtained through env_parser under the name 'BUY_ME_A_COFFEE_API_KEY'
# (presumably an environment variable -- confirm in env_parser).
BMAC_API_KEY = env_parser.get_env_var('BUY_ME_A_COFFEE_API_KEY')
# Bearer-token header sent with every supporters request.
headers = {"Authorization": f"Bearer {BMAC_API_KEY}"}
# In-memory per-user state, looked up by the id carried in a donor's
# support_note / user_id_internal.
# NOTE(review): keys are annotated as int, but lookups use donor.support_note
# as-is -- confirm the note is actually an int (not a str) at runtime.
user_data: dict[int, UserData] = {}
# Ids of users whose subscription is currently considered valid; filled by
# init_subscriptions() and pruned by clear_expired_subscribers().
valid_subscriptions: set = set()
# Upper bound (seconds) for a single supporters-page request, so a stalled
# connection cannot block the caller forever.
_BMAC_REQUEST_TIMEOUT_SECONDS = 30


def get_donors() -> List[BmacDonor]:
    """Fetch supporters page by page and return those with a live subscription.

    Starting at ``donors_endpoint_url``, each page is requested with the
    module-level ``headers`` and parsed into ``BmacDonors``. For every donor
    that carries a ``support_note`` (used as the user id):

    * if ``user_data`` already holds a subscription with the same
      ``support_updated_on`` value, the donor is skipped and no further pages
      are requested;
    * otherwise the expiration date is computed as ``support_updated_on`` plus
      ``constants.SUBSCRIPTION_DAYS`` days and stored on the donor;
    * a donor that is still valid and not refunded is tagged with
      ``user_id_internal`` and collected; any other donor stops pagination.

    Stopping pagination only prevents the *next* page from being fetched; the
    remaining donors of the current page are still processed.

    Returns:
        The donors whose subscription is valid at call time and not refunded.

    Raises:
        BmacException: if a page request returns a status other than 200.
        requests.exceptions.RequestException: on network failure, including a
            timeout after ``_BMAC_REQUEST_TIMEOUT_SECONDS`` seconds.
    """
    donors: List[BmacDonor] = []
    next_donors_endpoint_url = donors_endpoint_url
    # NOTE(review): naive local time is compared against timestamps parsed
    # from the API -- confirm both are in the same timezone.
    date_now = datetime.now()

    while next_donors_endpoint_url is not None:
        response = requests.get(
            next_donors_endpoint_url,
            headers=headers,
            timeout=_BMAC_REQUEST_TIMEOUT_SECONDS,
        )
        if response.status_code != 200:
            raise BmacException(f"Failed to get buy me a coffee donors. error {response.status_code}")

        supporters = response.json()
        if 'error' in supporters and supporters['error'] == 'No supporters':
            break

        next_donors_endpoint_url = supporters['next_page_url']
        bmac_donors = BmacDonors(**supporters)

        for donor in bmac_donors.data:
            if donor.support_note is None:
                continue

            user_id = donor.support_note  # Can make this structured data.
            existing_donor = user_data.get(user_id)

            already_recorded = (
                existing_donor is not None
                and existing_donor.subscription_data is not None
                and existing_donor.subscription_data.update_date == donor.support_updated_on
            )
            if already_recorded:
                # This support was seen on a previous run; presumably
                # everything on later pages is older, so stop paging.
                next_donors_endpoint_url = None
                continue

            donor_date: datetime = datetime.strptime(donor.support_updated_on, '%Y-%m-%d %H:%M:%S')
            # Must be set before is_subscription_valid() is called below.
            donor.expiration_date_internal = donor_date + timedelta(days=constants.SUBSCRIPTION_DAYS)

            if not donor.is_subscription_valid(date_now) or donor.is_refunded:
                # Expired or refunded support: do not request further pages.
                next_donors_endpoint_url = None
                continue

            donor.user_id_internal = user_id
            # NOTE(review): the message says "New customer" but it is printed
            # only when user_data already has an entry for this id -- confirm
            # this is the intended condition.
            if existing_donor is not None:
                print(
                    f"New customer paid {donor.support_coffee_price}$ with expiration of {donor.expiration_date_internal}!")
            donors.append(donor)

    return donors
def init_subscriptions():
    """Refresh the in-memory subscription state from the current donor list.

    Calls get_donors() and, for each returned donor whose subscription is
    valid at call time, builds a SubscriptionData record (marked valid),
    stores it in ``user_data`` under the donor's ``user_id_internal`` --
    creating the UserData entry when none exists -- and adds the donor's
    ``support_note`` to ``valid_subscriptions``.
    """
    now = datetime.now()
    for supporter in get_donors():
        if not supporter.is_subscription_valid(now):
            continue

        known_user = user_data.get(supporter.user_id_internal)
        fresh_subscription = SubscriptionData(
            supporter.expiration_date_internal,
            supporter.support_coffee_price,
            supporter.support_created_on,
            supporter.support_updated_on,
            is_valid=True,
        )

        if known_user is not None:
            # Existing user: replace only the subscription part.
            known_user.subscription_data = fresh_subscription
        else:
            user_data[supporter.user_id_internal] = UserData(
                subscription_data=fresh_subscription)

        valid_subscriptions.add(supporter.support_note)
def clear_expired_subscribers():
    """Remove users whose subscription has expired from ``valid_subscriptions``.

    For every id in ``valid_subscriptions`` the stored expiration date in
    ``user_data`` is compared with the current time. Expired ids are removed
    from the set and their subscription record is flagged ``is_valid = False``.
    Ids without a user record or subscription record are left untouched,
    because there is no expiration date to compare against.
    """
    date_now = datetime.now()
    # Iterate over a snapshot: removing from a set while iterating over it
    # raises "RuntimeError: Set changed size during iteration".
    for subscribed_donor in list(valid_subscriptions):
        current_user_data = user_data.get(subscribed_donor)
        if current_user_data is None or current_user_data.subscription_data is None:
            continue
        subscription_data = current_user_data.subscription_data
        if subscription_data.expiration_date < date_now:
            valid_subscriptions.discard(subscribed_donor)
            subscription_data.is_valid = False
def got_donors_printable(donors: List[BmacDonor]) -> str:
    """Return a printable listing of *donors*, one per line.

    Each donor contributes ``donor.to_string()`` followed by a space and a
    newline. An empty list yields an empty string.
    """
    # str.join builds the result in one pass instead of repeated `+=` copies.
    return "".join(f"{donor.to_string()} \n" for donor in donors)