Skip to content

Commit

Permalink
fix: Add timeout to subscription reports (#19811)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite authored Jan 17, 2024
1 parent 6a1af49 commit 45c4443
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 9 deletions.
7 changes: 7 additions & 0 deletions docker-compose.base.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ services:
image: maildev/maildev:2.0.5
restart: on-failure

flower:
image: mher/flower:2.0.0
restart: on-failure
environment:
FLOWER_PORT: 5555
CELERY_BROKER_URL: redis://redis:6379

worker: &worker
command: ./bin/docker-worker-celery --with-scheduler
restart: on-failure
Expand Down
7 changes: 7 additions & 0 deletions docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ services:
ports:
- '6379:6379'

flower:
extends:
file: docker-compose.base.yml
service: flower
ports:
- '5555:5555'

clickhouse:
extends:
file: docker-compose.base.yml
Expand Down
5 changes: 4 additions & 1 deletion ee/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,7 @@
# Whether to enable the admin portal. Default false for self-hosted as if not setup properly can pose security issues.
ADMIN_PORTAL_ENABLED = get_from_env("ADMIN_PORTAL_ENABLED", DEMO or DEBUG, type_cast=str_to_bool)

ASSET_GENERATION_MAX_TIMEOUT_MINUTES = get_from_env("ASSET_GENERATION_MAX_TIMEOUT_MINUTES", 10, type_cast=int)
ASSET_GENERATION_MAX_TIMEOUT_SECONDS = get_from_env("ASSET_GENERATION_MAX_TIMEOUT_SECONDS", 60.0, type_cast=float)
PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES = get_from_env(
"PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES", 10.0, type_cast=float
)
8 changes: 6 additions & 2 deletions ee/tasks/subscriptions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from ee.tasks.subscriptions.email_subscriptions import send_email_subscription_report
from ee.tasks.subscriptions.slack_subscriptions import send_slack_subscription_report
from ee.tasks.subscriptions.subscription_utils import generate_assets
from posthog import settings
from posthog.celery import app
from posthog.models.subscription import Subscription

Expand Down Expand Up @@ -145,14 +146,17 @@ def schedule_all_subscriptions() -> None:
deliver_subscription_report.delay(subscription.id)


@app.task()
report_timeout_seconds = settings.PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES * 60 * 1.5


@app.task(soft_time_limit=report_timeout_seconds, timeout=report_timeout_seconds + 10)
def deliver_subscription_report(subscription_id: int) -> None:
if not settings.TEST:
return
return _deliver_subscription_report(subscription_id)


@app.task()
@app.task(soft_time_limit=30, time_limit=40)
def handle_subscription_value_change(
subscription_id: int, previous_value: str, invite_message: Optional[str] = None
) -> None:
Expand Down
2 changes: 1 addition & 1 deletion ee/tasks/subscriptions/subscription_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def generate_assets(

wait_for_parallel_celery_group(
parallel_job,
max_timeout=timedelta(minutes=settings.ASSET_GENERATION_MAX_TIMEOUT_MINUTES),
max_timeout=timedelta(minutes=settings.PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES),
)

return insights, assets
10 changes: 5 additions & 5 deletions ee/tasks/test/subscriptions/test_subscriptions_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def setUp(self) -> None:
self.subscription = create_subscription(team=self.team, insight=self.insight, created_by=self.user)

def test_generate_assets_for_insight(self, mock_export_task: MagicMock, _mock_group: MagicMock) -> None:
with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1):
with self.settings(PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1):
insights, assets = generate_assets(self.subscription)

assert insights == [self.insight]
Expand All @@ -44,7 +44,7 @@ def test_generate_assets_for_insight(self, mock_export_task: MagicMock, _mock_gr
def test_generate_assets_for_dashboard(self, mock_export_task: MagicMock, _mock_group: MagicMock) -> None:
subscription = create_subscription(team=self.team, dashboard=self.dashboard, created_by=self.user)

with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1):
with self.settings(PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1):
insights, assets = generate_assets(subscription)

assert len(insights) == len(self.tiles)
Expand All @@ -54,7 +54,7 @@ def test_generate_assets_for_dashboard(self, mock_export_task: MagicMock, _mock_
def test_raises_if_missing_resource(self, _mock_export_task: MagicMock, _mock_group: MagicMock) -> None:
subscription = create_subscription(team=self.team, created_by=self.user)

with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1), pytest.raises(Exception) as e:
with self.settings(PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1), pytest.raises(Exception) as e:
generate_assets(subscription)

assert str(e.value) == "There are no insights to be sent for this Subscription"
Expand All @@ -68,7 +68,7 @@ def test_excludes_deleted_insights_for_dashboard(self, mock_export_task: MagicMo
current_tile.insight.save()
subscription = create_subscription(team=self.team, dashboard=self.dashboard, created_by=self.user)

with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1):
with self.settings(PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1):
insights, assets = generate_assets(subscription)

assert len(insights) == 1
Expand All @@ -90,7 +90,7 @@ def test_cancels_children_if_timed_out(self, _mock_export_task: MagicMock, mock_
mock_running_exports.children = [running_export_task]
mock_running_exports.ready = mock_ready

with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=0.01), pytest.raises(Exception) as e:
with self.settings(PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES=0.01), pytest.raises(Exception) as e:
generate_assets(self.subscription)

assert str(e.value) == "Timed out waiting for celery task to finish"
Expand Down
16 changes: 16 additions & 0 deletions posthog/api/test/__snapshots__/test_decide.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,22 @@
LIMIT 21 /*controller='team-detail',route='api/projects/%28%3FP%3Cid%3E%5B%5E/.%5D%2B%29/%3F%24'*/
'''
# ---
# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.10
'''
SELECT "posthog_pluginconfig"."id",
"posthog_pluginconfig"."web_token",
"posthog_pluginsourcefile"."updated_at",
"posthog_plugin"."updated_at",
"posthog_pluginconfig"."updated_at"
FROM "posthog_pluginconfig"
INNER JOIN "posthog_plugin" ON ("posthog_pluginconfig"."plugin_id" = "posthog_plugin"."id")
INNER JOIN "posthog_pluginsourcefile" ON ("posthog_plugin"."id" = "posthog_pluginsourcefile"."plugin_id")
WHERE ("posthog_pluginconfig"."enabled"
AND "posthog_pluginsourcefile"."filename" = 'site.ts'
AND "posthog_pluginsourcefile"."status" = 'TRANSPILED'
AND "posthog_pluginconfig"."team_id" = 2)
'''
# ---
# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.2
'''
SELECT "posthog_organizationmembership"."id",
Expand Down
2 changes: 2 additions & 0 deletions posthog/tasks/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from prometheus_client import Counter, Histogram

from posthog import settings
from posthog.celery import app
from posthog.models import ExportedAsset

Expand Down Expand Up @@ -40,6 +41,7 @@
retry_backoff=True,
acks_late=True,
ignore_result=False,
time_limit=settings.ASSET_GENERATION_MAX_TIMEOUT_SECONDS,
)
def export_asset(exported_asset_id: int, limit: Optional[int] = None) -> None:
from posthog.tasks.exports import csv_exporter, image_exporter
Expand Down

0 comments on commit 45c4443

Please sign in to comment.