Skip to content

Commit

Permalink
Basic skeleton implementation of a UKF node.
Browse files Browse the repository at this point in the history
TODO: Add actual UKF updates, figure out message types and publishing.
  • Loading branch information
rk012 committed Nov 11, 2024
1 parent f7a140b commit 197e3ad
Showing 1 changed file with 49 additions and 0 deletions.
49 changes: 49 additions & 0 deletions rb_ws/src/buggy/scripts/util/ukf.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#!/usr/bin/env python3

from threading import Lock

import sys
import numpy as np
import scipy
Expand All @@ -8,6 +10,7 @@
import time

from nav_msgs.msg import Odometry as ROSOdom
from std_msgs.msg import Float64

STATE_SPACE_DIM = 3
MEASUREMENT_SPACE_DIM = 2
Expand Down Expand Up @@ -120,3 +123,49 @@ def update(self, curr_state_est : np.narray, curr_state_cov : np.ndarray, measur
self.updated_state_cov = curr_state_cov - (kalman_gain * (innovation_cov * kalman_gain.T))


class SC_UKF_NODE:
def __init__(self, self_name, wheelbase : float, zeroth_sigma_point_weight : float, process_noise : np.ndarray, gps_noise : np.ndarray):
self.ukf = UKF(wheelbase, zeroth_sigma_point_weight, process_noise, gps_noise);

self.lock = Lock()
# TODO Is this needed?
self.input_write_flags = 0b000

self.self_encoder_speed_msg = None
self.self_stepper_steering_msg = None
self.self_gps_msg = None

# TODO replace these with actual message types
rospy.Subscriber(f"{self_name}/encoder_speed", Float64, self.update_self_encoder_speed)
rospy.Subscriber(f"{self_name}/stepper_steering", Float64, self.update_self_stepper_steering)
rospy.Subscriber(f"{self_name}/gps", Float64, self.update_self_gps)

def update_self_encoder_speed(self, msg):
with self.lock:
self.input_write_flags |= 0b001
self.self_encoder_speed_msg = msg

def update_self_stepper_steering(self, msg):
with self.lock:
self.input_write_flags |= 0b010
self.self_stepper_steering_msg = msg

def update_self_gps(self, msg):
with self.lock:
self.input_write_flags |= 0b100
self.self_gps_msg = msg

def ukf_loop(self):
update_ukf = False
with self.lock:
if self.input_write_flags == 0b111:
update_ukf = True
self.input_write_flags = 0b000

# TODO UKF Math and publish

if update_ukf:
# Update/Predict ...
pass

# Publish

0 comments on commit 197e3ad

Please sign in to comment.