diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index 514b5072c3..0aab11dd46 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -1433,6 +1433,17 @@ deadlock. Using an unbounded channel avoids this, because it means that :meth:`~trio.abc.SendChannel.send` never blocks. +Higher-level synchronization primitives +--------------------------------------- + +While events and channels are useful in a very wide range of +applications, some less common problems are best tackled with some +higher-level concurrency primitives that focus on a specific problem. + +.. autoclass:: EventStream + :members: + + Lower-level synchronization primitives ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/trio/__init__.py b/trio/__init__.py index d66ffceea9..051a8ccf0d 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -52,6 +52,7 @@ Lock, StrictFIFOLock, Condition, + EventStream, ) from ._highlevel_generic import aclose_forcefully, StapledStream diff --git a/trio/_sync.py b/trio/_sync.py index bf5f5d5e0b..5a95dc366c 100644 --- a/trio/_sync.py +++ b/trio/_sync.py @@ -784,3 +784,97 @@ def statistics(self): return _ConditionStatistics( tasks_waiting=len(self._lot), lock_statistics=self._lock.statistics() ) + + +@attr.s +class EventStream(metaclass=Final): + """A concurrency primitive for a sequence of events. + + Multiple tasks can subscribe for events on the stream using an ``async + for`` loop:: + + events = EventStream() + + ... + + async for _ in events.subscribe(): + ... + + On each loop iteration, a subcriber will be blocked if there are no new + events on the stream. An event can be "fired" on a stream, which causes + subscribers to awake:: + + events.fire() + + By default, events are coalesced, but will never be lost. That is, if any + events are fired while a subscriber is processing its last wakeup, that + subscriber will not block on the next loop iteration. + + Note that EventStream does not hold any data items associated with events. + However subscribe() does yield integer indices that indicate a position + in the event stream, which could be used. fire() returns the index of the + event added to the stream. + + """ + _write_cursor = attr.ib(default=-1) + _wakeup = attr.ib(default=None) + _closed = attr.ib(default=False) + + def close(self): + """Close the stream. + + This causes all subscribers to terminate once they have consumed + all events. + """ + self._closed = True + self._wake() + + def _wake(self): + """Wake blocked tasks.""" + if self._wakeup is not None: + self._wakeup.set() + self._wakeup = None + + def fire(self): + """Fire an event on the stream.""" + if self._closed: + raise RuntimeError( + "Cannot fire an event on a closed event stream." + ) + self._write_cursor += 1 + self._wake() + return self._write_cursor + + async def _wait(self): + """Wait for the next wakeup. + + We lazily create the Event object to block on if one does not yet + exist; this avoids creating event objects that are never awaited. + + """ + if self._wakeup is None: + self._wakeup = trio.Event() + await self._wakeup.wait() + + async def subscribe(self, from_start=False, coalesce=True): + """Subscribe for events on the stream. + + If from_start is True, then subscribe for events from the start of + the stream. + + If coalesce is True, then each iteration 'consumes' all previous + events; otherwise, each iteration consumes just one event. + """ + read_cursor = -1 if from_start else self._write_cursor + while True: + if self._write_cursor > read_cursor: + if coalesce: + read_cursor = self._write_cursor + else: + read_cursor += 1 + yield read_cursor + else: + if self._closed: + return + else: + await self._wait() diff --git a/trio/tests/test_sync.py b/trio/tests/test_sync.py index 229dea301c..e641af0f99 100644 --- a/trio/tests/test_sync.py +++ b/trio/tests/test_sync.py @@ -6,7 +6,7 @@ from .. import _core from .. import _timeouts -from .._timeouts import sleep_forever, move_on_after +from .._timeouts import sleep_forever, move_on_after, sleep from .._sync import * @@ -568,3 +568,126 @@ async def lock_taker(): await wait_all_tasks_blocked() assert record == ["started"] lock_like.release() + + +async def test_EventStream_basics(): + p = EventStream() + + wakeups = 0 + + async def background(): + nonlocal wakeups + async for i in p.subscribe(): + wakeups += 1 + + async with _core.open_nursery() as nursery: + nursery.start_soon(background) + + # The event stream starts in a blocked state (no event fired) + await wait_all_tasks_blocked() + assert wakeups == 0 + + # Calling fire() lets it run: + p.fire() + await wait_all_tasks_blocked() + assert wakeups == 1 + + # Multiple events are coalesced into one: + p.fire() + p.fire() + p.fire() + await wait_all_tasks_blocked() + assert wakeups == 2 + + p.close() + + +async def test_EventStream_while_task_is_elsewhere(autojump_clock): + p = EventStream() + + wakeups = 0 + + async def background(): + nonlocal wakeups + async for _ in p.subscribe(): + wakeups += 1 + await sleep(10) + + async with _core.open_nursery() as nursery: + nursery.start_soon(background) + + # Double-check that it's all idle and settled waiting for a event + await sleep(5) + assert wakeups == 0 + await sleep(10) + assert wakeups == 0 + + # Wake it up + p.fire() + + # Now it's sitting in sleep()... + await sleep(5) + assert wakeups == 1 + + # ...when another event arrives. + p.fire() + + # It still wakes up though + await sleep(10) + assert wakeups == 2 + + p.close() + + +async def test_EventStream_subscribe_independence(autojump_clock): + p = EventStream() + + wakeups = [0, 0] + + async def background(i, sleep_time): + nonlocal wakeups + async for _ in p.subscribe(): + wakeups[i] += 1 + await sleep(sleep_time) + + try: + async with _core.open_nursery() as nursery: + nursery.start_soon(background, 0, 10) + nursery.start_soon(background, 1, 100) + + # Initially blocked, no event fired + await sleep(200) + assert wakeups == [0, 0] + + # Firing an event wakes both tasks + p.fire() + await sleep(5) + assert wakeups == [1, 1] + + # Now + # task 0 is sleeping for 5 more seconds + # task 1 is sleeping for 95 more seconds + + # Fire events at a 10s interval; task 0 will wake up for each + # task 1 will only wake up after its sleep + p.fire() + await sleep(10) + p.fire() + assert wakeups == [2, 1] + await sleep(100) + assert wakeups == [3, 2] + + # Now task 0 is blocked on the next event + # Task 1 is sleeping for 100s + + p.fire() + await sleep(1) + assert wakeups == [4, 2] + await sleep(100) + assert wakeups == [4, 3] + + p.close() + except: + import traceback + traceback.print_exc() + raise