Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add separate celery queue for usage reports #20048

Merged
merged 12 commits into from
Jan 31, 2024
2 changes: 1 addition & 1 deletion bin/celery-queues.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
# Important: Add new queues to make Celery consume tasks from them.

# NOTE: Keep in sync with posthog/tasks/utils.py
CELERY_WORKER_QUEUES=celery,stats,email,insight_export,insight_refresh,analytics_queries,exports,subscription_delivery
CELERY_WORKER_QUEUES=celery,stats,email,insight_export,insight_refresh,analytics_queries,exports,subscription_delivery,usage_reports
12 changes: 7 additions & 5 deletions posthog/settings/sentry.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import logging
import os
from datetime import timedelta
from random import random

import sentry_sdk
from dateutil import parser
from sentry_sdk.integrations.celery import CeleryIntegration
from sentry_sdk.integrations.django import DjangoIntegration
from sentry_sdk.integrations.logging import LoggingIntegration
Expand All @@ -10,10 +13,6 @@
from posthog.settings import get_from_env
from posthog.settings.base_variables import TEST

from dateutil import parser
from random import random
from datetime import timedelta


def before_send(event, hint):
for exception in event.get("exception", {}).get("values", []):
Expand Down Expand Up @@ -109,7 +108,10 @@ def traces_sampler(sampling_context: dict) -> float:
task = sampling_context.get("celery_job", {}).get("task")
if task == "posthog.celery.redis_heartbeat":
return 0.0001 # 0.01%
if task == "posthog.celery.redis_celery_queue_depth":
if (
task == "posthog.celery.redis_celery_queue_depth"
or task == "posthog.celery.redis_celery_queue_depth_usage_reports"
):
benjackwhite marked this conversation as resolved.
Show resolved Hide resolved
return 0.0001 # 0.01%
else:
# Default sample rate for Celery tasks
Expand Down
4 changes: 3 additions & 1 deletion posthog/tasks/scheduled.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
pg_table_cache_hit_rate,
process_scheduled_changes,
redis_celery_queue_depth,
redis_celery_queue_depth_usage_reports,
redis_heartbeat,
schedule_all_subscriptions,
schedule_cache_updates_task,
Expand Down Expand Up @@ -82,6 +83,7 @@ def setup_periodic_tasks(sender: Celery, **kwargs: Any) -> None:

if not settings.DEBUG:
add_periodic_task_with_expiry(sender, 10, redis_celery_queue_depth.s(), "10 sec queue probe")
add_periodic_task_with_expiry(sender, 10, redis_celery_queue_depth_usage_reports.s(), "10 sec queue probe")

# Heartbeat every 10sec to make sure the worker is alive
add_periodic_task_with_expiry(sender, 10, redis_heartbeat.s(), "10 sec heartbeat")
Expand All @@ -97,7 +99,7 @@ def setup_periodic_tasks(sender: Celery, **kwargs: Any) -> None:
sender.add_periodic_task(
crontab(hour="2", minute="15", day_of_week="mon"),
send_org_usage_reports.s(),
name="send instance usage report",
name="send instance usage report, monday",
)
sender.add_periodic_task(
crontab(hour="0", minute="15", day_of_week="tue,wed,thu,fri,sat,sun"),
Expand Down
19 changes: 18 additions & 1 deletion posthog/tasks/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,23 @@ def redis_celery_queue_depth() -> None:
return


@shared_task(ignore_result=True, queue=CeleryQueue.STATS)
def redis_celery_queue_depth_usage_reports() -> None:
    """Push a Prometheus gauge reporting the depth of the `usage_reports` Celery queue.

    Reads the Redis list length backing the queue and publishes it via the
    pushed-metrics registry. Best-effort: any failure (e.g. Redis or the
    pushgateway being unavailable) is swallowed so this monitoring probe
    never raises into the worker.
    """
    try:
        with pushed_metrics_registry("redis_celery_queue_depth_usage_reports_registry") as registry:
            celery_task_queue_depth_gauge = Gauge(
                "posthog_celery_queue_depth_usage_reports",
                "We use this to monitor the depth of the usage_reports celery queue.",
                registry=registry,
            )

            # Celery queues are plain Redis lists; LLEN on the queue name is its depth.
            llen = get_client().llen("usage_reports")
            celery_task_queue_depth_gauge.set(llen)
    except Exception:
        # If we can't generate the metric don't complain about it.
        # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
        # still propagate instead of being silently swallowed.
        return


@shared_task(ignore_result=True)
def update_event_partitions() -> None:
with connection.cursor() as cursor:
Expand Down Expand Up @@ -639,7 +656,7 @@ def clickhouse_mark_all_materialized() -> None:
mark_all_materialized()


@shared_task(ignore_result=True)
@shared_task(ignore_result=True, queue=CeleryQueue.USAGE_REPORTS)
def send_org_usage_reports() -> None:
from posthog.tasks.usage_report import send_all_org_usage_reports

Expand Down
1 change: 1 addition & 0 deletions posthog/tasks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ class CeleryQueue:
ANALYTICS_QUERIES = "analytics_queries"
EXPORTS = "exports"
SUBSCRIPTION_DELIVERY = "subscription_delivery"
USAGE_REPORTS = "usage_reports"
Loading