From 99b2808928678c2502421e310d3a5e8f77de566c Mon Sep 17 00:00:00 2001 From: James Ward Date: Sun, 29 Oct 2023 17:24:38 -0400 Subject: [PATCH 1/8] feat: add queue support --- asyncio/__init__.py | 1 + asyncio/queue.py | 77 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+) create mode 100644 asyncio/queue.py diff --git a/asyncio/__init__.py b/asyncio/__init__.py index ce8837d..5079c72 100644 --- a/asyncio/__init__.py +++ b/asyncio/__init__.py @@ -23,6 +23,7 @@ "Event": "event", "ThreadSafeFlag": "event", "Lock": "lock", + "Queue": "queue", "open_connection": "stream", "start_server": "stream", "StreamReader": "stream", diff --git a/asyncio/queue.py b/asyncio/queue.py new file mode 100644 index 0000000..c179ead --- /dev/null +++ b/asyncio/queue.py @@ -0,0 +1,77 @@ +from . import event + + +class QueueEmpty(Exception): + pass + + +class QueueFull(Exception): + pass + + +class Queue: + def __init__(self, maxsize=0): + self.maxsize = maxsize + self._queue = list() + + self._join_counter = 0 + self._join_event = event.Event() + self._join_event.set() + + self._put_event = event.Event() + self._get_event = event.Event() + + def _get(self): + value = self._queue.pop(0) + self._get_event.set() + self._get_event.clear() + return value + + def _put(self, val): + self._join_counter += 1 + self._join_event.clear() + + self._queue.append(val) + + self._put_event.set() + self._put_event.clear() + + async def get(self): + while self.empty(): + await self._put_event.wait() + return self._get() + + def get_nowait(self): + if self.empty(): + raise QueueEmpty() + return self._get() + + async def put(self, val): + while self.full(): + await self._get_event.wait() + self._put(val) + + def put_nowait(self, val): + if self.full(): + raise QueueFull() + self._put(val) + + def qsize(self): + return len(self._queue) + + def empty(self): + return len(self._queue) == 0 + + def full(self): + return 0 < self.maxsize <= self.qsize() + + def task_done(self): + self._join_counter -= 1 + + if self._join_counter <= 0: + # Can't have less than 0 + self._join_counter = 0 + self._join_event.set() + + async def join(self): + await self._join_event.wait() From 2b708112ddce9c0e8e9a99819de266ab30f9d538 Mon Sep 17 00:00:00 2001 From: James Ward Date: Sun, 29 Oct 2023 21:38:02 -0400 Subject: [PATCH 2/8] chore: add docstring and use `[]` rather than `list()` --- asyncio/queue.py | 68 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 64 insertions(+), 4 deletions(-) diff --git a/asyncio/queue.py b/asyncio/queue.py index c179ead..ee45b24 100644 --- a/asyncio/queue.py +++ b/asyncio/queue.py @@ -2,17 +2,32 @@ class QueueEmpty(Exception): + """Raised when Queue.get_nowait() is called on an empty Queue.""" pass class QueueFull(Exception): + """Raised when the Queue.put_nowait() method is called on a full Queue.""" pass class Queue: + """ + A queue, useful for coordinating producer and consumer coroutines. + + If maxsize is less than or equal to zero, the queue size is infinite. If it + is an integer greater than 0, then "await put()" will block when the + queue reaches maxsize, until an item is removed by get(). + + Unlike CPython's asyncio.Queue, this implementation is backed by a list rather + than `collections.deque` because smaller boards may not have the library + implemented. + """ + def __init__(self, maxsize=0): self.maxsize = maxsize - self._queue = list() + + self._queue = [] self._join_counter = 0 self._join_event = event.Event() @@ -37,41 +52,86 @@ def _put(self, val): self._put_event.clear() async def get(self): + """ + Remove and return an item from the queue. + + If queue is empty, wait until an item is available. + """ while self.empty(): await self._put_event.wait() return self._get() def get_nowait(self): + """ + Remove and return an item from the queue. + + If queue is empty, raise QueueEmpty. + """ if self.empty(): raise QueueEmpty() return self._get() async def put(self, val): + """ + Put an item into the queue. + + If the queue is full, waits until a free + slot is available before adding item. + """ while self.full(): await self._get_event.wait() self._put(val) def put_nowait(self, val): + """ + Put an item into the queue. + + If the queue is full, raises QueueFull. + """ if self.full(): raise QueueFull() self._put(val) def qsize(self): + """ + Number of items in this queue. + """ return len(self._queue) def empty(self): + """ + Return True if the queue is empty. + """ return len(self._queue) == 0 def full(self): + """ + Return True if there are maxsize items in the queue. + """ return 0 < self.maxsize <= self.qsize() def task_done(self): - self._join_counter -= 1 + """ + Indicate that a formerly enqueued task is complete. + + If a join() is currently blocking, it will resume when all items have + been processed (meaning that a task_done() call was received for every + item that had been put() into the queue). - if self._join_counter <= 0: + Raises ValueError if called more times than there were items placed in + the queue. + """ + if self._join_counter == 0: # Can't have less than 0 - self._join_counter = 0 + raise ValueError("task_done() called too many times") + + self._join_counter -= 1 + + if self._join_counter == 0: self._join_event.set() async def join(self): + """ + Block until all items in the queue have been gotten and processed. + """ await self._join_event.wait() From 814a6c0a06690c66e4a5888fda25d632d29c9a39 Mon Sep 17 00:00:00 2001 From: James Ward Date: Sun, 29 Oct 2023 22:22:40 -0400 Subject: [PATCH 3/8] chore: add module docstring --- asyncio/queue.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/asyncio/queue.py b/asyncio/queue.py index ee45b24..345bc62 100644 --- a/asyncio/queue.py +++ b/asyncio/queue.py @@ -1,14 +1,16 @@ +""" +Exceptions and classes related to asyncio Queue implementations. +""" + from . import event class QueueEmpty(Exception): """Raised when Queue.get_nowait() is called on an empty Queue.""" - pass class QueueFull(Exception): """Raised when the Queue.put_nowait() method is called on a full Queue.""" - pass class Queue: From 1e93f4fedb9a7ddab9077c0269fb900e2b508acf Mon Sep 17 00:00:00 2001 From: James Ward Date: Sun, 29 Oct 2023 22:50:41 -0400 Subject: [PATCH 4/8] chore: run black --- asyncio/queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asyncio/queue.py b/asyncio/queue.py index 345bc62..1286aa4 100644 --- a/asyncio/queue.py +++ b/asyncio/queue.py @@ -76,7 +76,7 @@ def get_nowait(self): async def put(self, val): """ Put an item into the queue. - + If the queue is full, waits until a free slot is available before adding item. """ From 40c8f643395771f5b1db58586dfe0e56daa44f9f Mon Sep 17 00:00:00 2001 From: James Ward Date: Sun, 29 Oct 2023 23:12:17 -0400 Subject: [PATCH 5/8] chore: add license to queue --- asyncio/queue.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/asyncio/queue.py b/asyncio/queue.py index 1286aa4..d8fcaf6 100644 --- a/asyncio/queue.py +++ b/asyncio/queue.py @@ -1,3 +1,9 @@ +# SPDX-FileCopyrightText: 2019-2020 Damien P. George +# +# SPDX-License-Identifier: MIT +# +# MicroPython uasyncio module +# MIT license; Copyright (c) 2019-2020 Damien P. George """ Exceptions and classes related to asyncio Queue implementations. """ From 588fe6d67f29e0314c1ec02ce79595943b202bf5 Mon Sep 17 00:00:00 2001 From: James Ward Date: Wed, 1 Nov 2023 20:32:33 -0400 Subject: [PATCH 6/8] chore: refactor to back by TaskQueue instead of Event --- asyncio/queue.py | 49 +++++++++++++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/asyncio/queue.py b/asyncio/queue.py index d8fcaf6..db0217f 100644 --- a/asyncio/queue.py +++ b/asyncio/queue.py @@ -8,7 +8,7 @@ Exceptions and classes related to asyncio Queue implementations. """ -from . import event +from . import core class QueueEmpty(Exception): @@ -19,6 +19,19 @@ class QueueFull(Exception): """Raised when the Queue.put_nowait() method is called on a full Queue.""" +async def _wait_on_task_queue(task_queue: core.TaskQueue): + task_queue.push(core.cur_task) + # Set calling task's data to the TaskQueue so it can be removed if needed + core.cur_task.data = task_queue + # Send control back + await core._never() + + +def _release_task_queue(task_queue: core.TaskQueue): + while task_queue.peek(): + core._task_queue.push(task_queue.pop()) + + class Queue: """ A queue, useful for coordinating producer and consumer coroutines. @@ -37,27 +50,21 @@ def __init__(self, maxsize=0): self._queue = [] - self._join_counter = 0 - self._join_event = event.Event() - self._join_event.set() + self._active_tasks = 0 - self._put_event = event.Event() - self._get_event = event.Event() + self._waiting_for_completion = core.TaskQueue() + self._waiting_for_put = core.TaskQueue() + self._waiting_for_get = core.TaskQueue() def _get(self): value = self._queue.pop(0) - self._get_event.set() - self._get_event.clear() + _release_task_queue(self._waiting_for_get) return value def _put(self, val): - self._join_counter += 1 - self._join_event.clear() - self._queue.append(val) - - self._put_event.set() - self._put_event.clear() + self._active_tasks += 1 + _release_task_queue(self._waiting_for_put) async def get(self): """ @@ -66,7 +73,7 @@ async def get(self): If queue is empty, wait until an item is available. """ while self.empty(): - await self._put_event.wait() + await _wait_on_task_queue(self._waiting_for_put) return self._get() def get_nowait(self): @@ -87,7 +94,7 @@ async def put(self, val): slot is available before adding item. """ while self.full(): - await self._get_event.wait() + await _wait_on_task_queue(self._waiting_for_get) self._put(val) def put_nowait(self, val): @@ -129,17 +136,17 @@ def task_done(self): Raises ValueError if called more times than there were items placed in the queue. """ - if self._join_counter == 0: + if self._active_tasks == 0: # Can't have less than 0 raise ValueError("task_done() called too many times") - self._join_counter -= 1 + self._active_tasks -= 1 - if self._join_counter == 0: - self._join_event.set() + if self._active_tasks == 0: + _release_task_queue(self._waiting_for_completion) async def join(self): """ Block until all items in the queue have been gotten and processed. """ - await self._join_event.wait() + await _wait_on_task_queue(self._waiting_for_completion) From 42db9e687123a713d1c692ce18bb8f0760395af8 Mon Sep 17 00:00:00 2001 From: James Ward Date: Wed, 1 Nov 2023 20:38:13 -0400 Subject: [PATCH 7/8] chore: disable pylint warnings for protected access this is the norm for accessing from core --- asyncio/queue.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/asyncio/queue.py b/asyncio/queue.py index db0217f..d88a751 100644 --- a/asyncio/queue.py +++ b/asyncio/queue.py @@ -4,6 +4,11 @@ # # MicroPython uasyncio module # MIT license; Copyright (c) 2019-2020 Damien P. George + +# The rest of the library assumes that `_never` and `_task_queue` should be imported from +# core, which angers pylint. +# pylint: disable=protected-access + """ Exceptions and classes related to asyncio Queue implementations. """ From f94cfb3a81feec72a928db0d0cfb7ad51a41513d Mon Sep 17 00:00:00 2001 From: James Ward Date: Fri, 3 Nov 2023 20:27:33 -0400 Subject: [PATCH 8/8] docs: add queue to the api docs --- docs/api.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/api.rst b/docs/api.rst index 2414146..f85b478 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -22,6 +22,9 @@ .. automodule:: asyncio.lock :members: +.. automodule:: asyncio.queue + :members: + .. automodule:: asyncio.stream :members: :exclude-members: stream_awrite