diff --git a/asyncio/core.py b/asyncio/core.py index a8b8e6e..33168e3 100644 --- a/asyncio/core.py +++ b/asyncio/core.py @@ -10,6 +10,10 @@ # pylint or black. # pylint: skip-file # fmt: off +""" +Core +==== +""" from adafruit_ticks import ticks_ms as ticks, ticks_diff, ticks_add import sys, select, traceback @@ -26,10 +30,14 @@ class CancelledError(BaseException): + """Injected into a task when calling `Task.cancel()`""" + pass class TimeoutError(Exception): + """Raised when waiting for a task longer than the specified timeout.""" + pass @@ -65,6 +73,11 @@ def __next__(self): # Pause task execution for the given time (integer in milliseconds, uPy extension) # Use a SingletonGenerator to do it without allocating on the heap def sleep_ms(t, sgen=SingletonGenerator()): + """Sleep for *t* milliseconds. + + This is a coroutine, and a MicroPython extension. + """ + assert sgen.state is None, "Check for a missing `await` in your code" sgen.state = ticks_add(ticks(), max(0, t)) return sgen @@ -72,6 +85,11 @@ def sleep_ms(t, sgen=SingletonGenerator()): # Pause task execution for the given time (in seconds) def sleep(t): + """Sleep for *t* seconds + + This is a coroutine. + """ + return sleep_ms(int(t * 1000)) @@ -152,6 +170,11 @@ def _promote_to_task(aw): # Create and schedule a new task from a coroutine def create_task(coro): + """Create a new task from the given coroutine and schedule it to run. + + Returns the corresponding `Task` object. + """ + if not hasattr(coro, "send"): raise TypeError("coroutine expected") t = Task(coro, globals()) @@ -161,6 +184,8 @@ def create_task(coro): # Keep scheduling tasks until there are none left to schedule def run_until_complete(main_task=None): + """Run the given *main_task* until it completes.""" + global cur_task excs_all = (CancelledError, Exception) # To prevent heap allocation in loop excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop @@ -232,6 +257,11 @@ def run_until_complete(main_task=None): # Create a new task from a coroutine and run it until it finishes def run(coro): + """Create a new task from the given coroutine and run it until it completes. + + Returns the value returned by *coro*. + """ + return run_until_complete(create_task(coro)) @@ -247,21 +277,33 @@ async def _stopper(): class Loop: + """Class representing the event loop""" + _exc_handler = None def create_task(coro): + """Create a task from the given *coro* and return the new `Task` object.""" + return create_task(coro) def run_forever(): + """Run the event loop until `Loop.stop()` is called.""" + global _stop_task _stop_task = Task(_stopper(), globals()) run_until_complete(_stop_task) # TODO should keep running until .stop() is called, even if there're no tasks left def run_until_complete(aw): + """Run the given *awaitable* until it completes. If *awaitable* is not a task then + it will be promoted to one. + """ + return run_until_complete(_promote_to_task(aw)) def stop(): + """Stop the event loop""" + global _stop_task if _stop_task is not None: _task_queue.push_head(_stop_task) @@ -269,32 +311,58 @@ def stop(): _stop_task = None def close(): + """Close the event loop.""" + pass def set_exception_handler(handler): + """Set the exception handler to call when a Task raises an exception that is not + caught. The *handler* should accept two arguments: ``(loop, context)`` + """ + Loop._exc_handler = handler def get_exception_handler(): + """Get the current exception handler. Returns the handler, or ``None`` if no + custom handler is set. + """ + return Loop._exc_handler def default_exception_handler(loop, context): + """The default exception handler that is called.""" + exc = context["exception"] traceback.print_exception(None, exc, exc.__traceback__) def call_exception_handler(context): + """Call the current exception handler. The argument *context* is passed through + and is a dictionary containing keys: + ``'message'``, ``'exception'``, ``'future'`` + """ (Loop._exc_handler or Loop.default_exception_handler)(Loop, context) # The runq_len and waitq_len arguments are for legacy uasyncio compatibility def get_event_loop(runq_len=0, waitq_len=0): + """Return the event loop used to schedule and run tasks. See `Loop`.""" + return Loop def current_task(): + """Return the `Task` object associated with the currently running task.""" + return cur_task def new_event_loop(): + """Reset the event loop and return it. + + **NOTE**: Since MicroPython only has a single event loop, this function just resets + the loop's state, it does not create a new one + """ + global _task_queue, _io_queue # TaskQueue of Task instances _task_queue = TaskQueue() diff --git a/asyncio/event.py b/asyncio/event.py index 346b974..04f6e15 100644 --- a/asyncio/event.py +++ b/asyncio/event.py @@ -10,11 +10,19 @@ # pylint or black. # pylint: skip-file # fmt: off +""" +Events +====== +""" from . import core # Event class for primitive events that can be waited on, set, and cleared class Event: + """Create a new event which can be used to synchronize tasks. Events + start in the cleared state. + """ + def __init__(self): self.state = False # False=unset; True=set self.waiting = ( @@ -22,9 +30,14 @@ def __init__(self): ) # Queue of Tasks waiting on completion of this event def is_set(self): + """Returns ``True`` if the event is set, ``False`` otherwise.""" + return self.state def set(self): + """Set the event. Any tasks waiting on the event will be scheduled to run. + """ + # Event becomes set, schedule any tasks waiting on it # Note: This must not be called from anything except the thread running # the asyncio loop (i.e. neither hard or soft IRQ, or a different thread). @@ -33,15 +46,23 @@ def set(self): self.state = True def clear(self): + """Clear the event.""" + self.state = False async def wait(self): + """Wait for the event to be set. If the event is already set then it returns + immediately. + + This is a coroutine. + """ + if not self.state: # Event not set, put the calling task on the event's waiting queue self.waiting.push_head(core.cur_task) # Set calling task's data to the event's queue so it can be removed if needed core.cur_task.data = self.waiting - yield + await core.sleep(0) return True diff --git a/asyncio/funcs.py b/asyncio/funcs.py index 01fe551..2289d33 100644 --- a/asyncio/funcs.py +++ b/asyncio/funcs.py @@ -10,16 +10,33 @@ # pylint or black. # pylint: skip-file # fmt: off +""" +Functions +========= +""" + from . import core async def wait_for(aw, timeout, sleep=core.sleep): + """Wait for the *aw* awaitable to complete, but cancel if it takes longer + than *timeout* seconds. If *aw* is not a task then a task will be created + from it. + + If a timeout occurs, it cancels the task and raises ``asyncio.TimeoutError``: + this should be trapped by the caller. + + Returns the return value of *aw*. + + This is a coroutine. + """ + aw = core._promote_to_task(aw) if timeout is None: return await aw - def runner(waiter, aw): + async def runner(waiter, aw): nonlocal status, result try: result = await aw @@ -60,10 +77,23 @@ def runner(waiter, aw): def wait_for_ms(aw, timeout): + """Similar to `wait_for` but *timeout* is an integer in milliseconds. + + This is a coroutine, and a MicroPython extension. + """ + return wait_for(aw, timeout, core.sleep_ms) async def gather(*aws, return_exceptions=False): + """Run all *aws* awaitables concurrently. Any *aws* that are not tasks + are promoted to tasks. + + Returns a list of return values of all *aws* + + This is a coroutine. + """ + ts = [core._promote_to_task(aw) for aw in aws] for i in range(len(ts)): try: diff --git a/asyncio/lock.py b/asyncio/lock.py index 0a93872..3b93e6a 100644 --- a/asyncio/lock.py +++ b/asyncio/lock.py @@ -10,11 +10,22 @@ # pylint or black. # pylint: skip-file # fmt: off +""" +Locks +===== +""" from . import core # Lock class for primitive mutex capability class Lock: + """Create a new lock which can be used to coordinate tasks. Locks start in + the unlocked state. + + In addition to the methods below, locks can be used in an ``async with`` + statement. + """ + def __init__(self): # The state can take the following values: # - 0: unlocked @@ -25,9 +36,16 @@ def __init__(self): self.waiting = core.TaskQueue() def locked(self): + """Returns ``True`` if the lock is locked, otherwise ``False``.""" + return self.state == 1 def release(self): + """Release the lock. If any tasks are waiting on the lock then the next + one in the queue is scheduled to run and the lock remains locked. Otherwise, + no tasks are waiting and the lock becomes unlocked. + """ + if self.state != 1: raise RuntimeError("Lock not acquired") if self.waiting.peek(): @@ -39,13 +57,19 @@ def release(self): self.state = 0 async def acquire(self): + """Wait for the lock to be in the unlocked state and then lock it in an + atomic way. Only one task can acquire the lock at any one time. + + This is a coroutine. + """ + if self.state != 0: # Lock unavailable, put the calling Task on the waiting queue self.waiting.push_head(core.cur_task) # Set calling task's data to the lock's queue so it can be removed if needed core.cur_task.data = self.waiting try: - yield + await core.sleep(0) except core.CancelledError as er: if self.state == core.cur_task: # Cancelled while pending on resume, schedule next waiting Task diff --git a/asyncio/stream.py b/asyncio/stream.py index 41baeb2..97dcf6a 100644 --- a/asyncio/stream.py +++ b/asyncio/stream.py @@ -10,17 +10,30 @@ # pylint or black. # pylint: skip-file # fmt: off +""" +Streams +======= +""" from . import core class Stream: + """This represents a TCP stream connection. To minimise code this class + implements both a reader and a writer, and both ``StreamReader`` and + ``StreamWriter`` alias to this class. + """ + def __init__(self, s, e={}): self.s = s self.e = e self.out_buf = b"" def get_extra_info(self, v): + """Get extra information about the stream, given by *v*. The valid + values for *v* are: ``peername``. + """ + return self.e[v] async def __aenter__(self): @@ -33,21 +46,49 @@ def close(self): pass async def wait_closed(self): + """Wait for the stream to close. + + This is a coroutine. + """ + # TODO yield? self.s.close() async def read(self, n): - yield core._io_queue.queue_read(self.s) + """Read up to *n* bytes and return them. + + This is a coroutine. + """ + + core._io_queue.queue_read(self.s) + await core.sleep(0) return self.s.read(n) async def readinto(self, buf): - yield core._io_queue.queue_read(self.s) + """Read up to n bytes into *buf* with n being equal to the length of *buf* + + Return the number of bytes read into *buf* + + This is a coroutine, and a MicroPython extension. + """ + + core._io_queue.queue_read(self.s) + await core.sleep(0) return self.s.readinto(buf) async def readexactly(self, n): + """Read exactly *n* bytes and return them as a bytes object. + + Raises an ``EOFError`` exception if the stream ends before reading + *n* bytes. + + This is a coroutine. + """ + r = b"" while n: - yield core._io_queue.queue_read(self.s) + core._io_queue.queue_read(self.s) + await core.sleep(0) r2 = self.s.read(n) if r2 is not None: if not len(r2): @@ -57,18 +98,34 @@ async def readexactly(self, n): return r async def readline(self): + """Read a line and return it. + + This is a coroutine. + """ + l = b"" while True: - yield core._io_queue.queue_read(self.s) + core._io_queue.queue_read(self.s) + await core.sleep(0) l2 = self.s.readline() # may do multiple reads but won't block l += l2 if not l2 or l[-1] == 10: # \n (check l in case l2 is str) return l def write(self, buf): + """Accumulated *buf* to the output buffer. The data is only flushed when + `Stream.drain` is called. It is recommended to call `Stream.drain` + immediately after calling this function. + """ + self.out_buf += buf async def drain(self): + """Drain (write) all buffered output data out to the stream. + + This is a coroutine. + """ + mv = memoryview(self.out_buf) off = 0 while off < len(mv): @@ -86,6 +143,15 @@ async def drain(self): # Create a TCP stream connection to a remote host async def open_connection(host, port): + """Open a TCP connection to the given *host* and *port*. The *host* address will + be resolved using `socket.getaddrinfo`, which is currently a blocking call. + + Returns a pair of streams: a reader and a writer stream. Will raise a socket-specific + ``OSError`` if the host could not be resolved or if the connection could not be made. + + This is a coroutine. + """ + from uerrno import EINPROGRESS import usocket as socket @@ -100,12 +166,17 @@ async def open_connection(host, port): except OSError as er: if er.errno != EINPROGRESS: raise er - yield core._io_queue.queue_write(s) + core._io_queue.queue_write(s) + await core.sleep(0) return ss, ss # Class representing a TCP stream server, can be closed and used in "async with" class Server: + """This represents the server class returned from `start_server`. It can be used in + an ``async with`` statement to close the server upon exit. + """ + async def __aenter__(self): return self @@ -114,9 +185,16 @@ async def __aexit__(self, exc_type, exc, tb): await self.wait_closed() def close(self): + """Close the server.""" + self.task.cancel() async def wait_closed(self): + """Wait for the server to close. + + This is a coroutine. + """ + await self.task async def _serve(self, s, cb): @@ -141,6 +219,15 @@ async def _serve(self, s, cb): # Helper function to start a TCP stream server, running as a new task # TODO could use an accept-callback on socket read activity instead of creating a task async def start_server(cb, host, port, backlog=5): + """Start a TCP server on the given *host* and *port*. The *cb* callback will be + called with incoming, accepted connections, and be passed 2 arguments: reader + writer streams for the connection. + + Returns a `Server` object. + + This is a coroutine. + """ + import usocket as socket # Create and bind server socket. diff --git a/asyncio/task.py b/asyncio/task.py index 2bc8bbd..9a76497 100644 --- a/asyncio/task.py +++ b/asyncio/task.py @@ -10,6 +10,10 @@ # pylint or black. # pylint: skip-file # fmt: off +""" +Tasks +===== +""" # This file contains the core TaskQueue based on a pairing heap, and the core Task class. # They can optionally be replaced by C implementations. @@ -130,6 +134,13 @@ def remove(self, v): # Task class representing a coroutine, can be waited on and cancelled. class Task: + """This object wraps a coroutine into a running task. Tasks can be waited on + using ``await task``, which will wait for the task to complete and return the + return value of the task. + + Tasks should not be created directly, rather use ``create_task`` to create them. + """ + def __init__(self, coro, globals=None): self.coro = coro # Coroutine of this Task self.data = None # General data for queue it is waiting on @@ -162,9 +173,15 @@ def __next__(self): core.cur_task.data = self def done(self): + """Whether the task is complete.""" + return not self.state def cancel(self): + """Cancel the task by injecting a ``CancelledError`` into it. The task + may or may not ignore this exception. + """ + # Check if task is already finished. if not self.state: return False diff --git a/docs/api.rst b/docs/api.rst index b8c4c04..2414146 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -8,5 +8,24 @@ .. automodule:: asyncio :members: -.. toctree:: - :hidden: +.. automodule:: asyncio.core + :members: + :exclude-members: SingletonGenerator, IOQueue + +.. automodule:: asyncio.event + :members: + :exclude-members: ThreadSafeFlag + +.. automodule:: asyncio.funcs + :members: + +.. automodule:: asyncio.lock + :members: + +.. automodule:: asyncio.stream + :members: + :exclude-members: stream_awrite + +.. automodule:: asyncio.task + :members: + :exclude-members: ph_meld, ph_pairing, ph_delete, TaskQueue