Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: the process of saving project information stops properly in any condition - WF-114 #642

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions src/writer/app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,17 +634,6 @@ def _start_wf_project_process_write_files(self):
wf_project.start_process_write_files_async(self.wf_project_context, AppRunner.WF_PROJECT_SAVE_INTERVAL)

def load(self) -> None:
def signal_handler(sig, frame):
self.shut_down()
sys.exit(0)

try:
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
except ValueError:
# No need to handle signal as not main thread
pass

self.run_code = self._load_persisted_script()
self.bmc_components = self._load_persisted_components()

Expand All @@ -654,6 +643,13 @@ def signal_handler(sig, frame):

self._start_app_process()

# We have to create new processes as wf_projet_process before subscribing to signal.
# When a new process is create, the parent process is fork. The child would also subscribe to signal.
#
# When signal happen, both process will answer and one of them raise error due to mismatch between
# parent pid and pid.
self._subscribe_terminal_signal()

async def dispatch_message(self, session_id: Optional[str], request: AppProcessServerRequest) -> AppProcessServerResponse:

"""
Expand Down Expand Up @@ -885,3 +881,15 @@ def run_async_in_thread():
thread.start()
thread.join()
return

def _subscribe_terminal_signal(self):
def signal_handler(sig, frame):
self.shut_down()
sys.exit(0)

try:
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
except ValueError:
# No need to handle signal as not main thread
pass
27 changes: 20 additions & 7 deletions src/writer/wf_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import logging
import multiprocessing
import os
import queue
import time
import typing
from collections import OrderedDict
Expand All @@ -30,6 +31,7 @@ class WfProjectContext:
app_path: str
write_files_async_queue: Queue = Queue()
write_files_async_process: typing.Optional[multiprocessing.Process] = None
write_files_async_stop: Any = multiprocessing.Event() # Note: Event is a function, not a class in python, it can't be typed


def write_files_async(context: WfProjectContext, metadata: MetadataDefinition, components: Dict[str, ComponentDefinition]) -> None:
Expand Down Expand Up @@ -77,9 +79,10 @@ def start_process_write_files_async(context: WfProjectContext, save_interval: fl
:param wf_project_save_interval: the interval in seconds to save the project files
:return:
"""


p = multiprocessing.Process(target=_start_process_write_files_async_process, args=(context, save_interval,))
p = multiprocessing.Process(
target=_start_process_write_files_async_process,
args=(context, save_interval, context.write_files_async_stop)
)
context.write_files_async_process = p
p.start()

Expand All @@ -88,9 +91,12 @@ def shutdown_process_write_files_async(context: WfProjectContext) -> None:
"""
Shutdown the process that writes the .wf/ files
"""
if context.write_files_async_process is not None:
context.write_files_async_process.terminate()
if context.write_files_async_process is not None and \
context.write_files_async_process.is_alive():
context.write_files_async_stop.set()
context.write_files_async_process.join()
context.write_files_async_process = None
context.write_files_async_stop.clear()

def read_files(app_path: str) -> Tuple[MetadataDefinition, dict[str, ComponentDefinition]]:
"""
Expand Down Expand Up @@ -271,9 +277,16 @@ def _hierarchical_position(components, c):
return sorted(components.values(), key=lambda c: _hierarchical_position(components, c))


def _start_process_write_files_async_process(context: WfProjectContext, save_interval):
def _start_process_write_files_async_process(context: WfProjectContext, save_interval, stop_event):
while True:
obj = context.write_files_async_queue.get()
if stop_event.is_set():
break

try:
obj = context.write_files_async_queue.get(block=False)
except queue.Empty:
obj = None

if obj is not None:
app_path, metadata, components = obj
write_files(app_path, metadata, components)
Expand Down