From 3d77b5d326cfcbf800be7e99cccb798cd468dada Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Tue, 12 Jan 2021 18:54:00 -0300 Subject: [PATCH] Handle signals within the asyncio loop. (#476) Signed-off-by: Michel Hidalgo --- launch/launch/launch_service.py | 97 ++----- launch/launch/utilities/__init__.py | 10 +- launch/launch/utilities/signal_management.py | 256 ++++++++++-------- .../utilities/test_signal_management.py | 140 ++++++---- 4 files changed, 254 insertions(+), 249 deletions(-) diff --git a/launch/launch/launch_service.py b/launch/launch/launch_service.py index bd59a861d..182b50124 100644 --- a/launch/launch/launch_service.py +++ b/launch/launch/launch_service.py @@ -18,6 +18,7 @@ import collections.abc import contextlib import logging +import platform import signal import threading import traceback @@ -42,10 +43,7 @@ from .launch_description import LaunchDescription from .launch_description_entity import LaunchDescriptionEntity from .some_actions_type import SomeActionsType -from .utilities import install_signal_handlers -from .utilities import on_sigint -from .utilities import on_sigquit -from .utilities import on_sigterm +from .utilities import AsyncSafeSignalManager from .utilities import visit_all_entities_and_collect_futures @@ -62,11 +60,6 @@ def __init__( """ Create a LaunchService. - If called outside of the main-thread before the function - :func:`launch.utilities.install_signal_handlers()` has been called, - a ValueError can be raised, as setting signal handlers cannot be done - outside of the main-thread. - :param: argv stored in the context for access by the entities, None results in [] :param: noninteractive if True (not default), this service will assume it has no terminal associated e.g. it is being executed from a non interactive script @@ -80,10 +73,6 @@ def __init__( # Setup logging self.__logger = launch.logging.get_logger('launch') - # Install signal handlers if not already installed, will raise if not - # in main-thread, call manually in main-thread to avoid this. - install_signal_handlers() - # Setup context and register a built-in event handler for bootstrapping. self.__context = LaunchContext(argv=self.__argv, noninteractive=noninteractive) self.__context.register_event_handler(OnIncludeLaunchDescription()) @@ -197,12 +186,7 @@ def _prepare_run_loop(self): # Setup custom signal handlers for SIGINT, SIGTERM and maybe SIGQUIT. sigint_received = False - def _on_sigint(signum, frame, prev_handler): - # Ignore additional signals until we finish processing this one. - current_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) - if current_handler is signal.SIG_IGN: - # This function has been called re-entrantly. - return + def _on_sigint(signum): nonlocal sigint_received base_msg = 'user interrupted with ctrl-c (SIGINT)' if not sigint_received: @@ -214,57 +198,25 @@ def _on_sigint(signum, frame, prev_handler): sigint_received = True else: self.__logger.warning('{} again, ignoring...'.format(base_msg)) - if callable(prev_handler): - try: - # Run pre-existing signal handler. - prev_handler(signum, frame) - except KeyboardInterrupt: - # Ignore exception. - pass - # Restore current signal handler (not necessarily this one). - signal.signal(signal.SIGINT, current_handler) - - on_sigint(_on_sigint) - - def _on_sigterm(signum, frame, prev_handler): - # Ignore additional signals until we finish processing this one. - current_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN) - if current_handler is signal.SIG_IGN: - # This function has been called re-entrantly. - return + + def _on_sigterm(signum): + signame = signal.Signals(signum).name + self.__logger.error( + 'user interrupted with ctrl-\\ ({}), terminating...'.format(signame)) # TODO(wjwwood): try to terminate running subprocesses before exiting. - self.__logger.error('using SIGTERM or SIGQUIT can result in orphaned processes') + self.__logger.error('using {} can result in orphaned processes'.format(signame)) self.__logger.error('make sure no processes launched are still running') this_loop.call_soon(this_task.cancel) - if callable(prev_handler): - # Run pre-existing signal handler. - prev_handler(signum, frame) - # Restore current signal handler (not necessarily this one). - signal.signal(signal.SIGTERM, current_handler) - - on_sigterm(_on_sigterm) - - def _on_sigquit(signum, frame, prev_handler): - # Ignore additional signals until we finish processing this one. - current_handler = signal.signal(signal.SIGQUIT, signal.SIG_IGN) - if current_handler is signal.SIG_IGN: - # This function has been called re-entrantly. - return - self.__logger.error('user interrupted with ctrl-\\ (SIGQUIT), terminating...') - _on_sigterm(signum, frame, prev_handler) - # Restore current signal handler (not necessarily this one). - signal.signal(signal.SIGQUIT, current_handler) - - on_sigquit(_on_sigquit) - - # Yield asyncio loop and current task. - yield self.__loop_from_run_thread, this_task - finally: - # Unset the signal handlers while not running. - on_sigint(None) - on_sigterm(None) - on_sigquit(None) + with AsyncSafeSignalManager(this_loop) as manager: + # Setup signal handlers + manager.handle(signal.SIGINT, _on_sigint) + manager.handle(signal.SIGTERM, _on_sigterm) + if platform.system() != 'Windows': + manager.handle(signal.SIGQUIT, _on_sigterm) + # Yield asyncio loop and current task. + yield this_loop, this_task + finally: # No matter what happens, unset the loop. with self.__loop_from_run_thread_lock: self.__context._set_asyncio_loop(None) @@ -309,9 +261,6 @@ async def run_async(self, *, shutdown_when_idle=True) -> int: This should only ever be run from the main thread and not concurrently with other asynchronous runs. - Note that custom signal handlers are set, and KeyboardInterrupt is caught and ignored - around the original signal handler. After the run ends, this behavior is undone. - :param: shutdown_when_idle if True (default), the service will shutdown when idle. """ # Make sure this has not been called from any thread but the main thread. @@ -397,14 +346,20 @@ def run(self, *, shutdown_when_idle=True) -> int: This should only ever be run from the main thread and not concurrently with asynchronous runs (see `run_async()` documentation). + Note that KeyboardInterrupt is caught and ignored, as signals are handled separately. + After the run ends, this behavior is undone. + :param: shutdown_when_idle if True (default), the service will shutdown when idle """ loop = osrf_pycommon.process_utils.get_loop() run_async_task = loop.create_task(self.run_async( shutdown_when_idle=shutdown_when_idle )) - loop.run_until_complete(run_async_task) - return run_async_task.result() + while True: + try: + return loop.run_until_complete(run_async_task) + except KeyboardInterrupt: + continue def __on_shutdown(self, event: Event, context: LaunchContext) -> Optional[SomeActionsType]: self.__shutting_down = True diff --git a/launch/launch/utilities/__init__.py b/launch/launch/utilities/__init__.py index e92baefb6..f3efb4de5 100644 --- a/launch/launch/utilities/__init__.py +++ b/launch/launch/utilities/__init__.py @@ -19,10 +19,7 @@ from .ensure_argument_type_impl import ensure_argument_type from .normalize_to_list_of_substitutions_impl import normalize_to_list_of_substitutions from .perform_substitutions_impl import perform_substitutions -from .signal_management import install_signal_handlers -from .signal_management import on_sigint -from .signal_management import on_sigquit -from .signal_management import on_sigterm +from .signal_management import AsyncSafeSignalManager from .visit_all_entities_and_collect_futures_impl import visit_all_entities_and_collect_futures __all__ = [ @@ -32,10 +29,7 @@ 'create_future', 'ensure_argument_type', 'perform_substitutions', - 'install_signal_handlers', - 'on_sigint', - 'on_sigquit', - 'on_sigterm', + 'AsyncSafeSignalManager', 'normalize_to_list_of_substitutions', 'visit_all_entities_and_collect_futures', ] diff --git a/launch/launch/utilities/signal_management.py b/launch/launch/utilities/signal_management.py index 333e3bc88..a144fc5fb 100644 --- a/launch/launch/utilities/signal_management.py +++ b/launch/launch/utilities/signal_management.py @@ -12,133 +12,159 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Module for the signal management functionality.""" +"""Module for signal management functionality.""" +import asyncio +import os import platform import signal +import socket import threading -import launch.logging +from typing import Callable +from typing import Optional +from typing import Tuple # noqa: F401 +from typing import Union -__signal_handlers_installed_lock = threading.Lock() -__signal_handlers_installed = False -__custom_sigint_handler = None -__custom_sigquit_handler = None -__custom_sigterm_handler = None - -def on_sigint(handler): - """ - Set the signal handler to be called on SIGINT. - - Pass None for no custom handler. - - install_signal_handlers() must have been called in the main thread before. - It is called automatically by the constructor of `launch.LaunchService`. - """ - global __custom_sigint_handler - if handler is not None and not callable(handler): - raise ValueError('handler must be callable or None') - __custom_sigint_handler = handler - - -def on_sigquit(handler): - """ - Set the signal handler to be called on SIGQUIT. - - Note Windows does not have SIGQUIT, so it can be set with this function, - but the handler will not be called. - - Pass None for no custom handler. - - install_signal_handlers() must have been called in the main thread before. - It is called automatically by the constructor of `launch.LaunchService`. - """ - global __custom_sigquit_handler - if handler is not None and not callable(handler): - raise ValueError('handler must be callable or None') - __custom_sigquit_handler = handler - - -def on_sigterm(handler): - """ - Set the signal handler to be called on SIGTERM. - - Pass None for no custom handler. - - install_signal_handlers() must have been called in the main thread before. - It is called automatically by the constructor of `launch.LaunchService`. +def is_winsock_handle(fd): + """Check if the given file descriptor is WinSock handle.""" + if platform.system() != 'Windows': + return False + try: + # On Windows, WinSock handles and regular file handles + # have disjoint APIs. This test leverages the fact that + # attempting to get an MSVC runtime file handle from a + # WinSock handle will fail. + import msvcrt + msvcrt.get_osfhandle(fd) + return False + except OSError: + return True + + +class AsyncSafeSignalManager: """ - global __custom_sigterm_handler - if handler is not None and not callable(handler): - raise ValueError('handler must be callable or None') - __custom_sigterm_handler = handler + A context manager class for asynchronous handling of signals. + Similar in purpose to :func:`asyncio.loop.add_signal_handler` but + not limited to Unix platforms. -def install_signal_handlers(): - """ - Install custom signal handlers so that hooks can be setup from other threads. + Signal handlers can be registered at any time with a given manager. + These will become active for the extent of said manager context. + Unlike regular signal handlers, asynchronous signals handlers + can safely interact with their event loop. - Calling this multiple times does not fail, but the signals are only - installed once. + The same manager can be used multiple consecutive times and even + be nested with other managers, as these are independent from each + other i.e. managers do not override each other's handlers. - If called outside of the main-thread, a ValueError is raised, see: - https://docs.python.org/3.6/library/signal.html#signal.signal + If used outside of the main thread, a ValueError is raised. - Also, if you register your own signal handlers after calling this function, - then you should store and forward to the existing signal handlers, because - otherwise the signal handlers registered by on_sigint(), on_sigquit, etc. - will not be run. - And the signal handlers registered with those functions are used to - gracefully exit the LaunchService when signaled (at least), and without - them it may not behave correctly. - - If you register signal handlers before calling this function, then your - signal handler will automatically be called by the signal handlers in this - thread. + The underlying mechanism is built around :func:`signal.set_wakeup_fd` + so as to not interfere with regular handlers installed via + :func:`signal.signal`. + All signals received are forwarded to the previously setup file + descriptor, if any. """ - global __signal_handlers_installed_lock, __signal_handlers_installed - with __signal_handlers_installed_lock: - if __signal_handlers_installed: - return - __signal_handlers_installed = True - - global __custom_sigint_handler, __custom_sigquit_handler, __custom_sigterm_handler - - __original_sigint_handler = signal.getsignal(signal.SIGINT) - __original_sigterm_handler = signal.getsignal(signal.SIGTERM) - def __on_sigint(signum, frame): - if callable(__custom_sigint_handler): - __custom_sigint_handler(signum, frame, __original_sigint_handler) - elif callable(__original_sigint_handler): - __original_sigint_handler(signum, frame) - - if platform.system() != 'Windows': - # Windows does not support SIGQUIT - __original_sigquit_handler = signal.getsignal(signal.SIGQUIT) - - def __on_sigquit(signum, frame): - if callable(__custom_sigquit_handler): - __custom_sigquit_handler(signum, frame, __original_sigquit_handler) - elif callable(__original_sigquit_handler): - __original_sigquit_handler(signum, frame) - - def __on_sigterm(signum, frame): - if callable(__custom_sigterm_handler): - __custom_sigterm_handler(signum, frame, __original_sigterm_handler) - elif callable(__original_sigterm_handler): - __original_sigterm_handler(signum, frame) - - # signals must be registered in the main thread, but print a nicer message if we're not there - try: - signal.signal(signal.SIGINT, __on_sigint) - signal.signal(signal.SIGTERM, __on_sigterm) - if platform.system() != 'Windows': - # Windows does not support SIGQUIT - signal.signal(signal.SIGQUIT, __on_sigquit) - except ValueError: - logger = launch.logging.get_logger(__name__) - logger.error("failed to set signal handlers in '{}'".format(__name__)) - logger.error('this function must be called in the main thread') - raise + def __init__( + self, + loop: asyncio.AbstractEventLoop + ): + """ + Instantiate manager. + + :param loop: event loop that will handle the signals. + """ + self.__loop = loop # type: asyncio.AbstractEventLoop + self.__background_loop = None # type: Optional[asyncio.AbstractEventLoop] + self.__handlers = {} # type: dict + self.__prev_wakeup_handle = -1 # type: Union[int, socket.socket] + self.__wsock, self.__rsock = socket.socketpair() # type: Tuple[socket.socket, socket.socket] # noqa + self.__wsock.setblocking(False) + self.__rsock.setblocking(False) + + def __enter__(self): + try: + self.__loop.add_reader(self.__rsock.fileno(), self.__handle_signal) + except NotImplementedError: + # Some event loops, like the asyncio.ProactorEventLoop + # on Windows, do not support asynchronous socket reads. + # So we emulate it. + self.__background_loop = asyncio.SelectorEventLoop() + self.__background_loop.add_reader( + self.__rsock.fileno(), + self.__loop.call_soon_threadsafe, + self.__handle_signal) + + def run_background_loop(): + asyncio.set_event_loop(self.__background_loop) + self.__background_loop.run_forever() + + self.__background_thread = threading.Thread(target=run_background_loop) + self.__background_thread.start() + self.__prev_wakeup_handle = signal.set_wakeup_fd(self.__wsock.fileno()) + if self.__prev_wakeup_handle != -1 and is_winsock_handle(self.__prev_wakeup_handle): + # On Windows, os.write will fail on a WinSock handle. There is no WinSock API + # in the standard library either. Thus we wrap it in a socket.socket instance. + self.__prev_wakeup_handle = socket.socket(fileno=self.__prev_wakeup_handle) + return self + + def __exit__(self, type_, value, traceback): + if isinstance(self.__prev_wakeup_handle, socket.socket): + # Detach (Windows) socket and retrieve the raw OS handle. + prev_wakeup_handle = self.__prev_wakeup_handle.fileno() + self.__prev_wakeup_handle.detach() + self.__prev_wakeup_handle = prev_wakeup_handle + assert self.__wsock.fileno() == signal.set_wakeup_fd(self.__prev_wakeup_handle) + if self.__background_loop: + self.__background_loop.call_soon_threadsafe(self.__background_loop.stop) + self.__background_thread.join() + self.__background_loop.close() + else: + self.__loop.remove_reader(self.__rsock.fileno()) + + def __handle_signal(self): + while True: + try: + data = self.__rsock.recv(4096) + if not data: + break + for signum in data: + if signum not in self.__handlers: + continue + self.__handlers[signum](signum) + if self.__prev_wakeup_handle != -1: + # Send over (Windows) socket or write file. + if isinstance(self.__prev_wakeup_handle, socket.socket): + self.__prev_wakeup_handle.send(data) + else: + os.write(self.__prev_wakeup_handle, data) + except InterruptedError: + continue + except BlockingIOError: + break + + def handle( + self, + signum: Union[signal.Signals, int], + handler: Optional[Callable[[int], None]], + ) -> Optional[Callable[[int], None]]: + """ + Register a callback for asynchronous handling of a given signal. + + :param signum: number of the signal to be handled + :param handler: callback taking a signal number + as its sole argument, or None + :return: previous handler if any, otherwise None + """ + signum = signal.Signals(signum) + if handler is not None: + if not callable(handler): + raise ValueError('signal handler must be a callable') + old_handler = self.__handlers.get(signum, None) + self.__handlers[signum] = handler + else: + old_handler = self.__handlers.pop(signum, None) + return old_handler diff --git a/launch/test/launch/utilities/test_signal_management.py b/launch/test/launch/utilities/test_signal_management.py index 7fc6ad76f..92d504cc8 100644 --- a/launch/test/launch/utilities/test_signal_management.py +++ b/launch/test/launch/utilities/test_signal_management.py @@ -14,64 +14,94 @@ """Tests for the signal_management module.""" -from launch.utilities import install_signal_handlers, on_sigint, on_sigquit, on_sigterm +import asyncio +import functools +import platform +import signal -import pytest +from launch.utilities import AsyncSafeSignalManager +import osrf_pycommon.process_utils -def test_install_signal_handlers(): - """Test the install_signal_handlers() function.""" - install_signal_handlers() - install_signal_handlers() - install_signal_handlers() - -def test_on_sigint(): - """Test the on_sigint() function.""" - # None is acceptable - on_sigint(None) - - def mock_sigint_handler(): - pass - - on_sigint(mock_sigint_handler) - - # Non-callable is not - with pytest.raises(ValueError): - on_sigint('non-callable') - - # TODO(jacobperron): implement a functional test by using subprocess.Popen - - -def test_on_sigquit(): - """Test the on_sigquit() function.""" - # None is acceptable - on_sigquit(None) - - def mock_sigquit_handler(): +def cap_signals(*signals): + def _noop(*args): pass - on_sigquit(mock_sigquit_handler) - - # Non-callable is not - with pytest.raises(ValueError): - on_sigquit('non-callable') - - # TODO(jacobperron): implement a functional test by using subprocess.Popen - - -def test_on_sigterm(): - """Test the on_sigterm() function.""" - # None is acceptable - on_sigterm(None) - - def mock_sigterm_handler(): - pass - - on_sigterm(mock_sigterm_handler) - - # Non-callable is not - with pytest.raises(ValueError): - on_sigterm('non-callable') - - # TODO(jacobperron): implement a functional test by using subprocess.Popen + def _decorator(func): + @functools.wraps(func) + def _wrapper(*args, **kwargs): + handlers = {} + try: + for s in signals: + handlers[s] = signal.signal(s, _noop) + return func(*args, **kwargs) + finally: + assert all(signal.signal(s, h) is _noop for s, h in handlers.items()) + return _wrapper + + return _decorator + + +if platform.system() == 'Windows': + # NOTE(hidmic): this is risky, but we have few options. + SIGNAL = signal.SIGINT + ANOTHER_SIGNAL = signal.SIGBREAK +else: + SIGNAL = signal.SIGUSR1 + ANOTHER_SIGNAL = signal.SIGUSR2 + + +@cap_signals(SIGNAL, ANOTHER_SIGNAL) +def test_async_safe_signal_manager(): + """Test AsyncSafeSignalManager class.""" + loop = osrf_pycommon.process_utils.get_loop() + + manager = AsyncSafeSignalManager(loop) + + # Register signal handler outside context + got_signal = asyncio.Future(loop=loop) + manager.handle(SIGNAL, got_signal.set_result) + + # Signal handling is active within context + with manager: + # Register signal handler within context + got_another_signal = asyncio.Future(loop=loop) + manager.handle(ANOTHER_SIGNAL, got_another_signal.set_result) + + # Verify signal handling is working + loop.call_soon(signal.raise_signal, SIGNAL) + loop.run_until_complete(asyncio.wait( + [got_signal, got_another_signal], + return_when=asyncio.FIRST_COMPLETED, + timeout=1.0 + )) + assert got_signal.done() + assert got_signal.result() == SIGNAL + assert not got_another_signal.done() + + # Unregister signal handler within context + manager.handle(SIGNAL, None) + + # Verify signal handler is no longer there + loop.call_soon(signal.raise_signal, SIGNAL) + loop.run_until_complete(asyncio.wait( + [got_another_signal], timeout=1.0 + )) + assert not got_another_signal.done() + + # Signal handling is (now) inactive outside context + loop.call_soon(signal.raise_signal, ANOTHER_SIGNAL) + loop.run_until_complete(asyncio.wait( + [got_another_signal], timeout=1.0 + )) + assert not got_another_signal.done() + + # Managers' context may be re-entered + with manager: + loop.call_soon(signal.raise_signal, ANOTHER_SIGNAL) + loop.run_until_complete(asyncio.wait( + [got_another_signal], timeout=1.0 + )) + assert got_another_signal.done() + assert got_another_signal.result() == ANOTHER_SIGNAL