forked from MISP/misp-modules
-
Notifications
You must be signed in to change notification settings - Fork 0
/
lastline_query.py
139 lines (114 loc) · 3.94 KB
/
lastline_query.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!/usr/bin/env python3
"""
Module (type "expansion") to query a Lastline report from an analysis link.
"""
import json
import lastline_api
from . import check_input_attribute, checking_error, standard_error_message
misperrors = {
"error": "Error",
}
mispattributes = {
"input": [
"link",
],
"output": ["text"],
"format": "misp_standard",
}
moduleinfo = {
"version": "0.1",
"author": "Stefano Ortolani",
"description": "Get a Lastline report from an analysis link.",
"module-type": ["expansion"],
}
moduleconfig = [
"username",
"password",
"verify_ssl",
]
def introspection():
return mispattributes
def version():
moduleinfo["config"] = moduleconfig
return moduleinfo
def handler(q=False):
if q is False:
return False
request = json.loads(q)
# Parse the init parameters
try:
config = request["config"]
auth_data = lastline_api.LastlineAbstractClient.get_login_params_from_dict(config)
if not request.get('attribute') or not check_input_attribute(request['attribute'], requirements=('type', 'value')):
return {'error': f'{standard_error_message}, {checking_error} that is the link to a Lastline analysis.'}
analysis_link = request['attribute']['value']
# The API url changes based on the analysis link host name
api_url = lastline_api.get_portal_url_from_task_link(analysis_link)
except Exception as e:
misperrors["error"] = "Error parsing configuration: {}".format(e)
return misperrors
# Parse the call parameters
try:
task_uuid = lastline_api.get_uuid_from_task_link(analysis_link)
except (KeyError, ValueError) as e:
misperrors["error"] = "Error processing input parameters: {}".format(e)
return misperrors
# Make the API calls
try:
api_client = lastline_api.PortalClient(api_url, auth_data, verify_ssl=config.get('verify_ssl', True).lower() in ("true"))
response = api_client.get_progress(task_uuid)
if response.get("completed") != 1:
raise ValueError("Analysis is not finished yet.")
response = api_client.get_result(task_uuid)
if not response:
raise ValueError("Analysis report is empty.")
except Exception as e:
misperrors["error"] = "Error issuing the API call: {}".format(e)
return misperrors
# Parse and return
result_parser = lastline_api.LastlineResultBaseParser()
result_parser.parse(analysis_link, response)
event = result_parser.misp_event
event_dictionary = json.loads(event.to_json())
return {
"results": {
key: event_dictionary[key]
for key in ('Attribute', 'Object', 'Tag')
if (key in event and event[key])
}
}
if __name__ == "__main__":
"""Test querying information from a Lastline analysis link."""
import argparse
import configparser
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config-file", dest="config_file")
parser.add_argument("-s", "--section-name", dest="section_name")
args = parser.parse_args()
c = configparser.ConfigParser()
c.read(args.config_file)
a = lastline_api.LastlineAbstractClient.get_login_params_from_conf(c, args.section_name)
j = json.dumps(
{
"config": a,
"attribute": {
"value": (
"https://user.lastline.com/portal#/analyst/task/"
"1fcbcb8f7fb400100772d6a7b62f501b/overview"
)
}
}
)
print(json.dumps(handler(j), indent=4, sort_keys=True))
j = json.dumps(
{
"config": a,
"attribute": {
"value": (
"https://user.lastline.com/portal#/analyst/task/"
"f3c0ae115d51001017ff8da768fa6049/overview"
)
}
}
)
print(json.dumps(handler(j), indent=4, sort_keys=True))