From 6869efb43606a5847c7ee606849a5a308e132e59 Mon Sep 17 00:00:00 2001 From: James Swan <122404367+swan-amazon@users.noreply.github.com> Date: Tue, 27 Aug 2024 18:05:11 +0000 Subject: [PATCH] scripts --- src/python_testing/TC_CGEN_2_5.py | 131 +++++++++++++++ src/python_testing/TC_CGEN_2_6.py | 54 ++++++ src/python_testing/matter_testing_support.py | 163 +++++++++++-------- 3 files changed, 278 insertions(+), 70 deletions(-) create mode 100644 src/python_testing/TC_CGEN_2_5.py create mode 100644 src/python_testing/TC_CGEN_2_6.py diff --git a/src/python_testing/TC_CGEN_2_5.py b/src/python_testing/TC_CGEN_2_5.py new file mode 100644 index 00000000000000..175301e0d9c82d --- /dev/null +++ b/src/python_testing/TC_CGEN_2_5.py @@ -0,0 +1,131 @@ +# +# Copyright (c) 2022 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# test-runner-runs: run1 +# test-runner-run/run1/app: ${ALL_CLUSTERS_APP} +# test-runner-run/run1/factoryreset: True +# test-runner-run/run1/quiet: True +# test-runner-run/run1/app-args: --KVS kvs1 --trace-to json:${TRACE_APP}.json +# test-runner-run/run1/script-args: --commissioning-method on-network --qr-code MT:-24J0AFN00KA0648G00 --tc-version 1 --tc-user-response 1 --trace_file /tmp/chip_trace.log --trace_log 1 --trace_decode 1 + +import logging +import random + +import chip.clusters as Clusters +from chip.exceptions import ChipStackError +from matter_testing_support import DiscoveryFilterType, MatterBaseTest, async_test_body, default_matter_test_main +from mobly import asserts + + +class TC_CGEN_2_5(MatterBaseTest): + async def verify_tc_attributes(self, expected_tc_version: int, expected_tc_acknowledgements: int): + attr = Clusters.GeneralCommissioning.Attributes.TCAcceptedVersion + tc_accepted_version = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr) + asserts.assert_equal(tc_accepted_version, expected_tc_version, + f"Expected TCAcceptedVersion to be {expected_tc_version}, but got {tc_accepted_version}") + + attr = Clusters.GeneralCommissioning.Attributes.TCAcknowledgements + tc_acknowedgements = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr) + asserts.assert_equal(tc_acknowedgements, expected_tc_acknowledgements, + f"Expected TCAcknowledgements to be {expected_tc_acknowledgements}, but got {tc_acknowedgements}") + + @async_test_body + async def test_TC_CGEN_2_5(self): + self.th1 = self.default_controller + self.discriminator = random.randint(0, 4095) + + logging.info('Step 1 - TH reads the TCAcceptedVersion attribute') + attr = Clusters.GeneralCommissioning.Attributes.TCAcceptedVersion + tc_accepted_version = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr) + + print(f"tc_accepted_version: {tc_accepted_version}") + + logging.info('Step 2 - TH reads the TCAcknowledgements attribute') + attr = Clusters.GeneralCommissioning.Attributes.TCAcknowledgements + tc_acknowedgements = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr) + print(f"tc_acknowedgements: {tc_acknowedgements}") + + logging.info('Step 3 - TH reads the TCMinRequiredVersion attribute') + attr = Clusters.GeneralCommissioning.Attributes.TCMinRequiredVersion + tc_min_requried_version = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr) + + logging.info('Step 4 - TH reads the TCAcknowledgementsRequired attribute') + attr = Clusters.GeneralCommissioning.Attributes.TCAcknowledgementsRequired + tc_required = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr) + + logging.info('Step 5 - TH sends SetTCAcknowledgements with greater values than current and verify set') + new_accepted_version = tc_accepted_version + 1 + new_acknowledgements = 65535 + + cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(new_accepted_version, new_acknowledgements) + resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000) + asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk, 'Incorrect error code') + await self.verify_tc_attributes(new_accepted_version, new_acknowledgements) + + logging.info('Step 6 - TH sends SetTCAcknowledgements with no accepted terms at version 0') + cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(0, 0) + resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000) + asserts.assert_equal( + resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kTCMinVersionNotMet, 'Incorrect error code' + ) + await self.verify_tc_attributes(new_accepted_version, new_acknowledgements) + + logging.info('Step 7 - TH sends SetTCAcknowledgements with no accepted terms at version 1') + cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(1, 0) + resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000) + asserts.assert_equal( + resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kRequiredTCNotAccepted, 'Incorrect error code' + ) + await self.verify_tc_attributes(new_accepted_version, new_acknowledgements) + + logging.info('Step 8 - TH sends ArmFailSafe with ExpiryLengthSeconds set to 60') + cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(60) + resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000) + asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk, + 'Incorrect error code') + + logging.info('Step 9 - TH sends SetTCAcknowledgements with incremented TCVersion') + cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(new_accepted_version + 1, new_acknowledgements) + resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000) + asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk, + 'Incorrect error code') + + logging.info('Step 10 - TH sends ArmFailSafe with ExpiryLengthSeconds set to 0') + cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(0) + resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000) + asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk, + 'Incorrect error code') + + # Verify that the TC attributes have been reset to the pre-failsafe state + await self.verify_tc_attributes(new_accepted_version, new_acknowledgements) + + logging.info('Step 11 - TH removes all fabrics from the device') + fabrics = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=Clusters.OperationalCredentials.Attributes.Fabrics) + for fabric in fabrics: + if fabric.fabricIndex == self.th1.fabricId: + continue + cmd = Clusters.OperationalCredentials.Commands.RemoveFabric(fabric.fabricIndex) + resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000) + asserts.assert_equal(resp.statusCode, Clusters.OperationalCredentials.Enums.NodeOperationalCertStatusEnum.kOk) + + # Remove TH1 fabric last + cmd = Clusters.OperationalCredentials.Commands.RemoveFabric(self.th1.fabricId) + resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000) + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/TC_CGEN_2_6.py b/src/python_testing/TC_CGEN_2_6.py new file mode 100644 index 00000000000000..ac9ab3322f6965 --- /dev/null +++ b/src/python_testing/TC_CGEN_2_6.py @@ -0,0 +1,54 @@ +# +# Copyright (c) 2024 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# test-runner-runs: run1 +# test-runner-run/run1/app: ${ALL_CLUSTERS_APP} +# test-runner-run/run1/factoryreset: True +# test-runner-run/run1/quiet: True +# test-runner-run/run1/app-args: --KVS kvs1 --trace-to json:${TRACE_APP}.json +# test-runner-run/run1/script-args: --commissioning-method on-network --qr-code MT:-24J0AFN00KA0648G00 --trace_file /tmp/chip_trace.log --trace_log 1 --trace_decode 1 + +import logging +import random + +import chip.clusters as Clusters +from matter_testing_support import (async_test_body, default_matter_test_main, MatterBaseTest) +from mobly import asserts + + +class TC_CGEN_2_6(MatterBaseTest): + + @async_test_body + async def test_TC_CGEN_2_6(self): + self.th1 = self.default_controller + self.discriminator = random.randint(0, 4095) + + logging.info('Step 1 - TH commissions the DUT without setting TCs') + + # Don't set TCs for the next commissioning and skip CommissioningComplete so we can manually call CommissioningComplete in order to check the response error code + self.th1.SetTCRequired(False) + self.th1.SetSkipCommissioningComplete(True) + await self.commission_devices() + + cmd = Clusters.GeneralCommissioning.Commands.CommissioningComplete() + resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000) + asserts.assert_equal( + resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kTCAcknowledgementsNotReceived, 'Incorrect error code') + + +if __name__ == "__main__": + default_matter_test_main({ "pre_commissioning": False }) diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index b085750bc44821..12bf4cc794bf8d 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -520,6 +520,11 @@ def test_skipped(self, filename: str, name: str): @dataclass class MatterTestConfig: + def merge(self, override_attr_dict: dict): + for key, value in override_attr_dict.items(): + setattr(self, key, value) + return self + storage_path: pathlib.Path = pathlib.Path(".") logs_path: pathlib.Path = pathlib.Path(".") paa_trust_store_path: Optional[pathlib.Path] = None @@ -535,6 +540,7 @@ class MatterTestConfig: endpoint: int = 0 app_pid: int = 0 + pre_commissioning: bool = True commissioning_method: Optional[str] = None discriminators: Optional[List[int]] = None setup_passcodes: Optional[List[int]] = None @@ -846,6 +852,85 @@ def __init__(self, *args): self.problems = [] self.is_commissioning = False + + async def commission_device(self, dut_node_idx: int) -> bool: + dev_ctrl = self.default_controller + conf = self.matter_test_config + + info = self.get_setup_payload_info()[dut_node_idx] + + if conf.tc_version is not None and conf.tc_user_response is not None: + logging.debug(f"Setting TC Acknowledgements to version {conf.tc_version} with user response {conf.tc_user_response}.") + dev_ctrl.SetTCAcknowledgements(conf.tc_version, conf.tc_user_response) + dev_ctrl.SetTCRequired(True) + else: + dev_ctrl.SetTCRequired(False) + + if conf.commissioning_method == "on-network": + try: + await dev_ctrl.CommissionOnNetwork( + nodeId=conf.dut_node_ids[dut_node_idx], + setupPinCode=info.passcode, + filterType=info.filter_type, + filter=info.filter_value + ) + return True + except ChipStackError as e: + logging.error("Commissioning failed: %s" % e) + return False + elif conf.commissioning_method == "ble-wifi": + try: + await dev_ctrl.CommissionWiFi( + info.filter_value, + info.passcode, + conf.dut_node_ids[dut_node_idx], + conf.wifi_ssid, + conf.wifi_passphrase, + isShortDiscriminator=(info.filter_type == DiscoveryFilterType.SHORT_DISCRIMINATOR) + ) + return True + except ChipStackError as e: + logging.error("Commissioning failed: %s" % e) + return False + elif conf.commissioning_method == "ble-thread": + try: + await dev_ctrl.CommissionThread( + info.filter_value, + info.passcode, + conf.dut_node_ids[dut_node_idx], + conf.thread_operational_dataset, + isShortDiscriminator=(info.filter_type == DiscoveryFilterType.SHORT_DISCRIMINATOR) + ) + return True + except ChipStackError as e: + logging.error("Commissioning failed: %s" % e) + return False + elif conf.commissioning_method == "on-network-ip": + try: + logging.warning("==== USING A DIRECT IP COMMISSIONING METHOD NOT SUPPORTED IN THE LONG TERM ====") + await dev_ctrl.CommissionIP( + ipaddr=conf.commissionee_ip_address_just_for_testing, + setupPinCode=info.passcode, nodeid=conf.dut_node_ids[dut_node_idx] + ) + return True + except ChipStackError as e: + logging.error("Commissioning failed: %s" % e) + return False + else: + raise ValueError("Invalid commissioning method %s!" % conf.commissioning_method) + + + async def commission_devices(self): + conf = self.matter_test_config + + for commission_idx, node_id in enumerate(conf.dut_node_ids): + logging.info("Starting commissioning for root index %d, fabric ID 0x%016X, node ID 0x%016X" % + (conf.root_of_trust_index, conf.fabric_id, node_id)) + logging.info("Commissioning method: %s" % conf.commissioning_method) + + await self.commission_device(commission_idx) + + def get_test_steps(self, test: str) -> list[TestStep]: ''' Retrieves the test step list for the given test @@ -2039,77 +2124,11 @@ def test_run_commissioning(self): (conf.root_of_trust_index, conf.fabric_id, node_id)) logging.info("Commissioning method: %s" % conf.commissioning_method) - if not asyncio.run(self._commission_device(commission_idx)): + if not asyncio.run(self.commission_device(commission_idx)): raise signals.TestAbortAll("Failed to commission node") - async def _commission_device(self, i) -> bool: - dev_ctrl = self.default_controller - conf = self.matter_test_config - - info = self.get_setup_payload_info()[i] - - if conf.tc_version is not None and conf.tc_user_response is not None: - logging.debug(f"Setting TC Acknowledgements to version {conf.tc_version} with user response {conf.tc_user_response}.") - dev_ctrl.SetTCAcknowledgements(conf.tc_version, conf.tc_user_response) - dev_ctrl.SetTCRequired(True) - else: - dev_ctrl.SetTCRequired(False) - - if conf.commissioning_method == "on-network": - try: - await dev_ctrl.CommissionOnNetwork( - nodeId=conf.dut_node_ids[i], - setupPinCode=info.passcode, - filterType=info.filter_type, - filter=info.filter_value - ) - return True - except ChipStackError as e: - logging.error("Commissioning failed: %s" % e) - return False - elif conf.commissioning_method == "ble-wifi": - try: - await dev_ctrl.CommissionWiFi( - info.filter_value, - info.passcode, - conf.dut_node_ids[i], - conf.wifi_ssid, - conf.wifi_passphrase, - isShortDiscriminator=(info.filter_type == DiscoveryFilterType.SHORT_DISCRIMINATOR) - ) - return True - except ChipStackError as e: - logging.error("Commissioning failed: %s" % e) - return False - elif conf.commissioning_method == "ble-thread": - try: - await dev_ctrl.CommissionThread( - info.filter_value, - info.passcode, - conf.dut_node_ids[i], - conf.thread_operational_dataset, - isShortDiscriminator=(info.filter_type == DiscoveryFilterType.SHORT_DISCRIMINATOR) - ) - return True - except ChipStackError as e: - logging.error("Commissioning failed: %s" % e) - return False - elif conf.commissioning_method == "on-network-ip": - try: - logging.warning("==== USING A DIRECT IP COMMISSIONING METHOD NOT SUPPORTED IN THE LONG TERM ====") - await dev_ctrl.CommissionIP( - ipaddr=conf.commissionee_ip_address_just_for_testing, - setupPinCode=info.passcode, nodeid=conf.dut_node_ids[i] - ) - return True - except ChipStackError as e: - logging.error("Commissioning failed: %s" % e) - return False - else: - raise ValueError("Invalid commissioning method %s!" % conf.commissioning_method) - -def default_matter_test_main(): +def default_matter_test_main(override_matter_test_config: dict = {}): """Execute the test class in a test module. This is the default entry point for running a test script file directly. In this case, only one test class in a test script is allowed. @@ -2121,7 +2140,11 @@ def default_matter_test_main(): default_matter_test_main.main() """ - matter_test_config = parse_matter_test_args() + # Parse standard test configuration arguments + matter_test_config: MatterTestConfig = parse_matter_test_args() + + # Apply test configuration overrides + matter_test_config.merge(override_matter_test_config) # Find the test class in the test script. test_class = _find_test_class() @@ -2195,7 +2218,7 @@ def run_tests_no_exit(test_class: MatterBaseTest, matter_test_config: MatterTest testbed_name=test_config.testbed_name) with runner.mobly_logger(): - if matter_test_config.commissioning_method is not None: + if matter_test_config.pre_commissioning and matter_test_config.commissioning_method is not None: runner.add_test_class(test_config, CommissionDeviceTest, None) # Add the tests selected unless we have a commission-only request