diff --git a/src/writer/wf_project.py b/src/writer/wf_project.py index 6d5a1701a..3fb769a11 100644 --- a/src/writer/wf_project.py +++ b/src/writer/wf_project.py @@ -11,6 +11,7 @@ import logging import multiprocessing import os +import queue import time import typing from collections import OrderedDict @@ -30,6 +31,7 @@ class WfProjectContext: app_path: str write_files_async_queue: Queue = Queue() write_files_async_process: typing.Optional[multiprocessing.Process] = None + write_files_async_stop: Any = multiprocessing.Event() # Note: Event is a function, not a class in python, it can't be typed def write_files_async(context: WfProjectContext, metadata: MetadataDefinition, components: Dict[str, ComponentDefinition]) -> None: @@ -77,9 +79,10 @@ def start_process_write_files_async(context: WfProjectContext, save_interval: fl :param wf_project_save_interval: the interval in seconds to save the project files :return: """ - - - p = multiprocessing.Process(target=_start_process_write_files_async_process, args=(context, save_interval,)) + p = multiprocessing.Process( + target=_start_process_write_files_async_process, + args=(context, save_interval, context.write_files_async_stop) + ) context.write_files_async_process = p p.start() @@ -88,9 +91,12 @@ def shutdown_process_write_files_async(context: WfProjectContext) -> None: """ Shutdown the process that writes the .wf/ files """ - if context.write_files_async_process is not None: - context.write_files_async_process.terminate() + if context.write_files_async_process is not None and \ + context.write_files_async_process.is_alive(): + context.write_files_async_stop.set() context.write_files_async_process.join() + context.write_files_async_process = None + context.write_files_async_stop.unset() def read_files(app_path: str) -> Tuple[MetadataDefinition, dict[str, ComponentDefinition]]: """ @@ -271,9 +277,16 @@ def _hierarchical_position(components, c): return sorted(components.values(), key=lambda c: _hierarchical_position(components, c)) -def _start_process_write_files_async_process(context: WfProjectContext, save_interval): +def _start_process_write_files_async_process(context: WfProjectContext, save_interval, stop_event): while True: - obj = context.write_files_async_queue.get() + if stop_event.is_set(): + break + + try: + obj = context.write_files_async_queue.get(block=False) + except queue.Empty: + obj = None + if obj is not None: app_path, metadata, components = obj write_files(app_path, metadata, components)