-
Notifications
You must be signed in to change notification settings - Fork 9
/
PacketFilter.py
142 lines (115 loc) · 4.01 KB
/
PacketFilter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
class PacketFilter():
def __init__(self, ip_whitelist_filter=[], ip_blacklist_filter=[], IPv4=False, TCP=False, UDP=False, ICMP=False, DNS=False):
self.ip_whitelist_filter = ip_whitelist_filter
self.ip_blacklist_filter = ip_blacklist_filter
self.IPv4 = IPv4
self.TCP = TCP
self.UDP = UDP
self.ICMP = ICMP
self.DNS = DNS
filters = [self.IPv4, self.TCP, self.UDP, self.ICMP, self.DNS]
assert sum(filters) <= 1, "You have to set just one protocol filter."
if(len(self.ip_whitelist_filter) > 0 or len(self.ip_blacklist_filter) > 0):
self.set_IPv4_filter(True)
def check_packet_filter(self, pkt):
results = []
def IPv4_filter(pkt):
if(pkt.haslayer("IP")):
return True
else:
return False
def ip_blacklist_filter(pkt, check_list):
if(IPv4_filter(pkt) is True):
if(len(check_list) > 0):
if(pkt["IP"].src not in check_list):
return True
else:
return False
else:
return True
else:
return False
def ip_whitelist_filter(pkt, check_list):
if(IPv4_filter(pkt) is True):
if(len(check_list) > 0):
if(pkt["IP"].src in check_list):
return True
else:
return False
else:
return True
else:
return False
def UDP_filter(pkt):
if(pkt.haslayer("UDP")):
return True
else:
return False
def TCP_filter(pkt):
if(pkt.haslayer("TCP")):
return True
else:
return False
def DNS_filter(pkt):
if(pkt.haslayer("DNS")):
return True
else:
return False
def ICMP_filter(pkt):
if(pkt.haslayer("ICMP")):
return True
else:
return False
if(self.get_IPv4_filter() is True):
res = IPv4_filter(pkt)
results.append(res)
if(len(self.get_ip_blacklist_filter()) > 0):
res = ip_blacklist_filter(pkt, self.get_ip_blacklist_filter())
results.append(res)
if(len(self.get_ip_whitelist_filter()) > 0):
res = ip_whitelist_filter(pkt, self.get_ip_whitelist_filter())
results.append(res)
if(self.get_TCP_filter() is True):
res = TCP_filter(pkt)
results.append(res)
if(self.get_UDP_filter() is True):
res = UDP_filter(pkt)
results.append(res)
if(self.get_ICMP_filter() is True):
res = ICMP_filter(pkt)
results.append(res)
if(self.get_DNS_filter() is True):
res = DNS_filter(pkt)
results.append(res)
if(False in results):
return False
else:
return True
def set_IPv4_filter(self, val):
self.IPv4 = val
def set_ip_whitelist_filter(self, ip_filter):
self.ip_whitelist_filter = ip_filter
def set_ip_blacklist_filter(self, ip_filter):
self.ip_blacklist_filter = ip_filter
def set_TCP_filter(self, val):
self.TCP = val
def set_UDP_filter(self, val):
self.UDP = val
def get_TCP_filter(self):
return self.TCP
def get_UDP_filter(self):
return self.UDP
def get_IPv4_filter(self):
return self.IPv4
def set_ICMP_filter(self, val):
self.ICMP = val
def get_ICMP_filter(self):
return self.ICMP
def set_DNS_filter(self, val):
self.DNS = val
def get_DNS_filter(self):
return self.DNS
def get_ip_whitelist_filter(self):
return self.ip_whitelist_filter
def get_ip_blacklist_filter(self):
return self.ip_blacklist_filter