Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Joshua simctrl #17

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions ff_control/ff_control/ll_binary_ctrl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from ff_msgs.msg import ThrusterCommand
from ff_msgs.msg import BinaryCommand
from ff_params import RobotParams

import numpy as np
import typing as T

from rclpy.node import Node


class LowLevelBinaryController(Node):
def __init__(self, node_name: str = "ll_ctrl_node") -> None:
super().__init__(node_name)

# robot parameters that can be accessed by sub-classes
self.p = RobotParams(self)

# low level control publishers
self._binary_thruster_pub = self.create_publisher(BinaryCommand, "ctrl/binary_command", 10)

def set_thrust_binary(self, binary_command: T.Sequence[bool]) -> None:
"""
Send command to set the thrusters binary output.

:param binary_command: binary switch for each thrust
"""
if len(binary_command) != len(BinaryCommand().binary_command):
self.get_logger().error("Incompatible thruster length sent.")
return
msg = BinaryCommand()
msg.header.stamp = self.get_clock().now().to_msg()
msg.binary_command = binary_command
self._binary_thruster_pub.publish(msg)
158 changes: 158 additions & 0 deletions ff_control/scripts/dynamics_sim.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
"""
Free Flyer dynamics
"""
import math
import random
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import animation, patches


# VARIABLES
dim1 = 0.11461
dim2 = 0.0955

Fmax = 0.2

# Return x(t+1) given x(t), u(t)

class Dynamics:

def __init__(self, x0, stateDim=6, inputDim=8):
self.init = x0
self._x = x0
self.stateDimn = stateDim
self.inputDimn = inputDim
self.thruster_orien = np.array(([-1,0],[1,0],[0,-1],[0,1],[1,0],[-1,0],[0,1],[0,-1]))
self.thruster_pos = np.array(([dim2, dim1],[-dim2, dim1],[-dim1, dim2],[-dim1, -dim2],
[-dim2, -dim1],[dim2, -dim1],[dim1, -dim2],[dim1, dim2]))

self._u = None

def get_state(self):
return self._x

def derive(self, x, u, t):
return np.zeros((self.stateDimn,1))

def integrate(self, u, t, dt):
self._x = self.get_state() + self.derive(self.get_state(), u, t)*dt
return self._x


class ThrusterDyn(Dynamics):

# Thrusters Configuration
# (2) e_y (1) ___
# <-- ^ --> / \
# ^ | | | ^ v M )
# (3)|--o-------o--|(8) __/
# | free- |
# | flyer | ---> e_x
# | robot |
# (4)|--o-------o--|(7)
# v | | v
# <-- -->
# (5) (6)

def __init__(self, x0 = np.zeros((6,1)), m = 16, Ixx = 0.18, r = 0.1):
super().__init__(x0, stateDim=6, inputDim = 8)
self._m = m
self._Ixx = Ixx
self._r = r

# with the position, return the velocity
def derive(self, X, U, t):
"""
Returns the derivative of the state vector
Args:
X => state variable, 6x1 numpy array at time t
U => input array: 8x1 numpy array, 8 thrusters
t => time
Returns:
xDot: 6x1 x 1 derivative of the state vector
"""
#unpack the state vector
x, y, x_dot, y_dot = X[0,0], X[1,0], X[3, 0], X[4, 0] #velocities
theta, theta_dot = X[2, 0], X[5, 0] #orientations
res = self.resultant_force_and_moment(U)
F_x, F_y = res[0], res[1] # Force x and y

xdot_world, ydot_world = self.get_rotmatrix_body_to_world(theta) @ [x_dot, y_dot]

F = self.get_rotmatrix_body_to_world(theta) @ [F_x, F_y]
M = res[2] # Moment about z
# missing coriolis term? Maybe co
x_ddot, y_ddot = self.get_rotmatrix_body_to_world(theta) @ [F_x/self._m, F_y/self._m]
theta_ddot = M/self._Ixx
deriv = np.array([[x_dot, y_dot, theta_dot, x_ddot, y_ddot, theta_ddot]]).T

# Noise
# for i in range(len(deriv)):
# deriv[i] = random.gauss(1, 0.05) * deriv[i]
return deriv

def thrusters(self, index):
# Thrusters Configuration
# (2) e_y (1) ___
# <-- ^ --> / \
# ^ | | | ^ v M )
# (3)|--o-------o--|(8) __/
# | free- |
# | flyer | ---> e_x
# | robot |
# (4)|--o-------o--|(7)
# v | | v
# <-- -->
# (5) (6)
"""
Returns the resultant force from the thruster specified
Input:
index: number thruster we want to get
Returns:
resultant (3x1) Force + moment ABOUT ROBOT AXIS from thruster
"""
assert index <= 8 and index >= 1, "input must be 8x1 mapping of thrusters"
F = self.thruster_orien[index-1]
tPos = self.thruster_pos[index-1]

deg = np.arctan2(tPos[1], tPos[0])

Moment = np.cross(tPos, F*Fmax)
# Force = tPos[1]/(tPos[0]**2+tPos[1]**2)*np.array([-tPos[0], -tPos[1]]) * Fmax
Force = F*Fmax # Not too sure about this one
return np.append(Force, Moment).reshape(-1,1)

def resultant_force_and_moment(self, input):
"""
Returns the resultant total force and moment that we would predict
Args:
thruster command, (8x1), each corresponding to a thruster index
Returns:
Force and moment of the FF in its own frame.
"""
force_x, force_y, moment = 0 , 0 , 0
sq_input = input.squeeze()
assert len(input) == 8, "check size of input, must have 8 binary values"
for idx in range(len(sq_input)):
if sq_input[idx] == 1:
tPos = self.thruster_pos[idx-1]
thruster_dir = np.rad2deg(np.arctan2(tPos[1], tPos[0]))

force_x += Fmax * np.cos(thruster_dir)
force_y += Fmax *np.sin(thruster_dir)
moment += tPos[0]*Fmax*np.sin(thruster_dir)-tPos[1]*Fmax*np.cos(thruster_dir)
# force_x = sq_input[1]+sq_input[4]-(sq_input[0] + sq_input[5])
# force_y = sq_input[3]+sq_input[6] - (sq_input[2]+sq_input[7])
# print()
return force_x, force_y, moment
# for i in range(len(squeezed_input)):
# # i gives the index in which I want to actuate -1.
# if squeezed_input[i] == 1:
# # print("activated thruster", i+1)
# force_x += self.thrusters(i+1)
# return force_x

def get_rotmatrix_body_to_world(self, theta):
R = np.array([[np.cos(theta), -np.sin(theta)], [np.sin(theta), np.cos(theta)]])
return R
101 changes: 101 additions & 0 deletions ff_control/scripts/mpc_ctrl_py_node
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#!/usr/bin/env python3

# MIT License
#
# Copyright (c) 2023 Stanford Autonomous Systems Lab
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


import copy
import rclpy

import numpy as np

from ff_control.ll_binary_ctrl import LowLevelBinaryController
from ff_msgs.msg import FreeFlyerStateStamped
from geometry_msgs.msg import PoseStamped
from simulator_node.py import FreeFlyerSimulator


class MPCController(LowLevelBinaryController):

def __init__(self):
super().__init__('mpc_control_node')
self._state_sub = self.create_subscription(
FreeFlyerStateStamped,
self.get_parameter("state_channel").get_parameter_value().string_value,
self._state_callback,
10,
)

self.state_sp_sub = self.create_subscription(FreeFlyerStateStamped,
'ctrl/state', self.state_setpoint_callback, 10)
self.rviz_sp_sub = self.create_subscription(PoseStamped,
'/goal_pose', self.rviz_setpoint_callback, 10)
self.timer = self.create_timer(0.1, self.control_loop)
self.state_desired = FreeFlyerStateStamped()

def state_ready_callback(self) -> None:
# copy current position as goal position
self.state_desired.header.stamp = self.get_clock().now().to_msg()
self.state_desired.state = self.get_state()

def state_setpoint_callback(self, msg: FreeFlyerStateStamped) -> None:
self.state_desired = copy.deepcopy(msg)

def rviz_setpoint_callback(self, msg: PoseStamped) -> None:
self.state_desired.header.stamp = msg.header.stamp

self.state_desired.state.pose.x = msg.pose.position.x
self.state_desired.state.pose.y = msg.pose.position.y
z = msg.pose.orientation.z
w = msg.pose.orientation.w
self.state_desired.state.pose.theta = np.arctan2(2 * w * z, w * w - z * z)

self.state_desired.state.twist.vx = 0.
self.state_desired.state.twist.vy = 0.
self.state_desired.state.twist.wz = 0.

def control_loop(self) -> None:
# state not yet ready
if not self.state_is_ready():
return

# build feedback gain matrix
gain_f = self.get_parameter('gain_f').get_parameter_value().double_value
gain_df = self.get_parameter('gain_df').get_parameter_value().double_value
gain_t = self.get_parameter('gain_t').get_parameter_value().double_value
gain_dt = self.get_parameter('gain_dt').get_parameter_value().double_value
K = np.array([[gain_f, 0, 0, gain_df, 0, 0],
[0, gain_f, 0, 0, gain_df, 0],
[0, 0, gain_t, 0, 0, gain_dt]])

self.send_control(self.state_desired.state, K)
self.set_thrust_binary()

def main(args=None):
rclpy.init(args=args)
pd_ctrl = PDControlNode()
rclpy.spin(pd_ctrl)
rclpy.shutdown()


if __name__ == '__main__':
main()
25 changes: 24 additions & 1 deletion ff_control/scripts/safety_filter
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import time

from ff_msgs.msg import ThrusterCommand
from ff_msgs.msg import WheelVelCommand
from ff_msgs.msg import BinaryCommand
from std_msgs.msg import Bool

class SafetyFilter(Node):
Expand All @@ -46,6 +47,7 @@ class SafetyFilter(Node):
# Create publishers for thruster and wheel commands
self.thrust_pub = self.create_publisher(ThrusterCommand, 'commands/duty_cycle', 10)
self.wheel_pub = self.create_publisher(WheelVelCommand, 'commands/velocity', 10)
self.bool_thrust_pub = self.create_publisher(BinaryCommand, 'commands/binary', 10)

# Create subscribers for thruster and wheel commands
self.thrust_sub = self.create_subscription(
Expand All @@ -58,6 +60,11 @@ class SafetyFilter(Node):
'ctrl/velocity',
self.wheel_callback,
10)
self.bool_thrust_sub = self.create_subscription(
BinaryCommand,
'ctrl/binary',
self.bool_thrust_callback,
10)

# Create timer to check if we've received any messages in the last 'wait_period' seconds
self.timer = self.create_timer(0.1, self.check_timer_callback)
Expand Down Expand Up @@ -95,6 +102,15 @@ class SafetyFilter(Node):
# Store time last message was published
self.last_thrust_time = self.get_clock().now()

def bool_thrust_callback(self, msg):
if self.kill_state:
self.get_logger().info("Kill state is active. Ignoring thrust command.")
return
# Publish the message to thrust pub
self.bool_thrust_pub.publish(msg)
# Store time last message was published
self.last_thrust_time = self.get_clock().now()

def wheel_callback(self, msg):
if self.kill_state:
self.get_logger().info("Kill state is active. Ignoring wheel command.")
Expand All @@ -108,7 +124,14 @@ class SafetyFilter(Node):
zero_thrust_msg = ThrusterCommand()
zero_thrust_msg.header.stamp = self.get_clock().now().to_msg()
zero_thrust_msg.duty_cycle = np.zeros(8)
self.thrust_pub.publish(ThrusterCommand())

zero_thrust_binary_msg = BinaryCommand()
zero_thrust_binary_msg.header.stamp = self.get_clock().now().to_msg()
zero_thrust_binary_msg.binary_command = [False] * 8

self.thrust_pub.publish(zero_thrust_msg)
self.bool_thrust_pub.publish(zero_thrust_binary_msg)


def send_zero_wheel(self):
zero_wheel_msg = WheelVelCommand()
Expand Down
1 change: 1 addition & 0 deletions ff_msgs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ rosidl_generate_interfaces(${PROJECT_NAME}
"msg/FreeFlyerStateStamped.msg"
"msg/ThrusterCommand.msg"
"msg/WheelVelCommand.msg"
"msg/BinaryCommand.msg"
DEPENDENCIES std_msgs
)

Expand Down
2 changes: 2 additions & 0 deletions ff_msgs/msg/BinaryCommand.msg
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
std_msgs/Header header
bool[8] binary_command
8 changes: 7 additions & 1 deletion ff_sim/ff_sim/simulator_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped

from ff_msgs.msg import FreeFlyerStateStamped, Wrench2DStamped, ThrusterCommand, WheelVelCommand
from ff_msgs.msg import FreeFlyerStateStamped, Wrench2DStamped, ThrusterCommand, WheelVelCommand, BinaryCommand

from ff_params import RobotParams

Expand Down Expand Up @@ -162,6 +162,9 @@ def __init__(self):
self.sub_thrusters_cmd_dutycycle = self.create_subscription(
ThrusterCommand, "commands/duty_cycle", self.update_thrusters_dutycycle_cmd_cb, 10
)
self.sub_thrusters_cmd_binary = self.create_subscription(
BinaryCommand, "commands/binary", self.update_thrusters_binary_cmd_cb, 10
)
self.sub_state_init = self.create_subscription(
FreeFlyerStateStamped, "state_init", self.update_state_init_cb, 10
)
Expand Down Expand Up @@ -260,6 +263,9 @@ def update_wheel_cmd_vel_cb(self, msg: WheelVelCommand) -> None:
def update_thrusters_dutycycle_cmd_cb(self, msg: ThrusterCommand) -> None:
# Saturate controls so within [0,1] (in %)
self.thrusters_dutycycle_cmd = np.clip(msg.duty_cycle, 0.0, 1.0)

def update_thrusters_binary_cmd_cb(self, msg: BinaryCommand) -> None:
self.thrusters_dutycycle_cmd = np.array(msg.binary_command, dtype = float)

def update_state_init_cb(self, msg: FreeFlyerStateStamped) -> None:
self.x_cur = np.array(
Expand Down
Loading
Loading