Skip to content

Commit

Permalink
fix: hit celery with a hammer (#23340)
Browse files: browse the repository at this point in the history
  • Loading branch information
pauldambra authored Jun 28, 2024
1 parent e201875 commit f079205
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 24 deletions.
2 changes: 1 addition & 1 deletion bin/celery-queues.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
# Important: Add new queues to make Celery consume tasks from them.

# NOTE: Keep in sync with posthog/tasks/utils.py
CELERY_WORKER_QUEUES=celery,stats,email,analytics_queries,long_running,exports,subscription_delivery,usage_reports,session_replay_embeddings
CELERY_WORKER_QUEUES=celery,stats,email,analytics_queries,long_running,exports,subscription_delivery,usage_reports,session_replay_embeddings,session_replay_general,session_replay_persistence
4 changes: 2 additions & 2 deletions posthog/tasks/scheduled.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
update_quota_limiting,
verify_persons_data_in_sync,
update_survey_iteration,
invalid_web_replays,
replay_count_metrics,
calculate_external_data_rows_synced,
)
from posthog.utils import get_crontab
Expand Down Expand Up @@ -223,7 +223,7 @@ def setup_periodic_tasks(sender: Celery, **kwargs: Any) -> None:
name="process scheduled changes",
)

add_periodic_task_with_expiry(sender, 3600, invalid_web_replays.s())
add_periodic_task_with_expiry(sender, 3600, replay_count_metrics.s(), name="replay_count_metrics")

if clear_clickhouse_crontab := get_crontab(settings.CLEAR_CLICKHOUSE_REMOVED_DATA_SCHEDULE_CRON):
sender.add_periodic_task(
Expand Down
42 changes: 21 additions & 21 deletions posthog/tasks/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,30 +229,30 @@ def ingestion_lag() -> None:


@shared_task(ignore_result=True, queue=CeleryQueue.SESSION_REPLAY_GENERAL.value)
def invalid_web_replays() -> None:
logger.info("[invalid web replays] running task")
def replay_count_metrics() -> None:
try:
logger.info("[replay_count_metrics] running task")

from posthog.client import sync_execute
from posthog.client import sync_execute

# ultimately I want to observe values by team id, but at the moment that would be lots of series, let's reduce the value first
query = """
select
--team_id,
count() as all_recordings,
countIf(snapshot_source == 'mobile') as mobile_recordings,
countIf(snapshot_source == 'web') as web_recordings,
countIf(snapshot_source =='web' and first_url is null) as invalid_web_recordings
from (
select any(team_id) as team_id, argMinMerge(first_url) as first_url, argMinMerge(snapshot_source) as snapshot_source
from session_replay_events
where min_first_timestamp >= now() - interval 65 minute
and min_first_timestamp <= now() - interval 5 minute
group by session_id
)
--group by team_id
"""
# ultimately I want to observe values by team id, but at the moment that would be lots of series, let's reduce the value first
query = """
select
--team_id,
count() as all_recordings,
countIf(snapshot_source == 'mobile') as mobile_recordings,
countIf(snapshot_source == 'web') as web_recordings,
countIf(snapshot_source =='web' and first_url is null) as invalid_web_recordings
from (
select any(team_id) as team_id, argMinMerge(first_url) as first_url, argMinMerge(snapshot_source) as snapshot_source
from session_replay_events
where min_first_timestamp >= now() - interval 65 minute
and min_first_timestamp <= now() - interval 5 minute
group by session_id
)
--group by team_id
"""

try:
results = sync_execute(
query,
)
Expand Down

0 comments on commit f079205

Please sign in to comment.