feat: Celery tasks refactor #19827

Merged · 19 commits · Jan 18, 2024
bin/celery-queues.env (3 additions, 1 deletion)
@@ -1,3 +1,5 @@
 # Default set of queues to be used by Celery.
 # Important: Add new queues to make Celery consume tasks from them.
-CELERY_WORKER_QUEUES=celery,email,insight_export,insight_refresh,analytics_queries
+
+# NOTE: Keep in sync with posthog/tasks/utils.py
+CELERY_WORKER_QUEUES=celery,stats,email,insight_export,insight_refresh,analytics_queries,exports,subscription_delivery
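The queue names above have to line up with the CeleryQueue definitions referenced in the NOTE. A minimal sketch of what a matching enum in posthog/tasks/utils.py could look like (illustrative only; the actual definition in that file may differ):

from enum import Enum


class CeleryQueue(str, Enum):
    # One member per entry in CELERY_WORKER_QUEUES; the string values are the
    # broker queue names that workers consume from.
    DEFAULT = "celery"
    STATS = "stats"
    EMAIL = "email"
    INSIGHT_EXPORT = "insight_export"
    INSIGHT_REFRESH = "insight_refresh"
    ANALYTICS_QUERIES = "analytics_queries"
    EXPORTS = "exports"
    SUBSCRIPTION_DELIVERY = "subscription_delivery"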
bin/start-worker (2 additions, 2 deletions)
@@ -7,10 +7,10 @@ trap 'kill $(jobs -p)' EXIT
 source ./bin/celery-queues.env
 
 # start celery worker with heartbeat (-B)
-SKIP_ASYNC_MIGRATIONS_SETUP=0 celery -A posthog worker -B --scheduler redbeat.RedBeatScheduler --without-heartbeat --without-gossip --without-mingle -Ofair -n node@%h &
+SKIP_ASYNC_MIGRATIONS_SETUP=0 celery -A posthog worker -B --scheduler redbeat.RedBeatScheduler --without-heartbeat --without-mingle -Ofair -n node@%h &
 
 if [[ "$PLUGIN_SERVER_IDLE" != "1" && "$PLUGIN_SERVER_IDLE" != "true" ]]; then
   ./bin/plugin-server
 fi
 
-wait
+wait
ee/api/test/test_organization.py (1 addition, 1 deletion)
@@ -8,9 +8,9 @@
 
 from ee.api.test.base import APILicensedTest
 from ee.models.license import License
-from posthog.celery import sync_all_organization_available_features
 from posthog.models import Team, User
 from posthog.models.organization import Organization, OrganizationMembership
+from posthog.tasks.tasks import sync_all_organization_available_features
 
 
 class TestOrganizationEnterpriseAPI(APILicensedTest):
(.ambr snapshot, file name not shown in this view)
@@ -1,7 +1,7 @@
 # serializer version: 1
 # name: ClickhouseTestExperimentSecondaryResults.test_basic_secondary_metric_results
   '''
-  /* user_id:132 celery:posthog.celery.sync_insight_caching_state */
+  /* user_id:132 celery:posthog.tasks.tasks.sync_insight_caching_state */
   SELECT team_id,
          date_diff('second', max(timestamp), now()) AS age
   FROM events
ee/models/license.py (1 addition, 1 deletion)
@@ -8,9 +8,9 @@
 from django.utils import timezone
 from rest_framework import exceptions, status
 
-from posthog.celery import sync_all_organization_available_features
 from posthog.constants import AvailableFeature
 from posthog.models.utils import sane_repr
+from posthog.tasks.tasks import sync_all_organization_available_features
 
 
 class LicenseError(exceptions.APIException):
ee/session_recordings/persistence_tasks.py (3 additions, 3 deletions)
@@ -1,21 +1,21 @@
 from datetime import timedelta
 
 import structlog
+from celery import shared_task
 from django.utils import timezone
 
 from ee.session_recordings.session_recording_extensions import persist_recording
-from posthog.celery import app
 from posthog.session_recordings.models.session_recording import SessionRecording
 
 logger = structlog.get_logger(__name__)
 
 
-@app.task()
+@shared_task(ignore_result=True)
 def persist_single_recording(id: str, team_id: int) -> None:
     persist_recording(id, team_id)
 
 
-@app.task()
+@shared_task(ignore_result=True)
 def persist_finished_recordings() -> None:
     one_day_old = timezone.now() - timedelta(hours=24)
     finished_recordings = SessionRecording.objects.filter(created_at__lte=one_day_old, object_storage_path=None)
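The pattern above recurs throughout this PR: tasks move from the app-bound @app.task() decorator to Celery's shared_task, which registers against whichever app is current and removes the import-time dependency on posthog.celery. A minimal before/after sketch with an illustrative task name (not a real PostHog task):

from celery import shared_task

# Before: the task module had to import the concrete app instance.
#   from posthog.celery import app
#
#   @app.task()
#   def my_task() -> None: ...

# After: shared_task binds lazily to the current app, and ignore_result=True
# skips writing the unused return value to the result backend.
@shared_task(ignore_result=True)
def my_task() -> None:
    ...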
ee/tasks/subscriptions/__init__.py (13 additions, 4 deletions)
@@ -2,15 +2,16 @@
 from typing import Optional
 
 import structlog
+from celery import shared_task
 from prometheus_client import Counter
 from sentry_sdk import capture_exception, capture_message
 
 from ee.tasks.subscriptions.email_subscriptions import send_email_subscription_report
 from ee.tasks.subscriptions.slack_subscriptions import send_slack_subscription_report
 from ee.tasks.subscriptions.subscription_utils import generate_assets
 from posthog import settings
-from posthog.celery import app
 from posthog.models.subscription import Subscription
+from posthog.tasks.utils import CeleryQueue
 
 logger = structlog.get_logger(__name__)
 
@@ -120,7 +121,7 @@ def _deliver_subscription_report(
     subscription.save()
 
 
-@app.task()
+@shared_task(queue=CeleryQueue.SUBSCRIPTION_DELIVERY)
 def schedule_all_subscriptions() -> None:
     """
     Schedule all past notifications (with a buffer) to be delivered
@@ -148,12 +149,20 @@ def schedule_all_subscriptions() -> None:
 report_timeout_seconds = settings.PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES * 60 * 1.5
 
 
-@app.task(soft_time_limit=report_timeout_seconds, time_limit=report_timeout_seconds + 10)
+@shared_task(
+    soft_time_limit=report_timeout_seconds,
+    time_limit=report_timeout_seconds + 10,
+    queue=CeleryQueue.SUBSCRIPTION_DELIVERY,
+)
 def deliver_subscription_report(subscription_id: int) -> None:
     return _deliver_subscription_report(subscription_id)
 
 
-@app.task(soft_time_limit=report_timeout_seconds, time_limit=report_timeout_seconds + 10)
+@shared_task(
+    soft_time_limit=report_timeout_seconds,
+    time_limit=report_timeout_seconds + 10,
+    queue=CeleryQueue.SUBSCRIPTION_DELIVERY,
+)
 def handle_subscription_value_change(
     subscription_id: int, previous_value: str, invite_message: Optional[str] = None
 ) -> None:
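The queue= argument routes these tasks to the new subscription_delivery queue, which is why that name was added to CELERY_WORKER_QUEUES in bin/celery-queues.env; a worker that does not consume the queue never picks the tasks up. A hedged usage sketch (the subscription id is made up, and the per-call override only illustrates standard Celery routing, it is not something this PR does):

from ee.tasks.subscriptions import deliver_subscription_report

# Normal call: the task lands on the queue declared in its decorator (subscription_delivery).
deliver_subscription_report.delay(42)

# Per-call override via apply_async's standard routing options.
deliver_subscription_report.apply_async(args=[42], queue="email")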
posthog/api/prompt.py (1 addition, 13 deletions)
@@ -2,7 +2,6 @@
 from typing import Any, Dict, List
 
 from dateutil import parser
-from django.db import IntegrityError
 from django.http import JsonResponse
 from django.views.decorators.csrf import csrf_exempt
 from rest_framework import exceptions, request, serializers, status, viewsets
@@ -11,10 +10,9 @@
 
 from posthog.api.routing import StructuredViewSetMixin
 from posthog.api.utils import get_token
-from posthog.celery import app
 from posthog.exceptions import generate_exception_response
 from posthog.models.prompt import Prompt, PromptSequence, UserPromptState
-from posthog.models.user import User
+from posthog.tasks.prompts import trigger_prompt_for_user
 from posthog.utils_cors import cors_response
 
 
@@ -185,16 +183,6 @@ class Meta:
         ]
 
 
-@app.task(ignore_result=True)
-def trigger_prompt_for_user(email: str, sequence_id: int):
-    try:
-        sequence = PromptSequence.objects.get(pk=sequence_id)
-        user = User.objects.get(email=email)
-        UserPromptState.objects.get_or_create(user=user, sequence=sequence, step=None)
-    except (User.DoesNotExist, IntegrityError):
-        pass
-
-
 @csrf_exempt
 def prompt_webhook(request: request.Request):
     if request.method == "POST":
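Here trigger_prompt_for_user is deleted rather than converted in place, and the new import at the top of the file points at posthog/tasks/prompts.py. A sketch of what the relocated task presumably looks like there, assuming the body moved unchanged and only the decorator switched to shared_task as in the rest of this PR:

from celery import shared_task
from django.db import IntegrityError

from posthog.models.prompt import PromptSequence, UserPromptState
from posthog.models.user import User


@shared_task(ignore_result=True)
def trigger_prompt_for_user(email: str, sequence_id: int):
    # Body as removed from posthog/api/prompt.py above.
    try:
        sequence = PromptSequence.objects.get(pk=sequence_id)
        user = User.objects.get(email=email)
        UserPromptState.objects.get_or_create(user=user, sequence=sequence, step=None)
    except (User.DoesNotExist, IntegrityError):
        pass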
posthog/api/test/__snapshots__/test_cohort.ambr (3 additions, 3 deletions)
@@ -73,7 +73,7 @@
 # ---
 # name: TestCohort.test_async_deletion_of_cohort.11
   '''
-  /* celery:posthog.celery.clickhouse_clear_removed_data */
+  /* celery:posthog.tasks.tasks.clickhouse_clear_removed_data */
   SELECT DISTINCT team_id,
                   cohort_id
   FROM cohortpeople
@@ -168,7 +168,7 @@
 # ---
 # name: TestCohort.test_async_deletion_of_cohort.8
   '''
-  /* celery:posthog.celery.clickhouse_clear_removed_data */
+  /* celery:posthog.tasks.tasks.clickhouse_clear_removed_data */
   SELECT DISTINCT team_id,
                   cohort_id
   FROM cohortpeople
@@ -179,7 +179,7 @@
 # ---
 # name: TestCohort.test_async_deletion_of_cohort.9
   '''
-  /* celery:posthog.celery.clickhouse_clear_removed_data */
+  /* celery:posthog.tasks.tasks.clickhouse_clear_removed_data */
   DELETE
   FROM cohortpeople
   WHERE (team_id = 2
posthog/api/test/test_cohort.py (6 additions, 6 deletions)
@@ -6,25 +6,25 @@
 from django.core.files.uploadedfile import SimpleUploadedFile
 from django.test.client import Client
 from django.utils import timezone
-from posthog.celery import clickhouse_clear_removed_data
-from posthog.clickhouse.client.execute import sync_execute
-from posthog.models.async_deletion.async_deletion import AsyncDeletion, DeletionType
-from posthog.schema import PropertyOperator
-from posthog.tasks.calculate_cohort import calculate_cohort_from_list
 from rest_framework import status
 from rest_framework.test import APIClient
 
 from posthog.api.test.test_exports import TestExportMixin
+from posthog.clickhouse.client.execute import sync_execute
 from posthog.models import FeatureFlag, Person
+from posthog.models.async_deletion.async_deletion import AsyncDeletion, DeletionType
 from posthog.models.cohort import Cohort
 from posthog.models.team.team import Team
+from posthog.schema import PropertyOperator
+from posthog.tasks.calculate_cohort import calculate_cohort_from_list
+from posthog.tasks.tasks import clickhouse_clear_removed_data
 from posthog.test.base import (
     APIBaseTest,
     ClickhouseTestMixin,
+    QueryMatchingTest,
     _create_event,
     _create_person,
     flush_persons_and_events,
-    QueryMatchingTest,
     snapshot_clickhouse_queries,
 )
 
posthog/apps.py (1 addition, 2 deletions)
@@ -7,6 +7,7 @@
 from posthoganalytics.client import Client
 
 from posthog.settings import SELF_CAPTURE, SKIP_ASYNC_MIGRATIONS_SETUP
+from posthog.tasks.tasks import sync_all_organization_available_features
 from posthog.utils import (
     get_git_branch,
     get_git_commit,
@@ -35,8 +36,6 @@ def ready(self):
         # log development server launch to posthog
         if os.getenv("RUN_MAIN") == "true":
             # Sync all organization.available_features once on launch, in case plans changed
-            from posthog.celery import sync_all_organization_available_features
-
             sync_all_organization_available_features()
 
             # NOTE: This has to be created as a separate client so that the "capture" call doesn't lock in the properties
posthog/caching/insight_cache.py (1 addition, 4 deletions)
@@ -15,6 +15,7 @@
 from posthog.caching.calculate_results import calculate_result_by_insight
 from posthog.models import Dashboard, Insight, InsightCachingState, Team
 from posthog.models.instance_setting import get_instance_setting
+from posthog.tasks.tasks import update_cache_task
 
 logger = structlog.get_logger(__name__)
 
@@ -25,8 +26,6 @@
 
 
 def schedule_cache_updates():
-    from posthog.celery import update_cache_task
-
     # :TODO: Separate celery queue for updates rather than limiting via this method
     PARALLEL_INSIGHT_CACHE = get_instance_setting("PARALLEL_DASHBOARD_ITEM_CACHE")
 
@@ -139,8 +138,6 @@ def update_cache(caching_state_id: UUID):
         statsd.incr("caching_state_update_errors")
 
         if caching_state.refresh_attempt < MAX_ATTEMPTS:
-            from posthog.celery import update_cache_task
-
             update_cache_task.apply_async(args=[caching_state_id], countdown=timedelta(minutes=10).total_seconds())
 
         InsightCachingState.objects.filter(pk=caching_state.pk).update(
posthog/caching/test/test_insight_cache.py (2 additions, 2 deletions)
@@ -58,7 +58,7 @@ def cache_keys(cache):
 
 
 @pytest.mark.django_db
-@patch("posthog.celery.update_cache_task")
+@patch("posthog.caching.insight_cache.update_cache_task")
 def test_schedule_cache_updates(update_cache_task, team: Team, user: User):
     caching_state1 = create_insight_caching_state(team, user, filters=filter_dict, last_refresh=None)
     create_insight_caching_state(team, user, filters=filter_dict)
@@ -164,7 +164,7 @@ def test_update_cache_updates_identical_cache_keys(team: Team, user: User, cache
 
 @pytest.mark.django_db
 @freeze_time("2020-01-04T13:01:01Z")
-@patch("posthog.celery.update_cache_task")
+@patch("posthog.caching.insight_cache.update_cache_task")
 @patch("posthog.caching.insight_cache.calculate_result_by_insight")
 def test_update_cache_when_calculation_fails(
     spy_calculate_result_by_insight,
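The patch targets change because posthog/caching/insight_cache.py now imports update_cache_task at module level instead of lazily inside its functions, and mock.patch has to replace a name in the namespace where it is looked up. A hedged, generic illustration of that rule (not actual PostHog test code):

from unittest.mock import patch

# Before this PR: insight_cache did `from posthog.celery import update_cache_task`
# inside the function body, so patching "posthog.celery.update_cache_task" worked.
# After this PR: insight_cache holds its own module-level reference, so that
# reference is the one a test has to replace.
with patch("posthog.caching.insight_cache.update_cache_task") as update_cache_task_mock:
    ...  # code under test that calls update_cache_task.apply_async(...) now sees the mock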