Skip to content

Commit

Permalink
Make process_repeaters() more robust
Browse files Browse the repository at this point in the history
  • Loading branch information
kaapstorm committed Jan 11, 2025
1 parent 988ed56 commit e16f4ab
Showing 1 changed file with 25 additions and 36 deletions.
61 changes: 25 additions & 36 deletions corehq/motech/repeaters/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,12 @@
"""
import random
import uuid
from datetime import datetime, timedelta
from inspect import cleandoc

from django.conf import settings

from celery import chord
from celery import group
from celery.schedules import crontab
from celery.utils.log import get_task_logger

Expand Down Expand Up @@ -319,42 +318,33 @@ def process_repeaters():
timeout=None, # Iterating repeaters forever is fine
name=PROCESS_REPEATERS_KEY,
)
lock_token = uuid.uuid1().hex
if not process_repeaters_lock.acquire(blocking=False, token=lock_token):
if not process_repeaters_lock.acquire(blocking=False):
return

metrics_counter('commcare.repeaters.process_repeaters.start')
process_repeaters_forever.delay(lock_token)


@task(queue=settings.CELERY_REPEAT_RECORD_QUEUE)
def process_repeaters_forever(lock_token):
"""
Calls `process_repeater()` for each repeater with repeat records
ready to be sent, and then calls itself as a callback, to loop
through the repeaters again.
"""
tasks = []
for domain, repeater_id in iter_ready_repeater_ids():
if not domain_can_forward_now(domain):
continue
if rate_limit_repeater(domain, repeater_id):
continue
tasks.append(process_repeater.s(domain, repeater_id))

if tasks:
# Call back `process_repeaters_forever()` after all
# `process_repeater()` tasks are complete:
chord(tasks, process_repeaters_forever.si(lock_token))()
try:
metrics_counter('commcare.repeaters.process_repeaters.start')
while True:
metrics_counter('commcare.repeaters.process_repeaters.iter_once')
tasks = []
for domain, repeater_id in iter_ready_repeater_ids():
if not domain_can_forward_now(domain):
continue
if rate_limit_repeater(domain, repeater_id):
continue
tasks.append(process_repeater.s(domain, repeater_id))

if tasks:
result = group(*tasks).apply_async()
# Waiting for the result of a subtask is discouraged
# https://docs.celeryq.dev/en/stable/userguide/tasks.html#avoid-launching-synchronous-subtasks
# but in this situation it is safe because the
# `process_repeater()` tasks use a different queue.
result.get(disable_sync_subtasks=False)

else:
break

else:
# There are no more repeaters to be processed.
process_repeaters_lock = get_redis_lock(
PROCESS_REPEATERS_KEY,
timeout=None,
name=PROCESS_REPEATERS_KEY,
)
process_repeaters_lock.local.token = lock_token
finally:
process_repeaters_lock.release()
metrics_counter('commcare.repeaters.process_repeaters.complete')

Expand All @@ -372,7 +362,6 @@ def iter_ready_repeater_ids():
...
"""
metrics_counter('commcare.repeaters.process_repeaters.iter_once')
repeater_ids_by_domain = get_repeater_ids_by_domain()
while True:
if not repeater_ids_by_domain:
Expand Down

0 comments on commit e16f4ab

Please sign in to comment.