Skip to content

Commit

Permalink
feat(python): Implement entropy check workflow in device.reset().
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewkozlik committed Sep 5, 2024
1 parent dda6542 commit 78fe225
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 10 deletions.
2 changes: 1 addition & 1 deletion python/src/trezorlib/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def trezorctl_command_with_client(

# the return type of @click.pass_obj is improperly specified and pyright doesn't
# understand that it converts f(obj, *args, **kwargs) to f(*args, **kwargs)
return trezorctl_command_with_client # type: ignore [is incompatible with return type]
return trezorctl_command_with_client


class AliasedGroup(click.Group):
Expand Down
14 changes: 13 additions & 1 deletion python/src/trezorlib/cli/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import requests

from .. import debuglink, device, exceptions, messages, ui
from ..tools import format_path
from . import ChoiceType, with_client

if t.TYPE_CHECKING:
Expand Down Expand Up @@ -214,6 +215,7 @@ def recover(
@click.option("-s", "--skip-backup", is_flag=True)
@click.option("-n", "--no-backup", is_flag=True)
@click.option("-b", "--backup-type", type=ChoiceType(BACKUP_TYPE))
@click.option("-e", "--entropy-check-count", type=click.IntRange(0))
@with_client
def setup(
client: "TrezorClient",
Expand All @@ -225,6 +227,7 @@ def setup(
skip_backup: bool,
no_backup: bool,
backup_type: messages.BackupType | None,
entropy_check_count: int | None,
) -> str:
"""Perform device setup and generate new seed."""
if strength:
Expand Down Expand Up @@ -253,7 +256,7 @@ def setup(
"backup type. Traditional BIP39 backup may be generated instead."
)

return device.reset(
resp, path_xpubs = device.reset_entropy_check(
client,
strength=strength,
passphrase_protection=passphrase_protection,
Expand All @@ -263,8 +266,17 @@ def setup(
skip_backup=skip_backup,
no_backup=no_backup,
backup_type=backup_type,
entropy_check_count=entropy_check_count,
)

if isinstance(resp, messages.Success):
click.echo("XPUBs for the generated seed")
for path, xpub in path_xpubs:
click.echo(f"{format_path(path)}: {xpub}")
return resp.message or ""
else:
raise RuntimeError(f"Received {resp.__class__}")


@cli.command()
@click.option("-t", "--group-threshold", type=int)
Expand Down
114 changes: 106 additions & 8 deletions python/src/trezorlib/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@

from __future__ import annotations

import hashlib
import hmac
import os
import time
import warnings
from typing import TYPE_CHECKING, Callable, Iterable, Optional
from typing import TYPE_CHECKING, Any, Callable, Iterable, List, Optional, Tuple

from slip10 import SLIP10

from . import messages
from .exceptions import Cancelled, TrezorException
from .tools import Address, expect, session
from .tools import Address, expect, parse_path, session

if TYPE_CHECKING:
from .client import TrezorClient
Expand Down Expand Up @@ -229,9 +233,53 @@ def recover(
return res


def is_slip39_backup_type(backup_type: messages.BackupType):
return backup_type in (
messages.BackupType.Slip39_Basic,
messages.BackupType.Slip39_Advanced,
messages.BackupType.Slip39_Single_Extendable,
messages.BackupType.Slip39_Basic_Extendable,
messages.BackupType.Slip39_Advanced_Extendable,
)


def _seed_from_entropy(
internal_entropy: bytes,
external_entropy: bytes,
strength: int,
backup_type: messages.BackupType,
) -> bytes:
entropy = hashlib.sha256(internal_entropy + external_entropy).digest()
secret = entropy[: strength // 8]

if len(secret) * 8 != strength:
raise ValueError("Entropy length mismatch")

if backup_type == messages.BackupType.Bip39:
import mnemonic

bip39 = mnemonic.Mnemonic("english")
words = bip39.to_mnemonic(secret)
seed = bip39.to_seed(words, passphrase="")
elif is_slip39_backup_type(backup_type):
import shamir_mnemonic

seed = shamir_mnemonic.cipher.decrypt(
secret, b"", iteration_exponent=1, identifier=0, extendable=True
)
else:
raise ValueError("Unknown backup type.")

return seed


@expect(messages.Success, field="message", ret_type=str)
def reset(*args: Any, **kwargs: Any) -> "MessageType":
return reset_entropy_check(*args, **kwargs)[0]


@session
def reset(
def reset_entropy_check(
client: "TrezorClient",
display_random: bool = False,
strength: Optional[int] = None,
Expand All @@ -243,7 +291,9 @@ def reset(
skip_backup: bool = False,
no_backup: bool = False,
backup_type: messages.BackupType = messages.BackupType.Bip39,
) -> "MessageType":
entropy_check_count: Optional[int] = None,
paths: List[Address] = [],
) -> Tuple["MessageType", Iterable[Tuple[Address, str]]]:
if display_random:
warnings.warn(
"display_random ignored. The feature is deprecated.",
Expand All @@ -267,6 +317,10 @@ def reset(
else:
strength = 128

if not paths:
# Get XPUBs for the first BTC SegWit v0 account and first ETH account.
paths = [parse_path("m/84h/0h/0h"), parse_path("m/44h/60h/0h")]

# Begin with device reset workflow
msg = messages.ResetDevice(
strength=strength,
Expand All @@ -277,17 +331,61 @@ def reset(
skip_backup=bool(skip_backup),
no_backup=bool(no_backup),
backup_type=backup_type,
entropy_check=entropy_check_count is not None,
)

resp = client.call(msg)
if not isinstance(resp, messages.EntropyRequest):
raise RuntimeError("Invalid response, expected EntropyRequest")

external_entropy = os.urandom(32)
# LOG.debug("Computer generated entropy: " + external_entropy.hex())
ret = client.call(messages.EntropyAck(entropy=external_entropy))
while True:
xpubs = []

external_entropy = os.urandom(32)
entropy_commitment = resp.entropy_commitment
resp = client.call(messages.EntropyAck(entropy=external_entropy))

if entropy_check_count is None:
break

if not isinstance(resp, messages.Success):
return resp, []

for path in paths:
resp = client.call(messages.GetPublicKey(address_n=path))
if not isinstance(resp, messages.PublicKey):
return resp, []
xpubs.append(resp.xpub)

if entropy_check_count <= 0:
resp = client.call(messages.ResetDeviceFinish())
break

entropy_check_count -= 1

resp = client.call(messages.ResetDeviceContinue())
if not isinstance(resp, messages.EntropyRequest):
raise RuntimeError("Invalid response, expected EntropyRequest")

# Check the entropy commitment from the previous round.
assert resp.prev_entropy
if (
hmac.HMAC(key=resp.prev_entropy, msg=b"", digestmod=hashlib.sha256).digest()
!= entropy_commitment
):
raise RuntimeError("Invalid entropy commitment.")

# Derive the seed and check that XPUBs match.
seed = _seed_from_entropy(
resp.prev_entropy, external_entropy, strength, backup_type
)
slip10 = SLIP10.from_seed(seed)
for path, xpub in zip(paths, xpubs):
if slip10.get_xpub_from_path(path) != xpub:
raise RuntimeError("Invalid XPUB in entropy check")

client.init_device()
return ret
return resp, zip(paths, xpubs)


@expect(messages.Success, field="message", ret_type=str)
Expand Down
17 changes: 17 additions & 0 deletions python/src/trezorlib/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,23 @@ def str_to_harden(x: str) -> int:
raise ValueError("Invalid BIP32 path", nstr) from e


def format_path(path: Address, flag: str = "h") -> str:
"""
Convert BIP32 path list of uint32 integers with hardened flags to string.
Several conventions are supported to denote the hardened flag: 1', 1h
e.g.: [0, 0x80000001, 1] -> "m/0/1h/1"
:param path: list of integers
:return: path string
"""
nstr = "m"
for i in path:
nstr += f"/{unharden(i)}{flag if is_hardened(i) else ''}"

return nstr


def prepare_message_bytes(txt: AnyStr) -> bytes:
"""
Make message suitable for protobuf.
Expand Down

0 comments on commit 78fe225

Please sign in to comment.