diff --git a/Tests/test_sklearn.py b/Tests/test_sklearn.py index 62c6707..bd51aff 100644 --- a/Tests/test_sklearn.py +++ b/Tests/test_sklearn.py @@ -5,9 +5,8 @@ # Test case using the fixture @pytest.mark.timeout(300) # Extended timeout to 5 minutes def test_run_bci(): - bci = PyBCI(minimumEpochsRequired=5, createPseudoDevice=True) clf = MLPClassifier(max_iter = 1000, solver ="lbfgs")#solver=clf, alpha=alpha,hidden_layer_sizes=hid) - bci = PyBCI(minimumEpochsRequired=5, createPseudoDevice=True) + bci = PyBCI(minimumEpochsRequired=5, createPseudoDevice=True, clf = clf) while not bci.connected: bci.Connect() time.sleep(1) diff --git a/pybci/Utils/PseudoDevice.py b/pybci/Utils/PseudoDevice.py index f684431..07733b6 100644 --- a/pybci/Utils/PseudoDevice.py +++ b/pybci/Utils/PseudoDevice.py @@ -26,7 +26,7 @@ def __init__(self, execution_mode='process', *args, **kwargs): # Note: Don't initialize self.device here for 'process' mode! elif self.execution_mode == 'thread': self.command_queue = None # Not needed for threads, but kept for consistency - self.stop_signal = multiprocessing.Event() + self.stop_signal = False self.device = PseudoDevice(*self.args, **self.kwargs, stop_signal=self.stop_signal, is_multiprocessing=False) # Initialize for 'thread' mode self.worker = threading.Thread(target=self._run_device) else: @@ -42,10 +42,7 @@ def __init__(self, execution_mode='process', *args, **kwargs): self.log_reader_process.start() def __del__(self): - if not self._should_stop(): - self.stop_signal.set() - self.worker.join() - + self.StopStreaming() # Your existing method to stop threads and processes def _run_device(self): if self.execution_mode == 'process': @@ -84,7 +81,8 @@ def StopStreaming(self): self.worker.join() # Wait for the worker to finish - if self.log_reader_process: + # Terminate the log reader process if it exists + if self.log_reader_process is not None: self.log_reader_process.terminate() self.log_reader_process.join() @@ -198,6 +196,7 @@ def update(self): 
self.outlet.push_chunk(num.tolist()) self.last_update_time = current_time + # Make sure this method is available in the PseudoDevice class def StopStreaming(self): if self.is_multiprocessing: self.stop_signal.set() @@ -207,10 +206,6 @@ def StopStreaming(self): if hasattr(self, 'thread'): self.thread.join() # Wait for the thread to finish - if self.pseudoMarkerConfig.autoplay and hasattr(self, 'marker_thread'): self.marker_thread.join() # Wait for the marker thread to finish - - self.log_message(Logger.INFO, " PseudoDevice - Stopped streaming.") def BeginStreaming(self): if self.is_multiprocessing: # For multiprocessing, we assume the worker process is already running diff --git a/t.py b/t.py new file mode 100644 index 0000000..5f431f4 --- /dev/null +++ b/t.py @@ -0,0 +1,34 @@ +from pybci import PyBCI +import time + +from sklearn.neural_network import MLPClassifier +# Test case using the fixture + +def test_run_bci(): + clf = MLPClassifier(max_iter = 1000, solver ="lbfgs")#solver=clf, alpha=alpha,hidden_layer_sizes=hid) + bci = PyBCI(minimumEpochsRequired=5, createPseudoDevice=True) + while not bci.connected: + bci.Connect() + time.sleep(1) + bci.TrainMode() + accuracy_achieved = False + marker_received = False + accuracy=0 + while True: + currentMarkers = bci.ReceivedMarkerCount() # check to see how many received epochs, if markers sent too close together will be ignored till done processing + time.sleep(0.5) # wait for marker updates + print("Markers received: " + str(currentMarkers) +" Accuracy: " + str(round(accuracy,2)), end=" \r") + if len(currentMarkers) > 1: # check there is more than one marker type received + marker_received = True + if min([currentMarkers[key][1] for key in currentMarkers]) > bci.minimumEpochsRequired: + classInfo = bci.CurrentClassifierInfo() # hangs if called too early + accuracy = classInfo["accuracy"]### + if accuracy > 0.75: + accuracy_achieved = True + bci.StopThreads() + break + if min([currentMarkers[key][1] for key in
currentMarkers]) > bci.minimumEpochsRequired+4: + break + #assert accuracy_achieved and marker_received +if __name__ == '__main__': + test_run_bci() \ No newline at end of file