feat(celery): Prepare to run on multiple queues #19157

Merged · 11 commits · Jan 17, 2024
1 change: 1 addition & 0 deletions .run/Celery.run.xml
@@ -1,6 +1,7 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Celery" type="PythonConfigurationType" factoryName="Python">
<module name="posthog" />
<option name="ENV_FILES" value="$PROJECT_DIR$/bin/celery-queues.env" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
3 changes: 3 additions & 0 deletions bin/celery-queues.env
@@ -0,0 +1,3 @@
# Default set of queues to be used by Celery.
# Important: Add new queues to make Celery consume tasks from them.
CELERY_WORKER_QUEUES=celery,email,insight_export,insight_refresh,gevent
Contributor:

If it is somehow possible to define this in code, I would 100% vote for that, as otherwise I think this file will get forgotten about.

Best case would be to give ourselves a chance to never miss it, for example by using an enum of sorts, so that we define the queues in one place. Then, if the env var isn't set, we set it to a default of all the known queues.

Contributor (Author):

Mhm, not super sure what you mean. This was my attempt to define it in one place. The problem I see here is that, as far as I can tell, we start Celery workers for local and hobby deployments in two separate places. Both of these are shell scripts, hence both can use this env file. I also don't really find it nice, to be honest. Do you see another way to do it? 🤔

Contributor (Author):

@benjackwhite I'm still not sure how to immediately solve this in a better way. What I did improve, though, is that we can even use the env file for the PyCharm run config, so we can use this single list of all queues in all places.

Contributor:

I should have been clearer in my description.

What I'm wondering is whether there is any way we can do this in Python?

So for example:

import os
from enum import Enum

class CeleryQueue(str, Enum):
    DEFAULT = "celery"
    EMAIL = "email"

all_queues = [e.value for e in CeleryQueue]

CELERY_WORKER_QUEUES = os.getenv("CELERY_WORKER_QUEUES") or ",".join(all_queues)

My thinking is that we'd then codify the default list of queues in code rather than relying on an external setting (which can easily be forgotten about), and any newly added queue would be picked up automatically.
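
To illustrate the idea end to end (this is not part of the PR, and the task below is made up): with a str-valued enum like the one above, the producer side could target a queue by enum member as well, via Celery's standard apply_async(queue=...):

from celery import shared_task

@shared_task
def send_invite_email(invite_id: int) -> None:
    ...  # hypothetical task body

# The str-valued enum member doubles as the queue name.
send_invite_email.apply_async(args=[42], queue=CeleryQueue.EMAIL.value)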

Contributor:

If it just isn't possible for one reason or another, though, then consider it non-blocking and we can push forward.

Contributor:

Alternatively: if Celery would just default to processing from all queues anyway, why do we need this config at all? 😅

Contributor (Author):

How I see this problem is that Celery is run via shell scripts, and we need to pass the queues in there (apparently it cannot consume from "all" queues, at least not that I could find). I agree that having this in Python, where we also direct the tasks to the queues, would be great though.
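
(For reference, and not part of the PR: Celery 5 does expose a Python entry point, app.worker_main, which accepts the same argv as the CLI, so in principle the queue list could be computed and passed entirely in Python. Whether that fits the docker/hobby startup scripts is exactly the open question here; the following is only a sketch.)

from celery import Celery

app = Celery("posthog")

# Same queue list as bin/celery-queues.env, here computed in Python.
QUEUES = "celery,email,insight_export,insight_refresh,gevent"

if __name__ == "__main__":
    # Equivalent to: celery -A posthog worker -Q <queues>
    app.worker_main(["worker", "-Q", QUEUES, "--loglevel=INFO"])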

Contributor:

Gotcha. Yeah, if it's just not possible then I guess this is the solution 😅

4 changes: 4 additions & 0 deletions bin/docker-worker-celery
@@ -77,6 +77,10 @@ FLAGS+=("-n node@%h")
# https://github.com/heroku/heroku-buildpack-python/blob/main/vendor/WEB_CONCURRENCY.sh
[[ -n "${WEB_CONCURRENCY}" ]] && FLAGS+=" --concurrency $WEB_CONCURRENCY"

if [[ -z "${CELERY_WORKER_QUEUES}" ]]; then
source ./bin/celery-queues.env
fi

echo
echo "SKIP_ASYNC_MIGRATIONS_SETUP=0 celery -A posthog worker ${FLAGS[*]}"
echo
2 changes: 2 additions & 0 deletions bin/start-worker
@@ -4,6 +4,8 @@ set -e
# this kills all processes when the last one terminates
trap 'kill $(jobs -p)' EXIT

source ./bin/celery-queues.env

# start celery worker with heartbeat (-B)
SKIP_ASYNC_MIGRATIONS_SETUP=0 celery -A posthog worker -B --scheduler redbeat.RedBeatScheduler --without-heartbeat --without-gossip --without-mingle -Ofair -n node@%h &

79 changes: 79 additions & 0 deletions posthog/api/test/__snapshots__/test_annotation.ambr
@@ -282,6 +282,85 @@
LIMIT 1000 /*controller='project_annotations-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/annotations/%3F%24'*/
'''
# ---
# name: TestAnnotation.test_retrieving_annotation_is_not_n_plus_1.15
'''
SELECT "posthog_annotation"."id",
"posthog_annotation"."content",
"posthog_annotation"."created_at",
"posthog_annotation"."updated_at",
"posthog_annotation"."dashboard_item_id",
"posthog_annotation"."team_id",
"posthog_annotation"."organization_id",
"posthog_annotation"."created_by_id",
"posthog_annotation"."scope",
"posthog_annotation"."creation_type",
"posthog_annotation"."date_marker",
"posthog_annotation"."deleted",
"posthog_annotation"."apply_all",
"posthog_dashboarditem"."id",
"posthog_dashboarditem"."name",
"posthog_dashboarditem"."derived_name",
"posthog_dashboarditem"."description",
"posthog_dashboarditem"."team_id",
"posthog_dashboarditem"."filters",
"posthog_dashboarditem"."filters_hash",
"posthog_dashboarditem"."query",
"posthog_dashboarditem"."order",
"posthog_dashboarditem"."deleted",
"posthog_dashboarditem"."saved",
"posthog_dashboarditem"."created_at",
"posthog_dashboarditem"."last_refresh",
"posthog_dashboarditem"."refreshing",
"posthog_dashboarditem"."created_by_id",
"posthog_dashboarditem"."is_sample",
"posthog_dashboarditem"."short_id",
"posthog_dashboarditem"."favorited",
"posthog_dashboarditem"."refresh_attempt",
"posthog_dashboarditem"."last_modified_at",
"posthog_dashboarditem"."last_modified_by_id",
"posthog_dashboarditem"."dashboard_id",
"posthog_dashboarditem"."layouts",
"posthog_dashboarditem"."color",
"posthog_dashboarditem"."dive_dashboard_id",
"posthog_dashboarditem"."updated_at",
"posthog_dashboarditem"."deprecated_tags",
"posthog_dashboarditem"."tags",
"posthog_user"."id",
"posthog_user"."password",
"posthog_user"."last_login",
"posthog_user"."first_name",
"posthog_user"."last_name",
"posthog_user"."is_staff",
"posthog_user"."is_active",
"posthog_user"."date_joined",
"posthog_user"."uuid",
"posthog_user"."current_organization_id",
"posthog_user"."current_team_id",
"posthog_user"."email",
"posthog_user"."pending_email",
"posthog_user"."temporary_token",
"posthog_user"."distinct_id",
"posthog_user"."is_email_verified",
"posthog_user"."requested_password_reset_at",
"posthog_user"."has_seen_product_intro_for",
"posthog_user"."strapi_id",
"posthog_user"."email_opt_in",
"posthog_user"."theme_mode",
"posthog_user"."partial_notification_settings",
"posthog_user"."anonymize_data",
"posthog_user"."toolbar_mode",
"posthog_user"."events_column_config"
FROM "posthog_annotation"
LEFT OUTER JOIN "posthog_dashboarditem" ON ("posthog_annotation"."dashboard_item_id" = "posthog_dashboarditem"."id")
LEFT OUTER JOIN "posthog_user" ON ("posthog_annotation"."created_by_id" = "posthog_user"."id")
WHERE ((("posthog_annotation"."organization_id" = '00000000-0000-0000-0000-000000000000'::uuid
AND "posthog_annotation"."scope" = 'organization')
OR "posthog_annotation"."team_id" = 2)
AND NOT "posthog_annotation"."deleted")
ORDER BY "posthog_annotation"."date_marker" DESC
LIMIT 1000 /*controller='project_annotations-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/annotations/%3F%24'*/
'''
# ---
# name: TestAnnotation.test_retrieving_annotation_is_not_n_plus_1.2
'''
SELECT "posthog_organizationmembership"."id",
1 change: 1 addition & 0 deletions requirements.in
@@ -41,6 +41,7 @@ dnspython==2.2.1
drf-exceptions-hog==0.4.0
drf-extensions==0.7.0
drf-spectacular==0.27.0
gevent==23.9.1
geoip2==4.6.0
google-cloud-bigquery==3.11.4
google-cloud-sqlcommenter==2.0.0
8 changes: 8 additions & 0 deletions requirements.txt
@@ -234,6 +234,8 @@ future==0.18.3
# via lzstring
geoip2==4.6.0
# via -r requirements.in
gevent==23.9.1
# via -r requirements.in
gitdb==4.0.11
# via gitpython
gitpython==3.1.40
@@ -263,6 +265,8 @@ googleapis-common-protos==1.60.0
# via
# google-api-core
# grpcio-status
greenlet==3.0.3
# via gevent
grpcio==1.57.0
# via
# google-api-core
@@ -681,6 +685,10 @@ yarl==1.7.2
# via aiohttp
zipp==3.17.0
# via importlib-metadata
zope-event==5.0
# via gevent
zope-interface==6.1
# via gevent

# The following packages are considered to be unsafe in a requirements file:
# setuptools