Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Access checker: Improve write check support #31452

Merged
merged 10 commits into from
Jan 19, 2024
35 changes: 29 additions & 6 deletions src/python_testing/TC_AccessChecker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ class AccessTestType(Enum):
WRITE = auto()


def is_global(id: int) -> bool:
cecille marked this conversation as resolved.
Show resolved Hide resolved
return id >= 0xF000 and id <= 0xFFFE


def operation_allowed(spec_requires: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum,
acl_set_to: Optional[Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum]) -> bool:
''' Determines if the action is allowed on the device based on the spec_requirements and the current ACL privilege granted.
Expand Down Expand Up @@ -137,20 +141,38 @@ async def _run_read_access_test_for_cluster_privilege(self, endpoint_id, cluster
async def _run_write_access_test_for_cluster_privilege(self, endpoint_id, cluster_id, cluster, xml_cluster: XmlCluster, privilege: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum, wildcard_read):
for attribute_id in checkable_attributes(cluster_id, cluster, xml_cluster):
spec_requires = xml_cluster.attributes[attribute_id].write_access
if spec_requires == Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kUnknownEnumValue:
# not writeable
continue
is_optional_write = xml_cluster.attributes[attribute_id].write_optional

attribute = Clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id][attribute_id]
cluster_class = Clusters.ClusterObjects.ALL_CLUSTERS[cluster_id]
location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
test_name = f'Write access checker - {privilege}'
logging.info(f"Testing attribute {attribute} on endpoint {endpoint_id}")
if attribute == Clusters.AccessControl.Attributes.Acl and privilege == Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister:
logging.info("Skipping ACL attribute check for admin privilege as this is known to be writeable and is being used for this test")
continue

print(f"Testing attribute {attribute} on endpoint {endpoint_id}")
# Because we read everything with admin, we should have this in the wildcard read
# This will only not work if we end up with write-only attributes. We do not currently have any of these.
val = wildcard_read.attributes[endpoint_id][cluster_class][attribute]
if isinstance(val, list):
# Use an empty list for writes in case the list is large and does not fit
val = []
cecille marked this conversation as resolved.
Show resolved Hide resolved

resp = await self.TH2.WriteAttribute(nodeid=self.dut_node_id, attributes=[(endpoint_id, attribute(val))])
if operation_allowed(spec_requires, privilege):
if spec_requires == Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kUnknownEnumValue:
# not writeable - expect an unsupported write response
# Global vars currently return the wrong error code - see #31448
cecille marked this conversation as resolved.
Show resolved Hide resolved
ok = (is_global(attribute.attribute_id) and resp[0].Status ==
Status.UnsupportedAttribute) or resp[0].Status == Status.UnsupportedWrite
if not ok:
self.record_error(test_name=test_name, location=location,
problem=f"Unexpected error writing non-writeable attribute - expected Unsupported Write, got {resp[0].Status}")
self.success = False
elif is_optional_write and resp[0].Status == Status.UnsupportedWrite:
# unsupported optional writeable attribute - this is fine, no error
continue
elif operation_allowed(spec_requires, privilege):
# Write the default attribute. We don't care if this fails, as long as it fails with a DIFFERENT error than the access
# This is OK because access is required to be checked BEFORE any other thing to avoid leaking device information.
# For example, because we don't have any range information, we might be writing an out of range value, but this will
Expand All @@ -172,7 +194,8 @@ async def run_access_test(self, test_type: AccessTestType):
await self._setup_acl(privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister)
wildcard_read = await self.TH2.Read(self.dut_node_id, [()])

privilege_enum = Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum
enum = Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum
privilege_enum = [p for p in enum if p != enum.kUnknownEnumValue]
for privilege in privilege_enum:
logging.info(f"Testing for {privilege}")
await self._setup_acl(privilege=privilege)
Expand Down
18 changes: 15 additions & 3 deletions src/python_testing/TestSpecParsingSupport.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
ATTRIBUTE_ID = 0x0000


def single_attribute_cluster_xml(read_access: str, write_access: str):
def single_attribute_cluster_xml(read_access: str, write_access: str, write_supported: str):
xml_cluster = f'<cluster xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="types types.xsd cluster cluster.xsd" id="{CLUSTER_ID}" name="{CLUSTER_NAME}" revision="3">'
revision_table = ('<revisionHistory>'
'<revision revision="1" summary="Initial Release"/>'
Expand All @@ -41,7 +41,7 @@ def single_attribute_cluster_xml(read_access: str, write_access: str):
'</revisionHistory>')
classification = '<classification hierarchy="base" role="utility" picsCode="TEST" scope="Node"/>'
read_access_str = f'read="true" readPrivilege="{read_access}"' if read_access is not None else ""
write_access_str = f'write="true" writePrivilege="{write_access}"' if write_access is not None else ""
write_access_str = f'write="{write_supported}" writePrivilege="{write_access}"' if write_access is not None else ""
attribute = ('<attributes>'
f'<attribute id="{ATTRIBUTE_ID}" name="{ATTRIBUTE_NAME}" type="uint16" default="MS">'
f'<access {read_access_str} {write_access_str}/>'
Expand Down Expand Up @@ -82,7 +82,7 @@ def test_spec_parsing_access(self):
strs = [None, 'view', 'operate', 'manage', 'admin']
for read in strs:
for write in strs:
xml = single_attribute_cluster_xml(read, write)
xml = single_attribute_cluster_xml(read, write, "true")
xml_cluster = parse_cluster(xml)
asserts.assert_is_not_none(xml_cluster.attributes, "No attributes found in cluster")
asserts.assert_is_not_none(xml_cluster.attribute_map, "No attribute map found in cluster")
Expand All @@ -94,6 +94,18 @@ def test_spec_parsing_access(self):
asserts.assert_equal(xml_cluster.attributes[ATTRIBUTE_ID].write_access,
get_access_enum_from_string(write), "Unexpected write access")

def test_write_optional(self):
for write_support in ['true', 'optional']:
xml = single_attribute_cluster_xml('view', 'view', write_support)
xml_cluster = parse_cluster(xml)
asserts.assert_is_not_none(xml_cluster.attributes, "No attributes found in cluster")
asserts.assert_is_not_none(xml_cluster.attribute_map, "No attribute map found in cluster")
asserts.assert_equal(len(xml_cluster.attributes), len(GlobalAttributeIds) + 1, "Unexpected number of attributes")
asserts.assert_true(ATTRIBUTE_ID in xml_cluster.attributes.keys(),
"Did not find test attribute in XmlCluster.attributes")
asserts.assert_equal(xml_cluster.attributes[ATTRIBUTE_ID].write_optional,
write_support == 'optional', "Unexpected write_optional value")


if __name__ == "__main__":
default_matter_test_main()
23 changes: 15 additions & 8 deletions src/python_testing/spec_parsing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class XmlAttribute:
conformance: Callable[[uint], ConformanceDecision]
read_access: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum
write_access: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum
write_optional: bool

def access_string(self):
read_marker = "R" if self.read_access is not Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kUnknownEnumValue else ""
Expand Down Expand Up @@ -221,6 +222,9 @@ def parse_conformance(self, conformance_xml: ElementTree.Element) -> Callable:
severity=ProblemSeverity.WARNING, problem=str(ex)))
return None

def parse_write_optional(self, element_xml: ElementTree.Element, access_xml: ElementTree.Element) -> bool:
return access_xml.attrib['write'] == 'optional'

def parse_access(self, element_xml: ElementTree.Element, access_xml: ElementTree.Element, conformance: Callable) -> tuple[Optional[Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum], Optional[Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum], Optional[Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum]]:
''' Returns a tuple of access types for read / write / invoke'''
def str_to_access_type(privilege_str: str) -> Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum:
Expand Down Expand Up @@ -294,13 +298,16 @@ def parse_attributes(self) -> dict[uint, XmlAttribute]:
# I don't have a good way to relate the ranges to the conformance, but they're both acceptable, so let's just or them.
conformance = or_operation([conformance, attributes[code].conformance])
read_access, write_access, _ = self.parse_access(element, access_xml, conformance)
write_optional = False
if write_access not in [None, Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kUnknownEnumValue]:
write_optional = self.parse_write_optional(element, access_xml)
attributes[code] = XmlAttribute(name=element.attrib['name'], datatype=datatype,
conformance=conformance, read_access=read_access, write_access=write_access)
conformance=conformance, read_access=read_access, write_access=write_access, write_optional=write_optional)
# Add in the global attributes for the base class
for id in GlobalAttributeIds:
# TODO: Add data type here. Right now it's unused. We should parse this from the spec.
attributes[id] = XmlAttribute(name=id.to_name(), datatype="", conformance=mandatory(
), read_access=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, write_access=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kUnknownEnumValue)
), read_access=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, write_access=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kUnknownEnumValue, write_optional=False)
return attributes

def parse_commands(self, command_type: CommandType) -> dict[uint, XmlAttribute]:
Expand Down Expand Up @@ -502,12 +509,12 @@ def remove_problem(location: typing.Union[CommandPathLocation, FeaturePathLocati
view = Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView
none = Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kUnknownEnumValue
clusters[temp_control_id].attributes = {
0x00: XmlAttribute(name='TemperatureSetpoint', datatype='temperature', conformance=feature(0x01, 'TN'), read_access=view, write_access=none),
0x01: XmlAttribute(name='MinTemperature', datatype='temperature', conformance=feature(0x01, 'TN'), read_access=view, write_access=none),
0x02: XmlAttribute(name='MaxTemperature', datatype='temperature', conformance=feature(0x01, 'TN'), read_access=view, write_access=none),
0x03: XmlAttribute(name='Step', datatype='temperature', conformance=feature(0x04, 'STEP'), read_access=view, write_access=none),
0x04: XmlAttribute(name='SelectedTemperatureLevel', datatype='uint8', conformance=feature(0x02, 'TL'), read_access=view, write_access=none),
0x05: XmlAttribute(name='SupportedTemperatureLevels', datatype='list', conformance=feature(0x02, 'TL'), read_access=view, write_access=none),
0x00: XmlAttribute(name='TemperatureSetpoint', datatype='temperature', conformance=feature(0x01, 'TN'), read_access=view, write_access=none, write_optional=False),
0x01: XmlAttribute(name='MinTemperature', datatype='temperature', conformance=feature(0x01, 'TN'), read_access=view, write_access=none, write_optional=False),
0x02: XmlAttribute(name='MaxTemperature', datatype='temperature', conformance=feature(0x01, 'TN'), read_access=view, write_access=none, write_optional=False),
0x03: XmlAttribute(name='Step', datatype='temperature', conformance=feature(0x04, 'STEP'), read_access=view, write_access=none, write_optional=False),
0x04: XmlAttribute(name='SelectedTemperatureLevel', datatype='uint8', conformance=feature(0x02, 'TL'), read_access=view, write_access=none, write_optional=False),
0x05: XmlAttribute(name='SupportedTemperatureLevels', datatype='list', conformance=feature(0x02, 'TL'), read_access=view, write_access=none, write_optional=False),
}

# Workaround for incorrect parsing of access control cluster.
Expand Down
Loading