Skip to content

Commit

Permalink
[otci] test the tx info of the radio frame
Browse files Browse the repository at this point in the history
  • Loading branch information
zhanglongxia committed Jan 16, 2025
1 parent f6cee79 commit 7ffc61e
Show file tree
Hide file tree
Showing 2 changed files with 265 additions and 5 deletions.
19 changes: 18 additions & 1 deletion tools/cp-caps/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ Show help info.

```bash
$ python3 ./tools/cp-caps/rcp_caps_test.py -h
usage: rcp_caps_test.py [-h] [-c] [-l] [-d] [-f] [-p] [-t] [-v] [-D]
usage: rcp_caps_test.py [-h] [-c] [-l] [-d] [-f] [-p] [-t] [-T] [-v] [-D]

This script is used for testing RCP capabilities.

Expand All @@ -66,6 +66,7 @@ options:
-f, --frame-format test whether the RCP supports 802.15.4 frames of all formats
-p, --data-poll test whether the RCP supports data poll
-t, --throughput test Thread network 1-hop throughput
-T, --tx-info test mTxInfo field of the radio frame
-v, --version output version
-D, --debug output debug information

Expand Down Expand Up @@ -221,3 +222,19 @@ RX ver:2015,Data,seq,dst[addr:short,pan:id],src[addr:short,pan:id],sec:no,ie[csl
TX ver:2015,Data,noseq,dst[addr:short,pan:id],src[addr:short,pan:id],sec:no,ie:no,plen:0 ----------- OK
RX ver:2015,Data,noseq,dst[addr:short,pan:id],src[addr:short,pan:id],sec:no,ie:no,plen:0 ----------- OK
```

### Test mTxInfo field of the radio frame.

The option `-T` or `--tx-info` tests whether the RCP supports the mTxInfo field of the radio frame.

```bash
$ DUT_ADB_USB=1269UCKFZTAM95OR REF_CLI_SERIAL=/dev/ttyACM0 python3 ./tools/cp-caps/rcp_caps_test.py -T
mIsSecurityProcessed=True -------------------------------- OK
mIsSecurityProcessed=False ------------------------------- OK
mTxDelayBaseTime=now,mTxDelay=500000 --------------------- OK
mRxChannelAfterTxDone ------------------------------------ OK
mCsmaCaEnabled = 0 --------------------------------------- OK
mCsmaCaEnabled = 1 --------------------------------------- OK
mMaxCsmaBackoffs=0 --------------------------------------- OK (6 ms)
mMaxCsmaBackoffs=100 ------------------------------------- OK (560 ms)
```
251 changes: 247 additions & 4 deletions tools/cp-caps/rcp_caps_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import sys
import textwrap
import threading
import time
import queue

from dataclasses import dataclass
Expand All @@ -41,7 +42,7 @@
import otci
from otci import OTCI
from otci.types import Ip6Addr
from otci.errors import ExpectLineTimeoutError
from otci.errors import ExpectLineTimeoutError, InvalidArgumentsError

CP_CAPABILITY_VERSION = "0.1.1-dev"

Expand Down Expand Up @@ -100,11 +101,23 @@ class Frame:
Attributes:
name: The description of the frame.
tx_frame: The psdu of the frame that to be sent.
dst_address: The destination Mac address of the tx_frame.
dst_address: The destination Mac address of the tx_frame. It is used by the receiver to filter
out the tx_frame.
is_security_processed: The value of the otRadioFrame.mInfo.mTxInfo.mIsSecurityProcessed field.
If it is set to False, the active_dataset and src_ext_address are also should be set for the
radio driver to encrypt the tx_frame.
expect_rx_frame: The frame expected to be received. The frame expected to be received should be
the same with the tx_frame if the expect_rx_frame is set to None.
active_dataset: The active dataset.
src_ext_address: The source extended Mac address of the transmitter.
"""
name: str
tx_frame: str
dst_address: str
is_security_processed: Optional[bool] = True
expect_rx_frame: Optional[str] = None
active_dataset: Optional[str] = None
src_ext_address: Optional[str] = None

def test_frame_format(self):
"""Test whether the DUT supports sending and receiving 802.15.4 frames of all formats."""
Expand Down Expand Up @@ -268,9 +281,216 @@ def test_link_metrics(self):
self.__ref.leave()
self.__dut.leave()

def test_radio_frame_tx_info(self):
self.__test_radio_frame_tx_info_is_security_processed()
self.__test_radio_frame_tx_info_tx_delay()
self.__test_radio_frame_tx_info_rx_channel_after_tx_done()
self.__test_radio_frame_tx_info_csma_ca_enabled()
self.__test_radio_frame_tx_info_max_csma_backoffs()

#
# Private methods
#
def __test_radio_frame_tx_info_is_security_processed(self):
self.__dut.factory_reset()
active_dataset = self.__get_default_dataset()

frames = [
self.Frame(
name='mIsSecurityProcessed=True',
tx_frame='09ec00dddd102030405060708001020304050607080d0000000001db622c220fde64408db9128c93d50000',
dst_address='8070605040302010',
is_security_processed=True),
self.Frame(
name='mIsSecurityProcessed=False',
tx_frame='09ec00dddd102030405060708001020304050607080d000000000000010203040506070809000000000000',
expect_rx_frame=
'09ec00dddd102030405060708001020304050607080d0000000001db622c220fde64408db9128c93d50000',
is_security_processed=False,
dst_address='8070605040302010',
active_dataset=active_dataset,
src_ext_address='0807060504030201'),
]

for frame in frames:
ret = self.__test_send_formatted_frame(self.__dut, self.__ref, frame)
self.__output_format_bool(frame.name, ret)

def __test_radio_frame_tx_info_tx_delay(self):
self.__dut.factory_reset()
self.__ref.factory_reset()

# Enable the IPv6 interface to force the host and the RCP to synchronize the radio time.
self.__dut.set_dataset_bytes('active', self.__get_default_dataset())
self.__dut.ifconfig_up()
self.__dut.wait(0.5)
self.__dut.ifconfig_down()

self.__dut.diag_start()
self.__ref.diag_start()

channel = 11
packets = 1
dut_tx_delay_sec = 0.5
dut_tx_delay_us = int(dut_tx_delay_sec * 1000000)
ref_rx_delay_sec = dut_tx_delay_sec / 2
ref_address = 'dead00beefcafe01'
dut_tx_frame = '01ec00dddd01fecaefbe00adde02fecaefbe00adde000102030405060708090000'

self.__dut.diag_set_channel(channel)
self.__ref.diag_set_channel(channel)
self.__ref.diag_radio_receive()
self.__ref.diag_set_radio_receive_filter_dest_mac_address(ref_address)
self.__ref.diag_enable_radio_receive_filter()

self.__dut.diag_frame(dut_tx_frame, tx_delay=dut_tx_delay_us)
self.__dut.diag_send(packets, is_async=True)

self.__ref.wait(ref_rx_delay_sec)
stats = self.__ref.diag_get_stats()
ret = stats['received_packets'] == 0

if ret is True:
self.__ref.wait(ref_rx_delay_sec)
stats = self.__ref.diag_get_stats()
ret = stats['received_packets'] == 1

self.__output_format_bool(f'mTxDelayBaseTime=now,mTxDelay={dut_tx_delay_us}', ret)

self.__ref.diag_stop()
self.__dut.diag_stop()

def __test_radio_frame_tx_info_rx_channel_after_tx_done(self):
self.__dut.factory_reset()
self.__ref.factory_reset()

self.__dut.diag_start()
self.__ref.diag_start()

channel = 11
num_sent_frames = 1
rx_channel_after_tx_done = 25
dut_address = 'dead00beefcafe01'
dut_tx_frame = '01ec00dddd02fecaefbe00adde01fecaefbe00adde000102030405060708090000'
ref_tx_frame = '01ec00dddd01fecaefbe00adde02fecaefbe00adde000102030405060708090000'

self.__dut.diag_set_channel(channel)
self.__dut.diag_set_radio_receive_filter_dest_mac_address(dut_address)
self.__dut.diag_enable_radio_receive_filter()
self.__dut.diag_stats_clear()

self.__dut.diag_frame(dut_tx_frame, rx_channel_after_tx_done=rx_channel_after_tx_done)
self.__dut.diag_send(num_sent_frames, is_async=False)
stats = self.__dut.diag_get_stats()
ret = stats['sent_success_packets'] == num_sent_frames

if ret is True:
self.__ref.diag_set_channel(rx_channel_after_tx_done)
self.__ref.diag_frame(ref_tx_frame)
self.__ref.diag_send(num_sent_frames, is_async=False)
stats = self.__dut.diag_get_stats()
ret = stats['received_packets'] == num_sent_frames

self.__ref.diag_stop()
self.__dut.diag_stop()

self.__output_format_bool('mRxChannelAfterTxDone', ret)

def __test_radio_frame_tx_info_csma_ca_enabled(self):
self.__dut.factory_reset()
self.__ref.factory_reset()

self.__dut.diag_start()
self.__ref.diag_start()

channel = 11
num_sent_frames = 1
tx_frame = '01ec00dddd01fecaefbe00adde02fecaefbe00adde000102030405060708090000'

self.__dut.diag_set_channel(channel)
self.__ref.diag_set_channel(channel)
self.__ref.diag_cw_start()

self.__dut.diag_stats_clear()
self.__dut.diag_frame(tx_frame, csma_ca_enabled=False)
self.__dut.diag_send(num_sent_frames, is_async=False)
dut_stats = self.__dut.diag_get_stats()
ret = dut_stats['sent_success_packets'] == num_sent_frames
self.__output_format_bool('mCsmaCaEnabled = 0', ret)

self.__dut.diag_stats_clear()
self.__dut.diag_frame(tx_frame, csma_ca_enabled=True)
self.__dut.diag_send(num_sent_frames, is_async=False)
dut_stats = self.__dut.diag_get_stats()
ret = dut_stats['sent_error_cca_packets'] == num_sent_frames
self.__output_format_bool('mCsmaCaEnabled = 1', ret)

self.__ref.diag_cw_stop()
self.__ref.diag_stop()
self.__dut.diag_stop()

def __test_radio_frame_tx_info_max_csma_backoffs(self):
self.__dut.factory_reset()
self.__ref.factory_reset()

self.__dut.diag_start()
self.__ref.diag_start()

channel = 11
num_sent_frames = 1
tx_frame = '01ec00dddd01fecaefbe00adde02fecaefbe00adde000102030405060708090000'

self.__dut.diag_set_channel(channel)
self.__ref.diag_set_channel(channel)

self.__ref.diag_cw_start()
self.__dut.wait(0.05)

# When the max_csma_backoffs is set to 0, the radio driver should skip backoff and do CCA once.
# The CCA time is 192 us. Theoretically, the `diag send` command should return after 192us.
# But considering the Spinel delay and the system IO delay, here sets the max_time_cost to 10 ms.
max_time_cost = 10
max_csma_backoffs = 0
self.__dut.diag_stats_clear()
self.__dut.diag_frame(tx_frame, csma_ca_enabled=True, max_frame_retries=0, max_csma_backoffs=max_csma_backoffs)
start_time = time.time()
self.__dut.diag_send(num_sent_frames, is_async=False)
end_time = time.time()
time_cost = int((end_time - start_time) * 1000)
ret = 'OK' if time_cost < max_time_cost else 'NotSupport'
self.__output_format_string(f'mMaxCsmaBackoffs={max_csma_backoffs}', f'{ret} ({time_cost} ms)')

# Basic information for calculating the backoff time:
# aTurnaroundTime = 192 us
# aCCATime = 128 us
# backoffExponent = (macMinBe, macMaxBe) = (3, 5)
# backoffPeriod = random() % (1<< backoffExponent)
# backoff = backoffPeriod * aUnitBackoffPeriod = backoffPeriod * (aTurnaroundTime + aCCATime)
# = backoffPeriod * 320 us
# backoff = (random() % (1<< backoffExponent)) * 320us
#
# The max_csma_backoffs is set to 100 here, the `backoffExponent` be will be set to 5 in most retries.
# backoff = (random() % 32) * 320us
# average_backoff = 16 * 320us = 5120 us
# total_backoff = average_backoff * 100 = 5120 us *100 = 512 ms
#
# Here sets the max_time_cost to half of total_backoff.
#
max_time_cost = 256
max_frame_retries = 0
max_csma_backoffs = 100
self.__dut.diag_frame(tx_frame, csma_ca_enabled=True, max_frame_retries=0, max_csma_backoffs=max_csma_backoffs)
start_time = time.time()
self.__dut.diag_send(num_sent_frames, is_async=False)
end_time = time.time()
time_cost = int((end_time - start_time) * 1000)
ret = 'OK' if time_cost > max_time_cost else 'NotSupport'
self.__output_format_string(f'mMaxCsmaBackoffs={max_csma_backoffs}', f'{ret} ({time_cost} ms)')

self.__ref.diag_cw_stop()
self.__ref.diag_stop()
self.__dut.diag_stop()

def __run_link_metrics_test_commands(self, initiator: OTCI, subject_address: Ip6Addr) -> bool:
seriesid = 1
series_flags = 'ldra'
Expand Down Expand Up @@ -594,6 +814,17 @@ def __test_send_formatted_frame(self, sender: OTCI, receiver: OTCI, frame: Frame
sender.factory_reset()
receiver.factory_reset()

# When the 'is_security_processed' is False, it means the frame may need the radio driver
# to encrypt the frame. Here sets the active dataset and the MAC source address for the
# radio driver to encrypt the frame.
if frame.is_security_processed is False:
if frame.active_dataset is None or frame.src_ext_address is None:
raise InvalidArgumentsError(
"When the 'is_security_processed' is 'False', the 'active_dataset' and 'src_ext_address' must be set"
)
sender.set_dataset_bytes('active', frame.active_dataset)
sender.set_extaddr(frame.src_ext_address)

sender.diag_start()
receiver.diag_start()

Expand All @@ -615,7 +846,7 @@ def __test_send_formatted_frame(self, sender: OTCI, receiver: OTCI, frame: Frame
sender.wait(0.1)

sender.diag_frame(frame.tx_frame,
is_security_processed=True,
is_security_processed=frame.is_security_processed,
max_csma_backoffs=4,
max_frame_retries=4,
csma_ca_enabled=True)
Expand All @@ -632,7 +863,8 @@ def __test_send_formatted_frame(self, sender: OTCI, receiver: OTCI, frame: Frame
else:
# The radio driver may not append the FCF field to the received frame. Do not check the FCF field here.
FCF_LENGTH = 4
ret = frame.tx_frame[:-FCF_LENGTH] == received_frames[0]['psdu'][:-FCF_LENGTH]
expect_frame = frame.tx_frame if frame.expect_rx_frame is None else frame.expect_rx_frame
ret = expect_frame[:-FCF_LENGTH] == received_frames[0]['psdu'][:-FCF_LENGTH]

if ret:
sender.diag_stop()
Expand Down Expand Up @@ -792,6 +1024,14 @@ def parse_arguments():
help='test Thread network 1-hop throughput',
)

parser.add_argument(
'-T',
'--tx-info',
action='store_true',
default=False,
help='test mTxInfo field of the radio frame',
)

parser.add_argument(
'-v',
'--version',
Expand Down Expand Up @@ -839,6 +1079,9 @@ def main():
if arguments.throughput:
rcp_caps.test_throughput()

if arguments.transmit_info:
rcp_caps.test_radio_frame_tx_info()

if arguments.frame_format:
rcp_caps.test_frame_format()

Expand Down

0 comments on commit 7ffc61e

Please sign in to comment.