Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
BrianPugh committed Aug 24, 2023
1 parent b77a6ae commit 8b2c292
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 97 deletions.
243 changes: 146 additions & 97 deletions pyocd/probe/raspberrypi_probe.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,25 @@
# pyOCD debugger
# Copyright (c) 2023 Brian Pugh
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import (Callable, Collection, Optional, overload, Sequence, Set, TYPE_CHECKING, Tuple, Union)
import re
from typing import (Callable, Collection, Optional, overload, Sequence, Set, TYPE_CHECKING, Tuple, Union, List)
from pathlib import Path
from time import sleep
import contextlib
from ..core.plugin import Plugin
from ..core.options import OptionInfo
from .debug_probe import DebugProbe
Expand All @@ -14,25 +34,85 @@

LOG = logging.getLogger(__name__)


def is_raspberry_pi():
with contextlib.suppress():
with open('/proc/cpuinfo', 'r') as f:
for line in f:
if line.startswith('Hardware') and ('BCM' in line):
return True
return False


class SpiWrapper:
def __init__(self, bus: int, device: int):
from spidev import SpiDev

self.bus = bus
self.device = device
self._spi = SpiDev()
self.is_open = False

def open(self):
self._spi.open(self.bus, self.device)
self.is_open = True

def close(self):
self._spi.close()
self.is_close = False

def __enter__(self):
self.open()

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def __str__(self):
return f"/dev/spidev{self.bus}.{self.device}"

@classmethod
def from_str(cls, s):
match = re.match(r'/dev/spidev(\d+)\.(\d+)', s)
if not match:
raise ValueError
bus, device = int(match.group(1)), int(match.group(2))
return cls(bus, device)

@classmethod
def discover_all(cls) -> List["SpiWrapper"]:
return [cls.from_str(str(port)) for port in Path("/dev").glob("spidev*.*")]

def set_clock(self, frequency: float):
self._spi.max_speed_hz = int(frequency)



class RaspberryPiProbe(DebugProbe):
NRESET_GPIO_OPTION = 'raspberrypiprobe.gpio.nreset'
DIO_GPIO_OPTION = 'raspberrypiprobe.gpio.dio'
CLK_GPIO_OPTION = 'raspberrypiprobe.gpio.clk'

def __init__(self):
super(self).__init__()
def __init__(self, spi: SpiWrapper):
import RPi.GPIO as GPIO

super().__init__()
GPIO.setmode(GPIO.BCM)

self._spi = spi
self._is_connected = False
self._reset = False # Record reset_asserted state

@classmethod
def get_all_connected_probes(
cls,
unique_id: Optional[str] = None,
is_explicit: bool = False
) -> Sequence["DebugProbe"]:

raise NotImplementedError
return [cls(x) for x in SpiWrapper.discover_all()]

@classmethod
def get_probe_with_id(cls, unique_id: str, is_explicit: bool = False) -> Optional["DebugProbe"]:
raise NotImplementedError
return cls(SpiWrapper.from_str(unique_id))

@property
def vendor_name(self):
Expand All @@ -48,69 +128,56 @@ def supported_wire_protocols(self):

@property
def unique_id(self) -> str:
"""@brief The unique ID of this device.
This property will be valid before open() is called. This value can be passed to
get_probe_with_id().
"""
return "GPIO"
return str(self._spi)

@property
def wire_protocol(self) -> Optional[DebugProbe.Protocol]:
"""@brief Currently selected wire protocol.
If the probe is not open and connected, i.e., open() and connect() have not been called,
then this property will be None. If a value other than None is returned, then the probe
has been connected successfully.
"""
raise NotImplementedError()
return DebugProbe.Protocol.SWD if self._is_connected else None

@property
def is_open(self) -> bool:
"""@brief Whether the probe is currently open.
To open the probe, call the open() method.
"""
raise NotImplementedError()
return self._spi.is_open

@property
def capabilities(self) -> Set[DebugProbe.Capability]:
"""@brief A set of DebugProbe.Capability enums indicating the probe's features.
return {DebugProbe.Capability.SWJ_SEQUENCE, DebugProbe.Capability.SWD_SEQUENCE}

This value should not be trusted until after the probe is opened.
"""
raise NotImplementedError()
def open(self) -> None:
self._spi.open()

@property
def associated_board_info(self) -> Optional["BoardInfo"]:
"""@brief Info about the board associated with this probe, if known."""
return None
def close(self) -> None:
self._spi.close()

def get_accessible_pins(self, group: DebugProbe.PinGroup) -> Tuple[int, int]:
"""@brief Return masks of pins accessible via the .read_pins()/.write_pins() methods.
def connect(self, protocol: Optional[Protocol] = None) -> None:

Check warning on line 151 in pyocd/probe/raspberrypi_probe.py

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 3.8)

F821 undefined name 'Protocol'

Check warning on line 151 in pyocd/probe/raspberrypi_probe.py

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 3.9)

F821 undefined name 'Protocol'

Check warning on line 151 in pyocd/probe/raspberrypi_probe.py

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 3.10)

F821 undefined name 'Protocol'

Check warning on line 151 in pyocd/probe/raspberrypi_probe.py

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 3.11)

F821 undefined name 'Protocol'
"""@brief Initialize DAP IO pins for JTAG or SWD"""
import RPi.GPIO as GPIO

This method is only expected to be implemented if Capability.PIN_ACCESS is present.
if (protocol is None) or (protocol == DebugProbe.Protocol.DEFAULT):
protocol = DebugProbe.Protocol.SWD

@return Tuple of pin masks for (0) readable, (1) writable pins. See DebugProbe.Pin for mask
values for those pins that have constants.
"""
raise NotImplementedError()
# Validate selected protocol.
if protocol != DebugProbe.Protocol.SWD:
raise ValueError("unsupported wire protocol %s" % protocol)

def open(self) -> None:
"""@brief Open the USB interface to the probe for sending commands."""
raise NotImplementedError()
self._is_connected = True

def close(self) -> None:
"""@brief Close the probe's USB interface."""
raise NotImplementedError()
GPIO.setup(self.session.options.get(self.NRESET_GPIO_OPTION), GPIO.OUT)

# Subscribe to option change events
self.session.options.subscribe(self._change_options, [self.DIO_GPIO_OPTION, self.CLK_GPIO_OPTION])

def connect(self, protocol: Optional[Protocol] = None) -> None:
"""@brief Initialize DAP IO pins for JTAG or SWD"""
raise NotImplementedError()

def disconnect(self) -> None:
"""@brief Deinitialize the DAP I/O pins"""
raise NotImplementedError()
self._is_connected = False

def swj_sequence(self, length: int, bits: int) -> None:
"""@brief Transfer some number of bits on SWDIO/TMS.
@param self
@param length Number of bits to transfer. Must be less than or equal to 256.
@param bits Integer of the bit values to send on SWDIO/TMS. The LSB is transmitted first.
"""
pass

def swd_sequence(self, sequences: Sequence[Union[Tuple[int], Tuple[int, int]]]) -> Tuple[int, Sequence[bytes]]:
"""@brief Send a sequences of bits on the SWDIO signal.
Expand All @@ -130,63 +197,27 @@ def swd_sequence(self, sequences: Sequence[Union[Tuple[int], Tuple[int, int]]])
raise NotImplementedError()

def set_clock(self, frequency: float) -> None:
"""@brief Set the frequency for JTAG and SWD in Hz.
This function is safe to call before connect is called.
"""
raise NotImplementedError()
self._spi.set_clock(frequency)

def reset(self) -> None:
"""@brief Perform a hardware reset of the target."""
raise NotImplementedError()
self.assert_reset(True)
sleep(self.session.options.get('reset.hold_time'))
self.assert_reset(False)
sleep(self.session.options.get('reset.post_delay'))

def assert_reset(self, asserted: bool) -> None:
"""@brief Assert or de-assert target's nRESET signal.
import RPi.GPIO as GPIO

Because nRESET is negative logic and usually open drain, passing True will drive it low, and
passing False will stop driving so nRESET will be pulled up.
"""
raise NotImplementedError()
GPIO.output(self.session.options.get(self.NRESET_GPIO_OPTION), False)
self._reset = asserted

def is_reset_asserted(self) -> bool:
"""@brief Returns True if nRESET is asserted or False if de-asserted.
If the debug probe cannot actively read the reset signal, the value returned will be the
last value passed to assert_reset().
"""
raise NotImplementedError()

def read_pins(self, group: DebugProbe.PinGroup, mask: int) -> int:
"""@brief Read values of selected debug probe pins.
See DebugProbe.ProtocolPin for mask values for the DebugProbe.PinGroup.PROTOCOL_PINS group.
This method is only expected to be implemented if Capability.PIN_ACCESS is present.
@param self
@param group Select the pin group to read.
@param mask Bit mask indicating which pins will be read. The return value will contain only
bits set in this mask.
@return Bit mask with the current value of selected pins at each pin's relevant bit position.
"""
raise NotImplementedError()

def write_pins(self, group: DebugProbe.PinGroup, mask: int, value: int) -> None:
"""@brief Set values of selected debug probe pins.
See DebugProbe.ProtocolPin for mask values for the DebugProbe.PinGroup.PROTOCOL_PINS group.
Note that input-only pins such as TDO are not writable with most debug probes.
This method is only expected to be implemented if Capability.PIN_ACCESS is present.
@param self
@param group Select the pin group to read.
@param mask Bit mask indicating which pins will be written.
@param value Mask containing the bit value of to written for selected pins at each pin's
relevant bit position..
"""
raise NotImplementedError()

return self._reset

def read_dp(self, addr: int, now: bool = True) -> Union[int, Callable[[], int]]:
raise NotImplementedError
Expand Down Expand Up @@ -230,11 +261,24 @@ def swo_read(self) -> bytearray:
"""
raise NotImplementedError()

def _change_options(self, notification):
import RPi.GPIO as GPIO
if notification.event == self.NRESET_GPIO_OPTION:
GPIO.setup(self.session.options.get(self.NRESET_GPIO_OPTION), GPIO.OUT)
elif notification.event == self.DIO_GPIO_OPTION:
raise NotImplementedError
elif notification.event == self.CLK_GPIO_OPTION:
raise NotImplementedError



class RaspberryPiProbePlugin(Plugin):
"""@brief Plugin class for Raspberry Pi GPIO probe."""

def should_load(self) -> bool:
"""@brief Whether the plugin should be loaded."""
return is_raspberry_pi()

def load(self):
return RaspberryPiProbe

Expand All @@ -250,14 +294,19 @@ def description(self):
def options(self):
"""@brief Returns picoprobe options."""
return [
OptionInfo(RaspberryPiProbe.NRESET_GPIO_OPTION,
int,
23,
"GPIO number (not physical pin) for SWCLK."
),
OptionInfo(RaspberryPiProbe.DIO_GPIO_OPTION,
int,
18,
24,
"GPIO number (not physical pin) for SWDIO."
),
OptionInfo(RaspberryPiProbe.DIO_GPIO_OPTION,
int,
22,
25,
"GPIO number (not physical pin) for SWCLK."
),
]
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pyocd.probe =
picoprobe = pyocd.probe.picoprobe:PicoprobePlugin
remote = pyocd.probe.tcp_client_probe:TCPClientProbePlugin
stlink = pyocd.probe.stlink_probe:StlinkProbePlugin
raspberrypi = pyocd.probe.raspberryp_probe:RaspberryPiProbePlugin

pyocd.rtos =
argon = pyocd.rtos.argon:ArgonPlugin
Expand Down

0 comments on commit 8b2c292

Please sign in to comment.