diff --git a/src/controller/python/chip/commissioning/__init__.py b/src/controller/python/chip/commissioning/__init__.py index c1105511c67b85..15cbf33a770704 100644 --- a/src/controller/python/chip/commissioning/__init__.py +++ b/src/controller/python/chip/commissioning/__init__.py @@ -72,6 +72,12 @@ class WiFiCredentials: passphrase: bytes +@dataclasses.dataclass +class TermsAndConditionsParameters: + version: int + user_response: int + + @dataclasses.dataclass class Parameters: pase_param: Union[PaseOverBLEParameters, PaseOverIPParameters] @@ -80,6 +86,7 @@ class Parameters: commissionee_info: CommissioneeInfo wifi_credentials: WiFiCredentials thread_credentials: bytes + tc_acknowledgements: Optional[TermsAndConditionsParameters] = None failsafe_expiry_length_seconds: int = 600 diff --git a/src/controller/python/chip/commissioning/commissioning_flow_blocks.py b/src/controller/python/chip/commissioning/commissioning_flow_blocks.py index 7d0d11b37fab1c..cc1999167a8eed 100644 --- a/src/controller/python/chip/commissioning/commissioning_flow_blocks.py +++ b/src/controller/python/chip/commissioning/commissioning_flow_blocks.py @@ -240,6 +240,15 @@ async def send_regulatory_config(self, parameter: commissioning.Parameters, node if response.errorCode != 0: raise commissioning.CommissionFailure(repr(response)) + async def send_terms_and_conditions_acknowledgements(self, parameter: commissioning.Parameters, node_id: int): + self._logger.info("Settings Terms and Conditions") + if parameter.tc_acknowledgements: + response = await self._devCtrl.SendCommand(node_id, commissioning.ROOT_ENDPOINT_ID, Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements( + TCVersion=parameter.tc_acknowledgements.version, TCUserResponse=parameter.tc_acknowledgements.user_response + )) + if response.errorCode != 0: + raise commissioning.CommissionFailure(repr(response)) + async def complete_commission(self, node_id: int): response = await self._devCtrl.SendCommand(node_id, commissioning.ROOT_ENDPOINT_ID, Clusters.GeneralCommissioning.Commands.CommissioningComplete()) if response.errorCode != 0: diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index 92cde9edb6011d..8ea771a5c0146a 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -629,6 +629,7 @@ class MatterTestConfig: app_pid: int = 0 commissioning_method: Optional[str] = None + in_test_commissioning_method: Optional[str] = None discriminators: List[int] = field(default_factory=list) setup_passcodes: List[int] = field(default_factory=list) commissionee_ip_address_just_for_testing: Optional[str] = None @@ -666,6 +667,10 @@ class MatterTestConfig: trace_to: List[str] = field(default_factory=list) + # Accepted Terms and Conditions if used + tc_version: int = None + tc_user_response: int = None + class ClusterMapper: """Describe clusters/attributes using schema names.""" @@ -938,6 +943,18 @@ def __init__(self, *args): # The named pipe name must be set in the derived classes self.app_pipe = None + async def commission_devices(self) -> bool: + conf = self.matter_test_config + + for commission_idx, node_id in enumerate(conf.dut_node_ids): + logging.info("Starting commissioning for root index %d, fabric ID 0x%016X, node ID 0x%016X" % + (conf.root_of_trust_index, conf.fabric_id, node_id)) + logging.info("Commissioning method: %s" % conf.commissioning_method) + + await CommissionDeviceTest.commission_device(self, commission_idx) + + return True + def get_test_steps(self, test: str) -> list[TestStep]: ''' Retrieves the test step list for the given test @@ -1736,6 +1753,7 @@ def populate_commissioning_args(args: argparse.Namespace, config: MatterTestConf config.dut_node_ids = args.dut_node_ids config.commissioning_method = args.commissioning_method + config.in_test_commissioning_method = args.in_test_commissioning_method config.commission_only = args.commission_only config.qr_code_content.extend(args.qr_code) @@ -1838,6 +1856,9 @@ def convert_args_to_matter_config(args: argparse.Namespace) -> MatterTestConfig: config.controller_node_id = args.controller_node_id config.trace_to = args.trace_to + config.tc_version = args.tc_version + config.tc_user_response = args.tc_user_response + # Accumulate all command-line-passed named args all_global_args = [] argsets = [item for item in (args.int_arg, args.float_arg, args.string_arg, args.json_arg, @@ -1896,6 +1917,10 @@ def parse_matter_test_args(argv: Optional[List[str]] = None) -> MatterTestConfig metavar='METHOD_NAME', choices=["on-network", "ble-wifi", "ble-thread", "on-network-ip"], help='Name of commissioning method to use') + commission_group.add_argument('--in-test-commissioning-method', type=str, + metavar='METHOD_NAME', + choices=["on-network", "ble-wifi", "ble-thread", "on-network-ip"], + help='Name of commissioning method to use, for commissioning tests') commission_group.add_argument('-d', '--discriminator', type=int_decimal_or_hex, metavar='LONG_DISCRIMINATOR', dest='discriminators', @@ -1931,6 +1956,10 @@ def parse_matter_test_args(argv: Optional[List[str]] = None) -> MatterTestConfig commission_group.add_argument('--commission-only', action="store_true", default=False, help="If true, test exits after commissioning without running subsequent tests") + commission_group.add_argument('--tc-version', type=int, help="Terms and conditions version") + + commission_group.add_argument('--tc-user-response', type=int, help="Terms and conditions acknowledgements") + code_group = parser.add_mutually_exclusive_group(required=False) code_group.add_argument('-q', '--qr-code', type=str, @@ -2202,20 +2231,21 @@ def __init__(self, *args): self.is_commissioning = True def test_run_commissioning(self): - conf = self.matter_test_config - for commission_idx, node_id in enumerate(conf.dut_node_ids): - logging.info("Starting commissioning for root index %d, fabric ID 0x%016X, node ID 0x%016X" % - (conf.root_of_trust_index, conf.fabric_id, node_id)) - logging.info("Commissioning method: %s" % conf.commissioning_method) + if not asyncio.run(self.commission_devices()): + raise signals.TestAbortAll("Failed to commission node") - if not asyncio.run(self._commission_device(commission_idx)): - raise signals.TestAbortAll("Failed to commission node") + async def commission_device(instance: MatterBaseTest, i) -> bool: + dev_ctrl = instance.default_controller + conf = instance.matter_test_config - async def _commission_device(self, i) -> bool: - dev_ctrl = self.default_controller - conf = self.matter_test_config + info = instance.get_setup_payload_info()[i] - info = self.get_setup_payload_info()[i] + if conf.tc_version is not None and conf.tc_user_response is not None: + logging.debug(f"Setting TC Acknowledgements to version {conf.tc_version} with user response {conf.tc_user_response}.") + dev_ctrl.SetTCAcknowledgements(conf.tc_version, conf.tc_user_response) + dev_ctrl.SetTCRequired(True) + else: + dev_ctrl.SetTCRequired(False) if conf.commissioning_method == "on-network": try: