From 593605a0d267c373b30b5d1a2d433bc582faa406 Mon Sep 17 00:00:00 2001 From: austina-csa <168131796+austina-csa@users.noreply.github.com> Date: Mon, 13 Jan 2025 08:45:18 -0800 Subject: [PATCH] Refactoring --- src/python_testing/TC_IDM_2_2.py | 236 ++++++++++++++++++++----------- 1 file changed, 151 insertions(+), 85 deletions(-) diff --git a/src/python_testing/TC_IDM_2_2.py b/src/python_testing/TC_IDM_2_2.py index e1a78ef8046b6f..d2aa5d2e0a2e93 100644 --- a/src/python_testing/TC_IDM_2_2.py +++ b/src/python_testing/TC_IDM_2_2.py @@ -142,6 +142,96 @@ async def read_unsupported_attribute(self): found_unsupported = True return + + async def read_attribute_request(self, attribute_path_list): + """ + Helper function to send read attribute requests. + """ + return await self.default_controller.Read(self.dut_node_id, attribute_path_list) + + async def print_read_attributes_step(self, step_num: int, step_msg: str, attr_path): + self.print_step(step_num, step_msg) + await self.verify_attributes(attr_path) + + async def verify_attributes(self, attr_path: ClusterObject): + + read_request = await self.read_attribute_request(attr_path) + if isinstance(attr_path[0], tuple): + if len(attr_path[0]) == 0 and isinstance(attr_path[0], tuple): + + # NOTE: This is checked in its entirety in IDM-10.1 + + parts_list_a = read_request.tlvAttributes[0][Clusters.Descriptor.id][Clusters.Descriptor.Attributes.PartsList.attribute_id] + parts_list_b = self.endpoints[0][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList] + asserts.assert_equal(parts_list_a, parts_list_b, "Parts list is not the expected value") + + for endpoint in read_request.tlvAttributes: + returned_clusters = sorted(list(read_request.tlvAttributes[endpoint].keys())) + server_list = sorted(read_request.attributes[endpoint][Clusters.Descriptor][Clusters.Descriptor.Attributes.ServerList]) + asserts.assert_equal(returned_clusters, server_list) + self.verify_read_request(read_request) + + elif len(attr_path[0]) == 2 and isinstance(attr_path[0][0], int) and isinstance(attr_path[0][1], type): # Has an endpoint, e.g. [(0, Clusters.Descriptor)] + (endpoint, cluster_obj) = attr_path[0] + cluster_ids = list(read_request.tlvAttributes[endpoint].keys()) + if hasattr(cluster_obj, 'cluster_id'): # cluster_obj is an Attribute + attribute = cluster_obj + cluster = ClusterObjects.ALL_CLUSTERS[cluster_obj.cluster_id] + attribute_ids = list(read_request.tlvAttributes[endpoint][cluster_obj.cluster_id].keys()) + asserts.assert_equal({cluster_obj.cluster_id}, read_request.tlvAttributes[endpoint].keys(), f"Cluster not in output: {cluster_obj.cluster_id}") + asserts.assert_equal(attribute_ids, [cluster_obj.attribute_id]) + asserts.assert_equal(cluster_ids, [cluster_obj.cluster_id]) + server_list_attribute_ids = (read_request.tlvAttributes[endpoint][cluster_obj.cluster_id] + [cluster.Attributes.ServerList.attribute_id]) + else: + attribute_ids = list(read_request.tlvAttributes[endpoint][cluster_obj.id].keys()) + returned_attributes = read_request.tlvAttributes[endpoint][cluster_obj.id][cluster_obj.Attributes.AttributeList.attribute_id] + asserts.assert_equal({cluster_obj.id}, read_request.tlvAttributes[endpoint].keys(), f"Cluster not in output: {cluster_obj.id}") + server_list_attribute_ids = (read_request.tlvAttributes[endpoint][cluster_obj.id] + [Clusters.Descriptor.Attributes.ServerList.attribute_id]) + + asserts.assert_equal({endpoint}, read_request.tlvAttributes.keys(), f"Endpoint {endpoint} not in output") + asserts.assert_equal(server_list_attribute_ids, sorted([x.id for x in self.endpoints[endpoint]])) + + elif isinstance(attr_path, list): + if isinstance(attr_path[0], type): # E.g. [Clusters.Descriptor] + + cluster_obj = attr_path[0] + if hasattr(cluster_obj, 'cluster_id'): # cluster_obj is an Attribute + pass + else: + for endpoint in read_request.tlvAttributes: + cluster_ids = list(read_request.tlvAttributes[endpoint].keys()) + attribute_ids = list(read_request.tlvAttributes[endpoint][cluster_obj.id].keys()) + asserts.assert_in(cluster_obj.id, cluster_ids) + + returned_attributes = read_request.tlvAttributes[endpoint][cluster_obj.id][cluster_obj.Attributes.AttributeList.attribute_id] + + asserts.assert_equal(sorted(attribute_ids), sorted(returned_attributes), "Expected attribute list doesn't match") + elif isinstance(attr_path[0], AttributePath): + if hasattr(attr_path[0], 'EndpointId') and attr_path[0] is not None: + endpoint = attr_path[0].EndpointId + + asserts.assert_in(Clusters.Descriptor.id, read_request.tlvAttributes[endpoint].keys(), "Descriptor cluster not in output") + asserts.assert_in(Clusters.Descriptor.Attributes.AttributeList.attribute_id, + read_request.tlvAttributes[endpoint][Clusters.Descriptor.id], "AttributeList not in output") + + else: + asserts.assert_in(Clusters.Descriptor.id, read_request.tlvAttributes[0].keys(), "Descriptor cluster not in output") + asserts.assert_in(Clusters.Descriptor.Attributes.AttributeList.attribute_id, + read_request.tlvAttributes[Clusters.Descriptor.id], "AttributeList not in output") + return read_request + + def verify_read_request(self, read_request): + for endpoint in read_request.tlvAttributes: + for cluster in read_request.tlvAttributes[endpoint]: + returned_attrs = sorted([x for x in read_request.tlvAttributes[endpoint][cluster].keys()]) + attr_list = sorted([x for x in read_request.tlvAttributes[endpoint][cluster] + [ClusterObjects.ALL_CLUSTERS[cluster].Attributes.AttributeList.attribute_id] + if x != Clusters.UnitTesting.Attributes.WriteOnlyInt8u.attribute_id]) + asserts.assert_equal(returned_attrs, attr_list, + f"Mismatch for {cluster} ({ClusterObjects.ALL_CLUSTERS[cluster]}) at endpoint {endpoint}") + @async_test_body async def test_TC_IDM_2_2(self): # Test Setup @@ -162,99 +252,75 @@ async def test_TC_IDM_2_2(self): # AttributePath = [[Endpoint = Specific Endpoint, Cluster = Specific ClusterID, Attribute = Specific Attribute]] # On receipt of this message, DUT should send a report data action with the attribute value to the DUT - self.print_step(1, "Send Request Message to read one attribute on a given cluster and endpoint") - read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.ServerList)]) - attribute_ids = [a.attribute_id for a in read_request[0][Clusters.Descriptor].keys() if a != Clusters.Attribute.DataVersion] - returned_attributes = read_request[0][Clusters.Descriptor][Clusters.Descriptor.Attributes.ServerList] - asserts.assert_equal({0}, read_request.keys(), "Endpoint 0 not in output") - asserts.assert_equal({Clusters.Descriptor}, read_request[0].keys(), "Descriptor cluster not in output") - # asserts.assert_equal(sorted(attribute_ids), sorted(returned_attributes), "Expected attribute list doesn't match") + # self.print_step(1, "Send Request Message to read one attribute on a given cluster and endpoint") + read_request = await self.print_read_attributes_step(1, "Send Request Message to read one attribute on a given cluster and endpoint", [(0, Clusters.Descriptor.Attributes.ServerList)]) + # Step 2 # TH sends the Read Request Message to the DUT to read all attributes on a given cluster and Endpoint # AttributePath = [[Endpoint = Specific Endpoint, Cluster = Specific ClusterID]] # On receipt of this message, DUT should send a report data action with the attribute value to the DUT. - self.print_step(2, "Send Request Message to read all attributes on a given cluster and endpoint") - - read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor)]) - - asserts.assert_equal({0}, read_request.keys(), "Endpoint 0 not in output") - asserts.assert_equal({Clusters.Descriptor}, read_request[0].keys(), "Descriptor cluster not in output") - attribute_ids = [a.attribute_id for a in read_request[0][Clusters.Descriptor].keys() if a != Clusters.Attribute.DataVersion] - returned_attributes = read_request[0][Clusters.Descriptor][Clusters.Descriptor.Attributes.AttributeList] - asserts.assert_equal(sorted(attribute_ids), sorted(returned_attributes), "Expected attribute list doesn't match") - + read_request = await self.print_read_attributes_step(2, "Send Request Message to read all attributes on a given cluster and endpoint", [(0, Clusters.Descriptor)]) + # Step 3 # TH sends the Read Request Message to the DUT to read an attribute from a cluster at all Endpoints # AttributePath = [[Cluster = Specific ClusterID, Attribute = Specific Attribute]] # On receipt of this message, DUT should send a report data action with the attribute value from all the Endpoints to the DUT. - - self.print_step(3, "Send Request Message to read one attribute on a given cluster at all endpoints") - all_attributes = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor]) - for endpoint in all_attributes: - asserts.assert_in(Clusters.Objects.Descriptor, - all_attributes[endpoint], f"Descriptor attribute not found in endpoint: {endpoint}") - attribute_ids = [a.attribute_id for a in all_attributes[endpoint] - [Clusters.Descriptor].keys() if a != Clusters.Attribute.DataVersion] - returned_attributes = all_attributes[endpoint][Clusters.Descriptor][Clusters.Descriptor.Attributes.AttributeList] - asserts.assert_equal(sorted(attribute_ids), sorted(returned_attributes), "Expected attribute list doesn't match") + + read_request = await self.print_read_attributes_step(3, "Send Request Message to read one attribute on a given cluster at all endpoints", [Clusters.Descriptor]) # Step 4 # TH sends the Read Request Message to the DUT to read a global attribute from all clusters at that Endpoint # AttributePath = [[Endpoint = Specific Endpoint, Attribute = Specific Global Attribute]] # On receipt of this message, DUT should send a report data action with the attribute value from all the clusters to the DUT. - self.print_step(4, "Send Request Message to read one global attribute from all clusters at that endpoint") - attribute_path_1 = AttributePath( + attribute_path = AttributePath( EndpointId=0, ClusterId=None, AttributeId=global_attribute_ids.GlobalAttributeIds.ATTRIBUTE_LIST_ID) - - read_request = await self.default_controller.ReadAttribute( - self.dut_node_id, - [attribute_path_1] - ) - - asserts.assert_in(Clusters.Objects.Descriptor, read_request[0].keys(), "Descriptor cluster not in output") - asserts.assert_in(Clusters.Objects.Descriptor.Attributes.AttributeList, - read_request[0][Clusters.Objects.Descriptor], "AttributeList not in output") - asserts.assert_equal(sorted(all_attributes[0][Clusters.Descriptor] - [Clusters.Descriptor.Attributes.ServerList]), - sorted([x.id for x in read_request[0]])) + + read_request = await self.print_read_attributes_step(4, "Send Request Message to read one global attribute from all clusters at that endpoint", [attribute_path]) # Step 5 # TH sends the Read Request Message to the DUT to read all attributes from all clusters on all Endpoints ### AttributePath = [[]] # On receipt of this message, DUT should send a report data action with the attribute value from all the clusters to the DUT. - self.print_step(5, "Send Request Message to read all attributes from all clusters on all endpoints") - read_request = await self.default_controller.Read(self.dut_node_id, [()]) + # self.print_step(5, "Send Request Message to read all attributes from all clusters on all endpoints") + + read_request = await self.print_read_attributes_step(5, "Send Request Message to read all attributes from all clusters on all endpoints", [()]) # NOTE: This is checked in its entirety in IDM-10.1 - parts_list_a = read_request.attributes[0][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList] - parts_list_b = self.endpoints[0][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList] - asserts.assert_equal(parts_list_a, parts_list_b, "Parts list is not the expected value") + # parts_list_a = read_request.tlvAttributes[0][Clusters.Descriptor.id][Clusters.Descriptor.Attributes.PartsList.attribute_id] + # parts_list_b = self.endpoints[0][Clusters.Descriptor.id][Clusters.Descriptor.Attributes.PartsList.attribute_id] + # asserts.assert_equal(parts_list_a, parts_list_b, "Parts list is not the expected value") - for endpoint in read_request.attributes: - returned_clusters = sorted([x.id for x in read_request.attributes[endpoint]]) - server_list = sorted(read_request.attributes[endpoint][Clusters.Descriptor][Clusters.Descriptor.Attributes.ServerList]) - asserts.assert_equal(returned_clusters, server_list) + # for endpoint in read_request.tlvAttributes: + # returned_clusters = sorted([x.id for x in read_request.tlvAttributes[endpoint]]) + # server_list = sorted(read_request.attributes[endpoint][Clusters.Descriptor][Clusters.Descriptor.Attributes.ServerList]) + # asserts.assert_equal(returned_clusters, server_list) # TODO: Make this check centralized (move to its own method) - for endpoint in read_request.tlvAttributes: - for cluster in read_request.tlvAttributes[endpoint]: - returned_attrs = sorted([x for x in read_request.tlvAttributes[endpoint][cluster].keys()]) - attr_list = sorted([x for x in read_request.tlvAttributes[endpoint][cluster] - [ClusterObjects.ALL_CLUSTERS[cluster].Attributes.AttributeList.attribute_id] - if x != Clusters.UnitTesting.Attributes.WriteOnlyInt8u.attribute_id]) - asserts.assert_equal(returned_attrs, attr_list, - f"Mismatch for {cluster} ({ClusterObjects.ALL_CLUSTERS[cluster]}) at endpoint {endpoint}") + # for endpoint in read_request.tlvAttributes: + # for cluster in read_request.tlvAttributes[endpoint]: + # returned_attrs = sorted([x for x in read_request.tlvAttributes[endpoint][cluster].keys()]) + # attr_list = sorted([x for x in read_request.tlvAttributes[endpoint][cluster] + # [ClusterObjects.ALL_CLUSTERS[cluster].Attributes.AttributeList.attribute_id] + # if x != Clusters.UnitTesting.Attributes.WriteOnlyInt8u.attribute_id]) + # asserts.assert_equal(returned_attrs, attr_list, + # f"Mismatch for {cluster} ({ClusterObjects.ALL_CLUSTERS[cluster]}) at endpoint {endpoint}") # Step 6 # TH sends the Read Request Message to the DUT to read a global attribute from all clusters at all Endpoints # AttributePath = [[Attribute = Specific Global Attribute]] # On receipt of this message, DUT should send a report data action with the attribute value from all the clusters to the DUT. + # attribute_path = AttributePath( + # EndpointId=None, + # ClusterId=None, + # AttributeId=global_attribute_ids.GlobalAttributeIds.ATTRIBUTE_LIST_ID) + + # read_request = await self.print_read_attributes_step(6, "Send Request Message to read one global attribute from all clusters on all endpoints", attribute_path) self.print_step(6, "Send Request Message to read one global attribute from all clusters on all endpoints") attribute_path = AttributePath( @@ -268,16 +334,16 @@ async def test_TC_IDM_2_2(self): ) returned_clusters = [] - for endpoint in read_request.attributes: + for endpoint in read_request.tlvAttributes: endpoint_clusters = list(read_request.tlvAttributes[endpoint].keys()) standard_clusters = [x for x in endpoint_clusters if global_attribute_ids.cluster_id_type(x) == global_attribute_ids.ClusterIdType.kStandard] returned_clusters.extend(standard_clusters) - asserts.assert_in(Clusters.Objects.Descriptor, - read_request.attributes[endpoint].keys(), "Descriptor cluster not in output") - asserts.assert_in(Clusters.Objects.Descriptor.Attributes.AttributeList, - read_request.attributes[endpoint][Clusters.Objects.Descriptor], "AttributeList not in output") + asserts.assert_in(Clusters.Descriptor.id, + read_request.tlvAttributes[endpoint].keys(), "Descriptor cluster not in output") + asserts.assert_in(Clusters.Descriptor.Attributes.AttributeList.attribute_id, + read_request.tlvAttributes[endpoint][Clusters.Descriptor.id], "AttributeList not in output") for endpoint in read_request.tlvAttributes: cluster_list = read_request.tlvAttributes[endpoint] for cluster in cluster_list: @@ -293,7 +359,7 @@ async def test_TC_IDM_2_2(self): # AttributePath = [[Cluster = Specific ClusterID]] # On receipt of this message, DUT should send a report data action with the attribute value from all the Endpoints to the DUT. self.print_step(7, "Send Request Message to read all attributes from one cluster at all endpoints") - read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor]) + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Descriptor]) for endpoint in read_request: asserts.assert_in(Clusters.Descriptor, read_request[endpoint].keys(), "Descriptor cluster not in output") @@ -310,9 +376,9 @@ async def test_TC_IDM_2_2(self): self.print_step(8, "Send Request Message to read all attributes from all clusters at one endpoint") read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [0]) - asserts.assert_in(Clusters.Objects.Descriptor, read_request[0].keys(), "Descriptor cluster not in output") - asserts.assert_in(Clusters.Objects.Descriptor.Attributes.ServerList, - read_request[0][Clusters.Objects.Descriptor], "ServerList not in output") + asserts.assert_in(Clusters.Descriptor, read_request[0].keys(), "Descriptor cluster not in output") + asserts.assert_in(Clusters.Descriptor.Attributes.ServerList, + read_request[0][Clusters.Descriptor], "ServerList not in output") for cluster in read_request[0]: attribute_ids = [a.attribute_id for a in read_request[0] @@ -446,9 +512,9 @@ async def test_TC_IDM_2_2(self): # On the TH verify the received Report data message has the right attribute values for all the 3 times. self.print_step(22, "Send the Read Request Message to the DUT 3 times and check if they are correct each time") - read_request_1 = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor.Attributes.ServerList]) - read_request_2 = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor.Attributes.ServerList]) - read_request_3 = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor.Attributes.ServerList]) + read_request_1 = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Descriptor.Attributes.ServerList]) + read_request_2 = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Descriptor.Attributes.ServerList]) + read_request_3 = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Descriptor.Attributes.ServerList]) asserts.assert_equal(read_request_1, read_request_2) asserts.assert_equal(read_request_2, read_request_3) @@ -459,11 +525,11 @@ async def test_TC_IDM_2_2(self): # DUT should not send a report data action with the attribute value to the TH if the data version is same as that requested. self.print_step( 23, "Send the Read Request Message to the DUT to read a particular attribute with the DataVersionFilter Field not set") - read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.ServerList)]) + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Descriptor.Attributes.ServerList)]) data_version = read_request[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] data_version_filter = [(0, Clusters.Descriptor, data_version)] - read_request_2 = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.ServerList)], dataVersionFilters=data_version_filter) + read_request_2 = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Descriptor.Attributes.ServerList)], dataVersionFilters=data_version_filter) asserts.assert_equal(read_request_2, {}) # Step 24 @@ -487,7 +553,7 @@ async def test_TC_IDM_2_2(self): self.dut_node_id, [(0, Clusters.BasicInformation.Attributes.NodeLabel)], dataVersionFilters=data_version_filter) data_version_2 = read_request[0][Clusters.BasicInformation][Clusters.Attribute.DataVersion] - asserts.assert_equal(read_request[0][Clusters.Objects.BasicInformation][Clusters.Objects.BasicInformation.Attributes.NodeLabel], + asserts.assert_equal(read_request[0][Clusters.BasicInformation][Clusters.BasicInformation.Attributes.NodeLabel], node_label_value, f"Data version does not equal {node_label_value}") asserts.assert_equal((data_version_1 + 1), data_version_2, "DataVersion was not incremented") @@ -503,17 +569,17 @@ async def test_TC_IDM_2_2(self): self.print_step( 25, "Send the Read Request Message to the DUT to read all attributes on a cluster with the DataVersionFilter Field not set") - read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.BasicInformation)]) + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.BasicInformation)]) data_version_1 = read_request[0][Clusters.BasicInformation][Clusters.Attribute.DataVersion] node_label_value = "Goodbye World" - await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.Objects.BasicInformation.Attributes.NodeLabel(value=node_label_value))]) + await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.BasicInformation.Attributes.NodeLabel(value=node_label_value))]) data_version = read_request[0][Clusters.BasicInformation][Clusters.Attribute.DataVersion] data_version_filter = [(0, Clusters.BasicInformation, data_version)] read_request = await self.default_controller.ReadAttribute( - self.dut_node_id, [(0, Clusters.Objects.BasicInformation)], dataVersionFilters=data_version_filter) + self.dut_node_id, [(0, Clusters.BasicInformation)], dataVersionFilters=data_version_filter) data_version_2 = read_request[0][Clusters.BasicInformation][Clusters.Attribute.DataVersion] - asserts.assert_equal(read_request[0][Clusters.Objects.BasicInformation][Clusters.Objects.BasicInformation.Attributes.NodeLabel], + asserts.assert_equal(read_request[0][Clusters.BasicInformation][Clusters.BasicInformation.Attributes.NodeLabel], node_label_value, f"Data version does not equal {node_label_value}") asserts.assert_equal((data_version_1 + 1), data_version_2, "DataVersion was not incremented") @@ -527,17 +593,17 @@ async def test_TC_IDM_2_2(self): self.print_step( 26, "Send the Read Request Message to read a particular attribute on a particular cluster with the DataVersionFilter Field not set") node_label_value = "Hello World Again" - read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.BasicInformation.Attributes.DataModelRevision]) + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.BasicInformation.Attributes.DataModelRevision]) - await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.Objects.BasicInformation.Attributes.NodeLabel(value=node_label_value))]) + await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.BasicInformation.Attributes.NodeLabel(value=node_label_value))]) data_version = read_request[0][Clusters.BasicInformation][Clusters.Attribute.DataVersion] data_version_filter = [(0, Clusters.BasicInformation, data_version)] read_request_2 = await self.default_controller.ReadAttribute( - self.dut_node_id, [(0, Clusters.Objects.BasicInformation.Attributes.NodeLabel)], dataVersionFilters=data_version_filter) + self.dut_node_id, [(0, Clusters.BasicInformation.Attributes.NodeLabel)], dataVersionFilters=data_version_filter) data_version_filter_2 = [(0, Clusters.BasicInformation, data_version-1)] read_request_3 = await self.default_controller.ReadAttribute( - self.dut_node_id, [(0, Clusters.Objects.BasicInformation.Attributes.NodeLabel)], dataVersionFilters=data_version_filter_2) - asserts.assert_equal(read_request_3[0][Clusters.Objects.BasicInformation][Clusters.Objects.BasicInformation.Attributes.NodeLabel], + self.dut_node_id, [(0, Clusters.BasicInformation.Attributes.NodeLabel)], dataVersionFilters=data_version_filter_2) + asserts.assert_equal(read_request_3[0][Clusters.BasicInformation][Clusters.BasicInformation.Attributes.NodeLabel], node_label_value, f"Data version does not equal {node_label_value}") # Step 27 @@ -552,7 +618,7 @@ async def test_TC_IDM_2_2(self): self.print_step( 27, "Send the Read Request Message to read any supported attribute/wildcard on a particular cluster say A with the DataVersionFilter Field not set") - read_request_27_1_1 = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor.Attributes.ServerList]) + read_request_27_1_1 = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Descriptor.Attributes.ServerList]) data_version_1 = read_request_27_1_1[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] read_request_27_2_1 = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.BasicInformation.Attributes.NodeLabel)]) @@ -661,9 +727,9 @@ async def test_TC_IDM_2_2(self): # Verify that there are no errors sent back for attributes the TH has no access to. self.print_step(31, "Send the Read Request Message to the DUT to read all attributes from all clusters at Endpoint1") - read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(1, Clusters.Objects.Descriptor)]) + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(1, Clusters.Descriptor)]) asserts.assert_true(1 in read_request, "Endpoint 1 missing in response") - asserts.assert_true(Clusters.Objects.Descriptor in read_request[1], "Clusters.Objects.Descriptor not in response") + asserts.assert_true(Clusters.Descriptor in read_request[1], "Clusters.Descriptor not in response") # Step 32