Skip to content

Commit

Permalink
fix: the process of saving project information stops properly in any …
Browse files Browse the repository at this point in the history
…condition
  • Loading branch information
FabienArcellier committed Nov 21, 2024
1 parent 77c793e commit 8874132
Showing 1 changed file with 20 additions and 7 deletions.
27 changes: 20 additions & 7 deletions src/writer/wf_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import logging
import multiprocessing
import os
import queue
import time
import typing
from collections import OrderedDict
Expand All @@ -30,6 +31,7 @@ class WfProjectContext:
app_path: str
write_files_async_queue: Queue = Queue()
write_files_async_process: typing.Optional[multiprocessing.Process] = None
write_files_async_stop: Any = multiprocessing.Event() # Note: Event is a function, not a class in python, it can't be typed


def write_files_async(context: WfProjectContext, metadata: MetadataDefinition, components: Dict[str, ComponentDefinition]) -> None:
Expand Down Expand Up @@ -77,9 +79,10 @@ def start_process_write_files_async(context: WfProjectContext, save_interval: fl
:param wf_project_save_interval: the interval in seconds to save the project files
:return:
"""


p = multiprocessing.Process(target=_start_process_write_files_async_process, args=(context, save_interval,))
p = multiprocessing.Process(
target=_start_process_write_files_async_process,
args=(context, save_interval, context.write_files_async_stop)
)
context.write_files_async_process = p
p.start()

Expand All @@ -88,9 +91,12 @@ def shutdown_process_write_files_async(context: WfProjectContext) -> None:
"""
Shutdown the process that writes the .wf/ files
"""
if context.write_files_async_process is not None:
context.write_files_async_process.terminate()
if context.write_files_async_process is not None and \
context.write_files_async_process.is_alive():
context.write_files_async_stop.set()
context.write_files_async_process.join()
context.write_files_async_process = None
context.write_files_async_stop.unset()

def read_files(app_path: str) -> Tuple[MetadataDefinition, dict[str, ComponentDefinition]]:
"""
Expand Down Expand Up @@ -271,9 +277,16 @@ def _hierarchical_position(components, c):
return sorted(components.values(), key=lambda c: _hierarchical_position(components, c))


def _start_process_write_files_async_process(context: WfProjectContext, save_interval):
def _start_process_write_files_async_process(context: WfProjectContext, save_interval, stop_event):
while True:
obj = context.write_files_async_queue.get()
if stop_event.is_set():
break

try:
obj = context.write_files_async_queue.get(block=False)
except queue.Empty:
obj = None

if obj is not None:
app_path, metadata, components = obj
write_files(app_path, metadata, components)
Expand Down

0 comments on commit 8874132

Please sign in to comment.