From e983e10ed18897724e4e9bff5715aef15b7350a5 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Sun, 1 Oct 2023 12:06:00 +0100 Subject: [PATCH] fix: subscriptions revoke child exports (#17700) * fix: subscriptions should revoke children on timeout * fix: subscriptions should revoke children on timeout * check state before revoking * Update query snapshots --------- Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> --- ...ickhouse_experiment_secondary_results.ambr | 2 +- .../subscriptions/test_subscriptions_utils.py | 21 +++++++++++++++++++ posthog/utils.py | 17 ++++++++++++++- 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/ee/clickhouse/views/test/__snapshots__/test_clickhouse_experiment_secondary_results.ambr b/ee/clickhouse/views/test/__snapshots__/test_clickhouse_experiment_secondary_results.ambr index 9f9e01f13028a..d42c1cb3ff2e1 100644 --- a/ee/clickhouse/views/test/__snapshots__/test_clickhouse_experiment_secondary_results.ambr +++ b/ee/clickhouse/views/test/__snapshots__/test_clickhouse_experiment_secondary_results.ambr @@ -1,6 +1,6 @@ # name: ClickhouseTestExperimentSecondaryResults.test_basic_secondary_metric_results ' - /* user_id:128 celery:posthog.celery.sync_insight_caching_state */ + /* user_id:125 celery:posthog.celery.sync_insight_caching_state */ SELECT team_id, date_diff('second', max(timestamp), now()) AS age FROM events diff --git a/ee/tasks/test/subscriptions/test_subscriptions_utils.py b/ee/tasks/test/subscriptions/test_subscriptions_utils.py index 7bdc897b935bb..440dcc97904f4 100644 --- a/ee/tasks/test/subscriptions/test_subscriptions_utils.py +++ b/ee/tasks/test/subscriptions/test_subscriptions_utils.py @@ -71,3 +71,24 @@ def test_excludes_deleted_insights_for_dashboard(self, mock_export_task: MagicMo assert len(insights) == 1 assert len(assets) == 1 assert mock_export_task.s.call_count == 1 + + def test_cancels_children_if_timed_out(self, _mock_export_task: MagicMock, mock_group: MagicMock) -> None: + # mock the group so that its children are never ready, + # and we capture calls to revoke + mock_running_exports = MagicMock() + mock_ready = MagicMock() + running_export_task = MagicMock() + + running_export_task.state = "PENDING" + + mock_ready.return_value = False + mock_group.return_value.apply_async.return_value = mock_running_exports + + mock_running_exports.children = [running_export_task] + mock_running_exports.ready = mock_ready + + with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=0.01), pytest.raises(Exception) as e: + generate_assets(self.subscription) + + assert str(e.value) == "Timed out waiting for celery task to finish" + running_export_task.revoke.assert_called() diff --git a/posthog/utils.py b/posthog/utils.py index fd50a3a600445..4de5563bc0d0b 100644 --- a/posthog/utils.py +++ b/posthog/utils.py @@ -36,6 +36,7 @@ import pytz import structlog from asgiref.sync import async_to_sync +from celery.result import AsyncResult from celery.schedules import crontab from dateutil import parser from dateutil.relativedelta import relativedelta @@ -1215,12 +1216,26 @@ async def wait_for_parallel_celery_group(task: Any, max_timeout: Optional[dateti while not task.ready(): if timezone.now() - start_time > max_timeout: + child_states = [] + child: AsyncResult + for child in task.children: + child_states.append(child.state) + # this child should not be retried... + if child.state in ["PENDING", "STARTED"]: + # terminating here terminates the process not the task + # but if the task is in PENDING or STARTED after 10 minutes + # we have to assume the celery process isn't processing another task + # see: https://docs.celeryq.dev/en/stable/userguide/workers.html#revoke-revoking-tasks + # and: https://docs.celeryq.dev/en/latest/reference/celery.result.html + # we terminate the process to avoid leaking an instance of Chrome + child.revoke(terminate=True) + logger.error( "Timed out waiting for celery task to finish", ready=task.ready(), successful=task.successful(), failed=task.failed(), - child_states=[child.state for child in task.children], + child_states=child_states, timeout=max_timeout, start_time=start_time, )