-
Notifications
You must be signed in to change notification settings - Fork 9
/
AttackerCalc.py
132 lines (113 loc) · 4.32 KB
/
AttackerCalc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from scapy.all import *
class AttackerCalc():
    """Pick the presumed attacker IP address out of a packet capture.

    Packets come either from an in-memory sequence (``list_of_packets``) or
    from a pcap file path (``pcap``) that is read with ``rdpcap``; the
    in-memory sequence takes precedence when both are set.

    Two policies are available:

    * ``"first_ip"``: source address of the first IP packet whose source is
      not in the filter list.
    * ``"max_in_window"``: the non-filtered source address that appears in
      the largest number of IP packets.

    Every policy returns a list: one address on success, empty otherwise.
    """

    # Source addresses ignored when no explicit filter is supplied.
    _DEFAULT_FILTER = ('0.0.0.0', '127.0.0.1')

    def __init__(self, pcap=None, list_of_packets=None, policy='first_ip', window_size=15, filter_ip=None, number_of_ip=1):
        """Store the configuration and validate it.

        Args:
            pcap: Path of a pcap file, used when ``list_of_packets`` is None.
            list_of_packets: Already loaded packets (objects exposing
                ``haslayer("IP")`` and ``pkt["IP"].src``).
            policy: ``"first_ip"`` or ``"max_in_window"``.
            window_size: Minimum number of packets ``max_in_window`` needs;
                must be greater than zero.
            filter_ip: Source addresses to ignore. ``None`` selects a fresh
                copy of the default filter (``0.0.0.0`` and ``127.0.0.1``).
            number_of_ip: Number of attackers to detect; only 1 is supported.

        Raises:
            AssertionError: If ``number_of_ip`` is not 1 or ``window_size``
                is not positive.
        """
        self.pcap = pcap
        self.policy = policy
        self.window_size = window_size
        self.pcap_rd = None
        # Build a new list per instance: add_ip_to_filter() mutates it, so a
        # shared default list would leak entries between instances.
        self.filter = list(self._DEFAULT_FILTER) if filter_ip is None else filter_ip
        self.number_of_ip = number_of_ip
        self.list_of_packets = list_of_packets
        assert self.get_number_of_ip() == 1, "La classe non e' ancora implementata per riconoscere piu' attacanti"
        assert self.get_window_size() > 0, "Dimensioni della finestra non valide"

    def compute_attacker(self):
        """Run the configured policy and return its result.

        Returns:
            A list with the selected IP address, or an empty list when no
            address was found or the policy name is not recognised.
        """
        policy = self.get_policy()
        if policy == "first_ip":
            return self.first_ip_policy()
        if policy == "max_in_window":
            return self.max_in_window_policy()
        print("Selezionare una policy.")
        # Unknown policy: report "no attacker" instead of failing on an
        # unbound local variable.
        return []

    def _load_packets(self):
        """Return the packets to analyse, or None when no source is set.

        Raises:
            AssertionError: If neither a packet list nor a pcap is assigned.
        """
        assert self.list_of_packets is not None or self.pcap is not None, "Assegnare un pcap o una lista di pacchetti"
        print("\nCalcolo attaccante\n")
        if self.get_list_of_packets() is not None:
            return self.get_list_of_packets()
        if self.get_pcap() is not None:
            return rdpcap(self.get_pcap())
        # Only reachable when assertions are disabled (python -O).
        print("Assegnare un pcap o una lista di pacchetti.")
        return None

    def first_ip_policy(self):
        """Return the source of the first IP packet that is not filtered out.

        Every packet is inspected, including the last one.

        Returns:
            A one-element list with the address, or an empty list when no
            packet qualifies.
        """
        pkts = self._load_packets()
        if pkts is None:
            return []
        excluded = self.get_filter()
        for pkt in pkts:
            if pkt.haslayer("IP") and pkt["IP"].src not in excluded:
                return [pkt["IP"].src]
        return []

    def max_in_window_policy(self):
        """Return the non-filtered source address seen in the most IP packets.

        When several addresses share the highest count, the one that appeared
        first in the capture is returned.

        Returns:
            A one-element list with the address, or an empty list when there
            are fewer packets than ``window_size`` or no packet qualifies.
        """
        pkts = self._load_packets()
        if pkts is None:
            return []
        # NOTE(review): window_size only acts as a minimum packet count here;
        # all supplied packets are counted, not just the first window_size.
        if len(pkts) < self.get_window_size():
            print("Fornita una finestra troppo piccola.")
            return []
        excluded = self.get_filter()
        counts = {}
        for pkt in pkts:
            if not pkt.haslayer("IP"):
                continue
            src = pkt["IP"].src
            if src in excluded:
                continue
            counts[src] = counts.get(src, 0) + 1
        if not counts:
            return []
        # max() keeps the first maximal key in insertion order, i.e. the
        # earliest-seen address among ties.
        return [max(counts, key=counts.get)]

    def set_number_of_ip(self, number_of_ip):
        """Set the number of attackers to detect."""
        self.number_of_ip = number_of_ip

    def get_number_of_ip(self):
        """Return the number of attackers to detect."""
        return self.number_of_ip

    def set_filter(self, filter):
        """Replace the list of ignored source addresses."""
        self.filter = filter

    def get_filter(self):
        """Return the list of ignored source addresses."""
        return self.filter

    def add_ip_to_filter(self, list_of_ip):
        """Append every address in ``list_of_ip`` to the filter.

        Raises:
            AssertionError: If ``list_of_ip`` is not a list.
        """
        assert isinstance(list_of_ip, list), "Inserire gli ip in una lista."
        self.filter.extend(list_of_ip)

    def set_pcap(self, pcap):
        """Set the pcap file path."""
        self.pcap = pcap

    def get_pcap(self):
        """Return the pcap file path."""
        return self.pcap

    def set_policy(self, policy):
        """Set the policy name used by compute_attacker()."""
        self.policy = policy

    def get_policy(self):
        """Return the policy name used by compute_attacker()."""
        return self.policy

    def set_window_size(self, size):
        """Set the window size."""
        self.window_size = size

    def get_window_size(self):
        """Return the window size."""
        return self.window_size

    def set_list_of_packets(self, list_of_packets):
        """Set the in-memory packets to analyse."""
        self.list_of_packets = list_of_packets

    def get_list_of_packets(self):
        """Return the in-memory packets to analyse."""
        return self.list_of_packets