Skip to content

Commit

Permalink
core: services: ping: find first free udp port for ping drivers
Browse files Browse the repository at this point in the history
  • Loading branch information
Williangalvani committed Aug 9, 2022
1 parent 7ea227d commit 99b886c
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 5 deletions.
22 changes: 17 additions & 5 deletions core/services/ping/pingmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
from ping1d_driver import Ping1DDriver
from ping360_driver import Ping360Driver
from pingdriver import PingDriver
from pingutils import PingDeviceDescriptor, PingType
from pingutils import PingDeviceDescriptor, PingType, udp_port_is_in_use


class PingManager:
def __init__(self) -> None:
self.drivers: Dict[PingDeviceDescriptor, PingDriver] = {}
self.ping1d_current_port: int = 9090
self.ping360_current_port: int = 9092
self.ping1d_base_port: int = 9090
self.ping360_base_port: int = 9092

def stop_driver_at_port(self, port: Serial) -> None:
"""Stops the driver instance running for port "port" """
Expand All @@ -27,15 +27,27 @@ async def register_ethernet_ping360(self, ping: PingDeviceDescriptor) -> None:
if ping not in self.drivers:
self.drivers[ping] = Ping360Driver(ping, -1)

def find_next_port(self, base_port: int, direction: int) -> int:
"""
Find the next unused udp port, starts on base_port and increments/decrements
accordingly to direction
"""
port = base_port
while udp_port_is_in_use(port):
port = port + direction
return port

async def launch_driver_instance(self, ping: PingDeviceDescriptor) -> None:
"""Launches a new driver instance for the PingDeviceDescriptor "ping" """
driver: PingDriver
if ping.ping_type == PingType.PING1D:
logging.info("Launching ping1d driver")
driver = Ping1DDriver(ping, self.ping1d_current_port)
port = self.find_next_port(self.ping1d_base_port, direction=-1)
driver = Ping1DDriver(ping, port)
elif ping.ping_type == PingType.PING360:
logging.info("Launching ping360 driver")
driver = Ping360Driver(ping, self.ping360_current_port)
port = self.find_next_port(self.ping360_base_port, direction=+1)
driver = Ping360Driver(ping, port)

self.drivers[ping] = driver
loop = asyncio.get_running_loop()
Expand Down
9 changes: 9 additions & 0 deletions core/services/ping/pingutils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import socket
from dataclasses import dataclass
from enum import IntEnum
from typing import Optional

import psutil
from loguru import logger
from serial.tools.list_ports_linux import SysFS

Expand Down Expand Up @@ -53,3 +55,10 @@ def __str__(self) -> str:
ID: {self.device_id}
FW: v{self.firmware_version_major}.{self.firmware_version_minor}.{self.firmware_version_patch}
port: {self.get_hw_or_eth_info()}"""


def udp_port_is_in_use(port: int) -> bool:
for conn in psutil.net_connections():
if conn.laddr.port == port and conn.type == socket.SocketKind.SOCK_DGRAM:
return True
return False

0 comments on commit 99b886c

Please sign in to comment.