-
Notifications
You must be signed in to change notification settings - Fork 39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add checks for RFC8460, SMTP-TLS reporting (TLSRPT) #881 #1300
base: main
Are you sure you want to change the base?
Changes from all commits
a4678d0
ef6031c
524babf
bc46910
e7c43d8
2eab6b4
6cafb7b
0ddc6d5
d8cd152
643144c
49b432b
7a1d8c6
7c3518d
398912c
b1575a0
3e1258c
96f78fe
b175489
1155487
885c4e6
94aa878
35f5fb6
75b123c
3d5a27a
b4005c5
12c4c6c
346fc71
cb79b70
0a6a177
a852507
0fbe116
63b7e5b
b886bcc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -222,6 +222,7 @@ def __init__(self, name="mail-auth"): | |
MailAuthDkim, | ||
MailAuthSpf, | ||
MailAuthSpfPolicy, | ||
MailAuthTlsRptExists, | ||
] | ||
super().__init__(name, subtests) | ||
|
||
|
@@ -2182,8 +2183,8 @@ def __init__(self): | |
label="detail mail auth spf label", | ||
explanation="detail mail auth spf exp", | ||
tech_string="detail mail auth spf tech table", | ||
worst_status=scoring.MAIL_AUTH_SPF_WORST_STATUS, | ||
full_score=scoring.MAIL_AUTH_SPF_PASS, | ||
worst_status=scoring.MAIL_AUTH_TLSRPT_WORST_STATUS, | ||
full_score=scoring.MAIL_AUTH_TLSRPT_PASS, | ||
model_score_field="spf_score", | ||
) | ||
# Fix for one line, one value data. | ||
|
@@ -2250,6 +2251,36 @@ def result_bad_redirect(self, tech_data): | |
self.tech_data = tech_data | ||
|
||
|
||
class MailAuthTlsRptExists(Subtest): | ||
def __init__(self): | ||
super().__init__( | ||
name="tlsrpt", | ||
label="detail mail auth tlsrpt label", | ||
explanation="detail mail auth tlsrpt exp", | ||
tech_string="detail mail auth tlsrpt tech table", | ||
worst_status=scoring.MAIL_AUTH_SPF_WORST_STATUS, | ||
full_score=scoring.MAIL_AUTH_SPF_PASS, | ||
model_score_field="tlsrpt_score", | ||
) | ||
# Fix for one line, one value data. | ||
self.tech_data = [[self.tech_data]] | ||
|
||
def result_good(self, tech_data): | ||
self._status(STATUS_SUCCESS) | ||
self.verdict = "detail mail auth tlsrpt verdict good" | ||
self.tech_data = [[tech_data]] | ||
|
||
def result_bad(self, tech_data): | ||
self._status(STATUS_NOTICE) | ||
self.verdict = "detail mail auth tlsrpt verdict bad" | ||
if tech_data: | ||
# More than one spf record. Show the records. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this comment fits the rest of the code? Might be wrong even for SPF. |
||
self.tech_data = [[tech_data]] | ||
else: | ||
self.tech_data = "" | ||
self.tech_type = "" | ||
|
||
|
||
# --- APPSECPRIV | ||
class WebAppsecprivHttpXFrame(Subtest): | ||
def __init__(self): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
# Created by Uwe Kamper <[email protected]> with Django 3.2.23 on 2024-02-12 13:30 | ||
|
||
import checks.models | ||
from django.db import migrations, models | ||
|
||
|
||
class Migration(migrations.Migration): | ||
dependencies = [ | ||
("checks", "0015_auto_20240212_1616"), | ||
] | ||
|
||
operations = [ | ||
migrations.AddField( | ||
model_name="mailtestauth", | ||
name="tlsrpt_score", | ||
field=models.IntegerField(null=True), | ||
), | ||
migrations.AddField( | ||
model_name="mailtestauth", | ||
name="tlsrpt_available", | ||
field=models.BooleanField(default=False, null=True), | ||
), | ||
migrations.AddField( | ||
model_name="mailtestauth", | ||
name="tlsrpt_record", | ||
field=checks.models.ListField(default=[]), | ||
), | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -235,6 +235,11 @@ | |
MAIL_AUTH_SPF_ERROR = NO_POINTS | ||
MAIL_AUTH_SPF_WORST_STATUS = STATUS_FAIL | ||
|
||
MAIL_AUTH_TLSRPT_PASS = NO_POINTS | ||
MAIL_AUTH_TLSRPT_FAIL = NO_POINTS # TLS-RPT fail does not give a points penalty | ||
MAIL_AUTH_TLSRPT_ERROR = NO_POINTS | ||
MAIL_AUTH_TLSRPT_WORST_STATUS = STATUS_FAIL | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Per the scoring documentation this should probably also use FULL/NO/NO_POINTS for PASS/FAIL/ERROR, with the WORST_STATUS as STATUS_INFO (or STATUS_NOTICE if we want to give it more weight). That setting for WORST_STATUS should give it no score impact. |
||
|
||
MAIL_AUTH_SPF_POLICY_PASS = FULL_WEIGHT_POINTS | ||
MAIL_AUTH_SPF_POLICY_PARTIAL = LESS_WEIGHT_POINTS | ||
MAIL_AUTH_SPF_POLICY_FAIL = NO_POINTS | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
from checks.tasks.dispatcher import check_registry, post_callback_hook | ||
from checks.tasks.dmarc_parser import parse as dmarc_parse | ||
from checks.tasks.spf_parser import parse as spf_parse | ||
from checks.tasks import tlsrpt_parsing | ||
from interface import batch, batch_shared_task, redis_id | ||
from internetnl import log | ||
|
||
|
@@ -114,6 +115,17 @@ def batch_spf(self, url, *args, **kwargs): | |
return do_spf(self, url, *args, **kwargs) | ||
|
||
|
||
@mail_registered | ||
@shared_task( | ||
bind=True, | ||
soft_time_limit=settings.SHARED_TASK_SOFT_TIME_LIMIT_LOW, | ||
time_limit=settings.SHARED_TASK_TIME_LIMIT_LOW, | ||
base=SetupUnboundContext, | ||
) | ||
def tlsrpt(self, url, *args, **kwargs): | ||
return do_tlsrpt(self, url, *args, **kwargs) | ||
|
||
|
||
def skip_dkim_for_non_sending_domain(mtauth): | ||
""" | ||
If there is no DKIM, check if DMARC and SPF are hinting for a non email | ||
|
@@ -214,10 +226,23 @@ def callback(results, addr, category): | |
subtests["spf_policy"].result_bad_include(spf_records) | ||
elif spf_policy_status == SpfPolicyStatus.invalid_redirect: | ||
subtests["spf_policy"].result_bad_redirect(spf_records) | ||
|
||
else: | ||
subtests["spf"].result_bad(spf_record) | ||
|
||
elif testname == "tlsrpt": | ||
tlsrpt_available = result.get("available") | ||
tlsrpt_record = result.get("record") | ||
tlsrpt_score = result.get("score") | ||
|
||
# Pass results to mtauth and subtests | ||
mtauth.tlsrpt_available = tlsrpt_available | ||
mtauth.tlsrpt_record = tlsrpt_record | ||
mtauth.tlsrpt_score = tlsrpt_score | ||
log.debug(f"subtests: {subtests.keys()}") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we want to keep this log, it should be a bit more clear. |
||
if tlsrpt_available: | ||
subtests["tlsrpt"].result_good(tlsrpt_record) | ||
else: | ||
subtests["tlsrpt"].result_bad(tlsrpt_record) | ||
if skip_dkim_for_non_sending_domain(mtauth): | ||
mtauth.dkim_score = scoring.MAIL_AUTH_DKIM_PASS | ||
subtests["dkim"].result_no_email() | ||
|
@@ -824,3 +849,76 @@ def dmarc_get_public_suffix_list(): | |
public_suffix_list = dmarc_get_public_suffix_list() | ||
|
||
return public_suffix_list | ||
|
||
|
||
def tlsrpt_callback(data, status, r): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add types for these parameters? We try to do it for all new code. I have no idea what "status" means here, or why it should be 0. |
||
data["score"] = scoring.MAIL_AUTH_TLSRPT_FAIL | ||
data["available"] = False | ||
data["record"] = [] | ||
if status == 0: | ||
record = [] | ||
available = False | ||
if r.rcode == unbound.RCODE_NOERROR and r.havedata == 1: | ||
# TXT record(s) for _smtp._tls.{domain} found, start looking for TLSRPT | ||
score = scoring.MAIL_AUTH_TLSRPT_FAIL | ||
for d in r.data.data: | ||
txt = as_txt(d) | ||
log.debug(f"tlsrpt: found record '{txt.lower()}'") | ||
if txt.lower().startswith("v=tlsrptv1"): | ||
record.append(txt) | ||
if tlsrpt_parsing.parse_silent(txt) is None: | ||
# A parsing error has occured | ||
available = False | ||
score = scoring.MAIL_AUTH_TLSRPT_FAIL | ||
break | ||
if available: | ||
# We see more than one TLSRPT record. Fail the test. | ||
available = False | ||
score = scoring.MAIL_AUTH_TLSRPT_FAIL | ||
break | ||
else: | ||
available = True | ||
score = scoring.MAIL_AUTH_TLSRPT_PASS | ||
elif r.rcode == unbound.RCODE_NXDOMAIN or (r.rcode == unbound.RCODE_NOERROR and r.havedata == 0): | ||
# we know for sure there is no TLSRPT record | ||
score = scoring.MAIL_AUTH_TLSRPT_FAIL | ||
else: | ||
# resolving problems, servfail probably | ||
score = scoring.MAIL_AUTH_TLSRPT_ERROR | ||
|
||
data["score"] = score | ||
data["available"] = available | ||
data["record"] = record | ||
data["done"] = True | ||
|
||
|
||
def resolve_tlsrpt_record(url, task): | ||
# Make sure, url does not start with a dot, then add "_smtp._tls." in front | ||
# of the domain name. | ||
tls_rpt_url = f'_smtp._tls.{url.lstrip(".")}' | ||
return task.async_resolv(tls_rpt_url, unbound.RR_TYPE_TXT, callback=tlsrpt_callback) | ||
|
||
|
||
def do_tlsrpt(self, url, *args, **kwargs): | ||
try: | ||
cb_data = resolve_tlsrpt_record(url, self) | ||
available = "available" in cb_data and cb_data["available"] | ||
score = cb_data["score"] | ||
record = cb_data["record"] | ||
|
||
result = dict( | ||
available=available, | ||
score=score, | ||
record=record, | ||
) | ||
|
||
# KeyError is due to score missing, happens in case of timeout on non resolving domain | ||
except (SoftTimeLimitExceeded, KeyError) as specific_exception: | ||
log.debug("Soft time limit exceeded: %s", specific_exception) | ||
result = dict( | ||
available=False, | ||
score=scoring.MAIL_AUTH_TLSRPT_FAIL, | ||
record=[], | ||
) | ||
# return a tuple containing ("testname", result) | ||
return ("tlsrpt", result) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
# Copyright: 2022-2024, ECP, NLnet Labs, the Internet.nl contributors and SYS4 AG. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
""" | ||
SMTP TLS Reporting policy parser as defined by: | ||
|
||
RFC 8460, Section "3. Reporting Policy", see: | ||
https://datatracker.ietf.org/doc/html/rfc8460#section-3 | ||
""" | ||
|
||
from pyparsing import ( | ||
Literal, | ||
CaselessLiteral, | ||
Combine, | ||
ParseException, | ||
Regex, | ||
White, | ||
Word, | ||
ZeroOrMore, | ||
alphanums, | ||
pyparsing_common, | ||
delimitedList, | ||
) | ||
|
||
|
||
WSP = White(ws=" ", exact=1).suppress() # Whitespace | ||
|
||
field_delim = ZeroOrMore(WSP) + Literal(";") + ZeroOrMore(WSP) # Fields are semicolon-delimited | ||
ura_delim = ZeroOrMore(WSP) + Literal(",") + ZeroOrMore(WSP) # multiple RUAs are comma-delimited | ||
|
||
tlsrpt_ext_name = Word(alphanums, alphanums + "_-.", max=32) | ||
tlsrpt_ext_value = Word(alphanums, alphanums + "_-.") | ||
tlsrpt_extension = ZeroOrMore(tlsrpt_ext_name + Literal("=") + tlsrpt_ext_value) | ||
|
||
# RegEx for parsing email. | ||
regex_tld = r"(?:[a-zA-Z]{2,63}|xn--[a-zA-Z0-9]+)" | ||
regex_mailaddr = ( | ||
r"(?P<mailaddr>([a-zA-Z0-9]{0,61}@)?([a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+" r"" + regex_tld + ")" | ||
) | ||
mail_uri = Combine(CaselessLiteral("mailto:") + Regex(regex_mailaddr)) | ||
tlsrpt_rua = Literal("rua=") + delimitedList(mail_uri | pyparsing_common.url, delim=",").setResultsName("tlsrpt_uri") | ||
|
||
tlsrpt_field = tlsrpt_rua + ZeroOrMore(field_delim + tlsrpt_extension) | ||
|
||
# Literal will match the version string as required by the ABNF in the RFC: | ||
# tlsrpt-version = %s"v=TLSRPTv1" | ||
version = Literal("v=TLSRPTv1").setResultsName("tlsrpt_version") | ||
|
||
record = version + field_delim + tlsrpt_field | ||
|
||
|
||
def parse_silent(tlsrpt_record): | ||
""" | ||
Will return None if there was a parsing error and a ParseResult object otherwise. | ||
""" | ||
try: | ||
parsed = record.parseString(tlsrpt_record) | ||
except ParseException: | ||
parsed = None | ||
except Exception as e: | ||
print(f"{e.__class__.__name__}: {e}") | ||
parsed = None | ||
return parsed |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
from checks.tasks import tlsrpt_parsing | ||
|
||
|
||
def test_record_parse_simple_mailto(): | ||
TXT_RECORD = "v=TLSRPTv1; rua=mailto:[email protected]" | ||
parsed = tlsrpt_parsing.record.parseString(TXT_RECORD) | ||
assert parsed.tlsrpt_version == "v=TLSRPTv1" | ||
assert parsed.tlsrpt_uri[0] == "mailto:[email protected]" | ||
|
||
|
||
def test_record_parse_multiple_mailto(): | ||
TXT_RECORD = "v=TLSRPTv1;rua=mailto:[email protected],mailto:[email protected]" | ||
parsed = tlsrpt_parsing.record.parseString(TXT_RECORD) | ||
assert parsed.tlsrpt_version == "v=TLSRPTv1" | ||
assert parsed.tlsrpt_uri[0] == "mailto:[email protected]" | ||
assert parsed.tlsrpt_uri[1] == "mailto:[email protected]" | ||
|
||
|
||
def test_record_parse_simple_https(): | ||
TXT_RECORD = "v=TLSRPTv1; rua=https://reporting.example.com/v1/tlsrpt" | ||
parsed = tlsrpt_parsing.record.parseString(TXT_RECORD) | ||
assert parsed.tlsrpt_version == "v=TLSRPTv1" | ||
assert parsed.tlsrpt_uri[0] == "https://reporting.example.com/v1/tlsrpt" | ||
|
||
|
||
def test_record_parse_with_extension(): | ||
TXT_RECORD = "v=TLSRPTv1; rua=https://reporting.example.com/v1/tlsrpt; ext=extvalue" | ||
parsed = tlsrpt_parsing.record.parseString(TXT_RECORD) | ||
assert parsed.tlsrpt_version == "v=TLSRPTv1" | ||
|
||
|
||
def test_parse_silent(): | ||
""" | ||
Check that parse_silent does not throw a ParseException but instead returns | ||
None if the TLSRPT policy record is malformed. | ||
""" | ||
TXT_RECORD = "v=TLSRPTv1; rua=!!" # broken TLSRPT | ||
parsed = tlsrpt_parsing.parse_silent(TXT_RECORD) | ||
assert parsed is None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like this should stay at SPF, and TLSRPT should be used in MailAuthTlsRptExists