Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow other DoH servers to be used for DNS lookups #32

Merged
merged 1 commit into from
Mar 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 44 additions & 20 deletions ready/checks/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,18 @@

# Check: SPF TXT record should exist
def check_spf_record_should_exist(responses, **kwargs):
records = [r["data"] for r in responses["dns_txt_response"].json.get("Answer", []) if r["data"].startswith("v=spf")]
records = [
r["data"]
for r in responses["dns_txt_response"].json.get("Answer", [])
if r["data"].strip('"').strip("'").startswith("v=spf")
]

if not records and "dns_txt_response_fld" in responses:
records = [r["data"] for r in responses["dns_txt_response_fld"].json.get("Answer", []) if r["data"].startswith("v=spf")]
records = [
r["data"]
for r in responses["dns_txt_response_fld"].json.get("Answer", [])
if r["data"].strip('"').strip("'").startswith("v=spf")
]

return result(
len(records) > 0,
Expand All @@ -22,10 +30,18 @@ def check_spf_record_should_exist(responses, **kwargs):

# Check: SPF TXT record should contain "-all"
def check_spf_txt_record_should_disallow_all(responses, **kwargs):
records = [r["data"] for r in responses["dns_txt_response"].json.get("Answer", []) if r["data"].startswith("v=spf")]
records = [
r["data"]
for r in responses["dns_txt_response"].json.get("Answer", [])
if r["data"].strip('"').strip("'").startswith("v=spf")
]

if not records and "dns_txt_response_fld" in responses:
records = [r["data"] for r in responses["dns_txt_response_fld"].json.get("Answer", []) if r["data"].startswith("v=spf")]
records = [
r["data"]
for r in responses["dns_txt_response_fld"].json.get("Answer", [])
if r["data"].strip('"').strip("'").startswith("v=spf")
]

return result(
records and all(["-all" in r for r in records]),
Expand All @@ -50,16 +66,16 @@ def check_spf_dns_record_does_not_exist(responses, **kwargs):
)


def _spf_for_domain(domain, depth=0, lookups=[]):
def _spf_for_domain(domain, depth=0, lookups=[], dns_resolver=None):
if domain in lookups:
return []

response = thttp.request(f"https://dns.google/resolve?name={domain}&type=TXT")
response = thttp.request(f"{dns_resolver}?name={domain}&type=TXT")
lookups.append(domain)

j = json.loads(response.content)

spf_records = [(domain, x["data"]) for x in j.get("Answer", []) if x["data"].startswith("v=spf")]
spf_records = [(domain, x["data"]) for x in j.get("Answer", []) if x["data"].strip('"').strip("'").startswith("v=spf")]

results = [x for x in spf_records]

Expand All @@ -68,41 +84,45 @@ def _spf_for_domain(domain, depth=0, lookups=[]):
if depth > 13:
return results

matches = re.findall("include\:([^\s]+)", record)
matches = re.findall(r"include\:([^\s]+)", record)

for d in matches:
results.extend(_spf_for_domain(d, depth, lookups))
results.extend(_spf_for_domain(d, depth, lookups, dns_resolver))

matches = re.findall("redirect\=([^\s]+)", record)
matches = re.findall(r"redirect\=([^\s]+)", record)

for d in matches:
results.extend(_spf_for_domain(d, depth, lookups))
results.extend(_spf_for_domain(d, depth, lookups, dns_resolver))

return results


# Check: SPF includes use less than 10 DNS requests
def check_spf_uses_less_than_10_requests(responses, **kwargs):
records = [r["data"] for r in responses["dns_txt_response"].json.get("Answer", []) if r["data"].startswith("v=spf")]
records = [
r["data"]
for r in responses["dns_txt_response"].json.get("Answer", [])
if r["data"].strip('"').strip("'").startswith("v=spf")
]

if not records and "dns_txt_response_fld" in responses:
records = [
r["data"]
for r in responses["dns_txt_response_fld"].json.get("Answer", [])
if "data" in r and r["data"].startswith("v=spf")
if "data" in r and r["data"].strip('"').strip("'").startswith("v=spf")
]

additional_lookups = []
for record in records:
matches = re.findall("include\:([^\s]+)", record)
matches = re.findall(r"include\:([^\s]+)", record)

for d in matches:
additional_lookups.extend(_spf_for_domain(d))
additional_lookups.extend(_spf_for_domain(d, dns_resolver=kwargs["dns_resolver"]))

matches = re.findall("redirect\=([^\s]+)", record)
matches = re.findall(r"redirect\=([^\s]+)", record)

for d in matches:
additional_lookups.extend(_spf_for_domain(d))
additional_lookups.extend(_spf_for_domain(d, dns_resolver=kwargs["dns_resolver"]))

return result(
len(additional_lookups) <= 10,
Expand All @@ -120,7 +140,7 @@ def check_dmarc_record_should_exist(responses, **kwargs):
records = [r["data"] for r in responses["dns_dmarc_response_fld"].json.get("Answer", []) if "data" in r]

return result(
records and all([r.startswith("v=DMARC1") for r in records]),
records and all([r.strip('"').strip("'").startswith("v=DMARC1") for r in records]),
f"DMARC record should exist ({records})",
"email_dmarc_exists",
**kwargs,
Expand Down Expand Up @@ -161,15 +181,19 @@ def check_spf_dash_all(responses, **kwargs):
mx_record_data = [r["data"] for r in mx_records]

if len(mx_record_data) == 0 or all([r == "0 ." for r in mx_record_data]):
spf_records = [r["data"] for r in responses["dns_txt_response"].json.get("Answer", []) if r["data"].startswith("v=spf")]
spf_records = [
r["data"]
for r in responses["dns_txt_response"].json.get("Answer", [])
if r["data"].strip('"').strip("'").startswith("v=spf")
]

if spf_records:
spf_record = spf_records[0]
else:
spf_record = ""

return result(
spf_record.lower().strip() == "v=spf1 -all",
spf_record.lower().strip().strip('"').strip("'") == "v=spf1 -all",
f"SPF should be 'v=spf1 -all' if there are no MX records or MX record is '.' ({spf_record})",
"email_spf_disallow_all_with_empty_mx",
**kwargs,
Expand Down
50 changes: 36 additions & 14 deletions ready/ready.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@
"accept-encoding": "gzip",
}

DNS_RESOLVERS = {
"quad9": "https://dns.quad9.net:5053/dns-query",
"google": "https://dns.google/resolve",
"doh.li": "https://doh.li/dns-query",
"dns.sb": "https://doh.dns.sb/dns-query",
}


def response_or_none(url, name="", request_filter="", **kwargs):
if request_filter and request_filter not in name:
Expand All @@ -145,6 +152,7 @@ def ready(
fuzz=False,
check_filter=None,
request_filter=None,
dns_resolver="https://dns.google/resolve",
extra_args={},
):
domain_with_no_path = urllib.parse.urlparse("https://" + domain).hostname
Expand Down Expand Up @@ -196,41 +204,41 @@ def ready(
)

responses["dns_ns_response"] = response_or_none(
f"https://dns.google/resolve?name={domain_with_no_path}&type=NS", "dns_ns_response", request_filter
f"{dns_resolver}?name={domain_with_no_path}&type=NS", "dns_ns_response", request_filter
)
responses["dns_mx_response"] = response_or_none(
f"https://dns.google/resolve?name={domain_with_no_path}&type=MX", "dns_mx_response", request_filter
f"{dns_resolver}?name={domain_with_no_path}&type=MX", "dns_mx_response", request_filter
)
responses["dns_txt_response"] = response_or_none(
f"https://dns.google/resolve?name={domain_with_no_path}&type=TXT", "dns_txt_response", request_filter
f"{dns_resolver}?name={domain_with_no_path}&type=TXT", "dns_txt_response", request_filter
)
responses["dns_spf_response"] = response_or_none(
f"https://dns.google/resolve?name={domain_with_no_path}&type=SPF", "dns_spf_response", request_filter
f"{dns_resolver}?name={domain_with_no_path}&type=SPF", "dns_spf_response", request_filter
)
responses["dns_caa_response"] = response_or_none(
f"https://dns.google/resolve?name={domain_with_no_path}&type=CAA", "dns_caa_response", request_filter
f"{dns_resolver}?name={domain_with_no_path}&type=CAA", "dns_caa_response", request_filter
)
responses["dns_a_response"] = response_or_none(
f"https://dns.google/resolve?name={domain_with_no_path}&type=A", "dns_aaaa_response", request_filter
f"{dns_resolver}?name={domain_with_no_path}&type=A", "dns_aaaa_response", request_filter
)
responses["dns_aaaa_response"] = response_or_none(
f"https://dns.google/resolve?name={domain_with_no_path}&type=AAAA", "dns_aaaa_response", request_filter
f"{dns_resolver}?name={domain_with_no_path}&type=AAAA", "dns_aaaa_response", request_filter
)
responses["dns_dmarc_response"] = response_or_none(
f"https://dns.google/resolve?name=_dmarc.{domain_with_no_path}&type=TXT", "dns_dmarc_response", request_filter
f"{dns_resolver}?name=_dmarc.{domain_with_no_path}&type=TXT", "dns_dmarc_response", request_filter
)

if USE_FLD and domain != fld:
responses["response_fld"] = response_or_none(
f"https://{fld}", "response_fld", request_filter, verify=False, headers=DEFAULT_HEADERS, timeout=3
)

responses["dns_ns_response_fld"] = response_or_none(f"https://dns.google/resolve?name={fld}&type=NS")
responses["dns_mx_response_fld"] = response_or_none(f"https://dns.google/resolve?name={fld}&type=MX")
responses["dns_spf_response_fld"] = response_or_none(f"https://dns.google/resolve?name={fld}&type=SPF")
responses["dns_txt_response_fld"] = response_or_none(f"https://dns.google/resolve?name={fld}&type=TXT")
responses["dns_dmarc_response_fld"] = response_or_none(f"https://dns.google/resolve?name=_dmarc.{fld}&type=TXT")
responses["dns_caa_response_fld"] = response_or_none(f"https://dns.google/resolve?name={fld}&type=CAA")
responses["dns_ns_response_fld"] = response_or_none(f"{dns_resolver}?name={fld}&type=NS")
responses["dns_mx_response_fld"] = response_or_none(f"{dns_resolver}?name={fld}&type=MX")
responses["dns_spf_response_fld"] = response_or_none(f"{dns_resolver}?name={fld}&type=SPF")
responses["dns_txt_response_fld"] = response_or_none(f"{dns_resolver}?name={fld}&type=TXT")
responses["dns_dmarc_response_fld"] = response_or_none(f"{dns_resolver}?name=_dmarc.{fld}&type=TXT")
responses["dns_caa_response_fld"] = response_or_none(f"{dns_resolver}?name={fld}&type=CAA")

checks = []
is_html = responses["response"] and "html" in responses["response"].headers.get("content-type", "")
Expand Down Expand Up @@ -344,6 +352,7 @@ def ready(
)

extra_args["print_output"] = not hide_output
extra_args["dns_resolver"] = dns_resolver

results = []
for c in checks:
Expand Down Expand Up @@ -432,6 +441,18 @@ def cli():
usage()
sys.exit()

resolver_name = args.get("--dns-resolver", "google")
if not resolver_name.startswith("http"):
if resolver_name.lower() in DNS_RESOLVERS:
dns_resolver = DNS_RESOLVERS[resolver_name.lower()]
else:
print(
f"{resolver_name} is not a valid DNS resolver name. Provide one of {DNS_RESOLVERS.keys()} or a full URI for the DoH resolver."
)
sys.exit(1)
else:
dns_resolver = resolver_name

results = ready(
args["[]"][0],
print_headers=args.get("--headers", False),
Expand All @@ -441,6 +462,7 @@ def cli():
fuzz=args.get("--fuzz", False),
check_filter=args.get("--check-filter", ""),
request_filter=args.get("--request-filter", ""),
dns_resolver=dns_resolver,
)

if "--score" in args:
Expand Down
Loading