Improve the pad_audio function
This function was the culprit of the error. Split it into smaller helper
functions to make the flow easier to follow.
mbsantiago committed Nov 10, 2024
1 parent 25e0a53 commit a4b22d6
Showing 1 changed file with 147 additions and 54 deletions.
201 changes: 147 additions & 54 deletions batdetect2/utils/audio_utils.py
@@ -6,6 +6,8 @@
import numpy as np
import torch

from batdetect2.detector import parameters

from . import wavfile

__all__ = [
@@ -15,18 +17,42 @@
]


def time_to_x_coords(time_in_file, sampling_rate, fft_win_length, fft_overlap):
nfft = np.floor(fft_win_length * sampling_rate) # int() uses floor
noverlap = np.floor(fft_overlap * nfft)
return (time_in_file * sampling_rate - noverlap) / (nfft - noverlap)
def time_to_x_coords(
time_in_file: float,
samplerate: float = parameters.TARGET_SAMPLERATE_HZ,
window_duration: float = parameters.FFT_WIN_LENGTH_S,
window_overlap: float = parameters.FFT_OVERLAP,
) -> float:
nfft = np.floor(window_duration * samplerate) # int() uses floor
noverlap = np.floor(window_overlap * nfft)
return (time_in_file * samplerate - noverlap) / (nfft - noverlap)


def x_coords_to_time(
x_pos: int,
samplerate: float = parameters.TARGET_SAMPLERATE_HZ,
window_duration: float = parameters.FFT_WIN_LENGTH_S,
window_overlap: float = parameters.FFT_OVERLAP,
) -> float:
n_fft = np.floor(window_duration * samplerate)
n_overlap = np.floor(window_overlap * n_fft)
n_step = n_fft - n_overlap
return ((x_pos * n_step) + n_overlap) / samplerate
# return (1.0 - fft_overlap) * fft_win_length * (x_pos + 0.5) # 0.5 is for center of temporal window


# NOTE this is also defined in post_process
def x_coords_to_time(x_pos, sampling_rate, fft_win_length, fft_overlap):
nfft = np.floor(fft_win_length * sampling_rate)
noverlap = np.floor(fft_overlap * nfft)
return ((x_pos * (nfft - noverlap)) + noverlap) / sampling_rate
# return (1.0 - fft_overlap) * fft_win_length * (x_pos + 0.5) # 0.5 is for center of temporal window
def x_coord_to_sample(
x_pos: int,
samplerate: float = parameters.TARGET_SAMPLERATE_HZ,
window_duration: float = parameters.FFT_WIN_LENGTH_S,
window_overlap: float = parameters.FFT_OVERLAP,
resize_factor: float = parameters.RESIZE_FACTOR,
) -> int:
n_fft = np.floor(window_duration * samplerate)
n_overlap = np.floor(window_overlap * n_fft)
n_step = n_fft - n_overlap
x_pos = int(x_pos / resize_factor)
return int((x_pos * n_step) + n_overlap)


def generate_spectrogram(
@@ -184,55 +210,118 @@ def load_audio(
return sampling_rate, audio_raw


def compute_spectrogram_width(
length: int,
samplerate: int = parameters.TARGET_SAMPLERATE_HZ,
window_duration: float = parameters.FFT_WIN_LENGTH_S,
window_overlap: float = parameters.FFT_OVERLAP,
resize_factor: float = parameters.RESIZE_FACTOR,
) -> int:
n_fft = int(window_duration * samplerate)
n_overlap = int(window_overlap * n_fft)
n_step = n_fft - n_overlap
width = (length - n_overlap) // n_step
return int(width * resize_factor)


def pad_audio(
audio_raw,
fs,
ms,
overlap_perc,
resize_factor,
divide_factor,
fixed_width=None,
audio: np.ndarray,
samplerate: int = parameters.TARGET_SAMPLERATE_HZ,
window_duration: float = parameters.FFT_WIN_LENGTH_S,
window_overlap: float = parameters.FFT_OVERLAP,
resize_factor: float = parameters.RESIZE_FACTOR,
divide_factor: int = parameters.SPEC_DIVIDE_FACTOR,
fixed_width: Optional[int] = None,
):
# Adds zeros to the end of the raw data so that the generated spectrogram
# will be evenly divisible by `divide_factor`
# Also deals with very short audio clips and fixed_width during training
"""Pad audio to be evenly divisible by `divide_factor`.
This function pads the audio signal with zeros to ensure that the
generated spectrogram length will be evenly divisible by `divide_factor`.
This is important for the model to work correctly.
This `divide_factor` comes from the model architecture as it downscales
the spectrogram by this factor, so the input must be divisible by this
integer number.
Parameters
----------
audio : np.ndarray
The audio signal.
samplerate : int
The sampling rate of the audio signal.
window_size : float
The window size in seconds used for the spectrogram computation.
window_overlap : float
The overlap between windows in the spectrogram computation.
resize_factor : float
This factor is used to resize the spectrogram after the STFT
computation. Default is 0.5 which means that the spectrogram will be
reduced by half. Important to take into account for the final size of
the spectrogram.
divide_factor : int
The factor by which the spectrogram will be divided.
fixed_width : int, optional
If provided, the audio will be padded or cut so that the resulting
spectrogram width will be equal to this value.
Returns
-------
np.ndarray
The padded audio signal.
"""
spec_width = compute_spectrogram_width(
audio.shape[0],
samplerate=samplerate,
window_duration=window_duration,
window_overlap=window_overlap,
resize_factor=resize_factor,
)

# This code could be clearer, clean up
nfft = int(ms * fs)
noverlap = int(overlap_perc * nfft)
step = nfft - noverlap
min_size = int(divide_factor * (1.0 / resize_factor))
spec_width = (audio_raw.shape[0] - noverlap) // step
spec_width_rs = spec_width * resize_factor

if fixed_width is not None and spec_width < fixed_width:
# too small
# used during training to ensure all the batches are the same size
diff = fixed_width * step + noverlap - audio_raw.shape[0]
audio_raw = np.hstack(
(audio_raw, np.zeros(diff, dtype=audio_raw.dtype))
if fixed_width:
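# Used during training: pad or trim so every example produces the same spectrogram width.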
target_samples = x_coord_to_sample(
fixed_width,
samplerate=samplerate,
window_duration=window_duration,
window_overlap=window_overlap,
resize_factor=resize_factor,
)

elif fixed_width is not None and spec_width > fixed_width:
# too big
# used during training to ensure all the batches are the same size
diff = fixed_width * step + noverlap - audio_raw.shape[0]
audio_raw = audio_raw[:diff]

elif (
spec_width_rs < min_size
or (np.floor(spec_width_rs) % divide_factor) != 0
):
# need to be at least min_size
div_amt = np.ceil(spec_width_rs / float(divide_factor))
div_amt = np.maximum(1, div_amt)
target_size = int(div_amt * divide_factor * (1.0 / resize_factor))
diff = target_size * step + noverlap - audio_raw.shape[0]
audio_raw = np.hstack(
(audio_raw, np.zeros(diff, dtype=audio_raw.dtype))
)
if spec_width < fixed_width:
# pad with zeros so the spectrogram reaches the fixed width
diff = target_samples - audio.shape[0]
return np.hstack((audio, np.zeros(diff, dtype=audio.dtype)))

if spec_width > fixed_width:
return audio[:target_samples]

return audio_raw
return audio

min_width = int(divide_factor / resize_factor)

if spec_width < min_width:
target_samples = x_coord_to_sample(
min_width,
samplerate=samplerate,
window_duration=window_duration,
window_overlap=window_overlap,
resize_factor=resize_factor,
)
diff = target_samples - audio.shape[0]
return np.hstack((audio, np.zeros(diff, dtype=audio.dtype)))

if (spec_width % divide_factor) == 0:
return audio

target_width = int(np.ceil(spec_width / divide_factor)) * divide_factor
target_samples = x_coord_to_sample(
target_width,
samplerate=samplerate,
window_duration=window_duration,
window_overlap=window_overlap,
resize_factor=resize_factor,
)
diff = target_samples - audio.shape[0]
return np.hstack((audio, np.zeros(diff, dtype=audio.dtype)))


def gen_mag_spectrogram(x, fs, ms, overlap_perc):
@@ -247,7 +336,11 @@ def gen_mag_spectrogram(x, fs, ms, overlap_perc):

# compute spec
spec, _ = librosa.core.spectrum._spectrogram(
y=x, power=1, n_fft=nfft, hop_length=step, center=False
y=x,
power=1,
n_fft=nfft,
hop_length=step,
center=False,
)

# remove DC component and flip vertical orientation
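For reference, a minimal usage sketch of the refactored helpers (not part of the commit): it pads an arbitrary 0.1 s clip of random noise and checks that the resulting spectrogram width is a multiple of SPEC_DIVIDE_FACTOR. The clip length and noise input are assumptions for illustration; the defaults all come from batdetect2.detector.parameters, as in the diff above.

import numpy as np

from batdetect2.detector import parameters
from batdetect2.utils.audio_utils import compute_spectrogram_width, pad_audio

# Hypothetical input: 0.1 s of random noise at the target samplerate.
clip = np.random.randn(int(0.1 * parameters.TARGET_SAMPLERATE_HZ)).astype(np.float32)

# With no fixed_width, pad_audio appends zeros until the spectrogram
# width is a multiple of SPEC_DIVIDE_FACTOR (defaults taken from parameters).
padded = pad_audio(clip)

width = compute_spectrogram_width(
    padded.shape[0],
    samplerate=parameters.TARGET_SAMPLERATE_HZ,
    window_duration=parameters.FFT_WIN_LENGTH_S,
    window_overlap=parameters.FFT_OVERLAP,
    resize_factor=parameters.RESIZE_FACTOR,
)
print(width, width % parameters.SPEC_DIVIDE_FACTOR)  # remainder should be 0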