Skip to content

Commit

Permalink
Fix: lot and sequence logic & update exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
meherett committed Aug 24, 2024
1 parent d57cffb commit 682c253
Showing 1 changed file with 93 additions and 54 deletions.
147 changes: 93 additions & 54 deletions bip38/bip38.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# file COPYING or https://opensource.org/license/mit

from typing import (
Type, Union, Optional, Literal
Type, Union, Optional, Literal, List
)
from pyaes import AESModeOfOperationECB

Expand Down Expand Up @@ -39,10 +39,11 @@
MAGIC_NO_LOT_AND_SEQUENCE_COMPRESSED_FLAG,
EC_MULTIPLIED_PRIVATE_KEY_PREFIX,
CONFIRMATION_CODE_PREFIX,
WIF_TYPES,
FLAGS
)
from .exceptions import (
Error, PassphraseError
Error, CryptocurrencyError, NetworkError, Secp256k1Error, WIFError, PassphraseError
)
from .utils import (
integer_to_bytes, bytes_to_integer, bytes_to_string, get_bytes
Expand All @@ -62,17 +63,25 @@ class BIP38:
:type network: str
"""

network: str
cryptocurrency: Type[ICryptocurrency]
networks: List[str]
network: str
alphabet: str

def __init__(
self, cryptocurrency: Type[ICryptocurrency], network: str = "mainnet"
) -> None:

self.network = network
self.cryptocurrency = cryptocurrency
self.alphabet = (
if not issubclass(cryptocurrency, ICryptocurrency):
raise CryptocurrencyError(
"Invalid cryptocurrency sub-class", expected=Type[ICryptocurrency], got=type(cryptocurrency)
)
self.cryptocurrency, self.networks = (
cryptocurrency, list(cryptocurrency.NETWORKS.keys())
)
if network not in self.networks:
raise NetworkError("Wrong network type", expected=self.networks, got=type(network))
self.network, self.alphabet = network, (
self.cryptocurrency.ALPHABET
if self.cryptocurrency.ALPHABET else
Bitcoin.ALPHABET
Expand Down Expand Up @@ -112,19 +121,18 @@ def intermediate_code(

owner_salt: bytes = get_bytes(owner_salt) if owner_salt else os.urandom(8)
if len(owner_salt) not in [4, 8]:
raise Error(f"Invalid owner salt length (expected: 8 or 16, got: {len(bytes_to_string(owner_salt))})")
if len(owner_salt) == 4 and (not lot or not sequence):
raise Error(
f"Invalid owner salt length for non lot/sequence (expected: 16, got: {len(bytes_to_string(owner_salt))})")
if (lot and not sequence) or (not lot and sequence):
raise Error(f"Both lot & sequence are required, got: (lot {lot}) (sequence {sequence})")
raise Error("Invalid owner salt length", expected=[8, 16], got=len(bytes_to_string(owner_salt)))
if len(owner_salt) == 4 and (lot is None or sequence is None):
raise Error("Invalid owner salt length for non lot/sequence", expected=16, got=len(bytes_to_string(owner_salt)))
if (lot and sequence is None) or (lot is None and sequence):
raise Error("Both lot & sequence are required", detail=f"got: (lot {lot}) & (sequence {sequence})")

if lot and sequence:
lot, sequence = int(lot), int(sequence)
if not 100000 <= lot <= 999999:
raise Error(f"Invalid lot, (expected: 100000 <= lot <= 999999, got: {lot})")
raise Error("Invalid lot", expected="100000 <= lot <= 999999", got=lot)
if not 0 <= sequence <= 4095:
raise Error(f"Invalid lot, (expected: 0 <= sequence <= 4095, got: {sequence})")
raise Error("Invalid sequence", expected="0 <= sequence <= 4095", got=sequence)

pre_factor: bytes = scrypt.hash(unicodedata.normalize("NFC", passphrase), owner_salt[:4], 16384, 8, 8, 32)
owner_entropy: bytes = owner_salt[:4] + integer_to_bytes((lot * 4096 + sequence), 4)
Expand Down Expand Up @@ -172,6 +180,8 @@ def encrypt(self, wif: str, passphrase: str, network: Optional[str] = None) -> s
network: str = (
network if network else self.network
)
if network not in self.networks:
raise NetworkError("Wrong network type", expected=self.networks, got=type(network))
wif_type: str = get_wif_type(
wif=wif, cryptocurrency=self.cryptocurrency, network=network
)
Expand All @@ -181,14 +191,12 @@ def encrypt(self, wif: str, passphrase: str, network: Optional[str] = None) -> s
wif=wif, cryptocurrency=self.cryptocurrency, network=network
)))
public_key_type: str = "uncompressed"
elif wif_type == "wif-compressed":
else:
flag: bytes = integer_to_bytes(NO_EC_MULTIPLIED_WIF_COMPRESSED_FLAG)
private_key: PrivateKey = PrivateKey.from_bytes(get_bytes(wif_to_private_key(
wif=wif, cryptocurrency=self.cryptocurrency, network=network
)))
public_key_type: str = "compressed"
else:
raise Error("Wrong WIF (Wallet Important Format) type")

address: str = P2PKHAddress.encode(
public_key=private_key.public_key(),
Expand Down Expand Up @@ -219,7 +227,7 @@ def encrypt(self, wif: str, passphrase: str, network: Optional[str] = None) -> s
def create_new_encrypted_wif(
self,
intermediate_passphrase: str,
public_key_type: str = "uncompressed",
wif_type: str = "wif",
seed: Optional[Union[str, bytes]] = None,
network: Optional[str] = None
) -> dict:
Expand All @@ -228,8 +236,8 @@ def create_new_encrypted_wif(
:param intermediate_passphrase: The intermediate passphrase.
:type intermediate_passphrase: str
:param public_key_type: The type of public key (default: "uncompressed").
:type public_key_type: str
:param wif_type: The WIF type, either 'wif' or 'wif-compressed' (default is 'wif').
:type wif_type: str
:param seed: Optional seed (default: random 24 bytes).
:type seed: Optional[str, bytes]
:param network: Optional network for encryption. Defaults to the class's network if not provided.
Expand All @@ -241,41 +249,52 @@ def create_new_encrypted_wif(
>>> from bip38 import BIP38
>>> from bip38.cryptocurrencies import Bitcoin
>>> bip38: BIP38 = BIP38(cryptocurrency=Bitcoin, network="testnet")
>>> bip38.create_new_encrypted_wif(intermediate_passphrase="passphraseb7ruSN4At4Rb8hPTNcAVezfsjonvUs4Qo3xSp1fBFsFPvVGSbpP2WTJMhw3mVZ", public_key_type="uncompressed")
>>> bip38.create_new_encrypted_wif(intermediate_passphrase="passphraseb7ruSN4At4Rb8hPTNcAVezfsjonvUs4Qo3xSp1fBFsFPvVGSbpP2WTJMhw3mVZ", wif_type="wif")
{'encrypted_wif': '6PgMqfwt1nqJeUkCTRVf4TV6FJoqA8GFCGaj6RsTW1t3XgQNUDfKW9u9Px', 'confirmation_code': 'cfrm38V8ZR17HkU8Xdu8RunWf8CZauXphNQa5HFJd4cxEYckXHS6tfo9M73yL3FPmWv1xqBQsgG', 'public_key': '04fdfbd938b4bb220c11fc1c22e87c5306d105130dd05d7ece4013aa1d2382f3a2a8673fd7b4b2f55c48cd0ebda7d88089783b3394210a0853159803b5eb99097e', 'seed': '0a89d39d0af0f0d7abc655baa0e399f8ddcd372bb9aaebce', 'public_key_type': 'uncompressed', 'address': 'myRfjUS74ab2ZbEbQuiWNzPHb5fSYuFvm4'}
>>> bip38.create_new_encrypted_wif(intermediate_passphrase="passphraseb7ruSN4At4Rb8hPTNcAVezfsjonvUs4Qo3xSp1fBFsFPvVGSbpP2WTJMhw3mVZ", public_key_type="compressed", seed="99241d58245c883896f80843d2846672d7312e6195ca1a6c")
>>> bip38.create_new_encrypted_wif(intermediate_passphrase="passphraseb7ruSN4At4Rb8hPTNcAVezfsjonvUs4Qo3xSp1fBFsFPvVGSbpP2WTJMhw3mVZ", wif_type="wif-compressed", seed="99241d58245c883896f80843d2846672d7312e6195ca1a6c")
{'encrypted_wif': '6PoH364JVeoBPsJBveXCwfWpX2H82N5qiAervtynak7r7dzZF2TBFxZAXE', 'confirmation_code': 'cfrm38VX6jSx7C3nbxWCVEdGna7JMpm57zHdbuofbpCA9EiN57aEt4s5fh9k19b5cTmyZ5jMkE2', 'public_key': '02100bb0440ff4106a1743750813271e66a7017431e92921e520319f537c7357c1', 'seed': '99241d58245c883896f80843d2846672d7312e6195ca1a6c', 'public_key_type': 'compressed', 'address': 'mjurfzLk2ryxCyfm4nMk5qarvNRhbNCtK8'}
"""

network: str = (
network if network else self.network
)
if network not in self.networks:
raise NetworkError("Wrong network type", expected=self.networks, got=type(network))
seed_b: bytes = get_bytes(seed) if seed else os.urandom(24)
intermediate_decode: bytes = check_decode(intermediate_passphrase)
try:
intermediate_decode: bytes = check_decode(intermediate_passphrase)
except ValueError:
raise PassphraseError("Invalid intermediate passphrase")
if len(intermediate_decode) != 49:
raise PassphraseError(f"Invalid intermediate passphrase length (expected: 49, got: {len(intermediate_decode)})")
raise PassphraseError("Invalid intermediate passphrase length", expected=49, got=len(intermediate_decode))

magic: bytes = intermediate_decode[:8]
owner_entropy: bytes = intermediate_decode[8:16]
pass_point: bytes = intermediate_decode[16:]

if wif_type == "wif":
public_key_type = "uncompressed"
elif wif_type == "wif-compressed":
public_key_type = "compressed"
else:
raise WIFError("Invalid wif type", expected=WIF_TYPES, got=wif_type)

if magic == integer_to_bytes(MAGIC_LOT_AND_SEQUENCE):
if public_key_type == "uncompressed":
flag: bytes = integer_to_bytes(MAGIC_LOT_AND_SEQUENCE_UNCOMPRESSED_FLAG)
elif public_key_type == "compressed":
flag: bytes = integer_to_bytes(MAGIC_LOT_AND_SEQUENCE_COMPRESSED_FLAG)
else:
raise Error(
f"Invalid public key type (expected: 'uncompressed' or 'compressed', got: {public_key_type})")
flag: bytes = integer_to_bytes(MAGIC_LOT_AND_SEQUENCE_COMPRESSED_FLAG)
elif magic == integer_to_bytes(MAGIC_NO_LOT_AND_SEQUENCE):
if public_key_type == "uncompressed":
flag: bytes = integer_to_bytes(MAGIC_NO_LOT_AND_SEQUENCE_UNCOMPRESSED_FLAG)
elif public_key_type == "compressed":
flag: bytes = integer_to_bytes(MAGIC_NO_LOT_AND_SEQUENCE_COMPRESSED_FLAG)
else:
raise Error(
f"Invalid public key type (expected: 'uncompressed' or 'compressed', got: {public_key_type})")
flag: bytes = integer_to_bytes(MAGIC_NO_LOT_AND_SEQUENCE_COMPRESSED_FLAG)
else:
raise Error(
f"Invalid magic (expected: {bytes_to_string(integer_to_bytes(MAGIC_LOT_AND_SEQUENCE))}/"
f"{bytes_to_string(integer_to_bytes(MAGIC_NO_LOT_AND_SEQUENCE))}, got: {bytes_to_string(magic)})"
"Invalid magic", expected=[
bytes_to_string(integer_to_bytes(MAGIC_LOT_AND_SEQUENCE)),
bytes_to_string(integer_to_bytes(MAGIC_NO_LOT_AND_SEQUENCE))
], got=bytes_to_string(magic)
)

factor_b: bytes = double_sha256(seed_b)
Expand All @@ -287,7 +306,7 @@ def create_new_encrypted_wif(
)
address: str = P2PKHAddress.encode(
public_key=public_key,
address_prefix=self.cryptocurrency.NETWORKS[network if network else self.network]["address_prefix"],
address_prefix=self.cryptocurrency.NETWORKS[network]["address_prefix"],
public_key_type=public_key_type
)
address_hash: bytes = get_checksum(get_bytes(address, unhexlify=False))
Expand Down Expand Up @@ -364,14 +383,22 @@ def confirm_code(
{'public_key': '02100bb0440ff4106a1743750813271e66a7017431e92921e520319f537c7357c1', 'public_key_type': 'compressed', 'address': '15PuNwFmDqYhRsC9MDPNFvNY4Npzibm67c', 'lot': 199999, 'sequence': 1}
"""

confirmation_code_decode: bytes = check_decode(confirmation_code)
network: str = (
network if network else self.network
)
if network not in self.networks:
raise NetworkError("Wrong network type", expected=self.networks, got=type(network))
try:
confirmation_code_decode: bytes = check_decode(confirmation_code)
except ValueError:
raise Error("Invalid confirmation code")
if len(confirmation_code_decode) != 51:
raise Error(f"Invalid confirmation code length (expected: 102, got: {len(confirmation_code_decode)})")
raise Error("Invalid confirmation code length", expected=102, got=len(confirmation_code_decode))

prefix_length: int = len(integer_to_bytes(CONFIRMATION_CODE_PREFIX))
prefix_got: bytes = confirmation_code_decode[:prefix_length]
if integer_to_bytes(CONFIRMATION_CODE_PREFIX) != prefix_got:
raise Error(f"Invalid confirmation code prefix (expected: {prefix_length}, got: {prefix_got})")
raise Error("Invalid confirmation code prefix", expected=prefix_length, got=prefix_got)

flag: bytes = confirmation_code_decode[5:6]
address_hash: bytes = confirmation_code_decode[6:10]
Expand Down Expand Up @@ -409,16 +436,20 @@ def confirm_code(
point_b: bytes = (
point_b_prefix + point_b_half_1 + point_b_half_2
)
public_key: PublicKey = PublicKey.from_point(
PublicKey.from_bytes(point_b).point() * bytes_to_integer(pass_factor)
)
public_key_type: str = "uncompressed"
if bytes_to_integer(flag) in FLAGS["compression"]:
public_key_type: str = "compressed"

try:
public_key: PublicKey = PublicKey.from_point(
PublicKey.from_bytes(point_b).point() * bytes_to_integer(pass_factor)
)
public_key_type: str = "uncompressed"
if bytes_to_integer(flag) in FLAGS["compression"]:
public_key_type: str = "compressed"
except Secp256k1Error:
raise PassphraseError("Incorrect passphrase")

address: str = P2PKHAddress.encode(
public_key=public_key,
address_prefix=self.cryptocurrency.NETWORKS[network if network else self.network]["address_prefix"],
address_prefix=self.cryptocurrency.NETWORKS[network]["address_prefix"],
public_key_type=public_key_type
)
if get_checksum(get_bytes(address, unhexlify=False)) == address_hash:
Expand All @@ -436,7 +467,7 @@ def confirm_code(
sequence=sequence
)
return address
raise PassphraseError("Incorrect passphrase or password")
raise PassphraseError("Incorrect passphrase")

def decrypt(
self, encrypted_wif: str, passphrase: str, network: Optional[str] = None, detail: bool = False
Expand Down Expand Up @@ -472,9 +503,11 @@ def decrypt(
network: str = (
network if network else self.network
)
if network not in self.networks:
raise NetworkError("Wrong network type", expected=self.networks, got=type(network))
encrypted_wif_decode: bytes = decode(encrypted_wif)
if len(encrypted_wif_decode) != 43:
raise Error(f"Invalid encrypted WIF length (expected: 43, got: {len(encrypted_wif_decode)})")
raise WIFError("Invalid encrypted WIF length", expected=43, got=len(encrypted_wif_decode))

prefix: bytes = encrypted_wif_decode[:2]
flag: bytes = encrypted_wif_decode[2:3]
Expand All @@ -490,11 +523,15 @@ def decrypt(
public_key_type: str = "compressed"
else:
raise Error(
f"Invalid flag (expected: {bytes_to_string(integer_to_bytes(NO_EC_MULTIPLIED_WIF_FLAG))} or "
f"{bytes_to_string(integer_to_bytes(NO_EC_MULTIPLIED_WIF_COMPRESSED_FLAG))}, got: {bytes_to_string(flag)})"
"Invalid flag", expected=[
bytes_to_string(integer_to_bytes(NO_EC_MULTIPLIED_WIF_FLAG)),
bytes_to_string(integer_to_bytes(NO_EC_MULTIPLIED_WIF_COMPRESSED_FLAG))
], got=bytes_to_string(flag)
)

key: bytes = scrypt.hash(unicodedata.normalize("NFC", passphrase), address_hash, 16384, 8, 8)
key: bytes = scrypt.hash(
unicodedata.normalize("NFC", passphrase), address_hash, 16384, 8, 8
)
derived_half_1, derived_half_2 = key[0:32], key[32:64]
encrypted_half_1: bytes = encrypted_wif_decode[7:23]
encrypted_half_2: bytes = encrypted_wif_decode[23:39]
Expand All @@ -516,7 +553,7 @@ def decrypt(
public_key_type=public_key_type
)
if get_checksum(get_bytes(address, unhexlify=False)) != address_hash:
raise PassphraseError("Incorrect passphrase or password")
raise PassphraseError("Incorrect passphrase")

wif: str = private_key_to_wif(
private_key=private_key, wif_type=wif_type, cryptocurrency=self.cryptocurrency, network=network
Expand Down Expand Up @@ -612,9 +649,11 @@ def decrypt(
sequence=sequence
)
return wif
raise PassphraseError("Incorrect passphrase or password")
raise PassphraseError("Incorrect passphrase")
else:
raise Error(
f"Invalid prefix (expected: {bytes_to_string(integer_to_bytes(NO_EC_MULTIPLIED_PRIVATE_KEY_PREFIX))} or "
f"{bytes_to_string(integer_to_bytes(EC_MULTIPLIED_PRIVATE_KEY_PREFIX))}, got: {bytes_to_string(prefix)})"
"Invalid prefix", expected=[
bytes_to_string(integer_to_bytes(NO_EC_MULTIPLIED_PRIVATE_KEY_PREFIX)),
bytes_to_string(integer_to_bytes(EC_MULTIPLIED_PRIVATE_KEY_PREFIX))
], got=bytes_to_string(prefix)
)

0 comments on commit 682c253

Please sign in to comment.