diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml index 54bed45..345e1cc 100644 --- a/.github/workflows/docker-publish.yml +++ b/.github/workflows/docker-publish.yml @@ -4,7 +4,7 @@ on: # Publish `master` as Docker `latest` image. branches: - master - - refactor-app + - feature-influx2 # Publish `v1.2.3` tags as releases. tags: diff --git a/CHANGELOG.md b/CHANGELOG.md index 87534be..a0f3b74 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,25 @@ # Changelog +## Unreleased (2021-06-24) + +#### New Features + +* :zap: Adds global scopes for config, Logging, and influxdb. +#### Fixes + +* :bug: Fixed install of speedtest +* :bug: fixed variable names. Disable pingtest if 0 +#### Refactorings + +* :bug: Fixed variable names, completed refactor +* :art: Refactored to use global scopes. +* :art: Refactored to use global scopes +* :art: Refactored to bring logs into global scope. +* :art: Refactored to match global config/logging +* :art: wrote new config class + +Full set of changes: [`0.4.1...099ae4f`](https://github.com/breadlysm/speedtest-to-influxdb/compare/0.4.1...099ae4f) + ## 0.4.1 (2021-05-27) #### New Features diff --git a/main.py b/main.py index c3a9a17..f28da9a 100755 --- a/main.py +++ b/main.py @@ -1,41 +1,40 @@ import time -from speedflux.config import get_config -from speedflux.influx import Influx +import speedflux from multiprocessing import Process -from speedflux.data import speedtest, pingtest -from speedflux.logs import log +from speedflux import data def main(): - config = get_config() - influx = Influx(config) - config['influx'] = influx - pPing = Process(target=pingtest, args=(config,)) - pSpeed = Process(target=speedtest, args=(config,)) - + speedflux.initialize() + speedflux.LOG.info('Speedtest CLI data logger to InfluxDB started...') + pPing = Process(target=data.pingtest, args=()) + pSpeed = Process(target=data.speedtest, args=()) + speedtest_interval = speedflux.CONFIG.SPEEDTEST_INTERVAL * 60 + ping_interval = speedflux.CONFIG.PING_INTERVAL loopcount = 0 while (1): # Run a Speedtest and send the results to influxDB - if loopcount == 0 or loopcount % config['ping_interval'] == 0: - if pPing.is_alive(): - pPing.terminate() - pPing = Process(target=pingtest, args=(config,)) - pPing.start() - - if loopcount == 0 or loopcount % config['test_interval'] == 0: + if ping_interval != 0: + if loopcount == 0 or loopcount % ping_interval == 0: + if pPing.is_alive(): + pPing.terminate() + pPing = Process(target=data.pingtest, args=()) + pPing.start() + + if loopcount == 0 or loopcount % speedtest_interval == 0: if pSpeed.is_alive(): pSpeed.terminate() - pSpeed = Process(target=speedtest, args=(config,)) + pSpeed = Process(target=data.speedtest, args=()) pSpeed.start() - - if loopcount % ( - config['ping_interval'] * config['test_interval']) == 0: - loopcount = 0 - + if ping_interval != 0: + if loopcount % (ping_interval * speedtest_interval) == 0: + loopcount = 0 + else: + if loopcount == speedtest_interval: + loopcount = 0 time.sleep(1) loopcount += 1 if __name__ == '__main__': - log.info('Speedtest CLI data logger to InfluxDB started...') main() diff --git a/speedflux/__init__.py b/speedflux/__init__.py index e69de29..fc99f56 100644 --- a/speedflux/__init__.py +++ b/speedflux/__init__.py @@ -0,0 +1,26 @@ +from speedflux import config, logs, influx + +# Speedflux +CONFIG = None +DB_TYPE = None +LOG = None +INFLUXDB = None + + +def initialize(): + global CONFIG + global LOG + global INFLUXDB + + try: + CONFIG = config.Config() + except Exception as err: + raise SystemExit("Unable to initialize SpeedFlux", err) + try: + LOG = logs.Log(CONFIG) + except Exception as err: + raise SystemExit("Couldn't initiate logging", err) + try: + INFLUXDB = influx.Influx(CONFIG) + except Exception as err: + raise SystemExit("Couldn't initiate InfluxDB <2", err) diff --git a/speedflux/config.py b/speedflux/config.py index 8dade8f..3ff7516 100644 --- a/speedflux/config.py +++ b/speedflux/config.py @@ -1,35 +1,48 @@ import os +import re +# import speedflux -def get_config(): - NAMESPACE = os.getenv('NAMESPACE', 'None') - DB_ADDRESS = os.getenv('INFLUX_DB_ADDRESS', 'influxdb') - DB_PORT = int(os.getenv('INFLUX_DB_PORT', '8086')) - DB_USER = os.getenv('INFLUX_DB_USER', '') - DB_PASSWORD = os.getenv('INFLUX_DB_PASSWORD', '') - DB_DATABASE = os.getenv('INFLUX_DB_DATABASE', 'speedtests') - DB_TAGS = os.getenv('INFLUX_DB_TAGS', None) - PING_TARGETS = os.getenv('PING_TARGETS', '1.1.1.1, 8.8.8.8') - # Speedtest Settings - # Time between tests (in minutes, converts to seconds). - TEST_INTERVAL = int(os.getenv('SPEEDTEST_INTERVAL', '180')) * 60 - # Specific server ID - SERVER_ID = os.getenv('SPEEDTEST_SERVER_ID', '') - # Time between ping tests (in seconds). - PING_INTERVAL = int(os.getenv('PING_INTERVAL', '120')) - LOG_TYPE = os.getenv('LOG_TYPE', 'info') - config = { - 'namespace': NAMESPACE, - 'db_host': DB_ADDRESS, - 'db_port': DB_PORT, - 'db_user': DB_USER, - 'db_pass': DB_PASSWORD, - 'db_tags': DB_TAGS, - 'db_name': DB_DATABASE, - 'ping_targets': PING_TARGETS, - 'test_interval': TEST_INTERVAL, - 'ping_interval': PING_INTERVAL, - 'server_id': SERVER_ID, - 'log_level': LOG_TYPE - } - return config + +_CONFIG_DEFAULTS = { + 'NAMESPACE': (str, 'Database', None), + 'INFLUX_DB_ADDRESS': (str, 'Database', 'influxdb'), + 'INFLUX_DB_PORT': (int, 'Database', 8086), + 'INFLUX_DB_USER': (str, 'Database', None), + 'INFLUX_DB_PASSWORD': (str, 'Database', None), + 'INFLUX_DB_DATABASE': (str, 'Database', 'speedtests'), + 'INFLUX_DB_TAGS': (str, 'Database', None), + 'SPEEDTEST_INTERVAL': (int, 'SpeedTest', 180), + 'SPEEDTEST_SERVER_ID': (str, 'SpeedTest', None), + 'PING_TARGETS': (str, 'PingTest', '1.1.1.1, 8.8.8.8'), + 'PING_INTERVAL': (int, 'PingTest', 120), + 'LOG_TYPE': (str, 'Logs', 'info'), +} + + +class Config: + + def get_setting(self, key): + """ Cast any value in the config to the right type or use the default + """ + key, definition_type, section, default = self._define(key) + my_val = definition_type(os.getenv(key, default)) + return my_val + + def _define(self, name): + key = name.upper() + definition = _CONFIG_DEFAULTS[key] + if len(definition) == 3: + definition_type, section, default = definition + else: + definition_type, section, _, default = definition + return key, definition_type, section, default + + def __getattr__(self, name): + """ + Retrieves config value for the setting + """ + if not re.match(r'[A-Z_]+$', name): + return super(Config, self).__getattr__(name) + else: + return self.get_setting(name) diff --git a/speedflux/data.py b/speedflux/data.py index 64dfa5e..b673fe7 100644 --- a/speedflux/data.py +++ b/speedflux/data.py @@ -3,26 +3,27 @@ from pythonping import ping import json import datetime -from speedflux.logs import log +import speedflux -def speedtest(config): - if not config['server_id']: +def speedtest(): + if not speedflux.CONFIG.SPEEDTEST_SERVER_ID: speedtest = subprocess.run( ["speedtest", "--accept-license", "--accept-gdpr", "-f", "json"], capture_output=True) - log.info("Automatic server choice") + speedflux.LOG.info("Automatic server choice") else: speedtest = subprocess.run( ["speedtest", "--accept-license", "--accept-gdpr", "-f", "json", - f"--server-id={config['server_id']}"], + f"--server-id={speedflux.CONFIG.SPEEDTEST_SERVER_ID}"], capture_output=True) - log.info(f"Manual server choice : ID = {config['server_id']}") + speedflux.LOG.info("Manual server choice : " + f"ID = {speedflux.CONFIG.SPEEDTEST_SERVER_ID}") if speedtest.returncode == 0: # Speedtest was successful. - log.info("Speedtest Successful...Writing to Influx") + speedflux.LOG.info("Speedtest Successful...Writing to Influx") data_json = json.loads(speedtest.stdout) - log.info(F"""Speedtest Data: + speedflux.LOG.info(F"""Speedtest Data: time: {data_json['timestamp']} ping: {data_json['ping']['latency']}ms download: {data_json['download']['bandwidth']/125000}Mb/s @@ -33,18 +34,18 @@ def speedtest(config): server location: ({data_json['server']['name']} @ \ {data_json['server']['location']}) """) - config['influx'].process_data(data_json) + speedflux.INFLUXDB.process_data(data_json) else: # Speedtest failed. - log.info("Speedtest Failed :") - log.debug(speedtest.stderr) - log.debug(speedtest.stdout) + speedflux.LOG.info("Speedtest Failed :") + speedflux.LOG.debug(speedtest.stderr) + speedflux.LOG.debug(speedtest.stdout) -def pingtest(config): +def pingtest(): timestamp = datetime.datetime.utcnow() - for target in config['ping_targets'].split(','): + for target in speedflux.CONFIG.PING_TARGETS.split(','): target = target.strip() - log.debug('Running ping test...') + speedflux.LOG.debug('Running ping test...') pingtest = ping(target, verbose=False, timeout=1, count=1, size=128) data = [ { @@ -62,6 +63,6 @@ def pingtest(config): } } ] - if config['namespace']: - data[0]['tags']['namespace'] = config['namespace'] - config['influx'].write(data, data_type='Ping') + if speedflux.CONFIG.NAMESPACE: + data[0]['tags']['namespace'] = speedflux.CONFIG.NAMESPACE + speedflux.INFLUXDB.write(data, data_type='Ping') diff --git a/speedflux/influx.py b/speedflux/influx.py index d5db440..cc189dc 100644 --- a/speedflux/influx.py +++ b/speedflux/influx.py @@ -2,7 +2,7 @@ from urllib3.exceptions import NewConnectionError from influxdb import InfluxDBClient -from speedflux.logs import log +import speedflux from requests.exceptions import ConnectionError @@ -17,39 +17,41 @@ def __init__(self, config): def client(self): if not self._client: self._client = InfluxDBClient( - self.config['db_host'], - self.config['db_port'], - self.config['db_user'], - self.config['db_pass'], + self.config.INFLUX_DB_ADDRESS, + self.config.INFLUX_DB_PORT, + self.config.INFLUX_DB_USER, + self.config.INFLUX_DB_PASSWORD, None) - log.debug("Client extablished") + speedflux.LOG.debug("Client extablished") return self._client def init_db(self): try: - log.debug("Intitializing Influx Database") + speedflux.LOG.debug("Intitializing Influx Database") databases = self.client.get_list_database() if len(list(filter( - lambda x: x['name'] == self.config['db_name'], databases)) - ) == 0: + lambda x: x['name'] == + self.config.INFLUX_DB_DATABASE, databases))) == 0: self.client.create_database( - self.config['db_name']) # Create if does not exist. + self.config.INFLUX_DB_DATABASE) # Create if else: # Switch to if does exist. - self.client.switch_database(self.config['db_name']) + self.client.switch_database(self.config.INFLUX_DB_DATABASE) self.initilized = True except (ConnectionError, NewConnectionError) as bad_host: if self.retries == 3: - log.error("Database Init failed for 3rd time. Exiting") + speedflux.LOG.error( + "Database Init failed for 3rd time. Exiting") sys.exit() self.retries += 1 - log.error("Connection to influx host was refused. This likely " - "means that the DB is down or INFLUX_DB_ADDRESS is " - f"incorrect. It's currently '{self.config['db_host']}'") - log.error("Full Error follows\n") - log.error(bad_host) - log.error(f"Retry {self.retries}: Initiliazing DB.") + speedflux.LOG.error( + "Connection to influx host was refused. This likely " + "means that the DB is down or INFLUX_DB_ADDRESS is " + f"incorrect. It's currently '{self.config.INFLUX_DB_ADDRESS}'") + speedflux.LOG.error("Full Error follows\n") + speedflux.LOG.error(bad_host) + speedflux.LOG.error(f"Retry {self.retries}: Initiliazing DB.") self.init_db() def format_data(self, data): @@ -123,36 +125,37 @@ def format_data(self, data): def write(self, data, data_type='Speedtest'): try: if self.client.write_points(data): - log.info(F"{data_type} data written successfully") - log.debug(F"Wrote `{data}` to Influx") + speedflux.LOG.info(F"{data_type} data written successfully") + speedflux.LOG.debug(F"Wrote `{data}` to Influx") self.retries = 0 else: raise Exception(F"{data_type} write points did not complete") except (ConnectionError, NewConnectionError, Exception) as \ bad_connection: if self.retries == 3: - log.error('Max retries exceeded for write. Check that data ' - 'base is on and can receive data') - log.error('Exiting') + speedflux.LOG.error( + 'Max retries exceeded for write. Check that database' + ' is on and can receive data') + speedflux.LOG.error('Exiting') sys.exit() - log.error("Connection error occurred during write") - log.error(bad_connection) + speedflux.LOG.error("Connection error occurred during write") + speedflux.LOG.error(bad_connection) self.retries += 1 - log.error("Reinitiating database and retrying.") + speedflux.LOG.error("Reinitiating database and retrying.") self.init_db() self.write(data, data_type) except Exception as err: - log.error(F"{err}") + speedflux.LOG.error(F"{err}") def tag_selection(self, data): - tags = self.config['db_tags'] + tags = self.config.INFLUX_DB_TAGS options = {} # tag_switch takes in _data and attaches CLIoutput to more readable ids tag_switch = { - 'namespace': self.config['namespace'], + 'namespace': self.config.NAMESPACE, 'isp': data['isp'], 'interface': data['interface']['name'], 'internal_ip': data['interface']['internalIp'], diff --git a/speedflux/logs.py b/speedflux/logs.py index e80d8fa..3de8b71 100644 --- a/speedflux/logs.py +++ b/speedflux/logs.py @@ -1,8 +1,6 @@ import logging import sys -from speedflux.config import get_config - class Log: def __init__( @@ -10,7 +8,7 @@ def __init__( config, log_format="%(asctime)s [%(levelname)s] %(message)s"): self.log_format = log_format - self.log_type = config['log_level'] + self.log_type = config.LOG_TYPE self._log_level = None self.announce_logging() @@ -70,6 +68,3 @@ def error(self, msg): ) if self.log_level >= 1: logging.error(msg) - - -log = Log(get_config())