Skip to content

Commit

Permalink
Drop 3.7-style named task creation, use asyncio directly
Browse files Browse the repository at this point in the history
Signed-off-by: Sergey Vasilyev <[email protected]>
  • Loading branch information
nolar committed Oct 8, 2023
1 parent af05d4b commit 2f9e058
Show file tree
Hide file tree
Showing 10 changed files with 33 additions and 72 deletions.
4 changes: 1 addition & 3 deletions kopf/_cogs/aiokits/aioenums.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import time
from typing import Awaitable, Generator, Generic, Optional, TypeVar

from kopf._cogs.aiokits import aiotasks

FlagReasonT = TypeVar('FlagReasonT', bound=enum.Flag)


Expand Down Expand Up @@ -168,7 +166,7 @@ def __init__(self, waiter: AsyncFlagWaiter[FlagReasonT], *, timeout: Optional[fl
def __await__(self) -> Generator[None, None, AsyncFlagWaiter[FlagReasonT]]:
name = f"time-limited waiting for the daemon stopper {self._setter!r}"
coro = asyncio.wait_for(self._setter.async_event.wait(), timeout=self._timeout)
task = aiotasks.create_task(coro, name=name)
task = asyncio.create_task(coro, name=name)
try:
yield from task
except asyncio.TimeoutError:
Expand Down
26 changes: 7 additions & 19 deletions kopf/_cogs/aiokits/aiotasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@
so there is no added overhead; instead, the implicit overhead is made explicit.
"""
import asyncio
import sys
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Collection, Coroutine, \
Generator, NamedTuple, Optional, Set, Tuple, TypeVar, Union
from typing import TYPE_CHECKING, Any, Callable, Collection, Coroutine, \
NamedTuple, Optional, Set, Tuple, TypeVar

from kopf._cogs.helpers import typedefs

Expand All @@ -27,17 +26,6 @@
Future = asyncio.Future
Task = asyncio.Task

# Accept `name=` always, but simulate it for Python 3.7 to do nothing.
if sys.version_info >= (3, 8):
create_task = asyncio.create_task
else:
def create_task(
coro: Union[Generator[Any, None, _T], Awaitable[_T]],
*,
name: Optional[str] = None, # noqa: W613 # pylint: disable=unused-argument
) -> Task:
return asyncio.create_task(coro)


async def cancel_coro(
coro: Coroutine[Any, Any, Any],
Expand Down Expand Up @@ -65,7 +53,7 @@ async def cancel_coro(
coro.close() # OR: coro.throw(asyncio.CancelledError())
except AttributeError:
# The official way is to create an extra task object, thus to waste some memory.
corotask = create_task(coro=coro, name=name)
corotask = asyncio.create_task(coro=coro, name=name)
corotask.cancel()
try:
await corotask
Expand Down Expand Up @@ -133,7 +121,7 @@ def create_guarded_task(
This is only a shortcut for named task creation (name is used in 2 places).
"""
return create_task(
return asyncio.create_task(
name=name,
coro=guard(
name=name,
Expand Down Expand Up @@ -303,8 +291,8 @@ def __init__(
self._pending_coros: asyncio.Queue[SchedulerJob] = asyncio.Queue()
self._running_tasks: Set[Task] = set()
self._cleaning_queue: asyncio.Queue[Task] = asyncio.Queue()
self._cleaning_task = create_task(self._task_cleaner(), name=f"task cleaner of {self!r}")
self._spawning_task = create_task(self._task_spawner(), name=f"task spawner of {self!r}")
self._cleaning_task = asyncio.create_task(self._task_cleaner(), name=f"cleaner of {self!r}")
self._spawning_task = asyncio.create_task(self._task_spawner(), name=f"spawner of {self!r}")

def empty(self) -> bool:
""" Check if the scheduler has nothing to do. """
Expand Down Expand Up @@ -371,7 +359,7 @@ async def _task_spawner(self) -> None:
# when they are finished --- to be awaited and released "passively".
while self._can_spawn():
coro, name = self._pending_coros.get_nowait() # guaranteed by the predicate
task = create_task(coro=coro, name=name)
task = asyncio.create_task(coro=coro, name=name)
task.add_done_callback(self._task_done_callback)
self._running_tasks.add(task)
if self._closed:
Expand Down
2 changes: 1 addition & 1 deletion kopf/_cogs/clients/watching.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ async def streaming_block(
# Create the signalling future for when paused again.
operator_pause_waiter: aiotasks.Future
if operator_paused is not None:
operator_pause_waiter = aiotasks.create_task(
operator_pause_waiter = asyncio.create_task(
operator_paused.wait_for(True),
name=f"pause-waiter for {resource}")
else:
Expand Down
2 changes: 1 addition & 1 deletion kopf/_core/engines/daemons.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async def spawn_daemons(
stopper=stopper, # for stopping (outside of causes)
handler=handler,
logger=loggers.LocalObjectLogger(body=cause.body, settings=settings),
task=aiotasks.create_task(_runner(
task=asyncio.create_task(_runner(
settings=settings,
daemons=daemons, # for self-garbage-collection
handler=handler,
Expand Down
9 changes: 4 additions & 5 deletions kopf/_core/reactor/running.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,17 +219,17 @@ async def spawn_tasks(
posting.settings_var.set(settings)

# A few common background forever-running infrastructural tasks (irregular root tasks).
tasks.append(aiotasks.create_task(
tasks.append(asyncio.create_task(
name="stop-flag checker",
coro=_stop_flag_checker(
signal_flag=signal_flag,
stop_flag=stop_flag)))
tasks.append(aiotasks.create_task(
tasks.append(asyncio.create_task(
name="ultimate termination",
coro=_ultimate_termination(
settings=settings,
stop_flag=stop_flag)))
tasks.append(aiotasks.create_task(
tasks.append(asyncio.create_task(
name="startup/cleanup activities",
coro=_startup_cleanup_activities(
root_tasks=tasks, # used as a "live" view, populated later.
Expand Down Expand Up @@ -433,8 +433,7 @@ async def _stop_flag_checker(
if signal_flag is not None:
flags.append(signal_flag)
if stop_flag is not None:
flags.append(aiotasks.create_task(aioadapters.wait_flag(stop_flag),
name="stop-flag waiter"))
flags.append(asyncio.create_task(aioadapters.wait_flag(stop_flag), name="stop-flag waiter"))

# Wait until one of the stoppers is set/raised.
try:
Expand Down
24 changes: 0 additions & 24 deletions tests/utilities/aiotasks/test_task_creation.py

This file was deleted.

6 changes: 3 additions & 3 deletions tests/utilities/aiotasks/test_task_guarding.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import pytest

from kopf._cogs.aiokits.aiotasks import create_guarded_task, create_task, reraise
from kopf._cogs.aiokits.aiotasks import create_guarded_task, reraise


class Error(Exception):
Expand Down Expand Up @@ -97,14 +97,14 @@ async def test_guard_waits_for_the_flag():


async def test_reraise_escalates_errors():
task = create_task(fail("boo!"))
task = asyncio.create_task(fail("boo!"))
await asyncio.wait([task], timeout=0.01) # let it start & react
with pytest.raises(Error):
await reraise([task])


async def test_reraise_skips_cancellations():
task = create_task(asyncio.Event().wait())
task = asyncio.create_task(asyncio.Event().wait())
done, pending = await asyncio.wait([task], timeout=0.01) # let it start
assert not done
task.cancel()
Expand Down
6 changes: 3 additions & 3 deletions tests/utilities/aiotasks/test_task_selection.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import asyncio

from kopf._cogs.aiokits.aiotasks import all_tasks, create_task
from kopf._cogs.aiokits.aiotasks import all_tasks


async def test_alltasks_exclusion():
flag = asyncio.Event()
task1 = create_task(flag.wait())
task2 = create_task(flag.wait())
task1 = asyncio.create_task(flag.wait())
task2 = asyncio.create_task(flag.wait())
done, pending = await asyncio.wait([task1, task2], timeout=0.01)
assert not done

Expand Down
22 changes: 11 additions & 11 deletions tests/utilities/aiotasks/test_task_stopping.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import pytest

from kopf._cogs.aiokits.aiotasks import create_task, stop
from kopf._cogs.aiokits.aiotasks import stop


async def simple() -> None:
Expand Down Expand Up @@ -38,8 +38,8 @@ async def test_stop_with_no_tasks_when_quiet(assert_logs, caplog):
async def test_stop_immediately_with_finishing(assert_logs, caplog):
logger = logging.getLogger()
caplog.set_level(0)
task1 = create_task(simple())
task2 = create_task(simple())
task1 = asyncio.create_task(simple())
task2 = asyncio.create_task(simple())
done, pending = await stop([task1, task2], title='sample', logger=logger, cancelled=False)
assert done
assert not pending
Expand All @@ -51,8 +51,8 @@ async def test_stop_immediately_with_finishing(assert_logs, caplog):
async def test_stop_immediately_with_cancelling(assert_logs, caplog):
logger = logging.getLogger()
caplog.set_level(0)
task1 = create_task(simple())
task2 = create_task(simple())
task1 = asyncio.create_task(simple())
task2 = asyncio.create_task(simple())
done, pending = await stop([task1, task2], title='sample', logger=logger, cancelled=True)
assert done
assert not pending
Expand All @@ -65,9 +65,9 @@ async def test_stop_immediately_with_cancelling(assert_logs, caplog):
async def test_stop_iteratively(assert_logs, caplog, cancelled):
logger = logging.getLogger()
caplog.set_level(0)
task1 = create_task(simple())
task2 = create_task(stuck())
stask = create_task(stop([task1, task2], title='sample', logger=logger, interval=0.01, cancelled=cancelled))
task1 = asyncio.create_task(simple())
task2 = asyncio.create_task(stuck())
stask = asyncio.create_task(stop([task1, task2], title='sample', logger=logger, interval=0.01, cancelled=cancelled))

done, pending = await asyncio.wait({stask}, timeout=0.011)
assert not done
Expand All @@ -91,9 +91,9 @@ async def test_stop_iteratively(assert_logs, caplog, cancelled):
async def test_stop_itself_is_cancelled(assert_logs, caplog, cancelled):
logger = logging.getLogger()
caplog.set_level(0)
task1 = create_task(simple())
task2 = create_task(stuck())
stask = create_task(stop([task1, task2], title='sample', logger=logger, interval=0.01, cancelled=cancelled))
task1 = asyncio.create_task(simple())
task2 = asyncio.create_task(stuck())
stask = asyncio.create_task(stop([task1, task2], title='sample', logger=logger, interval=0.01, cancelled=cancelled))

done, pending = await asyncio.wait({stask}, timeout=0.011)
assert not done
Expand Down
4 changes: 2 additions & 2 deletions tests/utilities/aiotasks/test_task_waiting.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio

from kopf._cogs.aiokits.aiotasks import create_task, wait
from kopf._cogs.aiokits.aiotasks import wait


async def test_wait_with_no_tasks():
Expand All @@ -11,7 +11,7 @@ async def test_wait_with_no_tasks():

async def test_wait_with_timeout():
flag = asyncio.Event()
task = create_task(flag.wait())
task = asyncio.create_task(flag.wait())
done, pending = await wait([task], timeout=0.01)
assert not done
assert pending == {task}
Expand Down

0 comments on commit 2f9e058

Please sign in to comment.