Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite committed Jan 18, 2024
1 parent 7afcb92 commit 2b1e3a4
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 5 deletions.
2 changes: 1 addition & 1 deletion bin/celery-queues.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
# Important: Add new queues to make Celery consume tasks from them.

# NOTE: Keep in sync with posthog/tasks/utils.py
CELERY_WORKER_QUEUES=celery,email,insight_export,insight_refresh,analytics_queries
CELERY_WORKER_QUEUES=celery,email,insight_export,insight_refresh,analytics_queries,exports,subscription_delivery
15 changes: 12 additions & 3 deletions ee/tasks/subscriptions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from ee.tasks.subscriptions.subscription_utils import generate_assets
from posthog import settings
from posthog.models.subscription import Subscription
from posthog.tasks.utils import CeleryQueue

logger = structlog.get_logger(__name__)

Expand Down Expand Up @@ -120,7 +121,7 @@ def _deliver_subscription_report(
subscription.save()


@shared_task()
@shared_task(queue=CeleryQueue.SUBSCRIPTION_DELIVERY)
def schedule_all_subscriptions() -> None:
"""
Schedule all past notifications (with a buffer) to be delivered
Expand Down Expand Up @@ -148,12 +149,20 @@ def schedule_all_subscriptions() -> None:
report_timeout_seconds = settings.PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES * 60 * 1.5


@shared_task(soft_time_limit=report_timeout_seconds, time_limit=report_timeout_seconds + 10)
@shared_task(
soft_time_limit=report_timeout_seconds,
time_limit=report_timeout_seconds + 10,
queue=CeleryQueue.SUBSCRIPTION_DELIVERY,
)
def deliver_subscription_report(subscription_id: int) -> None:
return _deliver_subscription_report(subscription_id)


@shared_task(soft_time_limit=report_timeout_seconds, time_limit=report_timeout_seconds + 10)
@shared_task(
soft_time_limit=report_timeout_seconds,
time_limit=report_timeout_seconds + 10,
queue=CeleryQueue.SUBSCRIPTION_DELIVERY,
)
def handle_subscription_value_change(
subscription_id: int, previous_value: str, invite_message: Optional[str] = None
) -> None:
Expand Down
2 changes: 2 additions & 0 deletions posthog/tasks/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from posthog import settings
from posthog.models import ExportedAsset
from posthog.tasks.utils import CeleryQueue

EXPORT_QUEUED_COUNTER = Counter(
"exporter_task_queued",
Expand Down Expand Up @@ -42,6 +43,7 @@
acks_late=True,
ignore_result=False,
time_limit=settings.ASSET_GENERATION_MAX_TIMEOUT_SECONDS,
queue=CeleryQueue.EXPORTS,
)
def export_asset(exported_asset_id: int, limit: Optional[int] = None) -> None:
from posthog.tasks.exports import csv_exporter, image_exporter
Expand Down
25 changes: 24 additions & 1 deletion posthog/tasks/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,27 @@
from enum import Enum

# NOTE: These are the queues used for logically separating workloads.
# Many queues are consumed by one "consumer" - a worker configured to consume from that queue.
# The goal should be to split up queues based on the type of work being done, so that we can scale effectively
# and change the consumer configs without the need for code changes
#
# Worker consumers config here https://github.com/PostHog/posthog-cloud-infra/blob/main/helm/values/prod.yml#L368
# e.g.
# consumers:
# - name: priority
# queues:
# - email
# - stats
# - name: default
# concurrency: 4
# queues:
# - celery # default queue for Celery
# - name: async
# concurrency: 4
# queues:
# - analytics_queries
# - subscriptions


# NOTE: Keep in sync with bin/celery-queues.env
class CeleryQueue(Enum):
Expand All @@ -8,4 +30,5 @@ class CeleryQueue(Enum):
INSIGHT_EXPORT = "insight_export"
INSIGHT_REFRESH = "insight_refresh"
ANALYTICS_QUERIES = "analytics_queries"
# EXPORTS = "exports"
EXPORTS = "exports"
SUBSCRIPTION_DELIVERY = "subscription_delivery"

0 comments on commit 2b1e3a4

Please sign in to comment.