diff --git a/oarepo_c4gh/key/c4gh.py b/oarepo_c4gh/key/c4gh.py
index 059a10b..e7cadf7 100644
--- a/oarepo_c4gh/key/c4gh.py
+++ b/oarepo_c4gh/key/c4gh.py
@@ -6,15 +6,34 @@
 from io import RawIOBase, BytesIO
 from typing import Self
 from base64 import b64decode
+from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
 
+# 7 bytes magic word that is at the very beginning of any private key
+C4GH_MAGIC_WORD = b"c4gh-v1"
 
-def default_passphrase_callback():
+# Supported KDFs of Crypt4GH - a tuple so that "in" matches whole names
+C4GH_KDFS = (b"scrypt", b"bcrypt", b"pbkdf2_hmac_sha256")
+
+
+def check_c4gh_kdf(kdf_name: bytes) -> bool:
+    """Returns true if given KDF is supported.
+
+    Parameters:
+        kdf_name: KDF name string as bytes
+
+    Returns:
+        True if the KDF is supported.
+    """
+    return kdf_name in C4GH_KDFS
+
+
+def default_passphrase_callback() -> None:
     """By default the constructor has no means of obtaining the
     passphrase and therefore this function unconditionally raises an
     exception when called.
 
     """
-    raise ArgumentError("No password callback provided!")
+    raise ValueError("No password callback provided!")
 
 
 def decode_b64_envelope(istream: RawIOBase) -> (bytes, bytes):
@@ -51,6 +70,132 @@ def decode_b64_envelope(istream: RawIOBase) -> (bytes, bytes):
     return begin_label, data
 
 
+def decode_c4gh_bytes(istream: RawIOBase) -> bytes:
+    """Decodes binary string encoded as two-byte big-endian integer
+    length and the actual data that follows this length field.
+
+    Parameters:
+        istream: input stream from which to decode the bytes string.
+
+    Returns:
+        The decoded bytes string.
+
+    Raises:
+        ValueError: if there is not enough data in the stream
+
+    """
+    lengthb = istream.read(2)
+    lengthb_length = len(lengthb)
+    if lengthb_length != 2:
+        raise ValueError(
+            f"Binary string read - not enough data to read the length: "
+            f"{lengthb_length} != 2"
+        )
+    length = int.from_bytes(lengthb, byteorder="big")
+    string = istream.read(length)
+    read_length = len(string)
+    if read_length != length:
+        raise ValueError(
+            f"Binary string read - not enough data: {read_length} != {length}"
+        )
+    return string
+
+
+def check_c4gh_stream_magic(istreamb: RawIOBase) -> None:
+    """Reads enough bytes from given input stream and checks whether
+    they contain the correct Crypt4GH signature. Raises error if it
+    doesn't.
+
+    Parameters:
+        istreamb: input stream with the raw Crypt4GH binary key stream.
+
+    Raises:
+        ValueError: if the signature does not match.
+
+    """
+    magic_to_check = istreamb.read(len(C4GH_MAGIC_WORD))
+    if magic_to_check != C4GH_MAGIC_WORD:
+        raise ValueError("Not a Crypt4GH private key!")
+
+
+def parse_c4gh_kdf_options(
+    istreamb: RawIOBase,
+) -> tuple[bytes, int | None, bytes | None]:
+    """Parses KDF name and options (if applicable) from given input
+    stream.
+
+    Parameters:
+        istreamb: input stream with the raw Crypt4GH binary stream.
+
+    Returns:
+        kdf_name: the name of the KDF as binary string
+        kdf_rounds: number of hashing rounds for KDF (None for b"none")
+        kdf_salt: salt for initializing the hashing (None for b"none")
+
+    Raises:
+        ValueError: if parsed KDF name is not supported or the KDF
+            options are too short to hold the rounds field
+
+    """
+    kdf_name = decode_c4gh_bytes(istreamb)
+    if kdf_name == b"none":
+        return (kdf_name, None, None)
+    if not check_c4gh_kdf(kdf_name):
+        raise ValueError(f"Unsupported KDF {kdf_name}")
+    kdf_options = decode_c4gh_bytes(istreamb)
+    # The options start with the rounds as a four-byte big-endian
+    # integer; everything after that is the salt.
+    if len(kdf_options) < 4:
+        raise ValueError(
+            f"KDF options too short: {len(kdf_options)} < 4 bytes"
+        )
+    kdf_rounds = int.from_bytes(kdf_options[:4], byteorder="big")
+    kdf_salt = kdf_options[4:]
+    return (kdf_name, kdf_rounds, kdf_salt)
+
+
+def derive_c4gh_key(
+    algo: bytes, passphrase: bytes, salt: bytes, rounds: int
+) -> bytes:
+    """Derives the symmetric key for decrypting the private key.
+
+    Parameters:
+        algo: the algorithm for key derivation
+        passphrase: the passphrase from which to derive the key
+        salt: salt for initializing the hashing
+        rounds: number of hashing rounds (not used by scrypt, which
+            runs with fixed cost parameters)
+
+    Returns:
+        The derived 32-byte symmetric key.
+
+    Raises:
+        ValueError: if given KDF algorithm is not supported (should not
+            happen as this is expected to be called after
+            parse_c4gh_kdf_options).
+
+    """
+    if algo == b"scrypt":
+        from hashlib import scrypt
+
+        return scrypt(passphrase, salt=salt, n=1 << 14, r=8, p=1, dklen=32)
+    if algo == b"bcrypt":
+        import bcrypt
+
+        return bcrypt.kdf(
+            passphrase,
+            salt=salt,
+            desired_key_bytes=32,
+            rounds=rounds,
+            ignore_few_rounds=True,
+        )
+    if algo == b"pbkdf2_hmac_sha256":
+        from hashlib import pbkdf2_hmac
+
+        return pbkdf2_hmac("sha256", passphrase, salt, rounds, dklen=32)
+    raise ValueError(f"Unsupported KDF: {algo}")
+
+
 class C4GHKey(SoftwareKey):
     """This class implements the loader for Crypt4GH key file format."""
 
@@ -124,4 +269,29 @@ def from_stream(
         if slabel == b"CRYPT4GH PUBLIC KEY":
             return C4GHKey(sdata, True)
         else:
-            raise ArgumentError("Private C4GH Key not implemented!")
+            istreamb = BytesIO(sdata)
+            check_c4gh_stream_magic(istreamb)
+            kdf_name, kdf_rounds, kdf_salt = parse_c4gh_kdf_options(istreamb)
+            cipher_name = decode_c4gh_bytes(istreamb)
+            if cipher_name == b"none":
+                secret_data = decode_c4gh_bytes(istreamb)
+                return C4GHKey(secret_data, False)
+            if cipher_name != b"chacha20_poly1305":
+                raise ValueError(f"Unsupported cipher: {cipher_name}")
+            # Validate explicitly: assert statements vanish under -O.
+            if not callable(callback):
+                raise TypeError("Invalid passphrase callback (non-callable)")
+            passphrase = callback().encode()
+            symmetric_key = derive_c4gh_key(
+                kdf_name, passphrase, kdf_salt, kdf_rounds
+            )
+            # The payload is a 12-byte nonce followed by the ciphertext.
+            nonce_and_encrypted_data = decode_c4gh_bytes(istreamb)
+            nonce = nonce_and_encrypted_data[:12]
+            encrypted_data = nonce_and_encrypted_data[12:]
+            secret_data = ChaCha20Poly1305(symmetric_key).decrypt(
+                nonce, encrypted_data, None
+            )
+            # Wrap the decrypted secret exactly like the unencrypted
+            # branch does instead of handing raw bytes to the caller.
+            return C4GHKey(secret_data, False)
diff --git a/pyproject.toml b/pyproject.toml
index 1475972..87e5989 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -13,8 +13,9 @@ classifiers = [
     "Operating System :: OS Independent"
 ]
 dependencies = [
-    "mkdocs-material",
-    "pynacl>=1.5.0"
+    "pynacl>=1.5.0",
+    "bcrypt",
+    "cryptography"
 ]
 
 [tool.setuptools]
@@ -26,5 +27,6 @@ Issues = "https://github.com/oarepo/oarepo-c4gh/issues"
 
 [project.optional-dependencies]
 dev = [
-    "mkdocstrings[python]>=0.18"
+    "mkdocstrings[python]>=0.18",
+    "mkdocs-material"
 ]
diff --git a/tests/test_c4gh.py b/tests/test_c4gh.py
index dcce0d3..16af533 100644
--- a/tests/test_c4gh.py
+++ b/tests/test_c4gh.py
@@ -9,6 +9,15 @@
     b"-----END CRYPT4GH PUBLIC KEY-----\n"
 
 
+alice_sec_bstr = \
+    b"-----BEGIN ENCRYPTED PRIVATE KEY-----\n" \
+    b"YzRnaC12MQAGYmNyeXB0ABQAAABk8Kn90WJVzJBevxN4980aWwARY2hhY2hhMjBfcG9seTEzMDUAPBdXfpV1zOcMg5EJRlGNpKZXT4PXM2iraMGCyomRQqWaH5iBGmJXU/JROPsyoX5nqmNo8oxANvgDi1hqZQ==\n" \
+    b"-----END ENCRYPTED PRIVATE KEY-----"
+
+
+alice_sec_password = "alice"
+
+
 class TestC4GHKeyImplementation(unittest.TestCase):
     def test_b64_decoder(self):
         alabel, adata = decode_b64_envelope(io.BytesIO(alice_pub_bstr))
@@ -17,6 +26,10 @@ def test_b64_decoder(self):
     def test_public_loader(self):
         akey = C4GHKey.from_bytes(alice_pub_bstr)
 
+    def test_secret_loader(self):
+        akey = C4GHKey.from_bytes(alice_sec_bstr, lambda: alice_sec_password)
+        self.assertIsInstance(akey, C4GHKey)
+
 
 if __name__ == '__main__':
     unittest.main()