Skip to content

Commit

Permalink
Merge pull request #19 from tekktrik/doc/add-docstrings
Browse files Browse the repository at this point in the history
Add docstrings
  • Loading branch information
dhalbert authored Jun 10, 2022
2 parents dbbfcb3 + 1ff8313 commit c496cfb
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 10 deletions.
68 changes: 68 additions & 0 deletions asyncio/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Core
====
"""

from adafruit_ticks import ticks_ms as ticks, ticks_diff, ticks_add
import sys, select, traceback
Expand All @@ -26,10 +30,14 @@


class CancelledError(BaseException):
"""Injected into a task when calling `Task.cancel()`"""

pass


class TimeoutError(Exception):
"""Raised when waiting for a task longer than the specified timeout."""

pass


Expand Down Expand Up @@ -65,13 +73,23 @@ def __next__(self):
# Pause task execution for the given time (integer in milliseconds, uPy extension)
# Use a SingletonGenerator to do it without allocating on the heap
def sleep_ms(t, sgen=SingletonGenerator()):
"""Sleep for *t* milliseconds.
This is a coroutine, and a MicroPython extension.
"""

assert sgen.state is None, "Check for a missing `await` in your code"
sgen.state = ticks_add(ticks(), max(0, t))
return sgen


# Pause task execution for the given time (in seconds)
def sleep(t):
"""Sleep for *t* seconds
This is a coroutine.
"""

return sleep_ms(int(t * 1000))


Expand Down Expand Up @@ -152,6 +170,11 @@ def _promote_to_task(aw):

# Create and schedule a new task from a coroutine
def create_task(coro):
"""Create a new task from the given coroutine and schedule it to run.
Returns the corresponding `Task` object.
"""

if not hasattr(coro, "send"):
raise TypeError("coroutine expected")
t = Task(coro, globals())
Expand All @@ -161,6 +184,8 @@ def create_task(coro):

# Keep scheduling tasks until there are none left to schedule
def run_until_complete(main_task=None):
"""Run the given *main_task* until it completes."""

global cur_task
excs_all = (CancelledError, Exception) # To prevent heap allocation in loop
excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop
Expand Down Expand Up @@ -232,6 +257,11 @@ def run_until_complete(main_task=None):

# Create a new task from a coroutine and run it until it finishes
def run(coro):
"""Create a new task from the given coroutine and run it until it completes.
Returns the value returned by *coro*.
"""

return run_until_complete(create_task(coro))


Expand All @@ -247,54 +277,92 @@ async def _stopper():


class Loop:
"""Class representing the event loop"""

_exc_handler = None

def create_task(coro):
"""Create a task from the given *coro* and return the new `Task` object."""

return create_task(coro)

def run_forever():
"""Run the event loop until `Loop.stop()` is called."""

global _stop_task
_stop_task = Task(_stopper(), globals())
run_until_complete(_stop_task)
# TODO should keep running until .stop() is called, even if there're no tasks left

def run_until_complete(aw):
"""Run the given *awaitable* until it completes. If *awaitable* is not a task then
it will be promoted to one.
"""

return run_until_complete(_promote_to_task(aw))

def stop():
"""Stop the event loop"""

global _stop_task
if _stop_task is not None:
_task_queue.push_head(_stop_task)
# If stop() is called again, do nothing
_stop_task = None

def close():
"""Close the event loop."""

pass

def set_exception_handler(handler):
"""Set the exception handler to call when a Task raises an exception that is not
caught. The *handler* should accept two arguments: ``(loop, context)``
"""

Loop._exc_handler = handler

def get_exception_handler():
"""Get the current exception handler. Returns the handler, or ``None`` if no
custom handler is set.
"""

return Loop._exc_handler

def default_exception_handler(loop, context):
"""The default exception handler that is called."""

exc = context["exception"]
traceback.print_exception(None, exc, exc.__traceback__)

def call_exception_handler(context):
"""Call the current exception handler. The argument *context* is passed through
and is a dictionary containing keys:
``'message'``, ``'exception'``, ``'future'``
"""
(Loop._exc_handler or Loop.default_exception_handler)(Loop, context)


# The runq_len and waitq_len arguments are for legacy uasyncio compatibility
def get_event_loop(runq_len=0, waitq_len=0):
"""Return the event loop used to schedule and run tasks. See `Loop`."""

return Loop


def current_task():
"""Return the `Task` object associated with the currently running task."""

return cur_task


def new_event_loop():
"""Reset the event loop and return it.
**NOTE**: Since MicroPython only has a single event loop, this function just resets
the loop's state, it does not create a new one
"""

global _task_queue, _io_queue
# TaskQueue of Task instances
_task_queue = TaskQueue()
Expand Down
23 changes: 22 additions & 1 deletion asyncio/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,34 @@
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Events
======
"""

from . import core

# Event class for primitive events that can be waited on, set, and cleared
class Event:
"""Create a new event which can be used to synchronize tasks. Events
start in the cleared state.
"""

def __init__(self):
self.state = False # False=unset; True=set
self.waiting = (
core.TaskQueue()
) # Queue of Tasks waiting on completion of this event

def is_set(self):
"""Returns ``True`` if the event is set, ``False`` otherwise."""

return self.state

def set(self):
"""Set the event. Any tasks waiting on the event will be scheduled to run.
"""

# Event becomes set, schedule any tasks waiting on it
# Note: This must not be called from anything except the thread running
# the asyncio loop (i.e. neither hard or soft IRQ, or a different thread).
Expand All @@ -33,15 +46,23 @@ def set(self):
self.state = True

def clear(self):
"""Clear the event."""

self.state = False

async def wait(self):
"""Wait for the event to be set. If the event is already set then it returns
immediately.
This is a coroutine.
"""

if not self.state:
# Event not set, put the calling task on the event's waiting queue
self.waiting.push_head(core.cur_task)
# Set calling task's data to the event's queue so it can be removed if needed
core.cur_task.data = self.waiting
yield
await core.sleep(0)
return True


Expand Down
32 changes: 31 additions & 1 deletion asyncio/funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,33 @@
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Functions
=========
"""


from . import core


async def wait_for(aw, timeout, sleep=core.sleep):
"""Wait for the *aw* awaitable to complete, but cancel if it takes longer
than *timeout* seconds. If *aw* is not a task then a task will be created
from it.
If a timeout occurs, it cancels the task and raises ``asyncio.TimeoutError``:
this should be trapped by the caller.
Returns the return value of *aw*.
This is a coroutine.
"""

aw = core._promote_to_task(aw)
if timeout is None:
return await aw

def runner(waiter, aw):
async def runner(waiter, aw):
nonlocal status, result
try:
result = await aw
Expand Down Expand Up @@ -60,10 +77,23 @@ def runner(waiter, aw):


def wait_for_ms(aw, timeout):
"""Similar to `wait_for` but *timeout* is an integer in milliseconds.
This is a coroutine, and a MicroPython extension.
"""

return wait_for(aw, timeout, core.sleep_ms)


async def gather(*aws, return_exceptions=False):
"""Run all *aws* awaitables concurrently. Any *aws* that are not tasks
are promoted to tasks.
Returns a list of return values of all *aws*
This is a coroutine.
"""

ts = [core._promote_to_task(aw) for aw in aws]
for i in range(len(ts)):
try:
Expand Down
26 changes: 25 additions & 1 deletion asyncio/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,22 @@
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Locks
=====
"""

from . import core

# Lock class for primitive mutex capability
class Lock:
"""Create a new lock which can be used to coordinate tasks. Locks start in
the unlocked state.
In addition to the methods below, locks can be used in an ``async with``
statement.
"""

def __init__(self):
# The state can take the following values:
# - 0: unlocked
Expand All @@ -25,9 +36,16 @@ def __init__(self):
self.waiting = core.TaskQueue()

def locked(self):
"""Returns ``True`` if the lock is locked, otherwise ``False``."""

return self.state == 1

def release(self):
"""Release the lock. If any tasks are waiting on the lock then the next
one in the queue is scheduled to run and the lock remains locked. Otherwise,
no tasks are waiting and the lock becomes unlocked.
"""

if self.state != 1:
raise RuntimeError("Lock not acquired")
if self.waiting.peek():
Expand All @@ -39,13 +57,19 @@ def release(self):
self.state = 0

async def acquire(self):
"""Wait for the lock to be in the unlocked state and then lock it in an
atomic way. Only one task can acquire the lock at any one time.
This is a coroutine.
"""

if self.state != 0:
# Lock unavailable, put the calling Task on the waiting queue
self.waiting.push_head(core.cur_task)
# Set calling task's data to the lock's queue so it can be removed if needed
core.cur_task.data = self.waiting
try:
yield
await core.sleep(0)
except core.CancelledError as er:
if self.state == core.cur_task:
# Cancelled while pending on resume, schedule next waiting Task
Expand Down
Loading

0 comments on commit c496cfb

Please sign in to comment.