Skip to content

Commit

Permalink
chore: move billing celery job to use offline cluster and increase timeout from 2 minutes to 5 min (#17656)
Browse files Browse the repository at this point in the history

* chore: move billing celery job to use offline cluster and increase timeout from 2 minutes to 5 min

* add retry logic

* add types for retry
  • Loading branch information
fuziontech authored Sep 27, 2023
1 parent abcfdab commit 3e13484
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 9 deletions.
8 changes: 1 addition & 7 deletions posthog/clickhouse/client/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,7 @@

@lru_cache(maxsize=1)
def default_settings() -> Dict:
    """Return the default ClickHouse settings applied to every query.

    The old CH 22.3 workaround (disabling ``optimize_move_to_prewhere``)
    was removed once all clusters reached >= 22.8, so the settings are now
    unconditional. Cached with ``lru_cache`` so they are computed once per
    process (i.e. once per deploy).
    """
    return {"join_algorithm": "direct,parallel_hash", "distributed_replica_max_ignored_errors": 1000}


@lru_cache(maxsize=1)
Expand Down
42 changes: 40 additions & 2 deletions posthog/tasks/usage_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)

import requests
from retry import retry
import structlog
from dateutil import parser
from django.conf import settings
Expand All @@ -27,6 +28,7 @@

from posthog import version_requirement
from posthog.celery import app
from posthog.clickhouse.client.connection import Workload
from posthog.client import sync_execute
from posthog.cloud_utils import get_cached_instance_license, is_cloud
from posthog.constants import FlagRequestType
Expand All @@ -47,6 +49,15 @@
TableSizes = TypedDict("TableSizes", {"posthog_event": int, "posthog_sessionrecordingevent": int})


CH_BILLING_SETTINGS = {
"max_execution_time": 5 * 60, # 5 minutes
}

QUERY_RETRIES = 3
QUERY_RETRY_DELAY = 1
QUERY_RETRY_BACKOFF = 2


@dataclasses.dataclass
class UsageReportCounters:
event_count_lifetime: int
Expand Down Expand Up @@ -314,18 +325,22 @@ def capture_event(


@timed_log()
@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF)
def get_teams_with_event_count_lifetime() -> List[Tuple[int, int]]:
    """Return ``(team_id, lifetime event count)`` for every team.

    This scans the whole ``events`` table, so it runs on the OFFLINE
    ClickHouse workload with the extended billing timeout
    (``CH_BILLING_SETTINGS``) and is retried with exponential backoff
    on transient failures.
    """
    result = sync_execute(
        """
        SELECT team_id, count(1) as count
        FROM events
        GROUP BY team_id
        """,
        workload=Workload.OFFLINE,
        settings=CH_BILLING_SETTINGS,
    )
    return result


@timed_log()
@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF)
def get_teams_with_billable_event_count_in_period(
begin: datetime, end: datetime, count_distinct: bool = False
) -> List[Tuple[int, int]]:
Expand All @@ -348,11 +363,14 @@ def get_teams_with_billable_event_count_in_period(
GROUP BY team_id
""",
{"begin": begin, "end": end},
workload=Workload.OFFLINE,
settings=CH_BILLING_SETTINGS,
)
return result


@timed_log()
@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF)
def get_teams_with_event_count_with_groups_in_period(begin: datetime, end: datetime) -> List[Tuple[int, int]]:
result = sync_execute(
"""
Expand All @@ -363,11 +381,14 @@ def get_teams_with_event_count_with_groups_in_period(begin: datetime, end: datet
GROUP BY team_id
""",
{"begin": begin, "end": end},
workload=Workload.OFFLINE,
settings=CH_BILLING_SETTINGS,
)
return result


@timed_log()
@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF)
def get_teams_with_event_count_by_lib(begin: datetime, end: datetime) -> List[Tuple[int, str, int]]:
results = sync_execute(
"""
Expand All @@ -377,11 +398,14 @@ def get_teams_with_event_count_by_lib(begin: datetime, end: datetime) -> List[Tu
GROUP BY lib, team_id
""",
{"begin": begin, "end": end},
workload=Workload.OFFLINE,
settings=CH_BILLING_SETTINGS,
)
return results


@timed_log()
@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF)
def get_teams_with_event_count_by_name(begin: datetime, end: datetime) -> List[Tuple[int, str, int]]:
results = sync_execute(
"""
Expand All @@ -391,11 +415,14 @@ def get_teams_with_event_count_by_name(begin: datetime, end: datetime) -> List[T
GROUP BY event, team_id
""",
{"begin": begin, "end": end},
workload=Workload.OFFLINE,
settings=CH_BILLING_SETTINGS,
)
return results


@timed_log()
@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF)
def get_teams_with_recording_count_in_period(begin: datetime, end: datetime) -> List[Tuple[int, int]]:
previous_begin = begin - (end - begin)

Expand All @@ -418,24 +445,30 @@ def get_teams_with_recording_count_in_period(begin: datetime, end: datetime) ->
GROUP BY team_id
""",
{"previous_begin": previous_begin, "begin": begin, "end": end},
workload=Workload.OFFLINE,
settings=CH_BILLING_SETTINGS,
)

return result


@timed_log()
@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF)
def get_teams_with_recording_count_total() -> List[Tuple[int, int]]:
    """Return ``(team_id, total distinct recording session count)`` per team.

    Full-table aggregation over ``session_replay_events``; routed to the
    OFFLINE ClickHouse workload with the extended billing timeout
    (``CH_BILLING_SETTINGS``) and retried with exponential backoff.
    """
    result = sync_execute(
        """
        SELECT team_id, count(distinct session_id) as count
        FROM session_replay_events
        GROUP BY team_id
        """,
        workload=Workload.OFFLINE,
        settings=CH_BILLING_SETTINGS,
    )
    return result


@timed_log()
@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF)
def get_teams_with_hogql_metric(
begin: datetime,
end: datetime,
Expand All @@ -461,11 +494,14 @@ def get_teams_with_hogql_metric(
GROUP BY team_id
""",
{"begin": begin, "end": end, "query_types": query_types, "access_method": access_method},
workload=Workload.OFFLINE,
settings=CH_BILLING_SETTINGS,
)
return result


@timed_log()
@retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF)
def get_teams_with_feature_flag_requests_count_in_period(
begin: datetime, end: datetime, request_type: FlagRequestType
) -> List[Tuple[int, int]]:
Expand All @@ -490,6 +526,8 @@ def get_teams_with_feature_flag_requests_count_in_period(
"validity_token": validity_token,
"target_event": target_event,
},
workload=Workload.OFFLINE,
settings=CH_BILLING_SETTINGS,
)

return result
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.in
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ types-freezegun==1.1.10
types-python-dateutil>=2.8.3
types-pytz==2021.3.2
types-redis==4.3.20
types-retry==0.9.9.4
types-requests==2.26.1
parameterized==0.9.0
pytest==7.4.0
Expand Down
2 changes: 2 additions & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ types-redis==4.3.20
# via -r requirements-dev.in
types-requests==2.26.1
# via -r requirements-dev.in
types-retry==0.9.9.4
# via -r requirements-dev.in
typing-extensions==4.7.1
# via
# -c requirements.txt
Expand Down
1 change: 1 addition & 0 deletions requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ python-dateutil>=2.8.2
python3-saml==1.12.0
pytz==2021.1
redis==4.5.4
retry==0.9.2
requests==2.28.1
requests-oauthlib==1.3.0
selenium==4.1.5
Expand Down
6 changes: 6 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ cssselect==1.1.0
# via toronado
cssutils==1.0.2
# via toronado
decorator==5.1.1
# via retry
defusedxml==0.6.0
# via
# -r requirements.in
Expand Down Expand Up @@ -327,6 +329,8 @@ psycopg2-binary==2.9.7
# via -r requirements.in
ptyprocess==0.6.0
# via pexpect
py==1.11.0
# via retry
pyarrow==12.0.1
# via -r requirements.in
pyasn1==0.5.0
Expand Down Expand Up @@ -411,6 +415,8 @@ requests-oauthlib==1.3.0
# via
# -r requirements.in
# social-auth-core
retry==0.9.2
# via -r requirements.in
rsa==4.9
# via google-auth
ruamel-yaml==0.17.21
Expand Down

0 comments on commit 3e13484

Please sign in to comment.