Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(tasks): Add autoretry for too many simultaneous queries error in some tasks #21895

Merged
merged 3 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ee/clickhouse/test/test_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@
"Query exceeds memory limits. Try reducing its scope by changing the time range.",
241,
),
(
ServerException("Too many simultaneous queries. Maximum: 100.", code=202),
"CHQueryErrorTooManySimultaneousQueries",
"Code: 202.\nToo many simultaneous queries. Try again later.",
202,
),
],
)
def test_wrap_query_error(error, expected_type, expected_message, expected_code):
Expand Down
17 changes: 17 additions & 0 deletions posthog/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ def wrap_query_error(err: Exception) -> Exception:
# :TRICKY: Return a custom class for every code by looking up the short name and creating a class dynamically.
if hasattr(err, "code"):
meta = look_up_error_code_meta(err)

if meta.name in CLICKHOUSE_SPECIFIC_ERROR_LOOKUP:
return CLICKHOUSE_SPECIFIC_ERROR_LOOKUP[meta.name]

name = f"CHQueryError{meta.name.replace('_', ' ').title().replace(' ', '')}"
processed_error_class = ExposedCHQueryError if meta.user_safe else InternalCHQueryError
message = meta.user_safe if isinstance(meta.user_safe, str) else err.message
Expand All @@ -71,6 +75,19 @@ def look_up_error_code_meta(error: ServerException) -> ErrorCodeMeta:
return CLICKHOUSE_ERROR_CODE_LOOKUP[code]


# Specific error classes we need
# These exist here and are not dynamically created because they are used in the codebase.
class CHQueryErrorTooManySimultaneousQueries(InternalCHQueryError):
    """ClickHouse "too many simultaneous queries" error (error code 202).

    Declared statically (rather than being created dynamically by
    ``wrap_query_error``) so the class can be imported by name elsewhere in
    the codebase — e.g. listed in Celery ``autoretry_for`` tuples, since this
    error is transient and the query may succeed on a later attempt.
    """

    pass


CLICKHOUSE_SPECIFIC_ERROR_LOOKUP = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would also be good to move EstimatedQueryExecutionTimeTooLong and QuerySizeExceeded into this lookup, in a separate PR

"TOO_MANY_SIMULTANEOUS_QUERIES": CHQueryErrorTooManySimultaneousQueries(
"Too many simultaneous queries. Try again later.", code=202
),
}


#
# From https://github.com/ClickHouse/ClickHouse/blob/23.12/src/Common/ErrorCodes.cpp#L16-L622
#
Expand Down
5 changes: 5 additions & 0 deletions posthog/tasks/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from django.db import transaction

from posthog import settings
from posthog.errors import CHQueryErrorTooManySimultaneousQueries
from posthog.models import ExportedAsset
from posthog.tasks.utils import CeleryQueue

Expand Down Expand Up @@ -43,6 +44,10 @@
ignore_result=False,
time_limit=settings.ASSET_GENERATION_MAX_TIMEOUT_SECONDS,
queue=CeleryQueue.EXPORTS.value,
autoretry_for=(CHQueryErrorTooManySimultaneousQueries,),
retry_backoff=1,
retry_backoff_max=3,
max_retries=3,
)
@transaction.atomic
def export_asset(exported_asset_id: int, limit: Optional[int] = None) -> None:
Expand Down
23 changes: 21 additions & 2 deletions posthog/tasks/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from prometheus_client import Gauge

from posthog.cloud_utils import is_cloud
from posthog.errors import CHQueryErrorTooManySimultaneousQueries
from posthog.hogql.constants import LimitContext
from posthog.metrics import pushed_metrics_registry
from posthog.ph_client import get_ph_client
Expand All @@ -32,7 +33,18 @@ def redis_heartbeat() -> None:
get_client().set("POSTHOG_HEARTBEAT", int(time.time()))


@shared_task(ignore_result=True, queue=CeleryQueue.ANALYTICS_QUERIES.value, acks_late=True)
@shared_task(
ignore_result=True,
queue=CeleryQueue.ANALYTICS_QUERIES.value,
acks_late=True,
autoretry_for=(
# Important: Only retry for things that might be okay on the next try
CHQueryErrorTooManySimultaneousQueries,
),
retry_backoff=1,
retry_backoff_max=2,
max_retries=3,
)
def process_query_task(
team_id: int,
user_id: int,
Expand Down Expand Up @@ -550,7 +562,14 @@ def schedule_cache_updates_task() -> None:
schedule_cache_updates()


@shared_task(ignore_result=True)
@shared_task(
ignore_result=True,
autoretry_for=(CHQueryErrorTooManySimultaneousQueries,),
retry_backoff=10,
retry_backoff_max=30,
max_retries=3,
retry_jitter=True,
)
def update_cache_task(caching_state_id: UUID) -> None:
from posthog.caching.insight_cache import update_cache

Expand Down
Loading