Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ecdsa python client example #6

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
"""
This script demonstrates a complete implementation of using Nillion's infrastructure to sign a message based on
the cggmp21 Threshold ECDSA protocol [1, 2]. Threshold ECDSA enables splitting the signing process across
multiple parties, ensuring enhanced security and fault tolerance.

We use an external python library as an example to show how to generate key-value pairs and verify the signature.

The program includes:
1. Generation and storage of an ECDSA private key.
2. Hashing of a message to create a digest.
3. Storage and retrieval of both the private key and the digest for secure computation.
4. Digital signature creation on Nillion's network using the stored private key and digest.
5. Verification of the generated digital signature using an external library.

References:
- [1] Canetti, R., Gennaro, R., Goldfeder, S., Makriyannis, N., & Peled, U. (2020). UC Non-Interactive, Proactive,
Threshold ECDSA with Identifiable Aborts. Proceedings of the 2020 ACM SIGSAC Conference on Computer and Communications
Security (CCS ’20), 1769–1787. ACM.
- [2] LFDT-Lockness. (n.d.). cggmp21. GitHub repository. Retrieved November 25, 2024,
from https://github.com/LFDT-Lockness/cggmp21
"""

import asyncio
import pytest
import os

from nillion_client import (
InputPartyBinding,
Network,
NilChainPayer,
NilChainPrivateKey,
OutputPartyBinding,
Permissions,
EcdsaPrivateKey,
EcdsaDigestMessage,
EcdsaSignature,
VmClient,
PrivateKey,
)
from dotenv import load_dotenv

from cryptography.hazmat.primitives.asymmetric import ec, utils
from cryptography.hazmat.primitives import serialization, hashes

home = os.getenv("HOME")
print(f"{home}/.config/nillion/nillion-devnet.env\n")
load_dotenv(f"{home}/.config/nillion/nillion-devnet.env")
print("Loaded!")



async def main():

###########################################
# #
# NILLION NETWORK CONFIGURATION #
# #
###########################################


# Use the devnet configuration generated by `nillion-devnet`
network = Network.from_config("devnet")

# Create payments config and set up Nillion wallet with a private key to pay for operations
nilchain_key: str = os.getenv("NILLION_NILCHAIN_PRIVATE_KEY_0") # type: ignore
payer = NilChainPayer(
network,
wallet_private_key=NilChainPrivateKey(bytes.fromhex(nilchain_key)),
gas_limit=10000000,
)

# Use a random key to identify ourselves
signing_key = PrivateKey()
client = await VmClient.create(signing_key, network, payer)


###########################################
# #
# ECDSA CONFIG NAMES #
# #
###########################################


# program id
tecdsa_program_id = "builtin/tecdsa_sign"
# input store name
tecdsa_key_name = "tecdsa_private_key"
tecdsa_digest_name = "tecdsa_digest_message"
tecdsa_signature_name = "tecdsa_signature"
# party names
tecdsa_key_party = "tecdsa_key_party"
tecdsa_digest_party = "tecdsa_digest_message_party"
tecdsa_output_party = "tecdsa_output_party"


###########################################
# #
# ECDSA PRIVATE KEY #
# #
###########################################


##### LOCAL ECDSA PRIVATE KEY GENERATION
print("-----LOCAL ECDSA PRIVATE KEY GENERATION")

# Generate ECDSA private key
ecdsa_private_key = ec.generate_private_key(ec.SECP256K1())
ecdsa_public_key = ecdsa_private_key.public_key()

# Convert private key to bytes
private_key_bytes = ecdsa_private_key.private_numbers().private_value.to_bytes(
length=ecdsa_private_key.key_size // 8, # Key size in bytes
byteorder='big'
)
print("Generated private key: ", private_key_bytes)


##### STORE ECDSA PRIVATE KEY
print("-----STORE ECDSA PRIVATE KEY")

tecdsa_key_value = bytearray(private_key_bytes)
# ecdsa key to be stored or used for signing
my_ecdsa_key = {
tecdsa_key_name: EcdsaPrivateKey(tecdsa_key_value),
}

# Create a permissions object to attach to the stored secret
permissions = Permissions.defaults_for_user(client.user_id).allow_compute(
client.user_id, tecdsa_program_id
)

# Store the secret
my_ecdsa_key_id = await client.store_values(
my_ecdsa_key, ttl_days=5, permissions=permissions
).invoke()

print(f"The ecdsa private key is stored at: {my_ecdsa_key_id}")


##### RETRIEVE ECDSA PRIVATE KEY
print("-----RETRIEVE ECDSA PRIVATE KEY")

retrieved_values = await client.retrieve_values(my_ecdsa_key_id).invoke()
value: EcdsaPrivateKey = retrieved_values[tecdsa_key_name] # type: ignore

ecdsa_private_key_bytearray = value.value
print("Retrieved stored private key: ", ecdsa_private_key_bytearray)


###########################################
# #
# ECDSA DIGEST #
# #
###########################################


##### GENERATE MESSAGE AND DIGEST
print("-----GENERATE MESSAGE AND DIGEST")

# The message to sign
message = b"A deep message with a deep number: 42."
# Hashing the message
digest = hashes.Hash(hashes.SHA256())
digest.update(message)
hashed_message = digest.finalize()

print("Generated digest: ", hashed_message)


##### STORE ECDSA DIGEST
print("-----STORE ECDSA DIGEST")

tecdsa_digest_value = bytearray(hashed_message)
# ecdsa key to be stored or used for signing
my_ecdsa_digest = {
tecdsa_digest_name: EcdsaDigestMessage(tecdsa_digest_value),
}

# Create a permissions object to attach to the stored secret
permissions = Permissions.defaults_for_user(client.user_id).allow_compute(
client.user_id, tecdsa_program_id
)

# Store the secret
my_ecdsa_digest_id = await client.store_values(
my_ecdsa_digest, ttl_days=5, permissions=permissions
).invoke()

print(f"The digest is stored at: {my_ecdsa_digest_id}")


##### RETRIEVE ECDSA DIGEST
print("-----RETRIEVE ECDSA DIGEST")

retrieved_values = await client.retrieve_values(my_ecdsa_digest_id).invoke()
value: EcdsaDigestMessage = retrieved_values[tecdsa_digest_name] # type: ignore

ecdsa_digest_bytearray = value.value
print("Retrieved stored digest: ", ecdsa_digest_bytearray)


###########################################
# #
# ECDSA SIGNING #
# #
###########################################

##### ECDSA SIGNING
print("-----ECDSA SIGNING")

# Bind the parties in the computation to the client to set input and output parties
input_bindings = [
InputPartyBinding(tecdsa_key_party, client.user_id),
InputPartyBinding(tecdsa_digest_party, client.user_id)
]
output_bindings = [OutputPartyBinding(tecdsa_output_party, [client.user_id])]

# Create a computation time secret to use
compute_time_values = {}

# Compute, passing in the compute time values as well as the previously uploaded value.
print(f"Invoking computation using program {tecdsa_program_id} and values id: {my_ecdsa_key_id} and {my_ecdsa_digest_id}")
compute_id = await client.compute(
tecdsa_program_id,
input_bindings,
output_bindings,
values=compute_time_values,
value_ids=[my_ecdsa_key_id, my_ecdsa_digest_id],
).invoke()

# 6. Return the computation result
print(f"The computation was sent to the network. compute_id: {compute_id}")
result = await client.retrieve_compute_results(compute_id).invoke()
signature: EcdsaSignature = result[tecdsa_signature_name]
print(f"✅ Compute complete for compute_id {compute_id}")
print(f"🖥️ The result is {signature.value}")


###########################################
# #
# ECDSA VERIFICATION #
# #
###########################################


##### ECDSA VERIFICATION
print("-----ECDSA VERIFICATION")

# Transform the result signature to some external library signature
(r, s) = signature.value
r_int = int.from_bytes(r, byteorder="big")
s_int = int.from_bytes(s, byteorder="big")
signature = utils.encode_dss_signature(r_int, s_int)

# Convert public key to PEM format (for visibility)
pem_public_key = ecdsa_public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)

# Verify the signature
try:
ecdsa_public_key.verify(
signature,
message,
ec.ECDSA(hashes.SHA256())
)
print(f"🎉 Signature verification successful!\n"
f"Message: {message.decode()}\n"
f"Signature details:\n"
f" r = {r_int}\n"
f" s = {s_int}\n"
f"Public key:\n{pem_public_key.decode()}\n"
f"The message has been verified as authentic.")
except Exception as e:
print(f"❌ Signature is invalid: {str(e)}")

return result



if __name__ == "__main__":
asyncio.run(main())


@pytest.mark.asyncio
async def test_main():
await main()