Skip to content

Commit

Permalink
Merge pull request #206 from goodboy/stream_contexts
Browse files Browse the repository at this point in the history
Explicit stream contexts
  • Loading branch information
goodboy authored May 4, 2021
2 parents a5a88e2 + fc36e73 commit af93b85
Show file tree
Hide file tree
Showing 19 changed files with 628 additions and 405 deletions.
5 changes: 3 additions & 2 deletions examples/asynchronous_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ async def main():

# this async for loop streams values from the above
# async generator running in a separate process
async for letter in await portal.run(stream_forever):
print(letter)
async with portal.open_stream_from(stream_forever) as stream:
async for letter in stream:
print(letter)

# we support trio's cancellation system
assert cancel_scope.cancelled_caught
Expand Down
4 changes: 2 additions & 2 deletions examples/debugging/multi_daemon_subactors.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ async def main():
p1 = await n.start_actor('name_error', enable_modules=[__name__])

# retreive results
stream = await p0.run(breakpoint_forever)
await p1.run(name_error)
async with p0.open_stream_from(breakpoint_forever) as stream:
await p1.run(name_error)


if __name__ == '__main__':
Expand Down
2 changes: 1 addition & 1 deletion examples/debugging/subactor_breakpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ async def main():


if __name__ == '__main__':
tractor.run(main, debug_mode=True)
tractor.run(main, debug_mode=True, loglevel='debug')
35 changes: 22 additions & 13 deletions examples/full_fledged_streaming_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@ async def aggregate(seed):
# fork point
portal = await nursery.start_actor(
name=f'streamer_{i}',
rpc_module_paths=[__name__],
enable_modules=[__name__],
)

portals.append(portal)

send_chan, recv_chan = trio.open_memory_channel(500)

async def push_to_chan(portal, send_chan):

# TODO: https://github.com/goodboy/tractor/issues/207
async with send_chan:
async for value in await portal.run(stream_data, seed=seed):
# leverage trio's built-in backpressure
await send_chan.send(value)
async with portal.open_stream_from(stream_data, seed=seed) as stream:
async for value in stream:
# leverage trio's built-in backpressure
await send_chan.send(value)

print(f"FINISHED ITERATING {portal.channel.uid}")

Expand Down Expand Up @@ -71,18 +74,24 @@ async def main():
import time
pre_start = time.time()

portal = await nursery.run_in_actor(
aggregate,
portal = await nursery.start_actor(
name='aggregator',
seed=seed,
enable_modules=[__name__],
)

start = time.time()
# the portal call returns exactly what you'd expect
# as if the remote "aggregate" function was called locally
result_stream = []
async for value in await portal.result():
result_stream.append(value)
async with portal.open_stream_from(
aggregate,
seed=seed,
) as stream:

start = time.time()
# the portal call returns exactly what you'd expect
# as if the remote "aggregate" function was called locally
result_stream = []
async for value in stream:
result_stream.append(value)

await portal.cancel_actor()

print(f"STREAM TIME = {time.time() - start}")
print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")
Expand Down
11 changes: 6 additions & 5 deletions examples/multiple_streams_one_portal.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ async def stream_data(seed=10):

async def stream_from_portal(p, consumed):

async for item in await p.run(stream_data):
if item in consumed:
consumed.remove(item)
else:
consumed.append(item)
async with p.open_stream_from(stream_data) as stream:
async for item in stream:
if item in consumed:
consumed.remove(item)
else:
consumed.append(item)


async def main():
Expand Down
16 changes: 9 additions & 7 deletions tests/test_cancellation.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ def test_remote_error(arb_addr, args_err):
async def main():
async with tractor.open_nursery() as nursery:

portal = await nursery.run_in_actor(assert_err, name='errorer', **args)
portal = await nursery.run_in_actor(
assert_err, name='errorer', **args
)

# get result(s) from main task
try:
Expand Down Expand Up @@ -168,13 +170,14 @@ async def test_cancel_infinite_streamer(start_method):
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'donny',
rpc_module_paths=[__name__],
enable_modules=[__name__],
)

# this async for loop streams values from the above
# async generator running in a separate process
async for letter in await portal.run(stream_forever):
print(letter)
async with portal.open_stream_from(stream_forever) as stream:
async for letter in stream:
print(letter)

# we support trio's cancellation system
assert cancel_scope.cancelled_caught
Expand Down Expand Up @@ -430,15 +433,14 @@ async def main():
tractor.run(main)



async def spin_for(period=3):
"Sync sleep."
time.sleep(period)


async def spawn():
async with tractor.open_nursery() as tn:
portal = await tn.run_in_actor(
await tn.run_in_actor(
spin_for,
name='sleeper',
)
Expand All @@ -460,7 +462,7 @@ def test_cancel_while_childs_child_in_sync_sleep(
async def main():
with trio.fail_after(2):
async with tractor.open_nursery() as tn:
portal = await tn.run_in_actor(
await tn.run_in_actor(
spawn,
name='spawn',
)
Expand Down
99 changes: 52 additions & 47 deletions tests/test_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,9 @@ async def cancel(use_signal, delay=0):


async def stream_from(portal):
async for value in await portal.result():
print(value)
async with portal.open_stream_from(stream_forever) as stream:
async for value in stream:
print(value)


async def spawn_and_check_registry(
Expand Down Expand Up @@ -139,18 +140,20 @@ async def get_reg():
registry = await get_reg()
assert actor.uid in registry

if with_streaming:
to_run = stream_forever
else:
to_run = trio.sleep_forever
try:
async with tractor.open_nursery() as n:
async with trio.open_nursery() as trion:

async with trio.open_nursery() as trion:
try:
async with tractor.open_nursery() as n:
portals = {}
for i in range(3):
name = f'a{i}'
portals[name] = await n.run_in_actor(to_run, name=name)
if with_streaming:
portals[name] = await n.start_actor(
name=name, enable_modules=[__name__])

else: # no streaming
portals[name] = await n.run_in_actor(
trio.sleep_forever, name=name)

# wait on last actor to come up
async with tractor.wait_for_actor(name):
Expand All @@ -171,19 +174,19 @@ async def get_reg():
trion.start_soon(cancel, use_signal, 1)

last_p = pts[-1]
async for value in await last_p.result():
print(value)
await stream_from(last_p)

else:
await cancel(use_signal)

finally:
with trio.CancelScope(shield=True):
await trio.sleep(0.5)
finally:
with trio.CancelScope(shield=True):
await trio.sleep(0.5)

# all subactors should have de-registered
registry = await get_reg()
assert len(registry) == extra
assert actor.uid in registry
# all subactors should have de-registered
registry = await get_reg()
assert len(registry) == extra
assert actor.uid in registry


@pytest.mark.parametrize('use_signal', [False, True])
Expand Down Expand Up @@ -260,36 +263,38 @@ async def close_chans_before_nursery(
get_reg = partial(aportal.run_from_ns, 'self', 'get_registry')

async with tractor.open_nursery() as tn:
portal1 = await tn.run_in_actor(
stream_forever,
name='consumer1',
)
agen1 = await portal1.result()

portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__])
agen2 = await portal2.run(stream_forever)

async with trio.open_nursery() as n:
n.start_soon(streamer, agen1)
n.start_soon(cancel, use_signal, .5)
try:
await streamer(agen2)
finally:
# Kill the root nursery thus resulting in
# normal arbiter channel ops to fail during
# teardown. It doesn't seem like this is
# reliably triggered by an external SIGINT.
# tractor.current_actor()._root_nursery.cancel_scope.cancel()

# XXX: THIS IS THE KEY THING that happens
# **before** exiting the actor nursery block

# also kill off channels cuz why not
await agen1.aclose()
await agen2.aclose()
portal1 = await tn.start_actor(
name='consumer1', enable_modules=[__name__])
portal2 = await tn.start_actor(
'consumer2', enable_modules=[__name__])

# TODO: compact this back as was in last commit once
# 3.9+, see https://github.com/goodboy/tractor/issues/207
async with portal1.open_stream_from(stream_forever) as agen1:
async with portal2.open_stream_from(
stream_forever
) as agen2:
async with trio.open_nursery() as n:
n.start_soon(streamer, agen1)
n.start_soon(cancel, use_signal, .5)
try:
await streamer(agen2)
finally:
# Kill the root nursery thus resulting in
# normal arbiter channel ops to fail during
# teardown. It doesn't seem like this is
# reliably triggered by an external SIGINT.
# tractor.current_actor()._root_nursery.cancel_scope.cancel()

# XXX: THIS IS THE KEY THING that happens
# **before** exiting the actor nursery block

# also kill off channels cuz why not
await agen1.aclose()
await agen2.aclose()
finally:
with trio.CancelScope(shield=True):
await trio.sleep(.5)
await trio.sleep(1)

# all subactors should have de-registered
registry = await get_reg()
Expand Down
Loading

0 comments on commit af93b85

Please sign in to comment.