Skip to content

Commit

Permalink
Sensible log behavior when redis is unavailable
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanCoding committed Aug 28, 2024
1 parent 79c1921 commit 6d3b3d9
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 10 deletions.
9 changes: 7 additions & 2 deletions awx/main/analytics/subsystem_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from prometheus_client.registry import CollectorRegistry
from django.conf import settings
from django.http import HttpRequest
import redis.exceptions
from rest_framework.request import Request

from awx.main.consumers import emit_channel_notification
Expand Down Expand Up @@ -290,8 +291,12 @@ def pipe_execute(self):
def send_metrics(self):
# more than one thread could be calling this at the same time, so should
# acquire redis lock before sending metrics
lock = self.conn.lock(root_key + '-' + self._namespace + '_lock')
if not lock.acquire(blocking=False):
try:
lock = self.conn.lock(root_key + '-' + self._namespace + '_lock')
if not lock.acquire(blocking=False):
return
except redis.exceptions.ConnectionError as exc:
logger.warning(f'Connection error in send_metrics: {exc}')
return
try:
current_time = time.time()
Expand Down
13 changes: 10 additions & 3 deletions awx/main/dispatch/worker/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from django import db
from django.conf import settings
import redis.exceptions

from awx.main.dispatch.pool import WorkerPool
from awx.main.dispatch.periodic import Scheduler
Expand Down Expand Up @@ -129,10 +130,13 @@ def process_task(self, body):
@log_excess_runtime(logger)
def record_statistics(self):
if time.time() - self.last_stats > 1: # buffer stat recording to once per second
save_data = self.pool.debug()
try:
self.redis.set(f'awx_{self.name}_statistics', self.pool.debug())
self.redis.set(f'awx_{self.name}_statistics', save_data)
except redis.exceptions.ConnectionError as exc:
logger.warning(f'Redis connection error saving {self.name} status data:\n{exc}\nmissed data:\n{save_data}')
except Exception:
logger.exception(f"encountered an error communicating with redis to store {self.name} statistics")
logger.exception(f"Unknown redis error saving {self.name} status data:\nmissed data:\n{save_data}")
self.last_stats = time.time()

def run(self, *args, **kwargs):
Expand Down Expand Up @@ -187,7 +191,10 @@ def record_metrics(self):
current_time = time.time()
self.pool.produce_subsystem_metrics(self.subsystem_metrics)
self.subsystem_metrics.set('dispatcher_availability', self.listen_cumulative_time / (current_time - self.last_metrics_gather))
self.subsystem_metrics.pipe_execute()
try:
self.subsystem_metrics.pipe_execute()
except redis.exceptions.ConnectionError as exc:
logger.warning(f'Redis connection error saving dispatcher metrics, error:\n{exc}')
self.listen_cumulative_time = 0.0
self.last_metrics_gather = current_time

Expand Down
12 changes: 10 additions & 2 deletions awx/main/dispatch/worker/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def pid(self):
return os.getpid()

def read(self, queue):
has_redis_error = False
try:
res = self.redis.blpop(self.queue_name, timeout=1)
if res is None:
Expand All @@ -94,14 +95,21 @@ def read(self, queue):
self.subsystem_metrics.inc('callback_receiver_events_popped_redis', 1)
self.subsystem_metrics.inc('callback_receiver_events_in_memory', 1)
return json.loads(res[1])
except redis.exceptions.ConnectionError as exc:
# Low noise log, because very common and many workers will write this
logger.error(f"redis connection error: {exc}")
has_redis_error = True
time.sleep(5)
except redis.exceptions.RedisError:
logger.exception("encountered an error communicating with redis")
has_redis_error = True
time.sleep(1)
except (json.JSONDecodeError, KeyError):
logger.exception("failed to decode JSON message from redis")
finally:
self.record_statistics()
self.record_read_metrics()
if not has_redis_error:
self.record_statistics()
self.record_read_metrics()

return {'event': 'FLUSH'}

Expand Down
11 changes: 8 additions & 3 deletions awx/main/management/commands/run_callback_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
# All Rights Reserved.

from django.conf import settings
from django.core.management.base import BaseCommand
from awx.main.analytics.subsystem_metrics import CallbackReceiverMetricsServer
from django.core.management.base import BaseCommand, CommandError

from redis.exceptions import ConnectionError

from awx.main.analytics.subsystem_metrics import CallbackReceiverMetricsServer
from awx.main.dispatch.control import Control
from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker

Expand All @@ -27,7 +29,10 @@ def handle(self, *arg, **options):
return
consumer = None

CallbackReceiverMetricsServer().start()
try:
CallbackReceiverMetricsServer().start()
except ConnectionError as exc:
raise CommandError(f'Could not connect to redis, error:\n{exc}\n')

try:
consumer = AWXConsumerRedis(
Expand Down
4 changes: 4 additions & 0 deletions awx/main/scheduler/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import sys
import signal

import redis

# Django
from django.db import transaction
from django.utils.translation import gettext_lazy as _, gettext_noop
Expand Down Expand Up @@ -118,6 +120,8 @@ def record_aggregate_metrics(self, *args):
self.subsystem_metrics.pipe_execute()
else:
logger.debug(f"skipping recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago")
except redis.exceptions.ConnectionError as exc:
logger.warning(f"Redis connection error saving metrics for {self.prefix}, error: {exc}")
except Exception:
logger.exception(f"Error saving metrics for {self.prefix}")

Expand Down

0 comments on commit 6d3b3d9

Please sign in to comment.