Skip to content

Commit

Permalink
Core: services: ping: improve type safety on PingDeviceDescriptor
Browse files Browse the repository at this point in the history
  • Loading branch information
Williangalvani committed Aug 9, 2022
1 parent 5fbf4ad commit 3e9da38
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 2 deletions.
8 changes: 7 additions & 1 deletion core/services/ping/pingdriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ def detect_highest_baud(self) -> Baudrate:
failure_threshold = 0.1 # allow up to 10% failure rate
attempts = 10 # try up to 10 times per baudrate
max_failures = attempts * failure_threshold

last_valid_baud = Baudrate.b115200

if self.ping.port is None:
raise InvalidDeviceDescriptor("PingDeviceDescriptor has no useable port")

for baud in Baudrate:
# Ping1D hangs with a baudrate bigger than 3M, going to ignore it for now
if baud > 3000000:
Expand All @@ -49,6 +52,9 @@ def detect_highest_baud(self) -> Baudrate:

async def start(self) -> None:
"""Starts the driver"""
if self.ping.port is None:
raise InvalidDeviceDescriptor("PingDeviceDescriptor has no useable port")

baud = self.detect_highest_baud()
# Do a ping connection to set the baudrate
PingDevice().connect_serial(self.ping.port.device, baud)
Expand Down
8 changes: 7 additions & 1 deletion core/services/ping/pingutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from loguru import logger
from serial.tools.list_ports_linux import SysFS

from typedefs import InvalidDeviceDescriptor


class PingType(IntEnum):
UNKNOWN = 0
Expand Down Expand Up @@ -38,10 +40,14 @@ class PingDeviceDescriptor:
firmware_version_major: int
firmware_version_minor: int
firmware_version_patch: int
port: SysFS
port: Optional[SysFS]
ethernet_discovery_info: Optional[str]
driver: Optional["PingDriver"] # type: ignore

def __post_init__(self):
if self.port is None and self.notethernet_discovery_info is None:
raise InvalidDeviceDescriptor("PingDeviceDescriptor needs either port or ethernet_info")

def get_hw_or_eth_info(self) -> str:
if self.port:
return str(self.port.device_path)
Expand Down
4 changes: 4 additions & 0 deletions core/services/ping/typedefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
from pingutils import PingDeviceDescriptor


class InvalidDeviceDescriptor(RuntimeError):
"""PingDeviceDescripttor is invalid."""


class DriverStatus(BaseModel):
udp_port: Optional[int]
mavlink_driver_enabled: bool
Expand Down

0 comments on commit 3e9da38

Please sign in to comment.