Skip to content

Commit

Permalink
Fix bellows CLI (#637)
Browse files Browse the repository at this point in the history
* Remove the backup/restore interface

* Fix network initialization

* Fix flow control

* Fix type names
  • Loading branch information
puddly authored Jul 23, 2024
1 parent 93c671b commit c4578a0
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 323 deletions.
313 changes: 5 additions & 308 deletions bellows/cli/backup.py
Original file line number Diff line number Diff line change
@@ -1,142 +1,17 @@
import asyncio
import json
import logging
import os

import click
import voluptuous as vol
from zigpy.config.validators import cv_hex, cv_key
import zigpy.types
import zigpy.zdo.types

import bellows.types as t

from . import util
from .main import main

LOGGER = logging.getLogger(__name__)

ATTR_CHANNELS = "channels"
ATTR_EXT_PAN_ID = "extendedPanId"
ATTR_KEY = "key"
ATTR_KEY_BITMASK = "bitmask"
ATTR_KEY_FRAME_COUNTER_IN = "incomingFrameCounter"
ATTR_KEY_FRAME_COUNTER_OUT = "outgoingFrameCounter"
ATTR_KEY_GLOBAL = "tc_link_key"
ATTR_KEY_NWK = "network_key"
ATTR_KEY_PARTNER = "partnerEUI64"
ATTR_KEY_SEQ = "sequenceNumber"
ATTR_KEY_TABLE = "key_table"
ATTR_KEY_TYPE = "type"
ATTR_NODE_EUI64 = "node_ieee"
ATTR_NODE_ID = "node_id"
ATTR_NODE_TYPE = "node_type"
ATTR_NWK_UPDATE_ID = "nwkUpdateId"
ATTR_PAN_ID = "panId"
ATTR_RADIO_CHANNEL = "radioChannel"
ATTR_RADIO_TX_PWR = "radioTxPower"

SCHEMA_KEY = vol.Schema(
{
ATTR_KEY_BITMASK: cv_hex,
ATTR_KEY_TYPE: cv_hex,
ATTR_KEY: cv_key,
ATTR_KEY_FRAME_COUNTER_OUT: cv_hex,
ATTR_KEY_FRAME_COUNTER_IN: cv_hex,
ATTR_KEY_SEQ: cv_hex,
ATTR_KEY_PARTNER: vol.All(str, t.EUI64.convert),
}
)
SCHEMA_BAK = vol.Schema(
{
ATTR_CHANNELS: cv_hex,
ATTR_NODE_TYPE: cv_hex,
ATTR_NODE_ID: cv_hex,
ATTR_NODE_EUI64: vol.All(str, t.EUI64.convert),
ATTR_NWK_UPDATE_ID: cv_hex,
ATTR_PAN_ID: cv_hex,
ATTR_RADIO_CHANNEL: cv_hex,
ATTR_RADIO_TX_PWR: cv_hex,
ATTR_EXT_PAN_ID: vol.All(str, t.ExtendedPanId.convert),
ATTR_KEY_GLOBAL: SCHEMA_KEY,
ATTR_KEY_NWK: SCHEMA_KEY,
ATTR_KEY_TABLE: vol.Any([], vol.Schema([SCHEMA_KEY])),
}
)


def _print_cb(frame_name, response):
LOGGER.debug("%s callback: %s", frame_name, response)


@main.command()
@click.pass_context
@util.background
async def backup(ctx):
"""Backup NCP config to stdio."""
ezsp = await util.setup(ctx.obj["device"], ctx.obj["baudrate"], _print_cb)

try:
await _backup(ezsp)
finally:
ezsp.close()


async def _backup(ezsp):
(status,) = await ezsp.networkInit()
LOGGER.debug("Network init status: %s", status)
assert t.sl_Status.from_ember_status(status) == t.sl_Status.OK

(status, node_type, network) = await ezsp.getNetworkParameters()
assert t.sl_Status.from_ember_status(status) == t.sl_Status.OK
assert node_type == ezsp.types.EmberNodeType.COORDINATOR
LOGGER.debug("Network params: %s", network)

(node_id,) = await ezsp.getNodeId()
(ieee,) = await ezsp.getEui64()

result = {
ATTR_NODE_TYPE: node_type.value,
ATTR_NODE_ID: node_id,
ATTR_NODE_EUI64: str(ieee),
ATTR_PAN_ID: network.panId,
ATTR_EXT_PAN_ID: str(network.extendedPanId),
ATTR_RADIO_CHANNEL: network.radioChannel,
ATTR_RADIO_TX_PWR: network.radioTxPower,
ATTR_NWK_UPDATE_ID: network.nwkUpdateId,
ATTR_CHANNELS: network.channels,
}

for key_name, key_type in (
(ATTR_KEY_GLOBAL, ezsp.types.EmberKeyType.TRUST_CENTER_LINK_KEY),
(ATTR_KEY_NWK, ezsp.types.EmberKeyType.CURRENT_NETWORK_KEY),
):
(status, key) = await ezsp.getKey(key_type)
assert t.sl_Status.from_ember_status(status) == t.sl_Status.OK
LOGGER.debug("%s key: %s", key_name, key)
result[key_name] = key.as_dict()
result[key_name][ATTR_KEY_PARTNER] = str(key.partnerEUI64)

keys = await _backup_keys(ezsp)
result[ATTR_KEY_TABLE] = keys

click.echo(json.dumps(result))


async def _backup_keys(ezsp):
"""Backup keys."""

keys = []
for idx in range(0, 192):
LOGGER.debug("Getting key index %s", idx)
(status, key_struct) = await ezsp.getKeyTableEntry(idx)
if status == t.EmberStatus.SUCCESS:
key_dict = key_struct.as_dict()
key_dict[ATTR_KEY_PARTNER] = str(key_struct.partnerEUI64)
keys.append(key_dict)
elif status == t.EmberStatus.INDEX_OUT_OF_RANGE:
break
return keys
click.echo(
"The bellows backup/restore command set has been removed. Please use zigpy-cli instead: https://github.com/zigpy/zigpy-cli#network-backup"
)


@main.command()
Expand Down Expand Up @@ -165,184 +40,6 @@ async def restore(
upgrade_to_hashed_tc_link_key,
):
"""Backup NCP config to stdio."""
click.echo("Restoring NCP")
try:
with open(backup_file) as file:
backup_data = json.load(file)
LOGGER.debug("loaded: %s", backup_data)
backup_data = SCHEMA_BAK(backup_data)
LOGGER.debug("schema pass: %s", backup_data)
except OSError as exc:
LOGGER.error("Couldn't import backup file: %s", exc)
return
except vol.Error as exc:
LOGGER.error("backup file does not pass schema validation: %s", exc)
return

LOGGER.info("backup file: %s", backup_data)

ezsp = await util.setup(ctx.obj["device"], ctx.obj["baudrate"], _print_cb)
try:
await _restore(
ezsp,
backup_data,
force,
i_understand_i_can_update_eui64_only_once_and_i_still_want_to_do_it,
upgrade_to_hashed_tc_link_key,
)
finally:
ezsp.close()


async def _restore(
ezsp, backup_data, force, update_eui64_token=False, upg_tc_link_key=False
):
"""Restore backup."""

(status,) = await ezsp.networkInit()
LOGGER.debug("Network init status: %s", status)
assert status in (t.EmberStatus.SUCCESS, t.EmberStatus.NOT_JOINED)

if status == t.EmberStatus.SUCCESS:
if not force:
click.echo("Network is up, not forcing restore")
return
try:
await ezsp.leaveNetwork()
except asyncio.TimeoutError:
LOGGER.error("Didn't not receive stack changed status callback")
return

if update_eui64_token:
ncp_eui64 = t.EUI64(backup_data[ATTR_NODE_EUI64]).serialize()
(status,) = await ezsp.setMfgToken(
t.EzspMfgTokenId.MFG_CUSTOM_EUI_64, ncp_eui64
)

sec_bitmask = (
t.EmberInitialSecurityBitmask.HAVE_PRECONFIGURED_KEY
| t.EmberInitialSecurityBitmask.REQUIRE_ENCRYPTED_KEY
| t.EmberInitialSecurityBitmask.TRUST_CENTER_GLOBAL_LINK_KEY
| t.EmberInitialSecurityBitmask.HAVE_NETWORK_KEY
| t.EmberInitialSecurityBitmask.NO_FRAME_COUNTER_RESET
)
if not is_well_known_key(backup_data[ATTR_KEY_GLOBAL][ATTR_KEY]):
sec_bitmask |= t.EmberInitialSecurityBitmask.TRUST_CENTER_USES_HASHED_LINK_KEY

init_sec_state = t.EmberInitialSecurityState(
bitmask=sec_bitmask,
preconfiguredKey=backup_data[ATTR_KEY_GLOBAL][ATTR_KEY],
networkKey=backup_data[ATTR_KEY_NWK][ATTR_KEY],
networkKeySequenceNumber=backup_data[ATTR_KEY_NWK][ATTR_KEY_SEQ],
preconfiguredTrustCenterEui64=[0x00] * 8,
)
if (
upg_tc_link_key
and t.EmberInitialSecurityBitmask.TRUST_CENTER_USES_HASHED_LINK_KEY
not in sec_bitmask
):
init_sec_state.bitmask |= (
t.EmberInitialSecurityBitmask.TRUST_CENTER_USES_HASHED_LINK_KEY
)
init_sec_state.preconfiguredKey = t.KeyData(os.urandom(16))

(status,) = await ezsp.setInitialSecurityState(init_sec_state)
LOGGER.debug("Set initial security state: %s", status)
assert t.sl_Status.from_ember_status(status) == t.sl_Status.OK

if backup_data[ATTR_KEY_TABLE]:
await _restore_keys(ezsp, backup_data[ATTR_KEY_TABLE])

network_key = backup_data[ATTR_KEY_NWK]
(status,) = await ezsp.setValue(
ezsp.types.EzspValueId.VALUE_NWK_FRAME_COUNTER,
t.uint32_t(network_key[ATTR_KEY_FRAME_COUNTER_OUT]).serialize(),
)
LOGGER.debug("Set network frame counter: %s", status)
assert t.sl_Status.from_ember_status(status) == t.sl_Status.OK

tc_key = backup_data[ATTR_KEY_GLOBAL]
(status,) = await ezsp.setValue(
ezsp.types.EzspValueId.VALUE_APS_FRAME_COUNTER,
t.uint32_t(tc_key[ATTR_KEY_FRAME_COUNTER_OUT]).serialize(),
)
LOGGER.debug("Set network frame counter: %s", status)
assert t.sl_Status.from_ember_status(status) == t.sl_Status.OK

await _form_network(ezsp, backup_data)
await asyncio.sleep(2)


async def _restore_keys(ezsp, key_table):
"""Restore keys."""

(status,) = await ezsp.setConfigurationValue(
ezsp.types.EzspConfigId.CONFIG_KEY_TABLE_SIZE, len(key_table)
click.echo(
"The bellows backup/restore command set has been removed. Please use zigpy-cli instead: https://github.com/zigpy/zigpy-cli#network-backup"
)
assert t.sl_Status.from_ember_status(status) == t.sl_Status.OK

for key in key_table:
is_link_key = key[ATTR_KEY_TYPE] in (
ezsp.types.EmberKeyType.APPLICATION_LINK_KEY,
ezsp.types.EmberKeyType.TRUST_CENTER_LINK_KEY,
)
(status,) = await ezsp.addOrUpdateKeyTableEntry(
key[ATTR_KEY_PARTNER], is_link_key, key[ATTR_KEY]
)
if status != t.EmberStatus.SUCCESS:
LOGGER.warning("Couldn't add %s key: %s", key, status)
await asyncio.sleep(0.2)


async def _form_network(ezsp, backup_data):
"""Form network."""
network = t.EmberNetworkParameters(
panId=backup_data[ATTR_PAN_ID],
extendedPanId=backup_data[ATTR_EXT_PAN_ID],
radioTxPower=backup_data[ATTR_RADIO_TX_PWR],
radioChannel=backup_data[ATTR_RADIO_CHANNEL],
joinMethod=t.EmberJoinMethod.USE_MAC_ASSOCIATION,
nwkManagerId=0x0000,
nwkUpdateId=backup_data[ATTR_NWK_UPDATE_ID],
channels=backup_data[ATTR_CHANNELS],
)
await ezsp.formNetwork(network)

await _update_nwk_id(ezsp, backup_data[ATTR_NWK_UPDATE_ID])

(status,) = await ezsp.setValue(ezsp.types.EzspValueId.VALUE_STACK_TOKEN_WRITING, 1)
LOGGER.debug("Set token writing: %s", status)
assert t.sl_Status.from_ember_status(status) == t.sl_Status.OK


async def _update_nwk_id(ezsp, nwk_update_id):
"""Update NWK id by sending a ZDO broadcast."""

aps_frame = t.EmberApsFrame(
profileId=0x0000,
clusterId=zigpy.zdo.types.ZDOCmd.Mgmt_NWK_Update_req,
sourceEndpoint=0x00,
destinationEndpoint=0x00,
options=t.EmberApsOption.APS_OPTION_NONE,
groupId=0x0000,
sequence=0xDE,
)
nwk_update_id = t.uint8_t(nwk_update_id).serialize()
payload = b"\xDE" + zigpy.types.Channels.ALL_CHANNELS.serialize() + b"\xFF"
payload += nwk_update_id + b"\x00\x00"

status, _ = await ezsp.sendBroadcast(
zigpy.types.BroadcastAddress.ALL_DEVICES,
aps_frame,
0x00,
0x01,
payload,
)
assert t.sl_Status.from_ember_status(status) == t.sl_Status.OK
await asyncio.sleep(1)


def is_well_known_key(tc_link_key):
"""Return True if this is a well known key."""
well_known_key = t.KeyData.deserialize(b"ZigBeeAlliance09")[0]
return tc_link_key == well_known_key
2 changes: 1 addition & 1 deletion bellows/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def main(ctx, device, baudrate, flow_control):
ctx.obj = {
"device": device,
"baudrate": baudrate,
"flow_control": flow_control,
"flow_control": flow_control if flow_control != "none" else None,
}
click_log.basic_config()

Expand Down
6 changes: 3 additions & 3 deletions bellows/cli/ncp.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def config(ctx, config, all_):
)

if all_:
for config in s.types.EzspConfigId:
for config in t.EzspConfigId:
v = await s.getConfigurationValue(config)
if v[0] == t.EzspStatus.ERROR_INVALID_ID:
continue
Expand All @@ -37,12 +37,12 @@ async def config(ctx, config, all_):
config, value = config.split("=", 1)
if config.isdigit():
try:
config = s.types.EzspConfigId(int(config))
config = t.EzspConfigId(int(config))
except ValueError:
raise click.BadArgumentUsage(f"Invalid config ID: {config}")
else:
try:
config = s.types.EzspConfigId[config]
config = t.EzspConfigId[config]
except KeyError:
raise click.BadArgumentUsage(f"Invalid config name: {config}")
try:
Expand Down
2 changes: 1 addition & 1 deletion bellows/cli/opts.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
flow_control = click.option(
"--flow-control",
default="software",
type=click.Choice(["hardware", "software", "None"]),
type=click.Choice(["hardware", "software", "none"], case_sensitive=False),
envvar="EZSP_FLOW_CONTROL",
help="use hardware flow control",
)
Expand Down
Loading

0 comments on commit c4578a0

Please sign in to comment.