diff --git a/asyncio/core.py b/asyncio/core.py index a98cf88..e31c1ce 100644 --- a/asyncio/core.py +++ b/asyncio/core.py @@ -69,6 +69,7 @@ def __next__(self): self.exc.__traceback__ = None raise self.exc + # Pause task execution for the given time (integer in milliseconds, uPy extension) # Use a SingletonGenerator to do it without allocating on the heap def sleep_ms(t, sgen=SingletonGenerator()): @@ -265,6 +266,11 @@ def run_until_complete(main_task=None): if t.state is True: # "None" indicates that the task is complete and not await'ed on (yet). t.state = None + elif callable(t.state): + # The task has a callback registered to be called on completion. + t.state(t, er) + t.state = False + waiting = True else: # Schedule any other tasks waiting on the completion of this task. while t.state.peek(): diff --git a/asyncio/event.py b/asyncio/event.py index 164a269..a402d26 100644 --- a/asyncio/event.py +++ b/asyncio/event.py @@ -25,9 +25,7 @@ class Event: def __init__(self): self.state = False # False=unset; True=set - self.waiting = ( - core.TaskQueue() - ) # Queue of Tasks waiting on completion of this event + self.waiting = core.TaskQueue() # Queue of Tasks waiting on completion of this event def is_set(self): """Returns ``True`` if the event is set, ``False`` otherwise.""" @@ -90,6 +88,5 @@ async def wait(self): yield core._io_queue.queue_read(self) self._flag = 0 - except ImportError: pass diff --git a/asyncio/funcs.py b/asyncio/funcs.py index 8718118..baf804d 100644 --- a/asyncio/funcs.py +++ b/asyncio/funcs.py @@ -3,7 +3,7 @@ # SPDX-License-Identifier: MIT # # MicroPython uasyncio module -# MIT license; Copyright (c) 2019-2020 Damien P. George +# MIT license; Copyright (c) 2019-2022 Damien P. George # # This code comes from MicroPython, and has not been run through black or pylint there. # Altering these files significantly would make merging difficult, so we will not use @@ -19,6 +19,22 @@ from . import core +async def _run(waiter, aw): + try: + result = await aw + status = True + except BaseException as er: + result = None + status = er + if waiter.data is None: + # The waiter is still waiting, cancel it. + if waiter.cancel(): + # Waiter was cancelled by us, change its CancelledError to an instance of + # CancelledError that contains the status and result of waiting on aw. + # If the wait_for task subsequently gets cancelled externally then this + # instance will be reset to a CancelledError instance without arguments. + waiter.data = core.CancelledError(status, result) + async def wait_for(aw, timeout, sleep=core.sleep): """Wait for the *aw* awaitable to complete, but cancel if it takes longer than *timeout* seconds. If *aw* is not a task then a task will be created @@ -36,41 +52,26 @@ async def wait_for(aw, timeout, sleep=core.sleep): if timeout is None: return await aw - async def runner(waiter, aw): - nonlocal status, result - try: - result = await aw - s = True - except BaseException as er: - s = er - if status is None: - # The waiter is still waiting, set status for it and cancel it. - status = s - waiter.cancel() - # Run aw in a separate runner task that manages its exceptions. - status = None - result = None - runner_task = core.create_task(runner(core.cur_task, aw)) + runner_task = core.create_task(_run(core.cur_task, aw)) try: # Wait for the timeout to elapse. await sleep(timeout) except core.CancelledError as er: - if status is True: - # aw completed successfully and cancelled the sleep, so return aw's result. - return result - elif status is None: + status = er.args[0] if er.args else None + if status is None: # This wait_for was cancelled externally, so cancel aw and re-raise. - status = True runner_task.cancel() raise er + elif status is True: + # aw completed successfully and cancelled the sleep, so return aw's result. + return er.args[1] else: # aw raised an exception, propagate it out to the caller. raise status # The sleep finished before aw, so cancel aw and raise TimeoutError. - status = True runner_task.cancel() await runner_task raise core.TimeoutError @@ -85,30 +86,77 @@ def wait_for_ms(aw, timeout): return wait_for(aw, timeout, core.sleep_ms) +class _Remove: + @staticmethod + def remove(t): + pass + + async def gather(*aws, return_exceptions=False): """Run all *aws* awaitables concurrently. Any *aws* that are not tasks are promoted to tasks. Returns a list of return values of all *aws* - - This is a coroutine. """ + def done(t, er): + # Sub-task "t" has finished, with exception "er". + nonlocal state + if gather_task.data is not _Remove: + # The main gather task has already been scheduled, so do nothing. + # This happens if another sub-task already raised an exception and + # woke the main gather task (via this done function), or if the main + # gather task was cancelled externally. + return + elif not return_exceptions and not isinstance(er, StopIteration): + # A sub-task raised an exception, indicate that to the gather task. + state = er + else: + state -= 1 + if state: + # Still some sub-tasks running. + return + # Gather waiting is done, schedule the main gather task. + core._task_queue.push_head(gather_task) ts = [core._promote_to_task(aw) for aw in aws] for i in range(len(ts)): - try: - # TODO handle cancel of gather itself - # if ts[i].coro: - # iter(ts[i]).waiting.push_head(cur_task) - # try: - # yield - # except CancelledError as er: - # # cancel all waiting tasks - # raise er - ts[i] = await ts[i] - except (core.CancelledError, Exception) as er: - if return_exceptions: - ts[i] = er - else: - raise er + if ts[i].state is not True: + # Task is not running, gather not currently supported for this case. + raise RuntimeError("can't gather") + # Register the callback to call when the task is done. + ts[i].state = done + + # Set the state for execution of the gather. + gather_task = core.cur_task + state = len(ts) + cancel_all = False + + # Wait for the a sub-task to need attention. + gather_task.data = _Remove + try: + await core._never() + except core.CancelledError as er: + cancel_all = True + state = er + + # Clean up tasks. + for i in range(len(ts)): + if ts[i].state is done: + # Sub-task is still running, deregister the callback and cancel if needed. + ts[i].state = True + if cancel_all: + ts[i].cancel() + elif isinstance(ts[i].data, StopIteration): + # Sub-task ran to completion, get its return value. + ts[i] = ts[i].data.value + else: + # Sub-task had an exception with return_exceptions==True, so get its exception. + ts[i] = ts[i].data + + # Either this gather was cancelled, or one of the sub-tasks raised an exception with + # return_exceptions==False, so reraise the exception here. + if state is not 0: + raise state + + # Return the list of return values of each sub-task. return ts diff --git a/asyncio/stream.py b/asyncio/stream.py index 7bf32cb..4a9d6bf 100644 --- a/asyncio/stream.py +++ b/asyncio/stream.py @@ -151,9 +151,7 @@ async def open_connection(host, port): from uerrno import EINPROGRESS import usocket as socket - ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[ - 0 - ] # TODO this is blocking! + ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0] # TODO this is blocking! s = socket.socket(ai[0], ai[1], ai[2]) s.setblocking(False) ss = Stream(s) diff --git a/asyncio/task.py b/asyncio/task.py index 9a76497..2e3a6db 100644 --- a/asyncio/task.py +++ b/asyncio/task.py @@ -113,24 +113,27 @@ def __init__(self): def peek(self): return self.heap - def push_sorted(self, v, key): + def push(self, v, key=None): + assert v.ph_child is None + assert v.ph_next is None v.data = None - v.ph_key = key - v.ph_child = None - v.ph_next = None + v.ph_key = key if key is not None else core.ticks() self.heap = ph_meld(v, self.heap) - def push_head(self, v): - self.push_sorted(v, core.ticks()) - - def pop_head(self): + def pop(self): v = self.heap - self.heap = ph_pairing(self.heap.ph_child) + assert v.ph_next is None + self.heap = ph_pairing(v.ph_child) + v.ph_child = None return v def remove(self, v): self.heap = ph_delete(self.heap, v) + # Compatibility aliases, remove after they are no longer used + push_head = push + push_sorted = push + pop_head = pop # Task class representing a coroutine, can be waited on and cancelled. class Task: @@ -144,7 +147,7 @@ class Task: def __init__(self, coro, globals=None): self.coro = coro # Coroutine of this Task self.data = None # General data for queue it is waiting on - self.state = True # None, False, True or a TaskQueue instance + self.state = True # None, False, True, a callable, or a TaskQueue instance self.ph_key = 0 # Pairing heap self.ph_child = None # Paring heap self.ph_child_last = None # Paring heap @@ -158,17 +161,25 @@ def __iter__(self): elif self.state is True: # Allocated head of linked list of Tasks waiting on completion of this task. self.state = TaskQueue() + elif type(self.state) is not TaskQueue: + # Task has state used for another purpose, so can't also wait on it. + raise RuntimeError("can't wait") return self + # CircuitPython needs __await()__. __await__ = __iter__ def __next__(self): if not self.state: - # Task finished, raise return value to caller so it can continue. - raise self.data + if self.data is None: + # Task finished but has already been sent to the loop's exception handler. + raise StopIteration + else: + # Task finished, raise return value to caller so it can continue. + raise self.data else: # Put calling task on waiting queue. - self.state.push_head(core.cur_task) + self.state.push(core.cur_task) # Set calling task's data to this task that it waits on, to double-link it. core.cur_task.data = self @@ -195,10 +206,10 @@ def cancel(self): if hasattr(self.data, "remove"): # Not on the main running queue, remove the task from the queue it's on. self.data.remove(self) - core._task_queue.push_head(self) + core._task_queue.push(self) elif core.ticks_diff(self.ph_key, core.ticks()) > 0: # On the main running queue but scheduled in the future, so bring it forward to now. core._task_queue.remove(self) - core._task_queue.push_head(self) + core._task_queue.push(self) self.data = core.CancelledError return True