diff --git a/examples/asynchronous_generators.py b/examples/asynchronous_generators.py index 102b29aaa..47ee136a7 100644 --- a/examples/asynchronous_generators.py +++ b/examples/asynchronous_generators.py @@ -24,8 +24,9 @@ async def main(): # this async for loop streams values from the above # async generator running in a separate process - async for letter in await portal.run(stream_forever): - print(letter) + async with portal.open_stream_from(stream_forever) as stream: + async for letter in stream: + print(letter) # we support trio's cancellation system assert cancel_scope.cancelled_caught diff --git a/examples/debugging/multi_daemon_subactors.py b/examples/debugging/multi_daemon_subactors.py index eadb4c104..c37b87983 100644 --- a/examples/debugging/multi_daemon_subactors.py +++ b/examples/debugging/multi_daemon_subactors.py @@ -26,8 +26,8 @@ async def main(): p1 = await n.start_actor('name_error', enable_modules=[__name__]) # retreive results - stream = await p0.run(breakpoint_forever) - await p1.run(name_error) + async with p0.open_stream_from(breakpoint_forever) as stream: + await p1.run(name_error) if __name__ == '__main__': diff --git a/examples/debugging/subactor_breakpoint.py b/examples/debugging/subactor_breakpoint.py index cb160045d..d880404df 100644 --- a/examples/debugging/subactor_breakpoint.py +++ b/examples/debugging/subactor_breakpoint.py @@ -21,4 +21,4 @@ async def main(): if __name__ == '__main__': - tractor.run(main, debug_mode=True) + tractor.run(main, debug_mode=True, loglevel='debug') diff --git a/examples/full_fledged_streaming_service.py b/examples/full_fledged_streaming_service.py index 0ed5f66f6..126eed919 100644 --- a/examples/full_fledged_streaming_service.py +++ b/examples/full_fledged_streaming_service.py @@ -21,7 +21,7 @@ async def aggregate(seed): # fork point portal = await nursery.start_actor( name=f'streamer_{i}', - rpc_module_paths=[__name__], + enable_modules=[__name__], ) portals.append(portal) @@ -29,10 +29,13 @@ async def aggregate(seed): send_chan, recv_chan = trio.open_memory_channel(500) async def push_to_chan(portal, send_chan): + + # TODO: https://github.com/goodboy/tractor/issues/207 async with send_chan: - async for value in await portal.run(stream_data, seed=seed): - # leverage trio's built-in backpressure - await send_chan.send(value) + async with portal.open_stream_from(stream_data, seed=seed) as stream: + async for value in stream: + # leverage trio's built-in backpressure + await send_chan.send(value) print(f"FINISHED ITERATING {portal.channel.uid}") @@ -71,18 +74,24 @@ async def main(): import time pre_start = time.time() - portal = await nursery.run_in_actor( - aggregate, + portal = await nursery.start_actor( name='aggregator', - seed=seed, + enable_modules=[__name__], ) - start = time.time() - # the portal call returns exactly what you'd expect - # as if the remote "aggregate" function was called locally - result_stream = [] - async for value in await portal.result(): - result_stream.append(value) + async with portal.open_stream_from( + aggregate, + seed=seed, + ) as stream: + + start = time.time() + # the portal call returns exactly what you'd expect + # as if the remote "aggregate" function was called locally + result_stream = [] + async for value in stream: + result_stream.append(value) + + await portal.cancel_actor() print(f"STREAM TIME = {time.time() - start}") print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") diff --git a/examples/multiple_streams_one_portal.py b/examples/multiple_streams_one_portal.py index 
354ec66f6..3e592a45c 100644 --- a/examples/multiple_streams_one_portal.py +++ b/examples/multiple_streams_one_portal.py @@ -15,11 +15,12 @@ async def stream_data(seed=10): async def stream_from_portal(p, consumed): - async for item in await p.run(stream_data): - if item in consumed: - consumed.remove(item) - else: - consumed.append(item) + async with p.open_stream_from(stream_data) as stream: + async for item in stream: + if item in consumed: + consumed.remove(item) + else: + consumed.append(item) async def main(): diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 42aec354d..eadcb44a2 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -49,7 +49,9 @@ def test_remote_error(arb_addr, args_err): async def main(): async with tractor.open_nursery() as nursery: - portal = await nursery.run_in_actor(assert_err, name='errorer', **args) + portal = await nursery.run_in_actor( + assert_err, name='errorer', **args + ) # get result(s) from main task try: @@ -168,13 +170,14 @@ async def test_cancel_infinite_streamer(start_method): async with tractor.open_nursery() as n: portal = await n.start_actor( 'donny', - rpc_module_paths=[__name__], + enable_modules=[__name__], ) # this async for loop streams values from the above # async generator running in a separate process - async for letter in await portal.run(stream_forever): - print(letter) + async with portal.open_stream_from(stream_forever) as stream: + async for letter in stream: + print(letter) # we support trio's cancellation system assert cancel_scope.cancelled_caught @@ -430,7 +433,6 @@ async def main(): tractor.run(main) - async def spin_for(period=3): "Sync sleep." time.sleep(period) @@ -438,7 +440,7 @@ async def spin_for(period=3): async def spawn(): async with tractor.open_nursery() as tn: - portal = await tn.run_in_actor( + await tn.run_in_actor( spin_for, name='sleeper', ) @@ -460,7 +462,7 @@ def test_cancel_while_childs_child_in_sync_sleep( async def main(): with trio.fail_after(2): async with tractor.open_nursery() as tn: - portal = await tn.run_in_actor( + await tn.run_in_actor( spawn, name='spawn', ) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index eff289740..af03ce654 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -108,8 +108,9 @@ async def cancel(use_signal, delay=0): async def stream_from(portal): - async for value in await portal.result(): - print(value) + async with portal.open_stream_from(stream_forever) as stream: + async for value in stream: + print(value) async def spawn_and_check_registry( @@ -139,18 +140,20 @@ async def get_reg(): registry = await get_reg() assert actor.uid in registry - if with_streaming: - to_run = stream_forever - else: - to_run = trio.sleep_forever + try: + async with tractor.open_nursery() as n: + async with trio.open_nursery() as trion: - async with trio.open_nursery() as trion: - try: - async with tractor.open_nursery() as n: portals = {} for i in range(3): name = f'a{i}' - portals[name] = await n.run_in_actor(to_run, name=name) + if with_streaming: + portals[name] = await n.start_actor( + name=name, enable_modules=[__name__]) + + else: # no streaming + portals[name] = await n.run_in_actor( + trio.sleep_forever, name=name) # wait on last actor to come up async with tractor.wait_for_actor(name): @@ -171,19 +174,19 @@ async def get_reg(): trion.start_soon(cancel, use_signal, 1) last_p = pts[-1] - async for value in await last_p.result(): - print(value) + await stream_from(last_p) + else: await cancel(use_signal) - 
finally: - with trio.CancelScope(shield=True): - await trio.sleep(0.5) + finally: + with trio.CancelScope(shield=True): + await trio.sleep(0.5) - # all subactors should have de-registered - registry = await get_reg() - assert len(registry) == extra - assert actor.uid in registry + # all subactors should have de-registered + registry = await get_reg() + assert len(registry) == extra + assert actor.uid in registry @pytest.mark.parametrize('use_signal', [False, True]) @@ -260,36 +263,38 @@ async def close_chans_before_nursery( get_reg = partial(aportal.run_from_ns, 'self', 'get_registry') async with tractor.open_nursery() as tn: - portal1 = await tn.run_in_actor( - stream_forever, - name='consumer1', - ) - agen1 = await portal1.result() - - portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__]) - agen2 = await portal2.run(stream_forever) - - async with trio.open_nursery() as n: - n.start_soon(streamer, agen1) - n.start_soon(cancel, use_signal, .5) - try: - await streamer(agen2) - finally: - # Kill the root nursery thus resulting in - # normal arbiter channel ops to fail during - # teardown. It doesn't seem like this is - # reliably triggered by an external SIGINT. - # tractor.current_actor()._root_nursery.cancel_scope.cancel() - - # XXX: THIS IS THE KEY THING that happens - # **before** exiting the actor nursery block - - # also kill off channels cuz why not - await agen1.aclose() - await agen2.aclose() + portal1 = await tn.start_actor( + name='consumer1', enable_modules=[__name__]) + portal2 = await tn.start_actor( + 'consumer2', enable_modules=[__name__]) + + # TODO: compact this back as was in last commit once + # 3.9+, see https://github.com/goodboy/tractor/issues/207 + async with portal1.open_stream_from(stream_forever) as agen1: + async with portal2.open_stream_from( + stream_forever + ) as agen2: + async with trio.open_nursery() as n: + n.start_soon(streamer, agen1) + n.start_soon(cancel, use_signal, .5) + try: + await streamer(agen2) + finally: + # Kill the root nursery thus resulting in + # normal arbiter channel ops to fail during + # teardown. It doesn't seem like this is + # reliably triggered by an external SIGINT. 
+ # tractor.current_actor()._root_nursery.cancel_scope.cancel() + + # XXX: THIS IS THE KEY THING that happens + # **before** exiting the actor nursery block + + # also kill off channels cuz why not + await agen1.aclose() + await agen2.aclose() finally: with trio.CancelScope(shield=True): - await trio.sleep(.5) + await trio.sleep(1) # all subactors should have de-registered registry = await get_reg() diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 3fcb45dcd..0d4c62d79 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -60,45 +60,45 @@ def pred(i): def pred(i): return isinstance(i, int) + # TODO: https://github.com/goodboy/tractor/issues/207 async with tractor.find_actor(pub_actor_name) as portal: - stream = await portal.run( + async with portal.open_stream_from( pubber, topics=which, seed=seed, - ) - task_status.started(stream) - times = 10 - count = 0 - await stream.__anext__() - async for pkt in stream: - for topic, value in pkt.items(): - assert pred(value) - count += 1 - if count >= times: - break - - await stream.aclose() - - stream = await portal.run( - pubber, - topics=['odd'], - seed=seed, - ) - - await stream.__anext__() - count = 0 - # async with aclosing(stream) as stream: - try: + ) as stream: + task_status.started(stream) + times = 10 + count = 0 + await stream.__anext__() async for pkt in stream: for topic, value in pkt.items(): - pass - # assert pred(value) + assert pred(value) count += 1 if count >= times: break - finally: + await stream.aclose() + async with portal.open_stream_from( + pubber, + topics=['odd'], + seed=seed, + ) as stream: + await stream.__anext__() + count = 0 + # async with aclosing(stream) as stream: + try: + async for pkt in stream: + for topic, value in pkt.items(): + pass + # assert pred(value) + count += 1 + if count >= times: + break + finally: + await stream.aclose() + @tractor.msg.pub(tasks=['one', 'two']) async def multilock_pubber(get_topics): @@ -128,11 +128,10 @@ async def test_required_args(callwith_expecterror): await func(**kwargs) else: async with tractor.open_nursery() as n: - # await func(**kwargs) - portal = await n.run_in_actor( - multilock_pubber, + + portal = await n.start_actor( name='pubber', - **kwargs + enable_modules=[__name__], ) async with tractor.wait_for_actor('pubber'): @@ -140,8 +139,14 @@ async def test_required_args(callwith_expecterror): await trio.sleep(0.5) - async for val in await portal.result(): - assert val == {'doggy': 10} + async with portal.open_stream_from( + multilock_pubber, + **kwargs + ) as stream: + async for val in stream: + assert val == {'doggy': 10} + + await portal.cancel_actor() @pytest.mark.parametrize( diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 45fbd5b8d..9aba327f6 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -61,37 +61,38 @@ async def stream_from_single_subactor(stream_func): # no brokerd actor found portal = await nursery.start_actor( 'streamerd', - rpc_module_paths=[__name__], + enable_modules=[__name__], ) seq = range(10) - stream = await portal.run( - stream_func, # one of the funcs above + async with portal.open_stream_from( + stream_func, sequence=list(seq), # has to be msgpack serializable - ) - # it'd sure be nice to have an asyncitertools here... - iseq = iter(seq) - ival = next(iseq) + ) as stream: - async for val in stream: - assert val == ival + # it'd sure be nice to have an asyncitertools here... 
+ iseq = iter(seq) + ival = next(iseq) - try: - ival = next(iseq) - except StopIteration: - # should cancel far end task which will be - # caught and no error is raised - await stream.aclose() + async for val in stream: + assert val == ival + + try: + ival = next(iseq) + except StopIteration: + # should cancel far end task which will be + # caught and no error is raised + await stream.aclose() - await trio.sleep(0.3) + await trio.sleep(0.3) - try: - await stream.__anext__() - except StopAsyncIteration: - # stop all spawned subactors - await portal.cancel_actor() - # await nursery.cancel() + try: + await stream.__anext__() + except StopAsyncIteration: + # stop all spawned subactors + await portal.cancel_actor() + # await nursery.cancel() @pytest.mark.parametrize( @@ -132,7 +133,7 @@ async def aggregate(seed): # fork point portal = await nursery.start_actor( name=f'streamer_{i}', - rpc_module_paths=[__name__], + enable_modules=[__name__], ) portals.append(portal) @@ -141,11 +142,14 @@ async def aggregate(seed): async def push_to_chan(portal, send_chan): async with send_chan: - async for value in await portal.run( - __name__, 'stream_data', seed=seed - ): - # leverage trio's built-in backpressure - await send_chan.send(value) + + async with portal.open_stream_from( + stream_data, seed=seed, + ) as stream: + + async for value in stream: + # leverage trio's built-in backpressure + await send_chan.send(value) print(f"FINISHED ITERATING {portal.channel.uid}") @@ -183,22 +187,24 @@ async def a_quadruple_example(): seed = int(1e3) pre_start = time.time() - portal = await nursery.run_in_actor( - aggregate, - seed=seed, + portal = await nursery.start_actor( name='aggregator', + enable_modules=[__name__], ) start = time.time() # the portal call returns exactly what you'd expect # as if the remote "aggregate" function was called locally result_stream = [] - async for value in await portal.result(): - result_stream.append(value) + + async with portal.open_stream_from(aggregate, seed=seed) as stream: + async for value in stream: + result_stream.append(value) print(f"STREAM TIME = {time.time() - start}") print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") assert result_stream == list(range(seed)) + await portal.cancel_actor() return result_stream @@ -272,48 +278,55 @@ async def test_respawn_consumer_task( async with tractor.open_nursery() as n: - stream = await(await n.run_in_actor( + portal = await n.start_actor( + name='streamer', + enable_modules=[__name__] + ) + async with portal.open_stream_from( stream_data, seed=11, - name='streamer', - )).result() + ) as stream: + + expect = set(range(11)) + received = [] - expect = set(range(11)) - received = [] + # this is the re-spawn task routine + async def consume(task_status=trio.TASK_STATUS_IGNORED): + print('starting consume task..') + nonlocal stream - # this is the re-spawn task routine - async def consume(task_status=trio.TASK_STATUS_IGNORED): - print('starting consume task..') - nonlocal stream + with trio.CancelScope() as cs: + task_status.started(cs) - with trio.CancelScope() as cs: - task_status.started(cs) + # shield stream's underlying channel from cancellation + with stream.shield(): - # shield stream's underlying channel from cancellation - with stream.shield(): + async for v in stream: + print(f'from stream: {v}') + expect.remove(v) + received.append(v) - async for v in stream: - print(f'from stream: {v}') - expect.remove(v) - received.append(v) + print('exited consume') - print('exited consume') + async with trio.open_nursery() as ln: + 
cs = await ln.start(consume) - async with trio.open_nursery() as ln: - cs = await ln.start(consume) + while True: - while True: + await trio.sleep(0.1) - await trio.sleep(0.1) + if received[-1] % 2 == 0: - if received[-1] % 2 == 0: + print('cancelling consume task..') + cs.cancel() - print('cancelling consume task..') - cs.cancel() + # respawn + cs = await ln.start(consume) - # respawn - cs = await ln.start(consume) + if not expect: + print("all values streamed, BREAKING") + break - if not expect: - print("all values streamed, BREAKING") - break + # TODO: this is justification for a + # ``ActorNursery.stream_from_actor()`` helper? + await portal.cancel_actor() diff --git a/tractor/_actor.py b/tractor/_actor.py index 6333091b0..8601e8373 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -1093,4 +1093,4 @@ async def register_actor( event.set() async def unregister_actor(self, uid: Tuple[str, str]) -> None: - self._registry.pop(uid, None) + self._registry.pop(uid) diff --git a/tractor/_debug.py b/tractor/_debug.py index 6040d3ff2..75e502a1a 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -182,24 +182,22 @@ async def wait_for_parent_stdin_hijack( _debugger_request_cs = cs try: async with get_root() as portal: - with trio.fail_after(.5): - stream = await portal.run( + async with portal.open_stream_from( tractor._debug._hijack_stdin_relay_to_child, subactor_uid=actor.uid, - ) - async with aclosing(stream): + ) as stream: - # block until first yield above - async for val in stream: + # block until first yield above + async for val in stream: - assert val == 'Locked' - task_status.started() + assert val == 'Locked' + task_status.started() - # with trio.CancelScope(shield=True): - await do_unlock.wait() + # with trio.CancelScope(shield=True): + await do_unlock.wait() - # trigger cancellation of remote stream - break + # trigger cancellation of remote stream + break finally: log.debug(f"Exiting debugger for actor {actor}") global _in_debug diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 63e0d0948..61375904f 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -46,6 +46,10 @@ class ModuleNotExposed(ModuleNotFoundError): "The requested module is not exposed for RPC" +class NoRuntime(RuntimeError): + "The root actor has not been initialized yet" + + def pack_error(exc: BaseException) -> Dict[str, Any]: """Create an "error message" for tranmission over a channel (aka the wire). 
diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 32b89660e..9d34b3af0 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -4,7 +4,6 @@ import typing from typing import Any, Tuple, Optional from functools import partial -import inspect import msgpack import trio diff --git a/tractor/_portal.py b/tractor/_portal.py index 6d1c802c0..d82e040bc 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -3,11 +3,12 @@ """ import importlib import inspect -import typing -from typing import Tuple, Any, Dict, Optional, Set, Iterator +from typing import ( + Tuple, Any, Dict, Optional, Set, + Callable, AsyncGenerator +) from functools import partial from dataclasses import dataclass -from contextlib import contextmanager import warnings import trio @@ -17,16 +18,17 @@ from ._ipc import Channel from .log import get_logger from ._exceptions import unpack_error, NoResult, RemoteActorError +from ._streaming import Context, ReceiveMsgStream -log = get_logger('tractor') +log = get_logger(__name__) @asynccontextmanager async def maybe_open_nursery( nursery: trio.Nursery = None, shield: bool = False, -) -> typing.AsyncGenerator[trio.Nursery, Any]: +) -> AsyncGenerator[trio.Nursery, Any]: """Create a new nursery if None provided. Blocks on exit as expected if no input nursery is provided. @@ -39,113 +41,30 @@ async def maybe_open_nursery( yield nursery -class ReceiveStream(trio.abc.ReceiveChannel): - """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with - special behaviour for signalling stream termination across an - inter-actor ``Channel``. This is the type returned to a local task - which invoked a remote streaming function using `Portal.run()`. - - Termination rules: - - if the local task signals stop iteration a cancel signal is - relayed to the remote task indicating to stop streaming - - if the remote task signals the end of a stream, raise a - ``StopAsyncIteration`` to terminate the local ``async for`` - - """ - def __init__( - self, - cid: str, - rx_chan: trio.abc.ReceiveChannel, - portal: 'Portal', - ) -> None: - self._cid = cid - self._rx_chan = rx_chan - self._portal = portal - self._shielded = False - - # delegate directly to underlying mem channel - def receive_nowait(self): - return self._rx_chan.receive_nowait() - - async def receive(self): - try: - msg = await self._rx_chan.receive() - return msg['yield'] - except trio.ClosedResourceError: - # when the send is closed we assume the stream has - # terminated and signal this local iterator to stop - await self.aclose() - raise StopAsyncIteration - except trio.Cancelled: - # relay cancels to the remote task - await self.aclose() - raise - except KeyError: - # internal error should never get here - assert msg.get('cid'), ( - "Received internal error at portal?") - raise unpack_error(msg, self._portal.channel) - - @contextmanager - def shield( - self - ) -> Iterator['ReceiveStream']: # noqa - """Shield this stream's underlying channel such that a local consumer task - can be cancelled (and possibly restarted) using ``trio.Cancelled``. - - """ - self._shielded = True - yield self - self._shielded = False - - async def aclose(self): - """Cancel associated remote actor task and local memory channel - on close. 
- """ - if self._rx_chan._closed: - log.warning(f"{self} is already closed") - return - - if self._shielded: - log.warning(f"{self} is shielded, portal channel being kept alive") - return - - cid = self._cid - with trio.move_on_after(0.5) as cs: - cs.shield = True - log.warning( - f"Cancelling stream {cid} to " - f"{self._portal.channel.uid}") - - # NOTE: we're telling the far end actor to cancel a task - # corresponding to *this actor*. The far end local channel - # instance is passed to `Actor._cancel_task()` implicitly. - await self._portal.run_from_ns('self', '_cancel_task', cid=cid) - - if cs.cancelled_caught: - # XXX: there's no way to know if the remote task was indeed - # cancelled in the case where the connection is broken or - # some other network error occurred. - if not self._portal.channel.connected(): - log.warning( - "May have failed to cancel remote task " - f"{cid} for {self._portal.channel.uid}") - - with trio.CancelScope(shield=True): - await self._rx_chan.aclose() - - def clone(self): - return self +def func_deats(func: Callable) -> Tuple[str, str]: + return ( + func.__module__, + func.__name__, + ) class Portal: """A 'portal' to a(n) (remote) ``Actor``. - Allows for invoking remote routines and receiving results through an - underlying ``tractor.Channel`` as though the remote (async) - function / generator was invoked locally. + A portal is "opened" (and eventually closed) by one side of an + inter-actor communication context. The side which opens the portal + is equivalent to a "caller" in function parlance and usually is + either the called actor's parent (in process tree hierarchy terms) + or a client interested in scheduling work to be done remotely in a + far process. + + The portal api allows the "caller" actor to invoke remote routines + and receive results through an underlying ``tractor.Channel`` as + though the remote (async) function / generator was called locally. + It may be thought of loosely as an RPC api where native Python + function calling semantics are supported transparently; hence it is + like having a "portal" between the seperate actor memory spaces. - Think of this like a native async IPC API. """ def __init__(self, channel: Channel) -> None: self.channel = channel @@ -157,7 +76,7 @@ def __init__(self, channel: Channel) -> None: self._expect_result: Optional[ Tuple[str, Any, str, Dict[str, Any]] ] = None - self._streams: Set[ReceiveStream] = set() + self._streams: Set[ReceiveMsgStream] = set() self.actor = current_actor() async def _submit( @@ -182,102 +101,37 @@ async def _submit( first_msg = await recv_chan.receive() functype = first_msg.get('functype') - if functype == 'asyncfunc': - resp_type = 'return' - elif functype == 'asyncgen': - resp_type = 'yield' - elif 'error' in first_msg: + if 'error' in first_msg: raise unpack_error(first_msg, self.channel) - else: + + elif functype not in ('asyncfunc', 'asyncgen', 'context'): raise ValueError(f"{first_msg} is an invalid response packet?") - return cid, recv_chan, resp_type, first_msg + return cid, recv_chan, functype, first_msg async def _submit_for_result(self, ns: str, func: str, **kwargs) -> None: + assert self._expect_result is None, \ "A pending main result has already been submitted" - self._expect_result = await self._submit(ns, func, kwargs) - - async def run( - self, - func_or_ns: str, - fn_name: Optional[str] = None, - **kwargs - ) -> Any: - """Submit a remote function to be scheduled and run by actor, in - a new task, wrap and return its (stream of) result(s). 
- This is a blocking call and returns either a value from the - remote rpc task or a local async generator instance. - """ - if isinstance(func_or_ns, str): - warnings.warn( - "`Portal.run(namespace: str, funcname: str)` is now" - "deprecated, pass a function reference directly instead\n" - "If you still want to run a remote function by name use" - "`Portal.run_from_ns()`", - DeprecationWarning, - stacklevel=2, - ) - fn_mod_path = func_or_ns - assert isinstance(fn_name, str) - - else: # function reference was passed directly - fn = func_or_ns - fn_mod_path = fn.__module__ - fn_name = fn.__name__ - - return await self._return_from_resptype( - *(await self._submit(fn_mod_path, fn_name, kwargs)) - ) - - async def run_from_ns( - self, - namespace_path: str, - function_name: str, - **kwargs, - ) -> Any: - """Run a function from a (remote) namespace in a new task on the far-end actor. - - This is a more explitcit way to run tasks in a remote-process - actor using explicit object-path syntax. Hint: this is how - `.run()` works underneath. - - Note:: - - A special namespace `self` can be used to invoke `Actor` - instance methods in the remote runtime. Currently this should only - be used for `tractor` internals. - """ - return await self._return_from_resptype( - *(await self._submit(namespace_path, function_name, kwargs)) - ) + self._expect_result = await self._submit(ns, func, kwargs) - async def _return_from_resptype( + async def _return_once( self, cid: str, recv_chan: trio.abc.ReceiveChannel, resptype: str, first_msg: dict ) -> Any: - # TODO: not this needs some serious work and thinking about how - # to make async-generators the fundamental IPC API over channels! - # (think `yield from`, `gen.send()`, and functional reactive stuff) - if resptype == 'yield': # stream response - rchan = ReceiveStream(cid, recv_chan, self) - self._streams.add(rchan) - return rchan - - elif resptype == 'return': # single response - msg = await recv_chan.receive() - try: - return msg['return'] - except KeyError: - # internal error should never get here - assert msg.get('cid'), "Received internal error at portal?" - raise unpack_error(msg, self.channel) - else: - raise ValueError(f"Unknown msg response type: {first_msg}") + assert resptype == 'asyncfunc' # single response + + msg = await recv_chan.receive() + try: + return msg['return'] + except KeyError: + # internal error should never get here + assert msg.get('cid'), "Received internal error at portal?" + raise unpack_error(msg, self.channel) async def result(self) -> Any: """Return the result(s) from the remote actor's "main" task. @@ -300,9 +154,7 @@ async def result(self) -> Any: assert self._expect_result if self._result is None: try: - self._result = await self._return_from_resptype( - *self._expect_result - ) + self._result = await self._return_once(*self._expect_result) except RemoteActorError as err: self._result = err @@ -369,6 +221,122 @@ async def cancel_actor(self): f"{self.channel} for {self.channel.uid} was already closed?") return False + async def run_from_ns( + self, + namespace_path: str, + function_name: str, + **kwargs, + ) -> Any: + """Run a function from a (remote) namespace in a new task on the far-end actor. + + This is a more explitcit way to run tasks in a remote-process + actor using explicit object-path syntax. Hint: this is how + `.run()` works underneath. + + Note:: + + A special namespace `self` can be used to invoke `Actor` + instance methods in the remote runtime. Currently this should only + be used for `tractor` internals. 
+ """ + return await self._return_once( + *(await self._submit(namespace_path, function_name, kwargs)) + ) + + async def run( + self, + func: str, + fn_name: Optional[str] = None, + **kwargs + ) -> Any: + """Submit a remote function to be scheduled and run by actor, in + a new task, wrap and return its (stream of) result(s). + + This is a blocking call and returns either a value from the + remote rpc task or a local async generator instance. + """ + if isinstance(func, str): + warnings.warn( + "`Portal.run(namespace: str, funcname: str)` is now" + "deprecated, pass a function reference directly instead\n" + "If you still want to run a remote function by name use" + "`Portal.run_from_ns()`", + DeprecationWarning, + stacklevel=2, + ) + fn_mod_path = func + assert isinstance(fn_name, str) + + else: # function reference was passed directly + if ( + not inspect.iscoroutinefunction(func) or + ( + inspect.iscoroutinefunction(func) and + getattr(func, '_tractor_stream_function', False) + ) + ): + raise TypeError( + f'{func} must be a non-streaming async function!') + + fn_mod_path, fn_name = func_deats(func) + + return await self._return_once( + *(await self._submit(fn_mod_path, fn_name, kwargs)) + ) + + @asynccontextmanager + async def open_stream_from( + self, + async_gen_func: Callable, # typing: ignore + **kwargs, + ) -> AsyncGenerator[ReceiveMsgStream, None]: + + if not inspect.isasyncgenfunction(async_gen_func): + if not ( + inspect.iscoroutinefunction(async_gen_func) and + getattr(async_gen_func, '_tractor_stream_function', False) + ): + raise TypeError( + f'{async_gen_func} must be an async generator function!') + + fn_mod_path, fn_name = func_deats(async_gen_func) + ( + cid, + recv_chan, + functype, + first_msg + ) = await self._submit(fn_mod_path, fn_name, kwargs) + + # receive only stream + assert functype == 'asyncgen' + + ctx = Context(self.channel, cid, _portal=self) + try: + async with ReceiveMsgStream(ctx, recv_chan, self) as rchan: + self._streams.add(rchan) + yield rchan + finally: + # cancel the far end task on consumer close + try: + await ctx.cancel() + except trio.ClosedResourceError: + # if the far end terminates before we send a cancel the + # underlying transport-channel may already be closed. + log.debug(f'Context {ctx} was already closed?') + + self._streams.remove(rchan) + + # @asynccontextmanager + # async def open_context( + # self, + # func: Callable, + # **kwargs, + # ) -> Context: + # # TODO + # elif resptype == 'context': # context manager style setup/teardown + # # TODO likely not here though + # raise NotImplementedError + @dataclass class LocalPortal: @@ -387,10 +355,7 @@ async def run_from_ns(self, ns: str, func_name: str, **kwargs) -> Any: """ obj = self.actor if ns == 'self' else importlib.import_module(ns) func = getattr(obj, func_name) - if inspect.iscoroutinefunction(func): - return await func(**kwargs) - else: - return func(**kwargs) + return await func(**kwargs) @asynccontextmanager @@ -399,7 +364,7 @@ async def open_portal( nursery: Optional[trio.Nursery] = None, start_msg_loop: bool = True, shield: bool = False, -) -> typing.AsyncGenerator[Portal, None]: +) -> AsyncGenerator[Portal, None]: """Open a ``Portal`` through the provided ``channel``. Spawns a background task to handle message processing. 
diff --git a/tractor/_spawn.py b/tractor/_spawn.py index f8528ace5..a07f60c78 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -98,17 +98,11 @@ async def exhaust_portal( """ try: log.debug(f"Waiting on final result from {actor.uid}") - final = res = await portal.result() - # if it's an async-gen then alert that we're cancelling it - if inspect.isasyncgen(res): - final = [] - log.warning( - f"Blindly consuming asyncgen for {actor.uid}") - with trio.fail_after(1): - async with aclosing(res) as agen: - async for item in agen: - log.debug(f"Consuming item {item}") - final.append(item) + + # XXX: streams should never be reaped here since they should + # always be established and shutdown using a context manager api + final = await portal.result() + except (Exception, trio.MultiError) as err: # we reraise in the parent task via a ``trio.MultiError`` return err diff --git a/tractor/_state.py b/tractor/_state.py index 37fdafa7a..1d5e2f3c8 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -7,6 +7,9 @@ import trio +from ._exceptions import NoRuntime + + _current_actor: Optional['Actor'] = None # type: ignore # noqa _runtime_vars: Dict[str, Any] = { '_debug_mode': False, @@ -19,7 +22,7 @@ def current_actor(err_on_no_runtime: bool = True) -> 'Actor': # type: ignore # """Get the process-local actor instance. """ if _current_actor is None and err_on_no_runtime: - raise RuntimeError("No local actor has been initialized yet") + raise NoRuntime("No local actor has been initialized yet") return _current_actor diff --git a/tractor/_streaming.py b/tractor/_streaming.py index e683216d0..0836f4e2d 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -1,14 +1,17 @@ import inspect -from contextvars import ContextVar +from contextlib import contextmanager # , asynccontextmanager from dataclasses import dataclass -from typing import Any +from typing import Any, Iterator, Optional +import warnings import trio from ._ipc import Channel +from ._exceptions import unpack_error +from .log import get_logger -_context: ContextVar['Context'] = ContextVar('context') +log = get_logger(__name__) @dataclass(frozen=True) @@ -18,22 +21,76 @@ class Context: Allows maintaining task or protocol specific state between communicating actors. A unique context is created on the receiving end for every request to a remote actor. + + A context can be cancelled and (eventually) restarted from + either side of the underlying IPC channel. + + A context can be used to open task oriented message streams. + """ chan: Channel cid: str - cancel_scope: trio.CancelScope + + # only set on the caller side + _portal: Optional['Portal'] = None # type: ignore # noqa + + # only set on the callee side + _cancel_scope: Optional[trio.CancelScope] = None async def send_yield(self, data: Any) -> None: + + warnings.warn( + "`Context.send_yield()` is now deprecated. " + "Use ``MessageStream.send()``. ", + DeprecationWarning, + stacklevel=2, + ) await self.chan.send({'yield': data, 'cid': self.cid}) async def send_stop(self) -> None: await self.chan.send({'stop': True, 'cid': self.cid}) + async def cancel(self) -> None: + """Cancel this inter-actor-task context. -def current_context(): - """Get the current task's context instance. - """ - return _context.get() + Request that the far side cancel it's current linked context, + timeout quickly to sidestep 2-generals... 
+ + """ + assert self._portal, ( + "No portal found, this is likely a callee side context") + + cid = self.cid + with trio.move_on_after(0.5) as cs: + cs.shield = True + log.warning( + f"Cancelling stream {cid} to " + f"{self._portal.channel.uid}") + + # NOTE: we're telling the far end actor to cancel a task + # corresponding to *this actor*. The far end local channel + # instance is passed to `Actor._cancel_task()` implicitly. + await self._portal.run_from_ns('self', '_cancel_task', cid=cid) + + if cs.cancelled_caught: + # XXX: there's no way to know if the remote task was indeed + # cancelled in the case where the connection is broken or + # some other network error occurred. + if not self._portal.channel.connected(): + log.warning( + "May have failed to cancel remote task " + f"{cid} for {self._portal.channel.uid}") + + # async def restart(self) -> None: + # # TODO + # pass + + # @asynccontextmanager + # async def open_stream( + # self, + # ) -> AsyncContextManager: + # # TODO + # pass def stream(func): @@ -41,9 +98,163 @@ def stream(func): """ func._tractor_stream_function = True sig = inspect.signature(func) - if 'ctx' not in sig.parameters: + params = sig.parameters + if 'stream' not in params and 'ctx' in params: + warnings.warn( + "`@tractor.stream decorated funcs should now declare a `stream` " + " arg, `ctx` is now designated for use with @tractor.context", + DeprecationWarning, + stacklevel=2, + ) + + if ( + 'ctx' not in params and + 'to_trio' not in params and + 'stream' not in params + ): raise TypeError( "The first argument to the stream function " f"{func.__name__} must be `ctx: tractor.Context`" ) return func + + +class ReceiveMsgStream(trio.abc.ReceiveChannel): + """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with + special behaviour for signalling stream termination across an + inter-actor ``Channel``. This is the type returned to a local task + which invoked a remote streaming function using `Portal.run()`. + + Termination rules: + - if the local task signals stop iteration a cancel signal is + relayed to the remote task indicating to stop streaming + - if the remote task signals the end of a stream, raise a + ``StopAsyncIteration`` to terminate the local ``async for`` + + """ + def __init__( + self, + ctx: Context, + rx_chan: trio.abc.ReceiveChannel, + portal: 'Portal', # type: ignore # noqa + ) -> None: + self._ctx = ctx + self._rx_chan = rx_chan + self._portal = portal + self._shielded = False + + # delegate directly to underlying mem channel + def receive_nowait(self): + return self._rx_chan.receive_nowait() + + async def receive(self): + try: + msg = await self._rx_chan.receive() + return msg['yield'] + + except KeyError: + # internal error should never get here + assert msg.get('cid'), ("Received internal error at portal?") + + # TODO: handle 2 cases with 3.10 match syntax + # - 'stop' + # - 'error' + # possibly just handle msg['stop'] here! + + # TODO: test that shows stream raising an expected error!!! + if msg.get('error'): + # raise the error message + raise unpack_error(msg, self._portal.channel) + + except (trio.ClosedResourceError, StopAsyncIteration): + # XXX: this indicates that a `stop` message was + # sent by the far side of the underlying channel. + # Currently this is triggered by calling ``.aclose()`` on + # the send side of the channel inside + # ``Actor._push_result()``, but maybe it should be put here? + # to avoid exposing the internal mem chan closing mechanism? 
+ # in theory we could instead do some flushing of the channel + # if needed to ensure all consumers are complete before + # triggering closure too early? + + # Locally, we want to close this stream gracefully, by + # terminating any local consumers tasks deterministically. + # We **don't** want to be closing this send channel and not + # relaying a final value to remaining consumers who may not + # have been scheduled to receive it yet? + + # lots of testing to do here + + # when the send is closed we assume the stream has + # terminated and signal this local iterator to stop + await self.aclose() + raise StopAsyncIteration + + except trio.Cancelled: + # relay cancels to the remote task + await self.aclose() + raise + + @contextmanager + def shield( + self + ) -> Iterator['ReceiveMsgStream']: # noqa + """Shield this stream's underlying channel such that a local consumer task + can be cancelled (and possibly restarted) using ``trio.Cancelled``. + + """ + self._shielded = True + yield self + self._shielded = False + + async def aclose(self): + """Cancel associated remote actor task and local memory channel + on close. + """ + rx_chan = self._rx_chan + + if rx_chan._closed: + log.warning(f"{self} is already closed") + return + + # stats = rx_chan.statistics() + # if stats.open_receive_channels > 1: + # # if we've been cloned don't kill the stream + # log.debug( + # "there are still consumers running keeping stream alive") + # return + + if self._shielded: + log.warning(f"{self} is shielded, portal channel being kept alive") + return + + # close the local mem chan + rx_chan.close() + + # cancel surrounding IPC context + await self._ctx.cancel() + + # TODO: but make it broadcasting to consumers + # def clone(self): + # """Clone this receive channel allowing for multi-task + # consumption from the same channel. + + # """ + # return ReceiveStream( + # self._cid, + # self._rx_chan.clone(), + # self._portal, + # ) + + +# class MsgStream(ReceiveMsgStream, trio.abc.Channel): +# """ +# Bidirectional message stream for use within an inter-actor actor +# ``Context```. + +# """ +# async def send( +# self, +# data: Any +# ) -> None: +# await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 881556256..4674bd9a8 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -2,6 +2,7 @@ ``trio`` inspired apis and helpers """ from functools import partial +import inspect import multiprocessing as mp from typing import Tuple, List, Dict, Optional import typing @@ -136,6 +137,14 @@ async def run_in_actor( # use the run_in_actor nursery nursery=self._ria_nursery, ) + + # XXX: don't allow stream funcs + if not ( + inspect.iscoroutinefunction(fn) and + not getattr(fn, '_tractor_stream_function', False) + ): + raise TypeError(f'{fn} must be an async function!') + # this marks the actor to be cancelled after its portal result # is retreived, see logic in `open_nursery()` below. self._cancel_after_result_on_exit.add(portal) diff --git a/tractor/msg.py b/tractor/msg.py index 5b343b630..560e644c0 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -29,9 +29,13 @@ def get_topics(): return tuple(topics2ctxs.keys()) agen = pub_async_gen_func(get_topics=get_topics) + async with aclosing(agen) as pub_gen: + async for published in pub_gen: + ctx_payloads: Dict[str, Any] = {} + for topic, data in published.items(): log.debug(f"publishing {topic, data}") # build a new dict packet or invoke provided packetizer
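
The net effect of this patch on consumer code: the old ``await portal.run(async_gen_func)`` call that returned a raw async generator is replaced by an explicit, context-managed stream opened with ``Portal.open_stream_from()``. Below is a minimal consumer-side sketch assembled from the example scripts touched above; the actor name and the trivial body of ``stream_forever`` are illustrative only, not part of the patch.

import trio
import tractor


async def stream_forever():
    # illustrative remote async generator; the real example/test
    # versions in this patch stream letters or integer sequences
    while True:
        yield 'yo'
        await trio.sleep(0.1)


async def main():
    async with tractor.open_nursery() as n:

        portal = await n.start_actor(
            'streamer',
            enable_modules=[__name__],
        )

        # streams are now opened (and torn down) via an async
        # context manager instead of ``await portal.run()``
        async with portal.open_stream_from(stream_forever) as stream:
            count = 0
            async for letter in stream:
                print(letter)
                count += 1
                if count > 5:
                    # leaving the block cancels the far-end task
                    break

        # daemon actors started with ``start_actor()`` must still be
        # cancelled explicitly once streaming is done
        await portal.cancel_actor()


if __name__ == '__main__':
    tractor.run(main)

Exiting the ``open_stream_from()`` block cancels the far-end task via ``Context.cancel()``; actors spawned with ``start_actor()`` are not reaped automatically the way ``run_in_actor()`` portals are, hence the explicit ``portal.cancel_actor()`` calls added throughout the tests and examples above.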