Skip to content

Commit

Permalink
add metrics for this queue depth
Browse files Browse the repository at this point in the history
  • Loading branch information
raquelmsmith committed Jan 30, 2024
1 parent b6f4772 commit 737dc24
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 5 deletions.
12 changes: 7 additions & 5 deletions posthog/settings/sentry.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import logging
import os
from datetime import timedelta
from random import random

import sentry_sdk
from dateutil import parser
from sentry_sdk.integrations.celery import CeleryIntegration
from sentry_sdk.integrations.django import DjangoIntegration
from sentry_sdk.integrations.logging import LoggingIntegration
Expand All @@ -10,10 +13,6 @@
from posthog.settings import get_from_env
from posthog.settings.base_variables import TEST

from dateutil import parser
from random import random
from datetime import timedelta


def before_send(event, hint):
for exception in event.get("exception", {}).get("values", []):
Expand Down Expand Up @@ -109,7 +108,10 @@ def traces_sampler(sampling_context: dict) -> float:
task = sampling_context.get("celery_job", {}).get("task")
if task == "posthog.celery.redis_heartbeat":
return 0.0001 # 0.01%
if task == "posthog.celery.redis_celery_queue_depth":
if (
task == "posthog.celery.redis_celery_queue_depth"
or task == "posthog.celery.redis_celery_queue_depth_usage_reports"
):
return 0.0001 # 0.01%
else:
# Default sample rate for Celery tasks
Expand Down
2 changes: 2 additions & 0 deletions posthog/tasks/scheduled.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
pg_table_cache_hit_rate,
process_scheduled_changes,
redis_celery_queue_depth,
redis_celery_queue_depth_usage_reports,
redis_heartbeat,
schedule_all_subscriptions,
schedule_cache_updates_task,
Expand Down Expand Up @@ -82,6 +83,7 @@ def setup_periodic_tasks(sender: Celery, **kwargs: Any) -> None:

if not settings.DEBUG:
add_periodic_task_with_expiry(sender, 10, redis_celery_queue_depth.s(), "10 sec queue probe")
add_periodic_task_with_expiry(sender, 10, redis_celery_queue_depth_usage_reports.s(), "10 sec queue probe")

# Heartbeat every 10sec to make sure the worker is alive
add_periodic_task_with_expiry(sender, 10, redis_heartbeat.s(), "10 sec heartbeat")
Expand Down
17 changes: 17 additions & 0 deletions posthog/tasks/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,23 @@ def redis_celery_queue_depth() -> None:
return


@shared_task(ignore_result=True, queue=CeleryQueue.STATS)
def redis_celery_queue_depth_usage_reports() -> None:
try:
with pushed_metrics_registry("redis_celery_queue_depth_usage_reports_registry") as registry:
celery_task_queue_depth_gauge = Gauge(
"posthog_celery_queue_depth_usage_reports",
"We use this to monitor the depth of the usage_reports celery queue.",
registry=registry,
)

llen = get_client().llen("usage_reports")
celery_task_queue_depth_gauge.set(llen)
except:
# if we can't generate the metric don't complain about it.
return


@shared_task(ignore_result=True)
def update_event_partitions() -> None:
with connection.cursor() as cursor:
Expand Down

0 comments on commit 737dc24

Please sign in to comment.