Skip to content

Commit

Permalink
blackify project
Browse files Browse the repository at this point in the history
  • Loading branch information
GabrielBarberini committed Apr 14, 2024
1 parent f3249a8 commit 655c3f2
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 93 deletions.
5 changes: 4 additions & 1 deletion lib/api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
This is the main API file for the RocketPy API.
"""

import logging

from fastapi import FastAPI, Request, status
Expand Down Expand Up @@ -79,7 +80,9 @@ async def __perform_healthcheck():

# Errors
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
async def validation_exception_handler(
request: Request, exc: RequestValidationError
):
exc_str = f"{exc}".replace("\n", " ").replace(" ", " ")
logging.error(f"{request}: {exc_str}")
content = {"status_code": 10422, "message": exc_str, "data": None}
Expand Down
105 changes: 77 additions & 28 deletions lib/controllers/flight.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ class FlightController:
"""

def __init__(
self, flight: Flight, rocket_option: RocketOptions, motor_kind: MotorKinds
self,
flight: Flight,
rocket_option: RocketOptions,
motor_kind: MotorKinds,
):
rocketpy_rocket = RocketController(
flight.rocket, rocket_option=rocket_option, motor_kind=motor_kind
Expand Down Expand Up @@ -110,7 +113,8 @@ async def get_flight(flight_id: int) -> "Union[Flight, HTTPException]":
).get_flight()
if not successfully_read_flight:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Flight not found."
status_code=status.HTTP_404_NOT_FOUND,
detail="Flight not found.",
)

return successfully_read_flight
Expand All @@ -136,13 +140,18 @@ async def get_rocketpy_flight(
).get_flight()
if not successfully_read_flight:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Flight not found."
status_code=status.HTTP_404_NOT_FOUND,
detail="Flight not found.",
)

successfully_read_rocketpy_flight = FlightController(
flight=successfully_read_flight,
rocket_option=RocketOptions(successfully_read_flight.rocket._rocket_option),
motor_kind=MotorKinds(successfully_read_flight.rocket.motor._motor_kind),
rocket_option=RocketOptions(
successfully_read_flight.rocket._rocket_option
),
motor_kind=MotorKinds(
successfully_read_flight.rocket.motor._motor_kind
),
).rocketpy_flight

return FlightPickle(
Expand Down Expand Up @@ -171,12 +180,15 @@ async def update_flight(
).get_flight()
if not successfully_read_flight:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Flight not found."
status_code=status.HTTP_404_NOT_FOUND,
detail="Flight not found.",
)

successfully_updated_flight = await FlightRepository(
flight=self.flight, flight_id=flight_id
).update_flight(rocket_option=self.rocket_option, motor_kind=self.motor_kind)
).update_flight(
rocket_option=self.rocket_option, motor_kind=self.motor_kind
)
if not successfully_updated_flight:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
Expand Down Expand Up @@ -207,7 +219,8 @@ async def update_env(
).get_flight()
if not successfully_read_flight:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Flight not found."
status_code=status.HTTP_404_NOT_FOUND,
detail="Flight not found.",
)

flight = successfully_read_flight.dict()
Expand Down Expand Up @@ -246,7 +259,8 @@ async def update_rocket(
).get_flight()
if not successfully_read_flight:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Flight not found."
status_code=status.HTTP_404_NOT_FOUND,
detail="Flight not found.",
)

flight = successfully_read_flight.dict()
Expand All @@ -267,7 +281,9 @@ async def update_rocket(
return FlightUpdated(new_flight_id=str(successfully_updated_flight))

@staticmethod
async def delete_flight(flight_id: int) -> "Union[FlightDeleted, HTTPException]":
async def delete_flight(
flight_id: int,
) -> "Union[FlightDeleted, HTTPException]":
"""
Delete a flight from the database.
Expand All @@ -285,7 +301,8 @@ async def delete_flight(flight_id: int) -> "Union[FlightDeleted, HTTPException]"
).get_flight()
if not successfully_read_flight:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Flight not found."
status_code=status.HTTP_404_NOT_FOUND,
detail="Flight not found.",
)

successfully_deleted_flight = await FlightRepository(
Expand All @@ -300,7 +317,9 @@ async def delete_flight(flight_id: int) -> "Union[FlightDeleted, HTTPException]"
return FlightDeleted(deleted_flight_id=str(flight_id))

@staticmethod
async def simulate(flight_id: int) -> "Union[FlightSummary, HTTPException]":
async def simulate(
flight_id: int,
) -> "Union[FlightSummary, HTTPException]":
"""
Simulate a rocket flight.
Expand All @@ -318,7 +337,8 @@ async def simulate(flight_id: int) -> "Union[FlightSummary, HTTPException]":
).get_flight()
if not successfully_read_flight:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Flight not found."
status_code=status.HTTP_404_NOT_FOUND,
detail="Flight not found.",
)

try:
Expand Down Expand Up @@ -351,7 +371,9 @@ async def simulate(flight_id: int) -> "Union[FlightSummary, HTTPException]":
)

_numerical_integration_settings = NumericalIntegrationSettings(
max_time="Maximum Allowed Flight Time: {:f} s".format(flight.max_time),
max_time="Maximum Allowed Flight Time: {:f} s".format(
flight.max_time
),
max_time_step="Maximum Allowed Time Step: {:f} s".format(
flight.max_time_step
),
Expand All @@ -371,11 +393,15 @@ async def simulate(flight_id: int) -> "Union[FlightSummary, HTTPException]":
)

_launch_rail_conditions = LaunchRailConditions(
rail_length="Launch Rail Length: {:.2f} m".format(flight.rail_length),
rail_length="Launch Rail Length: {:.2f} m".format(
flight.rail_length
),
flight_inclination="Launch Rail Inclination: {:.2f}°".format(
flight.inclination
),
flight_heading="Launch Rail Heading: {:.2f}°".format(flight.heading),
flight_heading="Launch Rail Heading: {:.2f}°".format(
flight.heading
),
)

_surface_wind_conditions = SurfaceWindConditions(
Expand Down Expand Up @@ -413,17 +439,25 @@ async def simulate(flight_id: int) -> "Union[FlightSummary, HTTPException]":
flight.rocket.motor.burn_out_time
),
burnout_altitude="Altitude at burn out: {:.3f} m (AGL)".format(
flight.z(flight.rocket.motor.burn_out_time) - flight.env.elevation
flight.z(flight.rocket.motor.burn_out_time)
- flight.env.elevation
),
burnout_rocket_velocity="Rocket velocity at burn out: {:.3f} m/s".format(
flight.speed(flight.rocket.motor.burn_out_time)
),
burnout_freestream_velocity="Freestream velocity at burn out: {:.3f} m/s".format(
(
flight.stream_velocity_x(flight.rocket.motor.burn_out_time) ** 2
+ flight.stream_velocity_y(flight.rocket.motor.burn_out_time)
flight.stream_velocity_x(
flight.rocket.motor.burn_out_time
)
** 2
+ flight.stream_velocity_y(
flight.rocket.motor.burn_out_time
)
** 2
+ flight.stream_velocity_z(flight.rocket.motor.burn_out_time)
+ flight.stream_velocity_z(
flight.rocket.motor.burn_out_time
)
** 2
)
** 0.5
Expand Down Expand Up @@ -457,14 +491,17 @@ async def simulate(flight_id: int) -> "Union[FlightSummary, HTTPException]":
flight.max_reynolds_number, flight.max_reynolds_number_time
),
maximum_dynamic_pressure="Maximum Dynamic Pressure: {:.3e} Pa at {:.2f} s".format(
flight.max_dynamic_pressure, flight.max_dynamic_pressure_time
flight.max_dynamic_pressure,
flight.max_dynamic_pressure_time,
),
maximum_acceleration_during_motor_burn="Maximum Acceleration During Motor Burn: {:.3f} m/s² at {:.2f} s".format(
flight.max_acceleration, flight.max_acceleration_time
),
maximum_gs_during_motor_burn="Maximum Gs During Motor Burn: {:.3f} g at {:.2f} s".format(
flight.max_acceleration
/ flight.env.gravity(flight.z(flight.max_acceleration_time)),
/ flight.env.gravity(
flight.z(flight.max_acceleration_time)
),
flight.max_acceleration_time,
),
maximum_acceleration_after_motor_burn="Maximum Acceleration After Motor Burn: {:.3f} m/s² at {:.2f} s".format(
Expand All @@ -491,17 +528,25 @@ async def simulate(flight_id: int) -> "Union[FlightSummary, HTTPException]":

if len(flight.impact_state) != 0:
_impact_conditions = ImpactConditions(
x_impact_position="X Impact: {:.3f} m".format(flight.x_impact),
y_impact_position="Y Impact: {:.3f} m".format(flight.y_impact),
time_of_impact="Time of Impact: {:.3f} s".format(flight.t_final),
x_impact_position="X Impact: {:.3f} m".format(
flight.x_impact
),
y_impact_position="Y Impact: {:.3f} m".format(
flight.y_impact
),
time_of_impact="Time of Impact: {:.3f} s".format(
flight.t_final
),
impact_velocity="Velocity at Impact: {:.3f} m/s".format(
flight.impact_velocity
),
)
elif flight.terminate_on_apogee is False:
_impact_conditions = ImpactConditions(
time="Time: {:.3f} s".format(flight.solution[-1][0]),
altitude="Altitude: {:.3f} m".format(flight.solution[-1][3]),
altitude="Altitude: {:.3f} m".format(
flight.solution[-1][3]
),
)

if len(flight.parachute_events) == 0:
Expand All @@ -519,10 +564,14 @@ async def simulate(flight_id: int) -> "Union[FlightSummary, HTTPException]":
name = parachute.name.title()
events[name] = []
events[name].append(
name + " Ejection Triggered at: {:.3f} s".format(trigger_time)
name
+ " Ejection Triggered at: {:.3f} s".format(
trigger_time
)
)
events[name].append(
name + " Parachute Inflated at: {:.3f} s".format(open_time)
name
+ " Parachute Inflated at: {:.3f} s".format(open_time)
)
events[name].append(
name
Expand Down
51 changes: 37 additions & 14 deletions lib/controllers/motor.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,21 @@ async def get_motor(motor_id: int) -> "Union[Motor, HTTPException]":
Raises:
HTTP 404 Not Found: If the motor is not found in the database.
"""
successfully_read_motor = await MotorRepository(motor_id=motor_id).get_motor()
successfully_read_motor = await MotorRepository(
motor_id=motor_id
).get_motor()
if not successfully_read_motor:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Motor not found."
status_code=status.HTTP_404_NOT_FOUND,
detail="Motor not found.",
)

return successfully_read_motor

@staticmethod
async def get_rocketpy_motor(motor_id: int) -> "Union[MotorPickle, HTTPException]":
async def get_rocketpy_motor(
motor_id: int,
) -> "Union[MotorPickle, HTTPException]":
"""
Get a rocketpy motor object encoded as jsonpickle string from the database.
Expand All @@ -138,10 +143,13 @@ async def get_rocketpy_motor(motor_id: int) -> "Union[MotorPickle, HTTPException
Raises:
HTTP 404 Not Found: If the motor is not found in the database.
"""
successfully_read_motor = await MotorRepository(motor_id=motor_id).get_motor()
successfully_read_motor = await MotorRepository(
motor_id=motor_id
).get_motor()
if not successfully_read_motor:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Motor not found."
status_code=status.HTTP_404_NOT_FOUND,
detail="Motor not found.",
)

successfully_read_rocketpy_motor = MotorController(
Expand All @@ -155,7 +163,9 @@ async def get_rocketpy_motor(motor_id: int) -> "Union[MotorPickle, HTTPException
)
)

async def update_motor(self, motor_id: int) -> "Union[MotorUpdated, HTTPException]":
async def update_motor(
self, motor_id: int
) -> "Union[MotorUpdated, HTTPException]":
"""
Update a motor in the database.
Expand All @@ -168,10 +178,13 @@ async def update_motor(self, motor_id: int) -> "Union[MotorUpdated, HTTPExceptio
Raises:
HTTP 404 Not Found: If the motor is not found in the database.
"""
successfully_read_motor = await MotorRepository(motor_id=motor_id).get_motor()
successfully_read_motor = await MotorRepository(
motor_id=motor_id
).get_motor()
if not successfully_read_motor:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Motor not found."
status_code=status.HTTP_404_NOT_FOUND,
detail="Motor not found.",
)

successfully_updated_motor = await MotorRepository(
Expand All @@ -186,7 +199,9 @@ async def update_motor(self, motor_id: int) -> "Union[MotorUpdated, HTTPExceptio
return MotorUpdated(new_motor_id=str(successfully_updated_motor))

@staticmethod
async def delete_motor(motor_id: int) -> "Union[MotorDeleted, HTTPException]":
async def delete_motor(
motor_id: int,
) -> "Union[MotorDeleted, HTTPException]":
"""
Delete a motor from the database.
Expand All @@ -199,10 +214,13 @@ async def delete_motor(motor_id: int) -> "Union[MotorDeleted, HTTPException]":
Raises:
HTTP 404 Not Found: If the motor is not found in the database.
"""
successfully_read_motor = await MotorRepository(motor_id=motor_id).get_motor()
successfully_read_motor = await MotorRepository(
motor_id=motor_id
).get_motor()
if not successfully_read_motor:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Motor not found."
status_code=status.HTTP_404_NOT_FOUND,
detail="Motor not found.",
)

successfully_deleted_motor = await MotorRepository(
Expand Down Expand Up @@ -230,10 +248,13 @@ async def simulate(motor_id: int) -> "Union[MotorSummary, HTTPException]":
Raises:
HTTP 404 Not Found: If the motor does not exist in the database.
"""
successfully_read_motor = await MotorRepository(motor_id=motor_id).get_motor()
successfully_read_motor = await MotorRepository(
motor_id=motor_id
).get_motor()
if not successfully_read_motor:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Motor not found."
status_code=status.HTTP_404_NOT_FOUND,
detail="Motor not found.",
)

try:
Expand All @@ -249,7 +270,9 @@ async def simulate(motor_id: int) -> "Union[MotorSummary, HTTPException]":
+ "{:.3f}".format(motor.propellant_initial_mass)
+ " kg",
average_propellant_exhaust_velocity="Average Propellant Exhaust Velocity: "
+ "{:.3f}".format(motor.exhaust_velocity.average(*motor.burn_time))
+ "{:.3f}".format(
motor.exhaust_velocity.average(*motor.burn_time)
)
+ " m/s",
average_thrust="Average Thrust: "
+ "{:.3f}".format(motor.average_thrust)
Expand Down
Loading

0 comments on commit 655c3f2

Please sign in to comment.