diff --git a/pypdf/_crypt_providers/_fallback.py b/pypdf/_crypt_providers/_fallback.py index ea3a91e99..53f4a8905 100644 --- a/pypdf/_crypt_providers/_fallback.py +++ b/pypdf/_crypt_providers/_fallback.py @@ -36,21 +36,21 @@ class CryptRC4(CryptBase): # type: ignore def __init__(self, key: bytes) -> None: - self.S = bytearray(range(256)) + self.s = bytearray(range(256)) j = 0 for i in range(256): - j = (j + self.S[i] + key[i % len(key)]) % 256 - self.S[i], self.S[j] = self.S[j], self.S[i] + j = (j + self.s[i] + key[i % len(key)]) % 256 + self.s[i], self.s[j] = self.s[j], self.s[i] def encrypt(self, data: bytes) -> bytes: - S = bytearray(self.S) + s = bytearray(self.s) out = [0 for _ in range(len(data))] i, j = 0, 0 for k in range(len(data)): i = (i + 1) % 256 - j = (j + S[i]) % 256 - S[i], S[j] = S[j], S[i] - x = S[(S[i] + S[j]) % 256] + j = (j + s[i]) % 256 + s[i], s[j] = s[j], s[i] + x = s[(s[i] + s[j]) % 256] out[k] = data[k] ^ x return bytes(bytearray(out)) diff --git a/pypdf/_encryption.py b/pypdf/_encryption.py index 929bf0bf9..26065b8e7 100644 --- a/pypdf/_encryption.py +++ b/pypdf/_encryption.py @@ -35,13 +35,13 @@ CryptBase, CryptIdentity, CryptRC4, + aes_cbc_decrypt, + aes_cbc_encrypt, + aes_ecb_decrypt, + aes_ecb_encrypt, + rc4_decrypt, + rc4_encrypt, ) -from pypdf._crypt_providers import aes_cbc_decrypt as AES_CBC_decrypt # noqa: N812 -from pypdf._crypt_providers import aes_cbc_encrypt as AES_CBC_encrypt # noqa: N812 -from pypdf._crypt_providers import aes_ecb_decrypt as AES_ECB_decrypt # noqa: N812 -from pypdf._crypt_providers import aes_ecb_encrypt as AES_ECB_encrypt # noqa: N812 -from pypdf._crypt_providers import rc4_decrypt as RC4_decrypt # noqa: N812 -from pypdf._crypt_providers import rc4_encrypt as RC4_encrypt # noqa: N812 from ._utils import b_, logger_warning from .generic import ( @@ -59,23 +59,26 @@ class CryptFilter: def __init__( - self, stmCrypt: CryptBase, strCrypt: CryptBase, efCrypt: CryptBase + self, + stm_crypt: CryptBase, + str_crypt: CryptBase, + ef_crypt: CryptBase, ) -> None: - self.stmCrypt = stmCrypt - self.strCrypt = strCrypt - self.efCrypt = efCrypt + self.stm_crypt = stm_crypt + self.str_crypt = str_crypt + self.ef_crypt = ef_crypt def encrypt_object(self, obj: PdfObject) -> PdfObject: if isinstance(obj, ByteStringObject): - data = self.strCrypt.encrypt(obj.original_bytes) + data = self.str_crypt.encrypt(obj.original_bytes) obj = ByteStringObject(data) if isinstance(obj, TextStringObject): - data = self.strCrypt.encrypt(obj.get_encoded_bytes()) + data = self.str_crypt.encrypt(obj.get_encoded_bytes()) obj = ByteStringObject(data) elif isinstance(obj, StreamObject): obj2 = StreamObject() obj2.update(obj) - obj2.set_data(self.stmCrypt.encrypt(b_(obj._data))) + obj2.set_data(self.stm_crypt.encrypt(b_(obj._data))) obj = obj2 elif isinstance(obj, DictionaryObject): obj2 = DictionaryObject() # type: ignore @@ -88,10 +91,10 @@ def encrypt_object(self, obj: PdfObject) -> PdfObject: def decrypt_object(self, obj: PdfObject) -> PdfObject: if isinstance(obj, (ByteStringObject, TextStringObject)): - data = self.strCrypt.decrypt(obj.original_bytes) + data = self.str_crypt.decrypt(obj.original_bytes) obj = create_string_object(data) elif isinstance(obj, StreamObject): - obj._data = self.stmCrypt.decrypt(b_(obj._data)) + obj._data = self.stm_crypt.decrypt(b_(obj._data)) elif isinstance(obj, DictionaryObject): for key, value in obj.items(): obj[key] = self.decrypt_object(value) @@ -101,41 +104,9 @@ def decrypt_object(self, obj: PdfObject) -> PdfObject: return obj -_PADDING = bytes( - [ - 0x28, - 0xBF, - 0x4E, - 0x5E, - 0x4E, - 0x75, - 0x8A, - 0x41, - 0x64, - 0x00, - 0x4E, - 0x56, - 0xFF, - 0xFA, - 0x01, - 0x08, - 0x2E, - 0x2E, - 0x00, - 0xB6, - 0xD0, - 0x68, - 0x3E, - 0x80, - 0x2F, - 0x0C, - 0xA9, - 0xFE, - 0x64, - 0x53, - 0x69, - 0x7A, - ] +_PADDING = ( + b"\x28\xbf\x4e\x5e\x4e\x75\x8a\x41\x64\x00\x4e\x56\xff\xfa\x01\x08" + b"\x2e\x2e\x00\xb6\xd0\x68\x3e\x80\x2f\x0c\xa9\xfe\x64\x53\x69\x7a" ) @@ -293,11 +264,11 @@ def compute_O_value(rc4_key: bytes, user_password: bytes, rev: int) -> bytes: The RC4 encrypted """ a = _padding(user_password) - rc4_enc = RC4_encrypt(rc4_key, a) + rc4_enc = rc4_encrypt(rc4_key, a) if rev >= 3: for i in range(1, 20): key = bytes(bytearray(x ^ i for x in rc4_key)) - rc4_enc = RC4_encrypt(key, rc4_enc) + rc4_enc = rc4_encrypt(key, rc4_enc) return rc4_enc @staticmethod @@ -324,7 +295,7 @@ def compute_U_value(key: bytes, rev: int, id1_entry: bytes) -> bytes: The value """ if rev <= 2: - value = RC4_encrypt(key, _PADDING) + value = rc4_encrypt(key, _PADDING) return value """ @@ -354,10 +325,10 @@ def compute_U_value(key: bytes, rev: int, id1_entry: bytes) -> bytes: """ u_hash = hashlib.md5(_PADDING) u_hash.update(id1_entry) - rc4_enc = RC4_encrypt(key, u_hash.digest()) + rc4_enc = rc4_encrypt(key, u_hash.digest()) for i in range(1, 20): rc4_key = bytes(bytearray(x ^ i for x in key)) - rc4_enc = RC4_encrypt(rc4_key, rc4_enc) + rc4_enc = rc4_encrypt(rc4_key, rc4_enc) return _padding(rc4_enc) @staticmethod @@ -472,12 +443,12 @@ def verify_owner_password( rc4_key = AlgV4.compute_O_value_key(owner_password, rev, key_size) if rev <= 2: - user_password = RC4_decrypt(rc4_key, o_entry) + user_password = rc4_decrypt(rc4_key, o_entry) else: user_password = o_entry for i in range(19, -1, -1): key = bytes(bytearray(x ^ i for x in rc4_key)) - user_password = RC4_decrypt(key, user_password) + user_password = rc4_decrypt(key, user_password) return AlgV4.verify_user_password( user_password, rev, @@ -560,7 +531,7 @@ def verify_owner_password( return b"" iv = bytes(0 for _ in range(16)) tmp_key = AlgV5.calculate_hash(R, password, o_value[40:48], u_value[:48]) - key = AES_CBC_decrypt(tmp_key, iv, oe_value) + key = aes_cbc_decrypt(tmp_key, iv, oe_value) return key @staticmethod @@ -587,28 +558,28 @@ def verify_user_password( return b"" iv = bytes(0 for _ in range(16)) tmp_key = AlgV5.calculate_hash(R, password, u_value[40:48], b"") - return AES_CBC_decrypt(tmp_key, iv, ue_value) + return aes_cbc_decrypt(tmp_key, iv, ue_value) @staticmethod def calculate_hash(R: int, password: bytes, salt: bytes, udata: bytes) -> bytes: # from https://github.com/qpdf/qpdf/blob/main/libqpdf/QPDF_encryption.cc - K = hashlib.sha256(password + salt + udata).digest() + k = hashlib.sha256(password + salt + udata).digest() if R < 6: - return K + return k count = 0 while True: count += 1 - K1 = password + K + udata - E = AES_CBC_encrypt(K[:16], K[16:32], K1 * 64) + k1 = password + k + udata + e = aes_cbc_encrypt(k[:16], k[16:32], k1 * 64) hash_fn = ( hashlib.sha256, hashlib.sha384, hashlib.sha512, - )[sum(E[:16]) % 3] - K = hash_fn(E).digest() - if count >= 64 and E[-1] <= count - 32: + )[sum(e[:16]) % 3] + k = hash_fn(e).digest() + if count >= 64 and e[-1] <= count - 32: break - return K[:32] + return k[:32] @staticmethod def verify_perms( @@ -633,7 +604,7 @@ def verify_perms( """ b8 = b"T" if metadata_encrypted else b"F" p1 = struct.pack(" Tuple[bytes, bytes]: tmp_key = AlgV5.calculate_hash(R, password, key_salt, b"") iv = bytes(0 for _ in range(16)) - ue_value = AES_CBC_encrypt(tmp_key, iv, key) + ue_value = aes_cbc_encrypt(tmp_key, iv, key) return u_value, ue_value @staticmethod @@ -736,7 +707,7 @@ def compute_O_value( ) tmp_key = AlgV5.calculate_hash(R, password, key_salt, u_value[:48]) iv = bytes(0 for _ in range(16)) - oe_value = AES_CBC_encrypt(tmp_key, iv, key) + oe_value = aes_cbc_encrypt(tmp_key, iv, key) return o_value, oe_value @staticmethod @@ -774,7 +745,7 @@ def compute_Perms_value(key: bytes, p: int, metadata_encrypted: bool) -> bytes: b8 = b"T" if metadata_encrypted else b"F" rr = secrets.token_bytes(4) data = struct.pack(" CryptFilter: # for AES-256 aes256_key = key - stmCrypt = self._get_crypt(self.StmF, rc4_key, aes128_key, aes256_key) - StrCrypt = self._get_crypt(self.StrF, rc4_key, aes128_key, aes256_key) - efCrypt = self._get_crypt(self.EFF, rc4_key, aes128_key, aes256_key) + stm_crypt = self._get_crypt(self.StmF, rc4_key, aes128_key, aes256_key) + str_crypt = self._get_crypt(self.StrF, rc4_key, aes128_key, aes256_key) + ef_crypt = self._get_crypt(self.EFF, rc4_key, aes128_key, aes256_key) - return CryptFilter(stmCrypt, StrCrypt, efCrypt) + return CryptFilter(stm_crypt, str_crypt, ef_crypt) @staticmethod def _get_crypt( @@ -1050,36 +1021,36 @@ def write_entry( self.values.UE = values["/UE"] self.values.Perms = values["/Perms"] - dictObj = DictionaryObject() - dictObj[NameObject("/V")] = NumberObject(self.V) - dictObj[NameObject("/R")] = NumberObject(self.R) - dictObj[NameObject("/Length")] = NumberObject(self.Length) - dictObj[NameObject("/P")] = NumberObject(self.P) - dictObj[NameObject("/Filter")] = NameObject("/Standard") + dict_obj = DictionaryObject() + dict_obj[NameObject("/V")] = NumberObject(self.V) + dict_obj[NameObject("/R")] = NumberObject(self.R) + dict_obj[NameObject("/Length")] = NumberObject(self.Length) + dict_obj[NameObject("/P")] = NumberObject(self.P) + dict_obj[NameObject("/Filter")] = NameObject("/Standard") # ignore /EncryptMetadata - dictObj[NameObject("/O")] = ByteStringObject(self.values.O) - dictObj[NameObject("/U")] = ByteStringObject(self.values.U) + dict_obj[NameObject("/O")] = ByteStringObject(self.values.O) + dict_obj[NameObject("/U")] = ByteStringObject(self.values.U) if self.V >= 4: # TODO: allow different method - StdCF = DictionaryObject() - StdCF[NameObject("/AuthEvent")] = NameObject("/DocOpen") - StdCF[NameObject("/CFM")] = NameObject(self.StmF) - StdCF[NameObject("/Length")] = NumberObject(self.Length // 8) - CF = DictionaryObject() - CF[NameObject("/StdCF")] = StdCF - dictObj[NameObject("/CF")] = CF - dictObj[NameObject("/StmF")] = NameObject("/StdCF") - dictObj[NameObject("/StrF")] = NameObject("/StdCF") + std_cf = DictionaryObject() + std_cf[NameObject("/AuthEvent")] = NameObject("/DocOpen") + std_cf[NameObject("/CFM")] = NameObject(self.StmF) + std_cf[NameObject("/Length")] = NumberObject(self.Length // 8) + cf = DictionaryObject() + cf[NameObject("/StdCF")] = std_cf + dict_obj[NameObject("/CF")] = cf + dict_obj[NameObject("/StmF")] = NameObject("/StdCF") + dict_obj[NameObject("/StrF")] = NameObject("/StdCF") # ignore EFF - # dictObj[NameObject("/EFF")] = NameObject("/StdCF") + # dict_obj[NameObject("/EFF")] = NameObject("/StdCF") if self.V >= 5: - dictObj[NameObject("/OE")] = ByteStringObject(self.values.OE) - dictObj[NameObject("/UE")] = ByteStringObject(self.values.UE) - dictObj[NameObject("/Perms")] = ByteStringObject(self.values.Perms) - return dictObj + dict_obj[NameObject("/OE")] = ByteStringObject(self.values.OE) + dict_obj[NameObject("/UE")] = ByteStringObject(self.values.UE) + dict_obj[NameObject("/Perms")] = ByteStringObject(self.values.Perms) + return dict_obj def compute_values_v4(self, user_password: bytes, owner_password: bytes) -> None: rc4_key = AlgV4.compute_O_value_key(owner_password, self.R, self.Length) @@ -1102,48 +1073,49 @@ def compute_values_v4(self, user_password: bytes, owner_password: bytes) -> None @staticmethod def read(encryption_entry: DictionaryObject, first_id_entry: bytes) -> "Encryption": - filter = encryption_entry.get("/Filter") - if filter != "/Standard": + if encryption_entry.get("/Filter") != "/Standard": raise NotImplementedError( "only Standard PDF encryption handler is available" ) if "/SubFilter" in encryption_entry: raise NotImplementedError("/SubFilter NOT supported") - StmF = "/V2" - StrF = "/V2" - EFF = "/V2" + stm_filter = "/V2" + str_filter = "/V2" + ef_filter = "/V2" - V = encryption_entry.get("/V", 0) - if V not in (1, 2, 3, 4, 5): - raise NotImplementedError(f"Encryption V={V} NOT supported") - if V >= 4: + alg_ver = encryption_entry.get("/V", 0) + if alg_ver not in (1, 2, 3, 4, 5): + raise NotImplementedError(f"Encryption V={alg_ver} NOT supported") + if alg_ver >= 4: filters = encryption_entry["/CF"] - StmF = encryption_entry.get("/StmF", "/Identity") - StrF = encryption_entry.get("/StrF", "/Identity") - EFF = encryption_entry.get("/EFF", StmF) + stm_filter = encryption_entry.get("/StmF", "/Identity") + str_filter = encryption_entry.get("/StrF", "/Identity") + ef_filter = encryption_entry.get("/EFF", stm_filter) - if StmF != "/Identity": - StmF = filters[StmF]["/CFM"] # type: ignore - if StrF != "/Identity": - StrF = filters[StrF]["/CFM"] # type: ignore - if EFF != "/Identity": - EFF = filters[EFF]["/CFM"] # type: ignore + if stm_filter != "/Identity": + stm_filter = filters[stm_filter]["/CFM"] # type: ignore + if str_filter != "/Identity": + str_filter = filters[str_filter]["/CFM"] # type: ignore + if ef_filter != "/Identity": + ef_filter = filters[ef_filter]["/CFM"] # type: ignore allowed_methods = ("/Identity", "/V2", "/AESV2", "/AESV3") - if StmF not in allowed_methods: - raise NotImplementedError(f"StmF Method {StmF} NOT supported!") - if StrF not in allowed_methods: - raise NotImplementedError(f"StrF Method {StrF} NOT supported!") - if EFF not in allowed_methods: - raise NotImplementedError(f"EFF Method {EFF} NOT supported!") - - R = cast(int, encryption_entry["/R"]) - P = cast(int, encryption_entry["/P"]) - Length = encryption_entry.get("/Length", 40) - EncryptMetadata = encryption_entry.get("/EncryptMetadata") - EncryptMetadata = EncryptMetadata.value if EncryptMetadata is not None else True + if stm_filter not in allowed_methods: + raise NotImplementedError(f"StmF Method {stm_filter} NOT supported!") + if str_filter not in allowed_methods: + raise NotImplementedError(f"StrF Method {str_filter} NOT supported!") + if ef_filter not in allowed_methods: + raise NotImplementedError(f"EFF Method {ef_filter} NOT supported!") + + alg_rev = cast(int, encryption_entry["/R"]) + perm_flags = cast(int, encryption_entry["/P"]) + key_bits = encryption_entry.get("/Length", 40) + encrypt_metadata = encryption_entry.get("/EncryptMetadata") + encrypt_metadata = ( + encrypt_metadata.value if encrypt_metadata is not None else True + ) values = EncryptionValues() values.O = cast(ByteStringObject, encryption_entry["/O"]).original_bytes values.U = cast(ByteStringObject, encryption_entry["/U"]).original_bytes @@ -1151,16 +1123,16 @@ def read(encryption_entry: DictionaryObject, first_id_entry: bytes) -> "Encrypti values.UE = encryption_entry.get("/UE", ByteStringObject()).original_bytes values.Perms = encryption_entry.get("/Perms", ByteStringObject()).original_bytes return Encryption( - V=V, - R=R, - Length=Length, - P=P, - EncryptMetadata=EncryptMetadata, + V=alg_ver, + R=alg_rev, + Length=key_bits, + P=perm_flags, + EncryptMetadata=encrypt_metadata, first_id_entry=first_id_entry, values=values, - StrF=StrF, - StmF=StmF, - EFF=EFF, + StrF=str_filter, + StmF=stm_filter, + EFF=ef_filter, entry=encryption_entry, # Dummy entry for the moment; will get removed ) @@ -1168,26 +1140,25 @@ def read(encryption_entry: DictionaryObject, first_id_entry: bytes) -> "Encrypti def make( alg: EncryptAlgorithm, permissions: int, first_id_entry: bytes ) -> "Encryption": - V, R, Length = cast(tuple, alg) - P = permissions + alg_ver, alg_rev, key_bits = cast(tuple, alg) - StmF, StrF, EFF = "/V2", "/V2", "/V2" + stm_filter, str_filter, ef_filter = "/V2", "/V2", "/V2" if alg == EncryptAlgorithm.AES_128: - StmF, StrF, EFF = "/AESV2", "/AESV2", "/AESV2" + stm_filter, str_filter, ef_filter = "/AESV2", "/AESV2", "/AESV2" elif alg in (EncryptAlgorithm.AES_256_R5, EncryptAlgorithm.AES_256): - StmF, StrF, EFF = "/AESV3", "/AESV3", "/AESV3" + stm_filter, str_filter, ef_filter = "/AESV3", "/AESV3", "/AESV3" return Encryption( - V=V, - R=R, - Length=Length, - P=P, + V=alg_ver, + R=alg_rev, + Length=key_bits, + P=permissions, EncryptMetadata=True, first_id_entry=first_id_entry, values=None, - StrF=StrF, - StmF=StmF, - EFF=EFF, + StrF=str_filter, + StmF=stm_filter, + EFF=ef_filter, entry=DictionaryObject(), # Dummy entry for the moment; will get removed ) diff --git a/pyproject.toml b/pyproject.toml index 3319c4c30..e18cd3eb6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -197,7 +197,7 @@ max-complexity = 54 # Recommended: 10 [tool.ruff.per-file-ignores] "_cryptography.py" = ["S304", "S305"] # Use of insecure cipher / modes, aka RC4 and AES-ECB -"_encryption.py" = ["S324", "S311"] +"_encryption.py" = ["S324"] "_writer.py" = ["S324"] "docs/conf.py" = ["PTH", "INP001"] "json_consistency.py" = ["T201"]