Skip to content

Commit

Permalink
pseudodevice - alpha
Browse files Browse the repository at this point in the history
pybci pseudodevice creation
  • Loading branch information
LMBooth committed Sep 2, 2023
1 parent 435c4d3 commit 050a23f
Show file tree
Hide file tree
Showing 56 changed files with 1,994 additions and 41 deletions.
13 changes: 13 additions & 0 deletions build/lib/pybci/Configuration/EpochSettings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
class GlobalEpochSettings:
splitCheck = True # checks whether or not subdivide epochs
tmin = 0 # time in seconds to capture samples before trigger
tmax = 1 # time in seconds to capture samples after trigger
windowLength = 0.5 # if splitcheck true - time in seconds to split epoch
windowOverlap = 0.5 #if splitcheck true percentage value > 0 and < 1, example if epoch has tmin of 0 and tmax of 1 with window
# length of 0.5 we have 1 epoch between t 0 and t0.5 another at 0.25 to 0.75, 0.5 to 1

# customWindowSettings should be dict with marker name and IndividualEpochSetting
class IndividualEpochSetting:
splitCheck = True # checks whether or not subdivide epochs
tmin = 0 # time in seconds to capture samples before trigger
tmax= 1 # time in seconds to capture samples after trigger
15 changes: 15 additions & 0 deletions build/lib/pybci/Configuration/FeatureSettings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
class GeneralFeatureChoices:
psdBand = True
appr_entropy = False
perm_entropy = False
spec_entropy = False
svd_entropy = False
samp_entropy = False
rms = True
meanPSD = True
medianPSD = True
variance = True
meanAbs = True
waveformLength = False
zeroCross = False
slopeSignChange = False
Empty file.
3 changes: 3 additions & 0 deletions build/lib/pybci/Configuration/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#from .EpochSettings import EpochSettings
#from .GeneralFeatureChoices import GeneralFeatureChoices

126 changes: 126 additions & 0 deletions build/lib/pybci/ThreadClasses/AsyncDataReceiverThread.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import threading, time
from collections import deque
import itertools
from bisect import bisect_left
def slice_fifo_by_time(fifo, start_time, end_time):
"""Find the slice of fifo between start_time and end_time using binary search."""
# separate times and data for easier indexing
times, data = zip(*fifo)
# find the index of the first time that is not less than start_time
start_index = bisect_left(times, start_time)
# find the index of the first time that is not less than end_time
end_index = bisect_left(times, end_time)
# return the slice of data between start_index and end_index
return data[start_index:end_index]

class AsyncDataReceiverThread(threading.Thread):
"""Responsible for receiving data from accepted LSL outlet, slices samples based on tmin+tmax basis,
starts counter for received samples after marker is received in ReceiveMarker. Relies on timestamps to slice array,
suspected more computationally intensive then synchronous method.
"""
startCounting = False
currentMarker = ""
def __init__(self, closeEvent, trainTestEvent, dataQueueTrain,dataQueueTest, dataStreamInlet,
customEpochSettings, globalEpochSettings,devCount, streamChsDropDict = [], maxExpectedSampleRate=100):
# large maxExpectedSampleRate can incur marker drop and slow procesing times for slicing arrays
super().__init__()
self.trainTestEvent = trainTestEvent
self.closeEvent = closeEvent
self.dataQueueTrain = dataQueueTrain
self.dataQueueTest = dataQueueTest
self.dataStreamInlet = dataStreamInlet
self.customEpochSettings = customEpochSettings
self.globalEpochSettings = globalEpochSettings
self.streamChsDropDict = streamChsDropDict
self.sr = maxExpectedSampleRate
#self.dataType = dataStreamInlet.info().type()
self.devCount = devCount # used for tracking which device is sending data to feature extractor

def run(self):
posCount = 0
chCount = self.dataStreamInlet.info().channel_count()
maxTime = (self.globalEpochSettings.tmin + self.globalEpochSettings.tmax)
if len(self.customEpochSettings.keys())>0:
if max([self.customEpochSettings[x].tmin + self.customEpochSettings[x].tmax for x in self.customEpochSettings]) > maxTime:
maxTime = max([self.customEpochSettings[x].tmin + self.customEpochSettings[x].tmax for x in self.customEpochSettings])
fifoLength = int(self.sr*maxTime)
window_end_time = 0
dataFIFOs = [deque(maxlen=fifoLength) for ch in range(chCount - len(self.streamChsDropDict))]
while not self.closeEvent.is_set():
sample, timestamp = self.dataStreamInlet.pull_sample(timeout = 1)
if sample != None:
for index in sorted(self.streamChsDropDict, reverse=True):
del sample[index] # remove the desired channels from the sample
for i,fifo in enumerate(dataFIFOs):
fifo.append((timestamp, sample[i]))
if self.trainTestEvent.is_set(): # We're training!
if self.startCounting: # we received a marker
posCount += 1
if posCount >= self.desiredCount: # enough samples are in FIFO, chop up and put in dataqueue
if len(self.customEpochSettings.keys())>0: # custom marker received
if self.customEpochSettings[self.currentMarker].splitCheck: # slice epochs into overlapping time windows
window_length = self.customEpochSettings[self.currentMarker].windowLength
window_overlap = self.customEpochSettings[self.currentMarker].windowOverlap
window_start_time = self.markerTimestamp + self.customEpochSettings[self.currentMarker].tmin
window_end_time = window_start_time + window_length
while window_end_time <= self.markerTimestamp + self.customEpochSettings[self.currentMarker].tmax:
sliceDataFIFOs = [slice_fifo_by_time(fifo, window_start_time, window_end_time) for fifo in dataFIFOs]
self.dataQueueTrain.put([sliceDataFIFOs, self.currentMarker, self.sr, self.devCount])
window_start_time += window_length * (1 - window_overlap)
window_end_time = window_start_time + window_length
else: # don't slice just take tmin to tmax time
start_time = self.markerTimestamp + self.customEpochSettings[self.currentMarker].tmin
end_time = self.markerTimestamp + self.customEpochSettings[self.currentMarker].tmax
sliceDataFIFOs = [slice_fifo_by_time(fifo, start_time, end_time) for fifo in dataFIFOs]
self.dataQueueTrain.put([sliceDataFIFOs, self.currentMarker, self.sr, self.devCount])
else:
if self.globalEpochSettings.splitCheck: # slice epochs in to overlapping time windows
window_length = self.globalEpochSettings.windowLength
window_overlap = self.globalEpochSettings.windowOverlap
window_start_time = self.markerTimestamp - self.globalEpochSettings.tmin
window_end_time = window_start_time + window_length
while window_end_time <= self.markerTimestamp + self.globalEpochSettings.tmax:
sliceDataFIFOs = [slice_fifo_by_time(fifo, window_start_time, window_end_time) for fifo in dataFIFOs]
self.dataQueueTrain.put([sliceDataFIFOs, self.currentMarker, self.sr, self.devCount])
window_start_time += window_length * (1 - window_overlap)
window_end_time = window_start_time + window_length
self.startCounting = False
else: # don't slice just take tmin to tmax time
start_time = self.markerTimestamp + self.globalEpochSettings.tmin
end_time = self.markerTimestamp + self.globalEpochSettings.tmax
sliceDataFIFOs = [slice_fifo_by_time(fifo, start_time, end_time) for fifo in dataFIFOs]
self.dataQueueTrain.put([sliceDataFIFOs, self.currentMarker, self.sr, self.devCount])
# reset flags and counters
posCount = 0
self.startCounting = False
else: # in Test mode
if self.globalEpochSettings.splitCheck:
window_length = self.globalEpochSettings.windowLength
window_overlap = self.globalEpochSettings.windowOverlap
else:
window_length = self.globalEpochSettings.tmin+self.globalEpochSettings.tmax

if timestamp >= window_end_time:
#sliceDataFIFOs = [[data for time, data in fifo if window_end_time - window_length <= time < window_end_time] for fifo in dataFIFOs]
sliceDataFIFOs = [slice_fifo_by_time(fifo, window_end_time - window_length, window_end_time) for fifo in dataFIFOs]
self.dataQueueTest.put([sliceDataFIFOs, None, self.devCount])
#sliceDataFIFOs = [list(itertools.islice(d, fifoLength-window_samples, fifoLength)) for d in dataFIFOs]
if self.globalEpochSettings.splitCheck:
window_end_time += window_length * (1 - window_overlap)
else:
window_end_time = timestamp + window_length
else:
pass
# add levels of debug?

def ReceiveMarker(self, marker, timestamp): # timestamp will be used for non sample rate specific devices (pupil-labs gazedata)
if self.startCounting == False: # only one marker at a time allow, other in windowed timeframe ignored
self.currentMarker = marker
self.markerTimestamp = timestamp
if len(self.customEpochSettings.keys())>0: # custom marker received
if marker in self.customEpochSettings.keys():
self.desiredCount = int(self.customEpochSettings[marker].tmax * self.sr) # find number of samples after tmax to finish counting
self.startCounting = True
else: # no custom markers set, use global settings
self.desiredCount = int(self.globalEpochSettings.tmax * self.sr) # find number of samples after tmax to finish counting
self.startCounting = True
111 changes: 111 additions & 0 deletions build/lib/pybci/ThreadClasses/ClassifierThread.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from ..Utils.Classifier import Classifier
from ..Utils.Logger import Logger
import queue,threading, time
import numpy as np

class ClassifierThread(threading.Thread):
features = np.array([])#[]
targets = np.array([])
mode = "train"
guess = " "
#epochCountsc = {}
def __init__(self, closeEvent,trainTestEvent, featureQueueTest,featureQueueTrain, classifierInfoQueue, classifierInfoRetrieveEvent,
classifierGuessMarkerQueue, classifierGuessMarkerEvent, queryFeaturesQueue, queryFeaturesEvent,
logger = Logger(Logger.INFO), numStreamDevices = 1,
minRequiredEpochs = 10, clf = None, model = None, torchModel = None):
super().__init__()
self.trainTestEvent = trainTestEvent # responsible for tolling between train and test mode
self.closeEvent = closeEvent # responsible for cosing threads
self.featureQueueTest = featureQueueTest # gets feature data from feature processing thread
self.featureQueueTrain = featureQueueTrain # gets feature data from feature processing thread
self.classifier = Classifier(clf = clf, model = model, torchModel = torchModel) # sets classifier class, if clf and model passed, defaults to clf and sklearn
self.minRequiredEpochs = minRequiredEpochs # the minimum number of epochs required for classifier attempt
self.classifierInfoRetrieveEvent = classifierInfoRetrieveEvent
self.classifierInfoQueue = classifierInfoQueue
self.classifierGuessMarkerQueue = classifierGuessMarkerQueue
self.classifierGuessMarkerEvent = classifierGuessMarkerEvent
self.queryFeaturesQueue = queryFeaturesQueue
self.queryFeaturesEvent = queryFeaturesEvent
self.numStreamDevices = numStreamDevices
self.logger = logger

def run(self):
epochCountsc={}
if self.numStreamDevices > 1:
tempdatatrain = {}
tempdatatest = {}
while not self.closeEvent.is_set():
if self.trainTestEvent.is_set(): # We're training!
if self.featureQueueTrain.empty():
if len(epochCountsc) > 1: # check if there is more then one test condition
minNumKeyEpochs = min([epochCountsc[key][1] for key in epochCountsc]) # check minimum viable number of training eochs have been obtained
if minNumKeyEpochs < self.minRequiredEpochs:
pass
else:
start = time.time()
self.classifier.TrainModel(self.features, self.targets)
if (self.logger.level == Logger.TIMING):
end = time.time()
self.logger.log(Logger.TIMING, f" classifier training time {end - start}")
if self.classifierGuessMarkerEvent.is_set():
self.classifierGuessMarkerQueue.put(self.guess)
else:
try:
featuresSingle, devCount, target, epochCountsc = self.featureQueueTrain.get_nowait() #[dataFIFOs, self.currentMarker, self.sr, self.dataType]
if self.numStreamDevices > 1: # Collects multiple data strems feature sets and synchronise here
tempdatatrain[devCount] = featuresSingle
if len(tempdatatrain) == self.numStreamDevices:
flattened_list = np.array([item for sublist in tempdatatrain.values() for item in sublist])
tempdatatrain = {}
self.targets = np.append(self.targets, [target], axis = 0)
#self.features = np.append(self.features, [flattened_list], axis = 0)
if self.features.shape[0] == 0:
self.features = self.features.reshape((0,) + flattened_list.shape)
self.features = np.append(self.features, [flattened_list], axis=0)
# need to check if all device data is captured, then flatten and append
else: # Only one device to collect from
if self.features.shape[0] == 0:
self.features = self.features.reshape((0,) + featuresSingle.shape)
self.targets = np.append(self.targets, [target], axis = 0)
self.features = np.append(self.features, [featuresSingle], axis = 0)
except queue.Empty:
pass
else: # We're testing!
try:
featuresSingle, devCount = self.featureQueueTest.get_nowait() #[dataFIFOs, self.currentMarker, self.sr, self.dataType]
if self.numStreamDevices > 1:
tempdatatest[devCount] = featuresSingle
if len(tempdatatest) == self.numStreamDevices:
flattened_list = []
flattened_list = np.array([item for sublist in tempdatatest.values() for item in sublist])
tempdatatest = {}
start = time.time()
self.guess = self.classifier.TestModel(flattened_list)
if (self.logger.level == Logger.TIMING):
end = time.time()
self.logger.log(Logger.TIMING, f" classifier testing time {end - start}")
else:
start = time.time()
self.guess = self.classifier.TestModel(featuresSingle)
if (self.logger.level == Logger.TIMING):
end = time.time()
self.logger.log(Logger.TIMING, f" classifier testing time {end - start}")
if self.classifierGuessMarkerEvent.is_set():
self.classifierGuessMarkerQueue.put(self.guess)
except queue.Empty:
pass
if self.classifierInfoRetrieveEvent.is_set():
a = self.classifier.accuracy
classdata = {
"clf":self.classifier.clf,
"model":self.classifier.model,
"torchModel":self.classifier.torchModel,
"accuracy":a
}
self.classifierInfoQueue.put(classdata)
if self.queryFeaturesEvent.is_set():
featureData = {
"features":self.features,
"targets":self.targets
}
self.queryFeaturesQueue.put(featureData)
Loading

0 comments on commit 050a23f

Please sign in to comment.