diff --git a/ee/clickhouse/test/test_error.py b/ee/clickhouse/test/test_error.py
index 3e1f76a2a17bb..983e37b14540d 100644
--- a/ee/clickhouse/test/test_error.py
+++ b/ee/clickhouse/test/test_error.py
@@ -35,6 +35,12 @@
             "Query exceeds memory limits. Try reducing its scope by changing the time range.",
             241,
         ),
+        (
+            ServerException("Too many simultaneous queries. Maximum: 100.", code=202),
+            "CHQueryErrorTooManySimultaneousQueries",
+            "Code: 202.\nToo many simultaneous queries. Try again later.",
+            202,
+        ),
     ],
 )
 def test_wrap_query_error(error, expected_type, expected_message, expected_code):
diff --git a/posthog/errors.py b/posthog/errors.py
index 70b3d46dd3c31..cdade09b0f127 100644
--- a/posthog/errors.py
+++ b/posthog/errors.py
@@ -57,6 +57,10 @@ def wrap_query_error(err: Exception) -> Exception:
     # :TRICKY: Return a custom class for every code by looking up the short name and creating a class dynamically.
     if hasattr(err, "code"):
         meta = look_up_error_code_meta(err)
+
+        if meta.name in CLICKHOUSE_SPECIFIC_ERROR_LOOKUP:
+            return CLICKHOUSE_SPECIFIC_ERROR_LOOKUP[meta.name]
+
         name = f"CHQueryError{meta.name.replace('_', ' ').title().replace(' ', '')}"
         processed_error_class = ExposedCHQueryError if meta.user_safe else InternalCHQueryError
         message = meta.user_safe if isinstance(meta.user_safe, str) else err.message
@@ -71,6 +75,19 @@ def look_up_error_code_meta(error: ServerException) -> ErrorCodeMeta:
     return CLICKHOUSE_ERROR_CODE_LOOKUP[code]
 
 
+# Specific error classes we need.
+# These exist here, rather than being created dynamically, because they are referenced directly in the codebase.
+class CHQueryErrorTooManySimultaneousQueries(InternalCHQueryError):
+    pass
+
+
+CLICKHOUSE_SPECIFIC_ERROR_LOOKUP = {
+    "TOO_MANY_SIMULTANEOUS_QUERIES": CHQueryErrorTooManySimultaneousQueries(
+        "Too many simultaneous queries. Try again later.", code=202
Try again later.", code=202 + ), +} + + # # From https://github.com/ClickHouse/ClickHouse/blob/23.12/src/Common/ErrorCodes.cpp#L16-L622 # diff --git a/posthog/tasks/exporter.py b/posthog/tasks/exporter.py index 8ab5b392cf0d6..6d7ee8ab2cd98 100644 --- a/posthog/tasks/exporter.py +++ b/posthog/tasks/exporter.py @@ -6,6 +6,7 @@ from django.db import transaction from posthog import settings +from posthog.errors import CHQueryErrorTooManySimultaneousQueries from posthog.models import ExportedAsset from posthog.tasks.utils import CeleryQueue @@ -43,6 +44,10 @@ ignore_result=False, time_limit=settings.ASSET_GENERATION_MAX_TIMEOUT_SECONDS, queue=CeleryQueue.EXPORTS.value, + autoretry_for=(CHQueryErrorTooManySimultaneousQueries,), + retry_backoff=1, + retry_backoff_max=3, + max_retries=3, ) @transaction.atomic def export_asset(exported_asset_id: int, limit: Optional[int] = None) -> None: diff --git a/posthog/tasks/tasks.py b/posthog/tasks/tasks.py index 56c4909e79db3..9c183837b1289 100644 --- a/posthog/tasks/tasks.py +++ b/posthog/tasks/tasks.py @@ -9,6 +9,7 @@ from prometheus_client import Gauge from posthog.cloud_utils import is_cloud +from posthog.errors import CHQueryErrorTooManySimultaneousQueries from posthog.hogql.constants import LimitContext from posthog.metrics import pushed_metrics_registry from posthog.ph_client import get_ph_client @@ -32,7 +33,18 @@ def redis_heartbeat() -> None: get_client().set("POSTHOG_HEARTBEAT", int(time.time())) -@shared_task(ignore_result=True, queue=CeleryQueue.ANALYTICS_QUERIES.value, acks_late=True) +@shared_task( + ignore_result=True, + queue=CeleryQueue.ANALYTICS_QUERIES.value, + acks_late=True, + autoretry_for=( + # Important: Only retry for things that might be okay on the next try + CHQueryErrorTooManySimultaneousQueries, + ), + retry_backoff=1, + retry_backoff_max=2, + max_retries=3, +) def process_query_task( team_id: int, user_id: int, @@ -550,7 +562,14 @@ def schedule_cache_updates_task() -> None: schedule_cache_updates() -@shared_task(ignore_result=True) +@shared_task( + ignore_result=True, + autoretry_for=(CHQueryErrorTooManySimultaneousQueries,), + retry_backoff=10, + retry_backoff_max=30, + max_retries=3, + retry_jitter=True, +) def update_cache_task(caching_state_id: UUID) -> None: from posthog.caching.insight_cache import update_cache