Skip to content

Commit

Permalink
[otci] add diag radio related commands support
Browse files Browse the repository at this point in the history
  • Loading branch information
zhanglongxia committed Jan 3, 2025
1 parent 1024a1f commit 79d2f66
Showing 1 changed file with 63 additions and 2 deletions.
65 changes: 63 additions & 2 deletions tools/otci/otci/otci.py
Original file line number Diff line number Diff line change
Expand Up @@ -2550,9 +2550,29 @@ def diag_cw_stop(self):
"""Stop transmitting continuous carrier wave."""
self.execute_command('diag cw stop')

def diag_frame(self, frame: str):
def diag_frame(self,
frame: str,
max_csma_backoffs: Optional[int] = None,
csma_ca_enabled: bool = False,
rx_channel_after_tx_done: Optional[int] = None,
tx_delay: Optional[int] = None,
tx_power: Optional[int] = None,
max_frame_retries: Optional[int] = None,
is_security_processed: bool = False,
is_header_updated: bool = False):
"""Set the frame (hex encoded) to be used by `diag send` and `diag repeat`."""
self.execute_command(f'diag frame {frame}')
command = f'diag frame '
command += f'-b {max_csma_backoffs} ' if max_csma_backoffs is not None else ''
command += '-c ' if csma_ca_enabled else ''
command += f'-d {tx_delay} ' if tx_delay is not None else ''
command += f'-C {rx_channel_after_tx_done} ' if rx_channel_after_tx_done is not None else ''
command += f'-p {tx_power} ' if tx_power is not None else ''
command += f'-r {max_frame_retries} ' if max_frame_retries is not None else ''
command += '-s ' if is_security_processed else ''
command += '-u ' if is_header_updated else ''
command += f'{frame}'

self.execute_command(command)

def diag_stream_start(self):
"""Start transmitting a stream of characters."""
Expand Down Expand Up @@ -2586,10 +2606,51 @@ def diag_radio_sleep(self):
"""Enter radio sleep mode."""
self.execute_command('diag radio sleep')

def diag_radio_enable(self):
"""Enable the radio."""
self.execute_command('diag radio enable')

def diag_radio_disable(self):
"""Disable the radio."""
self.execute_command('diag radio disable')

def diag_radio_receive(self):
"""Set radio to receive mode."""
self.execute_command('diag radio receive')

def diag_radio_receive_number(self, number: int):
"""Set radio to receive mode and receive specified number of packets."""

output = self.execute_command(f'diag radio receive {number} lpr')

if len(output) != number:
raise UnexpectedCommandOutput(output)

result = []

for line in output:
data = line.split(',')
result.append({
'rssi': int(data[1].split(":")[1]),
'lqi': int(data[2].split(":")[1]),
'len': int(data[3].split(":")[1]),
'psdu': data[4].split(":")[1],
})

return result

def diag_enable_radio_receive_filter(self):
"""Enable the radio receive filter."""
self.execute_command('diag radio receive filter enable')

def diag_disable_radio_receive_filter(self):
"""Disable the radio receive filter."""
self.execute_command('diag radio receive filter disable')

def diag_set_radio_receive_filter_dest_mac_address(self, dest_mac_address: str):
"""Set the destination mac address of the radio receive filter."""
self.execute_command(f'diag radio receive filter {dest_mac_address}')

def diag_get_radio_state(self) -> str:
"""Get the state of the radio."""
return self.__parse_str(self.execute_command('diag radio state'))
Expand Down

0 comments on commit 79d2f66

Please sign in to comment.