From e88ad298fc0b5d22f97a89f28c24ee4ff9487336 Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Wed, 16 Dec 2020 19:16:18 -0300 Subject: [PATCH 1/9] Handle signals within the asyncio loop. Similar to asyncio._UnixSelectorEventLoop.add_signal_handler() implementation, but not limited to Unix platforms. Signed-off-by: Michel Hidalgo --- launch/launch/launch_service.py | 84 +++------ launch/launch/utilities/__init__.py | 10 +- launch/launch/utilities/signal_management.py | 171 +++++-------------- 3 files changed, 69 insertions(+), 196 deletions(-) diff --git a/launch/launch/launch_service.py b/launch/launch/launch_service.py index 73395d8ff..455d777f3 100644 --- a/launch/launch/launch_service.py +++ b/launch/launch/launch_service.py @@ -42,10 +42,7 @@ from .launch_description import LaunchDescription from .launch_description_entity import LaunchDescriptionEntity from .some_actions_type import SomeActionsType -from .utilities import install_signal_handlers -from .utilities import on_sigint -from .utilities import on_sigquit -from .utilities import on_sigterm +from .utilities import AsyncSafeSignalManager from .utilities import visit_all_entities_and_collect_futures @@ -77,10 +74,6 @@ def __init__( # Setup logging self.__logger = launch.logging.get_logger('launch') - # Install signal handlers if not already installed, will raise if not - # in main-thread, call manually in main-thread to avoid this. - install_signal_handlers() - # Setup context and register a built-in event handler for bootstrapping. self.__context = LaunchContext(argv=self.__argv) self.__context.register_event_handler(OnIncludeLaunchDescription()) @@ -194,12 +187,7 @@ def _prepare_run_loop(self): # Setup custom signal handlers for SIGINT, SIGTERM and maybe SIGQUIT. sigint_received = False - def _on_sigint(signum, frame, prev_handler): - # Ignore additional signals until we finish processing this one. - current_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) - if current_handler is signal.SIG_IGN: - # This function has been called re-entrantly. - return + def _on_sigint(signum): nonlocal sigint_received base_msg = 'user interrupted with ctrl-c (SIGINT)' if not sigint_received: @@ -211,57 +199,24 @@ def _on_sigint(signum, frame, prev_handler): sigint_received = True else: self.__logger.warning('{} again, ignoring...'.format(base_msg)) - if callable(prev_handler): - try: - # Run pre-existing signal handler. - prev_handler(signum, frame) - except KeyboardInterrupt: - # Ignore exception. - pass - # Restore current signal handler (not necessarily this one). - signal.signal(signal.SIGINT, current_handler) - - on_sigint(_on_sigint) - - def _on_sigterm(signum, frame, prev_handler): - # Ignore additional signals until we finish processing this one. - current_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN) - if current_handler is signal.SIG_IGN: - # This function has been called re-entrantly. - return + + def _on_sigterm(signum): + signame = signal.Signals(signum).name + self.__logger.error( + 'user interrupted with ctrl-\\ ({}), terminating...'.format(signame)) # TODO(wjwwood): try to terminate running subprocesses before exiting. - self.__logger.error('using SIGTERM or SIGQUIT can result in orphaned processes') + self.__logger.error('using {} can result in orphaned processes'.format(signame)) self.__logger.error('make sure no processes launched are still running') this_loop.call_soon(this_task.cancel) - if callable(prev_handler): - # Run pre-existing signal handler. - prev_handler(signum, frame) - # Restore current signal handler (not necessarily this one). - signal.signal(signal.SIGTERM, current_handler) - - on_sigterm(_on_sigterm) - - def _on_sigquit(signum, frame, prev_handler): - # Ignore additional signals until we finish processing this one. - current_handler = signal.signal(signal.SIGQUIT, signal.SIG_IGN) - if current_handler is signal.SIG_IGN: - # This function has been called re-entrantly. - return - self.__logger.error('user interrupted with ctrl-\\ (SIGQUIT), terminating...') - _on_sigterm(signum, frame, prev_handler) - # Restore current signal handler (not necessarily this one). - signal.signal(signal.SIGQUIT, current_handler) - - on_sigquit(_on_sigquit) - - # Yield asyncio loop and current task. - yield self.__loop_from_run_thread, this_task - finally: - # Unset the signal handlers while not running. - on_sigint(None) - on_sigterm(None) - on_sigquit(None) + with AsyncSafeSignalManager(this_loop) as manager: + # Setup signal handlers + manager.handle(signal.SIGINT, _on_sigint) + manager.handle(signal.SIGTERM, _on_sigterm) + manager.handle(signal.SIGQUIT, _on_sigterm) + # Yield asyncio loop and current task. + yield this_loop, this_task + finally: # No matter what happens, unset the loop. with self.__loop_from_run_thread_lock: self.__context._set_asyncio_loop(None) @@ -400,8 +355,11 @@ def run(self, *, shutdown_when_idle=True) -> int: run_async_task = loop.create_task(self.run_async( shutdown_when_idle=shutdown_when_idle )) - loop.run_until_complete(run_async_task) - return run_async_task.result() + while True: + try: + return loop.run_until_complete(run_async_task) + except KeyboardInterrupt: + continue def __on_shutdown(self, event: Event, context: LaunchContext) -> Optional[SomeActionsType]: self.__shutting_down = True diff --git a/launch/launch/utilities/__init__.py b/launch/launch/utilities/__init__.py index e92baefb6..f3efb4de5 100644 --- a/launch/launch/utilities/__init__.py +++ b/launch/launch/utilities/__init__.py @@ -19,10 +19,7 @@ from .ensure_argument_type_impl import ensure_argument_type from .normalize_to_list_of_substitutions_impl import normalize_to_list_of_substitutions from .perform_substitutions_impl import perform_substitutions -from .signal_management import install_signal_handlers -from .signal_management import on_sigint -from .signal_management import on_sigquit -from .signal_management import on_sigterm +from .signal_management import AsyncSafeSignalManager from .visit_all_entities_and_collect_futures_impl import visit_all_entities_and_collect_futures __all__ = [ @@ -32,10 +29,7 @@ 'create_future', 'ensure_argument_type', 'perform_substitutions', - 'install_signal_handlers', - 'on_sigint', - 'on_sigquit', - 'on_sigterm', + 'AsyncSafeSignalManager', 'normalize_to_list_of_substitutions', 'visit_all_entities_and_collect_futures', ] diff --git a/launch/launch/utilities/signal_management.py b/launch/launch/utilities/signal_management.py index 333e3bc88..c5febe726 100644 --- a/launch/launch/utilities/signal_management.py +++ b/launch/launch/utilities/signal_management.py @@ -14,131 +14,52 @@ """Module for the signal management functionality.""" -import platform +import os +import socket import signal -import threading -import launch.logging -__signal_handlers_installed_lock = threading.Lock() -__signal_handlers_installed = False -__custom_sigint_handler = None -__custom_sigquit_handler = None -__custom_sigterm_handler = None - - -def on_sigint(handler): - """ - Set the signal handler to be called on SIGINT. - - Pass None for no custom handler. - - install_signal_handlers() must have been called in the main thread before. - It is called automatically by the constructor of `launch.LaunchService`. - """ - global __custom_sigint_handler - if handler is not None and not callable(handler): - raise ValueError('handler must be callable or None') - __custom_sigint_handler = handler - - -def on_sigquit(handler): - """ - Set the signal handler to be called on SIGQUIT. - - Note Windows does not have SIGQUIT, so it can be set with this function, - but the handler will not be called. - - Pass None for no custom handler. - - install_signal_handlers() must have been called in the main thread before. - It is called automatically by the constructor of `launch.LaunchService`. - """ - global __custom_sigquit_handler - if handler is not None and not callable(handler): - raise ValueError('handler must be callable or None') - __custom_sigquit_handler = handler - - -def on_sigterm(handler): - """ - Set the signal handler to be called on SIGTERM. - - Pass None for no custom handler. - - install_signal_handlers() must have been called in the main thread before. - It is called automatically by the constructor of `launch.LaunchService`. - """ - global __custom_sigterm_handler - if handler is not None and not callable(handler): - raise ValueError('handler must be callable or None') - __custom_sigterm_handler = handler - - -def install_signal_handlers(): - """ - Install custom signal handlers so that hooks can be setup from other threads. - - Calling this multiple times does not fail, but the signals are only - installed once. - - If called outside of the main-thread, a ValueError is raised, see: - https://docs.python.org/3.6/library/signal.html#signal.signal - - Also, if you register your own signal handlers after calling this function, - then you should store and forward to the existing signal handlers, because - otherwise the signal handlers registered by on_sigint(), on_sigquit, etc. - will not be run. - And the signal handlers registered with those functions are used to - gracefully exit the LaunchService when signaled (at least), and without - them it may not behave correctly. - - If you register signal handlers before calling this function, then your - signal handler will automatically be called by the signal handlers in this - thread. - """ - global __signal_handlers_installed_lock, __signal_handlers_installed - with __signal_handlers_installed_lock: - if __signal_handlers_installed: - return - __signal_handlers_installed = True - - global __custom_sigint_handler, __custom_sigquit_handler, __custom_sigterm_handler - - __original_sigint_handler = signal.getsignal(signal.SIGINT) - __original_sigterm_handler = signal.getsignal(signal.SIGTERM) - - def __on_sigint(signum, frame): - if callable(__custom_sigint_handler): - __custom_sigint_handler(signum, frame, __original_sigint_handler) - elif callable(__original_sigint_handler): - __original_sigint_handler(signum, frame) - - if platform.system() != 'Windows': - # Windows does not support SIGQUIT - __original_sigquit_handler = signal.getsignal(signal.SIGQUIT) - - def __on_sigquit(signum, frame): - if callable(__custom_sigquit_handler): - __custom_sigquit_handler(signum, frame, __original_sigquit_handler) - elif callable(__original_sigquit_handler): - __original_sigquit_handler(signum, frame) - - def __on_sigterm(signum, frame): - if callable(__custom_sigterm_handler): - __custom_sigterm_handler(signum, frame, __original_sigterm_handler) - elif callable(__original_sigterm_handler): - __original_sigterm_handler(signum, frame) - - # signals must be registered in the main thread, but print a nicer message if we're not there - try: - signal.signal(signal.SIGINT, __on_sigint) - signal.signal(signal.SIGTERM, __on_sigterm) - if platform.system() != 'Windows': - # Windows does not support SIGQUIT - signal.signal(signal.SIGQUIT, __on_sigquit) - except ValueError: - logger = launch.logging.get_logger(__name__) - logger.error("failed to set signal handlers in '{}'".format(__name__)) - logger.error('this function must be called in the main thread') - raise +class AsyncSafeSignalManager: + + def __init__(self, loop): + self.__loop = loop + self.__handlers = {} + self.__prev_wsock_fd = -1 + self.__wsock, self.__rsock = socket.socketpair() + self.__wsock.setblocking(False) + self.__rsock.setblocking(False) + + def __enter__(self): + self.__loop.add_reader(self.__rsock.fileno(), self.__handle_signal) + self.__prev_wsock_fd = signal.set_wakeup_fd(self.__wsock.fileno()) + return self + + def __exit__(self, type_, value, traceback): + assert self.__wsock.fileno() == signal.set_wakeup_fd(self.__prev_wsock_fd) + self.__loop.remove_reader(self.__rsock.fileno()) + + def __handle_signal(self): + while True: + try: + data = self.__rsock.recv(4096) + if not data: + break + for signum in data: + if signum not in self.__handlers: + continue + self.__handlers[signum](signum) + if self.__prev_wsock_fd != -1: + os.write(self.__prev_wsock_fd, data) + except InterruptedError: + continue + except BlockingIOError: + break + + def handle(self, signum, handler): + if signum not in signal.Signals.__members__.values(): + raise ValueError('{} is not a signal number'.format(signum)) + if not callable(handler): + raise ValueError('signal handler must be callable') + old_handler = self.__handlers.get(signum, None) + self.__handlers[signum] = handler + return old_handler From 4071f5d9bfa32680c6d3b2cf84d8a0485c4d0a88 Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Tue, 22 Dec 2020 21:29:50 -0300 Subject: [PATCH 2/9] Add AsyncSafeSignalManager test Signed-off-by: Michel Hidalgo --- .../utilities/test_signal_management.py | 91 ++++++------------- 1 file changed, 30 insertions(+), 61 deletions(-) diff --git a/launch/test/launch/utilities/test_signal_management.py b/launch/test/launch/utilities/test_signal_management.py index 7fc6ad76f..d28e3a23a 100644 --- a/launch/test/launch/utilities/test_signal_management.py +++ b/launch/test/launch/utilities/test_signal_management.py @@ -14,64 +14,33 @@ """Tests for the signal_management module.""" -from launch.utilities import install_signal_handlers, on_sigint, on_sigquit, on_sigterm - -import pytest - - -def test_install_signal_handlers(): - """Test the install_signal_handlers() function.""" - install_signal_handlers() - install_signal_handlers() - install_signal_handlers() - - -def test_on_sigint(): - """Test the on_sigint() function.""" - # None is acceptable - on_sigint(None) - - def mock_sigint_handler(): - pass - - on_sigint(mock_sigint_handler) - - # Non-callable is not - with pytest.raises(ValueError): - on_sigint('non-callable') - - # TODO(jacobperron): implement a functional test by using subprocess.Popen - - -def test_on_sigquit(): - """Test the on_sigquit() function.""" - # None is acceptable - on_sigquit(None) - - def mock_sigquit_handler(): - pass - - on_sigquit(mock_sigquit_handler) - - # Non-callable is not - with pytest.raises(ValueError): - on_sigquit('non-callable') - - # TODO(jacobperron): implement a functional test by using subprocess.Popen - - -def test_on_sigterm(): - """Test the on_sigterm() function.""" - # None is acceptable - on_sigterm(None) - - def mock_sigterm_handler(): - pass - - on_sigterm(mock_sigterm_handler) - - # Non-callable is not - with pytest.raises(ValueError): - on_sigterm('non-callable') - - # TODO(jacobperron): implement a functional test by using subprocess.Popen +import asyncio +import signal + +from launch.utilities import AsyncSafeSignalManager + +import osrf_pycommon.process_utils + + +def test_async_safe_signal_manager(): + """Test AsyncSafeSignalManager class.""" + loop = osrf_pycommon.process_utils.get_loop() + + prev_signal_handler = signal.signal( + signal.SIGUSR1, lambda signum: None + ) + try: + got_signal = asyncio.Future(loop=loop) + with AsyncSafeSignalManager(loop) as manager: + manager.handle( + signal.SIGUSR1, + lambda signum: got_signal.set_result(signum) + ) + loop.call_soon(signal.raise_signal, signal.SIGUSR1) + loop.run_until_complete( + asyncio.wait([got_signal], timeout=5.0) + ) + assert got_signal.done() + assert got_signal.result() == signal.SIGUSR1 + finally: + signal.signal(signal.SIGUSR1, prev_signal_handler) From 7823301bb9169fb32d8f9df73391726112563d4e Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Wed, 23 Dec 2020 08:22:04 -0300 Subject: [PATCH 3/9] Add AsyncSafeSignalManager documentation Signed-off-by: Michel Hidalgo --- launch/launch/utilities/signal_management.py | 64 +++++++++++++++++--- 1 file changed, 56 insertions(+), 8 deletions(-) diff --git a/launch/launch/utilities/signal_management.py b/launch/launch/utilities/signal_management.py index c5febe726..12e1b1a45 100644 --- a/launch/launch/utilities/signal_management.py +++ b/launch/launch/utilities/signal_management.py @@ -12,16 +12,53 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Module for the signal management functionality.""" +"""Module for signal management functionality.""" +import asyncio import os -import socket import signal +import socket + + +from typing import Callable +from typing import Optional +from typing import Union class AsyncSafeSignalManager: + """ + A context manager class for asynchronous handling of signals. + + Similar in purpose to :func:`asyncio.loop.add_signal_handler` but + not limited to Unix platforms. + + Signal handlers can be registered at any time with a given manager. + These will become active for the extent of said manager context. + Unlike regular signal handlers, asynchronous signals handlers + can safely interact with their event loop. - def __init__(self, loop): + The same manager can be used multiple consecutive times and even + be nested with other managers, as these are independent from each + other i.e. managers do not override each other's handlers. + + If used outside of the main thread, a ValueError is raised. + + The underlying mechanism is built around :func:`signal.set_wakeup_fd` + so as to not interfere with regular handlers installed via + :func:`signal.signal`. + All signals received are forwarded to the previously setup file + descriptor, if any. + """ + + def __init__( + self, + loop: asyncio.AbstractEventLoop + ): + """ + Instantiate manager. + + :param loop: event loop that will handle the signals. + """ self.__loop = loop self.__handlers = {} self.__prev_wsock_fd = -1 @@ -55,11 +92,22 @@ def __handle_signal(self): except BlockingIOError: break - def handle(self, signum, handler): - if signum not in signal.Signals.__members__.values(): - raise ValueError('{} is not a signal number'.format(signum)) - if not callable(handler): - raise ValueError('signal handler must be callable') + def handle( + self, + signum: Union[signal.Signals, int], + handler: Optional[Callable[[int], None]], + ) -> Optional[Callable[[int], None]]: + """ + Register a callback for asynchronous handling of a given signal. + + :param signum: number of the signal to be handled + :param handler: callback taking a signal number + as its sole argument, or None + :return: previous handler if any, otherwise None + """ + signum = signal.Signals(signum) + if handler is not None and not callable(handler): + raise ValueError('signal handler must be a callable') old_handler = self.__handlers.get(signum, None) self.__handlers[signum] = handler return old_handler From bc92085a36f4c9ec91021302ecb182913ce10da0 Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Wed, 23 Dec 2020 08:22:17 -0300 Subject: [PATCH 4/9] Update LaunchService documentation Signed-off-by: Michel Hidalgo --- launch/launch/launch_service.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/launch/launch/launch_service.py b/launch/launch/launch_service.py index 455d777f3..06e249208 100644 --- a/launch/launch/launch_service.py +++ b/launch/launch/launch_service.py @@ -58,11 +58,6 @@ def __init__( """ Create a LaunchService. - If called outside of the main-thread before the function - :func:`launch.utilities.install_signal_handlers()` has been called, - a ValueError can be raised, as setting signal handlers cannot be done - outside of the main-thread. - :param: argv stored in the context for access by the entities, None results in [] :param: debug if True (not default), asyncio the logger are seutp for debug """ @@ -261,9 +256,6 @@ async def run_async(self, *, shutdown_when_idle=True) -> int: This should only ever be run from the main thread and not concurrently with other asynchronous runs. - Note that custom signal handlers are set, and KeyboardInterrupt is caught and ignored - around the original signal handler. After the run ends, this behavior is undone. - :param: shutdown_when_idle if True (default), the service will shutdown when idle. """ # Make sure this has not been called from any thread but the main thread. @@ -349,6 +341,9 @@ def run(self, *, shutdown_when_idle=True) -> int: This should only ever be run from the main thread and not concurrently with asynchronous runs (see `run_async()` documentation). + Note that KeyboardInterrupt is caught and ignored, as signals are handled separately. + After the run ends, this behavior is undone. + :param: shutdown_when_idle if True (default), the service will shutdown when idle """ loop = osrf_pycommon.process_utils.get_loop() From 3c753e60b1d4a3c16269b4aeb257287c17b632cc Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Wed, 23 Dec 2020 11:25:50 -0300 Subject: [PATCH 5/9] Address peer review comments Signed-off-by: Michel Hidalgo --- launch/launch/utilities/signal_management.py | 11 ++- .../utilities/test_signal_management.py | 85 +++++++++++++++---- 2 files changed, 74 insertions(+), 22 deletions(-) diff --git a/launch/launch/utilities/signal_management.py b/launch/launch/utilities/signal_management.py index 12e1b1a45..d9d8f792c 100644 --- a/launch/launch/utilities/signal_management.py +++ b/launch/launch/utilities/signal_management.py @@ -106,8 +106,11 @@ def handle( :return: previous handler if any, otherwise None """ signum = signal.Signals(signum) - if handler is not None and not callable(handler): - raise ValueError('signal handler must be a callable') - old_handler = self.__handlers.get(signum, None) - self.__handlers[signum] = handler + if handler is not None: + if not callable(handler): + raise ValueError('signal handler must be a callable') + old_handler = self.__handlers.get(signum, None) + self.__handlers[signum] = handler + else: + old_handler = self.__handlers.pop(signum, None) return old_handler diff --git a/launch/test/launch/utilities/test_signal_management.py b/launch/test/launch/utilities/test_signal_management.py index d28e3a23a..a7f11752f 100644 --- a/launch/test/launch/utilities/test_signal_management.py +++ b/launch/test/launch/utilities/test_signal_management.py @@ -15,6 +15,7 @@ """Tests for the signal_management module.""" import asyncio +import functools import signal from launch.utilities import AsyncSafeSignalManager @@ -22,25 +23,73 @@ import osrf_pycommon.process_utils +def cap_signals(*signals): + def _noop(*args): + pass + + def _decorator(func): + @functools.wraps(func) + def _wrapper(*args, **kwargs): + try: + handlers = {s: signal.signal(s, _noop) for s in signals} + return func(*args, **kwargs) + finally: + assert all(signal.signal(s, h) is _noop for s, h in handlers.items()) + return _wrapper + + return _decorator + + +@cap_signals(signal.SIGUSR1, signal.SIGUSR2) def test_async_safe_signal_manager(): """Test AsyncSafeSignalManager class.""" loop = osrf_pycommon.process_utils.get_loop() - prev_signal_handler = signal.signal( - signal.SIGUSR1, lambda signum: None - ) - try: - got_signal = asyncio.Future(loop=loop) - with AsyncSafeSignalManager(loop) as manager: - manager.handle( - signal.SIGUSR1, - lambda signum: got_signal.set_result(signum) - ) - loop.call_soon(signal.raise_signal, signal.SIGUSR1) - loop.run_until_complete( - asyncio.wait([got_signal], timeout=5.0) - ) - assert got_signal.done() - assert got_signal.result() == signal.SIGUSR1 - finally: - signal.signal(signal.SIGUSR1, prev_signal_handler) + manager = AsyncSafeSignalManager(loop) + + # Register signal handler outside context + got_signal = asyncio.Future(loop=loop) + manager.handle(signal.SIGUSR1, got_signal.set_result) + + # Signal handling is active within context + with manager: + # Register signal handler within context + got_another_signal = asyncio.Future(loop=loop) + manager.handle(signal.SIGUSR2, got_another_signal.set_result) + + # Verify signal handling is working + loop.call_soon(signal.raise_signal, signal.SIGUSR1) + loop.run_until_complete(asyncio.wait( + [got_signal, got_another_signal], + return_when=asyncio.FIRST_COMPLETED, + timeout=1.0 + )) + assert got_signal.done() + assert got_signal.result() == signal.SIGUSR1 + assert not got_another_signal.done() + + # Unregister signal handler within context + manager.handle(signal.SIGUSR1, None) + + # Verify signal handler is no longer there + loop.call_soon(signal.raise_signal, signal.SIGUSR1) + loop.run_until_complete(asyncio.wait( + [got_another_signal], timeout=1.0 + )) + assert not got_another_signal.done() + + # Signal handling is (now) inactive outside context + loop.call_soon(signal.raise_signal, signal.SIGUSR2) + loop.run_until_complete(asyncio.wait( + [got_another_signal], timeout=1.0 + )) + assert not got_another_signal.done() + + # Managers' context may be re-entered + with manager: + loop.call_soon(signal.raise_signal, signal.SIGUSR2) + loop.run_until_complete(asyncio.wait( + [got_another_signal], timeout=1.0 + )) + assert got_another_signal.done() + assert got_another_signal.result() == signal.SIGUSR2 From 3f9ce9cb62f38bb0c3580c756d71d75ac3a69636 Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Wed, 23 Dec 2020 18:03:58 -0300 Subject: [PATCH 6/9] Deal with Windows quirks. Signed-off-by: Michel Hidalgo --- launch/launch/utilities/signal_management.py | 37 +++++++++++++++---- .../utilities/test_signal_management.py | 29 ++++++++++----- 2 files changed, 49 insertions(+), 17 deletions(-) diff --git a/launch/launch/utilities/signal_management.py b/launch/launch/utilities/signal_management.py index d9d8f792c..67fd52199 100644 --- a/launch/launch/utilities/signal_management.py +++ b/launch/launch/utilities/signal_management.py @@ -18,10 +18,11 @@ import os import signal import socket - +import threading from typing import Callable from typing import Optional +from typing import Tuple # noqa: F401 from typing import Union @@ -59,21 +60,43 @@ def __init__( :param loop: event loop that will handle the signals. """ - self.__loop = loop - self.__handlers = {} - self.__prev_wsock_fd = -1 - self.__wsock, self.__rsock = socket.socketpair() + self.__loop = loop # type: asyncio.AbstractEventLoop + self.__background_loop = None # type: Optional[asyncio.AbstractEventLoop] + self.__handlers = {} # type: dict + self.__prev_wsock_fd = -1 # type: int + self.__wsock, self.__rsock = socket.socketpair() # type: Tuple[socket.socket, socket.socket] # noqa self.__wsock.setblocking(False) self.__rsock.setblocking(False) def __enter__(self): - self.__loop.add_reader(self.__rsock.fileno(), self.__handle_signal) + try: + self.__loop.add_reader(self.__rsock.fileno(), self.__handle_signal) + except NotImplementedError: + # NOTE(hidmic): some event loops, like the asyncio.ProactorEventLoop + # on Windows, do not support asynchronous socket reads. Emulate it. + self.__background_loop = asyncio.SelectorEventLoop() + self.__background_loop.add_reader( + self.__rsock.fileno(), + self.__loop.call_soon_threadsafe, + self.__handle_signal) + + def run_background_loop(): + asyncio.set_event_loop(self.__background_loop) + self.__background_loop.run_forever() + + self.__background_thread = threading.Thread(target=run_background_loop) + self.__background_thread.start() self.__prev_wsock_fd = signal.set_wakeup_fd(self.__wsock.fileno()) return self def __exit__(self, type_, value, traceback): assert self.__wsock.fileno() == signal.set_wakeup_fd(self.__prev_wsock_fd) - self.__loop.remove_reader(self.__rsock.fileno()) + if self.__background_loop: + self.__background_loop.call_soon_threadsafe(self.__background_loop.stop) + self.__background_thread.join() + self.__background_loop.close() + else: + self.__loop.remove_reader(self.__rsock.fileno()) def __handle_signal(self): while True: diff --git a/launch/test/launch/utilities/test_signal_management.py b/launch/test/launch/utilities/test_signal_management.py index a7f11752f..64b800954 100644 --- a/launch/test/launch/utilities/test_signal_management.py +++ b/launch/test/launch/utilities/test_signal_management.py @@ -16,6 +16,7 @@ import asyncio import functools +import platform import signal from launch.utilities import AsyncSafeSignalManager @@ -40,7 +41,15 @@ def _wrapper(*args, **kwargs): return _decorator -@cap_signals(signal.SIGUSR1, signal.SIGUSR2) +if platform.system() == 'Windows': + SIGNAL = signal.CTRL_C_EVENT + ANOTHER_SIGNAL = signal.CTRL_BREAK_EVENT +else: + SIGNAL = signal.SIGUSR1 + ANOTHER_SIGNAL = signal.SIGUSR2 + + +@cap_signals(SIGNAL, ANOTHER_SIGNAL) def test_async_safe_signal_manager(): """Test AsyncSafeSignalManager class.""" loop = osrf_pycommon.process_utils.get_loop() @@ -49,37 +58,37 @@ def test_async_safe_signal_manager(): # Register signal handler outside context got_signal = asyncio.Future(loop=loop) - manager.handle(signal.SIGUSR1, got_signal.set_result) + manager.handle(SIGNAL, got_signal.set_result) # Signal handling is active within context with manager: # Register signal handler within context got_another_signal = asyncio.Future(loop=loop) - manager.handle(signal.SIGUSR2, got_another_signal.set_result) + manager.handle(ANOTHER_SIGNAL, got_another_signal.set_result) # Verify signal handling is working - loop.call_soon(signal.raise_signal, signal.SIGUSR1) + loop.call_soon(signal.raise_signal, SIGNAL) loop.run_until_complete(asyncio.wait( [got_signal, got_another_signal], return_when=asyncio.FIRST_COMPLETED, timeout=1.0 )) assert got_signal.done() - assert got_signal.result() == signal.SIGUSR1 + assert got_signal.result() == SIGNAL assert not got_another_signal.done() # Unregister signal handler within context - manager.handle(signal.SIGUSR1, None) + manager.handle(SIGNAL, None) # Verify signal handler is no longer there - loop.call_soon(signal.raise_signal, signal.SIGUSR1) + loop.call_soon(signal.raise_signal, SIGNAL) loop.run_until_complete(asyncio.wait( [got_another_signal], timeout=1.0 )) assert not got_another_signal.done() # Signal handling is (now) inactive outside context - loop.call_soon(signal.raise_signal, signal.SIGUSR2) + loop.call_soon(signal.raise_signal, ANOTHER_SIGNAL) loop.run_until_complete(asyncio.wait( [got_another_signal], timeout=1.0 )) @@ -87,9 +96,9 @@ def test_async_safe_signal_manager(): # Managers' context may be re-entered with manager: - loop.call_soon(signal.raise_signal, signal.SIGUSR2) + loop.call_soon(signal.raise_signal, ANOTHER_SIGNAL) loop.run_until_complete(asyncio.wait( [got_another_signal], timeout=1.0 )) assert got_another_signal.done() - assert got_another_signal.result() == signal.SIGUSR2 + assert got_another_signal.result() == ANOTHER_SIGNAL From 71ab3f2c2851333ee19fa62f5fc676c39f84c257 Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Wed, 23 Dec 2020 19:45:12 -0300 Subject: [PATCH 7/9] Deal with more Windows quirks. Signed-off-by: Michel Hidalgo --- launch/launch/utilities/signal_management.py | 43 ++++++++++++++++--- .../utilities/test_signal_management.py | 9 ++-- 2 files changed, 42 insertions(+), 10 deletions(-) diff --git a/launch/launch/utilities/signal_management.py b/launch/launch/utilities/signal_management.py index 67fd52199..ed6cec66d 100644 --- a/launch/launch/utilities/signal_management.py +++ b/launch/launch/utilities/signal_management.py @@ -16,6 +16,7 @@ import asyncio import os +import platform import signal import socket import threading @@ -26,6 +27,20 @@ from typing import Union +def is_winsock_handle(fd): + """Check if the given file descriptor is WinSock handle.""" + if platform.system() != 'Windows': + return False + try: + # On Windows, WinSock handles and regular file handles + # have disjoint APIs. This test leverages the fact that + # attempting to os.dup a WinSock handle will fail. + os.close(os.dup(fd)) + return False + except OSError: + return True + + class AsyncSafeSignalManager: """ A context manager class for asynchronous handling of signals. @@ -63,7 +78,7 @@ def __init__( self.__loop = loop # type: asyncio.AbstractEventLoop self.__background_loop = None # type: Optional[asyncio.AbstractEventLoop] self.__handlers = {} # type: dict - self.__prev_wsock_fd = -1 # type: int + self.__prev_wakeup_handle = -1 # type: Union[int, socket.socket] self.__wsock, self.__rsock = socket.socketpair() # type: Tuple[socket.socket, socket.socket] # noqa self.__wsock.setblocking(False) self.__rsock.setblocking(False) @@ -72,8 +87,9 @@ def __enter__(self): try: self.__loop.add_reader(self.__rsock.fileno(), self.__handle_signal) except NotImplementedError: - # NOTE(hidmic): some event loops, like the asyncio.ProactorEventLoop - # on Windows, do not support asynchronous socket reads. Emulate it. + # Some event loops, like the asyncio.ProactorEventLoop + # on Windows, do not support asynchronous socket reads. + # So we emulate it. self.__background_loop = asyncio.SelectorEventLoop() self.__background_loop.add_reader( self.__rsock.fileno(), @@ -86,11 +102,20 @@ def run_background_loop(): self.__background_thread = threading.Thread(target=run_background_loop) self.__background_thread.start() - self.__prev_wsock_fd = signal.set_wakeup_fd(self.__wsock.fileno()) + self.__prev_wakeup_handle = signal.set_wakeup_fd(self.__wsock.fileno()) + if self.__prev_wakeup_handle != -1 and is_winsock_handle(self.__prev_wakeup_handle): + # On Windows, os.write will fail on a WinSock handle. There is no WinSock API + # in the standard library either. Thus we wrap it in a socket.socket instance. + self.__prev_wakeup_handle = socket.socket(fileno=self.__prev_wakeup_handle) return self def __exit__(self, type_, value, traceback): - assert self.__wsock.fileno() == signal.set_wakeup_fd(self.__prev_wsock_fd) + if isinstance(self.__prev_wakeup_handle, socket.socket): + # Detach (Windows) socket and retrieve the raw OS handle. + prev_wakeup_handle = self.__prev_wakeup_handle.fileno() + self.__prev_wakeup_handle.detach() + self.__prev_wakeup_handle = prev_wakeup_handle + assert self.__wsock.fileno() == signal.set_wakeup_fd(self.__prev_wakeup_handle) if self.__background_loop: self.__background_loop.call_soon_threadsafe(self.__background_loop.stop) self.__background_thread.join() @@ -108,8 +133,12 @@ def __handle_signal(self): if signum not in self.__handlers: continue self.__handlers[signum](signum) - if self.__prev_wsock_fd != -1: - os.write(self.__prev_wsock_fd, data) + if self.__prev_wakeup_handle != -1: + # Send over (Windows) socket or write file. + if isinstance(self.__prev_wakeup_handle, socket.socket): + self.__prev_wakeup_handle.send(data) + else: + os.write(self.__prev_wakeup_handle, data) except InterruptedError: continue except BlockingIOError: diff --git a/launch/test/launch/utilities/test_signal_management.py b/launch/test/launch/utilities/test_signal_management.py index 64b800954..92d504cc8 100644 --- a/launch/test/launch/utilities/test_signal_management.py +++ b/launch/test/launch/utilities/test_signal_management.py @@ -31,8 +31,10 @@ def _noop(*args): def _decorator(func): @functools.wraps(func) def _wrapper(*args, **kwargs): + handlers = {} try: - handlers = {s: signal.signal(s, _noop) for s in signals} + for s in signals: + handlers[s] = signal.signal(s, _noop) return func(*args, **kwargs) finally: assert all(signal.signal(s, h) is _noop for s, h in handlers.items()) @@ -42,8 +44,9 @@ def _wrapper(*args, **kwargs): if platform.system() == 'Windows': - SIGNAL = signal.CTRL_C_EVENT - ANOTHER_SIGNAL = signal.CTRL_BREAK_EVENT + # NOTE(hidmic): this is risky, but we have few options. + SIGNAL = signal.SIGINT + ANOTHER_SIGNAL = signal.SIGBREAK else: SIGNAL = signal.SIGUSR1 ANOTHER_SIGNAL = signal.SIGUSR2 From 652e7b56c9b7daf5a552c1c4915815b137b7e4e5 Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Mon, 4 Jan 2021 10:38:33 -0300 Subject: [PATCH 8/9] No SIGQUIT on Windows. Signed-off-by: Michel Hidalgo --- launch/launch/launch_service.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/launch/launch/launch_service.py b/launch/launch/launch_service.py index 06e249208..a3456a8a0 100644 --- a/launch/launch/launch_service.py +++ b/launch/launch/launch_service.py @@ -18,6 +18,7 @@ import collections.abc import contextlib import logging +import platform import signal import threading import traceback @@ -208,7 +209,8 @@ def _on_sigterm(signum): # Setup signal handlers manager.handle(signal.SIGINT, _on_sigint) manager.handle(signal.SIGTERM, _on_sigterm) - manager.handle(signal.SIGQUIT, _on_sigterm) + if platform.system() != 'Windows': + manager.handle(signal.SIGQUIT, _on_sigterm) # Yield asyncio loop and current task. yield this_loop, this_task finally: From 584df2fdcdf84edaedeeb24130c3e086bd3815d0 Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Tue, 12 Jan 2021 16:22:11 -0300 Subject: [PATCH 9/9] Use msvcrt.get_osfhandle() to detect WinSock handles. Signed-off-by: Michel Hidalgo --- launch/launch/utilities/signal_management.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/launch/launch/utilities/signal_management.py b/launch/launch/utilities/signal_management.py index ed6cec66d..a144fc5fb 100644 --- a/launch/launch/utilities/signal_management.py +++ b/launch/launch/utilities/signal_management.py @@ -34,8 +34,10 @@ def is_winsock_handle(fd): try: # On Windows, WinSock handles and regular file handles # have disjoint APIs. This test leverages the fact that - # attempting to os.dup a WinSock handle will fail. - os.close(os.dup(fd)) + # attempting to get an MSVC runtime file handle from a + # WinSock handle will fail. + import msvcrt + msvcrt.get_osfhandle(fd) return False except OSError: return True