From 0ff709f25f6a7af68314c1b0a147a76c30c234f6 Mon Sep 17 00:00:00 2001 From: Gibran Vargas <131407127+gvargas-csa@users.noreply.github.com> Date: Fri, 19 Jan 2024 20:41:32 -0800 Subject: [PATCH] TC-ACE_1.5: Add (#30972) * chore(python_testing): steps 1 and 2 * chore(python_testing): opencommissionwindow, oncommission methods, read currentfabric and steps 3, 4 * chore(python_testing): create ACL structures for th1 and th2. Working on steps (7,8,9,10) * chore(python_testing): change read_descriptor and read_basic for unsupported and expect success all steps added * chore(TC_ACE_1_5): complete all test steps ready to executed * chore(TC_ACE-1.5): fix errors using flake8 Python linter * chore(TC_ACE-1.5): fix errors using flake8 python linter * chore(TC_ACE-1.5): fix error using Restyled path * chore(base): Add openCommissioningWindow to matter base test * chore(TC_ACE-1.5): fix error using wrong node to openComissioningWindow method * chore(base): return a tuple of params onCommissioningWindow Added new random discriminator is store on a dictionary. When is calling onCommissioningWindow method is return as a tuple (CommissioningParameters, random_discriminator) * chore(TC_ACE-1.5): use InteractionModelError instead Exception to handle error onCommissioningWindow * chore(ci): add new test case TC_ACE-1.5 * chore(TC_ACE-1.5): modify openCommissioningWindow and type annotations * chore(TC_ACE-1.5): implement all suggestions by PR --- .github/workflows/tests.yaml | 1 + src/python_testing/TC_ACE_1_5.py | 141 +++++++++++++++++++ src/python_testing/matter_testing_support.py | 19 +++ 3 files changed, 161 insertions(+) create mode 100644 src/python_testing/TC_ACE_1_5.py diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index ec3abc3d9bda18..b5ed62592ecfc6 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -463,6 +463,7 @@ jobs: scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_ACE_1_2.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_ACE_1_4.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --int-arg PIXIT.ACE.APPENDPOINT:1 PIXIT.ACE.APPDEVTYPEID:0x0100 --string-arg PIXIT.ACE.APPCLUSTER:OnOff PIXIT.ACE.APPATTRIBUTE:OnOff --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_ACE_1_3.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' + scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_ACE_1_5.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_CGEN_2_4.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TC_DA_1_2.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace-to json:out/trace_data/app-{SCRIPT_BASE_NAME}.json" --script "src/python_testing/TestGroupTableReports.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:out/trace_data/test-{SCRIPT_BASE_NAME}.json --trace-to perfetto:out/trace_data/test-{SCRIPT_BASE_NAME}.perfetto"' diff --git a/src/python_testing/TC_ACE_1_5.py b/src/python_testing/TC_ACE_1_5.py new file mode 100644 index 00000000000000..93c3fce5c58c11 --- /dev/null +++ b/src/python_testing/TC_ACE_1_5.py @@ -0,0 +1,141 @@ +# +# Copyright (c) 2024 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging + +import chip.clusters as Clusters +from chip import ChipDeviceCtrl +from chip.interaction_model import Status +from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main +from mobly import asserts + + +class TC_ACE_1_5(MatterBaseTest): + + async def read_currentfabricindex(self, th: ChipDeviceCtrl) -> int: + cluster = Clusters.Objects.OperationalCredentials + attribute = Clusters.OperationalCredentials.Attributes.CurrentFabricIndex + current_fabric_index = await self.read_single_attribute_check_success(dev_ctrl=th, endpoint=0, cluster=cluster, attribute=attribute) + return current_fabric_index + + async def write_acl(self, acl: Clusters.AccessControl, th: ChipDeviceCtrl): + result = await th.WriteAttribute(self.dut_node_id, [(0, Clusters.AccessControl.Attributes.Acl(acl))]) + asserts.assert_equal(result[0].Status, Status.Success, "ACL write failed") + + @async_test_body + async def test_TC_ACE_1_5(self): + self.print_step(1, "Comissioning, already done") + self.th1 = self.default_controller + + # TODO: move into base class and adjust tests (#31521) + new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority() + new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=self.matter_test_config.fabric_id + 1) + + TH1_nodeid = self.matter_test_config.controller_node_id + TH2_nodeid = self.matter_test_config.controller_node_id + 2 + + self.th2 = new_fabric_admin.NewController(nodeId=TH2_nodeid, + paaTrustStorePath=str(self.matter_test_config.paa_trust_store_path)) + + params = self.openCommissioningWindow(self.th1, self.dut_node_id) + self.print_step(2, "TH1 opens the commissioning window on the DUT") + + errcode = self.th2.CommissionOnNetwork( + nodeId=self.dut_node_id, setupPinCode=params.commissioningParameters.setupPinCode, + filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=params.randomDiscriminator) + logging.info('Commissioning complete done. Successful? {}, errorcode = {}'.format(errcode.is_success, errcode)) + self.print_step(3, "TH2 commissions DUT using admin node ID N2") + + self.print_step(4, "TH2 reads its fabric index from the Operational Credentials cluster CurrentFabricIndex attribute") + th2FabricIndex = await self.read_currentfabricindex(self.th2) + + self.print_step( + 5, "TH1 writes DUT Endpoint 0 ACL cluster ACL attribute, value is list of ACLEntryStruct containing 2 elements") + admin_acl = Clusters.AccessControl.Structs.AccessControlEntryStruct( + privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister, + authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, + subjects=[TH1_nodeid], + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0, cluster=Clusters.AccessControl.id)]) + descriptor_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( + privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, + authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, + subjects=[], + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0, cluster=Clusters.Descriptor.id)]) + acl = [admin_acl, descriptor_view] + await self.write_acl(acl, self.th1) + + self.print_step( + 6, "TH2 writes DUT Endpoint 0 ACL cluster ACL attribute, value is list of ACLEntryStruct containing 2 elements") + admin_acl = Clusters.AccessControl.Structs.AccessControlEntryStruct( + fabricIndex=th2FabricIndex, + privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister, + authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, + subjects=[TH2_nodeid], + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0, cluster=Clusters.AccessControl.id)]) + descriptor_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( + fabricIndex=th2FabricIndex, + privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, + authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, + subjects=[], + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0, cluster=Clusters.BasicInformation.id)]) + acl = [admin_acl, descriptor_view] + await self.write_acl(acl, self.th2) + + self.print_step(7, "TH1 reads DUT Endpoint 0 Descriptor cluster DeviceTypeList attribute") + await self.read_single_attribute_check_success( + dev_ctrl=self.th1, endpoint=0, + cluster=Clusters.Objects.Descriptor, + attribute=Clusters.Descriptor.Attributes.DeviceTypeList) + + self.print_step(8, "TH1 reads DUT Endpoint 0 Basic Information cluster VendorID attribute") + await self.read_single_attribute_expect_error( + dev_ctrl=self.th1, endpoint=0, + cluster=Clusters.Objects.BasicInformation, + attribute=Clusters.BasicInformation.Attributes.VendorID, + error=Status.UnsupportedAccess) + + self.print_step(9, "TH2 reads DUT Endpoint 0 Descriptor cluster DeviceTypeList attribute") + await self.read_single_attribute_expect_error( + dev_ctrl=self.th2, endpoint=0, + cluster=Clusters.Objects.Descriptor, + attribute=Clusters.Descriptor.Attributes.DeviceTypeList, + error=Status.UnsupportedAccess) + + self.print_step(10, "TH2 reads DUT Endpoint 0 Basic Information cluster VendorID attribute") + await self.read_single_attribute_check_success( + dev_ctrl=self.th2, endpoint=0, + cluster=Clusters.Objects.BasicInformation, + attribute=Clusters.BasicInformation.Attributes.VendorID) + + self.print_step(11, "TH1 resets the ACLs to default value by writing DUT EP0") + full_acl = Clusters.AccessControl.Structs.AccessControlEntryStruct( + privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister, + authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, + subjects=[TH1_nodeid], + targets=[]) + + acl = [full_acl] + await self.write_acl(acl, self.th1) + + self.print_step( + 12, "TH1 removes the TH2 fabric by sending the RemoveFabric command to the DUT with the FabricIndex set to th2FabricIndex") + removeFabricCmd = Clusters.OperationalCredentials.Commands.RemoveFabric(th2FabricIndex) + await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=removeFabricCmd) + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index 2cc23e41952315..77e79d980483ac 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -24,6 +24,7 @@ import os import pathlib import queue +import random import re import sys import typing @@ -42,6 +43,7 @@ from chip import ChipDeviceCtrl # Needed before chip.FabricAdmin import chip.FabricAdmin # Needed before chip.CertificateAuthority import chip.CertificateAuthority +from chip.ChipDeviceCtrl import CommissioningParameters # isort: on import chip.clusters as Clusters @@ -379,6 +381,12 @@ def cluster_id_str(id): return 'HERE IS THE PROBLEM' +@dataclass +class CustomCommissioningParameters: + commissioningParameters: CommissioningParameters + randomDiscriminator: int + + @dataclass class AttributePathLocation: endpoint_id: int @@ -728,6 +736,17 @@ def check_pics(self, pics_key: str) -> bool: pics_key = pics_key.strip() return pics_key in picsd and picsd[pics_key] + def openCommissioningWindow(self, dev_ctrl: ChipDeviceCtrl, node_id: int) -> CustomCommissioningParameters: + rnd_discriminator = random.randint(0, 4095) + try: + commissioning_params = dev_ctrl.OpenCommissioningWindow(nodeid=node_id, timeout=900, iteration=1000, + discriminator=rnd_discriminator, option=1) + params = CustomCommissioningParameters(commissioning_params, rnd_discriminator) + return params + + except InteractionModelError as e: + asserts.fail(e.status, 'Failed to open commissioning window') + async def read_single_attribute( self, dev_ctrl: ChipDeviceCtrl, node_id: int, endpoint: int, attribute: object, fabricFiltered: bool = True) -> object: result = await dev_ctrl.ReadAttribute(node_id, [(endpoint, attribute)], fabricFiltered=fabricFiltered)