From c61626a6dec4d09cf17f06938c8d97488838886d Mon Sep 17 00:00:00 2001 From: Julian Bez Date: Fri, 26 Apr 2024 14:48:09 +0200 Subject: [PATCH 1/3] Add autoretry for too many simultaneous async queries Fixes POSTHOG-DNR --- ee/clickhouse/test/test_error.py | 6 ++++++ posthog/errors.py | 17 +++++++++++++++++ posthog/tasks/tasks.py | 14 +++++++++++++- 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/ee/clickhouse/test/test_error.py b/ee/clickhouse/test/test_error.py index 3e1f76a2a17bb..983e37b14540d 100644 --- a/ee/clickhouse/test/test_error.py +++ b/ee/clickhouse/test/test_error.py @@ -35,6 +35,12 @@ "Query exceeds memory limits. Try reducing its scope by changing the time range.", 241, ), + ( + ServerException("Too many simultaneous queries. Maximum: 100.", code=202), + "CHQueryErrorTooManySimultaneousQueries", + "Code: 202.\nToo many simultaneous queries. Try again later.", + 202, + ), ], ) def test_wrap_query_error(error, expected_type, expected_message, expected_code): diff --git a/posthog/errors.py b/posthog/errors.py index 70b3d46dd3c31..cdade09b0f127 100644 --- a/posthog/errors.py +++ b/posthog/errors.py @@ -57,6 +57,10 @@ def wrap_query_error(err: Exception) -> Exception: # :TRICKY: Return a custom class for every code by looking up the short name and creating a class dynamically. if hasattr(err, "code"): meta = look_up_error_code_meta(err) + + if meta.name in CLICKHOUSE_SPECIFIC_ERROR_LOOKUP: + return CLICKHOUSE_SPECIFIC_ERROR_LOOKUP[meta.name] + name = f"CHQueryError{meta.name.replace('_', ' ').title().replace(' ', '')}" processed_error_class = ExposedCHQueryError if meta.user_safe else InternalCHQueryError message = meta.user_safe if isinstance(meta.user_safe, str) else err.message @@ -71,6 +75,19 @@ def look_up_error_code_meta(error: ServerException) -> ErrorCodeMeta: return CLICKHOUSE_ERROR_CODE_LOOKUP[code] +# Specific error classes we need +# These exist here and are not dynamically created because they are used in the codebase. +class CHQueryErrorTooManySimultaneousQueries(InternalCHQueryError): + pass + + +CLICKHOUSE_SPECIFIC_ERROR_LOOKUP = { + "TOO_MANY_SIMULTANEOUS_QUERIES": CHQueryErrorTooManySimultaneousQueries( + "Too many simultaneous queries. Try again later.", code=202 + ), +} + + # # From https://github.com/ClickHouse/ClickHouse/blob/23.12/src/Common/ErrorCodes.cpp#L16-L622 # diff --git a/posthog/tasks/tasks.py b/posthog/tasks/tasks.py index 0a52b02d35590..c5e3ac62da0fb 100644 --- a/posthog/tasks/tasks.py +++ b/posthog/tasks/tasks.py @@ -9,6 +9,7 @@ from prometheus_client import Gauge from posthog.cloud_utils import is_cloud +from posthog.errors import CHQueryErrorTooManySimultaneousQueries from posthog.hogql.constants import LimitContext from posthog.metrics import pushed_metrics_registry from posthog.ph_client import get_ph_client @@ -32,7 +33,18 @@ def redis_heartbeat() -> None: get_client().set("POSTHOG_HEARTBEAT", int(time.time())) -@shared_task(ignore_result=True, queue=CeleryQueue.ANALYTICS_QUERIES.value, acks_late=True) +@shared_task( + ignore_result=True, + queue=CeleryQueue.ANALYTICS_QUERIES.value, + acks_late=True, + autoretry_for=( + # Important: Only retry for things that might be okay on the next try + CHQueryErrorTooManySimultaneousQueries, + ), + retry_backoff=1, + retry_backoff_max=2, + max_retries=3, +) def process_query_task( team_id: int, user_id: int, From 4ab4a526c2ea466e6b439c3cebf2b697401c1c1c Mon Sep 17 00:00:00 2001 From: Julian Bez Date: Fri, 26 Apr 2024 14:50:50 +0200 Subject: [PATCH 2/3] Use for update_cache Fixes POSTHOG-DNR --- posthog/tasks/tasks.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/posthog/tasks/tasks.py b/posthog/tasks/tasks.py index c5e3ac62da0fb..5445b4aaded1e 100644 --- a/posthog/tasks/tasks.py +++ b/posthog/tasks/tasks.py @@ -562,7 +562,14 @@ def schedule_cache_updates_task() -> None: schedule_cache_updates() -@shared_task(ignore_result=True) +@shared_task( + ignore_result=True, + autoretry_for=(CHQueryErrorTooManySimultaneousQueries,), + retry_backoff=10, + retry_backoff_max=30, + max_retries=3, + retry_jitter=True, +) def update_cache_task(caching_state_id: UUID) -> None: from posthog.caching.insight_cache import update_cache From 99d0352e4c37b7fe07f604306c7ee898593e1d8e Mon Sep 17 00:00:00 2001 From: Julian Bez Date: Fri, 26 Apr 2024 14:51:10 +0200 Subject: [PATCH 3/3] Use for export_asset POSTHOG-TMV --- posthog/tasks/exporter.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/posthog/tasks/exporter.py b/posthog/tasks/exporter.py index 8ab5b392cf0d6..6d7ee8ab2cd98 100644 --- a/posthog/tasks/exporter.py +++ b/posthog/tasks/exporter.py @@ -6,6 +6,7 @@ from django.db import transaction from posthog import settings +from posthog.errors import CHQueryErrorTooManySimultaneousQueries from posthog.models import ExportedAsset from posthog.tasks.utils import CeleryQueue @@ -43,6 +44,10 @@ ignore_result=False, time_limit=settings.ASSET_GENERATION_MAX_TIMEOUT_SECONDS, queue=CeleryQueue.EXPORTS.value, + autoretry_for=(CHQueryErrorTooManySimultaneousQueries,), + retry_backoff=1, + retry_backoff_max=3, + max_retries=3, ) @transaction.atomic def export_asset(exported_asset_id: int, limit: Optional[int] = None) -> None: