From f6cccadaca0bcea955b8eba2eb215913d5ae0149 Mon Sep 17 00:00:00 2001 From: cecille Date: Fri, 29 Sep 2023 10:20:00 -0400 Subject: [PATCH] TC-IDM-10.1: Consolidate scripts, add new checks --- .../python/chip/clusters/ClusterObjects.py | 27 ++- .../TC_DeviceBasicComposition.py | 212 +++++++++++++----- src/python_testing/matter_testing_support.py | 7 +- 3 files changed, 187 insertions(+), 59 deletions(-) diff --git a/src/controller/python/chip/clusters/ClusterObjects.py b/src/controller/python/chip/clusters/ClusterObjects.py index 7496099a301e91..1774ca8c083a4c 100644 --- a/src/controller/python/chip/clusters/ClusterObjects.py +++ b/src/controller/python/chip/clusters/ClusterObjects.py @@ -206,8 +206,31 @@ def FromTLV(cls, data: bytes): def descriptor(cls): raise NotImplementedError() +# The below dictionaries will be filled dynamically +# and are used for quick lookup/mapping from cluster/attribute id to the correct class +ALL_CLUSTERS = {} +ALL_ATTRIBUTES = {} +# These need to be separate because there can be overlap in command ids for commands and responses. +ALL_ACCEPTED_COMMANDS = {} +ALL_GENERATED_COMMANDS = {} class ClusterCommand(ClusterObject): + def __init_subclass__(cls, *args, **kwargs) -> None: + """Register a subclass.""" + super().__init_subclass__(*args, **kwargs) + try: + if cls.is_client: + if cls.cluster_id not in ALL_ACCEPTED_COMMANDS: + ALL_ACCEPTED_COMMANDS[cls.cluster_id] = {} + ALL_ACCEPTED_COMMANDS[cls.cluster_id][cls.command_id] = cls + else: + if cls.cluster_id not in ALL_GENERATED_COMMANDS: + ALL_GENERATED_COMMANDS[cls.cluster_id] = {} + ALL_GENERATED_COMMANDS[cls.cluster_id][cls.command_id] = cls + except NotImplementedError: + # handle case where the ClusterAttribute class is not (fully) subclassed + # and accessing the id property throws a NotImplementedError. 
+ pass @ChipUtility.classproperty def cluster_id(self) -> int: raise NotImplementedError() @@ -221,10 +244,6 @@ def must_use_timed_invoke(cls) -> bool: return False -# The below dictionaries will be filled dynamically -# and are used for quick lookup/mapping from cluster/attribute id to the correct class -ALL_CLUSTERS = {} -ALL_ATTRIBUTES = {} class Cluster(ClusterObject): diff --git a/src/python_testing/TC_DeviceBasicComposition.py b/src/python_testing/TC_DeviceBasicComposition.py index 63a03fa9c3a8d7..cb111d569380ec 100644 --- a/src/python_testing/TC_DeviceBasicComposition.py +++ b/src/python_testing/TC_DeviceBasicComposition.py @@ -31,7 +31,7 @@ import chip.clusters.ClusterObjects import chip.tlv from chip.clusters.Attribute import ValueDecodeFailure -from matter_testing_support import AttributePathLocation, MatterBaseTest, async_test_body, default_matter_test_main +from matter_testing_support import AttributePathLocation, CommandPathLocation, ClusterPathLocation, MatterBaseTest, async_test_body, default_matter_test_main from mobly import asserts @@ -352,7 +352,7 @@ async def setup_class(self): endpoints_tlv = wildcard_read.tlvAttributes node_dump_dict = {endpoint_id: MatterTlvToJson(endpoints_tlv[endpoint_id]) for endpoint_id in endpoints_tlv} - logging.info(f"Raw TLV contents of Node: {json.dumps(node_dump_dict, indent=2)}") + logging.debug(f"Raw TLV contents of Node: {json.dumps(node_dump_dict, indent=2)}") if dump_device_composition_path is not None: with open(pathlib.Path(dump_device_composition_path).with_suffix(".json"), "wt+") as outfile: @@ -454,21 +454,24 @@ class RequiredMandatoryAttribute: validators: list[Callable] ATTRIBUTE_LIST_ID = 0xFFFB + ACCEPTED_COMMAND_LIST_ID = 0xFFF9 + GENERATED_COMMAND_LIST_ID = 0xFFF8 + FEATURE_MAP_ID = 0xFFFC ATTRIBUTES_TO_CHECK = [ RequiredMandatoryAttribute(id=0xFFFD, name="ClusterRevision", validators=[check_int_in_range(1, 0xFFFF)]), - RequiredMandatoryAttribute(id=0xFFFC, name="FeatureMap", 
validators=[check_int_in_range(0, 0xFFFF_FFFF)]), - RequiredMandatoryAttribute(id=0xFFFB, name="AttributeList", + RequiredMandatoryAttribute(id=FEATURE_MAP_ID, name="FeatureMap", validators=[check_int_in_range(0, 0xFFFF_FFFF)]), + RequiredMandatoryAttribute(id=ATTRIBUTE_LIST_ID, name="AttributeList", validators=[check_non_empty_list_of_ints_in_range(0, 0xFFFF_FFFF), check_no_duplicates]), # TODO: Check for EventList # RequiredMandatoryAttribute(id=0xFFFA, name="EventList", validator=check_list_of_ints_in_range(0, 0xFFFF_FFFF)), - RequiredMandatoryAttribute(id=0xFFF9, name="AcceptedCommandList", + RequiredMandatoryAttribute(id=ACCEPTED_COMMAND_LIST_ID, name="AcceptedCommandList", validators=[check_list_of_ints_in_range(0, 0xFFFF_FFFF), check_no_duplicates]), - RequiredMandatoryAttribute(id=0xFFF8, name="GeneratedCommandList", + RequiredMandatoryAttribute(id=GENERATED_COMMAND_LIST_ID, name="GeneratedCommandList", validators=[check_list_of_ints_in_range(0, 0xFFFF_FFFF), check_no_duplicates]), ] - self.print_step(3, "Validate all reported attributes match AttributeList") + self.print_step(3, "Validate the global attributes are present") success = True for endpoint_id, endpoint in self.endpoints_tlv.items(): for cluster_id, cluster in endpoint.items(): @@ -477,7 +480,7 @@ class RequiredMandatoryAttribute: has_attribute = (req_attribute.id in cluster) location = AttributePathLocation(endpoint_id, cluster_id, req_attribute.id) - logging.info( + logging.debug( f"Checking for mandatory global {attribute_string} on {location.as_cluster_string(self.cluster_mapper)}: {'found' if has_attribute else 'not_found'}") # Check attribute is actually present @@ -487,6 +490,10 @@ class RequiredMandatoryAttribute: success = False continue + self.print_step(3, "Validate the global attributes are in range and do not contain duplicates") + for endpoint_id, endpoint in self.endpoints_tlv.items(): + for cluster_id, cluster in endpoint.items(): + for req_attribute in ATTRIBUTES_TO_CHECK: # 
Validate attribute value based on the provided validators. for validator in req_attribute.validators: try: @@ -497,13 +504,53 @@ class RequiredMandatoryAttribute: success = False continue + self.print_step(4, "Validate the attribute list exactly matches the set of reported attributes") + if success: + for endpoint_id, endpoint in self.endpoints_tlv.items(): + for cluster_id, cluster in endpoint.items(): + attribute_list = cluster[ATTRIBUTE_LIST_ID] + for attribute_id in attribute_list: + location = AttributePathLocation(endpoint_id, cluster_id, attribute_id) + has_attribute = attribute_id in cluster + + attribute_string = self.cluster_mapper.get_attribute_string(cluster_id, attribute_id) + logging.debug( + f"Checking presence of claimed supported {attribute_string} on {location.as_cluster_string(self.cluster_mapper)}: {'found' if has_attribute else 'not_found'}") + + # Check attribute is actually present. + if not has_attribute: + # TODO: Handle detecting write-only attributes from schema. 
+ if "WriteOnly" in attribute_string: + continue + + self.record_error(self.get_test_name(), location=location, + problem=f"Did not find {attribute_string} on {location.as_cluster_string(self.cluster_mapper)} when it was claimed in AttributeList ({attribute_list})", spec_location="AttributeList Attribute") + success = False + continue + + attribute_value = cluster[attribute_id] + if isinstance(attribute_value, ValueDecodeFailure): + self.record_warning(self.get_test_name(), location=location, + problem=f"Found a failure to read/decode {attribute_string} on {location.as_cluster_string(self.cluster_mapper)} when it was claimed as supported in AttributeList ({attribute_list}): {str(attribute_value)}", spec_location="AttributeList Attribute") + # Warn only for now + # TODO: Fail in the future + continue + for attribute_id in cluster: + if attribute_id not in attribute_list: + attribute_string = self.cluster_mapper.get_attribute_string(cluster_id, attribute_id) + location = AttributePathLocation(endpoint_id, cluster_id, attribute_id) + self.record_error(self.get_test_name(), location=location, + problem=f'Found attribute {attribute_string} on {location.as_cluster_string(self.cluster_mapper)} not listed in attribute list', spec_location="AttributeList Attribute") + success = False + + self.print_step(5, "Validate that the global attributes do not contain any additional values in the standard or scoped range that are not defined by the cluster") # Validate there are attributes in the global range that are not in the required list allowed_globals = [a.id for a in ATTRIBUTES_TO_CHECK] # also allow event list because it's not disallowed event_list_id = 0xFFFA allowed_globals.append(event_list_id) global_range_min = 0x0000_F000 - standard_range_max = 0x000_4FFF + attribute_standard_range_max = 0x000_4FFF mei_range_min = 0x0001_0000 for endpoint_id, endpoint in self.endpoints_tlv.items(): for cluster_id, cluster in endpoint.items(): @@ -521,7 +568,7 @@ class 
RequiredMandatoryAttribute: if cluster_id not in chip.clusters.ClusterObjects.ALL_ATTRIBUTES: # Skip clusters that are not part of the standard generated corpus (e.g. MS clusters) continue - standard_attributes = [a for a in cluster[ATTRIBUTE_LIST_ID] if a <= standard_range_max] + standard_attributes = [a for a in cluster[ATTRIBUTE_LIST_ID] if a <= attribute_standard_range_max] allowed_standard_attributes = chip.clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id] unexpected_standard_attributes = sorted(list(set(standard_attributes) - set(allowed_standard_attributes))) for unexpected in unexpected_standard_attributes: @@ -531,17 +578,71 @@ class RequiredMandatoryAttribute: success = False # validate there are no attributes in the range between standard and global + # This is de-facto already covered in the check above, assuming the spec hasn't defined any values in this range, but we should make sure for endpoint_id, endpoint in self.endpoints_tlv.items(): for cluster_id, cluster in endpoint.items(): - bad_range_values = [a for a in cluster[ATTRIBUTE_LIST_ID] if a > standard_range_max and a < global_range_min] + bad_range_values = [a for a in cluster[ATTRIBUTE_LIST_ID] if a > attribute_standard_range_max and a < global_range_min] for bad in bad_range_values: location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=bad) self.record_error(self.get_test_name(), location=location, problem=f"Attribute in undefined range {bad} in cluster {cluster_id}", spec_location=f"Cluster {cluster_id}") success = False + + command_standard_range_max = 0x0000_00FF + # Command lists only have a scoped range, so we only need to check for known command ids, no global range check + for endpoint_id, endpoint in self.endpoints_tlv.items(): + for cluster_id, cluster in endpoint.items(): + # Use the all attributes list here because all clusters have attributes, but not all have commands. 
+ if cluster_id not in chip.clusters.ClusterObjects.ALL_CLUSTERS: + continue + standard_accepted_commands = [a for a in cluster[ACCEPTED_COMMAND_LIST_ID] if a <= command_standard_range_max] + standard_generated_commands = [a for a in cluster[GENERATED_COMMAND_LIST_ID] if a <= command_standard_range_max] + if cluster_id in chip.clusters.ClusterObjects.ALL_ACCEPTED_COMMANDS: + allowed_accepted_commands = [a for a in chip.clusters.ClusterObjects.ALL_ACCEPTED_COMMANDS[cluster_id]] + else: + allowed_accepted_commands = [] + if cluster_id in chip.clusters.ClusterObjects.ALL_GENERATED_COMMANDS: + allowed_generated_commands = [a for a in chip.clusters.ClusterObjects.ALL_GENERATED_COMMANDS[cluster_id]] + else: + allowed_generated_commands = [] + + unexpected_accepted_commands = sorted(list(set(standard_accepted_commands) - set(allowed_accepted_commands))) + unexpected_generated_commands = sorted(list(set(standard_generated_commands) - set(allowed_generated_commands))) + + for unexpected in unexpected_accepted_commands: + location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id = cluster_id, command_id=unexpected) + self.record_error(self.get_test_name(), location=location, problem=f'Unexpected accepted command {unexpected} in cluster {cluster_id} allowed: {allowed_accepted_commands} listed: {standard_accepted_commands}', spec_location=f'Cluster {cluster_id}') + success = False + + for unexpected in unexpected_generated_commands: + location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id = cluster_id, command_id=unexpected) + self.record_error(self.get_test_name(), location=location, problem=f'Unexpected generated command {unexpected} in cluster {cluster_id} allowed: {allowed_generated_commands} listed: {standard_generated_commands}', spec_location=f'Cluster {cluster_id}') + success = False + + + self.print_step(6, "Validate that none of the global attribute IDs contain values outside of the allowed standard, MEI or test vendor prefix range") + # none of 
the lists should have any prefix > 0xFFF4 + prefix_max = 0xFFF4_0000 + for endpoint_id, endpoint in self.endpoints_tlv.items(): + for cluster_id, cluster in endpoint.items(): + attr_prefixes = [a & 0xFFFF_0000 for a in cluster[ATTRIBUTE_LIST_ID]] + cmd_values = cluster[ACCEPTED_COMMAND_LIST_ID] + cluster[GENERATED_COMMAND_LIST_ID] + cmd_prefixes = [a & 0xFFFF_0000 for a in cmd_values] + bad_attrs = [a for a in attr_prefixes if a > prefix_max] + bad_cmds = [a for a in cmd_prefixes if a > prefix_max] + for bad in bad_attrs: + location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=bad) + self.record_error(self.get_test_name(), location=location, problem=f'Attribute with bad prefix {bad} in cluster {cluster_id}', spec_location='Manufacturer Extensible Identifier (MEI)') + success = False + for bad in bad_cmds: + location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=bad) + self.record_error(self.get_test_name(), location=location, problem=f'Command with bad prefix {bad} in cluster {cluster_id}', spec_location='Manufacturer Extensible Identifier (MEI)') + success = False + + self.print_step(7, "Validate that none of the MEI global attribute IDs contain values outside of the allowed suffix range") # Validate that any attribute in the manufacturer prefix range is in the standard suffix range. 
- suffix_mask = 0x000_FFFF + suffix_mask = 0x0000_FFFF for endpoint_id, endpoint in self.endpoints_tlv.items(): for cluster_id, cluster in endpoint.items(): manufacturer_range_values = [a for a in cluster[ATTRIBUTE_LIST_ID] if a > mei_range_min] @@ -549,7 +650,7 @@ class RequiredMandatoryAttribute: suffix = manufacturer_value & suffix_mask location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=manufacturer_value) - if suffix > standard_range_max and suffix < global_range_min: + if suffix > attribute_standard_range_max and suffix < global_range_min: self.record_error(self.get_test_name(), location=location, problem=f"Manufacturer attribute in undefined range {manufacturer_value} in cluster {cluster_id}", spec_location=f"Cluster {cluster_id}") @@ -560,54 +661,57 @@ class RequiredMandatoryAttribute: spec_location=f"Cluster {cluster_id}") success = False - # TODO: maybe while we're at it, we should check that the command list doesn't contain unexpected commands. - - # Validate presence of claimed attributes - if success: - # TODO: Also check the reverse: that each attribute appears in the AttributeList. 
- logging.info( - "Validating that a wildcard read on each cluster provided all attributes claimed in AttributeList mandatory global attribute") - - for endpoint_id, endpoint in self.endpoints_tlv.items(): - for cluster_id, cluster in endpoint.items(): - attribute_list = cluster[ATTRIBUTE_LIST_ID] - for attribute_id in attribute_list: - location = AttributePathLocation(endpoint_id, cluster_id, attribute_id) - has_attribute = attribute_id in cluster + for endpoint_id, endpoint in self.endpoints_tlv.items(): + for cluster_id, cluster in endpoint.items(): + accepted_manufacturer_range_values = [a for a in cluster[ACCEPTED_COMMAND_LIST_ID] if a > mei_range_min] + generated_manufacturer_range_values = [a for a in cluster[GENERATED_COMMAND_LIST_ID] if a > mei_range_min] + all_command_manufacturer_range_values = accepted_manufacturer_range_values + generated_manufacturer_range_values + for manufacturer_value in all_command_manufacturer_range_values: + suffix = manufacturer_value & suffix_mask + location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=manufacturer_value) + if suffix > command_standard_range_max: + self.record_error(self.get_test_name(), location=location, problem=f'Manufacturer command in the undefined suffix range {manufacturer_value} in cluster {cluster_id}', spec_location='Manufacturer Extensible Identifier (MEI)') + success = False - attribute_string = self.cluster_mapper.get_attribute_string(cluster_id, attribute_id) - logging.info( - f"Checking presence of claimed supported {attribute_string} on {location.as_cluster_string(self.cluster_mapper)}: {'found' if has_attribute else 'not_found'}") + self.print_step(8, "Validate that all cluster ID prefixes are in the standard, MEI or test vendor range") + for endpoint_id, endpoint in self.endpoints_tlv.items(): + cluster_prefixes = [a & 0xFFFF_0000 for a in endpoint.keys()] + bad_clusters_ids = [a for a in cluster_prefixes if a > prefix_max] + for bad in bad_clusters_ids: + 
location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=bad) + self.record_error(self.get_test_name(), location=location, problem=f'Bad cluster id prefix {bad}', spec_location='Manufacturer Extensible Identifier (MEI)') + success = False - # Check attribute is actually present. - if not has_attribute: - # TODO: Handle detecting write-only attributes from schema. - if "WriteOnly" in attribute_string: - continue + self.print_step(9, "Validate that all clusters in the standard range have a known cluster ID") + for endpoint_id, endpoint in self.endpoints_tlv.items(): + standard_clusters = [a for a in endpoint.keys() if a < mei_range_min] + unknown_clusters = sorted(list(set(standard_clusters) - set(chip.clusters.ClusterObjects.ALL_CLUSTERS))) + for bad in unknown_clusters: + location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=bad) + self.record_error(self.get_test_name(), location=location, problem=f'Unknown cluster ID in the standard range {bad}', spec_location='Manufacturer Extensible Identifier (MEI)') + success = False - self.record_error(self.get_test_name(), location=location, - problem=f"Did not find {attribute_string} on {location.as_cluster_string(self.cluster_mapper)} when it was claimed in AttributeList ({attribute_list})", spec_location="AttributeList Attribute") - success = False - continue + self.print_step(10, "Validate that all clusters in the MEI range have a suffix in the manufacturer suffix range") + for endpoint_id, endpoint in self.endpoints_tlv.items(): + mei_clusters = [a for a in endpoint.keys() if a >= mei_range_min] + bad_clusters = [a for a in mei_clusters if ((a & 0x0000_FFFF) < 0xFC00) or ((a & 0x0000_FFFF) > 0xFFFE)] + for bad in bad_clusters: + location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=bad) + self.record_error(self.get_test_name(), location=location, problem=f'MEI cluster with an out of range suffix {bad}', spec_location='Manufacturer Extensible Identifier (MEI)') + success = False - 
attribute_value = cluster[attribute_id] - if isinstance(attribute_value, ValueDecodeFailure): - self.record_warning(self.get_test_name(), location=location, - problem=f"Found a failure to read/decode {attribute_string} on {location.as_cluster_string(self.cluster_mapper)} when it was claimed as supported in AttributeList ({attribute_list}): {str(attribute_value)}", spec_location="AttributeList Attribute") - # Warn only for now - # TODO: Fail in the future - continue - for attribute_id in cluster: - if attribute_id not in attribute_list: - attribute_string = self.cluster_mapper.get_attribute_string(cluster_id, attribute_id) - location = AttributePathLocation(endpoint_id, cluster_id, attribute_id) - self.record_error(self.get_test_name(), location=location, - problem=f'Found attribute {attribute_string} on {location.as_cluster_string(self.cluster_mapper)} not listed in attribute list', spec_location="AttributeList Attribute") - success = False + self.print_step(11, "Validate that all feature maps have known feature bits") + for endpoint_id, endpoint in self.endpoints_tlv.items(): + for cluster_id, cluster in endpoint.items(): + if cluster_id not in chip.clusters.ClusterObjects.ALL_CLUSTERS: + continue + feature_map = cluster[FEATURE_MAP_ID] + feature_map_enum = chip.clusters.ClusterObjects.ALL_CLUSTERS[cluster_id].Bitmaps.Feature + #TODO: or these all together, subtract from the feature map, see what's left over if not success: self.fail_current_test( - "At least one cluster was missing a mandatory global attribute or had differences between claimed attributes supported and actual.") + "At least one cluster has failed the global attribute range and support checks") def test_IDM_11_1(self): success = True diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index a394952445de60..7d11496ede7701 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -333,6 +333,11 
@@ class CommandPathLocation: cluster_id: int command_id: int +@dataclass +class ClusterPathLocation: + endpoint_id: int + cluster_id: int + # ProblemSeverity is not using StrEnum, but rather Enum, since StrEnum only # appeared in 3.11. To make it JSON serializable easily, multiple inheritance # from `str` is used. See https://stackoverflow.com/a/51976841. @@ -347,7 +352,7 @@ class ProblemSeverity(str, Enum): @dataclass class ProblemNotice: test_name: str - location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation] + location: Union[AttributePathLocation, EventPathLocation, CommandPathLocation, ClusterPathLocation] severity: ProblemSeverity problem: str spec_location: str = ""