Skip to content

Commit

Permalink
Refactor channel interfaces
Browse files Browse the repository at this point in the history
Make MemorySendChannel and MemoryReceiveChannel into public classes,
and move a number of the methods that used to be on the abstract
SendChannel/ReceiveChannel interfaces into the Memory*Channel concrete
classes. Also add a Channel type, analogous to Stream.

See: python-triogh-719

Still to do:

- Merge MemorySendChannel and MemoryReceiveChannel into a single
  MemoryChannel
- decide what to do about clone
- decide whether to add some kind of half-close on Channel
- refactor this PR's one-off solution to python-triogh-1092 into something more
  general.
  • Loading branch information
njsmith committed Jun 19, 2019
1 parent 1bdb085 commit 9980f3f
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 156 deletions.
32 changes: 20 additions & 12 deletions docs/source/reference-core.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1212,7 +1212,7 @@ Using channels to pass values between tasks
different tasks. They're particularly useful for implementing
producer/consumer patterns.

The channel API is defined by the abstract base classes
The core channel API is defined by the abstract base classes
:class:`trio.abc.SendChannel` and :class:`trio.abc.ReceiveChannel`.
You can use these to implement your own custom channels, that do
things like pass objects between processes or over the network. But in
Expand All @@ -1228,14 +1228,23 @@ inside a single process, and for that you can use
what you use when you're looking for a queue. The main difference
is that Trio splits the classic queue interface up into two
objects. The advantage of this is that it makes it possible to put
the two ends in different processes, and that we can close the two
sides separately.
the two ends in different processes without rewriting your code,
and that we can close the two sides separately.

`MemorySendChannel` and `MemoryReceiveChannel` also expose several
more features beyond the core channel interface:

.. autoclass:: MemorySendChannel
:members:

.. autoclass:: MemoryReceiveChannel
:members:


A simple channel example
++++++++++++++++++++++++

Here's a simple example of how to use channels:
Here's a simple example of how to use memory channels:

.. literalinclude:: reference-core/channels-simple.py

Expand Down Expand Up @@ -1347,14 +1356,13 @@ program above:
.. literalinclude:: reference-core/channels-mpmc-fixed.py
:emphasize-lines: 7, 9, 10, 12, 13

This example demonstrates using the :meth:`SendChannel.clone
<trio.abc.SendChannel.clone>` and :meth:`ReceiveChannel.clone
<trio.abc.ReceiveChannel.clone>` methods. What these do is create
copies of our endpoints, that act just like the original – except that
they can be closed independently. And the underlying channel is only
closed after *all* the clones have been closed. So this completely
solves our problem with shutdown, and if you run this program, you'll
see it print its six lines of output and then exits cleanly.
This example demonstrates using the `MemorySendChannel.clone` and
`MemoryReceiveChannel.clone` methods. What these do is create copies
of our endpoints, that act just like the original – except that they
can be closed independently. And the underlying channel is only closed
after *all* the clones have been closed. So this completely solves our
problem with shutdown, and if you run this program, you'll see it
print its six lines of output and then exits cleanly.

Notice a small trick we use: the code in ``main`` creates clone
objects to pass into all the child tasks, and then closes the original
Expand Down
17 changes: 13 additions & 4 deletions docs/source/reference-io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,19 @@ Abstract base classes
- :class:`~trio.SocketListener`, :class:`~trio.SSLListener`
* - :class:`SendChannel`
- :class:`AsyncResource`
- :meth:`~SendChannel.send`, :meth:`~SendChannel.send_nowait`
- :meth:`~SendChannel.send`
-
- :func:`~trio.open_memory_channel`
- `~trio.MemorySendChannel`
* - :class:`ReceiveChannel`
- :class:`AsyncResource`
- :meth:`~ReceiveChannel.receive`, :meth:`~ReceiveChannel.receive_nowait`
- :meth:`~ReceiveChannel.receive`
- ``__aiter__``, ``__anext__``
- :func:`~trio.open_memory_channel`
- `~trio.MemoryReceiveChannel`
* - `Channel`
- `SendChannel`, `ReceiveChannel`
-
-
-

.. autoclass:: trio.abc.AsyncResource
:members:
Expand Down Expand Up @@ -165,6 +170,10 @@ Abstract base classes
:members:
:show-inheritance:

.. autoclass:: trio.abc.Channel
:members:
:show-inheritance:

.. currentmodule:: trio


Expand Down
6 changes: 6 additions & 0 deletions newsfragments/719.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
We cleaned up the distinction between the "abstract channel interface"
and the "memory channel" concrete implementation.
`trio.abc.SendChannel` and `trio.abc.ReceiveChannel` have been slimmed
down, `trio.SendMemoryChannel` and `trio.ReceiveMemoryChannel` are now
public types that can be used in type hints, and there's a new
`trio.abc.Channel` interface for future bidirectional channels.
4 changes: 3 additions & 1 deletion trio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@

from ._highlevel_generic import aclose_forcefully, StapledStream

from ._channel import open_memory_channel
from ._channel import (
open_memory_channel, MemorySendChannel, MemoryReceiveChannel
)

from ._signals import open_signal_receiver

Expand Down
138 changes: 28 additions & 110 deletions trio/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,15 +484,18 @@ async def send_eof(self):
"""


# A regular invariant generic type
T = TypeVar("T")

# The type of object produced by a ReceiveChannel (covariant because
# ReceiveChannel[Derived] can be passed to someone expecting
# ReceiveChannel[Base])
T_co = TypeVar("T_co", covariant=True)
ReceiveType = TypeVar("ReceiveType", covariant=True)

# The type of object accepted by a SendChannel (contravariant because
# SendChannel[Base] can be passed to someone expecting
# SendChannel[Derived])
T_contra = TypeVar("T_contra", contravariant=True)
SendType = TypeVar("SendType", contravariant=True)

# The type of object produced by a Listener (covariant plus must be
# an AsyncResource)
Expand Down Expand Up @@ -537,39 +540,21 @@ async def accept(self):
"""


class SendChannel(AsyncResource, Generic[T_contra]):
class SendChannel(AsyncResource, Generic[SendType]):
"""A standard interface for sending Python objects to some receiver.
:class:`SendChannel` objects also implement the :class:`AsyncResource`
interface, so they can be closed by calling :meth:`~AsyncResource.aclose`
or using an ``async with`` block.
`SendChannel` objects also implement the `AsyncResource` interface, so
they can be closed by calling `~AsyncResource.aclose` or using an ``async
with`` block.
If you want to send raw bytes rather than Python objects, see
:class:`ReceiveStream`.
`ReceiveStream`.
"""
__slots__ = ()

@abstractmethod
def send_nowait(self, value):
"""Attempt to send an object through the channel, without blocking.
Args:
value (object): The object to send.
Raises:
trio.WouldBlock: if the operation cannot be completed immediately
(for example, because the channel's internal buffer is full).
trio.BrokenResourceError: if something has gone wrong, and the
channel is broken. For example, you may get this if the receiver
has already been closed.
trio.ClosedResourceError: if you previously closed this
:class:`SendChannel` object.
"""

@abstractmethod
async def send(self, value):
async def send(self, value: SendType) -> None:
"""Attempt to send an object through the channel, blocking if necessary.
Args:
Expand All @@ -585,33 +570,8 @@ async def send(self, value):
"""

@abstractmethod
def clone(self):
"""Clone this send channel object.
This returns a new :class:`SendChannel` object, which acts as a
duplicate of the original: sending on the new object does exactly the
same thing as sending on the old object.
However, closing one of the objects does not close the other, and
receivers don't get :exc:`~trio.EndOfChannel` until *all* clones have
been closed.
This is useful for communication patterns that involve multiple
producers all sending objects to the same destination. If you give
each producer its own clone of the :class:`SendChannel`, and then make
sure to close each :class:`SendChannel` when it's finished, receivers
will automatically get notified when all producers are finished. See
:ref:`channel-mpmc` for examples.
Raises:
trio.ClosedResourceError: if you already closed this
:class:`SendChannel` object.
"""


class ReceiveChannel(AsyncResource, Generic[T_co]):
class ReceiveChannel(AsyncResource, Generic[ReceiveType]):
"""A standard interface for receiving Python objects from some sender.
You can iterate over a :class:`ReceiveChannel` using an ``async for``
Expand All @@ -621,45 +581,22 @@ class ReceiveChannel(AsyncResource, Generic[T_co]):
...
This is equivalent to calling :meth:`receive` repeatedly. The loop exits
without error when :meth:`receive` raises :exc:`~trio.EndOfChannel`.
without error when `receive` raises `~trio.EndOfChannel`.
:class:`ReceiveChannel` objects also implement the :class:`AsyncResource`
interface, so they can be closed by calling :meth:`~AsyncResource.aclose`
or using an ``async with`` block.
`ReceiveChannel` objects also implement the `AsyncResource` interface, so
they can be closed by calling `~AsyncResource.aclose` or using an ``async
with`` block.
If you want to receive raw bytes rather than Python objects, see
:class:`ReceiveStream`.
`ReceiveStream`.
"""
__slots__ = ()

@abstractmethod
def receive_nowait(self):
"""Attempt to receive an incoming object, without blocking.
Returns:
object: Whatever object was received.
Raises:
trio.WouldBlock: if the operation cannot be completed immediately
(for example, because no object has been sent yet).
trio.EndOfChannel: if the sender has been closed cleanly, and no
more objects are coming. This is not an error condition.
trio.ClosedResourceError: if you previously closed this
:class:`ReceiveChannel` object.
trio.BrokenResourceError: if something has gone wrong, and the
channel is broken.
"""

@abstractmethod
async def receive(self):
async def receive(self) -> ReceiveType:
"""Attempt to receive an incoming object, blocking if necessary.
It's legal for multiple tasks to call :meth:`receive` at the same
time. If this happens, then one task receives the first value sent,
another task receives the next value sent, and so on.
Returns:
object: Whatever object was received.
Expand All @@ -673,40 +610,21 @@ async def receive(self):
"""

@abstractmethod
def clone(self):
"""Clone this receive channel object.
This returns a new :class:`ReceiveChannel` object, which acts as a
duplicate of the original: receiving on the new object does exactly
the same thing as receiving on the old object.
However, closing one of the objects does not close the other, and the
underlying channel is not closed until all clones are closed.
This is useful for communication patterns involving multiple consumers
all receiving objects from the same underlying channel. See
:ref:`channel-mpmc` for examples.
.. warning:: The clones all share the same underlying channel.
Whenever a clone :meth:`receive`\\s a value, it is removed from the
channel and the other clones do *not* receive that value. If you
want to send multiple copies of the same stream of values to
multiple destinations, like :func:`itertools.tee`, then you need to
find some other solution; this method does *not* do that.
Raises:
trio.ClosedResourceError: if you already closed this
:class:`SendChannel` object.
"""

@aiter_compat
def __aiter__(self):
return self

async def __anext__(self):
async def __anext__(self) -> ReceiveType:
try:
return await self.receive()
except trio.EndOfChannel:
raise StopAsyncIteration


class Channel(SendChannel[T], ReceiveChannel[T]):
"""A standard interface for interacting with bidirectional channels.
A `Channel` is an object that implements both the `SendChannel` and
`ReceiveChannel` interfaces, so you can both send and receive objects.
"""
Loading

0 comments on commit 9980f3f

Please sign in to comment.