diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index c9655c780f358a..e8325198dd191a 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -504,7 +504,7 @@ class MatterTestConfig: # List of explicit tests to run by name. If empty, all tests will run tests: List[str] = field(default_factory=list) timeout: typing.Union[int, None] = None - endpoint: int = 0 + endpoint: typing.Union[int, None] = 0 app_pid: int = 0 commissioning_method: Optional[str] = None @@ -980,7 +980,7 @@ async def read_single_attribute_check_success( if node_id is None: node_id = self.dut_node_id if endpoint is None: - endpoint = self.matter_test_config.endpoint + endpoint = 0 if self.matter_test_config.endpoint is None else self.matter_test_config.endpoint result = await dev_ctrl.ReadAttribute(node_id, [(endpoint, attribute)], fabricFiltered=fabric_filtered) attr_ret = result[endpoint][cluster][attribute] @@ -1012,7 +1012,7 @@ async def read_single_attribute_expect_error( if node_id is None: node_id = self.dut_node_id if endpoint is None: - endpoint = self.matter_test_config.endpoint + endpoint = 0 if self.matter_test_config.endpoint is None else self.matter_test_config.endpoint result = await dev_ctrl.ReadAttribute(node_id, [(endpoint, attribute)], fabricFiltered=fabric_filtered) attr_ret = result[endpoint][cluster][attribute] @@ -1041,12 +1041,13 @@ async def write_single_attribute(self, attribute_value: object, endpoint_id: int """ dev_ctrl = self.default_controller node_id = self.dut_node_id - endpoint = self.matter_test_config.endpoint if endpoint_id is None else endpoint_id + if endpoint_id is None: + endpoint_id = 0 if self.matter_test_config.endpoint is None else self.matter_test_config.endpoint - write_result = await dev_ctrl.WriteAttribute(node_id, [(endpoint, attribute_value)]) + write_result = await dev_ctrl.WriteAttribute(node_id, [(endpoint_id, attribute_value)]) if expect_success: asserts.assert_equal(write_result[0].Status, Status.Success, - f"Expected write success for write to attribute {attribute_value} on endpoint {endpoint}") + f"Expected write success for write to attribute {attribute_value} on endpoint {endpoint_id}") return write_result[0].Status async def send_single_cmd( @@ -1059,7 +1060,7 @@ async def send_single_cmd( if node_id is None: node_id = self.dut_node_id if endpoint is None: - endpoint = self.matter_test_config.endpoint + endpoint = 0 if self.matter_test_config.endpoint is None else self.matter_test_config.endpoint result = await dev_ctrl.SendCommand(nodeid=node_id, endpoint=endpoint, payload=cmd, timedRequestTimeoutMs=timedRequestTimeoutMs, payloadCapability=payloadCapability) @@ -1584,7 +1585,7 @@ def convert_args_to_matter_config(args: argparse.Namespace) -> MatterTestConfig: config.pics = {} if args.PICS is None else read_pics_from_file(args.PICS) config.tests = [] if args.tests is None else args.tests config.timeout = args.timeout # This can be none, we pull the default from the test if it's unspecified - config.endpoint = 0 if args.endpoint is None else args.endpoint + config.endpoint = args.endpoint config.app_pid = 0 if args.app_pid is None else args.app_pid config.controller_node_id = args.controller_node_id @@ -1863,7 +1864,17 @@ async def get_accepted_endpoints_for_test(self: MatterBaseTest, accept_function: Returns a list of endpoints on which the test should be run given the accept_function for the test. """ - wildcard = await self.default_controller.Read(self.dut_node_id, [()]) + + if self.matter_test_config.endpoint is not None: + msg = """ + The --endpoint flag is disallowed for tests that run on all matching endpoints. + To enable running against a single endpoint for development purposes, use + --int-arg force_endpoint:1 (where "1" can be replaced with the desired endpoint) + Note that using force_endpoint is disallowed at certification and should be used + ONLY FOR DEVELOPMENT. + """ + asserts.fail(msg) + matching = [e for e in wildcard.attributes.keys() if accept_function(wildcard, e)] forced_endpoint = self.user_params.get('force_endpoint', None) if forced_endpoint is None: @@ -1931,7 +1942,6 @@ def per_endpoint_runner(self: MatterBaseTest, *args, **kwargs): # Ditto for teardown - we want to tear down after each iteration, and we want to notify the hook that # the test iteration is stopped. test_stop is called by on_pass or on_fail during the last iteration or # on failure. - original_ep = self.matter_test_config.endpoint for e in endpoints: logging.info(f'Running test on endpoint {e}') if e != endpoints[0]: @@ -1942,7 +1952,7 @@ def per_endpoint_runner(self: MatterBaseTest, *args, **kwargs): self.teardown_test() test_duration = (datetime.now(timezone.utc) - self.test_start_time) / timedelta(microseconds=1) self.runner_hook.test_stop(exception=None, duration=test_duration) - self.matter_test_config.endpoint = original_ep + self.matter_test_config.endpoint = None return per_endpoint_runner return run_for_each_matching_endpoint_internal diff --git a/src/python_testing/test_testing/TestDecorators.py b/src/python_testing/test_testing/TestDecorators.py index 083c128aaed903..c9e84946b92a08 100644 --- a/src/python_testing/test_testing/TestDecorators.py +++ b/src/python_testing/test_testing/TestDecorators.py @@ -108,6 +108,7 @@ class TestDecorators(MatterBaseTest): def teardown_test(self): if 'force_endpoint' in self.user_params.keys(): del self.user_params['force_endpoint'] + self.matter_test_config.endpoint = None def test_checkers(self): has_onoff = has_cluster(Clusters.OnOff) @@ -182,6 +183,16 @@ async def test_force_endpoint_bad(self): await get_accepted_endpoints_for_test(self, has_onoff) + @async_test_body + async def test_endpoints_with_endpoint_flag_set_failure(self): + ''' This test should cause an assertion because --endpoint flag is set.''' + has_onoff = has_cluster(Clusters.OnOff) + all_endpoints = await self.default_controller.Read(self.dut_node_id, [()]) + all_endpoints = list(all_endpoints.attributes.keys()) + self.matter_test_config.endpoint = 0 + + await get_accepted_endpoints_for_test(self, has_onoff) + # This test should cause an assertion because it has pics_ method @run_once_for_node @@ -300,6 +311,12 @@ def main(): if ok: failures.append("Test case failure: test_force_endpoint_bad") + test_runner.set_test('TestDecorators.py', 'TestDecorators', 'test_endpoints_with_endpoint_flag_set_failure') + read_resp = get_clusters([0, 1]) + ok = test_runner.run_test_with_mock_read(read_resp, hooks) + if ok: + failures.append("Test case failure: test_endpoints_with_endpoint_flag_set_failure") + test_name = 'test_whole_node_with_pics' test_runner.set_test('TestDecorators.py', 'TestDecorators', test_name) ok = test_runner.run_test_with_mock_read(read_resp, hooks)