diff --git a/ee/tasks/subscriptions/subscription_utils.py b/ee/tasks/subscriptions/subscription_utils.py index b75e26ca37856..ccd517b356cd9 100644 --- a/ee/tasks/subscriptions/subscription_utils.py +++ b/ee/tasks/subscriptions/subscription_utils.py @@ -2,7 +2,7 @@ from typing import List, Tuple, Union from django.conf import settings import structlog -from celery import group +from celery import chain from prometheus_client import Histogram from posthog.models.dashboard_tile import get_tiles_ordered_by_position @@ -45,8 +45,9 @@ def generate_assets( ExportedAsset.objects.bulk_create(assets) # Wait for all assets to be exported - tasks = [exporter.export_asset.s(asset.id) for asset in assets] - parallel_job = group(tasks).apply_async() + tasks = [exporter.export_asset.si(asset.id) for asset in assets] + # run them one after the other so we don't exhaust celery workers + parallel_job = chain(*tasks).apply_async() wait_for_parallel_celery_group( parallel_job, max_timeout=timedelta(minutes=settings.ASSET_GENERATION_MAX_TIMEOUT_MINUTES) diff --git a/ee/tasks/test/subscriptions/test_subscriptions_utils.py b/ee/tasks/test/subscriptions/test_subscriptions_utils.py index 440dcc97904f4..decdc8269e1e7 100644 --- a/ee/tasks/test/subscriptions/test_subscriptions_utils.py +++ b/ee/tasks/test/subscriptions/test_subscriptions_utils.py @@ -12,7 +12,7 @@ from posthog.test.base import APIBaseTest -@patch("ee.tasks.subscriptions.subscription_utils.group") +@patch("ee.tasks.subscriptions.subscription_utils.chain") @patch("ee.tasks.subscriptions.subscription_utils.exporter.export_asset") class TestSubscriptionsTasksUtils(APIBaseTest): dashboard: Dashboard @@ -36,7 +36,7 @@ def test_generate_assets_for_insight(self, mock_export_task: MagicMock, _mock_gr assert insights == [self.insight] assert len(assets) == 1 - assert mock_export_task.s.call_count == 1 + assert mock_export_task.si.call_count == 1 def test_generate_assets_for_dashboard(self, mock_export_task: MagicMock, _mock_group: MagicMock) -> None: subscription = create_subscription(team=self.team, dashboard=self.dashboard, created_by=self.user) @@ -46,7 +46,7 @@ def test_generate_assets_for_dashboard(self, mock_export_task: MagicMock, _mock_ assert len(insights) == len(self.tiles) assert len(assets) == DEFAULT_MAX_ASSET_COUNT - assert mock_export_task.s.call_count == DEFAULT_MAX_ASSET_COUNT + assert mock_export_task.si.call_count == DEFAULT_MAX_ASSET_COUNT def test_raises_if_missing_resource(self, _mock_export_task: MagicMock, _mock_group: MagicMock) -> None: subscription = create_subscription(team=self.team, created_by=self.user) @@ -70,7 +70,7 @@ def test_excludes_deleted_insights_for_dashboard(self, mock_export_task: MagicMo assert len(insights) == 1 assert len(assets) == 1 - assert mock_export_task.s.call_count == 1 + assert mock_export_task.si.call_count == 1 def test_cancels_children_if_timed_out(self, _mock_export_task: MagicMock, mock_group: MagicMock) -> None: # mock the group so that its children are never ready,