diff --git a/scripts/py_matter_yamltests/matter_yamltests/hooks.py b/scripts/py_matter_yamltests/matter_yamltests/hooks.py index d14e90a7aabe90..ca739b8ea27633 100644 --- a/scripts/py_matter_yamltests/matter_yamltests/hooks.py +++ b/scripts/py_matter_yamltests/matter_yamltests/hooks.py @@ -152,7 +152,7 @@ def step_skipped(self, name: str, expression: str): """ pass - def step_start(self, request: TestStep, endpoint: Optional[int] = None): + def step_start(self, request: TestStep): """ This method is called when the runner starts running a step from the test. @@ -160,8 +160,6 @@ def step_start(self, request: TestStep, endpoint: Optional[int] = None): ---------- request: TestStep The original request as defined by the test step. - endpoint: int - An optional device endpoint the step will target. """ pass diff --git a/src/python_testing/TC_ACE_1_3.py b/src/python_testing/TC_ACE_1_3.py index 2868d9548f7733..1672eb65af2a51 100644 --- a/src/python_testing/TC_ACE_1_3.py +++ b/src/python_testing/TC_ACE_1_3.py @@ -54,16 +54,16 @@ async def write_acl(self, acl): asserts.assert_equal(result[0].Status, Status.Success, "ACL write failed") print(result) - async def read_descriptor_expect_success(self, th, endpoint: int): + async def read_descriptor_expect_success(self, th): cluster = Clusters.Objects.Descriptor attribute = Clusters.Descriptor.Attributes.DeviceTypeList - await self.read_single_attribute_check_success(dev_ctrl=th, endpoint=endpoint, cluster=cluster, attribute=attribute) + await self.read_single_attribute_check_success(dev_ctrl=th, endpoint=0, cluster=cluster, attribute=attribute) - async def read_descriptor_expect_unsupported_access(self, th, endpoint: int): + async def read_descriptor_expect_unsupported_access(self, th): cluster = Clusters.Objects.Descriptor attribute = Clusters.Descriptor.Attributes.DeviceTypeList await self.read_single_attribute_expect_error( - dev_ctrl=th, endpoint=endpoint, cluster=cluster, attribute=attribute, error=Status.UnsupportedAccess) + dev_ctrl=th, endpoint=0, cluster=cluster, attribute=attribute, error=Status.UnsupportedAccess) def desc_TC_ACE_1_3(self) -> str: return "[TC-ACE-1.3] Subjects" @@ -144,8 +144,6 @@ async def test_TC_ACE_1_3(self): cat2v3 = cat2_id | 0x0003 logging.info('cat1v1 0x%x', cat1v1) - endpoint = 0 - self.step(1) fabric_admin = self.certificate_authority_manager.activeCaList[0].adminList[0] @@ -165,266 +163,266 @@ async def test_TC_ACE_1_3(self): paaTrustStorePath=str(self.matter_test_config.paa_trust_store_path), catTags=[cat1v1, cat2v2]) - self.step(2, endpoint) + self.step(2) TH0_admin_acl = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH0_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint, cluster=0x001f)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0, cluster=0x001f)]) all_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) acl = [TH0_admin_acl, all_view] await self.write_acl(acl) - self.step(3, endpoint) - await self.read_descriptor_expect_success(TH1, endpoint) + self.step(3) + await self.read_descriptor_expect_success(TH1) - self.step(4, endpoint) - await self.read_descriptor_expect_success(TH2, endpoint) + self.step(4) + await self.read_descriptor_expect_success(TH2) - self.step(5, endpoint) - await self.read_descriptor_expect_success(TH3, endpoint) + self.step(5) + await self.read_descriptor_expect_success(TH3) - self.step(6, endpoint) + self.step(6) th1_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH1_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) acl = [TH0_admin_acl, th1_view] await self.write_acl(acl) - self.step(7, endpoint) - await self.read_descriptor_expect_success(TH1, endpoint) + self.step(7) + await self.read_descriptor_expect_success(TH1) - self.step(8, endpoint) - await self.read_descriptor_expect_unsupported_access(TH2, endpoint) + self.step(8) + await self.read_descriptor_expect_unsupported_access(TH2) - self.step(9, endpoint) - await self.read_descriptor_expect_unsupported_access(TH3, endpoint) + self.step(9) + await self.read_descriptor_expect_unsupported_access(TH3) - self.step(10, endpoint) + self.step(10) th2_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH2_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) acl = [TH0_admin_acl, th2_view] await self.write_acl(acl) - self.step(11, endpoint) - await self.read_descriptor_expect_unsupported_access(TH1, endpoint) + self.step(11) + await self.read_descriptor_expect_unsupported_access(TH1) - self.step(12, endpoint) - await self.read_descriptor_expect_success(TH2, endpoint) + self.step(12) + await self.read_descriptor_expect_success(TH2) - self.step(13, endpoint) - await self.read_descriptor_expect_unsupported_access(TH3, endpoint) + self.step(13) + await self.read_descriptor_expect_unsupported_access(TH3) - self.step(14, endpoint) + self.step(14) th3_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH3_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) acl = [TH0_admin_acl, th3_view] await self.write_acl(acl) - self.step(15, endpoint) - await self.read_descriptor_expect_unsupported_access(TH1, endpoint) + self.step(15) + await self.read_descriptor_expect_unsupported_access(TH1) - self.step(16, endpoint) - await self.read_descriptor_expect_unsupported_access(TH2, endpoint) + self.step(16) + await self.read_descriptor_expect_unsupported_access(TH2) - self.step(17, endpoint) - await self.read_descriptor_expect_success(TH3, endpoint) + self.step(17) + await self.read_descriptor_expect_success(TH3) - self.step(18, endpoint) + self.step(18) th12_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH1_nodeid, TH2_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) acl = [TH0_admin_acl, th12_view] await self.write_acl(acl) - self.step(19, endpoint) - await self.read_descriptor_expect_success(TH1, endpoint) + self.step(19) + await self.read_descriptor_expect_success(TH1) - self.step(20, endpoint) - await self.read_descriptor_expect_success(TH2, endpoint) + self.step(20) + await self.read_descriptor_expect_success(TH2) - self.step(21, endpoint) - await self.read_descriptor_expect_unsupported_access(TH3, endpoint) + self.step(21) + await self.read_descriptor_expect_unsupported_access(TH3) - self.step(22, endpoint) + self.step(22) th13_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH1_nodeid, TH3_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) acl = [TH0_admin_acl, th13_view] await self.write_acl(acl) - self.step(23, endpoint) - await self.read_descriptor_expect_success(TH1, endpoint) + self.step(23) + await self.read_descriptor_expect_success(TH1) - self.step(24, endpoint) - await self.read_descriptor_expect_unsupported_access(TH2, endpoint) + self.step(24) + await self.read_descriptor_expect_unsupported_access(TH2) - self.step(25, endpoint) - await self.read_descriptor_expect_success(TH3, endpoint) + self.step(25) + await self.read_descriptor_expect_success(TH3) - self.step(26, endpoint) + self.step(26) th23_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH2_nodeid, TH3_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) acl = [TH0_admin_acl, th23_view] await self.write_acl(acl) - self.step(27, endpoint) - await self.read_descriptor_expect_unsupported_access(TH1, endpoint) + self.step(27) + await self.read_descriptor_expect_unsupported_access(TH1) - self.step(28, endpoint) - await self.read_descriptor_expect_success(TH2, endpoint) + self.step(28) + await self.read_descriptor_expect_success(TH2) - self.step(29, endpoint) - await self.read_descriptor_expect_success(TH3, endpoint) + self.step(29) + await self.read_descriptor_expect_success(TH3) - self.step(30, endpoint) + self.step(30) th123_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[TH1_nodeid, TH2_nodeid, TH3_nodeid], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) acl = [TH0_admin_acl, th123_view] await self.write_acl(acl) - self.step(31, endpoint) - await self.read_descriptor_expect_success(TH1, endpoint) + self.step(31) + await self.read_descriptor_expect_success(TH1) - self.step(32, endpoint) - await self.read_descriptor_expect_success(TH2, endpoint) + self.step(32) + await self.read_descriptor_expect_success(TH2) - self.step(33, endpoint) - await self.read_descriptor_expect_success(TH3, endpoint) + self.step(33) + await self.read_descriptor_expect_success(TH3) - self.step(34, endpoint) + self.step(34) cat1v1_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[acl_subject(cat1v1)], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) acl = [TH0_admin_acl, cat1v1_view] await self.write_acl(acl) - self.step(35, endpoint) - await self.read_descriptor_expect_success(TH1, endpoint) + self.step(35) + await self.read_descriptor_expect_success(TH1) - self.step(36, endpoint) - await self.read_descriptor_expect_success(TH2, endpoint) + self.step(36) + await self.read_descriptor_expect_success(TH2) - self.step(37, endpoint) - await self.read_descriptor_expect_success(TH3, endpoint) + self.step(37) + await self.read_descriptor_expect_success(TH3) - self.step(38, endpoint) + self.step(38) cat1v2_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[acl_subject(cat1v2)], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) acl = [TH0_admin_acl, cat1v2_view] await self.write_acl(acl) - self.step(39, endpoint) - await self.read_descriptor_expect_success(TH1, endpoint) + self.step(39) + await self.read_descriptor_expect_success(TH1) - self.step(40, endpoint) - await self.read_descriptor_expect_success(TH2, endpoint) + self.step(40) + await self.read_descriptor_expect_success(TH2) - self.step(41, endpoint) - await self.read_descriptor_expect_unsupported_access(TH3, endpoint) + self.step(41) + await self.read_descriptor_expect_unsupported_access(TH3) - self.step(42, endpoint) + self.step(42) cat1v3_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[acl_subject(cat1v3)], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) acl = [TH0_admin_acl, cat1v3_view] await self.write_acl(acl) - self.step(43, endpoint) - await self.read_descriptor_expect_success(TH1, endpoint) + self.step(43) + await self.read_descriptor_expect_success(TH1) - self.step(44, endpoint) - await self.read_descriptor_expect_unsupported_access(TH2, endpoint) + self.step(44) + await self.read_descriptor_expect_unsupported_access(TH2) - self.step(45, endpoint) - await self.read_descriptor_expect_unsupported_access(TH3, endpoint) + self.step(45) + await self.read_descriptor_expect_unsupported_access(TH3) - self.step(46, endpoint) + self.step(46) cat2v1_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[acl_subject(cat2v1)], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) acl = [TH0_admin_acl, cat2v1_view] await self.write_acl(acl) - self.step(47, endpoint) - await self.read_descriptor_expect_unsupported_access(TH1, endpoint) + self.step(47) + await self.read_descriptor_expect_unsupported_access(TH1) - self.step(48, endpoint) - await self.read_descriptor_expect_success(TH2, endpoint) + self.step(48) + await self.read_descriptor_expect_success(TH2) - self.step(49, endpoint) - await self.read_descriptor_expect_success(TH3, endpoint) + self.step(49) + await self.read_descriptor_expect_success(TH3) - self.step(50, endpoint) + self.step(50) cat2v2_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[acl_subject(cat2v2)], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) acl = [TH0_admin_acl, cat2v2_view] await self.write_acl(acl) - self.step(51, endpoint) - await self.read_descriptor_expect_unsupported_access(TH1, endpoint) + self.step(51) + await self.read_descriptor_expect_unsupported_access(TH1) - self.step(52, endpoint) - await self.read_descriptor_expect_unsupported_access(TH2, endpoint) + self.step(52) + await self.read_descriptor_expect_unsupported_access(TH2) - self.step(53, endpoint) - await self.read_descriptor_expect_success(TH3, endpoint) + self.step(53) + await self.read_descriptor_expect_success(TH3) - self.step(54, endpoint) + self.step(54) cat2v3_view = Clusters.AccessControl.Structs.AccessControlEntryStruct( privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[acl_subject(cat2v3)], - targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=endpoint)]) + targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)]) acl = [TH0_admin_acl, cat2v3_view] await self.write_acl(acl) - self.step(55, endpoint) - await self.read_descriptor_expect_unsupported_access(TH1, endpoint) + self.step(55) + await self.read_descriptor_expect_unsupported_access(TH1) - self.step(56, endpoint) - await self.read_descriptor_expect_unsupported_access(TH2, endpoint) + self.step(56) + await self.read_descriptor_expect_unsupported_access(TH2) - self.step(57, endpoint) - await self.read_descriptor_expect_unsupported_access(TH3, endpoint) + self.step(57) + await self.read_descriptor_expect_unsupported_access(TH3) self.step(58) diff --git a/src/python_testing/TC_BOOLCFG_2_1.py b/src/python_testing/TC_BOOLCFG_2_1.py index 2e90245f479d3d..2456457ce26ae7 100644 --- a/src/python_testing/TC_BOOLCFG_2_1.py +++ b/src/python_testing/TC_BOOLCFG_2_1.py @@ -81,12 +81,12 @@ async def test_TC_BOOLCFG_2_1(self): self.step(1) attributes = Clusters.BooleanStateConfiguration.Attributes - self.step(2, endpoint) + self.step(2) attribute_list = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.AttributeList) number_of_supported_levels = 0 - self.step(3, endpoint) + self.step(3) if attributes.SupportedSensitivityLevels.attribute_id in attribute_list: number_of_supported_levels = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.SupportedSensitivityLevels) asserts.assert_less_equal(number_of_supported_levels, 10, "SupportedSensitivityLevels attribute is out of range") @@ -94,7 +94,7 @@ async def test_TC_BOOLCFG_2_1(self): else: logging.info("Test step skipped") - self.step(4, endpoint) + self.step(4) if attributes.CurrentSensitivityLevel.attribute_id in attribute_list: current_sensitivity_level_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.CurrentSensitivityLevel) asserts.assert_less_equal(current_sensitivity_level_dut, number_of_supported_levels, @@ -102,7 +102,7 @@ async def test_TC_BOOLCFG_2_1(self): else: logging.info("Test step skipped") - self.step(5, endpoint) + self.step(5) if attributes.DefaultSensitivityLevel.attribute_id in attribute_list: default_sensitivity_level_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.DefaultSensitivityLevel) asserts.assert_less_equal(default_sensitivity_level_dut, number_of_supported_levels, @@ -110,35 +110,35 @@ async def test_TC_BOOLCFG_2_1(self): else: logging.info("Test step skipped") - self.step(6, endpoint) + self.step(6) if attributes.AlarmsActive.attribute_id in attribute_list: alarms_active_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.AlarmsActive) asserts.assert_equal(alarms_active_dut & ~all_alarm_mode_bitmap_bits, 0, "AlarmsActive is not in valid range") else: logging.info("Test step skipped") - self.step(7, endpoint) + self.step(7) if attributes.AlarmsSuppressed.attribute_id in attribute_list: alarms_suppressed_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.AlarmsSuppressed) asserts.assert_equal(alarms_suppressed_dut & ~all_alarm_mode_bitmap_bits, 0, "AlarmsSuppressed is not in valid range") else: logging.info("Test step skipped") - self.step(8, endpoint) + self.step(8) if attributes.AlarmsEnabled.attribute_id in attribute_list: alarms_enabled_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.AlarmsEnabled) asserts.assert_equal(alarms_enabled_dut & ~all_alarm_mode_bitmap_bits, 0, "AlarmsEnabled is not in valid range") else: logging.info("Test step skipped") - self.step(9, endpoint) + self.step(9) if attributes.AlarmsSupported.attribute_id in attribute_list: alarms_supported_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.AlarmsSupported) asserts.assert_equal(alarms_supported_dut & ~all_alarm_mode_bitmap_bits, 0, "AlarmsSupported is not in valid range") else: logging.info("Test step skipped") - self.step(10, endpoint) + self.step(10) if attributes.SensorFault.attribute_id in attribute_list: sensor_fault_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.SensorFault) asserts.assert_equal(sensor_fault_dut & ~all_sensor_fault_bitmap_bits, 0, "SensorFault is not in valid range") diff --git a/src/python_testing/matter_testing_infrastructure/chip/testing/matter_testing.py b/src/python_testing/matter_testing_infrastructure/chip/testing/matter_testing.py index b16d1b42a08b45..0a40b1688d74cf 100644 --- a/src/python_testing/matter_testing_infrastructure/chip/testing/matter_testing.py +++ b/src/python_testing/matter_testing_infrastructure/chip/testing/matter_testing.py @@ -585,9 +585,9 @@ def step_skipped(self, name: str, expression: str): # TODO: Do we really need the expression as a string? We can evaluate this in code very easily logging.info(f'\t\t**** Skipping: {name}') - def step_start(self, name: str, endpoint: int | None = None): - # TODO: The way I'm calling this, the name already includes the step number, but it seems like it might be good to separate these - logging.info(f'\t\t***** Test Step {name} started with endpoint {endpoint} ') + def step_start(self, name: str): + # The way I'm calling this, the name is already includes the step number, but it seems like it might be good to separate these + logging.info(f'\t\t***** Test Step {name}') def step_success(self, logger, logs, duration: int, request): pass @@ -916,7 +916,6 @@ def hex_from_bytes(b: bytes) -> str: class TestStep: test_plan_number: typing.Union[int, str] description: str - endpoint: int | None = None expectation: str = "" is_commissioning: bool = False @@ -1284,9 +1283,8 @@ async def check_test_event_triggers_enabled(self): test_event_enabled = await self.read_single_attribute_check_success(endpoint=0, cluster=cluster, attribute=full_attr) asserts.assert_equal(test_event_enabled, True, "TestEventTriggersEnabled is False") - def print_step(self, stepnum: typing.Union[int, str], title: str, endpoint: int | None = None) -> None: - endpoint_info = f" with endpoint {endpoint}" if endpoint is not None else "" - logging.info(f'***** Test Step {stepnum} : {title}{endpoint_info}') + def print_step(self, stepnum: typing.Union[int, str], title: str) -> None: + logging.info(f'***** Test Step {stepnum} : {title}') def record_error(self, test_name: str, location: ProblemLocation, problem: str, spec_location: str = ""): self.problems.append(ProblemNotice(test_name, location, ProblemSeverity.ERROR, problem, spec_location)) @@ -1455,7 +1453,7 @@ def skip_all_remaining_steps(self, starting_step_number): for step in remaining: self.skip_step(step.test_plan_number) - def step(self, step: typing.Union[int, str], endpoint: Optional[int] = None): + def step(self, step: typing.Union[int, str]): test_name = self.current_test_info.name steps = self.get_test_steps(test_name) @@ -1464,7 +1462,7 @@ def step(self, step: typing.Union[int, str], endpoint: Optional[int] = None): asserts.fail(f'Unexpected test step: {step} - steps not called in order, or step does not exist') current_step = steps[self.current_step_index] - self.print_step(step, current_step.description, endpoint) + self.print_step(step, current_step.description) if self.runner_hook: # If we've reached the next step with no assertion and the step wasn't skipped, it passed @@ -1473,14 +1471,12 @@ def step(self, step: typing.Union[int, str], endpoint: Optional[int] = None): step_duration = (datetime.now(timezone.utc) - self.step_start_time) / timedelta(microseconds=1) self.runner_hook.step_success(logger=None, logs=None, duration=step_duration, request=None) - current_step.endpoint = endpoint - # TODO: it seems like the step start should take a number and a name name = f'{step} : {current_step.description}' + self.runner_hook.step_start(name=name) - self.runner_hook.step_start(name=name, endpoint=current_step.endpoint) self.step_start_time = datetime.now(tz=timezone.utc) - self.current_step_index += 1 + self.current_step_index = self.current_step_index + 1 self.step_skipped = False def get_setup_payload_info(self) -> List[SetupPayloadInfo]: diff --git a/src/python_testing/test_testing/TestDecorators.py b/src/python_testing/test_testing/TestDecorators.py index ce32518b71c7c3..c4851eaee56c35 100644 --- a/src/python_testing/test_testing/TestDecorators.py +++ b/src/python_testing/test_testing/TestDecorators.py @@ -76,7 +76,7 @@ def test_stop(self, exception: Exception, duration: int): def step_skipped(self, name: str, expression: str): pass - def step_start(self, name: str, endpoint: Optional[int] = None): + def step_start(self, name: str): pass def step_success(self, logger, logs, duration: int, request):