-
Notifications
You must be signed in to change notification settings - Fork 0
/
email_notifier.py
60 lines (51 loc) · 2.31 KB
/
email_notifier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import aiosmtplib
import httpx
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from validators import GoogleDriveData
from config import Settings
# Module-level configuration instance; EmailNotifier reads its SMTP and Sheety
# values (credentials, endpoint, hostname, port) from this object.
settings = Settings()
class EmailNotifier:
    """Fetch user records from the Sheety API and email HTML content over SMTP.

    SMTP credentials and the Sheety endpoint/authorization value are read from
    the module-level ``settings`` object when an instance is created.
    """

    # Most recently fetched user records. Defaults to ``None`` and is only
    # replaced once ``get_emails_data`` succeeds, so callers can test for
    # "no data yet" instead of hitting an AttributeError after a failed fetch.
    users_data: "GoogleDriveData | None" = None

    def __init__(self):
        """Copy the SMTP credentials and Sheety request details from ``settings``."""
        self._smtp_username = settings.smtp_username
        self._smtp_password = settings.smtp_password
        self._sheety_endpoint = settings.sheety_users_endpoint
        # NOTE(review): the value is sent as-is in the Authorization header;
        # presumably it already carries the "Bearer " prefix - confirm in config.
        self._sheety_header = {"Authorization": settings.sheety_bearer}

    async def get_emails_data(self) -> None:
        """Request the stored user records from Sheety and keep them on ``users_data``.

        The ``"users"`` entry of the JSON response is wrapped in
        ``GoogleDriveData`` and stored on the instance.

        Any exception raised while requesting or decoding the response is
        printed rather than propagated, and ``users_data`` is left unchanged
        (``None`` if no earlier fetch succeeded). Errors raised while building
        ``GoogleDriveData`` are not caught here.
        """
        try:
            print("Getting user data from Sheety API...")
            async with httpx.AsyncClient() as client:
                users_response = await client.get(
                    url=self._sheety_endpoint,
                    headers=self._sheety_header,
                )
                # Turn 4xx/5xx responses into exceptions so they are reported below.
                users_response.raise_for_status()
                data = users_response.json()["users"]
        except Exception as e:
            # Deliberate best-effort: report the problem and carry on.
            print("Failed to retrieve user preferences from Sheety API", e)
        else:
            print("Storing user preferences")
            self.users_data = GoogleDriveData(data=data)

    def _build_message(self, user_email: str, subject: str, html_text: str) -> MIMEMultipart:
        """Build the multipart message with a single HTML part for one recipient."""
        message = MIMEMultipart()
        message["From"] = self._smtp_username
        message["To"] = user_email
        message["Subject"] = subject
        message.attach(MIMEText(html_text, "html"))
        return message

    async def send_emails(self, user_email: str, subject: str, html_text: str) -> None:
        """Email ``html_text`` to a single recipient over SMTP.

        Args:
            user_email: Address placed in the ``To`` header.
            subject: Value of the ``Subject`` header.
            html_text: Message body, attached as a ``text/html`` part.

        The value returned by the SMTP client's ``send_message`` is printed on
        success. Failures raised as ``aiosmtplib.SMTPException`` are printed
        rather than propagated; other exceptions are not caught here.
        """
        print(f"Emailing {user_email}...")
        message = self._build_message(user_email, subject, html_text)

        try:
            # start_tls=True asks aiosmtplib to upgrade the connection with
            # STARTTLS after connecting, before the login below.
            async with aiosmtplib.SMTP(
                hostname=settings.smtp_hostname,
                port=settings.smtp_port,
                start_tls=True,
            ) as connection:
                await connection.login(self._smtp_username, self._smtp_password)
                result = await connection.send_message(message)
                print(f"Emailed {user_email} successfully!", result)
        except aiosmtplib.SMTPException as e:
            print(f"Failed to send email to {user_email}:", e)