From 78fe2259608b7b738aa219748c74b22a2aa85f48 Mon Sep 17 00:00:00 2001 From: Andrew Kozlik Date: Wed, 4 Sep 2024 17:03:32 +0200 Subject: [PATCH] feat(python): Implement entropy check workflow in device.reset(). --- python/src/trezorlib/cli/__init__.py | 2 +- python/src/trezorlib/cli/device.py | 14 +++- python/src/trezorlib/device.py | 114 +++++++++++++++++++++++++-- python/src/trezorlib/tools.py | 17 ++++ 4 files changed, 137 insertions(+), 10 deletions(-) diff --git a/python/src/trezorlib/cli/__init__.py b/python/src/trezorlib/cli/__init__.py index 050e3788f78..d9538585427 100644 --- a/python/src/trezorlib/cli/__init__.py +++ b/python/src/trezorlib/cli/__init__.py @@ -158,7 +158,7 @@ def trezorctl_command_with_client( # the return type of @click.pass_obj is improperly specified and pyright doesn't # understand that it converts f(obj, *args, **kwargs) to f(*args, **kwargs) - return trezorctl_command_with_client # type: ignore [is incompatible with return type] + return trezorctl_command_with_client class AliasedGroup(click.Group): diff --git a/python/src/trezorlib/cli/device.py b/python/src/trezorlib/cli/device.py index 6f2bd14883c..1ff7762308c 100644 --- a/python/src/trezorlib/cli/device.py +++ b/python/src/trezorlib/cli/device.py @@ -24,6 +24,7 @@ import requests from .. import debuglink, device, exceptions, messages, ui +from ..tools import format_path from . import ChoiceType, with_client if t.TYPE_CHECKING: @@ -214,6 +215,7 @@ def recover( @click.option("-s", "--skip-backup", is_flag=True) @click.option("-n", "--no-backup", is_flag=True) @click.option("-b", "--backup-type", type=ChoiceType(BACKUP_TYPE)) +@click.option("-e", "--entropy-check-count", type=click.IntRange(0)) @with_client def setup( client: "TrezorClient", @@ -225,6 +227,7 @@ def setup( skip_backup: bool, no_backup: bool, backup_type: messages.BackupType | None, + entropy_check_count: int | None, ) -> str: """Perform device setup and generate new seed.""" if strength: @@ -253,7 +256,7 @@ def setup( "backup type. Traditional BIP39 backup may be generated instead." ) - return device.reset( + resp, path_xpubs = device.reset_entropy_check( client, strength=strength, passphrase_protection=passphrase_protection, @@ -263,8 +266,17 @@ def setup( skip_backup=skip_backup, no_backup=no_backup, backup_type=backup_type, + entropy_check_count=entropy_check_count, ) + if isinstance(resp, messages.Success): + click.echo("XPUBs for the generated seed") + for path, xpub in path_xpubs: + click.echo(f"{format_path(path)}: {xpub}") + return resp.message or "" + else: + raise RuntimeError(f"Received {resp.__class__}") + @cli.command() @click.option("-t", "--group-threshold", type=int) diff --git a/python/src/trezorlib/device.py b/python/src/trezorlib/device.py index 95b5db26544..3b4928e53d3 100644 --- a/python/src/trezorlib/device.py +++ b/python/src/trezorlib/device.py @@ -16,14 +16,18 @@ from __future__ import annotations +import hashlib +import hmac import os import time import warnings -from typing import TYPE_CHECKING, Callable, Iterable, Optional +from typing import TYPE_CHECKING, Any, Callable, Iterable, List, Optional, Tuple + +from slip10 import SLIP10 from . import messages from .exceptions import Cancelled, TrezorException -from .tools import Address, expect, session +from .tools import Address, expect, parse_path, session if TYPE_CHECKING: from .client import TrezorClient @@ -229,9 +233,53 @@ def recover( return res +def is_slip39_backup_type(backup_type: messages.BackupType): + return backup_type in ( + messages.BackupType.Slip39_Basic, + messages.BackupType.Slip39_Advanced, + messages.BackupType.Slip39_Single_Extendable, + messages.BackupType.Slip39_Basic_Extendable, + messages.BackupType.Slip39_Advanced_Extendable, + ) + + +def _seed_from_entropy( + internal_entropy: bytes, + external_entropy: bytes, + strength: int, + backup_type: messages.BackupType, +) -> bytes: + entropy = hashlib.sha256(internal_entropy + external_entropy).digest() + secret = entropy[: strength // 8] + + if len(secret) * 8 != strength: + raise ValueError("Entropy length mismatch") + + if backup_type == messages.BackupType.Bip39: + import mnemonic + + bip39 = mnemonic.Mnemonic("english") + words = bip39.to_mnemonic(secret) + seed = bip39.to_seed(words, passphrase="") + elif is_slip39_backup_type(backup_type): + import shamir_mnemonic + + seed = shamir_mnemonic.cipher.decrypt( + secret, b"", iteration_exponent=1, identifier=0, extendable=True + ) + else: + raise ValueError("Unknown backup type.") + + return seed + + @expect(messages.Success, field="message", ret_type=str) +def reset(*args: Any, **kwargs: Any) -> "MessageType": + return reset_entropy_check(*args, **kwargs)[0] + + @session -def reset( +def reset_entropy_check( client: "TrezorClient", display_random: bool = False, strength: Optional[int] = None, @@ -243,7 +291,9 @@ def reset( skip_backup: bool = False, no_backup: bool = False, backup_type: messages.BackupType = messages.BackupType.Bip39, -) -> "MessageType": + entropy_check_count: Optional[int] = None, + paths: List[Address] = [], +) -> Tuple["MessageType", Iterable[Tuple[Address, str]]]: if display_random: warnings.warn( "display_random ignored. The feature is deprecated.", @@ -267,6 +317,10 @@ def reset( else: strength = 128 + if not paths: + # Get XPUBs for the first BTC SegWit v0 account and first ETH account. + paths = [parse_path("m/84h/0h/0h"), parse_path("m/44h/60h/0h")] + # Begin with device reset workflow msg = messages.ResetDevice( strength=strength, @@ -277,17 +331,61 @@ def reset( skip_backup=bool(skip_backup), no_backup=bool(no_backup), backup_type=backup_type, + entropy_check=entropy_check_count is not None, ) resp = client.call(msg) if not isinstance(resp, messages.EntropyRequest): raise RuntimeError("Invalid response, expected EntropyRequest") - external_entropy = os.urandom(32) - # LOG.debug("Computer generated entropy: " + external_entropy.hex()) - ret = client.call(messages.EntropyAck(entropy=external_entropy)) + while True: + xpubs = [] + + external_entropy = os.urandom(32) + entropy_commitment = resp.entropy_commitment + resp = client.call(messages.EntropyAck(entropy=external_entropy)) + + if entropy_check_count is None: + break + + if not isinstance(resp, messages.Success): + return resp, [] + + for path in paths: + resp = client.call(messages.GetPublicKey(address_n=path)) + if not isinstance(resp, messages.PublicKey): + return resp, [] + xpubs.append(resp.xpub) + + if entropy_check_count <= 0: + resp = client.call(messages.ResetDeviceFinish()) + break + + entropy_check_count -= 1 + + resp = client.call(messages.ResetDeviceContinue()) + if not isinstance(resp, messages.EntropyRequest): + raise RuntimeError("Invalid response, expected EntropyRequest") + + # Check the entropy commitment from the previous round. + assert resp.prev_entropy + if ( + hmac.HMAC(key=resp.prev_entropy, msg=b"", digestmod=hashlib.sha256).digest() + != entropy_commitment + ): + raise RuntimeError("Invalid entropy commitment.") + + # Derive the seed and check that XPUBs match. + seed = _seed_from_entropy( + resp.prev_entropy, external_entropy, strength, backup_type + ) + slip10 = SLIP10.from_seed(seed) + for path, xpub in zip(paths, xpubs): + if slip10.get_xpub_from_path(path) != xpub: + raise RuntimeError("Invalid XPUB in entropy check") + client.init_device() - return ret + return resp, zip(paths, xpubs) @expect(messages.Success, field="message", ret_type=str) diff --git a/python/src/trezorlib/tools.py b/python/src/trezorlib/tools.py index 4fd1558ec29..18f62061cfe 100644 --- a/python/src/trezorlib/tools.py +++ b/python/src/trezorlib/tools.py @@ -222,6 +222,23 @@ def str_to_harden(x: str) -> int: raise ValueError("Invalid BIP32 path", nstr) from e +def format_path(path: Address, flag: str = "h") -> str: + """ + Convert BIP32 path list of uint32 integers with hardened flags to string. + Several conventions are supported to denote the hardened flag: 1', 1h + + e.g.: [0, 0x80000001, 1] -> "m/0/1h/1" + + :param path: list of integers + :return: path string + """ + nstr = "m" + for i in path: + nstr += f"/{unharden(i)}{flag if is_hardened(i) else ''}" + + return nstr + + def prepare_message_bytes(txt: AnyStr) -> bytes: """ Make message suitable for protobuf.