Skip to content

Commit

Permalink
scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
swan-amazon committed Aug 27, 2024
1 parent ae07fb5 commit 6869efb
Show file tree
Hide file tree
Showing 3 changed files with 278 additions and 70 deletions.
131 changes: 131 additions & 0 deletions src/python_testing/TC_CGEN_2_5.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#
# Copyright (c) 2022 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# test-runner-runs: run1
# test-runner-run/run1/app: ${ALL_CLUSTERS_APP}
# test-runner-run/run1/factoryreset: True
# test-runner-run/run1/quiet: True
# test-runner-run/run1/app-args: --KVS kvs1 --trace-to json:${TRACE_APP}.json
# test-runner-run/run1/script-args: --commissioning-method on-network --qr-code MT:-24J0AFN00KA0648G00 --tc-version 1 --tc-user-response 1 --trace_file /tmp/chip_trace.log --trace_log 1 --trace_decode 1

import logging
import random

import chip.clusters as Clusters
from chip.exceptions import ChipStackError
from matter_testing_support import DiscoveryFilterType, MatterBaseTest, async_test_body, default_matter_test_main
from mobly import asserts


class TC_CGEN_2_5(MatterBaseTest):
async def verify_tc_attributes(self, expected_tc_version: int, expected_tc_acknowledgements: int):
attr = Clusters.GeneralCommissioning.Attributes.TCAcceptedVersion
tc_accepted_version = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)
asserts.assert_equal(tc_accepted_version, expected_tc_version,
f"Expected TCAcceptedVersion to be {expected_tc_version}, but got {tc_accepted_version}")

attr = Clusters.GeneralCommissioning.Attributes.TCAcknowledgements
tc_acknowedgements = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)
asserts.assert_equal(tc_acknowedgements, expected_tc_acknowledgements,
f"Expected TCAcknowledgements to be {expected_tc_acknowledgements}, but got {tc_acknowedgements}")

@async_test_body
async def test_TC_CGEN_2_5(self):
self.th1 = self.default_controller
self.discriminator = random.randint(0, 4095)

logging.info('Step 1 - TH reads the TCAcceptedVersion attribute')
attr = Clusters.GeneralCommissioning.Attributes.TCAcceptedVersion
tc_accepted_version = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)

print(f"tc_accepted_version: {tc_accepted_version}")

logging.info('Step 2 - TH reads the TCAcknowledgements attribute')
attr = Clusters.GeneralCommissioning.Attributes.TCAcknowledgements
tc_acknowedgements = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)
print(f"tc_acknowedgements: {tc_acknowedgements}")

logging.info('Step 3 - TH reads the TCMinRequiredVersion attribute')
attr = Clusters.GeneralCommissioning.Attributes.TCMinRequiredVersion
tc_min_requried_version = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)

logging.info('Step 4 - TH reads the TCAcknowledgementsRequired attribute')
attr = Clusters.GeneralCommissioning.Attributes.TCAcknowledgementsRequired
tc_required = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)

logging.info('Step 5 - TH sends SetTCAcknowledgements with greater values than current and verify set')
new_accepted_version = tc_accepted_version + 1
new_acknowledgements = 65535

cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(new_accepted_version, new_acknowledgements)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk, 'Incorrect error code')
await self.verify_tc_attributes(new_accepted_version, new_acknowledgements)

logging.info('Step 6 - TH sends SetTCAcknowledgements with no accepted terms at version 0')
cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(0, 0)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(
resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kTCMinVersionNotMet, 'Incorrect error code'
)
await self.verify_tc_attributes(new_accepted_version, new_acknowledgements)

logging.info('Step 7 - TH sends SetTCAcknowledgements with no accepted terms at version 1')
cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(1, 0)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(
resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kRequiredTCNotAccepted, 'Incorrect error code'
)
await self.verify_tc_attributes(new_accepted_version, new_acknowledgements)

logging.info('Step 8 - TH sends ArmFailSafe with ExpiryLengthSeconds set to 60')
cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(60)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk,
'Incorrect error code')

logging.info('Step 9 - TH sends SetTCAcknowledgements with incremented TCVersion')
cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(new_accepted_version + 1, new_acknowledgements)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk,
'Incorrect error code')

logging.info('Step 10 - TH sends ArmFailSafe with ExpiryLengthSeconds set to 0')
cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(0)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk,
'Incorrect error code')

# Verify that the TC attributes have been reset to the pre-failsafe state
await self.verify_tc_attributes(new_accepted_version, new_acknowledgements)

logging.info('Step 11 - TH removes all fabrics from the device')
fabrics = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=Clusters.OperationalCredentials.Attributes.Fabrics)
for fabric in fabrics:
if fabric.fabricIndex == self.th1.fabricId:
continue
cmd = Clusters.OperationalCredentials.Commands.RemoveFabric(fabric.fabricIndex)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.statusCode, Clusters.OperationalCredentials.Enums.NodeOperationalCertStatusEnum.kOk)

# Remove TH1 fabric last
cmd = Clusters.OperationalCredentials.Commands.RemoveFabric(self.th1.fabricId)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)


if __name__ == "__main__":
default_matter_test_main()
54 changes: 54 additions & 0 deletions src/python_testing/TC_CGEN_2_6.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#
# Copyright (c) 2024 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# test-runner-runs: run1
# test-runner-run/run1/app: ${ALL_CLUSTERS_APP}
# test-runner-run/run1/factoryreset: True
# test-runner-run/run1/quiet: True
# test-runner-run/run1/app-args: --KVS kvs1 --trace-to json:${TRACE_APP}.json
# test-runner-run/run1/script-args: --commissioning-method on-network --qr-code MT:-24J0AFN00KA0648G00 --trace_file /tmp/chip_trace.log --trace_log 1 --trace_decode 1

import logging
import random

import chip.clusters as Clusters
from matter_testing_support import (async_test_body, default_matter_test_main, MatterBaseTest)
from mobly import asserts


class TC_CGEN_2_6(MatterBaseTest):

@async_test_body
async def test_TC_CGEN_2_6(self):
self.th1 = self.default_controller
self.discriminator = random.randint(0, 4095)

logging.info('Step 1 - TH commissions the DUT without setting TCs')

# Don't set TCs for the next commissioning and skip CommissioningComplete so we can manually call CommissioningComplete in order to check the response error code
self.th1.SetTCRequired(False)
self.th1.SetSkipCommissioningComplete(True)
await self.commission_devices()

cmd = Clusters.GeneralCommissioning.Commands.CommissioningComplete()
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(
resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kTCAcknowledgementsNotReceived, 'Incorrect error code')


if __name__ == "__main__":
default_matter_test_main({ "pre_commissioning": False })
163 changes: 93 additions & 70 deletions src/python_testing/matter_testing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,11 @@ def test_skipped(self, filename: str, name: str):

@dataclass
class MatterTestConfig:
def merge(self, override_attr_dict: dict):
for key, value in override_attr_dict.items():
setattr(self, key, value)
return self

storage_path: pathlib.Path = pathlib.Path(".")
logs_path: pathlib.Path = pathlib.Path(".")
paa_trust_store_path: Optional[pathlib.Path] = None
Expand All @@ -535,6 +540,7 @@ class MatterTestConfig:
endpoint: int = 0
app_pid: int = 0

pre_commissioning: bool = True
commissioning_method: Optional[str] = None
discriminators: Optional[List[int]] = None
setup_passcodes: Optional[List[int]] = None
Expand Down Expand Up @@ -846,6 +852,85 @@ def __init__(self, *args):
self.problems = []
self.is_commissioning = False


async def commission_device(self, dut_node_idx: int) -> bool:
dev_ctrl = self.default_controller
conf = self.matter_test_config

info = self.get_setup_payload_info()[dut_node_idx]

if conf.tc_version is not None and conf.tc_user_response is not None:
logging.debug(f"Setting TC Acknowledgements to version {conf.tc_version} with user response {conf.tc_user_response}.")
dev_ctrl.SetTCAcknowledgements(conf.tc_version, conf.tc_user_response)
dev_ctrl.SetTCRequired(True)
else:
dev_ctrl.SetTCRequired(False)

if conf.commissioning_method == "on-network":
try:
await dev_ctrl.CommissionOnNetwork(
nodeId=conf.dut_node_ids[dut_node_idx],
setupPinCode=info.passcode,
filterType=info.filter_type,
filter=info.filter_value
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
elif conf.commissioning_method == "ble-wifi":
try:
await dev_ctrl.CommissionWiFi(
info.filter_value,
info.passcode,
conf.dut_node_ids[dut_node_idx],
conf.wifi_ssid,
conf.wifi_passphrase,
isShortDiscriminator=(info.filter_type == DiscoveryFilterType.SHORT_DISCRIMINATOR)
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
elif conf.commissioning_method == "ble-thread":
try:
await dev_ctrl.CommissionThread(
info.filter_value,
info.passcode,
conf.dut_node_ids[dut_node_idx],
conf.thread_operational_dataset,
isShortDiscriminator=(info.filter_type == DiscoveryFilterType.SHORT_DISCRIMINATOR)
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
elif conf.commissioning_method == "on-network-ip":
try:
logging.warning("==== USING A DIRECT IP COMMISSIONING METHOD NOT SUPPORTED IN THE LONG TERM ====")
await dev_ctrl.CommissionIP(
ipaddr=conf.commissionee_ip_address_just_for_testing,
setupPinCode=info.passcode, nodeid=conf.dut_node_ids[dut_node_idx]
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
else:
raise ValueError("Invalid commissioning method %s!" % conf.commissioning_method)


async def commission_devices(self):
conf = self.matter_test_config

for commission_idx, node_id in enumerate(conf.dut_node_ids):
logging.info("Starting commissioning for root index %d, fabric ID 0x%016X, node ID 0x%016X" %
(conf.root_of_trust_index, conf.fabric_id, node_id))
logging.info("Commissioning method: %s" % conf.commissioning_method)

await self.commission_device(commission_idx)


def get_test_steps(self, test: str) -> list[TestStep]:
''' Retrieves the test step list for the given test
Expand Down Expand Up @@ -2039,77 +2124,11 @@ def test_run_commissioning(self):
(conf.root_of_trust_index, conf.fabric_id, node_id))
logging.info("Commissioning method: %s" % conf.commissioning_method)

if not asyncio.run(self._commission_device(commission_idx)):
if not asyncio.run(self.commission_device(commission_idx)):
raise signals.TestAbortAll("Failed to commission node")

async def _commission_device(self, i) -> bool:
dev_ctrl = self.default_controller
conf = self.matter_test_config

info = self.get_setup_payload_info()[i]

if conf.tc_version is not None and conf.tc_user_response is not None:
logging.debug(f"Setting TC Acknowledgements to version {conf.tc_version} with user response {conf.tc_user_response}.")
dev_ctrl.SetTCAcknowledgements(conf.tc_version, conf.tc_user_response)
dev_ctrl.SetTCRequired(True)
else:
dev_ctrl.SetTCRequired(False)

if conf.commissioning_method == "on-network":
try:
await dev_ctrl.CommissionOnNetwork(
nodeId=conf.dut_node_ids[i],
setupPinCode=info.passcode,
filterType=info.filter_type,
filter=info.filter_value
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
elif conf.commissioning_method == "ble-wifi":
try:
await dev_ctrl.CommissionWiFi(
info.filter_value,
info.passcode,
conf.dut_node_ids[i],
conf.wifi_ssid,
conf.wifi_passphrase,
isShortDiscriminator=(info.filter_type == DiscoveryFilterType.SHORT_DISCRIMINATOR)
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
elif conf.commissioning_method == "ble-thread":
try:
await dev_ctrl.CommissionThread(
info.filter_value,
info.passcode,
conf.dut_node_ids[i],
conf.thread_operational_dataset,
isShortDiscriminator=(info.filter_type == DiscoveryFilterType.SHORT_DISCRIMINATOR)
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
elif conf.commissioning_method == "on-network-ip":
try:
logging.warning("==== USING A DIRECT IP COMMISSIONING METHOD NOT SUPPORTED IN THE LONG TERM ====")
await dev_ctrl.CommissionIP(
ipaddr=conf.commissionee_ip_address_just_for_testing,
setupPinCode=info.passcode, nodeid=conf.dut_node_ids[i]
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
else:
raise ValueError("Invalid commissioning method %s!" % conf.commissioning_method)


def default_matter_test_main():
def default_matter_test_main(override_matter_test_config: dict = {}):
"""Execute the test class in a test module.
This is the default entry point for running a test script file directly.
In this case, only one test class in a test script is allowed.
Expand All @@ -2121,7 +2140,11 @@ def default_matter_test_main():
default_matter_test_main.main()
"""

matter_test_config = parse_matter_test_args()
# Parse standard test configuration arguments
matter_test_config: MatterTestConfig = parse_matter_test_args()

# Apply test configuration overrides
matter_test_config.merge(override_matter_test_config)

# Find the test class in the test script.
test_class = _find_test_class()
Expand Down Expand Up @@ -2195,7 +2218,7 @@ def run_tests_no_exit(test_class: MatterBaseTest, matter_test_config: MatterTest
testbed_name=test_config.testbed_name)

with runner.mobly_logger():
if matter_test_config.commissioning_method is not None:
if matter_test_config.pre_commissioning and matter_test_config.commissioning_method is not None:
runner.add_test_class(test_config, CommissionDeviceTest, None)

# Add the tests selected unless we have a commission-only request
Expand Down

0 comments on commit 6869efb

Please sign in to comment.