From c3e71e451e85edfc1c3ee6cf6608ff58a40781cf Mon Sep 17 00:00:00 2001 From: Fabien Arcellier Date: Thu, 21 Nov 2024 09:02:14 +0100 Subject: [PATCH] fix: the process of saving project information stops properly in any condition --- src/writer/app_runner.py | 30 +++++++++++++++++++----------- src/writer/wf_project.py | 27 ++++++++++++++++++++------- 2 files changed, 39 insertions(+), 18 deletions(-) diff --git a/src/writer/app_runner.py b/src/writer/app_runner.py index 4ad11d2ec..616093a6e 100644 --- a/src/writer/app_runner.py +++ b/src/writer/app_runner.py @@ -634,17 +634,6 @@ def _start_wf_project_process_write_files(self): wf_project.start_process_write_files_async(self.wf_project_context, AppRunner.WF_PROJECT_SAVE_INTERVAL) def load(self) -> None: - def signal_handler(sig, frame): - self.shut_down() - sys.exit(0) - - try: - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - except ValueError: - # No need to handle signal as not main thread - pass - self.run_code = self._load_persisted_script() self.bmc_components = self._load_persisted_components() @@ -654,6 +643,13 @@ def signal_handler(sig, frame): self._start_app_process() + # We have to create new processes as wf_projet_process before subscribing to signal. + # When a new process is create, the parent process is fork. The child would also subscribe to signal. + # + # When signal happen, both process will answer and one of them raise error due to mismatch between + # parent pid and pid. + self._subscribe_terminal_signal() + async def dispatch_message(self, session_id: Optional[str], request: AppProcessServerRequest) -> AppProcessServerResponse: """ @@ -885,3 +881,15 @@ def run_async_in_thread(): thread.start() thread.join() return + + def _subscribe_terminal_signal(self): + def signal_handler(sig, frame): + self.shut_down() + sys.exit(0) + + try: + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + except ValueError: + # No need to handle signal as not main thread + pass diff --git a/src/writer/wf_project.py b/src/writer/wf_project.py index 6d5a1701a..61530b741 100644 --- a/src/writer/wf_project.py +++ b/src/writer/wf_project.py @@ -11,6 +11,7 @@ import logging import multiprocessing import os +import queue import time import typing from collections import OrderedDict @@ -30,6 +31,7 @@ class WfProjectContext: app_path: str write_files_async_queue: Queue = Queue() write_files_async_process: typing.Optional[multiprocessing.Process] = None + write_files_async_stop: Any = multiprocessing.Event() # Note: Event is a function, not a class in python, it can't be typed def write_files_async(context: WfProjectContext, metadata: MetadataDefinition, components: Dict[str, ComponentDefinition]) -> None: @@ -77,9 +79,10 @@ def start_process_write_files_async(context: WfProjectContext, save_interval: fl :param wf_project_save_interval: the interval in seconds to save the project files :return: """ - - - p = multiprocessing.Process(target=_start_process_write_files_async_process, args=(context, save_interval,)) + p = multiprocessing.Process( + target=_start_process_write_files_async_process, + args=(context, save_interval, context.write_files_async_stop) + ) context.write_files_async_process = p p.start() @@ -88,9 +91,12 @@ def shutdown_process_write_files_async(context: WfProjectContext) -> None: """ Shutdown the process that writes the .wf/ files """ - if context.write_files_async_process is not None: - context.write_files_async_process.terminate() + if context.write_files_async_process is not None and \ + context.write_files_async_process.is_alive(): + context.write_files_async_stop.set() context.write_files_async_process.join() + context.write_files_async_process = None + context.write_files_async_stop.clear() def read_files(app_path: str) -> Tuple[MetadataDefinition, dict[str, ComponentDefinition]]: """ @@ -271,9 +277,16 @@ def _hierarchical_position(components, c): return sorted(components.values(), key=lambda c: _hierarchical_position(components, c)) -def _start_process_write_files_async_process(context: WfProjectContext, save_interval): +def _start_process_write_files_async_process(context: WfProjectContext, save_interval, stop_event): while True: - obj = context.write_files_async_queue.get() + if stop_event.is_set(): + break + + try: + obj = context.write_files_async_queue.get(block=False) + except queue.Empty: + obj = None + if obj is not None: app_path, metadata, components = obj write_files(app_path, metadata, components)