From 6ff0580c7860981ddaef48814779b3c137a0d3cb Mon Sep 17 00:00:00 2001 From: Willian Galvani Date: Tue, 11 Jun 2024 22:51:48 -0300 Subject: [PATCH] wip hw setup wip --- .../ardupilot_manager/ArduPilotManager.py | 1 + .../overlay_source/spi0-led.dts | 44 +++++ .../peripheral_configuration.py | 163 ++++++++++++++++++ .../linux/linux_boards.py | 3 + .../linux/navigator.py | 142 +++++++++++---- .../linux/overlay_loader.py | 146 ++++++++++++++++ core/services/install-services.sh | 1 + 7 files changed, 466 insertions(+), 34 deletions(-) create mode 100755 core/services/ardupilot_manager/flight_controller_detector/linux/hardware_setup/overlay_source/spi0-led.dts create mode 100755 core/services/ardupilot_manager/flight_controller_detector/linux/hardware_setup/peripheral_configuration.py create mode 100755 core/services/ardupilot_manager/flight_controller_detector/linux/overlay_loader.py diff --git a/core/services/ardupilot_manager/ArduPilotManager.py b/core/services/ardupilot_manager/ArduPilotManager.py index 957b808134..0f1b6e2ac8 100644 --- a/core/services/ardupilot_manager/ArduPilotManager.py +++ b/core/services/ardupilot_manager/ArduPilotManager.py @@ -178,6 +178,7 @@ def get_default_params_cmdline(self, platform: Platform) -> str: async def start_linux_board(self, board: LinuxFlightController) -> None: self._current_board = board + board.setup_board() if not self.firmware_manager.is_firmware_installed(self._current_board): if board.platform == Platform.Navigator: self.firmware_manager.install_firmware_from_file( diff --git a/core/services/ardupilot_manager/flight_controller_detector/linux/hardware_setup/overlay_source/spi0-led.dts b/core/services/ardupilot_manager/flight_controller_detector/linux/hardware_setup/overlay_source/spi0-led.dts new file mode 100755 index 0000000000..01ea9f6a8c --- /dev/null +++ b/core/services/ardupilot_manager/flight_controller_detector/linux/hardware_setup/overlay_source/spi0-led.dts @@ -0,0 +1,44 @@ +// This is a custom device tree overlay for the spi0 peripheral on the +// Raspberry Pi 4. It will configure only the spi0 mosi pin +// (The other spi0 pins will not be driven by the spi0 peripheral, +// and can be used for other functions). This is to be used with +// the Blue Robotics Navigator autopilot hat, where the RGB +// 'neopixel' led data pin is connected to the spi0 mosi pin on the +// Raspberry Pi 4. + +/dts-v1/; +/plugin/; + + +/ { + compatible = "brcm,bcm2835"; + + fragment@0 { + target = <&spi0_cs_pins>; + frag0: __overlay__ { + brcm,pins = <>; + }; + }; + + fragment@1 { + target = <&spi0>; + frag1: __overlay__ { + cs-gpios = <>; + status = "okay"; + }; + }; + + fragment@2 { + target = <&spidev1>; + __overlay__ { + status = "disabled"; + }; + }; + + fragment@3 { + target = <&spi0_pins>; + __overlay__ { + brcm,pins = <10>; + }; + }; +}; diff --git a/core/services/ardupilot_manager/flight_controller_detector/linux/hardware_setup/peripheral_configuration.py b/core/services/ardupilot_manager/flight_controller_detector/linux/hardware_setup/peripheral_configuration.py new file mode 100755 index 0000000000..c28b13f5cc --- /dev/null +++ b/core/services/ardupilot_manager/flight_controller_detector/linux/hardware_setup/peripheral_configuration.py @@ -0,0 +1,163 @@ +import re +import shlex +import subprocess +import time +from dataclasses import dataclass +from typing import List, Optional + +all_dtparams = [ + "i2c_vc=on", + "i2c_arm_baudrate=1000000", + "spi=on", + "enable_uart=1", +] + + +other_overlays = [ + "dwc2 dr_mode=otg", +] +devices = { + "ADS1115": (0x48, 1), + "AK09915": (0x0C, 1), + "BME280": (0x76, 1), + "PCA9685": (0x40, 4), +} + + +class SetupError(Exception): + pass + + +@dataclass +class I2cDevice: + device: str + overlay: str + pins: dict[int, str] + + +@dataclass +class SpiDevice: + device: str + overlay: str + pins: dict[int, str] + + +@dataclass +class SerialDevice: + device: str + overlay: str + pins: dict[int, str] + + +@dataclass +class GpioSetup: + number: int + function: str + pull: str + value: str + + +i2c_module = "i2c-dev" +i2c_dtparams = [ + "i2c_vc=on", + "i2c_arm_baudrate=1000000", +] + +i2c_devices: List[I2cDevice] = [ + I2cDevice(device="i2c-6", overlay="i2c6 pins_22_23=true baudrate=400000", pins={22: "SDA6", 23: "SCL6"}), + I2cDevice(device="i2c-1", overlay="i2c1", pins={2: "SDA1", 3: "SCL1"}), + I2cDevice(device="i2c-4", overlay="i2c4 pins_6_7=true baudrate=400000", pins={6: "SDA4", 7: "SCL4"}), +] + +spi_devices: List[SpiDevice] = [ + SpiDevice(device="spidev1.0", overlay="spi1-3cs", pins={19: "SPI1_MISO", 20: "SPI1_MOSI", 21: "SPI1_SCLK"}), + SpiDevice( + device="spidev0.0", + overlay="spi0-led", + pins={ + 10: "SPI0_MOSI", + }, + ), +] + +gpios = [ + GpioSetup(number=11, function="OUT", pull="UP", value="HIGH"), + GpioSetup(number=24, function="OUT", pull="UP", value="HIGH"), + GpioSetup(number=25, function="OUT", pull="UP", value="HIGH"), + GpioSetup(number=37, function="OUT", pull="DOWN", value="LOW"), +] + + +def enable_i2c_module() -> None: + modules = subprocess.check_output("lsmod") + if "i2c_dev" in str(modules): + return + print(f"loading module {i2c_module}...") + output = subprocess.check_output(shlex.split(f"modprobe {i2c_module}")) + print(output) + + +def enable_spi_module() -> None: + modules = subprocess.check_output("lsmod") + if "spi_dev" in str(modules): + return + print(f"loading module {i2c_module}...") + output = subprocess.check_output(shlex.split(f"modprobe {i2c_module}")) + print(output) + + +@dataclass +class GpioState: + number: int + level: int + fsel: int + alt: Optional[int] + func: str + pull: str + + +def get_gpios_state() -> dict[int, GpioState]: + output = subprocess.check_output(["raspi-gpio", "get"]).decode("utf-8") + pattern = r"GPIO (?P\d+): level=(?P\d) fsel=(?P\d)(?: alt=(?P\d))? func=(?P[\w\d_]+) pull=(?P\w+)" + + # Using findall to extract all matches + matches = re.finditer(pattern, output) + gpio_states = {} + # Print each match + for match in matches: + gpio_states[int(match.group("gpio"))] = GpioState( + number=int(match.group("gpio")), + level=int(match.group("level")), + fsel=int(match.group("fsel")), + alt=int(match.group("alt")) if match.group("alt") else None, + func=match.group("func"), + pull=match.group("pull"), + ) + return gpio_states + + +def load_overlay(overlay: str) -> None: + output = subprocess.check_output(shlex.split(f"dtoverlay {overlay}")).decode("utf-8") + print(output) + + +enable_i2c_module() +enable_spi_module() + +states = get_gpios_state() +all_devices: List[I2cDevice | SpiDevice] = [*i2c_devices, *spi_devices] +for device in all_devices: + needs_reload = False + for gpio, function in device.pins.items(): + if states[gpio].func != function: + print(f"GPIO {gpio} is not configured as {function}, instad it is {states[gpio].func}") + print(f"{device.overlay} needs to be loaded") + needs_reload = True + if needs_reload: + load_overlay(device.overlay) + time.sleep(2) + new_state = get_gpios_state() + for gpio, function in device.pins.items(): + if states[gpio].func != function: + print(f"GPIO {gpio} is STILL not configured as {function}, instad it is {states[gpio].func}") + raise SetupError("Failed to configure device") diff --git a/core/services/ardupilot_manager/flight_controller_detector/linux/linux_boards.py b/core/services/ardupilot_manager/flight_controller_detector/linux/linux_boards.py index 9907597123..c5da0693fe 100644 --- a/core/services/ardupilot_manager/flight_controller_detector/linux/linux_boards.py +++ b/core/services/ardupilot_manager/flight_controller_detector/linux/linux_boards.py @@ -22,6 +22,9 @@ def get_serial_cmdlines(self) -> str: def get_serials(self) -> List[Serial]: raise NotImplementedError + def setup_board(self) -> None: + raise NotImplementedError + def check_for_i2c_device(self, bus_number: int, address: int) -> bool: try: bus = SMBus(bus_number) diff --git a/core/services/ardupilot_manager/flight_controller_detector/linux/navigator.py b/core/services/ardupilot_manager/flight_controller_detector/linux/navigator.py index 0ebf9da4e4..623d6d6931 100644 --- a/core/services/ardupilot_manager/flight_controller_detector/linux/navigator.py +++ b/core/services/ardupilot_manager/flight_controller_detector/linux/navigator.py @@ -1,15 +1,24 @@ -from typing import List +from typing import Any, Dict, List -from commonwealth.utils.commands import load_file +from commonwealth.utils.commands import load_file, locate_file, run_command, save_file +from loguru import logger +from RPi import GPIO from flight_controller_detector.linux.linux_boards import LinuxFlightController +from flight_controller_detector.linux.overlay_loader import load_overlays_in_runtime from typedefs import Platform, Serial +GPIO.setmode(GPIO.BCM) + class Navigator(LinuxFlightController): name = "Navigator" manufacturer = "Blue Robotics" platform = Platform.Navigator + gpio_config: dict[int, Dict[str, Any]] = {} + all_dtparams: List[str] = [] + i2c_overlays: List[str] = [] + all_overlays: List[str] = [] def is_pi5(self) -> bool: with open("/proc/cpuinfo", "r", encoding="utf-8") as f: @@ -18,17 +27,18 @@ def is_pi5(self) -> bool: def detect(self) -> bool: return False - def get_serials(self) -> List[Serial]: - raise NotImplementedError + def setup_board(self) -> None: + self.build_led_overlay() + load_overlays_in_runtime(self.all_dtparams, self.all_overlays, ["i2c_dev"]) + for gpio, config in self.gpio_config.items(): + self.setup_gpio(gpio, config["direction"], config["value"]) + def setup_gpio(self, pin: int, direction: str, value: bool) -> None: + GPIO.setup(pin, direction) + GPIO.output(pin, value) -class NavigatorPi5(Navigator): - devices = { - "ADS1115": (0x48, 1), - "AK09915": (0x0C, 1), - "BME280": (0x76, 1), - "PCA9685": (0x40, 3), - } + def setup_board_for_detection(self) -> None: + load_overlays_in_runtime(self.all_dtparams, self.i2c_overlays, ["i2c_dev"]) def get_serials(self) -> List[Serial]: return [ @@ -38,42 +48,106 @@ def get_serials(self) -> List[Serial]: Serial(port="F", endpoint="/dev/ttyAMA4"), ] + def build_led_overlay(self) -> None: + temp_overlay_file_at_host = "/tmp/spi0-led.dts" + target_overlay_location_pi4 = "/boot/overlays/spi0-led.dtbo" + target_overlay_location_pi5 = "/boot/firmware/overlays/spi0-led.dtbo" + overlay_exists = locate_file([target_overlay_location_pi4, target_overlay_location_pi5]) + if overlay_exists: + logger.info(f"spi0-led overlay found at {overlay_exists}") + return + dts = load_file( + "/home/pi/services/ardupilot_manager/flight_controller_detector/linux/overlay_source/spi0-led.dts" + ) + save_file(temp_overlay_file_at_host, dts, "") + command = f"sudo dtc -@ -Hepapr -I dts -O dtb -o {target_overlay_location_pi4} {temp_overlay_file_at_host}" + run_command(command, False) + copy_command = ( + f"if [ -d /boot/firmware ]; then sudo cp {target_overlay_location_pi4} {target_overlay_location_pi5}; fi" + ) + run_command(copy_command, False) + + +class NavigatorPi5(Navigator): + all_dtparams = [ + "i2c_arm=on", + "i2c_arm_baudrate=1000000", + ] + # i2c overlays are required to detect the board + i2c_overlays = [ + # i2c1: ADS1115, AK09915, BME280 + "i2c1-pi5 baudrate=400000", + # i2c3: PCA + "i2c3-pi5 baudrate=400000", + ] + all_overlays = [ + # serial ports, checked individually + "uart0-pi5", # Navigator serial1 + "uart3-pi5", # Navigator serial4 + "uart4-pi5", # Navigator serial5 + "uart2-pi5", # Navigator serial3 + # i2c-6: bar30 and friends + "i2c-gpio i2c_gpio_sda=22 i2c_gpio_scl=23 bus=6 i2c_gpio_delay_us=0", + # i2c1: ADS1115, AK09915, BME280 + "i2c3-pi5 baudrate=400000", + # i2c3: PCA + "i2c3-pi5 baudrate=400000", + # SPI1: MMC5983 + "spi1-3cs", + # SPI0: LED + "spi0-led", + ] + devices = { + "ADS1115": (0x48, 1), + "AK09915": (0x0C, 1), + "BME280": (0x76, 1), + "PCA9685": (0x40, 3), + } + def detect(self) -> bool: if not self.is_pi5(): return False + self.setup_board_for_detection() return all(self.check_for_i2c_device(bus, address) for address, bus in self.devices.values()) class NavigatorPi4(Navigator): + all_dtparams = [ + "i2c_vc=on", + "i2c_arm_baudrate=1000000", + "spi=on", + "enable_uart=1", + ] + + # i2c overlays are required to detect the board + i2c_overlays = [ + # i2c1: ADS1115, AK09915, BME280 + "i2c1", + # i2c3: PCA + "i2c4 pins_6_7=true baudrate=1000000", + ] + all_overlays = [ + # serial ports, checked individually + # serial ports, checked individually + "uart4", + "uart5", + "i2c6 pins_22_23=true baudrate=400000", + "spi0-led", + "spi1-3cs", + "dwc2 dr_mode=otg", + ] devices = { "ADS1115": (0x48, 1), "AK09915": (0x0C, 1), "BME280": (0x76, 1), "PCA9685": (0x40, 4), } - - def get_serials(self) -> List[Serial]: - release = "Bullseye" - os_release = load_file("/etc/os-release") - if "bookworm" in os_release: - release = "Bookworm" - - match release: - case "Bullseye": - return [ - Serial(port="C", endpoint="/dev/ttyS0"), - Serial(port="B", endpoint="/dev/ttyAMA1"), - Serial(port="E", endpoint="/dev/ttyAMA2"), - Serial(port="F", endpoint="/dev/ttyAMA3"), - ] - case "Bookworm": - return [ - Serial(port="C", endpoint="/dev/ttyS0"), - Serial(port="B", endpoint="/dev/ttyAMA3"), - Serial(port="E", endpoint="/dev/ttyAMA4"), - Serial(port="F", endpoint="/dev/ttyAMA5"), - ] - raise RuntimeError("Unknown release, unable to map ports") + gpio_config = { + 11: {"direction": GPIO.OUT, "value": GPIO.HIGH}, + 24: {"direction": GPIO.OUT, "value": GPIO.HIGH}, + 25: {"direction": GPIO.OUT, "value": GPIO.HIGH}, + 37: {"direction": GPIO.OUT, "value": GPIO.LOW}, + } def detect(self) -> bool: if self.is_pi5(): diff --git a/core/services/ardupilot_manager/flight_controller_detector/linux/overlay_loader.py b/core/services/ardupilot_manager/flight_controller_detector/linux/overlay_loader.py new file mode 100755 index 0000000000..13a17d76ab --- /dev/null +++ b/core/services/ardupilot_manager/flight_controller_detector/linux/overlay_loader.py @@ -0,0 +1,146 @@ +import re +from typing import List, Optional + +from commonwealth.utils.commands import run_command +from loguru import logger + + +class Overlay: + def __init__(self, name: str, args: List[str], index: Optional[int] = None): + self.name = name + self.args = args + self.index = index + + def __repr__(self) -> str: + joined_args = " ".join(self.args) + return f"{self.name} {joined_args}" + + def wrongly_set_in(self, overlays: List["Overlay"]) -> bool: + for loaded_overlay in overlays: + if loaded_overlay.name == self.name and loaded_overlay.args != self.args: + return True + return False + + def __eq__(self, other: object) -> bool: + if not isinstance(other, Overlay): + return False + return self.name == other.name and self.args == other.args + + def reload(self) -> None: + command_result = run_command(f"sudo dtoverlay -r {self.name} && sudo dtoverlay {self}", False) + logger.info(command_result) + + def load(self) -> None: + command_result = run_command(f"sudo dtoverlay {self}", False) + logger.info(command_result) + + @staticmethod + def from_string(overlay: str) -> "Overlay": + match = re.match(r"^((?P\d+):\s+)?(?P(\S)+)(?:$|\s+(?P.*))", overlay) + if not match: + logger.error(f"no match for {overlay}") + raise ValueError(f"no match for {overlay}") + overlay_name = match.group("name") + overlay_args = match.group("args").split(" ") if match.group("args") else [] + index = int(match.group("index")) if match.group("index") else None + return Overlay(overlay_name, overlay_args, index=index) + + +class DtParam: + def __init__(self, arg: str, value: str): + self.arg = arg + self.value = value + + def __eq__(self, other: object) -> bool: + if not isinstance(other, DtParam): + return False + return self.arg == other.arg and self.value == other.value + + def wrongly_set_in(self, dt_params: List["DtParam"]) -> bool: + for loaded_dt_param in dt_params: + if loaded_dt_param.arg == self.arg and loaded_dt_param.value != self.value: + return True + return False + + def load(self) -> None: + command_result = run_command(f"sudo dtoverlay {self}", False) + logger.info(command_result) + + def reload(self) -> None: + command_result = run_command(f"sudo dtoverlay -r {self} && sudo dtoverlay {self}", False) + logger.info(command_result) + + def __repr__(self) -> str: + return f"{self.arg}={self.value}" + + +# pylint: disable=too-many-branches,too-many-locals +def load_overlays_in_runtime(dtparams: List[str], dtoverlays: List[str], modules: List[str]) -> bool: + """ + this parses the output of dtoverlay -l and checks if the required overlays are loaded + sample output: + Overlays (in load order): + 0: dtparam i2c_arm=on + 1: dtparam i2c_arm_baudrate=1000000 + 2: uart0-pi5 + 3: uart3-pi5 + 4: uart4-pi5 + 5: uart2-pi5 + 6: i2c-gpio i2c_gpio_sda=22 i2c_gpio_scl=23 bus=6 i2c_gpio_delay_us=0 + 7: i2c1-pi5 baudrate=400000 + 8: i2c3-pi5 baudrate=400000 + 9: spi1-3cs + 10: spi0-led + """ + + dtparams_to_load = [DtParam(*dtparam.split("=")) for dtparam in dtparams] + + overlays_to_load = [Overlay.from_string(overlay) for overlay in dtoverlays] + + loaded_dt_params: List[DtParam] = [] + loaded_overlays: List[Overlay] = [] + + dtoverlay_output = run_command("dtoverlay -l", check=False).stdout + for line in dtoverlay_output.splitlines(): + match = re.match(r"^(?P\d+):\s+(?P(\S)+)(?:$|\s+(?P.*))", line) + if not match: + continue + overlay_name = match.group("name") + overlay_args = match.group("args") + if "dtparam" in overlay_name: + # this is always a key=value pair + loaded_dt_params.append(DtParam(*overlay_args.split("=", maxsplit=1))) + else: + loaded_overlays.append(Overlay(overlay_name, overlay_args.split(" ") if overlay_args else [])) + + logger.info("Loaded dt params: ") + for dt_param in loaded_dt_params: + logger.info(dt_param) + + for dtparam in dtparams_to_load: + if dtparam in loaded_dt_params: + logger.info(f"dtparam {dtparam} is already loaded") + elif dtparam.wrongly_set_in(loaded_dt_params): + logger.info(f"Reloading dtparam {dtparam}") + command_result = run_command(f"sudo dtoverlay -r {dtparam.arg} && sudo dtoverlay {dtparam}", False) + logger.info(command_result) + + for module in modules: + command_result = run_command(f"sudo modprobe {module}", False) + # validate if i2c-dev is loaded using lsmod + if module not in run_command("lsmod", False, log_output=False).stdout: + logger.error(f"Failed to load {module}") + return False + logger.info(command_result) + + for overlay in overlays_to_load: + if overlay in loaded_overlays: + logger.info(f"Overlay {overlay} is already loaded") + elif overlay.wrongly_set_in(loaded_overlays): + logger.info(f"Overlay {overlay} has wrong parameters, reloading.") + overlay.reload() + else: + logger.info(f"Loading overlay {overlay}") + overlay.load() + + return False diff --git a/core/services/install-services.sh b/core/services/install-services.sh index 95c39f28a7..2f83775ab6 100755 --- a/core/services/install-services.sh +++ b/core/services/install-services.sh @@ -35,6 +35,7 @@ SERVICES=( # We need to install loguru, appdirs and pydantic since they may be used inside setup.py python -m pip install appdirs==1.4.4 loguru==0.5.3 pydantic==1.10.12 +python -m pip install --no-binary :all: RPi.GPIO==0.7.1 for SERVICE in "${SERVICES[@]}"; do echo "Installing service: $SERVICE"