Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GC 2_5 test script #4

Open
wants to merge 2 commits into
base: feature/enhanced-setup-flow
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/controller/python/ChipDeviceController-ScriptBinding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ PyChipError pychip_DeviceController_OpenCommissioningWindow(chip::Controller::De

bool pychip_DeviceController_GetIPForDiscoveredDevice(chip::Controller::DeviceCommissioner * devCtrl, int idx, char * addrStr,
uint32_t len);
PyChipError pychip_DeviceController_SetRequireTermsAndConditionsAcknowledgement(bool tcRequired);
PyChipError pychip_DeviceController_SetTermsAcknowledgements(uint16_t tcVersion, uint16_t tcUserResponse);
PyChipError pychip_DeviceController_SetSkipCommissioningComplete(bool skipCommissioningComplete);

// Pairing Delegate
PyChipError
Expand Down Expand Up @@ -570,6 +573,29 @@ PyChipError pychip_DeviceController_SetDefaultNtp(const char * defaultNTP)
return ToPyChipError(CHIP_NO_ERROR);
}


PyChipError pychip_DeviceController_SetRequireTermsAndConditionsAcknowledgement(bool tcRequired)
{
sCommissioningParameters.SetRequireTermsAndConditionsAcknowledgement(tcRequired);
return ToPyChipError(CHIP_NO_ERROR);
}

PyChipError pychip_DeviceController_SetTermsAcknowledgements(uint16_t tcVersion, uint16_t tcUserResponse)
{
sCommissioningParameters.SetTermsAndConditionsAcknowledgement({
.acceptedTermsAndConditions = tcUserResponse,
.acceptedTermsAndConditionsVersion = tcVersion
});
return ToPyChipError(CHIP_NO_ERROR);
}

PyChipError
pychip_DeviceController_SetSkipCommissioningComplete(bool skipCommissioningComplete)
{
sCommissioningParameters.SetSkipCommissioningComplete(skipCommissioningComplete);
return ToPyChipError(CHIP_NO_ERROR);
}

PyChipError pychip_DeviceController_SetTrustedTimeSource(chip::NodeId nodeId, chip::EndpointId endpoint)
{
chip::app::Clusters::TimeSynchronization::Structs::FabricScopedTrustedTimeSourceStruct::Type timeSource = { .nodeID = nodeId,
Expand Down
31 changes: 31 additions & 0 deletions src/controller/python/chip/ChipDeviceCtrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1926,6 +1926,16 @@ def _InitLib(self):
self._dmLib.pychip_DeviceProxy_GetRemoteSessionParameters.restype = PyChipError
self._dmLib.pychip_DeviceProxy_GetRemoteSessionParameters.argtypes = [c_void_p, c_char_p]

self._dmLib.pychip_DeviceController_SetSkipCommissioningComplete.restype = PyChipError
self._dmLib.pychip_DeviceController_SetSkipCommissioningComplete.argtypes = [c_bool]

self._dmLib.pychip_DeviceController_SetRequireTermsAndConditionsAcknowledgement.restype = PyChipError
self._dmLib.pychip_DeviceController_SetRequireTermsAndConditionsAcknowledgement.argtypes = [c_bool]

self._dmLib.pychip_DeviceController_SetTermsAcknowledgements.restype = PyChipError
self._dmLib.pychip_DeviceController_SetTermsAcknowledgements.argtypes = [c_uint16, c_uint16]



class ChipDeviceController(ChipDeviceControllerBase):
''' The ChipDeviceCommissioner binding, named as ChipDeviceController
Expand Down Expand Up @@ -2054,6 +2064,27 @@ def SetDSTOffset(self, offset: int, validStarting: int, validUntil: int):
lambda: self._dmLib.pychip_DeviceController_SetDSTOffset(offset, validStarting, validUntil)
).raise_on_error()

def SetTCRequired(self, tcRequired: bool):
''' Set whether TC Acknowledgements should be set during commissioning'''
self.CheckIsActive()
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_SetRequireTermsAndConditionsAcknowledgement(tcRequired)
).raise_on_error()

def SetTCAcknowledgements(self, tcAcceptedVersion: int, tcUserResponse: int):
''' Set the TC acknowledgements to set during commissioning'''
self.CheckIsActive()
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_SetTermsAcknowledgements(tcAcceptedVersion, tcUserResponse)
).raise_on_error()

def SetSkipCommissioningComplete(self, skipCommissioningComplete: bool):
''' Set whether to skip the commissioning complete callback'''
self.CheckIsActive()
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_SetSkipCommissioningComplete(skipCommissioningComplete)
).raise_on_error()

def SetDefaultNTP(self, defaultNTP: str):
''' Set the DefaultNTP to set during commissioning'''
self.CheckIsActive()
Expand Down
200 changes: 200 additions & 0 deletions src/python_testing/TC_CGEN_2_5.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
#
# Copyright (c) 2022 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# test-runner-runs: run1
# test-runner-run/run1/app: ${ALL_CLUSTERS_APP}
# test-runner-run/run1/factoryreset: True
# test-runner-run/run1/quiet: True
# test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# test-runner-run/run1/script-args: --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto

import logging
import random

import chip.clusters as Clusters
from chip.exceptions import ChipStackError
from matter_testing_support import DiscoveryFilterType, MatterBaseTest, async_test_body, default_matter_test_main
from mobly import asserts


class TC_CGEN_2_5(MatterBaseTest):

async def commission_device(self) -> bool:
dev_ctrl = self.default_controller
conf = self.matter_test_config
info = self.get_setup_payload_info()[0]

if conf.commissioning_method == "on-network":
try:
await dev_ctrl.CommissionOnNetwork(
nodeId=self.dut_node_id,
setupPinCode=info.passcode,
filterType=info.filter_type,
filter=info.filter_value
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
elif conf.commissioning_method == "ble-wifi":
try:
await dev_ctrl.CommissionWiFi(
info.filter_value,
info.passcode,
conf.self.dut_node_id,
conf.wifi_ssid,
conf.wifi_passphrase,
isShortDiscriminator=(info.filter_type == DiscoveryFilterType.SHORT_DISCRIMINATOR)
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
elif conf.commissioning_method == "ble-thread":
try:
await dev_ctrl.CommissionThread(
info.filter_value,
info.passcode,
conf.self.dut_node_id,
conf.thread_operational_dataset,
isShortDiscriminator=(info.filter_type == DiscoveryFilterType.SHORT_DISCRIMINATOR)
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
else:
raise ValueError("Invalid commissioning method %s!" % conf.commissioning_method)

async def verify_tc_attributes(self, expected_tc_version: int, expected_tc_acknowledgements: int):
attr = Clusters.GeneralCommissioning.Attributes.TCAcceptedVersion
tc_accepted_version = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)
asserts.assert_equal(tc_accepted_version, expected_tc_version,
f"Expected TCAcceptedVersion to be {expected_tc_version}, but got {tc_accepted_version}")

attr = Clusters.GeneralCommissioning.Attributes.TCAcknowledgements
tc_acknowedgements = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)
asserts.assert_equal(tc_acknowedgements, expected_tc_acknowledgements,
f"Expected TCAcknowledgements to be {expected_tc_acknowledgements}, but got {tc_acknowedgements}")

@async_test_body
async def test_TC_CGEN_2_5(self):
self.th1 = self.default_controller
self.discriminator = random.randint(0, 4095)

logging.info('Step 1 - TH reads the TCAcceptedVersion attribute')
attr = Clusters.GeneralCommissioning.Attributes.TCAcceptedVersion
tc_accepted_version = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)

print(f"tc_accepted_version: {tc_accepted_version}")

logging.info('Step 2 - TH reads the TCAcknowledgements attribute')
attr = Clusters.GeneralCommissioning.Attributes.TCAcknowledgements
tc_acknowedgements = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)
print(f"tc_acknowedgements: {tc_acknowedgements}")

logging.info('Step 3 - TH reads the TCMinRequiredVersion attribute')
attr = Clusters.GeneralCommissioning.Attributes.TCMinRequiredVersion
tc_min_requried_version = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)

logging.info('Step 4 - TH reads the TCAcknowledgementsRequired attribute')
attr = Clusters.GeneralCommissioning.Attributes.TCAcknowledgementsRequired
tc_required = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=attr)

logging.info('Step 5 - TH sends SetTCAcknowledgements with greater values than current and verify set')
new_accepted_version = tc_accepted_version + 1
new_acknowledgements = 65535

cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(new_accepted_version, new_acknowledgements)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kOk, 'Incorrect error code')
self.verify_tc_attributes(new_accepted_version, new_acknowledgements)

logging.info('Step 6 - TH sends SetTCAcknowledgements with no accepted terms at version 0')
cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(0, 0)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(
resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kTCMinVersionNotMet, 'Incorrect error code'
)
self.verify_tc_attributes(new_accepted_version, new_acknowledgements)

logging.info('Step 7 - TH sends SetTCAcknowledgements with no accepted terms at version 1')
cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(1, 0)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(
resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kRequiredTCNotAccepted, 'Incorrect error code'
)
self.verify_tc_attributes(new_accepted_version, new_acknowledgements)

logging.info('Step 8 - TH sends ArmFailSafe with ExpiryLengthSeconds set to 60')
cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(60)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kNoError,
'Incorrect error code')

logging.info('Step 9 - TH sends SetTCAcknowledgements with incremented TCVersion')
new_accepted_version += 1
cmd = Clusters.GeneralCommissioning.Commands.SetTCAcknowledgements(new_accepted_version, new_acknowledgements)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kNoError,
'Incorrect error code')

logging.info('Step 10 - TH sends ArmFailSafe with ExpiryLengthSeconds set to 0')
cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(0)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kNoError,
'Incorrect error code')

self.verify_tc_attributes(new_accepted_version, new_acknowledgements)

logging.info('Step 11 - TH removes all fabrics from the device')
fabrics = await self.read_single_attribute(dev_ctrl=self.th1, node_id=self.dut_node_id, endpoint=0, attribute=Clusters.OperationalCredentials.Attributes.Fabrics)
for fabric in fabrics:
if fabric.fabricIndex == self.th1.fabricId:
continue
cmd = Clusters.OperationalCredentials.Commands.RemoveFabric(fabric.fabricIndex)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.statusCode, Clusters.OperationalCredentials.Enums.NodeOperationalCertStatusEnum.kOk)

# Remove TH1 fabric last
cmd = Clusters.OperationalCredentials.Commands.RemoveFabric(self.th1.fabricId)
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)

# Give a prompt for the user to make the device in a commissionable state.
# Currently with the sample apps the device doesn't have an opening commissioning window after removing all fabrics.
print("Please return the device to a commissionable state if needed (reboot) and press any key to continue...")
input()

logging.info('Step 12 - TH commissions the DUT without setting TCs')
self.th1.ResetTestCommissioner()
self.th1.ExpireSessions(self.dut_node_id)

# Don't set TCs for the next commissioning and skip CommissioningComplete so we can manually call CommissioningComplete in order to check the response error code
self.th1.SetTCRequired(False)
self.th1.SetTCAcknowledgements(0, 0)
self.th1.SetSkipCommissioningComplete(True)

await self.commission_device()

cmd = Clusters.GeneralCommissioning.Commands.CommissioningComplete()
resp = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, timedRequestTimeoutMs=6000)
asserts.assert_equal(resp.errorCode, Clusters.GeneralCommissioning.Enums.CommissioningErrorEnum.kTCAcknowledgementsNotReceived,
'Incorrect error code')


if __name__ == "__main__":
default_matter_test_main()
18 changes: 18 additions & 0 deletions src/python_testing/matter_testing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,10 @@ class MatterTestConfig:

trace_to: List[str] = field(default_factory=list)

# Accepted Terms and Conditions if used
tc_version: int = None
tc_user_response: int = None


class ClusterMapper:
"""Describe clusters/attributes using schema names."""
Expand Down Expand Up @@ -1590,6 +1594,9 @@ def convert_args_to_matter_config(args: argparse.Namespace) -> MatterTestConfig:
config.controller_node_id = args.controller_node_id
config.trace_to = args.trace_to

config.tc_version = args.tc_version
config.tc_user_response = args.tc_user_response

# Accumulate all command-line-passed named args
all_global_args = []
argsets = [item for item in (args.int_arg, args.float_arg, args.string_arg, args.json_arg,
Expand Down Expand Up @@ -1683,6 +1690,10 @@ def parse_matter_test_args(argv: Optional[List[str]] = None) -> MatterTestConfig
commission_group.add_argument('--commission-only', action="store_true", default=False,
help="If true, test exits after commissioning without running subsequent tests")

commission_group.add_argument('--tc-version', type=int, help="Terms and conditions version")

commission_group.add_argument('--tc-user-response', type=int, help="Terms and conditions acknowledgements")

code_group = parser.add_mutually_exclusive_group(required=False)

code_group.add_argument('-q', '--qr-code', type=str,
Expand Down Expand Up @@ -1953,6 +1964,13 @@ async def _commission_device(self, i) -> bool:

info = self.get_setup_payload_info()[i]

if conf.tc_version is not None and conf.tc_user_response is not None:
logging.debug(f"Setting TC Acknowledgements to version {conf.tc_version} with user response {conf.tc_user_response}.")
dev_ctrl.SetTCAcknowledgements(conf.tc_version, conf.tc_user_response)
dev_ctrl.SetTCRequired(True)
else:
dev_ctrl.SetTCRequired(False)

if conf.commissioning_method == "on-network":
try:
await dev_ctrl.CommissionOnNetwork(
Expand Down