Skip to content

Commit

Permalink
added publishers and main
Browse files Browse the repository at this point in the history
  • Loading branch information
TiaSinghania committed Mar 15, 2024
1 parent b258514 commit 4c2d280
Showing 1 changed file with 55 additions and 50 deletions.
105 changes: 55 additions & 50 deletions rb_ws/src/buggy/scripts/auton/rolling_sanity_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from nav_msgs.msg import Odometry
from rb_ws.src.buggy.scripts.auton.pose import Pose
from rb_ws.src.buggy.scripts.auton.world import World
from std_msgs.msg import Float32, Float64, Bool, Int8
from std_msgs.msg import Float32, Float64, Bool, Int8, String, IntList

import rospy

Expand All @@ -14,52 +14,50 @@


class SanityCheck:
#TODO: copied from autonsystem, delete uneeded args
global_trajectory: Trajectory = None
local_controller: Controller = None

def __init__(self,
global_trajectory,
local_controller,
self_name,
other_name,
) -> None:

self.global_trajectory = global_trajectory

#TODO: do we care if there's another buggy actually
self.has_other_buggy = not other_name is None

self.cur_traj = global_trajectory
self.local_controller = local_controller

self.rtk_status = 0
self.covariance = 0

self.location = None
self.filter_location = None

self.imu_overrange_status = None
self.status_flags : MipFilterStatusGq7StatusFlags = None
self.status_flag_val : int = 0
self.flags = []

# string list where indices match the meaning of relevant bits
self.error_messages : list = ["filter stable/recovering", "filter converging", "roll/pitch warning", "heading warning", "position warning", "velocity warning", "IMU bias warning", "gnss clock warning", "antenna lever arm warning", "mounting transform warning", "solution error", "solution error", "solution error", "solution error", "solution error"]


#TODO: do we need lock
self.self_odom_msg = None
self.other_odom_msg = None

rospy.Subscriber(self_name + "/nav/odom", Odometry, self.update_self_odom)
rospy.Subscriber(self_name + "/imu/overrange_status", SensorOverrangeStatus, self.update_overrange_status)

if self.has_other_buggy:
rospy.Subscriber(other_name + "/nav/odom", Odometry, self.update_other_odom)
rospy.Subscriber(self_name + "/nav/status.status_flags", FilterStatus, self.update_status_flags)

# TODO: do we need these two
rospy.Subscriber(self_name + "/gnss1/fix_info_republished_int", Int8, self.update_rtk1_status)
# these publishers are all bools as quick sanity checks
self.overrange_status_publisher = rospy.Publisher(self_name + "/debug/imu_overrange_status", Bool, queue_size=1)

rospy.Subscriber(self_name + "/gnss2/fix_info_republished_int", Int8, self.update_rtk2_status)
self.filter_gps_status_publisher = rospy.Publisher(self_name + "/debug/filter_gps_seperation_status", Bool, queue_size=1)

rospy.Subscriber(self_name + "/imu/overrange_status", SensorOverrangeStatus, self.update_overrange_status)
self.covariance_status_publisher = rospy.Publisher(self_name + "/debug/covariance_status", Bool, queue_size=1)

rospy.Subscriber(self_name + "/nav/status.status_flags", FilterStatus, self.update_status_flags)
self.error_message_publisher = rospy.Publisher(
self_name + "/debug/error_message", String, queue_size=1
)

self.status_flags_publisher = rospy.Publisher(
self_name + "/debug/filter_status_flags", IntList, queue_size=1
)

def update_self_odom(self, msg):
self.self_odom_msg = msg
Expand All @@ -71,15 +69,13 @@ def update_overrange_status(self, msg : SensorOverrangeStatus):
self.imu_overrange_status = msg.status

def update_status_flags(self, msg : FilterStatus):
self.status_flags = msg.gq7_status_flags #TODO: do we have gx5 or gq7??
# also what if we just published these? seems easy to read do we not publish this
#they are NOT bools it IS a number


self.status_flag_val = msg.gq7_status_flags


def calc_covariance(self):
self.covariance = self.self_odom_msg.pose.covariance[0] ** 2 + self.self_odom_msg.pose.covariance[7] ** 2
# publishes true if covariance is good
self.covariance_status_publisher(self.covariance <= 1**2)

def calc_locations(self):
current_rospose = self.self_odom_msg.pose.pose
Expand All @@ -90,35 +86,44 @@ def calc_locations(self):
self.location = World.gps_to_world_pose(pose_gps)

self.filter_location = None #TODO: where does ins publish its location
self.filter_gps_status_publisher(abs(self.filter_location - self.location) < 0.5)

def is_overrange (self):
#TODO: replace with wiht bools we care about
# http://docs.ros.org/en/api/microstrain_inertial_msgs/html/msg/MipSensorOverrangeStatus.html)
# if ANY are true return true
s = self.imu_overrange_status
return s.accel_x # look at all the foxglove outputs
accel_status = s.status_accel_x or s.status_accel_y or s.status_accel_z
gyro_status = s.status_gyro_x or s.status_gyro_y or s.status_gyro_z
mag_status = s.status_mag_x or s.status_mag_y or s.status_mag_z

def filter_status_warning (self):
#TODO: replace with wiht bools we care about
# http://docs.ros.org/en/api/microstrain_inertial_msgs/html/msg/MipFilterStatusGq7StatusFlags.html
return self.status_flags.roll_pitch_warning
# publishes true if ALL flags are FALSE (if the imu looks good)
self.imu_overrange_status(not (accel_status or gyro_status or mag_status or s.status_press))

def filter_status_warning (self, publishers):
b = bin(self.status_flag_val)
self.flags = []
error_message = ""
for i in range (len(b)):
if (b[i] == '1'):
self.flags.append(i)
error_message += self.error_messages[i] + " "

# TODO: add docs to this (this is essentially main)
def sanity_check(self):
if (abs(self.filter_location - self.location) > 0):
print("filter and gps seperate")

if (self.covariance > 1**2):
print("covariance bad")
# publish bit values
publishers[0].publish(self.flags)

if (self.is_overrange):
print("overrange warning raised")

if (self.filter_status_warning):
print("filter unhappy: roll pitch warning raised")
# TODO: publish the relevant everything


# TODO: add node to launchfiles
# publish string translations
publishers[1].publish(error_message)

def sanity_check(self):
self.calc_covariance()
self.calc_locations()
self.is_overrange()
self.filter_status_warning([self.error_message_publisher, self.status_flags_publisher])

if __name__ == "__main__":
rospy.init_node("rolling_sanity_check")
# need global trajectory, local trajectory, buggy name
check = SanityCheck()
rate = rospy.Rate(100)

while not rospy.is_shutdown():
check.sanity_check()
rate.sleep()

0 comments on commit 4c2d280

Please sign in to comment.