Skip to content

Commit

Permalink
Merge pull request #642 from FabienArcellier/WF-114-the-process-of-sa…
Browse files Browse the repository at this point in the history
…ving-project-information-stops-in-a-predictable-manner

fix: the process of saving project information stops properly in any condition - WF-114
  • Loading branch information
ramedina86 authored Nov 22, 2024
2 parents 88f7a77 + c3e71e4 commit 457c271
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 18 deletions.
30 changes: 19 additions & 11 deletions src/writer/app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,17 +634,6 @@ def _start_wf_project_process_write_files(self):
wf_project.start_process_write_files_async(self.wf_project_context, AppRunner.WF_PROJECT_SAVE_INTERVAL)

def load(self) -> None:
def signal_handler(sig, frame):
self.shut_down()
sys.exit(0)

try:
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
except ValueError:
# No need to handle signal as not main thread
pass

self.run_code = self._load_persisted_script()
self.bmc_components = self._load_persisted_components()

Expand All @@ -654,6 +643,13 @@ def signal_handler(sig, frame):

self._start_app_process()

# We have to create new processes as wf_projet_process before subscribing to signal.
# When a new process is create, the parent process is fork. The child would also subscribe to signal.
#
# When signal happen, both process will answer and one of them raise error due to mismatch between
# parent pid and pid.
self._subscribe_terminal_signal()

async def dispatch_message(self, session_id: Optional[str], request: AppProcessServerRequest) -> AppProcessServerResponse:

"""
Expand Down Expand Up @@ -885,3 +881,15 @@ def run_async_in_thread():
thread.start()
thread.join()
return

def _subscribe_terminal_signal(self):
def signal_handler(sig, frame):
self.shut_down()
sys.exit(0)

try:
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
except ValueError:
# No need to handle signal as not main thread
pass
27 changes: 20 additions & 7 deletions src/writer/wf_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import logging
import multiprocessing
import os
import queue
import time
import typing
from collections import OrderedDict
Expand All @@ -30,6 +31,7 @@ class WfProjectContext:
app_path: str
write_files_async_queue: Queue = Queue()
write_files_async_process: typing.Optional[multiprocessing.Process] = None
write_files_async_stop: Any = multiprocessing.Event() # Note: Event is a function, not a class in python, it can't be typed


def write_files_async(context: WfProjectContext, metadata: MetadataDefinition, components: Dict[str, ComponentDefinition]) -> None:
Expand Down Expand Up @@ -77,9 +79,10 @@ def start_process_write_files_async(context: WfProjectContext, save_interval: fl
:param wf_project_save_interval: the interval in seconds to save the project files
:return:
"""


p = multiprocessing.Process(target=_start_process_write_files_async_process, args=(context, save_interval,))
p = multiprocessing.Process(
target=_start_process_write_files_async_process,
args=(context, save_interval, context.write_files_async_stop)
)
context.write_files_async_process = p
p.start()

Expand All @@ -88,9 +91,12 @@ def shutdown_process_write_files_async(context: WfProjectContext) -> None:
"""
Shutdown the process that writes the .wf/ files
"""
if context.write_files_async_process is not None:
context.write_files_async_process.terminate()
if context.write_files_async_process is not None and \
context.write_files_async_process.is_alive():
context.write_files_async_stop.set()
context.write_files_async_process.join()
context.write_files_async_process = None
context.write_files_async_stop.clear()

def read_files(app_path: str) -> Tuple[MetadataDefinition, dict[str, ComponentDefinition]]:
"""
Expand Down Expand Up @@ -271,9 +277,16 @@ def _hierarchical_position(components, c):
return sorted(components.values(), key=lambda c: _hierarchical_position(components, c))


def _start_process_write_files_async_process(context: WfProjectContext, save_interval):
def _start_process_write_files_async_process(context: WfProjectContext, save_interval, stop_event):
while True:
obj = context.write_files_async_queue.get()
if stop_event.is_set():
break

try:
obj = context.write_files_async_queue.get(block=False)
except queue.Empty:
obj = None

if obj is not None:
app_path, metadata, components = obj
write_files(app_path, metadata, components)
Expand Down

0 comments on commit 457c271

Please sign in to comment.