Skip to content

Commit

Permalink
Add GC 2_5 test script
Browse files Browse the repository at this point in the history
Add GC 2_5 test script
  • Loading branch information
swan-amazon committed Sep 10, 2024
1 parent 3524a2e commit f052c70
Show file tree
Hide file tree
Showing 5 changed files with 322 additions and 12 deletions.
22 changes: 22 additions & 0 deletions src/controller/python/ChipDeviceController-ScriptBinding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ PyChipError pychip_DeviceController_OpenCommissioningWindow(chip::Controller::De

bool pychip_DeviceController_GetIPForDiscoveredDevice(chip::Controller::DeviceCommissioner * devCtrl, int idx, char * addrStr,
uint32_t len);
PyChipError pychip_DeviceController_SetRequireTermsAndConditionsAcknowledgement(bool tcRequired);
PyChipError pychip_DeviceController_SetTermsAcknowledgements(uint16_t tcVersion, uint16_t tcUserResponse);
PyChipError pychip_DeviceController_SetSkipCommissioningComplete(bool skipCommissioningComplete);

// Pairing Delegate
PyChipError
Expand Down Expand Up @@ -570,6 +573,25 @@ PyChipError pychip_DeviceController_SetDefaultNtp(const char * defaultNTP)
return ToPyChipError(CHIP_NO_ERROR);
}

PyChipError pychip_DeviceController_SetRequireTermsAndConditionsAcknowledgement(bool tcRequired)
{
sCommissioningParameters.SetRequireTermsAndConditionsAcknowledgement(tcRequired);
return ToPyChipError(CHIP_NO_ERROR);
}

PyChipError pychip_DeviceController_SetTermsAcknowledgements(uint16_t tcVersion, uint16_t tcUserResponse)
{
sCommissioningParameters.SetTermsAndConditionsAcknowledgement(
{ .acceptedTermsAndConditions = tcUserResponse, .acceptedTermsAndConditionsVersion = tcVersion });
return ToPyChipError(CHIP_NO_ERROR);
}

PyChipError pychip_DeviceController_SetSkipCommissioningComplete(bool skipCommissioningComplete)
{
sCommissioningParameters.SetSkipCommissioningComplete(skipCommissioningComplete);
return ToPyChipError(CHIP_NO_ERROR);
}

PyChipError pychip_DeviceController_SetTrustedTimeSource(chip::NodeId nodeId, chip::EndpointId endpoint)
{
chip::app::Clusters::TimeSynchronization::Structs::FabricScopedTrustedTimeSourceStruct::Type timeSource = { .nodeID = nodeId,
Expand Down
30 changes: 30 additions & 0 deletions src/controller/python/chip/ChipDeviceCtrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1956,6 +1956,15 @@ def _InitLib(self):
self._dmLib.pychip_CreateManualCode.restype = PyChipError
self._dmLib.pychip_CreateManualCode.argtypes = [c_uint16, c_uint32, c_char_p, c_size_t, POINTER(c_size_t)]

self._dmLib.pychip_DeviceController_SetSkipCommissioningComplete.restype = PyChipError
self._dmLib.pychip_DeviceController_SetSkipCommissioningComplete.argtypes = [c_bool]

self._dmLib.pychip_DeviceController_SetRequireTermsAndConditionsAcknowledgement.restype = PyChipError
self._dmLib.pychip_DeviceController_SetRequireTermsAndConditionsAcknowledgement.argtypes = [c_bool]

self._dmLib.pychip_DeviceController_SetTermsAcknowledgements.restype = PyChipError
self._dmLib.pychip_DeviceController_SetTermsAcknowledgements.argtypes = [c_uint16, c_uint16]


class ChipDeviceController(ChipDeviceControllerBase):
''' The ChipDeviceCommissioner binding, named as ChipDeviceController
Expand Down Expand Up @@ -2084,6 +2093,27 @@ def SetDSTOffset(self, offset: int, validStarting: int, validUntil: int):
lambda: self._dmLib.pychip_DeviceController_SetDSTOffset(offset, validStarting, validUntil)
).raise_on_error()

def SetTCRequired(self, tcRequired: bool):
''' Set whether TC Acknowledgements should be set during commissioning'''
self.CheckIsActive()
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_SetRequireTermsAndConditionsAcknowledgement(tcRequired)
).raise_on_error()

def SetTCAcknowledgements(self, tcAcceptedVersion: int, tcUserResponse: int):
''' Set the TC acknowledgements to set during commissioning'''
self.CheckIsActive()
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_SetTermsAcknowledgements(tcAcceptedVersion, tcUserResponse)
).raise_on_error()

def SetSkipCommissioningComplete(self, skipCommissioningComplete: bool):
''' Set whether to skip the commissioning complete callback'''
self.CheckIsActive()
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_SetSkipCommissioningComplete(skipCommissioningComplete)
).raise_on_error()

def SetDefaultNTP(self, defaultNTP: str):
''' Set the DefaultNTP to set during commissioning'''
self.CheckIsActive()
Expand Down
134 changes: 134 additions & 0 deletions src/python_testing/TC_CGEN_2_5.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
#
# Copyright (c) 2024 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# === BEGIN CI TEST ARGUMENTS ===
# test-runner-runs: run1
# test-runner-run/run1/app: ${TERMS_AND_CONDITIONS_APP}
# test-runner-run/run1/factoryreset: True
# test-runner-run/run1/quiet: True
# test-runner-run/run1/app-args: --KVS kvs1 --trace-to json:${TRACE_APP}.json
# test-runner-run/run1/script-args: --commissioning-method on-network --qr-code MT:-24J0AFN00KA0648G00 --tc-version 1 --tc-user-response 1 --trace_file /tmp/chip_trace.log --trace_log 1 --trace_decode 1
# === END CI TEST ARGUMENTS ===

import logging
import random

import chip.clusters as Clusters
from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main
from mobly import asserts


class TC_CGEN_2_5(MatterBaseTest):
async def verify_tc_attributes(self, expected_tc_version: int, expected_tc_acknowledgements: int):
attr = Clusters.GeneralCommissioning.Attributes.TCAcceptedVersion
tc_accepted_version = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)
asserts.assert_equal(tc_accepted_version, expected_tc_version,
f"Expected TCAcceptedVersion to be {expected_tc_version}, but got {tc_accepted_version}")

attr = Clusters.GeneralCommissioning.Attributes.TCAcknowledgements
tc_acknowedgements = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)
asserts.assert_equal(tc_acknowedgements, expected_tc_acknowledgements,
f"Expected TCAcknowledgements to be {expected_tc_acknowledgements}, but got {tc_acknowedgements}")

@async_test_body
async def test_TC_CGEN_2_5(self):
self.th1 = self.default_controller
self.discriminator = random.randint(0, 4095)

logging.info('Step 1 - TH reads the TCAcceptedVersion attribute')
attr = Clusters.GeneralCommissioning.Attributes.TCAcceptedVersion
tc_accepted_version = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)

print(f"tc_accepted_version: {tc_accepted_version}")

logging.info('Step 2 - TH reads the TCAcknowledgements attribute')
attr = Clusters.GeneralCommissioning.Attributes.TCAcknowledgements
tc_acknowedgements = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)
print(f"tc_acknowedgements: {tc_acknowedgements}")

logging.info('Step 3 - TH reads the TCMinRequiredVersion attribute')
attr = Clusters.GeneralCommissioning.Attributes.TCMinRequiredVersion
tc_min_requried_version = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)
print(f"tc_min_requried_version: {tc_min_requried_version}")

logging.info('Step 4 - TH reads the TCAcknowledgementsRequired attribute')
attr = Clusters.GeneralCommissioning.Attributes.TCAcknowledgementsRequired
tc_required = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)
print(f"tc_required: {tc_required}")

logging.info('Step 5 - TH sends SetTCAcknowledgements with greater values than current and verify set')
new_accepted_version = tc_accepted_version + 1
new_acknowledgements = 65535

cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(new_accepted_version, new_acknowledgements)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk, 'Incorrect error code')
await self.verify_tc_attributes(new_accepted_version, new_acknowledgements)

logging.info('Step 6 - TH sends SetTCAcknowledgements with no accepted terms at version 0')
cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(0, 0)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(
resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kTCMinVersionNotMet, 'Incorrect error code'
)
await self.verify_tc_attributes(new_accepted_version, new_acknowledgements)

logging.info('Step 7 - TH sends SetTCAcknowledgements with no accepted terms at version 1')
cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(1, 0)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(
resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kRequiredTCNotAccepted, 'Incorrect error code'
)
await self.verify_tc_attributes(new_accepted_version, new_acknowledgements)

logging.info('Step 8 - TH sends ArmFailSafe with ExpiryLengthSeconds set to 60')
cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(60)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk,
'Incorrect error code')

logging.info('Step 9 - TH sends SetTCAcknowledgements with incremented TCVersion')
cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(new_accepted_version + 1, new_acknowledgements)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk,
'Incorrect error code')

logging.info('Step 10 - TH sends ArmFailSafe with ExpiryLengthSeconds set to 0')
cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(0)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk,
'Incorrect error code')

# Verify that the TC attributes have been reset to the pre-failsafe state
await self.verify_tc_attributes(new_accepted_version, new_acknowledgements)

logging.info('Step 11 - TH removes all fabrics from the device')
fabrics = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=Clusters.OperationalCredentials.Attributes.Fabrics)
for fabric in fabrics:
if fabric.fabricIndex == self.th1.fabricId:
continue
cmd = Clusters.OperationalCredentials.Commands.RemoveFabric(fabric.fabricIndex)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.statusCode, Clusters.OperationalCredentials.Enums.NodeOperationalCertStatusEnum.kOk)

# Remove TH1 fabric last
cmd = Clusters.OperationalCredentials.Commands.RemoveFabric(self.th1.fabricId)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)


if __name__ == "__main__":
default_matter_test_main()
94 changes: 94 additions & 0 deletions src/python_testing/TC_CGEN_2_6.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#
# Copyright (c) 2024 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# === BEGIN CI TEST ARGUMENTS ===
# test-runner-runs: run1
# test-runner-run/run1/app: ${TERMS_AND_CONDITIONS_APP}
# test-runner-run/run1/factoryreset: True
# test-runner-run/run1/quiet: True
# test-runner-run/run1/app-args: --KVS kvs1 --trace-to json:${TRACE_APP}.json
# test-runner-run/run1/script-args: --commissioning-method on-network --qr-code MT:-24J0AFN00KA0648G00 --trace_file /tmp/chip_trace.log --trace_log 1 --trace_decode 1
# === END CI TEST ARGUMENTS ===

import logging
import random

import chip.clusters as Clusters
from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main
from mobly import asserts


class TC_CGEN_2_6(MatterBaseTest):


def setup_class(self):
logging.info('Setup')
super().setup_class()

def setup_test(self):
logging.info('setup_test')
super().setup_test()

def teardown_test(self):
logging.info('teardown_test')
super().teardown_test()

def teardown_class(self):
logging.info('teardown_class')
super().teardown_class()

def desc_TC_CGEN_2_6(self) -> str:
"""Returns a description of this test"""
return "[TC_CGEN_2_6] Verification of "

def steps_TC_CGEN_2_6(self) -> list[TestStep]:
return [
TestStep("0", "DUT commissioned and preconditions", is_commissioning=False),
TestStep("1", "TH commissions the DUT without setting TCs")
]

@async_test_body
async def test_TC_CGEN_2_6(self):
self.step("0")

self.th1 = self.default_controller
self.discriminator = random.randint(0, 4095)

self.step("1")

# Don't set TCs for the next commissioning and skip CommissioningComplete so we can manually call CommissioningComplete in order to check the response error code
self.th1.SetTCRequired(False)
self.th1.SetSkipCommissioningComplete(True)
self.matter_test_config.commissioning_method = self.matter_test_config.pairing_method
await self.commission_devices()

cmd = Clusters.GeneralCommissioning.Commands.CommissioningComplete()
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(
resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kTCAcknowledgementsNotReceived, 'Incorrect error code')

cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(1, 0)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kRequiredTCNotAccepted, 'Incorrect error code')

cmd = Clusters.GeneralCommissioning.Commands.CommissioningComplete()
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kTCAcknowledgementsNotReceived, 'Incorrect error code')


if __name__ == "__main__":
default_matter_test_main()
Loading

0 comments on commit f052c70

Please sign in to comment.