Skip to content

Commit

Permalink
keep on truckin
Browse files Browse the repository at this point in the history
  • Loading branch information
mahdi committed Nov 24, 2023
1 parent f2794f6 commit 87c92fd
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 54 deletions.
76 changes: 47 additions & 29 deletions pyssp/levinson.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
"""Chapter 5 algorithm implementations."""

import logging
from typing import NoReturn

import numpy as np
from numpy.typing import ArrayLike

logger = logging.getLogger()


def rtoa(r: ArrayLike) -> tuple[np.ndarray, float]:
"""Recursively map a set of autocorrelations to a set of model parameters.
Expand All @@ -22,8 +26,8 @@ def rtoa(r: ArrayLike) -> tuple[np.ndarray, float]:
anT = np.conjugate(np.flipud(a))
a = an + gamma * np.concatenate((np.zeros(1), anT.ravel())).reshape(-1, 1)
epsilon = epsilon * (1 - np.abs(gamma)**2)
print(f"{gamma=},\n{a=},\n{epsilon=}\n")
return a, epsilon
logger.info(f"{gamma=},\n{a=},\n{epsilon=}\n")
return a.ravel(), epsilon.ravel()[0]


def gtoa(gamma: ArrayLike) -> ArrayLike:
Expand All @@ -40,13 +44,13 @@ def gtoa(gamma: ArrayLike) -> ArrayLike:
a = np.ones((1, 1))
_g = np.array(gamma)
p = len(_g)
for j in range(1, p):
for j in range(1, p + 1):
a = np.concatenate((a, np.zeros((1, 1))))
_a = a.copy()
af = np.conjugate(np.flipud(_a))
a = a + _g[j] * af
a = a + _g[j - 1] * af

return a
return a.ravel()


def atog(a: ArrayLike) -> ArrayLike:
Expand All @@ -70,23 +74,25 @@ def atog(a: ArrayLike) -> ArrayLike:
gamma[p - 2] = _a[p - 2]

for j in range(p - 2, 0, -1):
# print(f"{gamma=}, {_a=}")
# logger.info(f"{gamma=}, {_a=}")
ai1 = _a[:j].copy()
ai2 = _a[:j].copy()
af = np.flipud(np.conjugate(ai1))
# print(f"{ai1=}, {ai2=}, {af=}")
# logger.info(f"{ai1=}, {ai2=}, {af=}")
s1 = ai2 - gamma[j] * af
s2 = 1 - np.abs(gamma[j])**2
_a = np.divide(s1, s2)
# print(f"{s1=}, {s2=}, {_a=}")
# logger.info(f"{s1=}, {s2=}, {_a=}")
gamma[j - 1] = _a[j - 1]

return gamma
return gamma.ravel()


def gtor(gamma: ArrayLike, epsilon: None | float = None) -> ArrayLike:
"""Find the autocorrelation sequence from the reflection coefficients and the modeling error.
Also called the Inverse Levinson-Durbin Recursion.
Page 241, Figure 5.9.
"""
_g = np.array(gamma)
Expand All @@ -99,19 +105,18 @@ def gtor(gamma: ArrayLike, epsilon: None | float = None) -> ArrayLike:
aa0 = np.concatenate((aa, np.zeros((1, 1)))).reshape(-1, 1)
aaf = np.conjugate(np.flipud(aa1))
aa = aa0 + _g[j] * aaf
print(aa)
logger.info(aa)
rf = -np.fliplr(r) @ aa
print(rf)
print(rf.shape)
print(r)
print(r.shape)
print()
logger.info(rf)
logger.info(rf.shape)
logger.info(r)
logger.info(r.shape)
r = np.concatenate((r[0], rf[0])).reshape(1, -1)

if epsilon is not None:
r = r * epsilon / np.prod(1 - np.abs(gamma)**2)

return r
return r.ravel()


def ator(a: ArrayLike, b: float) -> ArrayLike:
Expand Down Expand Up @@ -147,27 +152,40 @@ def glev(r: ArrayLike, b: ArrayLike) -> ArrayLike:
x = np.array([_b[0] / _r[0]]).reshape(-1, 1)
epsilon = _r[0]
for j in range(1, p):
print(j)
print(f"{_r=}, {_r.shape=}")
logger.info(j)
logger.info(f"{_r=}, {_r.shape=}")
_r1 = np.transpose(np.array(_r[1:j + 1]))
print(f"{_r1=}, {_r1.shape=}")
print(f"{x=}, {x.shape=}")
print(f"{a=}, {a.shape=}")
logger.info(f"{_r1=}, {_r1.shape=}")
logger.info(f"{x=}, {x.shape=}")
logger.info(f"{a=}, {a.shape=}")
g = _r1 @ np.flipud(a)
print(f"{g=}, {g.shape=}")
gamma = -g / epsilon
print(f"{gamma=}, {gamma.shape=}")
logger.info(f"{g=}, {g.shape=}")
gamma = g * -1 / epsilon
logger.info(f"{gamma=}, {gamma.shape=}")
_a0 = np.concatenate([a, [[0]]])
_af = np.conjugate(np.flipud(_a0))
print(f"{_a0=}, {_a0.shape=}")
print(f"{_af=}, {_af.shape=}")
logger.info(f"{_a0=}, {_a0.shape=}")
logger.info(f"{_af=}, {_af.shape=}")
a = _a0 + gamma * _af
epsilon = epsilon * (1 - np.abs(gamma)**2)
print(f"{epsilon=}")
logger.info(f"{epsilon=}")
delta = _r1 @ np.flipud(x)
q = (_b[j] - delta[0, 0]) / epsilon
x = np.concatenate([x, [[0]]])
x = x + q * np.conjugate(np.flipud(a))
print()

return x

return x.ravel()


def shur_cohn_test() -> NoReturn:
"""Check stability of any rational filter."""
raise NotImplementedError()


def splitlev(r: ArrayLike, b: ArrayLike) -> ArrayLike:
"""Implement the split levinson recursion.
Table 5.7, Page 274.
"""
raise NotImplementedError()

Check failure on line 191 in pyssp/levinson.py

View workflow job for this annotation

GitHub Actions / build

Ruff (W292)

pyssp/levinson.py:191:32: W292 No newline at end of file
2 changes: 1 addition & 1 deletion pyssp/modeling.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def covm(x: ArrayLike, p: int)-> tuple[ArrayLike, float]:
X = convm(_x, p + 1)
Xq = X[p - 1:N - 1, :p].copy()
Xsol = np.linalg.lstsq(-Xq, X[p:N, 0], rcond=None)[0]
logger.warning(f"{Xsol=}")
logger.info(f"{Xsol=}")
a = np.hstack(([1], Xsol))
err = np.abs(X[p:N,0] @ X[p:N,] @ a)
return a, err.ravel()[0]
Expand Down
14 changes: 14 additions & 0 deletions tests/test_lattice.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""Test units for lattice implementations from chapter 7."""


from pyssp import lattice


def test_fcov():
pass

def test_mcov():
pass

def test_burg():
pass
85 changes: 77 additions & 8 deletions tests/test_levinson.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,93 @@
""""""
"""Test validation of levinson based recursion routines."""

import logging

import numpy as np
from pyssp.levinson import glev, gtor
from pyssp import levinson


logger = logging.getLogger(__name__)


def test_glev():
'''Example 5.3.1, Page 266'''
r = [4, 2, 1]
b = [9, 6, 12]

res = glev(r, b)
print(res)
expected = [2, -1, 3]

sol = levinson.glev(r, b)
logger.info(sol)

assert np.allclose(sol, expected)


def test_gtor():
def test_gtor() -> None:
'''Based on example 5.2.6'''
expected_rx = np.array([2, -1, -1/4, 1/8])

gamma = [1/2, 1/2, 1/2]
epsilon = 2 * (3 / 4)**3
res = gtor(gamma, epsilon)
true_results = np.array([2, -1, -1/4, 1/8])
print(res)
rx = levinson.gtor(gamma, epsilon)
assert np.allclose(rx, expected_rx)


def test_atog() -> None:

a = [1, 0.5, -0.1, -0.5]
expected_g = np.array([0.5, 0.2, -0.5])

gamma = levinson.atog(a)
logger.info(gamma)
logger.info(expected_g)

assert np.allclose(gamma, expected_g)


def test_rtog() -> None:
rx = [2, -1, -1/4, 1/8]
expected_g = np.array([0.5, 0.5, 0.5])

gamma = levinson.rtog(rx)
logger.info(gamma)
logger.info(expected_g)

assert np.allclose(gamma, expected_g)


def test_ator() -> None:
a = [1, 1, 7/8, 1/2]
epsilon = 2 * (3 / 4)**3
b = epsilon**2
expected_rx = np.array([2, -1, -1/4, 1/8])

rx = levinson.ator(a, b = b)
logger.info(rx)
logger.info(expected_rx)

assert np.array_equal(rx, expected_rx)


def test_gtoa() -> None:
gamma = [0.5, 0.2, -0.5]
expected_a = np.array([1, 0.5, -0.1, -0.5])

a = levinson.gtoa(gamma)
logger.info(a)
logger.info(expected_a)

assert np.allclose(a, expected_a)

def test_rtoa() -> None:
rx = np.array([2, -1, -1/4, 1/8])
expected_a = [1, 1, 7/8, 1/2]
expected_eps = 2 * (3 / 4)**3

a, eps = levinson.rtoa(rx)

logger.info(f"{a=}")
logger.info(f"{expected_a=}")
logger.info(f"{eps=}")
logger.info(f"{expected_eps=}")

assert np.allclose(a, expected_a) and np.allclose(eps, expected_eps)
32 changes: 16 additions & 16 deletions tests/test_modeling.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ def test_pade() -> None:
expected_b = [1, 1.5, 0.75]

a, b = modeling.pade(x, p = 0, q = 2)
logger.warning(a)
logger.warning(b)
logger.info(a)
logger.info(b)

assert np.array_equal(a, expected_a) and np.array_equal(b, expected_b)

expected_a = [1, -0.5]
expected_b = [1, 1]

a, b = modeling.pade(x, p = 1, q = 1)
logger.warning(a)
logger.warning(b)
logger.info(a)
logger.info(b)

assert np.array_equal(a, expected_a) and np.array_equal(b, expected_b)

Expand All @@ -45,7 +45,7 @@ def test_prony():
xn[N:] = 0

res = modeling.prony(xn, p = 1, q = 1)
logger.warning(res)
logger.info(res)


def test_shanks():
Expand All @@ -68,29 +68,29 @@ def test_spike():
gn = np.array([-0.2, 0, 1])
h, err = modeling.spike(gn, 4, 11)
d = np.convolve(h, gn)
logger.warning(f"{h=}")
logger.warning(f"{err=}")
logger.warning(f"{d=}, {np.argmax(d)=}")
logger.info(f"{h=}")
logger.info(f"{err=}")
logger.info(f"{d=}, {np.argmax(d)=}")

def test_ipf(): ...

def test_acm():
x = np.ones(20)
x[1::2] = x[1::2] * -1
logger.warning(x)
logger.info(x)

a, err = modeling.acm(x, 2)
logging.warning(f"{a=}")
logging.warning(f"{err=}")
logger.info(f"{a=}")
logger.info(f"{err=}")

def test_covm():
x = np.ones(20)
x[1::2] = x[1::2] * -1
logger.warning(x)
logger.info(x)

a, err = modeling.covm(x, 1)
logging.warning(f"{a=}")
logging.warning(f"{err=}")
logger.info(f"{a=}")
logger.info(f"{err=}")


def test_durbin():
Expand All @@ -99,7 +99,7 @@ def test_durbin():

zeros, poles, _ = signal.tf2zpk([1], ap)
px = system.ARMA(p=poles, q=[1], N=64)
logger.warning(f"{px=}")
logger.info(f"{px=}")

rx = np.correlate(px, px, "same")
logger.warning(modeling.durbin(rx, p=4, q=0))
logger.info(modeling.durbin(rx, p=4, q=0))

0 comments on commit 87c92fd

Please sign in to comment.