Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add backward belief propagation #217

Merged
merged 1 commit into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 81 additions & 2 deletions experimental/beacon_sim/analysis/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
np.set_printoptions(linewidth=200)


def add_xy_to_axes(x, y, ax, name='', xlabel='', ylabel='', use_log=True):
    """Plot y against x on the given matplotlib axes as a marked line.

    Args:
        x: sequence of x values.
        y: sequence of y values.
        ax: matplotlib Axes to draw on.
        name: legend label for the series.
        xlabel: label for the x axis.
        ylabel: label for the y axis.
        use_log: if True (default), use a logarithmic y axis (semilogy);
            otherwise a linear plot.
    """
    if use_log:
        ax.semilogy(x, y, marker='o', label=name)
    else:
        ax.plot(x, y, marker='o', label=name)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    ax.grid(True)
Expand Down Expand Up @@ -185,6 +188,82 @@ def sens(
print('||P_ekf - P_tf|| =', diff_ekf_tf)
print('||P_scatter - P_expected|| =', diff_expected)

@main.command(name="back", help='Test backwards belief update.')
@click.argument("model", default="si")
@click.option("--num-props", type=int, default=500)
@click.option("--dt", type=float, default=0.1)
@click.option("--info", is_flag=True, default=False, help='Use information filter')
def back(
    model,
    num_props,  # NOTE: 968 is the largest num_props before failure for the 'di' model
    dt,
    info,
):
    """Propagate a belief forward k steps, then backward k steps, and plot
    the condition number and determinant of the covariance at every step
    count up to --num-props.

    Renamed from `sens` to `back`: a second module-level `def sens` would
    shadow the earlier `sens` command; the CLI name (``back``) is unchanged.
    """
    # Set up the motion model and a well-conditioned initial covariance.
    if model == 'si':
        P = np.identity(2)
        robot = SingleIntegrator(dt=dt)
    elif model == 'di':
        P = np.identity(4)
        robot = DoubleIntegrator(dt=dt)
    else:
        # A bare `raise` outside an except block is a RuntimeError; raise a
        # meaningful exception instead.
        raise ValueError(f"invalid model {model!r}: expected 'si' or 'di'")

    filter_type = FilterType.INFORMATION if info else FilterType.COVARIANCE
    bp = BeliefPropagation(robot, filter_type)

    # Print covariance matrices after <num_props> forward steps, then after
    # undoing them; P_backward should recover P.
    P_forward = bp.prop(P, k=num_props, backward=False)
    P_backward = bp.prop(P_forward, k=num_props, backward=True)
    print('\nP:', P)
    print('\nP_forward from P:', P_forward)
    print('\nP_backward from P_forward:', P_backward)

    # Compute condition number and determinant for each step count up to
    # <num_props>.
    print('\ngenerating plot...')
    data = {key: defaultdict(list) for key in ('initial', 'forward', 'backward')}
    for k in range(num_props):
        P_forward = bp.prop(P, k=k, backward=False)
        P_backward = bp.prop(P_forward, k=k, backward=True)
        for key, mat in (('initial', P), ('forward', P_forward), ('backward', P_backward)):
            data[key]['cond'].append(LA.cond(mat))
            data[key]['det'].append(LA.det(mat))
            data[key]['k'].append(k)

    use_log = True
    _fig, axes = plt.subplots(nrows=2, ncols=2, figsize=(10, 5))

    # Raw strings: '\k' and '\S' are invalid escape sequences in plain strings.
    add_xy_to_axes(data['initial']['k'], data['initial']['cond'], axes[0, 0], name=r'$\kappa(\Sigma_{initial})$', xlabel='number of steps', ylabel='cond', use_log=use_log)
    add_xy_to_axes(data['forward']['k'], data['forward']['cond'], axes[0, 0], name=r'$\kappa(\Sigma_{forward})$', xlabel='number of steps', ylabel='cond', use_log=use_log)
    axes[0, 0].legend(loc='upper left')

    add_xy_to_axes(data['initial']['k'], data['initial']['cond'], axes[0, 1], name=r'$\kappa(\Sigma_{initial})$', xlabel='number of steps', ylabel='cond', use_log=use_log)
    add_xy_to_axes(data['backward']['k'], data['backward']['cond'], axes[0, 1], name=r'$\kappa(\Sigma_{backward})$', xlabel='number of steps', ylabel='cond', use_log=use_log)
    axes[0, 1].legend(loc='upper left')

    add_xy_to_axes(data['initial']['k'], data['initial']['det'], axes[1, 0], name=r'$det(\Sigma_{initial})$', xlabel='number of steps', ylabel='det', use_log=use_log)
    add_xy_to_axes(data['forward']['k'], data['forward']['det'], axes[1, 0], name=r'$det(\Sigma_{forward})$', xlabel='number of steps', ylabel='det', use_log=use_log)
    axes[1, 0].legend(loc='upper left')

    add_xy_to_axes(data['initial']['k'], data['initial']['det'], axes[1, 1], name=r'$det(\Sigma_{initial})$', xlabel='number of steps', ylabel='det', use_log=use_log)
    add_xy_to_axes(data['backward']['k'], data['backward']['det'], axes[1, 1], name=r'$det(\Sigma_{backward})$', xlabel='number of steps', ylabel='det', use_log=use_log)
    axes[1, 1].legend(loc='upper left')

    plt.show()

if __name__ == "__main__":
mpl.use("GTK3Agg")
Expand Down
67 changes: 44 additions & 23 deletions experimental/beacon_sim/analysis/belief_propagation.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,46 +25,67 @@ def remap_vars(self):
M = self.model.H().transpose() @ inv(self.model.R) @ self.model.H()
return G, R, M

def predict_covariance(self, P, backward=False):
    """EKF covariance prediction step, or its exact inverse.

    Forward:  P' = F P F^T + L Q L^T
    Backward: undoes one prediction, P' = F^{-1} (P - L Q L^T) F^{-T}

    Args:
        P: covariance matrix.
        backward: if True, invert a prediction rather than apply one.

    Returns:
        The predicted (or un-predicted) covariance matrix.
    """
    F = self.model.F()
    # Process-noise contribution L Q L^T; hoisted since both branches use it.
    LQLt = self.model.L() @ self.model.Q @ self.model.L().transpose()
    if not backward:
        return F @ P @ F.transpose() + LQLt
    return inv(F) @ (P - LQLt) @ inv(F.transpose())

def update_covariance(self, P, backward=False):
    """EKF covariance measurement update, or its exact inverse.

    Forward (standard Kalman update):
        S  = H P H^T + M R M^T
        K  = P H^T S^{-1}
        P' = (I - K H) P
    Backward (removes the effect of one measurement update):
        Sbar = M R M^T - H P H^T
        Kbar = P H^T Sbar^{-1}
        P'   = (I + Kbar H) P

    Args:
        P: covariance matrix.
        backward: if True, invert an update rather than apply one.

    Returns:
        The updated (or un-updated) covariance matrix.
    """
    n = P.shape[0]
    H = self.model.H()
    # Measurement-noise contribution M R M^T; hoisted since both branches use it.
    MRMt = self.model.M() @ self.model.R @ self.model.M().transpose()
    if not backward:
        S = H @ P @ H.transpose() + MRMt
        K = P @ H.transpose() @ inv(S)
        return (np.identity(n) - K @ H) @ P
    Sbar = MRMt - H @ P @ H.transpose()
    Kbar = P @ H.transpose() @ inv(Sbar)
    return (np.identity(n) + Kbar @ H) @ P
ewfuentes marked this conversation as resolved.
Show resolved Hide resolved

def predict_information(self, info, backward=False):
    """Prediction step in information (inverse-covariance) form.

    Converts the information matrix to a covariance, delegates to
    predict_covariance, and converts back.

    Args:
        info: information matrix (inverse of the covariance).
        backward: forwarded to predict_covariance.

    Returns:
        The predicted information matrix.
    """
    return inv(self.predict_covariance(inv(info), backward=backward))

def update_information(self, info, backward=False):
    """Measurement update in information (inverse-covariance) form.

    A measurement contributes information H^T R^{-1} H; the forward pass
    adds it, the backward pass subtracts it.

    Args:
        info: information matrix.
        backward: if True, remove the measurement's information.

    Returns:
        The updated information matrix.
    """
    meas_info = self.model.H().transpose() @ inv(self.model.R) @ self.model.H()
    if not backward:
        return info + meas_info
    return info - meas_info


def predict(self, state, backward=False):
    """Dispatch the prediction step by filter type.

    Args:
        state: covariance matrix (covariance filter) or information matrix
            (information filter).
        backward: forwarded to the underlying prediction.

    Returns:
        The predicted state in the same representation it was given.
    """
    if self.filter_type == FilterType.COVARIANCE:
        return self.predict_covariance(state, backward=backward)
    else:
        return self.predict_information(state, backward=backward)

def update(self, state, backward=False):
    """Dispatch the measurement-update step by filter type.

    Args:
        state: covariance matrix (covariance filter) or information matrix
            (information filter).
        backward: forwarded to the underlying update.

    Returns:
        The updated state in the same representation it was given.
    """
    if self.filter_type == FilterType.COVARIANCE:
        return self.update_covariance(state, backward=backward)
    else:
        return self.update_information(state, backward=backward)



#functions for propagating belief
#functions for propagating belief
def prop(self, P, k=1, backward=False):
    """Propagate a belief P through k steps using EKF equations.

    Forward applies predict then update each step.  Backward undoes k steps
    by applying the inverse operations in reverse order (update first, then
    predict); the combined backward recursion is
        Sbar   = M R M^T - H P H^T
        Kbar   = P H^T Sbar^{-1}
        P_prev = F^{-1} ((I + Kbar H) P - L Q L^T) F^{-T}

    Args:
        P: initial belief (covariance or information matrix, per filter type).
        k: number of steps to propagate.
        backward: if True, undo k steps instead of applying them.

    Returns:
        The propagated belief.
    """
    if not backward:
        for _ in range(k):
            P = self.predict(P, backward=False)
            P = self.update(P, backward=False)
    else:
        for _ in range(k):
            P = self.update(P, backward=True)
            P = self.predict(P, backward=True)
    return P

def prop_tf(self, P, k=1, only_predict=False, return_tf=False):
Expand Down