From 9d69aa25f4d369cb5fd64c1b9fcbbe0c2b27b925 Mon Sep 17 00:00:00 2001 From: Dan Halbert Date: Thu, 17 Aug 2023 11:26:32 -0400 Subject: [PATCH 1/3] update asyncio from MicroPython v1.19.1 --- asyncio/core.py | 22 +++++--- asyncio/event.py | 9 ++-- asyncio/funcs.py | 128 +++++++++++++++++++++++++++++++--------------- asyncio/lock.py | 6 +-- asyncio/stream.py | 4 +- asyncio/task.py | 29 ++++++----- 6 files changed, 125 insertions(+), 73 deletions(-) diff --git a/asyncio/core.py b/asyncio/core.py index a98cf88..524cc57 100644 --- a/asyncio/core.py +++ b/asyncio/core.py @@ -62,13 +62,14 @@ def __await__(self): def __next__(self): if self.state is not None: - _task_queue.push_sorted(cur_task, self.state) + _task_queue.push(cur_task, self.state) self.state = None return None else: self.exc.__traceback__ = None raise self.exc + # Pause task execution for the given time (integer in milliseconds, uPy extension) # Use a SingletonGenerator to do it without allocating on the heap def sleep_ms(t, sgen=SingletonGenerator()): @@ -178,11 +179,11 @@ def wait_io_event(self, dt): # print('poll', s, sm, ev) if ev & ~select.POLLOUT and sm[0] is not None: # POLLIN or error - _task_queue.push_head(sm[0]) + _task_queue.push(sm[0]) sm[0] = None if ev & ~select.POLLIN and sm[1] is not None: # POLLOUT or error - _task_queue.push_head(sm[1]) + _task_queue.push(sm[1]) sm[1] = None if sm[0] is None and sm[1] is None: self._dequeue(s) @@ -210,7 +211,7 @@ def create_task(coro): if not hasattr(coro, "send"): raise TypeError("coroutine expected") t = Task(coro, globals()) - _task_queue.push_head(t) + _task_queue.push(t) return t @@ -237,7 +238,7 @@ def run_until_complete(main_task=None): _io_queue.wait_io_event(dt) # Get next task to run and continue it - t = _task_queue.pop_head() + t = _task_queue.pop() cur_task = t try: # Continue running the coroutine, it's responsible for rescheduling itself @@ -265,10 +266,15 @@ def run_until_complete(main_task=None): if t.state is True: # "None" indicates that the task is complete and not await'ed on (yet). t.state = None + elif callable(t.state): + # The task has a callback registered to be called on completion. + t.state(t, er) + t.state = False + waiting = True else: # Schedule any other tasks waiting on the completion of this task. while t.state.peek(): - _task_queue.push_head(t.state.pop_head()) + _task_queue.push(t.state.pop()) waiting = True # "False" indicates that the task is complete and has been await'ed on. t.state = False @@ -276,7 +282,7 @@ def run_until_complete(main_task=None): # An exception ended this detached task, so queue it for later # execution to handle the uncaught exception if no other task retrieves # the exception in the meantime (this is handled by Task.throw). - _task_queue.push_head(t) + _task_queue.push(t) # Save return value of coro to pass up to caller. t.data = er elif t.state is None: @@ -338,7 +344,7 @@ def stop(): global _stop_task if _stop_task is not None: - _task_queue.push_head(_stop_task) + _task_queue.push(_stop_task) # If stop() is called again, do nothing _stop_task = None diff --git a/asyncio/event.py b/asyncio/event.py index 164a269..5e1cb24 100644 --- a/asyncio/event.py +++ b/asyncio/event.py @@ -25,9 +25,7 @@ class Event: def __init__(self): self.state = False # False=unset; True=set - self.waiting = ( - core.TaskQueue() - ) # Queue of Tasks waiting on completion of this event + self.waiting = core.TaskQueue() # Queue of Tasks waiting on completion of this event def is_set(self): """Returns ``True`` if the event is set, ``False`` otherwise.""" @@ -42,7 +40,7 @@ def set(self): # Note: This must not be called from anything except the thread running # the asyncio loop (i.e. neither hard or soft IRQ, or a different thread). while self.waiting.peek(): - core._task_queue.push_head(self.waiting.pop_head()) + core._task_queue.push(self.waiting.pop()) self.state = True def clear(self): @@ -59,7 +57,7 @@ async def wait(self): if not self.state: # Event not set, put the calling task on the event's waiting queue - self.waiting.push_head(core.cur_task) + self.waiting.push(core.cur_task) # Set calling task's data to the event's queue so it can be removed if needed core.cur_task.data = self.waiting await core._never() @@ -90,6 +88,5 @@ async def wait(self): yield core._io_queue.queue_read(self) self._flag = 0 - except ImportError: pass diff --git a/asyncio/funcs.py b/asyncio/funcs.py index 8718118..dfb80e0 100644 --- a/asyncio/funcs.py +++ b/asyncio/funcs.py @@ -3,7 +3,7 @@ # SPDX-License-Identifier: MIT # # MicroPython uasyncio module -# MIT license; Copyright (c) 2019-2020 Damien P. George +# MIT license; Copyright (c) 2019-2022 Damien P. George # # This code comes from MicroPython, and has not been run through black or pylint there. # Altering these files significantly would make merging difficult, so we will not use @@ -19,6 +19,22 @@ from . import core +async def _run(waiter, aw): + try: + result = await aw + status = True + except BaseException as er: + result = None + status = er + if waiter.data is None: + # The waiter is still waiting, cancel it. + if waiter.cancel(): + # Waiter was cancelled by us, change its CancelledError to an instance of + # CancelledError that contains the status and result of waiting on aw. + # If the wait_for task subsequently gets cancelled externally then this + # instance will be reset to a CancelledError instance without arguments. + waiter.data = core.CancelledError(status, result) + async def wait_for(aw, timeout, sleep=core.sleep): """Wait for the *aw* awaitable to complete, but cancel if it takes longer than *timeout* seconds. If *aw* is not a task then a task will be created @@ -36,41 +52,26 @@ async def wait_for(aw, timeout, sleep=core.sleep): if timeout is None: return await aw - async def runner(waiter, aw): - nonlocal status, result - try: - result = await aw - s = True - except BaseException as er: - s = er - if status is None: - # The waiter is still waiting, set status for it and cancel it. - status = s - waiter.cancel() - # Run aw in a separate runner task that manages its exceptions. - status = None - result = None - runner_task = core.create_task(runner(core.cur_task, aw)) + runner_task = core.create_task(_run(core.cur_task, aw)) try: # Wait for the timeout to elapse. await sleep(timeout) except core.CancelledError as er: - if status is True: - # aw completed successfully and cancelled the sleep, so return aw's result. - return result - elif status is None: + status = er.value + if status is None: # This wait_for was cancelled externally, so cancel aw and re-raise. - status = True runner_task.cancel() raise er + elif status is True: + # aw completed successfully and cancelled the sleep, so return aw's result. + return er.args[1] else: # aw raised an exception, propagate it out to the caller. raise status # The sleep finished before aw, so cancel aw and raise TimeoutError. - status = True runner_task.cancel() await runner_task raise core.TimeoutError @@ -85,30 +86,77 @@ def wait_for_ms(aw, timeout): return wait_for(aw, timeout, core.sleep_ms) -async def gather(*aws, return_exceptions=False): +class _Remove: + @staticmethod + def remove(t): + pass + + +def gather(*aws, return_exceptions=False): """Run all *aws* awaitables concurrently. Any *aws* that are not tasks are promoted to tasks. Returns a list of return values of all *aws* - - This is a coroutine. """ + def done(t, er): + # Sub-task "t" has finished, with exception "er". + nonlocal state + if gather_task.data is not _Remove: + # The main gather task has already been scheduled, so do nothing. + # This happens if another sub-task already raised an exception and + # woke the main gather task (via this done function), or if the main + # gather task was cancelled externally. + return + elif not return_exceptions and not isinstance(er, StopIteration): + # A sub-task raised an exception, indicate that to the gather task. + state = er + else: + state -= 1 + if state: + # Still some sub-tasks running. + return + # Gather waiting is done, schedule the main gather task. + core._task_queue.push(gather_task) ts = [core._promote_to_task(aw) for aw in aws] for i in range(len(ts)): - try: - # TODO handle cancel of gather itself - # if ts[i].coro: - # iter(ts[i]).waiting.push_head(cur_task) - # try: - # yield - # except CancelledError as er: - # # cancel all waiting tasks - # raise er - ts[i] = await ts[i] - except (core.CancelledError, Exception) as er: - if return_exceptions: - ts[i] = er - else: - raise er + if ts[i].state is not True: + # Task is not running, gather not currently supported for this case. + raise RuntimeError("can't gather") + # Register the callback to call when the task is done. + ts[i].state = done + + # Set the state for execution of the gather. + gather_task = core.cur_task + state = len(ts) + cancel_all = False + + # Wait for the a sub-task to need attention. + gather_task.data = _Remove + try: + yield + except core.CancelledError as er: + cancel_all = True + state = er + + # Clean up tasks. + for i in range(len(ts)): + if ts[i].state is done: + # Sub-task is still running, deregister the callback and cancel if needed. + ts[i].state = True + if cancel_all: + ts[i].cancel() + elif isinstance(ts[i].data, StopIteration): + # Sub-task ran to completion, get its return value. + ts[i] = ts[i].data.value + else: + # Sub-task had an exception with return_exceptions==True, so get its exception. + ts[i] = ts[i].data + + # Either this gather was cancelled, or one of the sub-tasks raised an exception with + # return_exceptions==False, so reraise the exception here. + if state is not 0: + raise state + + # Return the list of return values of each sub-task. return ts diff --git a/asyncio/lock.py b/asyncio/lock.py index 71c972f..c9feb35 100644 --- a/asyncio/lock.py +++ b/asyncio/lock.py @@ -50,8 +50,8 @@ def release(self): raise RuntimeError("Lock not acquired") if self.waiting.peek(): # Task(s) waiting on lock, schedule next Task - self.state = self.waiting.pop_head() - core._task_queue.push_head(self.state) + self.state = self.waiting.pop() + core._task_queue.push(self.state) else: # No Task waiting so unlock self.state = 0 @@ -65,7 +65,7 @@ async def acquire(self): if self.state != 0: # Lock unavailable, put the calling Task on the waiting queue - self.waiting.push_head(core.cur_task) + self.waiting.push(core.cur_task) # Set calling task's data to the lock's queue so it can be removed if needed core.cur_task.data = self.waiting try: diff --git a/asyncio/stream.py b/asyncio/stream.py index 7bf32cb..4a9d6bf 100644 --- a/asyncio/stream.py +++ b/asyncio/stream.py @@ -151,9 +151,7 @@ async def open_connection(host, port): from uerrno import EINPROGRESS import usocket as socket - ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[ - 0 - ] # TODO this is blocking! + ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0] # TODO this is blocking! s = socket.socket(ai[0], ai[1], ai[2]) s.setblocking(False) ss = Stream(s) diff --git a/asyncio/task.py b/asyncio/task.py index 9a76497..6df5cda 100644 --- a/asyncio/task.py +++ b/asyncio/task.py @@ -113,19 +113,18 @@ def __init__(self): def peek(self): return self.heap - def push_sorted(self, v, key): + def push(self, v, key=None): + assert v.ph_child is None + assert v.ph_next is None v.data = None - v.ph_key = key - v.ph_child = None - v.ph_next = None + v.ph_key = key if key is not None else core.ticks() self.heap = ph_meld(v, self.heap) - def push_head(self, v): - self.push_sorted(v, core.ticks()) - - def pop_head(self): + def pop(self): v = self.heap - self.heap = ph_pairing(self.heap.ph_child) + assert v.ph_next is None + self.heap = ph_pairing(v.ph_child) + v.ph_child = None return v def remove(self, v): @@ -144,7 +143,7 @@ class Task: def __init__(self, coro, globals=None): self.coro = coro # Coroutine of this Task self.data = None # General data for queue it is waiting on - self.state = True # None, False, True or a TaskQueue instance + self.state = True # None, False, True, a callable, or a TaskQueue instance self.ph_key = 0 # Pairing heap self.ph_child = None # Paring heap self.ph_child_last = None # Paring heap @@ -158,8 +157,12 @@ def __iter__(self): elif self.state is True: # Allocated head of linked list of Tasks waiting on completion of this task. self.state = TaskQueue() + elif type(self.state) is not TaskQueue: + # Task has state used for another purpose, so can't also wait on it. + raise RuntimeError("can't wait") return self + # CircuitPython needs __await()__. __await__ = __iter__ def __next__(self): @@ -168,7 +171,7 @@ def __next__(self): raise self.data else: # Put calling task on waiting queue. - self.state.push_head(core.cur_task) + self.state.push(core.cur_task) # Set calling task's data to this task that it waits on, to double-link it. core.cur_task.data = self @@ -195,10 +198,10 @@ def cancel(self): if hasattr(self.data, "remove"): # Not on the main running queue, remove the task from the queue it's on. self.data.remove(self) - core._task_queue.push_head(self) + core._task_queue.push(self) elif core.ticks_diff(self.ph_key, core.ticks()) > 0: # On the main running queue but scheduled in the future, so bring it forward to now. core._task_queue.remove(self) - core._task_queue.push_head(self) + core._task_queue.push(self) self.data = core.CancelledError return True From 510d4a3bb9326a31105622fe1905301a4f543a2c Mon Sep 17 00:00:00 2001 From: Dan Halbert Date: Fri, 18 Aug 2023 12:27:59 -0400 Subject: [PATCH 2/3] Fix issues noted during MicroPython v1.19.1 merge. gather is now async, and the one `yield` that was in it was replaced by `await core._never()`. Incorporated https://github.com/adafruit/circuitpython/commit/55169e0b4db968c037fdd2ef6a359fa3a485a7dd#diff-01b6761622028d47c6d1c2293fa365de3d8f03feb485d42e42ea9050198e1635 which never got into the asyncio library previously. (I fixed the bug independently, but didn't understand why it was needed, until I found that PR.) --- asyncio/funcs.py | 6 +++--- asyncio/task.py | 8 ++++++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/asyncio/funcs.py b/asyncio/funcs.py index dfb80e0..dc27088 100644 --- a/asyncio/funcs.py +++ b/asyncio/funcs.py @@ -59,7 +59,7 @@ async def wait_for(aw, timeout, sleep=core.sleep): # Wait for the timeout to elapse. await sleep(timeout) except core.CancelledError as er: - status = er.value + status = er.args[0] if er.args else None if status is None: # This wait_for was cancelled externally, so cancel aw and re-raise. runner_task.cancel() @@ -92,7 +92,7 @@ def remove(t): pass -def gather(*aws, return_exceptions=False): +async def gather(*aws, return_exceptions=False): """Run all *aws* awaitables concurrently. Any *aws* that are not tasks are promoted to tasks. @@ -134,7 +134,7 @@ def done(t, er): # Wait for the a sub-task to need attention. gather_task.data = _Remove try: - yield + await core._never() except core.CancelledError as er: cancel_all = True state = er diff --git a/asyncio/task.py b/asyncio/task.py index 6df5cda..c1306d0 100644 --- a/asyncio/task.py +++ b/asyncio/task.py @@ -167,8 +167,12 @@ def __iter__(self): def __next__(self): if not self.state: - # Task finished, raise return value to caller so it can continue. - raise self.data + if self.data is None: + # Task finished but has already been sent to the loop's exception handler. + raise StopIteration + else: + # Task finished, raise return value to caller so it can continue. + raise self.data else: # Put calling task on waiting queue. self.state.push(core.cur_task) From 7c25d09be04a2979bcfb43b801de57594112808d Mon Sep 17 00:00:00 2001 From: Jeff Epler Date: Mon, 21 Aug 2023 15:37:22 -0500 Subject: [PATCH 3/3] revert the rename of push and pop methods choose the way that is compatible with 8.x and 9.x --- asyncio/core.py | 16 ++++++++-------- asyncio/event.py | 4 ++-- asyncio/funcs.py | 2 +- asyncio/lock.py | 6 +++--- asyncio/task.py | 4 ++++ 5 files changed, 18 insertions(+), 14 deletions(-) diff --git a/asyncio/core.py b/asyncio/core.py index 524cc57..e31c1ce 100644 --- a/asyncio/core.py +++ b/asyncio/core.py @@ -62,7 +62,7 @@ def __await__(self): def __next__(self): if self.state is not None: - _task_queue.push(cur_task, self.state) + _task_queue.push_sorted(cur_task, self.state) self.state = None return None else: @@ -179,11 +179,11 @@ def wait_io_event(self, dt): # print('poll', s, sm, ev) if ev & ~select.POLLOUT and sm[0] is not None: # POLLIN or error - _task_queue.push(sm[0]) + _task_queue.push_head(sm[0]) sm[0] = None if ev & ~select.POLLIN and sm[1] is not None: # POLLOUT or error - _task_queue.push(sm[1]) + _task_queue.push_head(sm[1]) sm[1] = None if sm[0] is None and sm[1] is None: self._dequeue(s) @@ -211,7 +211,7 @@ def create_task(coro): if not hasattr(coro, "send"): raise TypeError("coroutine expected") t = Task(coro, globals()) - _task_queue.push(t) + _task_queue.push_head(t) return t @@ -238,7 +238,7 @@ def run_until_complete(main_task=None): _io_queue.wait_io_event(dt) # Get next task to run and continue it - t = _task_queue.pop() + t = _task_queue.pop_head() cur_task = t try: # Continue running the coroutine, it's responsible for rescheduling itself @@ -274,7 +274,7 @@ def run_until_complete(main_task=None): else: # Schedule any other tasks waiting on the completion of this task. while t.state.peek(): - _task_queue.push(t.state.pop()) + _task_queue.push_head(t.state.pop_head()) waiting = True # "False" indicates that the task is complete and has been await'ed on. t.state = False @@ -282,7 +282,7 @@ def run_until_complete(main_task=None): # An exception ended this detached task, so queue it for later # execution to handle the uncaught exception if no other task retrieves # the exception in the meantime (this is handled by Task.throw). - _task_queue.push(t) + _task_queue.push_head(t) # Save return value of coro to pass up to caller. t.data = er elif t.state is None: @@ -344,7 +344,7 @@ def stop(): global _stop_task if _stop_task is not None: - _task_queue.push(_stop_task) + _task_queue.push_head(_stop_task) # If stop() is called again, do nothing _stop_task = None diff --git a/asyncio/event.py b/asyncio/event.py index 5e1cb24..a402d26 100644 --- a/asyncio/event.py +++ b/asyncio/event.py @@ -40,7 +40,7 @@ def set(self): # Note: This must not be called from anything except the thread running # the asyncio loop (i.e. neither hard or soft IRQ, or a different thread). while self.waiting.peek(): - core._task_queue.push(self.waiting.pop()) + core._task_queue.push_head(self.waiting.pop_head()) self.state = True def clear(self): @@ -57,7 +57,7 @@ async def wait(self): if not self.state: # Event not set, put the calling task on the event's waiting queue - self.waiting.push(core.cur_task) + self.waiting.push_head(core.cur_task) # Set calling task's data to the event's queue so it can be removed if needed core.cur_task.data = self.waiting await core._never() diff --git a/asyncio/funcs.py b/asyncio/funcs.py index dc27088..baf804d 100644 --- a/asyncio/funcs.py +++ b/asyncio/funcs.py @@ -116,7 +116,7 @@ def done(t, er): # Still some sub-tasks running. return # Gather waiting is done, schedule the main gather task. - core._task_queue.push(gather_task) + core._task_queue.push_head(gather_task) ts = [core._promote_to_task(aw) for aw in aws] for i in range(len(ts)): diff --git a/asyncio/lock.py b/asyncio/lock.py index c9feb35..71c972f 100644 --- a/asyncio/lock.py +++ b/asyncio/lock.py @@ -50,8 +50,8 @@ def release(self): raise RuntimeError("Lock not acquired") if self.waiting.peek(): # Task(s) waiting on lock, schedule next Task - self.state = self.waiting.pop() - core._task_queue.push(self.state) + self.state = self.waiting.pop_head() + core._task_queue.push_head(self.state) else: # No Task waiting so unlock self.state = 0 @@ -65,7 +65,7 @@ async def acquire(self): if self.state != 0: # Lock unavailable, put the calling Task on the waiting queue - self.waiting.push(core.cur_task) + self.waiting.push_head(core.cur_task) # Set calling task's data to the lock's queue so it can be removed if needed core.cur_task.data = self.waiting try: diff --git a/asyncio/task.py b/asyncio/task.py index c1306d0..2e3a6db 100644 --- a/asyncio/task.py +++ b/asyncio/task.py @@ -130,6 +130,10 @@ def pop(self): def remove(self, v): self.heap = ph_delete(self.heap, v) + # Compatibility aliases, remove after they are no longer used + push_head = push + push_sorted = push + pop_head = pop # Task class representing a coroutine, can be waited on and cancelled. class Task: