import glob
import os

from scapy.all import *

from AttackerCalc import AttackerCalc
from CSV import CSV
from FeaturesCalc import FeaturesCalc
from PacketFilter import PacketFilter
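class CreateFeaturesHandler:
    """Turn the captures in ``Pcaps_Malware`` and ``Pcaps_Legitimate`` into feature rows.

    Every ``*.pcap`` in the two folders is read packet by packet; a packet is
    kept when at least one of the three ``PacketFilter`` instances (TCP, UDP,
    ICMP, each with ``127.0.0.1`` blacklisted) accepts it.  Kept packets are
    grouped into windows of ``pkts_window_size`` packets and each full window
    is handed to ``FeaturesCalc.compute_features``; the result is appended as
    one CSV row.  Output goes either to one shared ``features`` CSV
    (``single_csv=True``) or to one CSV per pcap inside ``Malware_Features`` /
    ``Legitimate_Features``.

    Attributes:
        pkts_window_size: number of packets per feature window.
        single_csv: whether all rows go to a single shared CSV.
        featuresCalc: the ``FeaturesCalc`` used for every window.
        filter_1, filter_2, filter_3, filters: the TCP / UDP / ICMP filters.
        csv: the shared ``CSV`` (only set when ``single_csv`` is True).
    """

    def __init__(self, pkts_window_size=10, single_csv=True):
        """Create the feature calculator, the packet filters and (optionally) the shared CSV.

        Args:
            pkts_window_size: packets per window; must be >= 1.
            single_csv: True for one shared ``features`` CSV, False for one
                CSV per pcap in the per-class output folder.

        Raises:
            ValueError: if ``pkts_window_size`` is smaller than 1.
            TypeError: if ``single_csv`` is not a bool.
        """
        # Real exceptions instead of ``assert``: asserts are stripped under
        # ``python -O`` and would silently accept an invalid configuration.
        if pkts_window_size < 1:
            raise ValueError("Valore per la finestra non valido")
        if not isinstance(single_csv, bool):
            raise TypeError("Valore non valido per il flag single_csv")
        self.pkts_window_size = pkts_window_size
        self.single_csv = single_csv
        self.featuresCalc = FeaturesCalc(flow_type="malware", min_window_size=pkts_window_size)
        # Loopback traffic is blacklisted on every filter.
        ip_to_ignore = ["127.0.0.1"]
        self.filter_1 = PacketFilter(ip_whitelist_filter=[], ip_blacklist_filter=ip_to_ignore, TCP=True)
        self.filter_2 = PacketFilter(ip_whitelist_filter=[], ip_blacklist_filter=ip_to_ignore, UDP=True)
        self.filter_3 = PacketFilter(ip_whitelist_filter=[], ip_blacklist_filter=ip_to_ignore, ICMP=True)
        self.filters = [self.filter_1, self.filter_2, self.filter_3]
        if self.single_csv:
            self.csv = CSV(file_name="features")
            self.csv.create_empty_csv()
            self.csv.add_row(self.featuresCalc.get_features_name())

    def compute_features(self):
        """Process the malware folder first, then the legitimate one.

        For malware pcaps the IP whitelist of every filter is set, per pcap,
        to the result of ``AttackerCalc(pcap).compute_attacker()``; for
        legitimate pcaps the whitelist is cleared before the folder is read.
        """
        self._process_folder("Pcaps_Malware", "malware", "Malware_Features", whitelist_from_attacker=True)
        self._process_folder("Pcaps_Legitimate", "legitimate", "Legitimate_Features", whitelist_from_attacker=False)

    def _process_folder(self, folder_name, flow_type, csv_folder, whitelist_from_attacker):
        """Compute feature rows for every ``*.pcap`` in ``folder_name``.

        Args:
            folder_name: directory holding the captures.
            flow_type: label passed to ``FeaturesCalc.set_flow_type``.
            csv_folder: output folder for per-pcap CSVs (ignored when
                ``single_csv`` is True).
            whitelist_from_attacker: True to whitelist the attacker IPs of
                each pcap, False to clear the whitelist once up front.
        """
        if self.featuresCalc.get_flow_type() != flow_type:
            self.featuresCalc.set_flow_type(flow_type)
        if not whitelist_from_attacker:
            self._set_whitelist([])
        # sorted(): glob order is filesystem-dependent, and the row order of
        # the resulting dataset should be reproducible between runs.
        for pcap in sorted(glob.glob(os.path.join(folder_name, "*.pcap"))):
            csv = self._csv_for(pcap, csv_folder)
            print("\nCalcolo features di " + pcap + "\n")
            if whitelist_from_attacker:
                attacker = AttackerCalc(pcap=pcap)
                self._set_whitelist(attacker.compute_attacker())
            self._write_windows(pcap, csv)

    def _set_whitelist(self, ips):
        """Apply the same IP whitelist to all three filters."""
        for pkt_filter in self.filters:
            pkt_filter.set_ip_whitelist_filter(ips)

    def _csv_for(self, pcap, csv_folder):
        """Return the CSV to write to: the shared one, or a fresh per-pcap file with a header row."""
        if self.single_csv:
            return self.csv
        # basename/splitext rather than split("/") + replace(".pcap", ""):
        # correct on Windows separators and for names containing ".pcap" twice.
        pcap_name = os.path.splitext(os.path.basename(pcap))[0]
        csv = CSV(file_name=pcap_name, folder_name=csv_folder)
        csv.create_empty_csv()
        csv.add_row(self.featuresCalc.get_features_name())
        return csv

    def _write_windows(self, pcap, csv):
        """Stream ``pcap`` through the filters and append one row per full packet window.

        A trailing window with fewer than ``min_window_size`` packets is
        dropped, as in the original implementation.
        """
        window = []
        # PcapReader streams the capture; rdpcap() would load the whole file
        # into memory, and the malware captures are untrusted, possibly
        # very large input.
        with PcapReader(pcap) as reader:
            for pkt in reader:
                # Every filter is evaluated (no short-circuit) to keep the
                # same call pattern on PacketFilter as before.
                verdicts = [pkt_filter.check_packet_filter(pkt) for pkt_filter in self.filters]
                if not any(verdicts):
                    continue
                window.append(pkt)
                if len(window) >= self.featuresCalc.get_min_window_size():
                    csv.add_row(self.featuresCalc.compute_features(window))
                    window.clear()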