Skip to content

Commit

Permalink
[otci] add link metrics commands to otci
Browse files Browse the repository at this point in the history
  • Loading branch information
zhanglongxia committed Aug 8, 2024
1 parent c5ad131 commit a1206d8
Showing 1 changed file with 130 additions and 7 deletions.
137 changes: 130 additions & 7 deletions tools/otci/otci/otci.py
Original file line number Diff line number Diff line change
Expand Up @@ -2148,13 +2148,136 @@ def set_domain_name(self, name: str):
#
# Link metrics management
#
# TODO: linkmetrics mgmt <ipaddr> forward <seriesid> [ldraX][pqmr]
# TODO: linkmetrics probe <ipaddr> <seriesid> <length>
# TODO: linkmetrics query <ipaddr> single [pqmr]
# TODO: linkmetrics query <ipaddr> forward <seriesid>
# TODO: linkquality <extaddr>
# TODO: linkquality <extaddr> <linkquality>
#

def linkmetrics_config_enhanced_ack_clear(self, peer_addr: Union[str, Ip6Addr]) -> bool:
output = self.execute_command(f'linkmetrics config {peer_addr} enhanced-ack clear')
return self.__parse_linkmetrics_mgmt_response(peer_addr, output)

def linkmetrics_config_enhanced_ack_register(self,
peer_addr: Union[str, Ip6Addr],
link_metrics_flags: str,
reference: bool = False) -> bool:
if self.__valid_flags(link_metrics_flags, 'qmr') is False:
raise ValueError(link_metrics_flags)

output = self.execute_command(
f'linkmetrics config {peer_addr} enhanced-ack register {link_metrics_flags} {"r" if reference else ""}')
return self.__parse_linkmetrics_mgmt_response(peer_addr, output)

def linkmetrics_config_forward(self, peer_addr: Union[str, Ip6Addr], seriesid: int, series_flags: str,
link_metrics_flags: str) -> bool:
if self.__valid_flags(series_flags, 'ldraX') is False:
raise ValueError(series_flags)

if self.__valid_flags(link_metrics_flags, 'pqmr') is False:
raise ValueError(link_metrics_flags)

output = self.execute_command(
f'linkmetrics config {peer_addr} forward {seriesid} {series_flags} {link_metrics_flags}')
return self.__parse_linkmetrics_mgmt_response(peer_addr, output)

def linkmetrics_probe(self, peer_addr: Union[str, Ip6Addr], seriesid: int, length: int):
if length < 0 or length > 64:
raise ValueError(length)

self.execute_command(f'linkmetrics probe {peer_addr} {seriesid} {length}')

def linkmetrics_request_single(self, peer_addr: Union[str, Ip6Addr], link_metrics_flags: str) -> Dict[str, int]:
if self.__valid_flags(link_metrics_flags, 'pqmr') is False:
raise ValueError(link_metrics_flags)

output = self.execute_command(f'linkmetrics request {peer_addr} single {link_metrics_flags}')
return self.__parse_linkmetrics_report(peer_addr, output)

def linkmetrics_request_forward(self, peer_addr: Union[str, Ip6Addr], seriesid: int) -> Dict[str, int]:
output = self.execute_command(f'linkmetrics request {peer_addr} forward {seriesid}')
return self.__parse_linkmetrics_report(peer_addr, output)

def __parse_linkmetrics_mgmt_response(self, peer_addr: Union[str, Ip6Addr], output: List[str]) -> bool:
#
# Example output:
#
# Received Link Metrics Management Response from: fe80:0:0:0:3092:f334:1455:1ad2
# Status: Success
# Done
#

status = ''
report_received = False
ret = False

for line in output:
if 'Received Link Metrics Management Response from' in line:
address = line.split(': ')[1].strip()
report_received = address == peer_addr
elif 'Status' in line:
status = line.split(':')[1].strip()

return report_received and status == 'Success'

def __parse_linkmetrics_report(self, peer_addr: Union[str, Ip6Addr], output: List[str]) -> Dict[str, int]:
#
# Example output:
#
# Received Link Metrics Report from: fe80:0:0:0:3092:f334:1455:1ad2
#
# - PDU Counter: 2 (Count/Summation)
# - LQI: 76 (Exponential Moving Average)
# - Margin: 82 (dB) (Exponential Moving Average)
# - RSSI: -18 (dBm) (Exponential Moving Average)
# Done
#

results = {}
report_received = False

for line in output:
if 'Received Link Metrics Report' in line:
address = line.split(': ')[1].strip()
report_received = address == peer_addr
elif 'Received Link Metrics data in Enh Ack from neighbor' in line:
# If the Enhanced-ACK Based Probing is enabled, the CLI will output the following
# link metrics info after executing the `linkmetrics request` command. This case is
# used to skip these Enhanced-ACK related link metrics info.
#
# Received Link Metrics data in Enh Ack from neighbor, short address:0x3400 , extended address:c6a24d6514cf9178
# - LQI: 224 (Exponential Moving Average)
# - Margin: 0 (dB) (Exponential Moving Average)
#
# Received Link Metrics Report from: fe80:0:0:0:3092:f334:1455:1ad2
#
# - PDU Counter: 2 (Count/Summation)
# - LQI: 76 (Exponential Moving Average)
# - Margin: 82 (dB) (Exponential Moving Average)
# - RSSI: -18 (dBm) (Exponential Moving Average)
# Done
#
report_received = False

if not report_received:
continue

if '- LQI' in line:
results['lqi'] = self.__parse_numbers(line)[0]
elif '- Margin' in line:
results['margin'] = self.__parse_numbers(line)[0]
elif '- RSSI' in line:
results['rssi'] = self.__parse_numbers(line)[0]
elif '- PDU Counter' in line:
results['pdu_counter'] = self.__parse_numbers(line)[0]

return results

def __parse_numbers(self, line: str) -> List[int]:
values = re.findall("\-?\d+", line)
return list(map(int, values))

def __valid_flags(self, flags: str, flags_set: str):
# check for duplicate chars
if len(flags) != len(set(flags)):
return False

return set(flags).issubset(set(flags_set))

#
# Logging
Expand Down

0 comments on commit a1206d8

Please sign in to comment.