Skip to content

Commit

Permalink
fix: changed logic for flash port
Browse files Browse the repository at this point in the history
  • Loading branch information
horw committed Jul 31, 2024
1 parent a1ce3ac commit 1fb0a32
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pytest-embedded-idf/pytest_embedded_idf/dut.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def setup_jtag(self):

def flash_via_jtag(self):
if not self.openocd:
logging.warning("no openocd instance created. can't flash via openocd `program_esp`")
logging.debug("no openocd instance created. can't flash via openocd `program_esp`")
return

if self.app.is_loadable_elf:
Expand Down
14 changes: 9 additions & 5 deletions pytest-embedded-serial-esp/pytest_embedded_serial_esp/serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import subprocess
import warnings
from typing import Optional
from typing import List, Optional
from warnings import warn

import esptool
Expand Down Expand Up @@ -62,14 +62,16 @@ def __init__(
skip_autoflash: bool = False,
erase_all: bool = False,
meta: Optional[Meta] = None,
ports_to_occupy: List[str] = (),
**kwargs,
) -> None:
self._meta = meta

esptool_target = beta_target or target or 'auto'
if port is None:
available_ports = esptool.get_port_list()
ports = list(set(available_ports) - set(self.occupied_ports.keys()))
if port is None or port.endswith('*'):
port_filter = port.strip('*') if port else ''
available_ports = [_p for _p in esptool.get_port_list() if port_filter in _p]
ports = list(set(available_ports) - set(self.occupied_ports.keys()) - set(ports_to_occupy))

# sort to make /dev/ttyS* ports before /dev/ttyUSB* ports
# esptool will reverse the list
Expand Down Expand Up @@ -125,7 +127,9 @@ def __init__(
self.esptool_baud = esptool_baud
self.esp_flash_force = esp_flash_force

super().__init__(msg_queue=msg_queue, port=self.esp._port, baud=baud, meta=meta, **kwargs)
super().__init__(
msg_queue=msg_queue, port=self.esp._port, baud=baud, meta=meta, ports_to_occupy=ports_to_occupy, **kwargs
)

def _post_init(self):
if self._meta:
Expand Down
29 changes: 22 additions & 7 deletions pytest-embedded-serial/pytest_embedded_serial/serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import queue
import threading
import time
from typing import Any, ClassVar, Dict, Optional
from typing import Any, ClassVar, Dict, List, Optional

import serial as pyserial
from pytest_embedded.log import MessageQueue
Expand Down Expand Up @@ -47,13 +47,16 @@ def __init__(
port_location: Optional[str] = None,
baud: int = DEFAULT_BAUDRATE,
meta: Optional[Meta] = None,
stop_after_init: bool = False,
ports_to_occupy: List[str] = (),
**kwargs,
):
self._q = msg_queue
self._meta = meta
self._redirect_thread: _SerialRedirectThread = None # type: ignore

self.baud = baud
self.ports_to_occupy = ports_to_occupy if ports_to_occupy else []

if isinstance(port, pyserial.SerialBase):
self.proc = port
Expand Down Expand Up @@ -86,11 +89,15 @@ def __init__(
port_config.update(**kwargs)
self.proc = pyserial.serial_for_url(self.port, **port_config)

self.ports_to_occupy.append(self.port)
self._post_init()
self._start()
self._finalize_init()

self.start_redirect_thread()
self._finalize_init()
if not stop_after_init:
self.start_redirect_thread()
else:
self.close()

def start_redirect_thread(self) -> None:
if self._redirect_thread and self._redirect_thread.is_alive():
Expand All @@ -116,14 +123,22 @@ def _start(self):
pass

def _finalize_init(self):
self.occupied_ports[self.port] = None
logging.debug(f'occupied {self.port}')
occupied_ports = []
for port in self.ports_to_occupy:
if port not in self.occupied_ports:
self.occupied_ports[port] = None
occupied_ports.append(port)
logging.debug(f'occupied {port}')
else:
logging.warning(f'port {port} is already occupied')
self.ports_to_occupy = occupied_ports

def close(self):
self.stop_redirect_thread()
self.proc.close()
self.occupied_ports.pop(self.port, None)
logging.debug(f'released {self.port}')
for port in self.ports_to_occupy:
self.occupied_ports.pop(port, None)
logging.debug(f'released {port}')

@contextlib.contextmanager
def disable_redirect_thread(self) -> bool:
Expand Down
23 changes: 23 additions & 0 deletions pytest-embedded/pytest_embedded/dut_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import os
import subprocess
import sys
import time
import typing as t
from collections import defaultdict
from pathlib import Path
Expand Down Expand Up @@ -127,6 +128,7 @@ def _fixture_classes_and_options_fn(
target,
beta_target,
baud,
flash_port,
skip_autoflash,
erase_all,
esptool_baud,
Expand Down Expand Up @@ -159,6 +161,7 @@ def _fixture_classes_and_options_fn(
pexpect_proc,
msg_queue,
_meta,
**kwargs,
) -> ClassCliOptions:
classes: t.Dict[str, type] = {}
mixins: t.Dict[str, t.List[type]] = defaultdict(list)
Expand Down Expand Up @@ -211,6 +214,7 @@ def _fixture_classes_and_options_fn(
'baud': int(baud or EspSerial.DEFAULT_BAUDRATE),
'esptool_baud': int(os.getenv('ESPBAUD') or esptool_baud or EspSerial.ESPTOOL_DEFAULT_BAUDRATE),
'esp_flash_force': esp_flash_force,
'flash_port': flash_port,
'skip_autoflash': skip_autoflash,
'erase_all': erase_all,
'meta': _meta,
Expand Down Expand Up @@ -408,6 +412,22 @@ def serial_gn(_fixture_classes_and_options, msg_queue, app) -> t.Optional[t.Unio
kwargs = _fixture_classes_and_options.kwargs['serial']
if 'app' in kwargs and kwargs['app'] is None:
kwargs['app'] = app

if kwargs.get('flash_port'):
operation_port = kwargs.pop('port', None)
if operation_port is None:
raise SystemExit('If the flash port was set up, the port should also be set up.')

flash_port = kwargs.pop('flash_port')
kwargs['stop_after_init'] = True
kwargs['port'] = flash_port
flash_serial = cls(**_drop_none_kwargs(kwargs))
time.sleep(3) # time for device restart
kwargs['stop_after_init'] = False
kwargs['port'] = operation_port
kwargs['skip_autoflash'] = True
kwargs['ports_to_occupy'] = [flash_serial.port]

return cls(**_drop_none_kwargs(kwargs))


Expand Down Expand Up @@ -586,6 +606,7 @@ def create(
target: t.Optional[str] = None,
beta_target: t.Optional[str] = None,
baud: t.Optional[int] = None,
flash_port: t.Optional[str] = None,
skip_autoflash: t.Optional[bool] = None,
erase_all: t.Optional[bool] = None,
esptool_baud: t.Optional[int] = None,
Expand Down Expand Up @@ -633,6 +654,7 @@ def create(
target: Target configuration.
beta_target: Beta target configuration.
baud: Baud rate.
flash_port: Port used for flashing the app.
skip_autoflash: Skip autoflash flag.
erase_all: Erase all flag.
esptool_baud: ESP tool baud rate.
Expand Down Expand Up @@ -696,6 +718,7 @@ def create(
'target': target,
'beta_target': beta_target,
'baud': baud,
'flash_port': flash_port,
'skip_autoflash': skip_autoflash,
'erase_all': erase_all,
'esptool_baud': esptool_baud,
Expand Down
13 changes: 13 additions & 0 deletions pytest-embedded/pytest_embedded/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ def pytest_addoption(parser):
'y/yes/true for True and n/no/false for False. '
'(Default: False, parametrization not supported, `|` will be escaped to `-`)',
)
esp_group.addoption(
'--flash-port',
help='serial port for flashing. Only set this value when the flashing port is different from the serial port.'
'(Default: None)',
)
esp_group.addoption(
'--skip-autoflash',
help='y/yes/true for True and n/no/false for False. Set to True to disable auto flash. (Default: False)',
Expand Down Expand Up @@ -786,6 +791,13 @@ def beta_target(request: FixtureRequest) -> t.Optional[str]:
return _request_param_or_config_option_or_default(request, 'beta_target', None)


@pytest.fixture
@multi_dut_argument
def flash_port(request: FixtureRequest) -> t.Optional[str]:
"""Enable parametrization for the same cli option"""
return _request_param_or_config_option_or_default(request, 'flash_port', None)


@pytest.fixture
@multi_dut_argument
def skip_autoflash(request: FixtureRequest) -> t.Optional[bool]:
Expand Down Expand Up @@ -1008,6 +1020,7 @@ def parametrize_fixtures(
target,
beta_target,
baud,
flash_port,
skip_autoflash,
erase_all,
esptool_baud,
Expand Down

0 comments on commit 1fb0a32

Please sign in to comment.