Skip to content

Commit

Permalink
Wait for activity completions on worker shutdown (#370)
Browse files Browse the repository at this point in the history
Fixes #368
  • Loading branch information
cretz authored Aug 24, 2023
1 parent 31358d1 commit 40daaaa
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 0 deletions.
7 changes: 7 additions & 0 deletions temporalio/worker/_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ async def drain_poll_queue(self) -> None:
except temporalio.bridge.worker.PollShutdownError:
return

# Only call this after run()/drain_poll_queue() have returned. This will not
# raise an exception.
async def wait_all_completed(self) -> None:
running_tasks = [v.task for v in self._running_activities.values() if v.task]
if running_tasks:
await asyncio.gather(*running_tasks, return_exceptions=False)

def _cancel(
self, task_token: bytes, cancel: temporalio.bridge.proto.activity_task.Cancel
) -> None:
Expand Down
7 changes: 7 additions & 0 deletions temporalio/worker/_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,13 @@ async def raise_on_shutdown():
for task in tasks:
task.cancel()

# If there's an activity worker, we have to let all activity completions
# finish. We cannot guarantee that because poll shutdown completed
# (which means activities completed) that they got flushed to the
# server.
if self._activity_worker:
await self._activity_worker.wait_all_completed()

# Do final shutdown
try:
await self._bridge_worker.finalize_shutdown()
Expand Down

0 comments on commit 40daaaa

Please sign in to comment.