From 3e13484de1b503d1962ad97ff76138b943f2b9f9 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Wed, 27 Sep 2023 14:32:52 -0700 Subject: [PATCH] chore: move billing celery job to use offline cluster and increase timeout from 2 minutes to 5 min (#17656) * chore: move billing celery job to use offline cluster and increase timeout from 2 minutes to 5 min * add retry logic * add types for retry --- posthog/clickhouse/client/execute.py | 8 +----- posthog/tasks/usage_report.py | 42 ++++++++++++++++++++++++++-- requirements-dev.in | 1 + requirements-dev.txt | 2 ++ requirements.in | 1 + requirements.txt | 6 ++++ 6 files changed, 51 insertions(+), 9 deletions(-) diff --git a/posthog/clickhouse/client/execute.py b/posthog/clickhouse/client/execute.py index f3796e78cf837..60cad345fcaa7 100644 --- a/posthog/clickhouse/client/execute.py +++ b/posthog/clickhouse/client/execute.py @@ -40,13 +40,7 @@ @lru_cache(maxsize=1) def default_settings() -> Dict: - # On CH 22.3 we need to disable optimize_move_to_prewhere due to a bug. This is verified fixed on 22.8 (LTS), - # so we only disable on versions below that. - # This is calculated once per deploy - if clickhouse_at_least_228(): - return {"join_algorithm": "direct,parallel_hash", "distributed_replica_max_ignored_errors": 1000} - else: - return {"optimize_move_to_prewhere": 0} + return {"join_algorithm": "direct,parallel_hash", "distributed_replica_max_ignored_errors": 1000} @lru_cache(maxsize=1) diff --git a/posthog/tasks/usage_report.py b/posthog/tasks/usage_report.py index 612213086629e..4627a95af6ab1 100644 --- a/posthog/tasks/usage_report.py +++ b/posthog/tasks/usage_report.py @@ -16,6 +16,7 @@ ) import requests +from retry import retry import structlog from dateutil import parser from django.conf import settings @@ -27,6 +28,7 @@ from posthog import version_requirement from posthog.celery import app +from posthog.clickhouse.client.connection import Workload from posthog.client import sync_execute from posthog.cloud_utils import get_cached_instance_license, is_cloud from posthog.constants import FlagRequestType @@ -47,6 +49,15 @@ TableSizes = TypedDict("TableSizes", {"posthog_event": int, "posthog_sessionrecordingevent": int}) +CH_BILLING_SETTINGS = { + "max_execution_time": 5 * 60, # 5 minutes +} + +QUERY_RETRIES = 3 +QUERY_RETRY_DELAY = 1 +QUERY_RETRY_BACKOFF = 2 + + @dataclasses.dataclass class UsageReportCounters: event_count_lifetime: int @@ -314,18 +325,22 @@ def capture_event( @timed_log() +@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF) def get_teams_with_event_count_lifetime() -> List[Tuple[int, int]]: result = sync_execute( """ SELECT team_id, count(1) as count FROM events GROUP BY team_id - """ + """, + workload=Workload.OFFLINE, + settings=CH_BILLING_SETTINGS, ) return result @timed_log() +@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF) def get_teams_with_billable_event_count_in_period( begin: datetime, end: datetime, count_distinct: bool = False ) -> List[Tuple[int, int]]: @@ -348,11 +363,14 @@ def get_teams_with_billable_event_count_in_period( GROUP BY team_id """, {"begin": begin, "end": end}, + workload=Workload.OFFLINE, + settings=CH_BILLING_SETTINGS, ) return result @timed_log() +@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF) def get_teams_with_event_count_with_groups_in_period(begin: datetime, end: datetime) -> List[Tuple[int, int]]: result = sync_execute( """ @@ -363,11 +381,14 @@ def get_teams_with_event_count_with_groups_in_period(begin: datetime, end: datet GROUP BY team_id """, {"begin": begin, "end": end}, + workload=Workload.OFFLINE, + settings=CH_BILLING_SETTINGS, ) return result @timed_log() +@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF) def get_teams_with_event_count_by_lib(begin: datetime, end: datetime) -> List[Tuple[int, str, int]]: results = sync_execute( """ @@ -377,11 +398,14 @@ def get_teams_with_event_count_by_lib(begin: datetime, end: datetime) -> List[Tu GROUP BY lib, team_id """, {"begin": begin, "end": end}, + workload=Workload.OFFLINE, + settings=CH_BILLING_SETTINGS, ) return results @timed_log() +@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF) def get_teams_with_event_count_by_name(begin: datetime, end: datetime) -> List[Tuple[int, str, int]]: results = sync_execute( """ @@ -391,11 +415,14 @@ def get_teams_with_event_count_by_name(begin: datetime, end: datetime) -> List[T GROUP BY event, team_id """, {"begin": begin, "end": end}, + workload=Workload.OFFLINE, + settings=CH_BILLING_SETTINGS, ) return results @timed_log() +@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF) def get_teams_with_recording_count_in_period(begin: datetime, end: datetime) -> List[Tuple[int, int]]: previous_begin = begin - (end - begin) @@ -418,24 +445,30 @@ def get_teams_with_recording_count_in_period(begin: datetime, end: datetime) -> GROUP BY team_id """, {"previous_begin": previous_begin, "begin": begin, "end": end}, + workload=Workload.OFFLINE, + settings=CH_BILLING_SETTINGS, ) return result @timed_log() +@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF) def get_teams_with_recording_count_total() -> List[Tuple[int, int]]: result = sync_execute( """ SELECT team_id, count(distinct session_id) as count FROM session_replay_events GROUP BY team_id - """ + """, + workload=Workload.OFFLINE, + settings=CH_BILLING_SETTINGS, ) return result @timed_log() +@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF) def get_teams_with_hogql_metric( begin: datetime, end: datetime, @@ -461,11 +494,14 @@ def get_teams_with_hogql_metric( GROUP BY team_id """, {"begin": begin, "end": end, "query_types": query_types, "access_method": access_method}, + workload=Workload.OFFLINE, + settings=CH_BILLING_SETTINGS, ) return result @timed_log() +@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF) def get_teams_with_feature_flag_requests_count_in_period( begin: datetime, end: datetime, request_type: FlagRequestType ) -> List[Tuple[int, int]]: @@ -490,6 +526,8 @@ def get_teams_with_feature_flag_requests_count_in_period( "validity_token": validity_token, "target_event": target_event, }, + workload=Workload.OFFLINE, + settings=CH_BILLING_SETTINGS, ) return result diff --git a/requirements-dev.in b/requirements-dev.in index b21da42a9ee2e..5a761bd0f05ae 100644 --- a/requirements-dev.in +++ b/requirements-dev.in @@ -30,6 +30,7 @@ types-freezegun==1.1.10 types-python-dateutil>=2.8.3 types-pytz==2021.3.2 types-redis==4.3.20 +types-retry==0.9.9.4 types-requests==2.26.1 parameterized==0.9.0 pytest==7.4.0 diff --git a/requirements-dev.txt b/requirements-dev.txt index 2d93601fe05ee..f368953750bb4 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -315,6 +315,8 @@ types-redis==4.3.20 # via -r requirements-dev.in types-requests==2.26.1 # via -r requirements-dev.in +types-retry==0.9.9.4 + # via -r requirements-dev.in typing-extensions==4.7.1 # via # -c requirements.txt diff --git a/requirements.in b/requirements.in index 206c4499ff804..5aae7e14356cd 100644 --- a/requirements.in +++ b/requirements.in @@ -63,6 +63,7 @@ python-dateutil>=2.8.2 python3-saml==1.12.0 pytz==2021.1 redis==4.5.4 +retry==0.9.2 requests==2.28.1 requests-oauthlib==1.3.0 selenium==4.1.5 diff --git a/requirements.txt b/requirements.txt index 8eb4c404ae6c1..cdaab88d2b262 100644 --- a/requirements.txt +++ b/requirements.txt @@ -99,6 +99,8 @@ cssselect==1.1.0 # via toronado cssutils==1.0.2 # via toronado +decorator==5.1.1 + # via retry defusedxml==0.6.0 # via # -r requirements.in @@ -327,6 +329,8 @@ psycopg2-binary==2.9.7 # via -r requirements.in ptyprocess==0.6.0 # via pexpect +py==1.11.0 + # via retry pyarrow==12.0.1 # via -r requirements.in pyasn1==0.5.0 @@ -411,6 +415,8 @@ requests-oauthlib==1.3.0 # via # -r requirements.in # social-auth-core +retry==0.9.2 + # via -r requirements.in rsa==4.9 # via google-auth ruamel-yaml==0.17.21