From 1913dbab1d235f6ecc7733cd2ed2e7214001ccb6 Mon Sep 17 00:00:00 2001 From: William Date: Fri, 9 Feb 2024 11:38:09 +0000 Subject: [PATCH] Enable rvc python tests to run in ci (#31872) * Replaced the vendor specific mode tag for the mapping mode with the mapping mode tag. * Updated the rvc-app's RvcOperationalStateDelegate to allow setting of a callback furtion for when the GoHome commond in received. * Implemented the GoHome handler for rvc-app. * Updated the PICS file for the rvc-app. * Fixed the Admin commissioning cluter's zap config for the rvc-app * Renamed the PICS values file to be similar to the ci file. Updated the script that runs all the RVC yaml tests. * Updated the rvc-app state machine diagram. * Updated theh rvc-app README. * Restyled by clang-format * Restyled by prettier-markdown * Restyled by shfmt * Updated the rvc-app .matter file. * Reduced the text of some errors as it was longer that allowed. * Added an out-of-band message to reset the rvc-app to its start-up state. * Updated a returend error following the spec change. * Excluded the RVC state diagram png from the lint check for %zu. * Added the PICS_SDK_CI_ONLY PICS to the rvc-app-pics-values to allow it to run tests in CI. * Improved the TC_RVCCLEANM_2_1 test to allow the CI to run the tests against the rvc-app. * Improved the TC_RVCRUNM_2_1 test to allow the CI to run the tests against the rvc-app. * Improved the TC_RVCCLEAN_2_2 test to allow the CI to run the tests against the rvc-app. * Improved the TC_RVCRUNN_2_2 test to allow the CI to run the tests against the rvc-app. * Improved the TC_RVCOPSTATE_2_1 test to allow the CI to run the tests against the rvc-app. * Improved the TC_RVCOPSTATE_2_3 test to allow the CI to run the tests against the rvc-app. * Restyled by autopep8 * Restyled by isort * Updated the rvc-app readme. * Removed PIXIT settings from the rvc-app-pics values. * Excluded all binary files from the lint check for %zu. * Fixed typos from code review Co-authored-by: Petru Lauric <81822411+plauric@users.noreply.github.com> * Re-removed the OpenBasicCommissioningWindow command of the Administrator Commissioning cluster in the rvc-app due to security concerns. * Updated the rvc-app .matter file. * Removed sleeps between messages to the DUT as they are not needed. * Restyled by isort * Fixed extra imports in python tests. --------- Co-authored-by: Restyled.io Co-authored-by: Petru Lauric <81822411+plauric@users.noreply.github.com> --- .../rvc-common/pics/rvc-app-pics-values | 2 + src/python_testing/TC_RVCCLEANM_2_1.py | 47 +++++-- src/python_testing/TC_RVCCLEANM_2_2.py | 33 ++++- src/python_testing/TC_RVCOPSTATE_2_1.py | 119 +++++++++++++++--- src/python_testing/TC_RVCOPSTATE_2_3.py | 80 ++++++++++-- src/python_testing/TC_RVCRUNM_2_1.py | 29 ++++- src/python_testing/TC_RVCRUNM_2_2.py | 28 +++-- 7 files changed, 285 insertions(+), 53 deletions(-) diff --git a/examples/rvc-app/rvc-common/pics/rvc-app-pics-values b/examples/rvc-app/rvc-common/pics/rvc-app-pics-values index 6b094132ed5c22..2a6adeb31e38f6 100644 --- a/examples/rvc-app/rvc-common/pics/rvc-app-pics-values +++ b/examples/rvc-app/rvc-common/pics/rvc-app-pics-values @@ -1,3 +1,5 @@ +PICS_SDK_CI_ONLY=1 + RVCCLEANM.S=1 RVCCLEANM.S.A0000=1 RVCCLEANM.S.A0001=1 diff --git a/src/python_testing/TC_RVCCLEANM_2_1.py b/src/python_testing/TC_RVCCLEANM_2_1.py index 4f553206eb0a7f..d000f0d5af697d 100644 --- a/src/python_testing/TC_RVCCLEANM_2_1.py +++ b/src/python_testing/TC_RVCCLEANM_2_1.py @@ -33,17 +33,30 @@ def __init__(self, *args): self.endpoint = 0 self.mode_ok = 0 self.mode_fail = 0 + self.is_ci = False + self.app_pipe = "/tmp/chip_rvc_fifo_" async def read_mod_attribute_expect_success(self, endpoint, attribute): cluster = Clusters.Objects.RvcCleanMode return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attribute) - async def send_change_to_mode_cmd(self, newMode) -> Clusters.Objects.RvcCleanMode.Commands.ChangeToModeResponse: + async def send_clean_change_to_mode_cmd(self, newMode) -> Clusters.Objects.RvcCleanMode.Commands.ChangeToModeResponse: ret = await self.send_single_cmd(cmd=Clusters.Objects.RvcCleanMode.Commands.ChangeToMode(newMode=newMode), endpoint=self.endpoint) asserts.assert_true(type_matches(ret, Clusters.Objects.RvcCleanMode.Commands.ChangeToModeResponse), - "Unexpected return type for ChangeToMode") + "Unexpected return type for RVC Clean Mode ChangeToMode") return ret + async def send_run_change_to_mode_cmd(self, newMode) -> Clusters.Objects.RvcRunMode.Commands.ChangeToModeResponse: + ret = await self.send_single_cmd(cmd=Clusters.Objects.RvcRunMode.Commands.ChangeToMode(newMode=newMode), endpoint=self.endpoint) + asserts.assert_true(type_matches(ret, Clusters.Objects.RvcRunMode.Commands.ChangeToModeResponse), + "Unexpected return type for RVC Run Mode ChangeToMode") + return ret + + # Sends and out-of-band command to the rvc-app + def write_to_app_pipe(self, command): + with open(self.app_pipe, "w") as app_pipe: + app_pipe.write(command + "\n") + def pics_TC_RVCCLEANM_2_1(self) -> list[str]: return ["RVCCLEANM.S"] @@ -60,6 +73,12 @@ async def test_TC_RVCCLEANM_2_1(self): self.endpoint = self.matter_test_config.endpoint self.mode_ok = self.matter_test_config.global_test_params['PIXIT.RVCCLEANM.MODE_CHANGE_OK'] self.mode_fail = self.matter_test_config.global_test_params['PIXIT.RVCCLEANM.MODE_CHANGE_FAIL'] + self.is_ci = self.check_pics("PICS_SDK_CI_ONLY") + if self.is_ci: + app_pid = self.matter_test_config.app_pid + if app_pid == 0: + asserts.fail("The --app-pid flag must be set when PICS_SDK_CI_ONLY is set") + self.app_pipe = self.app_pipe + str(app_pid) asserts.assert_true(self.check_pics("RVCCLEANM.S.A0000"), "RVCCLEANM.S.A0000 must be supported") asserts.assert_true(self.check_pics("RVCCLEANM.S.A0001"), "RVCCLEANM.S.A0001 must be supported") @@ -70,6 +89,10 @@ async def test_TC_RVCCLEANM_2_1(self): self.print_step(1, "Commissioning, already done") + # Ensure that the device is in the correct state + if self.is_ci: + self.write_to_app_pipe('{"Name": "Reset"}') + self.print_step(2, "Read SupportedModes attribute") supported_modes = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.SupportedModes) @@ -100,14 +123,18 @@ class CommonCodes(Enum): self.print_step(4, "Send ChangeToMode command with NewMode set to %d" % (old_current_mode)) - ret = await self.send_change_to_mode_cmd(newMode=old_current_mode) + ret = await self.send_clean_change_to_mode_cmd(newMode=old_current_mode) asserts.assert_true(ret.status == CommonCodes.SUCCESS.value, "Changing the mode to the current mode should be a no-op") if self.check_pics("RVCCLEANM.S.M.CAN_TEST_MODE_FAILURE"): asserts.assert_true(self.mode_fail in modes, "The MODE_CHANGE_FAIL PIXIT value (%d) is not a supported mode" % (self.mode_fail)) self.print_step(5, "Manually put the device in a state from which it will FAIL to transition to mode %d" % (self.mode_fail)) - input("Press Enter when done.\n") + if self.is_ci: + print("Changing mode to Cleaning") + await self.send_run_change_to_mode_cmd(1) + else: + input("Press Enter when done.\n") self.print_step(6, "Read CurrentMode attribute") old_current_mode = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.CurrentMode) @@ -116,7 +143,7 @@ class CommonCodes(Enum): self.print_step(7, "Send ChangeToMode command with NewMode set to %d" % (self.mode_fail)) - ret = await self.send_change_to_mode_cmd(newMode=self.mode_fail) + ret = await self.send_clean_change_to_mode_cmd(newMode=self.mode_fail) st = ret.status is_mfg_code = st in range(0x80, 0xC0) is_err_code = (st == CommonCodes.GENERIC_FAILURE.value) or ( @@ -134,7 +161,11 @@ class CommonCodes(Enum): asserts.assert_true(current_mode == old_current_mode, "CurrentMode changed after failed ChangeToMode command!") self.print_step(9, "Manually put the device in a state from which it will SUCCESSFULLY transition to mode %d" % (self.mode_ok)) - input("Press Enter when done.\n") + if self.is_ci: + print("Changing mode to Idle") + await self.send_run_change_to_mode_cmd(0) + else: + input("Press Enter when done.\n") self.print_step(10, "Read CurrentMode attribute") old_current_mode = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.CurrentMode) @@ -143,7 +174,7 @@ class CommonCodes(Enum): self.print_step(11, "Send ChangeToMode command with NewMode set to %d" % (self.mode_ok)) - ret = await self.send_change_to_mode_cmd(newMode=self.mode_ok) + ret = await self.send_clean_change_to_mode_cmd(newMode=self.mode_ok) asserts.assert_true(ret.status == CommonCodes.SUCCESS.value, "Changing to mode %d must succeed due to the current state of the device" % (self.mode_ok)) @@ -157,7 +188,7 @@ class CommonCodes(Enum): self.print_step(13, "Send ChangeToMode command with NewMode set to %d" % (invalid_mode)) - ret = await self.send_change_to_mode_cmd(newMode=invalid_mode) + ret = await self.send_clean_change_to_mode_cmd(newMode=invalid_mode) asserts.assert_true(ret.status == CommonCodes.UNSUPPORTED_MODE.value, "Attempt to change to invalid mode %d didn't fail as expected" % (invalid_mode)) diff --git a/src/python_testing/TC_RVCCLEANM_2_2.py b/src/python_testing/TC_RVCCLEANM_2_2.py index a4107e12c70665..ae5c268d7a9668 100644 --- a/src/python_testing/TC_RVCCLEANM_2_2.py +++ b/src/python_testing/TC_RVCCLEANM_2_2.py @@ -31,6 +31,8 @@ def __init__(self, *args): self.run_mode_dut = 0 self.old_clean_mode_dut = 0 self.new_clean_mode_th = 0 + self.is_ci = False + self.app_pipe = "/tmp/chip_rvc_fifo_" async def read_mod_attribute_expect_success(self, cluster, attribute): return await self.read_single_attribute_check_success( @@ -48,10 +50,14 @@ async def read_clean_supported_modes(self) -> Clusters.Objects.RvcCleanMode.Attr Clusters.RvcCleanMode.Attributes.SupportedModes) return ret - async def send_change_to_mode_cmd(self, newMode) -> Clusters.Objects.RvcCleanMode.Commands.ChangeToModeResponse: + async def send_clean_change_to_mode_cmd(self, newMode) -> Clusters.Objects.RvcCleanMode.Commands.ChangeToModeResponse: ret = await self.send_single_cmd(cmd=Clusters.Objects.RvcCleanMode.Commands.ChangeToMode(newMode=newMode), endpoint=self.endpoint) return ret + async def send_run_change_to_mode_cmd(self, newMode) -> Clusters.Objects.RvcRunMode.Commands.ChangeToModeResponse: + ret = await self.send_single_cmd(cmd=Clusters.Objects.RvcRunMode.Commands.ChangeToMode(newMode=newMode), endpoint=self.endpoint) + return ret + # Prints the instruction and waits for a user input to continue def print_instruction(self, step_number, instruction): self.print_step(step_number, instruction) @@ -60,9 +66,20 @@ def print_instruction(self, step_number, instruction): def pics_TC_RVCCLEANM_2_2(self) -> list[str]: return ["RVCCLEANM.S"] + # Sends and out-of-band command to the rvc-app + def write_to_app_pipe(self, command): + with open(self.app_pipe, "w") as app_pipe: + app_pipe.write(command + "\n") + @async_test_body async def test_TC_RVCCLEANM_2_2(self): self.endpoint = self.matter_test_config.endpoint + self.is_ci = self.check_pics("PICS_SDK_CI_ONLY") + if self.is_ci: + app_pid = self.matter_test_config.app_pid + if app_pid == 0: + asserts.fail("The --app-pid flag must be set when PICS_SDK_CI_ONLY is set.c") + self.app_pipe = self.app_pipe + str(app_pid) asserts.assert_true(self.check_pics("RVCCLEANM.S"), "RVCCLEANM.S must be supported") asserts.assert_true(self.check_pics("RVCRUNM.S.A0000"), "RVCRUNM.S.A0000 must be supported") @@ -70,8 +87,16 @@ async def test_TC_RVCCLEANM_2_2(self): self.print_step(1, "Commissioning, already done") - self.print_instruction(2, "Manually put the device in a state in which the RVC Run Mode " - "cluster’s CurrentMode attribute is set to a mode without the Idle mode tag.") + # Ensure that the device is in the correct state + if self.is_ci: + self.write_to_app_pipe('{"Name": "Reset"}') + + self.print_step( + 2, "Manually put the device in a state in which the RVC Run Mode cluster’s CurrentMode attribute is set to a mode without the Idle mode tag.") + if self.is_ci: + await self.send_run_change_to_mode_cmd(1) + else: + input("Press Enter when done.\n") self.print_step(3, "Read the RvcRunMode SupportedModes attribute") supported_run_modes = await self.read_run_supported_modes() @@ -115,7 +140,7 @@ async def test_TC_RVCCLEANM_2_2(self): break self.print_step(7, "Send ChangeToMode command") - response = await self.send_change_to_mode_cmd(self.new_clean_mode_th) + response = await self.send_clean_change_to_mode_cmd(self.new_clean_mode_th) asserts.assert_equal(response.status, 3, "The response should contain a ChangeToModeResponse command " "with the Status set to InvalidInMode(0x03).") diff --git a/src/python_testing/TC_RVCOPSTATE_2_1.py b/src/python_testing/TC_RVCOPSTATE_2_1.py index a74a27616b7b09..b0d753ad216164 100644 --- a/src/python_testing/TC_RVCOPSTATE_2_1.py +++ b/src/python_testing/TC_RVCOPSTATE_2_1.py @@ -27,6 +27,8 @@ class TC_RVCOPSTATE_2_1(MatterBaseTest): def __init__(self, *args): super().__init__(*args) self.endpoint = None + self.is_ci = False + self.app_pipe = "/tmp/chip_rvc_fifo_" async def read_mod_attribute_expect_success(self, endpoint, attribute): cluster = Clusters.Objects.RvcOperationalState @@ -48,6 +50,20 @@ async def read_and_validate_operror(self, step, expected_error): asserts.assert_equal(operational_error.errorStateID, expected_error, "errorStateID(%s) should equal %s" % (operational_error.errorStateID, expected_error)) + async def send_run_change_to_mode_cmd(self, new_mode) -> Clusters.Objects.RvcRunMode.Commands.ChangeToModeResponse: + ret = await self.send_single_cmd(cmd=Clusters.Objects.RvcRunMode.Commands.ChangeToMode(newMode=new_mode), + endpoint=self.endpoint) + return ret + + async def send_pause_cmd(self) -> Clusters.Objects.RvcOperationalState.Commands.OperationalCommandResponse: + ret = await self.send_single_cmd(cmd=Clusters.Objects.RvcOperationalState.Commands.Pause(), endpoint=self.endpoint) + return ret + + # Sends and out-of-band command to the rvc-app + def write_to_app_pipe(self, command): + with open(self.app_pipe, "w") as app_pipe: + app_pipe.write(command + "\n") + def TC_RVCOPSTATE_2_1(self) -> list[str]: return ["RVCOPSTATE.S"] @@ -55,11 +71,21 @@ def TC_RVCOPSTATE_2_1(self) -> list[str]: async def test_TC_RVCOPSTATE_2_1(self): self.endpoint = self.matter_test_config.endpoint asserts.assert_false(self.endpoint is None, "--endpoint must be included on the command line in.") + self.is_ci = self.check_pics("PICS_SDK_CI_ONLY") + if self.is_ci: + app_pid = self.matter_test_config.app_pid + if app_pid == 0: + asserts.fail("The --app-pid flag must be set when PICS_SDK_CI_ONLY is set") + self.app_pipe = self.app_pipe + str(app_pid) attributes = Clusters.RvcOperationalState.Attributes self.print_step(1, "Commissioning, already done") + # Ensure that the device is in the correct state + if self.is_ci: + self.write_to_app_pipe('{"Name": "Reset"}') + if self.check_pics("RVCOPSTATE.S.A0000"): self.print_step(2, "Read PhaseList attribute") phase_list = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.PhaseList) @@ -129,31 +155,52 @@ async def test_TC_RVCOPSTATE_2_1(self): if self.check_pics("RVCOPSTATE.S.M.ST_STOPPED"): self.print_step("6a", "Manually put the device in the stopped state") - input("Press Enter when done.\n") + if not self.is_ci: + input("Press Enter when done.\n") await self.read_and_validate_opstate(step="6b", expected_state=Clusters.OperationalState.Enums.OperationalStateEnum.kStopped) if self.check_pics("RVCOPSTATE.S.M.ST_RUNNING"): self.print_step("6c", "Manually put the device in the running state") - input("Press Enter when done.\n") + if self.is_ci: + await self.send_run_change_to_mode_cmd(1) + else: + input("Press Enter when done.\n") await self.read_and_validate_opstate(step="6d", expected_state=Clusters.OperationalState.Enums.OperationalStateEnum.kRunning) if self.check_pics("RVCOPSTATE.S.M.ST_PAUSED"): self.print_step("6e", "Manually put the device in the paused state") - input("Press Enter when done.\n") + if self.is_ci: + await self.send_pause_cmd() + else: + input("Press Enter when done.\n") await self.read_and_validate_opstate(step="6f", expected_state=Clusters.OperationalState.Enums.OperationalStateEnum.kPaused) if self.check_pics("RVCOPSTATE.S.M.ST_ERROR"): self.print_step("6g", "Manually put the device in the error state") - input("Press Enter when done.\n") + if self.is_ci: + self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "UnableToStartOrResume"}') + else: + input("Press Enter when done.\n") await self.read_and_validate_opstate(step="6h", expected_state=Clusters.OperationalState.Enums.OperationalStateEnum.kError) if self.check_pics("RVCOPSTATE.S.M.ST_SEEKING_CHARGER"): self.print_step("6i", "Manually put the device in the seeking charger state") - input("Press Enter when done.\n") + if self.is_ci: + self.write_to_app_pipe('{"Name": "Reset"}') + await self.send_run_change_to_mode_cmd(1) + await self.send_run_change_to_mode_cmd(0) + else: + input("Press Enter when done.\n") await self.read_and_validate_opstate(step="6j", expected_state=Clusters.RvcOperationalState.Enums.OperationalStateEnum.kSeekingCharger) if self.check_pics("RVCOPSTATE.S.M.ST_CHARGING"): self.print_step("6k", "Manually put the device in the charging state") - input("Press Enter when done.\n") + if self.is_ci: + self.write_to_app_pipe('{"Name": "ChargerFound"}') + else: + input("Press Enter when done.\n") await self.read_and_validate_opstate(step="6l", expected_state=Clusters.RvcOperationalState.Enums.OperationalStateEnum.kCharging) if self.check_pics("RVCOPSTATE.S.M.ST_DOCKED"): self.print_step("6m", "Manually put the device in the docked state") - input("Press Enter when done.\n") + if self.is_ci: + self.write_to_app_pipe('{"Name": "Charged"}') + else: + input("Press Enter when done.\n") await self.read_and_validate_opstate(step="6n", expected_state=Clusters.RvcOperationalState.Enums.OperationalStateEnum.kDocked) if self.check_pics("RVCOPSTATE.S.A0005"): @@ -176,51 +223,85 @@ async def test_TC_RVCOPSTATE_2_1(self): if self.check_pics("RVCOPSTATE.S.M.ERR_NO_ERROR"): self.print_step("7a", "Manually put the device in the no error state") - input("Press Enter when done.\n") + if not self.is_ci: + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7b", expected_error=Clusters.OperationalState.Enums.ErrorStateEnum.kNoError) if self.check_pics("RVCOPSTATE.S.M.ERR_UNABLE_TO_START_OR_RESUME"): self.print_step("7c", "Manually put the device in the unable to start or resume error state") - input("Press Enter when done.\n") + if self.is_ci: + self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "UnableToStartOrResume"}') + else: + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7d", expected_error=Clusters.OperationalState.Enums.ErrorStateEnum.kUnableToStartOrResume) if self.check_pics("RVCOPSTATE.S.M.ERR_UNABLE_TO_COMPLETE_OPERATION"): self.print_step("7e", "Manually put the device in the unable to complete operation error state") - input("Press Enter when done.\n") + if self.is_ci: + self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "UnableToCompleteOperation"}') + else: + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7f", expected_error=Clusters.OperationalState.Enums.ErrorStateEnum.kUnableToCompleteOperation) if self.check_pics("RVCOPSTATE.S.M.ERR_COMMAND_INVALID_IN_STATE"): self.print_step("7g", "Manually put the device in the command invalid error state") - input("Press Enter when done.\n") + if self.is_ci: + self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "CommandInvalidInState"}') + else: + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7h", expected_error=Clusters.OperationalState.Enums.ErrorStateEnum.kCommandInvalidInState) if self.check_pics("RVCOPSTATE.S.M.ERR_FAILED_TO_FIND_CHARGING_DOCK"): self.print_step("7i", "Manually put the device in the failed to find dock error state") - input("Press Enter when done.\n") + if self.is_ci: + self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "FailedToFindChargingDock"}') + else: + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7j", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kFailedToFindChargingDock) if self.check_pics("RVCOPSTATE.S.M.ERR_STUCK"): self.print_step("7k", "Manually put the device in the stuck error state") - input("Press Enter when done.\n") + if self.is_ci: + self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "Stuck"}') + else: + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7l", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kStuck) if self.check_pics("RVCOPSTATE.S.M.ERR_DUST_BIN_MISSING"): self.print_step("7m", "Manually put the device in the dust bin missing error state") - input("Press Enter when done.\n") + if self.is_ci: + self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "DustBinMissing"}') + else: + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7n", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kDustBinMissing) if self.check_pics("RVCOPSTATE.S.M.ERR_DUST_BIN_FULL"): self.print_step("7o", "Manually put the device in the dust bin full error state") - input("Press Enter when done.\n") + if self.is_ci: + self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "DustBinFull"}') + else: + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7p", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kDustBinFull) if self.check_pics("RVCOPSTATE.S.M.ERR_WATER_TANK_EMPTY"): self.print_step("7q", "Manually put the device in the water tank empty error state") - input("Press Enter when done.\n") + if self.is_ci: + self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "WaterTankEmpty"}') + else: + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7r", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kWaterTankEmpty) if self.check_pics("RVCOPSTATE.S.M.ERR_WATER_TANK_MISSING"): self.print_step("7s", "Manually put the device in the water tank missing error state") - input("Press Enter when done.\n") + if self.is_ci: + self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "WaterTankMissing"}') + else: + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7t", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kWaterTankMissing) if self.check_pics("RVCOPSTATE.S.M.ERR_WATER_TANK_LID_OPEN"): self.print_step("7u", "Manually put the device in the water tank lid open error state") - input("Press Enter when done.\n") + if self.is_ci: + self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "WaterTankLidOpen"}') + else: + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7v", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kWaterTankLidOpen) if self.check_pics("RVCOPSTATE.S.M.ERR_MOP_CLEANING_PAD_MISSING"): self.print_step("7w", "Manually put the device in the mop cleaning pad missing error state") - input("Press Enter when done.\n") + if self.is_ci: + self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "MopCleaningPadMissing"}') + else: + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7x", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kMopCleaningPadMissing) diff --git a/src/python_testing/TC_RVCOPSTATE_2_3.py b/src/python_testing/TC_RVCOPSTATE_2_3.py index da3f1522d74976..4d275ac5adc5b9 100644 --- a/src/python_testing/TC_RVCOPSTATE_2_3.py +++ b/src/python_testing/TC_RVCOPSTATE_2_3.py @@ -16,7 +16,7 @@ # import logging -import time +from time import sleep import chip.clusters as Clusters from chip.clusters.Types import NullValue @@ -71,15 +71,14 @@ def error_enum_to_text(error_enum): elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kMopCleaningPadMissing: return "MopCleaningPadMissing(0x47)" - def pics_TC_RVCOPSTATE_2_3(self) -> list[str]: - return ["RVCOPSTATE.S"] - class TC_RVCOPSTATE_2_3(MatterBaseTest): def __init__(self, *args): super().__init__(*args) self.endpoint = None + self.is_ci = False + self.app_pipe = "/tmp/chip_rvc_fifo_" async def read_mod_attribute_expect_success(self, endpoint, attribute): cluster = Clusters.Objects.RvcOperationalState @@ -122,16 +121,35 @@ async def send_resume_cmd_with_check(self, step_number, expected_error): "errorStateID(%s) should be %s" % (ret.commandResponseState.errorStateID, error_enum_to_text(expected_error))) + async def send_run_change_to_mode_cmd(self, new_mode) -> Clusters.Objects.RvcRunMode.Commands.ChangeToModeResponse: + ret = await self.send_single_cmd(cmd=Clusters.Objects.RvcRunMode.Commands.ChangeToMode(newMode=new_mode), + endpoint=self.endpoint) + return ret + + # Sends and out-of-band command to the rvc-app + def write_to_app_pipe(self, command): + with open(self.app_pipe, "w") as app_pipe: + app_pipe.write(command + "\n") + # Prints the instruction and waits for a user input to continue def print_instruction(self, step_number, instruction): self.print_step(step_number, instruction) input("Press Enter when done.\n") + def pics_TC_RVCOPSTATE_2_3(self) -> list[str]: + return ["RVCOPSTATE.S"] + @async_test_body async def test_TC_RVCOPSTATE_2_3(self): self.endpoint = self.matter_test_config.endpoint asserts.assert_false(self.endpoint is None, "--endpoint must be included on the command line in.") + self.is_ci = self.check_pics("PICS_SDK_CI_ONLY") + if self.is_ci: + app_pid = self.matter_test_config.app_pid + if app_pid == 0: + asserts.fail("The --app-pid flag must be set when PICS_SDK_CI_ONLY is set.c") + self.app_pipe = self.app_pipe + str(app_pid) asserts.assert_true(self.check_pics("RVCOPSTATE.S.A0003"), "RVCOPSTATE.S.A0003 must be supported") asserts.assert_true(self.check_pics("RVCOPSTATE.S.A0004"), "RVCOPSTATE.S.A0004 must be supported") @@ -147,7 +165,15 @@ async def test_TC_RVCOPSTATE_2_3(self): self.print_step(1, "Commissioning, already done") - self.print_instruction(2, "Manually put the device in a state where it can receive a Pause command") + # Ensure that the device is in the correct state + if self.is_ci: + self.write_to_app_pipe('{"Name": "Reset"}') + + self.print_step(2, "Manually put the device in a state where it can receive a Pause command") + if self.is_ci: + await self.send_run_change_to_mode_cmd(1) + else: + input("Press Enter when done.\n") self.print_step(3, "Read OperationalStateList attribute") op_state_list = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, @@ -182,7 +208,7 @@ async def test_TC_RVCOPSTATE_2_3(self): "invalid CountdownTime(%s). Must be in between 1 and 259200, or null " % initial_countdown_time) self.print_step(8, "Waiting for 5 seconds") - time.sleep(5) + sleep(5) self.print_step(9, "Read CountdownTime attribute") countdown_time = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.CountdownTime) @@ -224,7 +250,11 @@ async def test_TC_RVCOPSTATE_2_3(self): await self.read_operational_state_with_check(23, op_states.kRunning) if self.check_pics("RVCOPSTATE.S.M.ST_STOPPED"): - self.print_instruction(24, "Manually put the device in the Stopped(0x00) operational state") + self.print_step(24, "Manually put the device in the Stopped(0x00) operational state") + if self.is_ci: + self.write_to_app_pipe('{"Name": "Reset"}') + else: + input("Press Enter when done.\n") await self.read_operational_state_with_check(25, op_states.kStopped) @@ -233,7 +263,11 @@ async def test_TC_RVCOPSTATE_2_3(self): await self.send_resume_cmd_with_check(27, op_errors.kCommandInvalidInState) if self.check_pics("RVCOPSTATE.S.M.ST_ERROR"): - self.print_instruction(28, "Manually put the device in the Error(0x03) operational state") + self.print_step(28, "Manually put the device in the Error(0x03) operational state") + if self.is_ci: + self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "Stuck"}') + else: + input("Press Enter when done.\n") await self.read_operational_state_with_check(29, op_states.kError) @@ -242,33 +276,53 @@ async def test_TC_RVCOPSTATE_2_3(self): await self.send_resume_cmd_with_check(31, op_errors.kCommandInvalidInState) if self.check_pics("RVCOPSTATE.S.M.ST_CHARGING"): - self.print_instruction(32, "Manually put the device in the Charging(0x41) operational state") + self.print_step(32, "Manually put the device in the Charging(0x41) operational state") + if self.is_ci: + self.write_to_app_pipe('{"Name": "Reset"}') + await self.send_run_change_to_mode_cmd(1) + await self.send_run_change_to_mode_cmd(0) + self.write_to_app_pipe('{"Name": "ChargerFound"}') + else: + input("Press Enter when done.\n") await self.read_operational_state_with_check(33, rvc_op_states.kCharging) await self.send_pause_cmd_with_check(34, op_errors.kCommandInvalidInState) - self.print_instruction( + self.print_step( 35, "Manually put the device in the Charging(0x41) operational state and RVC Run Mode cluster's CurrentMode attribute set to a mode with the Idle mode tag") + if not self.is_ci: + input("Press Enter when done.\n") await self.read_operational_state_with_check(36, rvc_op_states.kCharging) await self.send_resume_cmd_with_check(37, op_errors.kCommandInvalidInState) if self.check_pics("RVCOPSTATE.S.M.ST_DOCKED"): - self.print_instruction(38, "Manually put the device in the Docked(0x42) operational state") + self.print_step(38, "Manually put the device in the Docked(0x42) operational state") + if self.is_ci: + self.write_to_app_pipe('{"Name": "Charged"}') + else: + input("Press Enter when done.\n") await self.read_operational_state_with_check(39, rvc_op_states.kDocked) await self.send_pause_cmd_with_check(40, op_errors.kCommandInvalidInState) - self.print_instruction( + self.print_step( 41, "Manually put the device in the Docked(0x42) operational state and RVC Run Mode cluster's CurrentMode attribute set to a mode with the Idle mode tag") + if not self.is_ci: + input("Press Enter when done.\n") await self.send_resume_cmd_with_check(42, op_errors.kCommandInvalidInState) if self.check_pics("RVCOPSTATE.S.M.ST_SEEKING_CHARGER"): - self.print_instruction(43, "Manually put the device in the SeekingCharger(0x40) operational state") + self.print_step(43, "Manually put the device in the SeekingCharger(0x40) operational state") + if self.is_ci: + await self.send_run_change_to_mode_cmd(1) + await self.send_run_change_to_mode_cmd(0) + else: + input("Press Enter when done.\n") await self.read_operational_state_with_check(44, rvc_op_states.kSeekingCharger) diff --git a/src/python_testing/TC_RVCRUNM_2_1.py b/src/python_testing/TC_RVCRUNM_2_1.py index 60afbef1b2b115..a9b4c44ba4f88c 100644 --- a/src/python_testing/TC_RVCRUNM_2_1.py +++ b/src/python_testing/TC_RVCRUNM_2_1.py @@ -24,6 +24,7 @@ # This test requires several additional command line arguments # run with # --int-arg PIXIT.RVCRUNM.MODE_CHANGE_OK: PIXIT.RVCRUNM.MODE_CHANGE_FAIL: +# For running in CI, it is expected that OK=0 and FAIL=2 class TC_RVCRUNM_2_1(MatterBaseTest): @@ -33,6 +34,8 @@ def __init__(self, *args): self.endpoint = 0 self.mode_ok = 0 self.mode_fail = 0 + self.is_ci = False + self.app_pipe = "/tmp/chip_rvc_fifo_" async def read_mod_attribute_expect_success(self, endpoint, attribute): cluster = Clusters.Objects.RvcRunMode @@ -44,6 +47,11 @@ async def send_change_to_mode_cmd(self, newMode) -> Clusters.Objects.RvcRunMode. "Unexpected return type for ChangeToMode") return ret + # Sends and out-of-band command to the rvc-app + def write_to_app_pipe(self, command): + with open(self.app_pipe, "w") as app_pipe: + app_pipe.write(command + "\n") + def pics_TC_RVCRUNM_2_1(self) -> list[str]: return ["RVCRUNM.S"] @@ -60,6 +68,12 @@ async def test_TC_RVCRUNM_2_1(self): self.endpoint = self.matter_test_config.endpoint self.mode_ok = self.matter_test_config.global_test_params['PIXIT.RVCRUNM.MODE_CHANGE_OK'] self.mode_fail = self.matter_test_config.global_test_params['PIXIT.RVCRUNM.MODE_CHANGE_FAIL'] + self.is_ci = self.check_pics("PICS_SDK_CI_ONLY") + if self.is_ci: + app_pid = self.matter_test_config.app_pid + if app_pid == 0: + asserts.fail("The --app-pid flag must be set when PICS_SDK_CI_ONLY is set.c") + self.app_pipe = self.app_pipe + str(app_pid) asserts.assert_true(self.check_pics("RVCRUNM.S.A0000"), "RVCRUNM.S.A0000 must be supported") asserts.assert_true(self.check_pics("RVCRUNM.S.A0001"), "RVCRUNM.S.A0001 must be supported") @@ -70,6 +84,10 @@ async def test_TC_RVCRUNM_2_1(self): self.print_step(1, "Commissioning, already done") + # Ensure that the device is in the correct state + if self.is_ci: + self.write_to_app_pipe('{"Name": "Reset"}') + self.print_step(2, "Read SupportedModes attribute") supported_modes = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.SupportedModes) @@ -107,7 +125,11 @@ class CommonCodes(Enum): asserts.assert_true(self.mode_fail in modes, "The MODE_CHANGE_FAIL PIXIT value (%d) is not a supported mode" % (self.mode_fail)) self.print_step(5, "Manually put the device in a state from which it will FAIL to transition to mode %d" % (self.mode_fail)) - input("Press Enter when done.\n") + if self.is_ci: + print("Change to RVC Run mode Cleaning") + await self.send_change_to_mode_cmd(newMode=1) + else: + input("Press Enter when done.\n") self.print_step(6, "Read CurrentMode attribute") old_current_mode = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.CurrentMode) @@ -134,7 +156,10 @@ class CommonCodes(Enum): asserts.assert_true(current_mode == old_current_mode, "CurrentMode changed after failed ChangeToMode command!") self.print_step(9, "Manually put the device in a state from which it will SUCCESSFULLY transition to mode %d" % (self.mode_ok)) - input("Press Enter when done.\n") + if self.is_ci: + print("Continuing...") + else: + input("Press Enter when done.\n") self.print_step(10, "Read CurrentMode attribute") old_current_mode = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.CurrentMode) diff --git a/src/python_testing/TC_RVCRUNM_2_2.py b/src/python_testing/TC_RVCRUNM_2_2.py index d6d0ac8acb7b5d..d7ee163b1eb37a 100644 --- a/src/python_testing/TC_RVCRUNM_2_2.py +++ b/src/python_testing/TC_RVCRUNM_2_2.py @@ -50,6 +50,8 @@ def __init__(self, *args): self.supported_run_modes = {} # these are the ModeOptionStructs self.supported_run_modes_dut = [] self.idle_mode_dut = 0 + self.is_ci = False + self.app_pipe = "/tmp/chip_rvc_fifo_" async def read_mod_attribute_expect_success(self, cluster, attribute): return await self.read_single_attribute_check_success( @@ -79,10 +81,10 @@ async def send_change_to_mode_with_check(self, new_mode, expected_error): "Expected a ChangeToMode response status of %s, got %s" % (error_enum_to_text(expected_error), error_enum_to_text(response.status))) - # Prints the instruction and waits for a user input to continue - def print_instruction(self, step_number, instruction): - self.print_step(step_number, instruction) - input("Press Enter when done.\n") + # Sends and out-of-band command to the rvc-app + def write_to_app_pipe(self, command): + with open(self.app_pipe, "w") as app_pipe: + app_pipe.write(command + "\n") def pics_TC_RVCRUNM_2_2(self) -> list[str]: return ["RVCRUNM.S"] @@ -98,8 +100,14 @@ async def test_TC_RVCRUNM_2_2(self): "PIXIT.RVCRUNM.MODE_B:") self.endpoint = self.matter_test_config.endpoint + self.is_ci = self.check_pics("PICS_SDK_CI_ONLY") self.mode_a = self.matter_test_config.global_test_params['PIXIT.RVCRUNM.MODE_A'] self.mode_b = self.matter_test_config.global_test_params['PIXIT.RVCRUNM.MODE_B'] + if self.is_ci: + app_pid = self.matter_test_config.app_pid + if app_pid == 0: + asserts.fail("The --app-pid flag must be set when PICS_SDK_CI_ONLY is set.c") + self.app_pipe = self.app_pipe + str(app_pid) asserts.assert_true(self.check_pics("RVCRUNM.S"), "RVCRUNM.S must be supported") # I think that the following PICS should be listed in the preconditions section in the test plan as if either @@ -113,9 +121,15 @@ async def test_TC_RVCRUNM_2_2(self): # Starting the test steps self.print_step(1, "Commissioning, already done") - self.print_instruction(2, "Manually put the device in a RVC Run Mode cluster mode with " - "the Idle(0x4000) mode tag and in a device state that allows changing to either " - "of these modes: %i, %i" % (self.mode_a, self.mode_b)) + # Ensure that the device is in the correct state + if self.is_ci: + self.write_to_app_pipe('{"Name": "Reset"}') + + self.print_step(2, "Manually put the device in a RVC Run Mode cluster mode with " + "the Idle(0x4000) mode tag and in a device state that allows changing to either " + "of these modes: %i, %i" % (self.mode_a, self.mode_b)) + if not self.is_ci: + input("Press Enter when done.\n") self.print_step(3, "Read the RvcRunMode SupportedModes attribute") supported_run_modes = await self.read_run_supported_modes()