
feat: add separate celery queue for usage reports #20048

Merged (12 commits) on Jan 31, 2024
bin/celery-queues.env (1 addition, 1 deletion)

```diff
@@ -2,4 +2,4 @@
 # Important: Add new queues to make Celery consume tasks from them.

 # NOTE: Keep in sync with posthog/tasks/utils.py
-CELERY_WORKER_QUEUES=celery,stats,email,insight_export,insight_refresh,analytics_queries,exports,subscription_delivery
+CELERY_WORKER_QUEUES=celery,stats,email,insight_export,insight_refresh,analytics_queries,exports,subscription_delivery,usage_reports
```
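The NOTE above asks that this list stay in sync with the `CeleryQueue` enum in posthog/tasks/utils.py. A hypothetical sanity check, not part of this PR, that would catch drift between the two:

```python
import os

from posthog.tasks.utils import CeleryQueue

# Compare the queues a worker is configured to consume against the queues
# tasks can be routed to; any queue in the difference backs up silently.
configured = set(os.environ.get("CELERY_WORKER_QUEUES", "").split(","))
defined = {queue.value for queue in CeleryQueue}
missing = defined - configured
assert not missing, f"queues defined but not consumed: {missing}"
```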
posthog/settings/sentry.py (11 additions, 10 deletions)

```diff
@@ -1,7 +1,10 @@
 import logging
 import os
+from datetime import timedelta
+from random import random

 import sentry_sdk
+from dateutil import parser
 from sentry_sdk.integrations.celery import CeleryIntegration
 from sentry_sdk.integrations.django import DjangoIntegration
 from sentry_sdk.integrations.logging import LoggingIntegration
@@ -10,10 +13,6 @@
 from posthog.settings import get_from_env
 from posthog.settings.base_variables import TEST

-from dateutil import parser
-from random import random
-from datetime import timedelta
-

 def before_send(event, hint):
     for exception in event.get("exception", {}).get("values", []):
```
```diff
@@ -107,13 +106,15 @@ def traces_sampler(sampling_context: dict) -> float:

     elif op == "celery.task":
         task = sampling_context.get("celery_job", {}).get("task")
-        if task == "posthog.celery.redis_heartbeat":
-            return 0.0001  # 0.01%
-        if task == "posthog.celery.redis_celery_queue_depth":
+
+        if task in (
+            "posthog.celery.redis_heartbeat",
+            "posthog.celery.redis_celery_queue_depth",
+        ):
             return 0.0001  # 0.01%
-
-        # Default sample rate for Celery tasks
-        return 0.001  # 0.1%
+        else:
+            # Default sample rate for Celery tasks
+            return 0.001  # 0.1%
     elif op == "queue.task.celery":
         task = sampling_context.get("celery_job", {}).get("task")
         if task == "posthog.tasks.calculate_cohort.insert_cohort_from_feature_flag":
```
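For context, a sampler like this is handed to the SDK at startup. A minimal sketch of the wiring using the standard sentry_sdk API (the DSN and the fallback rates are hypothetical; only the task names come from this PR):

```python
import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration
from sentry_sdk.integrations.django import DjangoIntegration


def traces_sampler(sampling_context: dict) -> float:
    # Heavily downsample noisy housekeeping tasks, lightly sample the rest.
    op = sampling_context.get("transaction_context", {}).get("op")
    if op == "celery.task":
        task = sampling_context.get("celery_job", {}).get("task")
        if task in (
            "posthog.celery.redis_heartbeat",
            "posthog.celery.redis_celery_queue_depth",
        ):
            return 0.0001  # 0.01%
        return 0.001  # 0.1%
    return 0.01  # hypothetical default for non-Celery transactions


sentry_sdk.init(
    dsn="https://public@sentry.example.com/1",  # hypothetical DSN
    integrations=[DjangoIntegration(), CeleryIntegration()],
    traces_sampler=traces_sampler,
)
```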
posthog/tasks/scheduled.py (1 addition, 1 deletion)

```diff
@@ -97,7 +97,7 @@ def setup_periodic_tasks(sender: Celery, **kwargs: Any) -> None:
     sender.add_periodic_task(
         crontab(hour="2", minute="15", day_of_week="mon"),
         send_org_usage_reports.s(),
-        name="send instance usage report",
+        name="send instance usage report, monday",
     )
     sender.add_periodic_task(
         crontab(hour="0", minute="15", day_of_week="tue,wed,thu,fri,sat,sun"),
```
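Because `send_org_usage_reports` itself now declares `queue=CeleryQueue.USAGE_REPORTS.value` (see posthog/tasks/tasks.py below), beat deliveries from these schedules land on the new queue without any change here. For illustration only, the same routing could be forced per schedule on the signature; a sketch, not code from this PR:

```python
# Equivalent explicit routing: override the queue on the signature itself.
sender.add_periodic_task(
    crontab(hour="2", minute="15", day_of_week="mon"),
    send_org_usage_reports.s().set(queue="usage_reports"),
    name="send instance usage report, monday",
)
```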
posthog/tasks/tasks.py (2 additions, 1 deletion)

```diff
@@ -458,6 +458,7 @@ def redis_celery_queue_depth() -> None:
         "posthog_celery_queue_depth",
         "We use this to monitor the depth of the celery queue.",
         registry=registry,
+        labelnames=["queue_name"],
     )

     for queue in CeleryQueue:
```
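Adding `labelnames` turns the single gauge into one time series per queue. A minimal prometheus_client sketch of the resulting usage (the depth values are hypothetical; in the real task they would come from the length of the Redis list backing each Celery queue):

```python
from prometheus_client import CollectorRegistry, Gauge

registry = CollectorRegistry()
gauge = Gauge(
    "posthog_celery_queue_depth",
    "We use this to monitor the depth of the celery queue.",
    registry=registry,
    labelnames=["queue_name"],
)

# One labelled sample per queue instead of a single unlabelled value.
for queue_name, depth in {"celery": 12, "usage_reports": 0}.items():
    gauge.labels(queue_name=queue_name).set(depth)
```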
```diff
@@ -641,7 +642,7 @@ def clickhouse_mark_all_materialized() -> None:
     mark_all_materialized()


-@shared_task(ignore_result=True)
+@shared_task(ignore_result=True, queue=CeleryQueue.USAGE_REPORTS.value)
 def send_org_usage_reports() -> None:
     from posthog.tasks.usage_report import send_all_org_usage_reports
```
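With the queue declared on the decorator, existing call sites need no changes: `.delay()` and `.apply_async()` pick up the task's own routing. A short sketch of that behavior:

```python
# Routed to "usage_reports" because the decorator sets queue=...
send_org_usage_reports.delay()

# A per-call override is still possible when needed:
send_org_usage_reports.apply_async(queue="celery")
```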
posthog/tasks/usage_report.py (11 additions, 3 deletions)

```diff
@@ -41,6 +41,7 @@
 from posthog.models.team.team import Team
 from posthog.models.utils import namedtuplefetchall
 from posthog.settings import CLICKHOUSE_CLUSTER, INSTANCE_TAG
+from posthog.tasks.utils import CeleryQueue
 from posthog.utils import (
     get_helm_info_env,
     get_instance_realm,
```
```diff
@@ -63,6 +64,13 @@
 QUERY_RETRY_DELAY = 1
 QUERY_RETRY_BACKOFF = 2

+USAGE_REPORT_TASK_KWARGS = dict(
+    queue=CeleryQueue.USAGE_REPORTS.value,
+    ignore_result=True,
+    autoretry_for=(Exception,),
+    retry_backoff=True,
+)
+

 @dataclasses.dataclass
 class UsageReportCounters:
```

Review thread on `queue=CeleryQueue.USAGE_REPORTS.value`:

**Contributor:** @raquelmsmith just FYI - modified this to have each task use the queue. It doesn't work to only have the parent task on its own; the tasks that get created also need to specify their intended queue.

**Member (author):** Oh, that makes sense. Thanks!
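A minimal illustration of the point made in the thread above (the task names are hypothetical): a child task spawned from a parent is routed by the child's own queue option, not by the queue the parent happens to be running on.

```python
from celery import shared_task


@shared_task(queue="usage_reports")
def parent_task() -> None:
    # Even though this runs on the usage_reports queue, the call below is
    # routed by child_task's own options: the default "celery" queue here.
    child_task.delay()


@shared_task  # no queue= -> routed to the default queue
def child_task() -> None:
    ...
```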
```diff
@@ -274,7 +282,7 @@ def get_org_owner_or_first_user(organization_id: str) -> Optional[User]:
     return user


-@shared_task(ignore_result=True, autoretry_for=(Exception,), max_retries=3)
+@shared_task(**USAGE_REPORT_TASK_KWARGS, max_retries=3)
 def send_report_to_billing_service(org_id: str, report: Dict[str, Any]) -> None:
     if not settings.EE_AVAILABLE:
         return
```
```diff
@@ -621,7 +629,7 @@ def get_teams_with_rows_synced_in_period(begin: datetime, end: datetime) -> List:
     return results


-@shared_task(ignore_result=True, max_retries=0)
+@shared_task(**USAGE_REPORT_TASK_KWARGS, max_retries=0)
 def capture_report(
     capture_event_name: str,
     org_id: str,
```
```diff
@@ -952,7 +960,7 @@ def _get_full_org_usage_report_as_dict(full_report: FullUsageReport) -> Dict[str, Any]:
     return dataclasses.asdict(full_report)


-@shared_task(ignore_result=True, max_retries=3, autoretry_for=(Exception,))
+@shared_task(**USAGE_REPORT_TASK_KWARGS, max_retries=3)
 def send_all_org_usage_reports(
     dry_run: bool = False,
     at: Optional[str] = None,
```
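The shared kwargs interact with each task's own `max_retries`: `autoretry_for` plus `retry_backoff` gives exponentially spaced retries, while `max_retries=0` disables retries entirely for `capture_report`. A sketch of what one decorator expands to after unpacking:

```python
from celery import shared_task


@shared_task(
    queue="usage_reports",
    ignore_result=True,
    autoretry_for=(Exception,),  # retry on any exception...
    retry_backoff=True,          # ...waiting roughly 1s, 2s, 4s between tries
    max_retries=3,               # ...and giving up after three retries
)
def send_all_org_usage_reports(dry_run: bool = False) -> None:
    ...
```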
posthog/tasks/utils.py (1 addition)

```diff
@@ -34,3 +34,4 @@ class CeleryQueue(Enum):
     ANALYTICS_QUERIES = "analytics_queries"
     EXPORTS = "exports"
     SUBSCRIPTION_DELIVERY = "subscription_delivery"
+    USAGE_REPORTS = "usage_reports"
```