-
Notifications
You must be signed in to change notification settings - Fork 6
/
unpacme_al.py
127 lines (101 loc) · 4.48 KB
/
unpacme_al.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import json
import time
from assemblyline_v4_service.common.base import ServiceBase
from assemblyline_v4_service.common.result import BODY_FORMAT, Heuristic, Result, ResultSection
from unpacme import unpacme
MAX_TIMEOUT = 300
TIMEOUT_INCREMENT = 15
class UnpacMeAL(ServiceBase):
environments = {}
compiled_rules = None
def __init__(self, config=None):
super(UnpacMeAL, self).__init__(config)
@staticmethod
def filetype_check(request):
valid_filetype = False
if request.file_type == 'executable/windows/pe32':
valid_filetype = True
return valid_filetype
def prechecks(self, request, api_key):
passed_prechecks = True
if not self.filetype_check(request):
self.log.error(f"UNPACME currently only supports 32-bit Portable Executables.")
passed_prechecks = False
# Accessing the service request object results in an exception
# regardless of this not existing.
if not api_key or api_key == '':
self.log.error(f"An API key is required to make API calls to UNPACME.")
passed_prechecks = False
return passed_prechecks
@staticmethod
def check_status(upm, rid):
status = upm.get_analysis_report(rid)
rstatus = None
if status == 'validating' or status == 'unpacking':
rstatus = None
# Results from the UNPACME library will be stored in a dictionary
# once the run is complete.
elif type(status) is dict:
rstatus = status
return rstatus
def wait_for_completion(self, upm, record):
total = 0
status = None
while not status:
status = self.check_status(upm, record['id'])
if status:
break
time.sleep(TIMEOUT_INCREMENT)
total += TIMEOUT_INCREMENT
if total > MAX_TIMEOUT:
self.log.error(f"Maximum timeout reached for processing of sample.")
break
return status
def process_results(self, analysis_results, upm):
results = {
'unpacked': False,
'unpacked_samples': []
}
if len(analysis_results['results']) > 1:
results['unpacked'] = True
for ar in analysis_results['results']:
downloaded = upm.download_sample(ar['hashes']['sha256'], self.working_directory)
dl_path = None
if downloaded:
dl_path = "{}/{}.bin".format(self.working_directory, ar['hashes']['sha256'])
results['unpacked_samples'].append({
'sha256': ar['hashes']['sha256'],
'malware_id': ar['malware_id'],
'data_path': dl_path
})
return results
def generate_results(self, presults, result, analysis_results, request):
if presults['unpacked']:
result.add_section(ResultSection("Successully unpacked binary.", heuristic=Heuristic(1)))
for r in presults['unpacked_samples']:
if len(r['malware_id']) > 0:
for rm in r['malware_id']:
section = ResultSection("{} - {}".format(r['sha256'], rm['name']), heuristic=Heuristic(2))
section.add_line("Details: {}".format(rm['reference']))
result.add_section(section)
request.add_extracted(r['data_path'], r['sha256'], f'Unpacked from {request.sha256}',
safelist_interface=self.api_interface)
result.add_section(ResultSection(f"UNPACME Detailed Results",
body_format=BODY_FORMAT.JSON,
body=json.dumps(analysis_results['results'])))
return result, request
def execute(self, request):
# Result Object
result = Result()
api_key = request.get_param("api_key")
if self.prechecks(request, api_key):
upm = unpacme.UnpacMe(api_key)
record = upm.upload_file(request.file_path)
if record['success']:
analysis_results = self.wait_for_completion(upm, record)
if analysis_results:
presults = self.process_results(analysis_results, upm)
result, request = self.generate_results(presults, result, analysis_results, request)
else:
self.log.error(f"An exception occurred while uploading the sample to UNPACME: %s" % record['msg'])
request.result = result