diff --git a/src/isar_exr/api/energy_robotics_api.py b/src/isar_exr/api/energy_robotics_api.py index fd25f3d..2951033 100644 --- a/src/isar_exr/api/energy_robotics_api.py +++ b/src/isar_exr/api/energy_robotics_api.py @@ -19,6 +19,7 @@ from isar_exr.api.models.enums import AwakeStatus from isar_exr.api.models.models import ( AddPointOfInterestInput, + BatteryStatusType, Pose3DStampedInput, UpsertPointOfInterestInput, ) @@ -502,6 +503,45 @@ def is_robot_awake(self, exr_robot_id: str) -> bool: success: bool = status in [AwakeStatus.Awake] return success + def get_battery_level(self, exr_robot_id: str) -> float: + params: dict = {"robotID": exr_robot_id} + + variable_definitions_graphql: DSLVariableDefinitions = DSLVariableDefinitions() + + check_battery_query: DSLQuery = DSLQuery( + self.schema.Query.currentRobotStatus.args( + robotID=variable_definitions_graphql.robotID + ).select( + self.schema.RobotStatusType.isConnected, + self.schema.RobotStatusType.batteryStatus.select( + self.schema.BatteryStatusType.percentage + ), + ) + ) + + check_battery_query.variable_definitions = variable_definitions_graphql + + try: + result: Dict[str, Any] = self.client.query( + dsl_gql(check_battery_query), params + ) + except Exception: + message: str = "Could not check robot battery level" + self.logger.error(message) + raise RobotMissionStatusException( + error_description=message, + ) + + if not result["currentRobotStatus"]["isConnected"]: + raise RobotMissionStatusException( + error_description="Robot is not connected", + ) + + battery_level: float = result["currentRobotStatus"]["batteryStatus"][ + "percentage" + ] + return battery_level + def create_mission_definition( self, site_id: str, mission_name: str, robot_id: str ) -> str: diff --git a/src/isar_exr/api/models/enums.py b/src/isar_exr/api/models/enums.py index f8583ba..ea93616 100644 --- a/src/isar_exr/api/models/enums.py +++ b/src/isar_exr/api/models/enums.py @@ -6,3 +6,15 @@ class AwakeStatus(str, Enum): Asleep: str = "ASLEEP" WakingUp: str = "WAKING_UP" GoingToSleep: str = "GOING_TO_SLEEP" + + +class ChargingState(str, Enum): + Discharging: str = "DISCHARGING" + Charging: str = "CHARGING" + Charged: str = "CHARGED" + + +class ChargerType(str, Enum): + NotConnected: str = "NOT_CONNECTED" + WiredCharger: str = "WIRED_CHARGER" + WirelessCharger: str = "WIRELESS_CHARGER" diff --git a/src/isar_exr/api/models/models.py b/src/isar_exr/api/models/models.py index 2f15b8f..65d55ca 100644 --- a/src/isar_exr/api/models/models.py +++ b/src/isar_exr/api/models/models.py @@ -1,5 +1,6 @@ from enum import Enum from typing import Optional +from isar_exr.api.models.enums import ChargerType, ChargingState from pydantic import BaseModel, Field @@ -91,3 +92,10 @@ class UpsertPointOfInterestInput(BaseModel): pose: Pose3DStampedInput producer: PointOfInterestProducerInput inspectionParameters: dict + + +class BatteryStatusType(BaseModel): + percentage: float + chargingState: ChargingState + chargerType: ChargerType + chargingCurrent: float diff --git a/src/isar_exr/robotinterface.py b/src/isar_exr/robotinterface.py index ba722fe..1eca7f9 100644 --- a/src/isar_exr/robotinterface.py +++ b/src/isar_exr/robotinterface.py @@ -167,7 +167,7 @@ def mission_status(self) -> MissionStatus: # This is a temporary solution until we have mission status by mission id return MissionStatus.Successful except Exception: - message: str = "Could not get status of running mission" + message: str = "Could not get status of running mission\n" self.logger.error(message) raise RobotMissionStatusException( error_description=message, @@ -185,7 +185,7 @@ def stop(self) -> None: try: self.api.pause_current_mission(self.exr_robot_id) except Exception: - message: str = "Could not stop the running mission" + message: str = "Could not stop the running mission\n" self.logger.error(message) raise RobotCommunicationException( error_description=message, @@ -198,7 +198,7 @@ def initialize(self, params: InitializeParams) -> None: try: self.api.wake_up_robot(self.exr_robot_id) except Exception: - message: str = "Could not initialize robot" + message: str = "Could not initialize robot\n" self.logger.error(message) raise RobotInitializeException( error_description=message, @@ -242,6 +242,7 @@ def get_telemetry_publishers( return publisher_threads def robot_status(self) -> RobotStatus: + # TODO: check if robot is running a task, or check if it is awake? return RobotStatus.Available def _get_pose_telemetry(self, isar_id: str, robot_name: str) -> str: @@ -253,10 +254,10 @@ def _get_pose_telemetry(self, isar_id: str, robot_name: str) -> str: ) return json.dumps(pose_payload, cls=EnhancedJSONEncoder) - @staticmethod - def _get_battery_telemetry(isar_id: str, robot_name: str) -> str: + def _get_battery_telemetry(self, isar_id: str, robot_name: str) -> str: + battery_level = self.api.get_battery_level(settings.ROBOT_EXR_ID) battery_payload: TelemetryBatteryPayload = TelemetryBatteryPayload( - battery_level=55, + battery_level=battery_level, isar_id=isar_id, robot_name=robot_name, timestamp=datetime.datetime.now(),