forked from kahst/BirdNET-Analyzer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
audio.py
248 lines (183 loc) · 7.29 KB
/
audio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
"""Module containing audio helper functions.
"""
import numpy as np
import config as cfg
# Module-wide random number generator, seeded from cfg.RANDOM_SEED.
# The helpers below draw their noise intensity and noise samples from it.
RANDOM = np.random.RandomState(cfg.RANDOM_SEED)
def openAudioFile(path: str, sample_rate=48000, offset=0.0, duration=None, fmin=None, fmax=None):
    """Open an audio file.

    Opens an audio file with librosa and the given settings. The file is
    loaded as mono at `sample_rate` using the "kaiser_fast" resampler. When
    both `fmin` and `fmax` are given, the loaded signal is additionally
    passed through `bandpass`.

    Args:
        path: Path to the audio file.
        sample_rate: The sample rate at which the file should be processed.
        offset: The starting offset.
        duration: Maximum duration of the loaded content.
        fmin: Lower frequency bound handed to `bandpass`. Ignored unless
            `fmax` is given as well.
        fmax: Upper frequency bound handed to `bandpass`. Ignored unless
            `fmin` is given as well.

    Returns:
        Returns the audio time series and the sampling rate.
    """
    # Open file with librosa (uses ffmpeg or libav)
    import librosa

    sig, rate = librosa.load(path, sr=sample_rate, offset=offset, duration=duration, mono=True, res_type="kaiser_fast")

    # Bandpass filter, only when both band edges were supplied.
    # Identity checks are used so that a bound of 0 still counts as "given".
    if fmin is not None and fmax is not None:
        # NOTE: bandpassKaiserFIR takes the same arguments and can be swapped in here.
        sig = bandpass(sig, rate, fmin, fmax)

    return sig, rate
def getAudioFileLength(path, sample_rate=48000):
    """Return the duration of an audio file, truncated to an integer.

    Args:
        path: Path to the audio file.
        sample_rate: The sample rate forwarded to `librosa.get_duration`.

    Returns:
        The duration reported by librosa, converted with `int()`.
    """
    # Open file with librosa (uses ffmpeg or libav)
    import librosa

    try:
        # librosa 0.10 renamed the file argument from `filename` to `path`.
        length = librosa.get_duration(path=path, sr=sample_rate)
    except TypeError:
        # Older librosa releases only accept the `filename` keyword.
        length = librosa.get_duration(filename=path, sr=sample_rate)

    return int(length)
def get_sample_rate(path: str):
    """Look up the sampling rate of an audio file.

    Args:
        path: Path to the audio file.

    Returns:
        Whatever `librosa.get_samplerate` reports for the file.
    """
    import librosa

    native_rate = librosa.get_samplerate(path)
    return native_rate
def saveSignal(sig, fname: str, rate=48000):
    """Saves a signal to file.

    The signal is written with the soundfile package using the "PCM_16"
    subtype.

    Args:
        sig: The signal to be saved.
        fname: The file path.
        rate: The sample rate to store in the file. Defaults to 48000, the
            value that used to be hard-coded, so existing callers are
            unaffected.
    """
    import soundfile as sf

    sf.write(fname, sig, rate, "PCM_16")
def pad(sig, seconds, srate, amount=None):
    """Pad a signal to a target duration.

    If the signal holds fewer than `int(srate * seconds)` samples, the
    missing samples are appended to its end: zeros when `cfg.USE_NOISE` is
    off, otherwise Gaussian noise drawn from the module-level `RANDOM`
    generator. A signal that is already long enough is returned unchanged.

    Args:
        sig: The original audio signal.
        seconds: The target duration in seconds.
        srate: The sampling rate of the signal.
        amount: The noise intensity. Drawn uniformly from [0.1, 0.5) when
            None and noise is enabled.

    Returns:
        The input signal itself if no padding is needed, otherwise a new
        numpy array of the target length.
    """
    target_len = int(srate * seconds)
    missing = target_len - len(sig)

    # Nothing to do for signals that already reach the target length
    if missing <= 0:
        return sig

    if not cfg.USE_NOISE:
        noise = np.zeros(missing, dtype=sig.dtype)
    else:
        # Random noise intensity
        if amount is None:
            amount = RANDOM.uniform(0.1, 0.5)

        # Create Gaussian noise; np.min/np.max avoid a Python-level scan of the array
        try:
            noise = RANDOM.normal(np.min(sig) * amount, np.max(sig) * amount, missing).astype(sig.dtype)
        except (ValueError, TypeError):
            # Best effort: e.g. an empty signal or a negative scale cannot
            # produce noise, so fall back to silence instead of failing.
            noise = np.zeros(missing, dtype=sig.dtype)

    return np.concatenate((sig, noise))
def splitSignal(sig, rate, seconds, overlap, minlen, amount=None):
    """Split signal with overlap.

    Args:
        sig: The original signal to be split.
        rate: The sampling rate.
        seconds: The duration of a segment.
        overlap: The overlapping seconds of segments.
        minlen: Minimum length of a split.
        amount: The noise intensity used for the padding appended to the
            signal when `cfg.USE_NOISE` is on. Drawn uniformly from
            [0.1, 0.5) when None.

    Returns:
        A list of splits.
    """
    # Split signal to chunks of duration with overlap, whereas each chunk still has minimum duration of signal.
    # Missing or out-of-range arguments fall back to the configured defaults.
    if rate is None or rate <= 0:
        rate = cfg.SAMPLE_RATE
    if seconds is None or seconds <= 0:
        seconds = cfg.SIG_LENGTH
    if overlap is None or overlap < 0 or overlap >= seconds:
        overlap = cfg.SIG_OVERLAP
    if minlen is None or minlen <= 0 or minlen > seconds:
        minlen = cfg.SIG_MINLEN

    # Number of frames per chunk, per step and per minimum signal
    chunksize = int(rate * seconds)
    stepsize = int(rate * (seconds - overlap))
    minsize = int(rate * minlen)

    # Start of last chunk (int() truncates toward zero, which is relied upon below)
    lastchunkpos = int((sig.size - chunksize + stepsize - 1) / stepsize) * stepsize

    # Make sure at least one chunk is returned
    if lastchunkpos < 0:
        lastchunkpos = 0
    # Omit last chunk if minimum signal duration is underrun
    elif sig.size - lastchunkpos < minsize:
        lastchunkpos = lastchunkpos - stepsize

    # Append noise or empty signal of chunk duration, so all splits have desired length
    if not cfg.USE_NOISE:
        noise = np.zeros(shape=chunksize, dtype=sig.dtype)
    else:
        # Random noise intensity. `amount` used to be read here without ever
        # being defined, which raised as soon as noise padding was enabled.
        if amount is None:
            amount = RANDOM.uniform(0.1, 0.5)

        # Create Gaussian noise
        try:
            noise = RANDOM.normal(
                loc=np.min(sig) * amount, scale=np.max(sig) * amount, size=chunksize
            ).astype(sig.dtype)
        except (ValueError, TypeError):
            # Best effort: e.g. an empty signal or a negative scale cannot
            # produce noise, so fall back to silence instead of failing.
            noise = np.zeros(shape=chunksize, dtype=sig.dtype)

    data = np.concatenate((sig, noise))

    # Split signal with overlap
    return [data[start:start + chunksize] for start in range(0, 1 + lastchunkpos, stepsize)]
def cropCenter(sig, rate, seconds):
    """Crop signal to center.

    Args:
        sig: The original signal.
        rate: The sampling rate.
        seconds: The length of the signal.

    Returns:
        The centered excerpt of `int(seconds * rate)` samples when the signal
        is longer than that, otherwise the result of
        `pad(sig, seconds, rate, 0.5)`.
    """
    wanted = int(seconds * rate)

    # Not longer than requested: hand the signal over to pad()
    if len(sig) <= wanted:
        return pad(sig, seconds, rate, 0.5)

    # Drop the same number of samples (up to rounding) from both ends
    first = int((len(sig) - wanted) / 2)
    return sig[first:first + wanted]
def bandpass(sig, rate, fmin, fmax, order=5):
    """Filter a signal with a Butterworth high-, low- or bandpass.

    The filter type follows from how `fmin` and `fmax` relate to
    `cfg.SIG_FMIN` and `cfg.SIG_FMAX`:
    high-pass when only `fmin` is raised, low-pass when only `fmax` is
    lowered, band-pass when both are changed.

    Args:
        sig: The signal to filter.
        rate: The sampling rate of the signal.
        fmin: Lower frequency bound.
        fmax: Upper frequency bound.
        order: Order passed to `scipy.signal.butter`.

    Returns:
        The input signal as-is when both bounds equal the configured ones or
        when `fmin > fmax`; otherwise the (possibly filtered) signal
        converted to float32.
    """
    # Check if we have to bandpass at all
    covers_full_band = fmin == cfg.SIG_FMIN and fmax == cfg.SIG_FMAX
    if covers_full_band or fmin > fmax:
        return sig

    from scipy.signal import butter, lfilter

    nyquist = 0.5 * rate
    coeffs = None

    if fmin > cfg.SIG_FMIN and fmax == cfg.SIG_FMAX:
        # Highpass
        coeffs = butter(order, fmin / nyquist, btype="high")
    elif fmin == cfg.SIG_FMIN and fmax < cfg.SIG_FMAX:
        # Lowpass
        coeffs = butter(order, fmax / nyquist, btype="low")
    elif fmin > cfg.SIG_FMIN and fmax < cfg.SIG_FMAX:
        # Bandpass
        coeffs = butter(order, [fmin / nyquist, fmax / nyquist], btype="band")

    # No matching case leaves the samples untouched (only the dtype changes)
    if coeffs is not None:
        numerator, denominator = coeffs
        sig = lfilter(numerator, denominator, sig)

    return sig.astype("float32")
# Raven uses a Kaiser window FIR filter, so we try to emulate it.
# Raven uses the window method for FIR filter design.
# A Kaiser window is used with a default transition bandwidth of 0.02 times
# the Nyquist frequency and a default stop band attenuation of 100 dB.
# For a complete description of this method, see Discrete-Time Signal Processing
# (Second Edition), by Alan Oppenheim, Ronald Schafer, and John Buck, Prentice Hall 1998, pp. 474-476.
def bandpassKaiserFIR(sig, rate, fmin, fmax, width=0.02, stopband_attenuation_db=100):
    """Filter a signal with a Kaiser-window FIR high-, low- or bandpass.

    The filter type follows from how `fmin` and `fmax` relate to
    `cfg.SIG_FMIN` and `cfg.SIG_FMAX`:
    high-pass when only `fmin` is raised, low-pass when only `fmax` is
    lowered, band-pass when both are changed.

    Args:
        sig: The signal to filter.
        rate: The sampling rate of the signal.
        fmin: Lower frequency bound.
        fmax: Upper frequency bound.
        width: Transition width passed to `scipy.signal.kaiserord`.
        stopband_attenuation_db: Stop band attenuation passed to
            `scipy.signal.kaiserord`.

    Returns:
        The input signal as-is when both bounds equal the configured ones or
        when `fmin > fmax`; otherwise the (possibly filtered) signal
        converted to float32.
    """
    # Check if we have to bandpass at all
    if fmin == cfg.SIG_FMIN and fmax == cfg.SIG_FMAX or fmin > fmax:
        return sig

    from scipy.signal import kaiserord, firwin, lfilter

    nyquist = 0.5 * rate

    # Calculate the order and Kaiser parameter for the desired specifications.
    N, beta = kaiserord(stopband_attenuation_db, width)

    taps = None

    # Highpass?
    if fmin > cfg.SIG_FMIN and fmax == cfg.SIG_FMAX:
        low = fmin / nyquist
        # firwin rejects an even tap count when the passband reaches Nyquist,
        # so round up to the next odd number (a no-op for the defaults).
        taps = firwin(N | 1, low, window=("kaiser", beta), pass_zero=False)

    # Lowpass?
    elif fmin == cfg.SIG_FMIN and fmax < cfg.SIG_FMAX:
        high = fmax / nyquist
        taps = firwin(N, high, window=("kaiser", beta), pass_zero=True)

    # Bandpass?
    elif fmin > cfg.SIG_FMIN and fmax < cfg.SIG_FMAX:
        low = fmin / nyquist
        high = fmax / nyquist
        taps = firwin(N, [low, high], window=("kaiser", beta), pass_zero=False)

    # Apply the filter to the signal. When no case matched there is nothing
    # to apply; previously this path failed on the undefined `taps`. The
    # samples are now left untouched, matching bandpass().
    if taps is not None:
        sig = lfilter(taps, 1.0, sig)

    return sig.astype("float32")