Skip to content

Commit

Permalink
Add GC 2_5 test script
Browse files Browse the repository at this point in the history
Add GC 2_5 test script
  • Loading branch information
Chris Koos committed Aug 16, 2024
1 parent 468f612 commit 6a38736
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 0 deletions.
26 changes: 26 additions & 0 deletions src/controller/python/ChipDeviceController-ScriptBinding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ PyChipError pychip_DeviceController_OpenCommissioningWindow(chip::Controller::De

bool pychip_DeviceController_GetIPForDiscoveredDevice(chip::Controller::DeviceCommissioner * devCtrl, int idx, char * addrStr,
uint32_t len);
PyChipError pychip_DeviceController_SetRequireTermsAndConditionsAcknowledgement(bool tcRequired);
PyChipError pychip_DeviceController_SetTermsAcknowledgements(uint16_t tcVersion, uint16_t tcUserResponse);
PyChipError pychip_DeviceController_SetSkipCommissioningComplete(bool skipCommissioningComplete);

// Pairing Delegate
PyChipError
Expand Down Expand Up @@ -570,6 +573,29 @@ PyChipError pychip_DeviceController_SetDefaultNtp(const char * defaultNTP)
return ToPyChipError(CHIP_NO_ERROR);
}


PyChipError pychip_DeviceController_SetRequireTermsAndConditionsAcknowledgement(bool tcRequired)
{
sCommissioningParameters.SetRequireTermsAndConditionsAcknowledgement(tcRequired);
return ToPyChipError(CHIP_NO_ERROR);
}

PyChipError pychip_DeviceController_SetTermsAcknowledgements(uint16_t tcVersion, uint16_t tcUserResponse)
{
sCommissioningParameters.SetTermsAndConditionsAcknowledgement({
.acceptedTermsAndConditions = tcUserResponse,
.acceptedTermsAndConditionsVersion = tcVersion
});
return ToPyChipError(CHIP_NO_ERROR);
}

PyChipError
pychip_DeviceController_SetSkipCommissioningComplete(bool skipCommissioningComplete)
{
sCommissioningParameters.SetSkipCommissioningComplete(skipCommissioningComplete);
return ToPyChipError(CHIP_NO_ERROR);
}

PyChipError pychip_DeviceController_SetTrustedTimeSource(chip::NodeId nodeId, chip::EndpointId endpoint)
{
chip::app::Clusters::TimeSynchronization::Structs::FabricScopedTrustedTimeSourceStruct::Type timeSource = { .nodeID = nodeId,
Expand Down
31 changes: 31 additions & 0 deletions src/controller/python/chip/ChipDeviceCtrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1926,6 +1926,16 @@ def _InitLib(self):
self._dmLib.pychip_DeviceProxy_GetRemoteSessionParameters.restype = PyChipError
self._dmLib.pychip_DeviceProxy_GetRemoteSessionParameters.argtypes = [c_void_p, c_char_p]

self._dmLib.pychip_DeviceController_SetSkipCommissioningComplete.restype = PyChipError
self._dmLib.pychip_DeviceController_SetSkipCommissioningComplete.argtypes = [c_bool]

self._dmLib.pychip_DeviceController_SetRequireTermsAndConditionsAcknowledgement.restype = PyChipError
self._dmLib.pychip_DeviceController_SetRequireTermsAndConditionsAcknowledgement.argtypes = [c_bool]

self._dmLib.pychip_DeviceController_SetTermsAcknowledgements.restype = PyChipError
self._dmLib.pychip_DeviceController_SetTermsAcknowledgements.argtypes = [c_uint16, c_uint16]



class ChipDeviceController(ChipDeviceControllerBase):
''' The ChipDeviceCommissioner binding, named as ChipDeviceController
Expand Down Expand Up @@ -2054,6 +2064,27 @@ def SetDSTOffset(self, offset: int, validStarting: int, validUntil: int):
lambda: self._dmLib.pychip_DeviceController_SetDSTOffset(offset, validStarting, validUntil)
).raise_on_error()

def SetTCRequired(self, tcRequired: bool):
''' Set whether TC Acknowledgements should be set during commissioning'''
self.CheckIsActive()
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_SetRequireTermsAndConditionsAcknowledgement(tcRequired)
).raise_on_error()

def SetTCAcknowledgements(self, tcAcceptedVersion: int, tcUserResponse: int):
''' Set the TC acknowledgements to set during commissioning'''
self.CheckIsActive()
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_SetTermsAcknowledgements(tcAcceptedVersion, tcUserResponse)
).raise_on_error()

def SetSkipCommissioningComplete(self, skipCommissioningComplete: bool):
''' Set whether to skip the commissioning complete callback'''
self.CheckIsActive()
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_SetSkipCommissioningComplete(skipCommissioningComplete)
).raise_on_error()

def SetDefaultNTP(self, defaultNTP: str):
''' Set the DefaultNTP to set during commissioning'''
self.CheckIsActive()
Expand Down
200 changes: 200 additions & 0 deletions src/python_testing/TC_CGEN_2_5.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
#
# Copyright (c) 2022 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# test-runner-runs: run1
# test-runner-run/run1/app: ${ALL_CLUSTERS_APP}
# test-runner-run/run1/factoryreset: True
# test-runner-run/run1/quiet: True
# test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# test-runner-run/run1/script-args: --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto

import logging
import random

import chip.clusters as Clusters
from chip.exceptions import ChipStackError
from matter_testing_support import DiscoveryFilterType, MatterBaseTest, async_test_body, default_matter_test_main
from mobly import asserts


class TC_CGEN_2_5(MatterBaseTest):

async def commission_device(self) -> bool:
dev_ctrl = self.default_controller
conf = self.matter_test_config
info = self.get_setup_payload_info()[0]

if conf.commissioning_method == "on-network":
try:
await dev_ctrl.CommissionOnNetwork(
nodeId=self.dut_node_id,
setupPinCode=info.passcode,
filterType=info.filter_type,
filter=info.filter_value
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
elif conf.commissioning_method == "ble-wifi":
try:
await dev_ctrl.CommissionWiFi(
info.filter_value,
info.passcode,
conf.self.dut_node_id,
conf.wifi_ssid,
conf.wifi_passphrase,
isShortDiscriminator=(info.filter_type == DiscoveryFilterType.SHORT_DISCRIMINATOR)
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
elif conf.commissioning_method == "ble-thread":
try:
await dev_ctrl.CommissionThread(
info.filter_value,
info.passcode,
conf.self.dut_node_id,
conf.thread_operational_dataset,
isShortDiscriminator=(info.filter_type == DiscoveryFilterType.SHORT_DISCRIMINATOR)
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
else:
raise ValueError("Invalid commissioning method %s!" % conf.commissioning_method)

async def verify_tc_attributes(self, expected_tc_version: int, expected_tc_acknowledgements: int):
attr = Clusters.GeneralCommissioning.Attributes.TCAcceptedVersion
tc_accepted_version = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)
asserts.assert_equal(tc_accepted_version, expected_tc_version,
f"Expected TCAcceptedVersion to be {expected_tc_version}, but got {tc_accepted_version}")

attr = Clusters.GeneralCommissioning.Attributes.TCAcknowledgements
tc_acknowedgements = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)
asserts.assert_equal(tc_acknowedgements, expected_tc_acknowledgements,
f"Expected TCAcknowledgements to be {expected_tc_acknowledgements}, but got {tc_acknowedgements}")

@async_test_body
async def test_TC_CGEN_2_5(self):
self.th1 = self.default_controller
self.discriminator = random.randint(0, 4095)

logging.info('Step 1 - TH reads the TCAcceptedVersion attribute')
attr = Clusters.GeneralCommissioning.Attributes.TCAcceptedVersion
tc_accepted_version = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)

print(f"tc_accepted_version: {tc_accepted_version}")

logging.info('Step 2 - TH reads the TCAcknowledgements attribute')
attr = Clusters.GeneralCommissioning.Attributes.TCAcknowledgements
tc_acknowedgements = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)
print(f"tc_acknowedgements: {tc_acknowedgements}")

logging.info('Step 3 - TH reads the TCMinRequiredVersion attribute')
attr = Clusters.GeneralCommissioning.Attributes.TCMinRequiredVersion
tc_min_requried_version = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)

logging.info('Step 4 - TH reads the TCAcknowledgementsRequired attribute')
attr = Clusters.GeneralCommissioning.Attributes.TCAcknowledgementsRequired
tc_required = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)

logging.info('Step 5 - TH sends SetTCAcknowledgements with greater values than current and verify set')
new_accepted_version = tc_accepted_version + 1
new_acknowledgements = 65535

cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(new_accepted_version, new_acknowledgements)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk, 'Incorrect error code')
self.verify_tc_attributes(new_accepted_version, new_acknowledgements)

logging.info('Step 6 - TH sends SetTCAcknowledgements with no accepted terms at version 0')
cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(0, 0)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(
resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kTCMinVersionNotMet, 'Incorrect error code'
)
self.verify_tc_attributes(new_accepted_version, new_acknowledgements)

logging.info('Step 7 - TH sends SetTCAcknowledgements with no accepted terms at version 1')
cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(1, 0)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(
resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kRequiredTCNotAccepted, 'Incorrect error code'
)
self.verify_tc_attributes(new_accepted_version, new_acknowledgements)

logging.info('Step 8 - TH sends ArmFailSafe with ExpiryLengthSeconds set to 60')
cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(60)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kNoError,
'Incorrect error code')

logging.info('Step 9 - TH sends SetTCAcknowledgements with incremented TCVersion')
new_accepted_version += 1
cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(new_accepted_version, new_acknowledgements)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kNoError,
'Incorrect error code')

logging.info('Step 10 - TH sends ArmFailSafe with ExpiryLengthSeconds set to 0')
cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(0)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kNoError,
'Incorrect error code')

self.verify_tc_attributes(new_accepted_version, new_acknowledgements)

logging.info('Step 11 - TH removes all fabrics from the device')
fabrics = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=Clusters.OperationalCredentials.Attributes.Fabrics)
for fabric in fabrics:
if fabric.fabricIndex == self.th1.fabricId:
continue
cmd = Clusters.OperationalCredentials.Commands.RemoveFabric(fabric.fabricIndex)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.statusCode, Clusters.OperationalCredentials.Enums.NodeOperationalCertStatusEnum.kOk)

# Remove TH1 fabric last
cmd = Clusters.OperationalCredentials.Commands.RemoveFabric(self.th1.fabricId)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)

# Give a prompt for the user to make the device in a commissionable state.
# Currently with the sample apps the device doesn't have an opening commissioning window after removing all fabrics.
print("Please return the device to a commissionable state if needed (reboot) and press any key to continue...")
input()

logging.info('Step 12 - TH commissions the DUT without setting TCs')
self.th1.ResetTestCommissioner()
self.th1.ExpireSessions(self.dut_node_id)

# Don't set TCs for the next commissioning and skip CommissioningComplete so we can check the error code
self.th1.SetTCRequired(False)
self.th1.SetTCAcknowledgements(0, 0)
self.th1.SetSkipCommissioningComplete(True)

await self.commission_device()

cmd = Clusters.GeneralCommissioning.Commands.CommissioningComplete()
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kTCAcknowledgementsNotReceived,
'Incorrect error code')


if __name__ == "__main__":
default_matter_test_main()
18 changes: 18 additions & 0 deletions src/python_testing/matter_testing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,10 @@ class MatterTestConfig:

trace_to: List[str] = field(default_factory=list)

# Accepted Terms and Conditions if used
tc_version: int = None
tc_user_response: int = None


class ClusterMapper:
"""Describe clusters/attributes using schema names."""
Expand Down Expand Up @@ -1590,6 +1594,9 @@ def convert_args_to_matter_config(args: argparse.Namespace) -> MatterTestConfig:
config.controller_node_id = args.controller_node_id
config.trace_to = args.trace_to

config.tc_version = args.tc_version
config.tc_user_response = args.tc_user_response

# Accumulate all command-line-passed named args
all_global_args = []
argsets = [item for item in (args.int_arg, args.float_arg, args.string_arg, args.json_arg,
Expand Down Expand Up @@ -1683,6 +1690,10 @@ def parse_matter_test_args(argv: Optional[List[str]] = None) -> MatterTestConfig
commission_group.add_argument('--commission-only', action="store_true", default=False,
help="If true, test exits after commissioning without running subsequent tests")

commission_group.add_argument('--tc-version', type=int, help="Terms and conditions version")

commission_group.add_argument('--tc-user-response', type=int, help="Terms and conditions acknowledgements")

code_group = parser.add_mutually_exclusive_group(required=False)

code_group.add_argument('-q', '--qr-code', type=str,
Expand Down Expand Up @@ -1953,6 +1964,13 @@ async def _commission_device(self, i) -> bool:

info = self.get_setup_payload_info()[i]

if conf.tc_version is not None and conf.tc_user_response is not None:
logging.debug(f"Setting TC Acknowledgements to version {conf.tc_version} with user response {conf.tc_user_response}.")
dev_ctrl.SetTCAcknowledgements(conf.tc_version, conf.tc_user_response)
dev_ctrl.SetTCRequired(True)
else:
dev_ctrl.SetTCRequired(False)

if conf.commissioning_method == "on-network":
try:
await dev_ctrl.CommissionOnNetwork(
Expand Down

0 comments on commit 6a38736

Please sign in to comment.