From 737dc248cb9d6dad18cdafe345bb9365d1ca8513 Mon Sep 17 00:00:00 2001 From: Raquel Smith Date: Tue, 30 Jan 2024 12:15:50 -0800 Subject: [PATCH] add metrics for this queue depth --- posthog/settings/sentry.py | 12 +++++++----- posthog/tasks/scheduled.py | 2 ++ posthog/tasks/tasks.py | 17 +++++++++++++++++ 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/posthog/settings/sentry.py b/posthog/settings/sentry.py index 208c862c7fa7e..63a291e92a0d0 100644 --- a/posthog/settings/sentry.py +++ b/posthog/settings/sentry.py @@ -1,7 +1,10 @@ import logging import os +from datetime import timedelta +from random import random import sentry_sdk +from dateutil import parser from sentry_sdk.integrations.celery import CeleryIntegration from sentry_sdk.integrations.django import DjangoIntegration from sentry_sdk.integrations.logging import LoggingIntegration @@ -10,10 +13,6 @@ from posthog.settings import get_from_env from posthog.settings.base_variables import TEST -from dateutil import parser -from random import random -from datetime import timedelta - def before_send(event, hint): for exception in event.get("exception", {}).get("values", []): @@ -109,7 +108,10 @@ def traces_sampler(sampling_context: dict) -> float: task = sampling_context.get("celery_job", {}).get("task") if task == "posthog.celery.redis_heartbeat": return 0.0001 # 0.01% - if task == "posthog.celery.redis_celery_queue_depth": + if ( + task == "posthog.celery.redis_celery_queue_depth" + or task == "posthog.celery.redis_celery_queue_depth_usage_reports" + ): return 0.0001 # 0.01% else: # Default sample rate for Celery tasks diff --git a/posthog/tasks/scheduled.py b/posthog/tasks/scheduled.py index 85d39d4bcb904..dc8f3e24a8c84 100644 --- a/posthog/tasks/scheduled.py +++ b/posthog/tasks/scheduled.py @@ -36,6 +36,7 @@ pg_table_cache_hit_rate, process_scheduled_changes, redis_celery_queue_depth, + redis_celery_queue_depth_usage_reports, redis_heartbeat, schedule_all_subscriptions, schedule_cache_updates_task, @@ -82,6 +83,7 @@ def setup_periodic_tasks(sender: Celery, **kwargs: Any) -> None: if not settings.DEBUG: add_periodic_task_with_expiry(sender, 10, redis_celery_queue_depth.s(), "10 sec queue probe") + add_periodic_task_with_expiry(sender, 10, redis_celery_queue_depth_usage_reports.s(), "10 sec queue probe") # Heartbeat every 10sec to make sure the worker is alive add_periodic_task_with_expiry(sender, 10, redis_heartbeat.s(), "10 sec heartbeat") diff --git a/posthog/tasks/tasks.py b/posthog/tasks/tasks.py index e1add9d682674..b59b49b959658 100644 --- a/posthog/tasks/tasks.py +++ b/posthog/tasks/tasks.py @@ -467,6 +467,23 @@ def redis_celery_queue_depth() -> None: return +@shared_task(ignore_result=True, queue=CeleryQueue.STATS) +def redis_celery_queue_depth_usage_reports() -> None: + try: + with pushed_metrics_registry("redis_celery_queue_depth_usage_reports_registry") as registry: + celery_task_queue_depth_gauge = Gauge( + "posthog_celery_queue_depth_usage_reports", + "We use this to monitor the depth of the usage_reports celery queue.", + registry=registry, + ) + + llen = get_client().llen("usage_reports") + celery_task_queue_depth_gauge.set(llen) + except: + # if we can't generate the metric don't complain about it. + return + + @shared_task(ignore_result=True) def update_event_partitions() -> None: with connection.cursor() as cursor: