From ea06b7e58239f30a902a5f476ddead840d3cb781 Mon Sep 17 00:00:00 2001 From: John Sinteur Date: Sat, 5 Mar 2022 17:05:03 +0100 Subject: [PATCH] first version WIP - to be tested on a large sample set --- checks/categories.py | 63 ++++++++++++++++++++++++++++++++++++++++++++ checks/models.py | 9 +++++-- checks/scoring.py | 4 +++ checks/tasks/tls.py | 59 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 133 insertions(+), 2 deletions(-) diff --git a/checks/categories.py b/checks/categories.py index 8bf99f5c1..4bc3a17ad 100644 --- a/checks/categories.py +++ b/checks/categories.py @@ -149,6 +149,7 @@ def __init__(self, name="web-tls"): WebTlsCertPubkey, WebTlsCertSignature, WebTlsCertHostmatch, + WebCaa, WebTlsDaneExists, WebTlsDaneValid, WebTlsZeroRTT, @@ -221,6 +222,7 @@ def __init__(self, name="mail-tls"): MailTlsCertPubkey, MailTlsCertSignature, MailTlsCertHostmatch, + MailCaa, MailTlsDaneExists, MailTlsDaneValid, MailTlsDaneRollover, @@ -1041,6 +1043,35 @@ def result_bad(self, tech_data): self.tech_data = tech_data +class WebCaa(Subtest): + def __init__(self): + super(WebCaa, self).__init__( + name="cert_caa", + label="detail web cert-caa label", + explanation="detail web cert-caa exp", + tech_string="", + init_tech_type="", + worst_status=scoring.CAA_WORST_STATUS, + full_score=scoring.CAA_GOOD, + model_score_field="cert_caa_score", + ) + + def result_good(self,tech_data): + self._status(STATUS_SUCCESS) + self.verdict = "detail web cert-caa verdict good" + self.tech_data = "" + + def result_info(self, tech_data): + self._status(STATUS_INFO) + self.verdict = "detail web cert-caa verdict warning" + self.tech_data = "" + + def result_bad(self, tech_data): + self._status(STATUS_FAIL) + self.verdict = "detail web cert-caa verdict bad" + self.tech_data = "" + + class WebTlsDaneExists(Subtest): def __init__(self): super(WebTlsDaneExists, self).__init__( @@ -1619,6 +1650,38 @@ def result_has_daneTA(self, tech_data): self.tech_data = tech_data +class MailCaa(Subtest): + def __init__(self): + super(MailCaa, self).__init__( + name="cert_caa", + label="detail mail cert-caa label", + explanation="detail mail cert-caa exp", + tech_string="detail mail cert-caa tech table", + init_tech_type="", + worst_status=scoring.CAA_WORST_STATUS, + full_score=scoring.CAA_GOOD, + model_score_field="cert_caa_score", + ) + + def result_good(self, tech_data): + self._status(STATUS_SUCCESS) + self.verdict = "detail mail cert-caa verdict good" + self.tech_data = "" + + def result_info(self, tech_data): + self._status(STATUS_INFO) + self.verdict = "detail mail cert-caa verdict warning" + self.tech_data = "" + + def result_bad(self, tech_data): + self._status(STATUS_FAIL) + self.verdict = "detail mail cert-caa verdict bad" + self.tech_data = "" + + + + + class MailTlsZeroRTT(Subtest): def __init__(self): super(MailTlsZeroRTT, self).__init__( diff --git a/checks/models.py b/checks/models.py index cfa122646..72997c3d7 100644 --- a/checks/models.py +++ b/checks/models.py @@ -485,6 +485,9 @@ class DomainTestTls(BaseTestModel): cert_hostmatch_bad = ListField(null=True) cert_hostmatch_score = models.IntegerField(null=True) + cert_caa_score = models.IntegerField(null=True) + cert_caa_record = ListField(default=[]) + score = models.IntegerField(null=True) def __dir__(self): @@ -505,7 +508,7 @@ def __dir__(self): 'hsts_score', 'cert_chain', 'cert_trusted', 'cert_trusted_score', 'cert_pubkey_bad', 'cert_pubkey_phase_out', 'cert_pubkey_score', 'cert_signature_bad', 'cert_signature_score', 'cert_hostmatch_bad', - 'cert_hostmatch_score', 'score', 'protocols_good', + 'cert_hostmatch_score', 'cert_caa_score','cert_caa_record','score', 'protocols_good', ] def get_web_api_details(self): @@ -536,6 +539,7 @@ def get_web_api_details(self): 'cert_pubkey_phase_out': self.cert_pubkey_phase_out, 'cert_signature_bad': self.cert_signature_bad, 'cert_hostmatch_bad': self.cert_hostmatch_bad, + 'cert_caa_bad': self.cert_caa_bad, } def get_mail_api_details(self): @@ -562,7 +566,8 @@ def get_mail_api_details(self): 'cert_pubkey_phase_out': self.cert_pubkey_phase_out, 'cert_signature_bad': self.cert_signature_bad, 'cert_hostmatch_bad': self.cert_hostmatch_bad, - } + 'cert_caa_bad': self.cert_caa_bad, + } class WebTestAppsecpriv(DomainServersModel): diff --git a/checks/scoring.py b/checks/scoring.py index a44709747..8e7806395 100644 --- a/checks/scoring.py +++ b/checks/scoring.py @@ -151,6 +151,10 @@ WEB_TLS_HOSTMATCH_BAD = NO_POINTS WEB_TLS_HOSTMATCH_WORST_STATUS = STATUS_FAIL +CAA_GOOD = FULL_WEIGHT_POINTS +CAA_BAD = NO_POINTS +CAA_WORST_STATUS = STATUS_INFO + # DANE_EXISTS has no score. It is combined with # DANE_VALID below. WEB_TLS_DANE_EXISTS_WORST_STATUS = STATUS_INFO diff --git a/checks/tasks/tls.py b/checks/tasks/tls.py index 158d07c98..8c0f3ef15 100644 --- a/checks/tasks/tls.py +++ b/checks/tasks/tls.py @@ -7,6 +7,7 @@ import socket import ssl import time +import unbound from enum import Enum from timeit import default_timer as timer @@ -576,6 +577,8 @@ def save_results(model, results, addr, domain, category): model.cert_signature_score = result.get("sigalg_score") model.cert_hostmatch_score = result.get("hostmatch_score") model.cert_hostmatch_bad = result.get("hostmatch_bad") + model.cert_caa_score = result.get("caa_score") + model.cert_caa_record = result.get("caa_record") model.dane_log = result.get("dane_log") model.dane_score = result.get("dane_score") model.dane_status = result.get("dane_status") @@ -645,6 +648,7 @@ def save_results(model, results, addr, domain, category): model.cert_signature_score = result.get("sigalg_score") model.cert_hostmatch_score = result.get("hostmatch_score") model.cert_hostmatch_bad = result.get("hostmatch_bad") + model.cert_caa_score = result.get("caa_score") model.dane_log = result.get("dane_log") model.dane_score = result.get("dane_score") model.dane_status = result.get("dane_status") @@ -802,6 +806,13 @@ def annotate_and_combine(bad_items, phaseout_items): else: category.subtests['cert_hostmatch'].result_good() + if dttls.cert_caa_score is None: + category.subtests['cert_caa'].result_info(dttls.cert_caa_record) + elif dttls.cert_caa_score is scoring.CAA_WORST_STATUS: + category.subtests['cert_caa'].result_info(dttls.cert_caa_record) + else: + category.subtests['cert_caa'].result_good(dttls.cert_caa_record) + if dttls.dane_status == DaneStatus.none: category.subtests['dane_exists'].result_bad() elif dttls.dane_status == DaneStatus.none_bogus: @@ -1545,6 +1556,51 @@ def do_web_cert(af_ip_pairs, url, task, *args, **kwargs): return ('cert', results) +def as_txt(data): + try: + txt = "".join(unbound.ub_data.dname2str(data)) + except UnicodeError: + txt = "" + return txt + + +def caa_callback(data, status, r): + data['score'] = scoring.CAA_WORST_STATUS + data['available'] = False + data['record'] = [] + if status == 0: + available = False + if r.rcode == unbound.RCODE_NOERROR and r.havedata == 1: + available = True + score = scoring.CAA_GOOD + for d in r.data.data: + txt = as_txt(d) + data['record'].append(txt) + elif r.rcode == unbound.RCODE_NXDOMAIN: + # we know for sure there is no DKIM pubkey + score = scoring.CAA_WORST_STATUS + else: + # resolving problems, servfail probably + score = scoring.CAA_WORST_STATUS + data['score'] = score + data['available'] = available + data['done'] = True + + +def check_caa(task,url): + caa_score = scoring.CAA_GOOD + try: + cb_data = task.async_resolv(url, unbound.RR_TYPE_CAA, caa_callback) + result = dict(available='available' in cb_data and cb_data['available'], score=cb_data['score']) + caa_score = cb_data['score'] + # KeyError is due to score missing, happens in case of timeout on non resolving domain + except (SoftTimeLimitExceeded, KeyError): + result = dict(available=False, score=scoring.scoring.CAA_WORST_STATUS) + caa_score = scoring.CAA_WORST_STATUS + return ( caa_score, cb_data['record']) + + + def cert_checks( url, mode, task, af_ip_pair=None, starttls_details=None, *args, **kwargs): @@ -1610,6 +1666,7 @@ def cert_checks( pubkey_score, pubkey_bad, pubkey_phase_out = debug_chain.check_pubkey() sigalg_score, sigalg_bad = debug_chain.check_sigalg() chain_str = debug_chain.chain_str() + caa_score, caa_record = check_caa(task,url) if starttls_details: dane_results = debug_chain.check_dane( @@ -1631,6 +1688,8 @@ def cert_checks( sigalg_score=sigalg_score, hostmatch_bad=hostmatch_bad, hostmatch_score=hostmatch_score, + caa_score=caa_score, + caa_record=caa_record, ) results.update(dane_results)