Common Redis connection retry mechanism. (#1087)
jshimkus-rh authored Oct 15, 2024
1 parent 5a90a1b commit fea5fb1
Showing 13 changed files with 402 additions and 323 deletions.
src/aap_eda/core/management/commands/scheduler.py: 118 changes (36 additions, 82 deletions)
@@ -70,16 +70,15 @@
 https://github.com/rq/rq-scheduler/blob/master/README.rst
 """
 import logging
-import re
+import typing
 from datetime import datetime
-from time import sleep
 
 import django_rq
-import redis
-from ansible_base.lib.redis.client import DABRedisCluster
+import rq_scheduler
 from django.conf import settings
 from django_rq.management.commands import rqscheduler
-from rq_scheduler import Scheduler
+
+from aap_eda.core import tasking
 
 logger = logging.getLogger(__name__)
 
@@ -89,19 +88,24 @@
 RQ_CRON_JOBS = getattr(settings, "RQ_CRON_JOBS", None)
 
 
-def delete_scheduled_jobs(scheduler: Scheduler):
+@tasking.redis_connect_retry()
+def delete_scheduled_jobs(scheduler: rq_scheduler.Scheduler) -> None:
     """Cancel any existing jobs in the scheduler when the app starts up."""
     for job in scheduler.get_jobs():
         logging.info("Deleting scheduled job: %s", job)
         job.delete()
 
 
-def add_startup_jobs(scheduler: Scheduler) -> None:
+def add_startup_jobs(scheduler: rq_scheduler.Scheduler) -> None:
     if not RQ_STARTUP_JOBS:
         logger.info("No scheduled jobs. Skipping.")
         return
 
-    for entry in RQ_STARTUP_JOBS:
+    @tasking.redis_connect_retry()
+    def _add_startup_job(
+        scheduler: rq_scheduler.Scheduler,
+        entry: dict[str, typing.Any],
+    ) -> None:
         logger.info('Adding startup job "%s"', entry["func"])
         scheduled_time = entry.pop("scheduled_time", None)
         if scheduled_time is None:
@@ -111,13 +115,20 @@ def add_startup_jobs(scheduler: Scheduler) -> None:
             **entry,
         )
 
+    for entry in RQ_STARTUP_JOBS:
+        _add_startup_job(scheduler, entry)
+
 
-def add_periodic_jobs(scheduler: Scheduler) -> None:
+def add_periodic_jobs(scheduler: rq_scheduler.Scheduler) -> None:
     if not RQ_PERIODIC_JOBS:
         logger.info("No periodic jobs. Skipping.")
         return
 
-    for entry in RQ_PERIODIC_JOBS:
+    @tasking.redis_connect_retry()
+    def _add_periodic_job(
+        scheduler: rq_scheduler.Scheduler,
+        entry: dict[str, typing.Any],
+    ) -> None:
         logger.info('Adding periodic job "%s"', entry["func"])
         scheduled_time = entry.pop("scheduled_time", None)
         if scheduled_time is None:
@@ -127,17 +138,27 @@ def add_periodic_jobs(scheduler: Scheduler) -> None:
             **entry,
         )
 
+    for entry in RQ_PERIODIC_JOBS:
+        _add_periodic_job(scheduler, entry)
+
 
-def add_cron_jobs(scheduler: Scheduler) -> None:
+def add_cron_jobs(scheduler: rq_scheduler.Scheduler) -> None:
     """Schedule cron jobs."""
     if not RQ_CRON_JOBS:
         logger.info("No cron jobs. Skipping.")
         return
 
-    for entry in RQ_CRON_JOBS:
+    @tasking.redis_connect_retry()
+    def _add_cron_job(
+        scheduler: rq_scheduler.Scheduler,
+        entry: dict[str, typing.Any],
+    ) -> None:
         logger.info('Adding cron job "%s"', entry["func"])
         scheduler.cron(**entry)
 
+    for entry in RQ_CRON_JOBS:
+        _add_cron_job(scheduler, entry)
+
 
 class Command(rqscheduler.Command):
     help = "Runs RQ scheduler with configured jobs."
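
Note on the pattern in the hunks above: each helper hoists its per-entry work into a nested function (_add_startup_job, _add_periodic_job, _add_cron_job) and applies the decorator there rather than around the whole loop. Assuming redis_connect_retry simply re-invokes the wrapped callable until it succeeds, the effect is that a dropped connection replays only the entry that failed, not every entry already scheduled. A hypothetical sketch of the shape (add_jobs, _add_job, and entries are illustrative names, not part of the commit):

    # Decorating the nested helper retries one entry in isolation; decorating
    # add_jobs itself would replay the entire loop after a mid-iteration outage.
    from aap_eda.core import tasking

    def add_jobs(scheduler, entries):
        @tasking.redis_connect_retry()
        def _add_job(entry):
            scheduler.cron(**entry)  # only this call is re-run on a retry

        for entry in entries:
            _add_job(entry)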
@@ -153,73 +174,6 @@ def handle(self, *args, **options) -> None:
         add_startup_jobs(scheduler)
         add_periodic_jobs(scheduler)
         add_cron_jobs(scheduler)
-        # We are going to start our own loop here to catch exceptions which
-        # might be coming from a redis cluster and retrying things.
-        while True:
-            try:
-                super().handle(*args, **options)
-            except (
-                redis.exceptions.TimeoutError,
-                redis.exceptions.ClusterDownError,
-                redis.exceptions.ConnectionError,
-            ) as e:
-                # If we got one of these exceptions but are not on a Cluster go
-                # ahead and raise it normally.
-                if not isinstance(scheduler.connection, DABRedisCluster):
-                    raise
-
-                # There are a lot of different exceptions that inherit from
-                # ConnectionError. So we need to make sure if we got that its
-                # an actual ConnectionError. If not, go ahead and raise it.
-                # Note: ClusterDownError and TimeoutError are not subclasses
-                # of ConnectionError.
-                if (
-                    isinstance(e, redis.exceptions.ConnectionError)
-                    and type(e) is not redis.exceptions.ConnectionError
-                ):
-                    raise
-
-                downed_node_ip = re.findall(
-                    r"[0-9]+(?:\.[0-9]+){3}:[0-9]+", str(e)
-                )
-
-                # If we got a cluster issue we will loop here until we can ping
-                # the server again.
-                max_backoff = 60
-                current_backoff = 1
-                while True:
-                    if current_backoff > max_backoff:
-                        # Maybe we just got a network glitch and are waiting
-                        # for a cluster member to fail when its not going to.
-                        # At this point we've waited for 60 secs so lets go
-                        # ahead and let the scheduler try and restart.
-                        logger.error(
-                            "Connection to redis is still down "
-                            "going to attempt to restart scheduler"
-                        )
-                        break
-
-                    backoff = min(current_backoff, max_backoff)
-                    logger.error(
-                        f"Connection to redis cluster failed. Attempting to "
-                        f"reconnect in {backoff}"
-                    )
-                    sleep(backoff)
-                    current_backoff = 2 * current_backoff
-                    try:
-                        if downed_node_ip:
-                            cluster_nodes = (
-                                scheduler.connection.cluster_nodes()
-                            )
-                            for ip in downed_node_ip:
-                                if "fail" not in cluster_nodes[ip]["flags"]:
-                                    raise Exception(
-                                        "Failed node is not yet in a failed "
-                                        "state"
-                                    )
-                        else:
-                            scheduler.connection.ping()
-                        break
-                    # We could tighten this exception up
-                    except Exception:
-                        pass
+        super().handle(*args, **options)
+
+    handle = tasking.redis_connect_retry()(handle)
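
The redis_connect_retry decorator itself is defined in src/aap_eda/core/tasking.py, one of the 13 files changed by this commit but not shown on this page. A minimal sketch of what such a decorator might look like, assuming it retries on the same three exception families the deleted loop handled and backs off exponentially up to a cap; the signature, default cap, and retry policy here are assumptions, not the committed implementation:

    import functools
    import logging
    import time
    import typing

    import redis

    logger = logging.getLogger(__name__)

    # Assumed retryable errors, mirroring the deleted scheduler loop.
    _RETRYABLE = (
        redis.exceptions.ConnectionError,
        redis.exceptions.TimeoutError,
        redis.exceptions.ClusterDownError,
    )

    def redis_connect_retry(max_delay: int = 60) -> typing.Callable:
        """Retry the wrapped callable while Redis is unreachable."""

        def decorator(func: typing.Callable) -> typing.Callable:
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                delay = 1
                while True:
                    try:
                        return func(*args, **kwargs)
                    except _RETRYABLE as exc:
                        logger.error(
                            "Redis connection failed (%s); retrying in %ss",
                            exc,
                            delay,
                        )
                        time.sleep(delay)
                        delay = min(delay * 2, max_delay)

            return wrapper

        return decorator

The last added line, handle = tasking.redis_connect_retry()(handle), applies the same decorator to the method inherited from rqscheduler.Command after its definition, so the scheduler's main loop is restarted by the common mechanism rather than the bespoke while/ping loop this commit deletes.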