Skip to content

Commit

Permalink
Merge pull request #298 from horw/fix/flash-port
Browse files Browse the repository at this point in the history
  • Loading branch information
hfudev authored Jul 31, 2024
2 parents 5069f98 + 1fb0a32 commit 6c18854
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 290 deletions.
121 changes: 0 additions & 121 deletions docs/concepts/serial.rst

This file was deleted.

2 changes: 1 addition & 1 deletion pytest-embedded-idf/pytest_embedded_idf/dut.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def setup_jtag(self):

def flash_via_jtag(self):
if not self.openocd:
logging.debug('no openocd instance created. NOT flashing via openocd `program_esp`')
logging.debug("no openocd instance created. can't flash via openocd `program_esp`")
return

if self.app.is_loadable_elf:
Expand Down
132 changes: 19 additions & 113 deletions pytest-embedded-idf/pytest_embedded_idf/serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
import logging
import os
import tempfile
import time
from typing import Optional, TextIO, Union

import esptool
from pytest_embedded.log import MessageQueue
from pytest_embedded_serial_esp.serial import EspSerial

from .app import IdfApp
Expand Down Expand Up @@ -46,56 +44,9 @@ def __init__(
**kwargs,
)

def _before_init_port(self, q: MessageQueue):
if not self.flash_port:
return

self.occupied_ports[self.flash_port] = None
logging.debug(f'occupied {self.flash_port}')

if self.erase_all:
self.skip_autoflash = False # do we really need it? may be a breaking change if removed

cmd = ['--port', self.flash_port, 'erase_flash']
if self._force_flag():
cmd.append('--force')

with contextlib.redirect_stdout(q):
esptool.main(cmd)

if self._meta:
self._meta.drop_port_app_cache(self.flash_port)

if self.skip_autoflash:
return

if self.erase_nvs:
_args, _kwargs = self._get_erase_nvs_cli_args(port=self.flash_port)
with contextlib.redirect_stdout(q):
esptool.main(_args, **_kwargs)

# this is required to wait for the reset happened after erase the nvs partition
time.sleep(1)

_args, _kwargs = self._get_flash_cli_args(port=self.flash_port)
with contextlib.redirect_stdout(q):
esptool.main(_args, **_kwargs)

# after flash caches
# here instead of ``_finalize_init`` to avoid occupying the port wrongly while multi-DUT test case
self._flashed_with_different_port = True
if self._meta:
self._meta.set_port_target_cache(self.flash_port, self.app.target)
self._meta.set_port_app_cache(self.flash_port, self.app)

# this is required to wait for the reset happened after flashing
time.sleep(1)

def _post_init(self):
if self._flashed_with_different_port:
pass
elif self.erase_all:
self.skip_autoflash = False # do we really need it? may be a breaking change if removed
if self.erase_all:
self.skip_autoflash = False
elif self._meta and self._meta.hit_port_app_cache(self.port, self.app):
if self.confirm_target_elf_sha256:
if self.is_target_flashed_same_elf():
Expand All @@ -115,13 +66,6 @@ def _post_init(self):
super()._post_init()

def _start(self):
if self._flashed_with_different_port:
if self._meta:
self._meta.set_port_app_cache(self.port, self.app)

super()._start()
return

if self.skip_autoflash:
logging.info('Skipping auto flash...')
super()._start()
Expand All @@ -131,13 +75,6 @@ def _start(self):
else:
self.flash()

def close(self):
if self._flashed_with_different_port:
self.occupied_ports.pop(self.flash_port, None)
logging.debug(f'released {self.flash_port}')

super().close()

def load_ram(self) -> None:
if not self.app.is_loadable_elf:
raise ValueError('elf should be loadable elf')
Expand Down Expand Up @@ -194,12 +131,11 @@ def erase_flash(self, force: bool = False):
else:
super().erase_flash()

def _get_flash_cli_args(
self,
*,
app: Optional[IdfApp] = None,
port: Optional[str] = None,
):
@EspSerial.use_esptool()
def flash(self, app: Optional[IdfApp] = None) -> None:
"""
Flash the `app.flash_files` to the dut
"""
if not app:
app = self.app

Expand All @@ -212,8 +148,6 @@ def _get_flash_cli_args(
return

_args = []
_kwargs = {}

for k, v in app.flash_args['extra_esptool_args'].items():
if isinstance(v, bool):
if k == 'stub':
Expand All @@ -232,6 +166,17 @@ def _get_flash_cli_args(
_args.extend(['--baud', os.getenv('ESPBAUD', '921600')])
_args.append('write_flash')

if self.erase_nvs:
esptool.main(
[
'erase_region',
str(app.partition_table['nvs']['offset']),
str(app.partition_table['nvs']['size']),
],
esp=self.esp,
)
self.esp.connect()

encrypt_files = []
flash_files = []
for file in app.flash_files:
Expand All @@ -250,46 +195,7 @@ def _get_flash_cli_args(

_args.extend([*app.flash_args['write_flash_args'], *self._force_flag(app)])

if port:
_args = ['--port', port, *_args]
else:
_kwargs.update({'esp': self.esp})

return _args, _kwargs

def _get_erase_nvs_cli_args(self, app: Optional[IdfApp] = None, port: Optional[str] = None):
if not app:
app = self.app

_args = [
'erase_region',
str(app.partition_table['nvs']['offset']),
str(app.partition_table['nvs']['size']),
]
_kwargs = {}

if port:
_args = ['--port', port, *_args]
else:
_kwargs.update({'esp': self.esp})

return _args, _kwargs

@EspSerial.use_esptool()
def flash(self, app: Optional[IdfApp] = None) -> None:
"""
Flash the app to the target device, or the app provided.
"""
if not app:
app = self.app

if self.erase_nvs:
_args, _kwargs = self._get_erase_nvs_cli_args(app=app)
esptool.main(_args, **_kwargs)
self.esp.connect()

_args, _kwargs = self._get_flash_cli_args(app=app)
esptool.main(_args, **_kwargs)
esptool.main(_args, esp=self.esp)

if self._meta:
self._meta.set_port_app_cache(self.port, app)
Expand Down
Loading

0 comments on commit 6c18854

Please sign in to comment.