This repository has been archived by the owner on Jun 20, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
sauron.py
165 lines (131 loc) · 5.32 KB
/
sauron.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import time
import signal
from typing import List, Optional
from concurrent.futures import ThreadPoolExecutor
from threading import Event
from prometheus_api_client import PrometheusConnect
from rich.progress import Progress, TextColumn, BarColumn, TimeRemainingColumn, TaskID
import typer
app = typer.Typer()
prom_url = {"url": "http://localhost:9091"}
BGP_STATES = {
6: "Established",
5: "OpenConfirmed",
4: "OpenSent",
3: "Active",
2: "Connect",
1: "Idle",
}
def sizeof_fmt(num, suffix="bps"):
for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
if abs(num) < 1000.0:
return f"{num:3.1f}{unit}{suffix}"
num /= 1000.0
return f"{num:.1f}Yi{suffix}"
@app.command()
def get_bgp_state(
neighbor: str = typer.Option("", help="BGP neighbor"),
device: str = typer.Option("", help="Network device"),
):
# Prepare the labels to help filtering out the latest metrics
labelset = {}
if neighbor:
labelset.update(neighbor_address=neighbor)
if device:
labelset.update(device=device)
# Get a list of all the latest metrics that matches the name and label set
prom = PrometheusConnect(url=prom_url["url"], disable_ssl=True)
results = prom.get_current_metric_value(metric_name="bgp_session_state", label_config=labelset)
if not results:
typer.echo(message="No results returned :(")
raise typer.Exit(1)
# Print out the results
for metric in results:
_device = metric["metric"]["device"]
_neighbor = metric["metric"]["neighbor_address"]
state_code = int(metric["value"][-1])
typer.echo(message=f"BGP State for device: {_device} - neighbor {_neighbor} => {BGP_STATES[state_code]}")
@app.command()
def get_links_high_on_in_bw(
device: Optional[str] = typer.Option("", help="Network device or regular expression for multiple devices"),
device_role: Optional[str] = typer.Option("", help="Role fo the device(s) in the network topology"),
threshold: float = typer.Option(1000.0, help="Bandwidth threshold on bits per second"),
):
typer.echo("Getting links with Bandwidth higher than threshold")
# Prepare the labels to help filtering out the latest metrics
query = ""
if device:
query = f'rate(interface_in_octets{{device=~"{device}"}}[2m])*8 > {threshold}'
elif device_role:
query = f'rate(interface_in_octets{{device_role="{device_role}"}}[2m])*8 > {threshold}'
if not query:
typer.echo(message="No query generated :(")
raise typer.Exit(1)
# Get a list of all the latest metrics that matches the name and label set
prom = PrometheusConnect(url=prom_url["url"], disable_ssl=True)
results = prom.custom_query(query=query)
if not results:
typer.echo(message="No results returned :(")
raise typer.Exit(1)
# Print out the results
for metric in results:
_role = metric["metric"]["device_role"]
_device = metric["metric"]["device"]
_interface = metric["metric"]["interface"]
_traffic = sizeof_fmt(float(metric["value"][-1]))
typer.echo(message=f"Role: {device_role} - device: {_device} - interface: {_interface} => Traffic IN: {_traffic}")
return
@app.command()
def get_links_with_packet_loss():
return
########################################################################################################################
progress = Progress(
TextColumn("[bold blue]{task.fields[interface_id]}", justify="right"),
BarColumn(bar_width=None),
"[progress.percentage]{task.percentage:>3.1f}%",
"•",
"Traffic: {task.fields[traffic]}",
"•",
TimeRemainingColumn(),
)
done_event = Event()
def handle_sigint(signum, frame):
done_event.set()
signal.signal(signal.SIGINT, handle_sigint)
def get_interface_traffic(task_id: TaskID, interface: str, device: str):
# progress.console.log(f"Requesting {device}: {interface}")
query = f'rate(interface_in_octets{{device="{device}", interface="{interface}"}}[2m])*8'
progress.update(task_id, total=100)
progress.start_task(task_id)
prom = PrometheusConnect(url=prom_url["url"], disable_ssl=True)
for i in range(100):
result = prom.custom_query(query=query)
traffic = float(result[0]["value"][-1])
progress.console.log(f"{device}: {interface} ==> {sizeof_fmt(traffic)}")
progress.update(task_id, traffic=sizeof_fmt(traffic), advance=1)
time.sleep(2)
if done_event.is_set():
return
# progress.console.log("Interface traffic watch completed")
@app.command()
def watch_interface_traffic(
interface: List[str] = typer.Option(..., help="List of interfaces to watch traffic"),
device: str = typer.Option(..., help="Network device"),
):
with progress:
with ThreadPoolExecutor(max_workers=4) as pool:
for intf in interface:
task_id = progress.add_task("interface", interface_id=f"{device}: {intf}", start=False, traffic=sizeof_fmt(0))
pool.submit(get_interface_traffic, task_id, intf, device)
@app.callback()
def main(
prometheus_host: str = typer.Option(
"http://localhost:9091", "--prometheus-host", "-p", help="Prometheus server to connect"
)
):
"""
Manage users in the awesome CLI app.
"""
prom_url["url"] = prometheus_host
if __name__ == "__main__":
app()