From 8b2c292559b82b06c71ed45cf4b979808777bfd7 Mon Sep 17 00:00:00 2001 From: Brian Pugh Date: Wed, 23 Aug 2023 22:01:27 -0700 Subject: [PATCH] wip --- pyocd/probe/raspberrypi_probe.py | 243 +++++++++++++++++++------------ setup.cfg | 1 + 2 files changed, 147 insertions(+), 97 deletions(-) diff --git a/pyocd/probe/raspberrypi_probe.py b/pyocd/probe/raspberrypi_probe.py index 51051f3d5..2dfc5f267 100644 --- a/pyocd/probe/raspberrypi_probe.py +++ b/pyocd/probe/raspberrypi_probe.py @@ -1,5 +1,25 @@ +# pyOCD debugger +# Copyright (c) 2023 Brian Pugh +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import logging -from typing import (Callable, Collection, Optional, overload, Sequence, Set, TYPE_CHECKING, Tuple, Union) +import re +from typing import (Callable, Collection, Optional, overload, Sequence, Set, TYPE_CHECKING, Tuple, Union, List) +from pathlib import Path +from time import sleep +import contextlib from ..core.plugin import Plugin from ..core.options import OptionInfo from .debug_probe import DebugProbe @@ -14,12 +34,73 @@ LOG = logging.getLogger(__name__) + +def is_raspberry_pi(): + with contextlib.suppress(): + with open('/proc/cpuinfo', 'r') as f: + for line in f: + if line.startswith('Hardware') and ('BCM' in line): + return True + return False + + +class SpiWrapper: + def __init__(self, bus: int, device: int): + from spidev import SpiDev + + self.bus = bus + self.device = device + self._spi = SpiDev() + self.is_open = False + + def open(self): + self._spi.open(self.bus, self.device) + self.is_open = True + + def close(self): + self._spi.close() + self.is_close = False + + def __enter__(self): + self.open() + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def __str__(self): + return f"/dev/spidev{self.bus}.{self.device}" + + @classmethod + def from_str(cls, s): + match = re.match(r'/dev/spidev(\d+)\.(\d+)', s) + if not match: + raise ValueError + bus, device = int(match.group(1)), int(match.group(2)) + return cls(bus, device) + + @classmethod + def discover_all(cls) -> List["SpiWrapper"]: + return [cls.from_str(str(port)) for port in Path("/dev").glob("spidev*.*")] + + def set_clock(self, frequency: float): + self._spi.max_speed_hz = int(frequency) + + + class RaspberryPiProbe(DebugProbe): + NRESET_GPIO_OPTION = 'raspberrypiprobe.gpio.nreset' DIO_GPIO_OPTION = 'raspberrypiprobe.gpio.dio' CLK_GPIO_OPTION = 'raspberrypiprobe.gpio.clk' - def __init__(self): - super(self).__init__() + def __init__(self, spi: SpiWrapper): + import RPi.GPIO as GPIO + + super().__init__() + GPIO.setmode(GPIO.BCM) + + self._spi = spi + self._is_connected = False + self._reset = False # Record reset_asserted state @classmethod def get_all_connected_probes( @@ -27,12 +108,11 @@ def get_all_connected_probes( unique_id: Optional[str] = None, is_explicit: bool = False ) -> Sequence["DebugProbe"]: - - raise NotImplementedError + return [cls(x) for x in SpiWrapper.discover_all()] @classmethod def get_probe_with_id(cls, unique_id: str, is_explicit: bool = False) -> Optional["DebugProbe"]: - raise NotImplementedError + return cls(SpiWrapper.from_str(unique_id)) @property def vendor_name(self): @@ -48,69 +128,56 @@ def supported_wire_protocols(self): @property def unique_id(self) -> str: - """@brief The unique ID of this device. - - This property will be valid before open() is called. This value can be passed to - get_probe_with_id(). - """ - return "GPIO" + return str(self._spi) @property def wire_protocol(self) -> Optional[DebugProbe.Protocol]: - """@brief Currently selected wire protocol. - - If the probe is not open and connected, i.e., open() and connect() have not been called, - then this property will be None. If a value other than None is returned, then the probe - has been connected successfully. - """ - raise NotImplementedError() + return DebugProbe.Protocol.SWD if self._is_connected else None @property def is_open(self) -> bool: - """@brief Whether the probe is currently open. - - To open the probe, call the open() method. - """ - raise NotImplementedError() + return self._spi.is_open @property def capabilities(self) -> Set[DebugProbe.Capability]: - """@brief A set of DebugProbe.Capability enums indicating the probe's features. + return {DebugProbe.Capability.SWJ_SEQUENCE, DebugProbe.Capability.SWD_SEQUENCE} - This value should not be trusted until after the probe is opened. - """ - raise NotImplementedError() + def open(self) -> None: + self._spi.open() - @property - def associated_board_info(self) -> Optional["BoardInfo"]: - """@brief Info about the board associated with this probe, if known.""" - return None + def close(self) -> None: + self._spi.close() - def get_accessible_pins(self, group: DebugProbe.PinGroup) -> Tuple[int, int]: - """@brief Return masks of pins accessible via the .read_pins()/.write_pins() methods. + def connect(self, protocol: Optional[Protocol] = None) -> None: + """@brief Initialize DAP IO pins for JTAG or SWD""" + import RPi.GPIO as GPIO - This method is only expected to be implemented if Capability.PIN_ACCESS is present. + if (protocol is None) or (protocol == DebugProbe.Protocol.DEFAULT): + protocol = DebugProbe.Protocol.SWD - @return Tuple of pin masks for (0) readable, (1) writable pins. See DebugProbe.Pin for mask - values for those pins that have constants. - """ - raise NotImplementedError() + # Validate selected protocol. + if protocol != DebugProbe.Protocol.SWD: + raise ValueError("unsupported wire protocol %s" % protocol) - def open(self) -> None: - """@brief Open the USB interface to the probe for sending commands.""" - raise NotImplementedError() + self._is_connected = True - def close(self) -> None: - """@brief Close the probe's USB interface.""" - raise NotImplementedError() + GPIO.setup(self.session.options.get(self.NRESET_GPIO_OPTION), GPIO.OUT) + + # Subscribe to option change events + self.session.options.subscribe(self._change_options, [self.DIO_GPIO_OPTION, self.CLK_GPIO_OPTION]) - def connect(self, protocol: Optional[Protocol] = None) -> None: - """@brief Initialize DAP IO pins for JTAG or SWD""" - raise NotImplementedError() def disconnect(self) -> None: - """@brief Deinitialize the DAP I/O pins""" - raise NotImplementedError() + self._is_connected = False + + def swj_sequence(self, length: int, bits: int) -> None: + """@brief Transfer some number of bits on SWDIO/TMS. + + @param self + @param length Number of bits to transfer. Must be less than or equal to 256. + @param bits Integer of the bit values to send on SWDIO/TMS. The LSB is transmitted first. + """ + pass def swd_sequence(self, sequences: Sequence[Union[Tuple[int], Tuple[int, int]]]) -> Tuple[int, Sequence[bytes]]: """@brief Send a sequences of bits on the SWDIO signal. @@ -130,23 +197,19 @@ def swd_sequence(self, sequences: Sequence[Union[Tuple[int], Tuple[int, int]]]) raise NotImplementedError() def set_clock(self, frequency: float) -> None: - """@brief Set the frequency for JTAG and SWD in Hz. - - This function is safe to call before connect is called. - """ - raise NotImplementedError() + self._spi.set_clock(frequency) def reset(self) -> None: - """@brief Perform a hardware reset of the target.""" - raise NotImplementedError() + self.assert_reset(True) + sleep(self.session.options.get('reset.hold_time')) + self.assert_reset(False) + sleep(self.session.options.get('reset.post_delay')) def assert_reset(self, asserted: bool) -> None: - """@brief Assert or de-assert target's nRESET signal. + import RPi.GPIO as GPIO - Because nRESET is negative logic and usually open drain, passing True will drive it low, and - passing False will stop driving so nRESET will be pulled up. - """ - raise NotImplementedError() + GPIO.output(self.session.options.get(self.NRESET_GPIO_OPTION), False) + self._reset = asserted def is_reset_asserted(self) -> bool: """@brief Returns True if nRESET is asserted or False if de-asserted. @@ -154,39 +217,7 @@ def is_reset_asserted(self) -> bool: If the debug probe cannot actively read the reset signal, the value returned will be the last value passed to assert_reset(). """ - raise NotImplementedError() - - def read_pins(self, group: DebugProbe.PinGroup, mask: int) -> int: - """@brief Read values of selected debug probe pins. - - See DebugProbe.ProtocolPin for mask values for the DebugProbe.PinGroup.PROTOCOL_PINS group. - - This method is only expected to be implemented if Capability.PIN_ACCESS is present. - - @param self - @param group Select the pin group to read. - @param mask Bit mask indicating which pins will be read. The return value will contain only - bits set in this mask. - @return Bit mask with the current value of selected pins at each pin's relevant bit position. - """ - raise NotImplementedError() - - def write_pins(self, group: DebugProbe.PinGroup, mask: int, value: int) -> None: - """@brief Set values of selected debug probe pins. - - See DebugProbe.ProtocolPin for mask values for the DebugProbe.PinGroup.PROTOCOL_PINS group. - Note that input-only pins such as TDO are not writable with most debug probes. - - This method is only expected to be implemented if Capability.PIN_ACCESS is present. - - @param self - @param group Select the pin group to read. - @param mask Bit mask indicating which pins will be written. - @param value Mask containing the bit value of to written for selected pins at each pin's - relevant bit position.. - """ - raise NotImplementedError() - + return self._reset def read_dp(self, addr: int, now: bool = True) -> Union[int, Callable[[], int]]: raise NotImplementedError @@ -230,11 +261,24 @@ def swo_read(self) -> bytearray: """ raise NotImplementedError() + def _change_options(self, notification): + import RPi.GPIO as GPIO + if notification.event == self.NRESET_GPIO_OPTION: + GPIO.setup(self.session.options.get(self.NRESET_GPIO_OPTION), GPIO.OUT) + elif notification.event == self.DIO_GPIO_OPTION: + raise NotImplementedError + elif notification.event == self.CLK_GPIO_OPTION: + raise NotImplementedError + class RaspberryPiProbePlugin(Plugin): """@brief Plugin class for Raspberry Pi GPIO probe.""" + def should_load(self) -> bool: + """@brief Whether the plugin should be loaded.""" + return is_raspberry_pi() + def load(self): return RaspberryPiProbe @@ -250,14 +294,19 @@ def description(self): def options(self): """@brief Returns picoprobe options.""" return [ + OptionInfo(RaspberryPiProbe.NRESET_GPIO_OPTION, + int, + 23, + "GPIO number (not physical pin) for SWCLK." + ), OptionInfo(RaspberryPiProbe.DIO_GPIO_OPTION, int, - 18, + 24, "GPIO number (not physical pin) for SWDIO." ), OptionInfo(RaspberryPiProbe.DIO_GPIO_OPTION, int, - 22, + 25, "GPIO number (not physical pin) for SWCLK." ), ] diff --git a/setup.cfg b/setup.cfg index d27543a50..82fa31f50 100644 --- a/setup.cfg +++ b/setup.cfg @@ -96,6 +96,7 @@ pyocd.probe = picoprobe = pyocd.probe.picoprobe:PicoprobePlugin remote = pyocd.probe.tcp_client_probe:TCPClientProbePlugin stlink = pyocd.probe.stlink_probe:StlinkProbePlugin + raspberrypi = pyocd.probe.raspberryp_probe:RaspberryPiProbePlugin pyocd.rtos = argon = pyocd.rtos.argon:ArgonPlugin