Skip to content

Commit

Permalink
feat: Celery tasks refactor (#19827)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite authored Jan 18, 2024
1 parent 2a741f2 commit 7f174fa
Show file tree
Hide file tree
Showing 32 changed files with 1,192 additions and 1,060 deletions.
4 changes: 3 additions & 1 deletion bin/celery-queues.env
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Default set of queues to be used by Celery.
# Important: Add new queues to make Celery consume tasks from them.
CELERY_WORKER_QUEUES=celery,email,insight_export,insight_refresh,analytics_queries

# NOTE: Keep in sync with posthog/tasks/utils.py
CELERY_WORKER_QUEUES=celery,stats,email,insight_export,insight_refresh,analytics_queries,exports,subscription_delivery
4 changes: 2 additions & 2 deletions bin/start-worker
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ trap 'kill $(jobs -p)' EXIT
source ./bin/celery-queues.env

# start celery worker with heartbeat (-B)
SKIP_ASYNC_MIGRATIONS_SETUP=0 celery -A posthog worker -B --scheduler redbeat.RedBeatScheduler --without-heartbeat --without-gossip --without-mingle -Ofair -n node@%h &
SKIP_ASYNC_MIGRATIONS_SETUP=0 celery -A posthog worker -B --scheduler redbeat.RedBeatScheduler --without-heartbeat --without-mingle -Ofair -n node@%h &

if [[ "$PLUGIN_SERVER_IDLE" != "1" && "$PLUGIN_SERVER_IDLE" != "true" ]]; then
./bin/plugin-server
fi

wait
wait
2 changes: 1 addition & 1 deletion ee/api/test/test_organization.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

from ee.api.test.base import APILicensedTest
from ee.models.license import License
from posthog.celery import sync_all_organization_available_features
from posthog.models import Team, User
from posthog.models.organization import Organization, OrganizationMembership
from posthog.tasks.tasks import sync_all_organization_available_features


class TestOrganizationEnterpriseAPI(APILicensedTest):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# serializer version: 1
# name: ClickhouseTestExperimentSecondaryResults.test_basic_secondary_metric_results
'''
/* user_id:132 celery:posthog.celery.sync_insight_caching_state */
/* user_id:132 celery:posthog.tasks.tasks.sync_insight_caching_state */
SELECT team_id,
date_diff('second', max(timestamp), now()) AS age
FROM events
Expand Down
2 changes: 1 addition & 1 deletion ee/models/license.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
from django.utils import timezone
from rest_framework import exceptions, status

from posthog.celery import sync_all_organization_available_features
from posthog.constants import AvailableFeature
from posthog.models.utils import sane_repr
from posthog.tasks.tasks import sync_all_organization_available_features


class LicenseError(exceptions.APIException):
Expand Down
6 changes: 3 additions & 3 deletions ee/session_recordings/persistence_tasks.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
from datetime import timedelta

import structlog
from celery import shared_task
from django.utils import timezone

from ee.session_recordings.session_recording_extensions import persist_recording
from posthog.celery import app
from posthog.session_recordings.models.session_recording import SessionRecording

logger = structlog.get_logger(__name__)


@app.task()
@shared_task(ignore_result=True)
def persist_single_recording(id: str, team_id: int) -> None:
persist_recording(id, team_id)


@app.task()
@shared_task(ignore_result=True)
def persist_finished_recordings() -> None:
one_day_old = timezone.now() - timedelta(hours=24)
finished_recordings = SessionRecording.objects.filter(created_at__lte=one_day_old, object_storage_path=None)
Expand Down
17 changes: 13 additions & 4 deletions ee/tasks/subscriptions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@
from typing import Optional

import structlog
from celery import shared_task
from prometheus_client import Counter
from sentry_sdk import capture_exception, capture_message

from ee.tasks.subscriptions.email_subscriptions import send_email_subscription_report
from ee.tasks.subscriptions.slack_subscriptions import send_slack_subscription_report
from ee.tasks.subscriptions.subscription_utils import generate_assets
from posthog import settings
from posthog.celery import app
from posthog.models.subscription import Subscription
from posthog.tasks.utils import CeleryQueue

logger = structlog.get_logger(__name__)

Expand Down Expand Up @@ -120,7 +121,7 @@ def _deliver_subscription_report(
subscription.save()


@app.task()
@shared_task(queue=CeleryQueue.SUBSCRIPTION_DELIVERY)
def schedule_all_subscriptions() -> None:
"""
Schedule all past notifications (with a buffer) to be delivered
Expand Down Expand Up @@ -148,12 +149,20 @@ def schedule_all_subscriptions() -> None:
report_timeout_seconds = settings.PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES * 60 * 1.5


@app.task(soft_time_limit=report_timeout_seconds, time_limit=report_timeout_seconds + 10)
@shared_task(
soft_time_limit=report_timeout_seconds,
time_limit=report_timeout_seconds + 10,
queue=CeleryQueue.SUBSCRIPTION_DELIVERY,
)
def deliver_subscription_report(subscription_id: int) -> None:
return _deliver_subscription_report(subscription_id)


@app.task(soft_time_limit=report_timeout_seconds, time_limit=report_timeout_seconds + 10)
@shared_task(
soft_time_limit=report_timeout_seconds,
time_limit=report_timeout_seconds + 10,
queue=CeleryQueue.SUBSCRIPTION_DELIVERY,
)
def handle_subscription_value_change(
subscription_id: int, previous_value: str, invite_message: Optional[str] = None
) -> None:
Expand Down
14 changes: 1 addition & 13 deletions posthog/api/prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from typing import Any, Dict, List

from dateutil import parser
from django.db import IntegrityError
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from rest_framework import exceptions, request, serializers, status, viewsets
Expand All @@ -11,10 +10,9 @@

from posthog.api.routing import StructuredViewSetMixin
from posthog.api.utils import get_token
from posthog.celery import app
from posthog.exceptions import generate_exception_response
from posthog.models.prompt import Prompt, PromptSequence, UserPromptState
from posthog.models.user import User
from posthog.tasks.prompts import trigger_prompt_for_user
from posthog.utils_cors import cors_response


Expand Down Expand Up @@ -185,16 +183,6 @@ class Meta:
]


@app.task(ignore_result=True)
def trigger_prompt_for_user(email: str, sequence_id: int):
try:
sequence = PromptSequence.objects.get(pk=sequence_id)
user = User.objects.get(email=email)
UserPromptState.objects.get_or_create(user=user, sequence=sequence, step=None)
except (User.DoesNotExist, IntegrityError):
pass


@csrf_exempt
def prompt_webhook(request: request.Request):
if request.method == "POST":
Expand Down
6 changes: 3 additions & 3 deletions posthog/api/test/__snapshots__/test_cohort.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
# ---
# name: TestCohort.test_async_deletion_of_cohort.11
'''
/* celery:posthog.celery.clickhouse_clear_removed_data */
/* celery:posthog.tasks.tasks.clickhouse_clear_removed_data */
SELECT DISTINCT team_id,
cohort_id
FROM cohortpeople
Expand Down Expand Up @@ -168,7 +168,7 @@
# ---
# name: TestCohort.test_async_deletion_of_cohort.8
'''
/* celery:posthog.celery.clickhouse_clear_removed_data */
/* celery:posthog.tasks.tasks.clickhouse_clear_removed_data */
SELECT DISTINCT team_id,
cohort_id
FROM cohortpeople
Expand All @@ -179,7 +179,7 @@
# ---
# name: TestCohort.test_async_deletion_of_cohort.9
'''
/* celery:posthog.celery.clickhouse_clear_removed_data */
/* celery:posthog.tasks.tasks.clickhouse_clear_removed_data */
DELETE
FROM cohortpeople
WHERE (team_id = 2
Expand Down
12 changes: 6 additions & 6 deletions posthog/api/test/test_cohort.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,25 @@
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test.client import Client
from django.utils import timezone
from posthog.celery import clickhouse_clear_removed_data
from posthog.clickhouse.client.execute import sync_execute
from posthog.models.async_deletion.async_deletion import AsyncDeletion, DeletionType
from posthog.schema import PropertyOperator
from posthog.tasks.calculate_cohort import calculate_cohort_from_list
from rest_framework import status
from rest_framework.test import APIClient

from posthog.api.test.test_exports import TestExportMixin
from posthog.clickhouse.client.execute import sync_execute
from posthog.models import FeatureFlag, Person
from posthog.models.async_deletion.async_deletion import AsyncDeletion, DeletionType
from posthog.models.cohort import Cohort
from posthog.models.team.team import Team
from posthog.schema import PropertyOperator
from posthog.tasks.calculate_cohort import calculate_cohort_from_list
from posthog.tasks.tasks import clickhouse_clear_removed_data
from posthog.test.base import (
APIBaseTest,
ClickhouseTestMixin,
QueryMatchingTest,
_create_event,
_create_person,
flush_persons_and_events,
QueryMatchingTest,
snapshot_clickhouse_queries,
)

Expand Down
3 changes: 1 addition & 2 deletions posthog/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from posthoganalytics.client import Client

from posthog.settings import SELF_CAPTURE, SKIP_ASYNC_MIGRATIONS_SETUP
from posthog.tasks.tasks import sync_all_organization_available_features
from posthog.utils import (
get_git_branch,
get_git_commit,
Expand Down Expand Up @@ -35,8 +36,6 @@ def ready(self):
# log development server launch to posthog
if os.getenv("RUN_MAIN") == "true":
# Sync all organization.available_features once on launch, in case plans changed
from posthog.celery import sync_all_organization_available_features

sync_all_organization_available_features()

# NOTE: This has to be created as a separate client so that the "capture" call doesn't lock in the properties
Expand Down
5 changes: 1 addition & 4 deletions posthog/caching/insight_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from posthog.caching.calculate_results import calculate_result_by_insight
from posthog.models import Dashboard, Insight, InsightCachingState, Team
from posthog.models.instance_setting import get_instance_setting
from posthog.tasks.tasks import update_cache_task

logger = structlog.get_logger(__name__)

Expand All @@ -25,8 +26,6 @@


def schedule_cache_updates():
from posthog.celery import update_cache_task

# :TODO: Separate celery queue for updates rather than limiting via this method
PARALLEL_INSIGHT_CACHE = get_instance_setting("PARALLEL_DASHBOARD_ITEM_CACHE")

Expand Down Expand Up @@ -139,8 +138,6 @@ def update_cache(caching_state_id: UUID):
statsd.incr("caching_state_update_errors")

if caching_state.refresh_attempt < MAX_ATTEMPTS:
from posthog.celery import update_cache_task

update_cache_task.apply_async(args=[caching_state_id], countdown=timedelta(minutes=10).total_seconds())

InsightCachingState.objects.filter(pk=caching_state.pk).update(
Expand Down
4 changes: 2 additions & 2 deletions posthog/caching/test/test_insight_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def cache_keys(cache):


@pytest.mark.django_db
@patch("posthog.celery.update_cache_task")
@patch("posthog.caching.insight_cache.update_cache_task")
def test_schedule_cache_updates(update_cache_task, team: Team, user: User):
caching_state1 = create_insight_caching_state(team, user, filters=filter_dict, last_refresh=None)
create_insight_caching_state(team, user, filters=filter_dict)
Expand Down Expand Up @@ -164,7 +164,7 @@ def test_update_cache_updates_identical_cache_keys(team: Team, user: User, cache

@pytest.mark.django_db
@freeze_time("2020-01-04T13:01:01Z")
@patch("posthog.celery.update_cache_task")
@patch("posthog.caching.insight_cache.update_cache_task")
@patch("posthog.caching.insight_cache.calculate_result_by_insight")
def test_update_cache_when_calculation_fails(
spy_calculate_result_by_insight,
Expand Down
Loading

0 comments on commit 7f174fa

Please sign in to comment.