Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream contexts #206

Merged
merged 14 commits into from
May 4, 2021
Merged

Stream contexts #206

merged 14 commits into from
May 4, 2021

Conversation

goodboy
Copy link
Owner

@goodboy goodboy commented Apr 28, 2021

This is a breaking change to remove support for streaming using Portal.run() calls and instead replace it with a new @asynccontextmanager api Portal.open_stream_from() as stream:.

The main motivation for this is that it avoids leaving users to do there own manual stream shutdown management (normally ala async_generator.aslosing() or the new aclosing() coming in the stdlib) and promotes more stringent SC style in streaming clients. That latter part is important as we move shortly to supporting an api for #53 which will require explicit cross-actor-context management.

The basic gist should be clear from the changed tests.
Once this is merged it will break old code.

@goodboy goodboy added enhancement New feature or request api labels Apr 28, 2021
Move receive stream into streaming modules and rebrand as a "message
stream".  Factor out cancellation mechanics in `.aclose()` into the
`Context` type which will soon provide the api for for cancelling portal
invocations.  Comment-stage a few methods on both types in anticipation
of a new bi-directional streaming api.  Add a `MsgStream` bidirectional
channel type which will be the eventual type yielded from
`Context.open_stream()`.  Adjust the response/dialog types to be the set
`{'asyncfun', 'asyncgen', 'context'}`. OH, and add async func checking
in `Portal.run()` to catch and error on sync funcs early.
NB: this is a breaking change removing support for `Portal.run()` being
able to invoke remote streaming functions and instead replacing the
method call with an async context manager api `Portal.open_stream_from()`
This style explicitly defines stream teardown at the call site instead
of expecting the user to handle tricky things correctly themselves: eg.
`async_geneartor.aclosing()`. Going forward `Portal.run()` can be used
only for invoking async functions.
Copy link
Collaborator

@guilledk guilledk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New API looking sexy! 🎉

Copy link
Collaborator

@ryanhiebert ryanhiebert left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds like a great motivation. I'm not good for a deep review, but nice!

if not expect:
print("all values streamed, BREAKING")
break
# TODO: this is justification for a
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh so one question is if we want to move forward with a ActorNursery.run_in_actor() equivalent for streams. Pretty sure we go over this in #172.

raise TypeError(
"The first argument to the stream function "
f"{func.__name__} must be `ctx: tractor.Context`"
)
return func


class ReceiveMsgStream(trio.abc.ReceiveChannel):
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stole this name from @njsmith 🤫

self._ctx = ctx
self._rx_chan = rx_chan
self._portal = portal
# self._chan = portal.channel
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clean this?

try:
msg = await self._rx_chan.receive()
return msg['yield']
# return msg['yield']
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and this.

Allows for invoking remote routines and receiving results through an
underlying ``tractor.Channel`` as though the remote (async)
function / generator was invoked locally.
A portal is "opened" (and eventually closed) by one side of an
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hopefully this explanation is more acceptable 🏄🏼

)

@asynccontextmanager
async def open_stream_from(
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the meat of the new API.

portal2 = await tn.start_actor(
'consumer2', enable_modules=[__name__])

async with (
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

heh, yeah so turns out this style (tuple after the with) is not allowed pre-3.9 😢; though oddly doesn't seem documented anywhere?

It's def failing in ci where you can see there's no syntax error in 3.9.

So we'll have to delay this I guess until we do a 3.9+ pin.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made a wish list: #207

Turns out can't use the nicer syntax before python 3.9 (even though it
doesn't seem documented anywhere?).

Relates to #207
@goodboy goodboy mentioned this pull request Apr 28, 2021
8 tasks
@goodboy
Copy link
Owner Author

goodboy commented Apr 28, 2021

Oh right, docs heh.

I guess that's a thing if there are any not already in the examples?

@goodboy goodboy merged commit af93b85 into master May 4, 2021
@goodboy goodboy deleted the stream_contexts branch May 4, 2021 14:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants