Skip to content

Commit

Permalink
Finish add_recipient implementation
Browse files Browse the repository at this point in the history
- nonce generation for symmetric encryption
- test vectors for bob as recipient
- serializable header packet construction
- basic round-trip test from alice to bob
  • Loading branch information
dzoep committed Oct 16, 2024
1 parent 2089bd1 commit a8d464a
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 10 deletions.
23 changes: 14 additions & 9 deletions oarepo_c4gh/crypt4gh/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
from .data_block import DataBlock
from ..key.software import SoftwareKey
from nacl.bindings import crypto_aead_chacha20poly1305_ietf_encrypt
import io
import secrets
from .header_packet import HeaderPacket


class Crypt4GHHeaderFilter(ACrypt4GHHeader):
Expand Down Expand Up @@ -37,7 +40,7 @@ def add_recipient(self, public_key: bytes) -> None:
public_key: The reader public key to add.
"""
if not public_key in self.original.reader_keys_used:
if not public_key in self._original.reader_keys_used:
if not public_key in self._recipients_to_add:
self._recipients_to_add.append(public_key)

Expand All @@ -51,20 +54,22 @@ def packets(self) -> list:
temp_packets = self._original.packets.copy()
for public_key in self._recipients_to_add:
for packet in self._original.packets:
if packet.is_readable and packet.packet_type in (1, 2):
if packet.is_readable and packet.packet_type in (0, 1):
if ekey is None:
ekey = SoftwareKey.generate()
payload = io.BytesIO()
payload.write(packet.length.to_bytes(4, "little"))
data = io.BytesIO()
data.write(packet.length.to_bytes(4, "little"))
enc_method = 0
payload.write(enc_method.to_bytes(4, "little"))
payload.write(ekey.public_key)
data.write(enc_method.to_bytes(4, "little"))
data.write(ekey.public_key)
symmetric_key = ekey.compute_write_key(public_key)
nonce = secrets.token_bytes(12)
content = crypto_aead_chacha20poly1305_ietf_encrypt(
packet.content, None, symmetric_key
packet.content, None, nonce, symmetric_key
)
payload.write(content)
# construct the packet and add it to temp_packets
data.write(content)
# This packet is useful only for serialization
temp_packets.append(HeaderPacket(packet.length, data.getvalue(), None, None, None, None, None))
return temp_packets

@property
Expand Down
2 changes: 1 addition & 1 deletion oarepo_c4gh/crypt4gh/header.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def reader_keys_used(self) -> list[bytes]:
return list(
set(
packet.reader_key
for packet in self._packets
for packet in self.packets
if packet.reader_key is not None
)
)
8 changes: 8 additions & 0 deletions oarepo_c4gh/crypt4gh/header_packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,11 @@ def packet_type(self) -> int:
def content(self) -> bytes:
"""Returns the encrypted packet content."""
return self._content

@property
def length(self) -> int:
"""Returns the packet length in bytes - including the packet
length 4-byte value at the beginning.
"""
return self._packet_length
8 changes: 8 additions & 0 deletions tests/_test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@
b"\x2b\xee\x1a\x5e\x89"
)

bob_sec_bstr = (
b"-----BEGIN ENCRYPTED PRIVATE KEY-----\r\n"
b"YzRnaC12MQAGYmNyeXB0ABQAAABkb1LLjyLNrcL4IgMD+NuDDQARY2hhY2hhMjBfcG9seTEzMDUAPFfaFm7bJc+pr6IRezakf5AsP7HTZnVfhSBt7XIKQcJBJY/yrPSfLxLvPMY4Edu4r0hyJTX2CNqR7wmwYg==\r\n"
b"-----END ENCRYPTED PRIVATE KEY-----\r\n"
)

bob_sec_password = "bob"

# crypt4gh encrypt --sk ../crypt4gh/tests/_common/alice.sec --recipient_pk ../crypt4gh/tests/_common/bob.pub <hello.txt >hello-bob.txt.c4gh
hello_world_bob_encrypted = (
b"crypt4gh\x01\x00\x00\x00\x01\x00\x00\x00"
Expand Down
1 change: 1 addition & 0 deletions tests/test_crypt4gh.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def test_encrypted_hello_header(self):
assert (
dek_packet.content is not None
), "DEK packet must have readable content"
assert dek_packet.length > 0, "Non-positive length of DEK packet"

def test_encrypted_hello_blocks(self):
akey = C4GHKey.from_bytes(alice_sec_bstr, lambda: alice_sec_password)
Expand Down
16 changes: 16 additions & 0 deletions tests/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
alice_sec_bstr,
alice_sec_password,
hello_world_encrypted,
bob_sec_bstr,
bob_sec_password,
)
from oarepo_c4gh.crypt4gh.crypt4gh import Crypt4GH
import io
Expand Down Expand Up @@ -65,6 +67,20 @@ def test_identity(self):
ostream.getvalue() == hello_world_encrypted
), "Identity filter failure."

def test_roundtrip(self):
akey = C4GHKey.from_bytes(alice_sec_bstr, lambda: alice_sec_password)
crypt4gh = Crypt4GH(akey, io.BytesIO(hello_world_encrypted))
filter4gh = Crypt4GHFilter(crypt4gh)
bkey = C4GHKey.from_bytes(bob_sec_bstr, lambda: bob_sec_password)
filter4gh.add_recipient(bkey.public_key)
ostream = io.BytesIO()
writer = Crypt4GHWriter(filter4gh, ostream)
writer.write()
crypt4ghb = Crypt4GH(bkey, io.BytesIO(ostream.getvalue()))
header = crypt4ghb.header
packets = header.packets
assert len(packets) == 2, "Exactly two header packets expected"


if __name__ == "__main__":
unittest.main()

0 comments on commit a8d464a

Please sign in to comment.