Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

In run_process moved deliver_cancel exception handling to the caller code #1555

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions newsfragments/1532.misc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
In `run_process` moved `deliver_cancel` exception handling to the caller code.
100 changes: 50 additions & 50 deletions trio/_subprocess.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import subprocess
import logging
import sys
from typing import Optional
from functools import partial
Expand Down Expand Up @@ -381,28 +382,20 @@ async def open_process(


async def _windows_deliver_cancel(p):
try:
p.terminate()
except OSError as exc:
warnings.warn(RuntimeWarning(f"TerminateProcess on {p!r} failed with: {exc!r}"))
p.terminate()


async def _posix_deliver_cancel(p):
try:
p.terminate()
await trio.sleep(5)
warnings.warn(
RuntimeWarning(
f"process {p!r} ignored SIGTERM for 5 seconds. "
f"(Maybe you should pass a custom deliver_cancel?) "
f"Trying SIGKILL."
)
)
p.kill()
except OSError as exc:
warnings.warn(
RuntimeWarning(f"tried to kill process {p!r}, but failed with: {exc!r}")
p.terminate()
await trio.sleep(5)
warnings.warn(
RuntimeWarning(
f"process {p!r} ignored SIGTERM for 5 seconds. "
f"(Maybe you should pass a custom deliver_cancel?) "
f"Trying SIGKILL."
)
)
p.kill()


async def run_process(
Expand Down Expand Up @@ -592,41 +585,48 @@ async def my_deliver_cancel(process):
stdout_chunks = []
stderr_chunks = []

async with await open_process(command, **options) as proc:

async def feed_input():
async with proc.stdin:
try:
await proc.stdin.send_all(input)
except trio.BrokenResourceError:
pass

async def read_output(stream, chunks):
async with stream:
async for chunk in stream:
chunks.append(chunk)

async with trio.open_nursery() as nursery:
if proc.stdin is not None:
nursery.start_soon(feed_input)
if proc.stdout is not None:
nursery.start_soon(read_output, proc.stdout, stdout_chunks)
if proc.stderr is not None:
nursery.start_soon(read_output, proc.stderr, stderr_chunks)
proc = await open_process(command, **options)

async def feed_input():
async with proc.stdin:
try:
await proc.wait()
except trio.Cancelled:
with trio.CancelScope(shield=True):
killer_cscope = trio.CancelScope(shield=True)
await proc.stdin.send_all(input)
except trio.BrokenResourceError:
pass

async def killer():
with killer_cscope:
async def read_output(stream, chunks):
async with stream:
async for chunk in stream:
chunks.append(chunk)

async with trio.open_nursery() as nursery:
if proc.stdin is not None:
nursery.start_soon(feed_input)
if proc.stdout is not None:
nursery.start_soon(read_output, proc.stdout, stdout_chunks)
if proc.stderr is not None:
nursery.start_soon(read_output, proc.stderr, stderr_chunks)
try:
await proc.wait()
except trio.Cancelled:
with trio.CancelScope(shield=True):
killer_cscope = trio.CancelScope(shield=True)

async def killer():
with killer_cscope:
try:
await deliver_cancel(proc)

nursery.start_soon(killer)
await proc.wait()
killer_cscope.cancel()
raise
except BaseException as exc:
LOGGER = logging.getLogger("trio.run_process")
LOGGER.exception(
f"tried to kill process {proc!r}, but failed with: {exc!r}"
)
raise

nursery.start_soon(killer)
await proc.wait()
killer_cscope.cancel()
raise

stdout = b"".join(stdout_chunks) if proc.stdout is not None else None
stderr = b"".join(stderr_chunks) if proc.stderr is not None else None
Expand Down
4 changes: 2 additions & 2 deletions trio/tests/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ async def custom_deliver_cancel(proc):
assert custom_deliver_cancel_called


async def test_warn_on_failed_cancel_terminate(monkeypatch):
async def test_log_on_failed_cancel_terminate(monkeypatch):
original_terminate = Process.terminate

def broken_terminate(self):
Expand All @@ -464,7 +464,7 @@ def broken_terminate(self):

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is no longer testing a warning, so you should change the name of the test (since it's currently "warn_on_failed_cancel_terminate")

monkeypatch.setattr(Process, "terminate", broken_terminate)

with pytest.warns(RuntimeWarning, match=".*whoops.*"):
with pytest.raises(OSError, match=".*whoops.*"):
async with _core.open_nursery() as nursery:
nursery.start_soon(run_process, SLEEP(9999))
await wait_all_tasks_blocked()
Expand Down