Skip to content

Commit

Permalink
fix(tasks): Add autoretry for too many simultaneous queries error in some tasks (#21895)
Browse files Browse the repository at this point in the history

* Add autoretry for too many simultaneous async queries

Fixes POSTHOG-DNR

* Use for update_cache

Fixes POSTHOG-DNR

* Use for export_asset

Fixes POSTHOG-TMV
  • Loading branch information
webjunkie authored Apr 29, 2024
1 parent ebd336f commit 304540f
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 2 deletions.
6 changes: 6 additions & 0 deletions ee/clickhouse/test/test_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@
"Query exceeds memory limits. Try reducing its scope by changing the time range.",
241,
),
(
ServerException("Too many simultaneous queries. Maximum: 100.", code=202),
"CHQueryErrorTooManySimultaneousQueries",
"Code: 202.\nToo many simultaneous queries. Try again later.",
202,
),
],
)
def test_wrap_query_error(error, expected_type, expected_message, expected_code):
Expand Down
17 changes: 17 additions & 0 deletions posthog/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ def wrap_query_error(err: Exception) -> Exception:
# :TRICKY: Return a custom class for every code by looking up the short name and creating a class dynamically.
if hasattr(err, "code"):
meta = look_up_error_code_meta(err)

if meta.name in CLICKHOUSE_SPECIFIC_ERROR_LOOKUP:
return CLICKHOUSE_SPECIFIC_ERROR_LOOKUP[meta.name]

name = f"CHQueryError{meta.name.replace('_', ' ').title().replace(' ', '')}"
processed_error_class = ExposedCHQueryError if meta.user_safe else InternalCHQueryError
message = meta.user_safe if isinstance(meta.user_safe, str) else err.message
Expand All @@ -71,6 +75,19 @@ def look_up_error_code_meta(error: ServerException) -> ErrorCodeMeta:
return CLICKHOUSE_ERROR_CODE_LOOKUP[code]


# Specific error classes we need
# These exist here and are not dynamically created because they are used in the codebase.
class CHQueryErrorTooManySimultaneousQueries(InternalCHQueryError):
    """ClickHouse "Too many simultaneous queries" error (server error code 202).

    Declared statically (unlike the dynamically generated CHQueryError* classes
    in wrap_query_error) so task definitions can reference it directly, e.g. in
    a Celery ``autoretry_for`` tuple.
    """

    pass


# Maps a ClickHouse error-code short name (as produced by look_up_error_code_meta)
# to a pre-built exception that wrap_query_error returns directly, bypassing the
# dynamic CHQueryError* class creation below.
# NOTE(review): the values are shared singleton exception *instances*, not
# classes — every caller receives (and may raise) the same object, so traceback
# state is reused across raises; confirm this is intentional.
CLICKHOUSE_SPECIFIC_ERROR_LOOKUP = {
    "TOO_MANY_SIMULTANEOUS_QUERIES": CHQueryErrorTooManySimultaneousQueries(
        "Too many simultaneous queries. Try again later.", code=202
    ),
}


#
# From https://github.com/ClickHouse/ClickHouse/blob/23.12/src/Common/ErrorCodes.cpp#L16-L622
#
Expand Down
5 changes: 5 additions & 0 deletions posthog/tasks/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from django.db import transaction

from posthog import settings
from posthog.errors import CHQueryErrorTooManySimultaneousQueries
from posthog.models import ExportedAsset
from posthog.tasks.utils import CeleryQueue

Expand Down Expand Up @@ -43,6 +44,10 @@
ignore_result=False,
time_limit=settings.ASSET_GENERATION_MAX_TIMEOUT_SECONDS,
queue=CeleryQueue.EXPORTS.value,
autoretry_for=(CHQueryErrorTooManySimultaneousQueries,),
retry_backoff=1,
retry_backoff_max=3,
max_retries=3,
)
@transaction.atomic
def export_asset(exported_asset_id: int, limit: Optional[int] = None) -> None:
Expand Down
23 changes: 21 additions & 2 deletions posthog/tasks/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from prometheus_client import Gauge

from posthog.cloud_utils import is_cloud
from posthog.errors import CHQueryErrorTooManySimultaneousQueries
from posthog.hogql.constants import LimitContext
from posthog.metrics import pushed_metrics_registry
from posthog.ph_client import get_ph_client
Expand All @@ -32,7 +33,18 @@ def redis_heartbeat() -> None:
get_client().set("POSTHOG_HEARTBEAT", int(time.time()))


@shared_task(ignore_result=True, queue=CeleryQueue.ANALYTICS_QUERIES.value, acks_late=True)
@shared_task(
ignore_result=True,
queue=CeleryQueue.ANALYTICS_QUERIES.value,
acks_late=True,
autoretry_for=(
# Important: Only retry for things that might be okay on the next try
CHQueryErrorTooManySimultaneousQueries,
),
retry_backoff=1,
retry_backoff_max=2,
max_retries=3,
)
def process_query_task(
team_id: int,
user_id: int,
Expand Down Expand Up @@ -550,7 +562,14 @@ def schedule_cache_updates_task() -> None:
schedule_cache_updates()


@shared_task(ignore_result=True)
@shared_task(
ignore_result=True,
autoretry_for=(CHQueryErrorTooManySimultaneousQueries,),
retry_backoff=10,
retry_backoff_max=30,
max_retries=3,
retry_jitter=True,
)
def update_cache_task(caching_state_id: UUID) -> None:
from posthog.caching.insight_cache import update_cache

Expand Down

0 comments on commit 304540f

Please sign in to comment.