From b58557121b55e5a7792afddc0285d8e99de88b76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20MOHIER?= Date: Fri, 13 Jan 2017 21:33:41 +0100 Subject: [PATCH] Improve Alignak Stats class (timer, counter, gauge) Improve tests according to new features Clean Alignak sent stats: use appropriate function according to metric type (timer, gauge, ...) Document all the found metrics in Stats class Improve scheduler statistics Improve receiver statistics --- alignak/daemon.py | 2 +- alignak/daemons/arbiterdaemon.py | 12 +- alignak/daemons/brokerdaemon.py | 19 +- alignak/daemons/receiverdaemon.py | 9 +- alignak/satellite.py | 12 +- alignak/scheduler.py | 38 +++- alignak/stats.py | 272 ++++++++++++++++++++++++++-- test/test_statsd.py | 290 ++++++++++++++++++++++++++++-- 8 files changed, 598 insertions(+), 56 deletions(-) diff --git a/alignak/daemon.py b/alignak/daemon.py index 330ea86db..d68a82bfe 100644 --- a/alignak/daemon.py +++ b/alignak/daemon.py @@ -1169,7 +1169,7 @@ def hook_point(self, hook_name): 'and set it to restart later', inst.get_name(), str(exp)) logger.exception('Exception %s', exp) self.modules_manager.set_to_restart(inst) - statsmgr.incr('core.hook.%s' % hook_name, time.time() - _t0) + statsmgr.timer('core.hook.%s' % hook_name, time.time() - _t0) def get_retention_data(self): # pylint: disable=R0201 """Basic function to get retention data, diff --git a/alignak/daemons/arbiterdaemon.py b/alignak/daemons/arbiterdaemon.py index d4a4b63e8..00eeca408 100644 --- a/alignak/daemons/arbiterdaemon.py +++ b/alignak/daemons/arbiterdaemon.py @@ -482,7 +482,7 @@ def load_modules_configuration_objects(self, raw_objects): logger.error("Back trace of this remove: %s", output.getvalue()) output.close() continue - statsmgr.incr('hook.get-objects', time.time() - _t0) + statsmgr.timer('core.hook.get_objects', time.time() - _t0) types_creations = self.conf.types_creations for type_c in types_creations: (_, _, prop, dummy) = types_creations[type_c] @@ -763,21 +763,21 
@@ def run(self): # Now the dispatcher job _t0 = time.time() self.dispatcher.check_alive() - statsmgr.incr('core.check-alive', time.time() - _t0) + statsmgr.timer('core.check-alive', time.time() - _t0) _t0 = time.time() self.dispatcher.check_dispatch() - statsmgr.incr('core.check-dispatch', time.time() - _t0) + statsmgr.timer('core.check-dispatch', time.time() - _t0) # REF: doc/alignak-conf-dispatching.png (3) _t0 = time.time() self.dispatcher.prepare_dispatch() self.dispatcher.dispatch() - statsmgr.incr('core.dispatch', time.time() - _t0) + statsmgr.timer('core.dispatch', time.time() - _t0) _t0 = time.time() self.dispatcher.check_bad_dispatch() - statsmgr.incr('core.check-bad-dispatch', time.time() - _t0) + statsmgr.timer('core.check-bad-dispatch', time.time() - _t0) # Now get things from our module instances self.get_objects_from_from_queues() @@ -798,7 +798,7 @@ def run(self): _t0 = time.time() self.push_external_commands_to_schedulers() - statsmgr.incr('core.push-external-commands', time.time() - _t0) + statsmgr.timer('core.push-external-commands', time.time() - _t0) # It's sent, do not keep them # TODO: check if really sent. Queue by scheduler? 
diff --git a/alignak/daemons/brokerdaemon.py b/alignak/daemons/brokerdaemon.py index 80282e8c1..f98a6a675 100644 --- a/alignak/daemons/brokerdaemon.py +++ b/alignak/daemons/brokerdaemon.py @@ -229,7 +229,7 @@ def pynag_con_init(self, _id, i_type='scheduler'): """ _t0 = time.time() res = self.do_pynag_con_init(_id, i_type) - statsmgr.incr('con-init.%s' % i_type, time.time() - _t0) + statsmgr.timer('con-init.%s' % i_type, time.time() - _t0) return res def do_pynag_con_init(self, s_id, i_type='scheduler'): @@ -325,7 +325,9 @@ def manage_brok(self, brok): # Call all modules if they catch the call for mod in self.modules_manager.get_internal_instances(): try: + _t0 = time.time() mod.manage_brok(brok) + statsmgr.timer('core.manage-broks.%s' % mod.get_name(), time.time() - _t0) except Exception as exp: # pylint: disable=broad-except logger.warning("The mod %s raise an exception: %s, I'm tagging it to restart later", mod.get_name(), str(exp)) @@ -779,11 +781,16 @@ def do_loop_turn(self): if self.new_conf: self.setup_new_conf() - # Maybe the last loop we raised some broks internally + # Maybe the last loop we did raise some broks internally + _t0 = time.time() # we should integrate them in broks self.interger_internal_broks() + statsmgr.timer('get-new-broks.broker', time.time() - _t0) + + _t0 = time.time() # Also reap broks sent from the arbiters self.interger_arbiter_broks() + statsmgr.timer('get-new-broks.arbiter', time.time() - _t0) # Main job, go get broks in our distant daemons types = ['scheduler', 'poller', 'reactionner', 'receiver'] @@ -791,7 +798,7 @@ def do_loop_turn(self): _t0 = time.time() # And from schedulers self.get_new_broks(i_type=_type) - statsmgr.incr('get-new-broks.%s' % _type, time.time() - _t0) + statsmgr.timer('get-new-broks.%s' % _type, time.time() - _t0) # Sort the brok list by id self.broks.sort(sort_by_ids) @@ -809,7 +816,9 @@ def do_loop_turn(self): # instead of killing ourselves :) for mod in ext_modules: try: + t000 = time.time() 
mod.to_q.put(to_send) + statsmgr.timer('core.put-to-external-queue.%s' % mod.get_name(), time.time() - t000) except Exception as exp: # pylint: disable=broad-except # first we must find the modules logger.warning("The mod %s queue raise an exception: %s, " @@ -821,7 +830,7 @@ def do_loop_turn(self): # No more need to send them for brok in to_send: brok.need_send_to_ext = False - statsmgr.incr('core.put-to-external-queue', time.time() - t00) + statsmgr.timer('core.put-to-external-queue', time.time() - t00) logger.debug("Time to send %s broks (%d secs)", len(to_send), time.time() - t00) # We must had new broks at the end of the list, so we reverse the list @@ -842,7 +851,7 @@ def do_loop_turn(self): brok.prepare() _t0 = time.time() self.manage_brok(brok) - statsmgr.incr('core.manage-brok', time.time() - _t0) + statsmgr.timer('core.manage-broks', time.time() - _t0) nb_broks = len(self.broks) diff --git a/alignak/daemons/receiverdaemon.py b/alignak/daemons/receiverdaemon.py index f4444577e..465126861 100644 --- a/alignak/daemons/receiverdaemon.py +++ b/alignak/daemons/receiverdaemon.py @@ -324,6 +324,7 @@ def push_external_commands_to_schedulers(self): commands_to_process = self.unprocessed_external_commands self.unprocessed_external_commands = [] logger.debug("Commands: %s", commands_to_process) + statsmgr.gauge('external-commands.pushed', len(self.unprocessed_external_commands)) # Now get all external commands and put them into the # good schedulers @@ -363,10 +364,10 @@ def push_external_commands_to_schedulers(self): logger.error("A satellite raised an unknown exception: %s (%s)", exp, type(exp)) raise - # Wether we sent the commands or not, clean the scheduler list + # Whether we sent the commands or not, clean the scheduler list self.schedulers[sched_id]['external_commands'] = [] - # If we didn't send them, add the commands to the arbiter list + # If we didn't send them, add the commands to the arbiter list if not sent: for extcmd in extcmds: 
self.external_commands.append(extcmd) @@ -389,9 +390,13 @@ def do_loop_turn(self): # Maybe external modules raised 'objects' # we should get them + _t0 = time.time() self.get_objects_from_from_queues() + statsmgr.timer('core.get-objects-from-queues', time.time() - _t0) + _t0 = time.time() self.push_external_commands_to_schedulers() + statsmgr.timer('core.push-external-commands', time.time() - _t0) # Maybe we do not have something to do, so we wait a little if len(self.broks) == 0: diff --git a/alignak/satellite.py b/alignak/satellite.py index 798432109..b0b64352f 100644 --- a/alignak/satellite.py +++ b/alignak/satellite.py @@ -227,7 +227,7 @@ def pynag_con_init(self, _id): """ _t0 = time.time() res = self.do_pynag_con_init(_id) - statsmgr.incr('con-init.scheduler', time.time() - _t0) + statsmgr.timer('con-init.scheduler', time.time() - _t0) return res def do_pynag_con_init(self, s_id): @@ -336,7 +336,7 @@ def manage_returns(self): """ _t0 = time.time() self.do_manage_returns() - statsmgr.incr('core.manage-returns', time.time() - _t0) + statsmgr.timer('core.manage-returns', time.time() - _t0) def do_manage_returns(self): """Manage the checks and then @@ -653,7 +653,7 @@ def get_new_actions(self): """ _t0 = time.time() self.do_get_new_actions() - statsmgr.incr('core.get-new-actions', time.time() - _t0) + statsmgr.timer('core.get-new-actions', time.time() - _t0) def do_get_new_actions(self): """Get new actions from schedulers @@ -806,7 +806,7 @@ def do_loop_turn(self): sched_id, sched['name'], mod, index, queue.qsize(), self.get_returns_queue_len()) # also update the stats module - statsmgr.incr('core.worker-%s.queue-size' % mod, queue.qsize()) + statsmgr.gauge('core.worker-%s.queue-size' % mod, queue.qsize()) # Before return or get new actions, see how we manage # old ones: are they still in queue (s)? 
If True, we @@ -827,14 +827,14 @@ def do_loop_turn(self): self.wait_ratio.update_load(self.polling_interval) wait_ratio = self.wait_ratio.get_load() logger.debug("Wait ratio: %f", wait_ratio) - statsmgr.incr('core.wait-ratio', wait_ratio) + statsmgr.timer('core.wait-ratio', wait_ratio) # We can wait more than 1s if needed, # no more than 5s, but no less than 1 timeout = self.timeout * wait_ratio timeout = max(self.polling_interval, timeout) self.timeout = min(5 * self.polling_interval, timeout) - statsmgr.incr('core.timeout', wait_ratio) + statsmgr.timer('core.wait-arbiter', self.timeout) # Maybe we do not have enough workers, we check for it # and launch the new ones if needed diff --git a/alignak/scheduler.py b/alignak/scheduler.py index 30e7d3287..6cc0687cd 100644 --- a/alignak/scheduler.py +++ b/alignak/scheduler.py @@ -253,6 +253,18 @@ def load_conf(self, conf): self.triggers.load_objects(self) self.escalations = conf.escalations + # Internal statistics + statsmgr.gauge('configuration.hosts', len(self.hosts)) + statsmgr.gauge('configuration.services', len(self.services)) + statsmgr.gauge('configuration.hostgroups', len(self.hostgroups)) + statsmgr.gauge('configuration.servicegroups', len(self.servicegroups)) + statsmgr.gauge('configuration.contacts', len(self.contacts)) + statsmgr.gauge('configuration.contactgroups', len(self.contactgroups)) + statsmgr.gauge('configuration.timeperiods', len(self.timeperiods)) + statsmgr.gauge('configuration.commands', len(self.commands)) + statsmgr.gauge('configuration.notificationways', len(self.notificationways)) + statsmgr.gauge('configuration.escalations', len(self.escalations)) + # self.status_file = StatusFile(self) # External status file # From Arbiter. 
Use for Broker to differentiate schedulers @@ -386,9 +398,11 @@ def run_external_commands(self, cmds): :type cmds: list :return: None """ + _t0 = time.time() logger.debug("Scheduler '%s' got %d commands", self.instance_name, len(cmds)) for command in cmds: self.run_external_command(command) + statsmgr.timer('core.run_external_commands', time.time() - _t0) def run_external_command(self, command): """Run a single external command @@ -541,6 +555,7 @@ def hook_point(self, hook_name): :return:None TODO: find a way to merge this and the version in daemon.py """ + _t0 = time.time() for inst in self.sched_daemon.modules_manager.instances: full_hook_name = 'hook_' + hook_name logger.debug("hook_point: %s: %s %s", @@ -559,6 +574,7 @@ def hook_point(self, hook_name): logger.error("Exception trace follows: %s", output.getvalue()) output.close() self.sched_daemon.modules_manager.set_to_restart(inst) + statsmgr.timer('core.hook.%s' % hook_name, time.time() - _t0) def clean_queues(self): """Reduces internal list size to max allowed @@ -1435,6 +1451,7 @@ def restore_retention_data(self, data): host = self.hosts.find_by_name(ret_h_name) if host is not None: self.restore_retention_data_item(h_dict, host) + statsmgr.gauge('retention.hosts', len(ret_hosts)) # Same for services ret_services = data['services'] @@ -1445,6 +1462,7 @@ def restore_retention_data(self, data): if serv is not None: self.restore_retention_data_item(s_dict, serv) + statsmgr.gauge('retention.services', len(ret_services)) def restore_retention_data_item(self, data, item): """ @@ -2144,7 +2162,9 @@ def run(self): # Ok, now all is initialized, we can make the initial broks logger.info("[%s] First scheduling launched", self.instance_name) + _t1 = time.time() self.schedule() + statsmgr.timer('first_scheduling', time.time() - _t1) logger.info("[%s] First scheduling done", self.instance_name) # Now connect to the passive satellites if needed @@ -2183,6 +2203,9 @@ def run(self): load = min(100, 100.0 - 
self.load_one_min.get_load() * 100) logger.debug("Load: (sleep) %.2f (average: %.2f) -> %d%%", self.sched_daemon.sleep_time, self.load_one_min.get_load(), load) + statsmgr.gauge('load.sleep', self.sched_daemon.sleep_time) + statsmgr.gauge('load.average', self.load_one_min.get_load()) + statsmgr.gauge('load.load', load) self.sched_daemon.sleep_time = 0.0 @@ -2200,12 +2223,16 @@ def run(self): # Call it and save the time spend in it _t0 = time.time() fun() - statsmgr.incr('loop.%s' % name, time.time() - _t0) - statsmgr.incr('complete_loop', time.time() - _t1) + statsmgr.timer('loop.%s' % name, time.time() - _t0) + statsmgr.timer('loop.whole', time.time() - _t1) # DBG: push actions to passives? + _t1 = time.time() self.push_actions_to_passives_satellites() + statsmgr.timer('push_actions_to_passives_satellites', time.time() - _t1) + _t1 = time.time() self.get_actions_from_passives_satellites() + statsmgr.timer('get_actions_from_passives_satellites', time.time() - _t1) # stats nb_scheduled = nb_inpoller = nb_zombies = 0 @@ -2221,6 +2248,11 @@ def run(self): logger.debug("Checks: total %s, scheduled %s," "inpoller %s, zombies %s, notifications %s", len(self.checks), nb_scheduled, nb_inpoller, nb_zombies, nb_notifications) + statsmgr.gauge('checks.total', len(self.checks)) + statsmgr.gauge('checks.scheduled', nb_scheduled) + statsmgr.gauge('checks.inpoller', nb_inpoller) + statsmgr.gauge('checks.zombie', nb_zombies) + statsmgr.gauge('actions.notifications', nb_notifications) now = time.time() @@ -2246,6 +2278,6 @@ def run(self): self.hook_point('scheduler_tick') - # WE must save the retention at the quit BY OURSELVES + # We must save the retention at the quit BY OURSELVES # because our daemon will not be able to do it for us self.update_retention_file(True) diff --git a/alignak/stats.py b/alignak/stats.py index 3c3c9e6b1..27a7ce7c2 100644 --- a/alignak/stats.py +++ b/alignak/stats.py @@ -41,12 +41,144 @@ # # You should have received a copy of the GNU Affero General Public 
License # along with Shinken. If not, see . -"""This module provide export of Alignak metrics in a statsd format + +"""This module allows exporting Alignak internal metrics to a statsd server. + +The register function allows an Alignak daemon to register some metrics and the +expected behavior (sends to StatsD server and/or builds an internal brok). + +As such it: + +- registers the StatsD connection parameters +- tries to establish a connection if the StatsD sending is enabled +- creates an inner dictionary for the registered metrics + +Every time a metric is updated thanks to the provided functions, the inner dictionary +is updated accordingly to keep the last value, the minimum/maximum values, to update an +internal count of each update and to sum the collected values. +**Todo**: Interest of this feature is to be proven ;) + +The `timer` function sends a timer value to the StatsD registered server and +creates an internal brok. + +.. note:: the `incr` function simply calls the `timer` function and is kept for compatibility. + +The `counter` function sends a counter value to the StatsD registered server and +creates an internal brok. + +The `gauge` function sends a gauge value to the StatsD registered server and +creates an internal brok. 
+ +Alignak daemons statistics dictionary: + +* scheduler: + - configuration objects count (gauge) + - configuration.hosts + - configuration.services + - configuration.hostgroups + - configuration.servicegroups + - configuration.contacts + - configuration.contactgroups + - configuration.timeperiods + - configuration.commands + - configuration.notificationways + - configuration.escalations + + - retention objects count (gauge) + - retention.hosts + - retention.services + + - scheduler load (gauge): + - load.sleep + - load.average + - load.load + + - scheduler checks (gauge) + - checks.total + - checks.scheduled + - checks.inpoller + - checks.zombie + - actions.notifications + + - first_scheduling (timer) - for the first scheduling on start + - push_actions_to_passives_satellites (timer) - duration to push actions to + passive satellites + - get_actions_from_passives_satellites (timer) - duration to get results from + passive satellites + - loop.whole (timer) - for the scheduler complete loop + - loop.%s (timer) - for each scheduler recurrent work in the loop, where %s can be: + update_downtimes_and_comments + schedule + check_freshness + consume_results + get_new_actions + scatter_master_notifications + get_new_broks + delete_zombie_checks + delete_zombie_actions + clean_caches + update_retention_file + check_orphaned + get_and_register_update_program_status_brok + check_for_system_time_change + manage_internal_checks + clean_queues + update_business_values + reset_topology_change_flags + check_for_expire_acknowledge + send_broks_to_modules + get_objects_from_from_queues + get_latency_average_percentile + +* satellite (poller, reactionner): + - con-init.scheduler (timer) - for the scheduler connection duration + - core.get-new-actions (timer) - duration to get the new actions to execute from the scheduler + - core.manage-returns (timer) - duration to send back to the scheduler the results of + executed actions + - core.worker-%s.queue-size (gauge) - size of the 
actions queue for each satellite worker + - core.wait-ratio (timer) - time waiting for launched actions to finish + - core.wait-arbiter (timer) - time waiting for arbiter configuration + +* all daemons: + - core.hook.%s (timer) - duration spent in each hook function provided by a module + +* arbiter: + - core.hook.get_objects (timer) - duration spent in the get_objects hook function provided + by a module + - core.check-alive (timer) - duration to check that alignak daemons are alive + - core.check-dispatch (timer) - duration to check that the configuration is correctly + dispatched + - core.dispatch (timer) - duration to dispatch the configuration to the daemons + - core.check-bad-dispatch (timer) - duration to confirm that the configuration is + correctly dispatched + - core.push-external-commands (timer) - duration to push the external commands to the + schedulers + +* receiver: + - external-commands.pushed (gauge) - number of external commands pushed to schedulers + - core.get-objects-from-queues (timer) - duration to get the objects from modules queues + - core.push-external-commands (timer) - duration to push the external commands to the + schedulers + +* broker: + - con-init.%s (timer) - for the %s daemon connection duration + - get-new-broks.%s (timer) - duration to get new broks from other daemons, where %s can + be: arbiter, scheduler, poller, reactionner, receiver or broker + (broker is used for self-generated broks) + - core.put-to-external-queue (timer) - duration to send broks to external modules + - core.put-to-external-queue.%s (timer) - duration to send broks to each external module, + where %s is the external module alias + - core.manage-broks (timer) - duration to manage broks with internal modules + - core.manage-broks.%s (timer) - duration to manage broks with each internal module, + where %s is the internal module alias """ + import socket import logging +from alignak.brok import Brok + logger = logging.getLogger(__name__) # pylint: disable=C0103 
@@ -73,13 +205,16 @@ def __init__(self): self.statsd_prefix = None self.statsd_enabled = None + # local broks part + self.broks_enabled = None + # Statsd daemon parameters self.statsd_sock = None self.statsd_addr = None def register(self, name, _type, statsd_host='localhost', statsd_port=8125, statsd_prefix='alignak', - statsd_enabled=False): + statsd_enabled=False, broks_enabled=False): """Init statsd instance with real values :param name: daemon name @@ -94,6 +229,8 @@ def register(self, name, _type, :type statsd_prefix: str :param statsd_enabled: bool to enable statsd :type statsd_enabled: bool + :param broks_enabled: bool to enable broks sending + :type broks_enabled: bool :return: None """ self.name = name @@ -105,6 +242,9 @@ def register(self, name, _type, self.statsd_prefix = statsd_prefix self.statsd_enabled = statsd_enabled + # local broks part + self.broks_enabled = broks_enabled + if self.statsd_enabled: logger.info('Sending %s/%s daemon statistics to: %s:%s, prefix: %s', self.type, self.name, @@ -141,32 +281,122 @@ def load_statsd(self): logger.info('StatsD server contacted') return True - def incr(self, key, value): - """Increments a key with value + def timer(self, key, value): + """Set a timer value - If the key does not exist is is created + If the inner key does not exist it is created - :param key: key to edit + :param key: timer to update :type key: str - :param value: value to add + :param value: time value (in seconds) :type value: int - :return: True if the metric got sent, else False if not sent + :return: An alignak_stat brok if broks are enabled else None """ - _min, _max, number, _sum = self.stats.get(key, (None, None, 0, 0)) - number += 1 + _min, _max, count, _sum = self.stats.get(key, (None, None, 0, 0)) + count += 1 _sum += value if _min is None or value < _min: _min = value if _max is None or value > _max: _max = value - self.stats[key] = (_min, _max, number, _sum) + self.stats[key] = (_min, _max, count, _sum) # Manage local statsd 
part if self.statsd_enabled and self.statsd_sock: - # beware, we are sending ms here, value is in s + # beware, we are sending ms here, timer is in seconds packet = '%s.%s.%s:%d|ms' % (self.statsd_prefix, self.name, key, value * 1000) # Do not log because it is spamming the log file, but leave this code in place - # for it may be restored easily for if more tests are necessary... ;) + # for it may be restored easily if more tests are necessary... ;) + # logger.info("Sending data: %s", packet) + try: + self.statsd_sock.sendto(packet, self.statsd_addr) + except (socket.error, socket.gaierror): + pass + # cannot send? ok not a huge problem here and we cannot + # log because it will be far too verbose :p + + if self.broks_enabled: + logger.debug("alignak stat brok: %s = %s", key, value) + return Brok({'type': 'alignak_stat', + 'data': {'type': 'timer', + 'metric': '%s.%s.%s' % (self.statsd_prefix, self.name, key), + 'value': value * 1000, + 'uom': 'ms' + }}) + + return None + + def counter(self, key, value): + """Set a counter value + + If the inner key does not exist it is created + + :param key: counter to update + :type key: str + :param value: counter value + :type value: int + :return: An alignak_stat brok if broks are enabled else None + """ + _min, _max, count, _sum = self.stats.get(key, (None, None, 0, 0)) + count += 1 + _sum += value + if _min is None or value < _min: + _min = value + if _max is None or value > _max: + _max = value + self.stats[key] = (_min, _max, count, _sum) + + # Manage local statsd part + if self.statsd_enabled and self.statsd_sock: + # unlike a timer, no seconds to ms conversion is applied here + packet = '%s.%s.%s:%d|c' % (self.statsd_prefix, self.name, key, value) + # Do not log because it is spamming the log file, but leave this code in place + # for it may be restored easily if more tests are necessary... 
;) + # logger.info("Sending data: %s", packet) + try: + self.statsd_sock.sendto(packet, self.statsd_addr) + except (socket.error, socket.gaierror): + pass + # cannot send? ok not a huge problem here and we cannot + # log because it will be far too verbose :p + + if self.broks_enabled: + logger.debug("alignak stat brok: %s = %s", key, value) + return Brok({'type': 'alignak_stat', + 'data': {'type': 'counter', + 'metric': '%s.%s.%s' % (self.statsd_prefix, self.name, key), + 'value': value, + 'uom': 'c' + }}) + + return None + + def gauge(self, key, value): + """Set a gauge value + + If the inner key does not exist it is created + + :param key: gauge to update + :type key: str + :param value: gauge value + :type value: int + :return: An alignak_stat brok if broks are enabled else None + """ + _min, _max, count, _sum = self.stats.get(key, (None, None, 0, 0)) + count += 1 + _sum += value + if _min is None or value < _min: + _min = value + if _max is None or value > _max: + _max = value + self.stats[key] = (_min, _max, count, _sum) + + # Manage local statsd part + if self.statsd_enabled and self.statsd_sock: + # unlike a timer, no seconds to ms conversion is applied here + packet = '%s.%s.%s:%d|g' % (self.statsd_prefix, self.name, key, value) + # Do not log because it is spamming the log file, but leave this code in place + # for it may be restored easily if more tests are necessary... ;) # logger.info("Sending data: %s", packet) try: self.statsd_sock.sendto(packet, self.statsd_addr) @@ -174,9 +404,21 @@ def incr(self, key, value): pass # cannot send? 
ok not a huge problem here and we cannot # log because it will be far too verbose :p - return True - return False + if self.broks_enabled: + logger.debug("alignak stat brok: %s = %s", key, value) + return Brok({'type': 'alignak_stat', + 'data': {'type': 'gauge', + 'metric': '%s.%s.%s' % (self.statsd_prefix, self.name, key), + 'value': value, + 'uom': 'g' + }}) + + return None + + def incr(self, key, value): + """Calls the timer function""" + return self.timer(key, value) # pylint: disable=C0103 statsmgr = Stats() diff --git a/test/test_statsd.py b/test/test_statsd.py index a4857b178..49320f2fe 100644 --- a/test/test_statsd.py +++ b/test/test_statsd.py @@ -28,7 +28,9 @@ import socket import threading -from alignak.stats import Stats, statsmgr +from alignak.brok import Brok + +from alignak.stats import * from alignak_test import AlignakTest @@ -70,13 +72,6 @@ def run(self): def handle_connection(self, sock): data = sock.recv(4096) print("Received: %s", data) - # a valid nrpe response: - # data = b'\x00'*4 + b'\x00'*4 + b'\x00'*2 + 'OK'.encode() + b'\x00'*1022 - # sock.send(data) - # try: - # sock.shutdown(socket.SHUT_RDWR) - # except Exception: - # pass sock.close() @@ -114,8 +109,33 @@ def test_statsmgr_register_disabled(self): # Register stats manager as disabled assert not self.statsmgr.register('arbiter-master', 'arbiter', - statsd_host='localhost', statsd_port=8125, - statsd_prefix='alignak', statsd_enabled=False) + statsd_host='localhost', statsd_port=8125, + statsd_prefix='alignak', statsd_enabled=False) + assert self.statsmgr.statsd_enabled is False + assert self.statsmgr.broks_enabled is False + assert self.statsmgr.statsd_sock is None + assert self.statsmgr.statsd_addr is None + self.assert_log_match(re.escape( + 'INFO: [alignak.stats] Alignak internal statistics are disabled.' 
+ ), 0) + + def test_statsmgr_register_disabled_broks(self): + """ Stats manager is registered as disabled, but broks are enabled + :return: + """ + self.print_header() + + # Setup a logger... + self.setup_logger() + self.clear_logs() + + # Register stats manager as disabled + assert not self.statsmgr.register('arbiter-master', 'arbiter', + statsd_host='localhost', statsd_port=8125, + statsd_prefix='alignak', statsd_enabled=False, + broks_enabled=True) + assert self.statsmgr.statsd_enabled is False + assert self.statsmgr.broks_enabled is True assert self.statsmgr.statsd_sock is None assert self.statsmgr.statsd_addr is None self.assert_log_match(re.escape( @@ -138,6 +158,40 @@ def test_statsmgr_register_enabled(self): assert self.statsmgr.register('arbiter-master', 'arbiter', statsd_host='localhost', statsd_port=8125, statsd_prefix='alignak', statsd_enabled=True) + assert self.statsmgr.statsd_enabled is True + assert self.statsmgr.broks_enabled is False + assert self.statsmgr.statsd_sock is not None + assert self.statsmgr.statsd_addr is not None + self.assert_log_match(re.escape( + 'INFO: [alignak.stats] Sending arbiter/arbiter-master daemon statistics ' + 'to: localhost:8125, prefix: alignak' + ), 0) + self.assert_log_match(re.escape( + 'INFO: [alignak.stats] Trying to contact StatsD server...' + ), 1) + self.assert_log_match(re.escape( + 'INFO: [alignak.stats] StatsD server contacted' + ), 2) + + def test_statsmgr_register_enabled_broks(self): + """ Stats manager is registered as enabled and broks are enabled + :return: + """ + self.print_header() + + # Setup a logger... 
+ self.setup_logger() + self.clear_logs() + + # Register stats manager as enabled + assert self.statsmgr.statsd_sock is None + assert self.statsmgr.statsd_addr is None + assert self.statsmgr.register('arbiter-master', 'arbiter', + statsd_host='localhost', statsd_port=8125, + statsd_prefix='alignak', statsd_enabled=True, + broks_enabled=True) + assert self.statsmgr.statsd_enabled is True + assert self.statsmgr.broks_enabled is True assert self.statsmgr.statsd_sock is not None assert self.statsmgr.statsd_addr is not None self.assert_log_match(re.escape( @@ -209,8 +263,8 @@ def test_statsmgr_connect_port_error(self): # "Connected" to StatsD server - even with a bad port number! self.assert_no_log_match('Cannot create StatsD socket') - def test_statsmgr_incr(self): - """ Test sending data + def test_statsmgr_timer(self): + """ Test sending data for a timer :return: """ self.print_header() @@ -222,32 +276,232 @@ def test_statsmgr_incr(self): # Register stats manager as enabled self.statsmgr.register('arbiter-master', 'arbiter', statsd_host='localhost', statsd_port=8125, - statsd_prefix='alignak', statsd_enabled=True) + statsd_prefix='alignak', statsd_enabled=True, + broks_enabled=True) + + assert self.statsmgr.stats == {} # Create a metric statistic + brok = self.statsmgr.timer('test', 0) + assert len(self.statsmgr.stats) == 1 + # Get min, max, count and sum + assert self.statsmgr.stats['test'] == (0, 0, 1, 0) + # self.assert_log_match(re.escape( + # 'INFO: [alignak.stats] Sending data: alignak.arbiter-master.test:0|ms' + # ), 3) + # Prepare brok and remove specific brok properties (for test purpose only... 
+ brok.prepare() + brok.__dict__.pop('creation_time') + brok.__dict__.pop('instance_id') + brok.__dict__.pop('prepared') + brok.__dict__.pop('uuid') + assert brok.__dict__ == {'type': 'alignak_stat', + 'data': { + 'type': 'timer', + 'metric': 'alignak.arbiter-master.test', + 'value': 0, 'uom': 'ms' + }} + + # Increment + brok = self.statsmgr.timer('test', 1) + assert len(self.statsmgr.stats) == 1 + # Get min, max, count (incremented) and sum + assert self.statsmgr.stats['test'] == (0, 1, 2, 1) + # self.assert_log_match(re.escape( + # 'INFO: [alignak.stats] Sending data: alignak.arbiter-master.test:1000|ms' + # ), 4) + # Prepare brok and remove specific brok properties (for test purpose only... + brok.prepare() + brok.__dict__.pop('creation_time') + brok.__dict__.pop('instance_id') + brok.__dict__.pop('prepared') + brok.__dict__.pop('uuid') + assert brok.__dict__ == {'type': 'alignak_stat', + 'data': { + 'type': 'timer', + 'metric': 'alignak.arbiter-master.test', + 'value': 1000, 'uom': 'ms' + }} + + # Increment - the function is called 'incr' but it does not increment, it sets the value! + brok = self.statsmgr.timer('test', 12) + assert len(self.statsmgr.stats) == 1 + # Get min, max, count (incremented) and sum (increased) + assert self.statsmgr.stats['test'] == (0, 12, 3, 13) + # self.assert_log_match(re.escape( + # 'INFO: [alignak.stats] Sending data: alignak.arbiter-master.test:1000|ms' + # ), 5) + # Prepare brok and remove specific brok properties (for test purpose only... + brok.prepare() + brok.__dict__.pop('creation_time') + brok.__dict__.pop('instance_id') + brok.__dict__.pop('prepared') + brok.__dict__.pop('uuid') + assert brok.__dict__ == {'type': 'alignak_stat', + 'data': { + 'type': 'timer', + 'metric': 'alignak.arbiter-master.test', + 'value': 12000, 'uom': 'ms' + }} + + def test_statsmgr_counter(self): + """ Test sending data for a counter + :return: + """ + self.print_header() + + # Setup a logger... 
+ self.setup_logger() + self.clear_logs() + + # Register stats manager as enabled + self.statsmgr.register('arbiter-master', 'arbiter', + statsd_host='localhost', statsd_port=8125, + statsd_prefix='alignak', statsd_enabled=True, + broks_enabled=True) + assert self.statsmgr.stats == {} - self.statsmgr.incr('test', 0) + + # Create a metric statistic + brok = self.statsmgr.counter('test', 0) assert len(self.statsmgr.stats) == 1 - # Get min, max, cout and sum + # Get min, max, count and sum assert self.statsmgr.stats['test'] == (0, 0, 1, 0) # self.assert_log_match(re.escape( # 'INFO: [alignak.stats] Sending data: alignak.arbiter-master.test:0|ms' # ), 3) + # Prepare brok and remove specific brok properties (for test purpose only... + brok.prepare() + brok.__dict__.pop('creation_time') + brok.__dict__.pop('instance_id') + brok.__dict__.pop('prepared') + brok.__dict__.pop('uuid') + assert brok.__dict__ == {'type': 'alignak_stat', + 'data': { + 'type': 'counter', + 'metric': 'alignak.arbiter-master.test', + 'value': 0, 'uom': 'c' + }} # Increment - self.statsmgr.incr('test', 1) + brok = self.statsmgr.counter('test', 1) assert len(self.statsmgr.stats) == 1 + # Get min, max, count (incremented) and sum assert self.statsmgr.stats['test'] == (0, 1, 2, 1) # self.assert_log_match(re.escape( # 'INFO: [alignak.stats] Sending data: alignak.arbiter-master.test:1000|ms' # ), 4) + # Prepare brok and remove specific brok properties (for test purpose only... + brok.prepare() + brok.__dict__.pop('creation_time') + brok.__dict__.pop('instance_id') + brok.__dict__.pop('prepared') + brok.__dict__.pop('uuid') + assert brok.__dict__ == {'type': 'alignak_stat', + 'data': { + 'type': 'counter', + 'metric': 'alignak.arbiter-master.test', + 'value': 1, 'uom': 'c' + }} # Increment - the function is called 'incr' but it does not increment, it sets the value! 
- self.statsmgr.incr('test', 1) + brok = self.statsmgr.counter('test', 12) assert len(self.statsmgr.stats) == 1 - assert self.statsmgr.stats['test'] == (0, 1, 3, 2) + # Get min, max, count (incremented) and sum (increased) + assert self.statsmgr.stats['test'] == (0, 12, 3, 13) # self.assert_log_match(re.escape( # 'INFO: [alignak.stats] Sending data: alignak.arbiter-master.test:1000|ms' # ), 5) + # Prepare brok and remove specific brok properties (for test purpose only... + brok.prepare() + brok.__dict__.pop('creation_time') + brok.__dict__.pop('instance_id') + brok.__dict__.pop('prepared') + brok.__dict__.pop('uuid') + assert brok.__dict__ == {'type': 'alignak_stat', + 'data': { + 'type': 'counter', + 'metric': 'alignak.arbiter-master.test', + 'value': 12, 'uom': 'c' + }} + + def test_statsmgr_gauge(self): + """ Test sending data for a gauge + :return: + """ + self.print_header() + # Setup a logger... + self.setup_logger() + self.clear_logs() + + # Register stats manager as enabled + self.statsmgr.register('arbiter-master', 'arbiter', + statsd_host='localhost', statsd_port=8125, + statsd_prefix='alignak', statsd_enabled=True, + broks_enabled=True) + + assert self.statsmgr.stats == {} + # Create a metric statistic + brok = self.statsmgr.gauge('test', 0) + assert len(self.statsmgr.stats) == 1 + # Get min, max, count and sum + assert self.statsmgr.stats['test'] == (0, 0, 1, 0) + # self.assert_log_match(re.escape( + # 'INFO: [alignak.stats] Sending data: alignak.arbiter-master.test:0|ms' + # ), 3) + # Prepare brok and remove specific brok properties (for test purpose only... 
+ brok.prepare() + brok.__dict__.pop('creation_time') + brok.__dict__.pop('instance_id') + brok.__dict__.pop('prepared') + brok.__dict__.pop('uuid') + assert brok.__dict__ == {'type': 'alignak_stat', + 'data': { + 'type': 'gauge', + 'metric': 'alignak.arbiter-master.test', + 'value': 0, 'uom': 'g' + }} + + # Increment + brok = self.statsmgr.gauge('test', 1) + assert len(self.statsmgr.stats) == 1 + # Get min, max, count (incremented) and sum + assert self.statsmgr.stats['test'] == (0, 1, 2, 1) + # self.assert_log_match(re.escape( + # 'INFO: [alignak.stats] Sending data: alignak.arbiter-master.test:1000|ms' + # ), 4) + # Prepare brok and remove specific brok properties (for test purpose only... + brok.prepare() + brok.__dict__.pop('creation_time') + brok.__dict__.pop('instance_id') + brok.__dict__.pop('prepared') + brok.__dict__.pop('uuid') + assert brok.__dict__ == {'type': 'alignak_stat', + 'data': { + 'type': 'gauge', + 'metric': 'alignak.arbiter-master.test', + 'value': 1, 'uom': 'g' + }} + + # New value - 'gauge' does not accumulate: the brok carries the value given, only the stored count and sum grow + brok = self.statsmgr.gauge('test', 12) + assert len(self.statsmgr.stats) == 1 + # Get min, max, count (incremented) and sum (increased) + assert self.statsmgr.stats['test'] == (0, 12, 3, 13) + # self.assert_log_match(re.escape( + # 'INFO: [alignak.stats] Sending data: alignak.arbiter-master.test:1000|ms' + # ), 5) + # Prepare brok and remove specific brok properties (for test purpose only... + brok.prepare() + brok.__dict__.pop('creation_time') + brok.__dict__.pop('instance_id') + brok.__dict__.pop('prepared') + brok.__dict__.pop('uuid') + assert brok.__dict__ == {'type': 'alignak_stat', + 'data': { + 'type': 'gauge', + 'metric': 'alignak.arbiter-master.test', + 'value': 12, 'uom': 'g' + }}