diff --git a/src/python_testing/TC_OCC_2_1.py b/src/python_testing/TC_OCC_2_1.py index cf39a7eff5a111..ddbb34cbf0252a 100644 --- a/src/python_testing/TC_OCC_2_1.py +++ b/src/python_testing/TC_OCC_2_1.py @@ -1,278 +1,288 @@ -# -# Copyright (c) 2024 Project CHIP Authors -# All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# There are CI issues to be followed up for the test cases below that implements manually controlling sensor device for -# the occupancy state ON/OFF change. -# [TC-OCC-3.1] test procedure step 4 -# [TC-OCC-3.2] test precedure step 3a, 3c - -# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments -# for details about the block below. -# -# === BEGIN CI TEST ARGUMENTS === -# test-runner-runs: run1 -# test-runner-run/run1/app: ${ALL_CLUSTERS_APP} -# test-runner-run/run1/factoryreset: True -# test-runner-run/run1/quiet: True -# test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json -# test-runner-run/run1/script-args: --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto -# === END CI TEST ARGUMENTS === - -import logging - -import chip.clusters as Clusters -from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main -from mobly import asserts - - -class TC_OCC_2_1(MatterBaseTest): - async def read_occ_attribute_expect_success(self, endpoint, attribute): - cluster = Clusters.Objects.OccupancySensing - return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attribute) - - def desc_TC_OCC_2_1(self) -> str: - return "[TC-OCC-2.1] Attributes with DUT as Server" - - def steps_TC_OCC_2_1(self) -> list[TestStep]: - steps = [ - TestStep(1, "Commissioning, already done", is_commissioning=True), - TestStep(2, "Read Occupancy attribute."), - TestStep(3, "Read OccupancySensorType attribute."), - TestStep(4, "Read OccupancySensorTypeBitmap attribute."), - TestStep(5, "Read HoldTimeLimits attribute, if supported"), - TestStep(6, "Read HoldTime attribute, if supported"), - TestStep(7, "Read PIROccupiedToUnoccupiedDelay attribute, if supported"), - TestStep(8, "Read PIRUnoccupiedToOccupiedDelay attribute, if supported"), - TestStep(9, "Read PIRUnoccupiedToOccupiedThreshold attribute, if supported"), - TestStep(10, "Read UltrasonicOccupiedToUnoccupiedDelay attribute, if supported"), - TestStep(11, "Read UltrasonicUnoccupiedToOccupiedDelay attribute, if supported"), - TestStep(12, "Read UltrasonicUnoccupiedToOccupiedThreshold attribute, if supported"), - TestStep(13, "Read PhysicalContactOccupiedToUnoccupiedDelay attribute, if supported"), - TestStep(14, "Read PhysicalContactUnoccupiedToOccupiedDelay attribute, if supported"), - TestStep(15, "Read PhysicalContactUnoccupiedToOccupiedThreshold attribute, if supported") - ] - return steps - - def pics_TC_OCC_2_1(self) -> list[str]: - pics = [ - "OCC.S", - ] - return pics - - @async_test_body - async def test_TC_OCC_2_1(self): - - endpoint = self.user_params.get("endpoint", 1) - - self.step(1) - attributes = Clusters.OccupancySensing.Attributes - attribute_list = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.AttributeList) - - self.step(2) - asserts.assert_in(attributes.Occupancy.attribute_id, attribute_list, "Occupancy attribute is mandatory") - occupancy_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.Occupancy) - asserts.assert_less_equal(occupancy_dut, 0b00000001, "Occupancy attribute is not in valid range") - - self.step(3) - asserts.assert_in(attributes.OccupancySensorType.attribute_id, attribute_list, - "OccupancySensorType attribute is a mandatory attribute.") - - occupancy_sensor_type_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.OccupancySensorType) - asserts.assert_less(occupancy_sensor_type_dut, Clusters.Objects.OccupancySensing.Enums.OccupancySensorTypeEnum.kUnknownEnumValue, - "OccupancySensorType is not in valid range") - asserts.assert_in(occupancy_sensor_type_dut, {Clusters.Objects.OccupancySensing.Enums.OccupancySensorTypeEnum.kPir, - Clusters.Objects.OccupancySensing.Enums.OccupancySensorTypeEnum.kUltrasonic, - Clusters.Objects.OccupancySensing.Enums.OccupancySensorTypeEnum.kPIRAndUltrasonic, - Clusters.Objects.OccupancySensing.Enums.OccupancySensorTypeEnum.kPhysicalContact}, "OccupancySensorType is not in valid range") - self.step(4) - asserts.assert_in(attributes.OccupancySensorTypeBitmap.attribute_id, attribute_list, - "OccupancySensorTypeBitmap attribute is a mandatory attribute.") - - occupancy_sensor_type_bitmap_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.OccupancySensorTypeBitmap) - asserts.assert_less_equal(occupancy_sensor_type_bitmap_dut, 0b00000111, - "OccupancySensorTypeBitmap attribute is not in valid range") - - self.step(5) - if attributes.HoldTimeLimits.attribute_id in attribute_list: - asserts.assert_in(attributes.HoldTime.attribute_id, attribute_list, "HoldTime attribute conformance failed.") - hold_time_limits_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.HoldTimeLimits) - asserts.assert_less_equal(hold_time_limits_dut.holdTimeMin, hold_time_limits_dut.holdTimeMax, - "HoldTimeMin is not in valid range") - asserts.assert_greater_equal(hold_time_limits_dut.holdTimeMin, 0, "HoldTimeMin is not in valid range") - asserts.assert_less_equal(hold_time_limits_dut.holdTimeMax, 0xFFFE, "HoldTimeMin is not in valid range") - asserts.assert_greater_equal(hold_time_limits_dut.holdTimeMax, - hold_time_limits_dut.holdTimeMin, "HoldTimeMin is not in valid range") - asserts.assert_less_equal(hold_time_limits_dut.holdTimeDefault, - hold_time_limits_dut.holdTimeMax, "HoldTimeMin is not in valid range") - asserts.assert_greater_equal(hold_time_limits_dut.holdTimeDefault, - hold_time_limits_dut.holdTimeMin, "HoldTimeMin is not in valid range") - else: - logging.info("HoldTimeLimits not supported. Test step skipped") - self.mark_current_step_skipped() - - self.step(6) - if attributes.HoldTime.attribute_id in attribute_list: - hold_time_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.HoldTime) - hold_time_limits_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.HoldTimeLimits) - - asserts.assert_less_equal(hold_time_dut, hold_time_limits_dut.holdTimeMax, "HoldTime attribute is out of range") - asserts.assert_greater_equal(hold_time_dut, hold_time_limits_dut.holdTimeMin, "HoldTime attribute is out of range") - else: - logging.info("HoldTime not supported. The rest of legacy attribute test can be skipped") - self.skip_all_remaining_steps(7) - return - - self.step(7) - if attributes.PIROccupiedToUnoccupiedDelay.attribute_id in attribute_list: - has_pir_bitmap = (occupancy_sensor_type_bitmap_dut & - Clusters.OccupancySensing.Bitmaps.OccupancySensorTypeBitmap.kPir) != 0 - has_ultrasonic_bitmap = (occupancy_sensor_type_bitmap_dut & - Clusters.OccupancySensing.Bitmaps.OccupancySensorTypeBitmap.kUltrasonic) != 0 - has_phy_bitmap = (occupancy_sensor_type_bitmap_dut & - Clusters.OccupancySensing.Bitmaps.OccupancySensorTypeBitmap.kPhysicalContact) != 0 - if has_pir_bitmap or (not has_ultrasonic_bitmap and not has_phy_bitmap): - pir_otou_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.PIROccupiedToUnoccupiedDelay) - asserts.assert_less_equal(pir_otou_delay_dut, 0xFFFE, "PIROccupiedToUnoccupiedDelay is not in valid range") - asserts.assert_greater_equal(pir_otou_delay_dut, 0, "PIROccupiedToUnoccupiedDelay is not in valid range") - else: - logging.info("PIROccupiedToUnoccupiedDelay conformance failed") - asserts.fail( - f"PIROccupiedToUnoccupiedDelay conformance is incorrect: {has_pir_bitmap}, {has_ultrasonic_bitmap}, {has_phy_bitmap}") - else: - logging.info("PIROccupiedToUnoccupiedDelay not supported. Test step skipped") - self.mark_current_step_skipped() - - self.step(8) - if attributes.PIRUnoccupiedToOccupiedDelay.attribute_id in attribute_list: - has_delay = attributes.PIRUnoccupiedToOccupiedDelay.attribute_id in attribute_list - has_threshold = attributes.PIRUnoccupiedToOccupiedThreshold.attribute_id in attribute_list - asserts.assert_equal(has_delay, has_threshold, "PIRUnoccupiedToOccupiedDelay conformance failure") - pir_utoo_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.PIRUnoccupiedToOccupiedDelay) - asserts.assert_less_equal(pir_utoo_delay_dut, 0xFFFE, "PIRUnoccupiedToOccupiedDelay is not in valid range") - asserts.assert_greater_equal(pir_utoo_delay_dut, 0, "PIRUnoccupiedToOccupiedDelay is not in valid range") - else: - logging.info("PIRUnoccupiedToOccupiedDelay not supported. Test step skipped") - self.mark_current_step_skipped() - - self.step(9) - if attributes.PIRUnoccupiedToOccupiedThreshold.attribute_id in attribute_list: - has_delay = attributes.PIRUnoccupiedToOccupiedDelay.attribute_id in attribute_list - has_threshold = attributes.PIRUnoccupiedToOccupiedThreshold.attribute_id in attribute_list - asserts.assert_equal(has_delay, has_threshold, "PIRUnoccupiedToOccupiedThreshold conformance failure") - pir_utoo_threshold_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.PIRUnoccupiedToOccupiedThreshold) - asserts.assert_less_equal(pir_utoo_threshold_dut, 0xFE, "PIRUnoccupiedToOccupiedThreshold is not in valid range") - asserts.assert_greater_equal(pir_utoo_threshold_dut, 0, "PIRUnoccupiedToOccupiedThreshold is not in valid range") - else: - logging.info("PIRUnoccupiedToOccupiedThreshold not supported. Test step skipped") - self.mark_current_step_skipped() - - self.step(10) - if attributes.UltrasonicOccupiedToUnoccupiedDelay.attribute_id in attribute_list: - has_ultrasonic_bitmap = (occupancy_sensor_type_bitmap_dut & - Clusters.OccupancySensing.Enums.OccupancySensorTypeEnum.kUltrasonic) != 0 - has_ultrasonic_delay = attributes.UltrasonicOccupiedToUnoccupiedDelay.attribute_id in attribute_list - asserts.assert_equal(has_ultrasonic_bitmap, has_ultrasonic_delay, "Bad conformance on Ultrasonic bitmap") - - ultrasonic_otou_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.UltrasonicOccupiedToUnoccupiedDelay) - asserts.assert_less_equal(ultrasonic_otou_delay_dut, 0xFFFE, - "UltrasonicOccupiedToUnoccupiedDelay is not in valid range") - asserts.assert_greater_equal(ultrasonic_otou_delay_dut, 0, "UltrasonicOccupiedToUnoccupiedDelay is not in valid range") - - else: - logging.info("UltrasonicOccupiedToUnoccupiedDelay not supported. Test step skipped") - self.mark_current_step_skipped() - - self.step(11) - if attributes.UltrasonicUnoccupiedToOccupiedDelay.attribute_id in attribute_list: - has_delay = attributes.UltrasonicUnoccupiedToOccupiedDelay.attribute_id in attribute_list - has_threshold = attributes.UltrasonicUnoccupiedToOccupiedThreshold.attribute_id in attribute_list - asserts.assert_equal(has_delay, has_threshold, "UltrasonicUnoccupiedToOccupiedDelay conformance failure") - - ultrasonic_utoo_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.UltrasonicUnoccupiedToOccupiedDelay) - asserts.assert_less_equal(ultrasonic_utoo_delay_dut, 0xFFFE, - "UltrasonicUnoccupiedToOccupiedDelay is not in valid range") - asserts.assert_greater_equal(ultrasonic_utoo_delay_dut, 0, "UltrasonicUnoccupiedToOccupiedDelay is not in valid range") - else: - logging.info("UltrasonicUnoccupiedToOccupiedDelay not supported. Test step skipped") - self.mark_current_step_skipped() - - self.step(12) - if attributes.UltrasonicUnoccupiedToOccupiedThreshold.attribute_id in attribute_list: - has_delay = attributes.UltrasonicUnoccupiedToOccupiedDelay.attribute_id in attribute_list - has_threshold = attributes.UltrasonicUnoccupiedToOccupiedThreshold.attribute_id in attribute_list - asserts.assert_equal(has_delay, has_threshold, "UltrasonicUnoccupiedToOccupiedThreshold conformance failure") - - ultrasonic_utoo_threshold_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.UltrasonicUnoccupiedToOccupiedThreshold) - asserts.assert_less_equal(ultrasonic_utoo_threshold_dut, 0xFE, - "UltrasonicUnoccupiedToOccupiedThreshold is not in valid range") - asserts.assert_greater_equal(ultrasonic_utoo_threshold_dut, 0, - "UltrasonicUnoccupiedToOccupiedThreshold is not in valid range") - - else: - logging.info("UltrasonicUnoccupiedToOccupiedThreshold not supported. Test step skipped") - self.mark_current_step_skipped() - - self.step(13) - if attributes.PhysicalContactOccupiedToUnoccupiedDelay.attribute_id in attribute_list: - has_phycon_bitmap = (occupancy_sensor_type_bitmap_dut & - Clusters.OccupancySensing.Enums.OccupancySensorTypeEnum.kPhysicalContact) != 0 - has_phycon_delay = attributes.PhysicalContactOccupiedToUnoccupiedDelay.attribute_id in attribute_list - asserts.assert_equal(has_phycon_bitmap, has_phycon_delay, "Bad conformance on PhysicalContact bitmap") - phycontact_otou_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.PhysicalContactOccupiedToUnoccupiedDelay) - asserts.assert_less_equal(phycontact_otou_delay_dut, 0xFFFE, - "PhysicalContactOccupiedToUnoccupiedDelay is not in valid range") - asserts.assert_greater_equal(phycontact_otou_delay_dut, 0, - "PhysicalContactOccupiedToUnoccupiedDelay is not in valid range") - - else: - logging.info("PhysicalContactOccupiedToUnoccupiedDelay not supported. Test step skipped") - self.mark_current_step_skipped() - - self.step(14) - if attributes.PhysicalContactUnoccupiedToOccupiedDelay.attribute_id in attribute_list: - has_delay = attributes.PhysicalContactUnoccupiedToOccupiedDelay.attribute_id in attribute_list - has_threshold = attributes.PhysicalContactUnoccupiedToOccupiedThreshold.attribute_id in attribute_list - asserts.assert_equal(has_delay, has_threshold, "PhysicalContactUnoccupiedToOccupiedDelay conformance failure") - - phycontact_utoo_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.PhysicalContactUnoccupiedToOccupiedDelay) - asserts.assert_less_equal(phycontact_utoo_delay_dut, 0xFFFE, - "PhysicalContactUnoccupiedToOccupiedDelay is not in valid range") - asserts.assert_greater_equal(phycontact_utoo_delay_dut, 0, - "PhysicalContactUnoccupiedToOccupiedDelay is not in valid range") - - else: - logging.info("PhysicalContactUnoccupiedToOccupiedDelay not supported. Test step skipped") - self.mark_current_step_skipped() - - self.step(15) - if attributes.PhysicalContactUnoccupiedToOccupiedThreshold.attribute_id in attribute_list: - has_delay = attributes.PhysicalContactUnoccupiedToOccupiedDelay.attribute_id in attribute_list - has_threshold = attributes.PhysicalContactUnoccupiedToOccupiedThreshold.attribute_id in attribute_list - asserts.assert_equal(has_delay, has_threshold, "PhysicalContactUnoccupiedToOccupiedThreshold conformance failure") - - phycontact_utoo_threshold_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.PhysicalContactUnoccupiedToOccupiedThreshold) - asserts.assert_less_equal(phycontact_utoo_threshold_dut, 0xFE, - "PhysicalContactUnoccupiedToOccupiedThreshold is not in valid range") - asserts.assert_greater_equal(phycontact_utoo_threshold_dut, 0, - "PhysicalContactUnoccupiedToOccupiedThreshold is not in valid range") - - else: - logging.info("PhysicalContactUnoccupiedToOccupiedThreshold not supported. Test step skipped") - self.mark_current_step_skipped() - - -if __name__ == "__main__": - default_matter_test_main() +# +# Copyright (c) 2024 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# There are CI issues to be followed up for the test cases below that implements manually controlling sensor device for +# the occupancy state ON/OFF change. +# [TC-OCC-3.1] test procedure step 4 +# [TC-OCC-3.2] test precedure step 3a, 3c + +# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments +# for details about the block below. +# +# === BEGIN CI TEST ARGUMENTS === +# test-runner-runs: run1 +# test-runner-run/run1/app: ${ALL_CLUSTERS_APP} +# test-runner-run/run1/factoryreset: True +# test-runner-run/run1/quiet: True +# test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json +# test-runner-run/run1/script-args: --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto --endpoint 1 +# === END CI TEST ARGUMENTS === + +import logging + +import chip.clusters as Clusters +from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main +from mobly import asserts + + +class TC_OCC_2_1(MatterBaseTest): + async def read_occ_attribute_expect_success(self, endpoint, attribute): + cluster = Clusters.Objects.OccupancySensing + return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attribute) + + def desc_TC_OCC_2_1(self) -> str: + return "[TC-OCC-2.1] Attributes with DUT as Server" + + def steps_TC_OCC_2_1(self) -> list[TestStep]: + steps = [ + TestStep(1, "Commissioning, already done", is_commissioning=True), + TestStep(2, "Read Occupancy attribute."), + TestStep(3, "Read OccupancySensorType attribute."), + TestStep(4, "Read OccupancySensorTypeBitmap attribute."), + TestStep(5, "Read HoldTimeLimits attribute, if supported"), + TestStep(6, "Read HoldTime attribute, if supported"), + TestStep(7, "Read PIROccupiedToUnoccupiedDelay attribute, if supported"), + TestStep(8, "Read PIRUnoccupiedToOccupiedDelay attribute, if supported"), + TestStep(9, "Read PIRUnoccupiedToOccupiedThreshold attribute, if supported"), + TestStep(10, "Read UltrasonicOccupiedToUnoccupiedDelay attribute, if supported"), + TestStep(11, "Read UltrasonicUnoccupiedToOccupiedDelay attribute, if supported"), + TestStep(12, "Read UltrasonicUnoccupiedToOccupiedThreshold attribute, if supported"), + TestStep(13, "Read PhysicalContactOccupiedToUnoccupiedDelay attribute, if supported"), + TestStep(14, "Read PhysicalContactUnoccupiedToOccupiedDelay attribute, if supported"), + TestStep(15, "Read PhysicalContactUnoccupiedToOccupiedThreshold attribute, if supported") + ] + return steps + + def pics_TC_OCC_2_1(self) -> list[str]: + pics = [ + "OCC.S", + ] + return pics + + @async_test_body + async def test_TC_OCC_2_1(self): + endpoint = self.matter_test_config.endpoint + cluster = Clusters.Objects.OccupancySensing + attributes = cluster.Attributes + + self.step(1) # Already done, immediately go to step 2 + + self.step(2) + + feature_map = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.FeatureMap) + has_feature_pir = (feature_map & cluster.Bitmaps.Feature.kPassiveInfrared) != 0 + has_feature_ultrasonic = (feature_map & cluster.Bitmaps.Feature.kUltrasonic) != 0 + has_feature_contact = (feature_map & cluster.Bitmaps.Feature.kPhysicalContact) != 0 + + logging.info( + f"Feature map: 0x{feature_map:x}. PIR: {has_feature_pir}, US:{has_feature_ultrasonic}, PHY:{has_feature_contact}") + + attribute_list = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.AttributeList) + + asserts.assert_in(attributes.Occupancy.attribute_id, attribute_list, "Occupancy attribute is mandatory") + occupancy_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.Occupancy) + asserts.assert_less_equal(occupancy_dut, 0b00000001, "Occupancy attribute is not in valid range") + + self.step(3) + asserts.assert_in(attributes.OccupancySensorType.attribute_id, attribute_list, + "OccupancySensorType attribute is a mandatory attribute.") + + occupancy_sensor_type_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.OccupancySensorType) + asserts.assert_less(occupancy_sensor_type_dut, Clusters.Objects.OccupancySensing.Enums.OccupancySensorTypeEnum.kUnknownEnumValue, + "OccupancySensorType is not in valid range") + asserts.assert_in(occupancy_sensor_type_dut, {Clusters.Objects.OccupancySensing.Enums.OccupancySensorTypeEnum.kPir, + Clusters.Objects.OccupancySensing.Enums.OccupancySensorTypeEnum.kUltrasonic, + Clusters.Objects.OccupancySensing.Enums.OccupancySensorTypeEnum.kPIRAndUltrasonic, + Clusters.Objects.OccupancySensing.Enums.OccupancySensorTypeEnum.kPhysicalContact}, "OccupancySensorType is not in valid range") + self.step(4) + asserts.assert_in(attributes.OccupancySensorTypeBitmap.attribute_id, attribute_list, + "OccupancySensorTypeBitmap attribute is a mandatory attribute.") + + occupancy_sensor_type_bitmap_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.OccupancySensorTypeBitmap) + asserts.assert_less_equal(occupancy_sensor_type_bitmap_dut, 0b00000111, + "OccupancySensorTypeBitmap attribute is not in valid range") + + self.step(5) + if attributes.HoldTimeLimits.attribute_id in attribute_list: + asserts.assert_in(attributes.HoldTime.attribute_id, attribute_list, "HoldTime attribute conformance failed.") + hold_time_limits_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.HoldTimeLimits) + asserts.assert_less_equal(hold_time_limits_dut.holdTimeMin, hold_time_limits_dut.holdTimeMax, + "HoldTimeMin is not in valid range") + asserts.assert_greater_equal(hold_time_limits_dut.holdTimeMin, 0, "HoldTimeMin is not in valid range") + asserts.assert_less_equal(hold_time_limits_dut.holdTimeMax, 0xFFFE, "HoldTimeMin is not in valid range") + asserts.assert_greater_equal(hold_time_limits_dut.holdTimeMax, + hold_time_limits_dut.holdTimeMin, "HoldTimeMin is not in valid range") + asserts.assert_less_equal(hold_time_limits_dut.holdTimeDefault, + hold_time_limits_dut.holdTimeMax, "HoldTimeMin is not in valid range") + asserts.assert_greater_equal(hold_time_limits_dut.holdTimeDefault, + hold_time_limits_dut.holdTimeMin, "HoldTimeMin is not in valid range") + else: + logging.info("HoldTimeLimits not supported. Test step skipped") + self.mark_current_step_skipped() + + self.step(6) + if attributes.HoldTime.attribute_id in attribute_list: + hold_time_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.HoldTime) + hold_time_limits_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.HoldTimeLimits) + + asserts.assert_less_equal(hold_time_dut, hold_time_limits_dut.holdTimeMax, "HoldTime attribute is out of range") + asserts.assert_greater_equal(hold_time_dut, hold_time_limits_dut.holdTimeMin, "HoldTime attribute is out of range") + else: + logging.info("HoldTime not supported. The rest of legacy attribute test can be skipped") + self.skip_all_remaining_steps(7) + return + + self.step(7) + if attributes.PIROccupiedToUnoccupiedDelay.attribute_id in attribute_list: + has_feature_pir = (occupancy_sensor_type_bitmap_dut & + Clusters.OccupancySensing.Bitmaps.OccupancySensorTypeBitmap.kPir) != 0 + has_feature_ultrasonic = (occupancy_sensor_type_bitmap_dut & + Clusters.OccupancySensing.Bitmaps.OccupancySensorTypeBitmap.kUltrasonic) != 0 + has_feature_contact = (occupancy_sensor_type_bitmap_dut & + Clusters.OccupancySensing.Bitmaps.OccupancySensorTypeBitmap.kPhysicalContact) != 0 + if has_feature_pir or (not has_feature_pir and not has_feature_ultrasonic and not has_feature_contact): + pir_otou_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.PIROccupiedToUnoccupiedDelay) + asserts.assert_less_equal(pir_otou_delay_dut, 0xFFFE, "PIROccupiedToUnoccupiedDelay is not in valid range") + asserts.assert_greater_equal(pir_otou_delay_dut, 0, "PIROccupiedToUnoccupiedDelay is not in valid range") + else: + logging.info("PIROccupiedToUnoccupiedDelay conformance failed") + asserts.fail( + f"PIROccupiedToUnoccupiedDelay conformance is incorrect: {has_feature_pir}, {has_feature_ultrasonic}, {has_feature_contact}") + else: + logging.info("PIROccupiedToUnoccupiedDelay not supported. Test step skipped") + self.mark_current_step_skipped() + + self.step(8) + if attributes.PIRUnoccupiedToOccupiedDelay.attribute_id in attribute_list: + has_delay = attributes.PIRUnoccupiedToOccupiedDelay.attribute_id in attribute_list + has_threshold = attributes.PIRUnoccupiedToOccupiedThreshold.attribute_id in attribute_list + asserts.assert_equal(has_delay, has_threshold, "PIRUnoccupiedToOccupiedDelay conformance failure") + pir_utoo_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.PIRUnoccupiedToOccupiedDelay) + asserts.assert_less_equal(pir_utoo_delay_dut, 0xFFFE, "PIRUnoccupiedToOccupiedDelay is not in valid range") + asserts.assert_greater_equal(pir_utoo_delay_dut, 0, "PIRUnoccupiedToOccupiedDelay is not in valid range") + else: + logging.info("PIRUnoccupiedToOccupiedDelay not supported. Test step skipped") + self.mark_current_step_skipped() + + self.step(9) + if attributes.PIRUnoccupiedToOccupiedThreshold.attribute_id in attribute_list: + has_delay = attributes.PIRUnoccupiedToOccupiedDelay.attribute_id in attribute_list + has_threshold = attributes.PIRUnoccupiedToOccupiedThreshold.attribute_id in attribute_list + asserts.assert_equal(has_delay, has_threshold, "PIRUnoccupiedToOccupiedThreshold conformance failure") + pir_utoo_threshold_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.PIRUnoccupiedToOccupiedThreshold) + asserts.assert_less_equal(pir_utoo_threshold_dut, 0xFE, "PIRUnoccupiedToOccupiedThreshold is not in valid range") + asserts.assert_greater_equal(pir_utoo_threshold_dut, 0, "PIRUnoccupiedToOccupiedThreshold is not in valid range") + else: + logging.info("PIRUnoccupiedToOccupiedThreshold not supported. Test step skipped") + self.mark_current_step_skipped() + + self.step(10) + if attributes.UltrasonicOccupiedToUnoccupiedDelay.attribute_id in attribute_list: + has_feature_ultrasonic = (occupancy_sensor_type_bitmap_dut & + Clusters.OccupancySensing.Enums.OccupancySensorTypeEnum.kUltrasonic) != 0 + has_ultrasonic_delay = attributes.UltrasonicOccupiedToUnoccupiedDelay.attribute_id in attribute_list + asserts.assert_equal(has_feature_ultrasonic, has_ultrasonic_delay, "Bad conformance on Ultrasonic bitmap") + + ultrasonic_otou_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.UltrasonicOccupiedToUnoccupiedDelay) + asserts.assert_less_equal(ultrasonic_otou_delay_dut, 0xFFFE, + "UltrasonicOccupiedToUnoccupiedDelay is not in valid range") + asserts.assert_greater_equal(ultrasonic_otou_delay_dut, 0, "UltrasonicOccupiedToUnoccupiedDelay is not in valid range") + + else: + logging.info("UltrasonicOccupiedToUnoccupiedDelay not supported. Test step skipped") + self.mark_current_step_skipped() + + self.step(11) + if attributes.UltrasonicUnoccupiedToOccupiedDelay.attribute_id in attribute_list: + has_delay = attributes.UltrasonicUnoccupiedToOccupiedDelay.attribute_id in attribute_list + has_threshold = attributes.UltrasonicUnoccupiedToOccupiedThreshold.attribute_id in attribute_list + asserts.assert_equal(has_delay, has_threshold, "UltrasonicUnoccupiedToOccupiedDelay conformance failure") + + ultrasonic_utoo_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.UltrasonicUnoccupiedToOccupiedDelay) + asserts.assert_less_equal(ultrasonic_utoo_delay_dut, 0xFFFE, + "UltrasonicUnoccupiedToOccupiedDelay is not in valid range") + asserts.assert_greater_equal(ultrasonic_utoo_delay_dut, 0, "UltrasonicUnoccupiedToOccupiedDelay is not in valid range") + else: + logging.info("UltrasonicUnoccupiedToOccupiedDelay not supported. Test step skipped") + self.mark_current_step_skipped() + + self.step(12) + if attributes.UltrasonicUnoccupiedToOccupiedThreshold.attribute_id in attribute_list: + has_delay = attributes.UltrasonicUnoccupiedToOccupiedDelay.attribute_id in attribute_list + has_threshold = attributes.UltrasonicUnoccupiedToOccupiedThreshold.attribute_id in attribute_list + asserts.assert_equal(has_delay, has_threshold, "UltrasonicUnoccupiedToOccupiedThreshold conformance failure") + + ultrasonic_utoo_threshold_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.UltrasonicUnoccupiedToOccupiedThreshold) + asserts.assert_less_equal(ultrasonic_utoo_threshold_dut, 0xFE, + "UltrasonicUnoccupiedToOccupiedThreshold is not in valid range") + asserts.assert_greater_equal(ultrasonic_utoo_threshold_dut, 0, + "UltrasonicUnoccupiedToOccupiedThreshold is not in valid range") + + else: + logging.info("UltrasonicUnoccupiedToOccupiedThreshold not supported. Test step skipped") + self.mark_current_step_skipped() + + self.step(13) + if attributes.PhysicalContactOccupiedToUnoccupiedDelay.attribute_id in attribute_list: + has_phycon_bitmap = (occupancy_sensor_type_bitmap_dut & + Clusters.OccupancySensing.Enums.OccupancySensorTypeEnum.kPhysicalContact) != 0 + has_phycon_delay = attributes.PhysicalContactOccupiedToUnoccupiedDelay.attribute_id in attribute_list + asserts.assert_equal(has_phycon_bitmap, has_phycon_delay, "Bad conformance on PhysicalContact bitmap") + phycontact_otou_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.PhysicalContactOccupiedToUnoccupiedDelay) + asserts.assert_less_equal(phycontact_otou_delay_dut, 0xFFFE, + "PhysicalContactOccupiedToUnoccupiedDelay is not in valid range") + asserts.assert_greater_equal(phycontact_otou_delay_dut, 0, + "PhysicalContactOccupiedToUnoccupiedDelay is not in valid range") + + else: + logging.info("PhysicalContactOccupiedToUnoccupiedDelay not supported. Test step skipped") + self.mark_current_step_skipped() + + self.step(14) + if attributes.PhysicalContactUnoccupiedToOccupiedDelay.attribute_id in attribute_list: + has_delay = attributes.PhysicalContactUnoccupiedToOccupiedDelay.attribute_id in attribute_list + has_threshold = attributes.PhysicalContactUnoccupiedToOccupiedThreshold.attribute_id in attribute_list + asserts.assert_equal(has_delay, has_threshold, "PhysicalContactUnoccupiedToOccupiedDelay conformance failure") + + phycontact_utoo_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.PhysicalContactUnoccupiedToOccupiedDelay) + asserts.assert_less_equal(phycontact_utoo_delay_dut, 0xFFFE, + "PhysicalContactUnoccupiedToOccupiedDelay is not in valid range") + asserts.assert_greater_equal(phycontact_utoo_delay_dut, 0, + "PhysicalContactUnoccupiedToOccupiedDelay is not in valid range") + + else: + logging.info("PhysicalContactUnoccupiedToOccupiedDelay not supported. Test step skipped") + self.mark_current_step_skipped() + + self.step(15) + if attributes.PhysicalContactUnoccupiedToOccupiedThreshold.attribute_id in attribute_list: + has_delay = attributes.PhysicalContactUnoccupiedToOccupiedDelay.attribute_id in attribute_list + has_threshold = attributes.PhysicalContactUnoccupiedToOccupiedThreshold.attribute_id in attribute_list + asserts.assert_equal(has_delay, has_threshold, "PhysicalContactUnoccupiedToOccupiedThreshold conformance failure") + + phycontact_utoo_threshold_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.PhysicalContactUnoccupiedToOccupiedThreshold) + asserts.assert_less_equal(phycontact_utoo_threshold_dut, 0xFE, + "PhysicalContactUnoccupiedToOccupiedThreshold is not in valid range") + asserts.assert_greater_equal(phycontact_utoo_threshold_dut, 0, + "PhysicalContactUnoccupiedToOccupiedThreshold is not in valid range") + + else: + logging.info("PhysicalContactUnoccupiedToOccupiedThreshold not supported. Test step skipped") + self.mark_current_step_skipped() + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/TC_OCC_2_2.py b/src/python_testing/TC_OCC_2_2.py index e27bbf30ebcade..9914452342cef3 100644 --- a/src/python_testing/TC_OCC_2_2.py +++ b/src/python_testing/TC_OCC_2_2.py @@ -1,132 +1,132 @@ -# -# Copyright (c) 2024 Project CHIP Authors -# All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments -# for details about the block below. -# -# === BEGIN CI TEST ARGUMENTS === -# test-runner-runs: run1 -# test-runner-run/run1/app: ${ALL_CLUSTERS_APP} -# test-runner-run/run1/factoryreset: True -# test-runner-run/run1/quiet: True -# test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json -# test-runner-run/run1/script-args: --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto -# === END CI TEST ARGUMENTS === - -import chip.clusters as Clusters -from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main -from mobly import asserts - - -class TC_OCC_2_2(MatterBaseTest): - async def read_occ_attribute_expect_success(self, endpoint, attribute): - cluster = Clusters.Objects.OccupancySensing - return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attribute) - - def desc_TC_OCC_2_2(self) -> str: - return "[TC-OCC-2.2] OccupancySensorTypeBitmap and OccupancySensorType interdependency with server as DUT" - - def steps_TC_OCC_2_2(self) -> list[TestStep]: - steps = [ - TestStep(1, "Commissioning, already done", is_commissioning=True), - TestStep(2, "Read OccupancySensorType attribute selection based on FeatureMap Bitmap."), - TestStep(3, "Read OccupancySensorTypeBitmap attribute selection based on FeatureMap Bitmap.") - ] - return steps - - def pics_TC_OCC_2_2(self) -> list[str]: - pics = [ - "OCC.S", - ] - return pics - - @async_test_body - async def test_TC_OCC_2_2(self): - - endpoint = self.user_params.get("endpoint", 1) - - attributes = Clusters.OccupancySensing.Attributes - feature_map = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.FeatureMap) - - self.step(1) - attribute_list = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.AttributeList) - - self.step(2) - # OccupancySensorType will be determined by FeatureMap matching table at 2.7.6.2. - asserts.assert_in(attributes.OccupancySensorType.attribute_id, attribute_list, - "OccupancySensorType attribute is a mandatory attribute.") - occupancy_sensor_type_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.OccupancySensorType) - - # For validation purposes, 2.7.6.2 table describes what feature flags map to what type of sensors - TypeEnum = Clusters.OccupancySensing.Enums.OccupancySensorTypeEnum - - Y = True - N = False - # Map is PIR, US, PHY => expected sensor type - # odd Y/N mapping to make the table align nicely - mappings = { - (N, N, N): TypeEnum.kPir, - (Y, N, N): TypeEnum.kPir, - (N, Y, N): TypeEnum.kUltrasonic, - (Y, Y, N): TypeEnum.kPIRAndUltrasonic, - (N, N, Y): TypeEnum.kPhysicalContact, - (Y, N, Y): TypeEnum.kPir, - (N, Y, Y): TypeEnum.kUltrasonic, - (Y, Y, Y): TypeEnum.kPIRAndUltrasonic, - } - - FeatureBit = Clusters.OccupancySensing.Bitmaps.Feature - expected = mappings.get( - ( - (feature_map & FeatureBit.kPassiveInfrared) != 0, - (feature_map & FeatureBit.kUltrasonic) != 0, - (feature_map & FeatureBit.kPhysicalContact) != 0 - )) - - asserts.assert_equal( - occupancy_sensor_type_dut, - expected, - f"Sensor Type should be f{expected}" - ) - - self.step(3) - # OccupancySensorTypeBitmap will be determined by FeatureMap matching table at 2.7.6.2. - asserts.assert_in(attributes.OccupancySensorTypeBitmap.attribute_id, attribute_list, - "OccupancySensorTypeBitmap attribute is a mandatory attribute.") - - occupancy_sensor_type_bitmap_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.OccupancySensorTypeBitmap) - - # Feature map must match the sensor type bitmap - must_match_bits = [ - (Clusters.OccupancySensing.Bitmaps.OccupancySensorTypeBitmap.kPir, - Clusters.OccupancySensing.Bitmaps.Feature.kPassiveInfrared, "PIR"), - (Clusters.OccupancySensing.Bitmaps.OccupancySensorTypeBitmap.kUltrasonic, - Clusters.OccupancySensing.Bitmaps.Feature.kUltrasonic, "Ultrasonic"), - (Clusters.OccupancySensing.Bitmaps.OccupancySensorTypeBitmap.kPhysicalContact, - Clusters.OccupancySensing.Bitmaps.Feature.kPhysicalContact, "Physical contact"), - ] - - for sensor_bit, feature_bit, name in must_match_bits: - asserts.assert_equal( - (occupancy_sensor_type_bitmap_dut & sensor_bit) != 0, - (feature_map & feature_bit) != 0, - f"Feature bit and sensor bitmap must be equal for {name} (BITMAP: 0x{occupancy_sensor_type_bitmap_dut:02X}, FEATUREMAP: 0x{feature_map:02X})" - ) - - -if __name__ == "__main__": - default_matter_test_main() +# +# Copyright (c) 2024 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments +# for details about the block below. +# +# === BEGIN CI TEST ARGUMENTS === +# test-runner-runs: run1 +# test-runner-run/run1/app: ${ALL_CLUSTERS_APP} +# test-runner-run/run1/factoryreset: True +# test-runner-run/run1/quiet: True +# test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json +# test-runner-run/run1/script-args: --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto --endpoint 1 +# === END CI TEST ARGUMENTS === + +import chip.clusters as Clusters +from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main +from mobly import asserts + + +class TC_OCC_2_2(MatterBaseTest): + async def read_occ_attribute_expect_success(self, endpoint, attribute): + cluster = Clusters.Objects.OccupancySensing + return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attribute) + + def desc_TC_OCC_2_2(self) -> str: + return "[TC-OCC-2.2] OccupancySensorTypeBitmap and OccupancySensorType interdependency with server as DUT" + + def steps_TC_OCC_2_2(self) -> list[TestStep]: + steps = [ + TestStep(1, "Commissioning, already done", is_commissioning=True), + TestStep(2, "Read OccupancySensorType attribute selection based on FeatureMap Bitmap."), + TestStep(3, "Read OccupancySensorTypeBitmap attribute selection based on FeatureMap Bitmap.") + ] + return steps + + def pics_TC_OCC_2_2(self) -> list[str]: + pics = [ + "OCC.S", + ] + return pics + + @async_test_body + async def test_TC_OCC_2_2(self): + endpoint = self.matter_test_config.endpoint + + self.step(1) # Already done, immediately go to step 2 + + self.step(2) + + attributes = Clusters.OccupancySensing.Attributes + feature_map = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.FeatureMap) + attribute_list = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.AttributeList) + + # OccupancySensorType will be determined by FeatureMap matching table at 2.7.6.2. + asserts.assert_in(attributes.OccupancySensorType.attribute_id, attribute_list, + "OccupancySensorType attribute is a mandatory attribute.") + occupancy_sensor_type_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.OccupancySensorType) + + # For validation purposes, 2.7.6.2 table describes what feature flags map to what type of sensors + TypeEnum = Clusters.OccupancySensing.Enums.OccupancySensorTypeEnum + + Y = True + N = False + # Map is PIR, US, PHY => expected sensor type + # odd Y/N mapping to make the table align nicely + mappings = { + (N, N, N): TypeEnum.kPir, + (Y, N, N): TypeEnum.kPir, + (N, Y, N): TypeEnum.kUltrasonic, + (Y, Y, N): TypeEnum.kPIRAndUltrasonic, + (N, N, Y): TypeEnum.kPhysicalContact, + (Y, N, Y): TypeEnum.kPir, + (N, Y, Y): TypeEnum.kUltrasonic, + (Y, Y, Y): TypeEnum.kPIRAndUltrasonic, + } + + FeatureBit = Clusters.OccupancySensing.Bitmaps.Feature + expected = mappings.get( + ( + (feature_map & FeatureBit.kPassiveInfrared) != 0, + (feature_map & FeatureBit.kUltrasonic) != 0, + (feature_map & FeatureBit.kPhysicalContact) != 0 + )) + + asserts.assert_equal( + occupancy_sensor_type_dut, + expected, + f"Sensor Type should be f{expected}" + ) + + self.step(3) + # OccupancySensorTypeBitmap will be determined by FeatureMap matching table at 2.7.6.2. + asserts.assert_in(attributes.OccupancySensorTypeBitmap.attribute_id, attribute_list, + "OccupancySensorTypeBitmap attribute is a mandatory attribute.") + + occupancy_sensor_type_bitmap_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.OccupancySensorTypeBitmap) + + # Feature map must match the sensor type bitmap + must_match_bits = [ + (Clusters.OccupancySensing.Bitmaps.OccupancySensorTypeBitmap.kPir, + Clusters.OccupancySensing.Bitmaps.Feature.kPassiveInfrared, "PIR"), + (Clusters.OccupancySensing.Bitmaps.OccupancySensorTypeBitmap.kUltrasonic, + Clusters.OccupancySensing.Bitmaps.Feature.kUltrasonic, "Ultrasonic"), + (Clusters.OccupancySensing.Bitmaps.OccupancySensorTypeBitmap.kPhysicalContact, + Clusters.OccupancySensing.Bitmaps.Feature.kPhysicalContact, "Physical contact"), + ] + + for sensor_bit, feature_bit, name in must_match_bits: + asserts.assert_equal( + (occupancy_sensor_type_bitmap_dut & sensor_bit) != 0, + (feature_map & feature_bit) != 0, + f"Feature bit and sensor bitmap must be equal for {name} (BITMAP: 0x{occupancy_sensor_type_bitmap_dut:02X}, FEATUREMAP: 0x{feature_map:02X})" + ) + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/TC_OCC_2_3.py b/src/python_testing/TC_OCC_2_3.py index 0184b670c6b306..63e973b159ca37 100644 --- a/src/python_testing/TC_OCC_2_3.py +++ b/src/python_testing/TC_OCC_2_3.py @@ -1,127 +1,128 @@ -# -# Copyright (c) 2024 Project CHIP Authors -# All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments -# for details about the block below. -# -# === BEGIN CI TEST ARGUMENTS === -# test-runner-runs: run1 -# test-runner-run/run1/app: ${ALL_CLUSTERS_APP} -# test-runner-run/run1/factoryreset: True -# test-runner-run/run1/quiet: True -# test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json -# test-runner-run/run1/script-args: --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto -# === END CI TEST ARGUMENTS === - -import logging - -import chip.clusters as Clusters -from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main -from mobly import asserts - - -class TC_OCC_2_3(MatterBaseTest): - async def read_occ_attribute_expect_success(self, endpoint, attribute): - cluster = Clusters.Objects.OccupancySensing - return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attribute) - - def desc_TC_OCC_2_3(self) -> str: - return "[TC-OCC-2.3] HoldTime Backward Compatibility Test with server as DUT" - - def steps_TC_OCC_2_3(self) -> list[TestStep]: - steps = [ - TestStep(1, "Commission DUT to TH", is_commissioning=True), - TestStep(2, "DUT supports HoldTime attribute. If DUT doesn’t support it, then stop and exit this test case."), - TestStep(3, "Based on the feature flag value table, read OccupancySensorType attribute from DUT"), - TestStep(4, "If TH reads 0 - PIR, TH reads PIROccupiedToUnoccupiedDelay attribute and its value should be same as HoldTime"), - TestStep(5, "If TH reads 1 - Ultrasonic, TH reads UltrasonicOccupiedToUnoccupiedDelay attribute and its value should be same as HoldTime"), - TestStep(6, "If TH reads 2 - PHY, TH reads PhysicalContactOccupiedToUnoccupiedDelay attribute and its value should be same as HoldTime") - ] - return steps - - def pics_TC_OCC_2_3(self) -> list[str]: - pics = [ - "OCC.S", - ] - return pics - - @async_test_body - async def test_TC_OCC_2_3(self): - - endpoint = self.user_params.get("endpoint", 1) - - self.step(1) - attributes = Clusters.OccupancySensing.Attributes - attribute_list = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.AttributeList) - - self.step(2) - if attributes.HoldTime.attribute_id in attribute_list: - occupancy_hold_time_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.HoldTime) - else: - logging.info("No HoldTime attribute supports. Terminate this test case") - - self.step(3) - if attributes.OccupancySensorType.attribute_id in attribute_list: - occupancy_sensor_type_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.OccupancySensorType) - - asserts.assert_less_equal(occupancy_sensor_type_dut, 3, "OccupancySensorType attribute is out of range") - asserts.assert_greater_equal(occupancy_sensor_type_dut, 0, "OccupancySensorType attribute is out of range") - else: - logging.info("OccupancySensorType attribute doesn't exist. Test step skipped") - - if occupancy_sensor_type_dut == Clusters.OccupancySensing.Enums.OccupancySensorTypeEnum.kPir: - self.step(4) - occupancy_pir_otou_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.PIROccupiedToUnoccupiedDelay) - - asserts.assert_equal(occupancy_pir_otou_delay_dut, occupancy_hold_time_dut, - "HoldTime attribute value is not equal to PIROccupiedToUnoccupiedDelay") - self.skip_step(5) - self.skip_step(6) - - elif occupancy_sensor_type_dut == Clusters.OccupancySensing.Enums.OccupancySensorTypeEnum.kUltrasonic: - self.step(4) - occupancy_pir_otou_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.PIROccupiedToUnoccupiedDelay) - - asserts.assert_equal(occupancy_pir_otou_delay_dut, occupancy_hold_time_dut, - "HoldTime attribute value is not equal to PIROccupiedToUnoccupiedDelay") - self.step(5) - occupancy_us_otou_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.UltrasonicOccupiedToUnoccupiedDelay) - - asserts.assert_equal(occupancy_us_otou_delay_dut, occupancy_hold_time_dut, - "HoldTime attribute value is not equal to UltrasonicOccupiedToUnoccupiedDelay") - self.skip_step(6) - - elif occupancy_sensor_type_dut == Clusters.OccupancySensing.Enums.OccupancySensorTypeEnum.kPIRAndUltrasonic: - occupancy_pirus_otou_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.PIROccupiedToUnoccupiedDelay) - - asserts.assert_equal(occupancy_pirus_otou_delay_dut, occupancy_hold_time_dut, - "HoldTime attribute value is not equal to PIROccupiedToUnoccupiedDelay") - - elif occupancy_sensor_type_dut == Clusters.OccupancySensing.Enums.OccupancySensorTypeEnum.kPhysicalContact: - self.skip_step(4) - self.skip_step(5) - self.step(6) - occupancy_phy_otou_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.PhysicalContactOccupiedToUnoccupiedDelay) - - asserts.assert_equal(occupancy_phy_otou_delay_dut, occupancy_hold_time_dut, - "HoldTime attribute value is not equal to PhysicalContactOccupiedToUnoccupiedDelay") - else: - logging.info("OccupancySensorType attribute value is out of range") - - -if __name__ == "__main__": - default_matter_test_main() +# +# Copyright (c) 2024 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments +# for details about the block below. +# +# === BEGIN CI TEST ARGUMENTS === +# test-runner-runs: run1 +# test-runner-run/run1/app: ${ALL_CLUSTERS_APP} +# test-runner-run/run1/factoryreset: True +# test-runner-run/run1/quiet: True +# test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json +# test-runner-run/run1/script-args: --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto --endpoint 1 +# === END CI TEST ARGUMENTS === + +import logging + +import chip.clusters as Clusters +from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main +from mobly import asserts + + +class TC_OCC_2_3(MatterBaseTest): + async def read_occ_attribute_expect_success(self, endpoint, attribute): + cluster = Clusters.Objects.OccupancySensing + return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attribute) + + def desc_TC_OCC_2_3(self) -> str: + return "[TC-OCC-2.3] HoldTime Backward Compatibility Test with server as DUT" + + def steps_TC_OCC_2_3(self) -> list[TestStep]: + steps = [ + TestStep(1, "Commission DUT to TH", is_commissioning=True), + TestStep(2, "DUT supports HoldTime attribute. If DUT doesn’t support it, then stop and exit this test case."), + TestStep(3, "Based on the feature flag value table, read OccupancySensorType attribute from DUT"), + TestStep(4, "If TH reads 0 - PIR, TH reads PIROccupiedToUnoccupiedDelay attribute and its value should be same as HoldTime"), + TestStep(5, "If TH reads 1 - Ultrasonic, TH reads UltrasonicOccupiedToUnoccupiedDelay attribute and its value should be same as HoldTime"), + TestStep(6, "If TH reads 2 - PHY, TH reads PhysicalContactOccupiedToUnoccupiedDelay attribute and its value should be same as HoldTime") + ] + return steps + + def pics_TC_OCC_2_3(self) -> list[str]: + pics = [ + "OCC.S", + ] + return pics + + @async_test_body + async def test_TC_OCC_2_3(self): + endpoint = self.matter_test_config.endpoint + + self.step(1) # Already done, immediately go to step 2 + + self.step(2) + + attributes = Clusters.OccupancySensing.Attributes + attribute_list = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.AttributeList) + + if attributes.HoldTime.attribute_id in attribute_list: + occupancy_hold_time_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.HoldTime) + else: + logging.info("No HoldTime attribute supports. Terminate this test case") + + self.step(3) + if attributes.OccupancySensorType.attribute_id in attribute_list: + occupancy_sensor_type_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.OccupancySensorType) + + asserts.assert_less_equal(occupancy_sensor_type_dut, 3, "OccupancySensorType attribute is out of range") + asserts.assert_greater_equal(occupancy_sensor_type_dut, 0, "OccupancySensorType attribute is out of range") + else: + logging.info("OccupancySensorType attribute doesn't exist. Test step skipped") + + if occupancy_sensor_type_dut == Clusters.OccupancySensing.Enums.OccupancySensorTypeEnum.kPir: + self.step(4) + occupancy_pir_otou_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.PIROccupiedToUnoccupiedDelay) + + asserts.assert_equal(occupancy_pir_otou_delay_dut, occupancy_hold_time_dut, + "HoldTime attribute value is not equal to PIROccupiedToUnoccupiedDelay") + self.skip_step(5) + self.skip_step(6) + + elif occupancy_sensor_type_dut == Clusters.OccupancySensing.Enums.OccupancySensorTypeEnum.kUltrasonic: + self.step(4) + occupancy_pir_otou_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.PIROccupiedToUnoccupiedDelay) + + asserts.assert_equal(occupancy_pir_otou_delay_dut, occupancy_hold_time_dut, + "HoldTime attribute value is not equal to PIROccupiedToUnoccupiedDelay") + self.step(5) + occupancy_us_otou_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.UltrasonicOccupiedToUnoccupiedDelay) + + asserts.assert_equal(occupancy_us_otou_delay_dut, occupancy_hold_time_dut, + "HoldTime attribute value is not equal to UltrasonicOccupiedToUnoccupiedDelay") + self.skip_step(6) + + elif occupancy_sensor_type_dut == Clusters.OccupancySensing.Enums.OccupancySensorTypeEnum.kPIRAndUltrasonic: + occupancy_pirus_otou_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.PIROccupiedToUnoccupiedDelay) + + asserts.assert_equal(occupancy_pirus_otou_delay_dut, occupancy_hold_time_dut, + "HoldTime attribute value is not equal to PIROccupiedToUnoccupiedDelay") + + elif occupancy_sensor_type_dut == Clusters.OccupancySensing.Enums.OccupancySensorTypeEnum.kPhysicalContact: + self.skip_step(4) + self.skip_step(5) + self.step(6) + occupancy_phy_otou_delay_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.PhysicalContactOccupiedToUnoccupiedDelay) + + asserts.assert_equal(occupancy_phy_otou_delay_dut, occupancy_hold_time_dut, + "HoldTime attribute value is not equal to PhysicalContactOccupiedToUnoccupiedDelay") + else: + logging.info("OccupancySensorType attribute value is out of range") + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/TC_OCC_3_1.py b/src/python_testing/TC_OCC_3_1.py index 3fd082a62bc974..246e3a13f2a211 100644 --- a/src/python_testing/TC_OCC_3_1.py +++ b/src/python_testing/TC_OCC_3_1.py @@ -1,134 +1,129 @@ -# -# Copyright (c) 2024 Project CHIP Authors -# All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# === BEGIN CI TEST ARGUMENTS === -# test-runner-runs: run1 -# test-runner-run/run1/app: ${ALL_CLUSTERS_APP} -# test-runner-run/run1/factoryreset: True -# test-runner-run/run1/quiet: True -# test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json -# test-runner-run/run1/script-args: --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto --endpoint 1 -# === END CI TEST ARGUMENTS === -# There are CI issues to be followed up for the test cases below that implements manually controlling sensor device for -# the occupancy state ON/OFF change. -# [TC-OCC-3.1] test procedure step 4 -# [TC-OCC-3.2] test precedure step 3c - -import logging -import time -from typing import Any, Optional - -import chip.clusters as Clusters -from chip.interaction_model import Status -from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main -from mobly import asserts - - -class TC_OCC_3_1(MatterBaseTest): - async def read_occ_attribute_expect_success(self, attribute): - cluster = Clusters.Objects.OccupancySensing - endpoint = self.matter_test_config.endpoint - return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attribute) - - async def write_hold_time(self, hold_time: Optional[Any]) -> Status: - dev_ctrl = self.default_controller - node_id = self.dut_node_id - endpoint = self.matter_test_config.endpoint - - cluster = Clusters.OccupancySensing - write_result = await dev_ctrl.WriteAttribute(node_id, [(endpoint, cluster.Attributes.HoldTime(hold_time))]) - return write_result[0].Status - - def desc_TC_OCC_3_1(self) -> str: - return "[TC-OCC-3.1] Primary functionality with server as DUT" - - def steps_TC_OCC_3_1(self) -> list[TestStep]: - steps = [ - TestStep(1, "Commission DUT to TH and obtain DUT attribute list.", is_commissioning=True), - TestStep(2, "Change DUT HoldTime attribute value to 10 seconds."), - TestStep(3, "Do not trigger DUT occupancy sensing for the period of HoldTime. TH reads Occupancy attribute from DUT."), - TestStep(4, "Trigger DUT occupancy sensing to change the occupancy state and start a timer."), - TestStep(5, "After 10 seconds, TH reads Occupancy attribute from DUT.") - ] - return steps - - def pics_TC_OCC_3_1(self) -> list[str]: - pics = [ - "OCC.S", - ] - return pics - - @async_test_body - async def test_TC_OCC_3_1(self): - hold_time = 10 # 10 seconds for occupancy state hold time - - self.step(1) # commissioning and getting cluster attribute list - cluster = Clusters.OccupancySensing - attributes = cluster.Attributes - attribute_list = await self.read_occ_attribute_expect_success(attribute=attributes.AttributeList) - - has_hold_time = attributes.HoldTime.attribute_id in attribute_list - - self.step(2) - if has_hold_time: - # write 10 as a HoldTime attribute - await self.write_single_attribute(cluster.Attributes.HoldTime(hold_time)) - else: - logging.info("No HoldTime attribute supports. Will test only occupancy attribute triggering functionality") - - self.step(3) - # check if Occupancy attribute is 0 - occupancy_dut = await self.read_occ_attribute_expect_success(attribute=attributes.Occupancy) - - # if occupancy is on, then wait until the sensor occupancy state is 0. - if occupancy_dut == 1: - # Don't trigger occupancy sensor to render occupancy attribute to 0 - if has_hold_time: - time.sleep(hold_time + 2.0) # add some extra 2 seconds to ensure hold time has passed. - else: # a user wait until a sensor specific time to change occupancy attribute to 0. This is the case where the sensor doesn't support HoldTime. - self.wait_for_user_input( - prompt_msg="Type any letter and press ENTER after the sensor occupancy is detection ready state (occupancy attribute = 0)") - - # check sensor occupancy state is 0 for the next test step - occupancy_dut = await self.read_occ_attribute_expect_success(attribute=attributes.Occupancy) - asserts.assert_equal(occupancy_dut, 0, "Occupancy attribute is still 1.") - - self.step(4) - # Trigger occupancy sensor to change Occupancy attribute value to 1 => TESTER ACTION on DUT - self.wait_for_user_input(prompt_msg="Type any letter and press ENTER after a sensor occupancy is triggered.") - - # And then check if Occupancy attribute has changed. - occupancy_dut = await self.read_occ_attribute_expect_success(attribute=attributes.Occupancy) - asserts.assert_equal(occupancy_dut, 1, "Occupancy state is not changed to 1") - - self.step(5) - # check if Occupancy attribute is back to 0 after HoldTime attribute period - # Tester should not be triggering the sensor for this test step. - if has_hold_time: - - # Start a timer based on HoldTime - time.sleep(hold_time + 2.0) # add some extra 2 seconds to ensure hold time has passed. - - occupancy_dut = await self.read_occ_attribute_expect_success(attribute=attributes.Occupancy) - asserts.assert_equal(occupancy_dut, 0, "Occupancy state is not 0 after HoldTime period") - - else: - logging.info("HoldTime attribute not supported. Skip this return to 0 timing test procedure.") - self.skip_step(5) - - -if __name__ == "__main__": - default_matter_test_main() +# +# Copyright (c) 2024 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# There are CI issues to be followed up for the test cases below that implements manually controlling sensor device for +# the occupancy state ON/OFF change. +# [TC-OCC-3.1] test procedure step 4 +# [TC-OCC-3.2] test precedure step 3c + +import logging +import time +from typing import Any, Optional + +import chip.clusters as Clusters +from chip.interaction_model import Status +from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main +from mobly import asserts + + +class TC_OCC_3_1(MatterBaseTest): + async def read_occ_attribute_expect_success(self, attribute): + cluster = Clusters.Objects.OccupancySensing + endpoint = self.matter_test_config.endpoint + return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attribute) + + async def write_hold_time(self, hold_time: Optional[Any]) -> Status: + dev_ctrl = self.default_controller + node_id = self.dut_node_id + endpoint = self.matter_test_config.endpoint + + cluster = Clusters.OccupancySensing + write_result = await dev_ctrl.WriteAttribute(node_id, [(endpoint, cluster.Attributes.HoldTime(hold_time))]) + return write_result[0].Status + + def desc_TC_OCC_3_1(self) -> str: + return "[TC-OCC-3.1] Primary functionality with server as DUT" + + def steps_TC_OCC_3_1(self) -> list[TestStep]: + steps = [ + TestStep(1, "Commission DUT to TH and obtain DUT attribute list.", is_commissioning=True), + TestStep(2, "Change DUT HoldTime attribute value to 10 seconds."), + TestStep(3, "Do not trigger DUT occupancy sensing for the period of HoldTime. TH reads Occupancy attribute from DUT."), + TestStep(4, "Trigger DUT occupancy sensing to change the occupancy state and start a timer."), + TestStep(5, "After 10 seconds, TH reads Occupancy attribute from DUT.") + ] + return steps + + def pics_TC_OCC_3_1(self) -> list[str]: + pics = [ + "OCC.S", + ] + return pics + + @async_test_body + async def test_TC_OCC_3_1(self): + hold_time = 10 # 10 seconds for occupancy state hold time + + self.step(1) # Commissioning already done + + self.step(2) + + cluster = Clusters.OccupancySensing + attributes = cluster.Attributes + attribute_list = await self.read_occ_attribute_expect_success(attribute=attributes.AttributeList) + + has_hold_time = attributes.HoldTime.attribute_id in attribute_list + + if has_hold_time: + # write 10 as a HoldTime attribute + await self.write_single_attribute(cluster.Attributes.HoldTime(hold_time)) + else: + logging.info("No HoldTime attribute supports. Will test only occupancy attribute triggering functionality") + + self.step(3) + # check if Occupancy attribute is 0 + occupancy_dut = await self.read_occ_attribute_expect_success(attribute=attributes.Occupancy) + + # if occupancy is on, then wait until the sensor occupancy state is 0. + if occupancy_dut == 1: + # Don't trigger occupancy sensor to render occupancy attribute to 0 + if has_hold_time: + time.sleep(hold_time + 2.0) # add some extra 2 seconds to ensure hold time has passed. + else: # a user wait until a sensor specific time to change occupancy attribute to 0. This is the case where the sensor doesn't support HoldTime. + self.wait_for_user_input( + prompt_msg="Type any letter and press ENTER after the sensor occupancy is detection ready state (occupancy attribute = 0)") + + # check sensor occupancy state is 0 for the next test step + occupancy_dut = await self.read_occ_attribute_expect_success(attribute=attributes.Occupancy) + asserts.assert_equal(occupancy_dut, 0, "Occupancy attribute is still 1.") + + self.step(4) + # Trigger occupancy sensor to change Occupancy attribute value to 1 => TESTER ACTION on DUT + self.wait_for_user_input(prompt_msg="Type any letter and press ENTER after a sensor occupancy is triggered.") + + # And then check if Occupancy attribute has changed. + occupancy_dut = await self.read_occ_attribute_expect_success(attribute=attributes.Occupancy) + asserts.assert_equal(occupancy_dut, 1, "Occupancy state is not changed to 1") + + self.step(5) + # check if Occupancy attribute is back to 0 after HoldTime attribute period + # Tester should not be triggering the sensor for this test step. + if has_hold_time: + + # Start a timer based on HoldTime + time.sleep(hold_time + 2.0) # add some extra 2 seconds to ensure hold time has passed. + + occupancy_dut = await self.read_occ_attribute_expect_success(attribute=attributes.Occupancy) + asserts.assert_equal(occupancy_dut, 0, "Occupancy state is not 0 after HoldTime period") + + else: + logging.info("HoldTime attribute not supported. Skip this return to 0 timing test procedure.") + self.skip_step(5) + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/TC_OCC_3_2.py b/src/python_testing/TC_OCC_3_2.py index 7e811362e2e2a4..64a588b6eb36e8 100644 --- a/src/python_testing/TC_OCC_3_2.py +++ b/src/python_testing/TC_OCC_3_2.py @@ -1,207 +1,204 @@ -# -# Copyright (c) 2024 Project CHIP (Matter) Authors -# All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments -# for details about the block below. -# -# === BEGIN CI TEST ARGUMENTS === -# test-runner-runs: run1 -# test-runner-run/run1/app: ${ALL_CLUSTERS_APP} -# test-runner-run/run1/factoryreset: True -# test-runner-run/run1/quiet: True -# test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json -# test-runner-run/run1/script-args: --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto --endpoint 1 -# === END CI TEST ARGUMENTS === - -# TODO: There are CI issues to be followed up for the test cases below that implements manually controlling sensor device for -# the occupancy state ON/OFF change. -# [TC-OCC-3.1] test procedure step 4 -# [TC-OCC-3.2] test precedure step 3a, 3c - -import logging - -import chip.clusters as Clusters -from matter_testing_support import (ClusterAttributeChangeAccumulator, MatterBaseTest, TestStep, async_test_body, - await_sequence_of_reports, default_matter_test_main) -from mobly import asserts - - -class TC_OCC_3_2(MatterBaseTest): - async def read_occ_attribute_expect_success(self, attribute): - cluster = Clusters.Objects.OccupancySensing - endpoint_id = self.matter_test_config.endpoint - return await self.read_single_attribute_check_success(endpoint=endpoint_id, cluster=cluster, attribute=attribute) - - def desc_TC_OCC_3_2(self) -> str: - return "[TC-OCC-3.2] Subscription Report Verification with server as DUT" - - def steps_TC_OCC_3_2(self) -> list[TestStep]: - steps = [ - TestStep(1, "Commission DUT to TH if not already done", is_commissioning=True), - TestStep(2, "TH establishes a wildcard subscription to all attributes on Occupancy Sensing Cluster on the endpoint under test. Subscription min interval = 0 and max interval = 30 seconds."), - TestStep("3a", "Do not trigger DUT for occupancy state change."), - TestStep("3b", "TH reads DUT Occupancy attribute and saves the initial value as initial"), - TestStep("3c", "Trigger DUT to change the occupancy state."), - TestStep("3d", "TH awaits a ReportDataMessage containing an attribute report for DUT Occupancy attribute."), - TestStep("4a", "Check if DUT supports HoldTime attribute, If not supported, then stop and skip the rest of test cases."), - TestStep("4b", "TH reads DUT HoldTime attribute and saves the initial value as initial"), - TestStep("4c", "TH writes a different value to DUT HoldTime attribute."), - TestStep("4d", "TH awaits a ReportDataMessage containing an attribute report for DUT HoldTime attribute."), - TestStep("5a", "Check if DUT supports DUT feature flag PIR or OTHER, If not supported, then stop and skip to 6a."), - TestStep("5b", "TH reads DUT PIROccupiedToUnoccupiedDelay attribute and saves the initial value as initial"), - TestStep("5c", "TH writes a different value to DUT PIROccupiedToUnoccupiedDelay attribute."), - TestStep("5d", "TH awaits a ReportDataMessage containing an attribute report for DUT PIROccupiedToUnoccupiedDelay attribute."), - TestStep("6a", "Check if DUT supports DUT feature flag US, If not supported, then stop and skip to 7a."), - TestStep("6b", "TH reads DUT UltrasonicOccupiedToUnoccupiedDelay attribute and saves the initial value as initial"), - TestStep("6c", "TH writes a different value to DUT UltrasonicOccupiedToUnoccupiedDelay attribute."), - TestStep("6d", "TH awaits a ReportDataMessage containing an attribute report for DUT UltrasonicOccupiedToUnoccupiedDelay attribute."), - TestStep("7a", "Check if DUT supports DUT feature flag PHY, If not supported, terminate this test case."), - TestStep("7b", "TH reads DUT PhysicalContactOccupiedToUnoccupiedDelay attribute and saves the initial value as initial"), - TestStep("7c", "TH writes a different value to DUT PhysicalContactOccupiedToUnoccupiedDelay attribute."), - TestStep("7d", "TH awaits a ReportDataMessage containing an attribute report for DUT PhysicalContactOccupiedToUnoccupiedDelay attribute.") - ] - return steps - - def pics_TC_OCC_3_2(self) -> list[str]: - pics = [ - "OCC.S", - ] - return pics - - @async_test_body - async def test_TC_OCC_3_2(self): - endpoint_id = self.matter_test_config.endpoint - node_id = self.dut_node_id - dev_ctrl = self.default_controller - - post_prompt_settle_delay_seconds = 10.0 - cluster = Clusters.Objects.OccupancySensing - attributes = cluster.Attributes - - self.step(1) - - occupancy_sensor_type_bitmap_dut = await self.read_occ_attribute_expect_success(attribute=attributes.OccupancySensorTypeBitmap) - has_type_pir = ((occupancy_sensor_type_bitmap_dut & cluster.Enums.OccupancySensorTypeEnum.kPir) != 0) or \ - ((occupancy_sensor_type_bitmap_dut & cluster.Enums.OccupancySensorTypeEnum.kPIRAndUltrasonic) != 0) - has_type_ultrasonic = ((occupancy_sensor_type_bitmap_dut & cluster.Enums.OccupancySensorTypeEnum.kUltrasonic) != 0) or \ - ((occupancy_sensor_type_bitmap_dut & cluster.Enums.OccupancySensorTypeEnum.kPIRAndUltrasonic) != 0) - has_type_contact = (occupancy_sensor_type_bitmap_dut & cluster.Enums.OccupancySensorTypeEnum.kPhysicalContact) != 0 - - attribute_list = await self.read_occ_attribute_expect_success(attribute=attributes.AttributeList) - has_pir_timing_attrib = attributes.PIROccupiedToUnoccupiedDelay.attribute_id in attribute_list - has_ultrasonic_timing_attrib = attributes.UltrasonicOccupiedToUnoccupiedDelay.attribute_id in attribute_list - has_contact_timing_attrib = attributes.PhysicalContactOccupiedToUnoccupiedDelay.attribute_id in attribute_list - - self.step(2) - # min interval = 0, and max interval = 30 seconds - attrib_listener = ClusterAttributeChangeAccumulator(Clusters.Objects.OccupancySensing) - await attrib_listener.start(dev_ctrl, node_id, endpoint=endpoint_id, min_interval_sec=0, max_interval_sec=30) - - # TODO - Will add Namepiped to assimilate the manual sensor untrigger here - self.step("3a") - self.wait_for_user_input(prompt_msg="Type any letter and press ENTER after DUT goes back to unoccupied state.") - - self.step("3b") - if attributes.Occupancy.attribute_id in attribute_list: - initial_dut = await self.read_occ_attribute_expect_success(attribute=attributes.Occupancy) - asserts.assert_equal(initial_dut, 0, "Occupancy attribute is still detected state") - - # TODO - Will add Namepiped to assimilate the manual sensor trigger here - self.step("3c") - self.wait_for_user_input( - prompt_msg="Type any letter and press ENTER after the sensor occupancy is triggered and its occupancy state changed.") - - self.step("3d") - await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.Occupancy, sequence=[ - 0, 1], timeout_sec=post_prompt_settle_delay_seconds) - - self.step("4a") - if attributes.HoldTime.attribute_id not in attribute_list: - logging.info("No HoldTime attribute supports. Terminate this test case") - self.skip_all_remaining_steps("4b") - - self.step("4b") - initial_dut = await self.read_occ_attribute_expect_success(attribute=attributes.HoldTime) - - self.step("4c") - # write a different a HoldTime attribute value - diff_val = 12 - await self.write_single_attribute(attributes.HoldTime(diff_val)) - - self.step("4d") - await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.HoldTime, sequence=[ - initial_dut, diff_val], timeout_sec=post_prompt_settle_delay_seconds) - - self.step("5a") - if not has_type_pir or not has_pir_timing_attrib: - logging.info("No PIR timing attribute support. Skip this steps 5b, 5c, 5d") - self.skip_step("5b") - self.skip_step("5c") - self.skip_step("5d") - else: - self.step("5b") - initial_dut = await self.read_occ_attribute_expect_success(attribute=attributes.PIROccupiedToUnoccupiedDelay) - - self.step("5c") - # write the new attribute value - diff_val = 11 - await self.write_single_attribute(attributes.PIROccupiedToUnoccupiedDelay(diff_val)) - - self.step("5d") - await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.PIROccupiedToUnoccupiedDelay, sequence=[ - initial_dut, diff_val], timeout_sec=post_prompt_settle_delay_seconds) - - self.step("6a") - if not has_type_ultrasonic or not has_ultrasonic_timing_attrib: - logging.info("No Ultrasonic timing attribute supports. Skip steps 6b, 6c, 6d") - self.skip_step("6b") - self.skip_step("6c") - self.skip_step("6d") - else: - self.step("6b") - initial_dut = await self.read_occ_attribute_expect_success(attribute=attributes.UltrasonicOccupiedToUnoccupiedDelay) - - self.step("6c") - # write the new attribute value - diff_val = 14 - await self.write_single_attribute(attributes.UltrasonicOccupiedToUnoccupiedDelay(diff_val)) - - self.step("6d") - await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.UltrasonicOccupiedToUnoccupiedDelay, sequence=[ - initial_dut, diff_val], timeout_sec=post_prompt_settle_delay_seconds) - - self.step("7a") - if not has_type_contact or not has_contact_timing_attrib: - logging.info("No Physical contact timing attribute supports. Skip this test case") - self.skip_step("7b") - self.skip_step("7c") - self.skip_step("7d") - else: - self.step("7b") - initial_dut = await self.read_occ_attribute_expect_success(attribute=attributes.PhysicalContactOccupiedToUnoccupiedDelay) - - self.step("7c") - # write the new attribute value - diff_val = 9 - await self.write_single_attribute(attributes.PhysicalContactOccupiedToUnoccupiedDelay(diff_val)) - - self.step("7d") - await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.PhysicalContactOccupiedToUnoccupiedDelay, sequence=[ - initial_dut, diff_val], timeout_sec=post_prompt_settle_delay_seconds) - - -if __name__ == "__main__": - default_matter_test_main() +# +# Copyright (c) 2024 Project CHIP (Matter) Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# TODO: There are CI issues to be followed up for the test cases below that implements manually controlling sensor device for +# the occupancy state ON/OFF change. +# [TC-OCC-3.1] test procedure step 4 +# [TC-OCC-3.2] test precedure step 3a, 3c + +import logging + +import chip.clusters as Clusters +from matter_testing_support import (ClusterAttributeChangeAccumulator, MatterBaseTest, TestStep, async_test_body, + await_sequence_of_reports, default_matter_test_main) +from mobly import asserts + + +class TC_OCC_3_2(MatterBaseTest): + async def read_occ_attribute_expect_success(self, attribute): + cluster = Clusters.Objects.OccupancySensing + endpoint_id = self.matter_test_config.endpoint + return await self.read_single_attribute_check_success(endpoint=endpoint_id, cluster=cluster, attribute=attribute) + + def desc_TC_OCC_3_2(self) -> str: + return "[TC-OCC-3.2] Subscription Report Verification with server as DUT" + + def steps_TC_OCC_3_2(self) -> list[TestStep]: + steps = [ + TestStep(1, "Commission DUT to TH if not already done", is_commissioning=True), + TestStep(2, "TH establishes a wildcard subscription to all attributes on Occupancy Sensing Cluster on the endpoint under test. Subscription min interval = 0 and max interval = 30 seconds."), + TestStep("3a", "Do not trigger DUT for occupancy state change."), + TestStep("3b", "TH reads DUT Occupancy attribute and saves the initial value as initial"), + TestStep("3c", "Trigger DUT to change the occupancy state."), + TestStep("3d", "TH awaits a ReportDataMessage containing an attribute report for DUT Occupancy attribute."), + TestStep("4a", "Check if DUT supports HoldTime attribute, If not supported, then stop and skip the rest of test cases."), + TestStep("4b", "TH reads DUT HoldTime attribute and saves the initial value as initial"), + TestStep("4c", "TH writes a different value to DUT HoldTime attribute."), + TestStep("4d", "TH awaits a ReportDataMessage containing an attribute report for DUT HoldTime attribute."), + TestStep("5a", "Check if DUT supports DUT feature flag PIR or (!PIR & !US & !PHY) and has the PIROccupiedToUnoccupiedDelay attribute, If not supported, then skip 5b, 5c, 5d."), + TestStep("5b", "TH reads DUT PIROccupiedToUnoccupiedDelay attribute and saves the initial value as initial"), + TestStep("5c", "TH writes a different value to DUT PIROccupiedToUnoccupiedDelay attribute."), + TestStep("5d", "TH awaits a ReportDataMessage containing an attribute report for DUT PIROccupiedToUnoccupiedDelay attribute."), + TestStep("6a", "Check if DUT supports DUT feature flag US and has the UltrasonicOccupiedToUnoccupiedDelay attribute. If not supported, then skip 6b, 6c, 6d."), + TestStep("6b", "TH reads DUT UltrasonicOccupiedToUnoccupiedDelay attribute and saves the initial value as initial"), + TestStep("6c", "TH writes a different value to DUT UltrasonicOccupiedToUnoccupiedDelay attribute."), + TestStep("6d", "TH awaits a ReportDataMessage containing an attribute report for DUT UltrasonicOccupiedToUnoccupiedDelay attribute."), + TestStep("7a", "Check if DUT supports DUT feature flag PHY and has the PhysicalContactOccupiedToUnoccupiedDelay attribute. If not supported, skip 7b, 7c, 7d."), + TestStep("7b", "TH reads DUT PhysicalContactOccupiedToUnoccupiedDelay attribute and saves the initial value as initial"), + TestStep("7c", "TH writes a different value to DUT PhysicalContactOccupiedToUnoccupiedDelay attribute."), + TestStep("7d", "TH awaits a ReportDataMessage containing an attribute report for DUT PhysicalContactOccupiedToUnoccupiedDelay attribute.") + ] + return steps + + def pics_TC_OCC_3_2(self) -> list[str]: + pics = [ + "OCC.S", + ] + return pics + + @async_test_body + async def test_TC_OCC_3_2(self): + endpoint_id = self.matter_test_config.endpoint + node_id = self.dut_node_id + dev_ctrl = self.default_controller + + post_prompt_settle_delay_seconds = 10.0 + cluster = Clusters.Objects.OccupancySensing + attributes = cluster.Attributes + + self.step(1) # Commissioning already done + + self.step(2) + feature_map = await self.read_occ_attribute_expect_success(attribute=attributes.FeatureMap) + has_feature_pir = (feature_map & cluster.Bitmaps.Feature.kPassiveInfrared) != 0 + has_feature_ultrasonic = (feature_map & cluster.Bitmaps.Feature.kUltrasonic) != 0 + has_feature_contact = (feature_map & cluster.Bitmaps.Feature.kPhysicalContact) != 0 + + logging.info( + f"Feature map: 0x{feature_map:x}. PIR: {has_feature_pir}, US:{has_feature_ultrasonic}, PHY:{has_feature_contact}") + + attribute_list = await self.read_occ_attribute_expect_success(attribute=attributes.AttributeList) + has_pir_timing_attrib = attributes.PIROccupiedToUnoccupiedDelay.attribute_id in attribute_list + has_ultrasonic_timing_attrib = attributes.UltrasonicOccupiedToUnoccupiedDelay.attribute_id in attribute_list + has_contact_timing_attrib = attributes.PhysicalContactOccupiedToUnoccupiedDelay.attribute_id in attribute_list + logging.info(f"Attribute list: {attribute_list}") + logging.info(f"--> Has PIROccupiedToUnoccupiedDelay: {has_pir_timing_attrib}") + logging.info(f"--> Has UltrasonicOccupiedToUnoccupiedDelay: {has_ultrasonic_timing_attrib}") + logging.info(f"--> Has PhysicalContactOccupiedToUnoccupiedDelay: {has_contact_timing_attrib}") + + # min interval = 0, and max interval = 30 seconds + attrib_listener = ClusterAttributeChangeAccumulator(Clusters.Objects.OccupancySensing) + await attrib_listener.start(dev_ctrl, node_id, endpoint=endpoint_id, min_interval_sec=0, max_interval_sec=30) + + # TODO - Will add Namepiped to assimilate the manual sensor untrigger here + self.step("3a") + self.wait_for_user_input(prompt_msg="Type any letter and press ENTER after DUT goes back to unoccupied state.") + + self.step("3b") + if attributes.Occupancy.attribute_id in attribute_list: + initial_dut = await self.read_occ_attribute_expect_success(attribute=attributes.Occupancy) + asserts.assert_equal(initial_dut, 0, "Occupancy attribute is still detected state") + + # TODO - Will add Namepiped to assimilate the manual sensor trigger here + self.step("3c") + self.wait_for_user_input( + prompt_msg="Type any letter and press ENTER after the sensor occupancy is triggered and its occupancy state changed.") + + self.step("3d") + await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.Occupancy, sequence=[ + 1], timeout_sec=post_prompt_settle_delay_seconds) + + self.step("4a") + if attributes.HoldTime.attribute_id not in attribute_list: + logging.info("No HoldTime attribute supports. Terminate this test case") + self.skip_all_remaining_steps("4b") + + self.step("4b") + initial_dut = await self.read_occ_attribute_expect_success(attribute=attributes.HoldTime) + + self.step("4c") + # write a different a HoldTime attribute value + diff_val = 12 + await self.write_single_attribute(attributes.HoldTime(diff_val)) + + self.step("4d") + await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.HoldTime, sequence=[ + diff_val], timeout_sec=post_prompt_settle_delay_seconds) + + self.step("5a") + + has_no_legacy_features = ((not has_feature_pir) and (not has_feature_ultrasonic) and (not has_feature_contact)) + + if has_pir_timing_attrib and (has_feature_pir or has_no_legacy_features): + self.step("5b") + initial_dut = await self.read_occ_attribute_expect_success(attribute=attributes.PIROccupiedToUnoccupiedDelay) + + self.step("5c") + # write the new attribute value + diff_val = 11 + await self.write_single_attribute(attributes.PIROccupiedToUnoccupiedDelay(diff_val)) + + self.step("5d") + await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.PIROccupiedToUnoccupiedDelay, sequence=[ + diff_val], timeout_sec=post_prompt_settle_delay_seconds) + else: + logging.info("No PIR timing attribute support. Skipping steps 5b, 5c, 5d") + self.skip_step("5b") + self.skip_step("5c") + self.skip_step("5d") + + self.step("6a") + if not has_feature_ultrasonic or not has_ultrasonic_timing_attrib: + logging.info("No Ultrasonic timing attribute supports. Skipping steps 6b, 6c, 6d") + self.skip_step("6b") + self.skip_step("6c") + self.skip_step("6d") + else: + self.step("6b") + initial_dut = await self.read_occ_attribute_expect_success(attribute=attributes.UltrasonicOccupiedToUnoccupiedDelay) + + self.step("6c") + # write the new attribute value + diff_val = 14 + await self.write_single_attribute(attributes.UltrasonicOccupiedToUnoccupiedDelay(diff_val)) + + self.step("6d") + await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.UltrasonicOccupiedToUnoccupiedDelay, sequence=[ + diff_val], timeout_sec=post_prompt_settle_delay_seconds) + + self.step("7a") + if not has_feature_contact or not has_contact_timing_attrib: + logging.info("No Physical contact timing attribute supports. Skipping steps 7b, 7c, 7d") + self.skip_step("7b") + self.skip_step("7c") + self.skip_step("7d") + else: + self.step("7b") + initial_dut = await self.read_occ_attribute_expect_success(attribute=attributes.PhysicalContactOccupiedToUnoccupiedDelay) + + self.step("7c") + # write the new attribute value + diff_val = 9 + await self.write_single_attribute(attributes.PhysicalContactOccupiedToUnoccupiedDelay(diff_val)) + + self.step("7d") + await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.PhysicalContactOccupiedToUnoccupiedDelay, sequence=[ + diff_val], timeout_sec=post_prompt_settle_delay_seconds) + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index eb8a6ba20d63b9..d3d810149ce5dd 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -334,6 +334,15 @@ def wait_for_report(self): asserts.fail(f"[AttributeChangeCallback] Attribute {self._expected_attribute} not found in returned report") +def clear_queue(report_queue: queue.Queue): + """Flush all contents of a report queue. Useful to get back to empty point.""" + while not report_queue.empty(): + try: + report_queue.get(block=False) + except queue.Empty: + break + + def await_sequence_of_reports(report_queue: queue.Queue, endpoint_id: int, attribute: TypedAttributePath, sequence: list[Any], timeout_sec: float): """Given a queue.Queue hooked-up to an attribute change accumulator, await a given expected sequence of attribute reports. @@ -344,6 +353,9 @@ def await_sequence_of_reports(report_queue: queue.Queue, endpoint_id: int, attri - sequence: list of attribute values in order that are expected. - timeout_sec: number of seconds to wait for. + *** WARNING: The queue contains every report since the sub was established. Use + clear_queue to make it empty. *** + This will fail current Mobly test with assertion failure if the data is not as expected in order. Returns nothing on success so the test can go on. @@ -446,6 +458,20 @@ def attribute_report_counts(self) -> dict[ClusterObjects.ClusterAttributeDescrip def attribute_reports(self) -> dict[ClusterObjects.ClusterAttributeDescriptor, AttributeValue]: return self._attribute_reports + def get_last_report(self) -> Optional[Any]: + """Flush entire queue, returning last (newest) report only.""" + last_report: Optional[Any] = None + while True: + try: + last_report = self._q.get(block=False) + except queue.Empty: + return last_report + + def flush_reports(self) -> None: + """Flush entire queue, returning nothing.""" + _ = self.get_last_report() + return + class InternalTestRunnerHooks(TestRunnerHooks):