From 2f9e058d3ae129f4deef6b68fcd77c2cf4100780 Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Sun, 8 Oct 2023 22:35:59 +0200 Subject: [PATCH] Drop 3.7-style named task creation, use asyncio directly Signed-off-by: Sergey Vasilyev --- kopf/_cogs/aiokits/aioenums.py | 4 +-- kopf/_cogs/aiokits/aiotasks.py | 26 +++++-------------- kopf/_cogs/clients/watching.py | 2 +- kopf/_core/engines/daemons.py | 2 +- kopf/_core/reactor/running.py | 9 +++---- .../utilities/aiotasks/test_task_creation.py | 24 ----------------- .../utilities/aiotasks/test_task_guarding.py | 6 ++--- .../utilities/aiotasks/test_task_selection.py | 6 ++--- .../utilities/aiotasks/test_task_stopping.py | 22 ++++++++-------- tests/utilities/aiotasks/test_task_waiting.py | 4 +-- 10 files changed, 33 insertions(+), 72 deletions(-) delete mode 100644 tests/utilities/aiotasks/test_task_creation.py diff --git a/kopf/_cogs/aiokits/aioenums.py b/kopf/_cogs/aiokits/aioenums.py index a36d2809..4051830d 100644 --- a/kopf/_cogs/aiokits/aioenums.py +++ b/kopf/_cogs/aiokits/aioenums.py @@ -4,8 +4,6 @@ import time from typing import Awaitable, Generator, Generic, Optional, TypeVar -from kopf._cogs.aiokits import aiotasks - FlagReasonT = TypeVar('FlagReasonT', bound=enum.Flag) @@ -168,7 +166,7 @@ def __init__(self, waiter: AsyncFlagWaiter[FlagReasonT], *, timeout: Optional[fl def __await__(self) -> Generator[None, None, AsyncFlagWaiter[FlagReasonT]]: name = f"time-limited waiting for the daemon stopper {self._setter!r}" coro = asyncio.wait_for(self._setter.async_event.wait(), timeout=self._timeout) - task = aiotasks.create_task(coro, name=name) + task = asyncio.create_task(coro, name=name) try: yield from task except asyncio.TimeoutError: diff --git a/kopf/_cogs/aiokits/aiotasks.py b/kopf/_cogs/aiokits/aiotasks.py index fbeaf117..69ab4863 100644 --- a/kopf/_cogs/aiokits/aiotasks.py +++ b/kopf/_cogs/aiokits/aiotasks.py @@ -10,9 +10,8 @@ so there is no added overhead; instead, the implicit overhead is made explicit. """ import asyncio -import sys -from typing import TYPE_CHECKING, Any, Awaitable, Callable, Collection, Coroutine, \ - Generator, NamedTuple, Optional, Set, Tuple, TypeVar, Union +from typing import TYPE_CHECKING, Any, Callable, Collection, Coroutine, \ + NamedTuple, Optional, Set, Tuple, TypeVar from kopf._cogs.helpers import typedefs @@ -27,17 +26,6 @@ Future = asyncio.Future Task = asyncio.Task -# Accept `name=` always, but simulate it for Python 3.7 to do nothing. -if sys.version_info >= (3, 8): - create_task = asyncio.create_task -else: - def create_task( - coro: Union[Generator[Any, None, _T], Awaitable[_T]], - *, - name: Optional[str] = None, # noqa: W613 # pylint: disable=unused-argument - ) -> Task: - return asyncio.create_task(coro) - async def cancel_coro( coro: Coroutine[Any, Any, Any], @@ -65,7 +53,7 @@ async def cancel_coro( coro.close() # OR: coro.throw(asyncio.CancelledError()) except AttributeError: # The official way is to create an extra task object, thus to waste some memory. - corotask = create_task(coro=coro, name=name) + corotask = asyncio.create_task(coro=coro, name=name) corotask.cancel() try: await corotask @@ -133,7 +121,7 @@ def create_guarded_task( This is only a shortcut for named task creation (name is used in 2 places). """ - return create_task( + return asyncio.create_task( name=name, coro=guard( name=name, @@ -303,8 +291,8 @@ def __init__( self._pending_coros: asyncio.Queue[SchedulerJob] = asyncio.Queue() self._running_tasks: Set[Task] = set() self._cleaning_queue: asyncio.Queue[Task] = asyncio.Queue() - self._cleaning_task = create_task(self._task_cleaner(), name=f"task cleaner of {self!r}") - self._spawning_task = create_task(self._task_spawner(), name=f"task spawner of {self!r}") + self._cleaning_task = asyncio.create_task(self._task_cleaner(), name=f"cleaner of {self!r}") + self._spawning_task = asyncio.create_task(self._task_spawner(), name=f"spawner of {self!r}") def empty(self) -> bool: """ Check if the scheduler has nothing to do. """ @@ -371,7 +359,7 @@ async def _task_spawner(self) -> None: # when they are finished --- to be awaited and released "passively". while self._can_spawn(): coro, name = self._pending_coros.get_nowait() # guaranteed by the predicate - task = create_task(coro=coro, name=name) + task = asyncio.create_task(coro=coro, name=name) task.add_done_callback(self._task_done_callback) self._running_tasks.add(task) if self._closed: diff --git a/kopf/_cogs/clients/watching.py b/kopf/_cogs/clients/watching.py index 3144ccee..d16d4ba8 100644 --- a/kopf/_cogs/clients/watching.py +++ b/kopf/_cogs/clients/watching.py @@ -145,7 +145,7 @@ async def streaming_block( # Create the signalling future for when paused again. operator_pause_waiter: aiotasks.Future if operator_paused is not None: - operator_pause_waiter = aiotasks.create_task( + operator_pause_waiter = asyncio.create_task( operator_paused.wait_for(True), name=f"pause-waiter for {resource}") else: diff --git a/kopf/_core/engines/daemons.py b/kopf/_core/engines/daemons.py index 6d1de9ad..2302eb62 100644 --- a/kopf/_core/engines/daemons.py +++ b/kopf/_core/engines/daemons.py @@ -99,7 +99,7 @@ async def spawn_daemons( stopper=stopper, # for stopping (outside of causes) handler=handler, logger=loggers.LocalObjectLogger(body=cause.body, settings=settings), - task=aiotasks.create_task(_runner( + task=asyncio.create_task(_runner( settings=settings, daemons=daemons, # for self-garbage-collection handler=handler, diff --git a/kopf/_core/reactor/running.py b/kopf/_core/reactor/running.py index a8a7102b..96186ef8 100644 --- a/kopf/_core/reactor/running.py +++ b/kopf/_core/reactor/running.py @@ -219,17 +219,17 @@ async def spawn_tasks( posting.settings_var.set(settings) # A few common background forever-running infrastructural tasks (irregular root tasks). - tasks.append(aiotasks.create_task( + tasks.append(asyncio.create_task( name="stop-flag checker", coro=_stop_flag_checker( signal_flag=signal_flag, stop_flag=stop_flag))) - tasks.append(aiotasks.create_task( + tasks.append(asyncio.create_task( name="ultimate termination", coro=_ultimate_termination( settings=settings, stop_flag=stop_flag))) - tasks.append(aiotasks.create_task( + tasks.append(asyncio.create_task( name="startup/cleanup activities", coro=_startup_cleanup_activities( root_tasks=tasks, # used as a "live" view, populated later. @@ -433,8 +433,7 @@ async def _stop_flag_checker( if signal_flag is not None: flags.append(signal_flag) if stop_flag is not None: - flags.append(aiotasks.create_task(aioadapters.wait_flag(stop_flag), - name="stop-flag waiter")) + flags.append(asyncio.create_task(aioadapters.wait_flag(stop_flag), name="stop-flag waiter")) # Wait until one of the stoppers is set/raised. try: diff --git a/tests/utilities/aiotasks/test_task_creation.py b/tests/utilities/aiotasks/test_task_creation.py deleted file mode 100644 index 5c35f97b..00000000 --- a/tests/utilities/aiotasks/test_task_creation.py +++ /dev/null @@ -1,24 +0,0 @@ -import asyncio - -import pytest - -from kopf._cogs.aiokits.aiotasks import create_task - - -async def sample() -> None: - pass - - -@pytest.mark.skipif('sys.version_info < (3, 8)') -def test_py38_create_task_is_the_native_one(): - assert create_task is asyncio.create_task - - -@pytest.mark.skipif('sys.version_info >= (3, 8)') -async def test_py37_create_task_accepts_name(mocker): - real_create_task = mocker.patch('asyncio.create_task') - coro = sample() - task = create_task(coro, name='unused') - assert real_create_task.called - assert task is real_create_task.return_value - await coro # to prevent "never awaited" errors diff --git a/tests/utilities/aiotasks/test_task_guarding.py b/tests/utilities/aiotasks/test_task_guarding.py index 4a3e0265..239c1393 100644 --- a/tests/utilities/aiotasks/test_task_guarding.py +++ b/tests/utilities/aiotasks/test_task_guarding.py @@ -3,7 +3,7 @@ import pytest -from kopf._cogs.aiokits.aiotasks import create_guarded_task, create_task, reraise +from kopf._cogs.aiokits.aiotasks import create_guarded_task, reraise class Error(Exception): @@ -97,14 +97,14 @@ async def test_guard_waits_for_the_flag(): async def test_reraise_escalates_errors(): - task = create_task(fail("boo!")) + task = asyncio.create_task(fail("boo!")) await asyncio.wait([task], timeout=0.01) # let it start & react with pytest.raises(Error): await reraise([task]) async def test_reraise_skips_cancellations(): - task = create_task(asyncio.Event().wait()) + task = asyncio.create_task(asyncio.Event().wait()) done, pending = await asyncio.wait([task], timeout=0.01) # let it start assert not done task.cancel() diff --git a/tests/utilities/aiotasks/test_task_selection.py b/tests/utilities/aiotasks/test_task_selection.py index a3bf17b8..8de7d9e1 100644 --- a/tests/utilities/aiotasks/test_task_selection.py +++ b/tests/utilities/aiotasks/test_task_selection.py @@ -1,12 +1,12 @@ import asyncio -from kopf._cogs.aiokits.aiotasks import all_tasks, create_task +from kopf._cogs.aiokits.aiotasks import all_tasks async def test_alltasks_exclusion(): flag = asyncio.Event() - task1 = create_task(flag.wait()) - task2 = create_task(flag.wait()) + task1 = asyncio.create_task(flag.wait()) + task2 = asyncio.create_task(flag.wait()) done, pending = await asyncio.wait([task1, task2], timeout=0.01) assert not done diff --git a/tests/utilities/aiotasks/test_task_stopping.py b/tests/utilities/aiotasks/test_task_stopping.py index 3717bd46..c61bfe9c 100644 --- a/tests/utilities/aiotasks/test_task_stopping.py +++ b/tests/utilities/aiotasks/test_task_stopping.py @@ -3,7 +3,7 @@ import pytest -from kopf._cogs.aiokits.aiotasks import create_task, stop +from kopf._cogs.aiokits.aiotasks import stop async def simple() -> None: @@ -38,8 +38,8 @@ async def test_stop_with_no_tasks_when_quiet(assert_logs, caplog): async def test_stop_immediately_with_finishing(assert_logs, caplog): logger = logging.getLogger() caplog.set_level(0) - task1 = create_task(simple()) - task2 = create_task(simple()) + task1 = asyncio.create_task(simple()) + task2 = asyncio.create_task(simple()) done, pending = await stop([task1, task2], title='sample', logger=logger, cancelled=False) assert done assert not pending @@ -51,8 +51,8 @@ async def test_stop_immediately_with_finishing(assert_logs, caplog): async def test_stop_immediately_with_cancelling(assert_logs, caplog): logger = logging.getLogger() caplog.set_level(0) - task1 = create_task(simple()) - task2 = create_task(simple()) + task1 = asyncio.create_task(simple()) + task2 = asyncio.create_task(simple()) done, pending = await stop([task1, task2], title='sample', logger=logger, cancelled=True) assert done assert not pending @@ -65,9 +65,9 @@ async def test_stop_immediately_with_cancelling(assert_logs, caplog): async def test_stop_iteratively(assert_logs, caplog, cancelled): logger = logging.getLogger() caplog.set_level(0) - task1 = create_task(simple()) - task2 = create_task(stuck()) - stask = create_task(stop([task1, task2], title='sample', logger=logger, interval=0.01, cancelled=cancelled)) + task1 = asyncio.create_task(simple()) + task2 = asyncio.create_task(stuck()) + stask = asyncio.create_task(stop([task1, task2], title='sample', logger=logger, interval=0.01, cancelled=cancelled)) done, pending = await asyncio.wait({stask}, timeout=0.011) assert not done @@ -91,9 +91,9 @@ async def test_stop_iteratively(assert_logs, caplog, cancelled): async def test_stop_itself_is_cancelled(assert_logs, caplog, cancelled): logger = logging.getLogger() caplog.set_level(0) - task1 = create_task(simple()) - task2 = create_task(stuck()) - stask = create_task(stop([task1, task2], title='sample', logger=logger, interval=0.01, cancelled=cancelled)) + task1 = asyncio.create_task(simple()) + task2 = asyncio.create_task(stuck()) + stask = asyncio.create_task(stop([task1, task2], title='sample', logger=logger, interval=0.01, cancelled=cancelled)) done, pending = await asyncio.wait({stask}, timeout=0.011) assert not done diff --git a/tests/utilities/aiotasks/test_task_waiting.py b/tests/utilities/aiotasks/test_task_waiting.py index 75949d5d..3a76ed84 100644 --- a/tests/utilities/aiotasks/test_task_waiting.py +++ b/tests/utilities/aiotasks/test_task_waiting.py @@ -1,6 +1,6 @@ import asyncio -from kopf._cogs.aiokits.aiotasks import create_task, wait +from kopf._cogs.aiokits.aiotasks import wait async def test_wait_with_no_tasks(): @@ -11,7 +11,7 @@ async def test_wait_with_no_tasks(): async def test_wait_with_timeout(): flag = asyncio.Event() - task = create_task(flag.wait()) + task = asyncio.create_task(flag.wait()) done, pending = await wait([task], timeout=0.01) assert not done assert pending == {task}