forked from f5devcentral/asmevents
-
Notifications
You must be signed in to change notification settings - Fork 0
/
asmevents.py
129 lines (115 loc) · 5.83 KB
/
asmevents.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/python
import math
import argparse
import sys
import getpass
import requests
import json
import socket
from tabulate import tabulate
parser = argparse.ArgumentParser(
prog='asmevents',
usage='\n\n \
Few examples\n \
Display All events : # asmevents --host x.x.x.x -u <username> \n \
Display only blocked events : # asmevents --host x.x.x.x -s blocked -u <username> \n \
Display only legal events : # asmevents --host x.x.x.x -s legal -u <username>\n',
description='Display ASM events',
epilog=' ',
add_help=True,
)
parser.add_argument('--host', nargs='?', const=-1, help='set host with value')
parser.add_argument('-u', '--user', nargs='?', const=-1, help='set user with value', default='None')
parser.add_argument('-s', '--status', nargs='?', const=-1, choices=['legal', 'illegal', 'blocked', 'unblocked', 'challenged', 'all'], help='set the status of the operation', default='None')
parser.add_argument('-rq', action='store_true', help='when set, print raw request data', default=False)
args = parser.parse_args()
def is_valid_hostname_or_ip(host):
try:
socket.getaddrinfo(host, None)
return True
except socket.gaierror:
return False
if args.status == "None":
print("The status is set to all if -s isn't used; to pull specific status records, use the flag -s or --status")
args.status = "all"
else:
pass
if args.host:
if is_valid_hostname_or_ip(args.host):
pass
else:
print("The host is set incorrectlly, please provide correct host using --host flag")
sys.exit()
else:
print("The host is not set, please provide the host using --host flag")
sys.exit()
if args.user == "None":
args.user = "admin"
print("The user is set to default GUI admin, to use different username use the flag -u or --user")
else:
pass
# Define ASM REST API endpoint and credentials
asm_url = args.host
username = args.user
# prompt the user to enter the password and hide the value
if args.host and args.user:
password = getpass.getpass(prompt='Enter password for the user ' + args.user + ': ')
else:
sys.exit()
if args.status == "all":
#Send GET request to ASM REST API to total number of event log
requests.packages.urllib3.disable_warnings()
stage = requests.get(f"https://{asm_url}/mgmt/tm/asm/events/requests", auth=(username, password), verify=False)
# Parse response JSON
p_stage = json.loads(json.dumps(stage.json()))
# Pull total number of events
real_total_events = p_stage["totalItems"]
total_events = math.ceil(p_stage["totalItems"]/ 500) * 500
# Create an empty list to store the events
event_list = []
# Iterate through events to find the requested event action
i = 0
while i <= total_events:
requests.packages.urllib3.disable_warnings()
response = requests.get(f"https://{asm_url}/mgmt/tm/asm/events/requests?$skip={i}&top=500", auth=(username, password), verify=False)
# Parse response JSON
events = json.loads(json.dumps(response.json()))
for x in range(len(events["items"])):
try:
if args.rq is True:
event_list.append([events["items"][x]["id"],events["items"][x]["clientIp"],events["items"][x]["serverIp"],events["items"][x]["protocolInfo"],events["items"][x]["clientPort"],events["items"][x]["serverPort"],events["items"][x]["requestStatus"],events["items"][x]["rawRequest"]["httpRequestUnescaped"]])
else:
event_list.append([events["items"][x]["id"],events["items"][x]["clientIp"],events["items"][x]["serverIp"],events["items"][x]["protocolInfo"],events["items"][x]["clientPort"],events["items"][x]["serverPort"],events["items"][x]["requestStatus"]])
except Exception as e:
pass
i +=500
else:
#Send GET request to ASM REST API to total number of event log specified in args.status
requests.packages.urllib3.disable_warnings()
stage = requests.get(f"https://{asm_url}/mgmt/tm/asm/events/requests?$filter=requestStatus+eq+{args.status}", auth=(username, password), verify=False)
# Parse response JSON
p_stage = json.loads(json.dumps(stage.json()))
# Pull total number of events
real_total_events = p_stage["totalItems"]
total_events = math.ceil(p_stage["totalItems"]/ 500) * 500
# Create an empty list to store the events
event_list = []
# Iterate through events to find the requested event action
i = 0
while i <= total_events:
requests.packages.urllib3.disable_warnings()
response = requests.get(f"https://{asm_url}/mgmt/tm/asm/events/requests?$skip={i}&top=500&$filter=requestStatus+eq+{args.status}", auth=(username, password), verify=False)
# Parse response JSON
events = json.loads(json.dumps(response.json()))
for x in range(len(events["items"])):
try:
if args.rq is True:
event_list.append([events["items"][x]["id"],events["items"][x]["clientIp"],events["items"][x]["serverIp"],events["items"][x]["protocolInfo"],events["items"][x]["clientPort"],events["items"][x]["serverPort"],events["items"][x]["requestStatus"],events["items"][x]["rawRequest"]["httpRequestUnescaped"]])
else:
event_list.append([events["items"][x]["id"],events["items"][x]["clientIp"],events["items"][x]["serverIp"],events["items"][x]["protocolInfo"],events["items"][x]["clientPort"],events["items"][x]["serverPort"],events["items"][x]["requestStatus"]])
except Exception as e:
pass
i +=500
# Print the event list in tabular format
headers = ["Event ID", "Source IP", "Destination IP", "Protocol", "Source Port", "Destination Port", "Request Status", "Raw Request"]
print(tabulate(event_list, headers, tablefmt="fancy_grid"))