From 45c4443d05584790c80b7ec952713c0d838a55ae Mon Sep 17 00:00:00 2001 From: Ben White Date: Wed, 17 Jan 2024 17:38:14 +0100 Subject: [PATCH] fix: Add timeout to subscription reports (#19811) --- docker-compose.base.yml | 7 +++++++ docker-compose.dev.yml | 7 +++++++ ee/settings.py | 5 ++++- ee/tasks/subscriptions/__init__.py | 8 ++++++-- ee/tasks/subscriptions/subscription_utils.py | 2 +- .../subscriptions/test_subscriptions_utils.py | 10 +++++----- posthog/api/test/__snapshots__/test_decide.ambr | 16 ++++++++++++++++ posthog/tasks/exporter.py | 2 ++ 8 files changed, 48 insertions(+), 9 deletions(-) diff --git a/docker-compose.base.yml b/docker-compose.base.yml index 72ed8dedde605..22cd812fe2790 100644 --- a/docker-compose.base.yml +++ b/docker-compose.base.yml @@ -60,6 +60,13 @@ services: image: maildev/maildev:2.0.5 restart: on-failure + flower: + image: mher/flower:2.0.0 + restart: on-failure + environment: + FLOWER_PORT: 5555 + CELERY_BROKER_URL: redis://redis:6379 + worker: &worker command: ./bin/docker-worker-celery --with-scheduler restart: on-failure diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 365ec2c5b452d..b4601915c95c3 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -27,6 +27,13 @@ services: ports: - '6379:6379' + flower: + extends: + file: docker-compose.base.yml + service: flower + ports: + - '5555:5555' + clickhouse: extends: file: docker-compose.base.yml diff --git a/ee/settings.py b/ee/settings.py index 5fbb4c81fe8ce..766f1533822c1 100644 --- a/ee/settings.py +++ b/ee/settings.py @@ -68,4 +68,7 @@ # Whether to enable the admin portal. Default false for self-hosted as if not setup properly can pose security issues. 
ADMIN_PORTAL_ENABLED = get_from_env("ADMIN_PORTAL_ENABLED", DEMO or DEBUG, type_cast=str_to_bool) -ASSET_GENERATION_MAX_TIMEOUT_MINUTES = get_from_env("ASSET_GENERATION_MAX_TIMEOUT_MINUTES", 10, type_cast=int) +ASSET_GENERATION_MAX_TIMEOUT_SECONDS = get_from_env("ASSET_GENERATION_MAX_TIMEOUT_SECONDS", 60.0, type_cast=float) +PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES = get_from_env( + "PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES", 10.0, type_cast=float +) diff --git a/ee/tasks/subscriptions/__init__.py b/ee/tasks/subscriptions/__init__.py index 7be2f2cf1ca22..70f9b474d2a5b 100644 --- a/ee/tasks/subscriptions/__init__.py +++ b/ee/tasks/subscriptions/__init__.py @@ -9,6 +9,7 @@ from ee.tasks.subscriptions.email_subscriptions import send_email_subscription_report from ee.tasks.subscriptions.slack_subscriptions import send_slack_subscription_report from ee.tasks.subscriptions.subscription_utils import generate_assets +from posthog import settings from posthog.celery import app from posthog.models.subscription import Subscription @@ -145,14 +146,17 @@ def schedule_all_subscriptions() -> None: deliver_subscription_report.delay(subscription.id) -@app.task() +report_timeout_seconds = settings.PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES * 60 * 1.5 + + +@app.task(soft_time_limit=report_timeout_seconds, time_limit=report_timeout_seconds + 10) def deliver_subscription_report(subscription_id: int) -> None: if not settings.TEST: return return _deliver_subscription_report(subscription_id) -@app.task() +@app.task(soft_time_limit=30, time_limit=40) def handle_subscription_value_change( subscription_id: int, previous_value: str, invite_message: Optional[str] = None ) -> None: diff --git a/ee/tasks/subscriptions/subscription_utils.py b/ee/tasks/subscriptions/subscription_utils.py index 5df00e4a8ee85..3350a5cc561b7 100644 --- a/ee/tasks/subscriptions/subscription_utils.py +++ b/ee/tasks/subscriptions/subscription_utils.py @@ -60,7 +60,7 @@ def generate_assets( 
wait_for_parallel_celery_group( parallel_job, - max_timeout=timedelta(minutes=settings.ASSET_GENERATION_MAX_TIMEOUT_MINUTES), + max_timeout=timedelta(minutes=settings.PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES), ) return insights, assets diff --git a/ee/tasks/test/subscriptions/test_subscriptions_utils.py b/ee/tasks/test/subscriptions/test_subscriptions_utils.py index 35b2ca350ed8a..c8ff89adcea65 100644 --- a/ee/tasks/test/subscriptions/test_subscriptions_utils.py +++ b/ee/tasks/test/subscriptions/test_subscriptions_utils.py @@ -34,7 +34,7 @@ def setUp(self) -> None: self.subscription = create_subscription(team=self.team, insight=self.insight, created_by=self.user) def test_generate_assets_for_insight(self, mock_export_task: MagicMock, _mock_group: MagicMock) -> None: - with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1): + with self.settings(PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1): insights, assets = generate_assets(self.subscription) assert insights == [self.insight] @@ -44,7 +44,7 @@ def test_generate_assets_for_insight(self, mock_export_task: MagicMock, _mock_gr def test_generate_assets_for_dashboard(self, mock_export_task: MagicMock, _mock_group: MagicMock) -> None: subscription = create_subscription(team=self.team, dashboard=self.dashboard, created_by=self.user) - with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1): + with self.settings(PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1): insights, assets = generate_assets(subscription) assert len(insights) == len(self.tiles) @@ -54,7 +54,7 @@ def test_generate_assets_for_dashboard(self, mock_export_task: MagicMock, _mock_ def test_raises_if_missing_resource(self, _mock_export_task: MagicMock, _mock_group: MagicMock) -> None: subscription = create_subscription(team=self.team, created_by=self.user) - with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1), pytest.raises(Exception) as e: + with self.settings(PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1), pytest.raises(Exception) as 
e: generate_assets(subscription) assert str(e.value) == "There are no insights to be sent for this Subscription" @@ -68,7 +68,7 @@ def test_excludes_deleted_insights_for_dashboard(self, mock_export_task: MagicMo current_tile.insight.save() subscription = create_subscription(team=self.team, dashboard=self.dashboard, created_by=self.user) - with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1): + with self.settings(PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1): insights, assets = generate_assets(subscription) assert len(insights) == 1 @@ -90,7 +90,7 @@ def test_cancels_children_if_timed_out(self, _mock_export_task: MagicMock, mock_ mock_running_exports.children = [running_export_task] mock_running_exports.ready = mock_ready - with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=0.01), pytest.raises(Exception) as e: + with self.settings(PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES=0.01), pytest.raises(Exception) as e: generate_assets(self.subscription) assert str(e.value) == "Timed out waiting for celery task to finish" diff --git a/posthog/api/test/__snapshots__/test_decide.ambr b/posthog/api/test/__snapshots__/test_decide.ambr index d8daad640dd92..0082db5b23262 100644 --- a/posthog/api/test/__snapshots__/test_decide.ambr +++ b/posthog/api/test/__snapshots__/test_decide.ambr @@ -88,6 +88,22 @@ LIMIT 21 /*controller='team-detail',route='api/projects/%28%3FP%3Cid%3E%5B%5E/.%5D%2B%29/%3F%24'*/ ''' # --- +# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.10 + ''' + SELECT "posthog_pluginconfig"."id", + "posthog_pluginconfig"."web_token", + "posthog_pluginsourcefile"."updated_at", + "posthog_plugin"."updated_at", + "posthog_pluginconfig"."updated_at" + FROM "posthog_pluginconfig" + INNER JOIN "posthog_plugin" ON ("posthog_pluginconfig"."plugin_id" = "posthog_plugin"."id") + INNER JOIN "posthog_pluginsourcefile" ON ("posthog_plugin"."id" = "posthog_pluginsourcefile"."plugin_id") + WHERE ("posthog_pluginconfig"."enabled" + AND 
"posthog_pluginsourcefile"."filename" = 'site.ts' + AND "posthog_pluginsourcefile"."status" = 'TRANSPILED' + AND "posthog_pluginconfig"."team_id" = 2) + ''' +# --- # name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.2 ''' SELECT "posthog_organizationmembership"."id", diff --git a/posthog/tasks/exporter.py b/posthog/tasks/exporter.py index 69f968f207cea..ffec5d5b1142f 100644 --- a/posthog/tasks/exporter.py +++ b/posthog/tasks/exporter.py @@ -2,6 +2,7 @@ from prometheus_client import Counter, Histogram +from posthog import settings from posthog.celery import app from posthog.models import ExportedAsset @@ -40,6 +41,7 @@ retry_backoff=True, acks_late=True, ignore_result=False, + time_limit=settings.ASSET_GENERATION_MAX_TIMEOUT_SECONDS, ) def export_asset(exported_asset_id: int, limit: Optional[int] = None) -> None: from posthog.tasks.exports import csv_exporter, image_exporter