From e16f4aba23fbb2413f3becfa95b51f688d814bf7 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Sat, 11 Jan 2025 16:14:17 +0000 Subject: [PATCH] Make `process_repeaters()` more robust --- corehq/motech/repeaters/tasks.py | 61 +++++++++++++------------------- 1 file changed, 25 insertions(+), 36 deletions(-) diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index 44821323d171..50f777f20243 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -72,13 +72,12 @@ """ import random -import uuid from datetime import datetime, timedelta from inspect import cleandoc from django.conf import settings -from celery import chord +from celery import group from celery.schedules import crontab from celery.utils.log import get_task_logger @@ -319,42 +318,33 @@ def process_repeaters(): timeout=None, # Iterating repeaters forever is fine name=PROCESS_REPEATERS_KEY, ) - lock_token = uuid.uuid1().hex - if not process_repeaters_lock.acquire(blocking=False, token=lock_token): + if not process_repeaters_lock.acquire(blocking=False): return - metrics_counter('commcare.repeaters.process_repeaters.start') - process_repeaters_forever.delay(lock_token) - - -@task(queue=settings.CELERY_REPEAT_RECORD_QUEUE) -def process_repeaters_forever(lock_token): - """ - Calls `process_repeater()` for each repeater with repeat records - ready to be sent, and then calls itself as a callback, to loop - through the repeaters again. - """ - tasks = [] - for domain, repeater_id in iter_ready_repeater_ids(): - if not domain_can_forward_now(domain): - continue - if rate_limit_repeater(domain, repeater_id): - continue - tasks.append(process_repeater.s(domain, repeater_id)) - - if tasks: - # Call back `process_repeaters_forever()` after all - # `process_repeater()` tasks are complete: - chord(tasks, process_repeaters_forever.si(lock_token))() + try: + metrics_counter('commcare.repeaters.process_repeaters.start') + while True: + metrics_counter('commcare.repeaters.process_repeaters.iter_once') + tasks = [] + for domain, repeater_id in iter_ready_repeater_ids(): + if not domain_can_forward_now(domain): + continue + if rate_limit_repeater(domain, repeater_id): + continue + tasks.append(process_repeater.s(domain, repeater_id)) + + if tasks: + result = group(*tasks).apply_async() + # Waiting for the result of a subtask is discouraged + # https://docs.celeryq.dev/en/stable/userguide/tasks.html#avoid-launching-synchronous-subtasks + # but in this situation it is safe because the + # `process_repeater()` tasks use a different queue. + result.get(disable_sync_subtasks=False) + + else: + break - else: - # There are no more repeaters to be processed. - process_repeaters_lock = get_redis_lock( - PROCESS_REPEATERS_KEY, - timeout=None, - name=PROCESS_REPEATERS_KEY, - ) - process_repeaters_lock.local.token = lock_token + finally: process_repeaters_lock.release() metrics_counter('commcare.repeaters.process_repeaters.complete') @@ -372,7 +362,6 @@ def iter_ready_repeater_ids(): ... """ - metrics_counter('commcare.repeaters.process_repeaters.iter_once') repeater_ids_by_domain = get_repeater_ids_by_domain() while True: if not repeater_ids_by_domain: