diff --git a/lib/controllers/rocket.py b/lib/controllers/rocket.py index c0e7210..22c0006 100644 --- a/lib/controllers/rocket.py +++ b/lib/controllers/rocket.py @@ -7,14 +7,15 @@ # TODO # from inspect import getsourcelines -from rocketpy.rocket.parachute import Parachute as RocketpyParachute -from rocketpy.rocket.rocket import Rocket as RocketpyRocket -from rocketpy.rocket.aero_surface import NoseCone as RocketpyNoseCone +from rocketpy.rocket.parachute import Parachute as RocketPyParachute +from rocketpy.rocket.rocket import Rocket as RocketPyRocket +from rocketpy.rocket.aero_surface import NoseCone as RocketPyNoseCone from rocketpy.rocket.aero_surface import ( - TrapezoidalFins as RocketpyTrapezoidalFins, + TrapezoidalFins as RocketPyTrapezoidalFins, ) -from rocketpy.rocket.aero_surface import Tail as RocketpyTail +from rocketpy.rocket.aero_surface import Tail as RocketPyTail +from lib import logging, parse_error from lib.controllers.motor import MotorController from lib.models.rocket import Rocket, RocketOptions from lib.models.motor import MotorKinds @@ -34,25 +35,56 @@ RocketPickle, ) +logger = logging.getLogger(__name__) + class RocketController: """ Controller for the Rocket model. Init Attributes: - rocket (models.Rocket): Rocket model object. + rocket: models.Rocket. Enables: - create a RocketpyRocket object from a Rocket model object. + - CRUD operations over models.Rocket on the database. """ def __init__(self, rocket: Rocket, rocket_option, motor_kind): - rocketpy_rocket = RocketpyRocket( + self._rocket = rocket + self._rocket_option = rocket_option + self._motor_kind = motor_kind + + @property + def rocket(self) -> Rocket: + return self._rocket + + @rocket.setter + def rocket(self, rocket: Rocket): + self._rocket = rocket + + @property + def rocket_option(self) -> RocketOptions: + return self._rocket_option + + @property + def motor_kind(self) -> MotorKinds: + return self._motor_kind + + @classmethod + def get_rocketpy_rocket(cls, rocket: Rocket) -> RocketPyRocket: + """ + Get a rocketpy rocket object. + + Returns: + RocketPyRocket + """ + + rocketpy_rocket = RocketPyRocket( radius=rocket.radius, mass=rocket.mass, inertia=rocket.inertia, - power_off_drag=f"lib/data/{rocket_option.value.lower()}/powerOffDragCurve.csv", - power_on_drag=f"lib/data/{rocket_option.value.lower()}/powerOnDragCurve.csv", + power_off_drag=f"lib/data/{rocket.rocket_option.value.lower()}/powerOffDragCurve.csv", + power_on_drag=f"lib/data/{rocket.rocket_option.value.lower()}/powerOnDragCurve.csv", center_of_mass_without_motor=rocket.center_of_mass_without_motor, coordinate_system_orientation=rocket.coordinate_system_orientation, ) @@ -64,393 +96,235 @@ def __init__(self, rocket: Rocket, rocket_option, motor_kind): angular_position=rocket.rail_buttons.angular_position, ) rocketpy_rocket.add_motor( - MotorController(rocket.motor, motor_kind).rocketpy_motor, + MotorController.get_rocketpy_motor(rocket.motor), rocket.motor_position, ) # NoseCone - nose = self.NoseConeController(rocket.nose).rocketpy_nose + nose = cls.get_rocketpy_nose(rocket.nose) rocketpy_rocket.aerodynamic_surfaces.add(nose, nose.position) rocketpy_rocket.evaluate_static_margin() # FinSet # TODO: re-write this to match overall fins not only TrapezoidalFins - finset = self.TrapezoidalFinsController(rocket.fins).rocketpy_finset + # Maybe a strategy with different factory methods? + finset = cls.get_rocketpy_finset(rocket.fins) rocketpy_rocket.aerodynamic_surfaces.add(finset, finset.position) rocketpy_rocket.evaluate_static_margin() # Tail - tail = self.TailController(rocket.tail).rocketpy_tail + tail = cls.get_rocketpy_tail(rocket.tail) rocketpy_rocket.aerodynamic_surfaces.add(tail, tail.position) rocketpy_rocket.evaluate_static_margin() # Parachutes for p in range(len(rocket.parachutes)): parachute_trigger = rocket.parachutes[p].triggers[0] - if self.ParachuteController.check_trigger(parachute_trigger): + if cls.check_parachute_trigger(parachute_trigger): rocket.parachutes[p].triggers[0] = compile( parachute_trigger, "", "eval" ) - parachute = self.ParachuteController( - rocket.parachutes, p - ).rocketpy_parachute + parachute = cls.get_rocketpy_parachute(rocket.parachutes, p) rocketpy_rocket.parachutes.append(parachute) else: print("Parachute trigger not valid. Skipping parachute.") continue - self.rocket_option = rocket_option # tracks rocket option state - self.rocketpy_rocket = rocketpy_rocket - self.rocket = rocket - - class NoseConeController: - """ - Controller for the NoseCone model. - - Init Attributes: - nose (models.NoseCone): NoseCone model object. - - Enables: - - Create a rocketpy.AeroSurface.NoseCone object from a NoseCone model object. - """ - - def __init__(self, nose: NoseCone): - rocketpy_nose = RocketpyNoseCone( - length=nose.length, - kind=nose.kind, - base_radius=nose.base_radius, - rocket_radius=nose.rocket_radius, - ) - rocketpy_nose.position = nose.position - self.rocketpy_nose = rocketpy_nose - self.nose = nose - - class TrapezoidalFinsController: - """ - Controller for the TrapezoidalFins model. - - Init Attributes: - trapezoidal_fins (models.TrapezoidalFins): TrapezoidalFins model object. - - Enables: - - Create a rocketpy.AeroSurface.TrapezoidalFins object from a TrapezoidalFins model object. - """ - - def __init__(self, trapezoidal_fins: TrapezoidalFins): - rocketpy_finset = RocketpyTrapezoidalFins( - n=trapezoidal_fins.n, - root_chord=trapezoidal_fins.root_chord, - tip_chord=trapezoidal_fins.tip_chord, - span=trapezoidal_fins.span, - cant_angle=trapezoidal_fins.cant_angle, - rocket_radius=trapezoidal_fins.radius, - airfoil=trapezoidal_fins.airfoil, - ) - rocketpy_finset.position = trapezoidal_fins.position - self.rocketpy_finset = rocketpy_finset - self.trapezoidal_fins = trapezoidal_fins - - class TailController: - """ - Controller for the Tail model. - - Init Attributes: - tail (models.Tail): Tail model object. - - Enables: - - Create a rocketpy.AeroSurface.Tail object from a Tail model object. - """ - - def __init__(self, tail: Tail): - rocketpy_tail = RocketpyTail( - top_radius=tail.top_radius, - bottom_radius=tail.bottom_radius, - length=tail.length, - rocket_radius=tail.radius, - ) - rocketpy_tail.position = tail.position - self.rocketpy_tail = rocketpy_tail - self.tail = tail - - class ParachuteController: - """ - Controller for the Parachute model. - - Init Attributes: - parachute (models.Parachute): Parachute model object. - - Enables: - - Create a RocketpyParachute.Parachute object from a Parachute model object. - """ - - def __init__(self, parachute: Parachute, p: int): - rocketpy_parachute = RocketpyParachute( - name=parachute[p].name[0], - cd_s=parachute[p].cd_s[0], - trigger=eval(parachute[p].triggers[0]), - sampling_rate=parachute[p].sampling_rate[0], - lag=parachute[p].lag[0], - noise=parachute[p].noise[0], - ) - self.rocketpy_parachute = rocketpy_parachute - self.parachute = parachute - - @staticmethod - def check_trigger(expression: str) -> bool: - """ - Check if the trigger expression is valid. - - Args: - expression (str): Trigger expression. - - Returns: - bool: True if the expression is valid, False otherwise. - """ - - # Parsing the expression into an AST - try: - parsed_expression = ast.parse(expression, mode="eval") - except SyntaxError: - print("Invalid syntax.") - return False - - # Constant case (supported after beta v1) - if isinstance(parsed_expression.body, ast.Constant): - return True - # Name case (supported after beta v1) - if ( - isinstance(parsed_expression.body, ast.Name) - and parsed_expression.body.id == "apogee" - ): - global apogee - apogee = "apogee" - return True - - # Validating the expression structure - if not isinstance(parsed_expression.body, ast.Lambda): - print("Invalid expression structure (not a Lambda).") - return False - - lambda_node = parsed_expression.body - if len(lambda_node.args.args) != 3: - print("Invalid expression structure (invalid arity).") - return False - - if not isinstance(lambda_node.body, ast.Compare): - try: - for operand in lambda_node.body.values: - if not isinstance(operand, ast.Compare): - print( - "Invalid expression structure (not a Compare)." - ) - return False - except AttributeError: - print("Invalid expression structure (not a Compare).") - return False - - # Restricting access to functions or attributes - for node in ast.walk(lambda_node): - if isinstance(node, ast.Call): - print( - "Calling functions is not allowed in the expression." - ) - return False - if isinstance(node, ast.Attribute): - print( - "Accessing attributes is not allowed in the expression." - ) - return False - return True + return rocketpy_rocket async def create_rocket(self) -> "Union[RocketCreated, HTTPException]": """ Create a rocket in the database. Returns: - RocketCreated: Rocket id. + views.RocketCreated """ - rocket = RocketRepository(rocket=self.rocket) - successfully_created_rocket = await rocket.create_rocket( - rocket_option=self.rocket_option - ) - if not successfully_created_rocket: + try: + created_rocket = await RocketRepository( + rocket=self.rocket + ).create_rocket( + rocket_option=self.rocket_option, motor_kind=self.motor_kind + ) + except Exception as e: + exc_str = parse_error(e) + logger.error(f"controllers.rocket.create_rocket: {exc_str}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Failed to create rocket.", + detail=f"Failed to create rocket: {exc_str}", + ) from e + else: + return RocketCreated(rocket_id=str(created_rocket.rocket_id)) + finally: + logger.info( + f"Call to controllers.rocket.create_rocket completed; params: Rocket {hash(self.rocket)}" ) - return RocketCreated(rocket_id=str(rocket.rocket_id)) - @staticmethod - async def get_rocket(rocket_id: int) -> "Union[Rocket, HTTPException]": + async def get_rocket_by_id( + rocket_id: str, + ) -> "Union[Rocket, HTTPException]": """ Get a rocket from the database. Args: - rocket_id (int): Rocket id. + rocket_id: str Returns: - rocket model object + models.Rocket Raises: HTTP 404 Not Found: If the rocket is not found in the database. """ - successfully_read_rocket = await RocketRepository( - rocket_id=rocket_id - ).get_rocket() - if not successfully_read_rocket: + try: + read_rocket = await RocketRepository( + rocket_id=rocket_id + ).get_rocket() + except Exception as e: + exc_str = parse_error(e) + logger.error(f"controllers.rocket.get_rocket_by_id: {exc_str}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to read rocket: {e}", + ) from e + else: + if read_rocket: + return read_rocket raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, - detail="Rocket not found.", + detail="Rocket not found", + ) + finally: + logger.info( + f"Call to controllers.rocket.get_rocket_by_id completed; params: RocketID {rocket_id}" ) - return successfully_read_rocket - - @staticmethod - async def get_rocketpy_rocket( - rocket_id: int, + @classmethod + async def get_rocketpy_rocket_as_jsonpickle( + cls, rocket_id: str ) -> "Union[RocketPickle, HTTPException]": """ - Get a rocketpy rocket object encoded as jsonpickle string from the database. + Get a rocketpy.Rocket object as jsonpickle string. Args: - rocket_id (int): rocket id. + rocket_id: str Returns: - str: jsonpickle string of the rocketpy rocket. + views.RocketPickle Raises: HTTP 404 Not Found: If the rocket is not found in the database. """ - successfully_read_rocket = await RocketRepository( - rocket_id=rocket_id - ).get_rocket() - if not successfully_read_rocket: + try: + read_rocket = await cls.get_rocket_by_id(rocket_id) + except HTTPException as e: + raise e from e + except Exception as e: + exc_str = parse_error(e) + logger.error( + f"controllers.rocket.get_rocketpy_rocket_as_jsonpickle: {exc_str}" + ) raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Rocket not found.", + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to read rocket: {e}", + ) from e + else: + rocketpy_rocket = cls.get_rocketpy_rocket(read_rocket) + return RocketPickle( + jsonpickle_rocketpy_rocket=jsonpickle.encode(rocketpy_rocket) ) - - successfully_read_rocketpy_rocket = RocketController( - rocket=successfully_read_rocket, - rocket_option=RocketOptions( - successfully_read_rocket._rocket_option - ), - motor_kind=MotorKinds(successfully_read_rocket.motor._motor_kind), - ).rocketpy_rocket - - return RocketPickle( - jsonpickle_rocketpy_rocket=jsonpickle.encode( - successfully_read_rocketpy_rocket + finally: + logger.info( + f"Call to controllers.rocket.get_rocketpy_rocket_as_jsonpickle completed; params: RocketID {rocket_id}" ) - ) async def update_rocket( - self, rocket_id: int + self, rocket_id: str ) -> "Union[RocketUpdated, HTTPException]": """ Update a rocket in the database. Args: - rocket_id (int): rocket id. + rocket_id: str Returns: - RocketUpdated: rocket id and message. + views.RocketUpdated Raises: HTTP 404 Not Found: If the rocket is not found in the database. """ - successfully_read_rocket = await RocketRepository( - rocket_id=rocket_id - ).get_rocket() - if not successfully_read_rocket: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Rocket not found.", - ) - - successfully_updated_rocket = await RocketRepository( - rocket=self.rocket, rocket_id=rocket_id - ).update_rocket(rocket_option=self.rocket_option) - if not successfully_updated_rocket: + try: + await RocketController.get_rocket_by_id(rocket_id) + updated_rocket = await RocketRepository( + rocket=self.rocket, rocket_id=rocket_id + ).update_rocket(rocket_option=self.rocket_option) + except HTTPException as e: + raise e from e + except Exception as e: + exc_str = parse_error(e) + logger.error(f"controllers.rocket.update_rocket: {exc_str}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Failed to update rocket.", + detail=f"Failed to update rocket: {e}", + ) from e + else: + return RocketUpdated(new_rocket_id=str(updated_rocket.rocket_id)) + finally: + logger.info( + f"Call to controllers.rocket.update_rocket completed; params: RocketID {rocket_id}" ) - return RocketUpdated(new_rocket_id=str(successfully_updated_rocket)) - @staticmethod async def delete_rocket( - rocket_id: int, + rocket_id: str, ) -> "Union[RocketDeleted, HTTPException]": """ Delete a rocket from the database. Args: - rocket_id (int): Rocket id. + rocket_id: str Returns: - RocketDeleted: Rocket id and message. + views.RocketDeleted Raises: HTTP 404 Not Found: If the rocket is not found in the database. """ - successfully_read_rocket = await RocketRepository( - rocket_id=rocket_id - ).get_rocket() - if not successfully_read_rocket: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Rocket not found.", - ) - - successfully_deleted_rocket = await RocketRepository( - rocket_id=rocket_id - ).delete_rocket() - if not successfully_deleted_rocket: + try: + await RocketRepository(rocket_id=rocket_id).delete_rocket() + except Exception as e: + exc_str = parse_error(e) + logger.error(f"controllers.rocket.delete_rocket: {exc_str}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Failed to delete rocket.", + detail=f"Failed to delete rocket: {e}", + ) from e + else: + return RocketDeleted(deleted_rocket_id=str(rocket_id)) + finally: + logger.info( + f"Call to controllers.rocket.delete_rocket completed; params: RocketID {rocket_id}" ) - return RocketDeleted(deleted_rocket_id=str(rocket_id)) - - @staticmethod + @classmethod async def simulate( - rocket_id: int, + cls, + rocket_id: str, ) -> "Union[RocketSummary, HTTPException]": """ Simulate a rocket rocket. Args: - rocket_id (int): Rocket id. + rocket_id: str Returns: - Rocket summary view. + views.RocketSummary Raises: HTTP 404 Not Found: If the rocket does not exist in the database. """ - successfully_read_rocket = await RocketRepository( - rocket_id=rocket_id - ).get_rocket() - if not successfully_read_rocket: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Rocket not found.", - ) - try: - rocket = RocketController( - rocket=successfully_read_rocket, - rocket_option=RocketOptions( - successfully_read_rocket._rocket_option - ), - motor_kind=MotorKinds( - successfully_read_rocket.motor._motor_kind - ), - ).rocketpy_rocket + read_rocket = await cls.get_rocket_by_id(rocket_id) + rocket = await cls.get_rocketpy_rocket(read_rocket) _inertia_details = InertiaDetails( rocket_mass_without_propellant="Rocket Mass: {:.3f} kg (No Propellant)".format( @@ -590,9 +464,156 @@ async def simulate( ) rocket_summary = RocketSummary(rocket_data=_rocket_data) - return rocket_summary + except HTTPException as e: + raise e from e except Exception as e: + exc_str = parse_error(e) + logger.error(f"controllers.rocket.simulate: {exc_str}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to simulate rocket: {e}", ) from e + else: + return rocket_summary + finally: + logger.info( + f"Call to controllers.rocket.simulate completed; params: RocketID {rocket_id}" + ) + + @staticmethod + def get_rocketpy_nose(nose: NoseCone) -> RocketPyNoseCone: + """ + Get a rocketpy nose cone object. + + Returns: + RocketPyNoseCone + """ + + rocketpy_nose = RocketPyNoseCone( + length=nose.length, + kind=nose.kind, + base_radius=nose.base_radius, + rocket_radius=nose.rocket_radius, + ) + rocketpy_nose.position = nose.position + return rocketpy_nose + + @staticmethod + def get_rocketpy_finset( + trapezoidal_fins: TrapezoidalFins, + ) -> RocketPyTrapezoidalFins: + """ + Get a rocketpy finset object. + + Returns: + RocketPyTrapezoidalFins + """ + rocketpy_finset = RocketPyTrapezoidalFins( + n=trapezoidal_fins.n, + root_chord=trapezoidal_fins.root_chord, + tip_chord=trapezoidal_fins.tip_chord, + span=trapezoidal_fins.span, + cant_angle=trapezoidal_fins.cant_angle, + rocket_radius=trapezoidal_fins.radius, + airfoil=trapezoidal_fins.airfoil, + ) + rocketpy_finset.position = trapezoidal_fins.position + return rocketpy_finset + + @staticmethod + def get_rocketpy_tail(tail: Tail) -> RocketPyTail: + """ + Get a rocketpy tail object. + + Returns: + RocketPyTail + """ + rocketpy_tail = RocketPyTail( + top_radius=tail.top_radius, + bottom_radius=tail.bottom_radius, + length=tail.length, + rocket_radius=tail.radius, + ) + rocketpy_tail.position = tail.position + return rocketpy_tail + + @staticmethod + def get_rocketpy_parachute( + parachute: Parachute, p: int + ) -> RocketPyParachute: + """ + Get a rocketpy parachute object. + + Returns: + RocketPyParachute + """ + rocketpy_parachute = RocketPyParachute( + name=parachute[p].name[0], + cd_s=parachute[p].cd_s[0], + trigger=eval(parachute[p].triggers[0]), + sampling_rate=parachute[p].sampling_rate[0], + lag=parachute[p].lag[0], + noise=parachute[p].noise[0], + ) + return rocketpy_parachute + + @staticmethod + def check_parachute_trigger(expression: str) -> bool: + """ + Check if the trigger expression is valid. + + Args: + expression: str + + Returns: + bool: True if the expression is valid, False otherwise. + """ + + # Parsing the expression into an AST + try: + parsed_expression = ast.parse(expression, mode="eval") + except SyntaxError: + print("Invalid syntax.") + return False + + # Constant case (supported after beta v1) + if isinstance(parsed_expression.body, ast.Constant): + return True + # Name case (supported after beta v1) + if ( + isinstance(parsed_expression.body, ast.Name) + and parsed_expression.body.id == "apogee" + ): + global apogee + apogee = "apogee" + return True + + # Validating the expression structure + if not isinstance(parsed_expression.body, ast.Lambda): + print("Invalid expression structure (not a Lambda).") + return False + + lambda_node = parsed_expression.body + if len(lambda_node.args.args) != 3: + print("Invalid expression structure (invalid arity).") + return False + + if not isinstance(lambda_node.body, ast.Compare): + try: + for operand in lambda_node.body.values: + if not isinstance(operand, ast.Compare): + print("Invalid expression structure (not a Compare).") + return False + except AttributeError: + print("Invalid expression structure (not a Compare).") + return False + + # Restricting access to functions or attributes + for node in ast.walk(lambda_node): + if isinstance(node, ast.Call): + print("Calling functions is not allowed in the expression.") + return False + if isinstance(node, ast.Attribute): + print("Accessing attributes is not allowed in the expression.") + return False + return True