diff --git a/bin/celery-queues.env b/bin/celery-queues.env index de76211d9d5ce..ba4376ab93ca1 100644 --- a/bin/celery-queues.env +++ b/bin/celery-queues.env @@ -2,4 +2,4 @@ # Important: Add new queues to make Celery consume tasks from them. # NOTE: Keep in sync with posthog/tasks/utils.py -CELERY_WORKER_QUEUES=celery,stats,email,insight_export,insight_refresh,analytics_queries,exports,subscription_delivery \ No newline at end of file +CELERY_WORKER_QUEUES=celery,stats,email,insight_export,insight_refresh,analytics_queries,exports,subscription_delivery,usage_reports \ No newline at end of file diff --git a/posthog/settings/sentry.py b/posthog/settings/sentry.py index 208c862c7fa7e..2b5d49fc36212 100644 --- a/posthog/settings/sentry.py +++ b/posthog/settings/sentry.py @@ -1,7 +1,10 @@ import logging import os +from datetime import timedelta +from random import random import sentry_sdk +from dateutil import parser from sentry_sdk.integrations.celery import CeleryIntegration from sentry_sdk.integrations.django import DjangoIntegration from sentry_sdk.integrations.logging import LoggingIntegration @@ -10,10 +13,6 @@ from posthog.settings import get_from_env from posthog.settings.base_variables import TEST -from dateutil import parser -from random import random -from datetime import timedelta - def before_send(event, hint): for exception in event.get("exception", {}).get("values", []): @@ -107,13 +106,15 @@ def traces_sampler(sampling_context: dict) -> float: elif op == "celery.task": task = sampling_context.get("celery_job", {}).get("task") - if task == "posthog.celery.redis_heartbeat": - return 0.0001 # 0.01% - if task == "posthog.celery.redis_celery_queue_depth": + + if task in ( + "posthog.celery.redis_heartbeat", + "posthog.celery.redis_celery_queue_depth", + ): return 0.0001 # 0.01% - else: - # Default sample rate for Celery tasks - return 0.001 # 0.1% + + # Default sample rate for Celery tasks + return 0.001 # 0.1% elif op == "queue.task.celery": task = sampling_context.get("celery_job", {}).get("task") if task == "posthog.tasks.calculate_cohort.insert_cohort_from_feature_flag": diff --git a/posthog/tasks/scheduled.py b/posthog/tasks/scheduled.py index 446483ce4575a..85d39d4bcb904 100644 --- a/posthog/tasks/scheduled.py +++ b/posthog/tasks/scheduled.py @@ -97,7 +97,7 @@ def setup_periodic_tasks(sender: Celery, **kwargs: Any) -> None: sender.add_periodic_task( crontab(hour="2", minute="15", day_of_week="mon"), send_org_usage_reports.s(), - name="send instance usage report", + name="send instance usage report, monday", ) sender.add_periodic_task( crontab(hour="0", minute="15", day_of_week="tue,wed,thu,fri,sat,sun"), diff --git a/posthog/tasks/tasks.py b/posthog/tasks/tasks.py index 1cc442b968e9a..c35269442f350 100644 --- a/posthog/tasks/tasks.py +++ b/posthog/tasks/tasks.py @@ -458,6 +458,7 @@ def redis_celery_queue_depth() -> None: "posthog_celery_queue_depth", "We use this to monitor the depth of the celery queue.", registry=registry, + labelnames=["queue_name"], ) for queue in CeleryQueue: @@ -641,7 +642,7 @@ def clickhouse_mark_all_materialized() -> None: mark_all_materialized() -@shared_task(ignore_result=True) +@shared_task(ignore_result=True, queue=CeleryQueue.USAGE_REPORTS.value) def send_org_usage_reports() -> None: from posthog.tasks.usage_report import send_all_org_usage_reports diff --git a/posthog/tasks/usage_report.py b/posthog/tasks/usage_report.py index 056586243f4b4..c2daef6a4ec73 100644 --- a/posthog/tasks/usage_report.py +++ b/posthog/tasks/usage_report.py @@ -41,6 +41,7 @@ from posthog.models.team.team import Team from posthog.models.utils import namedtuplefetchall from posthog.settings import CLICKHOUSE_CLUSTER, INSTANCE_TAG +from posthog.tasks.utils import CeleryQueue from posthog.utils import ( get_helm_info_env, get_instance_realm, @@ -63,6 +64,13 @@ QUERY_RETRY_DELAY = 1 QUERY_RETRY_BACKOFF = 2 +USAGE_REPORT_TASK_KWARGS = dict( + queue=CeleryQueue.USAGE_REPORTS.value, + ignore_result=True, + autoretry_for=(Exception,), + retry_backoff=True, +) + @dataclasses.dataclass class UsageReportCounters: @@ -274,7 +282,7 @@ def get_org_owner_or_first_user(organization_id: str) -> Optional[User]: return user -@shared_task(ignore_result=True, autoretry_for=(Exception,), max_retries=3) +@shared_task(**USAGE_REPORT_TASK_KWARGS, max_retries=3) def send_report_to_billing_service(org_id: str, report: Dict[str, Any]) -> None: if not settings.EE_AVAILABLE: return @@ -621,7 +629,7 @@ def get_teams_with_rows_synced_in_period(begin: datetime, end: datetime) -> List return results -@shared_task(ignore_result=True, max_retries=0) +@shared_task(**USAGE_REPORT_TASK_KWARGS, max_retries=0) def capture_report( capture_event_name: str, org_id: str, @@ -952,7 +960,7 @@ def _get_full_org_usage_report_as_dict(full_report: FullUsageReport) -> Dict[str return dataclasses.asdict(full_report) -@shared_task(ignore_result=True, max_retries=3, autoretry_for=(Exception,)) +@shared_task(**USAGE_REPORT_TASK_KWARGS, max_retries=3) def send_all_org_usage_reports( dry_run: bool = False, at: Optional[str] = None, diff --git a/posthog/tasks/utils.py b/posthog/tasks/utils.py index 956504cdc6483..ecabc29adac79 100644 --- a/posthog/tasks/utils.py +++ b/posthog/tasks/utils.py @@ -34,3 +34,4 @@ class CeleryQueue(Enum): ANALYTICS_QUERIES = "analytics_queries" EXPORTS = "exports" SUBSCRIPTION_DELIVERY = "subscription_delivery" + USAGE_REPORTS = "usage_reports"