From 2a6e6f8acf2f5f6064000ae67ef9d225457a5ddf Mon Sep 17 00:00:00 2001 From: Ben White Date: Thu, 18 Jan 2024 09:59:49 +0100 Subject: [PATCH 01/17] Fixed gossip locally and added task timings --- bin/start-worker | 4 ++-- posthog/celery.py | 35 ++++++++++++++++++++++++++--------- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/bin/start-worker b/bin/start-worker index 7fba86d7e4764..e4d42fe9ee2d6 100755 --- a/bin/start-worker +++ b/bin/start-worker @@ -7,10 +7,10 @@ trap 'kill $(jobs -p)' EXIT source ./bin/celery-queues.env # start celery worker with heartbeat (-B) -SKIP_ASYNC_MIGRATIONS_SETUP=0 celery -A posthog worker -B --scheduler redbeat.RedBeatScheduler --without-heartbeat --without-gossip --without-mingle -Ofair -n node@%h & +SKIP_ASYNC_MIGRATIONS_SETUP=0 celery -A posthog worker -B --scheduler redbeat.RedBeatScheduler --without-heartbeat --without-mingle -Ofair -n node@%h & if [[ "$PLUGIN_SERVER_IDLE" != "1" && "$PLUGIN_SERVER_IDLE" != "true" ]]; then ./bin/plugin-server fi -wait \ No newline at end of file +wait diff --git a/posthog/celery.py b/posthog/celery.py index 90b57e69c7d3a..0dd40cf532a43 100644 --- a/posthog/celery.py +++ b/posthog/celery.py @@ -22,7 +22,7 @@ from django.utils import timezone from django_structlog.celery import signals from django_structlog.celery.steps import DjangoStructLogInitStep -from prometheus_client import Counter, Gauge +from prometheus_client import Counter, Gauge, Histogram from posthog.cloud_utils import is_cloud from posthog.metrics import pushed_metrics_registry @@ -59,6 +59,14 @@ labelnames=["task_name"], ) + +CELERTY_TASK_DURATION_HISTOGRAM = Histogram( + "posthog_celery_task_duration_seconds", + "Time spent running a task", + labelnames=["task_name"], + buckets=(1, 5, 10, 30, 60, 120, 600, 1200, float("inf")), +) + # Using a string here means the worker doesn't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys @@ -349,9 +357,12 @@ def setup_periodic_tasks(sender: Celery, **kwargs): ) +task_timings = {} + + # Set up clickhouse query instrumentation @task_prerun.connect -def pre_run_signal_handler(task_id, task, **kwargs): +def prerun_signal_handler(task_id, task, **kwargs): from statshog.defaults.django import statsd from posthog.clickhouse.client.connection import ( @@ -364,9 +375,22 @@ def pre_run_signal_handler(task_id, task, **kwargs): tag_queries(kind="celery", id=task.name) set_default_clickhouse_workload_type(Workload.OFFLINE) + task_timings[task_id] = time.time() + CELERY_TASK_PRE_RUN_COUNTER.labels(task_name=task.name).inc() +@task_postrun.connect +def postrun_signal_handler(task_id, task, **kwargs): + from posthog.clickhouse.query_tagging import reset_query_tags + + if task_id in task_timings: + start_time = task_timings.pop(task_id, None) + CELERTY_TASK_DURATION_HISTOGRAM.labels(task_name=task.name).observe(time.time() - start_time) + + reset_query_tags() + + @task_success.connect def success_signal_handler(sender, **kwargs): CELERY_TASK_SUCCESS_COUNTER.labels(task_name=sender.name).inc() @@ -382,13 +406,6 @@ def retry_signal_handler(sender, **kwargs): CELERY_TASK_RETRY_COUNTER.labels(task_name=sender.name).inc() -@task_postrun.connect -def teardown_instrumentation(task_id, task, **kwargs): - from posthog.clickhouse.query_tagging import reset_query_tags - - reset_query_tags() - - @app.task(ignore_result=True) def delete_expired_exported_assets() -> None: from posthog.models import ExportedAsset From 009128835fdcae0c9b821787c6e72a7e534657c4 Mon Sep 17 00:00:00 2001 From: Ben White Date: Thu, 18 Jan 2024 10:50:42 +0100 Subject: [PATCH 02/17] Refactored tasks --- ee/api/test/test_organization.py | 2 +- ee/models/license.py | 2 +- ee/session_recordings/persistence_tasks.py | 6 +- ee/tasks/subscriptions/__init__.py | 8 +- posthog/api/prompt.py | 4 +- posthog/api/test/test_cohort.py | 12 +- posthog/apps.py | 3 +- posthog/caching/insight_cache.py | 5 +- posthog/celery.py | 977 +----------------- posthog/clickhouse/client/execute_async.py | 4 +- posthog/email.py | 4 +- posthog/models/insight_caching_state.py | 9 +- .../models/session_recording.py | 4 +- posthog/tasks/__init__.py | 2 + posthog/tasks/async_migrations.py | 6 +- posthog/tasks/email.py | 29 +- posthog/tasks/exporter.py | 4 +- posthog/tasks/scheduled.py | 293 ++++++ posthog/tasks/split_person.py | 5 +- posthog/tasks/tasks.py | 721 +++++++++++++ posthog/tasks/usage_report.py | 8 +- posthog/tasks/user_identify.py | 4 +- posthog/tasks/verify_persons_data_in_sync.py | 4 +- posthog/tasks/warehouse.py | 10 +- posthog/test/test_celery.py | 2 +- 25 files changed, 1094 insertions(+), 1034 deletions(-) create mode 100644 posthog/tasks/scheduled.py create mode 100644 posthog/tasks/tasks.py diff --git a/ee/api/test/test_organization.py b/ee/api/test/test_organization.py index 4d7cf41f21fab..296b35d626884 100644 --- a/ee/api/test/test_organization.py +++ b/ee/api/test/test_organization.py @@ -8,9 +8,9 @@ from ee.api.test.base import APILicensedTest from ee.models.license import License -from posthog.celery import sync_all_organization_available_features from posthog.models import Team, User from posthog.models.organization import Organization, OrganizationMembership +from posthog.tasks.tasks import sync_all_organization_available_features class TestOrganizationEnterpriseAPI(APILicensedTest): diff --git a/ee/models/license.py b/ee/models/license.py index 17547a867212a..29179d1d56cca 100644 --- a/ee/models/license.py +++ b/ee/models/license.py @@ -8,9 +8,9 @@ from django.utils import timezone from rest_framework import exceptions, status -from posthog.celery import sync_all_organization_available_features from posthog.constants import AvailableFeature from posthog.models.utils import sane_repr +from posthog.tasks.tasks import sync_all_organization_available_features class LicenseError(exceptions.APIException): diff --git a/ee/session_recordings/persistence_tasks.py b/ee/session_recordings/persistence_tasks.py index e409d9d318df9..376258ea0b38c 100644 --- a/ee/session_recordings/persistence_tasks.py +++ b/ee/session_recordings/persistence_tasks.py @@ -1,21 +1,21 @@ from datetime import timedelta import structlog +from celery import shared_task from django.utils import timezone from ee.session_recordings.session_recording_extensions import persist_recording -from posthog.celery import app from posthog.session_recordings.models.session_recording import SessionRecording logger = structlog.get_logger(__name__) -@app.task() +@shared_task(ignore_result=True) def persist_single_recording(id: str, team_id: int) -> None: persist_recording(id, team_id) -@app.task() +@shared_task(ignore_result=True) def persist_finished_recordings() -> None: one_day_old = timezone.now() - timedelta(hours=24) finished_recordings = SessionRecording.objects.filter(created_at__lte=one_day_old, object_storage_path=None) diff --git a/ee/tasks/subscriptions/__init__.py b/ee/tasks/subscriptions/__init__.py index 443e5eb369995..3dbe6e91b6bfe 100644 --- a/ee/tasks/subscriptions/__init__.py +++ b/ee/tasks/subscriptions/__init__.py @@ -2,6 +2,7 @@ from typing import Optional import structlog +from celery import shared_task from prometheus_client import Counter from sentry_sdk import capture_exception, capture_message @@ -9,7 +10,6 @@ from ee.tasks.subscriptions.slack_subscriptions import send_slack_subscription_report from ee.tasks.subscriptions.subscription_utils import generate_assets from posthog import settings -from posthog.celery import app from posthog.models.subscription import Subscription logger = structlog.get_logger(__name__) @@ -120,7 +120,7 @@ def _deliver_subscription_report( subscription.save() -@app.task() +@shared_task() def schedule_all_subscriptions() -> None: """ Schedule all past notifications (with a buffer) to be delivered @@ -148,12 +148,12 @@ def schedule_all_subscriptions() -> None: report_timeout_seconds = settings.PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES * 60 * 1.5 -@app.task(soft_time_limit=report_timeout_seconds, time_limit=report_timeout_seconds + 10) +@shared_task(soft_time_limit=report_timeout_seconds, time_limit=report_timeout_seconds + 10) def deliver_subscription_report(subscription_id: int) -> None: return _deliver_subscription_report(subscription_id) -@app.task(soft_time_limit=report_timeout_seconds, time_limit=report_timeout_seconds + 10) +@shared_task(soft_time_limit=report_timeout_seconds, time_limit=report_timeout_seconds + 10) def handle_subscription_value_change( subscription_id: int, previous_value: str, invite_message: Optional[str] = None ) -> None: diff --git a/posthog/api/prompt.py b/posthog/api/prompt.py index b50d56103276a..b4c6b36512ce7 100644 --- a/posthog/api/prompt.py +++ b/posthog/api/prompt.py @@ -1,6 +1,7 @@ import json from typing import Any, Dict, List +from celery import shared_task from dateutil import parser from django.db import IntegrityError from django.http import JsonResponse @@ -11,7 +12,6 @@ from posthog.api.routing import StructuredViewSetMixin from posthog.api.utils import get_token -from posthog.celery import app from posthog.exceptions import generate_exception_response from posthog.models.prompt import Prompt, PromptSequence, UserPromptState from posthog.models.user import User @@ -185,7 +185,7 @@ class Meta: ] -@app.task(ignore_result=True) +@shared_task() def trigger_prompt_for_user(email: str, sequence_id: int): try: sequence = PromptSequence.objects.get(pk=sequence_id) diff --git a/posthog/api/test/test_cohort.py b/posthog/api/test/test_cohort.py index 33128b639d03f..019ee47b9fe74 100644 --- a/posthog/api/test/test_cohort.py +++ b/posthog/api/test/test_cohort.py @@ -6,25 +6,25 @@ from django.core.files.uploadedfile import SimpleUploadedFile from django.test.client import Client from django.utils import timezone -from posthog.celery import clickhouse_clear_removed_data -from posthog.clickhouse.client.execute import sync_execute -from posthog.models.async_deletion.async_deletion import AsyncDeletion, DeletionType -from posthog.schema import PropertyOperator -from posthog.tasks.calculate_cohort import calculate_cohort_from_list from rest_framework import status from rest_framework.test import APIClient from posthog.api.test.test_exports import TestExportMixin +from posthog.clickhouse.client.execute import sync_execute from posthog.models import FeatureFlag, Person +from posthog.models.async_deletion.async_deletion import AsyncDeletion, DeletionType from posthog.models.cohort import Cohort from posthog.models.team.team import Team +from posthog.schema import PropertyOperator +from posthog.tasks.calculate_cohort import calculate_cohort_from_list +from posthog.tasks.tasks import clickhouse_clear_removed_data from posthog.test.base import ( APIBaseTest, ClickhouseTestMixin, + QueryMatchingTest, _create_event, _create_person, flush_persons_and_events, - QueryMatchingTest, snapshot_clickhouse_queries, ) diff --git a/posthog/apps.py b/posthog/apps.py index 3e6b2aaf76fee..25c1a5215839f 100644 --- a/posthog/apps.py +++ b/posthog/apps.py @@ -7,6 +7,7 @@ from posthoganalytics.client import Client from posthog.settings import SELF_CAPTURE, SKIP_ASYNC_MIGRATIONS_SETUP +from posthog.tasks.tasks import sync_all_organization_available_features from posthog.utils import ( get_git_branch, get_git_commit, @@ -35,8 +36,6 @@ def ready(self): # log development server launch to posthog if os.getenv("RUN_MAIN") == "true": # Sync all organization.available_features once on launch, in case plans changed - from posthog.celery import sync_all_organization_available_features - sync_all_organization_available_features() # NOTE: This has to be created as a separate client so that the "capture" call doesn't lock in the properties diff --git a/posthog/caching/insight_cache.py b/posthog/caching/insight_cache.py index d1214c3a67a98..b2f14eab178d4 100644 --- a/posthog/caching/insight_cache.py +++ b/posthog/caching/insight_cache.py @@ -15,6 +15,7 @@ from posthog.caching.calculate_results import calculate_result_by_insight from posthog.models import Dashboard, Insight, InsightCachingState, Team from posthog.models.instance_setting import get_instance_setting +from posthog.tasks.tasks import update_cache_task logger = structlog.get_logger(__name__) @@ -25,8 +26,6 @@ def schedule_cache_updates(): - from posthog.celery import update_cache_task - # :TODO: Separate celery queue for updates rather than limiting via this method PARALLEL_INSIGHT_CACHE = get_instance_setting("PARALLEL_DASHBOARD_ITEM_CACHE") @@ -139,8 +138,6 @@ def update_cache(caching_state_id: UUID): statsd.incr("caching_state_update_errors") if caching_state.refresh_attempt < MAX_ATTEMPTS: - from posthog.celery import update_cache_task - update_cache_task.apply_async(args=[caching_state_id], countdown=timedelta(minutes=10).total_seconds()) InsightCachingState.objects.filter(pk=caching_state.pk).update( diff --git a/posthog/celery.py b/posthog/celery.py index 0dd40cf532a43..d1ead060f617a 100644 --- a/posthog/celery.py +++ b/posthog/celery.py @@ -1,12 +1,7 @@ import os import time -from random import randrange -from typing import Optional -from uuid import UUID from celery import Celery -from celery.canvas import Signature -from celery.schedules import crontab from celery.signals import ( setup_logging, task_failure, @@ -16,19 +11,10 @@ task_success, worker_process_init, ) -from django.conf import settings -from django.db import connection from django.dispatch import receiver -from django.utils import timezone from django_structlog.celery import signals from django_structlog.celery.steps import DjangoStructLogInitStep -from prometheus_client import Counter, Gauge, Histogram - -from posthog.cloud_utils import is_cloud -from posthog.metrics import pushed_metrics_registry -from posthog.ph_client import get_ph_client -from posthog.redis import get_client -from posthog.utils import get_crontab +from prometheus_client import Counter, Histogram # set the default Django settings module for the 'celery' program. os.environ.setdefault("DJANGO_SETTINGS_MODULE", "posthog.settings") @@ -67,6 +53,7 @@ buckets=(1, 5, 10, 30, 60, 120, 600, 1200, float("inf")), ) + # Using a string here means the worker doesn't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys @@ -82,6 +69,8 @@ app.steps["worker"].add(DjangoStructLogInitStep) +task_timings = {} + @setup_logging.connect def receiver_setup_logging(loglevel, logfile, format, colorize, **kwargs) -> None: @@ -112,254 +101,6 @@ def on_worker_start(**kwargs) -> None: start_http_server(int(os.getenv("CELERY_METRICS_PORT", "8001"))) -def add_periodic_task_with_expiry( - sender: Celery, - schedule_seconds: int, - task_signature: Signature, - name: str | None = None, -): - """ - If the workers get delayed in processing tasks, then tasks that fire every X seconds get queued multiple times - And so, are processed multiple times. But they often only need to be processed once. - This schedules them with an expiry so that they aren't processed multiple times. - The expiry is larger than the schedule so that if the worker is only slightly delayed, it still gets processed. - """ - sender.add_periodic_task( - schedule_seconds, - task_signature, - name=name, - # we don't want to run multiple of these if the workers build up a backlog - expires=schedule_seconds * 1.5, - ) - - -@app.on_after_configure.connect -def setup_periodic_tasks(sender: Celery, **kwargs): - # Monitoring tasks - add_periodic_task_with_expiry( - sender, - 60, - monitoring_check_clickhouse_schema_drift.s(), - "check clickhouse schema drift", - ) - - if not settings.DEBUG: - add_periodic_task_with_expiry(sender, 10, redis_celery_queue_depth.s(), "10 sec queue probe") - - # Heartbeat every 10sec to make sure the worker is alive - add_periodic_task_with_expiry(sender, 10, redis_heartbeat.s(), "10 sec heartbeat") - - # Update events table partitions twice a week - sender.add_periodic_task( - crontab(day_of_week="mon,fri", hour="0", minute="0"), - update_event_partitions.s(), # check twice a week - ) - - # Send all instance usage to the Billing service - # Sends later on Sunday due to clickhouse things that happen on Sunday at ~00:00 UTC - sender.add_periodic_task( - crontab(hour="2", minute="15", day_of_week="mon"), - send_org_usage_reports.s(), - name="send instance usage report", - ) - sender.add_periodic_task( - crontab(hour="0", minute="15", day_of_week="tue,wed,thu,fri,sat,sun"), - send_org_usage_reports.s(), - name="send instance usage report", - ) - - # Update local usage info for rate limiting purposes - offset by 30 minutes to not clash with the above - sender.add_periodic_task( - crontab(hour="*", minute="30"), - update_quota_limiting.s(), - name="update quota limiting", - ) - - # PostHog Cloud cron jobs - # NOTE: We can't use is_cloud here as some Django elements aren't loaded yet. We check in the task execution instead - # Verify that persons data is in sync every day at 4 AM UTC - sender.add_periodic_task(crontab(hour="4", minute="0"), verify_persons_data_in_sync.s()) - - # Every 30 minutes, send decide request counts to the main posthog instance - sender.add_periodic_task( - crontab(minute="*/30"), - calculate_decide_usage.s(), - name="calculate decide usage", - ) - - # Reset master project data every Monday at Thursday at 5 AM UTC. Mon and Thu because doing this every day - # would be too hard on ClickHouse, and those days ensure most users will have data at most 3 days old. - sender.add_periodic_task(crontab(day_of_week="mon,thu", hour="5", minute="0"), demo_reset_master_team.s()) - - sender.add_periodic_task(crontab(day_of_week="fri", hour="0", minute="0"), clean_stale_partials.s()) - - # Sync all Organization.available_features every hour, only for billing v1 orgs - sender.add_periodic_task(crontab(minute="30", hour="*"), sync_all_organization_available_features.s()) - - sync_insight_cache_states_schedule = get_crontab(settings.SYNC_INSIGHT_CACHE_STATES_SCHEDULE) - if sync_insight_cache_states_schedule: - sender.add_periodic_task( - sync_insight_cache_states_schedule, - sync_insight_cache_states_task.s(), - name="sync insight cache states", - ) - - add_periodic_task_with_expiry( - sender, - settings.UPDATE_CACHED_DASHBOARD_ITEMS_INTERVAL_SECONDS, - schedule_cache_updates_task.s(), - "check dashboard items", - ) - - sender.add_periodic_task(crontab(minute="*/15"), check_async_migration_health.s()) - - if settings.INGESTION_LAG_METRIC_TEAM_IDS: - sender.add_periodic_task(60, ingestion_lag.s(), name="ingestion lag") - - add_periodic_task_with_expiry( - sender, - 120, - clickhouse_lag.s(), - name="clickhouse table lag", - ) - - add_periodic_task_with_expiry( - sender, - 120, - clickhouse_row_count.s(), - name="clickhouse events table row count", - ) - add_periodic_task_with_expiry( - sender, - 120, - clickhouse_part_count.s(), - name="clickhouse table parts count", - ) - add_periodic_task_with_expiry( - sender, - 120, - clickhouse_mutation_count.s(), - name="clickhouse table mutations count", - ) - add_periodic_task_with_expiry( - sender, - 120, - clickhouse_errors_count.s(), - name="clickhouse instance errors count", - ) - - add_periodic_task_with_expiry( - sender, - 120, - pg_row_count.s(), - name="PG tables row counts", - ) - add_periodic_task_with_expiry( - sender, - 120, - pg_table_cache_hit_rate.s(), - name="PG table cache hit rate", - ) - sender.add_periodic_task( - crontab(minute="0", hour="*"), - pg_plugin_server_query_timing.s(), - name="PG plugin server query timing", - ) - add_periodic_task_with_expiry( - sender, - 60, - graphile_worker_queue_size.s(), - name="Graphile Worker queue size", - ) - - add_periodic_task_with_expiry( - sender, - 120, - calculate_cohort.s(), - name="recalculate cohorts", - ) - - add_periodic_task_with_expiry( - sender, - 120, - process_scheduled_changes.s(), - name="process scheduled changes", - ) - - if clear_clickhouse_crontab := get_crontab(settings.CLEAR_CLICKHOUSE_REMOVED_DATA_SCHEDULE_CRON): - sender.add_periodic_task( - clear_clickhouse_crontab, - clickhouse_clear_removed_data.s(), - name="clickhouse clear removed data", - ) - - if clear_clickhouse_deleted_person_crontab := get_crontab(settings.CLEAR_CLICKHOUSE_DELETED_PERSON_SCHEDULE_CRON): - sender.add_periodic_task( - clear_clickhouse_deleted_person_crontab, - clear_clickhouse_deleted_person.s(), - name="clickhouse clear deleted person data", - ) - - if settings.EE_AVAILABLE: - sender.add_periodic_task( - crontab(hour="0", minute=str(randrange(0, 40))), - clickhouse_send_license_usage.s(), - ) # every day at a random minute past midnight. Randomize to avoid overloading license.posthog.com - sender.add_periodic_task( - crontab(hour="4", minute=str(randrange(0, 40))), - clickhouse_send_license_usage.s(), - ) # again a few hours later just to make sure - - materialize_columns_crontab = get_crontab(settings.MATERIALIZE_COLUMNS_SCHEDULE_CRON) - - if materialize_columns_crontab: - sender.add_periodic_task( - materialize_columns_crontab, - clickhouse_materialize_columns.s(), - name="clickhouse materialize columns", - ) - - sender.add_periodic_task( - crontab(hour="*/4", minute="0"), - clickhouse_mark_all_materialized.s(), - name="clickhouse mark all columns as materialized", - ) - - sender.add_periodic_task(crontab(hour="*", minute="55"), schedule_all_subscriptions.s()) - sender.add_periodic_task( - crontab(hour="2", minute=str(randrange(0, 40))), - ee_persist_finished_recordings.s(), - ) - - sender.add_periodic_task( - crontab(minute="0", hour="*"), - check_flags_to_rollback.s(), - name="check feature flags that should be rolled back", - ) - - sender.add_periodic_task( - crontab(minute="10", hour="*/12"), - find_flags_with_enriched_analytics.s(), - name="find feature flags with enriched analytics", - ) - - sender.add_periodic_task( - # once a day a random minute after midnight - crontab(hour="0", minute=str(randrange(0, 40))), - delete_expired_exported_assets.s(), - name="delete expired exported assets", - ) - - sender.add_periodic_task( - crontab(minute="*/20"), - check_data_import_row_limits.s(), - name="check external data rows synced", - ) - - -task_timings = {} - - # Set up clickhouse query instrumentation @task_prerun.connect def prerun_signal_handler(task_id, task, **kwargs): @@ -388,6 +129,8 @@ def postrun_signal_handler(task_id, task, **kwargs): start_time = task_timings.pop(task_id, None) CELERTY_TASK_DURATION_HISTOGRAM.labels(task_name=task.name).observe(time.time() - start_time) + print("Celery task {} took {:.2f} seconds".format(task.name, time.time() - start_time)) + reset_query_tags() @@ -406,708 +149,8 @@ def retry_signal_handler(sender, **kwargs): CELERY_TASK_RETRY_COUNTER.labels(task_name=sender.name).inc() -@app.task(ignore_result=True) -def delete_expired_exported_assets() -> None: - from posthog.models import ExportedAsset - - ExportedAsset.delete_expired_assets() - - -@app.task(ignore_result=True) -def redis_heartbeat(): - get_client().set("POSTHOG_HEARTBEAT", int(time.time())) - - -@app.task(ignore_result=True, queue="analytics_queries") -def process_query_task(team_id, query_id, query_json, limit_context=None, refresh_requested=False): - """ - Kick off query - Once complete save results to redis - """ - from posthog.client import execute_process_query - - execute_process_query( - team_id=team_id, - query_id=query_id, - query_json=query_json, - limit_context=limit_context, - refresh_requested=refresh_requested, - ) - - -@app.task(ignore_result=True) -def pg_table_cache_hit_rate(): - from statshog.defaults.django import statsd - - with connection.cursor() as cursor: - try: - cursor.execute( - """ - SELECT - relname as table_name, - sum(heap_blks_hit) / nullif(sum(heap_blks_hit) + sum(heap_blks_read),0) * 100 AS ratio - FROM pg_statio_user_tables - GROUP BY relname - ORDER BY ratio ASC - """ - ) - tables = cursor.fetchall() - with pushed_metrics_registry("celery_pg_table_cache_hit_rate") as registry: - hit_rate_gauge = Gauge( - "posthog_celery_pg_table_cache_hit_rate", - "Postgres query cache hit rate per table.", - labelnames=["table_name"], - registry=registry, - ) - for row in tables: - hit_rate_gauge.labels(table_name=row[0]).set(float(row[1])) - statsd.gauge("pg_table_cache_hit_rate", float(row[1]), tags={"table": row[0]}) - except: - # if this doesn't work keep going - pass - - -@app.task(ignore_result=True) -def pg_plugin_server_query_timing(): - from statshog.defaults.django import statsd - - with connection.cursor() as cursor: - try: - cursor.execute( - """ - SELECT - substring(query from 'plugin-server:(\\w+)') AS query_type, - total_time as total_time, - (total_time / calls) as avg_time, - min_time, - max_time, - stddev_time, - calls, - rows as rows_read_or_affected - FROM pg_stat_statements - WHERE query LIKE '%%plugin-server%%' - ORDER BY total_time DESC - LIMIT 50 - """ - ) - - for row in cursor.fetchall(): - row_dictionary = {column.name: value for column, value in zip(cursor.description, row)} - - for key, value in row_dictionary.items(): - if key == "query_type": - continue - statsd.gauge( - f"pg_plugin_server_query_{key}", - value, - tags={"query_type": row_dictionary["query_type"]}, - ) - except: - # if this doesn't work keep going - pass - - -POSTGRES_TABLES = ["posthog_personoverride", "posthog_personoverridemapping"] - - -@app.task(ignore_result=True) -def pg_row_count(): - with pushed_metrics_registry("celery_pg_row_count") as registry: - row_count_gauge = Gauge( - "posthog_celery_pg_table_row_count", - "Number of rows per Postgres table.", - labelnames=["table_name"], - registry=registry, - ) - with connection.cursor() as cursor: - for table in POSTGRES_TABLES: - QUERY = "SELECT count(*) FROM {table};" - query = QUERY.format(table=table) - - try: - cursor.execute(query) - row = cursor.fetchone() - row_count_gauge.labels(table_name=table).set(row[0]) - except: - pass - - -CLICKHOUSE_TABLES = [ - "sharded_events", - "person", - "person_distinct_id2", - "sharded_session_replay_events", - "log_entries", -] -if not is_cloud(): - CLICKHOUSE_TABLES.append("session_recording_events") - - -@app.task(ignore_result=True) -def clickhouse_lag(): - from statshog.defaults.django import statsd - - from posthog.client import sync_execute - - with pushed_metrics_registry("celery_clickhouse_lag") as registry: - lag_gauge = Gauge( - "posthog_celery_clickhouse_lag_seconds", - "Age of the latest ingested record per ClickHouse table.", - labelnames=["table_name"], - registry=registry, - ) - for table in CLICKHOUSE_TABLES: - try: - QUERY = """SELECT max(_timestamp) observed_ts, now() now_ts, now() - max(_timestamp) as lag - FROM {table}""" - query = QUERY.format(table=table) - lag = sync_execute(query)[0][2] - statsd.gauge( - "posthog_celery_clickhouse__table_lag_seconds", - lag, - tags={"table": table}, - ) - lag_gauge.labels(table_name=table).set(lag) - except: - pass - - -HEARTBEAT_EVENT_TO_INGESTION_LAG_METRIC = { - "heartbeat": "ingestion", - "heartbeat_buffer": "ingestion_buffer", - "heartbeat_api": "ingestion_api", -} - - -@app.task(ignore_result=True) -def ingestion_lag(): - from statshog.defaults.django import statsd - - from posthog.client import sync_execute - - # Requires https://github.com/PostHog/posthog-heartbeat-plugin to be enabled on team 2 - # Note that it runs every minute, and we compare it with now(), so there's up to 60s delay - query = """ - SELECT event, date_diff('second', max(timestamp), now()) - FROM events - WHERE team_id IN %(team_ids)s - AND event IN %(events)s - AND timestamp > yesterday() AND timestamp < now() + toIntervalMinute(3) - GROUP BY event - """ - - try: - results = sync_execute( - query, - { - "team_ids": settings.INGESTION_LAG_METRIC_TEAM_IDS, - "events": list(HEARTBEAT_EVENT_TO_INGESTION_LAG_METRIC.keys()), - }, - ) - with pushed_metrics_registry("celery_ingestion_lag") as registry: - lag_gauge = Gauge( - "posthog_celery_observed_ingestion_lag_seconds", - "End-to-end ingestion lag observed through several scenarios. Can be overestimated by up to 60 seconds.", - labelnames=["scenario"], - registry=registry, - ) - for event, lag in results: - metric = HEARTBEAT_EVENT_TO_INGESTION_LAG_METRIC[event] - statsd.gauge(f"posthog_celery_{metric}_lag_seconds_rough_minute_precision", lag) - lag_gauge.labels(scenario=metric).set(lag) - except: - pass - - -KNOWN_CELERY_TASK_IDENTIFIERS = { - "pluginJob", - "runEveryHour", - "runEveryMinute", - "runEveryDay", -} - - -@app.task(ignore_result=True) -def graphile_worker_queue_size(): - from django.db import connections - from statshog.defaults.django import statsd - - connection = connections["graphile"] if "graphile" in connections else connections["default"] - with connection.cursor() as cursor: - cursor.execute( - """ - SELECT count(*) - FROM graphile_worker.jobs - WHERE (jobs.locked_at is null or jobs.locked_at < (now() - INTERVAL '4 hours')) - AND run_at <= now() - AND attempts < max_attempts - """ - ) - - queue_size = cursor.fetchone()[0] - statsd.gauge("graphile_worker_queue_size", queue_size) - - # Track the number of jobs that will still be run at least once or are currently running based on job type (i.e. task_identifier) - # Completed jobs are deleted and "permanently failed" jobs have attempts == max_attempts - # Jobs not yet eligible for execution are filtered out with run_at <= now() - cursor.execute( - """ - SELECT task_identifier, count(*) as c, EXTRACT(EPOCH FROM MIN(run_at)) as oldest FROM graphile_worker.jobs - WHERE attempts < max_attempts - AND run_at <= now() - GROUP BY task_identifier - """ - ) - - seen_task_identifier = set() - with pushed_metrics_registry("celery_graphile_worker_queue_size") as registry: - processing_lag_gauge = Gauge( - "posthog_celery_graphile_lag_seconds", - "Oldest scheduled run on pending Graphile jobs per task identifier, zero if queue empty.", - labelnames=["task_identifier"], - registry=registry, - ) - waiting_jobs_gauge = Gauge( - "posthog_celery_graphile_waiting_jobs", - "Number of Graphile jobs in the queue, per task identifier.", - labelnames=["task_identifier"], - registry=registry, - ) - for task_identifier, count, oldest in cursor.fetchall(): - seen_task_identifier.add(task_identifier) - waiting_jobs_gauge.labels(task_identifier=task_identifier).set(count) - processing_lag_gauge.labels(task_identifier=task_identifier).set(time.time() - float(oldest)) - statsd.gauge( - "graphile_waiting_jobs", - count, - tags={"task_identifier": task_identifier}, - ) - - # The query will not return rows for empty queues, creating missing points. - # Let's emit updates for known queues even if they are empty. - for task_identifier in KNOWN_CELERY_TASK_IDENTIFIERS - seen_task_identifier: - waiting_jobs_gauge.labels(task_identifier=task_identifier).set(0) - processing_lag_gauge.labels(task_identifier=task_identifier).set(0) - - -@app.task(ignore_result=True) -def clickhouse_row_count(): - from statshog.defaults.django import statsd - - from posthog.client import sync_execute - - with pushed_metrics_registry("celery_clickhouse_row_count") as registry: - row_count_gauge = Gauge( - "posthog_celery_clickhouse_table_row_count", - "Number of rows per ClickHouse table.", - labelnames=["table_name"], - registry=registry, - ) - for table in CLICKHOUSE_TABLES: - try: - QUERY = """SELECT sum(rows) rows from system.parts - WHERE table = '{table}' and active;""" - query = QUERY.format(table=table) - rows = sync_execute(query)[0][0] - row_count_gauge.labels(table_name=table).set(rows) - statsd.gauge( - f"posthog_celery_clickhouse_table_row_count", - rows, - tags={"table": table}, - ) - except: - pass - - -@app.task(ignore_result=True) -def clickhouse_errors_count(): - """ - This task is used to track the recency of errors in ClickHouse. - We can use this to alert on errors that are consistently being generated recently - 999 - KEEPER_EXCEPTION - 225 - NO_ZOOKEEPER - 242 - TABLE_IS_READ_ONLY - """ - from posthog.client import sync_execute - - QUERY = """ - select - getMacro('replica') replica, - getMacro('shard') shard, - name, - value as errors, - dateDiff('minute', last_error_time, now()) minutes_ago - from clusterAllReplicas('posthog', system, errors) - where code in (999, 225, 242) - order by minutes_ago - """ - rows = sync_execute(QUERY) - with pushed_metrics_registry("celery_clickhouse_errors") as registry: - errors_gauge = Gauge( - "posthog_celery_clickhouse_errors", - "Age of the latest error per ClickHouse errors table.", - registry=registry, - labelnames=["replica", "shard", "name"], - ) - if isinstance(rows, list): - for replica, shard, name, _, minutes_ago in rows: - errors_gauge.labels(replica=replica, shard=shard, name=name).set(minutes_ago) - - -@app.task(ignore_result=True) -def clickhouse_part_count(): - from statshog.defaults.django import statsd - - from posthog.client import sync_execute - - QUERY = """ - SELECT table, count(1) freq - FROM system.parts - WHERE active - GROUP BY table - ORDER BY freq DESC; - """ - rows = sync_execute(QUERY) - - with pushed_metrics_registry("celery_clickhouse_part_count") as registry: - parts_count_gauge = Gauge( - "posthog_celery_clickhouse_table_parts_count", - "Number of parts per ClickHouse table.", - labelnames=["table"], - registry=registry, - ) - for table, parts in rows: - parts_count_gauge.labels(table=table).set(parts) - statsd.gauge( - f"posthog_celery_clickhouse_table_parts_count", - parts, - tags={"table": table}, - ) - - -@app.task(ignore_result=True) -def clickhouse_mutation_count(): - from statshog.defaults.django import statsd - - from posthog.client import sync_execute - - QUERY = """ - SELECT - table, - count(1) AS freq - FROM system.mutations - WHERE is_done = 0 - GROUP BY table - ORDER BY freq DESC - """ - rows = sync_execute(QUERY) - - with pushed_metrics_registry("celery_clickhouse_mutation_count") as registry: - mutations_count_gauge = Gauge( - "posthog_celery_clickhouse_table_mutations_count", - "Number of mutations per ClickHouse table.", - labelnames=["table"], - registry=registry, - ) - for table, muts in rows: - mutations_count_gauge.labels(table=table).set(muts) - statsd.gauge( - f"posthog_celery_clickhouse_table_mutations_count", - muts, - tags={"table": table}, - ) - - -@app.task(ignore_result=True) -def clickhouse_clear_removed_data(): - from posthog.models.async_deletion.delete_cohorts import AsyncCohortDeletion - from posthog.models.async_deletion.delete_events import AsyncEventDeletion - - runner = AsyncEventDeletion() - runner.mark_deletions_done() - runner.run() - - cohort_runner = AsyncCohortDeletion() - cohort_runner.mark_deletions_done() - cohort_runner.run() - - -@app.task(ignore_result=True) -def clear_clickhouse_deleted_person(): - from posthog.models.async_deletion.delete_person import remove_deleted_person_data - - remove_deleted_person_data() - - -@app.task(ignore_result=True, queue="email") -def redis_celery_queue_depth(): - try: - with pushed_metrics_registry("redis_celery_queue_depth_registry") as registry: - celery_task_queue_depth_gauge = Gauge( - "posthog_celery_queue_depth", - "We use this to monitor the depth of the celery queue.", - registry=registry, - ) - - llen = get_client().llen("celery") - celery_task_queue_depth_gauge.set(llen) - except: - # if we can't generate the metric don't complain about it. - return - - -@app.task(ignore_result=True) -def update_event_partitions(): - with connection.cursor() as cursor: - cursor.execute( - "DO $$ BEGIN IF (SELECT exists(select * from pg_proc where proname = 'update_partitions')) THEN PERFORM update_partitions(); END IF; END $$" - ) - - -@app.task(ignore_result=True) -def clean_stale_partials(): - """Clean stale (meaning older than 7 days) partial social auth sessions.""" - from social_django.models import Partial - - Partial.objects.filter(timestamp__lt=timezone.now() - timezone.timedelta(7)).delete() - - -@app.task(ignore_result=True) -def monitoring_check_clickhouse_schema_drift(): - from posthog.tasks.check_clickhouse_schema_drift import ( - check_clickhouse_schema_drift, - ) - - check_clickhouse_schema_drift() - - -@app.task(ignore_result=True) -def calculate_cohort(): - from posthog.tasks.calculate_cohort import calculate_cohorts - - calculate_cohorts() - - -@app.task(ignore_result=True) -def process_scheduled_changes(): - from posthog.tasks.process_scheduled_changes import process_scheduled_changes - - process_scheduled_changes() - - -@app.task(ignore_result=True) -def sync_insight_cache_states_task(): - from posthog.caching.insight_caching_state import sync_insight_cache_states - - sync_insight_cache_states() - - -@app.task(ignore_result=True) -def schedule_cache_updates_task(): - from posthog.caching.insight_cache import schedule_cache_updates - - schedule_cache_updates() - - -@app.task(ignore_result=True) -def update_cache_task(caching_state_id: UUID): - from posthog.caching.insight_cache import update_cache - - update_cache(caching_state_id) - - -@app.task(ignore_result=True) -def sync_insight_caching_state( - team_id: int, - insight_id: Optional[int] = None, - dashboard_tile_id: Optional[int] = None, -): - from posthog.caching.insight_caching_state import sync_insight_caching_state - - sync_insight_caching_state(team_id, insight_id, dashboard_tile_id) - - -@app.task(ignore_result=True, bind=True) -def debug_task(self): - print(f"Request: {self.request!r}") - - -@app.task(ignore_result=True) -def calculate_decide_usage() -> None: - from django.db.models import Q - - from posthog.models import Team - from posthog.models.feature_flag.flag_analytics import capture_team_decide_usage - - ph_client = get_ph_client() - - for team in Team.objects.select_related("organization").exclude( - Q(organization__for_internal_metrics=True) | Q(is_demo=True) - ): - capture_team_decide_usage(ph_client, team.id, team.uuid) - - ph_client.shutdown() - - -@app.task(ignore_result=True) -def find_flags_with_enriched_analytics(): - from datetime import datetime, timedelta - - from posthog.models.feature_flag.flag_analytics import ( - find_flags_with_enriched_analytics, - ) - - end = datetime.now() - begin = end - timedelta(hours=12) - - find_flags_with_enriched_analytics(begin, end) - - -@app.task(ignore_result=True) -def demo_reset_master_team(): - from posthog.tasks.demo_reset_master_team import demo_reset_master_team - - if is_cloud() or settings.DEMO: - demo_reset_master_team() - - -@app.task(ignore_result=True) -def sync_all_organization_available_features(): - from posthog.tasks.sync_all_organization_available_features import ( - sync_all_organization_available_features, - ) - - sync_all_organization_available_features() - - -@app.task(ignore_result=False, track_started=True, max_retries=0) -def check_async_migration_health(): - from posthog.tasks.async_migrations import check_async_migration_health - - check_async_migration_health() - - -@app.task(ignore_result=True) -def verify_persons_data_in_sync(): - from posthog.tasks.verify_persons_data_in_sync import ( - verify_persons_data_in_sync as verify, - ) - - if not is_cloud(): - return - - verify() - - -def recompute_materialized_columns_enabled() -> bool: - from posthog.models.instance_setting import get_instance_setting - - if get_instance_setting("MATERIALIZED_COLUMNS_ENABLED") and get_instance_setting( - "COMPUTE_MATERIALIZED_COLUMNS_ENABLED" - ): - return True - return False - - -@app.task(ignore_result=True) -def clickhouse_materialize_columns(): - if recompute_materialized_columns_enabled(): - try: - from ee.clickhouse.materialized_columns.analyze import ( - materialize_properties_task, - ) - except ImportError: - pass - else: - materialize_properties_task() - - -@app.task(ignore_result=True) -def clickhouse_mark_all_materialized(): - if recompute_materialized_columns_enabled(): - try: - from ee.tasks.materialized_columns import mark_all_materialized - except ImportError: - pass - else: - mark_all_materialized() - - -@app.task(ignore_result=True) -def send_org_usage_reports(): - from posthog.tasks.usage_report import send_all_org_usage_reports - - send_all_org_usage_reports.delay() - - -@app.task(ignore_result=True) -def update_quota_limiting(): - try: - from ee.billing.quota_limiting import update_all_org_billing_quotas - - update_all_org_billing_quotas() - except ImportError: - pass - - -@app.task(ignore_result=True) -def schedule_all_subscriptions(): - try: - from ee.tasks.subscriptions import ( - schedule_all_subscriptions as _schedule_all_subscriptions, - ) - except ImportError: - pass - else: - _schedule_all_subscriptions() - - -@app.task(ignore_result=True, retries=3) -def clickhouse_send_license_usage(): - try: - if not is_cloud(): - from ee.tasks.send_license_usage import send_license_usage - - send_license_usage() - except ImportError: - pass - - -@app.task(ignore_result=True) -def check_flags_to_rollback(): - try: - from ee.tasks.auto_rollback_feature_flag import check_flags_to_rollback - - check_flags_to_rollback() - except ImportError: - pass - - -@app.task(ignore_result=True) -def ee_persist_single_recording(id: str, team_id: int): - try: - from ee.session_recordings.persistence_tasks import persist_single_recording - - persist_single_recording(id, team_id) - except ImportError: - pass - - -@app.task(ignore_result=True) -def ee_persist_finished_recordings(): - try: - from ee.session_recordings.persistence_tasks import persist_finished_recordings - except ImportError: - pass - else: - persist_finished_recordings() - +@app.on_after_finalize.connect +def setup_periodic_tasks(sender: Celery, **kwargs): + from posthog.tasks.scheduled import setup_periodic_tasks -@app.task(ignore_result=True) -def check_data_import_row_limits(): - try: - from posthog.tasks.warehouse import check_synced_row_limits - except ImportError: - pass - else: - check_synced_row_limits() + setup_periodic_tasks(sender) diff --git a/posthog/clickhouse/client/execute_async.py b/posthog/clickhouse/client/execute_async.py index 065ac9640ecb4..4373ef1748806 100644 --- a/posthog/clickhouse/client/execute_async.py +++ b/posthog/clickhouse/client/execute_async.py @@ -7,10 +7,10 @@ from rest_framework.exceptions import NotFound from posthog import celery, redis -from posthog.celery import process_query_task from posthog.clickhouse.query_tagging import tag_queries from posthog.hogql.constants import LimitContext from posthog.schema import QueryStatus +from posthog.tasks.tasks import process_query_task logger = structlog.get_logger(__name__) @@ -75,8 +75,8 @@ def execute_process_query( ): manager = QueryStatusManager(query_id, team_id) - from posthog.models import Team from posthog.api.services.query import process_query + from posthog.models import Team team = Team.objects.get(pk=team_id) diff --git a/posthog/email.py b/posthog/email.py index 93968b6c07844..2bc9be683b490 100644 --- a/posthog/email.py +++ b/posthog/email.py @@ -3,6 +3,7 @@ import lxml import toronado +from celery import shared_task from django.conf import settings from django.core import exceptions, mail from django.core.mail.backends.smtp import EmailBackend @@ -12,7 +13,6 @@ from django.utils.module_loading import import_string from sentry_sdk import capture_exception -from posthog.celery import app from posthog.models.instance_setting import get_instance_setting from posthog.models.messaging import MessagingRecord @@ -41,7 +41,7 @@ def is_email_available(with_absolute_urls: bool = False) -> bool: ) -@app.task(ignore_result=True, max_retries=3) +@shared_task(max_retries=3) def _send_email( campaign_key: str, to: List[Dict[str, str]], diff --git a/posthog/models/insight_caching_state.py b/posthog/models/insight_caching_state.py index 9e6abc0b7b5a8..6ee767bc520e2 100644 --- a/posthog/models/insight_caching_state.py +++ b/posthog/models/insight_caching_state.py @@ -8,6 +8,7 @@ from posthog.models.signals import mutable_receiver from posthog.models.team import Team from posthog.models.utils import UniqueConstraintByExpression, UUIDModel +from posthog.tasks.tasks import sync_insight_caching_state class InsightCachingState(UUIDModel): @@ -48,8 +49,6 @@ class Meta: @mutable_receiver(post_save, sender=SharingConfiguration) def sync_sharing_configuration(sender, instance: SharingConfiguration, **kwargs): - from posthog.celery import sync_insight_caching_state - if instance.insight is not None and not instance.insight.deleted: sync_insight_caching_state.delay(instance.team_id, insight_id=instance.insight_id) elif instance.dashboard is not None and not instance.dashboard.deleted: @@ -59,22 +58,16 @@ def sync_sharing_configuration(sender, instance: SharingConfiguration, **kwargs) @mutable_receiver(post_save, sender=Insight) def sync_insight(sender, instance: Insight, **kwargs): - from posthog.celery import sync_insight_caching_state - sync_insight_caching_state.delay(instance.team_id, insight_id=instance.pk) @mutable_receiver(post_save, sender=DashboardTile) def sync_dashboard_tile(sender, instance: DashboardTile, **kwargs): - from posthog.celery import sync_insight_caching_state - sync_insight_caching_state.delay(instance.dashboard.team_id, dashboard_tile_id=instance.pk) @mutable_receiver(post_save, sender=Dashboard) def sync_dashboard_updated(sender, instance: Dashboard, **kwargs): - from posthog.celery import sync_insight_caching_state - update_fields = kwargs.get("update_fields") if update_fields in [ frozenset({"filters_hash"}), diff --git a/posthog/session_recordings/models/session_recording.py b/posthog/session_recordings/models/session_recording.py index ca6bbc7b54d6e..5e057b7e72150 100644 --- a/posthog/session_recordings/models/session_recording.py +++ b/posthog/session_recordings/models/session_recording.py @@ -1,9 +1,8 @@ -from typing import Any, List, Optional, Literal +from typing import Any, List, Literal, Optional from django.conf import settings from django.db import models -from posthog.celery import ee_persist_single_recording from posthog.models.person.person import Person from posthog.models.signals import mutable_receiver from posthog.models.team.team import Team @@ -16,6 +15,7 @@ SessionRecordingViewed, ) from posthog.session_recordings.queries.session_replay_events import SessionReplayEvents +from posthog.tasks.tasks import ee_persist_single_recording class SessionRecording(UUIDModel): diff --git a/posthog/tasks/__init__.py b/posthog/tasks/__init__.py index f593c4525e493..d8b1ed9f03181 100644 --- a/posthog/tasks/__init__.py +++ b/posthog/tasks/__init__.py @@ -10,6 +10,7 @@ process_scheduled_changes, split_person, sync_all_organization_available_features, + tasks, usage_report, user_identify, warehouse, @@ -25,6 +26,7 @@ "process_scheduled_changes", "split_person", "sync_all_organization_available_features", + "tasks", "user_identify", "usage_report", "warehouse", diff --git a/posthog/tasks/async_migrations.py b/posthog/tasks/async_migrations.py index ae505b44131e5..f0e3d4aaf5384 100644 --- a/posthog/tasks/async_migrations.py +++ b/posthog/tasks/async_migrations.py @@ -1,4 +1,4 @@ -from celery import states +from celery import shared_task, states from celery.result import AsyncResult from posthog.async_migrations.runner import ( @@ -12,7 +12,6 @@ process_error, trigger_migration, ) -from posthog.celery import app from posthog.models.instance_setting import get_instance_setting @@ -20,7 +19,7 @@ # 1. spawning a thread within the worker # 2. suggesting users scale celery when running async migrations # 3. ... -@app.task(track_started=True, ignore_result=False, max_retries=0) +@shared_task(track_started=True, ignore_result=False, max_retries=0) def run_async_migration(migration_name: str, fresh_start: bool = True) -> None: if fresh_start: start_async_migration(migration_name) @@ -35,6 +34,7 @@ def run_async_migration(migration_name: str, fresh_start: bool = True) -> None: # 2. Does a periodic healthcheck to make sure it's safe to continue running the migration # 3. Updates migration progress def check_async_migration_health() -> None: + from posthog.celery import app from posthog.models.async_migration import AsyncMigration, MigrationStatus try: diff --git a/posthog/tasks/email.py b/posthog/tasks/email.py index 4b07bdc63271d..fba8cbf0a8ea2 100644 --- a/posthog/tasks/email.py +++ b/posthog/tasks/email.py @@ -4,10 +4,10 @@ import posthoganalytics import structlog +from celery import shared_task from django.conf import settings from django.utils import timezone -from posthog.celery import app from posthog.cloud_utils import is_cloud from posthog.email import EmailMessage, is_email_available from posthog.models import ( @@ -31,7 +31,8 @@ def send_message_to_all_staff_users(message: EmailMessage) -> None: message.send() -@app.task( +@shared_task( + ignore_result=True, autoretry_for=(Exception,), max_retries=3, retry_backoff=True, @@ -55,7 +56,8 @@ def send_invite(invite_id: str) -> None: message.send() -@app.task( +@shared_task( + ignore_result=True, autoretry_for=(Exception,), max_retries=3, retry_backoff=True, @@ -78,7 +80,8 @@ def send_member_join(invitee_uuid: str, organization_id: str) -> None: message.send() -@app.task( +@shared_task( + ignore_result=True, autoretry_for=(Exception,), max_retries=3, retry_backoff=True, @@ -101,7 +104,8 @@ def send_password_reset(user_id: int, token: str) -> None: message.send() -@app.task( +@shared_task( + ignore_result=True, autoretry_for=(Exception,), max_retries=3, retry_backoff=True, @@ -127,7 +131,8 @@ def send_email_verification(user_id: int, token: str) -> None: ) -@app.task( +@shared_task( + ignore_result=True, autoretry_for=(Exception,), max_retries=3, retry_backoff=True, @@ -177,7 +182,8 @@ def send_fatal_plugin_error( message.send(send_async=False) -@app.task( +@shared_task( + ignore_result=True, autoretry_for=(Exception,), max_retries=3, retry_backoff=True, @@ -193,7 +199,8 @@ def send_canary_email(user_email: str) -> None: message.send() -@app.task( +@shared_task( + ignore_result=True, autoretry_for=(Exception,), max_retries=3, retry_backoff=True, @@ -225,7 +232,8 @@ def send_email_change_emails(now_iso: str, user_name: str, old_address: str, new message_new_address.send(send_async=False) -@app.task( +@shared_task( + ignore_result=True, autoretry_for=(Exception,), max_retries=3, retry_backoff=True, @@ -243,7 +251,8 @@ def send_async_migration_complete_email(migration_key: str, time: str) -> None: send_message_to_all_staff_users(message) -@app.task( +@shared_task( + ignore_result=True, autoretry_for=(Exception,), max_retries=3, retry_backoff=True, diff --git a/posthog/tasks/exporter.py b/posthog/tasks/exporter.py index ffec5d5b1142f..20124d9607edc 100644 --- a/posthog/tasks/exporter.py +++ b/posthog/tasks/exporter.py @@ -1,9 +1,9 @@ from typing import Optional +from celery import shared_task from prometheus_client import Counter, Histogram from posthog import settings -from posthog.celery import app from posthog.models import ExportedAsset EXPORT_QUEUED_COUNTER = Counter( @@ -35,7 +35,7 @@ # export_asset is used in chords/groups and so must not ignore its results -@app.task( +@shared_task( autoretry_for=(Exception,), max_retries=5, retry_backoff=True, diff --git a/posthog/tasks/scheduled.py b/posthog/tasks/scheduled.py new file mode 100644 index 0000000000000..e843074e5246b --- /dev/null +++ b/posthog/tasks/scheduled.py @@ -0,0 +1,293 @@ +from random import randrange + +from celery import Celery +from celery.canvas import Signature +from celery.schedules import crontab +from django.conf import settings + +from posthog.celery import app +from posthog.tasks.tasks import ( + calculate_cohort, + calculate_decide_usage, + check_async_migration_health, + check_data_import_row_limits, + check_flags_to_rollback, + clean_stale_partials, + clear_clickhouse_deleted_person, + clickhouse_clear_removed_data, + clickhouse_errors_count, + clickhouse_lag, + clickhouse_mark_all_materialized, + clickhouse_materialize_columns, + clickhouse_mutation_count, + clickhouse_part_count, + clickhouse_row_count, + clickhouse_send_license_usage, + delete_expired_exported_assets, + demo_reset_master_team, + ee_persist_finished_recordings, + find_flags_with_enriched_analytics, + graphile_worker_queue_size, + ingestion_lag, + monitoring_check_clickhouse_schema_drift, + pg_plugin_server_query_timing, + pg_row_count, + pg_table_cache_hit_rate, + process_scheduled_changes, + redis_celery_queue_depth, + redis_heartbeat, + schedule_all_subscriptions, + schedule_cache_updates_task, + send_org_usage_reports, + sync_all_organization_available_features, + sync_insight_cache_states_task, + update_event_partitions, + update_quota_limiting, + verify_persons_data_in_sync, +) +from posthog.utils import get_crontab + + +def add_periodic_task_with_expiry( + sender: Celery, + schedule_seconds: int, + task_signature: Signature, + name: str | None = None, +): + """ + If the workers get delayed in processing tasks, then tasks that fire every X seconds get queued multiple times + And so, are processed multiple times. But they often only need to be processed once. + This schedules them with an expiry so that they aren't processed multiple times. + The expiry is larger than the schedule so that if the worker is only slightly delayed, it still gets processed. + """ + sender.add_periodic_task( + schedule_seconds, + task_signature, + name=name, + # we don't want to run multiple of these if the workers build up a backlog + expires=schedule_seconds * 1.5, + ) + + +@app.on_after_configure.connect +def setup_periodic_tasks(sender: Celery, **kwargs): + # Monitoring tasks + add_periodic_task_with_expiry( + sender, + 60, + monitoring_check_clickhouse_schema_drift.s(), + "check clickhouse schema drift", + ) + + if not settings.DEBUG: + add_periodic_task_with_expiry(sender, 10, redis_celery_queue_depth.s(), "10 sec queue probe") + + # Heartbeat every 10sec to make sure the worker is alive + add_periodic_task_with_expiry(sender, 10, redis_heartbeat.s(), "10 sec heartbeat") + + # Update events table partitions twice a week + sender.add_periodic_task( + crontab(day_of_week="mon,fri", hour="0", minute="0"), + update_event_partitions.s(), # check twice a week + ) + + # Send all instance usage to the Billing service + # Sends later on Sunday due to clickhouse things that happen on Sunday at ~00:00 UTC + sender.add_periodic_task( + crontab(hour="2", minute="15", day_of_week="mon"), + send_org_usage_reports.s(), + name="send instance usage report", + ) + sender.add_periodic_task( + crontab(hour="0", minute="15", day_of_week="tue,wed,thu,fri,sat,sun"), + send_org_usage_reports.s(), + name="send instance usage report", + ) + + # Update local usage info for rate limiting purposes - offset by 30 minutes to not clash with the above + sender.add_periodic_task( + crontab(hour="*", minute="30"), + update_quota_limiting.s(), + name="update quota limiting", + ) + + # PostHog Cloud cron jobs + # NOTE: We can't use is_cloud here as some Django elements aren't loaded yet. We check in the task execution instead + # Verify that persons data is in sync every day at 4 AM UTC + sender.add_periodic_task(crontab(hour="4", minute="0"), verify_persons_data_in_sync.s()) + + # Every 30 minutes, send decide request counts to the main posthog instance + sender.add_periodic_task( + crontab(minute="*/30"), + calculate_decide_usage.s(), + name="calculate decide usage", + ) + + # Reset master project data every Monday at Thursday at 5 AM UTC. Mon and Thu because doing this every day + # would be too hard on ClickHouse, and those days ensure most users will have data at most 3 days old. + sender.add_periodic_task(crontab(day_of_week="mon,thu", hour="5", minute="0"), demo_reset_master_team.s()) + + sender.add_periodic_task(crontab(day_of_week="fri", hour="0", minute="0"), clean_stale_partials.s()) + + # Sync all Organization.available_features every hour, only for billing v1 orgs + sender.add_periodic_task(crontab(minute="30", hour="*"), sync_all_organization_available_features.s()) + + sync_insight_cache_states_schedule = get_crontab(settings.SYNC_INSIGHT_CACHE_STATES_SCHEDULE) + if sync_insight_cache_states_schedule: + sender.add_periodic_task( + sync_insight_cache_states_schedule, + sync_insight_cache_states_task.s(), + name="sync insight cache states", + ) + + add_periodic_task_with_expiry( + sender, + settings.UPDATE_CACHED_DASHBOARD_ITEMS_INTERVAL_SECONDS, + schedule_cache_updates_task.s(), + "check dashboard items", + ) + + sender.add_periodic_task(crontab(minute="*/15"), check_async_migration_health.s()) + + if settings.INGESTION_LAG_METRIC_TEAM_IDS: + sender.add_periodic_task(60, ingestion_lag.s(), name="ingestion lag") + + add_periodic_task_with_expiry( + sender, + 120, + clickhouse_lag.s(), + name="clickhouse table lag", + ) + + add_periodic_task_with_expiry( + sender, + 120, + clickhouse_row_count.s(), + name="clickhouse events table row count", + ) + add_periodic_task_with_expiry( + sender, + 120, + clickhouse_part_count.s(), + name="clickhouse table parts count", + ) + add_periodic_task_with_expiry( + sender, + 120, + clickhouse_mutation_count.s(), + name="clickhouse table mutations count", + ) + add_periodic_task_with_expiry( + sender, + 120, + clickhouse_errors_count.s(), + name="clickhouse instance errors count", + ) + + add_periodic_task_with_expiry( + sender, + 120, + pg_row_count.s(), + name="PG tables row counts", + ) + add_periodic_task_with_expiry( + sender, + 120, + pg_table_cache_hit_rate.s(), + name="PG table cache hit rate", + ) + sender.add_periodic_task( + crontab(minute="0", hour="*"), + pg_plugin_server_query_timing.s(), + name="PG plugin server query timing", + ) + add_periodic_task_with_expiry( + sender, + 60, + graphile_worker_queue_size.s(), + name="Graphile Worker queue size", + ) + + add_periodic_task_with_expiry( + sender, + 120, + calculate_cohort.s(), + name="recalculate cohorts", + ) + + add_periodic_task_with_expiry( + sender, + 120, + process_scheduled_changes.s(), + name="process scheduled changes", + ) + + if clear_clickhouse_crontab := get_crontab(settings.CLEAR_CLICKHOUSE_REMOVED_DATA_SCHEDULE_CRON): + sender.add_periodic_task( + clear_clickhouse_crontab, + clickhouse_clear_removed_data.s(), + name="clickhouse clear removed data", + ) + + if clear_clickhouse_deleted_person_crontab := get_crontab(settings.CLEAR_CLICKHOUSE_DELETED_PERSON_SCHEDULE_CRON): + sender.add_periodic_task( + clear_clickhouse_deleted_person_crontab, + clear_clickhouse_deleted_person.s(), + name="clickhouse clear deleted person data", + ) + + if settings.EE_AVAILABLE: + sender.add_periodic_task( + crontab(hour="0", minute=str(randrange(0, 40))), + clickhouse_send_license_usage.s(), + ) # every day at a random minute past midnight. Randomize to avoid overloading license.posthog.com + sender.add_periodic_task( + crontab(hour="4", minute=str(randrange(0, 40))), + clickhouse_send_license_usage.s(), + ) # again a few hours later just to make sure + + materialize_columns_crontab = get_crontab(settings.MATERIALIZE_COLUMNS_SCHEDULE_CRON) + + if materialize_columns_crontab: + sender.add_periodic_task( + materialize_columns_crontab, + clickhouse_materialize_columns.s(), + name="clickhouse materialize columns", + ) + + sender.add_periodic_task( + crontab(hour="*/4", minute="0"), + clickhouse_mark_all_materialized.s(), + name="clickhouse mark all columns as materialized", + ) + + sender.add_periodic_task(crontab(hour="*", minute="55"), schedule_all_subscriptions.s()) + sender.add_periodic_task( + crontab(hour="2", minute=str(randrange(0, 40))), + ee_persist_finished_recordings.s(), + ) + + sender.add_periodic_task( + crontab(minute="0", hour="*"), + check_flags_to_rollback.s(), + name="check feature flags that should be rolled back", + ) + + sender.add_periodic_task( + crontab(minute="10", hour="*/12"), + find_flags_with_enriched_analytics.s(), + name="find feature flags with enriched analytics", + ) + + sender.add_periodic_task( + # once a day a random minute after midnight + crontab(hour="0", minute=str(randrange(0, 40))), + delete_expired_exported_assets.s(), + name="delete expired exported assets", + ) + + sender.add_periodic_task( + crontab(minute="*/20"), + check_data_import_row_limits.s(), + name="check external data rows synced", + ) diff --git a/posthog/tasks/split_person.py b/posthog/tasks/split_person.py index a0efed4fdf94a..569186c800ee2 100644 --- a/posthog/tasks/split_person.py +++ b/posthog/tasks/split_person.py @@ -1,10 +1,11 @@ from typing import Optional -from posthog.celery import app +from celery import shared_task + from posthog.models import Person -@app.task(ignore_result=True, max_retries=1) +@shared_task(ignore_result=True, max_retries=1) def split_person(person_id: int, main_distinct_id: Optional[str], max_splits: Optional[int]) -> None: """ Split all distinct ids into separate persons diff --git a/posthog/tasks/tasks.py b/posthog/tasks/tasks.py new file mode 100644 index 0000000000000..5c4da24cfdb2e --- /dev/null +++ b/posthog/tasks/tasks.py @@ -0,0 +1,721 @@ +import time +from typing import Optional +from uuid import UUID + +from celery import shared_task +from django.conf import settings +from django.db import connection +from django.utils import timezone +from prometheus_client import Gauge + +from posthog.cloud_utils import is_cloud +from posthog.metrics import pushed_metrics_registry +from posthog.ph_client import get_ph_client +from posthog.redis import get_client + + +@shared_task(ignore_result=True) +def delete_expired_exported_assets() -> None: + from posthog.models import ExportedAsset + + ExportedAsset.delete_expired_assets() + + +@shared_task(ignore_result=True) +def redis_heartbeat(): + get_client().set("POSTHOG_HEARTBEAT", int(time.time())) + + +@shared_task(ignore_result=True, queue="analytics_queries") +def process_query_task(team_id, query_id, query_json, limit_context=None, refresh_requested=False): + """ + Kick off query + Once complete save results to redis + """ + from posthog.client import execute_process_query + + execute_process_query( + team_id=team_id, + query_id=query_id, + query_json=query_json, + limit_context=limit_context, + refresh_requested=refresh_requested, + ) + + +@shared_task(ignore_result=True) +def pg_table_cache_hit_rate(): + from statshog.defaults.django import statsd + + with connection.cursor() as cursor: + try: + cursor.execute( + """ + SELECT + relname as table_name, + sum(heap_blks_hit) / nullif(sum(heap_blks_hit) + sum(heap_blks_read),0) * 100 AS ratio + FROM pg_statio_user_tables + GROUP BY relname + ORDER BY ratio ASC + """ + ) + tables = cursor.fetchall() + with pushed_metrics_registry("celery_pg_table_cache_hit_rate") as registry: + hit_rate_gauge = Gauge( + "posthog_celery_pg_table_cache_hit_rate", + "Postgres query cache hit rate per table.", + labelnames=["table_name"], + registry=registry, + ) + for row in tables: + hit_rate_gauge.labels(table_name=row[0]).set(float(row[1])) + statsd.gauge("pg_table_cache_hit_rate", float(row[1]), tags={"table": row[0]}) + except: + # if this doesn't work keep going + pass + + +@shared_task(ignore_result=True) +def pg_plugin_server_query_timing(): + from statshog.defaults.django import statsd + + with connection.cursor() as cursor: + try: + cursor.execute( + """ + SELECT + substring(query from 'plugin-server:(\\w+)') AS query_type, + total_time as total_time, + (total_time / calls) as avg_time, + min_time, + max_time, + stddev_time, + calls, + rows as rows_read_or_affected + FROM pg_stat_statements + WHERE query LIKE '%%plugin-server%%' + ORDER BY total_time DESC + LIMIT 50 + """ + ) + + for row in cursor.fetchall(): + row_dictionary = {column.name: value for column, value in zip(cursor.description, row)} + + for key, value in row_dictionary.items(): + if key == "query_type": + continue + statsd.gauge( + f"pg_plugin_server_query_{key}", + value, + tags={"query_type": row_dictionary["query_type"]}, + ) + except: + # if this doesn't work keep going + pass + + +POSTGRES_TABLES = ["posthog_personoverride", "posthog_personoverridemapping"] + + +@shared_task(ignore_result=True) +def pg_row_count(): + with pushed_metrics_registry("celery_pg_row_count") as registry: + row_count_gauge = Gauge( + "posthog_celery_pg_table_row_count", + "Number of rows per Postgres table.", + labelnames=["table_name"], + registry=registry, + ) + with connection.cursor() as cursor: + for table in POSTGRES_TABLES: + QUERY = "SELECT count(*) FROM {table};" + query = QUERY.format(table=table) + + try: + cursor.execute(query) + row = cursor.fetchone() + row_count_gauge.labels(table_name=table).set(row[0]) + except: + pass + + +CLICKHOUSE_TABLES = [ + "sharded_events", + "person", + "person_distinct_id2", + "sharded_session_replay_events", + "log_entries", +] +if not is_cloud(): + CLICKHOUSE_TABLES.append("session_recording_events") + + +@shared_task(ignore_result=True) +def clickhouse_lag(): + from statshog.defaults.django import statsd + + from posthog.client import sync_execute + + with pushed_metrics_registry("celery_clickhouse_lag") as registry: + lag_gauge = Gauge( + "posthog_celery_clickhouse_lag_seconds", + "Age of the latest ingested record per ClickHouse table.", + labelnames=["table_name"], + registry=registry, + ) + for table in CLICKHOUSE_TABLES: + try: + QUERY = """SELECT max(_timestamp) observed_ts, now() now_ts, now() - max(_timestamp) as lag + FROM {table}""" + query = QUERY.format(table=table) + lag = sync_execute(query)[0][2] + statsd.gauge( + "posthog_celery_clickhouse__table_lag_seconds", + lag, + tags={"table": table}, + ) + lag_gauge.labels(table_name=table).set(lag) + except: + pass + + +HEARTBEAT_EVENT_TO_INGESTION_LAG_METRIC = { + "heartbeat": "ingestion", + "heartbeat_buffer": "ingestion_buffer", + "heartbeat_api": "ingestion_api", +} + + +@shared_task(ignore_result=True) +def ingestion_lag(): + from statshog.defaults.django import statsd + + from posthog.client import sync_execute + + # Requires https://github.com/PostHog/posthog-heartbeat-plugin to be enabled on team 2 + # Note that it runs every minute, and we compare it with now(), so there's up to 60s delay + query = """ + SELECT event, date_diff('second', max(timestamp), now()) + FROM events + WHERE team_id IN %(team_ids)s + AND event IN %(events)s + AND timestamp > yesterday() AND timestamp < now() + toIntervalMinute(3) + GROUP BY event + """ + + try: + results = sync_execute( + query, + { + "team_ids": settings.INGESTION_LAG_METRIC_TEAM_IDS, + "events": list(HEARTBEAT_EVENT_TO_INGESTION_LAG_METRIC.keys()), + }, + ) + with pushed_metrics_registry("celery_ingestion_lag") as registry: + lag_gauge = Gauge( + "posthog_celery_observed_ingestion_lag_seconds", + "End-to-end ingestion lag observed through several scenarios. Can be overestimated by up to 60 seconds.", + labelnames=["scenario"], + registry=registry, + ) + for event, lag in results: + metric = HEARTBEAT_EVENT_TO_INGESTION_LAG_METRIC[event] + statsd.gauge(f"posthog_celery_{metric}_lag_seconds_rough_minute_precision", lag) + lag_gauge.labels(scenario=metric).set(lag) + except: + pass + + +KNOWN_CELERY_TASK_IDENTIFIERS = { + "pluginJob", + "runEveryHour", + "runEveryMinute", + "runEveryDay", +} + + +@shared_task(ignore_result=True) +def graphile_worker_queue_size(): + from django.db import connections + from statshog.defaults.django import statsd + + connection = connections["graphile"] if "graphile" in connections else connections["default"] + with connection.cursor() as cursor: + cursor.execute( + """ + SELECT count(*) + FROM graphile_worker.jobs + WHERE (jobs.locked_at is null or jobs.locked_at < (now() - INTERVAL '4 hours')) + AND run_at <= now() + AND attempts < max_attempts + """ + ) + + queue_size = cursor.fetchone()[0] + statsd.gauge("graphile_worker_queue_size", queue_size) + + # Track the number of jobs that will still be run at least once or are currently running based on job type (i.e. task_identifier) + # Completed jobs are deleted and "permanently failed" jobs have attempts == max_attempts + # Jobs not yet eligible for execution are filtered out with run_at <= now() + cursor.execute( + """ + SELECT task_identifier, count(*) as c, EXTRACT(EPOCH FROM MIN(run_at)) as oldest FROM graphile_worker.jobs + WHERE attempts < max_attempts + AND run_at <= now() + GROUP BY task_identifier + """ + ) + + seen_task_identifier = set() + with pushed_metrics_registry("celery_graphile_worker_queue_size") as registry: + processing_lag_gauge = Gauge( + "posthog_celery_graphile_lag_seconds", + "Oldest scheduled run on pending Graphile jobs per task identifier, zero if queue empty.", + labelnames=["task_identifier"], + registry=registry, + ) + waiting_jobs_gauge = Gauge( + "posthog_celery_graphile_waiting_jobs", + "Number of Graphile jobs in the queue, per task identifier.", + labelnames=["task_identifier"], + registry=registry, + ) + for task_identifier, count, oldest in cursor.fetchall(): + seen_task_identifier.add(task_identifier) + waiting_jobs_gauge.labels(task_identifier=task_identifier).set(count) + processing_lag_gauge.labels(task_identifier=task_identifier).set(time.time() - float(oldest)) + statsd.gauge( + "graphile_waiting_jobs", + count, + tags={"task_identifier": task_identifier}, + ) + + # The query will not return rows for empty queues, creating missing points. + # Let's emit updates for known queues even if they are empty. + for task_identifier in KNOWN_CELERY_TASK_IDENTIFIERS - seen_task_identifier: + waiting_jobs_gauge.labels(task_identifier=task_identifier).set(0) + processing_lag_gauge.labels(task_identifier=task_identifier).set(0) + + +@shared_task(ignore_result=True) +def clickhouse_row_count(): + from statshog.defaults.django import statsd + + from posthog.client import sync_execute + + with pushed_metrics_registry("celery_clickhouse_row_count") as registry: + row_count_gauge = Gauge( + "posthog_celery_clickhouse_table_row_count", + "Number of rows per ClickHouse table.", + labelnames=["table_name"], + registry=registry, + ) + for table in CLICKHOUSE_TABLES: + try: + QUERY = """SELECT sum(rows) rows from system.parts + WHERE table = '{table}' and active;""" + query = QUERY.format(table=table) + rows = sync_execute(query)[0][0] + row_count_gauge.labels(table_name=table).set(rows) + statsd.gauge( + f"posthog_celery_clickhouse_table_row_count", + rows, + tags={"table": table}, + ) + except: + pass + + +@shared_task(ignore_result=True) +def clickhouse_errors_count(): + """ + This task is used to track the recency of errors in ClickHouse. + We can use this to alert on errors that are consistently being generated recently + 999 - KEEPER_EXCEPTION + 225 - NO_ZOOKEEPER + 242 - TABLE_IS_READ_ONLY + """ + from posthog.client import sync_execute + + QUERY = """ + select + getMacro('replica') replica, + getMacro('shard') shard, + name, + value as errors, + dateDiff('minute', last_error_time, now()) minutes_ago + from clusterAllReplicas('posthog', system, errors) + where code in (999, 225, 242) + order by minutes_ago + """ + rows = sync_execute(QUERY) + with pushed_metrics_registry("celery_clickhouse_errors") as registry: + errors_gauge = Gauge( + "posthog_celery_clickhouse_errors", + "Age of the latest error per ClickHouse errors table.", + registry=registry, + labelnames=["replica", "shard", "name"], + ) + if isinstance(rows, list): + for replica, shard, name, _, minutes_ago in rows: + errors_gauge.labels(replica=replica, shard=shard, name=name).set(minutes_ago) + + +@shared_task(ignore_result=True) +def clickhouse_part_count(): + from statshog.defaults.django import statsd + + from posthog.client import sync_execute + + QUERY = """ + SELECT table, count(1) freq + FROM system.parts + WHERE active + GROUP BY table + ORDER BY freq DESC; + """ + rows = sync_execute(QUERY) + + with pushed_metrics_registry("celery_clickhouse_part_count") as registry: + parts_count_gauge = Gauge( + "posthog_celery_clickhouse_table_parts_count", + "Number of parts per ClickHouse table.", + labelnames=["table"], + registry=registry, + ) + for table, parts in rows: + parts_count_gauge.labels(table=table).set(parts) + statsd.gauge( + f"posthog_celery_clickhouse_table_parts_count", + parts, + tags={"table": table}, + ) + + +@shared_task(ignore_result=True) +def clickhouse_mutation_count(): + from statshog.defaults.django import statsd + + from posthog.client import sync_execute + + QUERY = """ + SELECT + table, + count(1) AS freq + FROM system.mutations + WHERE is_done = 0 + GROUP BY table + ORDER BY freq DESC + """ + rows = sync_execute(QUERY) + + with pushed_metrics_registry("celery_clickhouse_mutation_count") as registry: + mutations_count_gauge = Gauge( + "posthog_celery_clickhouse_table_mutations_count", + "Number of mutations per ClickHouse table.", + labelnames=["table"], + registry=registry, + ) + for table, muts in rows: + mutations_count_gauge.labels(table=table).set(muts) + statsd.gauge( + f"posthog_celery_clickhouse_table_mutations_count", + muts, + tags={"table": table}, + ) + + +@shared_task(ignore_result=True) +def clickhouse_clear_removed_data(): + from posthog.models.async_deletion.delete_cohorts import AsyncCohortDeletion + from posthog.models.async_deletion.delete_events import AsyncEventDeletion + + runner = AsyncEventDeletion() + runner.mark_deletions_done() + runner.run() + + cohort_runner = AsyncCohortDeletion() + cohort_runner.mark_deletions_done() + cohort_runner.run() + + +@shared_task(ignore_result=True) +def clear_clickhouse_deleted_person(): + from posthog.models.async_deletion.delete_person import remove_deleted_person_data + + remove_deleted_person_data() + + +@shared_task(ignore_result=True, queue="email") +def redis_celery_queue_depth(): + try: + with pushed_metrics_registry("redis_celery_queue_depth_registry") as registry: + celery_task_queue_depth_gauge = Gauge( + "posthog_celery_queue_depth", + "We use this to monitor the depth of the celery queue.", + registry=registry, + ) + + llen = get_client().llen("celery") + celery_task_queue_depth_gauge.set(llen) + except: + # if we can't generate the metric don't complain about it. + return + + +@shared_task(ignore_result=True) +def update_event_partitions(): + with connection.cursor() as cursor: + cursor.execute( + "DO $$ BEGIN IF (SELECT exists(select * from pg_proc where proname = 'update_partitions')) THEN PERFORM update_partitions(); END IF; END $$" + ) + + +@shared_task(ignore_result=True) +def clean_stale_partials(): + """Clean stale (meaning older than 7 days) partial social auth sessions.""" + from social_django.models import Partial + + Partial.objects.filter(timestamp__lt=timezone.now() - timezone.timedelta(7)).delete() + + +@shared_task(ignore_result=True) +def monitoring_check_clickhouse_schema_drift(): + from posthog.tasks.check_clickhouse_schema_drift import ( + check_clickhouse_schema_drift, + ) + + check_clickhouse_schema_drift() + + +@shared_task(ignore_result=True) +def calculate_cohort(): + from posthog.tasks.calculate_cohort import calculate_cohorts + + calculate_cohorts() + + +@shared_task(ignore_result=True) +def process_scheduled_changes(): + from posthog.tasks.process_scheduled_changes import process_scheduled_changes + + process_scheduled_changes() + + +@shared_task(ignore_result=True) +def sync_insight_cache_states_task(): + from posthog.caching.insight_caching_state import sync_insight_cache_states + + sync_insight_cache_states() + + +@shared_task(ignore_result=True) +def schedule_cache_updates_task(): + from posthog.caching.insight_cache import schedule_cache_updates + + schedule_cache_updates() + + +@shared_task(ignore_result=True) +def update_cache_task(caching_state_id: UUID): + from posthog.caching.insight_cache import update_cache + + update_cache(caching_state_id) + + +@shared_task(ignore_result=True) +def sync_insight_caching_state( + team_id: int, + insight_id: Optional[int] = None, + dashboard_tile_id: Optional[int] = None, +): + from posthog.caching.insight_caching_state import sync_insight_caching_state + + sync_insight_caching_state(team_id, insight_id, dashboard_tile_id) + + +@shared_task(ignore_result=True, bind=True) +def debug_task(self): + print(f"Request: {self.request!r}") + + +@shared_task(ignore_result=True) +def calculate_decide_usage() -> None: + from django.db.models import Q + + from posthog.models import Team + from posthog.models.feature_flag.flag_analytics import capture_team_decide_usage + + ph_client = get_ph_client() + + for team in Team.objects.select_related("organization").exclude( + Q(organization__for_internal_metrics=True) | Q(is_demo=True) + ): + capture_team_decide_usage(ph_client, team.id, team.uuid) + + ph_client.shutdown() + + +@shared_task(ignore_result=True) +def find_flags_with_enriched_analytics(): + from datetime import datetime, timedelta + + from posthog.models.feature_flag.flag_analytics import ( + find_flags_with_enriched_analytics, + ) + + end = datetime.now() + begin = end - timedelta(hours=12) + + find_flags_with_enriched_analytics(begin, end) + + +@shared_task(ignore_result=True) +def demo_reset_master_team(): + from posthog.tasks.demo_reset_master_team import demo_reset_master_team + + if is_cloud() or settings.DEMO: + demo_reset_master_team() + + +@shared_task(ignore_result=True) +def sync_all_organization_available_features(): + from posthog.tasks.sync_all_organization_available_features import ( + sync_all_organization_available_features, + ) + + sync_all_organization_available_features() + + +@shared_task(ignore_result=False, track_started=True, max_retries=0) +def check_async_migration_health(): + from posthog.tasks.async_migrations import check_async_migration_health + + check_async_migration_health() + + +@shared_task(ignore_result=True) +def verify_persons_data_in_sync(): + from posthog.tasks.verify_persons_data_in_sync import ( + verify_persons_data_in_sync as verify, + ) + + if not is_cloud(): + return + + verify() + + +def recompute_materialized_columns_enabled() -> bool: + from posthog.models.instance_setting import get_instance_setting + + if get_instance_setting("MATERIALIZED_COLUMNS_ENABLED") and get_instance_setting( + "COMPUTE_MATERIALIZED_COLUMNS_ENABLED" + ): + return True + return False + + +@shared_task(ignore_result=True) +def clickhouse_materialize_columns(): + if recompute_materialized_columns_enabled(): + try: + from ee.clickhouse.materialized_columns.analyze import ( + materialize_properties_task, + ) + except ImportError: + pass + else: + materialize_properties_task() + + +@shared_task(ignore_result=True) +def clickhouse_mark_all_materialized(): + if recompute_materialized_columns_enabled(): + try: + from ee.tasks.materialized_columns import mark_all_materialized + except ImportError: + pass + else: + mark_all_materialized() + + +@shared_task(ignore_result=True) +def send_org_usage_reports(): + from posthog.tasks.usage_report import send_all_org_usage_reports + + send_all_org_usage_reports.delay() + + +@shared_task(ignore_result=True) +def update_quota_limiting(): + try: + from ee.billing.quota_limiting import update_all_org_billing_quotas + + update_all_org_billing_quotas() + except ImportError: + pass + + +@shared_task(ignore_result=True) +def schedule_all_subscriptions(): + try: + from ee.tasks.subscriptions import ( + schedule_all_subscriptions as _schedule_all_subscriptions, + ) + except ImportError: + pass + else: + _schedule_all_subscriptions() + + +@shared_task(ignore_result=True, retries=3) +def clickhouse_send_license_usage(): + try: + if not is_cloud(): + from ee.tasks.send_license_usage import send_license_usage + + send_license_usage() + except ImportError: + pass + + +@shared_task(ignore_result=True) +def check_flags_to_rollback(): + try: + from ee.tasks.auto_rollback_feature_flag import check_flags_to_rollback + + check_flags_to_rollback() + except ImportError: + pass + + +@shared_task(ignore_result=True) +def ee_persist_single_recording(id: str, team_id: int): + try: + from ee.session_recordings.persistence_tasks import persist_single_recording + + persist_single_recording(id, team_id) + except ImportError: + pass + + +@shared_task(ignore_result=True) +def ee_persist_finished_recordings(): + try: + from ee.session_recordings.persistence_tasks import persist_finished_recordings + except ImportError: + pass + else: + persist_finished_recordings() + + +@shared_task(ignore_result=True) +def check_data_import_row_limits(): + try: + from posthog.tasks.warehouse import check_synced_row_limits + except ImportError: + pass + else: + check_synced_row_limits() diff --git a/posthog/tasks/usage_report.py b/posthog/tasks/usage_report.py index d4b09c46ee9f5..056586243f4b4 100644 --- a/posthog/tasks/usage_report.py +++ b/posthog/tasks/usage_report.py @@ -17,6 +17,7 @@ import requests import structlog +from celery import shared_task from dateutil import parser from django.conf import settings from django.db import connection @@ -27,7 +28,6 @@ from sentry_sdk import capture_exception from posthog import version_requirement -from posthog.celery import app from posthog.clickhouse.client.connection import Workload from posthog.client import sync_execute from posthog.cloud_utils import get_cached_instance_license, is_cloud @@ -274,7 +274,7 @@ def get_org_owner_or_first_user(organization_id: str) -> Optional[User]: return user -@app.task(ignore_result=True, autoretry_for=(Exception,), max_retries=3) +@shared_task(ignore_result=True, autoretry_for=(Exception,), max_retries=3) def send_report_to_billing_service(org_id: str, report: Dict[str, Any]) -> None: if not settings.EE_AVAILABLE: return @@ -621,7 +621,7 @@ def get_teams_with_rows_synced_in_period(begin: datetime, end: datetime) -> List return results -@app.task(ignore_result=True, max_retries=0) +@shared_task(ignore_result=True, max_retries=0) def capture_report( capture_event_name: str, org_id: str, @@ -952,7 +952,7 @@ def _get_full_org_usage_report_as_dict(full_report: FullUsageReport) -> Dict[str return dataclasses.asdict(full_report) -@app.task(ignore_result=True, max_retries=3, autoretry_for=(Exception,)) +@shared_task(ignore_result=True, max_retries=3, autoretry_for=(Exception,)) def send_all_org_usage_reports( dry_run: bool = False, at: Optional[str] = None, diff --git a/posthog/tasks/user_identify.py b/posthog/tasks/user_identify.py index 93dd0c851dbe8..1c5026fc40474 100644 --- a/posthog/tasks/user_identify.py +++ b/posthog/tasks/user_identify.py @@ -1,10 +1,10 @@ import posthoganalytics +from celery import shared_task -from posthog.celery import app from posthog.models import User -@app.task(ignore_result=True) +@shared_task(ignore_result=True) def identify_task(user_id: int) -> None: user = User.objects.get(id=user_id) posthoganalytics.capture( diff --git a/posthog/tasks/verify_persons_data_in_sync.py b/posthog/tasks/verify_persons_data_in_sync.py index 8aea487d96279..406708beb6f5a 100644 --- a/posthog/tasks/verify_persons_data_in_sync.py +++ b/posthog/tasks/verify_persons_data_in_sync.py @@ -4,10 +4,10 @@ from typing import Any, Dict, List import structlog +from celery import shared_task from django.db.models.query import Prefetch from django.utils.timezone import now -from posthog.celery import app from posthog.client import sync_execute from posthog.models.person import Person @@ -41,7 +41,7 @@ """ -@app.task(max_retries=1, ignore_result=True) +@shared_task(max_retries=1, ignore_result=True) def verify_persons_data_in_sync( period_start: timedelta = PERIOD_START, period_end: timedelta = PERIOD_END, diff --git a/posthog/tasks/warehouse.py b/posthog/tasks/warehouse.py index bab541d47bcab..10a56ed424ad4 100644 --- a/posthog/tasks/warehouse.py +++ b/posthog/tasks/warehouse.py @@ -1,12 +1,14 @@ import datetime + +import structlog +from celery import shared_task + from posthog.warehouse.data_load.service import ( cancel_external_data_workflow, pause_external_data_schedule, unpause_external_data_schedule, ) -from posthog.warehouse.models import ExternalDataSource, ExternalDataJob -from posthog.celery import app -import structlog +from posthog.warehouse.models import ExternalDataJob, ExternalDataSource logger = structlog.get_logger(__name__) @@ -19,7 +21,7 @@ def check_synced_row_limits() -> None: check_synced_row_limits_of_team.delay(team_id) -@app.task(ignore_result=True) +@shared_task(ignore_result=True) def check_synced_row_limits_of_team(team_id: int) -> None: logger.info("Checking synced row limits of team", team_id=team_id) diff --git a/posthog/test/test_celery.py b/posthog/test/test_celery.py index 19b0a303fd57b..ee61fded07a67 100644 --- a/posthog/test/test_celery.py +++ b/posthog/test/test_celery.py @@ -1,7 +1,7 @@ from unittest import TestCase from unittest.mock import patch -from posthog.celery import clickhouse_errors_count +from posthog.tasks.tasks import clickhouse_errors_count class TestCeleryMetrics(TestCase): From ec776ebc14ab24c5b3965e28172cb1855384679d Mon Sep 17 00:00:00 2001 From: Ben White Date: Thu, 18 Jan 2024 10:57:42 +0100 Subject: [PATCH 03/17] Fix up tasks --- posthog/api/prompt.py | 14 +------------- posthog/tasks/__init__.py | 8 +++++++- posthog/tasks/prompts.py | 15 +++++++++++++++ posthog/tasks/utils.py | 9 +++++++++ 4 files changed, 32 insertions(+), 14 deletions(-) create mode 100644 posthog/tasks/prompts.py create mode 100644 posthog/tasks/utils.py diff --git a/posthog/api/prompt.py b/posthog/api/prompt.py index b4c6b36512ce7..8c6f1633016ab 100644 --- a/posthog/api/prompt.py +++ b/posthog/api/prompt.py @@ -1,9 +1,7 @@ import json from typing import Any, Dict, List -from celery import shared_task from dateutil import parser -from django.db import IntegrityError from django.http import JsonResponse from django.views.decorators.csrf import csrf_exempt from rest_framework import exceptions, request, serializers, status, viewsets @@ -14,7 +12,7 @@ from posthog.api.utils import get_token from posthog.exceptions import generate_exception_response from posthog.models.prompt import Prompt, PromptSequence, UserPromptState -from posthog.models.user import User +from posthog.tasks.prompts import trigger_prompt_for_user from posthog.utils_cors import cors_response @@ -185,16 +183,6 @@ class Meta: ] -@shared_task() -def trigger_prompt_for_user(email: str, sequence_id: int): - try: - sequence = PromptSequence.objects.get(pk=sequence_id) - user = User.objects.get(email=email) - UserPromptState.objects.get_or_create(user=user, sequence=sequence, step=None) - except (User.DoesNotExist, IntegrityError): - pass - - @csrf_exempt def prompt_webhook(request: request.Request): if request.method == "POST": diff --git a/posthog/tasks/__init__.py b/posthog/tasks/__init__.py index d8b1ed9f03181..2e9fb05dbefa6 100644 --- a/posthog/tasks/__init__.py +++ b/posthog/tasks/__init__.py @@ -5,14 +5,17 @@ calculate_cohort, check_clickhouse_schema_drift, demo_create_data, + demo_reset_master_team, email, exporter, process_scheduled_changes, + prompts, split_person, sync_all_organization_available_features, tasks, usage_report, user_identify, + verify_persons_data_in_sync, warehouse, ) @@ -21,13 +24,16 @@ "calculate_cohort", "check_clickhouse_schema_drift", "demo_create_data", + "demo_reset_master_team", "email", "exporter", "process_scheduled_changes", + "prompts", "split_person", "sync_all_organization_available_features", "tasks", - "user_identify", "usage_report", + "user_identify", + "verify_persons_data_in_sync", "warehouse", ] diff --git a/posthog/tasks/prompts.py b/posthog/tasks/prompts.py new file mode 100644 index 0000000000000..4c83536a8e528 --- /dev/null +++ b/posthog/tasks/prompts.py @@ -0,0 +1,15 @@ +from celery import shared_task +from django.db import IntegrityError + +from posthog.models.prompt.prompt import PromptSequence, UserPromptState +from posthog.models.user import User + + +@shared_task() +def trigger_prompt_for_user(email: str, sequence_id: int): + try: + sequence = PromptSequence.objects.get(pk=sequence_id) + user = User.objects.get(email=email) + UserPromptState.objects.get_or_create(user=user, sequence=sequence, step=None) + except (User.DoesNotExist, IntegrityError): + pass diff --git a/posthog/tasks/utils.py b/posthog/tasks/utils.py new file mode 100644 index 0000000000000..dd4d80ffc30ef --- /dev/null +++ b/posthog/tasks/utils.py @@ -0,0 +1,9 @@ +from enum import Enum + + +class CeleryQueue(Enum): + DEFAULT = "celery" + EMAIL = "email" + INSIGHT_EXPORT = "insight_export" + INSIGHT_REFRESH = "insight_refresh" + ANALYTICS_QUERIES = "analytics_queries" From 1d1503661325e6447f4eb47f0859819c10fab1da Mon Sep 17 00:00:00 2001 From: Ben White Date: Thu, 18 Jan 2024 11:00:58 +0100 Subject: [PATCH 04/17] Added things --- posthog/tasks/tasks.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/posthog/tasks/tasks.py b/posthog/tasks/tasks.py index 5c4da24cfdb2e..23f2acbaaba14 100644 --- a/posthog/tasks/tasks.py +++ b/posthog/tasks/tasks.py @@ -12,6 +12,7 @@ from posthog.metrics import pushed_metrics_registry from posthog.ph_client import get_ph_client from posthog.redis import get_client +from posthog.tasks.utils import CeleryQueue @shared_task(ignore_result=True) @@ -26,7 +27,7 @@ def redis_heartbeat(): get_client().set("POSTHOG_HEARTBEAT", int(time.time())) -@shared_task(ignore_result=True, queue="analytics_queries") +@shared_task(ignore_result=True, queue=CeleryQueue.ANALYTICS_QUERIES) def process_query_task(team_id, query_id, query_json, limit_context=None, refresh_requested=False): """ Kick off query @@ -447,7 +448,7 @@ def clear_clickhouse_deleted_person(): remove_deleted_person_data() -@shared_task(ignore_result=True, queue="email") +@shared_task(ignore_result=True, queue=CeleryQueue.EMAIL) def redis_celery_queue_depth(): try: with pushed_metrics_registry("redis_celery_queue_depth_registry") as registry: From c99f1e4642256852570d73c17fd442ab0a69fd5f Mon Sep 17 00:00:00 2001 From: Ben White Date: Thu, 18 Jan 2024 11:05:23 +0100 Subject: [PATCH 05/17] Removed logs --- posthog/celery.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/posthog/celery.py b/posthog/celery.py index d1ead060f617a..8b8780db3930a 100644 --- a/posthog/celery.py +++ b/posthog/celery.py @@ -19,6 +19,7 @@ # set the default Django settings module for the 'celery' program. os.environ.setdefault("DJANGO_SETTINGS_MODULE", "posthog.settings") + app = Celery("posthog") CELERY_TASK_PRE_RUN_COUNTER = Counter( @@ -129,8 +130,6 @@ def postrun_signal_handler(task_id, task, **kwargs): start_time = task_timings.pop(task_id, None) CELERTY_TASK_DURATION_HISTOGRAM.labels(task_name=task.name).observe(time.time() - start_time) - print("Celery task {} took {:.2f} seconds".format(task.name, time.time() - start_time)) - reset_query_tags() From 2f92185fd5d6d42544d4cf4b3d0af8c1f8f47708 Mon Sep 17 00:00:00 2001 From: Ben White Date: Thu, 18 Jan 2024 11:09:12 +0100 Subject: [PATCH 06/17] Fix? --- posthog/tasks/tasks.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/posthog/tasks/tasks.py b/posthog/tasks/tasks.py index 23f2acbaaba14..8637fc8c7294f 100644 --- a/posthog/tasks/tasks.py +++ b/posthog/tasks/tasks.py @@ -536,11 +536,6 @@ def sync_insight_caching_state( sync_insight_caching_state(team_id, insight_id, dashboard_tile_id) -@shared_task(ignore_result=True, bind=True) -def debug_task(self): - print(f"Request: {self.request!r}") - - @shared_task(ignore_result=True) def calculate_decide_usage() -> None: from django.db.models import Q From 0c85d593e5f43f02f98c8b0506cf85fcea2cc0af Mon Sep 17 00:00:00 2001 From: github-actions <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 18 Jan 2024 10:12:30 +0000 Subject: [PATCH 07/17] Update query snapshots --- .../test_clickhouse_experiment_secondary_results.ambr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ee/clickhouse/views/test/__snapshots__/test_clickhouse_experiment_secondary_results.ambr b/ee/clickhouse/views/test/__snapshots__/test_clickhouse_experiment_secondary_results.ambr index 018b6c25fd395..613bdc0de3a2a 100644 --- a/ee/clickhouse/views/test/__snapshots__/test_clickhouse_experiment_secondary_results.ambr +++ b/ee/clickhouse/views/test/__snapshots__/test_clickhouse_experiment_secondary_results.ambr @@ -1,7 +1,7 @@ # serializer version: 1 # name: ClickhouseTestExperimentSecondaryResults.test_basic_secondary_metric_results ''' - /* user_id:132 celery:posthog.celery.sync_insight_caching_state */ + /* user_id:132 celery:posthog.tasks.tasks.sync_insight_caching_state */ SELECT team_id, date_diff('second', max(timestamp), now()) AS age FROM events From 5e2c97a861dcd58660d5e5b24358209cba455338 Mon Sep 17 00:00:00 2001 From: github-actions <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 18 Jan 2024 10:13:34 +0000 Subject: [PATCH 08/17] Update query snapshots --- posthog/api/test/__snapshots__/test_cohort.ambr | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/posthog/api/test/__snapshots__/test_cohort.ambr b/posthog/api/test/__snapshots__/test_cohort.ambr index 5dcaa0f2c01ad..3dc2cb5ac1249 100644 --- a/posthog/api/test/__snapshots__/test_cohort.ambr +++ b/posthog/api/test/__snapshots__/test_cohort.ambr @@ -73,7 +73,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.11 ''' - /* celery:posthog.celery.clickhouse_clear_removed_data */ + /* celery:posthog.tasks.tasks.clickhouse_clear_removed_data */ SELECT DISTINCT team_id, cohort_id FROM cohortpeople @@ -168,7 +168,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.8 ''' - /* celery:posthog.celery.clickhouse_clear_removed_data */ + /* celery:posthog.tasks.tasks.clickhouse_clear_removed_data */ SELECT DISTINCT team_id, cohort_id FROM cohortpeople @@ -179,7 +179,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.9 ''' - /* celery:posthog.celery.clickhouse_clear_removed_data */ + /* celery:posthog.tasks.tasks.clickhouse_clear_removed_data */ DELETE FROM cohortpeople WHERE (team_id = 2 From 4841918e33aec99c2b7a786b26b7f7355c9b335c Mon Sep 17 00:00:00 2001 From: Ben White Date: Thu, 18 Jan 2024 11:19:36 +0100 Subject: [PATCH 09/17] Fix --- bin/celery-queues.env | 2 ++ posthog/caching/test/test_insight_cache.py | 4 ++-- posthog/tasks/utils.py | 2 ++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/bin/celery-queues.env b/bin/celery-queues.env index c4f63d5aeaab7..04ec0dfde0ddc 100644 --- a/bin/celery-queues.env +++ b/bin/celery-queues.env @@ -1,3 +1,5 @@ # Default set of queues to be used by Celery. # Important: Add new queues to make Celery consume tasks from them. + +# NOTE: Keep in sync with posthog/tasks/utils.py CELERY_WORKER_QUEUES=celery,email,insight_export,insight_refresh,analytics_queries \ No newline at end of file diff --git a/posthog/caching/test/test_insight_cache.py b/posthog/caching/test/test_insight_cache.py index 1dbe0b5ce2dc1..80cf01170c98d 100644 --- a/posthog/caching/test/test_insight_cache.py +++ b/posthog/caching/test/test_insight_cache.py @@ -58,7 +58,7 @@ def cache_keys(cache): @pytest.mark.django_db -@patch("posthog.celery.update_cache_task") +@patch("posthog.tasks.tasks.update_cache_task") def test_schedule_cache_updates(update_cache_task, team: Team, user: User): caching_state1 = create_insight_caching_state(team, user, filters=filter_dict, last_refresh=None) create_insight_caching_state(team, user, filters=filter_dict) @@ -164,7 +164,7 @@ def test_update_cache_updates_identical_cache_keys(team: Team, user: User, cache @pytest.mark.django_db @freeze_time("2020-01-04T13:01:01Z") -@patch("posthog.celery.update_cache_task") +@patch("posthog.tasks.tasks.update_cache_task") @patch("posthog.caching.insight_cache.calculate_result_by_insight") def test_update_cache_when_calculation_fails( spy_calculate_result_by_insight, diff --git a/posthog/tasks/utils.py b/posthog/tasks/utils.py index dd4d80ffc30ef..77797f9ef0992 100644 --- a/posthog/tasks/utils.py +++ b/posthog/tasks/utils.py @@ -1,9 +1,11 @@ from enum import Enum +# NOTE: Keep in sync with bin/celery-queues.env class CeleryQueue(Enum): DEFAULT = "celery" EMAIL = "email" INSIGHT_EXPORT = "insight_export" INSIGHT_REFRESH = "insight_refresh" ANALYTICS_QUERIES = "analytics_queries" + # EXPORTS = "exports" From 7afcb9231df352d69af0995437cba1b14c38c6e9 Mon Sep 17 00:00:00 2001 From: Ben White Date: Thu, 18 Jan 2024 11:36:50 +0100 Subject: [PATCH 10/17] Fix --- posthog/celery.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/posthog/celery.py b/posthog/celery.py index 8b8780db3930a..0cf148b4148ce 100644 --- a/posthog/celery.py +++ b/posthog/celery.py @@ -1,5 +1,6 @@ import os import time +from typing import Dict from celery import Celery from celery.signals import ( @@ -70,7 +71,7 @@ app.steps["worker"].add(DjangoStructLogInitStep) -task_timings = {} +task_timings: Dict[str, float] = {} @setup_logging.connect @@ -128,7 +129,8 @@ def postrun_signal_handler(task_id, task, **kwargs): if task_id in task_timings: start_time = task_timings.pop(task_id, None) - CELERTY_TASK_DURATION_HISTOGRAM.labels(task_name=task.name).observe(time.time() - start_time) + if start_time: + CELERTY_TASK_DURATION_HISTOGRAM.labels(task_name=task.name).observe(time.time() - start_time) reset_query_tags() From 2b1e3a425df00a064bf13d08b1adc0493898d2fe Mon Sep 17 00:00:00 2001 From: Ben White Date: Thu, 18 Jan 2024 12:28:11 +0100 Subject: [PATCH 11/17] Fixes --- bin/celery-queues.env | 2 +- ee/tasks/subscriptions/__init__.py | 15 ++++++++++++--- posthog/tasks/exporter.py | 2 ++ posthog/tasks/utils.py | 25 ++++++++++++++++++++++++- 4 files changed, 39 insertions(+), 5 deletions(-) diff --git a/bin/celery-queues.env b/bin/celery-queues.env index 04ec0dfde0ddc..5c9f8c5fde564 100644 --- a/bin/celery-queues.env +++ b/bin/celery-queues.env @@ -2,4 +2,4 @@ # Important: Add new queues to make Celery consume tasks from them. # NOTE: Keep in sync with posthog/tasks/utils.py -CELERY_WORKER_QUEUES=celery,email,insight_export,insight_refresh,analytics_queries \ No newline at end of file +CELERY_WORKER_QUEUES=celery,email,insight_export,insight_refresh,analytics_queries,exports,subscription_delivery \ No newline at end of file diff --git a/ee/tasks/subscriptions/__init__.py b/ee/tasks/subscriptions/__init__.py index 3dbe6e91b6bfe..f5bd3c043ff59 100644 --- a/ee/tasks/subscriptions/__init__.py +++ b/ee/tasks/subscriptions/__init__.py @@ -11,6 +11,7 @@ from ee.tasks.subscriptions.subscription_utils import generate_assets from posthog import settings from posthog.models.subscription import Subscription +from posthog.tasks.utils import CeleryQueue logger = structlog.get_logger(__name__) @@ -120,7 +121,7 @@ def _deliver_subscription_report( subscription.save() -@shared_task() +@shared_task(queue=CeleryQueue.SUBSCRIPTION_DELIVERY) def schedule_all_subscriptions() -> None: """ Schedule all past notifications (with a buffer) to be delivered @@ -148,12 +149,20 @@ def schedule_all_subscriptions() -> None: report_timeout_seconds = settings.PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES * 60 * 1.5 -@shared_task(soft_time_limit=report_timeout_seconds, time_limit=report_timeout_seconds + 10) +@shared_task( + soft_time_limit=report_timeout_seconds, + time_limit=report_timeout_seconds + 10, + queue=CeleryQueue.SUBSCRIPTION_DELIVERY, +) def deliver_subscription_report(subscription_id: int) -> None: return _deliver_subscription_report(subscription_id) -@shared_task(soft_time_limit=report_timeout_seconds, time_limit=report_timeout_seconds + 10) +@shared_task( + soft_time_limit=report_timeout_seconds, + time_limit=report_timeout_seconds + 10, + queue=CeleryQueue.SUBSCRIPTION_DELIVERY, +) def handle_subscription_value_change( subscription_id: int, previous_value: str, invite_message: Optional[str] = None ) -> None: diff --git a/posthog/tasks/exporter.py b/posthog/tasks/exporter.py index 20124d9607edc..1d01de9b76566 100644 --- a/posthog/tasks/exporter.py +++ b/posthog/tasks/exporter.py @@ -5,6 +5,7 @@ from posthog import settings from posthog.models import ExportedAsset +from posthog.tasks.utils import CeleryQueue EXPORT_QUEUED_COUNTER = Counter( "exporter_task_queued", @@ -42,6 +43,7 @@ acks_late=True, ignore_result=False, time_limit=settings.ASSET_GENERATION_MAX_TIMEOUT_SECONDS, + queue=CeleryQueue.EXPORTS, ) def export_asset(exported_asset_id: int, limit: Optional[int] = None) -> None: from posthog.tasks.exports import csv_exporter, image_exporter diff --git a/posthog/tasks/utils.py b/posthog/tasks/utils.py index 77797f9ef0992..734c919990d21 100644 --- a/posthog/tasks/utils.py +++ b/posthog/tasks/utils.py @@ -1,5 +1,27 @@ from enum import Enum +# NOTE: These are the queues used for logically separating workloads. +# Many queues are consumed by one "consumer" - a worker configured to consume from that queue. +# The goal should be to split up queues based on the type of work being done, so that we can scale effectively +# and change the consumer configs without the need for code changes +# +# Worker consumers config here https://github.com/PostHog/posthog-cloud-infra/blob/main/helm/values/prod.yml#L368 +# e.g. +# consumers: +# - name: priority +# queues: +# - email +# - stats +# - name: default +# concurrency: 4 +# queues: +# - celery # default queue for Celery +# - name: async +# concurrency: 4 +# queues: +# - analytics_queries +# - subscriptions + # NOTE: Keep in sync with bin/celery-queues.env class CeleryQueue(Enum): @@ -8,4 +30,5 @@ class CeleryQueue(Enum): INSIGHT_EXPORT = "insight_export" INSIGHT_REFRESH = "insight_refresh" ANALYTICS_QUERIES = "analytics_queries" - # EXPORTS = "exports" + EXPORTS = "exports" + SUBSCRIPTION_DELIVERY = "subscription_delivery" From 958f46eb61f1f55aac919fa2039cd30df4cf3b0f Mon Sep 17 00:00:00 2001 From: Ben White Date: Thu, 18 Jan 2024 12:32:34 +0100 Subject: [PATCH 12/17] Fix types --- posthog/tasks/prompts.py | 2 +- posthog/tasks/tasks.py | 72 ++++++++++++++++++++-------------------- 2 files changed, 37 insertions(+), 37 deletions(-) diff --git a/posthog/tasks/prompts.py b/posthog/tasks/prompts.py index 4c83536a8e528..13828740f3e95 100644 --- a/posthog/tasks/prompts.py +++ b/posthog/tasks/prompts.py @@ -6,7 +6,7 @@ @shared_task() -def trigger_prompt_for_user(email: str, sequence_id: int): +def trigger_prompt_for_user(email: str, sequence_id: int) -> None: try: sequence = PromptSequence.objects.get(pk=sequence_id) user = User.objects.get(email=email) diff --git a/posthog/tasks/tasks.py b/posthog/tasks/tasks.py index 8637fc8c7294f..b1ed342b332e4 100644 --- a/posthog/tasks/tasks.py +++ b/posthog/tasks/tasks.py @@ -23,12 +23,12 @@ def delete_expired_exported_assets() -> None: @shared_task(ignore_result=True) -def redis_heartbeat(): +def redis_heartbeat() -> None: get_client().set("POSTHOG_HEARTBEAT", int(time.time())) @shared_task(ignore_result=True, queue=CeleryQueue.ANALYTICS_QUERIES) -def process_query_task(team_id, query_id, query_json, limit_context=None, refresh_requested=False): +def process_query_task(team_id, query_id, query_json, limit_context=None, refresh_requested=False) -> None: """ Kick off query Once complete save results to redis @@ -45,7 +45,7 @@ def process_query_task(team_id, query_id, query_json, limit_context=None, refres @shared_task(ignore_result=True) -def pg_table_cache_hit_rate(): +def pg_table_cache_hit_rate() -> None: from statshog.defaults.django import statsd with connection.cursor() as cursor: @@ -77,7 +77,7 @@ def pg_table_cache_hit_rate(): @shared_task(ignore_result=True) -def pg_plugin_server_query_timing(): +def pg_plugin_server_query_timing() -> None: from statshog.defaults.django import statsd with connection.cursor() as cursor: @@ -120,7 +120,7 @@ def pg_plugin_server_query_timing(): @shared_task(ignore_result=True) -def pg_row_count(): +def pg_row_count() -> None: with pushed_metrics_registry("celery_pg_row_count") as registry: row_count_gauge = Gauge( "posthog_celery_pg_table_row_count", @@ -153,7 +153,7 @@ def pg_row_count(): @shared_task(ignore_result=True) -def clickhouse_lag(): +def clickhouse_lag() -> None: from statshog.defaults.django import statsd from posthog.client import sync_execute @@ -189,7 +189,7 @@ def clickhouse_lag(): @shared_task(ignore_result=True) -def ingestion_lag(): +def ingestion_lag() -> None: from statshog.defaults.django import statsd from posthog.client import sync_execute @@ -237,7 +237,7 @@ def ingestion_lag(): @shared_task(ignore_result=True) -def graphile_worker_queue_size(): +def graphile_worker_queue_size() -> None: from django.db import connections from statshog.defaults.django import statsd @@ -300,7 +300,7 @@ def graphile_worker_queue_size(): @shared_task(ignore_result=True) -def clickhouse_row_count(): +def clickhouse_row_count() -> None: from statshog.defaults.django import statsd from posthog.client import sync_execute @@ -329,7 +329,7 @@ def clickhouse_row_count(): @shared_task(ignore_result=True) -def clickhouse_errors_count(): +def clickhouse_errors_count() -> None: """ This task is used to track the recency of errors in ClickHouse. We can use this to alert on errors that are consistently being generated recently @@ -364,7 +364,7 @@ def clickhouse_errors_count(): @shared_task(ignore_result=True) -def clickhouse_part_count(): +def clickhouse_part_count() -> None: from statshog.defaults.django import statsd from posthog.client import sync_execute @@ -395,7 +395,7 @@ def clickhouse_part_count(): @shared_task(ignore_result=True) -def clickhouse_mutation_count(): +def clickhouse_mutation_count() -> None: from statshog.defaults.django import statsd from posthog.client import sync_execute @@ -428,7 +428,7 @@ def clickhouse_mutation_count(): @shared_task(ignore_result=True) -def clickhouse_clear_removed_data(): +def clickhouse_clear_removed_data() -> None: from posthog.models.async_deletion.delete_cohorts import AsyncCohortDeletion from posthog.models.async_deletion.delete_events import AsyncEventDeletion @@ -442,7 +442,7 @@ def clickhouse_clear_removed_data(): @shared_task(ignore_result=True) -def clear_clickhouse_deleted_person(): +def clear_clickhouse_deleted_person() -> None: from posthog.models.async_deletion.delete_person import remove_deleted_person_data remove_deleted_person_data() @@ -466,7 +466,7 @@ def redis_celery_queue_depth(): @shared_task(ignore_result=True) -def update_event_partitions(): +def update_event_partitions() -> None: with connection.cursor() as cursor: cursor.execute( "DO $$ BEGIN IF (SELECT exists(select * from pg_proc where proname = 'update_partitions')) THEN PERFORM update_partitions(); END IF; END $$" @@ -474,7 +474,7 @@ def update_event_partitions(): @shared_task(ignore_result=True) -def clean_stale_partials(): +def clean_stale_partials() -> None: """Clean stale (meaning older than 7 days) partial social auth sessions.""" from social_django.models import Partial @@ -482,7 +482,7 @@ def clean_stale_partials(): @shared_task(ignore_result=True) -def monitoring_check_clickhouse_schema_drift(): +def monitoring_check_clickhouse_schema_drift() -> None: from posthog.tasks.check_clickhouse_schema_drift import ( check_clickhouse_schema_drift, ) @@ -491,35 +491,35 @@ def monitoring_check_clickhouse_schema_drift(): @shared_task(ignore_result=True) -def calculate_cohort(): +def calculate_cohort() -> None: from posthog.tasks.calculate_cohort import calculate_cohorts calculate_cohorts() @shared_task(ignore_result=True) -def process_scheduled_changes(): +def process_scheduled_changes() -> None: from posthog.tasks.process_scheduled_changes import process_scheduled_changes process_scheduled_changes() @shared_task(ignore_result=True) -def sync_insight_cache_states_task(): +def sync_insight_cache_states_task() -> None: from posthog.caching.insight_caching_state import sync_insight_cache_states sync_insight_cache_states() @shared_task(ignore_result=True) -def schedule_cache_updates_task(): +def schedule_cache_updates_task() -> None: from posthog.caching.insight_cache import schedule_cache_updates schedule_cache_updates() @shared_task(ignore_result=True) -def update_cache_task(caching_state_id: UUID): +def update_cache_task(caching_state_id: UUID) -> None: from posthog.caching.insight_cache import update_cache update_cache(caching_state_id) @@ -530,7 +530,7 @@ def sync_insight_caching_state( team_id: int, insight_id: Optional[int] = None, dashboard_tile_id: Optional[int] = None, -): +) -> None: from posthog.caching.insight_caching_state import sync_insight_caching_state sync_insight_caching_state(team_id, insight_id, dashboard_tile_id) @@ -554,7 +554,7 @@ def calculate_decide_usage() -> None: @shared_task(ignore_result=True) -def find_flags_with_enriched_analytics(): +def find_flags_with_enriched_analytics() -> None: from datetime import datetime, timedelta from posthog.models.feature_flag.flag_analytics import ( @@ -568,7 +568,7 @@ def find_flags_with_enriched_analytics(): @shared_task(ignore_result=True) -def demo_reset_master_team(): +def demo_reset_master_team() -> None: from posthog.tasks.demo_reset_master_team import demo_reset_master_team if is_cloud() or settings.DEMO: @@ -576,7 +576,7 @@ def demo_reset_master_team(): @shared_task(ignore_result=True) -def sync_all_organization_available_features(): +def sync_all_organization_available_features() -> None: from posthog.tasks.sync_all_organization_available_features import ( sync_all_organization_available_features, ) @@ -592,7 +592,7 @@ def check_async_migration_health(): @shared_task(ignore_result=True) -def verify_persons_data_in_sync(): +def verify_persons_data_in_sync() -> None: from posthog.tasks.verify_persons_data_in_sync import ( verify_persons_data_in_sync as verify, ) @@ -614,7 +614,7 @@ def recompute_materialized_columns_enabled() -> bool: @shared_task(ignore_result=True) -def clickhouse_materialize_columns(): +def clickhouse_materialize_columns() -> None: if recompute_materialized_columns_enabled(): try: from ee.clickhouse.materialized_columns.analyze import ( @@ -627,7 +627,7 @@ def clickhouse_materialize_columns(): @shared_task(ignore_result=True) -def clickhouse_mark_all_materialized(): +def clickhouse_mark_all_materialized() -> None: if recompute_materialized_columns_enabled(): try: from ee.tasks.materialized_columns import mark_all_materialized @@ -638,14 +638,14 @@ def clickhouse_mark_all_materialized(): @shared_task(ignore_result=True) -def send_org_usage_reports(): +def send_org_usage_reports() -> None: from posthog.tasks.usage_report import send_all_org_usage_reports send_all_org_usage_reports.delay() @shared_task(ignore_result=True) -def update_quota_limiting(): +def update_quota_limiting() -> None: try: from ee.billing.quota_limiting import update_all_org_billing_quotas @@ -655,7 +655,7 @@ def update_quota_limiting(): @shared_task(ignore_result=True) -def schedule_all_subscriptions(): +def schedule_all_subscriptions() -> None: try: from ee.tasks.subscriptions import ( schedule_all_subscriptions as _schedule_all_subscriptions, @@ -678,7 +678,7 @@ def clickhouse_send_license_usage(): @shared_task(ignore_result=True) -def check_flags_to_rollback(): +def check_flags_to_rollback() -> None: try: from ee.tasks.auto_rollback_feature_flag import check_flags_to_rollback @@ -688,7 +688,7 @@ def check_flags_to_rollback(): @shared_task(ignore_result=True) -def ee_persist_single_recording(id: str, team_id: int): +def ee_persist_single_recording(id: str, team_id: int) -> None: try: from ee.session_recordings.persistence_tasks import persist_single_recording @@ -698,7 +698,7 @@ def ee_persist_single_recording(id: str, team_id: int): @shared_task(ignore_result=True) -def ee_persist_finished_recordings(): +def ee_persist_finished_recordings() -> None: try: from ee.session_recordings.persistence_tasks import persist_finished_recordings except ImportError: @@ -708,7 +708,7 @@ def ee_persist_finished_recordings(): @shared_task(ignore_result=True) -def check_data_import_row_limits(): +def check_data_import_row_limits() -> None: try: from posthog.tasks.warehouse import check_synced_row_limits except ImportError: From 43ef18ef93073b8804892b866bcbb2edba65b73e Mon Sep 17 00:00:00 2001 From: Ben White Date: Thu, 18 Jan 2024 12:33:50 +0100 Subject: [PATCH 13/17] Fixes --- bin/celery-queues.env | 2 +- posthog/tasks/tasks.py | 2 +- posthog/tasks/utils.py | 5 ++--- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/bin/celery-queues.env b/bin/celery-queues.env index 5c9f8c5fde564..de76211d9d5ce 100644 --- a/bin/celery-queues.env +++ b/bin/celery-queues.env @@ -2,4 +2,4 @@ # Important: Add new queues to make Celery consume tasks from them. # NOTE: Keep in sync with posthog/tasks/utils.py -CELERY_WORKER_QUEUES=celery,email,insight_export,insight_refresh,analytics_queries,exports,subscription_delivery \ No newline at end of file +CELERY_WORKER_QUEUES=celery,stats,email,insight_export,insight_refresh,analytics_queries,exports,subscription_delivery \ No newline at end of file diff --git a/posthog/tasks/tasks.py b/posthog/tasks/tasks.py index b1ed342b332e4..49ee706654c49 100644 --- a/posthog/tasks/tasks.py +++ b/posthog/tasks/tasks.py @@ -448,7 +448,7 @@ def clear_clickhouse_deleted_person() -> None: remove_deleted_person_data() -@shared_task(ignore_result=True, queue=CeleryQueue.EMAIL) +@shared_task(ignore_result=True, queue=CeleryQueue.STATS) def redis_celery_queue_depth(): try: with pushed_metrics_registry("redis_celery_queue_depth_registry") as registry: diff --git a/posthog/tasks/utils.py b/posthog/tasks/utils.py index 734c919990d21..19705d454635d 100644 --- a/posthog/tasks/utils.py +++ b/posthog/tasks/utils.py @@ -1,5 +1,3 @@ -from enum import Enum - # NOTE: These are the queues used for logically separating workloads. # Many queues are consumed by one "consumer" - a worker configured to consume from that queue. # The goal should be to split up queues based on the type of work being done, so that we can scale effectively @@ -24,8 +22,9 @@ # NOTE: Keep in sync with bin/celery-queues.env -class CeleryQueue(Enum): +class CeleryQueue: DEFAULT = "celery" + STATS = "stats" EMAIL = "email" INSIGHT_EXPORT = "insight_export" INSIGHT_REFRESH = "insight_refresh" From 021357b85e1d98e8357973ed9067c81b63685602 Mon Sep 17 00:00:00 2001 From: github-actions <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 18 Jan 2024 11:49:41 +0000 Subject: [PATCH 14/17] Update query snapshots --- .../api/test/__snapshots__/test_insight.ambr | 37 ++++++++----------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/posthog/api/test/__snapshots__/test_insight.ambr b/posthog/api/test/__snapshots__/test_insight.ambr index 0dbd61eb66823..f580e601410f7 100644 --- a/posthog/api/test/__snapshots__/test_insight.ambr +++ b/posthog/api/test/__snapshots__/test_insight.ambr @@ -1113,6 +1113,17 @@ ''' # --- # name: TestInsight.test_listing_insights_does_not_nplus1.24 + ''' + SELECT "posthog_instancesetting"."id", + "posthog_instancesetting"."key", + "posthog_instancesetting"."raw_value" + FROM "posthog_instancesetting" + WHERE "posthog_instancesetting"."key" = 'constance:posthog:RATE_LIMIT_ENABLED' + ORDER BY "posthog_instancesetting"."id" ASC + LIMIT 1 /*controller='project_insights-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/insights/%3F%24'*/ + ''' +# --- +# name: TestInsight.test_listing_insights_does_not_nplus1.25 ''' SELECT "posthog_organization"."id", "posthog_organization"."name", @@ -1136,7 +1147,7 @@ LIMIT 21 /*controller='project_insights-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/insights/%3F%24'*/ ''' # --- -# name: TestInsight.test_listing_insights_does_not_nplus1.25 +# name: TestInsight.test_listing_insights_does_not_nplus1.26 ''' SELECT COUNT(*) AS "__count" FROM "posthog_dashboarditem" @@ -1146,7 +1157,7 @@ AND "posthog_dashboarditem"."query" IS NOT NULL)) /*controller='project_insights-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/insights/%3F%24'*/ ''' # --- -# name: TestInsight.test_listing_insights_does_not_nplus1.26 +# name: TestInsight.test_listing_insights_does_not_nplus1.27 ''' SELECT "posthog_dashboarditem"."id", "posthog_dashboarditem"."name", @@ -1289,7 +1300,7 @@ LIMIT 100 /*controller='project_insights-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/insights/%3F%24'*/ ''' # --- -# name: TestInsight.test_listing_insights_does_not_nplus1.27 +# name: TestInsight.test_listing_insights_does_not_nplus1.28 ''' SELECT ("posthog_dashboardtile"."insight_id") AS "_prefetch_related_val_insight_id", "posthog_dashboard"."id", @@ -1388,7 +1399,7 @@ 5 /* ... */)) /*controller='project_insights-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/insights/%3F%24'*/ ''' # --- -# name: TestInsight.test_listing_insights_does_not_nplus1.28 +# name: TestInsight.test_listing_insights_does_not_nplus1.29 ''' SELECT "posthog_dashboardtile"."id", "posthog_dashboardtile"."dashboard_id", @@ -1499,24 +1510,6 @@ 5 /* ... */)) /*controller='project_insights-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/insights/%3F%24'*/ ''' # --- -# name: TestInsight.test_listing_insights_does_not_nplus1.29 - ''' - SELECT "posthog_taggeditem"."id", - "posthog_taggeditem"."tag_id", - "posthog_taggeditem"."dashboard_id", - "posthog_taggeditem"."insight_id", - "posthog_taggeditem"."event_definition_id", - "posthog_taggeditem"."property_definition_id", - "posthog_taggeditem"."action_id", - "posthog_taggeditem"."feature_flag_id" - FROM "posthog_taggeditem" - WHERE "posthog_taggeditem"."insight_id" IN (1, - 2, - 3, - 4, - 5 /* ... */) /*controller='project_insights-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/insights/%3F%24'*/ - ''' -# --- # name: TestInsight.test_listing_insights_does_not_nplus1.3 ''' SELECT "posthog_dashboard"."id", From c8ef0f6f024e921f38af58111fe816e0f3ceff4f Mon Sep 17 00:00:00 2001 From: github-actions <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 18 Jan 2024 12:03:39 +0000 Subject: [PATCH 15/17] Update query snapshots --- .../api/test/__snapshots__/test_insight.ambr | 37 +++++++++++-------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/posthog/api/test/__snapshots__/test_insight.ambr b/posthog/api/test/__snapshots__/test_insight.ambr index f580e601410f7..0dbd61eb66823 100644 --- a/posthog/api/test/__snapshots__/test_insight.ambr +++ b/posthog/api/test/__snapshots__/test_insight.ambr @@ -1113,17 +1113,6 @@ ''' # --- # name: TestInsight.test_listing_insights_does_not_nplus1.24 - ''' - SELECT "posthog_instancesetting"."id", - "posthog_instancesetting"."key", - "posthog_instancesetting"."raw_value" - FROM "posthog_instancesetting" - WHERE "posthog_instancesetting"."key" = 'constance:posthog:RATE_LIMIT_ENABLED' - ORDER BY "posthog_instancesetting"."id" ASC - LIMIT 1 /*controller='project_insights-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/insights/%3F%24'*/ - ''' -# --- -# name: TestInsight.test_listing_insights_does_not_nplus1.25 ''' SELECT "posthog_organization"."id", "posthog_organization"."name", @@ -1147,7 +1136,7 @@ LIMIT 21 /*controller='project_insights-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/insights/%3F%24'*/ ''' # --- -# name: TestInsight.test_listing_insights_does_not_nplus1.26 +# name: TestInsight.test_listing_insights_does_not_nplus1.25 ''' SELECT COUNT(*) AS "__count" FROM "posthog_dashboarditem" @@ -1157,7 +1146,7 @@ AND "posthog_dashboarditem"."query" IS NOT NULL)) /*controller='project_insights-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/insights/%3F%24'*/ ''' # --- -# name: TestInsight.test_listing_insights_does_not_nplus1.27 +# name: TestInsight.test_listing_insights_does_not_nplus1.26 ''' SELECT "posthog_dashboarditem"."id", "posthog_dashboarditem"."name", @@ -1300,7 +1289,7 @@ LIMIT 100 /*controller='project_insights-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/insights/%3F%24'*/ ''' # --- -# name: TestInsight.test_listing_insights_does_not_nplus1.28 +# name: TestInsight.test_listing_insights_does_not_nplus1.27 ''' SELECT ("posthog_dashboardtile"."insight_id") AS "_prefetch_related_val_insight_id", "posthog_dashboard"."id", @@ -1399,7 +1388,7 @@ 5 /* ... */)) /*controller='project_insights-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/insights/%3F%24'*/ ''' # --- -# name: TestInsight.test_listing_insights_does_not_nplus1.29 +# name: TestInsight.test_listing_insights_does_not_nplus1.28 ''' SELECT "posthog_dashboardtile"."id", "posthog_dashboardtile"."dashboard_id", @@ -1510,6 +1499,24 @@ 5 /* ... */)) /*controller='project_insights-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/insights/%3F%24'*/ ''' # --- +# name: TestInsight.test_listing_insights_does_not_nplus1.29 + ''' + SELECT "posthog_taggeditem"."id", + "posthog_taggeditem"."tag_id", + "posthog_taggeditem"."dashboard_id", + "posthog_taggeditem"."insight_id", + "posthog_taggeditem"."event_definition_id", + "posthog_taggeditem"."property_definition_id", + "posthog_taggeditem"."action_id", + "posthog_taggeditem"."feature_flag_id" + FROM "posthog_taggeditem" + WHERE "posthog_taggeditem"."insight_id" IN (1, + 2, + 3, + 4, + 5 /* ... */) /*controller='project_insights-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/insights/%3F%24'*/ + ''' +# --- # name: TestInsight.test_listing_insights_does_not_nplus1.3 ''' SELECT "posthog_dashboard"."id", From bb132f64bea2a14efb45c512c3a40a682c4cb0fb Mon Sep 17 00:00:00 2001 From: Ben White Date: Thu, 18 Jan 2024 13:04:47 +0100 Subject: [PATCH 16/17] Fix types --- posthog/tasks/scheduled.py | 5 +++-- posthog/tasks/tasks.py | 12 +++++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/posthog/tasks/scheduled.py b/posthog/tasks/scheduled.py index e843074e5246b..446483ce4575a 100644 --- a/posthog/tasks/scheduled.py +++ b/posthog/tasks/scheduled.py @@ -1,4 +1,5 @@ from random import randrange +from typing import Any from celery import Celery from celery.canvas import Signature @@ -53,7 +54,7 @@ def add_periodic_task_with_expiry( schedule_seconds: int, task_signature: Signature, name: str | None = None, -): +) -> None: """ If the workers get delayed in processing tasks, then tasks that fire every X seconds get queued multiple times And so, are processed multiple times. But they often only need to be processed once. @@ -70,7 +71,7 @@ def add_periodic_task_with_expiry( @app.on_after_configure.connect -def setup_periodic_tasks(sender: Celery, **kwargs): +def setup_periodic_tasks(sender: Celery, **kwargs: Any) -> None: # Monitoring tasks add_periodic_task_with_expiry( sender, diff --git a/posthog/tasks/tasks.py b/posthog/tasks/tasks.py index 49ee706654c49..7cb20271e1830 100644 --- a/posthog/tasks/tasks.py +++ b/posthog/tasks/tasks.py @@ -1,5 +1,5 @@ import time -from typing import Optional +from typing import Any, Optional from uuid import UUID from celery import shared_task @@ -28,7 +28,9 @@ def redis_heartbeat() -> None: @shared_task(ignore_result=True, queue=CeleryQueue.ANALYTICS_QUERIES) -def process_query_task(team_id, query_id, query_json, limit_context=None, refresh_requested=False) -> None: +def process_query_task( + team_id: str, query_id: str, query_json: Any, limit_context: Any = None, refresh_requested: bool = False +) -> None: """ Kick off query Once complete save results to redis @@ -449,7 +451,7 @@ def clear_clickhouse_deleted_person() -> None: @shared_task(ignore_result=True, queue=CeleryQueue.STATS) -def redis_celery_queue_depth(): +def redis_celery_queue_depth() -> None: try: with pushed_metrics_registry("redis_celery_queue_depth_registry") as registry: celery_task_queue_depth_gauge = Gauge( @@ -585,7 +587,7 @@ def sync_all_organization_available_features() -> None: @shared_task(ignore_result=False, track_started=True, max_retries=0) -def check_async_migration_health(): +def check_async_migration_health() -> None: from posthog.tasks.async_migrations import check_async_migration_health check_async_migration_health() @@ -667,7 +669,7 @@ def schedule_all_subscriptions() -> None: @shared_task(ignore_result=True, retries=3) -def clickhouse_send_license_usage(): +def clickhouse_send_license_usage() -> None: try: if not is_cloud(): from ee.tasks.send_license_usage import send_license_usage From 3c2c2236edf8a7efbbd85435dd4639f8d66506ec Mon Sep 17 00:00:00 2001 From: Ben White Date: Thu, 18 Jan 2024 13:51:43 +0100 Subject: [PATCH 17/17] Fixed tests --- posthog/caching/test/test_insight_cache.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/posthog/caching/test/test_insight_cache.py b/posthog/caching/test/test_insight_cache.py index 80cf01170c98d..88495a795695e 100644 --- a/posthog/caching/test/test_insight_cache.py +++ b/posthog/caching/test/test_insight_cache.py @@ -58,7 +58,7 @@ def cache_keys(cache): @pytest.mark.django_db -@patch("posthog.tasks.tasks.update_cache_task") +@patch("posthog.caching.insight_cache.update_cache_task") def test_schedule_cache_updates(update_cache_task, team: Team, user: User): caching_state1 = create_insight_caching_state(team, user, filters=filter_dict, last_refresh=None) create_insight_caching_state(team, user, filters=filter_dict) @@ -164,7 +164,7 @@ def test_update_cache_updates_identical_cache_keys(team: Team, user: User, cache @pytest.mark.django_db @freeze_time("2020-01-04T13:01:01Z") -@patch("posthog.tasks.tasks.update_cache_task") +@patch("posthog.caching.insight_cache.update_cache_task") @patch("posthog.caching.insight_cache.calculate_result_by_insight") def test_update_cache_when_calculation_fails( spy_calculate_result_by_insight,