-
Notifications
You must be signed in to change notification settings - Fork 2
/
mikrotik_bgpmon.py
120 lines (99 loc) · 4.32 KB
/
mikrotik_bgpmon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# Copyright (c) 2023 - Lee Hetherington <[email protected]>
# Script: mikrotik_bgpmon.py
#
# Usage: mikrotik_bgpmon.py router_ip email_address
#
# This script will take a config file containing the Username and Password of the router, then using
# the ROS API, connect to the router and do two things;
# 1. Compare configured vs running BGP sessions and alert if differences
# 2. Alert if there is a running session that is not in Established state
#
# You can then use this output for your own systems/email alerting etc. If you specify an email address
# as a command line switch, it'll send alert emails to that email address
#
import configparser
import routeros_api
import argparse
import smtplib
import requests
from email.mime.text import MIMEText
from discord import SyncWebhook
# Let's take some arguments on the command line
parser = argparse.ArgumentParser()
# add arguments to the parser
parser.add_argument("router_ip", help="IP address or hostname of the router")
parser.add_argument("email", nargs='?', help="Email address to receive alerts")
# parse the arguments
args = parser.parse_args()
# access the values of the arguments
router_ip = args.router_ip
email_address = args.email
# Read from the config file
# which contains the auth information
config = configparser.ConfigParser()
config.read('config.cfg')
username = config.get('API', 'username')
password = config.get('API', 'password')
smtp_server = config.get('ALERTS', 'smtp_server')
smtp_port = config.get('ALERTS', 'smtp_port')
smtp_username = config.get('ALERTS', 'smtp_username')
smtp_password = config.get('ALERTS', 'smtp_password')
discord_enable = config.get('DISCORD', 'enable')
get_webhook = config.get('DISCORD', 'webhook')
webhook = SyncWebhook.from_url(get_webhook)
def send_email(subject, message, recipient):
# Create email message
email_msg = MIMEText(message)
email_msg['Subject'] = subject
email_msg['From'] = f"BGP Alerts <{smtp_username}>"
email_msg['To'] = recipient
# Establish a connection to the email server
server = smtplib.SMTP(smtp_server, smtp_port)
server.starttls()
server.login(smtp_username, smtp_password)
# Send the email
server.send_message(email_msg)
server.quit()
def check_bgp_sessions(router_ip, username, password, email_address=None):
api = routeros_api.RouterOsApiPool(router_ip, username=username, password=password, use_ssl=True, ssl_verify=False, plaintext_login=True)
api_conn = api.get_api()
configured_connections = api_conn.get_resource('/routing/bgp/connection').get()
configured_sessions = api_conn.get_resource('/routing/bgp/session').get()
alerts = [] # Store the generated alerts
for connection in configured_connections:
connection_name = connection['name']
connection_asn = connection.get('remote.as', 'Unknown')
connection_disabled = connection.get('disabled')
session_exists = False
if connection_disabled == "true":
print(f"Skipping disabled connection: {connection_name}")
continue
session_found = False
for session in configured_sessions:
session_name = session['name']
session_asn = session.get('remote.as', 'Unknown')
if session_name[:-2] == connection_name:
session_found = True
session_established = session.get('established', '')
if session_established != "true":
alert_msg = f"Alert: BGP session {session_name} with {session_asn} is not established."
alerts.append(alert_msg)
if discord_enable == "true":
webhook.send(alert_msg)
break
if not session_found:
alert_msg = f"Alert: BGP connection {connection_name} with {connection_asn} is configured but not found in running sessions."
alerts.append(alert_msg)
if discord_enable == "true":
webhook.send(alert_msg)
if email_address and alerts:
subject = f"Mikrotik BGP Session Alerts [{router_ip}]"
message = "\n".join(alerts)
send_email(subject, message, email_address)
if alerts:
print("Alerts generated:")
for alert in alerts:
print(alert)
else:
print("No alerts generated.")
check_bgp_sessions(router_ip, username, password, email_address)