Skip to content

Commit

Permalink
Disallow --endpoint with automatic endpoint selection
Browse files Browse the repository at this point in the history
  • Loading branch information
cecille committed Aug 7, 2024
1 parent 3bd4b45 commit 4429a43
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
32 changes: 21 additions & 11 deletions src/python_testing/matter_testing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ class MatterTestConfig:
# List of explicit tests to run by name. If empty, all tests will run
tests: List[str] = field(default_factory=list)
timeout: typing.Union[int, None] = None
endpoint: int = 0
endpoint: typing.Union[int, None] = 0
app_pid: int = 0

commissioning_method: Optional[str] = None
Expand Down Expand Up @@ -980,7 +980,7 @@ async def read_single_attribute_check_success(
if node_id is None:
node_id = self.dut_node_id
if endpoint is None:
endpoint = self.matter_test_config.endpoint
endpoint = 0 if self.matter_test_config.endpoint is None else self.matter_test_config.endpoint

result = await dev_ctrl.ReadAttribute(node_id, [(endpoint, attribute)], fabricFiltered=fabric_filtered)
attr_ret = result[endpoint][cluster][attribute]
Expand Down Expand Up @@ -1012,7 +1012,7 @@ async def read_single_attribute_expect_error(
if node_id is None:
node_id = self.dut_node_id
if endpoint is None:
endpoint = self.matter_test_config.endpoint
endpoint = 0 if self.matter_test_config.endpoint is None else self.matter_test_config.endpoint

result = await dev_ctrl.ReadAttribute(node_id, [(endpoint, attribute)], fabricFiltered=fabric_filtered)
attr_ret = result[endpoint][cluster][attribute]
Expand Down Expand Up @@ -1041,12 +1041,13 @@ async def write_single_attribute(self, attribute_value: object, endpoint_id: int
"""
dev_ctrl = self.default_controller
node_id = self.dut_node_id
endpoint = self.matter_test_config.endpoint if endpoint_id is None else endpoint_id
if endpoint_id is None:
endpoint_id = 0 if self.matter_test_config.endpoint is None else self.matter_test_config.endpoint

write_result = await dev_ctrl.WriteAttribute(node_id, [(endpoint, attribute_value)])
write_result = await dev_ctrl.WriteAttribute(node_id, [(endpoint_id, attribute_value)])
if expect_success:
asserts.assert_equal(write_result[0].Status, Status.Success,
f"Expected write success for write to attribute {attribute_value} on endpoint {endpoint}")
f"Expected write success for write to attribute {attribute_value} on endpoint {endpoint_id}")
return write_result[0].Status

async def send_single_cmd(
Expand All @@ -1059,7 +1060,7 @@ async def send_single_cmd(
if node_id is None:
node_id = self.dut_node_id
if endpoint is None:
endpoint = self.matter_test_config.endpoint
endpoint = 0 if self.matter_test_config.endpoint is None else self.matter_test_config.endpoint

result = await dev_ctrl.SendCommand(nodeid=node_id, endpoint=endpoint, payload=cmd, timedRequestTimeoutMs=timedRequestTimeoutMs,
payloadCapability=payloadCapability)
Expand Down Expand Up @@ -1584,7 +1585,7 @@ def convert_args_to_matter_config(args: argparse.Namespace) -> MatterTestConfig:
config.pics = {} if args.PICS is None else read_pics_from_file(args.PICS)
config.tests = [] if args.tests is None else args.tests
config.timeout = args.timeout # This can be none, we pull the default from the test if it's unspecified
config.endpoint = 0 if args.endpoint is None else args.endpoint
config.endpoint = args.endpoint
config.app_pid = 0 if args.app_pid is None else args.app_pid

config.controller_node_id = args.controller_node_id
Expand Down Expand Up @@ -1863,7 +1864,17 @@ async def get_accepted_endpoints_for_test(self: MatterBaseTest, accept_function:
Returns a list of endpoints on which the test should be run given the accept_function for the test.
"""
wildcard = await self.default_controller.Read(self.dut_node_id, [()])

if self.matter_test_config.endpoint is not None:
msg = """
The --endpoint flag is disallowed for tests that run on all matching endpoints.
To enable running against a single endpoint for development purposes, use
--int-arg force_endpoint:1 (where "1" can be replaced with the desired endpoint)
Note that using force_endpoint is disallowed at certification and should be used
ONLY FOR DEVELOPMENT.
"""
asserts.fail(msg)

matching = [e for e in wildcard.attributes.keys() if accept_function(wildcard, e)]
forced_endpoint = self.user_params.get('force_endpoint', None)
if forced_endpoint is None:
Expand Down Expand Up @@ -1931,7 +1942,6 @@ def per_endpoint_runner(self: MatterBaseTest, *args, **kwargs):
# Ditto for teardown - we want to tear down after each iteration, and we want to notify the hook that
# the test iteration is stopped. test_stop is called by on_pass or on_fail during the last iteration or
# on failure.
original_ep = self.matter_test_config.endpoint
for e in endpoints:
logging.info(f'Running test on endpoint {e}')
if e != endpoints[0]:
Expand All @@ -1942,7 +1952,7 @@ def per_endpoint_runner(self: MatterBaseTest, *args, **kwargs):
self.teardown_test()
test_duration = (datetime.now(timezone.utc) - self.test_start_time) / timedelta(microseconds=1)
self.runner_hook.test_stop(exception=None, duration=test_duration)
self.matter_test_config.endpoint = original_ep
self.matter_test_config.endpoint = None
return per_endpoint_runner
return run_for_each_matching_endpoint_internal

Expand Down
17 changes: 17 additions & 0 deletions src/python_testing/test_testing/TestDecorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class TestDecorators(MatterBaseTest):
def teardown_test(self):
if 'force_endpoint' in self.user_params.keys():
del self.user_params['force_endpoint']
self.matter_test_config.endpoint = None

def test_checkers(self):
has_onoff = has_cluster(Clusters.OnOff)
Expand Down Expand Up @@ -182,6 +183,16 @@ async def test_force_endpoint_bad(self):

await get_accepted_endpoints_for_test(self, has_onoff)

@async_test_body
async def test_endpoints_with_endpoint_flag_set_failure(self):
''' This test should cause an assertion because --endpoint flag is set.'''
has_onoff = has_cluster(Clusters.OnOff)
all_endpoints = await self.default_controller.Read(self.dut_node_id, [()])
all_endpoints = list(all_endpoints.attributes.keys())
self.matter_test_config.endpoint = 0

await get_accepted_endpoints_for_test(self, has_onoff)

# This test should cause an assertion because it has pics_ method

@run_once_for_node
Expand Down Expand Up @@ -300,6 +311,12 @@ def main():
if ok:
failures.append("Test case failure: test_force_endpoint_bad")

test_runner.set_test('TestDecorators.py', 'TestDecorators', 'test_endpoints_with_endpoint_flag_set_failure')
read_resp = get_clusters([0, 1])
ok = test_runner.run_test_with_mock_read(read_resp, hooks)
if ok:
failures.append("Test case failure: test_endpoints_with_endpoint_flag_set_failure")

test_name = 'test_whole_node_with_pics'
test_runner.set_test('TestDecorators.py', 'TestDecorators', test_name)
ok = test_runner.run_test_with_mock_read(read_resp, hooks)
Expand Down

0 comments on commit 4429a43

Please sign in to comment.