Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into pyproject
Browse files Browse the repository at this point in the history
  • Loading branch information
awdeorio committed Feb 2, 2024
2 parents 6b5d7d3 + 17f7a40 commit 6204e9c
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 21 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ This example will send progress reports to students. The template uses more of
TO: {{email}}
SUBJECT: EECS 280 Mid-semester Progress Report
FROM: My Self <[email protected]>
REPLY-TO: My Reply Self <[email protected]>
Dear {{name}},
Expand Down
10 changes: 10 additions & 0 deletions mailmerge/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,16 @@ def create_sample_input_files(template_path, database_path, config_path):
# username = YOUR_USERNAME_HERE
# ratelimit = 0
# Example: XOAUTH
# Enter your token at the password prompt. For Microsoft OAuth
# authentication, a token can be obtained with the oauth2ms tool
# https://github.com/harishkrupo/oauth2ms
# [smtp_server]
# host = smtp.office365.com
# port = 587
# security = XOAUTH
# username = [email protected]
# Example: No security
# [smtp_server]
# host = newman.eecs.umich.edu
Expand Down
85 changes: 67 additions & 18 deletions mailmerge/sendmail_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
import configparser
import getpass
import datetime
import base64
import ssl
from . import exceptions


# Type to store info read from config file
MailmergeConfig = collections.namedtuple(
"MailmergeConfig",
Expand Down Expand Up @@ -49,7 +50,7 @@ def read_config(self):
security = None

# Verify security type
if security not in [None, "SSL/TLS", "STARTTLS", "PLAIN"]:
if security not in [None, "SSL/TLS", "STARTTLS", "PLAIN", "XOAUTH"]:
raise exceptions.MailmergeError(
f"{self.config_path}: unrecognized security type: '{security}'"
)
Expand Down Expand Up @@ -86,27 +87,18 @@ def sendmail(self, sender, recipients, message):
)

# Send
host, port = self.config.host, self.config.port
try:
message_flattened = str(message)
host, port = self.config.host, self.config.port
if self.config.security == "SSL/TLS":
with smtplib.SMTP_SSL(host, port) as smtp:
smtp.login(self.config.username, self.password)
smtp.sendmail(sender, recipients, message_flattened)
self.sendmail_ssltls(sender, recipients, message)
elif self.config.security == "STARTTLS":
with smtplib.SMTP(host, port) as smtp:
smtp.ehlo()
smtp.starttls()
smtp.ehlo()
smtp.login(self.config.username, self.password)
smtp.sendmail(sender, recipients, message_flattened)
self.sendmail_starttls(sender, recipients, message)
elif self.config.security == "PLAIN":
with smtplib.SMTP(host, port) as smtp:
smtp.login(self.config.username, self.password)
smtp.sendmail(sender, recipients, message_flattened)
self.sendmail_plain(sender, recipients, message)
elif self.config.security == "XOAUTH":
self.sendmail_xoauth(sender, recipients, message)
elif self.config.security is None:
with smtplib.SMTP(host, port) as smtp:
smtp.sendmail(sender, recipients, message_flattened)
self.sendmail_clear(sender, recipients, message)
except smtplib.SMTPAuthenticationError as err:
raise exceptions.MailmergeError(
f"{host}:{port} failed to authenticate "
Expand All @@ -123,3 +115,60 @@ def sendmail(self, sender, recipients, message):

# Update timestamp of last sent message
self.lastsent = now

def sendmail_ssltls(self, sender, recipients, message):
"""Send email message with SSL/TLS security."""
message_flattened = str(message)
try:
ctx = ssl.create_default_context()
except ssl.SSLError as err:
raise exceptions.MailmergeError(f"SSL Error: {err}")
host, port = (self.config.host, self.config.port)
with smtplib.SMTP_SSL(host, port, context=ctx) as smtp:
smtp.login(self.config.username, self.password)
smtp.sendmail(sender, recipients, message_flattened)

def sendmail_starttls(self, sender, recipients, message):
"""Send email message with STARTTLS security."""
message_flattened = str(message)
with smtplib.SMTP(self.config.host, self.config.port) as smtp:
smtp.ehlo()
smtp.starttls()
smtp.ehlo()
smtp.login(self.config.username, self.password)
smtp.sendmail(sender, recipients, message_flattened)

def sendmail_plain(self, sender, recipients, message):
"""Send email message with plain security."""
message_flattened = str(message)
with smtplib.SMTP(self.config.host, self.config.port) as smtp:
smtp.login(self.config.username, self.password)
smtp.sendmail(sender, recipients, message_flattened)

def sendmail_clear(self, sender, recipients, message):
"""Send email message with no security."""
message_flattened = str(message)
with smtplib.SMTP(self.config.host, self.config.port) as smtp:
smtp.sendmail(sender, recipients, message_flattened)

def sendmail_xoauth(self, sender, recipients, message):
"""Send email message with XOAUTH security."""
xoauth2 = (
f"user={self.config.username}\x01"
f"auth=Bearer {self.password}\x01\x01"
)
try:
xoauth2 = xoauth2.encode("ascii")
except UnicodeEncodeError as err:
raise exceptions.MailmergeError(
f"Username and XOAUTH access token must be ASCII '{xoauth2}'. "
f"{err}, "
)
message_flattened = str(message)
with smtplib.SMTP(self.config.host, self.config.port) as smtp:
smtp.ehlo()
smtp.starttls()
smtp.ehlo()
smtp.docmd('AUTH XOAUTH2')
smtp.docmd(str(base64.b64encode(xoauth2).decode("utf-8")))
smtp.sendmail(sender, recipients, message_flattened)
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "mailmerge"
version = "2.2.1"
version = "2.2.3"
description = "A simple, command line mail merge tool"
keywords = ["mail merge", "mailmerge", "email"]
authors = [{ email = "[email protected]" }, { name = "Andrew DeOrio" }]
Expand All @@ -20,7 +20,6 @@ dependencies = [

[project.optional-dependencies]
dev = [
"pdbpp",
"twine",
"tox",
]
Expand Down
132 changes: 131 additions & 1 deletion tests/test_sendmail_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import smtplib
import email
import email.parser
import base64
import ssl
import pytest
from mailmerge import SendmailClient, MailmergeError

Expand Down Expand Up @@ -250,6 +252,90 @@ def test_security_starttls(mocker, tmp_path):
assert smtp.sendmail.call_count == 1


def test_security_xoauth(mocker, tmp_path):
"""Verify XOAUTH security configuration."""
# Config for XOAUTH SMTP server
config_path = tmp_path/"server.conf"
config_path.write_text(textwrap.dedent("""\
[smtp_server]
host = smtp.office365.com
port = 587
security = XOAUTH
username = [email protected]
"""))

# Simple template
sendmail_client = SendmailClient(config_path, dry_run=False)
message = email.message_from_string("Hello world")

# Mock SMTP
mock_smtp = mocker.patch('smtplib.SMTP')
mock_smtp_ssl = mocker.patch('smtplib.SMTP_SSL')

# Mock the password entry
mock_getpass = mocker.patch('getpass.getpass')
mock_getpass.return_value = "password"

# Send a message
sendmail_client.sendmail(
sender="[email protected]",
recipients=["[email protected]"],
message=message,
)

# Verify SMTP library calls
assert mock_getpass.call_count == 1
assert mock_smtp.call_count == 1
assert mock_smtp_ssl.call_count == 0
smtp = mock_smtp.return_value.__enter__.return_value
assert smtp.ehlo.call_count == 2
assert smtp.starttls.call_count == 1
assert smtp.login.call_count == 0
assert smtp.docmd.call_count == 2
assert smtp.sendmail.call_count == 1

# Verify authentication token format. The first call to docmd() is always
# the same. Second call to docmd() contains a base64 encoded username and
# password.
assert smtp.docmd.call_args_list[0].args[0] == "AUTH XOAUTH2"
user_pass = smtp.docmd.call_args_list[1].args[0]
user_pass = base64.b64decode(user_pass)
assert user_pass == \
b'[email protected]\x01auth=Bearer password\x01\x01'


def test_security_xoauth_bad_username(mocker, tmp_path):
"""Verify exception is thrown for UTF-8 username."""
# Config for XOAUTH SMTP server
config_path = tmp_path/"server.conf"
config_path.write_text(textwrap.dedent("""\
[smtp_server]
host = smtp.office365.com
port = 587
security = XOAUTH
username = Laȝ[email protected]
"""))

# Simple template
sendmail_client = SendmailClient(config_path, dry_run=False)
message = email.message_from_string("Hello world")

# Mock the password entry
mock_getpass = mocker.patch('getpass.getpass')
mock_getpass.return_value = "password"

# Send a message
with pytest.raises(MailmergeError) as err:
sendmail_client.sendmail(
sender="[email protected]",
recipients=["[email protected]"],
message=message,
)

# Verify exception string
assert "Username and XOAUTH access token must be ASCII" in str(err.value)


def test_security_plain(mocker, tmp_path):
"""Verify plain security configuration."""
# Config for Plain SMTP server
Expand Down Expand Up @@ -293,7 +379,7 @@ def test_security_plain(mocker, tmp_path):


def test_security_ssl(mocker, tmp_path):
"""Verify open (Never) security configuration."""
"""Verify SSL/TLS security configuration."""
# Config for SSL SMTP server
config_path = tmp_path/"server.conf"
config_path.write_text(textwrap.dedent("""\
Expand All @@ -312,6 +398,10 @@ def test_security_ssl(mocker, tmp_path):
mock_smtp = mocker.patch('smtplib.SMTP')
mock_smtp_ssl = mocker.patch('smtplib.SMTP_SSL')

# Mock SSL
mock_ssl_create_default_context = \
mocker.patch('ssl.create_default_context')

# Mock the password entry
mock_getpass = mocker.patch('getpass.getpass')
mock_getpass.return_value = "password"
Expand All @@ -327,13 +417,53 @@ def test_security_ssl(mocker, tmp_path):
assert mock_getpass.call_count == 1
assert mock_smtp.call_count == 0
assert mock_smtp_ssl.call_count == 1
assert mock_ssl_create_default_context.called
assert "context" in mock_smtp_ssl.call_args[1] # SSL cert chain
smtp = mock_smtp_ssl.return_value.__enter__.return_value
assert smtp.ehlo.call_count == 0
assert smtp.starttls.call_count == 0
assert smtp.login.call_count == 1
assert smtp.sendmail.call_count == 1


def test_ssl_error(mocker, tmp_path):
"""Verify SSL/TLS with an SSL error."""
# Config for SSL SMTP server
config_path = tmp_path/"server.conf"
config_path.write_text(textwrap.dedent("""\
[smtp_server]
host = smtp.mail.umich.edu
port = 465
security = SSL/TLS
username = YOUR_USERNAME_HERE
"""))

# Simple template
sendmail_client = SendmailClient(config_path, dry_run=False)
message = email.message_from_string("Hello world")

# Mock ssl.create_default_context() to raise an exception
mocker.patch(
'ssl.create_default_context',
side_effect=ssl.SSLError(1, "CERTIFICATE_VERIFY_FAILED")
)

# Mock the password entry
mock_getpass = mocker.patch('getpass.getpass')
mock_getpass.return_value = "password"

# Send a message
with pytest.raises(MailmergeError) as err:
sendmail_client.sendmail(
sender="[email protected]",
recipients=["[email protected]"],
message=message,
)

# Verify exception string
assert "CERTIFICATE_VERIFY_FAILED" in str(err.value)


def test_missing_username(tmp_path):
"""Verify exception on missing username."""
config_path = tmp_path/"server.conf"
Expand Down

0 comments on commit 6204e9c

Please sign in to comment.