Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle signals within the asyncio loop. #476

Merged
merged 9 commits into from
Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 21 additions & 63 deletions launch/launch/launch_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,7 @@
from .launch_description import LaunchDescription
from .launch_description_entity import LaunchDescriptionEntity
from .some_actions_type import SomeActionsType
from .utilities import install_signal_handlers
from .utilities import on_sigint
from .utilities import on_sigquit
from .utilities import on_sigterm
from .utilities import AsyncSafeSignalManager
from .utilities import visit_all_entities_and_collect_futures


Expand Down Expand Up @@ -77,10 +74,6 @@ def __init__(
# Setup logging
self.__logger = launch.logging.get_logger('launch')

# Install signal handlers if not already installed, will raise if not
# in main-thread, call manually in main-thread to avoid this.
install_signal_handlers()

# Setup context and register a built-in event handler for bootstrapping.
self.__context = LaunchContext(argv=self.__argv)
self.__context.register_event_handler(OnIncludeLaunchDescription())
Expand Down Expand Up @@ -194,12 +187,7 @@ def _prepare_run_loop(self):
# Setup custom signal handlers for SIGINT, SIGTERM and maybe SIGQUIT.
sigint_received = False

def _on_sigint(signum, frame, prev_handler):
# Ignore additional signals until we finish processing this one.
current_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
if current_handler is signal.SIG_IGN:
# This function has been called re-entrantly.
return
def _on_sigint(signum):
nonlocal sigint_received
base_msg = 'user interrupted with ctrl-c (SIGINT)'
if not sigint_received:
Expand All @@ -211,57 +199,24 @@ def _on_sigint(signum, frame, prev_handler):
sigint_received = True
else:
self.__logger.warning('{} again, ignoring...'.format(base_msg))
if callable(prev_handler):
try:
# Run pre-existing signal handler.
prev_handler(signum, frame)
except KeyboardInterrupt:
# Ignore exception.
pass
# Restore current signal handler (not necessarily this one).
signal.signal(signal.SIGINT, current_handler)

on_sigint(_on_sigint)

def _on_sigterm(signum, frame, prev_handler):
# Ignore additional signals until we finish processing this one.
current_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN)
if current_handler is signal.SIG_IGN:
# This function has been called re-entrantly.
return

def _on_sigterm(signum):
signame = signal.Signals(signum).name
self.__logger.error(
'user interrupted with ctrl-\\ ({}), terminating...'.format(signame))
# TODO(wjwwood): try to terminate running subprocesses before exiting.
self.__logger.error('using SIGTERM or SIGQUIT can result in orphaned processes')
self.__logger.error('using {} can result in orphaned processes'.format(signame))
self.__logger.error('make sure no processes launched are still running')
this_loop.call_soon(this_task.cancel)
if callable(prev_handler):
# Run pre-existing signal handler.
prev_handler(signum, frame)
# Restore current signal handler (not necessarily this one).
signal.signal(signal.SIGTERM, current_handler)

on_sigterm(_on_sigterm)

def _on_sigquit(signum, frame, prev_handler):
# Ignore additional signals until we finish processing this one.
current_handler = signal.signal(signal.SIGQUIT, signal.SIG_IGN)
if current_handler is signal.SIG_IGN:
# This function has been called re-entrantly.
return
self.__logger.error('user interrupted with ctrl-\\ (SIGQUIT), terminating...')
_on_sigterm(signum, frame, prev_handler)
# Restore current signal handler (not necessarily this one).
signal.signal(signal.SIGQUIT, current_handler)

on_sigquit(_on_sigquit)

# Yield asyncio loop and current task.
yield self.__loop_from_run_thread, this_task
finally:
# Unset the signal handlers while not running.
on_sigint(None)
on_sigterm(None)
on_sigquit(None)

with AsyncSafeSignalManager(this_loop) as manager:
# Setup signal handlers
manager.handle(signal.SIGINT, _on_sigint)
manager.handle(signal.SIGTERM, _on_sigterm)
manager.handle(signal.SIGQUIT, _on_sigterm)
# Yield asyncio loop and current task.
yield this_loop, this_task
finally:
# No matter what happens, unset the loop.
with self.__loop_from_run_thread_lock:
self.__context._set_asyncio_loop(None)
Expand Down Expand Up @@ -400,8 +355,11 @@ def run(self, *, shutdown_when_idle=True) -> int:
run_async_task = loop.create_task(self.run_async(
shutdown_when_idle=shutdown_when_idle
))
loop.run_until_complete(run_async_task)
return run_async_task.result()
while True:
try:
return loop.run_until_complete(run_async_task)
except KeyboardInterrupt:
continue

def __on_shutdown(self, event: Event, context: LaunchContext) -> Optional[SomeActionsType]:
self.__shutting_down = True
Expand Down
10 changes: 2 additions & 8 deletions launch/launch/utilities/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
from .ensure_argument_type_impl import ensure_argument_type
from .normalize_to_list_of_substitutions_impl import normalize_to_list_of_substitutions
from .perform_substitutions_impl import perform_substitutions
from .signal_management import install_signal_handlers
from .signal_management import on_sigint
from .signal_management import on_sigquit
from .signal_management import on_sigterm
from .signal_management import AsyncSafeSignalManager
from .visit_all_entities_and_collect_futures_impl import visit_all_entities_and_collect_futures

__all__ = [
Expand All @@ -32,10 +29,7 @@
'create_future',
'ensure_argument_type',
'perform_substitutions',
'install_signal_handlers',
'on_sigint',
'on_sigquit',
'on_sigterm',
'AsyncSafeSignalManager',
'normalize_to_list_of_substitutions',
'visit_all_entities_and_collect_futures',
]
171 changes: 46 additions & 125 deletions launch/launch/utilities/signal_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,131 +14,52 @@

"""Module for the signal management functionality."""

import platform
import os
import socket
import signal
import threading

import launch.logging

__signal_handlers_installed_lock = threading.Lock()
__signal_handlers_installed = False
__custom_sigint_handler = None
__custom_sigquit_handler = None
__custom_sigterm_handler = None


def on_sigint(handler):
"""
Set the signal handler to be called on SIGINT.

Pass None for no custom handler.

install_signal_handlers() must have been called in the main thread before.
It is called automatically by the constructor of `launch.LaunchService`.
"""
global __custom_sigint_handler
if handler is not None and not callable(handler):
raise ValueError('handler must be callable or None')
__custom_sigint_handler = handler


def on_sigquit(handler):
"""
Set the signal handler to be called on SIGQUIT.

Note Windows does not have SIGQUIT, so it can be set with this function,
but the handler will not be called.

Pass None for no custom handler.

install_signal_handlers() must have been called in the main thread before.
It is called automatically by the constructor of `launch.LaunchService`.
"""
global __custom_sigquit_handler
if handler is not None and not callable(handler):
raise ValueError('handler must be callable or None')
__custom_sigquit_handler = handler


def on_sigterm(handler):
"""
Set the signal handler to be called on SIGTERM.

Pass None for no custom handler.

install_signal_handlers() must have been called in the main thread before.
It is called automatically by the constructor of `launch.LaunchService`.
"""
global __custom_sigterm_handler
if handler is not None and not callable(handler):
raise ValueError('handler must be callable or None')
__custom_sigterm_handler = handler


def install_signal_handlers():
"""
Install custom signal handlers so that hooks can be setup from other threads.

Calling this multiple times does not fail, but the signals are only
installed once.

If called outside of the main-thread, a ValueError is raised, see:
https://docs.python.org/3.6/library/signal.html#signal.signal

Also, if you register your own signal handlers after calling this function,
then you should store and forward to the existing signal handlers, because
otherwise the signal handlers registered by on_sigint(), on_sigquit, etc.
will not be run.
And the signal handlers registered with those functions are used to
gracefully exit the LaunchService when signaled (at least), and without
them it may not behave correctly.

If you register signal handlers before calling this function, then your
signal handler will automatically be called by the signal handlers in this
thread.
"""
global __signal_handlers_installed_lock, __signal_handlers_installed
with __signal_handlers_installed_lock:
if __signal_handlers_installed:
return
__signal_handlers_installed = True

global __custom_sigint_handler, __custom_sigquit_handler, __custom_sigterm_handler

__original_sigint_handler = signal.getsignal(signal.SIGINT)
__original_sigterm_handler = signal.getsignal(signal.SIGTERM)

def __on_sigint(signum, frame):
if callable(__custom_sigint_handler):
__custom_sigint_handler(signum, frame, __original_sigint_handler)
elif callable(__original_sigint_handler):
__original_sigint_handler(signum, frame)

if platform.system() != 'Windows':
# Windows does not support SIGQUIT
__original_sigquit_handler = signal.getsignal(signal.SIGQUIT)

def __on_sigquit(signum, frame):
if callable(__custom_sigquit_handler):
__custom_sigquit_handler(signum, frame, __original_sigquit_handler)
elif callable(__original_sigquit_handler):
__original_sigquit_handler(signum, frame)

def __on_sigterm(signum, frame):
if callable(__custom_sigterm_handler):
__custom_sigterm_handler(signum, frame, __original_sigterm_handler)
elif callable(__original_sigterm_handler):
__original_sigterm_handler(signum, frame)

# signals must be registered in the main thread, but print a nicer message if we're not there
try:
signal.signal(signal.SIGINT, __on_sigint)
signal.signal(signal.SIGTERM, __on_sigterm)
if platform.system() != 'Windows':
# Windows does not support SIGQUIT
signal.signal(signal.SIGQUIT, __on_sigquit)
except ValueError:
logger = launch.logging.get_logger(__name__)
logger.error("failed to set signal handlers in '{}'".format(__name__))
logger.error('this function must be called in the main thread')
raise
class AsyncSafeSignalManager:

def __init__(self, loop):
self.__loop = loop
self.__handlers = {}
self.__prev_wsock_fd = -1
self.__wsock, self.__rsock = socket.socketpair()
self.__wsock.setblocking(False)
self.__rsock.setblocking(False)

def __enter__(self):
self.__loop.add_reader(self.__rsock.fileno(), self.__handle_signal)
self.__prev_wsock_fd = signal.set_wakeup_fd(self.__wsock.fileno())
return self

def __exit__(self, type_, value, traceback):
assert self.__wsock.fileno() == signal.set_wakeup_fd(self.__prev_wsock_fd)
self.__loop.remove_reader(self.__rsock.fileno())

def __handle_signal(self):
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
while True:
try:
data = self.__rsock.recv(4096)
if not data:
break
for signum in data:
if signum not in self.__handlers:
continue
self.__handlers[signum](signum)
if self.__prev_wsock_fd != -1:
os.write(self.__prev_wsock_fd, data)
except InterruptedError:
continue
except BlockingIOError:
break

def handle(self, signum, handler):
if signum not in signal.Signals.__members__.values():
raise ValueError('{} is not a signal number'.format(signum))
if not callable(handler):
raise ValueError('signal handler must be callable')
old_handler = self.__handlers.get(signum, None)
self.__handlers[signum] = handler
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
return old_handler