Skip to content

Commit

Permalink
Spec parsing: Use new ID table (#32410) (#32971)
Browse files Browse the repository at this point in the history
* Spec parsing: Use new ID table

The ID table has now been added for every cluster, including alias
clusters. This means we can now just make direct copies of the alias
clusters as we parse the cluster file. It also means we can ignore
the top-level cluster tag, so we don't need to worry about the
new weird naming.

Not yet handled in the XML - pics for aliased clusters.

* Restyled by isort

---------

Co-authored-by: Restyled.io <[email protected]>
  • Loading branch information
cecille and restyled-commits authored Apr 12, 2024
1 parent c3ebe38 commit aa3a804
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 61 deletions.
98 changes: 95 additions & 3 deletions src/python_testing/TestSpecParsingSupport.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
from global_attribute_ids import GlobalAttributeIds
from matter_testing_support import MatterBaseTest, ProblemNotice, default_matter_test_main
from mobly import asserts
from spec_parsing_support import (ClusterParser, XmlCluster, add_cluster_data_from_xml, check_clusters_for_unknown_commands,
combine_derived_clusters_with_base)
from spec_parsing_support import (ClusterParser, XmlCluster, add_cluster_data_from_xml, build_xml_clusters,
check_clusters_for_unknown_commands, combine_derived_clusters_with_base)

# TODO: improve the test coverage here
# https://github.com/project-chip/connectedhomeip/issues/30958
Expand All @@ -40,6 +40,9 @@ def single_attribute_cluster_xml(read_access: str, write_access: str, write_supp
'<revision revision="2" summary="Some other revision"/>'
'<revision revision="3" summary="another revision"/>'
'</revisionHistory>')
id_table = ('<clusterIds>'
f'<clusterId id="{CLUSTER_ID}" name="{CLUSTER_NAME}"/>'
'</clusterIds>')
classification = '<classification hierarchy="base" role="utility" picsCode="TEST" scope="Node"/>'
read_access_str = f'read="true" readPrivilege="{read_access}"' if read_access is not None else ""
write_access_str = f'write="{write_supported}" writePrivilege="{write_access}"' if write_access is not None else ""
Expand All @@ -53,14 +56,15 @@ def single_attribute_cluster_xml(read_access: str, write_access: str, write_supp

return (f'{xml_cluster}'
f'{revision_table}'
f'{id_table}'
f'{classification}'
f'{attribute}'
'</cluster>')


def parse_cluster(xml: str) -> XmlCluster:
cluster = ElementTree.fromstring(xml)
parser = ClusterParser(cluster, CLUSTER_ID, CLUSTER_NAME, False)
parser = ClusterParser(cluster, CLUSTER_ID, CLUSTER_NAME)
return parser.create_cluster()


Expand All @@ -83,6 +87,9 @@ def get_access_enum_from_string(access_str: str) -> Clusters.AccessControl.Enums
' <revisionHistory>'
' <revision revision="1" summary="Initial version"/>'
' </revisionHistory>'
' <clusterIds>'
' <clusterId name="Test Base"/>'
' </clusterIds>'
' <classification hierarchy="base" role="application" picsCode="BASE" scope="Endpoint"/>'
' <features>'
' <feature bit="0" code="DEPONOFF" name="OnOff" summary="Dependency with the OnOff cluster">'
Expand Down Expand Up @@ -151,6 +158,9 @@ def get_access_enum_from_string(access_str: str) -> Clusters.AccessControl.Enums
' <revisionHistory>'
' <revision revision="1" summary="Initial Release"/>'
' </revisionHistory>'
' <clusterIds>'
' <clusterId id="0xFFFF" name="Test Derived"/>'
' </clusterIds>'
' <classification hierarchy="derived" baseCluster="Test Base" role="application" picsCode="MWOM" scope="Endpoint"/>'
' <attributes>'
' <attribute id="0x0000" name="SupportedModes">'
Expand Down Expand Up @@ -181,6 +191,9 @@ def get_access_enum_from_string(access_str: str) -> Clusters.AccessControl.Enums
' <revisionHistory>'
' <revision revision="1" summary="Initial version"/>'
' </revisionHistory>'
' <clusterIds>'
' <clusterId id="0xFFFE" name="Test Unknown Command"/>'
' </clusterIds>'
' <classification hierarchy="base" role="application" picsCode="BASE" scope="Endpoint"/>'
' <commands>'
' <command id="0x00" name="ChangeToMode" direction="commandToClient">'
Expand All @@ -191,8 +204,32 @@ def get_access_enum_from_string(access_str: str) -> Clusters.AccessControl.Enums
'</cluster>'
)

ALIASED_CLUSTERS = (
'<cluster xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="types types.xsd cluster cluster.xsd" id="" name="Test Aliases" revision="1">'
' <revisionHistory>'
' <revision revision="1" summary="Initial version"/>'
' </revisionHistory>'
' <clusterIds>'
' <clusterId id="0xFFFE" name="Test Alias1"/>'
' <clusterId id="0xFFFD" name="Test Alias2"/>'
' </clusterIds>'
' <classification hierarchy="base" role="application" picsCode="BASE" scope="Endpoint"/>'
' <commands>'
' <command id="0x00" name="ChangeToMode" direction="commandToServer">'
' <access invokePrivilege="operate"/>'
' <mandatoryConform/>'
' </command>'
' </commands>'
'</cluster>'
)


class TestSpecParsingSupport(MatterBaseTest):
def setup_class(self):
super().setup_class()
self.spec_xml_clusters, self.spec_problems = build_xml_clusters()
self.all_spec_clusters = set([(id, c.name, c.pics) for id, c in self.spec_xml_clusters.items()])

def test_spec_parsing_access(self):
strs = [None, 'view', 'operate', 'manage', 'admin']
for read in strs:
Expand Down Expand Up @@ -296,6 +333,61 @@ def test_missing_command_direction(self):
asserts.assert_equal(problems[0].location.cluster_id, 0xFFFE, "Unexpected problem location (cluster id)")
asserts.assert_equal(problems[0].location.command_id, 0, "Unexpected problem location (command id)")

def test_aliased_clusters(self):
clusters: dict[int, XmlCluster] = {}
pure_base_clusters: dict[str, XmlCluster] = {}
ids_by_name: dict[str, int] = {}
problems: list[ProblemNotice] = []
cluster_xml = ElementTree.fromstring(ALIASED_CLUSTERS)

add_cluster_data_from_xml(cluster_xml, clusters, pure_base_clusters, ids_by_name, problems)
asserts.assert_equal(len(problems), 0, "Unexpected problem parsing aliased clusters")
asserts.assert_equal(len(clusters), 2, "Unexpected number of clusters when parsing aliased cluster set")
asserts.assert_equal(len(pure_base_clusters), 0, "Unexpected number of pure base clusters")
asserts.assert_equal(len(ids_by_name), 2, "Unexpected number of ids by name")

ids = [(id, c.name) for id, c in clusters.items()]
asserts.assert_true((0xFFFE, 'Test Alias1') in ids, "Unable to find Test Alias1 cluster in parsed clusters")
asserts.assert_true((0xFFFD, 'Test Alias2') in ids, "Unable to find Test Alias2 cluster in parsed clusters")

def test_known_aliased_clusters(self):
known_aliased_clusters = set([(0x040C, 'Carbon Monoxide Concentration Measurement', 'CMOCONC'),
(0x040D, 'Carbon Dioxide Concentration Measurement', 'CDOCONC'),
(0x0413, 'Nitrogen Dioxide Concentration Measurement', 'NDOCONC'),
(0x0415, 'Ozone Concentration Measurement', 'OZCONC'),
# Change to "PM2.5 Concentration Measurement" once https://github.com/csa-data-model/projects/issues/453 is fixed
(0x042A, 'PM2', 'PMICONC'),
(0x042B, 'Formaldehyde Concentration Measurement', 'FLDCONC'),
(0x042C, 'PM1 Concentration Measurement', 'PMHCONC'),
(0x042D, 'PM10 Concentration Measurement', 'PMKCONC'),
(0x042E, 'Total Volatile Organic Compounds Concentration Measurement', 'TVOCCONC'),
(0x042F, 'Radon Concentration Measurement', 'RNCONC'),
(0x0071, 'HEPA Filter Monitoring', 'HEPAFREMON'),
(0x0072, 'Activated Carbon Filter Monitoring', 'ACFREMON'),
(0x0405, 'Relative Humidity Measurement', 'RH')])

missing_clusters = known_aliased_clusters - self.all_spec_clusters
asserts.assert_equal(len(missing_clusters), 0, f"Missing aliased clusters from DM XML - {missing_clusters}")

def test_known_derived_clusters(self):
known_derived_clusters = set([(0x0048, 'Oven Cavity Operational State', 'OVENOPSTATE'),
(0x0049, 'Oven Mode', 'OTCCM'),
(0x0051, 'Laundry Washer Mode', 'LWM'),
(0x0052, 'Refrigerator And Temperature Controlled Cabinet Mode', 'TCCM'),
(0x0054, 'RVC Run Mode', 'RVCRUNM'),
(0x0055, 'RVC Clean Mode', 'RVCCLEANM'),
(0x0057, 'Refrigerator Alarm', 'REFALM'),
(0x0059, 'Dishwasher Mode', 'DISHM'),
(0x005c, 'Smoke CO Alarm', 'SMOKECO'),
(0x005d, 'Dishwasher Alarm', 'DISHALM'),
(0x005e, 'Microwave Oven Mode', 'MWOM'),
(0x0061, 'RVC Operational State', 'RVCOPSTATE')])

missing_clusters = known_derived_clusters - self.all_spec_clusters
asserts.assert_equal(len(missing_clusters), 0, f"Missing aliased clusters from DM XML - {missing_clusters}")
for d in known_derived_clusters:
asserts.assert_true(self.spec_xml_clusters is not None, "Derived cluster with no base cluster marker")


if __name__ == "__main__":
default_matter_test_main()
99 changes: 41 additions & 58 deletions src/python_testing/spec_parsing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,40 +113,28 @@ class CommandType(Enum):
UNKNOWN = auto()


# workaround for aliased clusters not appearing in the xml. Remove this once https://github.com/csa-data-model/projects/issues/373 is addressed
CONC_CLUSTERS = {0x040C: ('Carbon Monoxide Concentration Measurement', 'CMOCONC'),
0x040D: ('Carbon Dioxide Concentration Measurement', 'CDOCONC'),
0x0413: ('Nitrogen Dioxide Concentration Measurement', 'NDOCONC'),
0x0415: ('Ozone Concentration Measurement', 'OZCONC'),
0x042A: ('PM2.5 Concentration Measurement', 'PMICONC'),
0x042B: ('Formaldehyde Concentration Measurement', 'FLDCONC'),
0x042C: ('PM1 Concentration Measurement', 'PMHCONC'),
0x042D: ('PM10 Concentration Measurement', 'PMKCONC'),
0x042E: ('Total Volatile Organic Compounds Concentration Measurement', 'TVOCCONC'),
0x042F: ('Radon Concentration Measurement', 'RNCONC')}
CONC_BASE_NAME = 'Concentration Measurement Clusters'
RESOURCE_CLUSTERS = {0x0071: ('HEPA Filter Monitoring', 'HEPAFREMON'),
0x0072: ('Activated Carbon Filter Monitoring', 'ACFREMON')}
RESOURCE_BASE_NAME = 'Resource Monitoring Clusters'
WATER_CLUSTER = {0x0405: ('Relative Humidity Measurement', 'RH')}
WATER_BASE_NAME = 'Water Content Measurement Clusters'
CLUSTER_ALIASES = {CONC_BASE_NAME: CONC_CLUSTERS, RESOURCE_BASE_NAME: RESOURCE_CLUSTERS, WATER_BASE_NAME: WATER_CLUSTER}


def is_alias(id: uint):
for base, alias in CLUSTER_ALIASES.items():
if id in alias:
return True
return False
# workaround for aliased clusters PICS not appearing in the xml. Remove this once https://github.com/csa-data-model/projects/issues/461 is addressed
ALIAS_PICS = {0x040C: 'CMOCONC',
0x040D: 'CDOCONC',
0x0413: 'NDOCONC',
0x0415: 'OZCONC',
0x042A: 'PMICONC',
0x042B: 'FLDCONC',
0x042C: 'PMHCONC',
0x042D: 'PMKCONC',
0x042E: 'TVOCCONC',
0x042F: 'RNCONC',
0x0071: 'HEPAFREMON',
0x0072: 'ACFREMON',
0x0405: 'RH'}


class ClusterParser:
def __init__(self, cluster, cluster_id, name, is_alias):
def __init__(self, cluster, cluster_id, name):
self._problems: list[ProblemNotice] = []
self._cluster = cluster
self._cluster_id = cluster_id
self._name = name
self._is_alias = is_alias

self._derived = None
try:
Expand All @@ -163,6 +151,9 @@ def __init__(self, cluster, cluster_id, name, is_alias):
except (KeyError, StopIteration):
self._pics = None

if self._cluster_id in ALIAS_PICS.keys():
self._pics = ALIAS_PICS[cluster_id]

self.feature_elements = self.get_all_feature_elements()
self.attribute_elements = self.get_all_attribute_elements()
self.command_elements = self.get_all_command_elements()
Expand Down Expand Up @@ -282,12 +273,12 @@ def str_to_access_type(privilege_str: str) -> Clusters.AccessControl.Enums.Acces
return Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kUnknownEnumValue

if access_xml is None:
# Derived and alias clusters can inherit their access from the base and that's fine, so don't add an error
# Derived clusters can inherit their access from the base and that's fine, so don't add an error
# Similarly, pure base clusters can have the access defined in the derived clusters. If neither has it defined,
# we will determine this at the end when we put these together.
# Things with deprecated conformance don't get an access element, and that is also fine.
# If a device properly passes the conformance test, such elements are guaranteed not to appear on the device.
if self._is_alias or self._derived is not None or is_disallowed(conformance):
if self._derived is not None or is_disallowed(conformance):
return (None, None, None)

location = self.get_location_from_element(element_xml)
Expand Down Expand Up @@ -421,31 +412,32 @@ def add_cluster_data_from_xml(xml: ElementTree.Element, clusters: dict[int, XmlC
xml: XML element read from from the XML cluster file
clusters: dict of id -> XmlCluster. This function will append new clusters as appropriate to this dict.
pure_base_clusters: dict of base name -> XmlCluster. This data structure is used to hold pure base clusters that don't have
an ID. This function will append new pure base clusters as approrpriate to this dict.
an ID. This function will append new pure base clusters as appropriate to this dict.
ids_by_name: dict of cluster name -> ID. This function will append new IDs as appropriate to this dict.
problems: list of any problems encountered during spec parsing. This function will append problems as appropriate to this list.
'''
cluster = xml.iter('cluster')
for c in cluster:
name = c.attrib['name']
if not c.attrib['id']:
# Fully derived clusters have no id, but also shouldn't appear on a device.
# We do need to keep them, though, because we need to update the derived
# clusters. We keep them in a special dict by name, so they can be thrown
# away later.
cluster_id = None
else:
cluster_id = int(c.attrib['id'], 0)
ids_by_name[name] = cluster_id

parser = ClusterParser(c, cluster_id, name, is_alias(cluster_id))
new = parser.create_cluster()
problems = problems + parser.get_problems()

if cluster_id:
clusters[cluster_id] = new
else:
pure_base_clusters[name] = new
ids = c.iter('clusterId')
for id in ids:
name = id.get('name')
cluster_id = id.get('id')
if cluster_id:
cluster_id = int(id.get('id'), 0)
ids_by_name[name] = cluster_id

parser = ClusterParser(c, cluster_id, name)
new = parser.create_cluster()
problems = problems + parser.get_problems()

if cluster_id:
clusters[cluster_id] = new
else:
# Fully derived clusters have no id, but also shouldn't appear on a device.
# We do need to keep them, though, because we need to update the derived
# clusters. We keep them in a special dict by name, so they can be thrown
# away later.
pure_base_clusters[name] = new


def check_clusters_for_unknown_commands(clusters: dict[int, XmlCluster], problems: list[ProblemNotice]):
Expand Down Expand Up @@ -489,15 +481,6 @@ def remove_problem(location: typing.Union[CommandPathLocation, FeaturePathLocati

combine_derived_clusters_with_base(clusters, pure_base_clusters, ids_by_name)

for alias_base_name, aliased_clusters in CLUSTER_ALIASES.items():
for id, (alias_name, pics) in aliased_clusters.items():
base = pure_base_clusters[alias_base_name]
new = deepcopy(base)
new.derived = alias_base_name
new.name = alias_name
new.pics = pics
clusters[id] = new

# TODO: All these fixups should be removed BEFORE SVE if at all possible
# Workaround for Color Control cluster - the spec uses a non-standard conformance. Set all to optional now, will need
# to implement either arithmetic conformance handling (once spec changes land here) or specific test
Expand Down

0 comments on commit aa3a804

Please sign in to comment.