diff --git a/hydrabus_framework/utils/hb_generic_cmd.py b/hydrabus_framework/utils/hb_generic_cmd.py index f9c1956..cfc0e50 100644 --- a/hydrabus_framework/utils/hb_generic_cmd.py +++ b/hydrabus_framework/utils/hb_generic_cmd.py @@ -47,34 +47,20 @@ def hb_wait_ubtn(serial_instance): """ # timeout=1 minute timeout = time.time() + 60 * 1 - while True: - if serial_instance.read(1) == 'B'.encode('utf-8'): - if serial_instance.read(3) == 'BIO'.encode('utf-8'): - # carriage return needed to reset interface - serial_instance.write(b'\x0D\x0A') - time.sleep(0.2) - serial_instance.read(serial_instance.in_waiting) + try: + while True: + if serial_instance.hydrabus.read(1) == 'B'.encode('utf-8'): + if serial_instance.hydrabus.read(3) == 'BIO'.encode('utf-8'): + # carriage return needed to reset interface + serial_instance.hydrabus.write(b'\x0D\x0A') + time.sleep(0.2) + serial_instance.read(serial_instance.hydrabus.in_waiting) + break + if time.time() > timeout: + Logger().handle("Wait UBTN timeout reached", Logger.ERROR) break - if time.time() > timeout: - Logger().handle("Wait UBTN timeout reached", Logger.ERROR) - break - - -def hb_connect_bbio(device, baudrate, timeout): - """ - Connect to the hydrabus device, and then, switch to bbio mode - :param device: String, hydrabus device path - :param baudrate: integer, baudrate speed to communicate with hydrabus - :param timeout: integer, read timeout value (sec) - :return: serial instance - """ - serial_instance = hb_connect(device, baudrate, timeout) - if isinstance(serial_instance, serial.Serial): - if hb_switch_bbio(serial_instance): - return serial_instance - else: - Logger().handle('Unable to switch hydrabus into bbio mode', Logger.ERROR) - return None + except KeyboardInterrupt: + pass def hb_connect(device, baudrate, timeout): diff --git a/hydrabus_framework/utils/protocols/__init__.py b/hydrabus_framework/utils/protocols/__init__.py deleted file mode 100644 index 5284146..0000000 --- a/hydrabus_framework/utils/protocols/__init__.py +++ /dev/null @@ -1 +0,0 @@ -__import__("pkg_resources").declare_namespace(__name__) diff --git a/hydrabus_framework/utils/protocols/spi.py b/hydrabus_framework/utils/protocols/spi.py deleted file mode 100644 index 3f08b18..0000000 --- a/hydrabus_framework/utils/protocols/spi.py +++ /dev/null @@ -1,117 +0,0 @@ -from hydrabus_framework.utils.logger import Logger -from hydrabus_framework.utils.hb_generic_cmd import hb_reset, hb_switch_bbio - - -__author__ = "Jordan Ovrè " - - -def hb_switch_spi(serial_instance): - """ - Switch hydrabus to SPI binary mode - :param serial_instance: hydrabus serial instance - :return: Bool - """ - logger = Logger() - serial_instance.write(b'\x01') - if "SPI1".encode('utf-8') not in serial_instance.read(4): - logger.handle("Cannot enter SPI binary mode", Logger.ERROR) - return False - return True - - -def hb_configure_spi_peripheral(serial_instance, config=None): - """ - This function allows to select or unselect the SPI slave. - None = unselect, 1 = select - :param serial_instance: hydrabus serial instance - :param config: - :return: - """ - logger = Logger() - if config is None: - serial_instance.write(0b01000000) - if config == 1: - serial_instance.write(0b01000001) - else: - logger.handle("Invalid value SPI config (valid value is None for unselect or 1 for select)", Logger.ERROR) - - -def hb_configure_spi_port(serial_instance, polarity="low", phase="low", spi_device="SPI1"): - """ - Configure polarity, clock and spi device - :param serial_instance: hydrabus serial instance - :param polarity: clock polarity value (low or high) - :param phase: clock phase value (low or high) - :param spi_device: SPI device (SPI1 or SPI2) - :return: Bool - """ - base_cmd = 0b10000000 - logger = Logger() - valid_device = {'SPI1': 0b00000001, 'SPI2': 0b00000000} - valid_phase = {'low': 0b00000000, 'high': 0b00000010} - valid_polarity = {'low': 0b00000000, 'high': 0b00000100} - - try: - spi_device = valid_device.get(spi_device) - phase = valid_phase.get(phase) - polarity = valid_polarity.get(polarity) - if spi_device is None: - raise ValueError('Invalid SPI device selected (SPI1 or SP2)') - if phase is None: - raise ValueError('Invalid clock phase value (low or high)') - if polarity is None: - raise ValueError('Invalid clock polarity value (low or high)') - except ValueError as err: - logger.handle(err, Logger.ERROR) - return False - - config = base_cmd + spi_device + phase + polarity - serial_instance.write(bytes([config])) - if b'\x01' != serial_instance.read(1): - logger.handle("Cannot set SPI device settings, try again or reset hydrabus. ", Logger.ERROR) - return False - return True - - -def set_spi_speed(serial_instance, spi_speed, spi_device="SPI1"): - """ - Configure the spi speed - :param serial_instance: hydrabus serial instance - :param spi_speed: string speed - :param spi_device: the configured SPI device - :return: Bool - 0b01100xxx - """ - logger = Logger() - base_cmd = 0b01100000 - valid_spi1_speed = {'320KHZ': 0b000, '650KHZ': 0b001, '1.31MHZ': 0b010, '2.62MHZ': 0b011, - '5.25MHZ': 0b100, '10.5MHZ': 0b101, '21MHZ': 0b110, '42MHZ': 0b111} - valid_spi2_speed = {'160KHZ': 0b000, '320KHZ': 0b001, '650KHZ': 0b010, '1.31MHZ': 0b011, - '2.62MHZ': 0b100, '5.25MHZ': 0b101, '10.5MHZ': 0b110, '21MHZ': 0b111} - if spi_device.upper() == "SPI1": - for string_speed, bin_speed in valid_spi1_speed.items(): - if string_speed == spi_speed.upper(): - speed = base_cmd + bin_speed - serial_instance.write(bytes([speed])) - if b'\x01' not in serial_instance.read(1): - logger.handle("Cannot set SPI speed, try again or reset hydrabus.", Logger.ERROR) - return False - return True - else: - logger.handle("Invalid spi speed", Logger.ERROR) - return False - elif spi_device.upper() == "SPI2": - for string_speed, bin_speed in valid_spi2_speed.items(): - if string_speed == spi_speed.upper(): - speed = base_cmd + bin_speed - serial_instance.write(bytes([speed])) - if b'\x01' not in serial_instance.read(1): - logger.handle("Cannot set SPI speed, try again or reset hydrabus.", Logger.ERROR) - return False - return True - else: - logger.handle("Invalid spi speed", Logger.ERROR) - return False - else: - logger.handle("Invalid spi device", Logger.ERROR) - return False diff --git a/hydrabus_framework/utils/protocols/uart.py b/hydrabus_framework/utils/protocols/uart.py deleted file mode 100644 index 32ad4c7..0000000 --- a/hydrabus_framework/utils/protocols/uart.py +++ /dev/null @@ -1,53 +0,0 @@ -from hydrabus_framework.utils.logger import Logger -from hydrabus_framework.utils.hb_generic_cmd import hb_reset, hb_switch_bbio - - -__author__ = "Jordan Ovrè " - - -def hb_set_baudrate(serial_instance, baudrate): - """ - Set the desired baudrate value to communicate with the target device. - :param serial_instance: hydrabus serial instance - :param baudrate: dictionnary containing desired baudrate speed in hexadecimal 'hex' and decimal 'dec' value - :return: Bool - """ - logger = Logger() - hb_reset(serial_instance) - if hb_init_uart(serial_instance): - # Change baudrate speed - serial_instance.write(baudrate["hex"]) - if b'\x01' != serial_instance.read(1): - logger.handle("Failed to change baudrate", Logger.ERROR) - return False - logger.handle("Switching to baudrate: {}".format(baudrate["dec"]), Logger.SUCCESS) - return True - else: - logger.handle("Failed to init BBIO_UART mode", Logger.ERROR) - return False - - -def hb_switch_uart(serial_instance): - """ - Switch hydrabus to UART mode - :param serial_instance: hydrabus serial instance - :return: Bool - """ - serial_instance.write(b'\x03') - if "ART1".encode('utf-8') not in serial_instance.read(4): - Logger().handle("Cannot enter UART mode", Logger.ERROR) - return False - return True - - -def hb_init_uart(serial_instance): - """ - Enter hydrabus in BBIO and then in UART mode - :param serial_instance: - :return: Bool - """ - if hb_switch_bbio(serial_instance): - if hb_switch_uart(serial_instance): - return True - return False - return False diff --git a/hydrabus_framework/utils/pyHydrabus/__init__.py b/hydrabus_framework/utils/pyHydrabus/__init__.py new file mode 100644 index 0000000..0cf741f --- /dev/null +++ b/hydrabus_framework/utils/pyHydrabus/__init__.py @@ -0,0 +1,33 @@ +# Copyright 2019 Nicolas OBERLI +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .spi import * +from .i2c import * +from .onewire import * +from .rawwire import * +from .smartcard import * +from .uart import * +from .utils import * +from .swd import * + +import logging + +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + +INPUT = 1 +OUTPUT = 0 + +name = "pyHydrabus" +__version__ = 0.1 diff --git a/hydrabus_framework/utils/pyHydrabus/auxpin.py b/hydrabus_framework/utils/pyHydrabus/auxpin.py new file mode 100644 index 0000000..163c3f1 --- /dev/null +++ b/hydrabus_framework/utils/pyHydrabus/auxpin.py @@ -0,0 +1,144 @@ +import logging + +from .common import set_bit + + +class AUXPin: + """ + Auxilary pin base class + + This class is meant to be used in any mode and is instanciated in the + Protocol class + + :example: + + >>> import pyHydrabus + >>> i=pyHydrabus.RawWire('/dev/hydrabus') + >>> # Set AUX pin 0 (PC4) to output + >>> i.AUX[0].direction = pyHydrabus.OUTPUT + >>> # Set AUX pin to logical 1 + >>> i.AUX[0].value = 1 + >>> # Read Auxiliary pin 2 (PC5) value + >>> i.AUX[1].value + + """ + + OUTPUT = 0 + INPUT = 1 + + def __init__(self, number, hydrabus): + """ + AUXPin constructor + + :param number: Auxilary pin number (0-3) + :param hydrabus: Initialized pyHydrabus.Hydrabus instance + """ + self.number = number + self._hydrabus = hydrabus + self._logger = logging.getLogger(__name__) + + def _get_config(self): + """ + Gets Auxiliary pin config from Hydrabus + + :return: Auxilary pin config (4 bits pullup for AUX[0-3], 4 bits value for AUX[0-3]) + :rtype: byte + """ + CMD = 0b11100000 + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + return self._hydrabus.read(1) + + def _get_values(self): + """ + Gets Auxiliary pin values from Hydrabus + + :return: Auxilary pin values AUX[0-3] + :rtype: byte + """ + CMD = 0b11000000 + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + return self._hydrabus.read(1) + + @property + def value(self): + """ + Auxiliary pin getter + """ + return (ord(self._get_values()) >> self.number) & 0b1 + + @value.setter + def value(self, value): + """ + Auxiliary pin setter + + :param value: auxiliary pin value (0 or 1) + """ + CMD = 0b11010000 + CMD = CMD | ord(set_bit(self._get_values(), value, self.number)) + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + if self._hydrabus.read(1) == b"\x01": + return + else: + self._logger.error("Error setting auxiliary pins value.") + + def toggle(self): + """ + Toggle pin state + """ + self.value = not self.value + + @property + def direction(self): + """ + Auxiliary pin direction getter + + :return: The pin direction (0=output, 1=input) + :rtype: int + """ + return (ord(self._get_config()) >> self.number) & 0b1 + + @direction.setter + def direction(self, value): + """ + Auxiliary pin direction setter + + :param value: The pin direction (0=output, 1=input) + """ + CMD = 0b11110000 + PARAM = self._get_config() + PARAM = set_bit(PARAM, value, self.number) + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + self._hydrabus.write(PARAM) + if self._hydrabus.read(1) == b"\x01": + return + else: + self._logger.error("Error setting auxiliary pins direction.") + + @property + def pullup(self): + """ + Auxiliary pin pullup getter + + :return: Auxiliary pin pullup status (1=enabled, 0=disabled") + :rtype: int + """ + return (ord(self._get_config()) >> (4 + self.number)) & 0b1 + + @pullup.setter + def pullup(self, value): + """ + Auxiliary pin pullup setter + + :param value: Auxiliary pin pullup (1=enabled, 0=disabled") + """ + CMD = 0b11110000 + PARAM = self._get_config() + PARAM = set_bit(PARAM, value, 4 + self.number) + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + self._hydrabus.write(PARAM) + if self._hydrabus.read(1) == b"\x01": + return + else: + self._logger.error("Error setting auxiliary pins value.") diff --git a/hydrabus_framework/utils/pyHydrabus/common.py b/hydrabus_framework/utils/pyHydrabus/common.py new file mode 100644 index 0000000..c301578 --- /dev/null +++ b/hydrabus_framework/utils/pyHydrabus/common.py @@ -0,0 +1,31 @@ +# Copyright 2019 Nicolas OBERLI +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def split(seq, length): + """ + Split a list in chunks of specific length + """ + return [seq[i : i + length] for i in range(0, len(seq), length)] + + +def set_bit(byte, bit, position): + v = ord(byte) + if bit == 1: + v = v | (1 << position) + elif bit == 0: + v = v & (~(1 << position)) & 0xFF + else: + raise ValueError("Bit must be 0 or 1") + return v.to_bytes(1, byteorder="big") diff --git a/hydrabus_framework/utils/pyHydrabus/hydrabus.py b/hydrabus_framework/utils/pyHydrabus/hydrabus.py new file mode 100644 index 0000000..bb8ebbe --- /dev/null +++ b/hydrabus_framework/utils/pyHydrabus/hydrabus.py @@ -0,0 +1,189 @@ +# Copyright 2019 Nicolas OBERLI +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import logging + +import serial + + +class Hydrabus: + """ + Base class for all modes. + + Manages all serial communication + """ + + def __init__(self, port=""): + """ + Class init + + :param port: Serial port to use + """ + self.port = port + self._logger = logging.getLogger(__name__) + + try: + self._serialport = serial.Serial(self.port, timeout=None) + self.enter_bbio() + except serial.SerialException as e: + self._logger.error(f"Cannot connect : {e.strerror}") + raise type(e)(f"Cannot open {self.port}") + + def write(self, data=b""): + """ + Base write primitive + + :param data: Bytes to write + :type data: bytes + """ + if not self.connected: + raise serial.SerialException("Not connected.") + try: + self._logger.debug(f"==>[{str(len(data)).zfill(4)}] {data.hex()}") + self._serialport.write(data) + self._serialport.flush() + except serial.SerialException as e: + self._logger.error(f"Cannot send : {e.strerror}") + raise type(e)(f"Cannot send : {e.strerror}") + + def read(self, length=1): + """ + Base read primitive + + :param length: Number of bytes to read + :type length: int + :return: Read data + :rtype: bytes + """ + if not self.connected: + raise serial.SerialException("Not connected.") + try: + data = self._serialport.read(length) + self._logger.debug(f"<==[{str(length).zfill(4)}] {data.hex()}") + self._lastread = data + return data + except serial.SerialException as e: + self._logger.error(f"Cannot read : {e.strerror}") + raise type(e)(f"Cannot read : {e.strerror}") + + def flush_input(self): + """ + Flush input buffer (data from Hydrabus) + """ + self._serialport.reset_input_buffer() + + @property + def in_waiting(self): + """ + Return the number of bytes in the receive buffer. + :return: Number of bytes + :rtype: int + """ + return self._serialport.in_waiting + + def exit_bbio(self): + """ + Reset Hydrabus to CLI mode + :return: Bool + """ + if not self.connected: + raise serial.SerialException("Not connected.") + if self.reset() == True: + self.write(b"\x00") + self.write(b"\x0F\n") + return True + else: + return False + + def enter_bbio(self): + """ + Enter BBIO mode. + This should be done prior all further operations + :return: Bool + """ + if not self.connected: + raise serial.SerialException("Not connected.") + self.timeout = 0.01 + for _ in range(20): + self.write(b"\x00") + if b"BBIO1" in self.read(5): + self.flush_input() + self.timeout = None + return True + self._logger.error("Cannot enter BBIO mode.") + self.timeout = None + return False + + def reset(self): + """ + Force reset to BBIO main mode + :return: Bool + """ + timeout = time.time() + 10 + if not self.connected: + raise serial.SerialException("Not connected.") + self.timeout = 0.1 + while self.read(5) != b"BBIO1": + self.flush_input() + self.write(b"\x00") + if time.time() > timeout: + self._logger.error(f"Unable to reset hydrabus") + return False + self.timeout = None + return True + + def identify(self): + """ + Identify the current mode + + :return: Current mode + :rtype: str + """ + if not self.connected: + raise serial.SerialException("Not connected.") + self.write(b"\x01") + return self.read(4).decode("ascii") + + def close(self): + """ + Close the serial port + """ + self._serialport.close() + + @property + def timeout(self): + """ + Serial port read timeout + :return: timeout + :rtype: int + """ + return self._serialport.timeout + + @timeout.setter + def timeout(self, value): + """ + Set serial port read timeout + :param value: timeout + :type value: int + """ + self._serialport.timeout = value + + @property + def connected(self): + """ + Check if serial port is opened + :return: Bool + """ + return self._serialport.is_open diff --git a/hydrabus_framework/utils/pyHydrabus/i2c.py b/hydrabus_framework/utils/pyHydrabus/i2c.py new file mode 100644 index 0000000..42277c8 --- /dev/null +++ b/hydrabus_framework/utils/pyHydrabus/i2c.py @@ -0,0 +1,246 @@ +# Copyright 2019 Nicolas OBERLI +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from .protocol import Protocol + + +class I2C(Protocol): + """ + I2C protocol handler + + :example: + + >>> #Read data from an EEPROM + >>> import pyHydrabus + >>> i=pyHydrabus.I2C('/dev/hydrabus') + >>> i.set_speed(pyHydrabus.I2C.I2C_SPEED_100K) + >>> i.pullup=1 + >>> i.start();i.bulk_write(b'\xa0\x00');print(i.write_read(b'\xa1',64)) + + """ + + __I2C_DEFAULT_CONFIG = 0b000 + + I2C_SPEED_50K = 0b00 + I2C_SPEED_100K = 0b01 + I2C_SPEED_400K = 0b10 + I2C_SPEED_1M = 0b11 + + def __init__(self, port=""): + self._config = self.__I2C_DEFAULT_CONFIG + super().__init__(name=b"I2C1", fname="I2C", mode_byte=b"\x02", port=port) + self._configure_port() + + def start(self): + """ + Send a I2C start condition + """ + self._hydrabus.write(b"\x02") + if self._hydrabus.read(1) == b"\x00": + self._logger.error("Cannot execute command.") + return False + return True + + def stop(self): + """ + Send a I2C stop condition + """ + self._hydrabus.write(b"\x03") + if self._hydrabus.read(1) == b"\x00": + self._logger.error("Cannot execute command.") + return False + return True + + def read_byte(self): + """ + Read a byte from the I2C bus + """ + self._hydrabus.write(b"\x04") + return self._hydrabus.read(1) + + def send_ack(self): + """ + Send an ACK + Used with the read_byte() function + """ + self._hydrabus.write(b"\x06") + if self._hydrabus.read(1) == b"\x00": + self._logger.error("Cannot execute command.") + return False + return True + + def send_nack(self): + """ + Send a NACK + Used with the read_byte() function + """ + self._hydrabus.write(b"\x07") + if self._hydrabus.read(1) == b"\x00": + self._logger.error("Cannot execute command.") + return False + return True + + def write_read(self, data=b"", read_len=0): + """ + Write-then-read operation + https://github.com/hydrabus/hydrafw/wiki/HydraFW-Binary-I2C-mode-guide#i2c-write-then-read-0b00001000 + + This method sends a start condition before writing and a stop condition after reading. + + :param data: Data to be sent + :type data: bytes + :param read_len: Number of bytes to read + :type read_len: int + :return: Read data + :rtype: bytes + """ + CMD = 0b00001000 + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + + self._hydrabus.write(len(data).to_bytes(2, byteorder="big")) + + self._hydrabus.write(read_len.to_bytes(2, byteorder="big")) + + self._hydrabus.timeout = 0 + if self._hydrabus.read(1) == b"\x00": + self._logger.error("Cannot execute command. Too many bytes requested ?") + return None + self._hydrabus.timeout = None + + self._hydrabus.write(data) + + if self._hydrabus.read(1) != b"\x01": + self._logger.warn("Data not ACKed. Aborting.") + return None + + return self._hydrabus.read(read_len) + + def write(self, data=b""): + """ + Write on I2C bus + + :param data: data to be sent + :type data: bytes + """ + self.write_read(data, read_len=0) + + def read(self, length=0): + """ + Read on I2C bus + + :param length: Number of bytes to read + :type length: int + :return: Read data + :rtype: bytes + """ + result = [] + for _ in range(length - 1): + result.append(self.read_byte()) + self.send_ack() + result.append(self.read_byte()) + self.send_nack() + + return b"".join(result) + + def bulk_write(self, data=b""): + """ + Bulk write on I2C bus + https://github.com/hydrabus/hydrafw/wiki/HydraFW-Binary-I2C-mode-guide#bulk-i2c-write-0b0001xxxx + + :param data: Data to be sent + :type data: bytes + + :return: ACK status of the written bytes (b'\x00'=ACK, b'\x01'=NACK) + :rtype: list + """ + CMD = 0b00010000 + if not len(data) > 0: + raise ValueError("Send at least one byte") + if not len(data) <= 16: + raise ValueError("Too many bytes to write") + CMD = CMD | (len(data) - 1) + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + if self._hydrabus.read(1) != b"\x01": + self._logger.warn("Unknown error.") + return None + + self._hydrabus.write(data) + + return self._hydrabus.read(len(data)) + + def scan(self): + """ + Scan I2C bus and returns all available device addresses in a list + + :return: List of available device addresses + :rtype: list + """ + result = [] + for i in range(1, 0x78): + addr = (i << 1).to_bytes(1, byteorder="big") + self.start() + if self.bulk_write(addr) == b"\x00": + addr = int.from_bytes(addr, byteorder="big") >> 1 + result.append(addr) + self.stop() + return result + + def set_speed(self, speed): + """ + Set I2C bus speed + + :param speed: Select any of the defined speeds (I2C_SPEED_*) + """ + if not speed <= 0b11: + raise ValueError("Incorrect speed") + CMD = 0b01100000 + CMD = CMD | speed + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + + if self._hydrabus.read(1) == b"\x01": + return True + else: + self._logger.error("Error setting speed.") + return False + + def _configure_port(self): + CMD = 0b01000000 + CMD = CMD | self._config + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + if self._hydrabus.read(1) == b"\x01": + return True + else: + self._logger.error("Error setting config.") + return False + + @property + def pullup(self): + """ + Pullup status (0=off, 1=on) + """ + if self._config & 0b100: + return 1 + else: + return 0 + + @pullup.setter + def pullup(self, value): + if value == 0: + self._config = self._config & ~(1 << 2) + else: + self._config = self._config | (1 << 2) + self._configure_port() diff --git a/hydrabus_framework/utils/pyHydrabus/onewire.py b/hydrabus_framework/utils/pyHydrabus/onewire.py new file mode 100644 index 0000000..1445247 --- /dev/null +++ b/hydrabus_framework/utils/pyHydrabus/onewire.py @@ -0,0 +1,120 @@ +# Copyright 2019 Nicolas OBERLI +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from .protocol import Protocol +from .common import split + + +class OneWire(Protocol): + """ + One wire protocol handler + + :example: + + TODO + + """ + + __ONEWIRE_DEFAULT_CONFIG = 0b100 + + def __init__(self, port=""): + self._config = self.__ONEWIRE_DEFAULT_CONFIG + super().__init__(name=b"1W01", fname="1-Wire", mode_byte=b"\x04", port=port) + self._configure_port() + + def reset(self): + """ + Send a 1-Wire reset + """ + self._hydrabus.write(b"\x02") + return True + + def read_byte(self): + """ + Read a byte from the 1-Wire bus + """ + self._hydrabus.write(b"\x04") + return self._hydrabus.read(1) + + def write(self, data=b""): + """ + Write on 1-Wire bus + + :param data: data to be sent + :type data: bytes + """ + for chunk in split(data, 16): + self.bulk_write(chunk) + + def read(self, read_len=0): + """ + Read on 1-Wire bus + + :param read_len: Number of bytes to be read + :return read_len: int + :return: Read data + :rtype: bytes + """ + return b"".join([self.read_byte() for x in range(read_len)]) + + def bulk_write(self, data=b""): + """ + Bulk write on 1-Wire bus + https://github.com/hydrabus/hydrafw/wiki/HydraFW-binary-1-Wire-mode-guide#bulk-1-wire-transfer-0b0001xxxx + + :param data: Data to be sent + :type data: bytes + """ + CMD = 0b00010000 + if not len(data) > 0: + raise ValueError("Send at least one byte") + if not len(data) <= 16: + raise ValueError("Too many bytes to write") + CMD = CMD | (len(data) - 1) + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + self._hydrabus.write(data) + + if self._hydrabus.read(1) != b"\x01": + self._logger.warn("Unknown error.") + + def _configure_port(self): + CMD = 0b01000000 + CMD = CMD | self._config + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + if self._hydrabus.read(1) == b"\x01": + return True + else: + self._logger.error("Error setting config.") + return False + + @property + def pullup(self): + """ + Pullup status (0=off, 1=on) + """ + if self._config & 0b100: + return 1 + else: + return 0 + + @pullup.setter + def pullup(self, value): + if value == 0: + self._config = self._config & ~(1 << 2) + else: + self._config = self._config | (1 << 2) + self._configure_port() diff --git a/hydrabus_framework/utils/pyHydrabus/protocol.py b/hydrabus_framework/utils/pyHydrabus/protocol.py new file mode 100644 index 0000000..0df8018 --- /dev/null +++ b/hydrabus_framework/utils/pyHydrabus/protocol.py @@ -0,0 +1,91 @@ +# Copyright 2019 Nicolas OBERLI +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from .hydrabus import Hydrabus +from .auxpin import AUXPin + + +class Protocol: + """ + Base class for all supported protocols + + :param name: Name of the protocol (returned by Hydrabus) eg. SPI1 + :type name: str + :param fname: Full name of the protocol + :type fname: str + :param mode_byte: Byte used to enter the mode (eg. \x01 for SPI) + :type mode_byte: bytes + :param port: The port name + :type port: str + """ + + def __init__(self, name="", fname="", mode_byte=b"\x00", port=""): + self.name = name + self.fname = fname + self._mode_byte = mode_byte + self._hydrabus = Hydrabus(port) + self._logger = logging.getLogger(__name__) + + self._enter() + self._hydrabus.flush_input() + + self.AUX = [] + for i in range(4): + self.AUX.append(AUXPin(i, self._hydrabus)) + + def _enter(self): + self._hydrabus.write(self._mode_byte) + if self._hydrabus.read(4) == self.name: + self._hydrabus.mode = self.name + return True + else: + self._logger.error(f"Cannot enter mode.") + return False + + def _exit(self): + return self._hydrabus.reset() + + def identify(self): + """ + Identify the current mode + + :return: The current mode identifier (4 bytes) + :rtype: str + """ + return self._hydrabus.identify() + + def close(self): + """ + Close the communication channel and resets Hydrabus + """ + self._hydrabus.exit_bbio() + self._hydrabus.close() + + @property + def hydrabus(self): + """ + Return _hydrabus instance to access Hydrabus class function and serial method + from any protocol classes instance + :return: _hydrabus class instance + """ + return self._hydrabus + + @property + def timeout(self): + return self._hydrabus.timeout + + @timeout.setter + def timeout(self, value): + self._hydrabus.timeout = value diff --git a/hydrabus_framework/utils/pyHydrabus/rawwire.py b/hydrabus_framework/utils/pyHydrabus/rawwire.py new file mode 100644 index 0000000..b26ea7e --- /dev/null +++ b/hydrabus_framework/utils/pyHydrabus/rawwire.py @@ -0,0 +1,285 @@ +# Copyright 2019 Nicolas OBERLI +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from .protocol import Protocol +from .common import split + + +class RawWire(Protocol): + """ + Raw wire protocol handler + + :example: + + >>> import pyHydrabus + >>> r=pyHydrabus.RawWire('/dev/hydrabus') + >>> # Set SDA to high + >>> r.sda = 1 + >>> # Send two clock ticks + >>> r.clocks(2) + >>> # Read two bytes + >>> data = r.read(2) + + """ + + __RAWWIRE_DEFAULT_CONFIG = 0b000 + + def __init__(self, port=""): + self._config = self.__RAWWIRE_DEFAULT_CONFIG + self._clk = 0 + self._sda = 0 + super().__init__(name=b"RAW1", fname="Raw-Wire", mode_byte=b"\x05", port=port) + self._configure_port() + + def read_bit(self): + """ + Sends a clock tick, and return the read bit value + """ + CMD = 0b00000111 + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + return self._hydrabus.read(1) + + def read_byte(self): + """ + Read a byte from the raw wire + + :return: The read byte + :rtype: bytes + """ + CMD = 0b00000110 + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + return self._hydrabus.read(1) + + def clock(self): + """ + Send a clock tick + """ + CMD = 0b00001001 + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + if self._hydrabus.read(1) == b"\x01": + return True + else: + self._logger.error("Error setting pin.") + return False + + def bulk_ticks(self, num): + """ + Sends a bulk of clock ticks (1 to 16) + https://github.com/hydrabus/hydrafw/wiki/HydraFW-binary-raw-wire-mode-guide#bulk-clock-ticks-0b0010xxxx + + :param num: Number of clock ticks to send + :type num: int + """ + if not num > 0: + raise ValueError("Send at least one clock tick") + if not num <= 16: + raise ValueError("Too many ticks to send") + + CMD = 0b00100000 + CMD = CMD | (num - 1) + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + + if self._hydrabus.read(1) == b"\x01": + return True + else: + self._logger.error("Error sending clocks.") + return False + + def clocks(self, num): + """ + Sends a number of clock ticks + + :param num: Number of clock ticks to send + :type num: int + """ + if not num > 0: + raise ValueError("Must be a positive integer") + + while num > 16: + self.bulk_ticks(16) + num = num - 16 + self.bulk_ticks(num) + + def bulk_write(self, data=b""): + """ + Bulk write on Raw-Wire + https://github.com/hydrabus/hydrafw/wiki/HydraFW-binary-raw-wire-mode-guide#bulk-raw-wire-transfer-0b0001xxxx + + Parameters: + :param data: Data to be sent + :type data: bytes + """ + CMD = 0b00010000 + if not len(data) > 0: + raise ValueError("Send at least one byte") + if not len(data) <= 16: + raise ValueError("Too many bytes to write") + CMD = CMD | (len(data) - 1) + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + self._hydrabus.write(data) + + if self._hydrabus.read(1) != b"\x01": + self._logger.warn("Unknown error.") + + return self._hydrabus.read(len(data)) + + def set_speed(self, speed): + """ + Sets the clock max speed. + + :param speed: speed in Hz. Possible values : TODO + """ + speeds = {5000: 0b00, 50000: 0b01, 100_000: 0b10, 1_000_000: 0b11} + if speed not in speeds.keys(): + raise ValueError(f"Incorrect value. use {speeds.keys()}") + CMD = 0b01100000 + CMD = CMD | speeds[speed] + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + if self._hydrabus.read(1) == b"\x01": + return True + else: + self._logger.error("Error setting speed.") + return False + + def _configure_port(self): + CMD = 0b10000000 + CMD = CMD | self._config + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + if self._hydrabus.read(1) == b"\x01": + return True + else: + self._logger.error("Error setting config.") + return False + + def write(self, data=b""): + """ + Write on Raw-Wire bus + + :param data: data to be sent + :type data: bytes + + :return: Read bytes + :rtype: bytes + """ + result = b"" + for chunk in split(data, 16): + result += self.bulk_write(chunk) + return result + + def read(self, length=0): + """ + Read on Raw-Wire bus + + :param length: Number of bytes to read + :type length: int + :return: Read data + :rtype: bytes + """ + result = b"" + for _ in range(length): + result += self.read_byte() + + return result + + @property + def clk(self): + """ + CLK pin status + """ + return self._clk + + @clk.setter + def clk(self, value): + value = value & 1 + CMD = 0b00001010 + CMD = CMD | value + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + if self._hydrabus.read(1) == b"\x01": + self._clk = value + return True + else: + self._logger.error("Error setting pin.") + return False + + @property + def sda(self): + """ + SDA pin status + """ + CMD = 0b00001000 + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + return int.from_bytes(self._hydrabus.read(1), byteorder="big") + + @sda.setter + def sda(self, value): + value = value & 1 + CMD = 0b00001100 + CMD = CMD | value + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + if self._hydrabus.read(1) == b"\x01": + self._clk = value + return True + else: + self._logger.error("Error setting pin.") + return False + + @property + def wires(self): + """ + Raw-Wire mode (2=2-Wire, 3=3-Wire) + """ + if self._config & 0b100 == 0: + return 2 + else: + return 3 + + @wires.setter + def wires(self, value): + if value == 2: + self._config = self._config & ~(1 << 2) + self._configure_port() + return True + elif value == 3: + self._config = self._config | (1 << 2) + self._configure_port() + return True + else: + self._logger.error("Incorrect value. Must be 2 or 3") + + @property + def gpio_mode(self): + """ + Raw-Wire GPIO mode (0=Push-Pull, 1=Open Drain) + """ + return (self._config & 0b1000) >> 3 + + @gpio_mode.setter + def gpio_mode(self, value): + if value == 0: + self._config = self._config & ~(1 << 3) + self._configure_port() + return True + elif value == 1: + self._config = self._config | (1 << 3) + self._configure_port() + return True + else: + self._logger.error("Incorrect value. Must be 0 or 1") diff --git a/hydrabus_framework/utils/pyHydrabus/smartcard.py b/hydrabus_framework/utils/pyHydrabus/smartcard.py new file mode 100644 index 0000000..0397422 --- /dev/null +++ b/hydrabus_framework/utils/pyHydrabus/smartcard.py @@ -0,0 +1,201 @@ +# Copyright 2019 Nicolas OBERLI +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from .protocol import Protocol + + +class Smartcard(Protocol): + """ + Smartcard protocol handler + + :example: + + >>> #Read ATR from a smartcard + >>> import pyHydrabus + >>> sm=pyHydrabus.Smartcard('/dev/hydrabus') + >>> sm.prescaler=12 + >>> sm.baud=9600 + >>> sm.rst=1;sm.rst=0;sm.read(1) + + """ + + def __init__(self, port=""): + self._config = 0b0000 + self._rst = 1 + self._baud = 9600 + self._prescaler = 12 + self._guardtime = 16 + super().__init__(name=b"CRD1", fname="Smartcard", mode_byte=b"\x0b", port=port) + self._configure_port() + + def _configure_port(self): + CMD = 0b10000000 + CMD = CMD | self._config + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + if self._hydrabus.read(1) == b"\x01": + self.rst = self._rst + return True + else: + self._logger.error("Error setting config.") + return False + + def write_read(self, data=b"", read_len=0): + """ + Write-then-read operation + https://github.com/hydrabus/hydrafw/wiki/HydraFW-binary-SMARTCARD-mode-guide#write-then-read-operation-0b00000100 + + Parameters: + data: Data to be sent + read_len: number of bytes to read + """ + CMD = 0b00000100 + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + + self._hydrabus.write(len(data).to_bytes(2, byteorder="big")) + + self._hydrabus.write(read_len.to_bytes(2, byteorder="big")) + + self._hydrabus.timeout = 0 + if self._hydrabus.read(1) == b"\x00": + self._logger.error("Cannot execute command. Too many bytes requested ?") + return None + self._hydrabus.timeout = None + + self._hydrabus.write(data) + + if self._hydrabus.read(1) != b"\x01": + self._logger.warn("Unknown error. Aborting") + return None + + return self._hydrabus.read(read_len) + + def write(self, data=b""): + """ + Write on smartard + + Parameters: + data: data to be sent + """ + self.write_read(data, read_len=0) + + def read(self, read_len=0): + """ + Read on smartard + + Parameters: + read_len: number of bytes to be read + """ + return self.write_read(b"", read_len=read_len) + + @property + def prescaler(self): + """ + Prescaler value + """ + return self._prescaler + + @prescaler.setter + def prescaler(self, value): + CMD = 0b00000110 + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + self._hydrabus.write(value.to_bytes(1, byteorder="big")) + + if self._hydrabus.read(1) == b"\x01": + self._prescaler = value + return True + else: + self._logger.error("Error setting prescaler.") + return False + + @property + def guardtime(self): + """ + Guard time value + """ + return self._prescaler + + @guardtime.setter + def guardtime(self, value): + CMD = 0b00000111 + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + self._hydrabus.write(value.to_bytes(1, byteorder="big")) + + if self._hydrabus.read(1) == b"\x01": + self._guardtime = value + return True + else: + self._logger.error("Error setting guard time.") + return False + + @property + def rst(self): + """ + RST pin status + """ + return self._rst + + @rst.setter + def rst(self, value): + value = value & 1 + CMD = 0b00000010 + CMD = CMD | value + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + if self._hydrabus.read(1) == b"\x01": + self._rst = value + return True + else: + self._logger.error("Error setting pin.") + return False + + @property + def baud(self): + """ + Baud rate + """ + return self._baud + + @baud.setter + def baud(self, value): + CMD = 0b01100000 + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + self._hydrabus.write(value.to_bytes(4, byteorder="big")) + + if self._hydrabus.read(1) == b"\x01": + self._baud = value + return True + else: + self._logger.error("Error setting pin.") + return False + + @property + def pullup(self): + if self._config & 0b100: + return 1 + else: + return 0 + + @pullup.setter + def pullup(self, value): + if value == 0: + self._config = self._config & ~(1 << 2) + else: + self._config = self._config | (1 << 2) + self._configure_port() diff --git a/hydrabus_framework/utils/pyHydrabus/spi.py b/hydrabus_framework/utils/pyHydrabus/spi.py new file mode 100644 index 0000000..90e3c7b --- /dev/null +++ b/hydrabus_framework/utils/pyHydrabus/spi.py @@ -0,0 +1,267 @@ +# Copyright 2019 Nicolas OBERLI +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from .protocol import Protocol + + +class SPI(Protocol): + """ + SPI protocol handler + + :example: + + TODO + + """ + + __SPI_DEFAULT_CONFIG = 0b010 + + SPI1_SPEED_320K = 0b000 + SPI1_SPEED_650K = 0b001 + SPI1_SPEED_1M = 0b010 + SPI1_SPEED_2M = 0b011 + SPI1_SPEED_5M = 0b100 + SPI1_SPEED_10M = 0b101 + SPI1_SPEED_21M = 0b110 + SPI1_SPEED_42M = 0b111 + + SPI2_SPEED_160K = 0b000 + SPI2_SPEED_320K = 0b001 + SPI2_SPEED_650M = 0b010 + SPI2_SPEED_1M = 0b011 + SPI2_SPEED_2M = 0b100 + SPI2_SPEED_5M = 0b101 + SPI2_SPEED_10M = 0b110 + SPI2_SPEED_21M = 0b111 + + def __init__(self, port=""): + self._config = self.__SPI_DEFAULT_CONFIG + self._cs_val = 1 + super().__init__(name=b"SPI1", fname="SPI", mode_byte=b"\x01", port=port) + self._configure_port() + + @property + def cs(self): + """ + Chip-Select (CS) status getter + """ + return self._cs_val + + @cs.setter + def cs(self, mode=0): + """ + Chip-Select (CS) status setter + + :param mode: CS pin status (0=low, 1=high) + """ + CMD = 0b00000010 + CMD = CMD | mode + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + if self._hydrabus.read(1) == b"\x01": + self._cs_val = mode + else: + self._logger.error("Error setting CS.") + + def bulk_write(self, data=b""): + """ + Bulk write on SPI bus + https://github.com/hydrabus/hydrafw/wiki/HydraFW-Binary-SPI-mode-guide#bulk-spi-transfer-0b0001xxxx + + :param data: Data to be sent + :type data: bytes + + :return: Bytes read during the transfer + :rtype: bytes + """ + CMD = 0b00010000 + if not len(data) > 0: + raise ValueError("Send at least one byte") + if not len(data) <= 16: + raise ValueError("Too many bytes to write") + CMD = CMD | (len(data) - 1) + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + if self._hydrabus.read(1) != b"\x01": + self._logger.warn("Unknown error.") + return None + + self._hydrabus.write(data) + + return self._hydrabus.read(len(data)) + + def write_read(self, data=b"", read_len=0, drive_cs=0): + """ + Write-then-read operation + https://github.com/hydrabus/hydrafw/wiki/HydraFW-Binary-SPI-mode-guide#write-then-read-operation-0b00000100---0b00000101 + + :param data: Data to be sent + :type data: bytes + :param read_len: Number of bytes to read + :type read_len: int + :param drive_cs: Whether to enable chip select before writing/reading (0=yes, 1=no) + :type drive_cs: int + :return: Read data + :rtype: bytes + """ + CMD = 0b00000100 + CMD = CMD | drive_cs + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + + self._hydrabus.write(len(data).to_bytes(2, byteorder="big")) + + self._hydrabus.write(read_len.to_bytes(2, byteorder="big")) + + self._hydrabus.timeout = 0 + ret = self._hydrabus.read(1) + self._hydrabus.timeout = None + if ret == b"\x00": + self._logger.error("Cannot execute command. Too many bytes requested ?") + return None + elif ret == b"\x01" and len(data) == 0: + return self._hydrabus.read(read_len) + elif ret == b"": + # No response, we can send data + self._hydrabus.write(data) + + ret = self._hydrabus.read(1) + if ret != b"\x01": + self._logger.error("Transmit error") + return None + + return self._hydrabus.read(read_len) + else: + self._logger.error(f"Unknown error") + + def write(self, data=b"", drive_cs=0): + """ + Write on SPI bus + + :param data: data to be sent + :type data: bytes + :param drive_cs: Whether to enable chip select before writing/reading (0=yes, 1=no) + :type drive_cs: int + """ + self.write_read(data, read_len=0, drive_cs=drive_cs) + + def read(self, read_len=0, drive_cs=0): + """ + Read on SPI bus + + :param read_len: Number of bytes to be read + :type read_len: int + :param drive_cs: Whether to enable chip select before writing/reading (0=yes, 1=no) + :type drive_cs: int + :return: Read data + :rtype: bytes + """ + result = b"" + if drive_cs == 0: + self.cs = 0 + while read_len > 0: + if read_len >= 16: + to_read = 16 + else: + to_read = read_len + result += self.bulk_write(b"\xff" * to_read) + read_len -= to_read + if drive_cs == 0: + self.cs = 1 + return result + + def set_speed(self, speed): + """ + Set SPI bus speed + + :param speed: Select any of the defined speeds (SPI_SPEED_*) + """ + if not speed <= 0b111: + raise ValueError("Incorrect speed") + CMD = 0b01100000 + CMD = CMD | speed + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + + if self._hydrabus.read(1) == b"\x01": + return True + else: + self._logger.error("Error setting speed.") + return False + + def _configure_port(self): + CMD = 0b10000000 + CMD = CMD | self._config + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + if self._hydrabus.read(1) == b"\x01": + return True + else: + self._logger.error("Error setting config.") + return False + + @property + def polarity(self): + """ + SPI polarity + """ + if self._config & 0b100: + return 1 + else: + return 0 + + @polarity.setter + def polarity(self, value): + if value == 0: + self._config = self._config & ~(1 << 2) + else: + self._config = self._config | (1 << 2) + self._config & 0b111 + self._configure_port() + + @property + def phase(self): + """ + SPI clock phase + """ + if self._config & 0b10: + return 1 + else: + return 0 + + @phase.setter + def phase(self, value): + if value == 0: + self._config = self._config & ~(1 << 1) + else: + self._config = self._config | (1 << 1) + self._config & 0b111 + self._configure_port() + + @property + def device(self): + """ + SPI device to use (0=spi1, 1=spi2) + """ + if self._config & 0b1: + return 1 + else: + return 0 + + @device.setter + def device(self, value): + self._config = self.__SPI_DEFAULT_CONFIG + if value == 0: + self._config = self._config & ~(1 << 0) + else: + self._config = self._config | (1 << 0) + self._configure_port() diff --git a/hydrabus_framework/utils/pyHydrabus/swd.py b/hydrabus_framework/utils/pyHydrabus/swd.py new file mode 100644 index 0000000..59b81ca --- /dev/null +++ b/hydrabus_framework/utils/pyHydrabus/swd.py @@ -0,0 +1,202 @@ +# Copyright 2019 Nicolas OBERLI +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from .rawwire import RawWire + + +class SWD: + """ + SWD protocol handler + + :example: + + >>> import pyHydrabus + >>> swd = pyHydrabus.SWD('/dev/ttyACM0') + >>> swd.bus_init() + >>> swd.read_dp(0) + >>> swd.write_dp(4, 0x50000000) + >>> swd.scan_bus() + + """ + + def __init__(self, port=""): + self._interface = RawWire(port) + + self._interface._config = 0xA + self._interface._configure_port() + + def __del__(self): + self.close() + + def _apply_dp_parity(self, value): + tmp = value & 0b00011110 + if (bin(tmp).count("1") % 2) == 1: + value = value | 1 << 5 + return value + + def close(self): + self._interface.close() + + def _sync(self): + self._interface.write(b"\x00") + + def bus_init(self): + """ + Initiate SWD bus. + Sends the JTAG-TO-SWD token and sync clocks + """ + self._interface.write( + b"\xff\xff\xff\xff\xff\xff\x7b\x9e\xff\xff\xff\xff\xff\xff\x0f" + ) + self._sync() + + def read_dp(self, addr, to_ap=0): + """ + Read a register from DP + + :param addr: DP register address + :type addr: int + + :return: Value stored in register + :rtype: int + + :example: + >>> # read RDBUFF + >>> swd.read_dp(0xc) + """ + CMD = 0x85 + CMD = CMD | to_ap << 1 + CMD = CMD | (addr & 0b1100) << 1 + CMD = self._apply_dp_parity(CMD) + + self._interface.write(CMD.to_bytes(1, byteorder="little")) + status = 0 + for i in range(3): + status += ord(self._interface.read_bit()) << i + if status == 1: + retval = int.from_bytes(self._interface.read(4), byteorder="little") + self._sync() + return retval + elif status == 2: + # When receiving WAIT, retry transaction + self._sync() + self.write_dp(0, 0x0000001F) + return self.read_dp(addr, to_ap) + else: + self._sync() + raise ValueError(f"Returned status is {hex(status)}") + + def write_dp(self, addr, value, to_ap=0): + """ + Write to DP register + + :param addr: DP register address + :type addr: int + :param value: Value to be written to register + :type value: int + + :example: + >>> write_dp(4, 0x50000000) + """ + CMD = 0x81 + CMD = CMD | to_ap << 1 + CMD = CMD | (addr & 0b1100) << 1 + CMD = self._apply_dp_parity(CMD) + + self._interface.write(CMD.to_bytes(1, byteorder="little")) + status = 0 + for i in range(3): + status += ord(self._interface.read_bit()) << i + self._interface.clocks(2) + if status == 2: + # When receiving WAIT, retry transaction + self._sync() + self.write_dp(0, 0x0000001F) + return self.write_dp(addr, value, to_ap) + if status != 1: + self._sync() + raise ValueError(f"Returned status is {hex(status)}") + self._interface.write(value.to_bytes(4, byteorder="little")) + + # Send the parity but along with the sync clocks + if (bin(value).count("1") % 2) == 1: + self._interface.write(b"\x01") + else: + self._interface.write(b"\x00") + + def read_ap(self, address, bank): + """ + Read AP register + + :param address: AP address on the bus + :type address: int + :param bank: AP register address + :type bank: int + + :return: Value read from AP + :rtype: int + + :example: + >>> # Read AP IDR + >>> read_ap(0, 0xfc) + + """ + + select_reg = 0 + # Place AP address in DP SELECT register + select_reg = select_reg | address << 24 + # Place bank in register as well + select_reg = select_reg | (bank & 0b11110000) + # Write the SELECT DP register + self.write_dp(8, select_reg) + self.read_dp((bank & 0b1100), to_ap=1) + # Read RDBUFF + return self.read_dp(0xC) + + def write_ap(self, address, bank, value): + """ + Write to AP register + + :param address: AP address on the bus + :type address: int + :param bank: AP register address + :type bank: int + :param value: Value to be written to register + :type value: int + + :example: + >>> write_ap(0, 0x4, 0x20000000) + """ + + select_reg = 0 + # Place AP address in DP SELECT register + select_reg = select_reg | address << 24 + # Place bank in register as well + select_reg = select_reg | (bank & 0b11110000) + # Write the SELECT DP register + self.write_dp(8, select_reg) + # Send the actual value to the AP + self.write_dp((bank & 0b1100), value, to_ap=1) + + def scan_bus(self): + """ + Scan the SWD bus for APs + The SWD bus must have been enabled before using this command. + """ + + for ap in range(256): + idr = self.read_ap(ap, 0xFC) + if idr != 0x0 and idr != 0xFFFFFFFF: + print(f"0x{ap:02x}: 0x{idr:08x}") diff --git a/hydrabus_framework/utils/pyHydrabus/uart.py b/hydrabus_framework/utils/pyHydrabus/uart.py new file mode 100644 index 0000000..1a81469 --- /dev/null +++ b/hydrabus_framework/utils/pyHydrabus/uart.py @@ -0,0 +1,140 @@ +# Copyright 2019 Nicolas OBERLI +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from .protocol import Protocol +from .common import split + + +class UART(Protocol): + """ + UART protocol handler + + :example: TODO + + >>> #Read data from an EEPROM + >>> import pyHydrabus + >>> u=pyHydrabus.UART('/dev/hydrabus') + >>> u.baud=115200 + >>> u.echo=1 + + """ + + def __init__(self, port=""): + self._config = 0b0000 + self._echo = 0 + self._baud = 9600 + super().__init__(name=b"ART1", fname="UART", mode_byte=b"\x03", port=port) + + def bulk_write(self, data=b""): + """ + Bulk write on UART + https://github.com/hydrabus/hydrafw/wiki/HydraFW-Binary-I2C-mode-guide#bulk-i2c-write-0b0001xxxx FIXME: change link + + :param data: Data to be sent + :type data: bytes + + :return: Returns the ACK status of the written bytes (b'\x00'=ACK, b'\x01'=NACK) + :rtype: list + """ + CMD = 0b00010000 + if not len(data) > 0: + raise ValueError("Send at least one byte") + if not len(data) <= 16: + raise ValueError("Too many bytes to write") + CMD = CMD | (len(data) - 1) + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + + self._hydrabus.write(data) + + for _ in range(len(data)): + if self._hydrabus.read(1) != b"\x01": + self._logger.warn("Transfer error.") + + def write(self, data=b""): + """ + Write on UART bus + + :param data: data to be sent + :type data: bytes + """ + for chunk in split(data, 16): + self.bulk_write(chunk) + + @property + def echo(self): + """ + Local echo (0=No, 1=Yes) + """ + return self._echo + + @echo.setter + def echo(self, value): + value = value & 1 + self._echo = value + CMD = 0b00000010 + CMD = CMD | (not value) + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + if self._hydrabus.read(1) == b"\x01": + self._rst = value + return True + else: + self._logger.error("Error setting echo.") + return False + + def read(self, length=1): + """ + Read echoed data + + :param length: Number of bytes to read + :type length: int + + :return: read bytes + :rtype: bytes + """ + + return self._hydrabus.read(length) + + @property + def baud(self): + """ + Baud rate + """ + return self._baud + + @baud.setter + def baud(self, value): + # TODO: make a pull request if deleting self._baud = value before settings it + CMD = 0b00000111 + + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) + self._hydrabus.write(value.to_bytes(4, byteorder="big")) + + if self._hydrabus.read(1) == b"\x01": + self._baud = value + return True + else: + self._logger.error("Error setting pin.") + return False + + def bridge(self): + """ + Bind the Hydrabus USB and UART + Note that in order to leave bridge mode, you need to press the UBTN + """ + CMD = 0b00001111 + self._hydrabus.write(CMD.to_bytes(1, byteorder="big")) diff --git a/hydrabus_framework/utils/pyHydrabus/utils.py b/hydrabus_framework/utils/pyHydrabus/utils.py new file mode 100644 index 0000000..7505bb9 --- /dev/null +++ b/hydrabus_framework/utils/pyHydrabus/utils.py @@ -0,0 +1,78 @@ +# Copyright 2019 Nicolas OBERLI +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from .hydrabus import Hydrabus + + +class Utils: + """ + Utilities available in hydrafw + + :param port: The port name + :type port: str + """ + + def __init__(self, port=""): + self._hydrabus = Hydrabus(port) + self._logger = logging.getLogger(__name__) + + self._hydrabus.flush_input() + + @property + def adc(self): + """ + Read ADC value + + :return: ADC value (10 bits) + :rtype: int + """ + self._hydrabus.write(b"\x14") + v = self._hydrabus.read(2) + return int.from_bytes(v, byteorder="big") + + def continuous_adc(self): + """ + Continuously print ADC value + """ + try: + self._hydrabus.write(b"\x15") + while 1: + v = self._hydrabus.read(2) + except KeyboardInterrupt: + self._hydrabus.write(b"\x00") + self._hydrabus.reset() + return True + + def frequency(self): + """ + Read frequency value + + :return: (frequency, duty cycle) + :rtype: tuple + """ + self._hydrabus.write(b"\x16") + freq = self._hydrabus.read(4) + duty = self._hydrabus.read(4) + return ( + int.from_bytes(freq, byteorder="little"), + int.from_bytes(duty, byteorder="little"), + ) + + def close(self): + """ + Close the communication channel and resets Hydrabus + """ + self._hydrabus.exit_bbio() + self._hydrabus.close()