-
-
Notifications
You must be signed in to change notification settings - Fork 218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Process Repeaters, Part 1 #35033
base: master
Are you sure you want to change the base?
Process Repeaters, Part 1 #35033
Changes from 25 commits
50c848a
0db6a6c
e36296c
aeb10ba
01e4bc7
db2fec2
85b952e
c28c11b
d8d9642
48c3d7c
418ed3a
85bbfa3
d1119bb
03b26cf
de27ba0
4955ef4
59aae71
f40e6f4
b70fc52
7e65b3b
4c41896
30d4a6f
fc0f174
e32b465
e3bcd74
efc4dde
bd37a00
a448b9e
4321fb7
74137f9
968a922
4fd14a0
07320b9
b1eb171
2463348
8a7f343
0f72ba9
7f18e52
8e21cf2
7cbfadb
7373ba6
f312b5a
9c8d9fb
2a8c981
c869ec6
11c2b76
6e84a23
3ee78d8
96dd85b
55ad75f
55846d3
0e5c5ec
79377f8
9604d8a
964ae32
b801159
90bf281
d0f13a4
03197dc
e95e2cc
983ebf6
24a7acf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
import uuid | ||
|
||
from redis.lock import Lock as RedisLock | ||
|
||
from dimagi.utils.couch import get_redis_lock | ||
|
||
from corehq.tests.noseplugins.redislocks import TestLock | ||
from corehq.util.metrics.lockmeter import MeteredLock | ||
|
||
|
||
def test_get_redis_lock_with_token(): | ||
lock_name = 'test-1' | ||
metered_lock = get_redis_lock(key=lock_name, name=lock_name, timeout=1) | ||
assert isinstance(metered_lock, MeteredLock) | ||
# metered_lock.lock is a TestLock instance because of | ||
# corehq.tests.noseplugins.redislocks.RedisLockTimeoutPlugin | ||
test_lock = metered_lock.lock | ||
assert isinstance(test_lock, TestLock) | ||
redis_lock = test_lock.lock | ||
assert isinstance(redis_lock, RedisLock) | ||
|
||
token = uuid.uuid1().hex | ||
acquired = redis_lock.acquire(blocking=False, token=token) | ||
assert acquired | ||
|
||
# What we want to be able to do in a separate process: | ||
metered_lock_2 = get_redis_lock(key=lock_name, name=lock_name, timeout=1) | ||
redis_lock_2 = metered_lock_2.lock.lock | ||
redis_lock_2.local.token = token | ||
# Does not raise LockNotOwnedError: | ||
redis_lock_2.release() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
from django.db import migrations, models | ||
|
||
|
||
class Migration(migrations.Migration): | ||
|
||
dependencies = [ | ||
("repeaters", "0014_alter_repeater_request_method"), | ||
] | ||
|
||
operations = [ | ||
migrations.AddField( | ||
model_name="repeater", | ||
name="max_workers", | ||
field=models.IntegerField(default=0), | ||
), | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
from django.db import migrations, models | ||
|
||
|
||
class Migration(migrations.Migration): | ||
atomic = False | ||
|
||
dependencies = [ | ||
("repeaters", "0015_repeater_max_workers"), | ||
] | ||
|
||
operations = [ | ||
migrations.SeparateDatabaseAndState( | ||
state_operations=[ | ||
migrations.AlterField( | ||
model_name="repeatrecord", | ||
name="state", | ||
field=models.PositiveSmallIntegerField( | ||
choices=[ | ||
(1, "Pending"), | ||
(2, "Failed"), | ||
(4, "Succeeded"), | ||
(8, "Cancelled"), | ||
(16, "Empty"), | ||
(32, "Invalid Payload"), | ||
], | ||
db_index=True, | ||
default=1, | ||
), | ||
), | ||
migrations.AddIndex( | ||
model_name="repeater", | ||
index=models.Index( | ||
condition=models.Q(("is_deleted", False), ("is_paused", False)), | ||
fields=["next_attempt_at"], | ||
name="next_attempt_at_partial_idx", | ||
), | ||
), | ||
], | ||
|
||
database_operations=[ | ||
migrations.RunSQL( | ||
sql=""" | ||
CREATE INDEX CONCURRENTLY "repeaters_repeatrecord_state_8055083b" | ||
ON "repeaters_repeatrecord" ("state"); | ||
""", | ||
reverse_sql=""" | ||
DROP INDEX CONCURRENTLY "repeaters_repeatrecord_state_8055083b"; | ||
""" | ||
), | ||
migrations.RunSQL( | ||
sql=""" | ||
CREATE INDEX CONCURRENTLY "next_attempt_at_partial_idx" | ||
ON "repeaters_repeater" ("next_attempt_at") | ||
WHERE (NOT "is_deleted" AND NOT "is_paused"); | ||
""", | ||
reverse_sql=""" | ||
DROP INDEX CONCURRENTLY "next_attempt_at_partial_idx"; | ||
""" | ||
), | ||
] | ||
) | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -73,6 +73,7 @@ | |
from http import HTTPStatus | ||
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse | ||
|
||
from django.conf import settings | ||
from django.db import models, router | ||
from django.db.models.base import Deferred | ||
from django.dispatch import receiver | ||
|
@@ -245,10 +246,19 @@ def all_ready(self): | |
repeat_records_ready_to_send = models.Q( | ||
repeat_records__state__in=(State.Pending, State.Fail) | ||
) | ||
return (self.get_queryset() | ||
.filter(not_paused) | ||
.filter(next_attempt_not_in_the_future) | ||
.filter(repeat_records_ready_to_send)) | ||
return ( | ||
self.get_queryset() | ||
.filter(not_paused) | ||
.filter(next_attempt_not_in_the_future) | ||
.filter(repeat_records_ready_to_send) | ||
) | ||
|
||
def get_all_ready_ids_by_domain(self): | ||
results = defaultdict(list) | ||
query = self.all_ready().values_list('domain', 'id') | ||
for (domain, id_uuid) in query.all(): | ||
results[domain].append(id_uuid.hex) | ||
return results | ||
|
||
def get_queryset(self): | ||
repeater_obj = self.model() | ||
|
@@ -275,6 +285,7 @@ class Repeater(RepeaterSuperProxy): | |
is_paused = models.BooleanField(default=False) | ||
next_attempt_at = models.DateTimeField(null=True, blank=True) | ||
last_attempt_at = models.DateTimeField(null=True, blank=True) | ||
max_workers = models.IntegerField(default=0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you think of spinning this feature off in a separate PR (if we decided it's needed)? We have a lot of knobs to turn already, and I wonder if we'll need this one? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We needed this one years ago. OpenMRS integrations involve Events and Observations, and it is really useful if we can send repeat records in the order in which their forms were submitted. More generally, setting And setting it to [all of the workers] is an insurance policy for this PR to ensure that we can still handle the highest volume for repeaters that need it. I'd sleep easier if this is included. |
||
options = JSONField(default=dict) | ||
connection_settings_id = models.IntegerField(db_index=True) | ||
is_deleted = models.BooleanField(default=False, db_index=True) | ||
|
@@ -286,6 +297,13 @@ class Repeater(RepeaterSuperProxy): | |
|
||
class Meta: | ||
db_table = 'repeaters_repeater' | ||
indexes = [ | ||
models.Index( | ||
fields=['next_attempt_at'], | ||
condition=models.Q(("is_deleted", False), ("is_paused", False)), | ||
name='next_attempt_at_partial_idx', | ||
), | ||
] | ||
|
||
payload_generator_classes = () | ||
|
||
|
@@ -365,9 +383,24 @@ def _repeater_type(cls): | |
|
||
@property | ||
def repeat_records_ready(self): | ||
return self.repeat_records.filter(state__in=(State.Pending, State.Fail)) | ||
""" | ||
A QuerySet of repeat records in the Pending or Fail state in the | ||
order in which they were registered | ||
""" | ||
return ( | ||
self.repeat_records | ||
.filter(state__in=(State.Pending, State.Fail)) | ||
.order_by('registered_at') | ||
) | ||
|
||
def set_next_attempt(self): | ||
@property | ||
def num_workers(self): | ||
# If num_workers is 1, repeat records are sent in the order in | ||
# which they were registered. | ||
num_workers = self.max_workers or settings.DEFAULT_REPEATER_WORKERS | ||
return min(num_workers, settings.MAX_REPEATER_WORKERS) | ||
|
||
def set_backoff(self): | ||
now = datetime.utcnow() | ||
interval = _get_retry_interval(self.last_attempt_at, now) | ||
self.last_attempt_at = now | ||
|
@@ -380,8 +413,12 @@ def set_next_attempt(self): | |
next_attempt_at=now + interval, | ||
) | ||
|
||
def reset_next_attempt(self): | ||
def reset_backoff(self): | ||
if self.last_attempt_at or self.next_attempt_at: | ||
# `_get_retry_interval()` implements exponential backoff by | ||
# multiplying the previous interval by 3. Set last_attempt_at | ||
# to None so that the next time we need to back off, we | ||
# know it is the first interval. | ||
self.last_attempt_at = None | ||
kaapstorm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.next_attempt_at = None | ||
# Avoid a possible race condition with self.pause(), etc. | ||
|
@@ -991,11 +1028,17 @@ def get_repeat_record_ids(self, domain, repeater_id=None, state=None, payload_id | |
class RepeatRecord(models.Model): | ||
domain = models.CharField(max_length=126) | ||
payload_id = models.CharField(max_length=255) | ||
repeater = models.ForeignKey(Repeater, | ||
on_delete=DB_CASCADE, | ||
db_column="repeater_id_", | ||
related_name='repeat_records') | ||
state = models.PositiveSmallIntegerField(choices=State.choices, default=State.Pending) | ||
repeater = models.ForeignKey( | ||
Repeater, | ||
on_delete=DB_CASCADE, | ||
db_column="repeater_id_", | ||
related_name='repeat_records', | ||
) | ||
state = models.PositiveSmallIntegerField( | ||
choices=State.choices, | ||
default=State.Pending, | ||
db_index=True, | ||
) | ||
registered_at = models.DateTimeField() | ||
next_check = models.DateTimeField(null=True, default=None) | ||
max_possible_tries = models.IntegerField(default=MAX_BACKOFF_ATTEMPTS) | ||
|
@@ -1175,7 +1218,8 @@ def fire(self, force_send=False, timing_context=None): | |
self.repeater.fire_for_record(self, timing_context=timing_context) | ||
except Exception as e: | ||
self.handle_payload_error(str(e), traceback_str=traceback.format_exc()) | ||
raise | ||
return self.state | ||
return None | ||
|
||
def attempt_forward_now(self, *, is_retry=False, fire_synchronously=False): | ||
from corehq.motech.repeaters.tasks import ( | ||
|
@@ -1185,6 +1229,19 @@ def attempt_forward_now(self, *, is_retry=False, fire_synchronously=False): | |
retry_process_datasource_repeat_record, | ||
) | ||
|
||
def is_new_synchronous_case_repeater_record(): | ||
""" | ||
Repeat record is a new record for a synchronous case repeater | ||
See corehq.motech.repeaters.signals.fire_synchronous_case_repeaters | ||
""" | ||
return fire_synchronously and self.state == State.Pending | ||
|
||
if ( | ||
toggles.PROCESS_REPEATERS.enabled(self.domain, toggles.NAMESPACE_DOMAIN) | ||
and not is_new_synchronous_case_repeater_record() | ||
): | ||
return | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider the scenario where this toggle is enabled and then later disabled. With There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If by "gracefully" you mean "will all repeat records be processed when the toggle is disabled?", then yes. But if by "gracefully" you mean "will Datadog show the repeat records as not overdue?", then no. Datadog will show the repeat records as overdue. Alternatively, when we apply a back-off to a repeater, we could also update all its pending and failed repeat records too. I considered this, but it felt like a lot of churn on the repeat records table. I thought a better approach would be to use a new metric for Datadog to gauge when the repeat records queue is getting backed up. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
if self.next_check is None or self.next_check > datetime.utcnow(): | ||
return | ||
|
||
|
@@ -1337,7 +1394,26 @@ def is_response(duck): | |
|
||
|
||
def domain_can_forward(domain): | ||
""" | ||
Returns whether ``domain`` has data forwarding or Zapier integration | ||
privileges. | ||
|
||
Used for determining whether to register a repeat record. | ||
""" | ||
return domain and ( | ||
domain_has_privilege(domain, ZAPIER_INTEGRATION) | ||
or domain_has_privilege(domain, DATA_FORWARDING) | ||
) | ||
|
||
|
||
def domain_can_forward_now(domain): | ||
millerdev marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
Returns ``True`` if ``domain`` has the requisite privileges and data | ||
forwarding is not paused. | ||
|
||
Used for determining whether to send a repeat record now. | ||
""" | ||
return ( | ||
domain_can_forward(domain) | ||
and not toggles.PAUSE_DATA_FORWARDING.enabled(domain) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I analyzed the query used by
Repeater.objects.get_all_ready_ids_by_domain()
, and this is the result on Staging:The "next_attempt_at_partial_idx" was chosen to match the filter in that query plan, but Postgres is not using the index.
Should we drop it, change it, or leave it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How much data is present on staging? If not much then those two indexes may have had very similar outcomes and the choice of which to use may not have been very meaningful.
I'd check the same or a similar query on prod to see if you get the same query plan.
Do we need both indexes?Would it break other queries if we added the partial conditionNOT "is_deleted" AND NOT "is_paused"
onrepeaters_repeatrecord_state_8055083b
and eliminated the?next_attempt_at_partial_idx
indexEdit: realized that those indexes are on separate tables, so the question above did not make sense. Crossed out some bits, but I think the question of whether to add a condition to the repeatrecord index still makes sense.
Edit 2: I guess maybe that does not make sense since
is_deleted
andis_paused
are not columns on therepeatrecord
table. I think the gist of my question is whether there is a way to only index rows on that table that are active and eliminate the many millions of rows that will never be touched again?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"repeaters_repeater"."next_attempt_at" IS NULL OR "repeaters_repeater"."next_attempt_at" <= '2024-10-19 22:11:36.310082'
This condition could be an issue. The NULL and the range check can't be accomplished with a single index operation which may be why it's not using the index.
You could try split the query into two queries and doing a union of the results. It seems counter intuitive but if the indexes get used the performance will be better:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting! Thank you!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice. It uses the index.
Out of curiosity, I'll try a composite index and see what difference it makes. Either way, this feels like a big step in the right direction.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With a composite index, planning time is 2.5 ms faster, but execution time is 363 ms slower. The partial index wins.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found some changes that made a significant difference. efc4dde
The details are in the commit message:
I've dropped the
next_attempt_at_partial_idx
because of that, and changed the indexes forRepeater.state
andRepeater.is_deleted
to be partial.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think
Parallel Seq Scan on repeaters_repeatrecord
is going to be a problem if that query plan is chosen on prod. In comparison,Index Scan on repeaters_repeater_...
is inconsequential because there are only ~2000 (tiny number) repeaters whereas there are millions of repeat records.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to close the loop here: I tested the RepeatRecord.state index on Prod, and Postgres uses it for both subqueries. I tested on Friday afternoon, and the query took around 27 seconds.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the benefits of this PR are big enough for us to iterate on step-wise improvements after it is merged. One improvement I think we should consider is replacing the
repeaters_repeatrecord
table with nested partitioning / subpartitioning. Something like:This will improve queries on repeat records ready to be sent, and also allow us to drop completed repeat records after some expiry (TBD).