Skip to content

Commit

Permalink
Update PseudoDevice.py
Browse files Browse the repository at this point in the history
  • Loading branch information
LMBooth committed Oct 2, 2023
1 parent a4b570d commit 7b4a91b
Showing 1 changed file with 22 additions and 55 deletions.
77 changes: 22 additions & 55 deletions pybci/Utils/PseudoDevice.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,89 +6,56 @@
#########################################################
from ..Utils.Logger import Logger
from ..Configuration.PseudoDeviceSettings import PseudoDataConfig, PseudoMarkerConfig
import multiprocessing, time, threading, pylsl, queue
import time, threading, pylsl, queue
from multiprocessing import Process, Queue, Event
import numpy as np
from collections import deque

class PseudoDeviceController:
log_queue = None

def __init__(self, execution_mode='process', *args, **kwargs):
self.execution_mode = execution_mode
self.args = args
self.kwargs = kwargs
self.stop_signal = Event()

# Create a command queue for the worker
if self.execution_mode == 'process':
self.command_queue = multiprocessing.Queue()
self.stop_signal = multiprocessing.Event()
self.worker = multiprocessing.Process(target=self._run_device)
self.log_queue = multiprocessing.Queue()
# Note: Don't initialize self.device here for 'process' mode!
self.command_queue = Queue()
self.worker = Process(target=self._run_device)
elif self.execution_mode == 'thread':
self.command_queue = None # Not needed for threads, but kept for consistency
self.stop_signal = False
self.device = PseudoDevice(*self.args, **self.kwargs, stop_signal=self.stop_signal, is_multiprocessing=False) # Initialize for 'thread' mode
self.worker = threading.Thread(target=self._run_device)
else:
raise ValueError(f"Unsupported execution mode: {execution_mode}")

self.worker.start()
# Initialize the logger

self.logger = Logger(log_queue=self.log_queue)
self.log_reader_process = None
if self.execution_mode == 'process':
self.log_reader_process = multiprocessing.Process(target=self.logger.start_queue_reader)
self.log_reader_process.start()

def __del__(self):
self.StopStreaming() # Your existing method to stop threads and processes

def _run_device(self):
if self.execution_mode == 'process':
device = PseudoDevice(*self.args, **self.kwargs, stop_signal=self.stop_signal, log_queue=self.log_queue, is_multiprocessing=True) # Initialize locally for 'process' mode

while not self._should_stop():
if not self.command_queue.empty():
command = self.command_queue.get()
device = PseudoDevice(*self.args, **self.kwargs, stop_signal=self.stop_signal)
while not self.stop_signal.is_set():
if self.execution_mode == 'process':
try:
command = self.command_queue.get_nowait()
if command == "BeginStreaming":
device.BeginStreaming()
# Sleep for a brief moment before checking again
time.sleep(0.01)
except queue.Empty:
pass
elif self.execution_mode == 'thread':
device.update()

elif self.execution_mode == 'thread':
while not self._should_stop():
self.device.update() # or any other method you want to run continuously
time.sleep(0.01)

def _should_stop(self):
if self.execution_mode == 'process':
return self.stop_signal.is_set()
else: # thread
return self.stop_signal
time.sleep(0.01)

def BeginStreaming(self):
if self.execution_mode == 'process':
self.command_queue.put("BeginStreaming")
else: # thread
self.device.BeginStreaming()
else:
self.worker.BeginStreaming()

def StopStreaming(self):
if self.execution_mode == 'process':
self.stop_signal.set()
else: # thread
self.stop_signal = True

self.worker.join() # Wait for the worker to finish

# Terminate the log reader process if it exists
if self.log_reader_process is not None:
self.log_reader_process.terminate()
self.log_reader_process.join()

def close():
# add close logic
print("close it")
self.stop_signal.set()
self.worker.join(timeout=5)
if self.worker.is_alive():
self.worker.terminate()

def precise_sleep(duration):
end_time = time.time() + duration
Expand Down

0 comments on commit 7b4a91b

Please sign in to comment.