Skip to content

Commit

Permalink
feat(celery): Prepare to run on multiple queues (#19157)
Browse files Browse the repository at this point in the history
* Add Celery queues env file with default queues

Reasoning:
We need to configure Celery workers in several places to consume
from a specific set of queues.

* Define some queues
  • Loading branch information
webjunkie authored Jan 17, 2024
1 parent 7a037b4 commit 95fec19
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 0 deletions.
1 change: 1 addition & 0 deletions .run/Celery.run.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Celery" type="PythonConfigurationType" factoryName="Python">
<module name="posthog" />
<option name="ENV_FILES" value="$PROJECT_DIR$/bin/celery-queues.env" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
Expand Down
3 changes: 3 additions & 0 deletions bin/celery-queues.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Default set of queues to be used by Celery.
# Important: Add new queues to make Celery consume tasks from them.
CELERY_WORKER_QUEUES=celery,email,insight_export,insight_refresh,gevent
4 changes: 4 additions & 0 deletions bin/docker-worker-celery
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ FLAGS+=("-n node@%h")
# https://github.com/heroku/heroku-buildpack-python/blob/main/vendor/WEB_CONCURRENCY.sh
[[ -n "${WEB_CONCURRENCY}" ]] && FLAGS+=" --concurrency $WEB_CONCURRENCY"

if [[ -z "${CELERY_WORKER_QUEUES}" ]]; then
source ./bin/celery-queues.env
fi

echo
echo "SKIP_ASYNC_MIGRATIONS_SETUP=0 celery -A posthog worker ${FLAGS[*]}"
echo
Expand Down
2 changes: 2 additions & 0 deletions bin/start-worker
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ set -e
# this kills all processes when the last one terminates
trap 'kill $(jobs -p)' EXIT

source ./bin/celery-queues.env

# start celery worker with heartbeat (-B)
SKIP_ASYNC_MIGRATIONS_SETUP=0 celery -A posthog worker -B --scheduler redbeat.RedBeatScheduler --without-heartbeat --without-gossip --without-mingle -Ofair -n node@%h &

Expand Down
79 changes: 79 additions & 0 deletions posthog/api/test/__snapshots__/test_annotation.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,85 @@
LIMIT 1000 /*controller='project_annotations-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/annotations/%3F%24'*/
'''
# ---
# name: TestAnnotation.test_retrieving_annotation_is_not_n_plus_1.15
'''
SELECT "posthog_annotation"."id",
"posthog_annotation"."content",
"posthog_annotation"."created_at",
"posthog_annotation"."updated_at",
"posthog_annotation"."dashboard_item_id",
"posthog_annotation"."team_id",
"posthog_annotation"."organization_id",
"posthog_annotation"."created_by_id",
"posthog_annotation"."scope",
"posthog_annotation"."creation_type",
"posthog_annotation"."date_marker",
"posthog_annotation"."deleted",
"posthog_annotation"."apply_all",
"posthog_dashboarditem"."id",
"posthog_dashboarditem"."name",
"posthog_dashboarditem"."derived_name",
"posthog_dashboarditem"."description",
"posthog_dashboarditem"."team_id",
"posthog_dashboarditem"."filters",
"posthog_dashboarditem"."filters_hash",
"posthog_dashboarditem"."query",
"posthog_dashboarditem"."order",
"posthog_dashboarditem"."deleted",
"posthog_dashboarditem"."saved",
"posthog_dashboarditem"."created_at",
"posthog_dashboarditem"."last_refresh",
"posthog_dashboarditem"."refreshing",
"posthog_dashboarditem"."created_by_id",
"posthog_dashboarditem"."is_sample",
"posthog_dashboarditem"."short_id",
"posthog_dashboarditem"."favorited",
"posthog_dashboarditem"."refresh_attempt",
"posthog_dashboarditem"."last_modified_at",
"posthog_dashboarditem"."last_modified_by_id",
"posthog_dashboarditem"."dashboard_id",
"posthog_dashboarditem"."layouts",
"posthog_dashboarditem"."color",
"posthog_dashboarditem"."dive_dashboard_id",
"posthog_dashboarditem"."updated_at",
"posthog_dashboarditem"."deprecated_tags",
"posthog_dashboarditem"."tags",
"posthog_user"."id",
"posthog_user"."password",
"posthog_user"."last_login",
"posthog_user"."first_name",
"posthog_user"."last_name",
"posthog_user"."is_staff",
"posthog_user"."is_active",
"posthog_user"."date_joined",
"posthog_user"."uuid",
"posthog_user"."current_organization_id",
"posthog_user"."current_team_id",
"posthog_user"."email",
"posthog_user"."pending_email",
"posthog_user"."temporary_token",
"posthog_user"."distinct_id",
"posthog_user"."is_email_verified",
"posthog_user"."requested_password_reset_at",
"posthog_user"."has_seen_product_intro_for",
"posthog_user"."strapi_id",
"posthog_user"."email_opt_in",
"posthog_user"."theme_mode",
"posthog_user"."partial_notification_settings",
"posthog_user"."anonymize_data",
"posthog_user"."toolbar_mode",
"posthog_user"."events_column_config"
FROM "posthog_annotation"
LEFT OUTER JOIN "posthog_dashboarditem" ON ("posthog_annotation"."dashboard_item_id" = "posthog_dashboarditem"."id")
LEFT OUTER JOIN "posthog_user" ON ("posthog_annotation"."created_by_id" = "posthog_user"."id")
WHERE ((("posthog_annotation"."organization_id" = '00000000-0000-0000-0000-000000000000'::uuid
AND "posthog_annotation"."scope" = 'organization')
OR "posthog_annotation"."team_id" = 2)
AND NOT "posthog_annotation"."deleted")
ORDER BY "posthog_annotation"."date_marker" DESC
LIMIT 1000 /*controller='project_annotations-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/annotations/%3F%24'*/
'''
# ---
# name: TestAnnotation.test_retrieving_annotation_is_not_n_plus_1.2
'''
SELECT "posthog_organizationmembership"."id",
Expand Down
1 change: 1 addition & 0 deletions requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ dnspython==2.2.1
drf-exceptions-hog==0.4.0
drf-extensions==0.7.0
drf-spectacular==0.27.0
gevent==23.9.1
geoip2==4.6.0
google-cloud-bigquery==3.11.4
google-cloud-sqlcommenter==2.0.0
Expand Down
8 changes: 8 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ future==0.18.3
# via lzstring
geoip2==4.6.0
# via -r requirements.in
gevent==23.9.1
# via -r requirements.in
gitdb==4.0.11
# via gitpython
gitpython==3.1.40
Expand Down Expand Up @@ -263,6 +265,8 @@ googleapis-common-protos==1.60.0
# via
# google-api-core
# grpcio-status
greenlet==3.0.3
# via gevent
grpcio==1.57.0
# via
# google-api-core
Expand Down Expand Up @@ -681,6 +685,10 @@ yarl==1.7.2
# via aiohttp
zipp==3.17.0
# via importlib-metadata
zope-event==5.0
# via gevent
zope-interface==6.1
# via gevent

# The following packages are considered to be unsafe in a requirements file:
# setuptools

0 comments on commit 95fec19

Please sign in to comment.