-
Notifications
You must be signed in to change notification settings - Fork 78
/
Copy paths_nexthop_summary.py
323 lines (264 loc) · 13 KB
/
s_nexthop_summary.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
# $language = "python"
# $interface = "1.0"
import os
import sys
import logging
# Work out where this script lives.  Outside of SecureCRT (no global `crt` object) the location comes from __file__.
# Inside SecureCRT it comes from crt.ScriptFullName, and the script directory is also put at the front of the
# PYTHONPATH (if it is not already present) so that our own modules can be imported.
if 'crt' not in globals():
    script_dir, script_name = os.path.split(os.path.realpath(__file__))
else:
    script_dir, script_name = os.path.split(crt.ScriptFullName)
    if script_dir not in sys.path:
        sys.path.insert(0, script_dir)
# Now we can import our custom modules
from securecrt_tools import scripts
from securecrt_tools import utilities
from securecrt_tools import ipaddress
# Create the module-level logger (named "securecrt") so that any function in this file can write debug messages
# (if debug mode setting is enabled in settings).
logger = logging.getLogger("securecrt")
# Note which script is starting; script_name is the file-name part of this script's path, computed earlier in the file.
logger.debug("Starting execution of {0}".format(script_name))
# ################################################ SCRIPT LOGIC ###################################################
def script_main(session, ask_vrf=True, vrf=None):
    """
    | SINGLE device script
    | Author: Jamie Caesar
    | Email: [email protected]

    Captures the IP route table from a Cisco IOS or NXOS device and writes a next-hop summary to a CSV file.  The
    summary shows, for every next-hop address, how many routes use it and from which protocol, followed by the list of
    connected networks and a detailed breakdown of the routes that point at each next-hop.

    :param session: A subclass of the sessions.Session object that represents this particular script session (either
                    SecureCRTSession or DirectSession)
    :type session: sessions.Session
    :param ask_vrf: When True (the default) the user is prompted for a VRF name.  Other scripts that call this
                    function can pass False to skip the prompt and supply the VRF through the "vrf" argument instead.
    :type ask_vrf: bool
    :param vrf: The VRF whose route table should be collected.  Only consulted when ask_vrf is False.
    :type vrf: str
    """
    # The script object that owns this session gives us access to settings, prompts and TextFSM templates.
    script = session.script

    # Start session with device, i.e. modify term parameters for better interaction (assuming already connected)
    session.start_cisco_session()

    # Only IOS and NXOS are supported by this script.
    session.validate_os(["IOS", "NXOS"])

    # Decide which VRF to use: either the caller-supplied one or whatever the user types into the prompt.
    if not ask_vrf:
        selected_vrf = vrf
    else:
        selected_vrf = script.prompt_window("Enter the VRF name. (Leave blank for default VRF)")

    # No VRF means the default route table.  With a VRF, the command changes and the VRF name is appended to the
    # session hostname, which is set before the output filename is generated below.
    if not selected_vrf:
        send_cmd = "show ip route"
    else:
        send_cmd = "show ip route vrf {0}".format(selected_vrf)
        session.hostname = session.hostname + "-VRF-{0}".format(selected_vrf)
        logger.debug("Received VRF: {0}".format(selected_vrf))

    raw_routes = session.get_command_output(send_cmd)

    # Pick the TextFSM template that matches the device OS (anything that is not IOS has been validated as NXOS).
    if session.os != "IOS":
        template_name = "cisco_nxos_show_ip_route.template"
    else:
        template_name = "cisco_ios_show_ip_route.template"
    template_file = script.get_template(template_name)

    # Parse the raw output, convert it into our route table format and write the summary out as CSV.
    route_list = parse_routes(utilities.textfsm_parse_to_dict(raw_routes, template_file))
    output_filename = session.create_output_filename("nexthop-summary", ext=".csv")
    utilities.list_of_lists_to_csv(nexthop_summary(route_list), output_filename)

    # Return terminal parameters back to the original state.
    session.end_cisco_session()
def update_empty_interfaces(route_table):
    """
    Fills in the outgoing interface for route entries that list a next-hop address but no interface.

    Takes the route table as a list of dictionaries (with dict key names used in parse_routes function) and resolves
    each such next-hop against the connected networks, following static routes when the next-hop is not on a connected
    network.  At every step the most specific (longest prefix) matching network is used.  A next-hop that cannot be
    resolved -- including one whose static routes lead back to a next-hop that was already tried -- ends up with an
    interface of None.

    The dictionaries inside route_table are modified in place.

    :param route_table: Route table information as a list of dictionaries (output from TextFSM)
    :type route_table: list of dict

    :return: The same route_table object, with outbound interfaces filled in where they could be resolved.
    :rtype: list of dict
    """

    def most_specific(address, table):
        """
        Finds the longest-prefix network among the keys of a lookup table that contains the given address.

        :param address: The IP address to look for
        :type address: securecrt_tools.ipaddress
        :param table: A dictionary whose keys are network objects
        :type table: dict

        :return: The matching network with the largest prefix length, or None when no network contains the address.
        """
        matches = [network for network in table if address in network]
        if not matches:
            return None
        return max(matches, key=lambda network: network.prefixlen)

    def recursive_lookup(nexthop):
        """
        Follows a next-hop through the static routes until it lands on a connected network.

        :param nexthop: The next-hop IP that we are looking for
        :type nexthop: securecrt_tools.ipaddress

        :return: The interface of the connected network that the next-hop finally resolves to, or None if it does
                 not resolve (no matching route, or the static routes form a loop).
        """
        # Remember every next-hop already tried so that static routes which point back at each other (or at
        # themselves, e.g. a default route whose next-hop is not on a connected network) cannot loop forever.
        visited = set()
        while nexthop not in visited:
            visited.add(nexthop)
            # Connected networks are checked first, exactly as before: a hit here ends the search.
            network = most_specific(nexthop, connected)
            if network is not None:
                return connected[network]
            network = most_specific(nexthop, statics)
            if network is None:
                return None
            # Not directly connected: continue the search with the static route's own next-hop.
            nexthop = statics[network]
        return None

    logger.debug("STARTING update_empty_interfaces")

    connected = {}   # connected network -> interface
    unknowns = {}    # next-hop with no known interface -> resolved interface (or None)
    statics = {}     # static route network -> next-hop address

    for route in route_table:
        if route['protocol'] == 'connected':
            connected[route['network']] = route['interface']
        if route['protocol'] == 'static' and route['nexthop']:
            statics[route['network']] = route['nexthop']
        if route['nexthop'] and not route['interface']:
            unknowns[route['nexthop']] = None

    # Only values are replaced here (no keys added or removed), so updating while iterating is safe.
    for nexthop in unknowns:
        unknowns[nexthop] = recursive_lookup(nexthop)

    for route in route_table:
        if not route['interface'] and route['nexthop'] in unknowns:
            route['interface'] = unknowns[route['nexthop']]

    logger.debug("ENDING update_empty_interfaces")
    return route_table
def parse_routes(fsm_routes):
    """
    Converts the TextFSM parsed route table (from the `textfsm_parse_to_dict` function) into the route table format
    used by the rest of this script.  Every input dictionary describes one route entry.  For each one a new dictionary
    is built in which the network and next-hop strings are replaced by ip_network / ip_address objects (from the
    ipaddress.py module), the protocol is normalized, empty strings become None and the keys are renamed to
    'network', 'protocol', 'nexthop', 'interface' and (only when the input has a NEXTHOP_VRF key) 'vrf'.

    Before returning, the new table is passed to update_empty_interfaces.

    :param fsm_routes: TextFSM output from the `textfsm_parse_to_dict` function.
    :type fsm_routes: list of dict

    :return: A new list of dictionaries that replaces IP address strings with objects from the ipaddress.py module
    :rtype: list of dict
    """
    logger.debug("STARTING parse_routes function.")

    complete_table = []
    for route in fsm_routes:
        logger.debug("Processing route entry: {0}".format(str(route)))

        new_entry = {
            'network': ipaddress.ip_network(u"{0}/{1}".format(route['NETWORK'], route['MASK'])),
            'protocol': utilities.normalize_protocol(route['PROTOCOL']),
            'nexthop': None if route['NEXTHOP_IP'] == '' else ipaddress.ip_address(unicode(route['NEXTHOP_IP'])),
            'interface': None if route["NEXTHOP_IF"] == '' else route['NEXTHOP_IF'],
        }

        # Nexthop VRF will only occur in NX-OS route tables (%vrf-name after the nexthop)
        if 'NEXTHOP_VRF' in route:
            new_entry['vrf'] = None if route['NEXTHOP_VRF'] == '' else route['NEXTHOP_VRF']

        logger.debug("Adding updated route entry '{0}' based on the information: {1}".format(str(new_entry),
                                                                                              str(route)))
        complete_table.append(new_entry)

    # Fill in outgoing interfaces for the entries that only list a next-hop address.
    update_empty_interfaces(complete_table)

    logger.debug("ENDING parse_route function")
    return complete_table
def nexthop_summary(textfsm_dict):
    """
    A function that builds a CSV output (list of lists) that displays the summary information after analyzing the
    input route table.  The output has three sections:

    * one row per next-hop with its interface, total route count and a count per protocol,
    * one row per interface listing its connected networks,
    * one row per route (next-hop, network, protocol), grouped by next-hop with a blank row after each group.

    Routes without a next-hop address are grouped under 'discard' when they point at Null0, under the outgoing
    interface name when one is known, and under 'unknown' otherwise.

    :param textfsm_dict: The route table information in list of dictionaries format.
    :type textfsm_dict: list of dict

    :return: The nexthop summary information in a format that can be easily written to a CSV file.
    :rtype: list of lists
    """
    logger.debug("STARTING nexthop_summary function")

    # Identify connected or other local networks -- most found in NXOS -- to exclude from next-hops.  These are
    # excluded from the nexthop summary (except connected has its own section in the output).
    local_protos = ['connected', 'local', 'hsrp', 'vrrp', 'glbp']

    # Create a list of all non-local protocols from the provided route table.  The 'interface' and 'total' columns
    # are then added to the front, so proto_list doubles as the column order of the summary section.
    proto_list = []
    for entry in textfsm_dict:
        if entry['protocol'] not in proto_list and entry['protocol'] not in local_protos:
            logger.debug("Found protocol '{0}' in the table".format(entry['protocol']))
            proto_list.append(entry['protocol'])
    proto_list.sort(key=utilities.human_sort_key)
    proto_list.insert(0, 'total')
    proto_list.insert(0, 'interface')

    # Create dictionaries to store summary information as we process the route table.
    summary_table = {}      # next-hop -> {'interface': ..., 'total': n, <protocol>: n, ...}
    connected_table = {}    # interface -> list of connected networks (as strings)
    detailed_table = {}     # next-hop -> list of (network, protocol) tuples

    # Process the route table to populate the above 3 dictionaries.
    for entry in textfsm_dict:
        logger.debug("Processing route: {0}".format(str(entry)))

        # If the route is connected, local or an FHRP entry, only connected routes are recorded.
        if entry['protocol'] in local_protos:
            if entry['protocol'] == 'connected':
                connected_table.setdefault(entry['interface'], []).append(str(entry['network']))
            continue

        # Work out the label this route is grouped under.  Every branch assigns `nexthop`, so a route can never be
        # counted against the next-hop of the previous route (or hit an unassigned variable on the first route).
        interface = entry['interface']
        if entry['nexthop']:
            if 'vrf' in entry and entry['vrf']:
                nexthop = "{0}%{1}".format(entry['nexthop'], entry['vrf'])
            else:
                nexthop = str(entry['nexthop'])
        elif interface and interface.lower() == "null0":
            nexthop = 'discard'
        elif interface:
            # A route that points straight out of an interface: group it under the interface name.
            nexthop = interface
        else:
            nexthop = 'unknown'

        if nexthop not in summary_table:
            # Create an entry for this next-hop, containing zero count for all protocols.
            summary_table[nexthop] = dict.fromkeys(proto_list, 0)
            summary_table[nexthop]['interface'] = interface
        # Increment total and protocol specific count
        summary_table[nexthop][entry['protocol']] += 1
        summary_table[nexthop]['total'] += 1

        detailed_table.setdefault(nexthop, []).append((str(entry['network']), entry['protocol']))

    # Convert summary_table into a format that can be printed to the CSV file.
    header = ["Nexthop", "Interface", "Total"]
    header.extend(proto_list[2:])
    output = [header]
    for key in sorted(summary_table.keys(), key=utilities.human_sort_key):
        output.append([key] + [summary_table[key][column] for column in proto_list])
    output.append([])

    # Convert the connected_table into a format that can be printed to the CSV file (and append to output)
    output.append([])
    output.append(["Connected:"])
    output.append(["Interface", "Network(s)"])
    for key in sorted(connected_table.keys(), key=utilities.human_sort_key):
        output.append([key] + connected_table[key])
    output.append([])

    # Convert the detailed_table into a format that can be printed to the CSV file (and append to output)
    output.append([])
    output.append(["Route Details"])
    output.append(["Nexthop", "Network", "Protocol"])
    for key in sorted(detailed_table.keys(), key=utilities.human_sort_key):
        for network in detailed_table[key]:
            output.append([key] + list(network))
        # Blank row between next-hop groups.
        output.append([])

    # Return the output, ready to be sent directly to a CSV file
    logger.debug("ENDING nexthop_summary function")
    return output
# ################################################ SCRIPT LAUNCH ###################################################
# Launched by SecureCRT itself: drive the real SecureCRT session through the SecureCRT specific class.
if __name__ == "__builtin__":
    # Script object wrapping the SecureCRT `crt` object, and the session for the tab the script was launched from.
    crt_script = scripts.CRTScript(crt)
    crt_session = crt_script.get_main_session()
    try:
        # Run script's main logic against our session
        script_main(crt_session)
    except Exception:
        # Put the terminal back the way we found it before letting the error propagate.
        crt_session.end_cisco_session()
        raise
    else:
        # Shutdown logging after a successful run
        logging.shutdown()
# Launched directly from Python: use the simulation class instead.
elif __name__ == "__main__":
    # Script object built from this file's path, and a simulated session to pass into the script.
    direct_script = scripts.DebugScript(os.path.realpath(__file__))
    sim_session = direct_script.get_main_session()
    # Run script's main logic against our session
    script_main(sim_session)
    # Shutdown logging after
    logging.shutdown()