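"""MISP expansion module for urlscan.io.

Submits hostnames, domains, IP addresses, and URLs to the urlscan.io scan
API, polls the result API until the report is ready, and maps the report
fields back to MISP attributes (domain, ip-dst, AS, text, link).
"""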
import json
import requests
import logging
import sys
import time

log = logging.getLogger('urlscan')
log.setLevel(logging.DEBUG)
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
log.addHandler(ch)

moduleinfo = {
'version': '0.1',
'author': 'Dave Johnson',
'description': 'Module to query urlscan.io',
'module-type': ['expansion']
}
moduleconfig = ['apikey']
misperrors = {'error': 'Error'}

mispattributes = {
'input': ['hostname', 'domain', 'ip-src', 'ip-dst', 'url'],
'output': ['hostname', 'domain', 'ip-src', 'ip-dst', 'url', 'text', 'link', 'hash']
}


def handler(q=False):
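    """Entry point called by MISP: validate the config, query urlscan.io
    for every supported attribute type in the request, and deduplicate
    the combined results."""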
if q is False:
return False
request = json.loads(q)
if not request.get('config') or not request['config'].get('apikey'):
misperrors['error'] = 'Urlscan apikey is missing'
return misperrors
client = urlscanAPI(request['config']['apikey'])
r = {'results': []}
    # Query urlscan.io once for every supported attribute type present
    for attribute_type in ('ip-src', 'ip-dst', 'domain', 'hostname', 'url'):
        if attribute_type in request:
            r['results'] += lookup_indicator(client, request[attribute_type])
# Return any errors generated from lookup to the UI and remove duplicates
uniq = []
log.debug(r['results'])
for item in r['results']:
log.debug(item)
if 'error' in item:
misperrors['error'] = item['error']
return misperrors
if item not in uniq:
uniq.append(item)
r['results'] = uniq
return r


def lookup_indicator(client, query):
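    """Search urlscan.io for a single indicator and translate the report
    into a list of MISP attribute dictionaries."""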
result = client.search_url(query)
log.debug('RESULTS: ' + json.dumps(result))
r = []
misp_comment = "{}: Enriched via the urlscan module".format(query)
    # Determine whether the page was reachable
    for request in result['data']['requests']:
        if request['response'].get('failed'):
            if request['response']['failed']['errorText']:
                log.debug('The page could not load')
                r.append(
                    {'error': 'Page could not be loaded: {}'.format(request['response']['failed']['errorText'])})
if result.get('page'):
if result['page'].get('domain'):
misp_val = result['page']['domain']
r.append({'types': 'domain',
'categories': ['Network activity'],
'values': misp_val,
'comment': misp_comment})
if result['page'].get('ip'):
misp_val = result['page']['ip']
r.append({'types': 'ip-dst',
'categories': ['Network activity'],
'values': misp_val,
'comment': misp_comment})
if result['page'].get('country'):
misp_val = 'country: ' + result['page']['country']
if result['page'].get('city'):
misp_val += ', city: ' + result['page']['city']
r.append({'types': 'text',
'categories': ['External analysis'],
'values': misp_val,
'comment': misp_comment})
if result['page'].get('asn'):
misp_val = result['page']['asn']
r.append({'types': 'AS', 'categories': ['External analysis'], 'values': misp_val, 'comment': misp_comment})
if result['page'].get('asnname'):
misp_val = result['page']['asnname']
r.append({'types': 'text',
'categories': ['External analysis'],
'values': misp_val,
'comment': misp_comment})
    if result.get('stats'):
        if result['stats'].get('malicious'):
            log.debug('There is something in results > stats > malicious')
            # Collect Google Safe Browsing verdicts when that processor ran;
            # guard each lookup since the processor data may be absent
            threat_list = set()
            gsb_data = result.get('meta', {}).get('processors', {}).get('gsb', {}).get('data', {})
            if 'matches' in gsb_data:
                for item in gsb_data['matches']:
                    if item.get('threatType'):
                        threat_list.add(item['threatType'])
            threat_list = ', '.join(threat_list)
            log.debug('threat_list values are: \'' + threat_list + '\'')
            if threat_list:
                misp_val = '{} threat(s) detected'.format(threat_list)
                r.append({'types': 'text',
                          'categories': ['External analysis'],
                          'values': misp_val,
                          'comment': misp_comment})
    if result.get('lists'):
        if result['lists'].get('urls'):
            # Heuristic brand-keyword checks over the URLs the page requested
            for url in result['lists']['urls']:
                url = url.lower()
                misp_val = ''
                if 'office' in url:
                    misp_val = "Possible Office-themed phishing"
                elif 'o365' in url or '0365' in url:
                    misp_val = "Possible O365-themed phishing"
                elif 'microsoft' in url:
                    misp_val = "Possible Microsoft-themed phishing"
                elif 'paypal' in url:
                    misp_val = "Possible PayPal-themed phishing"
                elif 'onedrive' in url:
                    misp_val = "Possible OneDrive-themed phishing"
                elif 'docusign' in url:
                    misp_val = "Possible DocuSign-themed phishing"
                # Only report when a keyword matched; without this guard a
                # stale misp_val from earlier in the function could be appended
                if misp_val:
                    r.append({'types': 'text',
                              'categories': ['External analysis'],
                              'values': misp_val,
                              'comment': misp_comment})
if result.get('task'):
if result['task'].get('reportURL'):
misp_val = result['task']['reportURL']
r.append({'types': 'link',
'categories': ['External analysis'],
'values': misp_val,
'comment': misp_comment})
if result['task'].get('screenshotURL'):
image_url = result['task']['screenshotURL']
r.append({'types': 'link',
'categories': ['External analysis'],
'values': image_url,
'comment': misp_comment})
# ## TO DO ###
# ## Add ability to add an in-line screenshot of the target website into an attribute
# screenshot = requests.get(image_url).content
# r.append({'types': ['attachment'],
# 'categories': ['External analysis'],
# 'values': image_url,
# 'image': str(base64.b64encode(screenshot), 'utf-8'),
# 'comment': 'Screenshot of website'})
return r


def introspection():
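    """Announce the supported input and output attribute types to MISP."""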
return mispattributes


def version():
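    """Report module metadata and configuration options to MISP."""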
moduleinfo['config'] = moduleconfig
return moduleinfo


class urlscanAPI:
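    """Thin urlscan.io API client: submit a scan, then poll for the result."""
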
def __init__(self, apikey=None, uuid=None):
self.key = apikey
self.uuid = uuid

    def request(self, query):
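        """Submit the query to the scan endpoint, then poll the result
        endpoint until the report is available or retries are exhausted."""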
log.debug('From request function with the parameter: ' + query)
payload = {'url': query}
headers = {'API-Key': self.key,
'Content-Type': "application/json",
'Cache-Control': "no-cache"}
        # Log the outgoing request to help troubleshoot submission problems
        log.debug('PAYLOAD: ' + json.dumps(payload))
        log.debug('HEADERS: ' + json.dumps(headers))
        search_url_string = "https://urlscan.io/api/v1/scan/"
        response = requests.post(search_url_string,
                                 data=json.dumps(payload),
                                 headers=headers)
# HTTP 400 - Bad Request
if response.status_code == 400:
raise Exception('HTTP Error 400 - Bad Request')
# HTTP 404 - Not found
if response.status_code == 404:
raise Exception('HTTP Error 404 - These are not the droids you\'re looking for')
# Any other status code
if response.status_code != 200:
raise Exception('HTTP Error ' + str(response.status_code))
        if not response.text:
            raise Exception('Empty response from the urlscan.io scan endpoint')
        response = json.loads(response.content.decode("utf-8"))
        # Give the scan a moment to start before polling for the result
        time.sleep(3)
        self.uuid = response['uuid']
        # Strings to check for errors on the results page
# Null response string for any unavailable resources
null_response_string = '"status": 404'
# Redirect string accounting for 301/302/303/307/308 status codes
redirect_string = '"status": 30'
# Normal response string with 200 status code
normal_response_string = '"status": 200'
results_url_string = "https://urlscan.io/api/v1/result/" + self.uuid
log.debug('Results URL: ' + results_url_string)
        # Wait for the results to finish processing and check that they are
        # valid; poll up to 11 times, sleeping 3 seconds between attempts
        tries = 10
        while tries >= 0:
            results = requests.get(results_url_string)
log.debug('Made a GET request')
results = results.content.decode("utf-8")
            # Check for a 404 status with no redirected or successfully
            # loaded resources, meaning the results are not ready yet
if null_response_string in results and \
redirect_string not in results and \
normal_response_string not in results:
log.debug('Results not processed. Please check again later.')
time.sleep(3)
tries -= 1
else:
return json.loads(results)
raise Exception('Results contained a 404 status error and could not be processed.')

    def search_url(self, query):
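        """Scan a single indicator and return the parsed result JSON."""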
log.debug('From search_url with parameter: ' + query)
return self.request(query)
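

if __name__ == '__main__':
    # Ad-hoc smoke-test sketch, not used by misp-modules itself: it calls
    # handler() the way the framework would. The API key below is a
    # placeholder and must be replaced with a real urlscan.io key.
    sample_request = json.dumps({
        'config': {'apikey': 'YOUR-URLSCAN-API-KEY'},
        'url': 'https://example.com'
    })
    print(json.dumps(handler(sample_request), indent=2))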