-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
78 lines (57 loc) · 1.78 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import atexit
import json
import os
import signal
import subprocess
from datetime import timedelta
import dotenv
import rx
import rx.operators as ops
from influxdb_client import InfluxDBClient, Point, WriteOptions
# Load settings from a .env file (if one is present) before the INFLUXDB_*
# variables are read from os.environ further down.
dotenv.load_dotenv()
def do_speedtest():
    """
    Runs a speedtest and returns relevant information.

    Launches the ``speedtest`` command-line tool with JSON output, echoes its
    non-JSON output lines to stdout (skipping a line identical to the one
    printed just before it), and hands the first line starting with ``{`` to
    ``create_point``.

    Returns:
        The value returned by ``create_point`` for the JSON result line.

    Raises:
        RuntimeError: If the tool's output ends without a JSON result line.
    """
    print("Running speedtest")
    # Using Popen as a context manager closes the pipe and waits for the child
    # on every exit path, so the process is always reaped.
    with subprocess.Popen(
        ["speedtest", "-f", "json", "--accept-license", "--accept-gdpr"],
        stderr=subprocess.STDOUT,
        stdout=subprocess.PIPE,
    ) as process:
        last_line = ""
        # readline() returns b"" at EOF; stopping there prevents spinning
        # forever when the tool exits without printing a result.
        for raw_line in iter(process.stdout.readline, b""):
            line = raw_line.decode()
            if line.startswith("{"):
                return create_point(line)
            if line != last_line:
                print(line, end="")
                last_line = line
    raise RuntimeError(
        f"speedtest exited with code {process.returncode} "
        "without producing a JSON result"
    )
def create_point(line: str) -> Point:
    """Build a ``speedtest`` point from one line of JSON text.

    Args:
        line: JSON document containing ``timestamp``, ``download.bandwidth``
            and ``upload.bandwidth`` entries.

    Returns:
        A point whose time is the JSON ``timestamp`` and whose fields are
        ``download_bandwidth`` and ``upload_bandwidth``.
    """
    payload = json.loads(line)
    result = (
        Point(measurement_name="speedtest")
        # Stamp the point with the time reported in the JSON itself.
        .time(payload["timestamp"])
        # Record the two bandwidth readings as fields.
        .field("download_bandwidth", payload["download"]["bandwidth"])
        .field("upload_bandwidth", payload["upload"]["bandwidth"])
    )
    print(f"Read: {result}")
    return result
def on_exit(**kwargs) -> None:
    """Close every object passed as a keyword argument.

    Each value's ``close()`` method is called in the order the keyword
    arguments were given; the keyword names themselves are not used.
    """
    for _label, resource in kwargs.items():
        resource.close()
# Every 60 seconds run a speedtest, skip a result equal to the previous one,
# and convert each remaining point to line protocol.
data = rx.interval(period=timedelta(seconds=60)).pipe(
    ops.map(lambda t: do_speedtest()),
    ops.distinct_until_changed(),
    ops.map(lambda point: point.to_line_protocol()),
)

# Connection settings come from the environment (KeyError if one is missing).
client = InfluxDBClient(
    url=os.environ["INFLUXDB_URL"],
    token=os.environ["INFLUXDB_TOKEN"],
    org=os.environ["INFLUXDB_ORG"],
)

# batch_size=1 so each measurement forms its own batch.
write_api = client.write_api(write_options=WriteOptions(batch_size=1))
write_api.write(bucket="Hermes", record=data)

# on_exit closes its keyword arguments in the order given, so the write API is
# listed first: it gets closed before the client it was created from.
atexit.register(on_exit, write_api=write_api, client=client)

# Keep the main thread asleep until a signal arrives.
signal.pause()