Skip to content

Commit

Permalink
Merge pull request #14 from tgoodlet/asyncgen_closing_fix
Browse files Browse the repository at this point in the history
Asyncgen closing fix
  • Loading branch information
goodboy authored Jul 14, 2018
2 parents c326a90 + 1f85f71 commit f636bfd
Showing 1 changed file with 57 additions and 39 deletions.
96 changes: 57 additions & 39 deletions tractor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import uuid

import trio
from async_generator import asynccontextmanager
from async_generator import asynccontextmanager, aclosing

from .ipc import Channel, _connect_chan
from .log import get_console_log, get_logger
Expand Down Expand Up @@ -77,21 +77,29 @@ async def _invoke(
coro = func(**kwargs)

if inspect.isasyncgen(coro):
async for item in coro:
# TODO: can we send values back in here?
# it's gonna require a `while True:` and
# some non-blocking way to retrieve new `asend()`
# values from the channel:
# to_send = await chan.recv_nowait()
# if to_send is not None:
# to_yield = await coro.asend(to_send)
await chan.send({'yield': item, 'cid': cid})

log.debug(f"Finished iterating {coro}")
# TODO: we should really support a proper
# `StopAsyncIteration` system here for returning a final
# value if desired
await chan.send({'stop': None, 'cid': cid})
# XXX: massive gotcha! If the containing scope
# is cancelled and we execute the below line,
# any ``ActorNursery.__aexit__()`` WON'T be
# triggered in the underlying async gen! So we
# have to properly handle the closing (aclosing)
# of the async gen in order to be sure the cancel
# is propagated!
async with aclosing(coro) as agen:
async for item in agen:
# TODO: can we send values back in here?
# it's gonna require a `while True:` and
# some non-blocking way to retrieve new `asend()`
# values from the channel:
# to_send = await chan.recv_nowait()
# if to_send is not None:
# to_yield = await coro.asend(to_send)
await chan.send({'yield': item, 'cid': cid})

log.debug(f"Finished iterating {coro}")
# TODO: we should really support a proper
# `StopAsyncIteration` system here for returning a final
# value if desired
await chan.send({'stop': None, 'cid': cid})
else:
if treat_as_gen:
# XXX: the async-func may spawn further tasks which push
Expand All @@ -102,14 +110,15 @@ async def _invoke(
else:
await chan.send({'return': await coro, 'cid': cid})

task_status.started()
except Exception:
log.exception("Actor errored:")
if not raise_errs:
await chan.send({'error': traceback.format_exc(), 'cid': cid})
log.exception("Actor errored:")
else:
raise

task_status.started()


async def result_from_q(q, chan):
"""Process a msg from a remote actor.
Expand Down Expand Up @@ -444,33 +453,36 @@ async def _async_main(
self._process_messages, self._parent_chan)

if self.main:
with trio.open_cancel_scope() as main_scope:
self._main_scope = main_scope
try:
if self._parent_chan:
try:
if self._parent_chan:
async with trio.open_nursery() as n:
self._main_scope = n.cancel_scope
log.debug(f"Starting main task `{self.main}`")
# spawned subactor so deliver "main"
# task result(s) back to parent
await nursery.start(
await n.start(
_invoke, 'main',
self._parent_chan, self.main, {},
# treat_as_gen, raise_errs params
False, True
)
else:
# run directly - we are an "unspawned actor"
else:
with trio.open_cancel_scope() as main_scope:
self._main_scope = main_scope
# run directly we are an "unspawned actor"
log.debug(f"Running `{self.main}` directly")
result = await self.main()
finally:
self._main_complete.set()
# tear down channel server in order to ensure
# we exit normally when the main task is done
if not self._outlive_main:
log.debug(f"Shutting down channel server")
self.cancel_server()
log.debug(f"Shutting down root nursery")
nursery.cancel_scope.cancel()
if main_scope.cancelled_caught:
finally:
# tear down channel server in order to ensure
# we exit normally when the main task is done
if not self._outlive_main:
log.debug(f"Shutting down channel server")
self.cancel_server()
log.debug(f"Shutting down root nursery")
nursery.cancel_scope.cancel()
self._main_complete.set()

if self._main_scope.cancelled_caught:
log.debug("Main task was cancelled sucessfully")
log.debug("Waiting on root nursery to complete")
# blocks here as expected if no nursery was provided until
Expand Down Expand Up @@ -659,6 +671,7 @@ async def yield_from_q():
log.debug(
f"Cancelling async gen call {cid} to "
f"{self.channel.uid}")
raise

return yield_from_q()

Expand Down Expand Up @@ -874,14 +887,19 @@ async def __aexit__(self, etype, value, tb):
"""Wait on all subactor's main routines to complete.
"""
if etype is not None:
# XXX: hypothetically an error could be raised and then
# a cancel signal shows up slightly after in which case the
# else block here might not complete? Should both be shielded?
if etype is trio.Cancelled:
log.warn(f"{current_actor().uid} was cancelled with {etype}, "
"cancelling actor nursery")
with trio.open_cancel_scope(shield=True):
log.warn(
f"{current_actor().uid} was cancelled with {etype}"
", cancelling actor nursery")
await self.cancel()
else:
log.exception(f"{current_actor().uid} errored with {etype}, "
"cancelling actor nursery")
log.exception(
f"{current_actor().uid} errored with {etype}, "
"cancelling actor nursery")
await self.cancel()
else:
# XXX: this is effectively the lone cancellation/supervisor
Expand Down

0 comments on commit f636bfd

Please sign in to comment.