Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to more (all?) ardupilot serial boards #3025

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/services/ardupilot_manager/api/v1/routers/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ async def get_firmware_vehicle_type() -> Any:
summary="Retrieve dictionary of available firmwares versions with their respective URL.",
)
async def get_available_firmwares(vehicle: Vehicle, board_name: Optional[str] = None) -> Any:
return autopilot.get_available_firmwares(vehicle, (await target_board(board_name)).platform)
return autopilot.get_available_firmwares(vehicle, (await target_board(board_name)))


@index_router_v1.post("/install_firmware_from_url", summary="Install firmware for given URL.")
Expand Down
8 changes: 4 additions & 4 deletions core/services/ardupilot_manager/autopilot_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ async def start_linux_board(self, board: LinuxFlightController) -> None:
)

firmware_path = self.firmware_manager.firmware_path(self._current_board.platform)
self.firmware_manager.validate_firmware(firmware_path, self._current_board.platform)
self.firmware_manager.validate_firmware(firmware_path, self._current_board)

# ArduPilot process will connect as a client on the UDP server created by the mavlink router
master_endpoint = Endpoint(
Expand Down Expand Up @@ -333,7 +333,7 @@ async def start_sitl(self) -> None:
self.current_sitl_frame = frame

firmware_path = self.firmware_manager.firmware_path(self._current_board.platform)
self.firmware_manager.validate_firmware(firmware_path, self._current_board.platform)
self.firmware_manager.validate_firmware(firmware_path, self._current_board)

# ArduPilot SITL binary will bind TCP port 5760 (server) and the mavlink router will connect to it as a client
master_endpoint = Endpoint(
Expand Down Expand Up @@ -642,8 +642,8 @@ async def update_endpoints(self, endpoints_to_update: Set[Endpoint]) -> None:
self._save_current_endpoints()
await self.mavlink_manager.restart()

def get_available_firmwares(self, vehicle: Vehicle, platform: Platform) -> List[Firmware]:
return self.firmware_manager.get_available_firmwares(vehicle, platform)
def get_available_firmwares(self, vehicle: Vehicle, board: FlightController) -> List[Firmware]:
return self.firmware_manager.get_available_firmwares(vehicle, board)

def install_firmware_from_file(
self, firmware_path: pathlib.Path, board: FlightController, default_parameters: Optional[Parameters] = None
Expand Down
39 changes: 21 additions & 18 deletions core/services/ardupilot_manager/firmware/FirmwareDownload.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
NoCandidate,
NoVersionAvailable,
)
from typedefs import FirmwareFormat, Platform, PlatformType, Vehicle
from typedefs import FirmwareFormat, FlightController, Platform, PlatformType, Vehicle

# TODO: This should be not necessary
# Disable SSL verification
Expand Down Expand Up @@ -123,21 +123,21 @@ def _find_version_item(self, **args: str) -> List[Dict[str, Any]]:
# Make sure that the item matches all args value
for item in self._manifest["firmware"]:
for key, value in args.items():
real_key = key.replace("_", "-")
if real_key not in item or item[real_key] != value:
real_key = key.replace("_", "-").lower()
if real_key not in item or item[real_key].lower() != value.lower():
break
else:
found_version_item.append(item)

return found_version_item

@temporary_cache(timeout_seconds=3600)
def get_available_versions(self, vehicle: Vehicle, platform: Platform) -> List[str]:
def get_available_versions(self, vehicle: Vehicle, board: FlightController) -> List[str]:
"""Get available firmware versions for the specific plataform and vehicle

Args:
vehicle (Vehicle): Desired vehicle.
platform (Platform): Desired platform.
board (FlightController): Desired Flight Controller.

Returns:
List[str]: List of available versions that match the specific desired configuration.
Expand All @@ -147,16 +147,18 @@ def get_available_versions(self, vehicle: Vehicle, platform: Platform) -> List[s
if not self._manifest_is_valid():
raise InvalidManifest("Manifest file is invalid. Cannot use it to find available versions.")

items = self._find_version_item(vehicletype=vehicle.value, platform=platform.value)
platform = board.platform.value if board.platform != Platform.GenericSerial else board.name
platform_type = board.platform.type if board.platform != Platform.GenericSerial else PlatformType.Serial
items = self._find_version_item(vehicletype=vehicle.value, platform=platform)

for item in items:
if item["format"] == FirmwareDownloader._supported_firmware_formats[platform.type]:
if item["format"] == FirmwareDownloader._supported_firmware_formats[platform_type]:
available_versions.append(item["mav-firmware-version-type"])

return available_versions

@temporary_cache(timeout_seconds=3600)
def get_download_url(self, vehicle: Vehicle, platform: Platform, version: str = "") -> str:
def get_download_url(self, vehicle: Vehicle, board: FlightController, version: str = "") -> str:
"""Find a specific firmware URL from manifest that matches the arguments.

Args:
Expand All @@ -168,16 +170,16 @@ def get_download_url(self, vehicle: Vehicle, platform: Platform, version: str =
Returns:
str: URL of valid firmware.
"""
versions = self.get_available_versions(vehicle, platform)
logger.debug(f"Got following versions for {vehicle} running {platform}: {versions}")
versions = self.get_available_versions(vehicle, board)
logger.debug(f"Got following versions for {vehicle} running {board}: {versions}")

if not versions:
raise NoVersionAvailable(f"Could not find available firmware versions for {platform}/{vehicle}.")
raise NoVersionAvailable(f"Could not find available firmware versions for {board}/{vehicle}.")

if version and version not in versions:
raise NoVersionAvailable(f"Version {version} was not found for {platform}/{vehicle}.")
raise NoVersionAvailable(f"Version {version} was not found for {board}/{vehicle}.")

firmware_format = FirmwareDownloader._supported_firmware_formats[platform.type]
firmware_format = FirmwareDownloader._supported_firmware_formats[board.platform.type]

# Autodetect the latest supported version.
# For .apj firmwares (e.g. Pixhawk), we use the latest STABLE version while for the others (e.g. SITL and
Expand All @@ -192,21 +194,22 @@ def get_download_url(self, vehicle: Vehicle, platform: Platform, version: str =
if not newest_version or Version(newest_version) < Version(semver_version):
newest_version = semver_version
if not newest_version:
raise NoVersionAvailable(f"No firmware versions found for {platform}/{vehicle}.")
raise NoVersionAvailable(f"No firmware versions found for {board}/{vehicle}.")
version = f"STABLE-{newest_version}"
else:
version = "BETA"

items = self._find_version_item(
vehicletype=vehicle.value,
platform=platform.value,
platform=board.platform.value if board.platform == Platform.SITL else board.name,
mav_firmware_version_type=version,
format=firmware_format,
)

if len(items) == 0:
logger.error(f"Could not find any firmware for {vehicle=}, {board=}, {version=}, {firmware_format=}")
raise NoCandidate(
f"Found no candidate for configuration: {vehicle=}, {platform=}, {version=}, {firmware_format=}"
f"Found no candidate for configuration: {vehicle=}, {board=}, {version=}, {firmware_format=}"
)

if len(items) != 1:
Expand All @@ -216,7 +219,7 @@ def get_download_url(self, vehicle: Vehicle, platform: Platform, version: str =
logger.debug(f"Downloading following firmware: {item}")
return str(item["url"])

def download(self, vehicle: Vehicle, platform: Platform, version: str = "") -> pathlib.Path:
def download(self, vehicle: Vehicle, board: FlightController, version: str = "") -> pathlib.Path:
"""Download a specific firmware that matches the arguments.

Args:
Expand All @@ -228,5 +231,5 @@ def download(self, vehicle: Vehicle, platform: Platform, version: str = "") -> p
Returns:
pathlib.Path: Temporary path for the firmware file.
"""
url = self.get_download_url(vehicle, platform, version)
url = self.get_download_url(vehicle, board, version)
return FirmwareDownloader._download(url)
Loading
Loading