Skip to content

Commit

Permalink
add backward belief propagation (#217)
Browse files Browse the repository at this point in the history
Add 'backward' arg for 'prop' and subsequent called functions to apply
backward propagation for predict and update steps (information form not
tested for backward true)

Co-authored-by: Jared Strader <[email protected]>
  • Loading branch information
jaredstrader and Jared Strader authored Oct 30, 2023
1 parent c1c4ea9 commit a9fcde7
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 25 deletions.
83 changes: 81 additions & 2 deletions experimental/beacon_sim/analysis/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
np.set_printoptions(linewidth=200)


def add_xy_to_axes(x, y, ax, name='', xlabel='', ylabel='', use_log=False):
ax.semilogy(x, y, marker='o', label=name)
def add_xy_to_axes(x, y, ax, name='', xlabel='', ylabel='', use_log=True):
if use_log:
ax.semilogy(x, y, marker='o', label=name)
else:
ax.plot(x, y, marker='o', label=name)
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
ax.grid(True)
Expand Down Expand Up @@ -185,6 +188,82 @@ def sens(
print('||P_ekf - P_tf|| =', diff_ekf_tf)
print('||P_scatter - P_expected|| =', diff_expected)

@main.command(name="back", help='Test backwards belief update.')
@click.argument("model", default="si")
@click.option("--num-props", type=int, default=500)
@click.option("--dt", type=float, default=0.1)
@click.option("--info", is_flag=True, type=bool, default=False, help='Use information filter')
def sens(
model,
num_props, #968, largest num props before fail for di
dt,
info
):
#setup model
if model=='si':
P = np.identity(2)
robot = SingleIntegrator(dt=dt)
elif model=='di':
P = np.identity(4)
robot = DoubleIntegrator(dt=dt)
else:
print('error. invalid model')
raise

filter_type = FilterType.INFORMATION if info else FilterType.COVARIANCE
bp = BeliefPropagation(robot, filter_type)

#print covariance matrices for <num_props>
P_forward = bp.prop(P, k=num_props, backward=False)
P_backward = bp.prop(P_forward, k=num_props, backward=True)
print('\nP:',P)
print('\nP_forward from P:',P_forward)
print('\nP_backward from P_forward:',P_backward)

#compute condition number for each num props up to <num_props>
print('\ngenerating plot...')
data={}
data['initial']=defaultdict(list)
data['forward']=defaultdict(list)
data['backward']=defaultdict(list)
data['k']=defaultdict(list)
for k in range(num_props):
P_forward = bp.prop(P, k=k, backward=False)
P_backward = bp.prop(P_forward, k=k, backward=True)
data['initial']['cond'].append(LA.cond(P))
data['initial']['det'].append(LA.det(P))
data['initial']['k'].append(k)
data['forward']['cond'].append(LA.cond(P_forward))
data['forward']['det'].append(LA.det(P_forward))
data['forward']['k'].append(k)
data['backward']['cond'].append(LA.cond(P_backward))
data['backward']['det'].append(LA.det(P_backward))
data['backward']['k'].append(k)

use_log=True
fig, axes = plt.subplots(nrows=2, ncols=2, figsize=(10, 5))

add_xy_to_axes(data['initial']['k'], data['initial']['cond'],axes[0,0], name='$\kappa(\Sigma_{initial})$', xlabel='number of steps', ylabel='cond',use_log=use_log)
add_xy_to_axes(data['forward']['k'], data['forward']['cond'],axes[0,0], name='$\kappa(\Sigma_{forward})$', xlabel='number of steps', ylabel='cond',use_log=use_log)
# axes[0,0].set_title('cond num forward',pad=20)
axes[0,0].legend(loc='upper left')

add_xy_to_axes(data['initial']['k'], data['initial']['cond'],axes[0,1], name='$\kappa(\Sigma_{initial})$', xlabel='number of steps', ylabel='cond',use_log=use_log)
add_xy_to_axes(data['backward']['k'], data['backward']['cond'],axes[0,1], name='$\kappa(\Sigma_{backward})$', xlabel='number of steps', ylabel='cond',use_log=use_log)
# axes[0,1].set_title('cond num backward',pad=20)
axes[0,1].legend(loc='upper left')

add_xy_to_axes(data['initial']['k'], data['initial']['det'],axes[1,0], name='$det(\Sigma_{initial})$', xlabel='number of steps', ylabel='det',use_log=use_log)
add_xy_to_axes(data['forward']['k'], data['forward']['det'],axes[1,0], name='$det(\Sigma_{forward})$', xlabel='number of steps', ylabel='det',use_log=use_log)
# axes[1,0].set_title('det forward',pad=20)
axes[1,0].legend(loc='upper left')

add_xy_to_axes(data['initial']['k'], data['initial']['det'],axes[1,1], name='$det(\Sigma_{initial})$', xlabel='number of steps', ylabel='det',use_log=use_log)
add_xy_to_axes(data['backward']['k'], data['backward']['det'],axes[1,1], name='$det(\Sigma_{backward})$', xlabel='number of steps', ylabel='det',use_log=use_log)
# axes[1,1].set_title('det backward',pad=20)
axes[1,1].legend(loc='upper left')

plt.show()

if __name__ == "__main__":
mpl.use("GTK3Agg")
Expand Down
67 changes: 44 additions & 23 deletions experimental/beacon_sim/analysis/belief_propagation.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,46 +25,67 @@ def remap_vars(self):
M = self.model.H().transpose() @ inv(self.model.R) @ self.model.H()
return G, R, M

def predict_covariance(self,P):
P = self.model.F() @ P @ self.model.F().transpose() + self.model.L() @ self.model.Q @ self.model.L().transpose()
def predict_covariance(self,P,backward=False):
if not backward:
P = self.model.F() @ P @ self.model.F().transpose() + self.model.L() @ self.model.Q @ self.model.L().transpose()
else:
P = inv(self.model.F()) @ (P - self.model.L() @ self.model.Q @ self.model.L().transpose()) @ inv(self.model.F().transpose())
return P

def update_covariance(self,P):
n = P.shape[0]
S = self.model.H() @ P @ self.model.H().transpose() + self.model.M() @ self.model.R @ self.model.M().transpose()
K= P @ self.model.H().transpose() @ inv(S)
return (np.identity(n) - K @ self.model.H()) @ P
def update_covariance(self,P,backward=False):
if not backward:
n = P.shape[0]
S = self.model.H() @ P @ self.model.H().transpose() + self.model.M() @ self.model.R @ self.model.M().transpose()
K= P @ self.model.H().transpose() @ inv(S)
return (np.identity(n) - K @ self.model.H()) @ P
else:
n = P.shape[0]
Sbar = self.model.M() @ self.model.R @ self.model.M().transpose() - self.model.H() @ P @ self.model.H().transpose()
Kbar = P @ self.model.H().transpose() @ inv(Sbar)
return (np.identity(n) + Kbar @ self.model.H()) @ P

def predict_information(self, info):
return inv(self.predict_covariance(inv(info)))
def predict_information(self, info, backward=False):
return inv(self.predict_covariance(inv(info), backward=backward))

def update_information(self, info):
def update_information(self, info, backward=False):
meas_info = self.model.H().transpose() @ inv(self.model.R) @ self.model.H()
return info + meas_info
if not backward:
return info + meas_info
else:
return info - meas_info


def predict(self, state):
if self.filter_type == FilterType.Covariance:
return self.predict_covariance(state)
def predict(self, state, backward=False):
if self.filter_type == FilterType.COVARIANCE:
return self.predict_covariance(state, backward=backward)
else:
return self.predict_information(state)
return self.predict_information(state, backward=backward)

def update(self, state):
if self.filter_type == FilterType.Covariance:
return self.update_covariance(state)
def update(self, state, backward=False):
if self.filter_type == FilterType.COVARIANCE:
return self.update_covariance(state, backward=backward)
else:
return self.update_information(state)
return self.update_information(state, backward=backward)



#functions for propagating belief
def prop(self, P, k=1):
def prop(self, P, k=1, backward=False):
"""
Propagates a belief P through k steps using EKF equations
"""
for i in range(k):
P = self.predict(P)
P = self.update(P)
if not backward:
for i in range(k):
P = self.predict(P, backward=False)
P = self.update(P, backward=False)
else:
#combined equation for backward prop
#S_bar = R - HPH'
#K_bar = P * H' * S_bat^{-1}
#P0 = F^{-1} * [ (I + K_bar*H)*P - Q ] * F'^{-1}'
for i in range(k):
P = self.update(P, backward=True)
P = self.predict(P, backward=True)
return P

def prop_tf(self, P, k=1, only_predict=False, return_tf=False):
Expand Down

0 comments on commit a9fcde7

Please sign in to comment.