Skip to content

Commit

Permalink
Merge pull request #35012 from dimagi/gh/repeaters/rate-limit-attempts-off-of-wait-time
Browse files Browse the repository at this point in the history

Rework repeater rate limiting by attempts logic
  • Loading branch information
gherceg authored Aug 21, 2024
2 parents d04ca4c + a703e28 commit 07042a4
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 22 deletions.
15 changes: 8 additions & 7 deletions corehq/motech/rate_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,13 @@ def _get_per_user_repeater_wait_milliseconds_rate_definition(domain):
@run_only_when(SHOULD_RATE_LIMIT_REPEATERS)
@silence_and_report_error("Exception raised in the repeater rate limiter",
'commcare.repeaters.rate_limiter_errors')
def rate_limit_repeater(domain):
is_domain_allowed_usage = repeater_rate_limiter.allow_usage(domain)
if RATE_LIMIT_REPEATER_ATTEMPTS.enabled(domain, namespace=NAMESPACE_DOMAIN):
is_domain_allowed_usage = is_domain_allowed_usage and repeater_attempts_rate_limiter.allow_usage(domain)
def rate_limit_repeater(domain, repeater_id):
limit_attempts = RATE_LIMIT_REPEATER_ATTEMPTS.enabled(domain, namespace=NAMESPACE_DOMAIN)
is_under_attempt_limit = repeater_attempts_rate_limiter.allow_usage(repeater_id) if limit_attempts else True

if global_repeater_rate_limiter.allow_usage() or is_domain_allowed_usage:
if global_repeater_rate_limiter.allow_usage() and is_under_attempt_limit:
allow_usage = True
elif repeater_rate_limiter.allow_usage(domain):
allow_usage = True
elif not RATE_LIMIT_REPEATERS.enabled(domain, namespace=NAMESPACE_DOMAIN):
allow_usage = True
Expand Down Expand Up @@ -109,8 +110,8 @@ def report_repeater_usage(domain, milliseconds):
@run_only_when(SHOULD_RATE_LIMIT_REPEATERS)
@silence_and_report_error("Exception raised reporting usage to the repeater attempt rate limiter",
'commcare.repeaters.report_usage_errors')
def report_repeater_attempt(domain):
repeater_attempts_rate_limiter.report_usage(domain)
def report_repeater_attempt(repeater_id):
repeater_attempts_rate_limiter.report_usage(repeater_id)


@quickcache([], timeout=60) # Only report up to once a minute
Expand Down
31 changes: 16 additions & 15 deletions corehq/motech/repeaters/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def retry_process_repeat_record(repeat_record_id, domain):


def _process_repeat_record(repeat_record):
request_duration = None
request_duration = action = None
with TimingContext('process_repeat_record') as timer:
if repeat_record.state == State.Cancelled:
return
Expand All @@ -163,13 +163,13 @@ def _process_repeat_record(repeat_record):
# clogging the queue
repeat_record.postpone_by(MAX_RETRY_WAIT)
action = 'paused'
elif rate_limit_repeater(repeat_record.domain):
elif rate_limit_repeater(repeat_record.domain, repeat_record.repeater.repeater_id):
# Spread retries evenly over the range defined by RATE_LIMITER_DELAY_RANGE
# with the intent of avoiding clumping and spreading load
repeat_record.postpone_by(random.uniform(*RATE_LIMITER_DELAY_RANGE))
action = 'rate_limited'
elif repeat_record.is_queued():
report_repeater_attempt(repeat_record.domain)
report_repeater_attempt(repeat_record.repeater.repeater_id)
with timer('fire_timing') as fire_timer:
repeat_record.fire(timing_context=fire_timer)
# round up to the nearest millisecond, meaning always at least 1ms
Expand All @@ -182,18 +182,19 @@ def _process_repeat_record(repeat_record):
logging.exception('Failed to process repeat record: {}'.format(repeat_record.id))
return

processing_time = timer.duration - request_duration if request_duration else timer.duration
metrics_histogram(
'commcare.repeaters.repeat_record_processing.timing',
processing_time * 1000,
buckets=(100, 500, 1000, 5000),
bucket_tag='duration',
bucket_unit='ms',
tags={
'domain': repeat_record.domain,
'action': action,
},
)
if action:
processing_time = timer.duration - request_duration if request_duration else timer.duration
metrics_histogram(
'commcare.repeaters.repeat_record_processing.timing',
processing_time * 1000,
buckets=(100, 500, 1000, 5000),
bucket_tag='duration',
bucket_unit='ms',
tags={
'domain': repeat_record.domain,
'action': action,
},
)


metrics_gauge_task(
Expand Down

0 comments on commit 07042a4

Please sign in to comment.