forked from bitcoin-core/HWI
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
1 parent
7fd2cea
commit 80ca9df
Showing
7 changed files
with
759 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,5 +5,6 @@ | |
'digitalbitbox', | ||
'coldcard', | ||
'bitbox02', | ||
'jade' | ||
'jade', | ||
'onekey' | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,362 @@ | ||
# type: ignore | ||
"""" | ||
OneKey Devices | ||
************** | ||
""" | ||
|
||
|
||
import sys | ||
from ..common import Chain | ||
from ..errors import ( | ||
DEVICE_NOT_INITIALIZED, | ||
DeviceNotReadyError, | ||
common_err_msgs, | ||
handle_errors, | ||
) | ||
from .trezorlib import protobuf, debuglink | ||
from .trezorlib.transport import ( | ||
udp, | ||
webusb, | ||
) | ||
from .trezor import TrezorClient | ||
from .trezorlib.mapping import DEFAULT_MAPPING | ||
from .trezorlib.messages import ( | ||
BackupType, | ||
Capability, | ||
Features, | ||
SafetyCheckLevel, | ||
) | ||
from types import MethodType | ||
from .trezorlib.models import TrezorModel | ||
from typing import ( | ||
Any, | ||
Dict, | ||
List, | ||
Optional, | ||
Sequence, | ||
) | ||
|
||
py_enumerate = enumerate # Need to use the enumerate built-in but there's another function already named that | ||
|
||
VENDORS = ("onekey.so", ) | ||
|
||
|
||
class OnekeyFeatures(Features): | ||
MESSAGE_WIRE_TYPE = 17 | ||
FIELDS = { | ||
1: protobuf.Field("vendor", "string", repeated=False, required=False), | ||
2: protobuf.Field("major_version", "uint32", repeated=False, required=True), | ||
3: protobuf.Field("minor_version", "uint32", repeated=False, required=True), | ||
4: protobuf.Field("patch_version", "uint32", repeated=False, required=True), | ||
5: protobuf.Field("bootloader_mode", "bool", repeated=False, required=False), | ||
6: protobuf.Field("device_id", "string", repeated=False, required=False), | ||
7: protobuf.Field("pin_protection", "bool", repeated=False, required=False), | ||
8: protobuf.Field( | ||
"passphrase_protection", "bool", repeated=False, required=False | ||
), | ||
9: protobuf.Field("language", "string", repeated=False, required=False), | ||
10: protobuf.Field("label", "string", repeated=False, required=False), | ||
12: protobuf.Field("initialized", "bool", repeated=False, required=False), | ||
13: protobuf.Field("revision", "bytes", repeated=False, required=False), | ||
14: protobuf.Field("bootloader_hash", "bytes", repeated=False, required=False), | ||
15: protobuf.Field("imported", "bool", repeated=False, required=False), | ||
16: protobuf.Field("unlocked", "bool", repeated=False, required=False), | ||
17: protobuf.Field( | ||
"_passphrase_cached", "bool", repeated=False, required=False | ||
), | ||
18: protobuf.Field("firmware_present", "bool", repeated=False, required=False), | ||
19: protobuf.Field("needs_backup", "bool", repeated=False, required=False), | ||
20: protobuf.Field("flags", "uint32", repeated=False, required=False), | ||
21: protobuf.Field("model", "string", repeated=False, required=False), | ||
22: protobuf.Field("fw_major", "uint32", repeated=False, required=False), | ||
23: protobuf.Field("fw_minor", "uint32", repeated=False, required=False), | ||
24: protobuf.Field("fw_patch", "uint32", repeated=False, required=False), | ||
25: protobuf.Field("fw_vendor", "string", repeated=False, required=False), | ||
27: protobuf.Field("unfinished_backup", "bool", repeated=False, required=False), | ||
28: protobuf.Field("no_backup", "bool", repeated=False, required=False), | ||
29: protobuf.Field("recovery_mode", "bool", repeated=False, required=False), | ||
30: protobuf.Field("capabilities", "Capability", repeated=True, required=False), | ||
31: protobuf.Field("backup_type", "BackupType", repeated=False, required=False), | ||
32: protobuf.Field("sd_card_present", "bool", repeated=False, required=False), | ||
33: protobuf.Field("sd_protection", "bool", repeated=False, required=False), | ||
34: protobuf.Field( | ||
"wipe_code_protection", "bool", repeated=False, required=False | ||
), | ||
35: protobuf.Field("session_id", "bytes", repeated=False, required=False), | ||
36: protobuf.Field( | ||
"passphrase_always_on_device", "bool", repeated=False, required=False | ||
), | ||
37: protobuf.Field( | ||
"safety_checks", "SafetyCheckLevel", repeated=False, required=False | ||
), | ||
38: protobuf.Field( | ||
"auto_lock_delay_ms", "uint32", repeated=False, required=False | ||
), | ||
39: protobuf.Field( | ||
"display_rotation", "uint32", repeated=False, required=False | ||
), | ||
40: protobuf.Field( | ||
"experimental_features", "bool", repeated=False, required=False | ||
), | ||
500: protobuf.Field("offset", "uint32", repeated=False, required=False), | ||
501: protobuf.Field("ble_name", "string", repeated=False, required=False), | ||
502: protobuf.Field("ble_ver", "string", repeated=False, required=False), | ||
503: protobuf.Field("ble_enable", "bool", repeated=False, required=False), | ||
504: protobuf.Field("se_enable", "bool", repeated=False, required=False), | ||
506: protobuf.Field("se_ver", "string", repeated=False, required=False), | ||
507: protobuf.Field("backup_only", "bool", repeated=False, required=False), | ||
508: protobuf.Field("onekey_version", "string", repeated=False, required=False), | ||
509: protobuf.Field("onekey_serial", "string", repeated=False, required=False), | ||
510: protobuf.Field( | ||
"bootloader_version", "string", repeated=False, required=False | ||
), | ||
511: protobuf.Field("serial_no", "string", repeated=False, required=False), | ||
519: protobuf.Field( | ||
"boardloader_version", "string", repeated=False, required=False | ||
), | ||
} | ||
|
||
def __init__( | ||
self, | ||
*, | ||
major_version: "int", | ||
minor_version: "int", | ||
patch_version: "int", | ||
capabilities: Optional[Sequence["Capability"]] = None, | ||
vendor: Optional["str"] = None, | ||
bootloader_mode: Optional["bool"] = None, | ||
device_id: Optional["str"] = None, | ||
pin_protection: Optional["bool"] = None, | ||
passphrase_protection: Optional["bool"] = None, | ||
language: Optional["str"] = None, | ||
label: Optional["str"] = None, | ||
initialized: Optional["bool"] = None, | ||
revision: Optional["bytes"] = None, | ||
bootloader_hash: Optional["bytes"] = None, | ||
imported: Optional["bool"] = None, | ||
unlocked: Optional["bool"] = None, | ||
_passphrase_cached: Optional["bool"] = None, | ||
firmware_present: Optional["bool"] = None, | ||
needs_backup: Optional["bool"] = None, | ||
flags: Optional["int"] = None, | ||
model: Optional["str"] = None, | ||
fw_major: Optional["int"] = None, | ||
fw_minor: Optional["int"] = None, | ||
fw_patch: Optional["int"] = None, | ||
fw_vendor: Optional["str"] = None, | ||
unfinished_backup: Optional["bool"] = None, | ||
no_backup: Optional["bool"] = None, | ||
recovery_mode: Optional["bool"] = None, | ||
backup_type: Optional["BackupType"] = None, | ||
sd_card_present: Optional["bool"] = None, | ||
sd_protection: Optional["bool"] = None, | ||
wipe_code_protection: Optional["bool"] = None, | ||
session_id: Optional["bytes"] = None, | ||
passphrase_always_on_device: Optional["bool"] = None, | ||
safety_checks: Optional["SafetyCheckLevel"] = None, | ||
auto_lock_delay_ms: Optional["int"] = None, | ||
display_rotation: Optional["int"] = None, | ||
experimental_features: Optional["bool"] = None, | ||
offset: Optional["int"] = None, | ||
ble_name: Optional["str"] = None, | ||
ble_ver: Optional["str"] = None, | ||
ble_enable: Optional["bool"] = None, | ||
se_enable: Optional["bool"] = None, | ||
se_ver: Optional["str"] = None, | ||
backup_only: Optional["bool"] = None, | ||
onekey_version: Optional["str"] = None, | ||
onekey_serial: Optional["str"] = None, | ||
bootloader_version: Optional["str"] = None, | ||
serial_no: Optional["str"] = None, | ||
boardloader_version: Optional["str"] = None, | ||
) -> None: | ||
self.capabilities: Sequence["Capability"] = ( | ||
capabilities if capabilities is not None else [] | ||
) | ||
self.major_version = major_version | ||
self.minor_version = minor_version | ||
self.patch_version = patch_version | ||
self.vendor = vendor | ||
self.bootloader_mode = bootloader_mode | ||
self.device_id = device_id | ||
self.pin_protection = pin_protection | ||
self.passphrase_protection = passphrase_protection | ||
self.language = language | ||
self.label = label | ||
self.initialized = initialized | ||
self.revision = revision | ||
self.bootloader_hash = bootloader_hash | ||
self.imported = imported | ||
self.unlocked = unlocked | ||
self._passphrase_cached = _passphrase_cached | ||
self.firmware_present = firmware_present | ||
self.needs_backup = needs_backup | ||
self.flags = flags | ||
self.model = model | ||
self.fw_major = fw_major | ||
self.fw_minor = fw_minor | ||
self.fw_patch = fw_patch | ||
self.fw_vendor = fw_vendor | ||
self.unfinished_backup = unfinished_backup | ||
self.no_backup = no_backup | ||
self.recovery_mode = recovery_mode | ||
self.backup_type = backup_type | ||
self.sd_card_present = sd_card_present | ||
self.sd_protection = sd_protection | ||
self.wipe_code_protection = wipe_code_protection | ||
self.session_id = session_id | ||
self.passphrase_always_on_device = passphrase_always_on_device | ||
self.safety_checks = safety_checks | ||
self.auto_lock_delay_ms = auto_lock_delay_ms | ||
self.display_rotation = display_rotation | ||
self.experimental_features = experimental_features | ||
self.offset = offset | ||
self.ble_name = ble_name | ||
self.ble_ver = ble_ver | ||
self.ble_enable = ble_enable | ||
self.se_enable = se_enable | ||
self.se_ver = se_ver | ||
self.backup_only = backup_only | ||
self.onekey_version = onekey_version | ||
self.onekey_serial = onekey_serial | ||
self.bootloader_version = bootloader_version | ||
self.serial_no = serial_no | ||
self.boardloader_version = boardloader_version | ||
|
||
|
||
DEFAULT_MAPPING.register(OnekeyFeatures) | ||
|
||
USB_IDS = {(0x1209, 0x4F4A), (0x1209, 0x4F4B), } | ||
|
||
ONEKEY_LEGACY = TrezorModel( | ||
name="1", | ||
minimum_version=(2, 11, 0), | ||
vendors=VENDORS, | ||
usb_ids=USB_IDS, | ||
default_mapping=DEFAULT_MAPPING, | ||
) | ||
|
||
ONEKEY_TOUCH = TrezorModel( | ||
name="T", | ||
minimum_version=(4, 2, 0), | ||
vendors=VENDORS, | ||
usb_ids=USB_IDS, | ||
default_mapping=DEFAULT_MAPPING, | ||
) | ||
|
||
ONEKEYS = (ONEKEY_LEGACY, ONEKEY_TOUCH) | ||
|
||
|
||
def model_by_name(name: str) -> Optional[TrezorModel]: | ||
for model in ONEKEYS: | ||
if model.name == name: | ||
return model | ||
return None | ||
|
||
|
||
# ===============overwrite methods for onekey device begin============ | ||
|
||
|
||
def _refresh_features(self: object, features: Features) -> None: | ||
"""Update internal fields based on passed-in Features message.""" | ||
if not self.model: | ||
self.model = model_by_name(features.model or "1") | ||
if self.model is None: | ||
raise RuntimeError("Unsupported OneKey model") | ||
|
||
if features.vendor not in self.model.vendors: | ||
raise RuntimeError("Unsupported device") | ||
self.features = features | ||
self.version = (*map(int, self.features.onekey_version.split(".")), ) | ||
self.check_firmware_version(warn_only=True) | ||
if self.features.session_id is not None: | ||
self.session_id = self.features.session_id | ||
self.features.session_id = None | ||
|
||
def button_request(self: object, code: Optional[int]) -> None: | ||
if not self.prompt_shown: | ||
print("Please confirm action on your OneKey device", file=sys.stderr) | ||
if not self.always_prompt: | ||
self.prompt_shown = True | ||
|
||
|
||
# ===============overwrite methods for onekey device end============ | ||
|
||
ONEKEY_EMULATOR_PATH = "127.0.0.1:54935" | ||
class OnekeyClient(TrezorClient): | ||
def __init__( | ||
self, | ||
path: str, | ||
password: Optional[str] = None, | ||
expert: bool = False, | ||
chain: Chain = Chain.MAIN, | ||
) -> None: | ||
super().__init__(path, password, expert, chain, webusb_ids=USB_IDS, sim_path=ONEKEY_EMULATOR_PATH) | ||
self.client._refresh_features = MethodType(_refresh_features, self.client) | ||
if not isinstance(self.client.ui, debuglink.DebugUI): | ||
self.client.ui.button_request = MethodType(button_request, self.client.ui) | ||
self.type = "OneKey" | ||
|
||
|
||
def enumerate( | ||
password: Optional[str] = None, expert: bool = False, chain: Chain = Chain.MAIN | ||
) -> List[Dict[str, Any]]: | ||
results = [] | ||
devs = webusb.WebUsbTransport.enumerate(usb_ids=USB_IDS) | ||
devs.extend(udp.UdpTransport.enumerate(path=ONEKEY_EMULATOR_PATH)) | ||
for dev in devs: | ||
d_data: Dict[str, Any] = {} | ||
|
||
d_data["type"] = "onekey" | ||
d_data["path"] = dev.get_path() | ||
client = None | ||
with handle_errors(common_err_msgs["enumerate"], d_data): | ||
client = OnekeyClient(d_data["path"], password) | ||
try: | ||
client._prepare_device() | ||
except TypeError: | ||
continue | ||
if not client.client.features.onekey_version or client.client.features.vendor not in VENDORS: | ||
continue | ||
|
||
d_data["label"] = client.client.features.label | ||
d_data["model"] = "onekey_" + client.client.features.model.lower() | ||
if d_data["path"].startswith("udp:"): | ||
d_data["model"] += "_simulator" | ||
|
||
d_data["needs_pin_sent"] = ( | ||
client.client.features.pin_protection | ||
and not client.client.features.unlocked | ||
) | ||
if client.client.features.model == "1": | ||
d_data[ | ||
"needs_passphrase_sent" | ||
] = ( | ||
client.client.features.passphrase_protection | ||
) # always need the passphrase sent for Trezor One if it has passphrase protection enabled | ||
else: | ||
d_data["needs_passphrase_sent"] = False | ||
if d_data["needs_pin_sent"]: | ||
raise DeviceNotReadyError( | ||
"OneKey is locked. Unlock by using 'promptpin' and then 'sendpin'." | ||
) | ||
if d_data["needs_passphrase_sent"] and password is None: | ||
d_data["warnings"] = [ | ||
[ | ||
'Passphrase protection enabled but passphrase was not provided. Using default passphrase of the empty string ("")' | ||
] | ||
] | ||
if client.client.features.initialized: | ||
d_data["fingerprint"] = client.get_master_fingerprint().hex() | ||
d_data[ | ||
"needs_passphrase_sent" | ||
] = False # Passphrase is always needed for the above to have worked, so it's already sent | ||
else: | ||
d_data["error"] = "Not initialized" | ||
d_data["code"] = DEVICE_NOT_INITIALIZED | ||
|
||
if client: | ||
client.close() | ||
|
||
results.append(d_data) | ||
return results |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
# OneKey: Hold your own key | ||
# https://onekey.so/ | ||
# | ||
# Put this file into /etc/udev/rules.d | ||
# | ||
# If you are creating a distribution package, | ||
# put this into /usr/lib/udev/rules.d or /lib/udev/rules.d | ||
# depending on your distribution | ||
|
||
# OneKey | ||
# onekey boot | ||
SUBSYSTEM=="usb", ATTR{idVendor}=="1209", ATTR{idProduct}=="4F4A", MODE="0660", GROUP="plugdev", TAG+="uaccess", TAG+="udev-acl", SYMLINK+="onekey%n" | ||
# onekey firmware | ||
SUBSYSTEM=="usb", ATTR{idVendor}=="1209", ATTR{idProduct}=="4F4B", MODE="0660", GROUP="plugdev", TAG+="uaccess", TAG+="udev-acl", SYMLINK+="onekey%n" | ||
KERNEL=="hidraw*", ATTRS{idVendor}=="1209", ATTRS{idProduct}=="4F4B", MODE="0660", GROUP="plugdev", TAG+="uaccess", TAG+="udev-acl" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,255 @@ | ||
#! /usr/bin/env python3 | ||
|
||
import argparse | ||
import atexit | ||
import json | ||
import os | ||
import shlex | ||
import signal | ||
import socket | ||
import subprocess | ||
import sys | ||
import time | ||
import unittest | ||
|
||
from hwilib.devices.trezorlib.transport.udp import UdpTransport | ||
from hwilib.devices.trezorlib.debuglink import TrezorClientDebugLink, load_device_by_mnemonic | ||
from hwilib.devices.trezorlib import device | ||
from hwilib.devices.onekey import _refresh_features | ||
from test_device import ( | ||
Bitcoind, | ||
DeviceEmulator, | ||
DeviceTestCase, | ||
TestDeviceConnect, | ||
TestGetKeypool, | ||
TestGetDescriptors, | ||
TestDisplayAddress, | ||
TestSignMessage, | ||
TestSignTx, | ||
) | ||
|
||
from hwilib._cli import process_commands | ||
|
||
from types import MethodType | ||
|
||
ONEKEY_MODELS = {'1', 't'} | ||
|
||
def get_pin(self, code=None): | ||
if self.pin: | ||
return self.debuglink.encode_pin(self.pin) | ||
else: | ||
return self.debuglink.read_pin_encoded() | ||
|
||
DEFAULT_UDP_PORT = 54935 | ||
|
||
class OnkeyEmulator(DeviceEmulator): | ||
def __init__(self, path, model): | ||
assert model in ONEKEY_MODELS | ||
self.emulator_path = path | ||
self.emulator_proc = None | ||
self.model = model | ||
self.emulator_log = None | ||
try: | ||
os.unlink('onekey-{}-emulator.stdout'.format(self.model)) | ||
except FileNotFoundError: | ||
pass | ||
self.type = f"onekey_{model}" | ||
self.path = "udp:127.0.0.1:54935" | ||
self.fingerprint = '95d8f670' | ||
self.master_xpub = "tpubDCknDegFqAdP4V2AhHhs635DPe8N1aTjfKE9m2UFbdej8zmeNbtqDzK59SxnsYSRSx5uS3AujbwgANUiAk4oHmDNUKoGGkWWUY6c48WgjEx" | ||
self.password = "" | ||
self.supports_ms_display = True | ||
self.supports_xpub_ms_display = True | ||
self.supports_unsorted_ms = True | ||
self.supports_taproot = True | ||
self.strict_bip48 = True | ||
self.include_xpubs = False | ||
self.supports_device_multiple_multisig = True | ||
|
||
def start(self): | ||
super().start() | ||
self.emulator_log = open('onekey-{}-emulator.stdout'.format(self.model), 'a') | ||
# Start the Onekey emulator | ||
self.emulator_proc = subprocess.Popen(['./' + os.path.basename(self.emulator_path)], cwd=os.path.dirname(self.emulator_path), stdout=self.emulator_log, env={'SDL_VIDEODRIVER': 'dummy', 'PYOPT': '0'}, shell=True, preexec_fn=os.setsid) | ||
# Wait for emulator to be up | ||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | ||
sock.connect(('127.0.0.1', DEFAULT_UDP_PORT)) | ||
sock.settimeout(0) | ||
while True: | ||
try: | ||
time.sleep(1) | ||
sock.sendall(b"PINGPING") | ||
r = sock.recv(8) | ||
if r == b"PONGPONG": | ||
break | ||
except Exception: | ||
time.sleep(1) | ||
# Setup the emulator | ||
wirelink = UdpTransport.enumerate("127.0.0.1:54935")[0] | ||
client = TrezorClientDebugLink(wirelink) | ||
client._refresh_features = MethodType(_refresh_features, client) | ||
client.init_device() | ||
device.wipe(client) | ||
load_device_by_mnemonic(client=client, mnemonic='alcohol woman abuse must during monitor noble actual mixed trade anger aisle', pin='', passphrase_protection=False, label='test') # From Trezor device tests | ||
atexit.register(self.stop) | ||
return client | ||
|
||
def stop(self): | ||
super().stop() | ||
if self.emulator_proc.poll() is None: | ||
os.killpg(os.getpgid(self.emulator_proc.pid), signal.SIGTERM) | ||
os.waitpid(self.emulator_proc.pid, 0) | ||
|
||
# Clean up emulator image | ||
if self.model == 't': | ||
emulator_img = "/var/tmp/onekey.flash" | ||
else: # self.model == '1' | ||
emulator_img = os.path.dirname(self.emulator_path) + "/emulator.img" | ||
|
||
if os.path.isfile(emulator_img): | ||
os.unlink(emulator_img) | ||
|
||
if self.emulator_log is not None: | ||
self.emulator_log.close() | ||
self.emulator_log = None | ||
|
||
# Wait a second for everything to be cleaned up before going to the next test | ||
time.sleep(1) | ||
|
||
atexit.unregister(self.stop) | ||
|
||
class OnekeyTestCase(unittest.TestCase): | ||
def __init__(self, emulator, interface='library', methodName='runTest'): | ||
super(OnekeyTestCase, self).__init__(methodName) | ||
self.emulator = emulator | ||
self.interface = interface | ||
|
||
@staticmethod | ||
def parameterize(testclass, emulator, interface='library'): | ||
testloader = unittest.TestLoader() | ||
testnames = testloader.getTestCaseNames(testclass) | ||
suite = unittest.TestSuite() | ||
for name in testnames: | ||
suite.addTest(testclass(emulator, interface, name)) | ||
return suite | ||
|
||
def do_command(self, args): | ||
cli_args = [] | ||
for arg in args: | ||
cli_args.append(shlex.quote(arg)) | ||
if self.interface == 'cli': | ||
proc = subprocess.Popen(['hwi ' + ' '.join(cli_args)], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, shell=True) | ||
result = proc.communicate() | ||
return json.loads(result[0].decode()) | ||
elif self.interface == 'bindist': | ||
proc = subprocess.Popen(['../dist/hwi ' + ' '.join(cli_args)], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, shell=True) | ||
result = proc.communicate() | ||
return json.loads(result[0].decode()) | ||
elif self.interface == 'stdin': | ||
input_str = '\n'.join(args) + '\n' | ||
proc = subprocess.Popen(['hwi', '--stdin'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) | ||
result = proc.communicate(input_str.encode()) | ||
return json.loads(result[0].decode()) | ||
else: | ||
return process_commands(args) | ||
|
||
def __str__(self): | ||
return 'onekey_{}: {}'.format(self.emulator.model, super().__str__()) | ||
|
||
def __repr__(self): | ||
return 'onekey_{}: {}'.format(self.emulator.model, super().__repr__()) | ||
|
||
def setUp(self): | ||
self.client = self.emulator.start() | ||
|
||
def tearDown(self): | ||
self.emulator.stop() | ||
|
||
# OneKey specific getxpub test because this requires device specific thing to set xprvs | ||
class TestOnekeyGetxpub(OnekeyTestCase): | ||
def test_getxpub(self): | ||
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data/bip32_vectors.json'), encoding='utf-8') as f: | ||
vectors = json.load(f) | ||
for vec in vectors: | ||
with self.subTest(vector=vec): | ||
# Setup with mnemonic | ||
device.wipe(self.client) | ||
load_device_by_mnemonic(client=self.client, mnemonic=vec['mnemonic'], pin='', passphrase_protection=False, label='test', language='english') | ||
|
||
# Test getmasterxpub | ||
gmxp_res = self.do_command(['-t', 'onekey', '-d', "udp:127.0.0.1:54935", 'getmasterxpub', "--addr-type", "legacy"]) | ||
self.assertEqual(gmxp_res['xpub'], vec['master_xpub']) | ||
|
||
# Test the path derivs | ||
for path_vec in vec['vectors']: | ||
gxp_res = self.do_command(['-t', 'onekey', '-d', "udp:127.0.0.1:54935", 'getxpub', path_vec['path']]) | ||
self.assertEqual(gxp_res['xpub'], path_vec['xpub']) | ||
|
||
def test_expert_getxpub(self): | ||
result = self.do_command(['-t', 'onekey', '-d', "udp:127.0.0.1:54935", '--expert', 'getxpub', 'm/44h/0h/0h/3']) | ||
self.assertEqual(result['xpub'], 'xpub6FMafWAi3n3ET2rU5yQr16UhRD1Zx4dELmcEw3NaYeBaNnipcr2zjzYp1sNdwR3aTN37hxAqRWQ13AWUZr6L9jc617mU6EvgYXyBjXrEhgr') | ||
self.assertFalse(result['testnet']) | ||
self.assertFalse(result['private']) | ||
self.assertEqual(result['depth'], 4) | ||
self.assertEqual(result['parent_fingerprint'], 'f7e318db') | ||
self.assertEqual(result['child_num'], 3) | ||
self.assertEqual(result['chaincode'], '95a7fb33c4f0896f66045cd7f45ed49a9e72372d2aed204ad0149c39b7b17905') | ||
self.assertEqual(result['pubkey'], '022e6d9c18e5a837e802fb09abe00f787c8ccb0fc489c6ec5dc2613d930efd7eae') | ||
|
||
class TestOnekeyLabel(OnekeyTestCase): | ||
def setUp(self): | ||
self.client = self.emulator.start() | ||
self.dev_args = ['-t', 'onekey', '-d', "udp:127.0.0.1:54935"] | ||
|
||
def test_label(self): | ||
result = self.do_command(self.dev_args + ['enumerate']) | ||
for dev in result: | ||
if dev['type'] == 'onekey' and dev['path'] == "udp:127.0.0.1:54935": | ||
self.assertEqual(dev['label'], 'test') | ||
break | ||
else: | ||
self.fail("Did not enumerate device") | ||
|
||
def onekey_test_suite(emulator, bitcoind, interface, model): | ||
assert model in ONEKEY_MODELS | ||
# Redirect stderr to /dev/null as it's super spammy | ||
sys.stderr = open(os.devnull, 'w') | ||
|
||
dev_emulator = OnkeyEmulator(emulator, model) | ||
signtx_cases = [ | ||
(["legacy"], ["legacy"], False, True), | ||
(["segwit"], ["segwit"], False, True), | ||
(["tap"], [], False, True), | ||
(["legacy", "segwit"], ["legacy", "segwit"], False, True), | ||
(["legacy", "segwit", "tap"], ["legacy", "segwit"], False, True), | ||
] | ||
# Generic Device tests | ||
suite = unittest.TestSuite() | ||
suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, bitcoind, emulator=dev_emulator, interface=interface, detect_type="onekey")) | ||
suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, bitcoind, emulator=dev_emulator, interface=interface, detect_type=f"onekey_{model}_simulator")) | ||
suite.addTest(DeviceTestCase.parameterize(TestGetDescriptors, bitcoind, emulator=dev_emulator, interface=interface)) | ||
suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, bitcoind, emulator=dev_emulator, interface=interface)) | ||
if model == 't': | ||
suite.addTest(DeviceTestCase.parameterize(TestSignTx, bitcoind, emulator=dev_emulator, interface=interface, signtx_cases=signtx_cases)) | ||
suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, bitcoind, emulator=dev_emulator, interface=interface)) | ||
suite.addTest(DeviceTestCase.parameterize(TestSignMessage, bitcoind, emulator=dev_emulator, interface=interface)) | ||
suite.addTest(OnekeyTestCase.parameterize(TestOnekeyLabel, emulator=dev_emulator, interface=interface)) | ||
suite.addTest(OnekeyTestCase.parameterize(TestOnekeyGetxpub, emulator=dev_emulator, interface=interface)) | ||
result = unittest.TextTestRunner(stream=sys.stdout, verbosity=2).run(suite) | ||
sys.stderr = sys.__stderr__ | ||
return result.wasSuccessful() | ||
|
||
if __name__ == '__main__': | ||
parser = argparse.ArgumentParser(description='Test Onekey implementation') | ||
parser.add_argument('emulator', help='Path to the Onekey emulator') | ||
parser.add_argument('bitcoind', help='Path to bitcoind binary') | ||
parser.add_argument('--interface', help='Which interface to send commands over', choices=['library', 'cli', 'bindist'], default='library') | ||
group = parser.add_argument_group() | ||
group.add_argument('--model_1', help='The emulator is for the Onekey legacy', action='store_const', const='1', dest='model') | ||
group.add_argument('--model_t', help='The emulator is for the Onekey Touch', action='store_const', const='t', dest='model') | ||
args = parser.parse_args() | ||
|
||
# Start bitcoind | ||
bitcoind = Bitcoind.create(args.bitcoind) | ||
|
||
sys.exit(not onekey_test_suite(args.emulator, bitcoind, args.interface, args.model)) |