From 326dcb24fa9b8035b2bf22118af190eae2c011a5 Mon Sep 17 00:00:00 2001 From: Antonio Melo Jr Date: Thu, 5 Sep 2024 18:23:34 +0000 Subject: [PATCH] WIP --- .../matter_yamltests/hooks.py | 2 +- .../matter_yamltests/runner.py | 2 +- src/python_testing/TC_ACE_1_3.py | 238 +++++++++--------- src/python_testing/TC_ACL_2_2.py | 4 +- src/python_testing/TC_BOOLCFG_2_1.py | 18 +- src/python_testing/TC_OpstateCommon.py | 22 +- src/python_testing/matter_testing_support.py | 25 +- 7 files changed, 163 insertions(+), 148 deletions(-) diff --git a/scripts/py_matter_yamltests/matter_yamltests/hooks.py b/scripts/py_matter_yamltests/matter_yamltests/hooks.py index ca739b8ea27633..9bfd1c206af093 100644 --- a/scripts/py_matter_yamltests/matter_yamltests/hooks.py +++ b/scripts/py_matter_yamltests/matter_yamltests/hooks.py @@ -152,7 +152,7 @@ def step_skipped(self, name: str, expression: str): """ pass - def step_start(self, request: TestStep): + def step_start(self, request: TestStep, endpoint: Optional[int] = None): """ This method is called when the runner starts running a step from the test. diff --git a/scripts/py_matter_yamltests/matter_yamltests/runner.py b/scripts/py_matter_yamltests/matter_yamltests/runner.py index 05fe8fc54e851c..cae057f5b59590 100644 --- a/scripts/py_matter_yamltests/matter_yamltests/runner.py +++ b/scripts/py_matter_yamltests/matter_yamltests/runner.py @@ -225,7 +225,7 @@ async def _run(self, parser: TestParser, config: TestRunnerConfig): if config.options.delay_in_ms: await asyncio.sleep(config.options.delay_in_ms / 1000) - hooks.test_stop(round(test_duration)) + hooks.test_stop(exception=None, duration=round(test_duration)) except Exception as exception: status = exception diff --git a/src/python_testing/TC_ACE_1_3.py b/src/python_testing/TC_ACE_1_3.py index 3c64171571eb95..0640efba3ced2a 100644 --- a/src/python_testing/TC_ACE_1_3.py +++ b/src/python_testing/TC_ACE_1_3.py @@ -47,16 +47,16 @@ async def write_acl(self, acl): asserts.assert_equal(result[0].Status, Status.Success, "ACL write failed") print(result) - async def read_descriptor_expect_success(self, th): + async def read_descriptor_expect_success(self, th, endpoint: int): cluster = Clusters.Objects.Descriptor attribute = Clusters.Descriptor.Attributes.DeviceTypeList - await self.read_single_attribute_check_success(dev_ctrl=th, endpoint=0, cluster=cluster, attribute=attribute) + await self.read_single_attribute_check_success(dev_ctrl=th, endpoint=endpoint, cluster=cluster, attribute=attribute) - async def read_descriptor_expect_unsupported_access(self, th): + async def read_descriptor_expect_unsupported_access(self, th, endpoint: int): cluster = Clusters.Objects.Descriptor attribute = Clusters.Descriptor.Attributes.DeviceTypeList await self.read_single_attribute_expect_error( - dev_ctrl=th, endpoint=0, cluster=cluster, attribute=attribute, error=Status.UnsupportedAccess) + dev_ctrl=th, endpoint=endpoint, cluster=cluster, attribute=attribute, error=Status.UnsupportedAccess) def desc_TC_ACE_1_3(self) -> str: return "[TC-ACE-1.3] Subjects" @@ -137,6 +137,10 @@ async def test_TC_ACE_1_3(self): cat2v3 = cat2_id | 0x0003 logging.info('cat1v1 0x%x', cat1v1) + endpoint = 0 + if self.matter_test_config.endpoint != None: + endpoint = self.matter_test_config.endpoint + self.step(1) fabric_admin = self.certificate_authority_manager.activeCaList[0].adminList[0] @@ -156,266 +160,266 @@ async def test_TC_ACE_1_3(self): paaTrustStorePath=str(self.matter_test_config.paa_trust_store_path), catTags=[cat1v1, cat2v2]) - self.step(2) + self.step(2, endpoint=endpoint) TH0_admin_acl = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH0_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0, cluster=0x001f)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint, cluster=0x001f)]) all_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, all_view] await self.write_acl(acl) - self.step(3) - await self.read_descriptor_expect_success(TH1) + self.step(3, endpoint=endpoint) + await self.read_descriptor_expect_success(TH1, endpoint=endpoint) - self.step(4) - await self.read_descriptor_expect_success(TH2) + self.step(4, endpoint=endpoint) + await self.read_descriptor_expect_success(TH2, endpoint=endpoint) - self.step(5) - await self.read_descriptor_expect_success(TH3) + self.step(5, endpoint=endpoint) + await self.read_descriptor_expect_success(TH3, endpoint=endpoint) - self.step(6) + self.step(6, endpoint=endpoint) th1_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH1_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, th1_view] await self.write_acl(acl) - self.step(7) - await self.read_descriptor_expect_success(TH1) + self.step(7, endpoint=endpoint) + await self.read_descriptor_expect_success(TH1, endpoint=endpoint) - self.step(8) - await self.read_descriptor_expect_unsupported_access(TH2) + self.step(8, endpoint=endpoint) + await self.read_descriptor_expect_unsupported_access(TH2, endpoint=endpoint) - self.step(9) - await self.read_descriptor_expect_unsupported_access(TH3) + self.step(9, endpoint=endpoint) + await self.read_descriptor_expect_unsupported_access(TH3, endpoint=endpoint) - self.step(10) + self.step(10, endpoint=endpoint) th2_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH2_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, th2_view] await self.write_acl(acl) - self.step(11) - await self.read_descriptor_expect_unsupported_access(TH1) + self.step(11, endpoint=endpoint) + await self.read_descriptor_expect_unsupported_access(TH1, endpoint=endpoint) - self.step(12) - await self.read_descriptor_expect_success(TH2) + self.step(12, endpoint=endpoint) + await self.read_descriptor_expect_success(TH2, endpoint=endpoint) - self.step(13) - await self.read_descriptor_expect_unsupported_access(TH3) + self.step(13, endpoint=endpoint) + await self.read_descriptor_expect_unsupported_access(TH3, endpoint=endpoint) - self.step(14) + self.step(14, endpoint=endpoint) th3_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH3_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, th3_view] await self.write_acl(acl) - self.step(15) - await self.read_descriptor_expect_unsupported_access(TH1) + self.step(15, endpoint=endpoint) + await self.read_descriptor_expect_unsupported_access(TH1, endpoint=endpoint) - self.step(16) - await self.read_descriptor_expect_unsupported_access(TH2) + self.step(16, endpoint=endpoint) + await self.read_descriptor_expect_unsupported_access(TH2, endpoint=endpoint) - self.step(17) - await self.read_descriptor_expect_success(TH3) + self.step(17, endpoint=endpoint) + await self.read_descriptor_expect_success(TH3, endpoint=endpoint) - self.step(18) + self.step(18, endpoint=endpoint) th12_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH1_nodeid, TH2_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, th12_view] await self.write_acl(acl) - self.step(19) - await self.read_descriptor_expect_success(TH1) + self.step(19, endpoint=endpoint) + await self.read_descriptor_expect_success(TH1, endpoint=endpoint) - self.step(20) - await self.read_descriptor_expect_success(TH2) + self.step(20, endpoint=endpoint) + await self.read_descriptor_expect_success(TH2, endpoint=endpoint) - self.step(21) - await self.read_descriptor_expect_unsupported_access(TH3) + self.step(21, endpoint=endpoint) + await self.read_descriptor_expect_unsupported_access(TH3, endpoint=endpoint) - self.step(22) + self.step(22, endpoint=endpoint) th13_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH1_nodeid, TH3_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, th13_view] await self.write_acl(acl) - self.step(23) - await self.read_descriptor_expect_success(TH1) + self.step(23, endpoint=endpoint) + await self.read_descriptor_expect_success(TH1, endpoint=endpoint) - self.step(24) - await self.read_descriptor_expect_unsupported_access(TH2) + self.step(24, endpoint=endpoint) + await self.read_descriptor_expect_unsupported_access(TH2, endpoint=endpoint) - self.step(25) - await self.read_descriptor_expect_success(TH3) + self.step(25, endpoint=endpoint) + await self.read_descriptor_expect_success(TH3, endpoint=endpoint) - self.step(26) + self.step(26, endpoint=endpoint) th23_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH2_nodeid, TH3_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, th23_view] await self.write_acl(acl) - self.step(27) - await self.read_descriptor_expect_unsupported_access(TH1) + self.step(27, endpoint=endpoint) + await self.read_descriptor_expect_unsupported_access(TH1, endpoint=endpoint) - self.step(28) - await self.read_descriptor_expect_success(TH2) + self.step(28, endpoint=endpoint) + await self.read_descriptor_expect_success(TH2, endpoint=endpoint) - self.step(29) - await self.read_descriptor_expect_success(TH3) + self.step(29, endpoint=endpoint) + await self.read_descriptor_expect_success(TH3, endpoint=endpoint) - self.step(30) + self.step(30, endpoint=endpoint) th123_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH1_nodeid, TH2_nodeid, TH3_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, th123_view] await self.write_acl(acl) - self.step(31) - await self.read_descriptor_expect_success(TH1) + self.step(31, endpoint=endpoint) + await self.read_descriptor_expect_success(TH1, endpoint=endpoint) - self.step(32) - await self.read_descriptor_expect_success(TH2) + self.step(32, endpoint=endpoint) + await self.read_descriptor_expect_success(TH2, endpoint=endpoint) - self.step(33) - await self.read_descriptor_expect_success(TH3) + self.step(33, endpoint=endpoint) + await self.read_descriptor_expect_success(TH3, endpoint=endpoint) - self.step(34) + self.step(34, endpoint=endpoint) cat1v1_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[acl_subject(cat1v1)], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, cat1v1_view] await self.write_acl(acl) - self.step(35) - await self.read_descriptor_expect_success(TH1) + self.step(35, endpoint=endpoint) + await self.read_descriptor_expect_success(TH1, endpoint=endpoint) - self.step(36) - await self.read_descriptor_expect_success(TH2) + self.step(36, endpoint=endpoint) + await self.read_descriptor_expect_success(TH2, endpoint=endpoint) - self.step(37) - await self.read_descriptor_expect_success(TH3) + self.step(37, endpoint=endpoint) + await self.read_descriptor_expect_success(TH3, endpoint=endpoint) - self.step(38) + self.step(38, endpoint=endpoint) cat1v2_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[acl_subject(cat1v2)], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, cat1v2_view] await self.write_acl(acl) - self.step(39) - await self.read_descriptor_expect_success(TH1) + self.step(39, endpoint=endpoint) + await self.read_descriptor_expect_success(TH1, endpoint=endpoint) - self.step(40) - await self.read_descriptor_expect_success(TH2) + self.step(40, endpoint=endpoint) + await self.read_descriptor_expect_success(TH2, endpoint=endpoint) - self.step(41) - await self.read_descriptor_expect_unsupported_access(TH3) + self.step(41, endpoint=endpoint) + await self.read_descriptor_expect_unsupported_access(TH3, endpoint=endpoint) - self.step(42) + self.step(42, endpoint=endpoint) cat1v3_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[acl_subject(cat1v3)], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, cat1v3_view] await self.write_acl(acl) - self.step(43) - await self.read_descriptor_expect_success(TH1) + self.step(43, endpoint=endpoint) + await self.read_descriptor_expect_success(TH1, endpoint=endpoint) - self.step(44) - await self.read_descriptor_expect_unsupported_access(TH2) + self.step(44, endpoint=endpoint) + await self.read_descriptor_expect_unsupported_access(TH2, endpoint=endpoint) - self.step(45) - await self.read_descriptor_expect_unsupported_access(TH3) + self.step(45, endpoint=endpoint) + await self.read_descriptor_expect_unsupported_access(TH3, endpoint=endpoint) - self.step(46) + self.step(46, endpoint=endpoint) cat2v1_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[acl_subject(cat2v1)], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, cat2v1_view] await self.write_acl(acl) - self.step(47) - await self.read_descriptor_expect_unsupported_access(TH1) + self.step(47, endpoint=endpoint) + await self.read_descriptor_expect_unsupported_access(TH1, endpoint=endpoint) - self.step(48) - await self.read_descriptor_expect_success(TH2) + self.step(48, endpoint=endpoint) + await self.read_descriptor_expect_success(TH2, endpoint=endpoint) - self.step(49) - await self.read_descriptor_expect_success(TH3) + self.step(49, endpoint=endpoint) + await self.read_descriptor_expect_success(TH3, endpoint=endpoint) - self.step(50) + self.step(50, endpoint=endpoint) cat2v2_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[acl_subject(cat2v2)], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, cat2v2_view] await self.write_acl(acl) - self.step(51) - await self.read_descriptor_expect_unsupported_access(TH1) + self.step(51, endpoint=endpoint) + await self.read_descriptor_expect_unsupported_access(TH1, endpoint=endpoint) - self.step(52) - await self.read_descriptor_expect_unsupported_access(TH2) + self.step(52, endpoint=endpoint) + await self.read_descriptor_expect_unsupported_access(TH2, endpoint=endpoint) - self.step(53) - await self.read_descriptor_expect_success(TH3) + self.step(53, endpoint=endpoint) + await self.read_descriptor_expect_success(TH3, endpoint=endpoint) - self.step(54) + self.step(54, endpoint=endpoint) cat2v3_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[acl_subject(cat2v3)], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) acl = [TH0_admin_acl, cat2v3_view] await self.write_acl(acl) - self.step(55) - await self.read_descriptor_expect_unsupported_access(TH1) + self.step(55, endpoint=endpoint) + await self.read_descriptor_expect_unsupported_access(TH1, endpoint=endpoint) - self.step(56) - await self.read_descriptor_expect_unsupported_access(TH2) + self.step(56, endpoint=endpoint) + await self.read_descriptor_expect_unsupported_access(TH2, endpoint=endpoint) - self.step(57) - await self.read_descriptor_expect_unsupported_access(TH3) + self.step(57, endpoint=endpoint) + await self.read_descriptor_expect_unsupported_access(TH3, endpoint=endpoint) self.step(58) diff --git a/src/python_testing/TC_ACL_2_2.py b/src/python_testing/TC_ACL_2_2.py index a1e755b7397f88..d2bf7d1733ec9b 100644 --- a/src/python_testing/TC_ACL_2_2.py +++ b/src/python_testing/TC_ACL_2_2.py @@ -45,7 +45,7 @@ def steps_TC_ACL_2_2(self) -> list[TestStep]: @async_test_body async def test_TC_ACL_2_2(self): self.step(1) - self.step(2) + self.step(2, endpoint=0) data = await self.default_controller.ReadAttribute(nodeid=self.dut_node_id, attributes=[(Clusters.Descriptor.Attributes.ServerList)]) asserts.assert_true(Clusters.AccessControl.id in data[0][Clusters.Descriptor] [Clusters.Descriptor.Attributes.ServerList], "ACL cluster not on EP0") @@ -53,6 +53,8 @@ async def test_TC_ACL_2_2(self): for endpoint, ep_data in data.items(): if endpoint == 0: continue + else: + self.current_step_endpoint = endpoint asserts.assert_false(Clusters.AccessControl.id in ep_data[Clusters.Descriptor][Clusters.Descriptor.Attributes.ServerList], f"ACL cluster incorrectly present on endpoint {endpoint}") diff --git a/src/python_testing/TC_BOOLCFG_2_1.py b/src/python_testing/TC_BOOLCFG_2_1.py index 484d590b258b6d..452b77b499b39c 100644 --- a/src/python_testing/TC_BOOLCFG_2_1.py +++ b/src/python_testing/TC_BOOLCFG_2_1.py @@ -74,12 +74,12 @@ async def test_TC_BOOLCFG_2_1(self): self.step(1) attributes = Clusters.BooleanStateConfiguration.Attributes - self.step(2) + self.step(2, endpoint=endpoint) attribute_list = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.AttributeList) number_of_supported_levels = 0 - self.step(3) + self.step(3, endpoint=endpoint) if attributes.SupportedSensitivityLevels.attribute_id in attribute_list: number_of_supported_levels = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.SupportedSensitivityLevels) asserts.assert_less_equal(number_of_supported_levels, 10, "SupportedSensitivityLevels attribute is out of range") @@ -87,7 +87,7 @@ async def test_TC_BOOLCFG_2_1(self): else: logging.info("Test step skipped") - self.step(4) + self.step(4, endpoint=endpoint) if attributes.CurrentSensitivityLevel.attribute_id in attribute_list: current_sensitivity_level_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.CurrentSensitivityLevel) asserts.assert_less_equal(current_sensitivity_level_dut, number_of_supported_levels, @@ -95,7 +95,7 @@ async def test_TC_BOOLCFG_2_1(self): else: logging.info("Test step skipped") - self.step(5) + self.step(5, endpoint=endpoint) if attributes.DefaultSensitivityLevel.attribute_id in attribute_list: default_sensitivity_level_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.DefaultSensitivityLevel) asserts.assert_less_equal(default_sensitivity_level_dut, number_of_supported_levels, @@ -103,35 +103,35 @@ async def test_TC_BOOLCFG_2_1(self): else: logging.info("Test step skipped") - self.step(6) + self.step(6, endpoint=endpoint) if attributes.AlarmsActive.attribute_id in attribute_list: alarms_active_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.AlarmsActive) asserts.assert_equal(alarms_active_dut & ~all_alarm_mode_bitmap_bits, 0, "AlarmsActive is not in valid range") else: logging.info("Test step skipped") - self.step(7) + self.step(7, endpoint=endpoint) if attributes.AlarmsSuppressed.attribute_id in attribute_list: alarms_suppressed_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.AlarmsSuppressed) asserts.assert_equal(alarms_suppressed_dut & ~all_alarm_mode_bitmap_bits, 0, "AlarmsSuppressed is not in valid range") else: logging.info("Test step skipped") - self.step(8) + self.step(8, endpoint=endpoint) if attributes.AlarmsEnabled.attribute_id in attribute_list: alarms_enabled_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.AlarmsEnabled) asserts.assert_equal(alarms_enabled_dut & ~all_alarm_mode_bitmap_bits, 0, "AlarmsEnabled is not in valid range") else: logging.info("Test step skipped") - self.step(9) + self.step(9, endpoint=endpoint) if attributes.AlarmsSupported.attribute_id in attribute_list: alarms_supported_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.AlarmsSupported) asserts.assert_equal(alarms_supported_dut & ~all_alarm_mode_bitmap_bits, 0, "AlarmsSupported is not in valid range") else: logging.info("Test step skipped") - self.step(10) + self.step(10, endpoint=endpoint) if attributes.SensorFault.attribute_id in attribute_list: sensor_fault_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.SensorFault) asserts.assert_equal(sensor_fault_dut & ~all_sensor_fault_bitmap_bits, 0, "SensorFault is not in valid range") diff --git a/src/python_testing/TC_OpstateCommon.py b/src/python_testing/TC_OpstateCommon.py index e0e252ea180eb0..9622818e384108 100644 --- a/src/python_testing/TC_OpstateCommon.py +++ b/src/python_testing/TC_OpstateCommon.py @@ -1250,16 +1250,16 @@ async def TEST_TC_OPSTATE_BASE_2_6(self, endpoint=1): self.init_test() # commission - self.step(1) + self.step(1, endpoint) # Note that this does a subscribe-all instead of subscribing only to the CountdownTime attribute. # To-Do: Update the TP to subscribe-all. - self.step(2) + self.step(2, endpoint) sub_handler = ClusterAttributeChangeAccumulator(cluster) await sub_handler.start(self.default_controller, self.dut_node_id, endpoint) if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ST_RUNNING")): - self.step(3) + self.step(3, endpoint) self.send_manual_or_pipe_command(name="OperationalStateChange", device=self.device, operation="Start") @@ -1275,10 +1275,10 @@ async def TEST_TC_OPSTATE_BASE_2_6(self, endpoint=1): self.skip_step(3) sub_handler.reset() - self.step(4) + self.step(4, endpoint) countdownTime = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime) if countdownTime is not NullValue: - self.step(5) + self.step(5, endpoint) logging.info('Test will now collect data for 10 seconds') time.sleep(10) @@ -1289,13 +1289,13 @@ async def TEST_TC_OPSTATE_BASE_2_6(self, endpoint=1): else: self.skip_step(5) - self.step(6) + self.step(6, endpoint) countdownTime = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime) attr_value = await self.read_expect_success( endpoint=endpoint, attribute=attributes.OperationalState) if attr_value == cluster.Enums.OperationalStateEnum.kRunning and countdownTime is not NullValue: - self.step(7) + self.step(7, endpoint) wait_count = 0 while (attr_value != cluster.Enums.OperationalStateEnum.kStopped) and (wait_count < 20): time.sleep(1) @@ -1311,7 +1311,7 @@ async def TEST_TC_OPSTATE_BASE_2_6(self, endpoint=1): sub_handler.reset() if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ST_RUNNING")): - self.step(8) + self.step(8, endpoint) self.send_manual_or_pipe_command(name="OperationalStateChange", device=self.device, operation="Start") @@ -1326,16 +1326,16 @@ async def TEST_TC_OPSTATE_BASE_2_6(self, endpoint=1): else: self.skip_step(8) - self.step(9) + self.step(9, endpoint) await self.read_and_expect_value(endpoint=endpoint, attribute=attributes.OperationalState, expected_value=cluster.Enums.OperationalStateEnum.kRunning) sub_handler.reset() - self.step(10) + self.step(10, endpoint) countdownTime = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime) if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ST_PAUSED")) and countdownTime is not NullValue: - self.step(11) + self.step(11, endpoint) self.send_manual_or_pipe_command(name="OperationalStateChange", device=self.device, operation="Pause") diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index c5d46e2306dae2..60667af52b3aa9 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -584,9 +584,10 @@ def step_skipped(self, name: str, expression: str): # TODO: Do we really need the expression as a string? We can evaluate this in code very easily logging.info(f'\t\t**** Skipping: {name}') - def step_start(self, name: str): - # The way I'm calling this, the name is already includes the step number, but it seems like it might be good to separate these - logging.info(f'\t\t***** Test Step {name}') + def step_start(self, name: str, endpoint: int | None = None): + # TODO: The way I'm calling this, the name is already includes the step number, but it seems like it might be good to separate these + endpoint_info = f" with endpoint {endpoint}" if endpoint is not None else "" + logging.info(f'\t\t***** Test Step {name} started{endpoint_info} ') def step_success(self, logger, logs, duration: int, request): pass @@ -915,6 +916,7 @@ def hex_from_bytes(b: bytes) -> str: class TestStep: test_plan_number: typing.Union[int, str] description: str + endpoint: int | None = None expectation: str = "" is_commissioning: bool = False @@ -1451,7 +1453,7 @@ def skip_all_remaining_steps(self, starting_step_number): for step in remaining: self.skip_step(step.test_plan_number) - def step(self, step: typing.Union[int, str]): + def step(self, step: typing.Union[int, str], endpoint: Optional[int] = None): test_name = self.current_test_info.name steps = self.get_test_steps(test_name) @@ -1459,6 +1461,7 @@ def step(self, step: typing.Union[int, str]): if len(steps) <= self.current_step_index or steps[self.current_step_index].test_plan_number != step: asserts.fail(f'Unexpected test step: {step} - steps not called in order, or step does not exist') + current_step = steps[self.current_step_index] if self.runner_hook: # If we've reached the next step with no assertion and the step wasn't skipped, it passed if not self.step_skipped and self.current_step_index != 0: @@ -1467,13 +1470,19 @@ def step(self, step: typing.Union[int, str]): self.runner_hook.step_success(logger=None, logs=None, duration=step_duration, request=None) # TODO: it seems like the step start should take a number and a name - name = f'{step} : {steps[self.current_step_index].description}' - self.runner_hook.step_start(name=name) + name = f'{step} : {current_step.description}' + endpoint_info = f" with endpoint {endpoint}" if endpoint is not None else "" + logging.info( + f"========= Step {name} started{endpoint_info} =========" + ) + + current_step.endpoint = endpoint + self.runner_hook.step_start(name=name, endpoint=current_step.endpoint) else: - self.print_step(step, steps[self.current_step_index].description) + self.print_step(step, current_step.description) self.step_start_time = datetime.now(tz=timezone.utc) - self.current_step_index = self.current_step_index + 1 + self.current_step_index += 1 self.step_skipped = False def get_setup_payload_info(self) -> List[SetupPayloadInfo]: