forked from jaakkopasanen/Impulcifer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
room_correction.py
390 lines (342 loc) · 15.1 KB
/
room_correction.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
# -*- coding: utf-8 -*-
import os
import re
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
from autoeq.frequency_response import FrequencyResponse
from impulse_response import ImpulseResponse
from hrir import HRIR
from utils import sync_axes, save_fig_as_png, read_wav, get_ylim, config_fr_axis
from constants import SPEAKER_NAMES, SPEAKER_LIST_PATTERN, IR_ROOM_SPL, COLORS
def room_correction(
estimator,
dir_path,
target=None,
mic_calibration=None,
fr_combination_method='average',
specific_limit=20000,
generic_limit=1000,
plot=False):
"""Corrects room acoustics
Args:
estimator: ImpulseResponseEstimator
dir_path: Path to directory
target: Path to room target response CSV file
mic_calibration: Path to room measurement microphone calibration file
fr_combination_method: Method for combining generic room measurment frequency responses. "average" or
"conservative"
specific_limit: Upper limit in Hertz for equalization of specific room eq. 0 disables limit.
generic_limit: Upper limit in Hertz for equalization of generic room eq. 0 disables limit.
plot: Plot graphs?
Returns:
- Room Impulse Responses as HRIR or None
- Equalization frequency responses as dict of dicts (similar to HRIR) or None
"""
# Open files
target = open_room_target(estimator, dir_path, target=target)
mic_calibration = open_mic_calibration(estimator, dir_path, mic_calibration=mic_calibration)
rir = open_room_measurements(estimator, dir_path)
missing = [ch for ch in SPEAKER_NAMES if ch not in rir.irs]
room_fr = open_generic_room_measurement(
estimator,
dir_path,
mic_calibration,
target,
method=fr_combination_method,
limit=generic_limit,
plot=plot
)
if not len(rir.irs) and room_fr is None:
# No room recording files found
return None, None
frs = dict()
fr_axes = []
figs = None
if len(rir.irs):
# Crop heads and tails from room impulse responses
for speaker, pair in rir.irs.items():
for side, ir in pair.items():
ir.crop_head()
rir.crop_tails()
rir.write_wav(os.path.join(dir_path, 'room-responses.wav'))
if plot:
# Plot all but frequency response
plot_dir = os.path.join(dir_path, 'plots', 'room')
os.makedirs(plot_dir, exist_ok=True)
figs = rir.plot(plot_fr=False, close_plots=False)
# Create equalization frequency responses
reference_gain = None
for speaker, pair in rir.irs.items():
frs[speaker] = dict()
for side, ir in pair.items():
# Create frequency response
fr = ir.frequency_response()
if mic_calibration is not None:
# Calibrate frequency response
fr.raw -= mic_calibration.raw
# Sync gains
if reference_gain is None:
reference_gain = fr.center([100, 10000]) # Shifted (up) by this many dB
else:
fr.raw += reference_gain
# Adjust target level with the (negative) gain caused by speaker-ear distance in reverberant room
target_adjusted = target.copy()
target_adjusted.raw += IR_ROOM_SPL[speaker][side]
# Compensate with the adjusted room target
fr.compensate(target_adjusted, min_mean_error=False)
# Zero error above limit
if specific_limit > 0:
start = np.argmax(fr.frequency > specific_limit / 2)
end = np.argmax(fr.frequency > specific_limit)
mask = np.concatenate([
np.ones(start if start > 0 else 0),
signal.windows.hann(end - start),
np.zeros(len(fr.frequency) - end)
])
fr.error *= mask
# Add frequency response
frs[speaker][side] = fr
if plot:
file_path = os.path.join(dir_path, 'plots', 'room', f'{speaker}-{side}.png')
fr = fr.copy()
fr.smoothen_fractional_octave(window_size=1/3, treble_window_size=1/3)
_, fr_ax = ir.plot_fr(
fr=fr,
fig=figs[speaker][side],
ax=figs[speaker][side].get_axes()[4],
plot_raw=False,
plot_error=False,
plot_file_path=file_path,
fix_ylim=True
)
fr_axes.append(fr_ax)
if len(missing) > 0 and room_fr is not None:
# Use generic measurement for speakers that don't have specific measurements
for speaker in missing:
frs[speaker] = {'left': room_fr.copy(), 'right': room_fr.copy()}
if plot and figs is not None:
room_plots_dir = os.path.join(dir_path, 'plots', 'room')
os.makedirs(room_plots_dir, exist_ok=True)
# Sync FR plot axes
sync_axes(fr_axes)
# Save specific fR figures
for speaker, pair in figs.items():
for side, fig in pair.items():
save_fig_as_png(os.path.join(room_plots_dir, f'{speaker}-{side}.png'), fig)
plt.close(fig)
return rir, frs
def open_room_measurements(estimator, dir_path):
"""Opens speaker-ear specific room measurements.
Args:
estimator: ImpulseResponseEstimator instance
dir_path: Path to directory
Returns:
HRIR instance with the room measurements
"""
# Read room measurement files
rir = HRIR(estimator)
# room-BL,SL.wav, room-left-FL,FR.wav, room-right-FC.wav, etc...
pattern = rf'^room-{SPEAKER_LIST_PATTERN}(-(left|right))?\.wav$'
for i, file_name in enumerate([f for f in os.listdir(dir_path) if re.match(pattern, f)]):
# Read the speaker names from the file name into a list
speakers = re.search(SPEAKER_LIST_PATTERN, file_name)
if speakers is not None:
speakers = speakers[0].split(',')
# Form absolute path
file_path = os.path.join(dir_path, file_name)
# Read side if present
side = re.search(r'(left|right)', file_name)
if side is not None:
side = side[0]
# Read file
rir.open_recording(file_path, speakers, side=side)
return rir
def open_generic_room_measurement(estimator,
dir_path,
mic_calibration,
target,
method='average',
limit=1000,
plot=False):
"""Opens generic room measurment file
Args:
estimator: ImpulseResponseEstimator instance
dir_path: Path to directory
mic_calibration: Measurement microphone calibration FrequencyResponse
target: Room response target FrequencyResponse
method: Combination method. "average" or "conservative"
limit: Upper limit in Hertz for equalization. Gain will ramp down to 0 dB in the octave leading to this.
0 disables limit.
plot: Plot frequency response?
Returns:
Generic room measurement FrequencyResponse
"""
file_path = os.path.join(dir_path, 'room.wav')
if not os.path.isfile(file_path):
return None
# Read the file
fs, data = read_wav(file_path, expand=True)
if fs != estimator.fs:
raise ValueError(f'Sampling rate of "{file_path}" doesn\'t match!')
# Average frequency responses of all tracks of the generic room measurement file
irs = []
for track in data:
n_cols = int(round((len(track) / estimator.fs - 2) / (estimator.duration + 2)))
for i in range(n_cols):
# Starts at 2 seconds in the beginning plus previous sweeps and their tails
start = int(2 * estimator.fs + i * (2 * estimator.fs + len(estimator)))
# Ends at start plus one more (current) sweep
end = int(start + 2 * estimator.fs + len(estimator))
end = min(end, len(track))
# Select current sweep
sweep = track[start:end]
# Deconvolve as impulse response
ir = ImpulseResponse(estimator.estimate(sweep), estimator.fs, sweep)
# Crop harmonic distortion from the head
# Noise in the tail should not affect frequency response so it doesn't have to be cropped
ir.crop_head(head_ms=1)
irs.append(ir)
# Frequency response for the generic room measurement
room_fr = FrequencyResponse(
name='generic_room',
frequency=FrequencyResponse.generate_frequencies(f_min=10, f_max=estimator.fs / 2, f_step=1.01),
raw=0, error=0, target=target.raw
)
# Calculate and stack errors
raws = []
errors = []
for ir in irs:
fr = ir.frequency_response()
if mic_calibration is not None:
fr.raw -= mic_calibration.raw
fr.center([100, 10000])
room_fr.raw += fr.raw
raws.append(fr.copy())
fr.compensate(target, min_mean_error=True)
if method == 'conservative' and len(irs) > 1:
fr.smoothen_fractional_octave(window_size=1/3, treble_window_size=1/3)
errors.append(fr.error_smoothed)
else:
errors.append(fr.error)
room_fr.raw /= len(irs)
errors = np.vstack(errors)
if errors.shape[0] > 1:
# Combine errors
if method == 'conservative':
# Conservative error curve is zero everywhere else but on indexes where both have the same sign,
# at these indexes the smaller absolute value is selected.
# This ensures that no curve will be adjusted to the other side of zero
mask = np.mean(errors > 0, axis=0) # Average from boolean values per column
positive = mask == 1 # Mask for columns with only positive values
negative = mask == 0 # Mask for columns with only negative values
# Minimum value for columns with only positive values
room_fr.error[positive] = np.min(errors[:, positive], axis=0)
# Maximum value for columns with only negative values (minimum absolute value)
room_fr.error[negative] = np.max(errors[:, negative], axis=0)
# Smoothen out kinks
room_fr.smoothen_fractional_octave(window_size=1 / 6, treble_window_size=1 / 6)
room_fr.error = room_fr.error_smoothed.copy()
elif method == 'average':
room_fr.error = np.mean(errors, axis=0)
room_fr.smoothen_fractional_octave(window_size=1/3, treble_window_size=1/3)
else:
raise ValueError(
f'Invalid value "{method}" for method. Supported values are "conservative" and "average"')
else:
room_fr.error = errors[0, :]
room_fr.smoothen_fractional_octave(window_size=1 / 3, treble_window_size=1 / 3)
if limit > 0:
# Zero error above limit
start = np.argmax(room_fr.frequency > limit / 2)
end = np.argmax(room_fr.frequency > limit)
mask = np.concatenate([
np.ones(start if start > 0 else 0),
signal.windows.hann(end - start),
np.zeros(len(room_fr.frequency) - end)
])
room_fr.error *= mask
room_fr.error_smoothed *= mask
if plot:
# Create dir
room_plots_dir = os.path.join(dir_path, 'plots', 'room')
os.makedirs(room_plots_dir, exist_ok=True)
# Create generic FR plot
fr = room_fr.copy()
fr.name = 'Generic room measurement'
fr.raw = fr.smoothed.copy()
fr.error = fr.error_smoothed.copy()
# Create figure and axes
fig, ax = plt.subplots()
fig.set_size_inches(15, 9)
config_fr_axis(ax)
ax.set_title('Generic room measurement')
# Plot target, raw and error
ax.plot(fr.frequency, fr.target, color=COLORS['lightpurple'], linewidth=5, label='Target')
for raw in raws:
raw.smoothen_fractional_octave(window_size=1/3, treble_window_size=1/3)
ax.plot(raw.frequency, raw.smoothed, color='grey', linewidth=0.5)
ax.plot(fr.frequency, fr.raw, color=COLORS['blue'], label='Raw smoothed')
ax.plot(fr.frequency, fr.error, color=COLORS['red'], label='Error smoothed')
ax.legend()
# Set y limits
sl = np.logical_and(fr.frequency >= 20, fr.frequency <= 20000)
stack = np.vstack([
fr.raw[sl],
fr.error[sl],
fr.target[sl]
])
ax.set_ylim(get_ylim(stack, padding=0.1))
# Save FR figure
save_fig_as_png(os.path.join(room_plots_dir, 'room.png'), fig)
plt.close(fig)
return room_fr
def open_room_target(estimator, dir_path, target=None):
"""Opens room frequency response target file.
Args:
estimator: ImpulseResponseEstimator instance
dir_path: Path to directory
target: Path to explicitly given (if any) room response target file
Returns:
Room response target FrequencyResponse
"""
# Room target
if target is None:
target = os.path.join(dir_path, 'room-target.csv')
if os.path.isfile(target):
# File exists, create frequency response
target = FrequencyResponse.read_from_csv(target)
target.interpolate(f_step=1.01, f_min=10, f_max=estimator.fs / 2)
target.center()
else:
# No room target specified, use flat
target = FrequencyResponse(name='room-target')
target.raw = np.zeros(target.frequency.shape)
target.interpolate(f_step=1.01, f_min=10, f_max=estimator.fs / 2)
return target
def open_mic_calibration(estimator, dir_path, mic_calibration=None):
"""Opens room measurement microphone calibration file
Args:
estimator: ImpulseResponseEstimator instance
dir_path: Path to directory
mic_calibration: Path to explicitly given (if any) room measurement microphone calibration file
Returns:
Microphone calibration FrequencyResponse
"""
if mic_calibration is None:
# Room mic calibration file path not given, try csv first then txt
mic_calibration = os.path.join(dir_path, 'room-mic-calibration.csv')
if not os.path.isfile(mic_calibration):
mic_calibration = os.path.join(dir_path, 'room-mic-calibration.txt')
elif not os.path.isfile(mic_calibration):
# Room mic calibration file path given, but the file doesn't exist
raise FileNotFoundError(f'Room mic calibration file doesn\'t exist at "{mic_calibration}"')
if os.path.isfile(mic_calibration):
# File found, create frequency response
mic_calibration = FrequencyResponse.read_from_csv(mic_calibration)
mic_calibration.interpolate(f_step=1.01, f_min=10, f_max=estimator.fs / 2)
mic_calibration.center()
else:
# File not found, skip calibration
mic_calibration = None
return mic_calibration