Process Repeaters, Part 1 #35033

Open · wants to merge 62 commits into base: master

Commits (62)
50c848a
Add `PROCESS_REPEATERS` toggle
kaapstorm Aug 23, 2024
0db6a6c
`process_repeaters()` task
kaapstorm Aug 23, 2024
e36296c
`get_repeater_lock()`
kaapstorm Aug 23, 2024
aeb10ba
`iter_ready_repeater_ids_once()`
kaapstorm Aug 23, 2024
01e4bc7
Skip rate-limited repeaters
kaapstorm Aug 23, 2024
db2fec2
`process_repeater()` task
kaapstorm Aug 23, 2024
85b952e
Add tests
kaapstorm Aug 4, 2024
c28c11b
`Repeater.max_workers` field
kaapstorm Aug 24, 2024
d8d9642
Index fields used by `RepeaterManager.all_ready()`
kaapstorm Aug 28, 2024
48c3d7c
Use quickcache. Prefilter enabled domains.
kaapstorm Aug 28, 2024
418ed3a
Check randomly-enabled domains
kaapstorm Aug 29, 2024
85bbfa3
Forward new records for synchronous case repeaters
kaapstorm Aug 29, 2024
d1119bb
Add explanatory docstrings and comments
kaapstorm Sep 9, 2024
03b26cf
get_redis_lock() ... acquire(): No TypeError ?!
kaapstorm Sep 9, 2024
de27ba0
Drop unnecessary `iter_domain_repeaters()`
kaapstorm Sep 10, 2024
4955ef4
Don't quickcache `domain_can_forward_now()`
kaapstorm Sep 24, 2024
59aae71
Migration to create indexes concurrently
kaapstorm Sep 24, 2024
f40e6f4
Merge branch 'master' into nh/iter_repeaters_1
orangejenny Oct 4, 2024
b70fc52
Add comment
kaapstorm Oct 19, 2024
7e65b3b
Don't squash BaseExceptions
kaapstorm Oct 19, 2024
4c41896
Drop timeout for `process_repeater_lock`.
kaapstorm Oct 19, 2024
30d4a6f
Add metric for monitoring health
kaapstorm Oct 19, 2024
fc0f174
Merge branch 'master' into nh/iter_repeaters_1
kaapstorm Oct 19, 2024
e32b465
Resolve migration conflict, fix index
kaapstorm Oct 19, 2024
e3bcd74
Fix metric
kaapstorm Oct 19, 2024
efc4dde
Change indexes
kaapstorm Oct 22, 2024
bd37a00
Add one more index. Use UNION ALL queries.
kaapstorm Oct 23, 2024
a448b9e
Don't report attempt too soon
kaapstorm Oct 26, 2024
4321fb7
Add metrics
kaapstorm Oct 26, 2024
74137f9
Improve backoff logic
kaapstorm Oct 26, 2024
968a922
Update comments
kaapstorm Oct 28, 2024
4fd14a0
Show "Next attempt at" in Forwarders page
kaapstorm Oct 28, 2024
07320b9
Merge branch 'master' into nh/iter_repeaters_1
kaapstorm Oct 28, 2024
b1eb171
Fixes migration
kaapstorm Oct 28, 2024
2463348
Merge remote-tracking branch 'origin/master' into nh/iter_repeaters_1
kaapstorm Oct 29, 2024
8a7f343
Add comment on other `True` return value
kaapstorm Nov 19, 2024
0f72ba9
Count repeater backoffs
kaapstorm Dec 2, 2024
7f18e52
Add documentation
kaapstorm Dec 2, 2024
8e21cf2
Merge branch 'master' into nh/iter_repeaters_1
kaapstorm Dec 22, 2024
7cbfadb
Use less aggressive backoff for repeaters
kaapstorm Dec 22, 2024
7373ba6
Add TODOs
kaapstorm Dec 22, 2024
f312b5a
Add tests
kaapstorm Dec 22, 2024
9c8d9fb
Return one row per repeater
kaapstorm Jan 6, 2025
2a8c981
Drop RepeaterManager.all_ready()
kaapstorm Jan 8, 2025
c869ec6
Refactor queries to count RepeatRecords
kaapstorm Jan 8, 2025
11c2b76
Drop Datadog tag
kaapstorm Jan 8, 2025
6e84a23
Tweak metrics
kaapstorm Jan 8, 2025
3ee78d8
Merge branch 'master' into nh/iter_repeaters_1
kaapstorm Jan 8, 2025
96dd85b
Set much shorter timeout on repeater lock
kaapstorm Jan 9, 2025
55ad75f
Fix comments
kaapstorm Jan 9, 2025
55846d3
Nit
kaapstorm Jan 9, 2025
0e5c5ec
Fix test
kaapstorm Jan 9, 2025
79377f8
Don't spawn a subtask for `process_repeater()`
kaapstorm Jan 10, 2025
9604d8a
Management command to expire lock
kaapstorm Jan 10, 2025
964ae32
An alternate approach to iterating until complete
kaapstorm Jan 10, 2025
b801159
Clean up tests
kaapstorm Jan 10, 2025
90bf281
Update docstrings
kaapstorm Jan 11, 2025
d0f13a4
Improve wording
kaapstorm Jan 14, 2025
03197dc
Wait for random task instead of all tasks
kaapstorm Jan 15, 2025
e95e2cc
Lint
kaapstorm Jan 17, 2025
983ebf6
`MeteredLock` supports `reacquire()`
kaapstorm Jan 21, 2025
24a7acf
`update_repeater()` calls `process_repeater()`
kaapstorm Jan 21, 2025
30 changes: 30 additions & 0 deletions corehq/ex-submodules/dimagi/utils/couch/tests/test_redis_lock.py
@@ -0,0 +1,30 @@
import uuid

from redis.lock import Lock as RedisLock

from dimagi.utils.couch import get_redis_lock

from corehq.tests.pytest_plugins.redislocks import TestLock
from corehq.util.metrics.lockmeter import MeteredLock


def test_get_redis_lock_with_token():
lock_name = 'test-1'
metered_lock = get_redis_lock(key=lock_name, name=lock_name, timeout=1)
assert isinstance(metered_lock, MeteredLock)
# metered_lock.lock is a TestLock instance
test_lock = metered_lock.lock
assert isinstance(test_lock, TestLock)
redis_lock = test_lock.lock
assert isinstance(redis_lock, RedisLock)

token = uuid.uuid1().hex
acquired = redis_lock.acquire(blocking=False, token=token)
assert acquired

# What we want to be able to do in a separate process:
metered_lock_2 = get_redis_lock(key=lock_name, name=lock_name, timeout=1)
redis_lock_2 = metered_lock_2.lock.lock
redis_lock_2.local.token = token
# Does not raise LockNotOwnedError:
redis_lock_2.release()
5 changes: 5 additions & 0 deletions corehq/motech/repeaters/const.py
@@ -6,17 +6,22 @@

MAX_RETRY_WAIT = timedelta(days=7)
MIN_RETRY_WAIT = timedelta(minutes=60)
MIN_REPEATER_RETRY_WAIT = timedelta(minutes=5) # Repeaters back off slower
RATE_LIMITER_DELAY_RANGE = (
timedelta(minutes=getattr(settings, 'MIN_REPEATER_RATE_LIMIT_DELAY', 0)),
timedelta(minutes=getattr(settings, 'MAX_REPEATER_RATE_LIMIT_DELAY', 15)),
)
CHECK_REPEATERS_INTERVAL = timedelta(minutes=5)
CHECK_REPEATERS_PARTITION_COUNT = settings.CHECK_REPEATERS_PARTITION_COUNT
CHECK_REPEATERS_KEY = 'check-repeaters-key'
PROCESS_REPEATERS_INTERVAL = timedelta(minutes=5)
ENDPOINT_TIMER = 'endpoint_timer'
# Number of attempts to an online endpoint before cancelling payload
MAX_ATTEMPTS = 3
# Number of exponential backoff attempts to an offline endpoint
# TODO: Drop MAX_BACKOFF_ATTEMPTS. We don't need MAX_BACKOFF_ATTEMPTS
# because we are using MAX_RETRY_WAIT, and MAX_BACKOFF_ATTEMPTS is
# being conflated with MAX_ATTEMPTS.
MAX_BACKOFF_ATTEMPTS = 6
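The constants above bound two distinct backoff policies: repeat records back off aggressively (tripling waits starting at `MIN_RETRY_WAIT`, an hour), while repeaters use the gentler `MIN_REPEATER_RETRY_WAIT`. A rough sketch of the repeat-record schedule, assuming a simple tripling rule (the actual formula lives in `_get_retry_interval`, which is not shown in this diff):

```python
from datetime import timedelta

MIN_RETRY_WAIT = timedelta(minutes=60)
MAX_RETRY_WAIT = timedelta(days=7)
MAX_BACKOFF_ATTEMPTS = 6

def retry_schedule():
    """Yield successive repeat-record wait intervals, tripling each
    time and clamping at MAX_RETRY_WAIT."""
    interval = MIN_RETRY_WAIT
    for _ in range(MAX_BACKOFF_ATTEMPTS):
        yield min(interval, MAX_RETRY_WAIT)
        interval *= 3

waits = list(retry_schedule())
# 1h, 3h, 9h, 27h, 81h, then 243h capped to 168h (7 days)
```

This makes it easy to see why six backoff attempts already reach the seven-day cap, supporting the TODO above about dropping `MAX_BACKOFF_ATTEMPTS`.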


194 changes: 160 additions & 34 deletions corehq/motech/repeaters/models.py
@@ -73,6 +73,7 @@
from http import HTTPStatus
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse

from django.conf import settings
from django.db import models, router
from django.db.models.base import Deferred
from django.dispatch import receiver
@@ -102,8 +103,8 @@
XFormInstance,
)
from corehq.motech.const import (
MAX_REQUEST_LOG_LENGTH,
ALL_REQUEST_METHODS,
MAX_REQUEST_LOG_LENGTH,
REQUEST_POST,
)
from corehq.motech.models import ConnectionSettings
@@ -125,6 +126,7 @@
MAX_ATTEMPTS,
MAX_BACKOFF_ATTEMPTS,
MAX_RETRY_WAIT,
MIN_REPEATER_RETRY_WAIT,
MIN_RETRY_WAIT,
State,
)
@@ -233,22 +235,44 @@ def __new__(cls, *args, **kwargs):

class RepeaterManager(models.Manager):

def all_ready(self):
"""
Return all Repeaters ready to be forwarded.
"""
not_paused = models.Q(is_paused=False)
next_attempt_not_in_the_future = (
models.Q(next_attempt_at__isnull=True)
| models.Q(next_attempt_at__lte=timezone.now())
def get_all_ready_ids_by_domain(self):
next_attempt_null = (
self.get_all_ready_next_attempt_null()
.distinct()
.values_list('domain', 'id')
)
next_attempt_now = (
self.get_all_ready_next_attempt_now()
.distinct()
.values_list('domain', 'id')
)
query = next_attempt_null.union(next_attempt_now, all=True)
results = defaultdict(list)
for (domain, id_uuid) in query.all():
results[domain].append(id_uuid.hex)
return results

def get_all_ready_next_attempt_null(self):
# See `get_all_ready_next_attempt_now()`. Splitting the queries
# speeds up total query time by using different indexes.
# NOTE: Returns one row per RepeatRecord, not per Repeater.
return (
self.get_queryset()
.filter(is_paused=False)
.filter(next_attempt_at__isnull=True)
.filter(repeat_records__state__in=(State.Pending, State.Fail))
)
repeat_records_ready_to_send = models.Q(
repeat_records__state__in=(State.Pending, State.Fail)

def get_all_ready_next_attempt_now(self):
# See `get_all_ready_next_attempt_null()`. Splitting the queries
# speeds up total query time by using different indexes.
# NOTE: Returns one row per RepeatRecord, not per Repeater.
return (
self.get_queryset()
.filter(is_paused=False)
.filter(next_attempt_at__lte=timezone.now())
.filter(repeat_records__state__in=(State.Pending, State.Fail))
)
return (self.get_queryset()
.filter(not_paused)
.filter(next_attempt_not_in_the_future)
.filter(repeat_records_ready_to_send))

def get_queryset(self):
repeater_obj = self.model()
@@ -378,31 +402,70 @@ def _repeater_type(cls)

@property
def repeat_records_ready(self):
return self.repeat_records.filter(state__in=(State.Pending, State.Fail))
"""
A QuerySet of repeat records in the Pending or Fail state in the
order in which they were registered
"""
return (
self.repeat_records
.filter(state__in=(State.Pending, State.Fail))
.order_by('registered_at')
)

def set_next_attempt(self):
now = datetime.utcnow()
interval = _get_retry_interval(self.last_attempt_at, now)
self.last_attempt_at = now
self.next_attempt_at = now + interval
@property
def num_workers(self):
# If num_workers is 1, repeat records are sent in the order in
# which they were registered.
num_workers = self.max_workers or settings.DEFAULT_REPEATER_WORKERS
return min(num_workers, settings.MAX_REPEATER_WORKERS)

def set_backoff(self):
self.next_attempt_at = self._get_next_attempt_at(self.last_attempt_at)
self.last_attempt_at = datetime.utcnow()
# Save using QuerySet.update() to avoid a possible race condition
# with self.pause(), etc. and to skip the unnecessary functionality
# in RepeaterSuperProxy.save().
Repeater.objects.filter(id=self.repeater_id).update(
last_attempt_at=now,
next_attempt_at=now + interval,
last_attempt_at=self.last_attempt_at,
next_attempt_at=self.next_attempt_at,
)

def reset_next_attempt(self):
def reset_backoff(self):
if self.last_attempt_at or self.next_attempt_at:
# `self._get_next_attempt_at()` uses `last_attempt_at` to
# determine the previous interval. Set it to `None` so that
# the next time we need to back off, we know it is the first
# interval.
self.last_attempt_at = None
self.next_attempt_at = None
# Avoid a possible race condition with self.pause(), etc.
Repeater.objects.filter(id=self.repeater_id).update(
last_attempt_at=None,
next_attempt_at=None,
last_attempt_at=self.last_attempt_at,
next_attempt_at=self.next_attempt_at,
)

@staticmethod
def _get_next_attempt_at(last_attempt_at):
"""
Calculates exponential backoff.
"""
# Repeat records back off aggressively: The previous interval is
# multiplied by 3, and the minimum wait time is an hour.
# Repeaters are more cautious because backing off will hold up
# all their repeat records. If a remote endpoint is just
# rate-limiting, backing off for an hour is too much.
# MIN_REPEATER_RETRY_WAIT is only 5 minutes.

# TODO: With a less aggressive backoff, should MAX_ATTEMPTS be
# increased?
if last_attempt_at:
interval = 2 * (datetime.utcnow() - last_attempt_at)
else:
interval = timedelta(0)
interval = max(MIN_REPEATER_RETRY_WAIT, interval)
interval = min(MAX_RETRY_WAIT, interval)
return datetime.utcnow() + interval
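The repeater policy above can be isolated for illustration. This sketch mirrors only the clamping arithmetic of `_get_next_attempt_at` (doubling the time since the last attempt, bounded to between 5 minutes and 7 days); it is a simplified stand-in, not the model method itself:

```python
from datetime import datetime, timedelta

MIN_REPEATER_RETRY_WAIT = timedelta(minutes=5)
MAX_RETRY_WAIT = timedelta(days=7)

def next_attempt_interval(last_attempt_at, now):
    """Double the time since the last attempt, clamped to
    [MIN_REPEATER_RETRY_WAIT, MAX_RETRY_WAIT]."""
    if last_attempt_at:
        interval = 2 * (now - last_attempt_at)
    else:
        interval = timedelta(0)  # first backoff: clamps up to the minimum
    return max(MIN_REPEATER_RETRY_WAIT, min(MAX_RETRY_WAIT, interval))

now = datetime(2025, 1, 15, 12, 0)
first = next_attempt_interval(None, now)                          # 5 minutes
second = next_attempt_interval(now - timedelta(minutes=10), now)  # 20 minutes
capped = next_attempt_interval(now - timedelta(days=5), now)      # 7 days
```

Because `reset_backoff()` clears `last_attempt_at`, each fresh run of failures restarts at the 5-minute floor rather than resuming a long interval.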

@property
def verify(self):
return not self.connection_settings.skip_cert_verify
@@ -945,6 +1008,15 @@ def count_overdue(self, threshold=timedelta(minutes=10)):
next_check__lt=datetime.utcnow() - threshold
).count()

@staticmethod
def count_all_ready():
# Uses *Repeater* queries that return one row per RepeatRecord.
# See `RepeaterManager.get_all_ready_ids_by_domain()`.
return (
Repeater.objects.get_all_ready_next_attempt_null().count()
+ Repeater.objects.get_all_ready_next_attempt_now().count()
)

def iterate(self, domain, repeater_id=None, state=None, chunk_size=1000):
db = router.db_for_read(self.model)
where = models.Q(domain=domain)
@@ -1004,11 +1076,16 @@ def get_repeat_record_ids(self, domain, repeater_id=None, state=None, payload_id
class RepeatRecord(models.Model):
domain = models.CharField(max_length=126)
payload_id = models.CharField(max_length=255)
repeater = models.ForeignKey(Repeater,
on_delete=DB_CASCADE,
db_column="repeater_id_",
related_name='repeat_records')
state = models.PositiveSmallIntegerField(choices=State.choices, default=State.Pending)
repeater = models.ForeignKey(
Repeater,
on_delete=DB_CASCADE,
db_column="repeater_id_",
related_name='repeat_records',
)
state = models.PositiveSmallIntegerField(
choices=State.choices,
default=State.Pending,
)
registered_at = models.DateTimeField()
next_check = models.DateTimeField(null=True, default=None)
max_possible_tries = models.IntegerField(default=MAX_BACKOFF_ATTEMPTS)
@@ -1199,16 +1276,46 @@ def fire(self, force_send=False, timing_context=None):
raise
except Exception as e:
self.handle_payload_error(str(e), traceback_str=traceback.format_exc())
raise
# Repeat records with State.Fail are retried, and repeat
# records with State.InvalidPayload are not.
#
# But a repeat record can have State.InvalidPayload
# because it was sent and rejected, so we know that the
# remote endpoint is healthy and responding, or because
# this exception occurred and it was not sent, so we
# don't know anything about the remote endpoint.
#
# Return None so that `tasks.update_repeater()` treats
# the repeat record as unsent, and does not apply or
# reset a backoff.
return None
return self.state
return None
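The comment above distinguishes three outcomes of `fire()`: a real state when the record was sent, and `None` when nothing is known about endpoint health. A hypothetical sketch of how a caller such as `tasks.update_repeater()` (whose body is outside this diff) might map those return values onto the repeater's backoff methods; the `State` enum here is a simplified stand-in for `corehq.motech.repeaters.const.State`:

```python
import enum

class State(enum.IntEnum):
    # Simplified stand-in for corehq.motech.repeaters.const.State
    Pending = 1
    Fail = 2
    Success = 4
    InvalidPayload = 8

def backoff_action(fire_result):
    """Hypothetical mapping from a fire() return value to a backoff
    action. None means the record was not sent, so nothing is known
    about endpoint health and neither action applies."""
    if fire_result is None:
        return 'no-op'
    if fire_result == State.Fail:
        return 'set_backoff'    # endpoint looks unhealthy: slow down
    return 'reset_backoff'      # endpoint responded: clear any backoff
```

This is why a payload error that prevents sending now returns `None` rather than `State.InvalidPayload`: the repeater's backoff should be neither applied nor reset.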

def attempt_forward_now(self, *, is_retry=False, fire_synchronously=False):
from corehq.motech.repeaters.tasks import (
process_repeat_record,
process_datasource_repeat_record,
retry_process_repeat_record,
process_repeat_record,
retry_process_datasource_repeat_record,
retry_process_repeat_record,
)

def is_new_synchronous_case_repeater_record():
"""
Repeat record is a new record for a synchronous case repeater
See corehq.motech.repeaters.signals.fire_synchronous_case_repeaters
"""
# This will also return True if a user clicked "Resend" on a
# Pending repeat record in the Repeat Records Report. This
# is not intended, but it's also not harmful.
return fire_synchronously and self.state == State.Pending

if (
toggles.PROCESS_REPEATERS.enabled(self.domain, toggles.NAMESPACE_DOMAIN)
and not is_new_synchronous_case_repeater_record()
):
return
Contributor:
Consider the scenario where this toggle is enabled and then later disabled. Will next_check discrepancies between RepeatRecord and Repeater be handled gracefully?

Contributor Author:
If by "gracefully" you mean "will all repeat records be processed when the toggle is disabled?", then yes.

But if by "gracefully" you mean "will Datadog show the repeat records as not overdue?", then no. Datadog will show the repeat records as overdue.

Alternatively, when we apply a back-off to a repeater, we could also update all its pending and failed repeat records too. I considered this, but it felt like a lot of churn on the repeat records table. I thought a better approach would be to use a new metric for Datadog to gauge when the repeat records queue is getting backed up.


if self.next_check is None or self.next_check > datetime.utcnow():
return

@@ -1364,7 +1471,26 @@ def is_response(duck):


def domain_can_forward(domain):
"""
Returns whether ``domain`` has data forwarding or Zapier integration
privileges.

Used for determining whether to register a repeat record.
"""
return domain and (
domain_has_privilege(domain, ZAPIER_INTEGRATION)
or domain_has_privilege(domain, DATA_FORWARDING)
)


def domain_can_forward_now(domain):
"""
Returns ``True`` if ``domain`` has the requisite privileges and data
forwarding is not paused.

Used for determining whether to send a repeat record now.
"""
return (
domain_can_forward(domain)
and not toggles.PAUSE_DATA_FORWARDING.enabled(domain)
)