From f6e42414c142134a0576d8cd7ebc07c4cee6de8d Mon Sep 17 00:00:00 2001
From: LMBooth
Date: Wed, 25 Oct 2023 22:34:12 +0100
Subject: [PATCH] cli update

Updates the CLI calls to exit more gracefully, and adds coverage tests for the CLI.
---
 Tests/test_cli.py                |  75 ++++++++---
 pybci/CliTests/testPyTorch.py    | 215 +++++++++++++++++++------------
 pybci/CliTests/testSimple.py     |  96 +++++++++-----
 pybci/CliTests/testSklearn.py    | 102 ++++++++++-----
 pybci/CliTests/testTensorflow.py | 138 +++++++++++++-------
 pybci/version.py                 |   2 +-
 6 files changed, 411 insertions(+), 217 deletions(-)

diff --git a/Tests/test_cli.py b/Tests/test_cli.py
index a3c8bc0..3a80460 100644
--- a/Tests/test_cli.py
+++ b/Tests/test_cli.py
@@ -1,19 +1,60 @@
-#import time
-#from pybci.CliTests.testSimple import main as mainSimple
-#from pybci.CliTests.testSklearn import main as mainSklearn
-#from pybci.CliTests.testPyTorch import main as mainPyTorch
-#from pybci.CliTests.testTensorflow import main as mainTensorflow
-
+import threading
+from pybci.CliTests.testSimple import main as mainSimple
+from pybci.CliTests.testSklearn import main as mainSklearn
+from pybci.CliTests.testPyTorch import main as mainPyTorch
+from pybci.CliTests.testTensorflow import main as mainTensorflow
+from unittest.mock import patch
 # Example usage
-def test_cli():
-    #mainSimple(min_epochs_train=1, min_epochs_test=2, timeout=10)
-    #time.sleep(15)
-    #m#ainSklearn(min_epochs_train=1, min_epochs_test=2, timeout=10)
-    #time.sleep(15)
-    #mainPyTorch(min_epochs_train=1, min_epochs_test=2, timeout=10)
-    #time.sleep(15)
-    #mainTensorflow(min_epochs_train=1, min_epochs_test=2, timeout=10)
-    #time.sleep(15)
-    assert True
-    
\ No newline at end of file
+def test_cli_simple_timeout():
+    with patch('builtins.input', return_value='stop'):
+        timeout = 30  # timeout in seconds
+        my_bci_wrapper = None
+
+        def run_main():
+            nonlocal my_bci_wrapper
+            my_bci_wrapper = mainSimple(createPseudoDevice=True, min_epochs_train=1, min_epochs_test=2, timeout=timeout)
+
+        main_thread = threading.Thread(target=run_main)
+        main_thread.start()
+        main_thread.join()
+
+def test_cli_sklearn_timeout():
+    with patch('builtins.input', return_value='stop'):
+        timeout = 30  # timeout in seconds
+        my_bci_wrapper = None
+
+        def run_main():
+            nonlocal my_bci_wrapper
+            my_bci_wrapper = mainSklearn(createPseudoDevice=True, min_epochs_train=1, min_epochs_test=2, timeout=timeout)
+
+        main_thread = threading.Thread(target=run_main)
+        main_thread.start()
+        main_thread.join()
+
+def test_cli_pytorch_timeout():
+    with patch('builtins.input', return_value='stop'):
+        timeout = 30  # timeout in seconds
+        my_bci_wrapper = None
+
+        def run_main():
+            nonlocal my_bci_wrapper
+            my_bci_wrapper = mainPyTorch(createPseudoDevice=True, min_epochs_train=1, min_epochs_test=2, timeout=timeout)
+
+        main_thread = threading.Thread(target=run_main)
+        main_thread.start()
+        main_thread.join()
+
+def test_cli_tensorflow_timeout():
+    with patch('builtins.input', return_value='stop'):
+        timeout = 30  # timeout in seconds
+        my_bci_wrapper = None
+
+        def run_main():
+            nonlocal my_bci_wrapper
+            my_bci_wrapper = mainTensorflow(createPseudoDevice=True, min_epochs_train=1, min_epochs_test=2, timeout=timeout)
+
+        main_thread = threading.Thread(target=run_main)
+        main_thread.start()
+        main_thread.join()
\ No newline at end of file
diff --git a/pybci/CliTests/testPyTorch.py b/pybci/CliTests/testPyTorch.py
index 73c9108..0302e63 100644
--- a/pybci/CliTests/testPyTorch.py
+++ b/pybci/CliTests/testPyTorch.py
@@ -6,104 +6,151 @@
 from torch import nn
 import threading
+stop_signal = threading.Event() # Global event to control the main loop
 
-def main(createPseudoDevice=True, min_epochs_train=4, min_epochs_test=10, num_chs = 8, num_feats = 2, num_classes = 4, timeout=None):
-    if createPseudoDevice:
-        num_chs = 8 # 8 channels are created in the PseudoLSLGenerator
-        num_feats = 2 # default is mean freq and rms to keep it simple
-        num_classes = 4 # number of different triggers (can include baseline) sent, defines if we use softmax of binary
-
-    class SimpleNN(nn.Module):
-        def __init__(self, input_size, hidden_size, num_classes):
-            super(SimpleNN, self).__init__()
-            self.fc1 = nn.Linear(input_size, hidden_size)
-            self.bn1 = nn.BatchNorm1d(hidden_size)
-            self.relu = nn.ReLU(inplace=True) # In-place operation
-            self.fc2 = nn.Linear(hidden_size, hidden_size)
-            self.bn2 = nn.BatchNorm1d(hidden_size)
-            self.fc3 = nn.Linear(hidden_size, num_classes)
+num_chs_g = num_feats_g = num_classes_g = None # module-level defaults, set in main() before PyBCI calls PyTorchModel
+
+class SimpleNN(nn.Module):
+    def __init__(self, input_size, hidden_size, num_classes):
+        super(SimpleNN, self).__init__()
+        self.fc1 = nn.Linear(input_size, hidden_size)
+        self.bn1 = nn.BatchNorm1d(hidden_size)
+        self.relu = nn.ReLU(inplace=True) # In-place operation
+        self.fc2 = nn.Linear(hidden_size, hidden_size)
+        self.bn2 = nn.BatchNorm1d(hidden_size)
+        self.fc3 = nn.Linear(hidden_size, num_classes)
+
+    def forward(self, x):
+        out = self.fc1(x)
+        if out.shape[0] > 1: # Skip BatchNorm if batch size is 1
+            out = self.bn1(out)
+        out = self.relu(out)
+        out = self.fc2(out)
+        if out.shape[0] > 1: # Skip BatchNorm if batch size is 1
+            out = self.bn2(out)
+        out = self.relu(out)
+        out = self.fc3(out)
+        return out
+
+def PyTorchModel(x_train, x_test, y_train, y_test):
+    input_size = num_feats_g*num_chs_g # num of channels multiplied by number of default features (rms and mean freq)
+    hidden_size = 100
+    model = SimpleNN(input_size, hidden_size, num_classes_g)
+    model.train()
+    criterion = nn.CrossEntropyLoss()
+    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
+    epochs = 10
+    train_data = TensorDataset(torch.Tensor(x_train), torch.Tensor(y_train).long())
+    train_loader = DataLoader(dataset=train_data, batch_size=32, shuffle=True, drop_last=True) # Drop last incomplete batch
+    for epoch in range(epochs):
+        for inputs, labels in train_loader:
+            optimizer.zero_grad()
+            outputs = model(inputs)
+            loss = criterion(outputs, labels)
+            loss.backward()
+            optimizer.step()
+    model.eval()
+    accuracy = 0
+    with torch.no_grad():
+        test_outputs = model(torch.Tensor(x_test))
+        _, predicted = torch.max(test_outputs.data, 1)
+        correct = (predicted == torch.Tensor(y_test).long()).sum().item()
+        accuracy = correct / len(y_test)
+    return accuracy, model
+
+
+def command_listener():
+    while not stop_signal.is_set():
+        command = input("PyBCI: [CLI] - Enter 'stop' to terminate\n")
+        if command == 'stop':
+            stop_signal.set()
+            break
+
+
+class CLI_testPytorchWrapper:
+    def __init__(self, createPseudoDevice, min_epochs_train, min_epochs_test, num_chs, num_feats, num_classes, timeout):
+        if createPseudoDevice:
+            self.num_chs = 8 # 8 channels are created in the PseudoLSLGenerator
+            self.num_feats = 2 # default is mean freq and rms to keep it simple
+            self.num_classes = 4 # number of different triggers (can include baseline) sent, defines if we use softmax or binary
+
+        self.createPseudoDevice = createPseudoDevice
+        self.timeout = timeout
+        self.min_epochs_train = min_epochs_train
+        self.min_epochs_test = min_epochs_test
+        self.accuracy = 0
+        self.currentMarkers = {}
+        if self.min_epochs_test <= self.min_epochs_train:
+            self.min_epochs_test = self.min_epochs_train+1
-        def forward(self, x):
-            out = self.fc1(x)
-            if out.shape[0] > 1: # Skip BatchNorm if batch size is 1
-                out = self.bn1(out)
-            out = self.relu(out)
-            out = self.fc2(out)
-            if out.shape[0] > 1: # Skip BatchNorm if batch size is 1
-                out = self.bn2(out)
-            out = self.relu(out)
-            out = self.fc3(out)
-            return out
-    def PyTorchModel(x_train, x_test, y_train, y_test):
-        input_size = num_feats*num_chs # num of channels multipled by number of default features (rms and mean freq)
-        hidden_size = 100
-        #num_classes = num_classes # default in pseudodevice
-        model = SimpleNN(input_size, hidden_size, num_classes)
-        model.train()
-        criterion = nn.CrossEntropyLoss()
-        optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
-        epochs = 10
-        train_data = TensorDataset(torch.Tensor(x_train), torch.Tensor(y_train).long())
-        train_loader = DataLoader(dataset=train_data, batch_size=32, shuffle=True, drop_last=True) # Drop last incomplete batch
-        for epoch in range(epochs):
-            for inputs, labels in train_loader:
-                optimizer.zero_grad()
-                outputs = model(inputs)
-                loss = criterion(outputs, labels)
-                loss.backward()
-                optimizer.step()
-        model.eval()
-        accuracy = 0
-        with torch.no_grad():
-            test_outputs = model(torch.Tensor(x_test))
-            _, predicted = torch.max(test_outputs.data, 1)
-            correct = (predicted == torch.Tensor(y_test).long()).sum().item()
-            accuracy = correct / len(y_test)
-        return accuracy, model
-
-    def loop(bci):
-        while not bci.connected: # check to see if lsl marker and datastream are available
-            bci.Connect()
+        self.bci = PyBCI(minimumEpochsRequired = min_epochs_train, createPseudoDevice=createPseudoDevice, torchModel = PyTorchModel)
+        main_thread = threading.Thread(target=self.loop)
+        main_thread.start()
+        if self.timeout:
+            print("PyBCI: [CLI] - starting timeout thread")
+            self.timeout_thread = threading.Thread(target=self.stop_after_timeout)
+            self.timeout_thread.start()
+        main_thread.join()
+        if timeout is not None:
+            self.timeout_thread.join()
+
+
+    def loop(self):
+        while not self.bci.connected: # check to see if lsl marker and datastream are available
+            self.bci.Connect()
             time.sleep(1)
-        bci.TrainMode() # now both marker and datastreams available start training on received epochs
-        accuracy = 0
+        self.bci.TrainMode() # now both marker and datastreams available start training on received epochs
+        self.accuracy = 0
         test = False
         try:
-            while(True):
+            while not stop_signal.is_set():
                 if test is False:
-                    currentMarkers = bci.ReceivedMarkerCount() # check to see how many received epochs, if markers sent to close together will be ignored till done processing
+                    self.currentMarkers = self.bci.ReceivedMarkerCount() # check to see how many received epochs, if markers sent too close together will be ignored till done processing
                     time.sleep(0.5) # wait for marker updates
-                    print("Markers received: " + str(currentMarkers) +" Accuracy: " + str(round(accuracy,2)), end=" \r")
-                    if len(currentMarkers) > 1: # check there is more then one marker type received
-                        if min([currentMarkers[key][1] for key in currentMarkers]) > bci.minimumEpochsRequired:
-                            classInfo = bci.CurrentClassifierInfo() # hangs if called too early
-                            accuracy = classInfo["accuracy"]
-                            if min([currentMarkers[key][1] for key in currentMarkers]) > min_epochs_test:
-                                bci.TestMode()
-                                break
+                    print("Markers received: " + str(self.currentMarkers) + " Accuracy: " + str(round(self.accuracy,2)), end=" \r")
+                    if len(self.currentMarkers) > 1: # check there is more than one marker type received
+                        if min([self.currentMarkers[key][1] for key in self.currentMarkers]) > self.bci.minimumEpochsRequired:
+                            classInfo = self.bci.CurrentClassifierInfo() # hangs if called too early
+                            self.accuracy = classInfo["accuracy"]
+                            if min([self.currentMarkers[key][1] for key in self.currentMarkers]) > self.min_epochs_test:
+                                self.bci.TestMode()
+                                test = True
                 else:
-                    markerGuess = bci.CurrentClassifierMarkerGuess() # when in test mode only y_pred returned
-                    guess = [key for key, value in currentMarkers.items() if value[0] == markerGuess]
+                    markerGuess = self.bci.CurrentClassifierMarkerGuess() # when in test mode only y_pred returned
+                    guess = [key for key, value in self.currentMarkers.items() if value[0] == markerGuess]
                     print("Current marker estimation: " + str(guess), end=" \r")
                     time.sleep(0.2)
-
-            return None
+            self.bci.StopThreads()
         except KeyboardInterrupt: # allow user to break while loop
             print("\nLoop interrupted by user.")
 
-    def stop_after_timeout(bci):
-        time.sleep(timeout)
+    def stop_after_timeout(self):
+        time.sleep(self.timeout)
+        stop_signal.set()
         print("\nTimeout reached. Stopping threads.")
-        bci.StopThreads()
-
-    bci = PyBCI(minimumEpochsRequired = min_epochs_train, createPseudoDevice=createPseudoDevice, torchModel = PyTorchModel)
-    main_thread = threading.Thread(target=loop, args=(bci,))
-    main_thread.start()
-    if timeout:
-        timeout_thread = threading.Thread(target=stop_after_timeout, args=(bci,))
-        timeout_thread.start()
-        timeout_thread.join()
-    main_thread.join()
+
+    # Accessor methods used by the CLI coverage tests
+    def get_accuracy(self):
+        return self.accuracy
+
+    def get_current_markers(self):
+        return self.currentMarkers
+
+def main(createPseudoDevice=True, min_epochs_train=4, min_epochs_test=10, num_chs = 8, num_feats = 2, num_classes = 4, timeout=None):
+    global num_chs_g, num_feats_g, num_classes_g
+    num_chs_g = num_chs
+    num_feats_g = num_feats
+    num_classes_g = num_classes
+    command_thread = threading.Thread(target=command_listener)
+    command_thread.daemon = True
+    command_thread.start()
+
+    my_bci_wrapper = CLI_testPytorchWrapper(createPseudoDevice, min_epochs_train, min_epochs_test, num_chs, num_feats, num_classes, timeout)
+    command_thread.join()
+    return my_bci_wrapper # Return this instance
+
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(description="PyTorch neural network is used for model and pseudodevice generates 8 channels of 3 marker types and baseline. Similar to the testPytorch.py in the examples folder.")
diff --git a/pybci/CliTests/testSimple.py b/pybci/CliTests/testSimple.py
index 07cc43f..c822228 100644
--- a/pybci/CliTests/testSimple.py
+++ b/pybci/CliTests/testSimple.py
@@ -3,52 +3,85 @@
 import time
 import threading
 
-def main(createPseudoDevice=True, min_epochs_train=4, min_epochs_test=10, timeout=None):
-    def loop(bci):
-        while not bci.connected: # check to see if lsl marker and datastream are available
-            bci.Connect()
+stop_signal = threading.Event() # Global event to control the main loop
+
+def command_listener():
+    while not stop_signal.is_set():
+        command = input("PyBCI: [CLI] - Enter 'stop' to terminate\n")
+        if command == 'stop':
+            stop_signal.set()
+            break
+
+class CLI_testSimpleWrapper:
+    def __init__(self, createPseudoDevice, min_epochs_train, min_epochs_test, timeout):
+        self.createPseudoDevice = createPseudoDevice
+        self.timeout = timeout
+        self.min_epochs_train = min_epochs_train
+        self.min_epochs_test = min_epochs_test
+        self.accuracy = 0
+        self.currentMarkers = {}
+        if self.min_epochs_test <= self.min_epochs_train:
+            self.min_epochs_test = self.min_epochs_train+1
+        self.bci = PyBCI(minimumEpochsRequired = self.min_epochs_train, createPseudoDevice=self.createPseudoDevice)
+        main_thread = threading.Thread(target=self.loop)
+        main_thread.start()
+        if self.timeout:
+            print("PyBCI: [CLI] - starting timeout thread")
+            self.timeout_thread = threading.Thread(target=self.stop_after_timeout)
+            self.timeout_thread.start()
+        main_thread.join()
+        if timeout is not None:
+            self.timeout_thread.join()
+
+    def loop(self):
+        while not self.bci.connected: # check to see if lsl marker and datastream are available
+            self.bci.Connect()
             time.sleep(1)
-        bci.TrainMode() # now both marker and datastreams available start training on received epochs
-        accuracy = 0
+        self.bci.TrainMode() # now both marker and datastreams available start training on received epochs
+        self.accuracy = 0
         test = False
         try:
-            while(True):
+            while not stop_signal.is_set():
                 if test is False:
-                    currentMarkers = bci.ReceivedMarkerCount() # check to see how many received epochs, if markers sent to close together will be ignored till done processing
+                    self.currentMarkers = self.bci.ReceivedMarkerCount() # check to see how many received epochs, if markers sent too close together will be ignored till done processing
                     time.sleep(0.5) # wait for marker updates
-                    print("Markers received: " + str(currentMarkers) +" Accuracy: " + str(round(accuracy,2)), end=" \r")
-                    if len(currentMarkers) > 1: # check there is more then one marker type received
-                        if min([currentMarkers[key][1] for key in currentMarkers]) > bci.minimumEpochsRequired:
-                            classInfo = bci.CurrentClassifierInfo() # hangs if called too early
-                            accuracy = classInfo["accuracy"]
-                            if min([currentMarkers[key][1] for key in currentMarkers]) > min_epochs_test:
-                                bci.TestMode()
+                    print("Markers received: " + str(self.currentMarkers) + " Accuracy: " + str(round(self.accuracy,2)), end=" \r")
+                    if len(self.currentMarkers) > 1: # check there is more than one marker type received
+                        if min([self.currentMarkers[key][1] for key in self.currentMarkers]) > self.bci.minimumEpochsRequired:
+                            classInfo = self.bci.CurrentClassifierInfo() # hangs if called too early
+                            self.accuracy = classInfo["accuracy"]
+                            if min([self.currentMarkers[key][1] for key in self.currentMarkers]) > self.min_epochs_test:
+                                self.bci.TestMode()
                                 test = True
                 else:
-                    markerGuess = bci.CurrentClassifierMarkerGuess() # when in test mode only y_pred returned
-                    guess = [key for key, value in currentMarkers.items() if value[0] == markerGuess]
+                    markerGuess = self.bci.CurrentClassifierMarkerGuess() # when in test mode only y_pred returned
+                    guess = [key for key, value in self.currentMarkers.items() if value[0] == markerGuess]
                     print("Current marker estimation: " + str(guess), end=" \r")
                     time.sleep(0.2)
+            self.bci.StopThreads()
         except KeyboardInterrupt: # allow user to break while loop
             print("\nLoop interrupted by user.")
 
-    def stop_after_timeout(bci):
-        time.sleep(timeout)
+    def stop_after_timeout(self):
+        time.sleep(self.timeout)
+        stop_signal.set()
         print("\nTimeout reached. Stopping threads.")
-        bci.StopThreads()
-
-    if min_epochs_test <= min_epochs_train:
-        min_epochs_test = min_epochs_train+1
-    bci = PyBCI(minimumEpochsRequired = min_epochs_train, createPseudoDevice=createPseudoDevice)
-    main_thread = threading.Thread(target=loop, args=(bci,))
-    main_thread.start()
-    if timeout:
-        timeout_thread = threading.Thread(target=stop_after_timeout, args=(bci,))
-        timeout_thread.start()
-        timeout_thread.join()
+    # Accessor methods used by the CLI coverage tests
+    def get_accuracy(self):
+        return self.accuracy
 
-    main_thread.join()
+    def get_current_markers(self):
+        return self.currentMarkers
+
+def main(createPseudoDevice=True, min_epochs_train=4, min_epochs_test=10, timeout=None):
+    command_thread = threading.Thread(target=command_listener)
+    command_thread.daemon = True
+    command_thread.start()
+
+    my_bci_wrapper = CLI_testSimpleWrapper(createPseudoDevice, min_epochs_train, min_epochs_test, timeout)
+    command_thread.join()
+    return my_bci_wrapper # Return this instance
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(description="Runs simple setup where sklearn support-vector-machine is used for model and pseudodevice generates 8 channels of 3 marker types and a baseline. Similar to the testSimple.py in the examples folder.")
@@ -57,6 +90,5 @@ def stop_after_timeout(bci):
     parser.add_argument("--min_epochs_test", default=14, type=int, help='Minimum epochs to collect before model testing commences, if less than min_epochs_test defaults to min_epochs_test+1.')
     parser.add_argument("--timeout", default=None, type=int, help="Timeout in seconds for the script to automatically stop.")
-
     args = parser.parse_args()
     main(**vars(args))
diff --git a/pybci/CliTests/testSklearn.py b/pybci/CliTests/testSklearn.py
index 2ee71f8..1e50b20 100644
--- a/pybci/CliTests/testSklearn.py
+++ b/pybci/CliTests/testSklearn.py
@@ -4,55 +4,87 @@
 from sklearn.neural_network import MLPClassifier
 import threading
 
-def main(createPseudoDevice=True, min_epochs_train=4, min_epochs_test=10, timeout=None):
-    def loop(bci):
-        while not bci.connected: # check to see if lsl marker and datastream are available
-            bci.Connect()
+stop_signal = threading.Event() # Global event to control the main loop
+
+def command_listener():
+    while not stop_signal.is_set():
+        command = input("PyBCI: [CLI] - Enter 'stop' to terminate\n")
+        if command == 'stop':
+            stop_signal.set()
+            break
+
+class CLI_testSklearnWrapper:
+    def __init__(self, createPseudoDevice, min_epochs_train, min_epochs_test, timeout):
+        self.createPseudoDevice = createPseudoDevice
+        self.timeout = timeout
+        self.min_epochs_train = min_epochs_train
+        self.min_epochs_test = min_epochs_test
+        self.accuracy = 0
+        self.currentMarkers = {}
+        if self.min_epochs_test <= self.min_epochs_train:
+            self.min_epochs_test = self.min_epochs_train+1
+        clf = MLPClassifier(max_iter=1000, solver="lbfgs")
+        self.bci = PyBCI(minimumEpochsRequired = min_epochs_train, createPseudoDevice=createPseudoDevice, clf = clf)
+        main_thread = threading.Thread(target=self.loop)
+        main_thread.start()
+        if self.timeout:
+            print("PyBCI: [CLI] - starting timeout thread")
+            self.timeout_thread = threading.Thread(target=self.stop_after_timeout)
+            self.timeout_thread.start()
+        main_thread.join()
+        if timeout is not None:
+            self.timeout_thread.join()
+
+    def loop(self):
+        while not self.bci.connected: # check to see if lsl marker and datastream are available
+            self.bci.Connect()
             time.sleep(1)
-        bci.TrainMode() # now both marker and datastreams available start training on received epochs
-        accuracy = 0
+        self.bci.TrainMode() # now both marker and datastreams available start training on received epochs
+        self.accuracy = 0
         test = False
         try:
-            while(True):
+            while not stop_signal.is_set():
                 if test is False:
-                    currentMarkers = bci.ReceivedMarkerCount() # check to see how many received epochs, if markers sent to close together will be ignored till done processing
+                    self.currentMarkers = self.bci.ReceivedMarkerCount() # check to see how many received epochs, if markers sent too close together will be ignored till done processing
                     time.sleep(0.5) # wait for marker updates
-                    print("Markers received: " + str(currentMarkers) +" Accuracy: " + str(round(accuracy,2)), end=" \r")
-                    if len(currentMarkers) > 1: # check there is more then one marker type received
-                        if min([currentMarkers[key][1] for key in currentMarkers]) > bci.minimumEpochsRequired:
-                            classInfo = bci.CurrentClassifierInfo() # hangs if called too early
-                            accuracy = classInfo["accuracy"]
-                            if min([currentMarkers[key][1] for key in currentMarkers]) > min_epochs_train:
-                                bci.TestMode()
-                                break
+                    print("Markers received: " + str(self.currentMarkers) + " Accuracy: " + str(round(self.accuracy,2)), end=" \r")
+                    if len(self.currentMarkers) > 1: # check there is more than one marker type received
+                        if min([self.currentMarkers[key][1] for key in self.currentMarkers]) > self.bci.minimumEpochsRequired:
+                            classInfo = self.bci.CurrentClassifierInfo() # hangs if called too early
+                            self.accuracy = classInfo["accuracy"]
+                            if min([self.currentMarkers[key][1] for key in self.currentMarkers]) > self.min_epochs_test:
+                                self.bci.TestMode()
+                                test = True
                 else:
-                    markerGuess = bci.CurrentClassifierMarkerGuess() # when in test mode only y_pred returned
-                    guess = [key for key, value in currentMarkers.items() if value[0] == markerGuess]
+                    markerGuess = self.bci.CurrentClassifierMarkerGuess() # when in test mode only y_pred returned
+                    guess = [key for key, value in self.currentMarkers.items() if value[0] == markerGuess]
                     print("Current marker estimation: " + str(guess), end=" \r")
                     time.sleep(0.2)
-            return None
+            self.bci.StopThreads()
         except KeyboardInterrupt: # allow user to break while loop
             print("\nLoop interrupted by user.")
 
-    def stop_after_timeout(bci):
-        time.sleep(timeout)
+    def stop_after_timeout(self):
+        time.sleep(self.timeout)
+        stop_signal.set()
         print("\nTimeout reached. Stopping threads.")
-        bci.StopThreads()
+
+    # Accessor methods used by the CLI coverage tests
+    def get_accuracy(self):
+        return self.accuracy
+
+    def get_current_markers(self):
+        return self.currentMarkers
 
-    if min_epochs_test <= min_epochs_train:
-        min_epochs_test = min_epochs_train+1
-    clf = MLPClassifier(max_iter = 1000, solver ="lbfgs")#solver=clf, alpha=alpha,hidden_layer_sizes=hid)
-    if min_epochs_test <= min_epochs_train:
-        min_epochs_test = min_epochs_train+1
-    bci = PyBCI(minimumEpochsRequired = min_epochs_train, createPseudoDevice=createPseudoDevice, clf = clf)
-    main_thread = threading.Thread(target=loop, args=(bci,))
-    main_thread.start()
-    if timeout:
-        timeout_thread = threading.Thread(target=stop_after_timeout, args=(bci,))
-        timeout_thread.start()
-        timeout_thread.join()
-    main_thread.join()
+def main(createPseudoDevice=True, min_epochs_train=4, min_epochs_test=10, timeout=None):
+    command_thread = threading.Thread(target=command_listener)
+    command_thread.daemon = True
+    command_thread.start()
+
+    my_bci_wrapper = CLI_testSklearnWrapper(createPseudoDevice, min_epochs_train, min_epochs_test, timeout)
+    command_thread.join()
+    return my_bci_wrapper # Return this instance
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(description="Sklearn multi-layer perceptron is used for model and pseudodevice generates 8 channels of 3 marker types and a baseline. Similar to the testSimple.py in the examples folder.")
diff --git a/pybci/CliTests/testTensorflow.py b/pybci/CliTests/testTensorflow.py
index 9f653ef..51c8cb8 100644
--- a/pybci/CliTests/testTensorflow.py
+++ b/pybci/CliTests/testTensorflow.py
@@ -4,67 +4,109 @@
 import tensorflow as tf# bring in tf for custom model creation
 import threading
 
-def main(createPseudoDevice=True, min_epochs_train=4, min_epochs_test=10, num_chs = 8, num_feats = 2, num_classes = 4, timeout=None):
-    def loop(bci):
-        while not bci.connected: # check to see if lsl marker and datastream are available
-            bci.Connect()
+stop_signal = threading.Event() # Global event to control the main loop
+
+def command_listener():
+    while not stop_signal.is_set():
+        command = input("PyBCI: [CLI] - Enter 'stop' to terminate\n")
+        if command == 'stop':
+            stop_signal.set()
+            break
+
+
+class CLI_testTensorflowWrapper:
+    def __init__(self, createPseudoDevice, min_epochs_train, min_epochs_test, num_chs, num_feats, num_classes, timeout):
+        if createPseudoDevice:
+            self.num_chs = 8 # 8 channels are created in the PseudoLSLGenerator
+            self.num_feats = 2 # default is mean freq and rms to keep it simple
+            self.num_classes = 4 # number of different triggers (can include baseline) sent, defines if we use softmax or binary
+
+        self.createPseudoDevice = createPseudoDevice
+        self.timeout = timeout
+        self.min_epochs_train = min_epochs_train
+        self.min_epochs_test = min_epochs_test
+        self.accuracy = 0
+        self.currentMarkers = {}
+        if self.min_epochs_test <= self.min_epochs_train:
+            self.min_epochs_test = self.min_epochs_train+1
+        # Define the GRU model
+        model = tf.keras.Sequential()
+        model.add(tf.keras.layers.Reshape((num_chs*num_feats, 1), input_shape=(num_chs*num_feats,)))
+        model.add(tf.keras.layers.GRU(units=256)) # maybe should show this example as 2d with toggleable timesteps disabled
+        model.add(tf.keras.layers.Dense(units=512, activation='relu'))
+        model.add(tf.keras.layers.Flatten())
+        model.add(tf.keras.layers.Dense(units=num_classes, activation='softmax')) # softmax as more than binary classification (sparse_categorical_crossentropy)
+        #model.add(tf.keras.layers.Dense(units=1, activation='sigmoid')) # sigmoid as binary classification (binary_crossentropy)
+        model.summary()
+        model.compile(loss='sparse_categorical_crossentropy', # using sparse_categorical as we expect multi-class (>2) output, sparse because we encode target values with integers
+                      optimizer='adam',
+                      metrics=['accuracy'])
+        self.bci = PyBCI(minimumEpochsRequired = min_epochs_train, createPseudoDevice=createPseudoDevice, model = model)
+        main_thread = threading.Thread(target=self.loop)
+        main_thread.start()
+        if self.timeout:
+            print("PyBCI: [CLI] - starting timeout thread")
+            self.timeout_thread = threading.Thread(target=self.stop_after_timeout)
+            self.timeout_thread.start()
+        main_thread.join()
+        if timeout is not None:
+            self.timeout_thread.join()
+
+
+    def loop(self):
+        while not self.bci.connected: # check to see if lsl marker and datastream are available
+            self.bci.Connect()
             time.sleep(1)
-        bci.TrainMode() # now both marker and datastreams available start training on received epochs
-        accuracy = 0
+        self.bci.TrainMode() # now both marker and datastreams available start training on received epochs
+        self.accuracy = 0
         test = False
         try:
-            while(True):
+            while not stop_signal.is_set():
                 if test is False:
-                    currentMarkers = bci.ReceivedMarkerCount() # check to see how many received epochs, if markers sent to close together will be ignored till done processing
+                    self.currentMarkers = self.bci.ReceivedMarkerCount() # check to see how many received epochs, if markers sent too close together will be ignored till done processing
                     time.sleep(0.5) # wait for marker updates
-                    print("Markers received: " + str(currentMarkers) +" Accuracy: " + str(round(accuracy,2)), end=" \r")
-                    if len(currentMarkers) > 1: # check there is more then one marker type received
-                        if min([currentMarkers[key][1] for key in currentMarkers]) > bci.minimumEpochsRequired:
-                            classInfo = bci.CurrentClassifierInfo() # hangs if called too early
-                            accuracy = classInfo["accuracy"]
-                            if min([currentMarkers[key][1] for key in currentMarkers]) > min_epochs_test:
-                                bci.TestMode()
-                                break
+                    print("Markers received: " + str(self.currentMarkers) + " Accuracy: " + str(round(self.accuracy,2)), end=" \r")
+                    if len(self.currentMarkers) > 1: # check there is more than one marker type received
+                        if min([self.currentMarkers[key][1] for key in self.currentMarkers]) > self.bci.minimumEpochsRequired:
+                            classInfo = self.bci.CurrentClassifierInfo() # hangs if called too early
+                            self.accuracy = classInfo["accuracy"]
+                            if min([self.currentMarkers[key][1] for key in self.currentMarkers]) > self.min_epochs_test:
+                                self.bci.TestMode()
+                                test = True
                 else:
-                    markerGuess = bci.CurrentClassifierMarkerGuess() # when in test mode only y_pred returned
-                    guess = [key for key, value in currentMarkers.items() if value[0] == markerGuess]
+                    markerGuess = self.bci.CurrentClassifierMarkerGuess() # when in test mode only y_pred returned
+                    guess = [key for key, value in self.currentMarkers.items() if value[0] == markerGuess]
                     print("Current marker estimation: " + str(guess), end=" \r")
                     time.sleep(0.2)
-
-            return None
+            self.bci.StopThreads()
         except KeyboardInterrupt: # allow user to break while loop
             print("\nLoop interrupted by user.")
-
-    def stop_after_timeout(bci):
-        time.sleep(timeout)
+    def stop_after_timeout(self):
+        time.sleep(self.timeout)
+        stop_signal.set()
         print("\nTimeout reached. Stopping threads.")
Stopping threads.") - bci.StopThreads() - if createPseudoDevice: - num_chs = 8 # 8 channels are created in the PseudoLSLGenerator - num_feats = 2 # default is mean freq and rms to keep it simple - num_classes = 4 # number of different triggers (can include baseline) sent, defines if we use softmax of binary - # Define the GRU model - model = tf.keras.Sequential() - model.add(tf.keras.layers.Reshape((num_chs*num_feats, 1), input_shape=(num_chs*num_feats,))) - model.add(tf.keras.layers.GRU(units=256))#, input_shape=num_chs*num_feats)) # maybe should show this example as 2d with toggleable timesteps disabled - model.add(tf.keras.layers.Dense(units=512, activation='relu')) - model.add(tf.keras.layers.Flatten())# )tf.keras.layers.Dense(units=128, activation='relu')) - model.add(tf.keras.layers.Dense(units=num_classes, activation='softmax')) # softmax as more then binary classification (sparse_categorical_crossentropy) - #model.add(tf.keras.layers.Dense(units=1, activation='sigmoid')) # sigmoid as ninary classification (binary_crossentropy) - model.summary() - model.compile(loss='sparse_categorical_crossentropy',# using sparse_categorical as we expect multi-class (>2) output, sparse because we encode targetvalues with integers - optimizer='adam', - metrics=['accuracy']) - bci = PyBCI(minimumEpochsRequired = min_epochs_train, createPseudoDevice=createPseudoDevice, model = model) - main_thread = threading.Thread(target=loop, args=(bci,)) - main_thread.start() - if timeout: - timeout_thread = threading.Thread(target=stop_after_timeout, args=(bci,)) - timeout_thread.start() - timeout_thread.join() - main_thread.join() + # Add these methods in CLI_testSimpleWrapper class + def get_accuracy(self): + return self.accuracy + + def get_current_markers(self): + return self.currentMarkers + +def main(createPseudoDevice=True, min_epochs_train=4, min_epochs_test=10, num_chs = 8, num_feats = 2, num_classes = 4, timeout=None): + global num_chs_g, num_feats_g, num_classes_g + num_chs_g = num_chs + num_feats_g = num_feats + num_classes_g = num_classes + command_thread = threading.Thread(target=command_listener) + command_thread.daemon = True + command_thread.start() + + my_bci_wrapper = CLI_testPytorchWrapper(createPseudoDevice, min_epochs_train, min_epochs_test,num_chs, num_feats, num_classes,timeout) + command_thread.join() + return my_bci_wrapper # Return this instance if __name__ == '__main__': parser = argparse.ArgumentParser(description="Tensorflow GRU is used for model and pseudodevice generates 8 channels of 3 marker types and baseline. Similar to the testTensorflow.py in the examples folder.") diff --git a/pybci/version.py b/pybci/version.py index a4dad29..d83e54a 100644 --- a/pybci/version.py +++ b/pybci/version.py @@ -1,2 +1,2 @@ -__version__ = '1.3.1' +__version__ = '1.3.2'