diff --git a/src/controller/python/ChipDeviceController-ScriptBinding.cpp b/src/controller/python/ChipDeviceController-ScriptBinding.cpp index b31cb5b7b8a4e6..d2d5e12a2f0160 100644 --- a/src/controller/python/ChipDeviceController-ScriptBinding.cpp +++ b/src/controller/python/ChipDeviceController-ScriptBinding.cpp @@ -181,6 +181,9 @@ PyChipError pychip_DeviceController_OpenCommissioningWindow(chip::Controller::De bool pychip_DeviceController_GetIPForDiscoveredDevice(chip::Controller::DeviceCommissioner * devCtrl, int idx, char * addrStr, uint32_t len); +PyChipError pychip_DeviceController_SetRequireTermsAndConditionsAcknowledgement(bool tcRequired); +PyChipError pychip_DeviceController_SetTermsAcknowledgements(uint16_t tcVersion, uint16_t tcUserResponse); +PyChipError pychip_DeviceController_SetSkipCommissioningComplete(bool skipCommissioningComplete); // Pairing Delegate PyChipError @@ -570,6 +573,25 @@ PyChipError pychip_DeviceController_SetDefaultNtp(const char * defaultNTP) return ToPyChipError(CHIP_NO_ERROR); } +PyChipError pychip_DeviceController_SetRequireTermsAndConditionsAcknowledgement(bool tcRequired) +{ + sCommissioningParameters.SetRequireTermsAndConditionsAcknowledgement(tcRequired); + return ToPyChipError(CHIP_NO_ERROR); +} + +PyChipError pychip_DeviceController_SetTermsAcknowledgements(uint16_t tcVersion, uint16_t tcUserResponse) +{ + sCommissioningParameters.SetTermsAndConditionsAcknowledgement( + { .acceptedTermsAndConditions = tcUserResponse, .acceptedTermsAndConditionsVersion = tcVersion }); + return ToPyChipError(CHIP_NO_ERROR); +} + +PyChipError pychip_DeviceController_SetSkipCommissioningComplete(bool skipCommissioningComplete) +{ + sCommissioningParameters.SetSkipCommissioningComplete(skipCommissioningComplete); + return ToPyChipError(CHIP_NO_ERROR); +} + PyChipError pychip_DeviceController_SetTrustedTimeSource(chip::NodeId nodeId, chip::EndpointId endpoint) { chip::app::Clusters::TimeSynchronization::Structs::FabricScopedTrustedTimeSourceStruct::Type timeSource = { .nodeID = nodeId, diff --git a/src/controller/python/chip/ChipDeviceCtrl.py b/src/controller/python/chip/ChipDeviceCtrl.py index d76d415d746945..b028e80436fb34 100644 --- a/src/controller/python/chip/ChipDeviceCtrl.py +++ b/src/controller/python/chip/ChipDeviceCtrl.py @@ -1956,6 +1956,15 @@ def _InitLib(self): self._dmLib.pychip_CreateManualCode.restype = PyChipError self._dmLib.pychip_CreateManualCode.argtypes = [c_uint16, c_uint32, c_char_p, c_size_t, POINTER(c_size_t)] + self._dmLib.pychip_DeviceController_SetSkipCommissioningComplete.restype = PyChipError + self._dmLib.pychip_DeviceController_SetSkipCommissioningComplete.argtypes = [c_bool] + + self._dmLib.pychip_DeviceController_SetRequireTermsAndConditionsAcknowledgement.restype = PyChipError + self._dmLib.pychip_DeviceController_SetRequireTermsAndConditionsAcknowledgement.argtypes = [c_bool] + + self._dmLib.pychip_DeviceController_SetTermsAcknowledgements.restype = PyChipError + self._dmLib.pychip_DeviceController_SetTermsAcknowledgements.argtypes = [c_uint16, c_uint16] + class ChipDeviceController(ChipDeviceControllerBase): ''' The ChipDeviceCommissioner binding, named as ChipDeviceController @@ -2084,6 +2093,27 @@ def SetDSTOffset(self, offset: int, validStarting: int, validUntil: int): lambda: self._dmLib.pychip_DeviceController_SetDSTOffset(offset, validStarting, validUntil) ).raise_on_error() + def SetTCRequired(self, tcRequired: bool): + ''' Set whether TC Acknowledgements should be set during commissioning''' + self.CheckIsActive() + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_SetRequireTermsAndConditionsAcknowledgement(tcRequired) + ).raise_on_error() + + def SetTCAcknowledgements(self, tcAcceptedVersion: int, tcUserResponse: int): + ''' Set the TC acknowledgements to set during commissioning''' + self.CheckIsActive() + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_SetTermsAcknowledgements(tcAcceptedVersion, tcUserResponse) + ).raise_on_error() + + def SetSkipCommissioningComplete(self, skipCommissioningComplete: bool): + ''' Set whether to skip the commissioning complete callback''' + self.CheckIsActive() + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_SetSkipCommissioningComplete(skipCommissioningComplete) + ).raise_on_error() + def SetDefaultNTP(self, defaultNTP: str): ''' Set the DefaultNTP to set during commissioning''' self.CheckIsActive() diff --git a/src/python_testing/TC_CGEN_2_5.py b/src/python_testing/TC_CGEN_2_5.py new file mode 100644 index 00000000000000..e430989d50ef7a --- /dev/null +++ b/src/python_testing/TC_CGEN_2_5.py @@ -0,0 +1,134 @@ +# +# Copyright (c) 2024 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# === BEGIN CI TEST ARGUMENTS === +# test-runner-runs: run1 +# test-runner-run/run1/app: ${TERMS_AND_CONDITIONS_APP} +# test-runner-run/run1/factoryreset: True +# test-runner-run/run1/quiet: True +# test-runner-run/run1/app-args: --KVS kvs1 --trace-to json:${TRACE_APP}.json +# test-runner-run/run1/script-args: --commissioning-method on-network --qr-code MT:-24J0AFN00KA0648G00 --tc-version 1 --tc-user-response 1 --trace_file /tmp/chip_trace.log --trace_log 1 --trace_decode 1 +# === END CI TEST ARGUMENTS === + +import logging +import random + +import chip.clusters as Clusters +from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main +from mobly import asserts + + +class TC_CGEN_2_5(MatterBaseTest): + async def verify_tc_attributes(self, expected_tc_version: int, expected_tc_acknowledgements: int): + attr = Clusters.GeneralCommissioning.Attributes.TCAcceptedVersion + tc_accepted_version = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr) + asserts.assert_equal(tc_accepted_version, expected_tc_version, + f"Expected TCAcceptedVersion to be {expected_tc_version}, but got {tc_accepted_version}") + + attr = Clusters.GeneralCommissioning.Attributes.TCAcknowledgements + tc_acknowedgements = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr) + asserts.assert_equal(tc_acknowedgements, expected_tc_acknowledgements, + f"Expected TCAcknowledgements to be {expected_tc_acknowledgements}, but got {tc_acknowedgements}") + + @async_test_body + async def test_TC_CGEN_2_5(self): + self.th1 = self.default_controller + self.discriminator = random.randint(0, 4095) + + logging.info('Step 1 - TH reads the TCAcceptedVersion attribute') + attr = Clusters.GeneralCommissioning.Attributes.TCAcceptedVersion + tc_accepted_version = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr) + + print(f"tc_accepted_version: {tc_accepted_version}") + + logging.info('Step 2 - TH reads the TCAcknowledgements attribute') + attr = Clusters.GeneralCommissioning.Attributes.TCAcknowledgements + tc_acknowedgements = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr) + print(f"tc_acknowedgements: {tc_acknowedgements}") + + logging.info('Step 3 - TH reads the TCMinRequiredVersion attribute') + attr = Clusters.GeneralCommissioning.Attributes.TCMinRequiredVersion + tc_min_requried_version = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr) + print(f"tc_min_requried_version: {tc_min_requried_version}") + + logging.info('Step 4 - TH reads the TCAcknowledgementsRequired attribute') + attr = Clusters.GeneralCommissioning.Attributes.TCAcknowledgementsRequired + tc_required = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr) + print(f"tc_required: {tc_required}") + + logging.info('Step 5 - TH sends SetTCAcknowledgements with greater values than current and verify set') + new_accepted_version = tc_accepted_version + 1 + new_acknowledgements = 65535 + + cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(new_accepted_version, new_acknowledgements) + resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000) + asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk, 'Incorrect error code') + await self.verify_tc_attributes(new_accepted_version, new_acknowledgements) + + logging.info('Step 6 - TH sends SetTCAcknowledgements with no accepted terms at version 0') + cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(0, 0) + resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000) + asserts.assert_equal( + resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kTCMinVersionNotMet, 'Incorrect error code' + ) + await self.verify_tc_attributes(new_accepted_version, new_acknowledgements) + + logging.info('Step 7 - TH sends SetTCAcknowledgements with no accepted terms at version 1') + cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(1, 0) + resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000) + asserts.assert_equal( + resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kRequiredTCNotAccepted, 'Incorrect error code' + ) + await self.verify_tc_attributes(new_accepted_version, new_acknowledgements) + + logging.info('Step 8 - TH sends ArmFailSafe with ExpiryLengthSeconds set to 60') + cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(60) + resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000) + asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk, + 'Incorrect error code') + + logging.info('Step 9 - TH sends SetTCAcknowledgements with incremented TCVersion') + cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(new_accepted_version + 1, new_acknowledgements) + resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000) + asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk, + 'Incorrect error code') + + logging.info('Step 10 - TH sends ArmFailSafe with ExpiryLengthSeconds set to 0') + cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(0) + resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000) + asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk, + 'Incorrect error code') + + # Verify that the TC attributes have been reset to the pre-failsafe state + await self.verify_tc_attributes(new_accepted_version, new_acknowledgements) + + logging.info('Step 11 - TH removes all fabrics from the device') + fabrics = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=Clusters.OperationalCredentials.Attributes.Fabrics) + for fabric in fabrics: + if fabric.fabricIndex == self.th1.fabricId: + continue + cmd = Clusters.OperationalCredentials.Commands.RemoveFabric(fabric.fabricIndex) + resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000) + asserts.assert_equal(resp.statusCode, Clusters.OperationalCredentials.Enums.NodeOperationalCertStatusEnum.kOk) + + # Remove TH1 fabric last + cmd = Clusters.OperationalCredentials.Commands.RemoveFabric(self.th1.fabricId) + resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000) + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/TC_CGEN_2_6.py b/src/python_testing/TC_CGEN_2_6.py new file mode 100644 index 00000000000000..7d89611cde74f9 --- /dev/null +++ b/src/python_testing/TC_CGEN_2_6.py @@ -0,0 +1,94 @@ +# +# Copyright (c) 2024 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# === BEGIN CI TEST ARGUMENTS === +# test-runner-runs: run1 +# test-runner-run/run1/app: ${TERMS_AND_CONDITIONS_APP} +# test-runner-run/run1/factoryreset: True +# test-runner-run/run1/quiet: True +# test-runner-run/run1/app-args: --KVS kvs1 --trace-to json:${TRACE_APP}.json +# test-runner-run/run1/script-args: --commissioning-method on-network --qr-code MT:-24J0AFN00KA0648G00 --trace_file /tmp/chip_trace.log --trace_log 1 --trace_decode 1 +# === END CI TEST ARGUMENTS === + +import logging +import random + +import chip.clusters as Clusters +from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main +from mobly import asserts + + +class TC_CGEN_2_6(MatterBaseTest): + + + def setup_class(self): + logging.info('Setup') + super().setup_class() + + def setup_test(self): + logging.info('setup_test') + super().setup_test() + + def teardown_test(self): + logging.info('teardown_test') + super().teardown_test() + + def teardown_class(self): + logging.info('teardown_class') + super().teardown_class() + + def desc_TC_CGEN_2_6(self) -> str: + """Returns a description of this test""" + return "[TC_CGEN_2_6] Verification of " + + def steps_TC_CGEN_2_6(self) -> list[TestStep]: + return [ + TestStep("0", "DUT commissioned and preconditions", is_commissioning=False), + TestStep("1", "TH commissions the DUT without setting TCs") + ] + + @async_test_body + async def test_TC_CGEN_2_6(self): + self.step("0") + + self.th1 = self.default_controller + self.discriminator = random.randint(0, 4095) + + self.step("1") + + # Don't set TCs for the next commissioning and skip CommissioningComplete so we can manually call CommissioningComplete in order to check the response error code + self.th1.SetTCRequired(False) + self.th1.SetSkipCommissioningComplete(True) + self.matter_test_config.commissioning_method = self.matter_test_config.pairing_method + await self.commission_devices() + + cmd = Clusters.GeneralCommissioning.Commands.CommissioningComplete() + resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000) + asserts.assert_equal( + resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kTCAcknowledgementsNotReceived, 'Incorrect error code') + + cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(1, 0) + resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000) + asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kRequiredTCNotAccepted, 'Incorrect error code') + + cmd = Clusters.GeneralCommissioning.Commands.CommissioningComplete() + resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000) + asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kTCAcknowledgementsNotReceived, 'Incorrect error code') + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index c5d46e2306dae2..ea3aa62a326535 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2022 Project CHIP Authors +# Copyright (c) 2022-2024 Project CHIP Authors # All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -629,6 +629,7 @@ class MatterTestConfig: app_pid: int = 0 commissioning_method: Optional[str] = None + pairing_method: Optional[str] = None discriminators: List[int] = field(default_factory=list) setup_passcodes: List[int] = field(default_factory=list) commissionee_ip_address_just_for_testing: Optional[str] = None @@ -666,6 +667,10 @@ class MatterTestConfig: trace_to: List[str] = field(default_factory=list) + # Accepted Terms and Conditions if used + tc_version: int = None + tc_user_response: int = None + class ClusterMapper: """Describe clusters/attributes using schema names.""" @@ -937,6 +942,18 @@ def __init__(self, *args): # The named pipe name must be set in the derived classes self.app_pipe = None + async def commission_devices(self) -> bool: + conf = self.matter_test_config + + for commission_idx, node_id in enumerate(conf.dut_node_ids): + logging.info("Starting commissioning for root index %d, fabric ID 0x%016X, node ID 0x%016X" % + (conf.root_of_trust_index, conf.fabric_id, node_id)) + logging.info("Commissioning method: %s" % conf.commissioning_method) + + await CommissionDeviceTest.commission_device(self, commission_idx) + + return True + def get_test_steps(self, test: str) -> list[TestStep]: ''' Retrieves the test step list for the given test @@ -1726,6 +1743,7 @@ def populate_commissioning_args(args: argparse.Namespace, config: MatterTestConf return False config.dut_node_ids = args.dut_node_ids + config.pairing_method = args.pairing_method config.commissioning_method = args.commissioning_method config.commission_only = args.commission_only @@ -1829,6 +1847,9 @@ def convert_args_to_matter_config(args: argparse.Namespace) -> MatterTestConfig: config.controller_node_id = args.controller_node_id config.trace_to = args.trace_to + config.tc_version = args.tc_version + config.tc_user_response = args.tc_user_response + # Accumulate all command-line-passed named args all_global_args = [] argsets = [item for item in (args.int_arg, args.float_arg, args.string_arg, args.json_arg, @@ -1887,6 +1908,10 @@ def parse_matter_test_args(argv: Optional[List[str]] = None) -> MatterTestConfig metavar='METHOD_NAME', choices=["on-network", "ble-wifi", "ble-thread", "on-network-ip"], help='Name of commissioning method to use') + commission_group.add_argument('--pairing-method', type=str, + metavar='PAIRING_NAME', + choices=["on-network", "ble-wifi", "ble-thread", "on-network-ip"], + help='Name of pairing method to use') commission_group.add_argument('-d', '--discriminator', type=int_decimal_or_hex, metavar='LONG_DISCRIMINATOR', dest='discriminators', @@ -1922,6 +1947,10 @@ def parse_matter_test_args(argv: Optional[List[str]] = None) -> MatterTestConfig commission_group.add_argument('--commission-only', action="store_true", default=False, help="If true, test exits after commissioning without running subsequent tests") + commission_group.add_argument('--tc-version', type=int, help="Terms and conditions version") + + commission_group.add_argument('--tc-user-response', type=int, help="Terms and conditions acknowledgements") + code_group = parser.add_mutually_exclusive_group(required=False) code_group.add_argument('-q', '--qr-code', type=str, @@ -2193,20 +2222,21 @@ def __init__(self, *args): self.is_commissioning = True def test_run_commissioning(self): - conf = self.matter_test_config - for commission_idx, node_id in enumerate(conf.dut_node_ids): - logging.info("Starting commissioning for root index %d, fabric ID 0x%016X, node ID 0x%016X" % - (conf.root_of_trust_index, conf.fabric_id, node_id)) - logging.info("Commissioning method: %s" % conf.commissioning_method) + if not asyncio.run(self.commission_devices()): + raise signals.TestAbortAll("Failed to commission node") - if not asyncio.run(self._commission_device(commission_idx)): - raise signals.TestAbortAll("Failed to commission node") + async def commission_device(instance: MatterBaseTest, i) -> bool: + dev_ctrl = instance.default_controller + conf = instance.matter_test_config - async def _commission_device(self, i) -> bool: - dev_ctrl = self.default_controller - conf = self.matter_test_config + info = instance.get_setup_payload_info()[i] - info = self.get_setup_payload_info()[i] + if conf.tc_version is not None and conf.tc_user_response is not None: + logging.debug(f"Setting TC Acknowledgements to version {conf.tc_version} with user response {conf.tc_user_response}.") + dev_ctrl.SetTCAcknowledgements(conf.tc_version, conf.tc_user_response) + dev_ctrl.SetTCRequired(True) + else: + dev_ctrl.SetTCRequired(False) if conf.commissioning_method == "on-network": try: