diff --git a/build/lib/pybci/Configuration/EpochSettings.py b/build/lib/pybci/Configuration/EpochSettings.py new file mode 100644 index 0000000..6823710 --- /dev/null +++ b/build/lib/pybci/Configuration/EpochSettings.py @@ -0,0 +1,13 @@ +class GlobalEpochSettings: + splitCheck = True # checks whether or not subdivide epochs + tmin = 0 # time in seconds to capture samples before trigger + tmax = 1 # time in seconds to capture samples after trigger + windowLength = 0.5 # if splitcheck true - time in seconds to split epoch + windowOverlap = 0.5 #if splitcheck true percentage value > 0 and < 1, example if epoch has tmin of 0 and tmax of 1 with window + # length of 0.5 we have 1 epoch between t 0 and t0.5 another at 0.25 to 0.75, 0.5 to 1 + +# customWindowSettings should be dict with marker name and IndividualEpochSetting +class IndividualEpochSetting: + splitCheck = True # checks whether or not subdivide epochs + tmin = 0 # time in seconds to capture samples before trigger + tmax= 1 # time in seconds to capture samples after trigger \ No newline at end of file diff --git a/build/lib/pybci/Configuration/FeatureSettings.py b/build/lib/pybci/Configuration/FeatureSettings.py new file mode 100644 index 0000000..f387cc7 --- /dev/null +++ b/build/lib/pybci/Configuration/FeatureSettings.py @@ -0,0 +1,15 @@ +class GeneralFeatureChoices: + psdBand = True + appr_entropy = False + perm_entropy = False + spec_entropy = False + svd_entropy = False + samp_entropy = False + rms = True + meanPSD = True + medianPSD = True + variance = True + meanAbs = True + waveformLength = False + zeroCross = False + slopeSignChange = False \ No newline at end of file diff --git a/build/lib/pybci/Configuration/PsuedoDeviceSettings.py b/build/lib/pybci/Configuration/PsuedoDeviceSettings.py new file mode 100644 index 0000000..e69de29 diff --git a/build/lib/pybci/Configuration/__init__.py b/build/lib/pybci/Configuration/__init__.py new file mode 100644 index 0000000..f15ee6f --- /dev/null +++ 
b/build/lib/pybci/Configuration/__init__.py @@ -0,0 +1,3 @@ +#from .EpochSettings import EpochSettings +#from .GeneralFeatureChoices import GeneralFeatureChoices + diff --git a/build/lib/pybci/ThreadClasses/AsyncDataReceiverThread.py b/build/lib/pybci/ThreadClasses/AsyncDataReceiverThread.py new file mode 100644 index 0000000..114ee50 --- /dev/null +++ b/build/lib/pybci/ThreadClasses/AsyncDataReceiverThread.py @@ -0,0 +1,126 @@ +import threading, time +from collections import deque +import itertools +from bisect import bisect_left +def slice_fifo_by_time(fifo, start_time, end_time): + """Find the slice of fifo between start_time and end_time using binary search.""" + # separate times and data for easier indexing + times, data = zip(*fifo) + # find the index of the first time that is not less than start_time + start_index = bisect_left(times, start_time) + # find the index of the first time that is not less than end_time + end_index = bisect_left(times, end_time) + # return the slice of data between start_index and end_index + return data[start_index:end_index] + +class AsyncDataReceiverThread(threading.Thread): + """Responsible for receiving data from accepted LSL outlet, slices samples based on tmin+tmax basis, + starts counter for received samples after marker is received in ReceiveMarker. Relies on timestamps to slice array, + suspected more computationally intensive then synchronous method. 
+ """ + startCounting = False + currentMarker = "" + def __init__(self, closeEvent, trainTestEvent, dataQueueTrain,dataQueueTest, dataStreamInlet, + customEpochSettings, globalEpochSettings,devCount, streamChsDropDict = [], maxExpectedSampleRate=100): + # large maxExpectedSampleRate can incur marker drop and slow procesing times for slicing arrays + super().__init__() + self.trainTestEvent = trainTestEvent + self.closeEvent = closeEvent + self.dataQueueTrain = dataQueueTrain + self.dataQueueTest = dataQueueTest + self.dataStreamInlet = dataStreamInlet + self.customEpochSettings = customEpochSettings + self.globalEpochSettings = globalEpochSettings + self.streamChsDropDict = streamChsDropDict + self.sr = maxExpectedSampleRate + #self.dataType = dataStreamInlet.info().type() + self.devCount = devCount # used for tracking which device is sending data to feature extractor + + def run(self): + posCount = 0 + chCount = self.dataStreamInlet.info().channel_count() + maxTime = (self.globalEpochSettings.tmin + self.globalEpochSettings.tmax) + if len(self.customEpochSettings.keys())>0: + if max([self.customEpochSettings[x].tmin + self.customEpochSettings[x].tmax for x in self.customEpochSettings]) > maxTime: + maxTime = max([self.customEpochSettings[x].tmin + self.customEpochSettings[x].tmax for x in self.customEpochSettings]) + fifoLength = int(self.sr*maxTime) + window_end_time = 0 + dataFIFOs = [deque(maxlen=fifoLength) for ch in range(chCount - len(self.streamChsDropDict))] + while not self.closeEvent.is_set(): + sample, timestamp = self.dataStreamInlet.pull_sample(timeout = 1) + if sample != None: + for index in sorted(self.streamChsDropDict, reverse=True): + del sample[index] # remove the desired channels from the sample + for i,fifo in enumerate(dataFIFOs): + fifo.append((timestamp, sample[i])) + if self.trainTestEvent.is_set(): # We're training! 
+ if self.startCounting: # we received a marker + posCount += 1 + if posCount >= self.desiredCount: # enough samples are in FIFO, chop up and put in dataqueue + if len(self.customEpochSettings.keys())>0: # custom marker received + if self.customEpochSettings[self.currentMarker].splitCheck: # slice epochs into overlapping time windows + window_length = self.customEpochSettings[self.currentMarker].windowLength + window_overlap = self.customEpochSettings[self.currentMarker].windowOverlap + window_start_time = self.markerTimestamp + self.customEpochSettings[self.currentMarker].tmin + window_end_time = window_start_time + window_length + while window_end_time <= self.markerTimestamp + self.customEpochSettings[self.currentMarker].tmax: + sliceDataFIFOs = [slice_fifo_by_time(fifo, window_start_time, window_end_time) for fifo in dataFIFOs] + self.dataQueueTrain.put([sliceDataFIFOs, self.currentMarker, self.sr, self.devCount]) + window_start_time += window_length * (1 - window_overlap) + window_end_time = window_start_time + window_length + else: # don't slice just take tmin to tmax time + start_time = self.markerTimestamp + self.customEpochSettings[self.currentMarker].tmin + end_time = self.markerTimestamp + self.customEpochSettings[self.currentMarker].tmax + sliceDataFIFOs = [slice_fifo_by_time(fifo, start_time, end_time) for fifo in dataFIFOs] + self.dataQueueTrain.put([sliceDataFIFOs, self.currentMarker, self.sr, self.devCount]) + else: + if self.globalEpochSettings.splitCheck: # slice epochs in to overlapping time windows + window_length = self.globalEpochSettings.windowLength + window_overlap = self.globalEpochSettings.windowOverlap + window_start_time = self.markerTimestamp - self.globalEpochSettings.tmin + window_end_time = window_start_time + window_length + while window_end_time <= self.markerTimestamp + self.globalEpochSettings.tmax: + sliceDataFIFOs = [slice_fifo_by_time(fifo, window_start_time, window_end_time) for fifo in dataFIFOs] + 
self.dataQueueTrain.put([sliceDataFIFOs, self.currentMarker, self.sr, self.devCount]) + window_start_time += window_length * (1 - window_overlap) + window_end_time = window_start_time + window_length + self.startCounting = False + else: # don't slice just take tmin to tmax time + start_time = self.markerTimestamp + self.globalEpochSettings.tmin + end_time = self.markerTimestamp + self.globalEpochSettings.tmax + sliceDataFIFOs = [slice_fifo_by_time(fifo, start_time, end_time) for fifo in dataFIFOs] + self.dataQueueTrain.put([sliceDataFIFOs, self.currentMarker, self.sr, self.devCount]) + # reset flags and counters + posCount = 0 + self.startCounting = False + else: # in Test mode + if self.globalEpochSettings.splitCheck: + window_length = self.globalEpochSettings.windowLength + window_overlap = self.globalEpochSettings.windowOverlap + else: + window_length = self.globalEpochSettings.tmin+self.globalEpochSettings.tmax + + if timestamp >= window_end_time: + #sliceDataFIFOs = [[data for time, data in fifo if window_end_time - window_length <= time < window_end_time] for fifo in dataFIFOs] + sliceDataFIFOs = [slice_fifo_by_time(fifo, window_end_time - window_length, window_end_time) for fifo in dataFIFOs] + self.dataQueueTest.put([sliceDataFIFOs, None, self.devCount]) + #sliceDataFIFOs = [list(itertools.islice(d, fifoLength-window_samples, fifoLength)) for d in dataFIFOs] + if self.globalEpochSettings.splitCheck: + window_end_time += window_length * (1 - window_overlap) + else: + window_end_time = timestamp + window_length + else: + pass + # add levels of debug? 
+ + def ReceiveMarker(self, marker, timestamp): # timestamp will be used for non sample rate specific devices (pupil-labs gazedata) + if self.startCounting == False: # only one marker at a time allow, other in windowed timeframe ignored + self.currentMarker = marker + self.markerTimestamp = timestamp + if len(self.customEpochSettings.keys())>0: # custom marker received + if marker in self.customEpochSettings.keys(): + self.desiredCount = int(self.customEpochSettings[marker].tmax * self.sr) # find number of samples after tmax to finish counting + self.startCounting = True + else: # no custom markers set, use global settings + self.desiredCount = int(self.globalEpochSettings.tmax * self.sr) # find number of samples after tmax to finish counting + self.startCounting = True diff --git a/build/lib/pybci/ThreadClasses/ClassifierThread.py b/build/lib/pybci/ThreadClasses/ClassifierThread.py new file mode 100644 index 0000000..5ec1b7f --- /dev/null +++ b/build/lib/pybci/ThreadClasses/ClassifierThread.py @@ -0,0 +1,111 @@ +from ..Utils.Classifier import Classifier +from ..Utils.Logger import Logger +import queue,threading, time +import numpy as np + +class ClassifierThread(threading.Thread): + features = np.array([])#[] + targets = np.array([]) + mode = "train" + guess = " " + #epochCountsc = {} + def __init__(self, closeEvent,trainTestEvent, featureQueueTest,featureQueueTrain, classifierInfoQueue, classifierInfoRetrieveEvent, + classifierGuessMarkerQueue, classifierGuessMarkerEvent, queryFeaturesQueue, queryFeaturesEvent, + logger = Logger(Logger.INFO), numStreamDevices = 1, + minRequiredEpochs = 10, clf = None, model = None, torchModel = None): + super().__init__() + self.trainTestEvent = trainTestEvent # responsible for tolling between train and test mode + self.closeEvent = closeEvent # responsible for cosing threads + self.featureQueueTest = featureQueueTest # gets feature data from feature processing thread + self.featureQueueTrain = featureQueueTrain # gets feature 
data from feature processing thread + self.classifier = Classifier(clf = clf, model = model, torchModel = torchModel) # sets classifier class, if clf and model passed, defaults to clf and sklearn + self.minRequiredEpochs = minRequiredEpochs # the minimum number of epochs required for classifier attempt + self.classifierInfoRetrieveEvent = classifierInfoRetrieveEvent + self.classifierInfoQueue = classifierInfoQueue + self.classifierGuessMarkerQueue = classifierGuessMarkerQueue + self.classifierGuessMarkerEvent = classifierGuessMarkerEvent + self.queryFeaturesQueue = queryFeaturesQueue + self.queryFeaturesEvent = queryFeaturesEvent + self.numStreamDevices = numStreamDevices + self.logger = logger + + def run(self): + epochCountsc={} + if self.numStreamDevices > 1: + tempdatatrain = {} + tempdatatest = {} + while not self.closeEvent.is_set(): + if self.trainTestEvent.is_set(): # We're training! + if self.featureQueueTrain.empty(): + if len(epochCountsc) > 1: # check if there is more then one test condition + minNumKeyEpochs = min([epochCountsc[key][1] for key in epochCountsc]) # check minimum viable number of training eochs have been obtained + if minNumKeyEpochs < self.minRequiredEpochs: + pass + else: + start = time.time() + self.classifier.TrainModel(self.features, self.targets) + if (self.logger.level == Logger.TIMING): + end = time.time() + self.logger.log(Logger.TIMING, f" classifier training time {end - start}") + if self.classifierGuessMarkerEvent.is_set(): + self.classifierGuessMarkerQueue.put(self.guess) + else: + try: + featuresSingle, devCount, target, epochCountsc = self.featureQueueTrain.get_nowait() #[dataFIFOs, self.currentMarker, self.sr, self.dataType] + if self.numStreamDevices > 1: # Collects multiple data strems feature sets and synchronise here + tempdatatrain[devCount] = featuresSingle + if len(tempdatatrain) == self.numStreamDevices: + flattened_list = np.array([item for sublist in tempdatatrain.values() for item in sublist]) + tempdatatrain = 
{} + self.targets = np.append(self.targets, [target], axis = 0) + #self.features = np.append(self.features, [flattened_list], axis = 0) + if self.features.shape[0] == 0: + self.features = self.features.reshape((0,) + flattened_list.shape) + self.features = np.append(self.features, [flattened_list], axis=0) + # need to check if all device data is captured, then flatten and append + else: # Only one device to collect from + if self.features.shape[0] == 0: + self.features = self.features.reshape((0,) + featuresSingle.shape) + self.targets = np.append(self.targets, [target], axis = 0) + self.features = np.append(self.features, [featuresSingle], axis = 0) + except queue.Empty: + pass + else: # We're testing! + try: + featuresSingle, devCount = self.featureQueueTest.get_nowait() #[dataFIFOs, self.currentMarker, self.sr, self.dataType] + if self.numStreamDevices > 1: + tempdatatest[devCount] = featuresSingle + if len(tempdatatest) == self.numStreamDevices: + flattened_list = [] + flattened_list = np.array([item for sublist in tempdatatest.values() for item in sublist]) + tempdatatest = {} + start = time.time() + self.guess = self.classifier.TestModel(flattened_list) + if (self.logger.level == Logger.TIMING): + end = time.time() + self.logger.log(Logger.TIMING, f" classifier testing time {end - start}") + else: + start = time.time() + self.guess = self.classifier.TestModel(featuresSingle) + if (self.logger.level == Logger.TIMING): + end = time.time() + self.logger.log(Logger.TIMING, f" classifier testing time {end - start}") + if self.classifierGuessMarkerEvent.is_set(): + self.classifierGuessMarkerQueue.put(self.guess) + except queue.Empty: + pass + if self.classifierInfoRetrieveEvent.is_set(): + a = self.classifier.accuracy + classdata = { + "clf":self.classifier.clf, + "model":self.classifier.model, + "torchModel":self.classifier.torchModel, + "accuracy":a + } + self.classifierInfoQueue.put(classdata) + if self.queryFeaturesEvent.is_set(): + featureData = { + 
"features":self.features, + "targets":self.targets + } + self.queryFeaturesQueue.put(featureData) \ No newline at end of file diff --git a/build/lib/pybci/ThreadClasses/DataReceiverThread.py b/build/lib/pybci/ThreadClasses/DataReceiverThread.py new file mode 100644 index 0000000..d5deddb --- /dev/null +++ b/build/lib/pybci/ThreadClasses/DataReceiverThread.py @@ -0,0 +1,99 @@ +import threading +from collections import deque +import itertools + +class DataReceiverThread(threading.Thread): + """Responsible for receiving data from accepted LSL outlet, slices samples based on tmin+tmax basis, + starts counter for received samples after marker is received in ReceiveMarker. + """ + startCounting = False + currentMarker = "" + def __init__(self, closeEvent, trainTestEvent, dataQueueTrain,dataQueueTest, dataStreamInlet, + customEpochSettings, globalEpochSettings,devCount, streamChsDropDict = []): + super().__init__() + self.trainTestEvent = trainTestEvent + self.closeEvent = closeEvent + self.dataQueueTrain = dataQueueTrain + self.dataQueueTest = dataQueueTest + self.dataStreamInlet = dataStreamInlet + self.customEpochSettings = customEpochSettings + self.globalEpochSettings = globalEpochSettings + self.streamChsDropDict = streamChsDropDict + self.sr = dataStreamInlet.info().nominal_srate() + self.devCount = devCount # used for tracking which device is sending data to feature extractor + + def run(self): + posCount = 0 + chCount = self.dataStreamInlet.info().channel_count() + maxTime = (self.globalEpochSettings.tmin + self.globalEpochSettings.tmax) + if len(self.customEpochSettings.keys())>0: + if max([self.customEpochSettings[x].tmin + self.customEpochSettings[x].tmax for x in self.customEpochSettings]) > maxTime: + maxTime = max([self.customEpochSettings[x].tmin + self.customEpochSettings[x].tmax for x in self.customEpochSettings]) + fifoLength = int(self.dataStreamInlet.info().nominal_srate()*maxTime) + dataFIFOs = [deque(maxlen=fifoLength) for ch in range(chCount - 
len(self.streamChsDropDict))] + while not self.closeEvent.is_set(): + sample, timestamp = self.dataStreamInlet.pull_sample(timeout = 1) + if sample != None: + for index in sorted(self.streamChsDropDict, reverse=True): + del sample[index] # remove the desired channels from the sample + for i,fifo in enumerate(dataFIFOs): + fifo.append(sample[i]) + if self.trainTestEvent.is_set(): # We're training! + if self.startCounting: # we received a marker + posCount+=1 + if posCount >= self.desiredCount: # enough samples are in FIFO, chop up and put in dataqueue + if len(self.customEpochSettings.keys())>0: # custom marker received + if self.customEpochSettings[self.currentMarker].splitCheck: # slice epochs in to overlapping time windows + window_samples =int(self.customEpochSettings[self.currentMarker].windowLength * self.sr) #number of samples in each window + increment = int((1-self.customEpochSettings[self.currentMarker].windowOverlap)*window_samples) # if windows overlap each other by how many samples + while posCount - window_samples > 0: + sliceDataFIFOs = [list(itertools.islice(d, posCount - window_samples, posCount)) for d in dataFIFOs] + self.dataQueueTrain.put([sliceDataFIFOs, self.currentMarker, self.sr, self.devCount]) + posCount-=increment + else: # don't slice just take tmin to tmax time + sliceDataFIFOs = [list(itertools.islice(d, fifoLength - int((self.customEpochSettings[self.currentMarker].tmax+self.customEpochSettings[self.currentMarker].tmin) * self.sr), fifoLength))for d in dataFIFOs] + self.dataQueueTrain.put([sliceDataFIFOs, self.currentMarker, self.sr, self.devCount]) + else: + if self.globalEpochSettings.splitCheck: # slice epochs in to overlapping time windows + window_samples =int(self.globalEpochSettings.windowLength * self.sr) #number of samples in each window + increment = int((1-self.globalEpochSettings.windowOverlap)*window_samples) # if windows overlap each other by how many samples + startCount = self.desiredCount + 
int(self.globalEpochSettings.tmin * self.sr) + while startCount - window_samples > 0: + sliceDataFIFOs = [list(itertools.islice(d, startCount - window_samples, startCount)) for d in dataFIFOs] + self.dataQueueTrain.put([sliceDataFIFOs, self.currentMarker, self.sr, self.devCount]) + startCount-=increment + else: # don't slice just take tmin to tmax time + sliceDataFIFOs = [list(itertools.islice(d, fifoLength - int((self.globalEpochSettings.tmin+self.globalEpochSettings.tmax) * self.sr), fifoLength)) for d in dataFIFOs] + self.dataQueueTrain.put([sliceDataFIFOs, self.currentMarker, self.sr, self.devCount]) + #end = time.time() + #print(f"Data slicing process time {end - start}") + # reset flags and counters + posCount = 0 + self.startCounting = False + else: # in Test mode + posCount+=1 + if self.globalEpochSettings.splitCheck: + window_samples = int(self.globalEpochSettings.windowLength * self.sr) #number of samples in each window + else: + window_samples = int((self.globalEpochSettings.tmin+self.globalEpochSettings.tmax) * self.sr) + if posCount >= window_samples: + sliceDataFIFOs = [list(itertools.islice(d, fifoLength-window_samples, fifoLength)) for d in dataFIFOs] + if self.globalEpochSettings.splitCheck: + posCount = int((1-self.globalEpochSettings.windowOverlap)*window_samples) # offset poscoutn based on window overlap + else: + posCount = 0 + self.dataQueueTest.put([sliceDataFIFOs, self.sr, self.devCount]) + else: + pass + # add levels of debug + + def ReceiveMarker(self, marker, timestamp): # timestamp will be used for non sample rate specific devices (pupil-labs gazedata) + if self.startCounting == False: # only one marker at a time allow, other in windowed timeframe ignored + self.currentMarker = marker + if len(self.customEpochSettings.keys())>0: # custom marker received + if marker in self.customEpochSettings.keys(): + self.desiredCount = int(self.customEpochSettings[marker].tmax * self.sr) # find number of samples after tmax to finish counting + 
self.startCounting = True + else: # no custom markers set, use global settings + self.desiredCount = int(self.globalEpochSettings.tmax * self.sr) # find number of samples after tmax to finish counting + self.startCounting = True diff --git a/build/lib/pybci/ThreadClasses/FeatureProcessorThread.py b/build/lib/pybci/ThreadClasses/FeatureProcessorThread.py new file mode 100644 index 0000000..2dd753b --- /dev/null +++ b/build/lib/pybci/ThreadClasses/FeatureProcessorThread.py @@ -0,0 +1,64 @@ +import threading, queue, time +from ..Utils.FeatureExtractor import GenericFeatureExtractor +from ..Utils.Logger import Logger +from ..Configuration.EpochSettings import GlobalEpochSettings +import copy + +class FeatureProcessorThread(threading.Thread): + tempDeviceEpochLogger = [] + def __init__(self, closeEvent, trainTestEvent, dataQueueTrain,dataQueueTest, + featureQueueTest,featureQueueTrain, totalDevices,markerCountRetrieveEvent,markerCountQueue, customEpochSettings = {}, + globalEpochSettings = GlobalEpochSettings(),logger = Logger(Logger.INFO), + featureExtractor = GenericFeatureExtractor()): + super().__init__() + self.markerCountQueue = markerCountQueue + self.trainTestEvent = trainTestEvent + self.closeEvent = closeEvent + self.dataQueueTrain = dataQueueTrain + self.dataQueueTest = dataQueueTest + self.featureQueueTrain = featureQueueTrain + self.featureQueueTest = featureQueueTest + self.featureExtractor = featureExtractor + self.logger = logger + self.totalDevices = totalDevices + self.markerCountRetrieveEvent = markerCountRetrieveEvent + self.epochCounts = {} + self.customEpochSettings = customEpochSettings + self.globalWindowSettings = globalEpochSettings + self.tempDeviceEpochLogger = [0 for x in range(self.totalDevices)] + + def run(self): + while not self.closeEvent.is_set(): + if self.markerCountRetrieveEvent.is_set(): + self.markerCountQueue.put(self.epochCounts) + if self.trainTestEvent.is_set(): # We're training! 
+ try: + dataFIFOs, currentMarker, sr, devCount = self.dataQueueTrain.get_nowait() #[sliceDataFIFOs, self.currentMarker, self.sr, self.devCount + if currentMarker in self.epochCounts: + self.epochCounts[currentMarker][1] += 1 + else: + self.epochCounts[currentMarker] = [len(self.epochCounts.keys()),1] + target = self.epochCounts[currentMarker][0] + start = time.time() + features = self.featureExtractor.ProcessFeatures(dataFIFOs, sr, target) # allows custom epoch class to be passed + if (self.logger.level == Logger.TIMING): + end = time.time() + self.logger.log(Logger.TIMING, f" Feature Extraction time {end - start}") + if (end-start) >self.globalWindowSettings.windowLength: + self.logger.log(Logger.WARNING, f" Feature Extraction time > globalEpochSetting.windowLength, will create lag in classification output. Recommended to reduce channels, smapling rate, and features or reduce feature computational complexity.") + self.featureQueueTrain.put( [features, devCount, target, dict(self.epochCounts)] ) + except queue.Empty: + pass + else: + try: + dataFIFOs, sr, devCount = self.dataQueueTest.get_nowait() #[dataFIFOs, self.currentMarker, self.sr, ] + start = time.time() + features = self.featureExtractor.ProcessFeatures(dataFIFOs, sr, None) + if (self.logger.level == Logger.TIMING): + end = time.time() + self.logger.log(Logger.TIMING, f" Feature Extraction time {end - start}") + if (end-start) >self.globalWindowSettings.windowLength: + self.logger.log(Logger.WARNING, f" Feature Extraction time > globalEpochSetting.windowLength, will create lag in classification output. 
Recommended to reduce channels, smapling rate, and features or reduce feature computational complexity.") + self.featureQueueTest.put([features, devCount]) + except queue.Empty: + pass diff --git a/build/lib/pybci/ThreadClasses/MarkerThread.py b/build/lib/pybci/ThreadClasses/MarkerThread.py new file mode 100644 index 0000000..3533de1 --- /dev/null +++ b/build/lib/pybci/ThreadClasses/MarkerThread.py @@ -0,0 +1,28 @@ +import threading + +class MarkerThread(threading.Thread): + """Receives Marker on chosen lsl Marker outlet. Pushes marker to data threads for framing epochs, + also sends markers to featureprocessing thread for epoch counting and multiple device synchronisation. + """ + def __init__(self,closeEvent, trainTestEvent, markerStreamInlet, dataThreads, featureThreads):#, lock): + super().__init__() + self.trainTestEvent = trainTestEvent + self.closeEvent = closeEvent + self.markerStreamInlet = markerStreamInlet + self.dataThreads = dataThreads + self.featureThreads= featureThreads + + def run(self): + while not self.closeEvent.is_set(): + marker, timestamp = self.markerStreamInlet.pull_sample(timeout = 10) + if self.trainTestEvent.is_set(): # We're training! 
+ if marker != None: + marker = marker[0] + for thread in self.dataThreads: + thread.ReceiveMarker(marker, timestamp) + for thread in self.featureThreads: + thread.ReceiveMarker(marker, timestamp) + else: + pass + # add levels of debug + # print("PyBCI: LSL pull_sample timed out, no marker on stream...") diff --git a/build/lib/pybci/ThreadClasses/OptimisedDataReceiverThread.py b/build/lib/pybci/ThreadClasses/OptimisedDataReceiverThread.py new file mode 100644 index 0000000..252efff --- /dev/null +++ b/build/lib/pybci/ThreadClasses/OptimisedDataReceiverThread.py @@ -0,0 +1,139 @@ +import threading, time +from collections import deque +import itertools +import numpy as np +from bisect import bisect_left + +class OptimisedDataReceiverThread(threading.Thread): + """Responsible for receiving data from accepted LSL outlet, slices samples based on tmin+tmax basis, + starts counter for received samples after marker is received in ReceiveMarker. Relies on timestamps to slice array, + suspected more computationally intensive then synchronous method. 
+ """ + markerReceived = False + currentMarker = "" + def __init__(self, closeEvent, trainTestEvent, dataQueueTrain,dataQueueTest, dataStreamInlet, + customEpochSettings, globalEpochSettings,devCount, streamChsDropDict = [], maxExpectedSampleRate=100): + # large maxExpectedSampleRate can incur marker drop and slow procesing times for slicing arrays + super().__init__() + self.trainTestEvent = trainTestEvent + self.closeEvent = closeEvent + self.dataQueueTrain = dataQueueTrain + self.dataQueueTest = dataQueueTest + self.dataStreamInlet = dataStreamInlet + self.customEpochSettings = customEpochSettings + self.globalEpochSettings = globalEpochSettings + self.streamChsDropDict = streamChsDropDict + self.sr = maxExpectedSampleRate + self.devCount = devCount # used for tracking which device is sending data to feature extractor + + def run(self): + chCount = self.dataStreamInlet.info().channel_count() + maxTime = (self.globalEpochSettings.tmin + self.globalEpochSettings.tmax) + if len(self.customEpochSettings.keys()) > 0: # min time used for max_samples and temp array, maxTime used for longest epochs and permanentDataBuffers + if max([self.customEpochSettings[x].tmin + self.customEpochSettings[x].tmax for x in self.customEpochSettings]) > maxTime: + maxTime = max([self.customEpochSettings[x].tmin + self.customEpochSettings[x].tmax for x in self.customEpochSettings]) + if self.globalEpochSettings.splitCheck: + window_length = self.globalEpochSettings.windowLength + window_overlap = self.globalEpochSettings.windowOverlap + else: + window_length = self.globalEpochSettings.tmin+self.globalEpochSettings.tmax + minfifoLength = int(self.sr * window_length * (1-self.globalEpochSettings.windowOverlap)) # sets global window length with overlap as factor for minimum delay in test mode + dataBuffers = np.zeros((minfifoLength,chCount)) + chs_to_drop = np.ones(chCount, dtype=bool) + chs_to_drop[self.streamChsDropDict] = False + fifoLength = int(self.sr * (maxTime+20)) # adds twenty 
seconds to give more timestamps when buffering (assuming devices dont timeout for longer then 20.0 seconds, migth be worth making configurable) + permanentDataBuffers = np.zeros((fifoLength, chCount - len(self.streamChsDropDict))) + permanentTimestampBuffer = np.zeros(fifoLength) + next_window_time = 0 # sets testing mode window time, duration based on windowlength and overlap + while not self.closeEvent.is_set(): + _, timestamps = self.dataStreamInlet.pull_chunk(timeout=0.0, max_samples=dataBuffers.shape[0], dest_obj=dataBuffers) # optimised method of getting data to pull_sample, dest_obj saves memory re-allocation + if timestamps: + if len(self.streamChsDropDict) == 0: + dataBufferView = dataBuffers[:len(timestamps), :] # [:, :len(timestamps)] + else: + dataBufferView = dataBuffers[:len(timestamps), chs_to_drop] # [:, :len(timestamps)] + permanentDataBuffers = np.roll(permanentDataBuffers, shift=-len(timestamps), axis=0) + permanentTimestampBuffer = np.roll(permanentTimestampBuffer, shift=-len(timestamps)) + permanentDataBuffers[-len(timestamps):,:] = dataBufferView + permanentTimestampBuffer[-len(timestamps):] = timestamps + if self.trainTestEvent.is_set(): # We're training! 
+ if self.markerReceived: # we received a marker + timestamp_tmin = self.targetTimes[1] + timestamp_tmax = self.targetTimes[0] + #print(timestamp_tmin, "timestamp_tmin >= permanentTimestampBuffer[0]:", permanentTimestampBuffer[0]) + #print(timestamp_tmax, "timestamp_tmax <= permanentTimestampBuffer[-1]:", permanentTimestampBuffer[-1]) + if timestamp_tmin >= permanentTimestampBuffer[0] and timestamp_tmax <= permanentTimestampBuffer[-1]: + if len(self.customEpochSettings.keys())>0: # custom marker received + if self.customEpochSettings[self.currentMarker].splitCheck: # slice epochs into overlapping time windows + window_start_time = timestamp_tmin + window_end_time = window_start_time + window_length + while window_end_time <= timestamp_tmax: + idx_tmin = (np.abs(permanentTimestampBuffer - window_start_time)).argmin() # find array index of start of window + idx_tmax = (np.abs(permanentTimestampBuffer - window_end_time)).argmin() # find array index of end of window + slices = permanentDataBuffers[idx_tmin:idx_tmax,:] + self.dataQueueTrain.put([slices, self.currentMarker, self.sr, self.devCount]) + window_start_time += window_length * (1 - window_overlap) + window_end_time = window_start_time + window_length + else: # don't slice just take tmin to tmax time + idx_tmin = (np.abs(permanentTimestampBuffer - timestamp_tmin)).argmin() + idx_tmax = (np.abs(permanentTimestampBuffer - timestamp_tmax)).argmin() + slices = permanentDataBuffers[idx_tmin:idx_tmax,:] + self.dataQueueTrain.put([slices, self.currentMarker, self.sr, self.devCount]) + else: + if self.globalEpochSettings.splitCheck: # slice epochs in to overlapping time windows + window_start_time = timestamp_tmin#self.markerTimestamp - self.globalEpochSettings.tmin + window_end_time = window_start_time + window_length + #print(window_end_time, " ", timestamp_tmax) + while window_end_time <= timestamp_tmax: #:self.markerTimestamp + self.globalEpochSettings.tmax: + idx_tmin = (np.abs(permanentTimestampBuffer - 
window_start_time)).argmin() + idx_tmax = (np.abs(permanentTimestampBuffer - window_end_time)).argmin() + #print(idx_tmin, " ", idx_tmax) + #print(permanentTimestampBuffer[idx_tmin], " ", permanentTimestampBuffer[idx_tmax]) + #print(permanentDataBuffers.shape) + slices = permanentDataBuffers[idx_tmin:idx_tmax,:] + #print(slices.shape) + self.dataQueueTrain.put([slices, self.currentMarker, self.sr, self.devCount]) + window_start_time += window_length * (1 - window_overlap) + window_end_time = window_start_time + window_length + self.startCounting = False + else: # don't slice just take tmin to tmax time + idx_tmin = (np.abs(permanentTimestampBuffer - timestamp_tmin)).argmin() + idx_tmax = (np.abs(permanentTimestampBuffer - timestamp_tmax)).argmin() + slices = permanentDataBuffers[idx_tmin:idx_tmax,:] + self.dataQueueTrain.put([slices, self.currentMarker, self.sr, self.devCount]) + self.markerReceived = False + else: # in Test mode + if next_window_time+(window_length/2) <= permanentTimestampBuffer[-1]: + idx_tmin = (np.abs(permanentTimestampBuffer - (next_window_time-(window_length/2)))).argmin() + idx_tmax = (np.abs(permanentTimestampBuffer - (next_window_time+(window_length/2)))).argmin() + #print(next_window_time+(window_length/2), "next_window_time+(window_length/2) <= permanentTimestampBuffer[-1]:", permanentTimestampBuffer[-1]) + #print(next_window_time, "next_window_time permanentTimestampBuffer[0]:", permanentTimestampBuffer[0]) + #print(idx_tmin, "idx_tmin idx_tmax", idx_tmax) + if idx_tmin == idx_tmax: + # oops we lost track, get window positions and start again + idx_tmin = (np.abs(permanentTimestampBuffer - (permanentTimestampBuffer[-1] - window_length))).argmin() + idx_tmax = -1 + next_window_time = permanentTimestampBuffer[-1] - (window_length/2) + slices = permanentDataBuffers[idx_tmin:idx_tmax,:] + self.dataQueueTest.put([slices, self.sr, self.devCount]) + if self.globalEpochSettings.splitCheck: + next_window_time += window_length * (1 - 
    def ReceiveMarker(self, marker, timestamp): # timestamp will be used for non sample rate specific devices (pupil-labs gazedata)
        """Record an incoming LSL marker and derive the epoch's time window.

        Sets self.targetTimes to [timestamp + tmax, timestamp - tmin] — note the
        ordering: epoch END time first, epoch START time second — and raises the
        self.markerReceived flag so the receive loop slices the sample buffers
        once both window edges are covered by buffered timestamps.

        Parameters:
            marker (str): marker string received on the LSL marker stream.
            timestamp (float): LSL timestamp at which the marker arrived.
        """
        if self.markerReceived == False: # only one marker processed at a time; markers arriving inside the current window are ignored
            self.currentMarker = marker
            self.markerTimestamp = timestamp
            if len(self.customEpochSettings.keys())>0: # custom per-marker settings are configured
                if marker in self.customEpochSettings.keys():
                    #self.desiredCount = int(self.customEpochSettings[marker].tmax * self.sr) # find number of samples after tmax to finish counting
                    self.targetTimes = [timestamp+self.customEpochSettings[marker].tmax, timestamp-self.customEpochSettings[marker].tmin]
                    self.markerReceived = True
                # NOTE(review): markers absent from customEpochSettings are silently
                # dropped (markerReceived stays False) — confirm this is intended.
            else: # no custom markers set, use global settings
                #self.desiredCount = int(self.globalEpochSettings.tmax * self.sr) # find number of samples after tmax to finish counting
                self.targetTimes = [timestamp+self.globalEpochSettings.tmax, timestamp-self.globalEpochSettings.tmin]
                self.markerReceived = True
class Classifier():
    """Unified wrapper around one of three classifier backends — an sklearn
    estimator, a tensorflow model, or a pytorch model/analysis callable — so
    PyBCI can train and test whichever the user supplied through one API.
    The active backend's name is held in self.classifierLibrary.
    """
    classifierLibrary = "sklearn" # current default, should be none or somthing different?
    clf = svm.SVC(kernel = "rbf")#C=c, kernel=k, degree=d, gamma=g, coef0=c0, tol=t, max_iter=i)
    accuracy = 0 # last evaluation accuracy; stays 0 until TrainModel() has fitted once
    model = None # optional tensorflow model
    torchModel = None # optional pytorch training/evaluation callable

    def __init__(self, clf = None, model = None, torchModel = None):
        """Store whichever backend was supplied (first non-None of clf, model,
        torchModel wins) and resolve self.classifierLibrary via CheckClassifierLibrary()."""
        super().__init__()
        if clf != None:
            self.clf = clf
        elif model != None:
            self.model = model
        elif torchModel != None:
            self.torchModel = torchModel
        self.CheckClassifierLibrary()

    def CheckClassifierLibrary(self):
        """Set self.classifierLibrary from whichever backend attribute is set;
        tensorflow ("tensor") takes precedence, then pytorch, then sklearn."""
        if self.model != None: # maybe requires actual check for tensorflow model
            self.classifierLibrary = "tensor"
        elif self.torchModel != None: # maybe requires actual check for pytorch model
            self.classifierLibrary = "pyTorch"
        elif self.clf != None: # maybe requires actual check for sklearn clf
            self.classifierLibrary = "sklearn"

    def TrainModel(self, features, targets):
        """Standardise the features, then fit and score the active backend.

        Parameters:
            features: 2D (epochs, features) or 3D (epochs, features, chs) numpy array.
            targets: 1D array-like of class labels, one per epoch.

        Side effects: fits self.scaler (a single StandardScaler for 2D input, or
        one scaler per trailing-axis channel for 3D input) and updates
        self.accuracy. Fitting is skipped while the training split contains only
        one distinct target class.
        """
        x_train, x_test, y_train, y_test = train_test_split(features, targets, shuffle = True, test_size=0.2)
        if len(features.shape)==3:
            # One scaler per channel (last axis); each channel is flattened to a
            # single column so the scaler normalises over all of its samples.
            self.scaler = [StandardScaler() for scaler in range(features.shape[2])]
            for e in range(features.shape[2]): # this normalises per channel; maybe better to normalise across the other dimension
                x_train_channel = x_train[:,:,e].reshape(-1, 1)
                x_test_channel = x_test[:,:,e].reshape(-1, 1)
                x_train[:,:,e] = self.scaler[e].fit_transform(x_train_channel).reshape(x_train[:,:,e].shape)
                x_test[:,:,e] = self.scaler[e].transform(x_test_channel).reshape(x_test[:,:,e].shape)
        elif len(features.shape)== 2:
            self.scaler = StandardScaler()
            x_train = self.scaler.fit_transform(x_train) # compute mean/std on the training data only
            x_test = self.scaler.transform(x_test) # scale the held-out test data with training statistics
        if all(item == y_train[0] for item in y_train):
            # Only one class in the training split — nothing meaningful to fit yet.
            pass
        else:
            if self.classifierLibrary == "pyTorch":
                # torchModel is a user-supplied callable returning (accuracy, fitted model).
                self.accuracy, self.pymodel = self.torchModel(x_train, x_test, y_train, y_test)
            elif self.classifierLibrary == "sklearn":
                self.clf.fit(x_train, y_train)
                y_predictions = self.clf.predict(x_test)
                self.accuracy = sklearn.metrics.accuracy_score(y_test, y_predictions)
            elif self.classifierLibrary == "tensor":
                self.model.fit(np.array(x_train), np.array(y_train)) # epochs and batch_size should be customisable
                self.loss, self.accuracy = self.model.evaluate(np.array(x_test), np.array(y_test))
            else:
                # no classifier library selected, print debug?
                pass

    def TestModel(self, x):
        """Scale one feature vector (1D) or feature block (2D) with the scaler(s)
        fitted by TrainModel, then return the active backend's prediction.

        Returns an int class index for the tensorflow/pytorch backends, the raw
        sklearn predict() result for sklearn, or None if no backend matched.
        NOTE(review): x is scaled in place for 2D input — caller's array mutates.
        """
        if len(x.shape)==2:
            # 2D input: scale each trailing-axis channel with its own fitted scaler.
            for e in range(x.shape[1]):
                x[:,e] = self.scaler[e].transform(x[:,e].reshape(-1, 1)).reshape(x[:,e].shape)
        elif len(x.shape)== 1:
            x = self.scaler.transform([x])[0] # single epoch: scale with the lone 2D scaler
        if self.classifierLibrary == "sklearn":
            x = np.expand_dims(x, axis=0) # predict expects a batch dimension
            return self.clf.predict(x)
        elif self.classifierLibrary == "tensor":
            x = np.expand_dims(x, axis=0)
            predictions = self.model.predict(x)
            if len (predictions[0]) == 1: # assume binary classification
                return 1 if predictions[0] > 0.5 else 0
            else: # assume multi-classification
                return np.argmax(predictions[0])
        elif self.classifierLibrary == "pyTorch":
            x = torch.Tensor(np.expand_dims(x, axis=0))
            self.pymodel.eval() # inference mode — disables dropout/batch-norm updates
            with torch.no_grad():
                predictions = self.pymodel(x)
            if len (predictions[0]) == 1: # assume binary classification
                return 1 if predictions[0] > 0.5 else 0
            else: # assume multi-classification
                return torch.argmax(predictions).item()
        else:
            print("no classifier library selected")
            # no classifier library selected, print debug?
            pass
class GenericFeatureExtractor():
    def __init__(self, freqbands = [[1.0, 4.0], [8.0, 12.0], [12.0, 20.0], [4.0, 8.0]][0:0] or [[1.0, 4.0], [4.0, 8.0], [8.0, 12.0], [12.0, 20.0]], featureChoices = GeneralFeatureChoices()):
        """Configure which time/frequency features ProcessFeatures() computes.

        Parameters:
            freqbands: list of [low, high] Hz pairs; one mean band power feature
                is produced per pair when featureChoices.psdBand is True.
            featureChoices (GeneralFeatureChoices): boolean flags, one per
                optional feature.

        self.numFeatures is the per-channel feature count: one slot per freq
        band (if psdBand) plus one per enabled boolean flag below (psdBand
        itself is excluded from selFeats since it contributes len(freqbands)).
        """
        super().__init__()
        self.freqbands = freqbands
        self.featureChoices = featureChoices
        # Count enabled single-value features (True == 1 when summed).
        selFeats = sum([self.featureChoices.appr_entropy,
                    self.featureChoices.perm_entropy,
                    self.featureChoices.spec_entropy,
                    self.featureChoices.svd_entropy,
                    self.featureChoices.samp_entropy,
                    self.featureChoices.rms,
                    self.featureChoices.meanPSD,
                    self.featureChoices.medianPSD,
                    self.featureChoices.variance,
                    self.featureChoices.meanAbs,
                    self.featureChoices.waveformLength,
                    self.featureChoices.zeroCross,
                    self.featureChoices.slopeSignChange]
                    )
        self.numFeatures = (len(self.freqbands)*self.featureChoices.psdBand)+selFeats

    def ProcessFeatures(self, epoch, sr, target):
        """Compute the configured time/frequency features for one epoch.

        Inputs:
            epoch: 2D array-like of samples; the code indexes epoch[:,ch], i.e.
                shape [samples, chs]. NOTE(review): an earlier docstring said
                [chs, samples] — confirm orientation against the callers.
            sr: sample rate of the current device, used by welch/spectral entropy.
            target: marker string for this epoch; accepted but currently unused
                and NOT returned — only the feature vector is returned.

        Returns:
            features: 1D numpy array of length chs * self.numFeatures, laid out
                channel-major: features[ch*numFeatures + featureIndex]. NaN and
                +inf entries are zeroed before returning.

        NOTE: channels with a constant value trigger warnings in the frequency
        based features (a constant level has no frequency components).
        """
        numchs = epoch.shape[1]
        features = np.zeros(numchs * self.numFeatures)

        for ch in range(epoch.shape[1]):
            if self.featureChoices.psdBand: # mean power within each requested frequency band
                freqs, psd = welch(epoch[:,ch], sr)
                for l, band in enumerate(self.freqbands):
                    if len(freqs) > 0: # len(freqs) can be 0 if signal is all DC
                        idx_band = np.logical_and(freqs >= band[0], freqs <= band[1])
                        bp = np.mean(psd[idx_band])
                        features[(ch* self.numFeatures)+l] = bp
                    else:
                        features[(ch* self.numFeatures)+l] = 0
            else:
                freqs, psd = welch(epoch[:,ch], sr) # still needed for mean/median PSD below
                l = -1 # accounts for no freqbands being selected; l resumes at len(freqbands)-1 otherwise
            if self.featureChoices.meanPSD: # mean power
                l += 1
                if len(freqs) > 0: features[(ch* self.numFeatures)+l] = np.mean(psd) # len(freqs) can be 0 if signal is all DC
                else: features[(ch* self.numFeatures)+l] = 0
            if self.featureChoices.medianPSD: # median power
                l += 1
                if len(freqs) > 0: features[(ch* self.numFeatures)+l] = np.median(psd) # len(freqs) can be 0 if signal is all DC
                else: features[(ch* self.numFeatures)+l] = 0
            if self.featureChoices.appr_entropy: # approximate entropy
                l += 1
                features[(ch* self.numFeatures)+l] = ant.app_entropy(epoch[:,ch])
            if self.featureChoices.perm_entropy: # permutation entropy
                l += 1
                features[(ch* self.numFeatures)+l] = ant.perm_entropy(epoch[:,ch],normalize=True)
            if self.featureChoices.spec_entropy: # spectral entropy
                l += 1
                features[(ch* self.numFeatures)+l] = ant.spectral_entropy(epoch[:,ch], sf=sr, method='welch', nperseg = len(epoch[:,ch]), normalize=True)
            if self.featureChoices.svd_entropy: # singular value decomposition entropy
                l += 1
                features[(ch* self.numFeatures)+l] = ant.svd_entropy(epoch[:,ch], normalize=True)
            if self.featureChoices.samp_entropy: # sample entropy
                l += 1
                features[(ch* self.numFeatures)+l] = ant.sample_entropy(epoch[:,ch])
            if self.featureChoices.rms: # root mean square
                l += 1
                features[(ch* self.numFeatures)+l] = np.sqrt(np.mean(np.array(epoch[:,ch])**2))
            if self.featureChoices.variance: # variance
                l += 1
                features[(ch* self.numFeatures)+l] = np.var(epoch[:,ch])
            if self.featureChoices.meanAbs: # mean absolute value
                l += 1
                try:
                    features[(ch* self.numFeatures)+l] = sum([np.linalg.norm(c) for c in epoch[:,ch]])/len(epoch[:,ch])
                except:
                    features[(ch* self.numFeatures)+l] = 0
            if self.featureChoices.waveformLength: # cumulative absolute sample-to-sample difference
                l += 1
                try:
                    features[(ch* self.numFeatures)+l] = sum([np.linalg.norm(c-epoch[inum,ch]) for inum, c in enumerate(epoch[1:,ch])])
                except:
                    features[(ch* self.numFeatures)+l] = 0
            if self.featureChoices.zeroCross: # count of sign changes between consecutive samples
                l += 1
                features[(ch* self.numFeatures)+l] = sum([1 if c*epoch[inum+1,ch]<0 else 0 for inum, c in enumerate(epoch[:-1,ch])])
            if self.featureChoices.slopeSignChange:
                l += 1
                # NOTE(review): this squares the SAME difference (always >= 0), so it
                # counts |delta| >= sqrt(0.1) rather than true slope sign changes —
                # confirm intent against the upstream (non-build) source.
                ssc = sum([1 if (c-epoch[inum+1,ch])*(c-epoch[inum+1,ch])>=0.1 else 0 for inum, c in enumerate(epoch[:-1,ch])])
                features[(ch* self.numFeatures)+l] = ssc
        features[np.isnan(features)] = 0 # zero out nans from constant/degenerate signals
        features[features == np.inf] = 0 # zero out +inf values as well
        return features
    def ScanDataStreams(self):
        """Scan LSL and populate self.dataStreams with matching StreamInlets.

        A stream qualifies when its LSL type is in self.streamTypes. If the user
        predefined stream names (self.dataStreamPredefined), only inlets whose
        name is in self.dataStreamsNames are kept; otherwise all matches are kept.
        """
        streams = resolve_stream()
        dataStreams = []
        self.dataStreams = [] # reset so repeated scans don't accumulate stale inlets
        for stream in streams:
            if stream.type() in self.streamTypes:
                dataStreams.append(StreamInlet(stream))
        if self.dataStreamPredefined:
            for s in dataStreams:
                name = s.info().name()
                if name not in self.dataStreamsNames:
                    # NOTE(review): this warns once per non-requested stream found,
                    # not once per missing requested name — can be noisy.
                    self.logger.log(Logger.WARNING," Predefined LSL Data Stream name not present.")
                    self.logger.log(Logger.WARNING, " Available Streams: "+str([s.info().name() for s in dataStreams]))
                else:
                    self.dataStreams.append(s)
        else: # just add all datastreams as none were specified
            self.dataStreams = dataStreams
"""Scans available LSL streams and appends inlet to self.markerStreams""" + streams = resolve_stream() + markerStreams = [] + self.markerStream = None + for stream in streams: + if stream.type() in self.markerTypes: + markerStreams.append(StreamInlet(stream)) + if self.markerStreamPredefined: + if len(markerStreams) > 1: + self.logger.log(Logger.WARNING," Too many Marker streams available, set single desired markerStream in bci.lslScanner.markerStream correctly.") + for s in markerStreams: + name = s.info().name() + if name != self.markerStreamName: + self.logger.log(Logger.WARNING," Predefined LSL Marker Stream name not present.") + self.logger.log(Logger.WARNING, " Available Streams: "+str([s.info().name() for s in markerStreams])) + else: + self.markerStream = s + else: + if len(markerStreams) > 0: + self.markerStream = markerStreams[0] # if none specified grabs first avaialble marker stream + + def CheckAvailableLSL(self): + """Checks streaminlets available, + Returns + ------- + bool : + True if 1 marker stream present and available datastreams are present. + False if no datastreams are present and/or more or less then one marker stream is present, requires hard selection or markser stream if too many. 
+ """ + self.ScanStreams() + if self.markerStream == None: + self.logger.log(Logger.WARNING," No Marker streams available, make sure your accepted marker data Type have been set in bci.lslScanner.markerTypes correctly.") + if len(self.dataStreams) == 0: + self.logger.log(Logger.WARNING," No data streams available, make sure your streamTypes have been set in bci.lslScanner.dataStream correctly.") + if len(self.dataStreams) > 0 and self.markerStream !=None: + self.logger.log(Logger.INFO," Success - "+str(len(self.dataStreams))+" data stream(s) found, 1 marker stream found") + + if len(self.dataStreams) > 0 and self.markerStream != None: + self.parent.dataStreams = self.dataStreams + self.parent.markerStream = self.markerStream + return True + else: + return False \ No newline at end of file diff --git a/build/lib/pybci/Utils/Logger.py b/build/lib/pybci/Utils/Logger.py new file mode 100644 index 0000000..8b83294 --- /dev/null +++ b/build/lib/pybci/Utils/Logger.py @@ -0,0 +1,32 @@ +class Logger: + INFO = "INFO" + WARNING = "WARNING" + NONE = "NONE" + TIMING = "TIMING" + + def __init__(self, level=INFO): + self.level = level + self.check_level(level) + #print(self.level) + def set_level(self, level): + self.level = level + self.check_level(level) + + def check_level(self,level): + if level != self.WARNING and level != self.INFO and level != self.NONE and level != self.TIMING : + print("PyBCI: [INFO] - Invalid or no log level selected, defaulting to info. 
(options: info, warning, none)") + level = self.INFO + self.level = level + + def log(self, level, message): + if self.level == self.NONE: + return None + if level == self.INFO: + if self.level != self.NONE and self.level != self.WARNING: + print('PyBCI: [INFO] -' + message) + elif level == self.WARNING: + if self.level != self.NONE: + print('PyBCI: [WARNING] -' + message) + elif level == self.TIMING: + if self.level == self.TIMING: + print('PyBCI: [TIMING] -' + message) \ No newline at end of file diff --git a/build/lib/pybci/Utils/PseudoDevice.py b/build/lib/pybci/Utils/PseudoDevice.py new file mode 100644 index 0000000..4c56355 --- /dev/null +++ b/build/lib/pybci/Utils/PseudoDevice.py @@ -0,0 +1,130 @@ +################ PsuedoDevice.py ######################## +# used for creating fake LSL device data and markers # +# Please note! sample rate is not exact, # +# expect some drop over time! # +# Written by Liam Booth 19/08/2023 # +################################################# +from ..Utils.Logger import Logger +import random, time, threading, pylsl +import numpy as np +from collections import deque + +def precise_sleep(duration): + end_time = time.time() + duration + while time.time() < end_time: + pass + +class PseudoDataConfig: + duration = 1.0 + noise_level = 0.1 + amplitude = 0.2 + frequency = 1.0 + +class PseudoDevice: + + def __init__(self, markerName = "PyBCIPsuedoMarkers", markerType = "Markers", markerConfigStrings = ["Marker1", "Marker2", "Marker3", "Marker4", "Marker5"], + pseudoDataConfigs = None, + baselineMarkerString = "baseline", baselineConfig = PseudoDataConfig(), + dataStreamName = "PyBCIPsuedoDataStream" , dataStreamType="EMG", + sampleRate= 250, channelCount= 8, logger = Logger(Logger.INFO)): + self.markerConfigStrings = markerConfigStrings + if pseudoDataConfigs == None: + pseudoDataConfigs = [PseudoDataConfig(), PseudoDataConfig(), PseudoDataConfig(), PseudoDataConfig(),PseudoDataConfig()] + pseudoDataConfigs[0].amplitude = 
pseudoDataConfigs[0].amplitude*2 + pseudoDataConfigs[1].amplitude = pseudoDataConfigs[1].amplitude*3 + pseudoDataConfigs[2].amplitude = pseudoDataConfigs[2].amplitude*4 + pseudoDataConfigs[3].amplitude = pseudoDataConfigs[3].amplitude*5 + pseudoDataConfigs[4].amplitude = pseudoDataConfigs[4].amplitude*6 + pseudoDataConfigs[0].amplitude = pseudoDataConfigs[0].frequency*2 + pseudoDataConfigs[1].amplitude = pseudoDataConfigs[1].frequency*3 + pseudoDataConfigs[2].amplitude = pseudoDataConfigs[2].frequency*4 + pseudoDataConfigs[3].amplitude = pseudoDataConfigs[3].frequency*5 + pseudoDataConfigs[4].amplitude = pseudoDataConfigs[4].frequency*6 + self.pseudoDataConfigs = pseudoDataConfigs + self.sampleRate = sampleRate + self.channelCount = channelCount + markerInfo = pylsl.StreamInfo(markerName, markerType, 1, 0, 'string', 'Dev') + self.markerOutlet = pylsl.StreamOutlet(markerInfo) + info = pylsl.StreamInfo(dataStreamName, dataStreamType, self.channelCount, self.sampleRate, 'float32', 'Dev') + chns = info.desc().append_child("channels") + for label in range(self.channelCount): + ch = chns.append_child("channel") + ch.append_child_value("label", str(label+1)) + ch.append_child_value("type", dataStreamType) + self.outlet = pylsl.StreamOutlet(info) + + + def GeneratePseudoEMG(self,samplingRate, duration, noise_level, amplitude, frequency): + """ + Generate a pseudo EMG signal for a given gesture. 
+ Arguments: + - sampling_rate: Number of samples per second + - duration: Duration of the signal in seconds + - noise_level: The amplitude of Gaussian noise to be added (default: 0.1) + - amplitude: The amplitude of the EMG signal (default: 1.0) + - frequency: The frequency of the EMG signal in Hz (default: 10.0) + Returns: + - emg_signal: The generated pseudo EMG signal as a 2D numpy array with shape (channels, samples) + """ + num_samples = int(duration * samplingRate) + # Initialize the EMG signal array + emg_signal = np.zeros((self.totchs, num_samples)) + # Generate the pseudo EMG signal for each channel + for channel in range(self.totchs): + # Calculate the time values for the channel + times = np.linspace(0, duration, num_samples) + # Generate the pseudo EMG signal based on the selected gesture + emg_channel = amplitude * np.sin(2 * np.pi * frequency * times)# * times) # Sinusoidal EMG signal + # Add Gaussian noise to the EMG signal + noise = np.random.normal(0, noise_level, num_samples) + emg_channel += noise + # Store the generated channel in the EMG signal array + emg_signal[channel, :] = emg_channel + return emg_signal + + + def update(self): + with self.lock: # Acquire the lock + if self.markerOccurred: + for i, command in enumerate(self.commandStrings): + if self.currentMarker == command: + num_samples = int(self.commandDataConfigs[i].duration/10 * self.sampleRate) + [self.y[0].popleft() for d in range(num_samples)] + num = self.GeneratePseudoEMG(self.sampleRate,self.commandDataConfigs[i].duration/10, self.commandDataConfigs[i].noise_level, + self.commandDataConfigs[i].amplitude, self.commandDataConfigs[i].frequency) + self.chunkCount += 1 + if self.chunkCount >= 10: + self.markerOccurred = False + self.chunkCount = 0 + else:# send baseline + num_samples = int(self.baselineConfig.duration/10 * self.sampleRate) + [self.y[0].popleft() for d in range(num_samples)] + num = self.GeneratePseudoEMG(self.sampleRate,self.baselineConfig.duration/10, 
self.baselineConfig.noise_level, + self.baselineConfig.amplitude, self.baselineConfig.frequency) + [self.y[0].extend([num[0][d]]) for d in range(num_samples)] + for n in range(num.shape[1]): + self.outlet.push_sample(num[:,n]) + + def StopTest(self): + self.stop_signal = True + self.thread.join() # wait for the thread to finish + + def BeginTest(self): + self.stop_signal = False + self.thread = threading.Thread(target=self._generate_signal) + self.thread.start() + + def _generate_signal(self): + while not self.stop_signal: + start_time = time.time() + self.update() + sleep_duration = max(0, (1.0 / 10) - (start_time - time.time())) + precise_sleep(sleep_duration) + + def SendMarker(self): + with self.lock: # Acquire the lock + self.markerOutlet.push_sample([self.currentMarker]) + self.markerOccurred = True + + def SendBaseline(self): + self.markerOutlet.push_sample(["Baseline"]) \ No newline at end of file diff --git a/build/lib/pybci/Utils/__init__.py b/build/lib/pybci/Utils/__init__.py new file mode 100644 index 0000000..fba0175 --- /dev/null +++ b/build/lib/pybci/Utils/__init__.py @@ -0,0 +1,4 @@ +#from .Classifier import Classifier +#from .FeatureExtractor import FeatureExtractor +#from .LSLScanner import LSLScanner + diff --git a/build/lib/pybci/__init__.py b/build/lib/pybci/__init__.py new file mode 100644 index 0000000..65e1181 --- /dev/null +++ b/build/lib/pybci/__init__.py @@ -0,0 +1,2 @@ +from .pybci import PyBCI +#from .Utils import Classifier, FeatureExtractor, LSLScanner \ No newline at end of file diff --git a/build/lib/pybci/pybci.py b/build/lib/pybci/pybci.py new file mode 100644 index 0000000..0a84d5f --- /dev/null +++ b/build/lib/pybci/pybci.py @@ -0,0 +1,348 @@ +from .Utils.LSLScanner import LSLScanner +from .Utils.Logger import Logger +from .ThreadClasses.FeatureProcessorThread import FeatureProcessorThread +from .ThreadClasses.DataReceiverThread import DataReceiverThread +from .ThreadClasses.AsyncDataReceiverThread import AsyncDataReceiverThread 
+from .ThreadClasses.OptimisedDataReceiverThread import OptimisedDataReceiverThread +from .ThreadClasses.MarkerThread import MarkerThread +from .ThreadClasses.ClassifierThread import ClassifierThread +from .Configuration.EpochSettings import GlobalEpochSettings, IndividualEpochSetting +from .Configuration.FeatureSettings import GeneralFeatureChoices +import queue, threading, copy +import tensorflow as tf +#import torch +import torch.nn as nn + +#tf.get_logger().setLevel('ERROR') + +class PyBCI: + globalEpochSettings = GlobalEpochSettings() + customEpochSettings = {} + minimumEpochsRequired = 10 + markerThread = [] + dataThreads = [] + streamChsDropDict= {} + dataStreams = [] + markerStream = None + connected = False + epochCounts = {} # holds markers received, their target ids and number received of each + classifierInformation = [] + clf= None + model = None + torchModel = None + def __init__(self, dataStreams = None, markerStream= None, streamTypes = None, markerTypes = None, loggingLevel = Logger.INFO, + globalEpochSettings = GlobalEpochSettings(), customEpochSettings = {}, streamChsDropDict = {}, + streamCustomFeatureExtract = {}, + minimumEpochsRequired = 10, clf= None, model = None, torchModel = None): + """ + The PyBCI object stores data from available lsl time series data streams (EEG, pupilometry, EMG, etc.) + and holds a configurable number of samples based on lsl marker strings. + If no marker strings are available on the LSL the class will close and return an error. 
+ Parameters + ---------- + dataStreams: List[str] + Allows user to set custom acceptable EEG stream definitions, if None defaults to streamTypes scan + markerStream: List[str] + Allows user to set custom acceptable Marker stream definitions, if None defaults to markerTypes scan + streamTypes: List[str] + Allows user to set custom acceptable EEG type definitions, ignored if dataStreams not None + markerTypes: List[str] + Allows user to set custom acceptable Marker type definitions, ignored if markerStream not None + loggingLevel: string + Sets PyBCI print level, ('INFO' prints all statements, 'WARNING' is only warning messages, 'TIMING' gives estimated time for feature extraction, and classifier training or testing, 'NONE' means no prints from PyBCI) + globalEpochSettings (GlobalEpochSettings): + Sets global timing settings for epochs. + customEpochSettings: dict + Sets individual timing settings for epochs. {markerstring1:IndividualEpochSettings(),markerstring2:IndividualEpochSettings()} + streamChsDropDict: dict + Keys for dict should be respective datastreams with corresponding list of which channels to drop. {datastreamstring1: list(ints), datastreamstring2: list(ints)} + streamCustomFeatureExtract: dict + Allows dict to be passed of datastream with custom feature extractor class for analysing data. {datastreamstring1: customClass1(), datastreamstring2: customClass1(),} + minimumEpochsRequired: int + Minimm number of required epochs before model fitting begins, must be of each type of received markers and mroe then 1 type of marker to classify. + clf: sklearn.base.ClassifierMixin + Allows custom Sklearn model to be passed. + model: tf.keras.model + Allows custom tensorflow model to be passed. + torchmodel: [torchModel(), torch.nn.Module] + Currently a list where first item is torchmodel analysis function, second is torch model, check pytorch example - likely to change in future updates. 
+ """ + self.streamCustomFeatureExtract = streamCustomFeatureExtract + self.globalEpochSettings = globalEpochSettings + self.customEpochSettings = customEpochSettings + self.streamChsDropDict = streamChsDropDict + self.loggingLevel = loggingLevel + self.logger = Logger(self.loggingLevel) + self.lslScanner = LSLScanner(self, dataStreams, markerStream,streamTypes, markerTypes, logger =self.logger) + self.ConfigureMachineLearning(minimumEpochsRequired, clf, model, torchModel) # configure first, connect second + self.Connect() + + def __enter__(self, dataStreams = None, markerStream= None, streamTypes = None, markerTypes = None, loggingLevel = Logger.INFO, + globalEpochSettings = GlobalEpochSettings(), customEpochSettings = {}, streamChsDropDict = {}, + streamCustomFeatureExtract = {}, + minimumEpochsRequired = 10, clf= None, model = None, torchModel = None): # with bci + """ + Please look at PyBCI.__init__ (same parameters, setup and description) + """ + self.streamCustomFeatureExtract = streamCustomFeatureExtract + self.globalEpochSettings = globalEpochSettings + self.customEpochSettings = customEpochSettings + self.streamChsDropDict = streamChsDropDict + self.lslScanner = LSLScanner(self, dataStreams, markerStream,streamTypes, markerTypes) + self.loggingLevel = loggingLevel + self.ConfigureMachineLearning(minimumEpochsRequired, clf, model, torchModel) # configure first, connect second + self.Connect() + + def __exit__(self, exc_type, exc_val, exc_tb): + self.StopThreads() + + def Connect(self): # Checks valid data and markers streams are present, controls dependant functions by setting self.connected + if self.lslScanner.CheckAvailableLSL(): + self.__StartThreads() + self.connected = True + return True # uses return statements so user can check if connected with bool returned + else: + self.connected = False + return False + + # set test and train boolean for changing thread operation + def TrainMode(self): + """ + Starts BCI training If PyBCI is connected to valid 
LSL data and marker streams, if not tries to scan and connect. + """ + if self.connected: + self.logger.log(Logger.INFO," Started training...") + self.trainTestEvent.set() + else: + self.Connect() + + def TestMode(self): + """ + Starts BCI testing if PyBCI is connected to valid LSL data and marker streams, if not tries to scan and connect. + (Need to check if invalid number of epochs are obtained and this is set) + """ + if self.connected: + self.logger.log(Logger.INFO," Started testing...") + self.trainTestEvent.clear() + else: + self.Connect() + + # Get data from threads + def CurrentClassifierInfo(self): + """ + Gets dict with current clf, model, torchModel and accuracy. Accuracy will be 0 if no fitting has occurred. + Returns + ------- + classInfo: dict + dict of "clf", "model", "torchModel" and "accuracy" where accuracy is 0 if no model training/fitting has occurred. If mode not used corresponding value is None. + If not connected returns {"Not Connected": None} + """ + if self.connected: + self.classifierInfoRetrieveEvent.set() + classInfo = self.classifierInfoQueue.get() + self.classifierInfoRetrieveEvent.clear() + return classInfo + else: + self.Connect() + return {"Not Connected": None} + + def CurrentClassifierMarkerGuess(self): + """ + Gets classifier current marker guess and targets. + Returns + ------- + classGuess: int | None + Returned int correlates to value of key from dict from ReceivedMarkerCount() when in testmode. + If in trainmode returns None. + """ + if self.connected: + # probably needs check that we're in test mode, maybe debug print if not? + self.classifierGuessMarkerEvent.set() + classGuess = self.classifierGuessMarkerQueue.get() + self.classifierGuessMarkerEvent.clear() + return classGuess + else: + self.Connect() + return None + + def CurrentFeaturesTargets(self): + """ + Gets classifier current features and targets. 
+ Returns + ------- + featureTargets: dict + dict of "features" and "targets" where features is 2d list of feature data and targets is a 1d list of epoch targets as ints. + If not connected returns {"Not Connected": None} + """ + if self.connected: + self.queryFeaturesEvent.set() + featureTargets = self.queryFeaturesQueue.get() + self.queryFeaturesEvent.clear() # still needs coding + return featureTargets + else: + self.Connect() + return {"Not Connected": None} + + def ReceivedMarkerCount(self): + """ + Gets number of received training marker, their strings and their respective values to correlate with CurrentClassifierMarkerGuess(). + Returns + ------- + markers: dict + Every key is a string received on the selected LSL marker stream, the value is a list where the first item is the marker id value, + use with CurrentClassifierMarkerGuess() the second value is a received count for that marker type. Will be empty if no markers received. + """ + if self.connected: + self.markerCountRetrieveEvent.set() + markers = self.markerCountQueue.get() + self.markerCountRetrieveEvent.clear() + return markers + else: + self.Connect() + + def __StartThreads(self): + self.featureQueueTrain = queue.Queue() + self.featureQueueTest = queue.Queue() + self.classifierInfoQueue = queue.Queue() + self.markerCountQueue = queue.Queue() + self.classifierGuessMarkerQueue = queue.Queue() + self.classifierGuessMarkerEvent = threading.Event() + self.closeEvent = threading.Event() # used for closing threads + self.trainTestEvent = threading.Event() + self.markerCountRetrieveEvent = threading.Event() + self.classifierInfoRetrieveEvent = threading.Event() + + self.queryFeaturesQueue = queue.Queue() + self.queryFeaturesEvent = threading.Event() # still needs coding + + self.trainTestEvent.set() # if set we're in train mode, if not we're in test mode, always start in train... 
+ self.logger.log(Logger.INFO," Starting threads initialisation...") + # setup data thread + self.dataThreads = [] + self.featureThreads = [] + for stream in self.dataStreams: + self.dataQueueTrain = queue.Queue() + self.dataQueueTest = queue.Queue() + + #if stream.info().nominal_srate() == 0: + # if stream.info().name() in self.streamChsDropDict.keys(): + # dt = AsyncDataReceiverThread(self.closeEvent, self.trainTestEvent, self.dataQueueTrain,self.dataQueueTest, stream, self.customEpochSettings, + # self.globalEpochSettings, len(self.dataThreads), streamChsDropDict=self.streamChsDropDict[stream.info().name()]) + # else: + # dt = AsyncDataReceiverThread(self.closeEvent, self.trainTestEvent, self.dataQueueTrain,self.dataQueueTest, stream, self.customEpochSettings, + # self.globalEpochSettings, len(self.dataThreads)) + #else: # cold be desirable to capture samples only relative to timestammps with async, so maybe make this configurable? + #if stream.info().nominal_srate() == 0: + print(len(self.dataThreads)) + print(stream.info().name()) + if stream.info().name() in self.streamChsDropDict.keys(): ## all use optimised now (pull_chunk and timestamp relative) + #print(self.streamChsDropDict[stream.info().name()]) + dt = OptimisedDataReceiverThread(self.closeEvent, self.trainTestEvent, self.dataQueueTrain,self.dataQueueTest, stream, self.customEpochSettings, + self.globalEpochSettings, len(self.dataThreads), streamChsDropDict=self.streamChsDropDict[stream.info().name()]) + else: + dt = OptimisedDataReceiverThread(self.closeEvent, self.trainTestEvent, self.dataQueueTrain,self.dataQueueTest, stream, self.customEpochSettings, + self.globalEpochSettings, len(self.dataThreads)) + #else: + # if stream.info().name() in self.streamChsDropDict.keys(): ## all use optimised now (pull_chunk and timestamp relative) + # #print(self.streamChsDropDict[stream.info().name()]) + # dt = OptimisedDataReceiverThread(self.closeEvent, self.trainTestEvent, self.dataQueueTrain,self.dataQueueTest, 
stream, self.customEpochSettings, + # self.globalEpochSettings, len(self.dataThreads), streamChsDropDict=self.streamChsDropDict[stream.info().name()], maxExpectedSampleRate = stream.info().nominal_srate()) + # else: + # dt = OptimisedDataReceiverThread(self.closeEvent, self.trainTestEvent, self.dataQueueTrain,self.dataQueueTest, stream, self.customEpochSettings, + # self.globalEpochSettings, len(self.dataThreads),maxExpectedSampleRate = stream.info().nominal_srate()) + dt.start() + self.dataThreads.append(dt) + if stream.info().name() in self.streamCustomFeatureExtract.keys(): + self.ft = FeatureProcessorThread(self.closeEvent,self.trainTestEvent, self.dataQueueTrain, self.dataQueueTest, + self.featureQueueTest,self.featureQueueTrain, len(self.dataStreams), + self.markerCountRetrieveEvent, self.markerCountQueue,logger=self.logger, + featureExtractor = self.streamCustomFeatureExtract[stream.info().name()], + globalEpochSettings = self.globalEpochSettings, customEpochSettings = self.customEpochSettings) + else: + self.ft = FeatureProcessorThread(self.closeEvent,self.trainTestEvent, self.dataQueueTrain, self.dataQueueTest, + self.featureQueueTest,self.featureQueueTrain, len(self.dataStreams), + self.markerCountRetrieveEvent, self.markerCountQueue,logger=self.logger, + globalEpochSettings = self.globalEpochSettings, customEpochSettings = self.customEpochSettings) + self.ft.start() + self.featureThreads.append(dt) + # marker thread requires data and feature threads to push new markers too + self.markerThread = MarkerThread(self.closeEvent,self.trainTestEvent, self.markerStream,self.dataThreads, self.featureThreads) + self.markerThread.start() + self.classifierThread = ClassifierThread(self.closeEvent,self.trainTestEvent, self.featureQueueTest,self.featureQueueTrain, + self.classifierInfoQueue, self.classifierInfoRetrieveEvent, + self.classifierGuessMarkerQueue, self.classifierGuessMarkerEvent, self.queryFeaturesQueue, self.queryFeaturesEvent, + logger = self.logger, 
numStreamDevices = len(self.dataThreads), minRequiredEpochs = self.minimumEpochsRequired, + clf = self.clf, model = self.model, torchModel = self.torchModel) + self.classifierThread.start() + self.logger.log(Logger.INFO," Threads initialised.") + + + def StopThreads(self): + """ + Stops all PyBCI threads. + """ + self.closeEvent.set() + self.markerThread.join() + # wait for all threads to finish processing, probably worth pulling out finalised classifier information stored for later use. + for dt in self.dataThreads: + dt.join() + for ft in self.featureThreads: + ft.join() + self.classifierThread.join() + self.connected = False + self.logger.log(Logger.INFO," Threads stopped.") + + def ConfigureMachineLearning(self, minimumEpochsRequired = 10, clf = None, model = None, torchModel = None): + from sklearn.base import ClassifierMixin + self.minimumEpochsRequired = minimumEpochsRequired + + if isinstance(clf, ClassifierMixin): + self.clf = clf + else: + self.clf = None + self.logger.log(Logger.INFO," Invalid or no sklearn classifier passed to clf. Checking tensorflow model... ") + if isinstance(model, tf.keras.Model): + self.model = model + else: + self.model = None + self.logger.log(Logger.INFO," Invalid or no tensorflow model passed to model. Checking pytorch torchModel...") + if callable(torchModel): # isinstance(torchModel, torch.nn.Module): + self.torchModel = torchModel + else: + self.torchModel = None + self.logger.log(Logger.INFO," Invalid or no PyTorch model passed to model. Defaulting to SVM by SkLearn") + + + # Could move all configures to a configuration class, might make options into more descriptive classes? + def ConfigureEpochWindowSettings(self, globalEpochSettings = GlobalEpochSettings(), customEpochSettings = {}): + """ + Allows globalWindowSettings to be modified, customWindowSettings is a dict with value names for marker strings which will appear on avalable markerStreams. 
+ """ + valid = False + for key in customEpochSettings.keys(): + if isinstance(customEpochSettings[key], IndividualEpochSetting): + valid = True + else: + valid = False + self.logger.log(Logger.WARNING," Invalid datatype passed for customWindowSettings, create dict of wanted markers \ + using class bci.IndividualEpochSetting() as value to configure individual epoch window settings.") + break + #if isinstance(customWindowSettings[key], GlobalEpochSettings()): + if valid: + self.customEpochSettings = customEpochSettings + if globalEpochSettings.windowLength > globalEpochSettings.tmax + globalEpochSettings.tmin: + self.logger.log(Logger.WARNING," windowLength < (tmin+tmax), pass vaid settings to ConfigureEpochWindowSettings") + else: + self.globalWindowglobalEpochSettingsSettings = globalEpochSettings + self.ResetThreadsAfterConfigs() + + def ConfigureDataStreamChannels(self,streamChsDropDict = {}): + # potentially should move configuration to generic class which can be used for both test and train + self.streamChsDropDict = streamChsDropDict + self.ResetThreadsAfterConfigs() + + def ResetThreadsAfterConfigs(self): + if self.connected: + self.logger.log(Logger.INFO,"Resetting threads after BCI reconfiguration...") + self.StopThreads() + self.Connect() diff --git a/build/lib/pybci/version.py b/build/lib/pybci/version.py new file mode 100644 index 0000000..a6221b3 --- /dev/null +++ b/build/lib/pybci/version.py @@ -0,0 +1 @@ +__version__ = '1.0.2' diff --git a/dist/install_pybci-1.0.0-py3.11.egg b/dist/install_pybci-1.0.0-py3.11.egg new file mode 100644 index 0000000..e5b6463 Binary files /dev/null and b/dist/install_pybci-1.0.0-py3.11.egg differ diff --git a/dist/install_pybci-1.0.2-py3.11.egg b/dist/install_pybci-1.0.2-py3.11.egg new file mode 100644 index 0000000..587c3c1 Binary files /dev/null and b/dist/install_pybci-1.0.2-py3.11.egg differ diff --git a/install_pybci.egg-info/PKG-INFO b/install_pybci.egg-info/PKG-INFO new file mode 100644 index 0000000..5602234 --- 
/dev/null +++ b/install_pybci.egg-info/PKG-INFO @@ -0,0 +1,102 @@ +Metadata-Version: 2.1 +Name: install-pybci +Version: 1.0.2 +Summary: A Python interface to create a BCI with the Lab Streaming Layer, Pytorch, SciKit-Learn and Tensorflow packages +Home-page: https://github.com/lmbooth/pybci +Author: Liam Booth +Author-email: liambooth123@hotmail.co.uk +License: MIT +Keywords: machine-learning tensorflow sklearn pytorch human-computer-interaction bci lsl brain-computer-interface labstreaminglayer +Classifier: Development Status :: 5 - Production/Stable +Classifier: Intended Audience :: Developers +Classifier: Intended Audience :: Science/Research +Classifier: Intended Audience :: Healthcare Industry +Classifier: Topic :: Scientific/Engineering :: Human Machine Interfaces +Classifier: Topic :: Scientific/Engineering :: Bio-Informatics +Classifier: Topic :: Scientific/Engineering :: Medical Science Apps. +Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence +Classifier: Topic :: Scientific/Engineering +Classifier: License :: OSI Approved :: MIT License +Classifier: Operating System :: Microsoft :: Windows +Classifier: Operating System :: POSIX :: Linux +Classifier: Operating System :: MacOS +Classifier: Programming Language :: Python :: 3 +Classifier: Programming Language :: Python :: 3.9 +Classifier: Programming Language :: Python :: 3.10 +Classifier: Programming Language :: Python :: 3.11 +Requires-Python: >=3.9 +Description-Content-Type: text/markdown +License-File: LICENSE + +[![PyPI - Downloads](https://img.shields.io/pypi/dm/install-pybci)](https://pypi.org/project/install-pybci) [![PyPI - version](https://img.shields.io/pypi/v/install-pybci)](https://pypi.org/project/install-pybci) [![Documentation Status](https://readthedocs.org/projects/pybci/badge/?version=latest)](https://pybci.readthedocs.io/en/latest/?badge=latest) + 
+[![pybci](https://raw.githubusercontent.com/LMBooth/pybci/main/docs/Images/pyBCITitle.svg)](https://github.com/LMBooth/pybci) + +A Python package to create real-time Brain Computer Interfaces (BCI's). Data synchronisation and pipelining handled by the [Lab Streaming Layer](https://github.com/sccn/labstreaminglayer), machine learning with [Pytorch](https://pytorch.org/), [scikit-learn](https://scikit-learn.org/stable/#) or [TensorFlow](https://www.tensorflow.org/install), leveraging packages like [AntroPy](https://github.com/raphaelvallat/antropy), [SciPy](https://scipy.org/) and [NumPy](https://numpy.org/) for generic time and/or frequency based feature extraction or optionally have the users own custom feature extraction class used. + +The goal of PyBCI is to enable quick iteration when creating pipelines for testing human machine and brain computer interfaces, namely testing applied data processing and feature extraction techniques on custom machine learning models. Training the BCI requires LSL enabled devices and an LSL marker stream for training stimuli. (The [examples folder](https://github.com/LMBooth/pybci/tree/main/pybci/Examples) found on the github has a [pseudo LSL data generator and marker creator](https://github.com/LMBooth/pybci/tree/main/pybci/Examples/PsuedoLSLStreamGenerator) in the [mainSend.py](https://github.com/LMBooth/pybci/tree/main/pybci/Examples/PsuedoLSLStreamGenerator/mainSend.py) file so the examples can run without the need of LSL capable hardware.) + +# Installation +For stable releases use: ```pip install install-pybci``` + +For unstable dev installations and up-to-date git pushes use: ```pip install --index-url https://test.pypi.org/simple/ install-pybci``` + +## Prerequisite for Non-Windows Users +If you are not using windows then there is a prerequisite stipulated on the [pylsl repository](https://github.com/labstreaminglayer/pylsl) to obtain a liblsl shared library. 
See the [liblsl repo documentation](https://github.com/sccn/liblsl) for more information. +Once the liblsl library has been downloaded ```pip install install-pybci``` should work. + +(currently using install-pybci due to pybci having name too similar with another package on pypi, [issue here.](https://github.com/pypi/support/issues/2840)) + +[ReadTheDocs available here!](https://pybci.readthedocs.io/en/latest/) [Examples found here!](https://github.com/LMBooth/pybci/tree/main/pybci/Examples) + +[Examples of supported LSL hardware here!](https://labstreaminglayer.readthedocs.io/info/supported_devices.html) + +## Python Package Dependencies Version Minimums +The following package versions define the minimum supported by PyBCI, also defined in setup.py: + + "pylsl>=1.16.1", + "scipy>=1.11.1", + "numpy>=1.24.3", + "antropy>=0.1.6", + "tensorflow>=2.13.0", + "scikit-learn>=1.3.0", + "torch>=2.0.1" + +Earlier packages may work but are not guaranteed to be supported. + +## Basic implementation +```python +import time +from pybci import PyBCI +bci = PyBCI() # set default epoch timing, looks for first available lsl marker stream and all data streams +bci.TrainMode() # assume both marker and datastreams available to start training on received epochs +accuracy = 0 +try: + while(True): # training based on couple epochs more then min threshold for classifying + currentMarkers = bci.ReceivedMarkerCount() # check to see how many received epochs, if markers sent are too close together will be ignored till done processing + time.sleep(1) # wait for marker updates + print("Markers received: " + str(currentMarkers) +" Class accuracy: " + str(accuracy), end="\r") + if len(currentMarkers) > 1: # check there is more then one marker type received + if min([currentMarkers[key][1] for key in currentMarkers]) > bci.minimumEpochsRequired: + classInfo = bci.CurrentClassifierInfo() # hangs if called too early + accuracy = classInfo["accuracy"] + if min([currentMarkers[key][1] for key in 
currentMarkers]) > bci.minimumEpochsRequired+1: + bci.TestMode() + break + while True: # now sufficient epochs gathered start testing + markerGuess = bci.CurrentClassifierMarkerGuess() # when in test mode only y_pred returned + guess = [key for key, value in currentMarkers.items() if value[0] == markerGuess] + print("Current marker estimation: " + str(guess), end="\r") + time.sleep(0.5) +except KeyboardInterrupt: # allow user to break while loop + pass +``` + +## Background Information +PyBCI is a python brain computer interface software designed to receive a varying number, be it singular or multiple, Lab Streaming Layer enabled data streams. An understanding of time-series data analysis, the lab streaming layer protocol, and machine learning techniques are a must to integrate innovative ideas with this interface. An LSL marker stream is required to train the model, where a received marker epochs the data received on the accepted datastreams based on a configurable time window around certain markers - where custom marker strings can optionally have its epoch timewindow split and overlapped to count as more then one marker, example: in training mode a baseline marker may have one marker sent for a 60 second window, whereas target actions may only be ~0.5s long, when testing the model and data is constantly analysed it would be desirable to standardise the window length, we do this by splitting the 60s window after the received baseline marker in to ~0.5s windows. PyBCI allows optional overlapping of time windows to try to account for potential missed signal patterns/aliasing - as a rule of thumb it would be advised when testing a model to have a time window overlap >= 50% (Shannon-Nyquist criterion). [See here for more information on epoch timing](https://pybci.readthedocs.io/en/latest/BackgroundInformation/Epoch_Timing.html). 
+ +Once the data has been epoched it is sent for feature extraction, there is a general feature extraction class which can be configured for general time and/or frequency analysis based features, ideal for data stream types like "EEG" and "EMG". Since data analysis, preprocessing and feature extraction techniques can vary greatly between device data inputs, a custom feature extraction class can be created for each data stream maker type. [See here for more information on feature extraction](https://pybci.readthedocs.io/en/latest/BackgroundInformation/Feature_Selection.html). + +Finally a passable pytorch, sklearn or tensorflow classifier can be given to the bci class, once a defined number of epochs have been obtained for each received epoch/marker type the classifier can begin to fit the model. It's advised to use bci.ReceivedMarkerCount() to get the number of received training epochs received, once the min num epochs received of each type is >= pybci.minimumEpochsRequired (default 10 of each epoch) the model will begin to fit. Once fit the classifier info can be queried with CurrentClassifierInfo, this returns the model used and accuracy. If enough epochs are received or high enough accuracy is obtained TestMode() can be called. Once in test mode you can query what pybci estimates the current bci epoch is (typically baseline is used for no state). [Review the examples for sklearn and model implementations](https://pybci.readthedocs.io/en/latest/BackgroundInformation/Examples.html). + +## All issues, recommendations, pull-requests and suggestions are welcome and encouraged! 
diff --git a/install_pybci.egg-info/SOURCES.txt b/install_pybci.egg-info/SOURCES.txt new file mode 100644 index 0000000..e66f9e3 --- /dev/null +++ b/install_pybci.egg-info/SOURCES.txt @@ -0,0 +1,29 @@ +LICENSE +MANIFEST.in +README.md +setup.py +install_pybci.egg-info/PKG-INFO +install_pybci.egg-info/SOURCES.txt +install_pybci.egg-info/dependency_links.txt +install_pybci.egg-info/requires.txt +install_pybci.egg-info/top_level.txt +pybci/__init__.py +pybci/pybci.py +pybci/version.py +pybci/Configuration/EpochSettings.py +pybci/Configuration/FeatureSettings.py +pybci/Configuration/PsuedoDeviceSettings.py +pybci/Configuration/__init__.py +pybci/ThreadClasses/AsyncDataReceiverThread.py +pybci/ThreadClasses/ClassifierThread.py +pybci/ThreadClasses/DataReceiverThread.py +pybci/ThreadClasses/FeatureProcessorThread.py +pybci/ThreadClasses/MarkerThread.py +pybci/ThreadClasses/OptimisedDataReceiverThread.py +pybci/ThreadClasses/__init__.py +pybci/Utils/Classifier.py +pybci/Utils/FeatureExtractor.py +pybci/Utils/LSLScanner.py +pybci/Utils/Logger.py +pybci/Utils/PseudoDevice.py +pybci/Utils/__init__.py \ No newline at end of file diff --git a/install_pybci.egg-info/dependency_links.txt b/install_pybci.egg-info/dependency_links.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/install_pybci.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/install_pybci.egg-info/requires.txt b/install_pybci.egg-info/requires.txt new file mode 100644 index 0000000..f93d47e --- /dev/null +++ b/install_pybci.egg-info/requires.txt @@ -0,0 +1,7 @@ +pylsl>=1.16.1 +scipy>=1.11.1 +numpy>=1.24.3 +antropy>=0.1.6 +tensorflow>=2.13.0 +scikit-learn>=1.3.0 +torch>=2.0.1 diff --git a/install_pybci.egg-info/top_level.txt b/install_pybci.egg-info/top_level.txt new file mode 100644 index 0000000..3064b26 --- /dev/null +++ b/install_pybci.egg-info/top_level.txt @@ -0,0 +1 @@ +pybci diff --git a/pybci/Configuration/FeatureSettings.py b/pybci/Configuration/FeatureSettings.py index 
f387cc7..406b851 100644 --- a/pybci/Configuration/FeatureSettings.py +++ b/pybci/Configuration/FeatureSettings.py @@ -1,5 +1,5 @@ class GeneralFeatureChoices: - psdBand = True + psdBand = False appr_entropy = False perm_entropy = False spec_entropy = False @@ -7,9 +7,9 @@ class GeneralFeatureChoices: samp_entropy = False rms = True meanPSD = True - medianPSD = True - variance = True - meanAbs = True + medianPSD = False + variance = False + meanAbs = False waveformLength = False zeroCross = False slopeSignChange = False \ No newline at end of file diff --git a/pybci/Configuration/PseudoDeviceSettings.py b/pybci/Configuration/PseudoDeviceSettings.py new file mode 100644 index 0000000..22789de --- /dev/null +++ b/pybci/Configuration/PseudoDeviceSettings.py @@ -0,0 +1,18 @@ + +class PseudoDataConfig: + duration = 1.0 + noise_level = 0.5 + amplitude = 2 + frequency = 5 + +class PseudoMarkerConfig: + markerName = "PyBCIPseudoMarkers" + markerType = "Markers" + baselineMarkerString = "baseline" + repeat = True + autoplay = True + num_baseline_markers = 10 + number_marker_iterations = 10 + seconds_between_markers = 5 + seconds_between_baseline_marker = 10 + baselineConfig = PseudoDataConfig() \ No newline at end of file diff --git a/pybci/Configuration/__pycache__/EpochSettings.cpython-311.pyc b/pybci/Configuration/__pycache__/EpochSettings.cpython-311.pyc new file mode 100644 index 0000000..15426cf Binary files /dev/null and b/pybci/Configuration/__pycache__/EpochSettings.cpython-311.pyc differ diff --git a/pybci/Configuration/__pycache__/FeatureSettings.cpython-311.pyc b/pybci/Configuration/__pycache__/FeatureSettings.cpython-311.pyc new file mode 100644 index 0000000..9c78670 Binary files /dev/null and b/pybci/Configuration/__pycache__/FeatureSettings.cpython-311.pyc differ diff --git a/pybci/Configuration/__pycache__/PseudoDeviceSettings.cpython-311.pyc b/pybci/Configuration/__pycache__/PseudoDeviceSettings.cpython-311.pyc new file mode 100644 index 0000000..330a6bc 
Binary files /dev/null and b/pybci/Configuration/__pycache__/PseudoDeviceSettings.cpython-311.pyc differ diff --git a/pybci/Configuration/__pycache__/PsuedoDeviceSettings.cpython-311.pyc b/pybci/Configuration/__pycache__/PsuedoDeviceSettings.cpython-311.pyc new file mode 100644 index 0000000..abd863a Binary files /dev/null and b/pybci/Configuration/__pycache__/PsuedoDeviceSettings.cpython-311.pyc differ diff --git a/pybci/Configuration/__pycache__/__init__.cpython-311.pyc b/pybci/Configuration/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000..8657222 Binary files /dev/null and b/pybci/Configuration/__pycache__/__init__.cpython-311.pyc differ diff --git a/pybci/Examples/PsuedoLSLStreamGenerator/README.md b/pybci/Examples/PseudoLSLStreamGenerator/README.md similarity index 100% rename from pybci/Examples/PsuedoLSLStreamGenerator/README.md rename to pybci/Examples/PseudoLSLStreamGenerator/README.md diff --git a/pybci/Examples/PsuedoLSLStreamGenerator/mainSend.py b/pybci/Examples/PseudoLSLStreamGenerator/mainSend.py similarity index 100% rename from pybci/Examples/PsuedoLSLStreamGenerator/mainSend.py rename to pybci/Examples/PseudoLSLStreamGenerator/mainSend.py diff --git a/pybci/Examples/PseudoLSLStreamGenerator/pseudoEMG.py b/pybci/Examples/PseudoLSLStreamGenerator/pseudoEMG.py new file mode 100644 index 0000000..bc576d5 --- /dev/null +++ b/pybci/Examples/PseudoLSLStreamGenerator/pseudoEMG.py @@ -0,0 +1,36 @@ +from pybci import PyBCI +from pybci.Utils.PseudoDevice import PseudoDevice, PseudoDeviceController +from pybci.Configuration.PseudoDeviceSettings import PseudoMarkerConfig +import time +import numpy as np + +if __name__ == '__main__': + pseudoDevice = PseudoDeviceController(execution_mode='process') + pseudoDevice.BeginStreaming() + + bci = PyBCI(minimumEpochsRequired = 6)# loggingLevel = Logger.TIMING)# loggingLevel = Logger.NONE) + while not bci.connected: # check to see if lsl marker and datastream are available + bci.Connect() + 
time.sleep(1) + bci.TrainMode() # now both marker and datastreams available start training on received epochs + accuracy = 0 + try: + while(True): + currentMarkers = bci.ReceivedMarkerCount() # check to see how many received epochs, if markers sent to close together will be ignored till done processing + time.sleep(0.5) # wait for marker updates + print("Markers received: " + str(currentMarkers) +" Class accuracy: " + str(accuracy), end="\r") + if len(currentMarkers) > 1: # check there is more then one marker type received + if min([currentMarkers[key][1] for key in currentMarkers]) > bci.minimumEpochsRequired: + classInfo = bci.CurrentClassifierInfo() # hangs if called too early + accuracy = classInfo["accuracy"] + if min([currentMarkers[key][1] for key in currentMarkers]) > bci.minimumEpochsRequired+10: + bci.TestMode() + break + while True: + markerGuess = bci.CurrentClassifierMarkerGuess() # when in test mode only y_pred returned + guess = [key for key, value in currentMarkers.items() if value[0] == markerGuess] + print("Current marker estimation: " + str(guess), end="\r") + time.sleep(0.2) + except KeyboardInterrupt: # allow user to break while loop + pseudoDevice.StopStreaming() + print("\nLoop interrupted by user.") diff --git a/pybci/Examples/asyncTest.py b/pybci/Examples/asyncTest.py deleted file mode 100644 index 4360b7d..0000000 --- a/pybci/Examples/asyncTest.py +++ /dev/null @@ -1,24 +0,0 @@ -from pylsl import StreamInlet, resolve_stream -import threading -import time -class MyInlet(threading.Thread): - def run(self): - streams = resolve_stream() - for stream in streams: - print(stream.name()) - if stream.name() == "pupil_capture": - my_inlet = StreamInlet(stream) - # ... 
setup inlet for an irregular stream - while True: - data, timestamps = my_inlet.pull_chunk(timeout=5.0) - print(f"Received {len(timestamps)} samples.", time.time()) -class waiter(threading.Thread): - def run(self): - while True: - print("waiting...", time.time()) - time.sleep(1) -def run(): - MyInlet().start() - time.sleep(1.0) - waiter().start() -run() \ No newline at end of file diff --git a/pybci/ThreadClasses/__pycache__/AsyncDataReceiverThread.cpython-311.pyc b/pybci/ThreadClasses/__pycache__/AsyncDataReceiverThread.cpython-311.pyc new file mode 100644 index 0000000..e0132ad Binary files /dev/null and b/pybci/ThreadClasses/__pycache__/AsyncDataReceiverThread.cpython-311.pyc differ diff --git a/pybci/ThreadClasses/__pycache__/ClassifierThread.cpython-311.pyc b/pybci/ThreadClasses/__pycache__/ClassifierThread.cpython-311.pyc new file mode 100644 index 0000000..1ff1d6b Binary files /dev/null and b/pybci/ThreadClasses/__pycache__/ClassifierThread.cpython-311.pyc differ diff --git a/pybci/ThreadClasses/__pycache__/DataReceiverThread.cpython-311.pyc b/pybci/ThreadClasses/__pycache__/DataReceiverThread.cpython-311.pyc new file mode 100644 index 0000000..eb7528f Binary files /dev/null and b/pybci/ThreadClasses/__pycache__/DataReceiverThread.cpython-311.pyc differ diff --git a/pybci/ThreadClasses/__pycache__/FeatureProcessorThread.cpython-311.pyc b/pybci/ThreadClasses/__pycache__/FeatureProcessorThread.cpython-311.pyc new file mode 100644 index 0000000..04ce339 Binary files /dev/null and b/pybci/ThreadClasses/__pycache__/FeatureProcessorThread.cpython-311.pyc differ diff --git a/pybci/ThreadClasses/__pycache__/MarkerThread.cpython-311.pyc b/pybci/ThreadClasses/__pycache__/MarkerThread.cpython-311.pyc new file mode 100644 index 0000000..cbd4252 Binary files /dev/null and b/pybci/ThreadClasses/__pycache__/MarkerThread.cpython-311.pyc differ diff --git a/pybci/ThreadClasses/__pycache__/OptimisedDataReceiverThread.cpython-311.pyc 
b/pybci/ThreadClasses/__pycache__/OptimisedDataReceiverThread.cpython-311.pyc new file mode 100644 index 0000000..eb5415b Binary files /dev/null and b/pybci/ThreadClasses/__pycache__/OptimisedDataReceiverThread.cpython-311.pyc differ diff --git a/pybci/ThreadClasses/__pycache__/__init__.cpython-311.pyc b/pybci/ThreadClasses/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000..ffb97a4 Binary files /dev/null and b/pybci/ThreadClasses/__pycache__/__init__.cpython-311.pyc differ diff --git a/pybci/Utils/Logger.py b/pybci/Utils/Logger.py index 8b83294..fc08ea5 100644 --- a/pybci/Utils/Logger.py +++ b/pybci/Utils/Logger.py @@ -1,10 +1,13 @@ +import multiprocessing + class Logger: INFO = "INFO" WARNING = "WARNING" NONE = "NONE" TIMING = "TIMING" - def __init__(self, level=INFO): + def __init__(self, level=INFO, log_queue=None): + self.queue = log_queue self.level = level self.check_level(level) #print(self.level) @@ -19,14 +22,34 @@ def check_level(self,level): self.level = level def log(self, level, message): - if self.level == self.NONE: - return None - if level == self.INFO: - if self.level != self.NONE and self.level != self.WARNING: - print('PyBCI: [INFO] -' + message) - elif level == self.WARNING: - if self.level != self.NONE: - print('PyBCI: [WARNING] -' + message) - elif level == self.TIMING: - if self.level == self.TIMING: - print('PyBCI: [TIMING] -' + message) \ No newline at end of file + if self.queue is not None and isinstance(self.queue, multiprocessing.Queue): + if self.level == self.NONE: + return None + if level == self.INFO: + if self.level != self.NONE and self.level != self.WARNING: + self.queue.put('PyBCI: [INFO] -' + message) + elif level == self.WARNING: + if self.level != self.NONE: + self.queue.put('PyBCI: [WARNING] -' + message) + elif level == self.TIMING: + if self.level == self.TIMING: + self.queue.put('PyBCI: [TIMING] -' + message) + else: + if self.level == self.NONE: + return None + if level == self.INFO: + if self.level 
!= self.NONE and self.level != self.WARNING: + print('PyBCI: [INFO] -' + message) + elif level == self.WARNING: + if self.level != self.NONE: + print('PyBCI: [WARNING] -' + message) + elif level == self.TIMING: + if self.level == self.TIMING: + print('PyBCI: [TIMING] -' + message) + + def start_queue_reader(self): + while True: + message = self.queue.get() + if message == "STOP": + break + print(message) diff --git a/pybci/Utils/PseudoDevice.py b/pybci/Utils/PseudoDevice.py new file mode 100644 index 0000000..bd7829b --- /dev/null +++ b/pybci/Utils/PseudoDevice.py @@ -0,0 +1,278 @@ +################ PsuedoDevice.py ######################## +# used for creating fake LSL device data and markers # +# Please note! sample rate is not exact, expect # +# some drop over time! linked to overall cpu strain # +# Written by Liam Booth 19/08/2023 # +######################################################### +from ..Utils.Logger import Logger +from ..Configuration.PseudoDeviceSettings import PseudoDataConfig, PseudoMarkerConfig +import multiprocessing, time, threading, pylsl, queue +import numpy as np +from collections import deque + +class PseudoDeviceController: + log_queue = None + def __init__(self, execution_mode='process', *args, **kwargs): + self.execution_mode = execution_mode + self.args = args + self.kwargs = kwargs + + # Create a command queue for the worker + if self.execution_mode == 'process': + self.command_queue = multiprocessing.Queue() + self.stop_signal = multiprocessing.Event() + self.worker = multiprocessing.Process(target=self._run_device) + self.log_queue = multiprocessing.Queue() + # Note: Don't initialize self.device here for 'process' mode! 
+ elif self.execution_mode == 'thread': + self.command_queue = None # Not needed for threads, but kept for consistency + self.stop_signal = False + self.device = PseudoDevice(*self.args, **self.kwargs, stop_signal=self.stop_signal, is_multiprocessing=False) # Initialize for 'thread' mode + self.worker = threading.Thread(target=self._run_device) + else: + raise ValueError(f"Unsupported execution mode: {execution_mode}") + + self.worker.start() + # Initialize the logger + + self.logger = Logger(log_queue=self.log_queue) + self.log_reader_process = None + if self.execution_mode == 'process': + self.log_reader_process = multiprocessing.Process(target=self.logger.start_queue_reader) + self.log_reader_process.start() + + + def _run_device(self): + if self.execution_mode == 'process': + device = PseudoDevice(*self.args, **self.kwargs, stop_signal=self.stop_signal, log_queue=self.log_queue, is_multiprocessing=True) # Initialize locally for 'process' mode + + while not self._should_stop(): + if not self.command_queue.empty(): + command = self.command_queue.get() + if command == "BeginStreaming": + device.BeginStreaming() + # Sleep for a brief moment before checking again + time.sleep(0.01) + + elif self.execution_mode == 'thread': + while not self._should_stop(): + self.device.update() # or any other method you want to run continuously + time.sleep(0.01) + + def _should_stop(self): + if self.execution_mode == 'process': + return self.stop_signal.is_set() + else: # thread + return self.stop_signal + + def BeginStreaming(self): + if self.execution_mode == 'process': + self.command_queue.put("BeginStreaming") + else: # thread + self.device.BeginStreaming() + + def StopStreaming(self): + if self.execution_mode == 'process': + self.stop_signal.set() + else: # thread + self.stop_signal = True + self.worker.join() # Wait for the worker to finish + + def close(): + # add close logic + print("close it") + +def precise_sleep(duration): + end_time = time.time() + duration + while 
time.time() < end_time: + pass + +class PseudoDevice: + samples_generated = 0 + chunkCount = 0 + #markerOccurred = False + current_marker = None + def __init__(self,stop_signal, is_multiprocessing=True, markerConfigStrings = ["Marker1", "Marker2", "Marker3"], + pseudoMarkerDataConfigs = None, + pseudoMarkerConfig = PseudoMarkerConfig, + dataStreamName = "PyBCIPseudoDataStream" , dataStreamType="EMG", + sampleRate= 250, channelCount= 8, logger = Logger(Logger.INFO),log_queue=None): + #self.currentMarker_lock = threading.Lock() + self.markerQueue = queue.Queue() + self.is_multiprocessing = is_multiprocessing + self.stop_signal = stop_signal + self.logger = logger + self.log_queue = log_queue + self.lock = threading.Lock() # Lock for thread safety + self.markerConfigStrings = markerConfigStrings + self.baselineConfig = pseudoMarkerConfig.baselineConfig + self.baselineMarkerString = pseudoMarkerConfig.baselineMarkerString + self.currentMarker = markerConfigStrings[0] + if pseudoMarkerDataConfigs == None: + pseudoMarkerDataConfigs = [PseudoDataConfig(), PseudoDataConfig(), PseudoDataConfig()] + pseudoMarkerDataConfigs[0].amplitude = 5 + pseudoMarkerDataConfigs[1].amplitude = 6 + pseudoMarkerDataConfigs[2].amplitude = 7 + pseudoMarkerDataConfigs[0].frequency = 6 + pseudoMarkerDataConfigs[1].frequency = 9 + pseudoMarkerDataConfigs[2].frequency = 12 + self.pseudoMarkerDataConfigs = pseudoMarkerDataConfigs + self.pseudoMarkerConfig = pseudoMarkerConfig + self.sampleRate = sampleRate + self.channelCount = channelCount + markerInfo = pylsl.StreamInfo(pseudoMarkerConfig.markerName, pseudoMarkerConfig.markerType, 1, 0, 'string', 'Dev') + self.markerOutlet = pylsl.StreamOutlet(markerInfo) + info = pylsl.StreamInfo(dataStreamName, dataStreamType, self.channelCount, self.sampleRate, 'float32', 'Dev') + chns = info.desc().append_child("channels") + for label in range(self.channelCount): + ch = chns.append_child("channel") + ch.append_child_value("label", str(label+1)) + 
ch.append_child_value("type", dataStreamType) + self.outlet = pylsl.StreamOutlet(info) + self.last_update_time = time.time() + self.phase_offset = 0 + + def log_message(self, level='INFO', message = ""): + if self.log_queue is not None and isinstance(self.log_queue, type(multiprocessing.Queue)): + self.log_queue.put(f'PyBCI: [{level}] - {message}') + else: + self.logger.log(level, message) + + def _should_stop(self): + if isinstance(self.stop_signal, multiprocessing.synchronize.Event): + return self.stop_signal.is_set() + else: # boolean flag for threads + return self.stop_signal + + + def GeneratePseudoEMG(self,samplingRate, duration, noise_level, amplitude, frequency): + """ + Generate a pseudo EMG signal for a given gesture. + Arguments: + - sampling_rate: Number of samples per second + - duration: Duration of the signal in seconds + - noise_level: The amplitude of Gaussian noise to be added (default: 0.1) + - amplitude: The amplitude of the EMG signal (default: 1.0) + - frequency: The frequency of the EMG signal in Hz (default: 10.0) + Returns: + - emg_signal: The generated pseudo EMG signal as a 2D numpy array with shape (channels, samples) + """ + num_samples = int(samplingRate * duration) + # Initialize the EMG signal array + emg_signal = np.zeros((num_samples, self.channelCount)) + times = np.linspace(0, duration, num_samples) + # Generate the pseudo EMG signal based on the marker settings + emg_channel = amplitude * np.sin(2 * np.pi * frequency * times + self.phase_offset)# * times) # Sinusoidal EMG signal + # Add Gaussian noise to the EMG signal + noise = np.random.normal(0, noise_level, num_samples) + emg_channel += noise + # Store the generated channel in the EMG signal array + self.phase_offset = (self.phase_offset + 2 * np.pi * frequency * (num_samples/samplingRate)) % (2*np.pi) + emg_channel = emg_channel[::-1] + for channel in range(self.channelCount): + emg_signal[:, channel] = emg_channel + return emg_signal + + + def update(self): + with 
self.lock: # Acquire the lock + if not (self.stop_signal.is_set() if self.is_multiprocessing else self.stop_signal): + current_time = time.time() + delta_time = current_time - self.last_update_time + if not self.markerQueue.empty(): + self.current_marker = self.markerQueue.get() + if self.current_marker != None: + if not self.markerQueue.empty(): + self.current_marker = self.markerQueue.get() + for i, command in enumerate(self.markerConfigStrings): + if self.current_marker == command: + total_samples_required = int(self.sampleRate * self.pseudoMarkerDataConfigs[i].duration) + num = self.GeneratePseudoEMG(self.sampleRate,delta_time, self.pseudoMarkerDataConfigs[i].noise_level, + self.pseudoMarkerDataConfigs[i].amplitude, self.pseudoMarkerDataConfigs[i].frequency) + # If this is the start of marker data generation, set the end time for this marker + self.samples_generated += len(num) + if self.samples_generated >= total_samples_required: + self.current_marker = None + self.samples_generated = 0 + else:# send baseline + num = self.GeneratePseudoEMG(self.sampleRate,delta_time, self.baselineConfig.noise_level, + self.baselineConfig.amplitude, self.baselineConfig.frequency) + self.outlet.push_chunk(num.tolist()) + self.last_update_time = current_time + + def StopStreaming(self): + if self.is_multiprocessing: + self.stop_signal.set() + else: # For threading + self.stop_signal = True + self.thread.join() # Wait for the thread to finish + + if self.pseudoMarkerConfig.autoplay: + self.marker_thread.join() # Wait for the marker thread to finish + self.log_message(Logger.INFO, " PseudoDevice - Stopped streaming.") + + + def BeginStreaming(self): + if self.is_multiprocessing: + # For multiprocessing, we assume the worker process is already running + self.stop_signal.clear() + else: + self.stop_signal = False + #else: # For threading + self.thread = threading.Thread(target=self._generate_signal) + self.thread.start() + if self.pseudoMarkerConfig.autoplay: + self.StartMarkers() + 
self.log_message(Logger.INFO, " PseudoDevice - Begin streaming.") + + def _generate_signal(self): + while not self._should_stop(): + start_time = time.time() + self.update() + sleep_duration = max(0, (1.0 / 10) - (start_time - time.time())) + time.sleep(sleep_duration) +# precise_sleep(sleep_duration) + + def _should_stop(self): + if self.is_multiprocessing: + return self.stop_signal.is_set() + else: + return self.stop_signal + + def StartMarkers(self): + self.marker_thread = threading.Thread(target=self._maker_timing) + self.marker_thread.start() + + def _maker_timing(self): + marker_iterations = 0 + baseline_iterations = 0 + while not (self.stop_signal.is_set() if self.is_multiprocessing else self.stop_signal): + if marker_iterations < self.pseudoMarkerConfig.number_marker_iterations: + for marker in self.markerConfigStrings: + self.markerOutlet.push_sample([marker]) + self.markerQueue.put(marker) # Put the marker into the queue + self.log_message(Logger.INFO," PseudoDevice - sending marker " + marker) + time.sleep(self.pseudoMarkerConfig.seconds_between_markers) + if baseline_iterations < self.pseudoMarkerConfig.num_baseline_markers: + self.markerOutlet.push_sample([self.pseudoMarkerConfig.baselineMarkerString]) + self.log_message(Logger.INFO," PseudoDevice - sending " + self.pseudoMarkerConfig.baselineMarkerString) + baseline_iterations += 1 + marker_iterations += 1 + time.sleep(self.pseudoMarkerConfig.seconds_between_baseline_marker) + if baseline_iterations < self.pseudoMarkerConfig.num_baseline_markers and marker_iterations < self.pseudoMarkerConfig.number_marker_iterations: + if self.pseudoMarkerConfig.repeat: + marker_iterations = 0 + baseline_iterations = 0 + else: + break + +''' +class PseudoMarkerConfig: + markerName = "PyBCIPsuedoMarkers" + markerType = "Markers" + repeat = True + num_baseline_markers = 10 + number_marker_iterations = 10 + seconds_between_markers = 5 + seconds_between_baseline_marker = 10 +''' \ No newline at end of file diff --git 
a/pybci/Utils/__pycache__/Classifier.cpython-311.pyc b/pybci/Utils/__pycache__/Classifier.cpython-311.pyc new file mode 100644 index 0000000..8a6a786 Binary files /dev/null and b/pybci/Utils/__pycache__/Classifier.cpython-311.pyc differ diff --git a/pybci/Utils/__pycache__/FeatureExtractor.cpython-311.pyc b/pybci/Utils/__pycache__/FeatureExtractor.cpython-311.pyc new file mode 100644 index 0000000..64d56e6 Binary files /dev/null and b/pybci/Utils/__pycache__/FeatureExtractor.cpython-311.pyc differ diff --git a/pybci/Utils/__pycache__/LSLScanner.cpython-311.pyc b/pybci/Utils/__pycache__/LSLScanner.cpython-311.pyc new file mode 100644 index 0000000..93f4929 Binary files /dev/null and b/pybci/Utils/__pycache__/LSLScanner.cpython-311.pyc differ diff --git a/pybci/Utils/__pycache__/Logger.cpython-311.pyc b/pybci/Utils/__pycache__/Logger.cpython-311.pyc new file mode 100644 index 0000000..c04e1ec Binary files /dev/null and b/pybci/Utils/__pycache__/Logger.cpython-311.pyc differ diff --git a/pybci/Utils/__pycache__/PseudoDevice.cpython-311.pyc b/pybci/Utils/__pycache__/PseudoDevice.cpython-311.pyc new file mode 100644 index 0000000..b4c6367 Binary files /dev/null and b/pybci/Utils/__pycache__/PseudoDevice.cpython-311.pyc differ diff --git a/pybci/Utils/__pycache__/__init__.cpython-311.pyc b/pybci/Utils/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000..204c519 Binary files /dev/null and b/pybci/Utils/__pycache__/__init__.cpython-311.pyc differ diff --git a/pybci/__pycache__/__init__.cpython-311.pyc b/pybci/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000..1bb7960 Binary files /dev/null and b/pybci/__pycache__/__init__.cpython-311.pyc differ diff --git a/pybci/__pycache__/pybci.cpython-311.pyc b/pybci/__pycache__/pybci.cpython-311.pyc new file mode 100644 index 0000000..2922c53 Binary files /dev/null and b/pybci/__pycache__/pybci.cpython-311.pyc differ diff --git a/pybci/version.py b/pybci/version.py index a6221b3..930d377 
100644 --- a/pybci/version.py +++ b/pybci/version.py @@ -1 +1 @@ -__version__ = '1.0.2' +__version__ = '1.1.0-alpha'