From 6d3b3d96e8d030c0b52be19b847e8b6ac655940d Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Sun, 25 Aug 2024 22:07:50 -0400 Subject: [PATCH] Sensible log behavior when redis is unavailable --- awx/main/analytics/subsystem_metrics.py | 9 +++++++-- awx/main/dispatch/worker/base.py | 13 ++++++++++--- awx/main/dispatch/worker/callback.py | 12 ++++++++++-- .../management/commands/run_callback_receiver.py | 11 ++++++++--- awx/main/scheduler/task_manager.py | 4 ++++ 5 files changed, 39 insertions(+), 10 deletions(-) diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index 2662a257fc7e..bf6dc1ec36e2 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -9,6 +9,7 @@ from prometheus_client.registry import CollectorRegistry from django.conf import settings from django.http import HttpRequest +import redis.exceptions from rest_framework.request import Request from awx.main.consumers import emit_channel_notification @@ -290,8 +291,12 @@ def pipe_execute(self): def send_metrics(self): # more than one thread could be calling this at the same time, so should # acquire redis lock before sending metrics - lock = self.conn.lock(root_key + '-' + self._namespace + '_lock') - if not lock.acquire(blocking=False): + try: + lock = self.conn.lock(root_key + '-' + self._namespace + '_lock') + if not lock.acquire(blocking=False): + return + except redis.exceptions.ConnectionError as exc: + logger.warning(f'Connection error in send_metrics: {exc}') return try: current_time = time.time() diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index 264205a8ed6d..8aaeb13b48e6 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -15,6 +15,7 @@ from django import db from django.conf import settings +import redis.exceptions from awx.main.dispatch.pool import WorkerPool from awx.main.dispatch.periodic import Scheduler @@ -129,10 +130,13 @@ def process_task(self, body): @log_excess_runtime(logger) def record_statistics(self): if time.time() - self.last_stats > 1: # buffer stat recording to once per second + save_data = self.pool.debug() try: - self.redis.set(f'awx_{self.name}_statistics', self.pool.debug()) + self.redis.set(f'awx_{self.name}_statistics', save_data) + except redis.exceptions.ConnectionError as exc: + logger.warning(f'Redis connection error saving {self.name} status data:\n{exc}\nmissed data:\n{save_data}') except Exception: - logger.exception(f"encountered an error communicating with redis to store {self.name} statistics") + logger.exception(f"Unknown redis error saving {self.name} status data:\nmissed data:\n{save_data}") self.last_stats = time.time() def run(self, *args, **kwargs): @@ -187,7 +191,10 @@ def record_metrics(self): current_time = time.time() self.pool.produce_subsystem_metrics(self.subsystem_metrics) self.subsystem_metrics.set('dispatcher_availability', self.listen_cumulative_time / (current_time - self.last_metrics_gather)) - self.subsystem_metrics.pipe_execute() + try: + self.subsystem_metrics.pipe_execute() + except redis.exceptions.ConnectionError as exc: + logger.warning(f'Redis connection error saving dispatcher metrics, error:\n{exc}') self.listen_cumulative_time = 0.0 self.last_metrics_gather = current_time diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index 199302c76c5a..7bc2eca3f52f 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py @@ -85,6 +85,7 @@ def pid(self): return os.getpid() def read(self, queue): + has_redis_error = False try: res = self.redis.blpop(self.queue_name, timeout=1) if res is None: @@ -94,14 +95,21 @@ def read(self, queue): self.subsystem_metrics.inc('callback_receiver_events_popped_redis', 1) self.subsystem_metrics.inc('callback_receiver_events_in_memory', 1) return json.loads(res[1]) + except redis.exceptions.ConnectionError as exc: + # Low noise log, because very common and many workers will write this + logger.error(f"redis connection error: {exc}") + has_redis_error = True + time.sleep(5) except redis.exceptions.RedisError: logger.exception("encountered an error communicating with redis") + has_redis_error = True time.sleep(1) except (json.JSONDecodeError, KeyError): logger.exception("failed to decode JSON message from redis") finally: - self.record_statistics() - self.record_read_metrics() + if not has_redis_error: + self.record_statistics() + self.record_read_metrics() return {'event': 'FLUSH'} diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 6ab2158caefd..624a34cd2888 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -2,9 +2,11 @@ # All Rights Reserved. from django.conf import settings -from django.core.management.base import BaseCommand -from awx.main.analytics.subsystem_metrics import CallbackReceiverMetricsServer +from django.core.management.base import BaseCommand, CommandError + +from redis.exceptions import ConnectionError +from awx.main.analytics.subsystem_metrics import CallbackReceiverMetricsServer from awx.main.dispatch.control import Control from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker @@ -27,7 +29,10 @@ def handle(self, *arg, **options): return consumer = None - CallbackReceiverMetricsServer().start() + try: + CallbackReceiverMetricsServer().start() + except ConnectionError as exc: + raise CommandError(f'Could not connect to redis, error:\n{exc}\n') try: consumer = AWXConsumerRedis( diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 272779b3e589..2ce42553adbb 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -10,6 +10,8 @@ import sys import signal +import redis + # Django from django.db import transaction from django.utils.translation import gettext_lazy as _, gettext_noop @@ -118,6 +120,8 @@ def record_aggregate_metrics(self, *args): self.subsystem_metrics.pipe_execute() else: logger.debug(f"skipping recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago") + except redis.exceptions.ConnectionError as exc: + logger.warning(f"Redis connection error saving metrics for {self.prefix}, error: {exc}") except Exception: logger.exception(f"Error saving metrics for {self.prefix}")