diff --git a/uasevent/environment_model.py b/uasevent/environment_model.py index 539741a..1226c7c 100644 --- a/uasevent/environment_model.py +++ b/uasevent/environment_model.py @@ -125,10 +125,12 @@ def __init__( # calculate spherical co-ordinates per frame # for filters and spatialisation - position_per_frame = np.lib.stride_tricks.sliding_window_view( - flightpath, self.frame_len, 1)[:, ::self.hop_len].mean(2) - self.theta_per_frame, self.phi_per_frame, self.r_per_frame = \ - utils.cart_to_sph(position_per_frame) + self.position_per_frame = utils.cart_to_sph( + np.lib.stride_tricks.sliding_window_view( + flightpath, self.frame_len, 1)[:, ::self.hop_len].mean(2) + ).T + # self.theta_per_frame, self.phi_per_frame, self.r_per_frame = \ + # utils.cart_to_sph(position_per_frame) def apply_doppler(self, x): # init output array and initial read position @@ -149,53 +151,35 @@ def apply_doppler(self, x): def apply_amp_env(self, x): return x * self.amp_env - def absorbtion_effects(self, x): + def filter(self, x): '''Apply frequency-dependent atmospheric and ground absorption''' # neat trick to get windowed frames x_windowed = np.lib.stride_tricks.sliding_window_view( x, self.frame_len)[::self.hop_len] * np.hanning(self.frame_len) - atmos_filter = AtmosphericAbsorptionFilter() - # init ground filter if surface is set (reflected path) - if self.reflection_surface is not None: - refl_filter = GroundReflectionFilter(self.reflection_surface) - - x_out = np.zeros(len(x)) - for i, (r, angle, frame) in enumerate( - zip(self.r_per_frame, self.phi_per_frame, x_windowed)): - - # apply atmospheric absorption filter - frame = atmos_filter.filter(frame, r) + # list of filter stages + filters = [] - # apply ground filter if set - if self.reflection_surface is not None: - angle = np.round(np.rad2deg(np.pi - angle)) # reflection angle - frame = refl_filter.filter(frame, angle) - - frame_index = i * self.hop_len - x_out[frame_index:frame_index + self.frame_len] += frame + # add atmospheric absorption + filters.append(AtmosphericAbsorptionFilter()) - return x_out + # add ground filter if surface is set (reflected path) + if self.reflection_surface is not None: + filters.append(GroundReflectionFilter(self.reflection_surface)) - def spatialise(self, x): - # do not spatialise if order not set (mono output) - if self.N is None: - return x - - x_windowed = np.lib.stride_tricks.sliding_window_view( - x, self.frame_len)[::self.hop_len] * np.hanning(self.frame_len) + # add spatialiser (must be last as adds channels) + filters.append(SpatialFilter(self.N)) x_out = np.zeros((len(x), (self.N+1)**2)) + for i, (position, frame) in enumerate( + zip(self.position_per_frame, x_windowed)): - for i, (theta, phi, frame) \ - in enumerate(zip(self.theta_per_frame, - self.phi_per_frame, - x_windowed)): + for filter in filters: + frame = filter.filter(frame, position) frame_index = i * self.hop_len - Y_mn = utils.Y_array(self.N, np.array([theta]), np.array([phi])) - x_out[frame_index:frame_index + self.frame_len] += (frame * Y_mn).T + x_out[frame_index:frame_index + self.frame_len] += frame return x_out @@ -204,15 +188,23 @@ def process(self, x): raise ValueError('Input signal shorter than path to be rendered') return \ - self.spatialise( - self.absorbtion_effects( - self.apply_amp_env( - self.apply_doppler(x) - ) + self.filter( + self.apply_amp_env( + self.apply_doppler(x) ) ) +class SpatialFilter(): + def __init__(self, N=2): + self.N = N + + def filter(self, x, position): + theta, phi, _ = position + Y_mn = utils.Y_array(self.N, np.array([theta]), np.array([phi])) + return (x * Y_mn).T + + class GroundReflectionFilter(): def __init__( self, @@ -230,7 +222,6 @@ def __init__( self.material = material self.fs = fs self.n_taps = n_taps - self.filterbank = self._compute_filterbank() @property def material(self): @@ -252,24 +243,17 @@ def _Z(self): X = -11.9 * (self.freqs / self._sigma) ** -0.73 return (R + 1j*X) * self.Z_0 - def _R(self): + def _R(self, angle): return np.real( - np.array([ - (self._Z() * np.cos(p) - self.Z_0) / - (self._Z() * np.cos(p) + self.Z_0) - for p in self.phi - ]) - ).squeeze() - - def _compute_filterbank(self): - return np.array([ - signal.firls(self.n_taps, self.freqs, abs(r), fs=self.fs) - for r in self._R() - ]) + (self._Z() * np.cos(angle) - self.Z_0) + / (self._Z() * np.cos(angle) + self.Z_0) + ) - def filter(self, x, angle): - angle = round(angle) - h = self.filterbank[angle - 1] + def filter(self, x, position): + _, phi, _ = position + phi = np.pi - phi + h = signal.firls(self.n_taps, self.freqs, + utils.rectify(self._R(phi)), fs=self.fs) return signal.fftconvolve(x, h, 'same') @@ -324,8 +308,9 @@ def alpha(self, freqs, temp=20, humidity=80, pressure=101.325): alpha = freqs**2 * (xc + T_rel**(-5/2) * (xo + xn)) return 1 - alpha - def filter(self, x, distance): + def filter(self, x, position): + _, _, r = position h = signal.firls(self.n_taps, self.freqs, - self.attenuation**distance, + self.attenuation**r, fs=self.fs) return signal.fftconvolve(x, h, 'same') diff --git a/uasevent/utils.py b/uasevent/utils.py index f7f68fc..550e760 100644 --- a/uasevent/utils.py +++ b/uasevent/utils.py @@ -106,3 +106,7 @@ def SN3D(m, n): ) ) ) + + +def rectify(x): + return (np.abs(x) + x) / 2