-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
235 lines (186 loc) · 6.66 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
from os import listdir
from os.path import isfile, join
import scipy
import pandas as pd
import numpy as np
from sklearn import preprocessing
import matplotlib.pyplot as plt
import seaborn as sns
def sliding_window_pd(
df,
ws=500,
overlap=250,
w_type="hann",
w_center=True,
print_stats=False
) -> list:
"""Applies the sliding window algorithm to the DataFrame rows.
Args:
df: The DataFrame with all the values that will be inserted to the
sliding window algorithm.
ws: The window size in number of samples.
overlap: The hop length in number of samples.
w_type: The windowing function.
w_center: If False, set the window labels as the right edge of the
window index. If True, set the window labels as the center of the
window index.
print_stats: Print statistical inferences from the process. Defaults
to False.
Returns:
A list of DataFrames each one corresponding to a produced window.
"""
counter = 0
windows_list = list()
# min_periods: Minimum number of observations in window required to have
# a value;
# For a window that is specified by an integer, min_periods will default
# to the size of the window.
for window in df.rolling(window=ws, step=overlap, min_periods=ws,
win_type=w_type, center=w_center):
if window[window.columns[0]].count() >= ws:
if print_stats:
print("Print Window:", counter)
print("Number of samples:", window[window.columns[0]].count())
windows_list.append(window)
counter += 1
if print_stats:
print("List number of window instances:", len(windows_list))
return windows_list
def apply_filter(
arr,
order=5,
wn=0.1,
filter_type="lowpass"
) -> np.ndarray:
"""Applies filter to the multi-axis signal.
Args:
arr: The initial NumPy signal array values.
order: The order of the filter.
wn: The critical frequency or frequencies.
filter_type: The type of filter. {‘lowpass’, ‘highpass’, ‘bandpass’,
‘bandstop’}
Returns:
NumPy Array with the filtered signal.
"""
fbd_filter = scipy.signal.butter(N=order, Wn=wn, btype=filter_type,
output="sos")
filtered_signal = scipy.signal.sosfiltfilt(sos=fbd_filter, x=arr, padlen=0)
return filtered_signal
def filter_instances(
instances_list,
order,
wn,
filter_type
) -> list:
"""Applies filter to a list of windows (each window is a DataFrame).
Args:
instances_list: List of DataFrames.
order: The order of the filter.
wn: The critical frequency or frequencies.
filter_type: The type of filter. {‘lowpass’, ‘highpass’, ‘bandpass’,
‘bandstop’}
Returns:
"""
filtered_instances_list = list()
for item in instances_list:
filtered_instance = item.apply(apply_filter,
args=(order, wn, filter_type)
)
filtered_instances_list.append(filtered_instance)
print("Number of filtered instances in the list:",
len(filtered_instances_list)
)
return filtered_instances_list
def flatten_instances_df(instances_list: list) -> pd.DataFrame:
"""Flattens each instance and create a DataFrame with the whole flattened
instances.
Args:
instances_list: The list of DataFrames to be flattened
Returns:
A DataFrame that includes the whole flattened DataFrames
"""
flattened_instances_list = list()
for item in instances_list:
instance = item.to_numpy().flatten()
flattened_instances_list.append(instance)
df = pd.DataFrame(flattened_instances_list)
return df
def df_rebase(
df: pd.DataFrame,
target_list: list,
ref_list: list
) -> pd.DataFrame:
"""Changes the order and name of DataFrame columns to the project's needs
for readability.
Args:
df: The pandas DataFrame.
order_list: List object that contains the proper order of the default
column names.
ref_list: List object that contains the renaming list based
on the project needs.
Returns:
A DataFrame with the new columns order and names.
"""
print("Initial columns:", list(df.columns))
if are_lists_equal(list(df.columns), ref_list):
pass
else:
if len(target_list) == len(ref_list):
# keep and re-order only the necessary columns of the initial DataFrame
df = df[target_list]
rename_dict = dict(zip(target_list, ref_list))
df = df.rename(columns=rename_dict) # rename the columns
else:
print("The length of the target list and the reference list is not equal.")
print("Processed columns:", list(df.columns))
return df
def rename_df_column_values(
np_array: np.ndarray,
y: list,
columns_names: tuple = ("acc_x", "acc_y", "acc_z")
):
"""Creates a DataFrame with a "y" label column and replaces the values of the y with the index
of the unique values of y.
Args:
np_array: 2D NumPy array.
y: List with the y labels
columns_names: List with the DF columns names.
Returns:
DataFrame with the multi-axes values and the target labels column.
"""
arr_y = np.array(y) # list to numpy array
unique_values_list = np.unique(arr_y) # unique list of values
df = pd.DataFrame(np_array, columns=columns_names)
df["y"] = y
# replace the row item value in the y column of the df, with its index in the unique list
for idx, x in enumerate(unique_values_list):
df["y"] = np.where(df["y"] == x, idx, df["y"])
return df
def are_lists_equal(
list1: list,
list2: list
) -> bool:
return set(list1) == set(list2)
def encode_labels(instances_list) -> np.ndarray:
"""Encodes target labels.
Args:
instances_list: List of instances to be encoded.
Returns:
The encoded array.
"""
le = preprocessing.LabelEncoder()
le.fit(instances_list)
instances_arr = le.transform(instances_list)
return instances_arr
def list_files_in_folder(folder_path) -> list:
"""Returns a list with the files in the folder.
Args:
folder_path:
Returns:
"""
files_list = list()
for f in listdir(folder_path):
if isfile(join(folder_path, f)):
if f.endswith(".csv"):
files_list.append(f)
return files_list