diff --git a/src/writer/app_runner.py b/src/writer/app_runner.py index 73c91016d..4ad11d2ec 100644 --- a/src/writer/app_runner.py +++ b/src/writer/app_runner.py @@ -1,7 +1,6 @@ import asyncio import concurrent.futures import importlib.util -import json import logging import logging.handlers import multiprocessing @@ -47,6 +46,7 @@ StateEnquiryResponsePayload, WriterEvent, ) +from writer.wf_project import WfProjectContext logging.basicConfig(level=logging.INFO, format='%(message)s') @@ -580,6 +580,7 @@ class AppRunner: """ UPDATE_CHECK_INTERVAL_SECONDS = 0.2 + WF_PROJECT_SAVE_INTERVAL = 0.2 MAX_WAIT_NOTIFY_SECONDS = 10 def __init__(self, app_path: str, mode: str): @@ -600,6 +601,7 @@ def __init__(self, app_path: str, mode: str): self.log_listener: Optional[LogListener] = None self.code_update_loop: Optional[asyncio.AbstractEventLoop] = None self.code_update_condition: Optional[asyncio.Condition] = None + self.wf_project_context = WfProjectContext(app_path=app_path) if mode not in ("edit", "run"): raise ValueError("Invalid mode.") @@ -623,11 +625,14 @@ def _set_logger(self): self.log_listener = LogListener(self.log_queue) self.log_listener.start() - def _set_observer(self): + def _start_fs_observer(self): self.observer = PollingObserver(AppRunner.UPDATE_CHECK_INTERVAL_SECONDS) self.observer.schedule(FileEventHandler(self.reload_code_from_saved), path=self.app_path, recursive=True) self.observer.start() + def _start_wf_project_process_write_files(self): + wf_project.start_process_write_files_async(self.wf_project_context, AppRunner.WF_PROJECT_SAVE_INTERVAL) + def load(self) -> None: def signal_handler(sig, frame): self.shut_down() @@ -644,7 +649,8 @@ def signal_handler(sig, frame): self.bmc_components = self._load_persisted_components() if self.mode == "edit": - self._set_observer() + self._start_wf_project_process_write_files() + self._start_fs_observer() self._start_app_process() @@ -730,7 +736,7 @@ async def update_components(self, session_id: str, payload: ComponentUpdateReque "Cannot update components in non-update mode.") self.bmc_components = payload.components - wf_project.write_files(self.app_path, metadata={"writer_version": VERSION}, components=payload.components) + wf_project.write_files_async(self.wf_project_context, metadata={"writer_version": VERSION}, components=payload.components) return await self.dispatch_message(session_id, ComponentUpdateRequest( type="componentUpdate", @@ -796,6 +802,9 @@ def shut_down(self) -> None: self.log_queue.put(None) if self.log_listener is not None: self.log_listener.join() + + wf_project.shutdown_process_write_files_async(self.wf_project_context) + self._clean_process() def _start_app_process(self) -> None: diff --git a/src/writer/wf_project.py b/src/writer/wf_project.py index 5987c5d83..0ccf02aca 100644 --- a/src/writer/wf_project.py +++ b/src/writer/wf_project.py @@ -1,16 +1,20 @@ """ This module manipulates the folder of a wf project stored into `wf`. ->>> wf_project.write_files('app/hello', metadata={"writer_version": "0.1" }, components=...) +>>> wf_project.write_files_async('app/hello', metadata={"writer_version": "0.1" }, components=...) >>> metadata, components = wf_project.read_files('app/hello') """ +import dataclasses import io import json import logging +import multiprocessing import os +import time import typing from collections import OrderedDict +from multiprocessing import Queue from typing import Any, Dict, List, Tuple from writer import core_ui @@ -19,6 +23,25 @@ ROOTS = ['root', 'workflows_root'] COMPONENT_ROOTS = ['page', 'workflows_workflow'] +shared_queue_write_files: typing.Optional[Queue] = None + +@dataclasses.dataclass +class WfProjectContext: + app_path: str + write_files_async_queue: Queue = Queue() + write_files_async_process: typing.Optional[multiprocessing.Process] = None + + +def write_files_async(context: WfProjectContext, metadata: MetadataDefinition, components: Dict[str, ComponentDefinition]) -> None: + """ + This operation is asynchrone. It's managed in wf_project.process_write_files_async. + + see wf_project.write_files for description + + >>> wf_project.write_files_async('app/hello', metadata={"writer_version": "0.1" }, components=...) + """ + context.write_files_async_queue.put((context.app_path, metadata, components)) + def write_files(app_path: str, metadata: MetadataDefinition, components: Dict[str, ComponentDefinition]) -> None: """ Writes the meta data of the WF project to the `.wf` directory (metadata, components, ...). @@ -31,6 +54,7 @@ def write_files(app_path: str, metadata: MetadataDefinition, components: Dict[st >>> wf_project.write_files('app/hello', metadata={"writer_version": "0.1" }, components=...) """ + wf_directory = os.path.join(app_path, ".wf") if not os.path.exists(wf_directory): os.makedirs(wf_directory) @@ -43,6 +67,37 @@ def write_files(app_path: str, metadata: MetadataDefinition, components: Dict[st _remove_obsolete_component_files(wf_directory, components) +def start_process_write_files_async(context: WfProjectContext, save_interval: float) -> None: + """ + Creates a process that writes the .wf/ files + + This agent allows you to process the backup of .wf files in the background + without blocking application requests. + + :param wf_project_save_interval: the interval in seconds to save the project files + :return: + """ + def process(save_interval): + while True: + obj = context.write_files_async_queue.get() + if obj is not None: + app_path, metadata, components = obj + write_files(app_path, metadata, components) + + time.sleep(save_interval) + + p = multiprocessing.Process(target=process, args=(save_interval,)) + context.write_files_async_process = p + p.start() + +def shutdown_process_write_files_async(context: WfProjectContext) -> None: + """ + Shutdown the process that writes the .wf/ files + """ + if context.write_files_async_process is not None: + context.write_files_async_process.terminate() + context.write_files_async_process.join() + def read_files(app_path: str) -> Tuple[MetadataDefinition, dict[str, ComponentDefinition]]: """ Reads project files in the `.wf` folder.