From f079205aebb8028923cc086a51f7bd935465840a Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Fri, 28 Jun 2024 22:42:43 +0100 Subject: [PATCH] fix: hit celery with a hammer (#23340) --- bin/celery-queues.env | 2 +- posthog/tasks/scheduled.py | 4 ++-- posthog/tasks/tasks.py | 42 +++++++++++++++++++------------------- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/bin/celery-queues.env b/bin/celery-queues.env index e7d9a371a55cd..a38e784ab501b 100644 --- a/bin/celery-queues.env +++ b/bin/celery-queues.env @@ -2,4 +2,4 @@ # Important: Add new queues to make Celery consume tasks from them. # NOTE: Keep in sync with posthog/tasks/utils.py -CELERY_WORKER_QUEUES=celery,stats,email,analytics_queries,long_running,exports,subscription_delivery,usage_reports,session_replay_embeddings +CELERY_WORKER_QUEUES=celery,stats,email,analytics_queries,long_running,exports,subscription_delivery,usage_reports,session_replay_embeddings,session_replay_general,session_replay_persistence diff --git a/posthog/tasks/scheduled.py b/posthog/tasks/scheduled.py index 29f08c00756a4..cad1d049befee 100644 --- a/posthog/tasks/scheduled.py +++ b/posthog/tasks/scheduled.py @@ -47,7 +47,7 @@ update_quota_limiting, verify_persons_data_in_sync, update_survey_iteration, - invalid_web_replays, + replay_count_metrics, calculate_external_data_rows_synced, ) from posthog.utils import get_crontab @@ -223,7 +223,7 @@ def setup_periodic_tasks(sender: Celery, **kwargs: Any) -> None: name="process scheduled changes", ) - add_periodic_task_with_expiry(sender, 3600, invalid_web_replays.s()) + add_periodic_task_with_expiry(sender, 3600, replay_count_metrics.s(), name="replay_count_metrics") if clear_clickhouse_crontab := get_crontab(settings.CLEAR_CLICKHOUSE_REMOVED_DATA_SCHEDULE_CRON): sender.add_periodic_task( diff --git a/posthog/tasks/tasks.py b/posthog/tasks/tasks.py index 4959344ba5d82..6a592febe4dea 100644 --- a/posthog/tasks/tasks.py +++ b/posthog/tasks/tasks.py @@ -229,30 +229,30 @@ def ingestion_lag() -> None: @shared_task(ignore_result=True, queue=CeleryQueue.SESSION_REPLAY_GENERAL.value) -def invalid_web_replays() -> None: - logger.info("[invalid web replays] running task") +def replay_count_metrics() -> None: + try: + logger.info("[replay_count_metrics] running task") - from posthog.client import sync_execute + from posthog.client import sync_execute - # ultimately I want to observe values by team id, but at the moment that would be lots of series, let's reduce the value first - query = """ - select - --team_id, - count() as all_recordings, - countIf(snapshot_source == 'mobile') as mobile_recordings, - countIf(snapshot_source == 'web') as web_recordings, - countIf(snapshot_source =='web' and first_url is null) as invalid_web_recordings - from ( - select any(team_id) as team_id, argMinMerge(first_url) as first_url, argMinMerge(snapshot_source) as snapshot_source - from session_replay_events - where min_first_timestamp >= now() - interval 65 minute - and min_first_timestamp <= now() - interval 5 minute - group by session_id - ) - --group by team_id - """ + # ultimately I want to observe values by team id, but at the moment that would be lots of series, let's reduce the value first + query = """ + select + --team_id, + count() as all_recordings, + countIf(snapshot_source == 'mobile') as mobile_recordings, + countIf(snapshot_source == 'web') as web_recordings, + countIf(snapshot_source =='web' and first_url is null) as invalid_web_recordings + from ( + select any(team_id) as team_id, argMinMerge(first_url) as first_url, argMinMerge(snapshot_source) as snapshot_source + from session_replay_events + where min_first_timestamp >= now() - interval 65 minute + and min_first_timestamp <= now() - interval 5 minute + group by session_id + ) + --group by team_id + """ - try: results = sync_execute( query, )