From a270d37713d6d9ad1b9c7c92c4f16ea6eef1668d Mon Sep 17 00:00:00 2001 From: Zhanglong Xia Date: Tue, 2 Jul 2024 13:33:09 +0800 Subject: [PATCH] [test] add script to test which diag commands RCP supports This commit implements a test script tools/cp-caps/rcp_caps_test.py to test which diag commands RCP supports using the DUT and reference device. --- tools/cp-caps/README.md | 130 +++++++++++ tools/cp-caps/rcp_caps_test.py | 414 +++++++++++++++++++++++++++++++++ tools/otci/otci/otci.py | 16 +- 3 files changed, 556 insertions(+), 4 deletions(-) create mode 100644 tools/cp-caps/README.md create mode 100644 tools/cp-caps/rcp_caps_test.py diff --git a/tools/cp-caps/README.md b/tools/cp-caps/README.md new file mode 100644 index 000000000000..74f085d4c4c2 --- /dev/null +++ b/tools/cp-caps/README.md @@ -0,0 +1,130 @@ +# RCP Capabilities Test + +This test is used for testing RCP capabilities. + +## Test Topology + +``` + +-------+ + +---------------| PC |----------------+ + | +-------+ | + | ADB/SSH | ADB/SSH/SERIAL + | | ++-------+ +------------------+ +| DUT |<-----------Thread-------->| Reference Device | ++-------+ +------------------+ + +``` + +- PC : The computer to run the test script. +- DUT : The device under test. +- Reference Device : The device that supports all tested features. + +### Python Dependences + +Before running the test script on PC, testers should install dependences first. + +```bash +$ cd tools/otci +$ pip install build +$ python -m build +$ pip install dist/otci_openthread-0.0.1-py3-none-any.whl +$ pip install adb-shell +$ pip install adb-shell[usb] +``` + +### Reference Device + +The [nRF52840DK][ot-nrf528xx-nrf52840] is set as the reference device by default. Testers can also select the other Thread device as the reference device. + +[ot-nrf528xx-nrf52840]: https://github.com/openthread/ot-nrf528xx/blob/main/src/nrf52840/README.md + +Quick guide to setting up the nRF52840DK: + +```bash +$ git clone git@github.com:openthread/ot-nrf528xx.git +$ cd ot-nrf528xx/ +$ git submodule update --init +$ ./script/bootstrap +$ ./script/build nrf52840 UART_trans -DOT_DIAGNOSTIC=ON +$ arm-none-eabi-objcopy -O ihex build/bin/ot-cli-ftd ot-cli-ftd.hex +$ nrfjprog -f nrf52 --chiperase --program ot-cli-ftd.hex --reset +``` + +## Test Commands + +### Help + +Show help info. + +```bash +$ python3 ./tools/cp-caps/rcp_caps_test.py -h +usage: rcp_caps_test.py [-h] [-d] [-v] + +This script is used for testing RCP capabilities. + +options: + -h, --help show this help message and exit + -d, --diag-commands test whether the RCP supports all diag commands + -v, --verbose output verbose information + +Device Interfaces: + DUT_SSH= Connect to the DUT via ssh + DUT_ADB_TCP= Connect to the DUT via adb tcp + DUT_ADB_USB= Connect to the DUT via adb usb + REF_CLI_SERIAL= Connect to the reference device via cli serial port + REF_ADB_USB= Connect to the reference device via adb usb + REF_SSH= Connect to the reference device via ssh + +Example: + DUT_ADB_USB=1169UC2F2T0M95OR REF_CLI_SERIAL=/dev/ttyACM0 python3 ./tools/cp-caps/rcp_caps_test.py -d +``` + +### Test Diag Commands + +The parameter `-d` or `--diag-commands` starts to test all diag commands. + +Following environment variables are used to configure diag command paremters: + +- DUT_DIAG_GPIO: Diag gpio value. The default value is `0` if it is not set. +- DUT_DIAG_RAW_POWER_SETTING: Diag raw power setting value. The default value is `112233` if it is not set. +- DUT_DIAG_POWER: Diag power value. The default value is `10` if it is not set. + +> Note: If you meet the error `LIBUSB_ERROR_BUSY` when you are using the ADB usb interface, please run the command `adb kill-server` to kill the adb server. + +```bash +$ DUT_ADB_USB=1269UCKFZTAM95OR REF_CLI_SERIAL=/dev/ttyACM0 DUT_DIAG_GPIO=2 DUT_DIAG_RAW_POWER_SETTING=44556688 DUT_DIAG_POWER=11 python3 ./tools/cp-caps/rcp_caps_test.py -d +diag channel --------------------------------------------- OK +diag channel 20 ------------------------------------------ OK +diag power ----------------------------------------------- OK +diag power 11 -------------------------------------------- OK +diag radio sleep ----------------------------------------- OK +diag radio receive --------------------------------------- OK +diag radio state ----------------------------------------- OK +diag repeat 10 64 ---------------------------------------- OK +diag repeat stop ----------------------------------------- OK +diag send 100 64 ----------------------------------------- OK +diag stats ----------------------------------------------- OK +diag stats clear ----------------------------------------- OK +diag frame 00010203040506070809 -------------------------- NotSupported +diag echo 0123456789 ------------------------------------- OK +diag echo -n 10 ------------------------------------------ OK +diag cw start -------------------------------------------- OK +diag cw stop --------------------------------------------- OK +diag stream start ---------------------------------------- OK +diag stream stop ----------------------------------------- OK +diag stats ----------------------------------------------- OK +diag stats clear ----------------------------------------- OK +diag rawpowersetting enable ------------------------------ NotSupported +diag rawpowersetting 44556688 ---------------------------- NotSupported +diag rawpowersetting ------------------------------------- NotSupported +diag rawpowersetting disable ----------------------------- NotSupported +diag powersettings --------------------------------------- NotSupported +diag powersettings 20 ------------------------------------ NotSupported +diag gpio mode 2 ----------------------------------------- NotSupported +diag gpio mode 2 in -------------------------------------- NotSupported +diag gpio mode 2 out ------------------------------------- NotSupported +diag gpio get 2 ------------------------------------------ NotSupported +diag gpio set 2 0 ---------------------------------------- NotSupported +diag gpio set 2 1 ---------------------------------------- NotSupported +``` diff --git a/tools/cp-caps/rcp_caps_test.py b/tools/cp-caps/rcp_caps_test.py new file mode 100644 index 000000000000..9ee904e7d8a1 --- /dev/null +++ b/tools/cp-caps/rcp_caps_test.py @@ -0,0 +1,414 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2024, The OpenThread Authors. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# 3. Neither the name of the copyright holder nor the +# names of its contributors may be used to endorse or promote products +# derived from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# + +import argparse +import logging +import os +import sys +import textwrap + +from typing import List + +import otci +from otci import OTCI + +logging.basicConfig(level=logging.WARNING) + + +class RcpCaps(object): + """ + This class represents an OpenThread RCP capability test instance. + """ + + def __init__(self): + self.__dut = self.__connect_dut() + self.__ref = self.__connect_reference_device() + + def test_diag_commands(self): + """Test all diag commands.""" + self.__dut.factory_reset() + self.__ref.factory_reset() + + ret = self.__dut.is_command_supported('diag start') + if ret is False: + print('All diag commands are not supported') + return + + self.__dut.diag_start() + self.__ref.diag_start() + + self.__test_diag_channel() + self.__test_diag_power() + self.__test_diag_radio() + self.__test_diag_repeat() + self.__test_diag_send() + self.__test_diag_frame() + self.__test_diag_echo() + self.__test_diag_utils() + self.__test_diag_rawpowersetting() + self.__test_diag_powersettings() + self.__test_diag_gpio_mode() + self.__test_diag_gpio_value() + + self.__ref.diag_stop() + self.__dut.diag_stop() + + # + # Private methods + # + def __test_diag_channel(self): + channel = 20 + commands = ['diag channel', f'diag channel {channel}'] + + if self.__support_commands(commands): + self.__dut.diag_set_channel(channel) + channel = self.__dut.diag_get_channel() + ret = True if channel == channel else False + else: + ret = False + + self.__output_results(commands, ret) + + def __test_diag_power(self): + power = self.__get_dut_diag_power() + commands = ['diag power', f'diag power {power}'] + + if self.__support_commands(commands): + self.__dut.diag_set_power(power) + value = self.__dut.diag_get_power() + ret = True if value == power else False + else: + ret = False + + self.__output_results(commands, ret) + + def __test_diag_radio(self): + commands = ['diag radio receive', 'diag radio sleep', 'diag radio state'] + + if self.__support_commands(commands): + self.__dut.diag_radio_receive() + receive_state = self.__dut.diag_get_radio_state() + self.__dut.wait(0.1) + self.__dut.diag_radio_sleep() + sleep_state = self.__dut.diag_get_radio_state() + + ret = True if sleep_state == 'sleep' and receive_state == 'receive' else False + else: + ret = False + + self.__output_results(commands, ret) + + def __test_diag_gpio_value(self): + gpio = self.__get_dut_diag_gpio() + commands = [f'diag gpio get {gpio}', f'diag gpio set {gpio} 0', f'diag gpio set {gpio} 1'] + + if self.__support_commands(commands): + self.__dut.diag_set_gpio_value(gpio, 0) + value_0 = self.__dut.diag_get_gpio_value(gpio) + self.__dut.diag_set_gpio_value(gpio, 1) + value_1 = self.__dut.diag_get_gpio_value(gpio) + + ret = True if value_0 == 0 and value_1 == 1 else False + else: + ret = False + + self.__output_results(commands, ret) + + def __test_diag_gpio_mode(self): + gpio = self.__get_dut_diag_gpio() + commands = [f'diag gpio mode {gpio}', f'diag gpio mode {gpio} in', f'diag gpio mode {gpio} out'] + + if self.__support_commands(commands): + self.__dut.diag_set_gpio_mode(gpio, 'in') + mode_in = self.__dut.diag_get_gpio_mode(gpio) + self.__dut.diag_set_gpio_value(gpio, 'out') + mode_out = self.__dut.diag_get_gpio_mode(gpio) + + ret = True if mode_in == 'in' and mode_out == 'out' else False + else: + ret = False + + self.__output_results(commands, ret) + + def __test_diag_echo(self): + echo_msg = '0123456789' + cmd_diag_echo = f'diag echo {echo_msg}' + cmd_diag_echo_num = f'diag echo -n 10' + + if self.__dut.is_command_supported(cmd_diag_echo): + reply = self.__dut.diag_echo(echo_msg) + ret = True if reply == echo_msg else False + else: + ret = False + self.__output_format_bool(cmd_diag_echo, ret) + + if self.__dut.is_command_supported(cmd_diag_echo_num): + reply = self.__dut.diag_echo_number(10) + ret = True if reply == echo_msg else False + else: + ret = False + self.__output_format_bool(cmd_diag_echo_num, ret) + + def __test_diag_utils(self): + commands = [ + 'diag cw start', 'diag cw stop', 'diag stream start', 'diag stream stop', 'diag stats', 'diag stats clear' + ] + + for command in commands: + ret = self.__dut.is_command_supported(command) + self.__output_format_bool(command, ret) + + def __test_diag_rawpowersetting(self): + rawpowersetting = self.__get_dut_diag_raw_power_setting() + commands = [ + 'diag rawpowersetting enable', f'diag rawpowersetting {rawpowersetting}', 'diag rawpowersetting', + 'diag rawpowersetting disable' + ] + + if self.__support_commands(commands): + self.__dut.diag_enable_rawpowersetting() + self.__dut.diag_set_rawpowersetting(rawpowersetting) + reply = self.__dut.diag_get_rawpowersetting() + self.__dut.diag_disable_rawpowersetting() + + ret = True if reply == rawpowersetting else False + else: + ret = False + + self.__output_results(commands, ret) + + def __test_diag_powersettings(self): + commands = ['diag powersettings', 'diag powersettings 20'] + + if self.__support_commands(commands): + powersettings = self.__dut.diag_get_powersettings() + ret = True if len(powersettings) > 0 else False + else: + ret = False + + self.__output_results(commands, ret) + + def __test_diag_send(self): + packets = 100 + threshold = 80 + length = 64 + channel = 20 + commands = [f'diag send {packets} {length}', f'diag stats', f'diag stats clear'] + + if self.__support_commands(commands): + self.__dut.wait(1) + self.__dut.diag_set_channel(channel) + self.__ref.diag_set_channel(channel) + self.__ref.diag_radio_receive() + + self.__dut.diag_stats_clear() + self.__ref.diag_stats_clear() + + self.__dut.diag_send(packets, length) + self.__dut.wait(1) + dut_stats = self.__dut.diag_get_stats() + ref_stats = self.__ref.diag_get_stats() + + ret = True if dut_stats['sent_packets'] == packets and ref_stats['received_packets'] > threshold else False + else: + ret = False + + self.__output_results(commands, ret) + + def __test_diag_repeat(self): + delay = 10 + threshold = 80 + length = 64 + channel = 20 + cmd_diag_repeat = f'diag repeat {delay} {length}' + cmd_diag_repeat_stop = 'diag repeat stop' + commands = [cmd_diag_repeat, 'diag repeat stop', 'diag stats', 'diag stats clear'] + + if self.__support_commands(commands): + self.__dut.diag_set_channel(channel) + self.__ref.diag_set_channel(channel) + self.__ref.diag_radio_receive() + + self.__dut.diag_stats_clear() + self.__ref.diag_stats_clear() + + self.__dut.diag_repeat(delay, length) + self.__dut.wait(1) + self.__dut.diag_repeat_stop() + dut_stats = self.__dut.diag_get_stats() + ref_stats = self.__ref.diag_get_stats() + + ret = True if dut_stats['sent_packets'] > threshold and ref_stats['received_packets'] > threshold else False + else: + ret = False + + self.__output_format_bool(cmd_diag_repeat, ret) + self.__output_format_bool(cmd_diag_repeat_stop, ret) + + def __test_diag_frame(self): + packets = 100 + threshold = 80 + channel = 20 + frame = '00010203040506070809' + cmd_diag_frame = f'diag frame {frame}' + commands = [cmd_diag_frame, f'diag send {packets}', f'diag stats', f'diag stats clear'] + + if self.__support_commands(commands): + self.__dut.wait(1) + self.__dut.diag_set_channel(channel) + self.__ref.diag_set_channel(channel) + self.__ref.diag_radio_receive() + + self.__dut.diag_stats_clear() + self.__ref.diag_stats_clear() + + self.__ref.diag_frame() + self.__dut.diag_send(packets, None) + self.__dut.wait(1) + dut_stats = self.__dut.diag_get_stats() + ref_stats = self.__ref.diag_get_stats() + + ret = True if dut_stats['sent_packets'] == packets and ref_stats['received_packets'] > threshold else False + else: + ret = False + + self.__output_format_bool(cmd_diag_frame, ret) + + def __support_commands(self, commands: List[str]) -> bool: + ret = True + + for command in commands: + if self.__dut.is_command_supported(command) is False: + ret = False + break + + return ret + + def __output_results(self, commands: List[str], support: bool): + for command in commands: + self.__output_format_bool(command, support) + + def __get_dut_diag_power(self) -> int: + return int(os.getenv('DUT_DIAG_POWER')) if os.getenv('DUT_DIAG_POWER') else 10 + + def __get_dut_diag_gpio(self) -> int: + return int(os.getenv('DUT_DIAG_GPIO')) if os.getenv('DUT_DIAG_GPIO') else 0 + + def __get_dut_diag_raw_power_setting(self) -> str: + return os.getenv('DUT_DIAG_RAW_POWER_SETTING') if os.getenv('DUT_DIAG_RAW_POWER_SETTING') else '112233' + + def __connect_dut(self) -> OTCI: + if os.getenv('DUT_ADB_TCP'): + node = otci.connect_otbr_adb_tcp(os.getenv('DUT_ADB_TCP')) + elif os.getenv('DUT_ADB_USB'): + node = otci.connect_otbr_adb_usb(os.getenv('DUT_ADB_USB')) + elif os.getenv('DUT_SSH'): + node = otci.connect_otbr_ssh(os.getenv('DUT_SSH')) + else: + self.__fail("Please set DUT_SSH, DUT_ADB_TCP or DUT_SSH to connect to the DUT device.") + + return node + + def __connect_reference_device(self) -> OTCI: + if os.getenv('REF_CLI_SERIAL'): + node = otci.connect_cli_serial(os.getenv('REF_CLI_SERIAL')) + elif os.getenv('REF_SSH'): + node = otci.connect_otbr_ssh(os.getenv('REF_SSH')) + elif os.getenv('REF_ADB_USB'): + node = otci.connect_otbr_adb_usb(os.getenv('REF_ADB_USB')) + else: + self.__fail("Please set REF_CLI_SERIAL, REF_SSH or REF_ADB_USB to connect to the reference device.") + + return node + + def __output_format_string(self, name: str, value: str): + prefix = '{0:-<58}'.format('{} '.format(name)) + print(f'{prefix} {value}') + + def __output_format_bool(self, name: str, value: bool): + self.__output_format_string(name, 'OK' if value else 'NotSupported') + + def __fail(self, value: str): + print(f'{value}') + sys.exit(1) + + +def parse_arguments(): + """Parse all arguments.""" + description_msg = 'This script is used for testing RCP capabilities.' + epilog_msg = textwrap.dedent( + 'Device Interfaces:\r\n' + ' DUT_SSH= Connect to the DUT via ssh\r\n' + ' DUT_ADB_TCP= Connect to the DUT via adb tcp\r\n' + ' DUT_ADB_USB= Connect to the DUT via adb usb\r\n' + ' REF_CLI_SERIAL= Connect to the reference device via cli serial port\r\n' + ' REF_ADB_USB= Connect to the reference device via adb usb\r\n' + ' REF_SSH= Connect to the reference device via ssh\r\n' + '\r\n' + 'Example:\r\n' + f' DUT_ADB_USB=1169UC2F2T0M95OR REF_CLI_SERIAL=/dev/ttyACM0 python3 {sys.argv[0]} -d\r\n') + + parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, + description=description_msg, + epilog=epilog_msg) + + parser.add_argument( + '-d', + '--diag-commands', + action='store_true', + default=False, + help='test whether the RCP supports all diag commands', + ) + + parser.add_argument( + '-v', + '--verbose', + action='store_true', + default=False, + help='output verbose information', + ) + + return parser.parse_args() + + +def main(argv): + arguments = parse_arguments() + + if arguments.verbose is True: + logger = logging.getLogger() + logger.setLevel(logging.DEBUG) + + rcp_caps = RcpCaps() + + if arguments.diag_commands is True: + rcp_caps.test_diag_commands() + + +if __name__ == '__main__': + main(sys.argv) diff --git a/tools/otci/otci/otci.py b/tools/otci/otci/otci.py index a3632afe66c3..eb01ba3cbfa0 100644 --- a/tools/otci/otci/otci.py +++ b/tools/otci/otci/otci.py @@ -2396,13 +2396,21 @@ def diag_stream_stop(self): """Stop transmitting a stream of characters.""" self.execute_command('diag stream stop') - def diag_send(self, packets: int, length: int): + def diag_send(self, packets: int, length: Optional[int] = None): """Transmit a fixed number of packets.""" - self.execute_command(f'diag send {packets} {length}') + if length is None: + command = f'diag send {packets}' + else: + command = f'diag send {packets} {length}' + self.execute_command(command) - def diag_repeat(self, delay: int, length: int): + def diag_repeat(self, delay: int, length: Optional[int] = None): """Transmit packets repeatedly with a fixed interval.""" - self.execute_command(f'diag repeat {delay} {length}') + if length is None: + command = f'diag repeat {delay}' + else: + command = f'diag repeat {delay} {length}' + self.execute_command(command) def diag_repeat_stop(self): """Stop repeated packet transmission."""