Skip to content

Commit

Permalink
Finish loading Crypt4GH private keys.
Browse files Browse the repository at this point in the history
- add check for c4gh-v1 magic word
- explicitly check supported KDFs and implement them:
  - scrypt
  - bcrypt
  - pbkdf2_hmac_sha256
- c4gh binary key decoding
- decrypting encrypted binary key using user-provided passphrase callback
- add bcrypt and cryptography to dependencies (for now without explicit version)
- add simple test for loading sample secret key from reference implementation
  • Loading branch information
dzoep committed Aug 21, 2024
1 parent 03a37cd commit 79aebb2
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 6 deletions.
160 changes: 157 additions & 3 deletions oarepo_c4gh/key/c4gh.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,34 @@
from io import RawIOBase, BytesIO
from typing import Self
from base64 import b64decode
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305

# 7 bytes magic word that is at the very beginning of any private key
C4GH_MAGIC_WORD = b"c4gh-v1"

def default_passphrase_callback():
# Supported KDFs of Crypt4GH
C4GH_KDFS = b"scrypt" b"bcrypt" b"pbkdf2_hmac_sha256"


def check_c4gh_kdf(kdf_name: bytes) -> bool:
"""Returns true if given KDF is supported.
Parameters:
kdf_name: KDF name string as bytes
Returns:
True if the KDF is supported.
"""
return kdf_name in C4GH_KDFS


def default_passphrase_callback() -> None:
"""By default the constructor has no means of obtaining the
passphrase and therefore this function unconditionally raises an
exception when called.
"""
raise ArgumentError("No password callback provided!")
raise ValueError("No password callback provided!")


def decode_b64_envelope(istream: RawIOBase) -> (bytes, bytes):
Expand Down Expand Up @@ -51,6 +70,120 @@ def decode_b64_envelope(istream: RawIOBase) -> (bytes, bytes):
return begin_label, data


def decode_c4gh_bytes(istream: RawIOBase) -> bytes:
"""Decodes binary string encoded as two-byte big-endian integer
length and the actual data that follows this length field.
Parameters:
istream: input stream from which to decode the bytes string.
Returns:
The decoded bytes string.
Raises:
ValueError: if there is not enough data in the stream
"""
lengthb = istream.read(2)
lengthb_length = len(lengthb)
if len(lengthb) != 2:
raise ValueError(
f"Binary string read - not enought data to read the length: "
f"{lengthb_length} != 2"
)
length = int.from_bytes(lengthb, byteorder="big")
string = istream.read(length)
read_length = len(string)
if read_length != length:
raise ValueError(
f"Binary string read - not enough data: {read_length} != {length}"
)
return string


def check_c4gh_stream_magic(istreamb: RawIOBase) -> None:
"""Reads enough bytes from given input stream and checks whether
they contain the correct Crypt4GH signature. Raises error if it
doesn't.
Parameters:
istreamb: input stream with the raw Crypt4GH binary key stream.
Raises:
ValueError: if the signature does not match.
"""
magic_to_check = istreamb.read(len(C4GH_MAGIC_WORD))
if magic_to_check != C4GH_MAGIC_WORD:
raise ValueError("Not a Crypt4GH private key!")


def parse_c4gh_kdf_options(istreamb: RawIOBase) -> (bytes, int, bytes):
"""Parses KDF name and options (if applicable) from given input
stream.
Parameters:
istreamb: input stream with the raw Crypt4GH binary stream.
Returns:
kdf_name: the name of the KDF as binary string
kdf_rounds: number of hashing rounds for KDF
kdf_salt: salt for initializing the hashing
Raises:
ValueError: if parsed KDF name is not supported
"""
kdf_name = decode_c4gh_bytes(istreamb)
if kdf_name == b"none":
return (kdf_name, None, None)
elif check_c4gh_kdf(kdf_name):
kdf_options = decode_c4gh_bytes(istreamb)
kdf_rounds = int.from_bytes(kdf_options[:4], byteorder="big")
kdf_salt = kdf_options[4:]
return (kdf_name, kdf_rounds, kdf_salt)
else:
raise ValueError(f"Unsupported KDF {kdf_name}")


def derive_c4gh_key(
algo: bytes, passphrase: bytes, salt: bytes, rounds: int
) -> bytes:
"""Derives the symmetric key for decrypting the private key.
Parameters:
algo: the algorithm for key derivation
passphrase: the passphrase from which to derive the key
rounds: number of hashing rounds
Returns:
The derived symmetric key.
Raises:
ValueError if given KDF algorithm is not supported (should not happen
as this is expected to be called after parse_c4gh_kdf_options).
"""
if algo == b"scrypt":
from hashlib import scrypt

return scrypt(passphrase, salt=salt, n=1 << 14, r=8, p=1, dklen=32)
if algo == b"bcrypt":
import bcrypt

return bcrypt.kdf(
passphrase,
salt=salt,
desired_key_bytes=32,
rounds=rounds,
ignore_few_rounds=True,
)
if algo == b"pbkdf2_hmac_sha256":
from hashlib import pbkdf2_hmac

return pbkdf2_hmac("sha256", passphrase, salt, rounds, dklen=32)
raise ValueError(f"Unsupported KDF: {algo}")


class C4GHKey(SoftwareKey):
"""This class implements the loader for Crypt4GH key file format."""

Expand Down Expand Up @@ -124,4 +257,25 @@ def from_stream(
if slabel == b"CRYPT4GH PUBLIC KEY":
return C4GHKey(sdata, True)
else:
raise ArgumentError("Private C4GH Key not implemented!")
istreamb = BytesIO(sdata)
check_c4gh_stream_magic(istreamb)
kdf_name, kdf_rounds, kdf_salt = parse_c4gh_kdf_options(istreamb)
cipher_name = decode_c4gh_bytes(istreamb)
if cipher_name == b"none":
secret_data = decode_c4gh_bytes(istreamb)
return C4GHKey(secret_data, False)
if cipher_name != b"chacha20_poly1305":
raise ValueError(f"Unsupported cipher: {cipher_name}")
assert callable(
callback
), "Invalid passphrase callback (non-callable)"
passphrase = callback().encode()
symmetric_key = derive_c4gh_key(
kdf_name, passphrase, kdf_salt, kdf_rounds
)
nonce_and_encrypted_data = decode_c4gh_bytes(istreamb)
nonce = nonce_and_encrypted_data[:12]
encrypted_data = nonce_and_encrypted_data[12:]
return ChaCha20Poly1305(symmetric_key).decrypt(
nonce, encrypted_data, None
)
8 changes: 5 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ classifiers = [
"Operating System :: OS Independent"
]
dependencies = [
"mkdocs-material",
"pynacl>=1.5.0"
"pynacl>=1.5.0",
"bcrypt",
"cryptography"
]

[tool.setuptools]
Expand All @@ -26,5 +27,6 @@ Issues = "https://github.com/oarepo/oarepo-c4gh/issues"

[project.optional-dependencies]
dev = [
"mkdocstrings[python]>=0.18"
"mkdocstrings[python]>=0.18",
"mkdocs-material"
]
12 changes: 12 additions & 0 deletions tests/test_c4gh.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@
b"-----END CRYPT4GH PUBLIC KEY-----\n"


alice_sec_bstr = \
b"-----BEGIN ENCRYPTED PRIVATE KEY-----\n" \
b"YzRnaC12MQAGYmNyeXB0ABQAAABk8Kn90WJVzJBevxN4980aWwARY2hhY2hhMjBfcG9seTEzMDUAPBdXfpV1zOcMg5EJRlGNpKZXT4PXM2iraMGCyomRQqWaH5iBGmJXU/JROPsyoX5nqmNo8oxANvgDi1hqZQ==\n" \
b"-----END ENCRYPTED PRIVATE KEY-----"


alice_sec_password = "alice"


class TestC4GHKeyImplementation(unittest.TestCase):
def test_b64_decoder(self):
alabel, adata = decode_b64_envelope(io.BytesIO(alice_pub_bstr))
Expand All @@ -17,6 +26,9 @@ def test_b64_decoder(self):
def test_public_loader(self):
akey = C4GHKey.from_bytes(alice_pub_bstr)

def test_secret_loader(self):
akey = C4GHKey.from_bytes(alice_sec_bstr, lambda: alice_sec_password)


if __name__ == '__main__':
unittest.main()

0 comments on commit 79aebb2

Please sign in to comment.