
Channels, a replacement for Queues #586

Merged: 23 commits merged into python-trio:master on Oct 5, 2018

Conversation

@njsmith (Member) commented on Jul 30, 2018

(not ready to merge, but posting it now for discussion)

This is a possible answer to #497. Essentially, it rethinks how Queue
might work, based on Trio idioms instead of whatever someone stuck
into the stdlib 15 years ago. The main difference is that you call
open_channel and get back a put_channel and a get_channel, which
act like the two ends of a stream, and have their own closure states.
Also, you can clone() an endpoint to extend the closure tracking to
fan-in/fan-out scenarios. If you don't care about any of this, you can
use it exactly like our current Queue, except that you pass around
the put and get ends separately. But it also allows for fancier things
like this fan-in example:

import trio

async def producer(put_channel):
    # We close our handle when we're done with it
    with put_channel:
        for i in range(3):
            await put_channel.put(i)

async def main():
    put_channel, get_channel = trio.open_channel(0)
    async with trio.open_nursery() as nursery:
        # We hand out clones to all the new producers, and then close the
        # original.
        with put_channel:
            for _ in range(10):
                nursery.start_soon(producer, put_channel.clone())
        # Prints the numbers [0, 1, 2], ten times each, in some order, and
        # then exits.
        async for value in get_channel:
            print(value)

trio.run(main)

Decisions in this first draft (not necessarily good ones):

  • Semantics generally modelled after SendStream/ReceiveStream (see the short sketch after this list)
    • When put handle is closed:
      • pending and future put() calls on the same handle immediately
        raise ClosedResourceError
      • if this was the last put handle:
        • further calls to get continue to drain any queued data
        • and then eventually start raising EndOfChannel
        • __anext__ raises StopAsyncIteration instead of EndOfChannel
    • When get handle is closed:
      • pending and future get() calls on the same handle immediately
        raise ClosedResourceError
      • if this was the last get handle
        • pending and future puts immediately raise BrokenChannelError
  • Our handles do not automatically call close() on __del__
    • don't want people to accidentally depend on GC
    • issuing ResourceWarnings would be annoying for users who don't
      care about the close functionality
  • closing all get handles while there is still data queued up silently
    discards the data -- but maybe it should cause the last
    get_channel.close() to raise BrokenChannelError instead?
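
To make the closure semantics above concrete, here is a small sketch in terms of the prototype API from this PR (open_channel, put/get, and EndOfChannel are the draft names used here and may still change; this only runs against this branch, not released trio):

import trio

async def main():
    put_channel, get_channel = trio.open_channel(1)
    await put_channel.put("hello")
    put_channel.close()               # last put handle closed
    print(await get_channel.get())    # queued data still drains: prints "hello"
    try:
        await get_channel.get()
    except trio.EndOfChannel:
        print("channel exhausted")    # nothing queued, no open put handles

trio.run(main)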

This "always_abort=" thing is kind of half-baked and needs more
review... there are more things that mess with _abort_func than I
realized, and I kind of hacked at them until they worked. For example,
it's kind of weird and non-obvious that if you use always_abort=True,
then

@codecov (bot) commented on Jul 30, 2018

Codecov Report

Merging #586 into master will increase coverage by 15.34%.
The diff coverage is 100%.


@@             Coverage Diff             @@
##           master     #586       +/-   ##
===========================================
+ Coverage   83.97%   99.31%   +15.34%     
===========================================
  Files          94       96        +2     
  Lines       13001    11464     -1537     
  Branches      783      820       +37     
===========================================
+ Hits        10918    11386      +468     
+ Misses       2061       58     -2003     
+ Partials       22       20        -2
Impacted Files Coverage Δ
trio/_core/__init__.py 100% <ø> (ø) ⬆️
trio/tests/test_highlevel_serve_listeners.py 100% <100%> (+8.57%) ⬆️
trio/_sync.py 100% <100%> (+3.87%) ⬆️
trio/_core/tests/test_windows.py 100% <100%> (+6.25%) ⬆️
trio/_core/_unbounded_queue.py 100% <100%> (+2%) ⬆️
trio/tests/test_channel.py 100% <100%> (ø)
trio/__init__.py 100% <100%> (+54.92%) ⬆️
trio/tests/test_sync.py 100% <100%> (+6.07%) ⬆️
trio/_abc.py 100% <100%> (+10.9%) ⬆️
trio/_core/_exceptions.py 100% <100%> (+9.52%) ⬆️
... and 73 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@njsmith (Member, Author) commented on Jul 30, 2018

Two things I'm having doubts about:

  • A chunk of the complexity here – and in particular, the whole motivation for the always_abort thing – is that we keep track of which pending calls came in through which cloned handle. An alternative would be to not do this, so that if you're blocked in put, and all put handles are closed, then you get woken up, but if just your put handle is closed, you don't notice. (If you don't use clone, the two options are identical.)

  • To me, the fan-in example is a pretty compelling argument for supporting PutChannel.clone or something like it. The cases for GetChannel.clone and GetChannel.close are much weaker; I'm not sure supporting them is actually useful.

@njsmith (Member, Author) commented on Aug 12, 2018

I'm leaning against supporting GetChannel.clone for now... in addition to being poorly motivated, there's a possible source of confusion: if I have two clones of the same GetChannel and put one item into the channel, does it get sent to both clones (fork) or just one of them (round-robin)?

I'm also tempted to make GetChannel.close raise an error if it's closed while there are still pending items, and to have GetChannel.__del__ issue a warning in the same case. Maybe if we do this, we'd let people call get_channel.close(ignore_pending=True) when they want to intentionally throw away pending data and don't want to bother putting a try/except around things.

I'm also wondering if we should try making this API start out relatively minimal. I don't really want to add qsize and full and all the other stuff that Queue has; it's mostly useless and encourages bad habits. I'm tempted to even remove put_nowait and get_nowait until someone has a use case.

Still not sure what to do about the always_abort thing. I don't like the way it works here. Some options:

  • Move the flag to reschedule instead of wait_task_rescheduled?
  • Give up on tracking the relationship between calls to put and PutHandle objects
  • Hold our noses and put up with the hacky version I started with, where the PutHandle._tasks set could be slightly out-of-sync with reality and PutHandle.close had to catch this and work around it.

And finally I am wondering about how to roll this out. The idea would be to eventually replace trio.Queue, which is heavily used. There are a lot of subtle design questions here (as you can see above), so it makes me nervous that this PR hasn't attracted any feedback yet :-). Maybe the way to do it is to have a release where we advertise channels as provisional, without yet deprecating trio.Queue, and then hassle folks to try it out and give feedback? (And then if it passes, the next release could deprecate trio.Queue in favor of channels.)

@smurfix (Contributor) commented on Aug 12, 2018

put_nowait is very useful when called from an asyncio callback.
get_nowait also has its use; during connection shutdown one might want to drain the queue and log the pending messages without introducing cancel points.

@njsmith (Member, Author) commented on Aug 12, 2018

put_nowait is very useful when called from an asyncio callback.

Do you mean the queue that trio-asyncio uses to send requests back to the main asyncio loop dispatcher? I'm not 100% sure if that's in scope or not, since it really should be using an unbounded queue... but then I've also been wondering if we should go ahead and allow for unbounded channels, e.g. by explicitly passing capacity=inf. (The example that's got me thinking this: consider a web crawler that keeps a queue of URLs to crawl. Each worker takes a URL off the queue, fetches it, parses the page, and pushes the resulting URLs back onto the queue to get to later. We put a limit on how many workers run at once. That's enough to produce deadlocks with any finite queue capacity, if the queue fills up so no workers can push new URLs onto it, so they stop pulling items off the queue...) So fair enough.
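
For illustration, here is roughly what that crawler shape could look like with the draft API, assuming open_channel accepted capacity=inf (fetch_and_parse is a made-up placeholder, and the sketch ignores how the crawl terminates):

import math

import trio

async def worker(put_channel, get_channel):
    async for url in get_channel:
        for new_url in await fetch_and_parse(url):   # hypothetical helper
            # With an unbounded channel this put never blocks, so a full
            # queue can never stop workers from also draining it.
            await put_channel.put(new_url)

async def main():
    put_channel, get_channel = trio.open_channel(math.inf)
    await put_channel.put("https://example.com/")
    async with trio.open_nursery() as nursery:
        for _ in range(10):                           # bounded worker pool
            nursery.start_soon(worker, put_channel.clone(), get_channel)

trio.run(main)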

get_nowait also has its use; during connection shutdown one might want to drain the queue and log the pending messages without introducing cancel points.

Yeah, mayyybe... this kind of consideration is why I'm more likely to keep get_nowait than full or task_done or whatever. But I'm not convinced yet that what you said is something that really happens – like if you're doing a graceful shutdown, you want to signal that to the producer and then let it finish draining the queue until it gets EndOfQueue. "All the items that are currently in the queue" is not really a reliable concept because of race conditions. And if you've decided to cancel everything and abandon the queue without waiting for EndOfQueue, then there's not much point in worrying about this.

I guess get_nowait is potentially useful for trio-asyncio too, if you want to make sure you do one tick of the asyncio loop per tick of the trio loop...

@smurfix (Contributor) commented on Aug 12, 2018

Do you mean the queue that trio-asyncio uses to send requests back to the main asyncio loop dispatcher?

No. Just your garden variety asyncio receive-data callback when you're hooking into a transport+protocol.

That send-back queue is a prime candidate for capacity=inf though.

@njsmith (Member, Author) commented on Aug 18, 2018

Some discussion of what the always_abort thing is trying to do here: https://gitter.im/python-trio/general?at=5b77863e802bc42c5f356669

Interesting realization: the fundamental problem is that when there are multiple ways that a task can wake back up, it can be difficult for one path to know how to clean up the state used by the other paths. In this case, put_handle.put can be woken by either get_handle.get or put_handle.close. Other examples where I've brushed against this are similar: Condition.wait / ParkingLot.repark involves a task that goes to sleep on one queue and then later gets moved to another, so it's challenging for its abort_fn to keep track of which queue it should be aborted from. #242 discusses the case where we sleep on an arbitrary collection of different events, and then of course we need something clever indeed to keep track of all the different wakeup paths.

I don't think always_abort is really working. In that discussion though I realized a much simpler hack that would work for both this case, and for ParkingLot.repark, is to add a bit of state attached to the task object that the sleeper and waker are free to use to pass data back and forth. Maybe task.sleep_state, which as far as the trio core is concerned is meaningless except that it gets reset to None every time a task is rescheduled. Of course this is also potentially an argument for a more structured solution like #242, but we don't need to solve all that right now...
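
A very rough sketch of that pattern is below. It uses the trio.hazmat primitives of this era plus Task.custom_sleep_data, the trio attribute whose behavior matches the sleep_state idea described above (scratch space that the core resets when the task is rescheduled); the channel/handle classes themselves are invented for illustration:

from collections import deque

import outcome
import trio
from trio.hazmat import Abort, current_task, reschedule, wait_task_rescheduled

class ChannelStateSketch:
    def __init__(self):
        self.put_tasks = deque()    # every task blocked in put(), via any handle

class PutHandleSketch:
    def __init__(self, state):
        self.state = state
        self.my_tasks = set()       # tasks blocked in put() via *this* handle

    async def blocking_put(self, value):
        task = current_task()
        self.state.put_tasks.append(task)
        self.my_tasks.add(task)
        # Stash "how to clean up after me" on the task itself; whichever
        # wakeup path runs (a get, a close, or a cancellation) can use it.
        task.custom_sleep_data = (self, value)

        def abort_fn(raise_cancel):
            handle, _ = task.custom_sleep_data
            self.state.put_tasks.remove(task)
            handle.my_tasks.discard(task)
            return Abort.SUCCEEDED

        await wait_task_rescheduled(abort_fn)

    def close(self):
        # Wakeup path 1: closing this handle wakes only its own blocked puts.
        for task in self.my_tasks:
            self.state.put_tasks.remove(task)
            reschedule(task, outcome.Error(trio.ClosedResourceError()))
        self.my_tasks.clear()

def get_wakes_one_put(state):
    # Wakeup path 2: a get() takes any waiting put task; the stashed data
    # tells it which handle's bookkeeping to fix up.
    task = state.put_tasks.popleft()
    handle, value = task.custom_sleep_data
    handle.my_tasks.discard(task)
    reschedule(task)                # the put completes normally
    return value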

njsmith added a commit to njsmith/trio that referenced this pull request on Aug 21, 2018
@njsmith (Member, Author) commented on Aug 22, 2018

Okay, gave up on always_abort, rebased, and redid the wakeup stuff based on #616. This is much simpler.

Also removed GetChannel.clone, though the infrastructure to support it is still there. Should refactor to simplify this.

Added some initial tests, including a version of the fan-in example at the top of this thread. Weirdly enough, they seem to be passing.

Still need a bunch more tests, docs, statistics and repr.

@mehaase commented on Sep 18, 2018

There are a lot of subtle design questions here (as you can see above), so it makes me nervous that this PR hasn't attracted any feedback yet :-).

I just experimented with the channels implementation (from commit abca8bd) as a closeable message queue for python-trio/trio-websocket#11. My use case is pretty simple, and this API is perfect for it. (I'm afraid I don't really understand the always_abort, cloning, or other design discussions.)

@njsmith (Member, Author) commented on Sep 28, 2018

I played around a bit with what it would take to implement my prototype channel interfaces across a process boundary, on top of a Stream. It's actually very simple (e.g. send(obj) pickles the object, adds some framing, and sends it down the pipe, receive() reverses that; it's even quite easy to support clone()).
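
A minimal sketch of that idea, not from this PR (the class names are invented, and error handling, aclose, and clone are elided):

import pickle
import struct

import trio

class StreamSendChannel:
    def __init__(self, stream):
        self._stream = stream

    async def send(self, obj):
        payload = pickle.dumps(obj)
        # framing: a 4-byte big-endian length prefix, then the pickled bytes
        await self._stream.send_all(struct.pack(">I", len(payload)) + payload)

class StreamReceiveChannel:
    def __init__(self, stream):
        self._stream = stream
        self._buffer = bytearray()

    async def _read_exactly(self, size):
        while len(self._buffer) < size:
            chunk = await self._stream.receive_some(65536)
            if not chunk:
                raise trio.EndOfChannel
            self._buffer += chunk
        data = bytes(self._buffer[:size])
        del self._buffer[:size]
        return data

    async def receive(self):
        (length,) = struct.unpack(">I", await self._read_exactly(4))
        return pickle.loads(await self._read_exactly(length))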

This is much simpler if we standardize on a single BrokenResourceError type instead of having separate BrokenStreamError + BrokenChannelError types (#620).

The places where the APIs don't quite line up are:

  • We can't reasonably implement nowait methods on top of a generic Stream which might have internal buffering etc. But this doesn't seem like a big deal; either the IPC channel just doesn't have those methods, or we can implement versions that just raise WouldBlock.

  • Making send atomic-wrt-cancellation: this is natural for in-memory channels (either send completes or it doesn't). For inter-process channels, it's much trickier, because the cancellation might arrive at a moment when you're only half-way through transferring the object through the pipe, and now what? Basically you don't have any option other than to abort the send unfinished, and now your pipe has lost synchronization and is unusable. (I guess if you can afford a background task to pump the pipe, then you could buffer one object, and make it atomic-wrt-cancellation again. But that's a pretty substantial API change. OTOH I guess if you have a full-fledged multiprocessing-esque library with process pools and cleverness to let you pass channel endpoints across processes and all that, then it might be fine.)

  • For an in-process channel, close is naturally synchronous. For an inter-process channel, it has to be async.

The first two don't seem like dealbreakers, or things we necessarily need to figure out right now. But that's what makes the last one tricky :-).

We could make close synchronous for now, and then if later we decide to make abstract trio.abc.{Send,Receive}Channel interfaces we could add an aclose method too at that point. The trio.testing in-process Stream classes have a synchronous close method in addition to the async aclose that's required by the Stream interface; you just use aclose in generic code and close in code that knows it has a concrete class.

Or, if our guess is that inter-process channels will be important, then we could make it aclose from the start, even for in-memory streams. Again, if we decide later that this was a mistake, we can add a synchronous close method. Or even deprecate aclose, if we do it soon enough... I guess if we're wrong about inter-process channels being important, then we'll know that before 1.0.

Any thoughts? As a user, would it annoy you to have to do await channel.aclose() when you know you're working with an in-process synchronization primitive whose close method could just as well be synchronous, and is just inserting a superfluous checkpoint? Would you value being able to take code written to use channels within a process and switch it to speaking across processes with the same API?

Discovered while writing the docs that not having it is really
confusing and hard to justify.
njsmith changed the title from "[rfc, wip] Channels, a potential replacement for Queues" to "Channels, a replacement for Queues" on Oct 4, 2018
@njsmith (Member, Author) commented on Oct 4, 2018

Done, maybe

I think the code, tests, docs, and newsfragments are all ready here. Anyone want to give reviewing it a try? The diff is large, but even just reading through the new docs and docstrings would be helpful!

To summarize:

  • New abstract base classes trio.abc.SendChannel and trio.abc.ReceiveChannel. The former has send and send_nowait methods; the latter has receive and receive_nowait and __aiter__. They both have clone and aclose.

  • New concrete constructor trio.open_memory_channel(max_buffer_size), which returns a (SendChannel, ReceiveChannel) pair. (A short usage sketch follows this list.)

  • Detailed examples in the docs to illustrate why closing is useful, why cloning is useful, and how buffering works.

  • Deprecated trio.Queue and trio.hazmat.UnboundedQueue.

  • I have a list of possible follow-up tweaks to think about, currently stashed in a comment at the top of trio/_channel.py. These should move into a new follow-up issue before merging.
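
For reference, the fan-in example from the top of this thread looks like this under the final names (a sketch; the behavior is unchanged):

import trio

async def producer(send_channel):
    # close our handle when we're done with it
    async with send_channel:
        for i in range(3):
            await send_channel.send(i)

async def main():
    send_channel, receive_channel = trio.open_memory_channel(0)
    async with trio.open_nursery() as nursery:
        # hand a clone to each producer, then close the original
        async with send_channel:
            for _ in range(10):
                nursery.start_soon(producer, send_channel.clone())
        # prints the numbers [0, 1, 2], ten times each, in some order
        async for value in receive_channel:
            print(value)

trio.run(main)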

@Fuyukai (Member) commented on Oct 4, 2018

From the code it seems clear that sending does a round-robin send to all the tasks, but the docs don't seem to specify this. Seems like it would be useful to include.
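
As an illustration of that behavior (not from the PR): with two receiving tasks, each sent value is delivered to exactly one of them, rather than each task getting its own copy.

import trio

async def consumer(name, receive_channel):
    async with receive_channel:
        async for value in receive_channel:
            print(name, "got", value)

async def main():
    send_channel, receive_channel = trio.open_memory_channel(0)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(consumer, "A", receive_channel.clone())
        nursery.start_soon(consumer, "B", receive_channel.clone())
        await receive_channel.aclose()   # we keep no receive handle ourselves
        async with send_channel:
            for i in range(10):
                # each value is printed once, by either A or B -- not by both
                await send_channel.send(i)

trio.run(main)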

@oremanj (Member) left a comment
This looks great; just some suggestions for documentation cleanup.

trio/_channel.py (Outdated)

# underlying stream when all SendChannels are closed.

# to think about later:
# - max_buffer_size=0 default?

Review comment (Member):

I'd support this - it seems to go hand-in-hand with the docs saying "if in doubt, use 0".

Other review threads (resolved) touched docs/source/reference-core.rst, newsfragments/497.feature.rst, trio/_channel.py, trio/_abc.py, trio/_core/_unbounded_queue.py, trio/_sync.py, and trio/tests/test_channel.py.
@njsmith (Member, Author) commented on Oct 5, 2018

Thanks everyone for the detailed comments, you caught a ton of stuff I missed :-).

I think they've all been addressed now.

@Fuyukai I tried to clarify this in a few places, but not quite sure how to do it best... would you mind taking another look, and if you think it could be clearer let me know where you expected to see it?

@pquentin (Member) commented on Oct 5, 2018

Do you want to move the follow-up tweaks into a new issue before I hit merge?

njsmith mentioned this pull request on Oct 5, 2018
@njsmith (Member, Author) commented on Oct 5, 2018

Do you want to move the follow-up tweaks into a new issue before I hit merge?

Done (see #719), plus a few more tweaks I noticed.

I'd still like to hear @Fuyukai's (or anyone's :-)) thoughts on whether/how to make it clear that receivers alternate rather than each getting their own copy of incoming objects, but I suppose that can always be handled in a follow-up PR if necessary.

@pquentin (Member) commented on Oct 5, 2018

I think that the new version is clear enough because you 1/ switched from fan-in/fan-out to producers/consumers and 2/ called this out explicitly in the docs.

But yeah, I think further improvements can go in another PR.

@pquentin (Member) commented on Oct 5, 2018

Okay, let's do this! ✨

pquentin merged commit a1d2dbd into python-trio:master on Oct 5, 2018
njsmith deleted the channels branch on October 5, 2018
@njsmith (Member, Author) commented on Oct 5, 2018 via email
