From 2089bd1005eb4dcb934987e8d0401ada7fe2e5d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dominik=20Pant=C5=AF=C4=8Dek?= Date: Wed, 16 Oct 2024 11:27:36 +0200 Subject: [PATCH] Refactor HeaderPacket to allow multiple ways of construction - separate constructor for filling the fields in - add from_stream classmethod - provide defaults (None) for now mandatory fields - new properties: content, packet_type - add tests for new properties --- oarepo_c4gh/crypt4gh/filter.py | 12 ++-- oarepo_c4gh/crypt4gh/header.py | 2 +- oarepo_c4gh/crypt4gh/header_packet.py | 92 +++++++++++++++++---------- tests/test_crypt4gh.py | 6 ++ 4 files changed, 74 insertions(+), 38 deletions(-) diff --git a/oarepo_c4gh/crypt4gh/filter.py b/oarepo_c4gh/crypt4gh/filter.py index ae196be..7038485 100644 --- a/oarepo_c4gh/crypt4gh/filter.py +++ b/oarepo_c4gh/crypt4gh/filter.py @@ -9,6 +9,7 @@ from typing import Generator from .data_block import DataBlock from ..key.software import SoftwareKey +from nacl.bindings import crypto_aead_chacha20poly1305_ietf_encrypt class Crypt4GHHeaderFilter(ACrypt4GHHeader): @@ -47,22 +48,23 @@ def packets(self) -> list: """ ekey = None - ekey_col = None temp_packets = self._original.packets.copy() for public_key in self._recipients_to_add: for packet in self._original.packets: if packet.is_readable and packet.packet_type in (1, 2): if ekey is None: ekey = SoftwareKey.generate() - ekey_col = KeyCollection(ekey) payload = io.BytesIO() payload.write(packet.length.to_bytes(4, "little")) enc_method = 0 payload.write(enc_method.to_bytes(4, "little")) payload.write(ekey.public_key) - # At offset 40 here. - # Encrypt content: packet.content - # TODO: write encrypted content (the same size as content + 16 bytes MAC) + symmetric_key = ekey.compute_write_key(public_key) + content = crypto_aead_chacha20poly1305_ietf_encrypt( + packet.content, None, symmetric_key + ) + payload.write(content) + # construct the packet and add it to temp_packets return temp_packets @property diff --git a/oarepo_c4gh/crypt4gh/header.py b/oarepo_c4gh/crypt4gh/header.py index dc07873..5f3ee90 100644 --- a/oarepo_c4gh/crypt4gh/header.py +++ b/oarepo_c4gh/crypt4gh/header.py @@ -95,7 +95,7 @@ def load_packets(self) -> None: """ self._packets = [] for idx in range(self._packet_count): - packet = HeaderPacket(self._reader_keys, self._istream) + packet = HeaderPacket.from_stream(self._reader_keys, self._istream) if packet.is_data_encryption_parameters: self._deks.add_dek( DEK(packet.data_encryption_key, packet.reader_key) diff --git a/oarepo_c4gh/crypt4gh/header_packet.py b/oarepo_c4gh/crypt4gh/header_packet.py index e12005e..6927a98 100644 --- a/oarepo_c4gh/crypt4gh/header_packet.py +++ b/oarepo_c4gh/crypt4gh/header_packet.py @@ -17,6 +17,26 @@ class HeaderPacket: """ def __init__( + self, + packet_length, + packet_data, + content, + reader_key, + packet_type, + data_encryption_method, + data_encryption_key, + ): + """Initializes the packet structure with all fields given.""" + self._packet_length = packet_length + self._packet_data = packet_data + self._content = content + self._reader_key = reader_key + self._packet_type = packet_type + self._data_encryption_method = data_encryption_method + self._data_encryption_key = data_encryption_key + + @classmethod + def from_stream( self, reader_keys: KeyCollection, istream: io.RawIOBase ) -> None: """Tries parsing a single packet from given input stream and @@ -32,62 +52,74 @@ def __init__( Crypt4GHHeaderPacketException: if any problem in parsing the packet occurs. """ - self._packet_length = read_crypt4gh_stream_le_uint32( + _packet_length = read_crypt4gh_stream_le_uint32( istream, "packet length" ) - self._packet_data = self._packet_length.to_bytes( - 4, "little" - ) + istream.read(self._packet_length - 4) - if len(self._packet_data) != self._packet_length: + _packet_data = _packet_length.to_bytes(4, "little") + istream.read( + _packet_length - 4 + ) + if len(_packet_data) != _packet_length: raise Crypt4GHHeaderPacketException( - f"Header packet: read only {len(self._packet_data)} " - f"instead of {self._packet_length}" + f"Header packet: read only {len(_packet_data)} " + f"instead of {_packet_length}" ) encryption_method = read_crypt4gh_bytes_le_uint32( - self._packet_data, 4, "encryption method" + _packet_data, 4, "encryption method" ) if encryption_method != 0: raise Crypt4GHHeaderPacketException( f"Unsupported encryption method {encryption_method}" ) - writer_public_key = self._packet_data[8:40] - nonce = self._packet_data[40:52] - payload_length = self._packet_length - 4 - 4 - 32 - 12 - 16 - payload = self._packet_data[52:] + writer_public_key = _packet_data[8:40] + nonce = _packet_data[40:52] + payload_length = _packet_length - 4 - 4 - 32 - 12 - 16 + payload = _packet_data[52:] for maybe_reader_key in reader_keys.keys: symmetric_key = maybe_reader_key.compute_read_key( writer_public_key ) - self._content = None - self._reader_key = None + _content = None + _reader_key = None try: - self._content = crypto_aead_chacha20poly1305_ietf_decrypt( + _content = crypto_aead_chacha20poly1305_ietf_decrypt( payload, None, nonce, symmetric_key ) - self._reader_key = maybe_reader_key.public_key + _reader_key = maybe_reader_key.public_key break except CryptoError as cerr: pass - if self._content is not None: - self._packet_type = read_crypt4gh_bytes_le_uint32( - self._content, 0, "packet type" + _data_encryption_method = None + _packet_type = None + _data_encryption_key = None + if _content is not None: + _packet_type = read_crypt4gh_bytes_le_uint32( + _content, 0, "packet type" ) - if self._packet_type == 0: - self._data_encryption_method = read_crypt4gh_bytes_le_uint32( - self._content, 4, "encryption method" + if _packet_type == 0: + _data_encryption_method = read_crypt4gh_bytes_le_uint32( + _content, 4, "encryption method" ) - if self._data_encryption_method != 0: + if _data_encryption_method != 0: raise Crypt4GHHeaderPacketException( f"Unknown data encryption method " - f"{self._data_encryption_method}." + f"{_data_encryption_method}." ) - self._data_encryption_key = self._content[8:40] - elif self._packet_type == 1: + _data_encryption_key = _content[8:40] + elif _packet_type == 1: # Edit List pass else: # Report error? Warning? pass + return HeaderPacket( + _packet_length, + _packet_data, + _content, + _reader_key, + _packet_type, + _data_encryption_method, + _data_encryption_key, + ) @property def is_data_encryption_parameters(self) -> bool: @@ -149,14 +181,10 @@ def packet_data(self) -> bytes: @property def packet_type(self) -> int: - """Returns the numerical representation of packet type. - - """ + """Returns the numerical representation of packet type.""" return self._packet_type @property def content(self) -> bytes: - """Returns the encrypted packet content. - - """ + """Returns the encrypted packet content.""" return self._content diff --git a/tests/test_crypt4gh.py b/tests/test_crypt4gh.py index aeec7ef..0939363 100644 --- a/tests/test_crypt4gh.py +++ b/tests/test_crypt4gh.py @@ -118,6 +118,12 @@ def test_encrypted_hello_header(self): assert header.reader_keys_used == [ akey.public_key ], "Alice's key not collected as successfull reader" + assert ( + dek_packet.packet_type == 0 + ), "DEK packet must have packet_type 0" + assert ( + dek_packet.content is not None + ), "DEK packet must have readable content" def test_encrypted_hello_blocks(self): akey = C4GHKey.from_bytes(alice_sec_bstr, lambda: alice_sec_password)