forked from punk-security/dnsReaper
-
Notifications
You must be signed in to change notification settings - Fork 0
/
domain.py
143 lines (126 loc) · 4.47 KB
/
domain.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import ipaddress
import socket
from collections import namedtuple
import dns.resolver
from functools import lru_cache
import requests
import logging
import urllib3
import collections
collections.Iterable = collections.abc.Iterable
collections.Mapping = collections.abc.Mapping
import whois
class Domain:
@property
@lru_cache
def SOA(self):
return self.query("SOA")
@property
@lru_cache
def NX_DOMAIN(self):
record_types = ["A", "AAAA", "CNAME", "TXT", "MX", "NS"]
for record_type in record_types:
if self.query(record_type):
return False
return True
def query(self, type):
try:
resp = self.resolver.resolve(self.domain, type)
return [record.to_text().rstrip(".") for record in resp]
except:
return []
def fetch_std_records(self):
# TODO: is this recursive?
self.CNAME = self.query("CNAME")
self.A = self.query("A")
self.AAAA = self.query("AAAA")
if self.CNAME:
# return early if we get a CNAME otherwise we get records for the cname aswell
# this is actually desirable for A/AAAA but not NS as the next zone
# will be queried based on the CNAME value, not the original domain
return
self.NS = self.query("NS")
def fetch_external_records(self):
for cname in self.CNAME:
split_cname = cname.split(".", 1)
if len(split_cname) == 1:
continue # This cname has no zone to assess
if self.base_domain == split_cname[1]:
continue # Same zone, dont fetch
d = Domain(cname)
d.fetch_std_records()
self.A += d.A
self.AAAA += d.AAAA
self.CNAME += d.CNAME
for ns in self.NS:
try:
d = Domain(self.domain)
d.set_custom_NS(ns=ns)
self.A += d.A
self.AAAA += d.AAAA
except:
logging.debug(
f"We could not resolve the provided NS record '{ns}' to an ip"
)
def set_custom_NS(self, ns: str):
if type(ns) != str:
logging.error(f"Cannot set custom NS as {ns} not a string")
self.resolver = dns.resolver.Resolver()
try:
ipaddress.ip_address(ns)
self.resolver.nameservers = [ns]
except ValueError:
try:
self.resolver.nameservers = [socket.gethostbyname(ns.rstrip("."))]
except:
self.resolver.nameservers = []
def set_base_domain(self):
split_domain = self.domain.split(".", 1)
if len(split_domain) > 1:
self.base_domain = split_domain[1]
else:
self.base_domain = "."
def __init__(self, domain, fetch_standard_records=True, ns=None):
self.domain = domain.rstrip(".")
self.NS = []
self.A = []
self.AAAA = []
self.CNAME = []
self.set_base_domain()
self.requests = requests
if ns == None:
self.resolver = dns.resolver
else:
self.set_custom_NS(ns)
self.resolver.timeout = 1
self.should_fetch_std_records = fetch_standard_records
@lru_cache
def fetch_web(self, uri="", https=True, params={}):
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
protocol = "https" if https else "http"
url = f"{protocol}://{self.domain}/{uri}"
try:
resp = self.requests.get(url, timeout=5, verify=False, params=params)
web_status = resp.status_code
web_body = resp.content.decode()
except:
web_status = 0
web_body = ""
return namedtuple("web_response", ["status_code", "body"])(web_status, web_body)
@property
@lru_cache
def is_registered(self):
try:
whois.whois(self.domain)
return True
except whois.parser.PywhoisError as e:
if e.args[0] == "No whois server is known for this kind of object.":
# This is the only case of a potentially registered domain
# triggering a PywhoisError
# https://github.com/richardpenman/whois/blob/56dc7e41d134e6d4343ad80a48533681bd887ff2/whois/parser.py#L201
return True
return False
except Exception:
return True
def __repr__(self):
return self.domain