Skip to content

Commit

Permalink
Merge pull request #21 from breadlysm/breadlysm/issue12
Browse files Browse the repository at this point in the history
Breadlysm/issue12
  • Loading branch information
breadlysm authored Jun 24, 2021
2 parents 36391ba + 5ddd602 commit 7fcf49f
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 110 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docker-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
# Publish `master` as Docker `latest` image.
branches:
- master
- refactor-app
- feature-influx2

# Publish `v1.2.3` tags as releases.
tags:
Expand Down
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,25 @@
# Changelog

## Unreleased (2021-06-24)

#### New Features

* :zap: Adds global scopes for config, Logging, and influxdb.
#### Fixes

* :bug: Fixed install of speedtest
* :bug: fixed variable names. Disable pingtest if 0
#### Refactorings

* :bug: Fixed variable names, completed refactor
* :art: Refactored to use global scopes.
* :art: Refactored to use global scopes
* :art: Refactored to bring logs into global scope.
* :art: Refactored to match global config/logging
* :art: wrote new config class

Full set of changes: [`0.4.1...099ae4f`](https://github.com/breadlysm/speedtest-to-influxdb/compare/0.4.1...099ae4f)

## 0.4.1 (2021-05-27)

#### New Features
Expand Down
47 changes: 23 additions & 24 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,40 @@
import time

from speedflux.config import get_config
from speedflux.influx import Influx
import speedflux
from multiprocessing import Process
from speedflux.data import speedtest, pingtest
from speedflux.logs import log
from speedflux import data


def main():
config = get_config()
influx = Influx(config)
config['influx'] = influx
pPing = Process(target=pingtest, args=(config,))
pSpeed = Process(target=speedtest, args=(config,))

speedflux.initialize()
speedflux.LOG.info('Speedtest CLI data logger to InfluxDB started...')
pPing = Process(target=data.pingtest, args=())
pSpeed = Process(target=data.speedtest, args=())
speedtest_interval = speedflux.CONFIG.SPEEDTEST_INTERVAL * 60
ping_interval = speedflux.CONFIG.PING_INTERVAL
loopcount = 0
while (1): # Run a Speedtest and send the results to influxDB
if loopcount == 0 or loopcount % config['ping_interval'] == 0:
if pPing.is_alive():
pPing.terminate()
pPing = Process(target=pingtest, args=(config,))
pPing.start()

if loopcount == 0 or loopcount % config['test_interval'] == 0:
if ping_interval != 0:
if loopcount == 0 or loopcount % ping_interval == 0:
if pPing.is_alive():
pPing.terminate()
pPing = Process(target=data.pingtest, args=())
pPing.start()

if loopcount == 0 or loopcount % speedtest_interval == 0:
if pSpeed.is_alive():
pSpeed.terminate()
pSpeed = Process(target=speedtest, args=(config,))
pSpeed = Process(target=data.speedtest, args=())
pSpeed.start()

if loopcount % (
config['ping_interval'] * config['test_interval']) == 0:
loopcount = 0

if ping_interval != 0:
if loopcount % (ping_interval * speedtest_interval) == 0:
loopcount = 0
else:
if loopcount == speedtest_interval:
loopcount = 0
time.sleep(1)
loopcount += 1


if __name__ == '__main__':
log.info('Speedtest CLI data logger to InfluxDB started...')
main()
26 changes: 26 additions & 0 deletions speedflux/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from speedflux import config, logs, influx

# Speedflux
CONFIG = None
DB_TYPE = None
LOG = None
INFLUXDB = None


def initialize():
global CONFIG
global LOG
global INFLUXDB

try:
CONFIG = config.Config()
except Exception as err:
raise SystemExit("Unable to initialize SpeedFlux", err)
try:
LOG = logs.Log(CONFIG)
except Exception as err:
raise SystemExit("Couldn't initiate logging", err)
try:
INFLUXDB = influx.Influx(CONFIG)
except Exception as err:
raise SystemExit("Couldn't initiate InfluxDB <2", err)
77 changes: 45 additions & 32 deletions speedflux/config.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,48 @@
import os
import re

# import speedflux

def get_config():
NAMESPACE = os.getenv('NAMESPACE', 'None')
DB_ADDRESS = os.getenv('INFLUX_DB_ADDRESS', 'influxdb')
DB_PORT = int(os.getenv('INFLUX_DB_PORT', '8086'))
DB_USER = os.getenv('INFLUX_DB_USER', '')
DB_PASSWORD = os.getenv('INFLUX_DB_PASSWORD', '')
DB_DATABASE = os.getenv('INFLUX_DB_DATABASE', 'speedtests')
DB_TAGS = os.getenv('INFLUX_DB_TAGS', None)
PING_TARGETS = os.getenv('PING_TARGETS', '1.1.1.1, 8.8.8.8')
# Speedtest Settings
# Time between tests (in minutes, converts to seconds).
TEST_INTERVAL = int(os.getenv('SPEEDTEST_INTERVAL', '180')) * 60
# Specific server ID
SERVER_ID = os.getenv('SPEEDTEST_SERVER_ID', '')
# Time between ping tests (in seconds).
PING_INTERVAL = int(os.getenv('PING_INTERVAL', '120'))
LOG_TYPE = os.getenv('LOG_TYPE', 'info')
config = {
'namespace': NAMESPACE,
'db_host': DB_ADDRESS,
'db_port': DB_PORT,
'db_user': DB_USER,
'db_pass': DB_PASSWORD,
'db_tags': DB_TAGS,
'db_name': DB_DATABASE,
'ping_targets': PING_TARGETS,
'test_interval': TEST_INTERVAL,
'ping_interval': PING_INTERVAL,
'server_id': SERVER_ID,
'log_level': LOG_TYPE
}
return config

_CONFIG_DEFAULTS = {
'NAMESPACE': (str, 'Database', None),
'INFLUX_DB_ADDRESS': (str, 'Database', 'influxdb'),
'INFLUX_DB_PORT': (int, 'Database', 8086),
'INFLUX_DB_USER': (str, 'Database', None),
'INFLUX_DB_PASSWORD': (str, 'Database', None),
'INFLUX_DB_DATABASE': (str, 'Database', 'speedtests'),
'INFLUX_DB_TAGS': (str, 'Database', None),
'SPEEDTEST_INTERVAL': (int, 'SpeedTest', 180),
'SPEEDTEST_SERVER_ID': (str, 'SpeedTest', None),
'PING_TARGETS': (str, 'PingTest', '1.1.1.1, 8.8.8.8'),
'PING_INTERVAL': (int, 'PingTest', 120),
'LOG_TYPE': (str, 'Logs', 'info'),
}


class Config:

def get_setting(self, key):
""" Cast any value in the config to the right type or use the default
"""
key, definition_type, section, default = self._define(key)
my_val = definition_type(os.getenv(key, default))
return my_val

def _define(self, name):
key = name.upper()
definition = _CONFIG_DEFAULTS[key]
if len(definition) == 3:
definition_type, section, default = definition
else:
definition_type, section, _, default = definition
return key, definition_type, section, default

def __getattr__(self, name):
"""
Retrieves config value for the setting
"""
if not re.match(r'[A-Z_]+$', name):
return super(Config, self).__getattr__(name)
else:
return self.get_setting(name)
37 changes: 19 additions & 18 deletions speedflux/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,27 @@
from pythonping import ping
import json
import datetime
from speedflux.logs import log
import speedflux


def speedtest(config):
if not config['server_id']:
def speedtest():
if not speedflux.CONFIG.SPEEDTEST_SERVER_ID:
speedtest = subprocess.run(
["speedtest", "--accept-license", "--accept-gdpr", "-f", "json"],
capture_output=True)
log.info("Automatic server choice")
speedflux.LOG.info("Automatic server choice")
else:
speedtest = subprocess.run(
["speedtest", "--accept-license", "--accept-gdpr", "-f", "json",
f"--server-id={config['server_id']}"],
f"--server-id={speedflux.CONFIG.SPEEDTEST_SERVER_ID}"],
capture_output=True)
log.info(f"Manual server choice : ID = {config['server_id']}")
speedflux.LOG.info("Manual server choice : "
f"ID = {speedflux.CONFIG.SPEEDTEST_SERVER_ID}")

if speedtest.returncode == 0: # Speedtest was successful.
log.info("Speedtest Successful...Writing to Influx")
speedflux.LOG.info("Speedtest Successful...Writing to Influx")
data_json = json.loads(speedtest.stdout)
log.info(F"""Speedtest Data:
speedflux.LOG.info(F"""Speedtest Data:
time: {data_json['timestamp']}
ping: {data_json['ping']['latency']}ms
download: {data_json['download']['bandwidth']/125000}Mb/s
Expand All @@ -33,18 +34,18 @@ def speedtest(config):
server location: ({data_json['server']['name']} @ \
{data_json['server']['location']})
""")
config['influx'].process_data(data_json)
speedflux.INFLUXDB.process_data(data_json)
else: # Speedtest failed.
log.info("Speedtest Failed :")
log.debug(speedtest.stderr)
log.debug(speedtest.stdout)
speedflux.LOG.info("Speedtest Failed :")
speedflux.LOG.debug(speedtest.stderr)
speedflux.LOG.debug(speedtest.stdout)


def pingtest(config):
def pingtest():
timestamp = datetime.datetime.utcnow()
for target in config['ping_targets'].split(','):
for target in speedflux.CONFIG.PING_TARGETS.split(','):
target = target.strip()
log.debug('Running ping test...')
speedflux.LOG.debug('Running ping test...')
pingtest = ping(target, verbose=False, timeout=1, count=1, size=128)
data = [
{
Expand All @@ -62,6 +63,6 @@ def pingtest(config):
}
}
]
if config['namespace']:
data[0]['tags']['namespace'] = config['namespace']
config['influx'].write(data, data_type='Ping')
if speedflux.CONFIG.NAMESPACE:
data[0]['tags']['namespace'] = speedflux.CONFIG.NAMESPACE
speedflux.INFLUXDB.write(data, data_type='Ping')
61 changes: 32 additions & 29 deletions speedflux/influx.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from urllib3.exceptions import NewConnectionError

from influxdb import InfluxDBClient
from speedflux.logs import log
import speedflux
from requests.exceptions import ConnectionError


Expand All @@ -17,39 +17,41 @@ def __init__(self, config):
def client(self):
if not self._client:
self._client = InfluxDBClient(
self.config['db_host'],
self.config['db_port'],
self.config['db_user'],
self.config['db_pass'],
self.config.INFLUX_DB_ADDRESS,
self.config.INFLUX_DB_PORT,
self.config.INFLUX_DB_USER,
self.config.INFLUX_DB_PASSWORD,
None)
log.debug("Client extablished")
speedflux.LOG.debug("Client extablished")
return self._client

def init_db(self):

try:
log.debug("Intitializing Influx Database")
speedflux.LOG.debug("Intitializing Influx Database")
databases = self.client.get_list_database()
if len(list(filter(
lambda x: x['name'] == self.config['db_name'], databases))
) == 0:
lambda x: x['name'] ==
self.config.INFLUX_DB_DATABASE, databases))) == 0:
self.client.create_database(
self.config['db_name']) # Create if does not exist.
self.config.INFLUX_DB_DATABASE) # Create if
else:
# Switch to if does exist.
self.client.switch_database(self.config['db_name'])
self.client.switch_database(self.config.INFLUX_DB_DATABASE)
self.initilized = True
except (ConnectionError, NewConnectionError) as bad_host:
if self.retries == 3:
log.error("Database Init failed for 3rd time. Exiting")
speedflux.LOG.error(
"Database Init failed for 3rd time. Exiting")
sys.exit()
self.retries += 1
log.error("Connection to influx host was refused. This likely "
"means that the DB is down or INFLUX_DB_ADDRESS is "
f"incorrect. It's currently '{self.config['db_host']}'")
log.error("Full Error follows\n")
log.error(bad_host)
log.error(f"Retry {self.retries}: Initiliazing DB.")
speedflux.LOG.error(
"Connection to influx host was refused. This likely "
"means that the DB is down or INFLUX_DB_ADDRESS is "
f"incorrect. It's currently '{self.config.INFLUX_DB_ADDRESS}'")
speedflux.LOG.error("Full Error follows\n")
speedflux.LOG.error(bad_host)
speedflux.LOG.error(f"Retry {self.retries}: Initiliazing DB.")
self.init_db()

def format_data(self, data):
Expand Down Expand Up @@ -123,36 +125,37 @@ def format_data(self, data):
def write(self, data, data_type='Speedtest'):
try:
if self.client.write_points(data):
log.info(F"{data_type} data written successfully")
log.debug(F"Wrote `{data}` to Influx")
speedflux.LOG.info(F"{data_type} data written successfully")
speedflux.LOG.debug(F"Wrote `{data}` to Influx")
self.retries = 0
else:
raise Exception(F"{data_type} write points did not complete")
except (ConnectionError, NewConnectionError, Exception) as \
bad_connection:
if self.retries == 3:
log.error('Max retries exceeded for write. Check that data '
'base is on and can receive data')
log.error('Exiting')
speedflux.LOG.error(
'Max retries exceeded for write. Check that database'
' is on and can receive data')
speedflux.LOG.error('Exiting')
sys.exit()

log.error("Connection error occurred during write")
log.error(bad_connection)
speedflux.LOG.error("Connection error occurred during write")
speedflux.LOG.error(bad_connection)
self.retries += 1
log.error("Reinitiating database and retrying.")
speedflux.LOG.error("Reinitiating database and retrying.")
self.init_db()
self.write(data, data_type)

except Exception as err:
log.error(F"{err}")
speedflux.LOG.error(F"{err}")

def tag_selection(self, data):
tags = self.config['db_tags']
tags = self.config.INFLUX_DB_TAGS
options = {}

# tag_switch takes in _data and attaches CLIoutput to more readable ids
tag_switch = {
'namespace': self.config['namespace'],
'namespace': self.config.NAMESPACE,
'isp': data['isp'],
'interface': data['interface']['name'],
'internal_ip': data['interface']['internalIp'],
Expand Down
Loading

0 comments on commit 7fcf49f

Please sign in to comment.