Skip to content

Commit

Permalink
fix: subscriptions revoke child exports (#17700)
Browse files Browse the repository at this point in the history
* fix: subscriptions should revoke children on timeout

* fix: subscriptions should revoke children on timeout

* check state before revoking

* Update query snapshots

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
pauldambra and github-actions[bot] authored Oct 1, 2023
1 parent a4f76e5 commit e983e10
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# name: ClickhouseTestExperimentSecondaryResults.test_basic_secondary_metric_results
'
/* user_id:128 celery:posthog.celery.sync_insight_caching_state */
/* user_id:125 celery:posthog.celery.sync_insight_caching_state */
SELECT team_id,
date_diff('second', max(timestamp), now()) AS age
FROM events
Expand Down
21 changes: 21 additions & 0 deletions ee/tasks/test/subscriptions/test_subscriptions_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,24 @@ def test_excludes_deleted_insights_for_dashboard(self, mock_export_task: MagicMo
assert len(insights) == 1
assert len(assets) == 1
assert mock_export_task.s.call_count == 1

def test_cancels_children_if_timed_out(self, _mock_export_task: MagicMock, mock_group: MagicMock) -> None:
    """Check that a still-pending child export is revoked when asset generation times out."""
    # a child export that is still waiting to run; calls to its revoke() are captured by the mock
    pending_child = MagicMock()
    pending_child.state = "PENDING"

    # the group result never reports ready, so generate_assets has to hit its timeout
    group_result = MagicMock()
    group_result.ready = MagicMock(return_value=False)
    group_result.children = [pending_child]
    mock_group.return_value.apply_async.return_value = group_result

    # a tiny timeout keeps the test fast
    with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=0.01), pytest.raises(Exception) as exc_info:
        generate_assets(self.subscription)

    assert str(exc_info.value) == "Timed out waiting for celery task to finish"
    pending_child.revoke.assert_called()
17 changes: 16 additions & 1 deletion posthog/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import pytz
import structlog
from asgiref.sync import async_to_sync
from celery.result import AsyncResult
from celery.schedules import crontab
from dateutil import parser
from dateutil.relativedelta import relativedelta
Expand Down Expand Up @@ -1215,12 +1216,26 @@ async def wait_for_parallel_celery_group(task: Any, max_timeout: Optional[dateti

while not task.ready():
if timezone.now() - start_time > max_timeout:
child_states = []
child: AsyncResult
for child in task.children:
child_states.append(child.state)
# this child should not be retried...
if child.state in ["PENDING", "STARTED"]:
# terminating here terminates the process not the task
# but if the task is in PENDING or STARTED after 10 minutes
# we have to assume the celery process isn't processing another task
# see: https://docs.celeryq.dev/en/stable/userguide/workers.html#revoke-revoking-tasks
# and: https://docs.celeryq.dev/en/latest/reference/celery.result.html
# we terminate the process to avoid leaking an instance of Chrome
child.revoke(terminate=True)

logger.error(
"Timed out waiting for celery task to finish",
ready=task.ready(),
successful=task.successful(),
failed=task.failed(),
child_states=[child.state for child in task.children],
child_states=child_states,
timeout=max_timeout,
start_time=start_time,
)
Expand Down

0 comments on commit e983e10

Please sign in to comment.