Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
antonio-amjr committed Sep 17, 2024
1 parent 17b1a38 commit 326dcb2
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 148 deletions.
2 changes: 1 addition & 1 deletion scripts/py_matter_yamltests/matter_yamltests/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def step_skipped(self, name: str, expression: str):
"""
pass

def step_start(self, request: TestStep):
def step_start(self, request: TestStep, endpoint: Optional[int] = None):
"""
This method is called when the runner starts running a step from the test.
Expand Down
2 changes: 1 addition & 1 deletion scripts/py_matter_yamltests/matter_yamltests/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ async def _run(self, parser: TestParser, config: TestRunnerConfig):
if config.options.delay_in_ms:
await asyncio.sleep(config.options.delay_in_ms / 1000)

hooks.test_stop(round(test_duration))
hooks.test_stop(exception=None, duration=round(test_duration))

except Exception as exception:
status = exception
Expand Down
238 changes: 121 additions & 117 deletions src/python_testing/TC_ACE_1_3.py

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion src/python_testing/TC_ACL_2_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,16 @@ def steps_TC_ACL_2_2(self) -> list[TestStep]:
@async_test_body
async def test_TC_ACL_2_2(self):
self.step(1)
self.step(2)
self.step(2, endpoint=0)
data = await self.default_controller.ReadAttribute(nodeid=self.dut_node_id, attributes=[(Clusters.Descriptor.Attributes.ServerList)])
asserts.assert_true(Clusters.AccessControl.id in data[0][Clusters.Descriptor]
[Clusters.Descriptor.Attributes.ServerList], "ACL cluster not on EP0")
self.step(3)
for endpoint, ep_data in data.items():
if endpoint == 0:
continue
else:
self.current_step_endpoint = endpoint
asserts.assert_false(Clusters.AccessControl.id in ep_data[Clusters.Descriptor][Clusters.Descriptor.Attributes.ServerList],
f"ACL cluster incorrectly present on endpoint {endpoint}")

Expand Down
18 changes: 9 additions & 9 deletions src/python_testing/TC_BOOLCFG_2_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,64 +74,64 @@ async def test_TC_BOOLCFG_2_1(self):
self.step(1)
attributes = Clusters.BooleanStateConfiguration.Attributes

self.step(2)
self.step(2, endpoint=endpoint)
attribute_list = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.AttributeList)

number_of_supported_levels = 0

self.step(3)
self.step(3, endpoint=endpoint)
if attributes.SupportedSensitivityLevels.attribute_id in attribute_list:
number_of_supported_levels = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.SupportedSensitivityLevels)
asserts.assert_less_equal(number_of_supported_levels, 10, "SupportedSensitivityLevels attribute is out of range")
asserts.assert_greater_equal(number_of_supported_levels, 2, "SupportedSensitivityLevels attribute is out of range")
else:
logging.info("Test step skipped")

self.step(4)
self.step(4, endpoint=endpoint)
if attributes.CurrentSensitivityLevel.attribute_id in attribute_list:
current_sensitivity_level_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.CurrentSensitivityLevel)
asserts.assert_less_equal(current_sensitivity_level_dut, number_of_supported_levels,
"CurrentSensitivityLevel is not in valid range")
else:
logging.info("Test step skipped")

self.step(5)
self.step(5, endpoint=endpoint)
if attributes.DefaultSensitivityLevel.attribute_id in attribute_list:
default_sensitivity_level_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.DefaultSensitivityLevel)
asserts.assert_less_equal(default_sensitivity_level_dut, number_of_supported_levels,
"DefaultSensitivityLevel is not in valid range")
else:
logging.info("Test step skipped")

self.step(6)
self.step(6, endpoint=endpoint)
if attributes.AlarmsActive.attribute_id in attribute_list:
alarms_active_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.AlarmsActive)
asserts.assert_equal(alarms_active_dut & ~all_alarm_mode_bitmap_bits, 0, "AlarmsActive is not in valid range")
else:
logging.info("Test step skipped")

self.step(7)
self.step(7, endpoint=endpoint)
if attributes.AlarmsSuppressed.attribute_id in attribute_list:
alarms_suppressed_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.AlarmsSuppressed)
asserts.assert_equal(alarms_suppressed_dut & ~all_alarm_mode_bitmap_bits, 0, "AlarmsSuppressed is not in valid range")
else:
logging.info("Test step skipped")

self.step(8)
self.step(8, endpoint=endpoint)
if attributes.AlarmsEnabled.attribute_id in attribute_list:
alarms_enabled_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.AlarmsEnabled)
asserts.assert_equal(alarms_enabled_dut & ~all_alarm_mode_bitmap_bits, 0, "AlarmsEnabled is not in valid range")
else:
logging.info("Test step skipped")

self.step(9)
self.step(9, endpoint=endpoint)
if attributes.AlarmsSupported.attribute_id in attribute_list:
alarms_supported_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.AlarmsSupported)
asserts.assert_equal(alarms_supported_dut & ~all_alarm_mode_bitmap_bits, 0, "AlarmsSupported is not in valid range")
else:
logging.info("Test step skipped")

self.step(10)
self.step(10, endpoint=endpoint)
if attributes.SensorFault.attribute_id in attribute_list:
sensor_fault_dut = await self.read_boolcfg_attribute_expect_success(endpoint=endpoint, attribute=attributes.SensorFault)
asserts.assert_equal(sensor_fault_dut & ~all_sensor_fault_bitmap_bits, 0, "SensorFault is not in valid range")
Expand Down
22 changes: 11 additions & 11 deletions src/python_testing/TC_OpstateCommon.py
Original file line number Diff line number Diff line change
Expand Up @@ -1250,16 +1250,16 @@ async def TEST_TC_OPSTATE_BASE_2_6(self, endpoint=1):
self.init_test()

# commission
self.step(1)
self.step(1, endpoint)

# Note that this does a subscribe-all instead of subscribing only to the CountdownTime attribute.
# To-Do: Update the TP to subscribe-all.
self.step(2)
self.step(2, endpoint)
sub_handler = ClusterAttributeChangeAccumulator(cluster)
await sub_handler.start(self.default_controller, self.dut_node_id, endpoint)

if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ST_RUNNING")):
self.step(3)
self.step(3, endpoint)
self.send_manual_or_pipe_command(name="OperationalStateChange",
device=self.device,
operation="Start")
Expand All @@ -1275,10 +1275,10 @@ async def TEST_TC_OPSTATE_BASE_2_6(self, endpoint=1):
self.skip_step(3)

sub_handler.reset()
self.step(4)
self.step(4, endpoint)
countdownTime = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime)
if countdownTime is not NullValue:
self.step(5)
self.step(5, endpoint)
logging.info('Test will now collect data for 10 seconds')
time.sleep(10)

Expand All @@ -1289,13 +1289,13 @@ async def TEST_TC_OPSTATE_BASE_2_6(self, endpoint=1):
else:
self.skip_step(5)

self.step(6)
self.step(6, endpoint)
countdownTime = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime)
attr_value = await self.read_expect_success(
endpoint=endpoint,
attribute=attributes.OperationalState)
if attr_value == cluster.Enums.OperationalStateEnum.kRunning and countdownTime is not NullValue:
self.step(7)
self.step(7, endpoint)
wait_count = 0
while (attr_value != cluster.Enums.OperationalStateEnum.kStopped) and (wait_count < 20):
time.sleep(1)
Expand All @@ -1311,7 +1311,7 @@ async def TEST_TC_OPSTATE_BASE_2_6(self, endpoint=1):

sub_handler.reset()
if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ST_RUNNING")):
self.step(8)
self.step(8, endpoint)
self.send_manual_or_pipe_command(name="OperationalStateChange",
device=self.device,
operation="Start")
Expand All @@ -1326,16 +1326,16 @@ async def TEST_TC_OPSTATE_BASE_2_6(self, endpoint=1):
else:
self.skip_step(8)

self.step(9)
self.step(9, endpoint)
await self.read_and_expect_value(endpoint=endpoint,
attribute=attributes.OperationalState,
expected_value=cluster.Enums.OperationalStateEnum.kRunning)

sub_handler.reset()
self.step(10)
self.step(10, endpoint)
countdownTime = await self.read_expect_success(endpoint=endpoint, attribute=attributes.CountdownTime)
if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.M.ST_PAUSED")) and countdownTime is not NullValue:
self.step(11)
self.step(11, endpoint)
self.send_manual_or_pipe_command(name="OperationalStateChange",
device=self.device,
operation="Pause")
Expand Down
25 changes: 17 additions & 8 deletions src/python_testing/matter_testing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,9 +584,10 @@ def step_skipped(self, name: str, expression: str):
# TODO: Do we really need the expression as a string? We can evaluate this in code very easily
logging.info(f'\t\t**** Skipping: {name}')

def step_start(self, name: str):
# The way I'm calling this, the name is already includes the step number, but it seems like it might be good to separate these
logging.info(f'\t\t***** Test Step {name}')
def step_start(self, name: str, endpoint: int | None = None):
# TODO: The way I'm calling this, the name is already includes the step number, but it seems like it might be good to separate these
endpoint_info = f" with endpoint {endpoint}" if endpoint is not None else ""
logging.info(f'\t\t***** Test Step {name} started{endpoint_info} ')

def step_success(self, logger, logs, duration: int, request):
pass
Expand Down Expand Up @@ -915,6 +916,7 @@ def hex_from_bytes(b: bytes) -> str:
class TestStep:
test_plan_number: typing.Union[int, str]
description: str
endpoint: int | None = None
expectation: str = ""
is_commissioning: bool = False

Expand Down Expand Up @@ -1451,14 +1453,15 @@ def skip_all_remaining_steps(self, starting_step_number):
for step in remaining:
self.skip_step(step.test_plan_number)

def step(self, step: typing.Union[int, str]):
def step(self, step: typing.Union[int, str], endpoint: Optional[int] = None):
test_name = self.current_test_info.name
steps = self.get_test_steps(test_name)

# TODO: this might be annoying during dev. Remove? Flag?
if len(steps) <= self.current_step_index or steps[self.current_step_index].test_plan_number != step:
asserts.fail(f'Unexpected test step: {step} - steps not called in order, or step does not exist')

current_step = steps[self.current_step_index]
if self.runner_hook:
# If we've reached the next step with no assertion and the step wasn't skipped, it passed
if not self.step_skipped and self.current_step_index != 0:
Expand All @@ -1467,13 +1470,19 @@ def step(self, step: typing.Union[int, str]):
self.runner_hook.step_success(logger=None, logs=None, duration=step_duration, request=None)

# TODO: it seems like the step start should take a number and a name
name = f'{step} : {steps[self.current_step_index].description}'
self.runner_hook.step_start(name=name)
name = f'{step} : {current_step.description}'
endpoint_info = f" with endpoint {endpoint}" if endpoint is not None else ""
logging.info(
f"========= Step {name} started{endpoint_info} ========="
)

current_step.endpoint = endpoint
self.runner_hook.step_start(name=name, endpoint=current_step.endpoint)
else:
self.print_step(step, steps[self.current_step_index].description)
self.print_step(step, current_step.description)

self.step_start_time = datetime.now(tz=timezone.utc)
self.current_step_index = self.current_step_index + 1
self.current_step_index += 1
self.step_skipped = False

def get_setup_payload_info(self) -> List[SetupPayloadInfo]:
Expand Down

0 comments on commit 326dcb2

Please sign in to comment.