-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
351 lines (289 loc) · 13 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
import asyncio
from datetime import datetime
from pydantic import ValidationError
from fastapi import FastAPI, Request, Depends, HTTPException, Form
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import selectinload, joinedload
from sqlalchemy import select, or_
from utils import up_down_icon, default_msg, htf_msg, format_coingecko_ids, format_dollars, save_data_to_postgres
from config import Settings, async_session, AsyncSessionDepends
from models import Asset, AssetData, User, DBTestFunctions
from validators import UserData
from crypto_data import CryptoManager
from email_notifier import EmailNotifier
from stock_data import StockManager
# Labels for higher-timeframe candle closes; compared against
# `close_significance` further down in this module.
MONTHLY = "MONTHLY"
WEEKLY = "WEEKLY"
# Instantiate the project's Settings class (imported from config).
settings = Settings()
# FastAPI application object; the interactive API docs are exposed at /docs.
app = FastAPI(
    title="Significant Price Action",
    description="This API provides daily candle close data for various digital assets",
    docs_url="/docs"
)
# Run locally with: uvicorn main:app --reload
# Serve files from the local "static" directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
# Jinja2 templates are looked up in the local "templates" directory.
templates = Jinja2Templates(directory="templates")
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
    """Serve the homepage, which carries the signup form for the daily notification service."""
    landing_page = templates.TemplateResponse(request=request, name="index.html")
    return landing_page
@app.post("/signup")
async def sign_up(session: AsyncSessionDepends, user: UserData = Form(...)):
    """Save a new subscriber to the database for notification purposes.

    Args:
        session: Async database session injected by FastAPI.
        user: Validated signup form data; only ``user.email`` is read.

    Returns:
        A dict with a single ``"message"`` key confirming the signup.
    """
    # Keep the form payload and the ORM row under separate names, and take the
    # address from the form data up front: once commit() has run, the session
    # may expire the ORM instance's attributes (SQLAlchemy's default), and an
    # AsyncSession cannot reload them implicitly on attribute access.
    email = user.email
    new_user = User(email=email)
    session.add(new_user)
    await session.commit()
    return {"message": f"'{email}' has been signed up for Daily Close Notifications"}
@app.post("/unsubscribe")
async def unsubscribe_email(session: AsyncSessionDepends, user: UserData = Form(...)):
    """Mark an existing subscriber as unsubscribed.

    Args:
        session: Async database session injected by FastAPI.
        user: Validated form data; only ``user.email`` is read.

    Returns:
        A dict with a single ``"message"`` key confirming the change.

    Raises:
        HTTPException: 400 when no ``User`` row matches the submitted email.
    """
    existing_user = await session.scalar(select(User).filter_by(email=user.email))
    if not existing_user:
        raise HTTPException(status_code=400, detail=f"'{user.email}' is not registered in our database.")
    # Read the stored address before commit(): afterwards the session may
    # expire the instance's attributes, and an AsyncSession cannot reload them
    # implicitly on attribute access.
    stored_email = existing_user.email
    # The flag has to be set on the ORM row. Setting it on the form payload
    # (`user`) is never persisted by the commit below.
    existing_user.unsubscribe = True
    await session.commit()
    return {"message": f"'{stored_email}' has been unsubscribed"}
@app.get("/data/{ticker}/{date}")
async def get_data_on_date(ticker: str, date: str, session: AsyncSessionDepends):
    """Look up the daily candle close record for one ticker on one date.

    Responds with 400 for a date that is not YYYY-MM-DD, and with 404 when
    either the ticker is unknown or it has no record on that date.
    """
    # The db column holds Date objects, so the path string is parsed first
    try:
        query_date = datetime.strptime(date, "%Y-%m-%d").date()
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid date format, please use YYYY-MM-DD.")

    # execute() rather than session.scalar(): leaves room for several rows per
    # day once other timeframes (h12, h4, etc) are added
    stmt = (
        select(AssetData)
        .join(AssetData.asset)
        .options(joinedload(AssetData.asset))
        .filter(Asset.asset_ticker == ticker, AssetData.date == query_date)
    )
    rows = await session.execute(stmt)
    record = rows.scalars().first()

    if record:
        # The Date value is left as-is; FastAPI serializes it to a string
        return {
            "date": record.date,
            "name": record.asset.asset_name,
            "ticker": record.asset.asset_ticker,
            "price": record.close_price,
            "volume": record.volume_USD,
            "market_cap": record.market_cap
        }

    # Nothing matched: tell apart "unknown ticker" from "known ticker, no data that day"
    known_asset = await session.scalar(select(Asset).filter_by(asset_ticker=ticker))
    if known_asset:
        raise HTTPException(status_code=404, detail=f"No data recorded for {ticker} on {date}")
    raise HTTPException(status_code=404, detail=f"{ticker} not found in the database")
@app.get("/alldata/{date}")
async def get_all_data_on_date(date: str, session: AsyncSessionDepends):
    """Return the daily candle close data of every asset recorded on the given date.

    Responds with 400 for a date that is not YYYY-MM-DD and with 404 when no
    record exists for that date.
    """
    try:
        day = datetime.strptime(date, "%Y-%m-%d").date()
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid date format, please use YYYY-MM-DD.")

    stmt = (
        select(AssetData)
        .join(AssetData.asset)
        .options(joinedload(AssetData.asset))
        .filter(AssetData.date == day)
    )
    rows = (await session.execute(stmt)).scalars().all()
    if not rows:
        raise HTTPException(status_code=404, detail=f"No data found for {date}")

    # One entry per ticker, added after the shared "date" key
    payload = {"date": day}
    payload.update({
        row.asset.asset_ticker: {
            "name": row.asset.asset_name,
            "price": row.close_price,
            "volume": row.volume_USD,
            "market_cap": row.market_cap
        }
        for row in rows
    })
    return payload
@app.get("/compare/{ticker}/{date1}/{date2}")
async def compare_date_data(ticker: str, date1: str, date2: str, session: AsyncSessionDepends, session1: AsyncSessionDepends):
    """Return price difference and percentage change between two dates for a specified Asset.

    Args:
        ticker: Asset ticker to look up.
        date1: First date, formatted YYYY-MM-DD.
        date2: Second date, formatted YYYY-MM-DD. Order relative to ``date1``
            does not matter; the dates are sorted before comparing.
        session: Async database session used for all queries.
        session1: NOTE(review): never used by this handler; kept only so the
            signature stays unchanged.

    Returns:
        A dict with the asset name and ticker, both dates, the absolute price
        change (later minus earlier) and the percentage change relative to the
        earlier close. ``percentage_change`` is ``None`` when the earlier close
        price is zero, because the ratio is undefined.

    Raises:
        HTTPException: 400 for a malformed date or two identical dates; 404
            when the ticker is unknown or data is missing for either date.
    """
    try:
        first_date = datetime.strptime(date1, "%Y-%m-%d").date()
        second_date = datetime.strptime(date2, "%Y-%m-%d").date()
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid date format, please use YYYY-MM-DD.")
    if first_date == second_date:
        raise HTTPException(status_code=400, detail="Dates must be different")
    # Compare the date objects directly. Going through datetime.timestamp() on
    # naive datetimes depends on the local timezone and can raise for dates
    # outside the platform's supported range.
    earlier_date, later_date = sorted((first_date, second_date))

    result = await session.execute(
        select(AssetData)
        .join(AssetData.asset)
        .options(joinedload(AssetData.asset))
        .filter(
            Asset.asset_ticker == ticker,
            or_(
                AssetData.date == earlier_date,
                AssetData.date == later_date
            )
        )
    )
    both_asset_data = result.unique().scalars().all()
    earlier_asset_data = next((data for data in both_asset_data if data.date == earlier_date), None)
    later_asset_data = next((data for data in both_asset_data if data.date == later_date), None)

    if not earlier_asset_data and not later_asset_data:
        # Nothing on either date: tell apart an unknown ticker from missing data
        asset = await session.scalar(select(Asset).filter_by(asset_ticker=ticker))
        if not asset:
            raise HTTPException(status_code=404, detail=f"{ticker} not found in database")
        raise HTTPException(status_code=404, detail=f"No data recorded for {ticker} on either date")
    elif not earlier_asset_data:
        raise HTTPException(status_code=404, detail=f"No data recorded for {ticker} on {earlier_date}")
    elif not later_asset_data:
        raise HTTPException(status_code=404, detail=f"No data recorded for {ticker} on {later_date}")

    baseline_price = earlier_asset_data.close_price
    change_in_price = later_asset_data.close_price - baseline_price
    # A zero baseline would otherwise surface as an unhandled division error
    change_in_percent = (change_in_price / baseline_price) * 100 if baseline_price else None

    data_to_return = {
        "name": earlier_asset_data.asset.asset_name,
        "ticker": earlier_asset_data.asset.asset_ticker,
        "earlier_date": earlier_date,
        "later_date": later_date,
        "change_in_price": change_in_price,
        "percentage_change": change_in_percent
    }
    return data_to_return
# Manager instances used by main(), plus direct handles on the data
# containers they expose.
crypto_man, stock_man, email_man = CryptoManager(), StockManager(), EmailNotifier()
crypto_data = crypto_man.crypto_data
stock_data = stock_man.index_data

# Take a single clock reading so day-of-month and day-of-week always describe
# the same instant; two separate now() calls could straddle midnight.
# Time is based off UTC on pythonanywhere VM
_now = datetime.now()
day_of_month = _now.strftime("%d")
day_of_week = _now.strftime("%w")

# The monthly close takes priority when the 1st also falls on a Monday.
if day_of_month == "01":
    close_significance = MONTHLY
    interval = "1M"
# Check if day_of_week is Monday ("1"), as global crypto markets candle close at 00:00 UTC Sunday ("0").
elif day_of_week == "1":
    close_significance = WEEKLY
    interval = "7D"
else:
    close_significance = "Daily"  # Default
    interval = "1D"

# Sunday/Monday at 00:00 UTC is Sat/Sun in the U.S.
stock_market_open = day_of_week not in ("0", "1")
def _is_htf_close():
    """Return True when today's close is also a weekly or monthly close."""
    return close_significance in (MONTHLY, WEEKLY)


async def _fetch_inputs():
    """Concurrently fetch subscriber data, global crypto data and per-asset data."""
    async with asyncio.TaskGroup() as tg:
        tg.create_task(email_man.get_emails_data())
        tg.create_task(crypto_man.get_global_crypto_data())
        for asset in crypto_man.crypto_list:
            tg.create_task(crypto_man.get_crypto_data(asset))
        if stock_market_open:
            for ind in stock_man.index_list:
                tg.create_task(stock_man.get_index_data(ind))
                # NOTE(review): presumably spaces out the index requests to
                # respect a rate limit - confirm against the data provider.
                await asyncio.sleep(0.5)


async def _persist_market_data():
    """Save every fetched crypto record, and index records when stocks traded, to Postgres."""
    for data in crypto_data.values():
        await save_data_to_postgres(
            name=data.name,
            ticker=data.ticker,
            price=data.price,
            mcap=data.mcap,
            volume=data.volume
        )
    if stock_market_open:
        for data in stock_data.values():
            await save_data_to_postgres(
                name=data.name,
                ticker=data.ticker,
                price=data.close,
                mcap=data.mcap,
                volume=data.volume
            )


def _asset_summary(record, price, monthly_attr, weekly_attr):
    """Build one asset's report text: the 24h line plus, on a weekly/monthly close, the HTF line.

    ``monthly_attr`` / ``weekly_attr`` name the attributes on ``record`` that
    hold the monthly and weekly percentage change; crypto and index records
    use different attribute names for them.
    """
    text = default_msg(
        ticker=record.ticker,
        price=price,
        percent_change=record.change_percent_24h
    )
    if _is_htf_close():
        htf_attr = monthly_attr if close_significance == MONTHLY else weekly_attr
        text += htf_msg(
            timeframe=interval,
            percent_change=getattr(record, htf_attr)
        )
    return text


def _build_message_body(user_options):
    """Assemble the HTML paragraphs of one user's report.

    Always contains the BTC paragraph and the market-cap paragraph. The
    user's extra crypto choices are added when ``user_options`` is truthy, and
    the stock index paragraph when ``stock_market_open`` is set.
    """
    btc = crypto_data["bitcoin"]
    parts = [
        "<p>",
        default_msg(
            ticker=btc.ticker,
            price=btc.price,
            percent_change=btc.change_percent_24h
        ),
    ]
    if _is_htf_close():
        # BTC gets its own wording for the higher-timeframe line instead of htf_msg()
        if close_significance == MONTHLY:
            btc_wm_percent = btc.change_percent_30d
        else:
            btc_wm_percent = btc.change_percent_7d
        btc_wm_up_down = up_down_icon(btc_wm_percent)
        parts.append(f"{close_significance.title()} Change "
                     f"{btc_wm_up_down} {btc_wm_percent}%")
    parts.append("</p>")

    if user_options:
        # NOTE(review): only every second item returned by format_coingecko_ids
        # is used; the reason is not visible from this file.
        user_choices = format_coingecko_ids(user_options)[::2]
        parts.append("<p>")
        for choice in user_choices:
            coin = crypto_data[choice.lower()]
            parts.append(_asset_summary(
                coin,
                coin.price,
                monthly_attr="change_percent_30d",
                weekly_attr="change_percent_7d"
            ))
        parts.append("</p>")

    if stock_market_open:
        parts.append("<p>")
        for index in stock_data.values():
            parts.append(_asset_summary(
                index,
                index.close,
                monthly_attr="change_percent_monthly",
                weekly_attr="change_percent_weekly"
            ))
        parts.append("</p>")

    parts.append(f"<p>BTC Market Cap:<br>"
                 f"{format_dollars(btc.mcap)}<br>"
                 f"Total Cryptocurrency Market Cap:<br>"
                 f"{format_dollars(crypto_man.crypto_total_mcap)}</p>")
    return "".join(parts)


def _render_email_html(message_body):
    """Wrap the report paragraphs in the full HTML email document."""
    # The template text is kept flush-left so the emitted HTML stays unchanged.
    return f"""
<html>
<body>
{message_body}
<br>
<hr>
<p>Click <a href="https://forms.gle/5UMWTWM8HMuHKexW9">here</a> to update your preferences or unsubscribe.</p>
<p>© 2024 Kevin To</p>
</body>
</html>
"""


async def main():
    """Fetch market data, store it, then email every subscriber their report.

    Steps, in order:
      1. Fetch subscriber data and all market data concurrently.
      2. Persist the fetched market data to Postgres.
      3. Build one HTML report per subscriber and send all emails concurrently.

    Subscriber records that contain an "unsubscribe?" key are skipped.
    """
    await _fetch_inputs()
    await _persist_market_data()

    users_data = email_man.users_data
    async_emails = []
    for user in users_data.data:
        if "unsubscribe?" in user:
            continue
        user_email = user["emailAddress"]
        user_options = user.get("anyExtraDataYou'dLikeInYourReport?", None)
        print(f"Compiling daily report for {user_email}...")
        subject = f"BTC {close_significance} Close: {crypto_data['bitcoin'].price}"
        html = _render_email_html(_build_message_body(user_options))
        async_emails.append(
            asyncio.create_task(email_man.send_emails(user_email=user_email, subject=subject, html_text=html))
        )
    await asyncio.gather(*async_emails)
# Script entry point: run the fetch / store / email routine once when this
# file is executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())