Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

23 on the fly angle #25

Merged
merged 2 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 46 additions & 61 deletions uasevent/environment_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,12 @@ def __init__(

# calculate spherical co-ordinates per frame
# for filters and spatialisation
position_per_frame = np.lib.stride_tricks.sliding_window_view(
flightpath, self.frame_len, 1)[:, ::self.hop_len].mean(2)
self.theta_per_frame, self.phi_per_frame, self.r_per_frame = \
utils.cart_to_sph(position_per_frame)
self.position_per_frame = utils.cart_to_sph(
np.lib.stride_tricks.sliding_window_view(
flightpath, self.frame_len, 1)[:, ::self.hop_len].mean(2)
).T
# self.theta_per_frame, self.phi_per_frame, self.r_per_frame = \
# utils.cart_to_sph(position_per_frame)

def apply_doppler(self, x):
# init output array and initial read position
Expand All @@ -149,53 +151,35 @@ def apply_doppler(self, x):
def apply_amp_env(self, x):
return x * self.amp_env

def absorbtion_effects(self, x):
def filter(self, x):
'''Apply frequency-dependent atmospheric and ground absorption'''

# neat trick to get windowed frames
x_windowed = np.lib.stride_tricks.sliding_window_view(
x, self.frame_len)[::self.hop_len] * np.hanning(self.frame_len)

atmos_filter = AtmosphericAbsorptionFilter()
# init ground filter if surface is set (reflected path)
if self.reflection_surface is not None:
refl_filter = GroundReflectionFilter(self.reflection_surface)

x_out = np.zeros(len(x))
for i, (r, angle, frame) in enumerate(
zip(self.r_per_frame, self.phi_per_frame, x_windowed)):

# apply atmospheric absorption filter
frame = atmos_filter.filter(frame, r)
# list of filter stages
filters = []

# apply ground filter if set
if self.reflection_surface is not None:
angle = np.round(np.rad2deg(np.pi - angle)) # reflection angle
frame = refl_filter.filter(frame, angle)

frame_index = i * self.hop_len
x_out[frame_index:frame_index + self.frame_len] += frame
# add atmospheric absorption
filters.append(AtmosphericAbsorptionFilter())

return x_out
# add ground filter if surface is set (reflected path)
if self.reflection_surface is not None:
filters.append(GroundReflectionFilter(self.reflection_surface))

def spatialise(self, x):
# do not spatialise if order not set (mono output)
if self.N is None:
return x

x_windowed = np.lib.stride_tricks.sliding_window_view(
x, self.frame_len)[::self.hop_len] * np.hanning(self.frame_len)
# add spatialiser (must be last as adds channels)
filters.append(SpatialFilter(self.N))

x_out = np.zeros((len(x), (self.N+1)**2))
for i, (position, frame) in enumerate(
zip(self.position_per_frame, x_windowed)):

for i, (theta, phi, frame) \
in enumerate(zip(self.theta_per_frame,
self.phi_per_frame,
x_windowed)):
for filter in filters:
frame = filter.filter(frame, position)

frame_index = i * self.hop_len
Y_mn = utils.Y_array(self.N, np.array([theta]), np.array([phi]))
x_out[frame_index:frame_index + self.frame_len] += (frame * Y_mn).T
x_out[frame_index:frame_index + self.frame_len] += frame

return x_out

Expand All @@ -204,15 +188,23 @@ def process(self, x):
raise ValueError('Input signal shorter than path to be rendered')

return \
self.spatialise(
self.absorbtion_effects(
self.apply_amp_env(
self.apply_doppler(x)
)
self.filter(
self.apply_amp_env(
self.apply_doppler(x)
)
)


class SpatialFilter():
def __init__(self, N=2):
self.N = N

def filter(self, x, position):
theta, phi, _ = position
Y_mn = utils.Y_array(self.N, np.array([theta]), np.array([phi]))
return (x * Y_mn).T


class GroundReflectionFilter():
def __init__(
self,
Expand All @@ -230,7 +222,6 @@ def __init__(
self.material = material
self.fs = fs
self.n_taps = n_taps
self.filterbank = self._compute_filterbank()

@property
def material(self):
Expand All @@ -252,24 +243,17 @@ def _Z(self):
X = -11.9 * (self.freqs / self._sigma) ** -0.73
return (R + 1j*X) * self.Z_0

def _R(self):
def _R(self, angle):
return np.real(
np.array([
(self._Z() * np.cos(p) - self.Z_0) /
(self._Z() * np.cos(p) + self.Z_0)
for p in self.phi
])
).squeeze()

def _compute_filterbank(self):
return np.array([
signal.firls(self.n_taps, self.freqs, abs(r), fs=self.fs)
for r in self._R()
])
(self._Z() * np.cos(angle) - self.Z_0)
/ (self._Z() * np.cos(angle) + self.Z_0)
)

def filter(self, x, angle):
angle = round(angle)
h = self.filterbank[angle - 1]
def filter(self, x, position):
_, phi, _ = position
phi = np.pi - phi
h = signal.firls(self.n_taps, self.freqs,
utils.rectify(self._R(phi)), fs=self.fs)
return signal.fftconvolve(x, h, 'same')


Expand Down Expand Up @@ -324,8 +308,9 @@ def alpha(self, freqs, temp=20, humidity=80, pressure=101.325):
alpha = freqs**2 * (xc + T_rel**(-5/2) * (xo + xn))
return 1 - alpha

def filter(self, x, distance):
def filter(self, x, position):
_, _, r = position
h = signal.firls(self.n_taps, self.freqs,
self.attenuation**distance,
self.attenuation**r,
fs=self.fs)
return signal.fftconvolve(x, h, 'same')
4 changes: 4 additions & 0 deletions uasevent/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,7 @@ def SN3D(m, n):
)
)
)


def rectify(x):
return (np.abs(x) + x) / 2
Loading