From 5254cc7eab74964c81cb9de89be8b8781455ba3c Mon Sep 17 00:00:00 2001 From: Julian Bez Date: Wed, 17 Jan 2024 15:50:51 +0000 Subject: [PATCH] feat: Route some trial tasks to different queues (#19810) --- bin/celery-queues.env | 2 +- posthog/celery.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/bin/celery-queues.env b/bin/celery-queues.env index 97920a2fb9262..c4f63d5aeaab7 100644 --- a/bin/celery-queues.env +++ b/bin/celery-queues.env @@ -1,3 +1,3 @@ # Default set of queues to be used by Celery. # Important: Add new queues to make Celery consume tasks from them. -CELERY_WORKER_QUEUES=celery,email,insight_export,insight_refresh,gevent \ No newline at end of file +CELERY_WORKER_QUEUES=celery,email,insight_export,insight_refresh,analytics_queries \ No newline at end of file diff --git a/posthog/celery.py b/posthog/celery.py index 3526c61567a45..90b57e69c7d3a 100644 --- a/posthog/celery.py +++ b/posthog/celery.py @@ -401,8 +401,8 @@ def redis_heartbeat(): get_client().set("POSTHOG_HEARTBEAT", int(time.time())) -@app.task(ignore_result=True, bind=True) -def process_query_task(self, team_id, query_id, query_json, limit_context=None, refresh_requested=False): +@app.task(ignore_result=True, queue="analytics_queries") +def process_query_task(team_id, query_id, query_json, limit_context=None, refresh_requested=False): """ Kick off query Once complete save results to redis @@ -822,7 +822,7 @@ def clear_clickhouse_deleted_person(): remove_deleted_person_data() -@app.task(ignore_result=True) +@app.task(ignore_result=True, queue="email") def redis_celery_queue_depth(): try: with pushed_metrics_registry("redis_celery_queue_depth_registry") as registry: