-
Notifications
You must be signed in to change notification settings - Fork 5
/
server.py
87 lines (65 loc) · 2.64 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import asyncio
import email
import logging
from email.message import Message
from typing import Optional

import dkim
import spf
from aiosmtpd.smtp import Envelope, Session, SMTP

from tools import message_to_display
# Logger shared by every handler hook in this module; the name is a fixed
# string rather than being derived from __name__.
log = logging.getLogger(name="smtphandler")

# Human-readable labels for a boolean check outcome, looked up by the result
# itself: index False/0 gives 'FAIL', index True/1 gives 'PASS'.
test_results = [
    "FAIL",
    "PASS",
]
class ExampleHandler:
    """aiosmtpd handler that screens incoming mail with SPF and DKIM checks.

    The hooks record the outcome of each check on the envelope
    (``envelope.spf`` and ``envelope.dkim``, both booleans) and log it.
    Whether a failed check actually rejects the message is controlled by
    the ``verify_spf`` / ``verify_dkim`` flags.
    """

    def __init__(self,
                 accept_host: Optional[str] = None,
                 verify_dkim: bool = False,
                 verify_spf: bool = False):
        """Configure the handler.

        Args:
            accept_host: If set, only recipients at this domain are accepted;
                any other recipient is refused with a 550 reply. ``None`` or
                an empty string accepts every recipient.
            verify_dkim: Reject messages whose DKIM verification fails.
            verify_spf: Reject senders whose SPF result is not ``pass``.
        """
        self.accept_host = accept_host
        self.verify_dkim = verify_dkim
        self.verify_spf = verify_spf

    # Disabled example hook, kept for reference:
    # async def handle_EHLO(self,
    #                       server: SMTP,
    #                       session: Session,
    #                       envelope: Envelope,
    #                       hostname: str):
    #     return '250-AUTH PLAIN'

    @staticmethod
    async def _run_blocking(func, *args):
        """Run the blocking callable *func* in the default executor.

        The SPF and DKIM libraries resolve DNS records synchronously; calling
        them directly from a coroutine would stall the event loop, and with it
        every other SMTP session, for the duration of the lookups.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args)

    async def handle_MAIL(self,
                          server: SMTP,
                          session: Session,
                          envelope: Envelope,
                          address: str,
                          mail_options: list):
        """Handle ``MAIL FROM``: run the SPF check and record the sender.

        The SPF check is always performed and logged, and its outcome is
        stored in ``envelope.spf``. It only causes a rejection when
        ``verify_spf`` is enabled.

        Returns:
            ``'250 OK'`` on acceptance, or a ``550`` reply when SPF
            enforcement is on and the result is anything other than ``pass``.
        """
        ip = session.peer[0]
        result, description = await self._run_blocking(
            spf.check2, ip, address, session.host_name)
        valid_spf = result == 'pass'
        envelope.spf = valid_spf
        log.info("SPF: %s, %s", result, description)
        if self.verify_spf and not valid_spf:
            return '550 SPF validation failed'
        envelope.mail_from = address
        envelope.mail_options.extend(mail_options)
        return '250 OK'

    async def handle_RCPT(self,
                          server: SMTP,
                          session: Session,
                          envelope: Envelope,
                          address: str,
                          rcpt_options: list):
        """Handle ``RCPT TO``: accept the recipient if its domain is allowed.

        Returns:
            ``'250 OK'`` on acceptance, or a ``550`` reply when
            ``accept_host`` is set and the recipient is at another domain.
        """
        if self.accept_host:
            # Domain names are case-insensitive, so compare in lower case;
            # otherwise 'user@EXAMPLE.COM' would be refused for 'example.com'.
            wanted_suffix = '@' + self.accept_host.lower()
            if not address.lower().endswith(wanted_suffix):
                return '550 not relaying to that domain'
        log.debug("Handle RCPT for %s", address)
        envelope.rcpt_tos.append(address)
        # Keep the ESMTP parameters as well, mirroring handle_MAIL.
        envelope.rcpt_options.extend(rcpt_options)
        return '250 OK'

    async def handle_DATA(self,
                          server: SMTP,
                          session: Session,
                          envelope: Envelope):
        """Handle ``DATA``: verify the DKIM signature and log the message.

        The DKIM outcome is stored in ``envelope.dkim``. The message headers
        and displayable body are logged before any rejection, so refused
        messages still show up in the log.

        Returns:
            ``'250 Message accepted for delivery'`` on acceptance, or a
            ``550`` reply when DKIM enforcement is on and verification failed.
        """
        valid_dkim = bool(
            await self._run_blocking(dkim.verify, envelope.content))
        envelope.dkim = valid_dkim
        # test_results is indexed by the outcome: 0 -> 'FAIL', 1 -> 'PASS'.
        log.info("DKIM: %s", test_results[int(valid_dkim)])
        message: Message = email.message_from_bytes(envelope.content)
        log.info('From: %s', message['From'])
        log.info('To: %s', message['To'])
        log.info('Subject: %s', message['Subject'])
        log.info('Message data:\n%s', message_to_display(message))
        if self.verify_dkim and not valid_dkim:
            return '550 DKIM validation failed'
        return '250 Message accepted for delivery'