From 97170a2d25d23f09590f796a3adfdf12ae9d8361 Mon Sep 17 00:00:00 2001 From: timonmerk Date: Mon, 18 Nov 2024 19:20:45 +0100 Subject: [PATCH 1/4] add filewriter --- examples/plot_0_first_demo.py | 2 +- py_neuromodulation/gui/backend/app_pynm.py | 57 +++++++----- py_neuromodulation/stream/stream.py | 90 +++++++------------ py_neuromodulation/utils/file_writer.py | 100 +++++++++++++++++++++ pyproject.toml | 1 + 5 files changed, 169 insertions(+), 81 deletions(-) create mode 100644 py_neuromodulation/utils/file_writer.py diff --git a/examples/plot_0_first_demo.py b/examples/plot_0_first_demo.py index ce1494e4..db3e0921 100644 --- a/examples/plot_0_first_demo.py +++ b/examples/plot_0_first_demo.py @@ -152,7 +152,7 @@ def generate_random_walk(NUM_CHANNELS, TIME_DATA_SAMPLES): # We will therefore use the :class:`~nm_analysis` class to showcase some functions. For multi-run -or subject analysis we will pass here the feature_file "sub" as default directory: analyzer = nm.FeatureReader( - feature_dir=stream.out_dir_root, feature_file=stream.experiment_name + feature_dir=stream.out_dir, feature_file=stream.experiment_name ) # %% diff --git a/py_neuromodulation/gui/backend/app_pynm.py b/py_neuromodulation/gui/backend/app_pynm.py index f0b360fe..3617ceb8 100644 --- a/py_neuromodulation/gui/backend/app_pynm.py +++ b/py_neuromodulation/gui/backend/app_pynm.py @@ -1,11 +1,21 @@ import asyncio import logging import numpy as np -from multiprocessing import Process +import multiprocessing from py_neuromodulation.stream import Stream, NMSettings from py_neuromodulation.utils import set_channels from py_neuromodulation.utils.io import read_mne_data +from py_neuromodulation import logger + +async def run_stream_controller(feature_queue: asyncio.Queue, rawdata_queue: asyncio.Queue, + websocket_manager_features: "WebSocketManager"): + while True: + if not feature_queue.empty() and websocket_manager_features is not None: + feature_dict = feature_queue.get() + logger.info("Sending message to Websocket") + await websocket_manager_features.send_cbor(feature_dict) + # here the rawdata queue could also be used to send raw data, potentiall through different websocket? class PyNMState: @@ -22,35 +32,43 @@ def __init__( # TODO: we currently can pass the sampling_rate_features to both the stream and the settings? self.settings: NMSettings = NMSettings(sampling_rate_features=17) - async def start_run_function( + + def start_run_function( self, out_dir: str = "", experiment_name: str = "sub", websocket_manager_features=None, ) -> None: - # TODO: we should add a way to pass the output path and the foldername - # Initialize the stream with as process with a queue that is passed to the stream - # The stream will then put the results in the queue - # there should be another websocket in which the results are sent to the frontend + - self.stream_handling_queue = asyncio.Queue() + self.stream_handling_queue = multiprocessing.Queue() + self.feature_queue = multiprocessing.Queue() + self.rawdata_queue = multiprocessing.Queue() + + self.stream_controller_process = multiprocessing.Process( + target=run_stream_controller, + args=(self.feature_queue, self.rawdata_queue, websocket_manager_features), + ) + self.stream_controller_process.start() self.logger.info("setup stream Process") self.stream.settings = self.settings - asyncio.create_task(self.stream.run( - out_dir=out_dir, - experiment_name=experiment_name, - stream_handling_queue=self.stream_handling_queue, - is_stream_lsl=self.lsl_stream_name is not None, - stream_lsl_name=self.lsl_stream_name - if self.lsl_stream_name is not None - else "", - websocket_featues=websocket_manager_features, - ) + self.stream.run( + out_dir=out_dir, + experiment_name=experiment_name, + stream_handling_queue=self.stream_handling_queue, + is_stream_lsl=self.lsl_stream_name is not None, + stream_lsl_name=self.lsl_stream_name + if self.lsl_stream_name is not None + else "", + websocket_featues=websocket_manager_features, ) + self.stream_controller_process.terminate() + + def setup_lsl_stream( self, lsl_stream_name: str | None = None, @@ -123,11 +141,6 @@ def setup_offline_stream( target_keywords=None, ) - # self.settings: NMSettings = NMSettings( - # sampling_rate_features=sampling_rate_features - # ) - - # self.settings.preprocessing = [] self.logger.info(f"settings: {self.settings}") self.stream: Stream = Stream( settings=self.settings, diff --git a/py_neuromodulation/stream/stream.py b/py_neuromodulation/stream/stream.py index 85ddb6f2..2f39ebd3 100644 --- a/py_neuromodulation/stream/stream.py +++ b/py_neuromodulation/stream/stream.py @@ -1,6 +1,6 @@ """Module for generic and offline data streams.""" -import asyncio +import time from typing import TYPE_CHECKING from collections.abc import Iterator import numpy as np @@ -11,6 +11,7 @@ from py_neuromodulation.stream.data_processor import DataProcessor from py_neuromodulation.utils.types import _PathLike, FeatureName +from py_neuromodulation.utils.file_writer import MsgPackFileWriter from py_neuromodulation.stream.settings import NMSettings if TYPE_CHECKING: @@ -197,7 +198,7 @@ def _handle_data(self, data: "np.ndarray | pd.DataFrame") -> np.ndarray: ) return data.to_numpy().transpose() - async def run( + def run( self, data: "np.ndarray | pd.DataFrame | None" = None, out_dir: _PathLike = "", @@ -207,17 +208,18 @@ async def run( save_csv: bool = False, save_interval: int = 10, return_df: bool = True, - # feature_queue: "multiprocessing.Queue | None" = None, + simulate_real_time: bool = True, + feature_queue: "multiprocessing.Queue | None" = None, stream_handling_queue: "multiprocessing.Queue | None" = None, - websocket_featues: "WebSocketManager | None" = None, ): self.is_stream_lsl = is_stream_lsl self.stream_lsl_name = stream_lsl_name self.stream_handling_queue = stream_handling_queue - # self.feature_queue = feature_queue self.save_csv = save_csv self.save_interval = save_interval self.return_df = return_df + self.out_dir = Path.cwd() if not out_dir else Path(out_dir) + self.experiment_name = experiment_name # Validate input data if data is not None: @@ -227,24 +229,13 @@ async def run( elif self.data is None and data is None and self.is_stream_lsl is False: raise ValueError("No data passed to run function.") - # Generate output dirs - self.out_dir_root = Path.cwd() if not out_dir else Path(out_dir) - self.out_dir = self.out_dir_root / experiment_name - # TONI: Need better default experiment name - self.experiment_name = experiment_name if experiment_name else "sub" - - self.out_dir.mkdir(parents=True, exist_ok=True) - - # Open database connection - # TONI: we should give the user control over the save format - from py_neuromodulation.utils.database import NMDatabase - - self.db = NMDatabase(experiment_name, out_dir) # Create output database + file_writer = MsgPackFileWriter(name=experiment_name, out_dir=out_dir) self.batch_count: int = 0 # Keep track of the number of batches processed # Reinitialize the data processor in case the nm_channels or nm_settings changed between runs of the same Stream # TONI: then I think we can just not initialize the data processor in the init function + # Timon: I think with the GUI addition, the setttings / channels will change and need to be reinitialized self.data_processor = DataProcessor( sfreq=self.sfreq, settings=self.settings, @@ -258,14 +249,6 @@ async def run( nm.logger.log_to_file(out_dir) - # Initialize mp.Pool for multiprocessing - #self.pool = mp.Pool(processes=self.settings.n_jobs) - # Set up shared memory for multiprocessing - #self.shared_memory = mp.Array(ctypes.c_double, self.settings.n_jobs * self.settings.n_jobs) - # Set up multiprocessing semaphores - #self.semaphore = mp.Semaphore(self.settings.n_jobs) - - # Initialize generator self.generator: Iterator if not is_stream_lsl: from py_neuromodulation.stream.generator import RawDataGenerator @@ -300,12 +283,17 @@ async def run( self.is_running = True if self.stream_handling_queue is not None: nm.logger.info("Checking for stop signal") - #await asyncio.sleep(0.001) - await asyncio.sleep(1 / self.settings.sampling_rate_features_hz) + if simulate_real_time: + time.sleep(1 / self.settings.sampling_rate_features_hz) if not self.stream_handling_queue.empty(): - stop_signal = await asyncio.wait_for(self.stream_handling_queue.get(), timeout=0.01) - if stop_signal == "stop": + + # check that the timing fits + # previously it was necessary to wait that the process had + # enough time to check for the stop signal + signal = self.stream_handling_queue.get() + if signal == "stop": break + if data_batch is None: break @@ -318,11 +306,6 @@ async def run( ) feature_dict["time"] = np.ceil(this_batch_end * 1000 + 1) - #( - # np.ceil(batch_length) - # if self.is_stream_lsl - # else - #) prev_batch_end = this_batch_end @@ -336,33 +319,26 @@ async def run( for key, value in feature_dict.items(): feature_dict[key] = np.float64(value) - self.db.insert_data(feature_dict) + file_writer.insert_data(feature_dict) + if feature_queue is not None: + feature_queue.put(feature_dict) - # if self.feature_queue is not None: - # self.feature_queue.put(feature_dict) - - if websocket_featues is not None: - nm.logger.info("Sending message to Websocket") - #nm.logger.info(feature_dict) - await websocket_featues.send_cbor(feature_dict) - #await websocket_featues.send_message(feature_dict) self.batch_count += 1 if self.batch_count % self.save_interval == 0: - self.db.commit() - - self.db.commit() # Save last batches + file_writer.save() - # If save_csv is False, still save the first row to get the column names - feature_df: "pd.DataFrame" = ( - self.db.fetch_all() if (self.save_csv or self.return_df) else self.db.head() - ) + file_writer.save() + if self.save_csv: + file_writer.save_as_csv(save_all_combined=True) - self.db.close() # Close the database connection + if self.return_df: + feature_df = file_writer.load_all() - self._save_after_stream(feature_arr=feature_df) + self._save_after_stream() self.is_running = False - return feature_df # TONI: Not sure if this makes sense anymore + return feature_df # Timon: We could think of returning the feature_reader instead + def plot_raw_signal( self, @@ -430,12 +406,9 @@ def plot_raw_signal( def _save_after_stream( self, - feature_arr: "pd.DataFrame | None" = None, ) -> None: - """Save features, settings, nm_channels and sidecar after run""" + """Save settings, nm_channels and sidecar after run""" self._save_sidecar() - if feature_arr is not None: - self._save_features(feature_arr) self._save_settings() self._save_channels() @@ -455,6 +428,7 @@ def _save_sidecar(self) -> None: """Save sidecar incduing fs, coords, sess_right to out_path_root and subfolder 'folder_name'""" additional_args = {"sess_right": self.sess_right} + self.data_processor.save_sidecar( self.out_dir, self.experiment_name, additional_args ) diff --git a/py_neuromodulation/utils/file_writer.py b/py_neuromodulation/utils/file_writer.py new file mode 100644 index 00000000..4861467b --- /dev/null +++ b/py_neuromodulation/utils/file_writer.py @@ -0,0 +1,100 @@ +import msgpack +from abc import ABC, abstractmethod +from pathlib import Path +import pandas as pd +import numpy as np +from py_neuromodulation.utils.types import _PathLike + +class AbstractFileWriter(ABC): + + @abstractmethod + def insert_data(self, feature_dict: dict): + pass + + @abstractmethod + def save(self): + pass + + @abstractmethod + def load_all(self): + pass + + @abstractmethod + def save_as_csv(self, save_all_combined: bool = False): + pass + +class MsgPackFileWriter(AbstractFileWriter): + """ + Class to store data in a serialized MessagePack file and load it back efficiently. + Parameters + ---------- + out_dir : _PathLike + The directory to save the MessagePack database. + """ + + def __init__( + self, + name: str = "sub", + out_dir: _PathLike = "", + ): + # Make sure out_dir exists + + self.out_dir = Path.cwd() if not out_dir else Path(out_dir) + self.out_dir = self.out_dir / name + + Path(self.out_dir).mkdir(parents=True, exist_ok=True) + + self.idx = 0 + self.name = name + self.csv_path = Path(self.out_dir, f"{name}_FEATURES.csv") + self.data_l = [] + + def insert_data(self, feature_dict: dict): + """ + Insert data into the MessagePack database. + Parameters + ---------- + feature_dict : dict + The dictionary with the feature names and values. + """ + self.data_l.append(feature_dict) + + def save(self): + """ + Save the current data to the MessagePack file. + """ + if len(self.data_l) == 0: + return + with open(self.out_dir / f"{self.name}-{self.idx}.msgpack", "wb") as f: + msgpack.pack(self.data_l, f) + self.idx += 1 + self.data_l = [] + + def load_all(self): + """ + Load data from the MessagePack file into memory. + """ + data_l = [] + for i in range(self.idx): + with open(self.out_dir / f"{self.name}-{i}.msgpack", "rb") as f: + data_l.append(msgpack.unpack(f)) + + data = pd.DataFrame(list(np.concatenate(data_l))) + return data + + def save_as_csv(self, save_all_combined: bool = False): + """ + Save the data as a CSV file. + """ + + if save_all_combined: + data = self.load_all() + data.to_csv(self.csv_path, index=False) + else: + if len(self.data_l) > 0: + self.data_l[-1].to_csv(self.csv_path, index=False) + else: + outpath =self.out_dir / f"{self.name}-0.msgpack" + with open(outpath, "rb") as f: + data = msgpack.unpack(f) + data.to_csv(self.csv_path, index=False) diff --git a/pyproject.toml b/pyproject.toml index 26b31c81..435bfb0d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -66,6 +66,7 @@ dependencies = [ # exists only because of nolds? "numba>=0.60.0", "cbor2>=5.6.4", + "msgpack>=1.1.0", ] [project.optional-dependencies] From 32237a0d5765a4de1acc2fbb12af2b4521053892 Mon Sep 17 00:00:00 2001 From: timonmerk Date: Tue, 19 Nov 2024 10:27:36 +0100 Subject: [PATCH 2/4] changes in app_pynm --- py_neuromodulation/gui/backend/app_backend.py | 11 ++-- py_neuromodulation/gui/backend/app_pynm.py | 53 +++++++++++-------- py_neuromodulation/stream/stream.py | 7 ++- pyproject.toml | 1 + 4 files changed, 45 insertions(+), 27 deletions(-) diff --git a/py_neuromodulation/gui/backend/app_backend.py b/py_neuromodulation/gui/backend/app_backend.py index 0e0b161e..7fae70c2 100644 --- a/py_neuromodulation/gui/backend/app_backend.py +++ b/py_neuromodulation/gui/backend/app_backend.py @@ -104,17 +104,20 @@ async def handle_stream_control(data: dict): self.logger.info(self.websocket_manager) self.logger.info("Starting stream") - asyncio.create_task( - self.pynm_state.start_run_function( + self.pynm_state.start_run_function( # out_dir=data["out_dir"], # experiment_name=data["experiment_name"], websocket_manager_features=self.websocket_manager, - ) ) + if action == "stop": self.logger.info("Stopping stream") - asyncio.create_task(self.pynm_state.stream_handling_queue.put("stop")) + self.pynm_state.stream_handling_queue.put("stop") + + # will this be enough time for stopping? + self.pynm_state.stream_controller_process.terminate() + self.pynm_state.run_func_process.terminate() return {"message": f"Stream action '{action}' executed"} diff --git a/py_neuromodulation/gui/backend/app_pynm.py b/py_neuromodulation/gui/backend/app_pynm.py index 3617ceb8..79c7e13a 100644 --- a/py_neuromodulation/gui/backend/app_pynm.py +++ b/py_neuromodulation/gui/backend/app_pynm.py @@ -1,7 +1,7 @@ import asyncio import logging import numpy as np -import multiprocessing +import multiprocess as mp from py_neuromodulation.stream import Stream, NMSettings from py_neuromodulation.utils import set_channels @@ -11,12 +11,16 @@ async def run_stream_controller(feature_queue: asyncio.Queue, rawdata_queue: asyncio.Queue, websocket_manager_features: "WebSocketManager"): while True: + await asyncio.sleep(0.002) + logger.info("wait for feature queue") if not feature_queue.empty() and websocket_manager_features is not None: feature_dict = feature_queue.get() logger.info("Sending message to Websocket") await websocket_manager_features.send_cbor(feature_dict) # here the rawdata queue could also be used to send raw data, potentiall through different websocket? +def run_stream_controller_sync(feature_queue, rawdata_queue, websocket_manager_features): + asyncio.run(run_stream_controller(feature_queue, rawdata_queue, websocket_manager_features)) class PyNMState: def __init__( @@ -26,6 +30,8 @@ def __init__( self.logger = logging.getLogger("uvicorn.error") self.lsl_stream_name = None + self.stream_controller_process = None + self.run_func_process = None if default_init: self.stream: Stream = Stream(sfreq=1500, data=np.random.random([1, 1])) @@ -40,34 +46,37 @@ def start_run_function( websocket_manager_features=None, ) -> None: + self.stream.settings = self.settings - self.stream_handling_queue = multiprocessing.Queue() - self.feature_queue = multiprocessing.Queue() - self.rawdata_queue = multiprocessing.Queue() + self.stream_handling_queue = mp.Queue() + self.feature_queue = mp.Queue() + self.rawdata_queue = mp.Queue() - self.stream_controller_process = multiprocessing.Process( - target=run_stream_controller, + self.logger.info("Starting run controller function") + self.stream_controller_process = mp.Process( + target=run_stream_controller_sync, args=(self.feature_queue, self.rawdata_queue, websocket_manager_features), ) - self.stream_controller_process.start() - - self.logger.info("setup stream Process") - - self.stream.settings = self.settings - self.stream.run( - out_dir=out_dir, - experiment_name=experiment_name, - stream_handling_queue=self.stream_handling_queue, - is_stream_lsl=self.lsl_stream_name is not None, - stream_lsl_name=self.lsl_stream_name - if self.lsl_stream_name is not None - else "", - websocket_featues=websocket_manager_features, + # this function also has to be in a process + is_stream_lsl = self.lsl_stream_name is not None + stream_lsl_name = self.lsl_stream_name if self.lsl_stream_name is not None else "" + + self.run_func_process = mp.Process( + target=self.stream.run, + kwargs={ + "out_dir" : out_dir, + "experiment_name" : experiment_name, + "stream_handling_queue" : self.stream_handling_queue, + "is_stream_lsl" : is_stream_lsl, + "stream_lsl_name" : stream_lsl_name, + "feature_queue" : self.feature_queue, + #"rawdata_queue" : self.rawdata_queue, + }, ) - self.stream_controller_process.terminate() - + #self.stream_controller_process.start() + self.run_func_process.start() def setup_lsl_stream( self, diff --git a/py_neuromodulation/stream/stream.py b/py_neuromodulation/stream/stream.py index 2f39ebd3..62071267 100644 --- a/py_neuromodulation/stream/stream.py +++ b/py_neuromodulation/stream/stream.py @@ -285,18 +285,22 @@ def run( nm.logger.info("Checking for stop signal") if simulate_real_time: time.sleep(1 / self.settings.sampling_rate_features_hz) + nm.logger.info("Sleep over") if not self.stream_handling_queue.empty(): - + nm.logger.info("Got stream handling queue signal") # check that the timing fits # previously it was necessary to wait that the process had # enough time to check for the stop signal signal = self.stream_handling_queue.get() + nm.logger.info(f"Got signal: {signal}") if signal == "stop": break if data_batch is None: + nm.logger.info("Data batch is None, stopping run function") break + nm.logger.info("Processing new data batch") feature_dict = self.data_processor.process(data_batch) this_batch_end = timestamps[-1] @@ -322,6 +326,7 @@ def run( file_writer.insert_data(feature_dict) if feature_queue is not None: feature_queue.put(feature_dict) + nm.logger.info("Feature dict sent to queue") self.batch_count += 1 if self.batch_count % self.save_interval == 0: diff --git a/pyproject.toml b/pyproject.toml index 435bfb0d..32cea5a5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -67,6 +67,7 @@ dependencies = [ "numba>=0.60.0", "cbor2>=5.6.4", "msgpack>=1.1.0", + "multiprocess>=0.70.17", ] [project.optional-dependencies] From efe54ec63c09cdfaf2ccf4cbc4956bc3935b9bc6 Mon Sep 17 00:00:00 2001 From: timonmerk Date: Tue, 19 Nov 2024 18:11:33 +0100 Subject: [PATCH 3/4] refactor stream handling to use threading instead of multiprocessing for improved control and responsiveness --- py_neuromodulation/gui/backend/app_backend.py | 5 +-- py_neuromodulation/gui/backend/app_pynm.py | 38 +++++++++++-------- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/py_neuromodulation/gui/backend/app_backend.py b/py_neuromodulation/gui/backend/app_backend.py index 7fae70c2..e6c74985 100644 --- a/py_neuromodulation/gui/backend/app_backend.py +++ b/py_neuromodulation/gui/backend/app_backend.py @@ -114,10 +114,7 @@ async def handle_stream_control(data: dict): if action == "stop": self.logger.info("Stopping stream") self.pynm_state.stream_handling_queue.put("stop") - - # will this be enough time for stopping? - self.pynm_state.stream_controller_process.terminate() - self.pynm_state.run_func_process.terminate() + self.pynm_state.stop_event_ws.set() return {"message": f"Stream action '{action}' executed"} diff --git a/py_neuromodulation/gui/backend/app_pynm.py b/py_neuromodulation/gui/backend/app_pynm.py index 79c7e13a..bc5652cb 100644 --- a/py_neuromodulation/gui/backend/app_pynm.py +++ b/py_neuromodulation/gui/backend/app_pynm.py @@ -1,26 +1,27 @@ import asyncio import logging +import threading import numpy as np -import multiprocess as mp - +import multiprocessing as mp +from threading import Thread +from queue import Queue from py_neuromodulation.stream import Stream, NMSettings from py_neuromodulation.utils import set_channels from py_neuromodulation.utils.io import read_mne_data from py_neuromodulation import logger async def run_stream_controller(feature_queue: asyncio.Queue, rawdata_queue: asyncio.Queue, - websocket_manager_features: "WebSocketManager"): - while True: - await asyncio.sleep(0.002) - logger.info("wait for feature queue") + websocket_manager_features: "WebSocketManager", stop_event: threading.Event): + while not stop_event.wait(0.002): + #await asyncio.sleep(0.002) if not feature_queue.empty() and websocket_manager_features is not None: feature_dict = feature_queue.get() logger.info("Sending message to Websocket") await websocket_manager_features.send_cbor(feature_dict) # here the rawdata queue could also be used to send raw data, potentiall through different websocket? -def run_stream_controller_sync(feature_queue, rawdata_queue, websocket_manager_features): - asyncio.run(run_stream_controller(feature_queue, rawdata_queue, websocket_manager_features)) +def run_stream_controller_sync(feature_queue, rawdata_queue, websocket_manager_features, stop_event: threading.Event): + asyncio.run(run_stream_controller(feature_queue, rawdata_queue, websocket_manager_features, stop_event)) class PyNMState: def __init__( @@ -48,22 +49,29 @@ def start_run_function( self.stream.settings = self.settings - self.stream_handling_queue = mp.Queue() - self.feature_queue = mp.Queue() - self.rawdata_queue = mp.Queue() + self.stream_handling_queue = Queue() + self.feature_queue = Queue() + self.rawdata_queue = Queue() self.logger.info("Starting run controller function") - self.stream_controller_process = mp.Process( + self.stop_event_ws = threading.Event() + self.stream_controller_process = Thread( target=run_stream_controller_sync, - args=(self.feature_queue, self.rawdata_queue, websocket_manager_features), + daemon=True, + args=(self.feature_queue, + self.rawdata_queue, + websocket_manager_features, + self.stop_event_ws + ), ) # this function also has to be in a process is_stream_lsl = self.lsl_stream_name is not None stream_lsl_name = self.lsl_stream_name if self.lsl_stream_name is not None else "" - self.run_func_process = mp.Process( + self.run_func_process = Thread( target=self.stream.run, + daemon=True, kwargs={ "out_dir" : out_dir, "experiment_name" : experiment_name, @@ -75,7 +83,7 @@ def start_run_function( }, ) - #self.stream_controller_process.start() + self.stream_controller_process.start() self.run_func_process.start() def setup_lsl_stream( From 557dacfb8411285c5300e6f3d9bad9d91a795fc7 Mon Sep 17 00:00:00 2001 From: timonmerk Date: Wed, 20 Nov 2024 18:09:09 +0100 Subject: [PATCH 4/4] refactor app_manager and app_pynm to use queue.Queue instead of multiprocessing for stream handling --- py_neuromodulation/gui/backend/app_manager.py | 6 --- py_neuromodulation/gui/backend/app_pynm.py | 41 +++++++++++-------- py_neuromodulation/stream/stream.py | 21 +++------- start_LSL_stream.py | 9 ++-- 4 files changed, 33 insertions(+), 44 deletions(-) diff --git a/py_neuromodulation/gui/backend/app_manager.py b/py_neuromodulation/gui/backend/app_manager.py index 2a67b07e..b1cb6971 100644 --- a/py_neuromodulation/gui/backend/app_manager.py +++ b/py_neuromodulation/gui/backend/app_manager.py @@ -203,12 +203,6 @@ def __init__( self.is_child_process = os.environ.get(self.LAUNCH_FLAG) == "TRUE" os.environ[self.LAUNCH_FLAG] = "TRUE" - # PyNM state - # TODO: need to find a way to pass the state to the backend - # self.pynm_state = PyNMState() - # self.app = PyNMBackend(pynm_state=self.pynm_state) - - # Logging self.logger = create_logger( "PyNM", "yellow", diff --git a/py_neuromodulation/gui/backend/app_pynm.py b/py_neuromodulation/gui/backend/app_pynm.py index bc5652cb..fb5bf3f4 100644 --- a/py_neuromodulation/gui/backend/app_pynm.py +++ b/py_neuromodulation/gui/backend/app_pynm.py @@ -4,29 +4,33 @@ import numpy as np import multiprocessing as mp from threading import Thread -from queue import Queue +import queue from py_neuromodulation.stream import Stream, NMSettings from py_neuromodulation.utils import set_channels from py_neuromodulation.utils.io import read_mne_data from py_neuromodulation import logger -async def run_stream_controller(feature_queue: asyncio.Queue, rawdata_queue: asyncio.Queue, +async def run_stream_controller(feature_queue: queue.Queue, rawdata_queue: queue.Queue, websocket_manager_features: "WebSocketManager", stop_event: threading.Event): while not stop_event.wait(0.002): - #await asyncio.sleep(0.002) if not feature_queue.empty() and websocket_manager_features is not None: feature_dict = feature_queue.get() logger.info("Sending message to Websocket") await websocket_manager_features.send_cbor(feature_dict) # here the rawdata queue could also be used to send raw data, potentiall through different websocket? -def run_stream_controller_sync(feature_queue, rawdata_queue, websocket_manager_features, stop_event: threading.Event): +def run_stream_controller_sync(feature_queue: queue.Queue, + rawdata_queue: queue.Queue, + websocket_manager_features: "WebSocketManager", + stop_event: threading.Event + ): + # The run_stream_controller needs to be started as an asyncio function due to the async websocket asyncio.run(run_stream_controller(feature_queue, rawdata_queue, websocket_manager_features, stop_event)) class PyNMState: def __init__( self, - default_init: bool = True, + default_init: bool = True, # has to be true for the backend settings communication ) -> None: self.logger = logging.getLogger("uvicorn.error") @@ -36,8 +40,7 @@ def __init__( if default_init: self.stream: Stream = Stream(sfreq=1500, data=np.random.random([1, 1])) - # TODO: we currently can pass the sampling_rate_features to both the stream and the settings? - self.settings: NMSettings = NMSettings(sampling_rate_features=17) + self.settings: NMSettings = NMSettings(sampling_rate_features=10) def start_run_function( @@ -49,13 +52,17 @@ def start_run_function( self.stream.settings = self.settings - self.stream_handling_queue = Queue() - self.feature_queue = Queue() - self.rawdata_queue = Queue() + self.stream_handling_queue = queue.Queue() + self.feature_queue = queue.Queue() + self.rawdata_queue = queue.Queue() + + self.logger.info("Starting stream_controller_process thread") - self.logger.info("Starting run controller function") + + # Stop even that is set in the app_backend self.stop_event_ws = threading.Event() - self.stream_controller_process = Thread( + + self.stream_controller_thread = Thread( target=run_stream_controller_sync, daemon=True, args=(self.feature_queue, @@ -65,11 +72,12 @@ def start_run_function( ), ) - # this function also has to be in a process is_stream_lsl = self.lsl_stream_name is not None stream_lsl_name = self.lsl_stream_name if self.lsl_stream_name is not None else "" - self.run_func_process = Thread( + # The run_func_thread is terminated through the stream_handling_queue + # which initiates to break the data generator and save the features + self.run_func_thread = Thread( target=self.stream.run, daemon=True, kwargs={ @@ -79,12 +87,13 @@ def start_run_function( "is_stream_lsl" : is_stream_lsl, "stream_lsl_name" : stream_lsl_name, "feature_queue" : self.feature_queue, + "simulate_real_time" : True, #"rawdata_queue" : self.rawdata_queue, }, ) - self.stream_controller_process.start() - self.run_func_process.start() + self.stream_controller_thread.start() + self.run_func_thread.start() def setup_lsl_stream( self, diff --git a/py_neuromodulation/stream/stream.py b/py_neuromodulation/stream/stream.py index 62071267..d0f77192 100644 --- a/py_neuromodulation/stream/stream.py +++ b/py_neuromodulation/stream/stream.py @@ -205,12 +205,12 @@ def run( experiment_name: str = "sub", is_stream_lsl: bool = False, stream_lsl_name: str | None = None, - save_csv: bool = False, + save_csv: bool = True, save_interval: int = 10, return_df: bool = True, - simulate_real_time: bool = True, - feature_queue: "multiprocessing.Queue | None" = None, - stream_handling_queue: "multiprocessing.Queue | None" = None, + simulate_real_time: bool = False, + feature_queue: "queue.Queue | None" = None, + stream_handling_queue: "queue.Queue | None" = None, ): self.is_stream_lsl = is_stream_lsl self.stream_lsl_name = stream_lsl_name @@ -218,7 +218,7 @@ def run( self.save_csv = save_csv self.save_interval = save_interval self.return_df = return_df - self.out_dir = Path.cwd() if not out_dir else Path(out_dir) + self.out_dir = Path.cwd() / experiment_name if not out_dir else Path(out_dir) self.experiment_name = experiment_name # Validate input data @@ -233,9 +233,6 @@ def run( self.batch_count: int = 0 # Keep track of the number of batches processed - # Reinitialize the data processor in case the nm_channels or nm_settings changed between runs of the same Stream - # TONI: then I think we can just not initialize the data processor in the init function - # Timon: I think with the GUI addition, the setttings / channels will change and need to be reinitialized self.data_processor = DataProcessor( sfreq=self.sfreq, settings=self.settings, @@ -282,15 +279,9 @@ def run( for timestamps, data_batch in self.generator: self.is_running = True if self.stream_handling_queue is not None: - nm.logger.info("Checking for stop signal") if simulate_real_time: time.sleep(1 / self.settings.sampling_rate_features_hz) - nm.logger.info("Sleep over") if not self.stream_handling_queue.empty(): - nm.logger.info("Got stream handling queue signal") - # check that the timing fits - # previously it was necessary to wait that the process had - # enough time to check for the stop signal signal = self.stream_handling_queue.get() nm.logger.info(f"Got signal: {signal}") if signal == "stop": @@ -318,7 +309,6 @@ def run( self._add_target(feature_dict, data_batch) - # We should ensure that feature output is always either float64 or None and remove this with suppress(TypeError): # Need this because some features output None for key, value in feature_dict.items(): feature_dict[key] = np.float64(value) @@ -326,7 +316,6 @@ def run( file_writer.insert_data(feature_dict) if feature_queue is not None: feature_queue.put(feature_dict) - nm.logger.info("Feature dict sent to queue") self.batch_count += 1 if self.batch_count % self.save_interval == 0: diff --git a/start_LSL_stream.py b/start_LSL_stream.py index aff1a8bf..7f31688d 100644 --- a/start_LSL_stream.py +++ b/start_LSL_stream.py @@ -74,14 +74,11 @@ # features = asyncio.run(stream.run(data, save_csv=True)) + # remove first eight channels + raw.drop_channels(raw.ch_names[:8]) + player = LSLOfflinePlayer(raw=raw, stream_name="example_stream") player.start_player(chunk_size=30, n_repeat=5999999) App(run_in_webview=False, dev=True).launch() - - - # Check if LSL stream pushes data - # settings = nm.NMSettings.get_fast_compute() - # LSLstream = LSLStream(settings, stream_name="example_stream") - # time, data = next(LSLstream.get_next_batch())