Skip to content

Commit

Permalink
revert
Browse files Browse the repository at this point in the history
  • Loading branch information
swan-amazon committed Aug 28, 2024
1 parent 2377be2 commit 377fe7b
Showing 1 changed file with 63 additions and 106 deletions.
169 changes: 63 additions & 106 deletions src/python_testing/matter_testing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,16 +520,6 @@ def test_skipped(self, filename: str, name: str):

@dataclass
class MatterTestConfig:
def update(self, override_attr_dict: dict) -> None:
"""
Update the attributes of the MatterTestConfig instance with the values provided in the override_attr_dict.
This method allows for dynamic modification of the instance's attributes. If a key in override_attr_dict matches
an existing attribute name, the attribute's value is updated. If the key does not match any existing attribute,
a new attribute is created with the given key and value.
"""
self.__dict__.update(override_attr_dict)

storage_path: pathlib.Path = pathlib.Path(".")
logs_path: pathlib.Path = pathlib.Path(".")
paa_trust_store_path: Optional[pathlib.Path] = None
Expand All @@ -545,7 +535,6 @@ def update(self, override_attr_dict: dict) -> None:
endpoint: int = 0
app_pid: int = 0

pre_commissioning: bool = True
commissioning_method: Optional[str] = None
discriminators: Optional[List[int]] = None
setup_passcodes: Optional[List[int]] = None
Expand Down Expand Up @@ -584,10 +573,6 @@ def update(self, override_attr_dict: dict) -> None:

trace_to: List[str] = field(default_factory=list)

# Accepted Terms and Conditions if used
tc_version: int = None
tc_user_response: int = None


class ClusterMapper:
"""Describe clusters/attributes using schema names."""
Expand Down Expand Up @@ -857,82 +842,6 @@ def __init__(self, *args):
self.problems = []
self.is_commissioning = False

async def commission_device(self, dut_node_idx: int) -> bool:
dev_ctrl = self.default_controller
conf = self.matter_test_config

info = self.get_setup_payload_info()[dut_node_idx]

if conf.tc_version is not None and conf.tc_user_response is not None:
logging.debug(f"Setting TC Acknowledgements to version {conf.tc_version} with user response {conf.tc_user_response}.")
dev_ctrl.SetTCAcknowledgements(conf.tc_version, conf.tc_user_response)
dev_ctrl.SetTCRequired(True)
else:
dev_ctrl.SetTCRequired(False)

if conf.commissioning_method == "on-network":
try:
await dev_ctrl.CommissionOnNetwork(
nodeId=conf.dut_node_ids[dut_node_idx],
setupPinCode=info.passcode,
filterType=info.filter_type,
filter=info.filter_value
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
elif conf.commissioning_method == "ble-wifi":
try:
await dev_ctrl.CommissionWiFi(
info.filter_value,
info.passcode,
conf.dut_node_ids[dut_node_idx],
conf.wifi_ssid,
conf.wifi_passphrase,
isShortDiscriminator=(info.filter_type == DiscoveryFilterType.SHORT_DISCRIMINATOR)
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
elif conf.commissioning_method == "ble-thread":
try:
await dev_ctrl.CommissionThread(
info.filter_value,
info.passcode,
conf.dut_node_ids[dut_node_idx],
conf.thread_operational_dataset,
isShortDiscriminator=(info.filter_type == DiscoveryFilterType.SHORT_DISCRIMINATOR)
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
elif conf.commissioning_method == "on-network-ip":
try:
logging.warning("==== USING A DIRECT IP COMMISSIONING METHOD NOT SUPPORTED IN THE LONG TERM ====")
await dev_ctrl.CommissionIP(
ipaddr=conf.commissionee_ip_address_just_for_testing,
setupPinCode=info.passcode, nodeid=conf.dut_node_ids[dut_node_idx]
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
else:
raise ValueError("Invalid commissioning method %s!" % conf.commissioning_method)

async def commission_devices(self):
conf = self.matter_test_config

for commission_idx, node_id in enumerate(conf.dut_node_ids):
logging.info("Starting commissioning for root index %d, fabric ID 0x%016X, node ID 0x%016X" %
(conf.root_of_trust_index, conf.fabric_id, node_id))
logging.info("Commissioning method: %s" % conf.commissioning_method)

await self.commission_device(commission_idx)

def get_test_steps(self, test: str) -> list[TestStep]:
''' Retrieves the test step list for the given test
Expand Down Expand Up @@ -1757,9 +1666,6 @@ def convert_args_to_matter_config(args: argparse.Namespace) -> MatterTestConfig:
config.controller_node_id = args.controller_node_id
config.trace_to = args.trace_to

config.tc_version = args.tc_version
config.tc_user_response = args.tc_user_response

# Accumulate all command-line-passed named args
all_global_args = []
argsets = [item for item in (args.int_arg, args.float_arg, args.string_arg, args.json_arg,
Expand Down Expand Up @@ -1853,10 +1759,6 @@ def parse_matter_test_args(argv: Optional[List[str]] = None) -> MatterTestConfig
commission_group.add_argument('--commission-only', action="store_true", default=False,
help="If true, test exits after commissioning without running subsequent tests")

commission_group.add_argument('--tc-version', type=int, help="Terms and conditions version")

commission_group.add_argument('--tc-user-response', type=int, help="Terms and conditions acknowledgements")

code_group = parser.add_mutually_exclusive_group(required=False)

code_group.add_argument('-q', '--qr-code', type=str,
Expand Down Expand Up @@ -2126,11 +2028,70 @@ def test_run_commissioning(self):
(conf.root_of_trust_index, conf.fabric_id, node_id))
logging.info("Commissioning method: %s" % conf.commissioning_method)

if not asyncio.run(self.commission_device(commission_idx)):
if not asyncio.run(self._commission_device(commission_idx)):
raise signals.TestAbortAll("Failed to commission node")

async def _commission_device(self, i) -> bool:
dev_ctrl = self.default_controller
conf = self.matter_test_config

info = self.get_setup_payload_info()[i]

if conf.commissioning_method == "on-network":
try:
await dev_ctrl.CommissionOnNetwork(
nodeId=conf.dut_node_ids[i],
setupPinCode=info.passcode,
filterType=info.filter_type,
filter=info.filter_value
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
elif conf.commissioning_method == "ble-wifi":
try:
await dev_ctrl.CommissionWiFi(
info.filter_value,
info.passcode,
conf.dut_node_ids[i],
conf.wifi_ssid,
conf.wifi_passphrase,
isShortDiscriminator=(info.filter_type == DiscoveryFilterType.SHORT_DISCRIMINATOR)
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
elif conf.commissioning_method == "ble-thread":
try:
await dev_ctrl.CommissionThread(
info.filter_value,
info.passcode,
conf.dut_node_ids[i],
conf.thread_operational_dataset,
isShortDiscriminator=(info.filter_type == DiscoveryFilterType.SHORT_DISCRIMINATOR)
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
elif conf.commissioning_method == "on-network-ip":
try:
logging.warning("==== USING A DIRECT IP COMMISSIONING METHOD NOT SUPPORTED IN THE LONG TERM ====")
await dev_ctrl.CommissionIP(
ipaddr=conf.commissionee_ip_address_just_for_testing,
setupPinCode=info.passcode, nodeid=conf.dut_node_ids[i]
)
return True
except ChipStackError as e:
logging.error("Commissioning failed: %s" % e)
return False
else:
raise ValueError("Invalid commissioning method %s!" % conf.commissioning_method)


def default_matter_test_main(override_matter_test_config: dict = {}):
def default_matter_test_main():
"""Execute the test class in a test module.
This is the default entry point for running a test script file directly.
In this case, only one test class in a test script is allowed.
Expand All @@ -2142,11 +2103,7 @@ def default_matter_test_main(override_matter_test_config: dict = {}):
default_matter_test_main.main()
"""

# Parse standard test configuration arguments
matter_test_config: MatterTestConfig = parse_matter_test_args()

# Apply test configuration overrides
matter_test_config.update(override_matter_test_config)
matter_test_config = parse_matter_test_args()

# Find the test class in the test script.
test_class = _find_test_class()
Expand Down Expand Up @@ -2220,7 +2177,7 @@ def run_tests_no_exit(test_class: MatterBaseTest, matter_test_config: MatterTest
testbed_name=test_config.testbed_name)

with runner.mobly_logger():
if matter_test_config.pre_commissioning and matter_test_config.commissioning_method is not None:
if matter_test_config.commissioning_method is not None:
runner.add_test_class(test_config, CommissionDeviceTest, None)

# Add the tests selected unless we have a commission-only request
Expand Down

0 comments on commit 377fe7b

Please sign in to comment.