From 161924e336654355197246c7d78ebd8124bf9f0a Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Sat, 9 Jan 2021 17:56:44 +0100 Subject: [PATCH 01/38] rpm build py3 ready --- Makefile | 2 +- argo-nagios-ams-publisher.spec | 10 +++++----- setup.py | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Makefile b/Makefile index 18af9cb..c81ce68 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ rpm: dist dist: rm -rf dist - python setup.py sdist + python3 setup.py sdist mv -f dist/${PKGNAME}-${PKGVERSION}.tar.gz . rm -rf dist diff --git a/argo-nagios-ams-publisher.spec b/argo-nagios-ams-publisher.spec index c33756d..41cbf34 100644 --- a/argo-nagios-ams-publisher.spec +++ b/argo-nagios-ams-publisher.spec @@ -43,11 +43,11 @@ Bridge from Nagios to the ARGO Messaging system %setup -q %build -%{__python} setup.py build +%{py3_build} %install rm -rf $RPM_BUILD_ROOT -%{__python} setup.py install --skip-build --root $RPM_BUILD_ROOT --record=INSTALLED_FILES +%{py3_install "--record=INSTALLED_FILES"} install --directory --mode 755 $RPM_BUILD_ROOT/%{_sysconfdir}/%{name}/ install --directory --mode 755 $RPM_BUILD_ROOT/%{_localstatedir}/log/%{name}/ install --directory --mode 755 $RPM_BUILD_ROOT/%{_localstatedir}/spool/%{name}/metrics/ @@ -58,8 +58,8 @@ install --directory --mode 755 $RPM_BUILD_ROOT/%{_localstatedir}/run/%{name}/ %defattr(-,root,root,-) %config(noreplace) %{_sysconfdir}/%{name}/ams-publisher.conf %config(noreplace) %{_sysconfdir}/%{name}/metric_data.avsc -%dir %{python_sitelib}/%{underscore %{name}} -%{python_sitelib}/%{underscore %{name}}/*.py[co] +%dir %{python3_sitelib}/%{underscore %{name}} +%{python3_sitelib}/%{underscore %{name}}/*.py[co] %defattr(-,nagios,nagios,-) %dir %{_localstatedir}/log/%{name}/ %attr(0755,nagios,nagios) %dir %{_localstatedir}/run/%{name}/ @@ -108,7 +108,7 @@ fi %changelog * Thu Oct 8 2020 Daniel Vrcic - 0.3.8-1%{?dist} -- remove leftovers from erroneous SIGHUP handling +- remove leftovers from erroneous SIGHUP handling * Wed Jul 8 2020 Daniel Vrcic - 0.3.7-1%{?dist} - ARGO-2378 RPM post install should restart service not stop it - ARGO-844 Complete README for ams-publisher diff --git a/setup.py b/setup.py index 42103d9..bb4fb42 100644 --- a/setup.py +++ b/setup.py @@ -18,7 +18,7 @@ def get_ver(): if "Version:" in line: return line.split()[1] except IOError: - print "Make sure that %s is in directory" % (NAME+'.spec') + print(f'Make sure that {NAME}.spec is in directory') raise SystemExit(1) From b29dbfa7ed3872df76fc51e1f6fb20129d874380 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Sat, 9 Jan 2021 18:04:09 +0100 Subject: [PATCH 02/38] py3 exec wrapped in parentheses --- pymod/alarmtoqueue.py | 2 +- pymod/metrictoqueue.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pymod/alarmtoqueue.py b/pymod/alarmtoqueue.py index e8ea375..5eadc9b 100644 --- a/pymod/alarmtoqueue.py +++ b/pymod/alarmtoqueue.py @@ -38,7 +38,7 @@ def build_msg(args, *headers): for bs in ['details', 'vo', 'site', 'roc', 'urlhistory', 'urlhelp']: code = "msg.body += '%s: ' + args.%s + '\\n' if args.%s else ''" % (bs, bs, bs) - exec code + exec(code) msg.text = True return msg diff --git a/pymod/metrictoqueue.py b/pymod/metrictoqueue.py index 9e512e1..6b66a85 100644 --- a/pymod/metrictoqueue.py +++ b/pymod/metrictoqueue.py @@ -39,7 +39,7 @@ def build_msg(args, *headers): for bs in ['summary', 'message', 'vofqan', 'voname', 'roc', 'actual_data', 'site']: code = "msg.body += '%s: ' + args.%s + '\\n' if args.%s else ''" % (bs, bs, bs) - exec code + exec(code) msg.text = True return msg From 9bfbefd2b95ac359b602c252445b723853372f6a Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Sat, 9 Jan 2021 19:32:07 +0100 Subject: [PATCH 03/38] corrected spec dependencies --- argo-nagios-ams-publisher.spec | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/argo-nagios-ams-publisher.spec b/argo-nagios-ams-publisher.spec index 41cbf34..cb88518 100644 --- a/argo-nagios-ams-publisher.spec +++ b/argo-nagios-ams-publisher.spec @@ -10,7 +10,7 @@ %endif Name: argo-nagios-ams-publisher -Version: 0.3.8 +Version: 0.3.9 Release: 1%{mydist} Summary: Bridge from Nagios to the ARGO Messaging system @@ -21,21 +21,13 @@ Source0: %{name}-%{version}.tar.gz BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n) BuildArch: noarch -BuildRequires: python-devel -Requires: argo-ams-library -Requires: avro -Requires: python-argparse -Requires: python-daemon -Requires: python-dirq -Requires: python-messaging +BuildRequires: python3-devel +Requires: python3-argo-ams-library +Requires: python3-avro +Requires: python3-dirq +Requires: python3-messaging Requires: pytz -%if 0%{?el7:1} -Requires: python2-psutil >= 4.3 -%else -Requires: python-psutil >= 4.3 -%endif - %description Bridge from Nagios to the ARGO Messaging system @@ -59,7 +51,7 @@ install --directory --mode 755 $RPM_BUILD_ROOT/%{_localstatedir}/run/%{name}/ %config(noreplace) %{_sysconfdir}/%{name}/ams-publisher.conf %config(noreplace) %{_sysconfdir}/%{name}/metric_data.avsc %dir %{python3_sitelib}/%{underscore %{name}} -%{python3_sitelib}/%{underscore %{name}}/*.py[co] +%{python3_sitelib}/%{underscore %{name}}/*.py %defattr(-,nagios,nagios,-) %dir %{_localstatedir}/log/%{name}/ %attr(0755,nagios,nagios) %dir %{_localstatedir}/run/%{name}/ From d32e9d9fafc984b15f6156981dcd0d4c09f580e7 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Sat, 9 Jan 2021 19:32:28 +0100 Subject: [PATCH 04/38] py3 shebangs --- bin/ams-alarm-to-queue | 2 +- bin/ams-metric-to-queue | 2 +- bin/ams-publisherd | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/bin/ams-alarm-to-queue b/bin/ams-alarm-to-queue index 97972c8..c4e6687 100755 --- a/bin/ams-alarm-to-queue +++ b/bin/ams-alarm-to-queue @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python3 from argo_nagios_ams_publisher import alarmtoqueue diff --git a/bin/ams-metric-to-queue b/bin/ams-metric-to-queue index d539da6..5a9c4b0 100755 --- a/bin/ams-metric-to-queue +++ b/bin/ams-metric-to-queue @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python3 from argo_nagios_ams_publisher import metrictoqueue diff --git a/bin/ams-publisherd b/bin/ams-publisherd index 4881d2b..6b67195 100755 --- a/bin/ams-publisherd +++ b/bin/ams-publisherd @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python3 from argo_nagios_ams_publisher.config import parse_config from argo_nagios_ams_publisher.log import Logger From 0ecc66e401bf9870f3d6af59f49e7206157c29fd Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Sat, 9 Jan 2021 19:39:36 +0100 Subject: [PATCH 05/38] fix pytz dependency --- argo-nagios-ams-publisher.spec | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/argo-nagios-ams-publisher.spec b/argo-nagios-ams-publisher.spec index cb88518..ec170b6 100644 --- a/argo-nagios-ams-publisher.spec +++ b/argo-nagios-ams-publisher.spec @@ -26,7 +26,7 @@ Requires: python3-argo-ams-library Requires: python3-avro Requires: python3-dirq Requires: python3-messaging -Requires: pytz +Requires: python36-pytz %description Bridge from Nagios to the ARGO Messaging system diff --git a/setup.py b/setup.py index bb4fb42..3e91dca 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ def is_c7(): def get_ver(): try: - with open(NAME+'.spec') as f: + with open(f'{NAME}.spec') as f: for line in f: if "Version:" in line: return line.split()[1] From ea1b5a61a003b2bff945b8953a452d813275de67 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Sat, 9 Jan 2021 19:50:22 +0100 Subject: [PATCH 06/38] helpers py3 ready --- helpers/ams-msg-generator.py | 8 ++++---- helpers/ams-queue-consume.py | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/helpers/ams-msg-generator.py b/helpers/ams-msg-generator.py index ebccccb..78c4a6d 100755 --- a/helpers/ams-msg-generator.py +++ b/helpers/ams-msg-generator.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python3 import argparse import datetime @@ -77,7 +77,7 @@ def main(): try: tz = timezone(args.timezone) except UnknownTimeZoneError as e: - print "Timezone not correct" + print("Timezone not correct") raise SystemExit(1) mq = DQS(path=args.queue, granularity=args.granularity) @@ -88,13 +88,13 @@ def main(): msg = construct_msg(args.session, args.bodysize, tz) queue_msg(msg, mq) if not args.noout: - print msg + print(msg) else: while True: msg = construct_msg(args.session, args.bodysize, tz) queue_msg(msg, mq) if not args.noout: - print msg + print(msg) if args.sleep: time.sleep(args.sleep) diff --git a/helpers/ams-queue-consume.py b/helpers/ams-queue-consume.py index 91a144a..e93114e 100755 --- a/helpers/ams-queue-consume.py +++ b/helpers/ams-queue-consume.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python3 import argparse import datetime @@ -27,7 +27,7 @@ def seteuser(user): def consume_queue(mq, num=0): global cqcalld, args if not args.noout: - print '---- MSGS ---- RUN {0} ----'.format(cqcalld) + print('---- MSGS ---- RUN {0} ----'.format(cqcalld)) i, msgs = 1, deque() for name in mq: @@ -38,7 +38,7 @@ def consume_queue(mq, num=0): break i += 1 else: - print '{0} empty'.format(mq.path) + print('{0} empty'.format(mq.path)) if msgs and not args.noout: From bfc66740b221c48502afe5afaabbc928c2212385 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Sat, 9 Jan 2021 19:50:40 +0100 Subject: [PATCH 07/38] correct configparser py3 import --- pymod/config.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pymod/config.py b/pymod/config.py index de16184..50180b5 100644 --- a/pymod/config.py +++ b/pymod/config.py @@ -1,4 +1,4 @@ -import ConfigParser +import configparser import sys from pytz import timezone, UnknownTimeZoneError @@ -9,7 +9,7 @@ def get_queue_granul(queue): confopts = parse_config() is_queue_found = False - for k, v in confopts['queues'].iteritems(): + for k, v in confopts['queues'].items(): if confopts['queues'][k]['directory'].startswith(queue): is_queue_found = True break @@ -29,7 +29,7 @@ def parse_config(logger=None): confopts = dict() try: - config = ConfigParser.ConfigParser() + config = configparser.ConfigParser() if config.read(conf): pairedsects = ['{0}_'.format(s.lower().split('_', 1)[0]) for s in config.sections() if '_' in s] @@ -95,9 +95,9 @@ def parse_config(logger=None): topts['sleepretry'] = config.getint(section, 'SleepRetry') topics[tname] = topts - for k, v in queues.iteritems(): + for k, v in queues.items(): if k not in topics: - raise ConfigParser.NoSectionError('No topic topic_%s defined' % k) + raise configparser.NoSectionError('No topic topic_%s defined' % k) if topics[k]['bulk'] < queues[k]['rate'] and \ queues[k]['rate'] % topics[k]['bulk']: @@ -144,7 +144,7 @@ def parse_config(logger=None): sys.stderr.write('Missing %s\n' % conf) raise SystemExit(1) - except (ConfigParser.NoOptionError, ConfigParser.NoSectionError) as e: + except (configparser.NoOptionError, configparser.NoSectionError) as e: if logger: logger.error(e) raise SystemExit(1) @@ -152,7 +152,7 @@ def parse_config(logger=None): sys.stderr.write(str(e) + '\n') raise SystemExit(1) - except (ConfigParser.MissingSectionHeaderError, ConfigParser.ParsingError, SystemExit) as e: + except (configparser.MissingSectionHeaderError, configparser.ParsingError, SystemExit) as e: if getattr(e, 'filename', False): if logger: logger.error(e.filename + ' is not a valid configuration file') From c182a0e8a77f350753b19cb3511b0842d966c226 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Sat, 9 Jan 2021 21:30:38 +0100 Subject: [PATCH 08/38] remove old daemonizing --- bin/ams-publisherd | 127 --------------------------------------------- 1 file changed, 127 deletions(-) diff --git a/bin/ams-publisherd b/bin/ams-publisherd index 6b67195..00f7a3e 100755 --- a/bin/ams-publisherd +++ b/bin/ams-publisherd @@ -8,26 +8,14 @@ from argo_nagios_ams_publisher.shared import Shared from datetime import datetime import argparse -import daemon import errno import multiprocessing import os -import psutil import pwd import signal import socket import sys -""" - try first to import el6 modulefile, if that fails try to import el7 - modulefile -""" -try: - import daemon.pidlockfile as pidlockfile -except ImportError: - import daemon.pidfile as pidlockfile - -pidfile = '/var/run/argo-nagios-ams-publisher/pid' logfile = '/var/log/argo-nagios-ams-publisher/ams-publisher.log' shared = None @@ -114,89 +102,6 @@ def get_userids(user): return pwd.getpwnam(user)[2], pwd.getpwnam(user)[3] -def daemon_start(context_daemon, restart=False): - if context_daemon.pidfile.is_locked() and not \ - context_daemon.pidfile.i_am_locking(): - pid = context_daemon.pidfile.read_pid() - try: - psutil.Process(pid=pid) - shared.log.info('Already running (%s)' % pid) - return 0 - except psutil.NoSuchProcess: - context_daemon.pidfile.break_lock() - - def sigtermhandle(signum, frame): - shared.event('term').set() - - def sigusrhandle(signum, frame): - shared.event('usr1').set() - - context_daemon.signal_map = { - signal.SIGTERM: sigtermhandle, - signal.SIGUSR1: sigusrhandle, - } - - uid, gid = get_userids(shared.general['runasuser']) - context_daemon.uid = uid - context_daemon.gid = gid - os.chown(shared.log.fileloghandle.name, uid, gid) - sock = setup_statssocket(shared.general['statsocket'], uid, gid) - context_daemon.files_preserve = [shared.log.fileloghandle, sock.fileno()] - - if not restart: - shared.log.info('Started') - - context_daemon.open() - with context_daemon: - init_dirq_consume(shared.workers, daemonized=True, sockstat=sock) - - -def daemon_stop(context_daemon, restart=False): - def on_terminate(proc): - if not restart: - shared.log.info('Stopping (%s)' % proc.pid) - - if context_daemon.pidfile.is_locked(): - pid = context_daemon.pidfile.read_pid() - - try: - process = psutil.Process(pid=pid) - except psutil.NoSuchProcess: - context_daemon.pidfile.break_lock() - if not restart: - shared.log.info('Not running - cleaning stale pidfile') - else: - process.terminate() - pgone, palive = psutil.wait_procs([process], callback=on_terminate) - - for p in palive: - p.kill() - - elif not restart: - shared.log.info('Not running') - - return 0 - - -def daemon_status(context_daemon): - if context_daemon.pidfile.is_locked() and not \ - context_daemon.pidfile.i_am_locking(): - pid = context_daemon.pidfile.read_pid() - - try: - p = psutil.Process(pid=pid) - p.send_signal(signal.SIGUSR1) - except psutil.NoSuchProcess: - shared.log.info('Not running - stale pidfile') - return 1 - else: - shared.log.info('Running (%s)' % pid) - return 0 - else: - shared.log.info('Not running') - return 3 - - def pidfiledir(pidfile): try: dirp = os.path.dirname(pidfile) @@ -210,33 +115,6 @@ def pidfiledir(pidfile): raise SystemExit(1) -def daemonizer(args): - """ - Create DaemonContext for setting the behaviour and the options for - process once it becomes the daemon. - - """ - pidfiledir(pidfile) - - context_daemon = daemon.DaemonContext() - context_daemon.pidfile = pidlockfile.PIDLockFile(pidfile, threaded=False) - - if args.daemon == 'start': - daemon_start(context_daemon) - - elif args.daemon == 'stop': - ret = daemon_stop(context_daemon) - raise SystemExit(ret) - - elif args.daemon == 'restart': - daemon_stop(context_daemon, restart=True) - daemon_start(context_daemon, restart=True) - - elif args.daemon == 'status': - ret = daemon_status(context_daemon) - raise SystemExit(ret) - - def main(): """ Function fetch arguments from command line, initialize logger, @@ -275,11 +153,6 @@ def main(): except KeyboardInterrupt: raise SystemExit(1) - elif args.daemon: - confopts = parse_config(shared.log) - shared = Shared(confopts=confopts) - daemonizer(args) - elif args.query: confopts = parse_config(shared.log) shared = Shared(confopts=confopts) From 5ed07c94fdd1504201ced77340c44c7bd2c37c55 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Sat, 9 Jan 2021 21:43:18 +0100 Subject: [PATCH 09/38] cast division to int --- pymod/consume.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymod/consume.py b/pymod/consume.py index 1eaade6..634a377 100644 --- a/pymod/consume.py +++ b/pymod/consume.py @@ -38,7 +38,7 @@ def cleanup(self): def setup(self): self.dirq = DQS(path=self.shared.queue['directory']) self.pubnumloop = 1 if self.shared.topic['bulk'] > self.shared.queue['rate'] \ - else self.shared.queue['rate'] / self.shared.topic['bulk'] + else int(self.shared.queue['rate'] / self.shared.topic['bulk']) self.shared.runtime.update(inmemq=self.inmemq, pubnumloop=self.pubnumloop, dirq=self.dirq, filepublisher=False) From f0d71d96099814d6fe94c6adfe991ab91ab5f664 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Sat, 9 Jan 2021 21:44:07 +0100 Subject: [PATCH 10/38] explicit byte char and list from map --- pymod/publish.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pymod/publish.py b/pymod/publish.py index 9baac9e..325f88b 100644 --- a/pymod/publish.py +++ b/pymod/publish.py @@ -114,9 +114,9 @@ def _avro_serialize(msg): def _extract_body(self, body, fields, maps=None): msg = dict() - bodylines = body.split('\n') + bodylines = body.split(b'\n') for line in bodylines: - split = line.split(': ', 1) + split = line.split(b': ', 1) if len(split) > 1: key = split[0] value = split[1] @@ -208,7 +208,7 @@ def _write(self, msgs): def write(self): msgs = [self.construct_msg(self.inmemq[e][1]) for e in range(self.shared.topic['bulk'])] - msgs = map(lambda m: AmsMessage(attributes={'partition_date': m[0], + msgs = list(map(lambda m: AmsMessage(attributes={'partition_date': m[0], 'type': self.shared.topic['msgtype']}, - data=m[1]), msgs) + data=m[1]), msgs)) return self._write(msgs) From 24e4d6a023b8d4a1f50bd5c62c7bb387e28c97fb Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Mon, 11 Jan 2021 09:21:43 +0100 Subject: [PATCH 11/38] add TERM and USR1 signal handlers for foreground --- bin/ams-publisherd | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/bin/ams-publisherd b/bin/ams-publisherd index 00f7a3e..dc330d2 100755 --- a/bin/ams-publisherd +++ b/bin/ams-publisherd @@ -123,6 +123,14 @@ def main(): There is also option for no-daemonizing mainly for debugging purposes. """ + def sigterm_handler(signum, frame): + ev = shared.event('term') + ev.set() + + def sigusr1_handler(signum, frame): + ev = shared.event('usr1') + ev.set() + lobj = Logger(sys.argv[0], logfile) logger = lobj.get() @@ -143,6 +151,9 @@ def main(): nargs='?', type=int, metavar='number of minutes', const=180) args = parser.parse_args() + signal.signal(signal.SIGTERM, sigterm_handler) + signal.signal(signal.SIGUSR1, sigusr1_handler) + if args.nofork: try: confopts = parse_config() From b469be31d21c8ebd0ec298e2d58dbc502d358649 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Mon, 11 Jan 2021 11:35:13 +0100 Subject: [PATCH 12/38] explicit cast to int division --- pymod/publish.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymod/publish.py b/pymod/publish.py index 325f88b..9cbef14 100644 --- a/pymod/publish.py +++ b/pymod/publish.py @@ -171,7 +171,7 @@ def _write(self, msgs): raise e else: s = self.shared.topic['sleepretry'] - n = s / self.shared.runtime['evsleep'] + n = int(s / self.shared.runtime['evsleep']) i = 0 while i < n: if self.events['term-' + self.name].is_set(): From c4cf7b0b050432f0541f6f58517c9c157e6b1eff Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Mon, 11 Jan 2021 16:22:30 +0100 Subject: [PATCH 13/38] remove daemonize cmd argument --- bin/ams-publisherd | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/bin/ams-publisherd b/bin/ams-publisherd index dc330d2..383eb87 100755 --- a/bin/ams-publisherd +++ b/bin/ams-publisherd @@ -142,19 +142,19 @@ def main(): shared.add_event('usr1', multiprocessing.Event()) parser = argparse.ArgumentParser(prog='ams-publisherd') - group = parser.add_mutually_exclusive_group(required=True) - group.add_argument('-n', dest='nofork', action='store_true', - help='do not fork into background') - group.add_argument('-d', dest='daemon', type=str, - help='daemon arguments: start, stop, restart, status', metavar='') - group.add_argument('-q', dest='query', required=False, help='query for statistics for last n minutes', + parser.add_argument('-q', dest='query', required=False, help='query for statistics for last n minutes', nargs='?', type=int, metavar='number of minutes', const=180) args = parser.parse_args() signal.signal(signal.SIGTERM, sigterm_handler) signal.signal(signal.SIGUSR1, sigusr1_handler) - if args.nofork: + if args.query: + confopts = parse_config(shared.log) + shared = Shared(confopts=confopts) + shared.log.info('Asked for statistics for last %s minutes' % int(args.query)) + query_stats(args.query) + else: try: confopts = parse_config() shared = Shared(confopts=confopts) @@ -164,11 +164,6 @@ def main(): except KeyboardInterrupt: raise SystemExit(1) - elif args.query: - confopts = parse_config(shared.log) - shared = Shared(confopts=confopts) - shared.log.info('Asked for statistics for last %s minutes' % int(args.query)) - query_stats(args.query) main() From af42d5b658c3e4c1d72781c6418f159daff7734b Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Mon, 11 Jan 2021 16:22:50 +0100 Subject: [PATCH 14/38] refined systemd unit --- init/ams-publisher.service | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/init/ams-publisher.service b/init/ams-publisher.service index a628f1d..6be8182 100644 --- a/init/ams-publisher.service +++ b/init/ams-publisher.service @@ -4,8 +4,9 @@ After=network.target [Service] Type=simple -ExecStart=/usr/bin/ams-publisherd -d start -ExecStop=/usr/bin/ams-publisherd -d stop +ExecStart=/usr/bin/ams-publisherd +KillMode=process +StandardError=syslog [Install] WantedBy=multi-user.target From 756ee5cc274541fd99676ebd7ff785c93fdd3268 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Tue, 12 Jan 2021 01:37:39 +0100 Subject: [PATCH 15/38] handle bulk=1 specially to dispatch result early --- pymod/consume.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/pymod/consume.py b/pymod/consume.py index 634a377..85b14f1 100644 --- a/pymod/consume.py +++ b/pymod/consume.py @@ -37,8 +37,16 @@ def cleanup(self): def setup(self): self.dirq = DQS(path=self.shared.queue['directory']) - self.pubnumloop = 1 if self.shared.topic['bulk'] > self.shared.queue['rate'] \ - else int(self.shared.queue['rate'] / self.shared.topic['bulk']) + + numloop = None + if (self.shared.topic['bulk'] == 1 + or self.shared.topic['bulk'] > self.shared.queue['rate'] + or self.shared.topic['bulk'] == self.shared.queue['rate']): + numloop = 1 + elif self.shared.queue['rate'] > self.shared.topic['bulk']: + numloop = int(self.shared.queue['rate'] / self.shared.topic['bulk']) + self.pubnumloop = numloop + self.shared.runtime.update(inmemq=self.inmemq, pubnumloop=self.pubnumloop, dirq=self.dirq, filepublisher=False) @@ -76,8 +84,9 @@ def run(self): self.publisher.stat_reset() periodev.clear() - if self.consume_dirq_msgs(max(self.shared.topic['bulk'], - self.shared.queue['rate'])): + nmsgs_consume = 1 if self.shared.topic['bulk'] == 1 \ + else max(self.shared.topic['bulk'], self.shared.queue['rate']) + if self.consume_dirq_msgs(nmsgs_consume): ret, published = self.publisher.write() if ret: self.remove_dirq_msgs() From 82f8407d4a17a9524a86d77a67725a366de9af18 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Tue, 12 Jan 2021 10:47:33 +0100 Subject: [PATCH 16/38] remove spec Centos6 steps --- argo-nagios-ams-publisher.spec | 27 ++++++--------------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/argo-nagios-ams-publisher.spec b/argo-nagios-ams-publisher.spec index ec170b6..328a8c3 100644 --- a/argo-nagios-ams-publisher.spec +++ b/argo-nagios-ams-publisher.spec @@ -2,12 +2,7 @@ %define underscore() %(echo %1 | sed 's/-/_/g') %define stripc() %(echo %1 | sed 's/el7.centos/el7/') - -%if 0%{?el7:1} %define mydist %{stripc %{dist}} -%else -%define mydist %{dist} -%endif Name: argo-nagios-ams-publisher Version: 0.3.9 @@ -28,6 +23,12 @@ Requires: python3-dirq Requires: python3-messaging Requires: python36-pytz +Requires(post): systemd +Requires(preun): systemd +Requires(postun): systemd +Requires(post): systemd-sysv + + %description Bridge from Nagios to the ARGO Messaging system @@ -58,29 +59,13 @@ install --directory --mode 755 $RPM_BUILD_ROOT/%{_localstatedir}/run/%{name}/ %dir %{_localstatedir}/spool/%{name}/ %post -%if 0%{?el7:1} %systemd_postun_with_restart ams-publisher.service -%else -/sbin/chkconfig --add ams-publisher -if [[ "$1" == 2 ]] -then - /sbin/service ams-publisher condrestart > /dev/null 2>&1 -fi -%endif %clean rm -rf $RPM_BUILD_ROOT %preun -%if 0%{?el7:1} %systemd_preun ams-publisher.service -%else -if [ "$1" = 0 ]; then - /sbin/service ams-publisher stop > /dev/null 2>&1 - /sbin/chkconfig --del ams-publisher -fi -exit 0 -%endif %postun if [ "$1" = 0 ]; then From cf2299ff2e668ccc8034d8d2610b9011dd46f1a7 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Tue, 12 Jan 2021 17:15:24 +0100 Subject: [PATCH 17/38] fix main loop rate and skip empty body handle --- pymod/consume.py | 5 ++--- pymod/publish.py | 9 +++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pymod/consume.py b/pymod/consume.py index 85b14f1..a880e35 100644 --- a/pymod/consume.py +++ b/pymod/consume.py @@ -40,8 +40,7 @@ def setup(self): numloop = None if (self.shared.topic['bulk'] == 1 - or self.shared.topic['bulk'] > self.shared.queue['rate'] - or self.shared.topic['bulk'] == self.shared.queue['rate']): + or self.shared.topic['bulk'] >= self.shared.queue['rate']): numloop = 1 elif self.shared.queue['rate'] > self.shared.topic['bulk']: numloop = int(self.shared.queue['rate'] / self.shared.topic['bulk']) @@ -106,7 +105,7 @@ def run(self): evgup.set() raise SystemExit(0) - time.sleep(decimal.Decimal(1) / decimal.Decimal(self.shared.queue['rate'])) + time.sleep(1 / self.shared.queue['rate']) except KeyboardInterrupt: self.cleanup() diff --git a/pymod/publish.py b/pymod/publish.py index 9cbef14..1199a03 100644 --- a/pymod/publish.py +++ b/pymod/publish.py @@ -99,8 +99,9 @@ def _avro_serialize(msg): plainmsg = dict() plainmsg.update(msg.header) - plainmsg.update(self.body2dict(msg.body)) - plainmsg.update(tags=self.tag2dict(msg.body)) + if msg.body: + plainmsg.update(self.body2dict(msg.body)) + plainmsg.update(tags=self.tag2dict(msg.body)) timestamp = plainmsg.get('timestamp', None) m = None @@ -114,9 +115,9 @@ def _avro_serialize(msg): def _extract_body(self, body, fields, maps=None): msg = dict() - bodylines = body.split(b'\n') + bodylines = body.split('\n') for line in bodylines: - split = line.split(b': ', 1) + split = line.split(': ', 1) if len(split) > 1: key = split[0] value = split[1] From 92fc5cd3f7e6a6b807dbfdd5ee905476bb3a170c Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Tue, 12 Jan 2021 17:39:12 +0100 Subject: [PATCH 18/38] correct type of passing stats in inspection socket --- bin/ams-publisherd | 20 ++++++++++---------- pymod/stats.py | 4 ++-- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/bin/ams-publisherd b/bin/ams-publisherd index 383eb87..63fe831 100755 --- a/bin/ams-publisherd +++ b/bin/ams-publisherd @@ -25,10 +25,10 @@ logger = None def query_stats(last_minutes): def parse_result(query): try: - w, r = query.split('+') + w, r = query.split(b'+') - w = w.split(':')[1] - r = int(r.split(':')[1]) + w = w.split(b':')[1] + r = int(r.split(b':')[1]) except (ValueError, KeyError): return (w, 'error') @@ -50,26 +50,26 @@ def query_stats(last_minutes): sock.settimeout(15) sock.connect(shared.general['statsocket']) - sock.send(query_published, maxcmdlength) + sock.send(query_published.encode(), maxcmdlength) data = sock.recv(maxcmdlength) for answer in data.split(): - if answer.startswith('t:'): + if answer.startswith(b't:'): continue w, r = parse_result(answer) - shared.log.info('worker:{0} published:{1}'.format(w, r)) + shared.log.info('worker:{0} published:{1}'.format(w.decode(), r)) sock.close() sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.setblocking(0) sock.settimeout(15) sock.connect(shared.general['statsocket']) - sock.send(query_consumed, maxcmdlength) + sock.send(query_consumed.encode(), maxcmdlength) data = sock.recv(maxcmdlength) - for answer in data.split(' '): - if answer.startswith('t:'): + for answer in data.split(b' '): + if answer.startswith(b't:'): continue w, r = parse_result(answer) - shared.log.info('worker:{0} consumed:{1}'.format(w, r)) + shared.log.info('worker:{0} consumed:{1}'.format(w.decode(), r)) sock.close() except socket.timeout as e: diff --git a/pymod/stats.py b/pymod/stats.py index 67d4319..aec9227 100644 --- a/pymod/stats.py +++ b/pymod/stats.py @@ -124,7 +124,7 @@ def _cleanup(self): raise SystemExit(0) def parse_cmd(self, cmd): - m = re.findall(r'w:\w+\+g:\w+', cmd) + m = re.findall(r'w:\w+\+g:\w+', cmd.decode()) queries = list() if m: @@ -186,7 +186,7 @@ def run(self): q = self.parse_cmd(data) if q: a = self.answer(q) - conn.send(a, maxcmdlength) + conn.send(a.encode(), maxcmdlength) if self.events['term-stats'].is_set(): self.shared.log.info('Stats received SIGTERM') self.events['term-stats'].clear() From d35ceebdd2ba95c9f390a0c60c9d8dcb0ef515e8 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Tue, 12 Jan 2021 17:40:39 +0100 Subject: [PATCH 19/38] remove not needed pidfiledir() --- bin/ams-publisherd | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/bin/ams-publisherd b/bin/ams-publisherd index 63fe831..88610fa 100755 --- a/bin/ams-publisherd +++ b/bin/ams-publisherd @@ -102,19 +102,6 @@ def get_userids(user): return pwd.getpwnam(user)[2], pwd.getpwnam(user)[3] -def pidfiledir(pidfile): - try: - dirp = os.path.dirname(pidfile) - if not os.path.exists(dirp): - os.makedirs(dirp) - uid, gid = get_userids(shared.general['runasuser']) - os.chown(dirp, uid, gid) - except (OSError, IOError) as e: - if e.args[0] != errno.EEXIST: - shared.log.error('%s %s' % (os.strerror(e.args[0]), e.args[1])) - raise SystemExit(1) - - def main(): """ Function fetch arguments from command line, initialize logger, From b8da35da076ced983734ef02b540af98501f489a Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Tue, 12 Jan 2021 17:45:47 +0100 Subject: [PATCH 20/38] place query sock function in appropriate module file --- bin/ams-publisherd | 78 +--------------------------------------------- pymod/stats.py | 78 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 77 deletions(-) diff --git a/bin/ams-publisherd b/bin/ams-publisherd index 88610fa..cce754a 100755 --- a/bin/ams-publisherd +++ b/bin/ams-publisherd @@ -3,6 +3,7 @@ from argo_nagios_ams_publisher.config import parse_config from argo_nagios_ams_publisher.log import Logger from argo_nagios_ams_publisher.run import init_dirq_consume +from argo_nagios_ams_publisher.stats import query_stats, setup_statssocket from argo_nagios_ams_publisher.shared import Shared from datetime import datetime @@ -13,7 +14,6 @@ import multiprocessing import os import pwd import signal -import socket import sys logfile = '/var/log/argo-nagios-ams-publisher/ams-publisher.log' @@ -22,82 +22,6 @@ shared = None logger = None -def query_stats(last_minutes): - def parse_result(query): - try: - w, r = query.split(b'+') - - w = w.split(b':')[1] - r = int(r.split(b':')[1]) - - except (ValueError, KeyError): - return (w, 'error') - - return (w, r) - - maxcmdlength = 128 - query_consumed, query_published = '', '' - - for w in shared.workers: - query_consumed += 'w:{0}+g:consumed{1} '.format(w, last_minutes) - - for w in shared.workers: - query_published += 'w:{0}+g:published{1} '.format(w, last_minutes) - - try: - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - sock.setblocking(0) - sock.settimeout(15) - - sock.connect(shared.general['statsocket']) - sock.send(query_published.encode(), maxcmdlength) - data = sock.recv(maxcmdlength) - for answer in data.split(): - if answer.startswith(b't:'): - continue - w, r = parse_result(answer) - shared.log.info('worker:{0} published:{1}'.format(w.decode(), r)) - sock.close() - - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - sock.setblocking(0) - sock.settimeout(15) - sock.connect(shared.general['statsocket']) - sock.send(query_consumed.encode(), maxcmdlength) - data = sock.recv(maxcmdlength) - for answer in data.split(b' '): - if answer.startswith(b't:'): - continue - w, r = parse_result(answer) - shared.log.info('worker:{0} consumed:{1}'.format(w.decode(), r)) - sock.close() - - except socket.timeout as e: - shared.log.error('Socket response timeout after 15s') - - except socket.error as e: - shared.log.error('Socket error: {0}'.format(str(e))) - - finally: - sock.close() - - -def setup_statssocket(path, uid, gid): - global shared - - if os.path.exists(path): - os.unlink(path) - try: - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - sock.bind(path) - os.chown(path, uid, gid) - except socket.error as e: - shared.log.error('Error setting up socket: %s - %s' % (path, str(e))) - raise SystemExit(1) - - return sock - - def get_userids(user): return pwd.getpwnam(user)[2], pwd.getpwnam(user)[3] diff --git a/pymod/stats.py b/pymod/stats.py index aec9227..48ede4a 100644 --- a/pymod/stats.py +++ b/pymod/stats.py @@ -14,6 +14,84 @@ maxcmdlength = 128 +def query_stats(last_minutes): + def parse_result(query): + try: + w, r = query.split(b'+') + + w = w.split(b':')[1] + r = int(r.split(b':')[1]) + + except (ValueError, KeyError): + return (w, 'error') + + return (w, r) + + shared = Shared() + + maxcmdlength = 128 + query_consumed, query_published = '', '' + + for w in shared.workers: + query_consumed += 'w:{0}+g:consumed{1} '.format(w, last_minutes) + + for w in shared.workers: + query_published += 'w:{0}+g:published{1} '.format(w, last_minutes) + + try: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.setblocking(0) + sock.settimeout(15) + + sock.connect(shared.general['statsocket']) + sock.send(query_published.encode(), maxcmdlength) + data = sock.recv(maxcmdlength) + for answer in data.split(): + if answer.startswith(b't:'): + continue + w, r = parse_result(answer) + shared.log.info('worker:{0} published:{1}'.format(w.decode(), r)) + sock.close() + + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.setblocking(0) + sock.settimeout(15) + sock.connect(shared.general['statsocket']) + sock.send(query_consumed.encode(), maxcmdlength) + data = sock.recv(maxcmdlength) + for answer in data.split(b' '): + if answer.startswith(b't:'): + continue + w, r = parse_result(answer) + shared.log.info('worker:{0} consumed:{1}'.format(w.decode(), r)) + sock.close() + + except socket.timeout as e: + shared.log.error('Socket response timeout after 15s') + + except socket.error as e: + shared.log.error('Socket error: {0}'.format(str(e))) + + finally: + sock.close() + + +def setup_statssocket(path, uid, gid): + global shared + + if os.path.exists(path): + os.unlink(path) + try: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.bind(path) + os.chown(path, uid, gid) + except socket.error as e: + shared.log.error('Error setting up socket: %s - %s' % (path, str(e))) + raise SystemExit(1) + + return sock + + class StatSig(object): """ Class is meant to be subclassed by ConsumerQueue and Publish classes for From 9ab09201d54e50c8c2d3a7bad2114c349ea8328b Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Wed, 13 Jan 2021 09:53:25 +0100 Subject: [PATCH 21/38] remove helpers and place them into ams-test repo --- helpers/ams-msg-generator.py | 104 ----------------------------------- helpers/ams-queue-consume.py | 78 -------------------------- setup.py | 3 +- 3 files changed, 1 insertion(+), 184 deletions(-) delete mode 100755 helpers/ams-msg-generator.py delete mode 100755 helpers/ams-queue-consume.py diff --git a/helpers/ams-msg-generator.py b/helpers/ams-msg-generator.py deleted file mode 100755 index 78c4a6d..0000000 --- a/helpers/ams-msg-generator.py +++ /dev/null @@ -1,104 +0,0 @@ -#!/usr/bin/python3 - -import argparse -import datetime -import messaging.generator as generator -import os -import pwd -import random -import sys -import time - -from messaging.message import Message -from messaging.error import MessageError -from messaging.queue.dqs import DQS - -from pytz import timezone, UnknownTimeZoneError - -default_queue = '/var/spool/argo-nagios-ams-publisher/outgoing-messages/' -default_user = 'nagios' - -def construct_msg(session, bodysize, timezone): - statusl = ['OK', 'WARNING', 'MISSING', 'CRITICAL', 'UNKNOWN', 'DOWNTIME'] - - try: - msg = Message() - msg.header = dict() - msg.body = str() - - if session: - msg.header.update({'*** SESSION ***': '*** {0} ***'.format(session)}) - msg.header.update({'service': generator.rndb64(10)}) - msg.header.update({'hostname': generator.rndb64(10)}) - msg.header.update({'metric': generator.rndb64(10)}) - msg.header.update({'monitoring_host': generator.rndb64(10)}) - msg.header.update({'timestamp': str(datetime.datetime.now(timezone).strftime('%Y-%m-%dT%H:%M:%SZ'))}) - msg.header.update({'status': random.choice(statusl)}) - - msg.body += 'summary: %s\n' % generator.rndb64(20) - msg.body += 'message: %s\n' % generator.rndb64(bodysize) - msg.body += 'vofqan: %s\n' % generator.rndb64(10) - msg.body += 'actual_data: %s\n' % generator.rndb64(10) - msg.body += 'voname: %s\n' % generator.rndb64(3) - msg.body += 'roc: %s\n' % generator.rndb64(3) - - except MessageError as e: - sys.stderr.write('Error constructing message - %s\n', repr(e)) - - else: - return msg - -def queue_msg(msg, queue): - try: - queue.add_message(msg) - - except Exception as e: - sys.stderr.write(str(e) + '\n') - raise SystemExit(1) - -def seteuser(user): - os.setegid(user.pw_gid) - os.seteuid(user.pw_uid) - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument('--session', required=False, default=str(), type=str) - parser.add_argument('--num', required=False, default=0, type=int) - parser.add_argument('--queue', required=False, default=default_queue, type=str) - parser.add_argument('--granularity', required=False, default=60, type=int) - parser.add_argument('--runas', required=False, default=default_user, type=str) - parser.add_argument('--noout', required=False, action='store_true', default=False) - parser.add_argument('--sleep', required=False, default=0, type=float) - parser.add_argument('--bodysize', required=False, default=40, type=int) - parser.add_argument('--timezone', required=False, default='UTC', type=str) - args = parser.parse_args() - - seteuser(pwd.getpwnam(args.runas)) - try: - tz = timezone(args.timezone) - except UnknownTimeZoneError as e: - print("Timezone not correct") - raise SystemExit(1) - - mq = DQS(path=args.queue, granularity=args.granularity) - - try: - if args.num: - for i in range(args.num): - msg = construct_msg(args.session, args.bodysize, tz) - queue_msg(msg, mq) - if not args.noout: - print(msg) - else: - while True: - msg = construct_msg(args.session, args.bodysize, tz) - queue_msg(msg, mq) - if not args.noout: - print(msg) - if args.sleep: - time.sleep(args.sleep) - - except KeyboardInterrupt as e: - raise SystemExit(0) - -main() diff --git a/helpers/ams-queue-consume.py b/helpers/ams-queue-consume.py deleted file mode 100755 index e93114e..0000000 --- a/helpers/ams-queue-consume.py +++ /dev/null @@ -1,78 +0,0 @@ -#!/usr/bin/python3 - -import argparse -import datetime -import os -import pprint -import pwd -import random -import sys -import time - -from messaging.message import Message -from messaging.error import MessageError -from messaging.queue.dqs import DQS - -from collections import deque - -default_queue = '/var/spool/argo-nagios-ams-publisher/outgoing-messages/' -default_user = 'nagios' -args = None -cqcalld = 1 - -def seteuser(user): - os.setegid(user.pw_gid) - os.seteuid(user.pw_uid) - -def consume_queue(mq, num=0): - global cqcalld, args - if not args.noout: - print('---- MSGS ---- RUN {0} ----'.format(cqcalld)) - - i, msgs = 1, deque() - for name in mq: - if mq.lock(name): - msgs.append(mq.get_message(name)) - mq.remove(name) - if num and i == num: - break - i += 1 - else: - print('{0} empty'.format(mq.path)) - - - if msgs and not args.noout: - pprint.pprint(msgs) - - cqcalld += 1 - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument('--sleep', required=False, default=0, type=float) - parser.add_argument('--queue', required=False, default=default_queue, type=str) - parser.add_argument('--runas', required=False, default=default_user, type=str) - parser.add_argument('--purge', required=False, action='store_true', default=False) - parser.add_argument('--noout', required=False, action='store_true', default=False) - parser.add_argument('--num', required=False, default=0, type=int) - global args - args = parser.parse_args() - - seteuser(pwd.getpwnam(args.runas)) - - msgs = [] - mq = DQS(path=args.queue) - try: - if args.purge: - mq.purge() - if args.sleep > 0: - while True: - consume_queue(mq, args.num) - time.sleep(args.sleep) - else: - consume_queue(mq, args.num) - - except KeyboardInterrupt as e: - raise SystemExit(0) - -main() - diff --git a/setup.py b/setup.py index 3e91dca..b3988b3 100644 --- a/setup.py +++ b/setup.py @@ -37,5 +37,4 @@ def get_ver(): ('/usr/lib/systemd/system/', ['init/ams-publisher.service']) if is_c7() else \ ('/etc/init.d/', ['init/ams-publisher'])], scripts=['bin/ams-alarm-to-queue', 'bin/ams-metric-to-queue', - 'bin/ams-publisherd', 'helpers/ams-msg-generator.py', - 'helpers/ams-queue-consume.py']) + 'bin/ams-publisherd']) From dffb2bb4f83b727e6e82f927b14230c8ef031277 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Wed, 13 Jan 2021 10:56:48 +0100 Subject: [PATCH 22/38] set effective uid and gid from systemd unit --- argo-nagios-ams-publisher.spec | 1 - init/ams-publisher.service | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/argo-nagios-ams-publisher.spec b/argo-nagios-ams-publisher.spec index 328a8c3..fb213e4 100644 --- a/argo-nagios-ams-publisher.spec +++ b/argo-nagios-ams-publisher.spec @@ -26,7 +26,6 @@ Requires: python36-pytz Requires(post): systemd Requires(preun): systemd Requires(postun): systemd -Requires(post): systemd-sysv %description diff --git a/init/ams-publisher.service b/init/ams-publisher.service index 6be8182..c6b98d2 100644 --- a/init/ams-publisher.service +++ b/init/ams-publisher.service @@ -6,6 +6,8 @@ After=network.target Type=simple ExecStart=/usr/bin/ams-publisherd KillMode=process +User=nagios +Group=nagios StandardError=syslog [Install] From b58b1bdbab959e4c06799333afe3605e7c9e69c0 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Wed, 13 Jan 2021 11:01:23 +0100 Subject: [PATCH 23/38] syslog prefix set from systemd unit --- init/ams-publisher.service | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/init/ams-publisher.service b/init/ams-publisher.service index c6b98d2..96a3cb5 100644 --- a/init/ams-publisher.service +++ b/init/ams-publisher.service @@ -1,5 +1,5 @@ [Unit] -Description=AMS publisher +Description=Nagios AMS publisher After=network.target [Service] @@ -8,6 +8,7 @@ ExecStart=/usr/bin/ams-publisherd KillMode=process User=nagios Group=nagios +SyslogIdentifier=ams-publisher StandardError=syslog [Install] From 25c6c7d22575d75a3a48319453d96568507aefb0 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Wed, 13 Jan 2021 11:16:22 +0100 Subject: [PATCH 24/38] remove Centos6 building from Jenkinsfile --- Jenkinsfile | 25 ++----------------------- 1 file changed, 2 insertions(+), 23 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index a8bae7d..1a3c021 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -13,27 +13,6 @@ pipeline { stages { stage ('Build'){ parallel { - stage ('Build Centos 6') { - agent { - docker { - image 'argo.registry:5000/epel-6-ams' - args '-u jenkins:jenkins' - } - } - steps { - echo 'Building Rpm...' - withCredentials(bindings: [sshUserPrivateKey(credentialsId: 'jenkins-rpm-repo', usernameVariable: 'REPOUSER', \ - keyFileVariable: 'REPOKEY')]) { - sh "/home/jenkins/build-rpm.sh -w ${WORKSPACE} -b ${BRANCH_NAME} -d centos6 -p ${PROJECT_DIR} -s ${REPOKEY}" - } - archiveArtifacts artifacts: '**/*.rpm', fingerprint: true - } - post{ - always { - cleanWs() - } - } - } stage ('Build Centos 7') { agent { docker { @@ -54,7 +33,7 @@ pipeline { cleanWs() } } - } + } } } } @@ -74,7 +53,7 @@ pipeline { if ( env.BRANCH_NAME == 'master' || env.BRANCH_NAME == 'devel' ) { slackSend( message: ":rain_cloud: Build Failed for <$BUILD_URL|$PROJECT_DIR>:$BRANCH_NAME Job: $JOB_NAME") } - } + } } } } From 9f42f31155290eb84d6be9b65f534e4571c7b5de Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Thu, 28 Jan 2021 17:02:36 +0100 Subject: [PATCH 25/38] add missing singleton instantiation --- pymod/stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymod/stats.py b/pymod/stats.py index 48ede4a..5177568 100644 --- a/pymod/stats.py +++ b/pymod/stats.py @@ -77,7 +77,7 @@ def parse_result(query): def setup_statssocket(path, uid, gid): - global shared + shared = Shared() if os.path.exists(path): os.unlink(path) From e146d6e0f5dc8bef77cb0779593a45a7a170dfc5 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Thu, 28 Jan 2021 17:22:41 +0100 Subject: [PATCH 26/38] do not manually create runtime directory --- argo-nagios-ams-publisher.spec | 8 -------- 1 file changed, 8 deletions(-) diff --git a/argo-nagios-ams-publisher.spec b/argo-nagios-ams-publisher.spec index fb213e4..e24a687 100644 --- a/argo-nagios-ams-publisher.spec +++ b/argo-nagios-ams-publisher.spec @@ -44,7 +44,6 @@ install --directory --mode 755 $RPM_BUILD_ROOT/%{_sysconfdir}/%{name}/ install --directory --mode 755 $RPM_BUILD_ROOT/%{_localstatedir}/log/%{name}/ install --directory --mode 755 $RPM_BUILD_ROOT/%{_localstatedir}/spool/%{name}/metrics/ install --directory --mode 755 $RPM_BUILD_ROOT/%{_localstatedir}/spool/%{name}/alarms/ -install --directory --mode 755 $RPM_BUILD_ROOT/%{_localstatedir}/run/%{name}/ %files -f INSTALLED_FILES %defattr(-,root,root,-) @@ -54,7 +53,6 @@ install --directory --mode 755 $RPM_BUILD_ROOT/%{_localstatedir}/run/%{name}/ %{python3_sitelib}/%{underscore %{name}}/*.py %defattr(-,nagios,nagios,-) %dir %{_localstatedir}/log/%{name}/ -%attr(0755,nagios,nagios) %dir %{_localstatedir}/run/%{name}/ %dir %{_localstatedir}/spool/%{name}/ %post @@ -66,12 +64,6 @@ rm -rf $RPM_BUILD_ROOT %preun %systemd_preun ams-publisher.service -%postun -if [ "$1" = 0 ]; then - test -d %{_localstatedir}/run/%{name}/ && rm -rf %{_localstatedir}/run/%{name}/ -fi -exit 0 - %pre if ! /usr/bin/id nagios &>/dev/null; then /usr/sbin/useradd -r -m -d /var/log/nagios -s /bin/sh -c "nagios" nagios || \ From 4ecf72ce8167a086a57eac7b9f63057d50316b73 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Thu, 28 Jan 2021 17:33:29 +0100 Subject: [PATCH 27/38] fixed socket path, not taken from config --- bin/ams-publisherd | 2 +- config/ams-publisher.conf | 7 +++---- pymod/config.py | 1 - pymod/stats.py | 19 ++++++++++--------- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/bin/ams-publisherd b/bin/ams-publisherd index cce754a..8ec80c1 100755 --- a/bin/ams-publisherd +++ b/bin/ams-publisherd @@ -70,7 +70,7 @@ def main(): confopts = parse_config() shared = Shared(confopts=confopts) uid, gid = get_userids(shared.general['runasuser']) - sock = setup_statssocket(shared.general['statsocket'], uid, gid) + sock = setup_statssocket(uid, gid) init_dirq_consume(shared.workers, daemonized=False, sockstat=sock) except KeyboardInterrupt: raise SystemExit(1) diff --git a/config/ams-publisher.conf b/config/ams-publisher.conf index 5c22f23..777666d 100644 --- a/config/ams-publisher.conf +++ b/config/ams-publisher.conf @@ -1,12 +1,11 @@ [General] -Host = nagioshost -RunAsUser = nagios +Host = nagioshost +RunAsUser = nagios StatsEveryHour = 24 PublishMsgFile = False PublishMsgFileDir = /published PublishArgoMessaging = True TimeZone = UTC -StatSocket = /var/run/argo-nagios-ams-publisher/sock [Queue_Metrics] Directory = /var/spool/argo-nagios-ams-publisher/metrics/ @@ -36,7 +35,7 @@ Rate = 10 Purge = True PurgeEverySec = 300 MaxTemp = 300 -MaxLock = 0 +MaxLock = 0 Granularity = 60 [Topic_Alarms] diff --git a/pymod/config.py b/pymod/config.py index 50180b5..6e957ea 100644 --- a/pymod/config.py +++ b/pymod/config.py @@ -55,7 +55,6 @@ def parse_config(logger=None): confopts['general'].update({'publishmsgfile': eval(config.get(section, 'PublishMsgFile'))}) confopts['general'].update({'publishmsgfiledir': config.get(section, 'PublishMsgFileDir')}) confopts['general'].update({'publishargomessaging': eval(config.get(section, 'PublishArgoMessaging'))}) - confopts['general'].update({'statsocket': config.get(section, 'StatSocket')}) confopts['general'].update({'timezone': config.get(section, 'TimeZone')}) try: tz = timezone(confopts['general']['timezone']) diff --git a/pymod/stats.py b/pymod/stats.py index 5177568..ec38d88 100644 --- a/pymod/stats.py +++ b/pymod/stats.py @@ -11,7 +11,8 @@ from multiprocessing import Process from argo_nagios_ams_publisher.shared import Shared -maxcmdlength = 128 +MAXCMDLENGTH = 128 +STATSOCK = '/run/argo-nagios-ams-publisher/sock' def query_stats(last_minutes): @@ -76,17 +77,17 @@ def parse_result(query): sock.close() -def setup_statssocket(path, uid, gid): +def setup_statssocket(uid, gid): shared = Shared() - if os.path.exists(path): - os.unlink(path) + if os.path.exists(STATSOCK): + os.unlink(STATSOCK) try: sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - sock.bind(path) - os.chown(path, uid, gid) + sock.bind(STATSOCK) + os.chown(STATSOCK, uid, gid) except socket.error as e: - shared.log.error('Error setting up socket: %s - %s' % (path, str(e))) + shared.log.error('Error setting up socket: %s - %s' % (STATSOCK, str(e))) raise SystemExit(1) return sock @@ -260,11 +261,11 @@ def run(self): event = self.poller.poll(float(self.shared.runtime['evsleep'] * 1000)) if len(event) > 0 and event[0][1] & select.POLLIN: conn, addr = self.sock.accept() - data = conn.recv(maxcmdlength) + data = conn.recv(MAXCMDLENGTH) q = self.parse_cmd(data) if q: a = self.answer(q) - conn.send(a.encode(), maxcmdlength) + conn.send(a.encode(), MAXCMDLENGTH) if self.events['term-stats'].is_set(): self.shared.log.info('Stats received SIGTERM') self.events['term-stats'].clear() From 360ffc6635bd02645a58761acba30cf849eae002 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Thu, 28 Jan 2021 17:33:57 +0100 Subject: [PATCH 28/38] delete no longer needed sysv init --- init/ams-publisher | 83 ---------------------------------------------- 1 file changed, 83 deletions(-) delete mode 100755 init/ams-publisher diff --git a/init/ams-publisher b/init/ams-publisher deleted file mode 100755 index 07e1c2e..0000000 --- a/init/ams-publisher +++ /dev/null @@ -1,83 +0,0 @@ -#!/bin/bash -# ams-publisher daemon -# chkconfig: 345 20 80 -# description: ams-publisher daemon -# processname: ams-publisher - -. /etc/rc.d/init.d/functions -PROG_NAME="ams-publisher" -DAEMON_PATH="/usr/bin/ams-publisherd" -LOCK_FILE=/var/lock/subsys/$PROG_NAME - -function start() -{ - echo $"Starting $PROG_NAME: " - msg=$(daemon $DAEMON_PATH -d start) - retval=$? - [ $retval -eq 0 ] && touch $LOCK_FILE - echo $msg - return $retval -} - -function stop() -{ - echo $"Stopping $PROG_NAME: " - msg=$(daemon $DAEMON_PATH -d stop) - retval=$? - [ $retval -eq 0 ] && rm -f $LOCK_FILE - echo $msg - return $retval -} - -function restart() -{ - rm -f $LOCK_FILE - echo $"Restarting $PROG_NAME: " - msg=$(daemon $DAEMON_PATH -d restart) - retval=$? - [ $retval -eq 0 ] && touch $LOCK_FILE - echo $msg - return $retval -} - -function status() -{ - echo $"Status $PROG_NAME: " - msg=$(daemon $DAEMON_PATH -d status) - retval=$? - echo $msg - return $retval -} - -case "$1" in -start) - start - retval=$? - exit $retval -;; -stop) - stop - retval=$? - exit $retval -;; -restart) - restart - retval=$? - exit $retval -;; -condrestart) - status - retval=$? - if (( $retval == 0 )) - then - restart - fi -;; -status) - status -;; -*) - echo "Usage: $0 {start|stop|restart|status|condrestart}" - exit 1 -esac - From 9e3a3da28dba59e4c399a334a9fd835f66743c05 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Thu, 28 Jan 2021 17:34:23 +0100 Subject: [PATCH 29/38] let systemd create runtime directory --- init/ams-publisher.service | 1 + 1 file changed, 1 insertion(+) diff --git a/init/ams-publisher.service b/init/ams-publisher.service index 96a3cb5..18f7b26 100644 --- a/init/ams-publisher.service +++ b/init/ams-publisher.service @@ -10,6 +10,7 @@ User=nagios Group=nagios SyslogIdentifier=ams-publisher StandardError=syslog +RuntimeDirectory=argo-nagios-ams-publisher [Install] WantedBy=multi-user.target From e2b1d30a64761e139fbd18fe3cbcd796bc7cd558 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Thu, 28 Jan 2021 17:39:16 +0100 Subject: [PATCH 30/38] change reference to socket on filesystem --- pymod/stats.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pymod/stats.py b/pymod/stats.py index ec38d88..633cec8 100644 --- a/pymod/stats.py +++ b/pymod/stats.py @@ -44,7 +44,7 @@ def parse_result(query): sock.setblocking(0) sock.settimeout(15) - sock.connect(shared.general['statsocket']) + sock.connect(STATSOCK) sock.send(query_published.encode(), maxcmdlength) data = sock.recv(maxcmdlength) for answer in data.split(): @@ -57,7 +57,7 @@ def parse_result(query): sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.setblocking(0) sock.settimeout(15) - sock.connect(shared.general['statsocket']) + sock.connect(STATSOCK) sock.send(query_consumed.encode(), maxcmdlength) data = sock.recv(maxcmdlength) for answer in data.split(b' '): @@ -194,12 +194,12 @@ def __init__(self, events, sock): try: self.sock.listen(1) except socket.error as m: - self.shared.log.error('Cannot initialize Stats socket %s - %s' % (self.shared.general['statsocket'], repr(m))) + self.shared.log.error('Cannot initialize Stats socket %s - %s' % (STATSOCK, repr(m))) raise SystemExit(1) def _cleanup(self): self.sock.close() - os.unlink(self.shared.general['statsocket']) + os.unlink(STATSOCK) raise SystemExit(0) def parse_cmd(self, cmd): From abc3d2b2cd6d75e55f21580c71f87140a7b2791a Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Thu, 28 Jan 2021 17:43:40 +0100 Subject: [PATCH 31/38] linting fixes in bootstrap --- bin/ams-publisherd | 37 ++++++++++++++----------------------- 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/bin/ams-publisherd b/bin/ams-publisherd index 8ec80c1..14897c0 100755 --- a/bin/ams-publisherd +++ b/bin/ams-publisherd @@ -1,25 +1,17 @@ #!/usr/bin/python3 - -from argo_nagios_ams_publisher.config import parse_config -from argo_nagios_ams_publisher.log import Logger -from argo_nagios_ams_publisher.run import init_dirq_consume -from argo_nagios_ams_publisher.stats import query_stats, setup_statssocket -from argo_nagios_ams_publisher.shared import Shared - -from datetime import datetime - import argparse -import errno import multiprocessing -import os import pwd import signal import sys -logfile = '/var/log/argo-nagios-ams-publisher/ams-publisher.log' +from argo_nagios_ams_publisher.config import parse_config +from argo_nagios_ams_publisher.log import Logger +from argo_nagios_ams_publisher.run import init_dirq_consume +from argo_nagios_ams_publisher.stats import query_stats, setup_statssocket +from argo_nagios_ams_publisher.shared import Shared -shared = None -logger = None +LOGFILE = '/var/log/argo-nagios-ams-publisher/ams-publisher.log' def get_userids(user): @@ -34,18 +26,17 @@ def main(): There is also option for no-daemonizing mainly for debugging purposes. """ - def sigterm_handler(signum, frame): - ev = shared.event('term') - ev.set() + def sigterm_handler(*args): + event = shared.event('term') + event.set() - def sigusr1_handler(signum, frame): - ev = shared.event('usr1') - ev.set() + def sigusr1_handler(*args): + event = shared.event('usr1') + event.set() - lobj = Logger(sys.argv[0], logfile) + lobj = Logger(sys.argv[0], LOGFILE) logger = lobj.get() - global shared shared = Shared() shared.add_log(logger) @@ -54,7 +45,7 @@ def main(): parser = argparse.ArgumentParser(prog='ams-publisherd') parser.add_argument('-q', dest='query', required=False, help='query for statistics for last n minutes', - nargs='?', type=int, metavar='number of minutes', const=180) + nargs='?', type=int, metavar='number of minutes', const=180) args = parser.parse_args() signal.signal(signal.SIGTERM, sigterm_handler) From c251765b38b020d4e2d1f7671891c88579dd9051 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Thu, 28 Jan 2021 18:32:56 +0100 Subject: [PATCH 32/38] remove syslog logger initalization --- bin/ams-publisherd | 1 - init/ams-publisher.service | 1 - pymod/log.py | 28 +++++++++------------------- 3 files changed, 9 insertions(+), 21 deletions(-) diff --git a/bin/ams-publisherd b/bin/ams-publisherd index 14897c0..92e8995 100755 --- a/bin/ams-publisherd +++ b/bin/ams-publisherd @@ -67,5 +67,4 @@ def main(): raise SystemExit(1) - main() diff --git a/init/ams-publisher.service b/init/ams-publisher.service index 18f7b26..8f82844 100644 --- a/init/ams-publisher.service +++ b/init/ams-publisher.service @@ -9,7 +9,6 @@ KillMode=process User=nagios Group=nagios SyslogIdentifier=ams-publisher -StandardError=syslog RuntimeDirectory=argo-nagios-ams-publisher [Install] diff --git a/pymod/log.py b/pymod/log.py index 2430a1e..22cccf1 100644 --- a/pymod/log.py +++ b/pymod/log.py @@ -3,12 +3,13 @@ import sys import os.path -logname = 'ams-publisher' -logfile = '/var/log/argo-nagios-ams-publisher/ams-publisher.log' +LOGNAME = 'ams-publisher' +LOGFILE = '/var/log/argo-nagios-ams-publisher/ams-publisher.log' + class Logger(object): """ - Logger objects with initialized File and Syslog logger. + Logger objects with initialized File logger. """ logger = None @@ -18,35 +19,24 @@ def _init_stdout(self): lv = logging.INFO logging.basicConfig(format=lfs, level=lv, stream=sys.stdout) - self.logger = logging.getLogger(logname) - - def _init_syslog(self): - lfs = '%(name)s[%(process)s]: %(levelname)s - %(message)s' - lf = logging.Formatter(lfs) - lv = logging.INFO - - sh = logging.handlers.SysLogHandler('/dev/log', logging.handlers.SysLogHandler.LOG_USER) - sh.setFormatter(lf) - sh.setLevel(lv) - self.logger.addHandler(sh) + self.logger = logging.getLogger(LOGNAME) - def _init_filelog(self, logfile): + def _init_filelog(self): lfs = '%(asctime)s %(name)s[%(process)s]: %(levelname)s - %(message)s' lf = logging.Formatter(fmt=lfs, datefmt='%Y-%m-%d %H:%M:%S') lv = logging.INFO - sf = logging.FileHandler(logfile) + sf = logging.FileHandler(LOGFILE) self.logger.fileloghandle = sf.stream sf.setFormatter(lf) sf.setLevel(lv) self.logger.addHandler(sf) - def __init__(self, caller, logfile): + def __init__(self, caller): self._caller = os.path.basename(caller) self._init_stdout() try: - self._init_filelog(logfile) - self._init_syslog() + self._init_filelog() except (OSError, IOError) as e: sys.stderr.write('WARNING ' + self._caller + ' Error initializing loggers - ' + str(e) + '\n') From 898e6ed84fc396b4f844a5d0932c6623017448ae Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Thu, 28 Jan 2021 19:15:49 +0100 Subject: [PATCH 33/38] linting fixes in bootstrap --- bin/ams-publisherd | 4 +--- pymod/run.py | 58 +++++++++++++++++++++++----------------------- 2 files changed, 30 insertions(+), 32 deletions(-) diff --git a/bin/ams-publisherd b/bin/ams-publisherd index 92e8995..5dfe322 100755 --- a/bin/ams-publisherd +++ b/bin/ams-publisherd @@ -11,8 +11,6 @@ from argo_nagios_ams_publisher.run import init_dirq_consume from argo_nagios_ams_publisher.stats import query_stats, setup_statssocket from argo_nagios_ams_publisher.shared import Shared -LOGFILE = '/var/log/argo-nagios-ams-publisher/ams-publisher.log' - def get_userids(user): return pwd.getpwnam(user)[2], pwd.getpwnam(user)[3] @@ -34,7 +32,7 @@ def main(): event = shared.event('usr1') event.set() - lobj = Logger(sys.argv[0], LOGFILE) + lobj = Logger(sys.argv[0]) logger = lobj.get() shared = Shared() diff --git a/pymod/run.py b/pymod/run.py index 867c632..5a15921 100644 --- a/pymod/run.py +++ b/pymod/run.py @@ -27,8 +27,8 @@ def init_dirq_consume(workers, daemonized, sockstat): localevents = dict() manager = Manager() - for w in workers: - shared = Shared(worker=w) + for worker in workers: + shared = Shared(worker=worker) # Create dictionaries that hold number of (published, consumed) messages # in seconds from epoch. Second from epoch is a key and number of @@ -39,15 +39,15 @@ def init_dirq_consume(workers, daemonized, sockstat): # Counter is read on queries from socket. # collections.Counter cannot be shared between processes so # manager.dict() is used. - shared.statint[w]['consumed'] = manager.dict() - shared.statint[w]['published'] = manager.dict() + shared.statint[worker]['consumed'] = manager.dict() + shared.statint[worker]['published'] = manager.dict() shared.reload_confopts = manager.dict() # Create integer counters that will be shared across spawned processes # and that will keep track of number of published and consumed messages. # Counter is read on perodic status reports and signal SIGUSR1. - shared.statint[w]['consumed_periodic'] = Value('i', 1) - shared.statint[w]['published_periodic'] = Value('i', 1) + shared.statint[worker]['consumed_periodic'] = Value('i', 1) + shared.statint[worker]['published_periodic'] = Value('i', 1) if not getattr(shared, 'runtime', False): shared.runtime = dict() @@ -68,21 +68,21 @@ def init_dirq_consume(workers, daemonized, sockstat): shared.runtime.update(publisher=MessagingPublisher) - localevents.update({'lck-' + w: Lock()}) - localevents.update({'usr1-' + w: Event()}) - localevents.update({'period-' + w: Event()}) - localevents.update({'term-' + w: Event()}) - localevents.update({'termth-' + w: ThreadEvent()}) - localevents.update({'giveup-' + w: Event()}) + localevents.update({'lck-' + worker: Lock()}) + localevents.update({'usr1-' + worker: Event()}) + localevents.update({'period-' + worker: Event()}) + localevents.update({'term-' + worker: Event()}) + localevents.update({'termth-' + worker: ThreadEvent()}) + localevents.update({'giveup-' + worker: Event()}) shared.runtime.update(evsleep=evsleep) shared.runtime.update(daemonized=daemonized) - consumers.append(ConsumerQueue(events=localevents, worker=w)) + consumers.append(ConsumerQueue(events=localevents, worker=worker)) if not daemonized: consumers[-1].daemon = False consumers[-1].start() - if w: + if worker: localevents.update({'lck-stats': Lock()}) localevents.update({'usr1-stats': Event()}) localevents.update({'term-stats': Event()}) @@ -96,21 +96,21 @@ def init_dirq_consume(workers, daemonized, sockstat): while True: if int(time.time()) - prevstattime >= shared.general['statseveryhour'] * 3600: shared.log.info('Periodic report (every %sh)' % shared.general['statseveryhour']) - for c in consumers: - localevents['period-' + c.name].set() + for consumer in consumers: + localevents['period-' + consumer.name].set() prevstattime = int(time.time()) - for c in consumers: - if localevents['giveup-' + c.name].is_set(): - c.terminate() - c.join(1) - localevents['giveup-' + c.name].clear() + for consumer in consumers: + if localevents['giveup-' + consumer.name].is_set(): + consumer.terminate() + consumer.join(1) + localevents['giveup-' + consumer.name].clear() if shared.event('term').is_set(): - for c in consumers: - localevents['term-' + c.name].set() - localevents['termth-' + c.name].set() - c.join(1) + for consumer in consumers: + localevents['term-' + consumer.name].set() + localevents['termth-' + consumer.name].set() + consumer.join(1) localevents['term-stats'].set() localevents['termth-stats'].set() statsp.join(1) @@ -118,15 +118,15 @@ def init_dirq_consume(workers, daemonized, sockstat): if shared.event('usr1').is_set(): shared.log.info('Started %s' % shared.runtime['started']) - for c in consumers: - localevents['usr1-' + c.name].set() + for consumer in consumers: + localevents['usr1-' + consumer.name].set() localevents['usr1-stats'].set() shared.event('usr1').clear() try: time.sleep(evsleep) except KeyboardInterrupt: - for c in consumers: - c.join(1) + for consumer in consumers: + consumer.join(1) statsp.join(1) raise SystemExit(0) From 7fbcbf41cfe0904e21eea12683f1547ed142426d Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Fri, 29 Jan 2021 00:13:39 +0100 Subject: [PATCH 34/38] added log msgs on starting and stoping of workers --- pymod/run.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pymod/run.py b/pymod/run.py index 5a15921..0f9be2e 100644 --- a/pymod/run.py +++ b/pymod/run.py @@ -93,6 +93,9 @@ def init_dirq_consume(workers, daemonized, sockstat): statsp.start() prevstattime = int(time.time()) + shared = Shared() + workers_name = ', '.join(consumer.name for consumer in consumers) + shared.log.info('Started {} workers'.format(workers_name)) while True: if int(time.time()) - prevstattime >= shared.general['statseveryhour'] * 3600: shared.log.info('Periodic report (every %sh)' % shared.general['statseveryhour']) @@ -111,6 +114,7 @@ def init_dirq_consume(workers, daemonized, sockstat): localevents['term-' + consumer.name].set() localevents['termth-' + consumer.name].set() consumer.join(1) + shared.log.info('Stopped {} workers'.format(workers_name)) localevents['term-stats'].set() localevents['termth-stats'].set() statsp.join(1) From 0323cd95e90f703433cdeb2884be45b1f6ada353 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Fri, 29 Jan 2021 07:43:46 +0100 Subject: [PATCH 35/38] msg just right after starting --- pymod/run.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pymod/run.py b/pymod/run.py index 0f9be2e..d1daa88 100644 --- a/pymod/run.py +++ b/pymod/run.py @@ -82,6 +82,10 @@ def init_dirq_consume(workers, daemonized, sockstat): consumers[-1].daemon = False consumers[-1].start() + shared = Shared() + workers_name = ', '.join(consumer.name for consumer in consumers) + shared.log.info('Started {} workers'.format(workers_name)) + if worker: localevents.update({'lck-stats': Lock()}) localevents.update({'usr1-stats': Event()}) @@ -93,9 +97,6 @@ def init_dirq_consume(workers, daemonized, sockstat): statsp.start() prevstattime = int(time.time()) - shared = Shared() - workers_name = ', '.join(consumer.name for consumer in consumers) - shared.log.info('Started {} workers'.format(workers_name)) while True: if int(time.time()) - prevstattime >= shared.general['statseveryhour'] * 3600: shared.log.info('Periodic report (every %sh)' % shared.general['statseveryhour']) From ef2385e021ed7f9369528f455ea002de8a2b1595 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Fri, 29 Jan 2021 07:46:48 +0100 Subject: [PATCH 36/38] fixed logger init for delivery cache tools --- pymod/alarmtoqueue.py | 6 +++--- pymod/metrictoqueue.py | 5 ++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pymod/alarmtoqueue.py b/pymod/alarmtoqueue.py index 5eadc9b..27b6b3c 100644 --- a/pymod/alarmtoqueue.py +++ b/pymod/alarmtoqueue.py @@ -14,8 +14,8 @@ import pytz import sys -conf = '/etc/argo-nagios-ams-publisher/ams-publisher.conf' -logfile = '/var/log/argo-nagios-ams-publisher/ams-publisher.log' + +CONF = '/etc/argo-nagios-ams-publisher/ams-publisher.conf' def seteuser(user): @@ -46,7 +46,7 @@ def build_msg(args, *headers): def main(): parser = argparse.ArgumentParser() - lobj = log.Logger(sys.argv[0], logfile) + lobj = log.Logger(sys.argv[0]) logger = lobj.get() confopts = config.parse_config(logger) nagioshost = confopts['general']['host'] diff --git a/pymod/metrictoqueue.py b/pymod/metrictoqueue.py index 6b66a85..950eb2e 100644 --- a/pymod/metrictoqueue.py +++ b/pymod/metrictoqueue.py @@ -14,8 +14,7 @@ import datetime import pytz -conf = '/etc/argo-nagios-ams-publisher/ams-publisher.conf' -logfile = '/var/log/argo-nagios-ams-publisher/ams-publisher.log' +CONF = '/etc/argo-nagios-ams-publisher/ams-publisher.conf' def seteuser(user): @@ -47,7 +46,7 @@ def build_msg(args, *headers): def main(): parser = argparse.ArgumentParser() - lobj = log.Logger(sys.argv[0], logfile) + lobj = log.Logger(sys.argv[0]) logger = lobj.get() confopts = config.parse_config(logger) nagioshost = confopts['general']['host'] From 4d7fcd1da5b2d7b13098348680ed9a1b9f163888 Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Mon, 1 Feb 2021 16:05:17 +0100 Subject: [PATCH 37/38] spec updated --- argo-nagios-ams-publisher.spec | 3 +++ 1 file changed, 3 insertions(+) diff --git a/argo-nagios-ams-publisher.spec b/argo-nagios-ams-publisher.spec index e24a687..c31f5de 100644 --- a/argo-nagios-ams-publisher.spec +++ b/argo-nagios-ams-publisher.spec @@ -75,6 +75,9 @@ if ! /usr/bin/getent group nagiocmd &>/dev/null; then fi %changelog +* Mon Feb 1 2021 Daniel Vrcic - 0.3.9-1%{?dist} +- ARGO-2855 ams-publisher py3 switch +- ARGO-2929 Let systemd handle runtime directory * Thu Oct 8 2020 Daniel Vrcic - 0.3.8-1%{?dist} - remove leftovers from erroneous SIGHUP handling * Wed Jul 8 2020 Daniel Vrcic - 0.3.7-1%{?dist} From 62302c03d589b85ff09a902f1e54e5140c77128a Mon Sep 17 00:00:00 2001 From: Daniel Vrcic Date: Mon, 1 Feb 2021 16:07:14 +0100 Subject: [PATCH 38/38] updated CHANGELOG --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 068e698..5d339b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## [0.3.9] - 2021-02-01 + +### Changed + +* ARGO-2929 Let systemd handle runtime directory +* ARGO-2855 ams-publisher py3 switch + ## [0.3.8] - 2020-10-08 ### Fixed