From 20e6c019a15554824ed8003e25090f17c010fd42 Mon Sep 17 00:00:00 2001 From: Paul Gear Date: Fri, 5 Jan 2024 11:19:20 +1000 Subject: [PATCH 1/5] Handle loss of telegraf socket more gracefully --- src/outputs.py | 38 +++++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/src/outputs.py b/src/outputs.py index 05fc001..7e50fce 100644 --- a/src/outputs.py +++ b/src/outputs.py @@ -4,10 +4,12 @@ import argparse +import datetime import socket import sys from io import TextIOWrapper +import time from typing import ClassVar, Dict, List, Tuple @@ -277,7 +279,8 @@ def set_prometheus_metric( class TelegrafOutput(Output): def __init__(self, args: argparse.Namespace) -> None: super().__init__() - self.file = sys.stdout if args.debug else self.get_telegraf_file(args.connect) + self.args = args + self.set_file() @staticmethod def get_telegraf_file(connect: str) -> TextIOWrapper: @@ -288,13 +291,29 @@ def get_telegraf_file(connect: str) -> TextIOWrapper: s.connect((host, port)) return s.makefile(mode="w") + def send(self, name: str, metrics: dict, tries: int = 0) -> None: + telegraf_line = line_protocol.to_line_protocol(metrics, name) + if tries >= 5: + print("Reached maximum retries on telegraf connection", file=sys.stderr) + print(telegraf_line) + sys.exit(3) + try: + print(telegraf_line, file=self.file) + except BrokenPipeError as bpe: + # If we have lost our connection to telegraf, wait a little, then + # reopen the socket and try again. We add a timestamp to metrics + # without it, in case it takes a while to make the connection. + if "datetime" not in metrics: + metrics["datetime"] = datetime.datetime.now(tz=datetime.timezone.utc) + time.sleep(0.1) + self.set_file() + self.send(name, metrics, tries + 1) + def send_info(self, metrics: dict, debug: bool) -> None: - telegraf_line = line_protocol.to_line_protocol(metrics, "ntpmon_info") - print(telegraf_line, file=self.file) + self.send("ntpmon_info", metrics) def send_measurement(self, metrics: dict, debug: bool = False) -> None: - telegraf_line = line_protocol.to_line_protocol(metrics, "ntpmon_peer") - print(telegraf_line, file=self.file) + self.send("ntpmon_peer", metrics) def send_peer_counts(self, metrics: dict, debug: bool = False) -> None: for metric in sorted(self.peertypes.keys()): @@ -302,13 +321,14 @@ def send_peer_counts(self, metrics: dict, debug: bool = False) -> None: "count": metrics[metric], "peertype": metric, } - output = line_protocol.to_line_protocol(telegraf_metrics, "ntpmon_peers") - print(output, file=self.file) + self.send("ntpmon_peers", telegraf_metrics) def send_summary_stats(self, metrics: dict, debug: bool = False) -> None: telegraf_metrics = {k: metrics[k] for k in sorted(self.summarytypes.keys()) if k in metrics} - telegraf_line = line_protocol.to_line_protocol(telegraf_metrics, "ntpmon") - print(telegraf_line, file=self.file) + self.send("ntpmon", telegraf_metrics) + + def set_file(self) -> None: + self.file = sys.stdout if self.args.debug else self.get_telegraf_file(self.args.connect) def get_output(args: argparse.Namespace) -> Output: From 0529b955ec74d850861391efe7d268490fe49254 Mon Sep 17 00:00:00 2001 From: Paul Gear Date: Fri, 5 Jan 2024 11:22:18 +1000 Subject: [PATCH 2/5] Rename send_measurement() to send_peer_measurements() --- src/ntpmon.py | 2 +- src/outputs.py | 40 ++++++++++++++++++++-------------------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/ntpmon.py b/src/ntpmon.py index 7bed56e..e2e80b8 100755 --- a/src/ntpmon.py +++ b/src/ntpmon.py @@ -164,7 +164,7 @@ async def peer_stats_task(args: argparse.Namespace, output: outputs.Output) -> N if stats is not None: if "peertype" not in stats: stats["peertype"] = find_type(stats["source"], checkobjs["peers"].peers) - output.send_measurement(stats, debug=args.debug) + output.send_peer_measurements(stats, debug=args.debug) async def summary_stats_task(args: argparse.Namespace, output: outputs.Output) -> None: diff --git a/src/outputs.py b/src/outputs.py index 7e50fce..97b7181 100644 --- a/src/outputs.py +++ b/src/outputs.py @@ -75,10 +75,10 @@ class Output: def send_info(self, metrics: dict, debug: bool = False) -> None: pass - def send_measurement(self, metrics: dict, debug: bool = False) -> None: + def send_peer_counts(self, metrics: dict, debug: bool = False) -> None: pass - def send_peer_counts(self, metrics: dict, debug: bool = False) -> None: + def send_peer_measurements(self, metrics: dict, debug: bool = False) -> None: pass def send_summary_stats(self, metrics: dict, debug: bool = False) -> None: @@ -91,12 +91,12 @@ def __init__(self, args: argparse.Namespace) -> None: formatstr: ClassVar[str] = 'PUTVAL "%s/ntpmon-%s" interval=%d N:%.9f' - def send_measurement(self, metrics: dict, debug: bool = False) -> None: - self.send_stats(metrics, self.peerstatstypes, hostname=metrics["source"], debug=debug) - def send_peer_counts(self, metrics: dict, debug: bool = False) -> None: self.send_stats(metrics, self.peertypes, debug=debug) + def send_peer_measurements(self, metrics: dict, debug: bool = False) -> None: + self.send_stats(metrics, self.peerstatstypes, hostname=metrics["source"], debug=debug) + def send_stats(self, metrics: dict, types: dict, debug: bool = False, hostname: str = None) -> None: if hostname is None: hostname = self.args.hostname @@ -193,16 +193,6 @@ def send_info(self, metrics: dict, debug: bool = False) -> None: debug=debug, ) - def send_measurement(self, metrics: dict, debug: bool = False) -> None: - self.send_stats( - "ntpmon_peer", - metrics, - self.peerstatstypes, - [x for x in self.peerstatslabels if x in metrics], - [metrics[x] for x in self.peerstatslabels if x in metrics], - debug=debug, - ) - def send_peer_counts(self, metrics: dict, debug: bool = False) -> None: for metric in sorted(self.peertypes.keys()): if metric in metrics: @@ -216,8 +206,15 @@ def send_peer_counts(self, metrics: dict, debug: bool = False) -> None: debug=debug, ) - def send_summary_stats(self, metrics: dict, debug: bool = False) -> None: - self.send_stats("ntpmon", metrics, self.summarystatstypes, debug=debug) + def send_peer_measurements(self, metrics: dict, debug: bool = False) -> None: + self.send_stats( + "ntpmon_peer", + metrics, + self.peerstatstypes, + [x for x in self.peerstatslabels if x in metrics], + [metrics[x] for x in self.peerstatslabels if x in metrics], + debug=debug, + ) def send_stats( self, @@ -242,6 +239,9 @@ def send_stats( value /= 100 self.set_prometheus_metric(name, description, value, fmt, labelnames, labels, debug=debug) + def send_summary_stats(self, metrics: dict, debug: bool = False) -> None: + self.send_stats("ntpmon", metrics, self.summarystatstypes, debug=debug) + def set_prometheus_metric( self, name: str, @@ -312,9 +312,6 @@ def send(self, name: str, metrics: dict, tries: int = 0) -> None: def send_info(self, metrics: dict, debug: bool) -> None: self.send("ntpmon_info", metrics) - def send_measurement(self, metrics: dict, debug: bool = False) -> None: - self.send("ntpmon_peer", metrics) - def send_peer_counts(self, metrics: dict, debug: bool = False) -> None: for metric in sorted(self.peertypes.keys()): telegraf_metrics = { @@ -323,6 +320,9 @@ def send_peer_counts(self, metrics: dict, debug: bool = False) -> None: } self.send("ntpmon_peers", telegraf_metrics) + def send_peer_measurements(self, metrics: dict, debug: bool = False) -> None: + self.send("ntpmon_peer", metrics) + def send_summary_stats(self, metrics: dict, debug: bool = False) -> None: telegraf_metrics = {k: metrics[k] for k in sorted(self.summarytypes.keys()) if k in metrics} self.send("ntpmon", telegraf_metrics) From 373a0b0819f411677ccb6b77894be008dce0eb71 Mon Sep 17 00:00:00 2001 From: Paul Gear Date: Fri, 5 Jan 2024 11:23:05 +1000 Subject: [PATCH 3/5] Ensure that the failure of an asyncio task is fatal And ensure that systemd restarts ntpmon regardless of the exit code --- src/ntpmon-telegraf.systemd | 2 +- src/ntpmon.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ntpmon-telegraf.systemd b/src/ntpmon-telegraf.systemd index 925883c..7cf4d57 100644 --- a/src/ntpmon-telegraf.systemd +++ b/src/ntpmon-telegraf.systemd @@ -8,7 +8,7 @@ Requires=telegraf.service [Service] ExecStart={{ install_dir }}/ntpmon.py --interval 60 --mode telegraf --implementation {{ implementation }} KillMode=process -Restart=on-failure +Restart=always RestartSec=42s User={{ user }} Group={{ group }} diff --git a/src/ntpmon.py b/src/ntpmon.py index e2e80b8..f4bbd2d 100755 --- a/src/ntpmon.py +++ b/src/ntpmon.py @@ -187,6 +187,7 @@ async def start_tasks(args: argparse.Namespace) -> None: peer_stats = asyncio.create_task(peer_stats_task(args, output), name="peerstats") summary_stats = asyncio.create_task(summary_stats_task(args, output), name="summarystats") await asyncio.wait((peer_stats, summary_stats), return_when=asyncio.FIRST_COMPLETED) + sys.exit(1) if __name__ == "__main__": From 55e7f21353593c5a1909e10108f9b615d773834f Mon Sep 17 00:00:00 2001 From: Paul Gear Date: Fri, 5 Jan 2024 11:24:26 +1000 Subject: [PATCH 4/5] Handle failure of subprocess more gracefully Fixes UnboundLocalError: local variable 'output' referenced before assignment --- src/process.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/process.py b/src/process.py index d2287d1..2fe369b 100644 --- a/src/process.py +++ b/src/process.py @@ -35,6 +35,7 @@ def execute_subprocess(cmd, timeout, debug, errfatal): + output = None try: output = subprocess.check_output( cmd, From 1519b50dd72c4c3ba26db931564453aeb0743faf Mon Sep 17 00:00:00 2001 From: Paul Gear Date: Fri, 5 Jan 2024 11:26:17 +1000 Subject: [PATCH 5/5] Prepare 3.0.7 release --- CHANGELOG.md | 8 ++++++++ debian/changelog | 6 ++++++ src/version_data.py | 2 +- 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b161c23..f8bd964 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,14 @@ The current development release is 3.0.6. This is the recommended version for anyone who wants the latest features. It should be suitable for production deployment very soon. +## [3.0.7] - 2024-01-05 + +### Changed + +- Handle loss of telegraf socket more gracefully +- Handle failure of subprocess more gracefully +- Ensure that systemd always restarts ntpmon regardless of exit code + ## [3.0.6] - 2024-01-02 ### Added diff --git a/debian/changelog b/debian/changelog index a4ccebd..a648948 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +ntpmon (3.0.7-1) focal; urgency=medium + + * New upstream release. + + -- Paul Gear Fri, 05 Jan 2024 11:25:25 +1000 + ntpmon (3.0.6-1) focal; urgency=medium * New upstream release. diff --git a/src/version_data.py b/src/version_data.py index 961d287..19c152e 100644 --- a/src/version_data.py +++ b/src/version_data.py @@ -2,4 +2,4 @@ # and bash. MAJOR="3" MINOR="0" -PATCH="6" +PATCH="7"