diff --git a/src/python_testing/TC_RR_1_1.py b/src/python_testing/TC_RR_1_1.py index 98db3c3aae5fbd..72fee33bb46abb 100644 --- a/src/python_testing/TC_RR_1_1.py +++ b/src/python_testing/TC_RR_1_1.py @@ -20,6 +20,7 @@ import math import queue import random +import string import time from typing import Any, Dict, List, Set @@ -36,6 +37,10 @@ # +def generate_controller_name(fabric_index: int, controller_index: int): + return f"RD{fabric_index}{string.ascii_uppercase[controller_index]}" + + class TC_RR_1_1(MatterBaseTest): def setup_class(self): self._pseudo_random_generator = random.Random(1234) @@ -96,11 +101,6 @@ async def test_TC_RR_1_1(self): logging.info("--> User label cluster not present on any endpoitns") # Generate list of all clients names - all_names = [] - for fabric_idx in range(num_fabrics_to_commission): - for controller_idx in range(num_controllers_per_fabric): - all_names.append("RD%d%s" % (fabric_idx, chr(ord('A') + controller_idx))) - logging.info(f"Client names that will be used: {all_names}") client_list = [] # TODO: Shall we also verify SupportedFabrics attribute, and the CapabilityMinima attribute? @@ -119,7 +119,8 @@ async def test_TC_RR_1_1(self): node_ids = [200 + (i * 100) for i in range(num_controllers_per_fabric - 1)] # Prepare clients for first fabric, that includes the default controller - dev_ctrl.name = all_names.pop(0) + fabric_index = await self.read_single_attribute_check_success(cluster=Clusters.OperationalCredentials, attribute=Clusters.OperationalCredentials.Attributes.CurrentFabricIndex, dev_ctrl=dev_ctrl) + dev_ctrl.name = generate_controller_name(fabric_index, 0) client_list.append(dev_ctrl) if num_controllers_per_fabric > 1: @@ -130,8 +131,8 @@ async def test_TC_RR_1_1(self): privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister, targetNodeId=self.dut_node_id, catTags=[0x0001_0001] ) - for controller in new_controllers: - controller.name = all_names.pop(0) + for idx, controller in enumerate(new_controllers): + controller.name = generate_controller_name(fabric_index, idx+1) client_list.extend(new_controllers) # Step 1c - Ensure there are no leftover fabrics from another process. @@ -163,11 +164,11 @@ async def test_TC_RR_1_1(self): fabrics: List[Clusters.OperationalCredentials.Structs.FabricDescriptorStruct] = await self.read_single_attribute( dev_ctrl, node_id=self.dut_node_id, endpoint=0, attribute=Clusters.OperationalCredentials.Attributes.Fabrics, fabricFiltered=False) + current_fabric_index = await self.read_single_attribute_check_success(cluster=Clusters.OperationalCredentials, attribute=Clusters.OperationalCredentials.Attributes.CurrentFabricIndex) for fabric in fabrics: - if fabric.fabricID == dev_ctrl.fabricId: + if fabric.fabricIndex == current_fabric_index: continue - - # This is not the initial client's fabric, so remove it. + # This is not the test client's fabric, so remove it. await dev_ctrl.SendCommand( self.dut_node_id, 0, Clusters.OperationalCredentials.Commands.RemoveFabric(fabricIndex=fabric.fabricIndex)) @@ -184,13 +185,13 @@ async def test_TC_RR_1_1(self): new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=admin_index) new_admin_ctrl = new_fabric_admin.NewController(nodeId=dev_ctrl.nodeId, catTags=[0x0001_0001]) - new_admin_ctrl.name = all_names.pop(0) - client_list.append(new_admin_ctrl) await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting(commissionerDevCtrl=dev_ctrl, newFabricDevCtrl=new_admin_ctrl, existingNodeId=self.dut_node_id, newNodeId=self.dut_node_id) - + fabric_index = await self.read_single_attribute_check_success(cluster=Clusters.OperationalCredentials, attribute=Clusters.OperationalCredentials.Attributes.CurrentFabricIndex, dev_ctrl=new_admin_ctrl) + new_admin_ctrl.name = generate_controller_name(fabric_index, 0) + client_list.append(new_admin_ctrl) if num_controllers_per_fabric > 1: new_controllers = await CommissioningBuildingBlocks.CreateControllersOnFabric( fabricAdmin=new_fabric_admin, @@ -200,8 +201,8 @@ async def test_TC_RR_1_1(self): targetNodeId=self.dut_node_id, catTags=[0x0001_0001] ) - for controller in new_controllers: - controller.name = all_names.pop(0) + for idx, controller in enumerate(new_controllers): + controller.name = generate_controller_name(fabric_index, idx+1) client_list.extend(new_controllers) @@ -224,10 +225,8 @@ async def test_TC_RR_1_1(self): # Step 2: Set the Label field for each fabric and BasicInformation.NodeLabel to 32 characters logging.info("Step 2: Setting the Label field for each fabric and BasicInformation.NodeLabel to 32 characters") - for table_idx in range(len(fabric_table)): - # Client is client A for each fabric to set the Label field - fabric = fabric_table[table_idx] - client_name = "RD%dA" % table_idx + for fabric in fabric_table: + client_name = generate_controller_name(fabric.fabricIndex, 0) client = client_by_name[client_name] # Send the UpdateLabel command @@ -451,10 +450,8 @@ async def test_TC_RR_1_1(self): # Create a list of per-fabric clients to use for filling group resources accross all fabrics. fabric_unique_clients: List[Any] = [] - for table_idx in range(len(fabric_table)): - # Client is client A for each fabric - fabric = fabric_table[table_idx] - client_name = "RD%dA" % table_idx + for fabric in fabric_table: + client_name = generate_controller_name(fabric.fabricIndex, 0) fabric_unique_clients.append(client_by_name[client_name]) # Step 13: Write and verify indicated_max_group_keys_per_fabric group keys to all fabrics. @@ -696,9 +693,8 @@ async def send_acl(self, enable_access_to_group_cluster: bool, fabric_table: List[ Clusters.OperationalCredentials.Structs.FabricDescriptorStruct]): - for table_idx, fabric in enumerate(fabric_table): - # Client is client A for each fabric - client_name = "RD%dA" % table_idx + for fabric in fabric_table: + client_name = generate_controller_name(fabric.fabricIndex, 0) client = client_by_name[client_name] acl = self.build_acl(enable_access_to_group_cluster)