Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MNT: making simple refactors in the Flight class #442

Merged
merged 8 commits into from
Nov 9, 2023
317 changes: 175 additions & 142 deletions rocketpy/simulation/flight.py
Original file line number Diff line number Diff line change
Expand Up @@ -2333,7 +2333,7 @@ def stream_velocity_y(self):
return stream_velocity_y

@funcify_method("Time (s)", "Freestream Velocity Z (m/s)", "spline", "constant")
def stream_velocity_z(self, interpolation="spline", extrapolation="natural"):
def stream_velocity_z(self):
"""Freestream velocity Z component as a Function of time."""
stream_velocity_z = np.column_stack((self.time, -self.vz[:, 1]))
return stream_velocity_z
Expand Down Expand Up @@ -2634,29 +2634,6 @@ def stability_margin(self):
return [(t, self.rocket.stability_margin(m, t)) for t, m in self.mach_number]

# Rail Button Forces
@cached_property
def frontal_surface_wind(self):
"""Surface wind speed in m/s aligned with the launch rail."""
# Surface wind magnitude in the frontal direction at the rail's elevation
wind_u = self.env.wind_velocity_x(self.env.elevation)
wind_v = self.env.wind_velocity_y(self.env.elevation)
heading_rad = self.heading * np.pi / 180
frontal_surface_wind = wind_u * np.sin(heading_rad) + wind_v * np.cos(
heading_rad
)
return frontal_surface_wind

@cached_property
def lateral_surface_wind(self):
"""Surface wind speed in m/s perpendicular to launch rail."""
# Surface wind magnitude in the lateral direction at the rail's elevation
wind_u = self.env.wind_velocity_x(self.env.elevation)
wind_v = self.env.wind_velocity_y(self.env.elevation)
heading_rad = self.heading * np.pi / 180
lateral_surface_wind = -wind_u * np.cos(heading_rad) + wind_v * np.sin(
heading_rad
)
return lateral_surface_wind

@funcify_method("Time (s)", "Upper Rail Button Normal Force (N)", "spline", "zero")
def rail_button1_normal_force(self):
Expand Down Expand Up @@ -2712,8 +2689,6 @@ def drift(self):
(self.time, (self.x[:, 1] ** 2 + self.y[:, 1] ** 2) ** 0.5)
)

return drift

@funcify_method("Time (s)", "Bearing (°)", "spline", "constant")
def bearing(self):
"""Rocket bearing compass, in degrees, as a Function of time."""
Expand Down Expand Up @@ -3069,9 +3044,8 @@ def calculate_stall_wind_velocity(self, stall_angle):
# Convert stall_angle to degrees
stall_angle = stall_angle * 180 / np.pi
print(
"Maximum wind velocity at Rail Departure time before angle of attack exceeds {:.3f}°: {:.3f} m/s".format(
stall_angle, w_v
)
"Maximum wind velocity at Rail Departure time before angle"
+ f" of attack exceeds {stall_angle:.3f}°: {w_v:.3f} m/s"
)

return None
Expand Down Expand Up @@ -3102,41 +3076,21 @@ def export_pressures(self, file_name, time_step):
------
None
"""

time_points = np.arange(0, self.t_final, time_step)

# Create the file
file = open(file_name, "w")

if len(self.rocket.parachutes) == 0:
pressure = self.env.pressure(self.z(time_points))
for i in range(0, time_points.size, 1):
file.write("{:f}, {:.5f}\n".format(time_points[i], pressure[i]))

else:
for parachute in self.rocket.parachutes:
for i in range(0, time_points.size, 1):
pCl = Function(
parachute.clean_pressure_signal,
"Time (s)",
"Pressure - Without Noise (Pa)",
"linear",
)(time_points[i])
pNs = Function(
parachute.noisy_pressure_signal,
"Time (s)",
"Pressure - With Noise (Pa)",
"linear",
)(time_points[i])
file.write(
"{:f}, {:.5f}, {:.5f}\n".format(time_points[i], pCl, pNs)
)
# We need to save only 1 parachute data
pass

file.close()

return None
# pylint: disable=W1514, E1121
with open(file_name, "w") as file:
if len(self.rocket.parachutes) == 0:
print("No parachutes in the rocket, saving static pressure.")
for t in time_points:
file.write(f"{t:f}, {self.pressure(t):.5f}\n")
else:
for parachute in self.rocket.parachutes:
for t in time_points:
p_cl = parachute.clean_pressure_signal(t)
p_ns = parachute.noisy_pressure_signal(t)
file.write(f"{t:f}, {p_cl:.5f}, {p_ns:.5f}\n")
# We need to save only 1 parachute data
break

def export_data(self, file_name, *variables, time_step=None):
"""Exports flight data to a comma separated value file (.csv).
Expand Down Expand Up @@ -3354,7 +3308,19 @@ def time_iterator(self, node_list):
i += 1

class FlightPhases:
def __init__(self, init_list=[]):
"""Class to handle flight phases. It is used to store the derivatives
and callbacks for each flight phase. It is also used to handle the
insertion of flight phases in the correct order, according to their
initial time.

Attributes
----------
list : list
A list of FlightPhase objects. See FlightPhase subclass.
"""

def __init__(self, init_list=None):
init_list = [] if init_list is None else init_list
self.list = init_list[:]

def __getitem__(self, index):
Expand All @@ -3366,102 +3332,169 @@ def __len__(self):
def __repr__(self):
return str(self.list)

def display_warning(self, *messages):
"""A simple function to print a warning message."""
print("WARNING:", *messages)

def add(self, flight_phase, index=None):
"""Add a flight phase to the list. It will be inserted in the
correct position, according to its initial time. If no index is
provided, it will be appended to the end of the list. If by any
reason the flight phase cannot be inserted in the correct position,
a warning will be displayed and the flight phase will be inserted
in the closest position possible.

Parameters
----------
flight_phase : FlightPhase
The flight phase object to be added. See FlightPhase class.
index : int, optional
The index of the flight phase to be added. If no index is
provided, the flight phase will be appended to the end of the
list. Default is None.

Returns
-------
None
"""
# Handle first phase
if len(self.list) == 0:
self.list.append(flight_phase)
return None

# Handle appending to last position
elif index is None:
# Check if new flight phase respects time
if index is None:
previous_phase = self.list[-1]
if flight_phase.t > previous_phase.t:
# All good! Add phase.
self.list.append(flight_phase)
elif flight_phase.t == previous_phase.t:
print(
"WARNING: Trying to add a flight phase starting "
+ "together with the one preceding it."
)
print(
"This may be caused by more than when parachute being "
+ "triggered simultaneously."
)
flight_phase.t += 1e-7
self.add(flight_phase)
elif flight_phase.t < previous_phase.t:
print(
"WARNING: Trying to add a flight phase starting before "
+ "the one preceding it."
)
print(
"This may be caused by more than when parachute being "
+ "triggered simultaneously."
)
print("Or by having a negative parachute lag.")
self.add(flight_phase, -2)
# Handle inserting into intermediary position
else:
# Check if new flight phase respects time
next_phase = self.list[index]
previous_phase = self.list[index - 1]
if previous_phase.t < flight_phase.t < next_phase.t:
# All good! Add phase.
self.list.insert(index, flight_phase)
elif flight_phase.t < previous_phase.t:
print(
"WARNING: Trying to add a flight phase starting before "
+ "the one preceding it."
)
print(
"This may be caused by more than when parachute being "
+ "triggered simultaneously."
)
print("Or by having a negative parachute lag.")
self.add(flight_phase, index - 1)
elif flight_phase.t == previous_phase.t:
print(
"WARNING: Trying to add a flight phase starting "
+ "together with the one preceding it."
)
print(
"This may be caused by more than when parachute being "
+ "triggered simultaneously."
)
flight_phase.t += 1e-7
self.add(flight_phase, index)
elif flight_phase.t == next_phase.t:
print(
"WARNING: Trying to add a flight phase starting "
+ "together with the one proceeding it."
)
print(
"This may be caused by more than when parachute being "
+ "triggered simultaneously."
)
flight_phase.t += 1e-7
self.add(flight_phase, index + 1)
elif flight_phase.t > next_phase.t:
print(
"WARNING: Trying to add a flight phase starting after "
+ "the one proceeding it."
return None
warning_msg = (
(
"Trying to add flight phase starting together with the one preceding it. ",
"This may be caused by multiple parachutes being triggered simultaneously.",
)
print(
"This may be caused by more than when parachute being "
+ "triggered simultaneously."
if flight_phase.t == previous_phase.t
else (
"Trying to add flight phase starting *before* the one *preceding* it. ",
"This may be caused by multiple parachutes being triggered simultaneously",
" or by having a negative parachute lag.",
)
self.add(flight_phase, index + 1)

def add_phase(self, t, derivatives=None, callback=[], clear=True, index=None):
)
self.display_warning(*warning_msg)
flight_phase.t += 1e-7 if flight_phase.t == previous_phase.t else 0
self.add(
flight_phase, -2 if flight_phase.t < previous_phase.t else None
)
return None

# Handle inserting into intermediary position.
# Check if new flight phase respects time
next_phase = self.list[index]
previous_phase = self.list[index - 1]
if previous_phase.t < flight_phase.t < next_phase.t:
self.list.insert(index, flight_phase)
return None
warning_msg = (
(
"Trying to add flight phase starting *together* with the one *preceding* it. ",
"This may be caused by multiple parachutes being triggered simultaneously.",
)
if flight_phase.t == previous_phase.t
else (
"Trying to add flight phase starting *together* with the one *proceeding* it. ",
"This may be caused by multiple parachutes being triggered simultaneously.",
)
if flight_phase.t == next_phase.t
else (
"Trying to add flight phase starting *before* the one *preceding* it. ",
"This may be caused by multiple parachutes being triggered simultaneously",
" or by having a negative parachute lag.",
)
if flight_phase.t < previous_phase.t
else (
"Trying to add flight phase starting *after* the one *proceeding* it.",
"This may be caused by multiple parachutes being triggered simultaneously.",
)
)
self.display_warning(*warning_msg)
adjust = 1e-7 if flight_phase.t in {previous_phase.t, next_phase.t} else 0
new_index = (
index - 1
if flight_phase.t < previous_phase.t
else index + 1
if flight_phase.t > next_phase.t
else index
)
flight_phase.t += adjust
self.add(flight_phase, new_index)

def add_phase(self, t, derivatives=None, callback=None, clear=True, index=None):
"""Add a new flight phase to the list, with the specified
characteristics. This method creates a new FlightPhase instance and
adds it to the flight phases list, either at the specified index
position or appended to the end. See FlightPhases.add() for more
information.

Parameters
----------
t : float
The initial time of the new flight phase.
derivatives : function, optional
A function representing the derivatives of the flight phase.
Default is None.
callback : list of functions, optional
A list of callback functions to be executed during the flight
phase. Default is None. You can also pass an empty list.
clear : bool, optional
A flag indicating whether to clear the solution after the phase.
Default is True.
index : int, optional
The index at which the new flight phase should be inserted.
If not provided, the flight phase will be appended
to the end of the list. Default is None.

Returns
-------
None
"""
self.add(self.FlightPhase(t, derivatives, callback, clear), index)

def flush_after(self, index):
"""This function deletes all flight phases after a given index.

Parameters
----------
index : int
The index of the last flight phase to be kept.

Returns
-------
None
"""
del self.list[index + 1 :]

class FlightPhase:
def __init__(self, t, derivative=None, callbacks=[], clear=True):
"""Class to store a flight phase. It stores the initial time, the
derivative function, the callback functions and a flag to clear
the solution after the phase.

Attributes
----------
t : float
The initial time of the flight phase.
derivative : function
A function representing the derivatives of the flight phase.
callbacks : list of functions
A list of callback functions to be executed during the flight
phase.
clear : bool
A flag indicating whether to clear the solution after the phase.
"""

def __init__(self, t, derivative=None, callbacks=None, clear=True):
self.t = t
self.derivative = derivative
self.callbacks = callbacks[:]
self.callbacks = callbacks[:] if callbacks is not None else []
self.clear = clear

def __repr__(self):
Expand Down
Loading