Skip to content

Commit

Permalink
feat: allow other DoH servers to be used for DNS lookups
Browse files Browse the repository at this point in the history
  • Loading branch information
sesh committed Mar 9, 2024
1 parent 3e09e3f commit 0345f61
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 33 deletions.
62 changes: 43 additions & 19 deletions ready/checks/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,18 @@

# Check: SPF TXT record should exist
def check_spf_record_should_exist(responses, **kwargs):
records = [r["data"] for r in responses["dns_txt_response"].json.get("Answer", []) if r["data"].startswith("v=spf")]
records = [
r["data"]
for r in responses["dns_txt_response"].json.get("Answer", [])
if r["data"].strip('"').strip("'").startswith("v=spf")
]

if not records and "dns_txt_response_fld" in responses:
records = [r["data"] for r in responses["dns_txt_response_fld"].json.get("Answer", []) if r["data"].startswith("v=spf")]
records = [
r["data"]
for r in responses["dns_txt_response_fld"].json.get("Answer", [])
if r["data"].strip('"').strip("'").startswith("v=spf")
]

return result(
len(records) > 0,
Expand All @@ -22,10 +30,18 @@ def check_spf_record_should_exist(responses, **kwargs):

# Check: SPF TXT record should contain "-all"
def check_spf_txt_record_should_disallow_all(responses, **kwargs):
records = [r["data"] for r in responses["dns_txt_response"].json.get("Answer", []) if r["data"].startswith("v=spf")]
records = [
r["data"]
for r in responses["dns_txt_response"].json.get("Answer", [])
if r["data"].strip('"').strip("'").startswith("v=spf")
]

if not records and "dns_txt_response_fld" in responses:
records = [r["data"] for r in responses["dns_txt_response_fld"].json.get("Answer", []) if r["data"].startswith("v=spf")]
records = [
r["data"]
for r in responses["dns_txt_response_fld"].json.get("Answer", [])
if r["data"].strip('"').strip("'").startswith("v=spf")
]

return result(
records and all(["-all" in r for r in records]),
Expand All @@ -50,7 +66,7 @@ def check_spf_dns_record_does_not_exist(responses, **kwargs):
)


def _spf_for_domain(domain, depth=0, lookups=[]):
def _spf_for_domain(domain, depth=0, lookups=[], dns_resolver=None):
if domain in lookups:
return []

Expand All @@ -59,7 +75,7 @@ def _spf_for_domain(domain, depth=0, lookups=[]):

j = json.loads(response.content)

spf_records = [(domain, x["data"]) for x in j.get("Answer", []) if x["data"].startswith("v=spf")]
spf_records = [(domain, x["data"]) for x in j.get("Answer", []) if x["data"].strip('"').strip("'").startswith("v=spf")]

results = [x for x in spf_records]

Expand All @@ -68,41 +84,45 @@ def _spf_for_domain(domain, depth=0, lookups=[]):
if depth > 13:
return results

matches = re.findall("include\:([^\s]+)", record)
matches = re.findall(r"include\:([^\s]+)", record)

for d in matches:
results.extend(_spf_for_domain(d, depth, lookups))
results.extend(_spf_for_domain(d, depth, lookups, dns_resolver))

matches = re.findall("redirect\=([^\s]+)", record)
matches = re.findall(r"redirect\=([^\s]+)", record)

for d in matches:
results.extend(_spf_for_domain(d, depth, lookups))
results.extend(_spf_for_domain(d, depth, lookups, dns_resolver))

return results


# Check: SPF includes use less than 10 DNS requests
def check_spf_uses_less_than_10_requests(responses, **kwargs):
records = [r["data"] for r in responses["dns_txt_response"].json.get("Answer", []) if r["data"].startswith("v=spf")]
records = [
r["data"]
for r in responses["dns_txt_response"].json.get("Answer", [])
if r["data"].strip('"').strip("'").startswith("v=spf")
]

if not records and "dns_txt_response_fld" in responses:
records = [
r["data"]
for r in responses["dns_txt_response_fld"].json.get("Answer", [])
if "data" in r and r["data"].startswith("v=spf")
if "data" in r and r["data"].strip('"').strip("'").startswith("v=spf")
]

additional_lookups = []
for record in records:
matches = re.findall("include\:([^\s]+)", record)
matches = re.findall(r"include\:([^\s]+)", record)

for d in matches:
additional_lookups.extend(_spf_for_domain(d))
additional_lookups.extend(_spf_for_domain(d, dns_resolver=kwargs["dns_resolver"]))

matches = re.findall("redirect\=([^\s]+)", record)
matches = re.findall(r"redirect\=([^\s]+)", record)

for d in matches:
additional_lookups.extend(_spf_for_domain(d))
additional_lookups.extend(_spf_for_domain(d, dns_resolver=kwargs["dns_resolver"]))

return result(
len(additional_lookups) <= 10,
Expand All @@ -120,7 +140,7 @@ def check_dmarc_record_should_exist(responses, **kwargs):
records = [r["data"] for r in responses["dns_dmarc_response_fld"].json.get("Answer", []) if "data" in r]

return result(
records and all([r.startswith("v=DMARC1") for r in records]),
records and all([r.strip('"').strip("'").startswith("v=DMARC1") for r in records]),
f"DMARC record should exist ({records})",
"email_dmarc_exists",
**kwargs,
Expand Down Expand Up @@ -161,15 +181,19 @@ def check_spf_dash_all(responses, **kwargs):
mx_record_data = [r["data"] for r in mx_records]

if len(mx_record_data) == 0 or all([r == "0 ." for r in mx_record_data]):
spf_records = [r["data"] for r in responses["dns_txt_response"].json.get("Answer", []) if r["data"].startswith("v=spf")]
spf_records = [
r["data"]
for r in responses["dns_txt_response"].json.get("Answer", [])
if r["data"].strip('"').strip("'").startswith("v=spf")
]

if spf_records:
spf_record = spf_records[0]
else:
spf_record = ""

return result(
spf_record.lower().strip() == "v=spf1 -all",
spf_record.lower().strip().strip('"').strip("'") == "v=spf1 -all",
f"SPF should be 'v=spf1 -all' if there are no MX records or MX record is '.' ({spf_record})",
"email_spf_disallow_all_with_empty_mx",
**kwargs,
Expand Down
50 changes: 36 additions & 14 deletions ready/ready.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@
"accept-encoding": "gzip",
}

DNS_RESOLVERS = {
"quad9": "https://dns.quad9.net:5053/dns-query",
"google": "https://dns.google/resolve",
"doh.li": "https://doh.li/dns-query",
"dns.sb": "https://doh.dns.sb/dns-query",
}


def response_or_none(url, name="", request_filter="", **kwargs):
if request_filter and request_filter not in name:
Expand All @@ -145,6 +152,7 @@ def ready(
fuzz=False,
check_filter=None,
request_filter=None,
dns_resolver="https://dns.google/resolve",
extra_args={},
):
domain_with_no_path = urllib.parse.urlparse("https://" + domain).hostname
Expand Down Expand Up @@ -196,41 +204,41 @@ def ready(
)

responses["dns_ns_response"] = response_or_none(
f"https://dns.google/resolve?name={domain_with_no_path}&type=NS", "dns_ns_response", request_filter
f"{dns_resolver}?name={domain_with_no_path}&type=NS", "dns_ns_response", request_filter
)
responses["dns_mx_response"] = response_or_none(
f"https://dns.google/resolve?name={domain_with_no_path}&type=MX", "dns_mx_response", request_filter
f"{dns_resolver}?name={domain_with_no_path}&type=MX", "dns_mx_response", request_filter
)
responses["dns_txt_response"] = response_or_none(
f"https://dns.google/resolve?name={domain_with_no_path}&type=TXT", "dns_txt_response", request_filter
f"{dns_resolver}?name={domain_with_no_path}&type=TXT", "dns_txt_response", request_filter
)
responses["dns_spf_response"] = response_or_none(
f"https://dns.google/resolve?name={domain_with_no_path}&type=SPF", "dns_spf_response", request_filter
f"{dns_resolver}?name={domain_with_no_path}&type=SPF", "dns_spf_response", request_filter
)
responses["dns_caa_response"] = response_or_none(
f"https://dns.google/resolve?name={domain_with_no_path}&type=CAA", "dns_caa_response", request_filter
f"{dns_resolver}?name={domain_with_no_path}&type=CAA", "dns_caa_response", request_filter
)
responses["dns_a_response"] = response_or_none(
f"https://dns.google/resolve?name={domain_with_no_path}&type=A", "dns_aaaa_response", request_filter
f"{dns_resolver}?name={domain_with_no_path}&type=A", "dns_aaaa_response", request_filter
)
responses["dns_aaaa_response"] = response_or_none(
f"https://dns.google/resolve?name={domain_with_no_path}&type=AAAA", "dns_aaaa_response", request_filter
f"{dns_resolver}?name={domain_with_no_path}&type=AAAA", "dns_aaaa_response", request_filter
)
responses["dns_dmarc_response"] = response_or_none(
f"https://dns.google/resolve?name=_dmarc.{domain_with_no_path}&type=TXT", "dns_dmarc_response", request_filter
f"{dns_resolver}?name=_dmarc.{domain_with_no_path}&type=TXT", "dns_dmarc_response", request_filter
)

if USE_FLD and domain != fld:
responses["response_fld"] = response_or_none(
f"https://{fld}", "response_fld", request_filter, verify=False, headers=DEFAULT_HEADERS, timeout=3
)

responses["dns_ns_response_fld"] = response_or_none(f"https://dns.google/resolve?name={fld}&type=NS")
responses["dns_mx_response_fld"] = response_or_none(f"https://dns.google/resolve?name={fld}&type=MX")
responses["dns_spf_response_fld"] = response_or_none(f"https://dns.google/resolve?name={fld}&type=SPF")
responses["dns_txt_response_fld"] = response_or_none(f"https://dns.google/resolve?name={fld}&type=TXT")
responses["dns_dmarc_response_fld"] = response_or_none(f"https://dns.google/resolve?name=_dmarc.{fld}&type=TXT")
responses["dns_caa_response_fld"] = response_or_none(f"https://dns.google/resolve?name={fld}&type=CAA")
responses["dns_ns_response_fld"] = response_or_none(f"{dns_resolver}?name={fld}&type=NS")
responses["dns_mx_response_fld"] = response_or_none(f"{dns_resolver}?name={fld}&type=MX")
responses["dns_spf_response_fld"] = response_or_none(f"{dns_resolver}?name={fld}&type=SPF")
responses["dns_txt_response_fld"] = response_or_none(f"{dns_resolver}?name={fld}&type=TXT")
responses["dns_dmarc_response_fld"] = response_or_none(f"{dns_resolver}?name=_dmarc.{fld}&type=TXT")
responses["dns_caa_response_fld"] = response_or_none(f"{dns_resolver}?name={fld}&type=CAA")

checks = []
is_html = responses["response"] and "html" in responses["response"].headers.get("content-type", "")
Expand Down Expand Up @@ -344,6 +352,7 @@ def ready(
)

extra_args["print_output"] = not hide_output
extra_args["dns_resolver"] = dns_resolver

results = []
for c in checks:
Expand Down Expand Up @@ -432,6 +441,18 @@ def cli():
usage()
sys.exit()

resolver_name = args.get("--dns-resolver", "google")
if not resolver_name.startswith("http"):
if resolver_name.lower() in DNS_RESOLVERS:
dns_resolver = DNS_RESOLVERS[resolver_name.lower()]
else:
print(
f"{resolver_name} is not a valid DNS resolver name. Provide one of {DNS_RESOLVERS.keys()} or a full URI for the DoH resolver."
)
sys.exit(1)
else:
dns_resolver = resolver_name

results = ready(
args["[]"][0],
print_headers=args.get("--headers", False),
Expand All @@ -441,6 +462,7 @@ def cli():
fuzz=args.get("--fuzz", False),
check_filter=args.get("--check-filter", ""),
request_filter=args.get("--request-filter", ""),
dns_resolver=dns_resolver,
)

if "--score" in args:
Expand Down

0 comments on commit 0345f61

Please sign in to comment.