-
Notifications
You must be signed in to change notification settings - Fork 1
/
preProcessing.py
106 lines (80 loc) · 3.2 KB
/
preProcessing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jul 22 22:33:37 2020
@author: aguasharo
"""
import numpy as np
from scipy import signal
import matplotlib.pyplot as plt
import math
def butter_lowpass_filter(data, fs, order):
# Get the filter coefficients
b, a = signal.butter(order, fs, 'low', analog = False)
y = signal.filtfilt(b, a, data)
return y
def preProcessEMGSegment(EMGsegment_in):
# This function to apply a filter
EMG = max(EMGsegment_in)
if EMG > 1:
EMGnormalized = EMGsegment_in/128
else:
EMGnormalized = EMGsegment_in
EMGrectified = abs(EMGnormalized)
EMGsegment_out = butter_lowpass_filter(EMGrectified, 0.1, 5)
return EMGsegment_out
def detectMuscleActivity(emg_sum):
# This function segments in a EMG the region corresponding to a muscle
# contraction. The indices idxStart and idxEnd correspond to the begining
# and the end of such a region
# Sampling frequency of the EMG
fs = 200
minWindowLength_Segmentation = 100 # Minimum length of the segmented region
hammingWdw_Length = np.hamming(25) # Window length
numSamples_lapBetweenWdws = 10 # Overlap between 2 consecutive windows
threshForSum_AlongFreqInSpec = 0.86
[s, f, t, im] = plt.specgram(emg_sum, NFFT = 25, Fs = fs, window = hammingWdw_Length, noverlap = numSamples_lapBetweenWdws, mode = 'magnitude', pad_to = 50)
# Summing the spectrogram along the frequencies
sumAlongFreq = [sum(x) for x in zip(*s)]
greaterThanThresh = []
# Thresholding the sum sumAlongFreq
for item in sumAlongFreq:
if item >= threshForSum_AlongFreqInSpec:
greaterThanThresh.append(1)
else:
greaterThanThresh.append(0)
greaterThanThresh.insert(0,0)
greaterThanThresh.append(0)
diffGreaterThanThresh = abs(np.diff(greaterThanThresh))
if diffGreaterThanThresh[-1] == 1:
diffGreaterThanThresh[-2] = 1;
x = diffGreaterThanThresh[0:-1];
findNumber = lambda x, xs: [i for (y, i) in zip(xs, range(len(xs))) if x == y]
idxNonZero = findNumber(1,x)
numIdxNonZero = len(idxNonZero)
idx_Samples = np.floor(fs*t)
# Finding the indices of the start and the end of a muscle contraction
if numIdxNonZero == 0:
idx_Start = 1
idx_End = len(emg_sum)
elif numIdxNonZero == 1:
idx_Start = idx_Samples[idxNonZero]
idx_End = len(emg_sum)
else:
idx_Start = idx_Samples[idxNonZero[0]]
idx_End = idx_Samples[idxNonZero[-1]-1]
# Adding a head and a tail to the segmentation
numExtraSamples = 25
idx_Start = max(1,idx_Start - numExtraSamples)
idx_End = min(len(emg_sum), idx_End + numExtraSamples)
if (idx_End - idx_Start) < minWindowLength_Segmentation:
idx_Start = 1
idx_End = len(emg_sum)
return int(idx_Start), int(idx_End)
def EMG_segment(train_filtered_X):
# This function return a segment with corresponding to a muscle
# contraction
df_sum = train_filtered_X.sum(axis=1)
idx_Start, idx_End = detectMuscleActivity(df_sum)
df_seg = train_filtered_X.iloc[idx_Start:idx_End]
return df_seg