From a13a42d51c7c8199d94232cb54e6d5d5e756ae51 Mon Sep 17 00:00:00 2001 From: mtakaki Date: Sun, 5 Jan 2020 10:25:06 -0800 Subject: [PATCH] Multithreading #66 (#76) * feat(multihreading): each url has it's own thread * Fixing broken unit tests * Improving readability when there are multiple URLs registerd and creating new action to upload metrics * Improving error message when there's no file found * Bumping the version Co-authored-by: Alex Berenshtein --- README.md | 93 ++++++++++++++++++----------- cachet_url_monitor/configuration.py | 74 ++++++++++++----------- cachet_url_monitor/scheduler.py | 58 +++++++++++++++--- config.yml | 49 +++++++-------- setup.py | 2 +- tests/test_configuration.py | 23 +++---- tests/test_scheduler.py | 5 +- 7 files changed, 188 insertions(+), 116 deletions(-) diff --git a/README.md b/README.md index fe6d5d5..da1a8d9 100644 --- a/README.md +++ b/README.md @@ -16,55 +16,78 @@ This project is available at PyPI: [https://pypi.python.org/pypi/cachet-url-moni ## Configuration ```yaml -endpoint: - url: http://www.google.com - method: GET - header: - SOME-HEADER: SOME-VALUE - timeout: 1 # seconds - expectation: - - type: HTTP_STATUS - status_range: 200-300 - incident: MAJOR - - type: LATENCY - threshold: 1 - - type: REGEX - regex: ".*.*" - allowed_fails: 0 +endpoints: + - name: Google + url: http://www.google.com + method: GET + header: + SOME-HEADER: SOME-VALUE + timeout: 1 # seconds + expectation: + - type: HTTP_STATUS + status_range: 200-205 + - type: LATENCY + threshold: 1 + - type: REGEX + regex: ".*.*" + allowed_fails: 0 + component_id: 1 + metric_id: 1 + action: + - UPDATE_STATUS + public_incidents: true + latency_unit: ms + frequency: 5 + - name: Amazon + url: http://www.amazon.com + method: GET + header: + SOME-HEADER: SOME-VALUE + timeout: 1 # seconds + expectation: + - type: HTTP_STATUS + status_range: 200-205 + incident: MAJOR + - type: LATENCY + threshold: 1 + - type: REGEX + regex: ".*.*" + threshold: 10 + allowed_fails: 0 + component_id: 2 + action: + - CREATE_INCIDENT + public_incidents: true + latency_unit: ms + frequency: 5 cachet: api_url: http://status.cachethq.io/api/v1 - token: my_token - component_id: 1 - metric_id: 1 - action: - - CREATE_INCIDENT - - UPDATE_STATUS - public_incidents: true - latency_unit: ms -frequency: 30 + token: mytoken ``` -- **endpoint**, the configuration about the URL that will be monitored. - - **url**, the URL that is going to be monitored. - - **method**, the HTTP method that will be used by the monitor. +- **endpoints**, the configuration about the URL/Urls that will be monitored. + - **name**, The name of the component. This is now mandatory (since 0.6.0) so we can distinguish the logs for each URL being monitored. + - **url**, the URL that is going to be monitored. *mandatory* + - **method**, the HTTP method that will be used by the monitor. *mandatory* - **header**, client header passed to the request. Remove if you do not want to pass a header. - - **timeout**, how long we'll wait to consider the request failed. The unit of it is seconds. - - **expectation**, the list of expectations set for the URL. + - **timeout**, how long we'll wait to consider the request failed. The unit of it is seconds. *mandatory* + - **expectation**, the list of expectations set for the URL. *mandatory* - **HTTP_STATUS**, we will verify if the response status code falls into the expected range. Please keep in mind the range is inclusive on the first number and exclusive on the second number. If just one value is specified, it will default to only the given value, for example `200` will be converted to `200-201`. - **LATENCY**, we measure how long the request took to get a response and fail if it's above the threshold. The unit is in seconds. - **REGEX**, we verify if the response body matches the given regex. - **allowed_fails**, create incident/update component status only after specified amount of failed connection trials. -- **cachet**, this is the settings for our cachet server. - - **api_url**, the cachet API endpoint. - - **token**, the API token. - - **component_id**, the id of the component we're monitoring. This will be used to update the status of the component. + - **component_id**, the id of the component we're monitoring. This will be used to update the status of the component. *mandatory* - **metric_id**, this will be used to store the latency of the API. If this is not set, it will be ignored. - **action**, the action to be done when one of the expectations fails. This is optional and if left blank, nothing will be done to the component. - **CREATE_INCIDENT**, we will create an incident when the expectation fails. - - **UPDATE_STATUS**, updates the component status + - **UPDATE_STATUS**, updates the component status. + - **PUSH_METRICS**, uploads response latency metrics. - **public_incidents**, boolean to decide if created incidents should be visible to everyone or only to logged in users. Important only if `CREATE_INCIDENT` or `UPDATE_STATUS` are set. - **latency_unit**, the latency unit used when reporting the metrics. It will automatically convert to the specified unit. It's not mandatory and it will default to **seconds**. Available units: `ms`, `s`, `m`, `h`. -- **frequency**, how often we'll send a request to the given URL. The unit is in seconds. + - **frequency**, how often we'll send a request to the given URL. The unit is in seconds. +- **cachet**, this is the settings for our cachet server. + - **api_url**, the cachet API endpoint. *mandatory* + - **token**, the API token. *mandatory* Each `expectation` has their own default incident status. It can be overridden by setting the `incident` property to any of the following values: - `PARTIAL` diff --git a/cachet_url_monitor/configuration.py b/cachet_url_monitor/configuration.py index 51a9238..8a343c0 100644 --- a/cachet_url_monitor/configuration.py +++ b/cachet_url_monitor/configuration.py @@ -8,18 +8,13 @@ import requests from yaml import dump -from yaml import load -from yaml import FullLoader import cachet_url_monitor.latency_unit as latency_unit import cachet_url_monitor.status as st # This is the mandatory fields that must be in the configuration file in this # same exact structure. -configuration_mandatory_fields = { - 'endpoint': ['url', 'method', 'timeout', 'expectation'], - 'cachet': ['api_url', 'token', 'component_id'], - 'frequency': []} +configuration_mandatory_fields = ['url', 'method', 'timeout', 'expectation', 'component_id', 'frequency'] class ConfigurationValidationError(Exception): @@ -78,13 +73,19 @@ class Configuration(object): of assessing the API and pushing the results to cachet. """ - def __init__(self, config_file): - self.logger = logging.getLogger('cachet_url_monitor.configuration.Configuration') - self.config_file = config_file - self.data = load(open(self.config_file, 'r'), Loader=FullLoader) + def __init__(self, config_file, endpoint_index): + self.endpoint_index = endpoint_index + self.data = config_file + self.endpoint = self.data['endpoints'][endpoint_index] self.current_fails = 0 self.trigger_update = True + if 'name' not in self.endpoint: + # We have to make this mandatory, otherwise the logs are confusing when there are multiple URLs. + raise ConfigurationValidationError('name') + + self.logger = logging.getLogger(f'cachet_url_monitor.configuration.Configuration.{self.endpoint["name"]}') + # Exposing the configuration to confirm it's parsed as expected. self.print_out() @@ -94,33 +95,32 @@ def __init__(self, config_file): # We store the main information from the configuration file, so we don't keep reading from the data dictionary. self.headers = {'X-Cachet-Token': os.environ.get('CACHET_TOKEN') or self.data['cachet']['token']} - self.endpoint_method = os.environ.get('ENDPOINT_METHOD') or self.data['endpoint']['method'] - self.endpoint_url = os.environ.get('ENDPOINT_URL') or self.data['endpoint']['url'] + self.endpoint_method = self.endpoint['method'] + self.endpoint_url = self.endpoint['url'] self.endpoint_url = normalize_url(self.endpoint_url) - self.endpoint_timeout = os.environ.get('ENDPOINT_TIMEOUT') or self.data['endpoint'].get('timeout') or 1 - self.endpoint_header = self.data['endpoint'].get('header') or None - self.allowed_fails = os.environ.get('ALLOWED_FAILS') or self.data['endpoint'].get('allowed_fails') or 0 + self.endpoint_timeout = self.endpoint.get('timeout') or 1 + self.endpoint_header = self.endpoint.get('header') or None + self.allowed_fails = self.endpoint.get('allowed_fails') or 0 self.api_url = os.environ.get('CACHET_API_URL') or self.data['cachet']['api_url'] - self.component_id = os.environ.get('CACHET_COMPONENT_ID') or self.data['cachet']['component_id'] - self.metric_id = os.environ.get('CACHET_METRIC_ID') or self.data['cachet'].get('metric_id') + self.component_id = self.endpoint['component_id'] + self.metric_id = self.endpoint.get('metric_id') if self.metric_id is not None: self.default_metric_value = self.get_default_metric_value(self.metric_id) # The latency_unit configuration is not mandatory and we fallback to seconds, by default. - self.latency_unit = os.environ.get('LATENCY_UNIT') or self.data['cachet'].get('latency_unit') or 's' + self.latency_unit = self.data['cachet'].get('latency_unit') or 's' # We need the current status so we monitor the status changes. This is necessary for creating incidents. self.status = get_current_status(self.api_url, self.component_id, self.headers) self.previous_status = self.status # Get remaining settings - self.public_incidents = int( - os.environ.get('CACHET_PUBLIC_INCIDENTS') or self.data['cachet']['public_incidents']) + self.public_incidents = int(self.endpoint['public_incidents']) self.logger.info('Monitoring URL: %s %s' % (self.endpoint_method, self.endpoint_url)) - self.expectations = [Expectation.create(expectation) for expectation in self.data['endpoint']['expectation']] + self.expectations = [Expectation.create(expectation) for expectation in self.endpoint['expectation']] for expectation in self.expectations: self.logger.info('Registered expectation: %s' % (expectation,)) @@ -137,10 +137,10 @@ def get_action(self): """Retrieves the action list from the configuration. If it's empty, returns an empty list. :return: The list of actions, which can be an empty list. """ - if self.data['cachet'].get('action') is None: + if self.endpoint.get('action') is None: return [] else: - return self.data['cachet']['action'] + return self.endpoint['action'] def validate(self): """Validates the configuration by verifying the mandatory fields are @@ -148,24 +148,20 @@ def validate(self): ConfigurationValidationError is raised. Otherwise nothing will happen. """ configuration_errors = [] - for key, sub_entries in configuration_mandatory_fields.items(): - if key not in self.data: + for key in configuration_mandatory_fields: + if key not in self.endpoint: configuration_errors.append(key) - for sub_key in sub_entries: - if sub_key not in self.data[key]: - configuration_errors.append('%s.%s' % (key, sub_key)) - - if ('endpoint' in self.data and 'expectation' in - self.data['endpoint']): - if (not isinstance(self.data['endpoint']['expectation'], list) or - (isinstance(self.data['endpoint']['expectation'], list) and - len(self.data['endpoint']['expectation']) == 0)): + if 'expectation' in self.endpoint: + if (not isinstance(self.endpoint['expectation'], list) or + (isinstance(self.endpoint['expectation'], list) and + len(self.endpoint['expectation']) == 0)): configuration_errors.append('endpoint.expectation') if len(configuration_errors) > 0: raise ConfigurationValidationError( - f"Config file [{self.config_file}] failed validation. Missing keys: {', '.join(configuration_errors)}") + 'Endpoint [%s] failed validation. Missing keys: %s' % (self.endpoint, + ', '.join(configuration_errors))) def evaluate(self): """Sends the request to the URL set in the configuration and executes @@ -214,6 +210,8 @@ def __repr__(self): temporary_data = copy.deepcopy(self.data) # Removing the token so we don't leak it in the logs. del temporary_data['cachet']['token'] + temporary_data['endpoints'] = temporary_data['endpoints'][self.endpoint_index] + return dump(temporary_data, default_flow_style=False) def if_trigger_update(self): @@ -361,6 +359,10 @@ def __init__(self, configuration): @staticmethod def parse_range(range_string): + if isinstance(range_string, int): + # This happens when there's no range and no dash character, it will be parsed as int already. + return range_string, range_string + 1 + statuses = range_string.split("-") if len(statuses) == 1: # When there was no range given, we should treat the first number as a single status check. @@ -382,7 +384,7 @@ def get_message(self, response): return f'Unexpected HTTP status ({response.status_code})' def __str__(self): - return repr(f'HTTP status range: {self.status_range}') + return repr(f'HTTP status range: [{self.status_range[0]}, {self.status_range[1]}[') class Latency(Expectation): diff --git a/cachet_url_monitor/scheduler.py b/cachet_url_monitor/scheduler.py index 8b672e6..67b5afc 100644 --- a/cachet_url_monitor/scheduler.py +++ b/cachet_url_monitor/scheduler.py @@ -1,12 +1,16 @@ #!/usr/bin/env python import logging import sys +import threading import time import schedule +from yaml import load, SafeLoader from cachet_url_monitor.configuration import Configuration +cachet_mandatory_fields = ['api_url', 'token'] + class Agent(object): """Monitor agent that will be constantly verifying if the URL is healthy @@ -32,7 +36,7 @@ def execute(self): def start(self): """Sets up the schedule based on the configuration file.""" - schedule.every(self.configuration.data['frequency']).seconds.do(self.execute) + schedule.every(self.configuration.endpoint['frequency']).seconds.do(self.execute) class Decorator(object): @@ -50,10 +54,15 @@ def execute(self, configuration): configuration.push_incident() +class PushMetricsDecorator(Decorator): + def execute(self, configuration): + configuration.push_metrics() + + class Scheduler(object): - def __init__(self, config_file): + def __init__(self, config_file, endpoint_index): self.logger = logging.getLogger('cachet_url_monitor.scheduler.Scheduler') - self.configuration = Configuration(config_file) + self.configuration = Configuration(config_file, endpoint_index) self.agent = self.get_agent() self.stop = False @@ -62,10 +71,11 @@ def get_agent(self): action_names = { 'CREATE_INCIDENT': CreateIncidentDecorator, 'UPDATE_STATUS': UpdateStatusDecorator, + 'PUSH_METRICS': PushMetricsDecorator, } actions = [] for action in self.configuration.get_action(): - self.logger.info('Registering action %s' % (action)) + self.logger.info(f'Registering action {action}') actions.append(action_names[action]()) return Agent(self.configuration, decorators=actions) @@ -74,7 +84,33 @@ def start(self): self.logger.info('Starting monitor agent...') while not self.stop: schedule.run_pending() - time.sleep(self.configuration.data['frequency']) + time.sleep(self.configuration.endpoint['frequency']) + + +class NewThread(threading.Thread): + def __init__(self, scheduler): + threading.Thread.__init__(self) + self.scheduler = scheduler + + def run(self): + self.scheduler.start() + + +def validate_config(): + if 'endpoints' not in config_file.keys(): + fatal_error('Endpoints is a mandatory field') + + if config_file['endpoints'] is None: + fatal_error('Endpoints array can not be empty') + + for key in cachet_mandatory_fields: + if key not in config_file['cachet']: + fatal_error('Missing cachet mandatory fields') + + +def fatal_error(message): + logging.getLogger('cachet_url_monitor.scheduler').fatal("%s", message) + sys.exit(1) if __name__ == "__main__": @@ -87,5 +123,13 @@ def start(self): logging.getLogger('cachet_url_monitor.scheduler').fatal('Missing configuration file argument') sys.exit(1) - scheduler = Scheduler(sys.argv[1]) - scheduler.start() + try: + config_file = load(open(sys.argv[1], 'r'), SafeLoader) + except FileNotFoundError: + logging.getLogger('cachet_url_monitor.scheduler').fatal(f'File not found: {sys.argv[1]}') + sys.exit(1) + + validate_config() + + for endpoint_index in range(len(config_file['endpoints'])): + NewThread(Scheduler(config_file, endpoint_index)).start() diff --git a/config.yml b/config.yml index 9f02260..1dd22d1 100644 --- a/config.yml +++ b/config.yml @@ -1,26 +1,27 @@ -endpoint: - url: http://localhost:8080/swagger - method: GET - header: - SOME-HEADER: SOME-VALUE - timeout: 0.01 - expectation: - - type: HTTP_STATUS - status_range: 200-300 - incident: MAJOR - - type: LATENCY - threshold: 1 - - type: REGEX - regex: '.*(