Skip to content

Commit

Permalink
Merge pull request #47 from adafruit/9.0.0-updates
Browse files Browse the repository at this point in the history
update asyncio from MicroPython v1.19.1
  • Loading branch information
dhalbert authored Oct 12, 2023
2 parents cb0d823 + 7c25d09 commit 723b428
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 61 deletions.
6 changes: 6 additions & 0 deletions asyncio/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __next__(self):
self.exc.__traceback__ = None
raise self.exc


# Pause task execution for the given time (integer in milliseconds, uPy extension)
# Use a SingletonGenerator to do it without allocating on the heap
def sleep_ms(t, sgen=SingletonGenerator()):
Expand Down Expand Up @@ -265,6 +266,11 @@ def run_until_complete(main_task=None):
if t.state is True:
# "None" indicates that the task is complete and not await'ed on (yet).
t.state = None
elif callable(t.state):
# The task has a callback registered to be called on completion.
t.state(t, er)
t.state = False
waiting = True
else:
# Schedule any other tasks waiting on the completion of this task.
while t.state.peek():
Expand Down
5 changes: 1 addition & 4 deletions asyncio/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ class Event:

def __init__(self):
self.state = False # False=unset; True=set
self.waiting = (
core.TaskQueue()
) # Queue of Tasks waiting on completion of this event
self.waiting = core.TaskQueue() # Queue of Tasks waiting on completion of this event

def is_set(self):
"""Returns ``True`` if the event is set, ``False`` otherwise."""
Expand Down Expand Up @@ -90,6 +88,5 @@ async def wait(self):
yield core._io_queue.queue_read(self)
self._flag = 0


except ImportError:
pass
126 changes: 87 additions & 39 deletions asyncio/funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
# MIT license; Copyright (c) 2019-2022 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
Expand All @@ -19,6 +19,22 @@
from . import core


async def _run(waiter, aw):
try:
result = await aw
status = True
except BaseException as er:
result = None
status = er
if waiter.data is None:
# The waiter is still waiting, cancel it.
if waiter.cancel():
# Waiter was cancelled by us, change its CancelledError to an instance of
# CancelledError that contains the status and result of waiting on aw.
# If the wait_for task subsequently gets cancelled externally then this
# instance will be reset to a CancelledError instance without arguments.
waiter.data = core.CancelledError(status, result)

async def wait_for(aw, timeout, sleep=core.sleep):
"""Wait for the *aw* awaitable to complete, but cancel if it takes longer
than *timeout* seconds. If *aw* is not a task then a task will be created
Expand All @@ -36,41 +52,26 @@ async def wait_for(aw, timeout, sleep=core.sleep):
if timeout is None:
return await aw

async def runner(waiter, aw):
nonlocal status, result
try:
result = await aw
s = True
except BaseException as er:
s = er
if status is None:
# The waiter is still waiting, set status for it and cancel it.
status = s
waiter.cancel()

# Run aw in a separate runner task that manages its exceptions.
status = None
result = None
runner_task = core.create_task(runner(core.cur_task, aw))
runner_task = core.create_task(_run(core.cur_task, aw))

try:
# Wait for the timeout to elapse.
await sleep(timeout)
except core.CancelledError as er:
if status is True:
# aw completed successfully and cancelled the sleep, so return aw's result.
return result
elif status is None:
status = er.args[0] if er.args else None
if status is None:
# This wait_for was cancelled externally, so cancel aw and re-raise.
status = True
runner_task.cancel()
raise er
elif status is True:
# aw completed successfully and cancelled the sleep, so return aw's result.
return er.args[1]
else:
# aw raised an exception, propagate it out to the caller.
raise status

# The sleep finished before aw, so cancel aw and raise TimeoutError.
status = True
runner_task.cancel()
await runner_task
raise core.TimeoutError
Expand All @@ -85,30 +86,77 @@ def wait_for_ms(aw, timeout):
return wait_for(aw, timeout, core.sleep_ms)


class _Remove:
@staticmethod
def remove(t):
pass


async def gather(*aws, return_exceptions=False):
"""Run all *aws* awaitables concurrently. Any *aws* that are not tasks
are promoted to tasks.
Returns a list of return values of all *aws*
This is a coroutine.
"""
def done(t, er):
# Sub-task "t" has finished, with exception "er".
nonlocal state
if gather_task.data is not _Remove:
# The main gather task has already been scheduled, so do nothing.
# This happens if another sub-task already raised an exception and
# woke the main gather task (via this done function), or if the main
# gather task was cancelled externally.
return
elif not return_exceptions and not isinstance(er, StopIteration):
# A sub-task raised an exception, indicate that to the gather task.
state = er
else:
state -= 1
if state:
# Still some sub-tasks running.
return
# Gather waiting is done, schedule the main gather task.
core._task_queue.push_head(gather_task)

ts = [core._promote_to_task(aw) for aw in aws]
for i in range(len(ts)):
try:
# TODO handle cancel of gather itself
# if ts[i].coro:
# iter(ts[i]).waiting.push_head(cur_task)
# try:
# yield
# except CancelledError as er:
# # cancel all waiting tasks
# raise er
ts[i] = await ts[i]
except (core.CancelledError, Exception) as er:
if return_exceptions:
ts[i] = er
else:
raise er
if ts[i].state is not True:
# Task is not running, gather not currently supported for this case.
raise RuntimeError("can't gather")
# Register the callback to call when the task is done.
ts[i].state = done

# Set the state for execution of the gather.
gather_task = core.cur_task
state = len(ts)
cancel_all = False

# Wait for the a sub-task to need attention.
gather_task.data = _Remove
try:
await core._never()
except core.CancelledError as er:
cancel_all = True
state = er

# Clean up tasks.
for i in range(len(ts)):
if ts[i].state is done:
# Sub-task is still running, deregister the callback and cancel if needed.
ts[i].state = True
if cancel_all:
ts[i].cancel()
elif isinstance(ts[i].data, StopIteration):
# Sub-task ran to completion, get its return value.
ts[i] = ts[i].data.value
else:
# Sub-task had an exception with return_exceptions==True, so get its exception.
ts[i] = ts[i].data

# Either this gather was cancelled, or one of the sub-tasks raised an exception with
# return_exceptions==False, so reraise the exception here.
if state is not 0:
raise state

# Return the list of return values of each sub-task.
return ts
4 changes: 1 addition & 3 deletions asyncio/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,7 @@ async def open_connection(host, port):
from uerrno import EINPROGRESS
import usocket as socket

ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[
0
] # TODO this is blocking!
ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0] # TODO this is blocking!
s = socket.socket(ai[0], ai[1], ai[2])
s.setblocking(False)
ss = Stream(s)
Expand Down
41 changes: 26 additions & 15 deletions asyncio/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,24 +113,27 @@ def __init__(self):
def peek(self):
return self.heap

def push_sorted(self, v, key):
def push(self, v, key=None):
assert v.ph_child is None
assert v.ph_next is None
v.data = None
v.ph_key = key
v.ph_child = None
v.ph_next = None
v.ph_key = key if key is not None else core.ticks()
self.heap = ph_meld(v, self.heap)

def push_head(self, v):
self.push_sorted(v, core.ticks())

def pop_head(self):
def pop(self):
v = self.heap
self.heap = ph_pairing(self.heap.ph_child)
assert v.ph_next is None
self.heap = ph_pairing(v.ph_child)
v.ph_child = None
return v

def remove(self, v):
self.heap = ph_delete(self.heap, v)

# Compatibility aliases, remove after they are no longer used
push_head = push
push_sorted = push
pop_head = pop

# Task class representing a coroutine, can be waited on and cancelled.
class Task:
Expand All @@ -144,7 +147,7 @@ class Task:
def __init__(self, coro, globals=None):
self.coro = coro # Coroutine of this Task
self.data = None # General data for queue it is waiting on
self.state = True # None, False, True or a TaskQueue instance
self.state = True # None, False, True, a callable, or a TaskQueue instance
self.ph_key = 0 # Pairing heap
self.ph_child = None # Paring heap
self.ph_child_last = None # Paring heap
Expand All @@ -158,17 +161,25 @@ def __iter__(self):
elif self.state is True:
# Allocated head of linked list of Tasks waiting on completion of this task.
self.state = TaskQueue()
elif type(self.state) is not TaskQueue:
# Task has state used for another purpose, so can't also wait on it.
raise RuntimeError("can't wait")
return self

# CircuitPython needs __await()__.
__await__ = __iter__

def __next__(self):
if not self.state:
# Task finished, raise return value to caller so it can continue.
raise self.data
if self.data is None:
# Task finished but has already been sent to the loop's exception handler.
raise StopIteration
else:
# Task finished, raise return value to caller so it can continue.
raise self.data
else:
# Put calling task on waiting queue.
self.state.push_head(core.cur_task)
self.state.push(core.cur_task)
# Set calling task's data to this task that it waits on, to double-link it.
core.cur_task.data = self

Expand All @@ -195,10 +206,10 @@ def cancel(self):
if hasattr(self.data, "remove"):
# Not on the main running queue, remove the task from the queue it's on.
self.data.remove(self)
core._task_queue.push_head(self)
core._task_queue.push(self)
elif core.ticks_diff(self.ph_key, core.ticks()) > 0:
# On the main running queue but scheduled in the future, so bring it forward to now.
core._task_queue.remove(self)
core._task_queue.push_head(self)
core._task_queue.push(self)
self.data = core.CancelledError
return True

0 comments on commit 723b428

Please sign in to comment.