Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check if Task(Future) is canceled. #1377

Open
wants to merge 1 commit into
base: rolling
Choose a base branch
from

Conversation

fujitatomoya
Copy link
Collaborator

closes #1099

@@ -273,6 +273,27 @@ async def coroutine():
self.assertTrue(future.done())
self.assertEqual('Sentinel Result', future.result())

def test_create_task_coroutine_cancel(self) -> None:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with this change, all the other tests are passing but this additional one.

this test generates the following warning, eventually this test case fails... i spent some time to see what is going on here, but still not sure about this. requesting help for this.

colcon test --event-handlers console_direct+ --packages-select rclpy --ctest-args -R test_executor

PluggyTeardownRaisedWarning
13: ../../src/ros2/rclpy/rclpy/test/test_executor.py .......F............... [ 82%]
13: .....                                                                    [100%]
13:
13: =================================== FAILURES ===================================
13: ________________ TestExecutor.test_create_task_normal_function _________________
13:
13:     @pytest.hookimpl(hookwrapper=True, tryfirst=True)
13:     def pytest_runtest_call() -> Generator[None, None, None]:
13: >       yield from unraisable_exception_runtest_hook()
13:
13: /usr/lib/python3/dist-packages/_pytest/unraisableexception.py:88:
13: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
13:
13:     def unraisable_exception_runtest_hook() -> Generator[None, None, None]:
13:         with catch_unraisable_exception() as cm:
13:             yield
13:             if cm.unraisable:
13:                 if cm.unraisable.err_msg is not None:
13:                     err_msg = cm.unraisable.err_msg
13:                 else:
13:                     err_msg = "Exception ignored in"
13:                 msg = f"{err_msg}: {cm.unraisable.object!r}\n\n"
13:                 msg += "".join(
13:                     traceback.format_exception(
13:                         cm.unraisable.exc_type,
13:                         cm.unraisable.exc_value,
13:                         cm.unraisable.exc_traceback,
13:                     )
13:                 )
13: >               warnings.warn(pytest.PytestUnraisableExceptionWarning(msg))
13: E               pytest.PytestUnraisableExceptionWarning: Exception ignored in: <coroutine object TestExecutor.test_create_task_coroutine_cancel.<locals>.coroutine at 0x7f198fffa980>
13: E
13: E               Traceback (most recent call last):
13: E                 File "/usr/lib/python3.12/warnings.py", line 553, in _warn_unawaited_coroutine
13: E                   warn(msg, category=RuntimeWarning, stacklevel=2, source=coro)
13: E               RuntimeWarning: coroutine 'TestExecutor.test_create_task_coroutine_cancel.<locals>.coroutine' was never awaited
13:
13: /usr/lib/python3/dist-packages/_pytest/unraisableexception.py:78: PytestUnraisableExceptionWarning
13:
13: During handling of the above exception, another exception occurred:
13:
13: cls = <class '_pytest.runner.CallInfo'>
13: func = <function call_runtest_hook.<locals>.<lambda> at 0x7f198e051f80>
13: when = 'call'
13: reraise = (<class '_pytest.outcomes.Exit'>, <class 'KeyboardInterrupt'>)
13:
13:     @classmethod
13:     def from_call(
13:         cls,
13:         func: "Callable[[], TResult]",
13:         when: "Literal['collect', 'setup', 'call', 'teardown']",
13:         reraise: Optional[
13:             Union[Type[BaseException], Tuple[Type[BaseException], ...]]
13:         ] = None,
13:     ) -> "CallInfo[TResult]":
13:         """Call func, wrapping the result in a CallInfo.
13:
13:         :param func:
13:             The function to call. Called without arguments.
13:         :param when:
13:             The phase in which the function is called.
13:         :param reraise:
13:             Exception or exceptions that shall propagate if raised by the
13:             function, instead of being wrapped in the CallInfo.
13:         """
13:         excinfo = None
13:         start = timing.time()
13:         precise_start = timing.perf_counter()
13:         try:
13: >           result: Optional[TResult] = func()
13:
13: /usr/lib/python3/dist-packages/_pytest/runner.py:341:
13: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
13: /usr/lib/python3/dist-packages/_pytest/runner.py:262: in <lambda>
13:     lambda: ihook(item=item, **kwds), when=when, reraise=reraise
13: /usr/lib/python3/dist-packages/pluggy/_hooks.py:501: in __call__
13:     return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
13: /usr/lib/python3/dist-packages/pluggy/_manager.py:119: in _hookexec
13:     return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
13: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
13:
13: hook_name = 'pytest_runtest_call'
13: hook_impl = <HookImpl plugin_name='unraisableexception', plugin=<module '_pytest.unraisableexception' from '/usr/lib/python3/dist-packages/_pytest/unraisableexception.py'>>
13: e = PytestUnraisableExceptionWarning('Exception ignored in: <coroutine object TestExecutor.test_create_task_coroutine_canc...\nRuntimeWarning: coroutine \'TestExecutor.test_create_task_coroutine_cancel.<locals>.coroutine\' was never awaited\n')
13:
13:     def _warn_teardown_exception(
13:         hook_name: str, hook_impl: HookImpl, e: BaseException
13:     ) -> None:
13:         msg = "A plugin raised an exception during an old-style hookwrapper teardown.\n"
13:         msg += f"Plugin: {hook_impl.plugin_name}, Hook: {hook_name}\n"
13:         msg += f"{type(e).__name__}: {e}\n"
13:         msg += "For more information see https://pluggy.readthedocs.io/en/stable/api_reference.html#pluggy.PluggyTeardownRaisedWarning"  # noqa: E501
13: >       warnings.warn(PluggyTeardownRaisedWarning(msg), stacklevel=5)
13: E       pluggy.PluggyTeardownRaisedWarning: A plugin raised an exception during an old-style hookwrapper teardown.
13: E       Plugin: unraisableexception, Hook: pytest_runtest_call
13: E       PytestUnraisableExceptionWarning: Exception ignored in: <coroutine object TestExecutor.test_create_task_coroutine_cancel.<locals>.coroutine at 0x7f198fffa980>
13: E
13: E       Traceback (most recent call last):
13: E         File "/usr/lib/python3.12/warnings.py", line 553, in _warn_unawaited_coroutine
13: E           warn(msg, category=RuntimeWarning, stacklevel=2, source=coro)
13: E       RuntimeWarning: coroutine 'TestExecutor.test_create_task_coroutine_cancel.<locals>.coroutine' was never awaited
13: E
13: E       For more information see https://pluggy.readthedocs.io/en/stable/api_reference.html#pluggy.PluggyTeardownRaisedWarning
13:
13: /usr/lib/python3/dist-packages/pluggy/_callers.py:49: PluggyTeardownRaisedWarning
13: - generated xml file: /root/ros2_ws/colcon_ws/build/rclpy/test_results/rclpy/test_executor.xunit.xml -
13: =========================== short test summary info ============================
13: FAILED ../../src/ros2/rclpy/rclpy/test/test_executor.py::TestExecutor::test_create_task_normal_function
13: ========================= 1 failed, 27 passed in 7.21s =========================
13: Exception ignored in: <function Executor.__del__ at 0x7f19977f7240>
13: Traceback (most recent call last):
13:   File "/root/ros2_ws/colcon_ws/src/ros2/rclpy/rclpy/rclpy/executors.py", line 262, in __del__
13:     self._sigint_gc.destroy()
13:   File "/root/ros2_ws/colcon_ws/src/ros2/rclpy/rclpy/rclpy/signals.py", line 70, in destroy
13:     with self.handle:
13: test_rclpy._rclpy_pybind11.InvalidHandle: cannot use Destroyable because destruction was requested
13: -- run_test.py: return code 1
13: -- run_test.py: verify result file '/root/ros2_ws/colcon_ws/build/rclpy/test_results/rclpy/test_executor.xunit.xml'
1/1 Test #13: test_executor ....................***Failed    7.82 sec

Copy link

@nadavelkabets nadavelkabets Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the test raised RuntimeWarning: coroutine 'TestExecutor.test_create_task_coroutine_cancel.<locals>.coroutine' was never awaited.

Previously, because of the bug, a cancelled task was not removed from the executor and was never garbage collected.
After your change, future.cancel() causes the executor to drop the task and the coroutine is never yielded. Some time later, the coroutine object is garbage collected, and since the coroutine is not closed, the exception is raised.

The error "coroutine was never awaited" is common and can be easily replicated:

async def foo():
	pass

coro = foo()
coro.__del__()

RuntimeWarning: coroutine 'foo' was never awaited

Calling coro.close() solves this issue by throwing GeneratorExit into the coroutine https://docs.python.org/3/reference/datamodel.html#coroutine.close

There are the possible solutions I see:

  1. Copying asyncio and cancelling the task by throwing a CancelledError exception into the coroutine.
    The main benefit of this approach is the possibility of user code inside the coroutine to catch the exception and perform cleanup.
    https://github.com/python/cpython/blob/061e50f196373d920c3eaa3718b9d0553914e006/Lib/asyncio/tasks.py#L270-L273
    https://github.com/python/cpython/blob/061e50f196373d920c3eaa3718b9d0553914e006/Lib/asyncio/tasks.py#L283-L307
def cancel(self):
	try:
		if not self._done and iscoroutine(self._handle):
			self._handle.throw(CancelledError())
	finally:
		super().cancel()
  1. Calling coro.close()
def cancel(self):
	if not self._done and iscoroutine(self._handle):
		  self._handle.close()
	super().cancel()
  1. Might work but feels like a patch to me, also skips cleanup and I don't really like that.
warnings.filterwarnings(
	'ignore',
	message=r'^coroutine .* was never awaited$',
	category=RuntimeWarning)

Also, I see that you called asyncio.sleep(1) in your test. It doesn't really matter in this case since this portion of the code is never executed, but await asyncio.sleep(1) is not possible in rclpy, since the implementation calls for asyncio.get_running_loop() but no asyncio loop is initialized or running (asyncio.sleep(0) is a little different as it just yields).
https://github.com/python/cpython/blob/061e50f196373d920c3eaa3718b9d0553914e006/Lib/asyncio/tasks.py#L688-L705

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wholeheartedly agree with the first suggestion above ☝️ If we are implementing cancellation, I think we should follow asyncio's lead, as it would also help with #1098 . However, if we want to go the whole way, we'd also need to introduce a way to await subscriptions and timers (with e.g. an async iterator).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @haudren-woven on this. Creating usage close to asyncio's implementation would also create a better entry path for new ROS users/ developers. I personally very much enjoyed the CancelledError Exception approach when working with asyncio.

@@ -61,7 +61,7 @@ def __del__(self) -> None:

def __await__(self) -> Generator[None, None, Optional[T]]:
# Yield if the task is not finished
while not self._done:
while not self._done and not self._cancelled:
Copy link

@nadavelkabets nadavelkabets Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Asyncio tasks work a little differently, they have 3 possible states and are considered done if they are not pending:

# States for Future.
_PENDING = 'PENDING'
_CANCELLED = 'CANCELLED'
_FINISHED = 'FINISHED'

https://github.com/python/cpython/blob/ee0746af7d7cfc6cc25441726034e4fea4bcf7e5/Lib/asyncio/base_futures.py#L7-L10

  def done(self):
        """Return True if the future is done.

        Done means either that a result / exception are available, or that the
        future was cancelled.
        """
        return self._state != _PENDING

https://github.com/python/cpython/blob/403410fa1be036214efa7955127911e5592910db/Lib/asyncio/futures.py#L177-L183

@fujitatomoya what do you think?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with this 👍 In the current state, there are a number of duplicated checks to self._done and not self._cancelled or similar.

@nadavelkabets
Copy link

Please backport this bugfix into humble/jazzy.
Cancelling tasks is a crucial feature of any event loop, and for a project of mine I had to come up with complex and incomplete solutions to make that work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Cancelling asynchronous tasks has no effect?
4 participants