diff --git a/pypdf/_protocols.py b/pypdf/_protocols.py
index de0fa78ce..a2b07fa23 100644
--- a/pypdf/_protocols.py
+++ b/pypdf/_protocols.py
@@ -76,6 +76,9 @@ def write(self, stream: Union[Path, StrByteType]) -> Tuple[bool, IO]:
     def _add_object(self, obj: Any) -> Any:
         ...
 
+    def _replace_object(self, indirect_reference: Any, obj: Any) -> Any:
+        ...
+
     @property
     def pages(self) -> List[Any]:
         ...
diff --git a/pypdf/_reader.py b/pypdf/_reader.py
index 7871ce143..eb3f0c35f 100644
--- a/pypdf/_reader.py
+++ b/pypdf/_reader.py
@@ -39,9 +39,7 @@
     Callable,
     Dict,
     Iterable,
-    Iterator,
     List,
-    Mapping,
     Optional,
     Tuple,
     Union,
@@ -87,6 +85,7 @@
 )
 from .generic import (
     ArrayObject,
+    AttachmentBytesDictionary,
     BooleanObject,
     ContentStream,
     DecodedStreamObject,
@@ -98,6 +97,7 @@
     FloatObject,
     IndirectObject,
     NameObject,
+    NameTree,
     NullObject,
     NumberObject,
     PdfObject,
@@ -2206,107 +2206,50 @@ def rename_form_topname(self, name: str) -> Optional[DictionaryObject]:
             interim[NameObject("/T")] = TextStringObject(name)
         return interim
 
-    @property
-    def attachments(self) -> Mapping[str, List[bytes]]:
-        return LazyDict(
-            {
-                name: (self._get_attachment_list, name)
-                for name in self._list_attachments()
-            }
-        )
-
-    def _list_attachments(self) -> List[str]:
-        """
-        Retrieves the list of filenames of file attachments.
-
-        Returns:
-            list of filenames
-        """
-        catalog = cast(DictionaryObject, self.trailer["/Root"])
-        # From the catalog get the embedded file names
-        try:
-            filenames = cast(
-                ArrayObject,
-                cast(
-                    DictionaryObject,
-                    cast(DictionaryObject, catalog["/Names"])["/EmbeddedFiles"],
-                )["/Names"],
-            )
-        except KeyError:
-            return []
-        attachments_names = [f for f in filenames if isinstance(f, str)]
-        return attachments_names
-
-    def _get_attachment_list(self, name: str) -> List[bytes]:
-        out = self._get_attachments(name)[name]
-        if isinstance(out, list):
-            return out
-        return [out]
-
-    def _get_attachments(
-        self, filename: Optional[str] = None
-    ) -> Dict[str, Union[bytes, List[bytes]]]:
-        """
-        Retrieves all or selected file attachments of the PDF as a dictionary of file names
-        and the file data as a bytestring.
-
-        Args:
-            filename: If filename is None, then a dictionary of all attachments
-                will be returned, where the key is the filename and the value
-                is the content. Otherwise, a dictionary with just a single key
-                - the filename - and its content will be returned.
-
-        Returns:
-            dictionary of filename -> Union[bytestring or List[ByteString]]
-            if the filename exists multiple times a List of the different version will be provided
-        """
-        catalog = cast(DictionaryObject, self.trailer["/Root"])
-        # From the catalog get the embedded file names
-        try:
-            filenames = cast(
-                ArrayObject,
-                cast(
-                    DictionaryObject,
-                    cast(DictionaryObject, catalog["/Names"])["/EmbeddedFiles"],
-                )["/Names"],
-            )
-        except KeyError:
-            return {}
-        attachments: Dict[str, Union[bytes, List[bytes]]] = {}
-        # Loop through attachments
-        for i in range(len(filenames)):
-            f = filenames[i]
-            if isinstance(f, str):
-                if filename is not None and f != filename:
-                    continue
-                name = f
-                f_dict = filenames[i + 1].get_object()
-                f_data = f_dict["/EF"]["/F"].get_data()
-                if name in attachments:
-                    if not isinstance(attachments[name], list):
-                        attachments[name] = [attachments[name]]  # type:ignore
-                    attachments[name].append(f_data)  # type:ignore
-                else:
-                    attachments[name] = f_data
-        return attachments
-
-
-class LazyDict(Mapping):
-    def __init__(self, *args: Any, **kw: Any) -> None:
-        self._raw_dict = dict(*args, **kw)
-
-    def __getitem__(self, key: str) -> Any:
-        func, arg = self._raw_dict.__getitem__(key)
-        return func(arg)
-
-    def __iter__(self) -> Iterator[Any]:
-        return iter(self._raw_dict)
-
-    def __len__(self) -> int:
-        return len(self._raw_dict)
-
-    def __str__(self) -> str:
-        return f"LazyDict(keys={list(self.keys())})"
+    def _get_embedded_files_root(self) -> Optional[NameTree]:
+        """
+        Return the /EmbeddedFiles name tree of the document catalog.
+
+        The tree is wrapped in a new NameTree; the document itself is not
+        modified (a reader does not write the wrapper back).
+
+        Returns:
+            The NameTree, or None if the catalog has no /Names or no
+            /EmbeddedFiles entry.
+        """
+        catalog = cast(DictionaryObject, self.trailer["/Root"])
+        if "/Names" not in catalog:
+            return None
+        ef = cast(DictionaryObject, catalog["/Names"]).get("/EmbeddedFiles", None)
+        if ef is None:
+            return None
+        return NameTree(ef.get_object())
+
+    @property
+    def attachments_names(self) -> List[str]:
+        """
+        Names of the embedded files.
+
+        Returns:
+            List of names
+        """
+        return self.attachments.keys()
+
+    @property
+    def attachments(self) -> AttachmentBytesDictionary:
+        """
+        Extract the /EF entries as bytes from the embedded files.
+
+        Returns:
+            Dictionary with the filenames as keys and the file content as bytes;
+            extra data can be accessed with the AttachmentBytes extra properties
+            (.name, .list_rf_names(), .get_embeddedfile(), .all_files)
+
+        Note:
+            /RF (related files) entries are reachable through list_rf_names(),
+            get_embeddedfile(subfile) and all_files of each value.
+        """
+        return AttachmentBytesDictionary(self._get_embedded_files_root())
 
 
 class PdfFileReader(PdfReader):  # deprecated
diff --git a/pypdf/_writer.py b/pypdf/_writer.py
index 10cc9342a..a75d01bbb 100644
--- a/pypdf/_writer.py
+++ b/pypdf/_writer.py
@@ -95,6 +95,7 @@
 from .generic import (
     PAGE_FIT,
     ArrayObject,
+    AttachmentBytesDictionary,
     BooleanObject,
     ByteStringObject,
     ContentStream,
@@ -105,6 +106,7 @@
     FloatObject,
     IndirectObject,
     NameObject,
+    NameTree,
     NullObject,
     NumberObject,
     PdfObject,
@@ -702,7 +704,87 @@ def addJS(self, javascript: str) -> None:  # deprecated
         deprecation_with_replacement("addJS", "add_js", "3.0.0")
         return self.add_js(javascript)
 
-    def add_attachment(self, filename: str, data: Union[str, bytes]) -> None:
+    def _get_embedded_files_root(self) -> Optional[NameTree]:
+        """
+        Return the /EmbeddedFiles name tree of the document catalog.
+
+        If the stored object is a plain dictionary, it is converted into a
+        NameTree which replaces it in the document.
+
+        Returns:
+            The NameTree, or None if the catalog has no /Names or no
+            /EmbeddedFiles entry.
+        """
+        catalog = self._root_object
+        if "/Names" not in catalog:
+            return None
+        ef = cast(DictionaryObject, catalog["/Names"]).get("/EmbeddedFiles", None)
+        if ef is None:
+            return None
+        efo = ef.get_object()
+        if not isinstance(efo, NameTree):
+            efo = NameTree(efo)
+            if isinstance(ef, IndirectObject):
+                ef.replace_object(efo)
+            else:
+                cast(DictionaryObject, catalog["/Names"])[
+                    NameObject("/EmbeddedFiles")
+                ] = efo
+        return efo
+
+    def _create_attachment_root(self) -> NameTree:
+        """
+        Create the missing /Names and /EmbeddedFiles entries of the catalog.
+
+        Returns:
+            The /EmbeddedFiles name tree.
+        """
+        if "/Names" not in self._root_object:
+            self._root_object[NameObject("/Names")] = self._add_object(
+                DictionaryObject()
+            )
+        node = cast(DictionaryObject, self._root_object["/Names"])
+        if "/EmbeddedFiles" not in node:
+            node[NameObject("/EmbeddedFiles")] = self._add_object(NameTree())
+        node = cast(NameTree, node["/EmbeddedFiles"])
+        if "/Kids" not in node and "/Names" not in node:
+            node[NameObject("/Names")] = ArrayObject()
+        return node
+
+    @property
+    def attachments_names(self) -> List[str]:
+        """
+        Names of the embedded files.
+
+        Returns:
+            List of names
+        """
+        return self.attachments.keys()
+
+    @property
+    def attachments(self) -> AttachmentBytesDictionary:
+        """
+        Extract the /EF entries as bytes from the embedded files.
+
+        Returns:
+            Dictionary with the filenames as keys and the file content as bytes;
+            extra data can be accessed with the AttachmentBytes extra properties
+            (.name, .list_rf_names(), .get_embeddedfile(), .all_files)
+
+        Note:
+            /RF (related files) entries are reachable through list_rf_names(),
+            get_embeddedfile(subfile) and all_files of each value.
+        """
+        return AttachmentBytesDictionary(self._get_embedded_files_root())
+
+    def add_attachment(
+        self,
+        filename: str,
+        data: Union[str, bytes, List[Tuple[str, bytes]]],
+        overwrite: bool = True,
+        fname: Optional[str] = None,
+        desc: str = "",
+    ) -> Optional[DictionaryObject]:
         """
         Embed a file inside the PDF.
 
@@ -711,9 +793,26 @@ def add_attachment(self, filename: str, data: Union[str, bytes]) -> None:
         Section 7.11.3
 
         Args:
-            filename: The filename to display.
+            filename: The filename to display (stored in the /UF entry).
             data: The data in the file.
+                If data is a list of (name, content) tuples, the entries are
+                stored in an array referenced from the /RF entry.
+            overwrite: if False and an attachment named filename already
+                exists, nothing is added and None is returned.
+            fname: an old style name for "/F" entry (should be ansi).
+                If None, it is derived from filename.
+            desc: a description string
+
+        Returns:
+            The filespec DictionaryObject; None if nothing was added.
         """
+        if not overwrite and filename in self.attachments_names:
+            return None
+        if fname is None:
+            st = filename.replace("/", "\\/").replace("\\\\/", "\\/")
+            # non-ASCII characters are replaced by XML character references
+            fname = st.encode("ascii", errors="xmlcharrefreplace").decode("ascii")
+
         # We need three entries:
         # * The file's data
         # * The /Filespec entry
@@ -731,9 +830,22 @@ def add_attachment(self, filename: str, data: Union[str, bytes]) -> None:
         # endstream
         # endobj
 
-        file_entry = DecodedStreamObject()
-        file_entry.set_data(b_(data))
-        file_entry.update({NameObject(PA.TYPE): NameObject("/EmbeddedFile")})
+        if isinstance(data, list):
+            ef_entry = DictionaryObject()
+            a = ArrayObject()
+            ef_entry.update({NameObject("/F"): self._add_object(a)})
+            for fn, da in data:
+                a.append(TextStringObject(fn))
+                file_entry = DecodedStreamObject()
+                file_entry.set_data(b_(da))
+                file_entry.update({NameObject(PA.TYPE): NameObject("/EmbeddedFile")})
+                a.append(self._add_object(file_entry))
+        else:
+            file_entry = DecodedStreamObject()
+            file_entry.set_data(b_(data))
+            file_entry.update({NameObject(PA.TYPE): NameObject("/EmbeddedFile")})
+            ef_entry = DictionaryObject()
+            ef_entry.update({NameObject("/F"): self._add_object(file_entry)})
 
         # The Filespec entry
         # Sample:
@@ -744,51 +856,29 @@ def add_attachment(self, filename: str, data: Union[str, bytes]) -> None:
         #  /EF << /F 8 0 R >>
         # >>
 
-        ef_entry = DictionaryObject()
-        ef_entry.update({NameObject("/F"): self._add_object(file_entry)})
-
         filespec = DictionaryObject()
         filespec.update(
             {
                 NameObject(PA.TYPE): NameObject("/Filespec"),
-                NameObject(FileSpecificationDictionaryEntries.F): create_string_object(
+                NameObject(FileSpecificationDictionaryEntries.UF): TextStringObject(
                     filename
-                ),  # Perhaps also try TextStringObject
-                NameObject(FileSpecificationDictionaryEntries.EF): ef_entry,
+                ),
+                NameObject(FileSpecificationDictionaryEntries.F): TextStringObject(
+                    fname
+                ),
+                NameObject(FileSpecificationDictionaryEntries.DESC): TextStringObject(
+                    desc
+                ),
             }
         )
-
-        # Then create the entry for the root, as it needs
-        # a reference to the Filespec
-        # Sample:
-        # 1 0 obj
-        # <<
-        #  /Type /Catalog
-        #  /Outlines 2 0 R
-        #  /Pages 3 0 R
-        #  /Names << /EmbeddedFiles << /Names [(hello.txt) 7 0 R] >> >>
-        # >>
-        # endobj
-
-        if CA.NAMES not in self._root_object:
-            self._root_object[NameObject(CA.NAMES)] = self._add_object(
-                DictionaryObject()
-            )
-        if "/EmbeddedFiles" not in cast(DictionaryObject, self._root_object[CA.NAMES]):
-            embedded_files_names_dictionary = DictionaryObject(
-                {NameObject(CA.NAMES): ArrayObject()}
-            )
-            cast(DictionaryObject, self._root_object[CA.NAMES])[
-                NameObject("/EmbeddedFiles")
-            ] = self._add_object(embedded_files_names_dictionary)
+        if isinstance(data, list):
+            filespec[NameObject(FileSpecificationDictionaryEntries.RF)] = ef_entry
         else:
-            embedded_files_names_dictionary = cast(
-                DictionaryObject,
-                cast(DictionaryObject, self._root_object[CA.NAMES])["/EmbeddedFiles"],
-            )
-        cast(ArrayObject, embedded_files_names_dictionary[CA.NAMES]).extend(
-            [create_string_object(filename), filespec]
-        )
+            filespec[NameObject(FileSpecificationDictionaryEntries.EF)] = ef_entry
+
+        nm = self._get_embedded_files_root() or self._create_attachment_root()
+        nm.list_add(filename, filespec, overwrite=True)
+        return filespec
 
     def addAttachment(self, fname: str, fdata: Union[str, bytes]) -> None:  # deprecated
         """
@@ -797,7 +887,7 @@ def addAttachment(self, fname: str, fdata: Union[str, bytes]) -> None:  # deprec
         .. deprecated:: 1.28.0
         """
         deprecation_with_replacement("addAttachment", "add_attachment", "3.0.0")
-        return self.add_attachment(fname, fdata)
+        self.add_attachment(fname, fdata)
 
     def append_pages_from_reader(
         self,
diff --git a/pypdf/constants.py b/pypdf/constants.py
index 56a24b183..b4bae5939 100644
--- a/pypdf/constants.py
+++ b/pypdf/constants.py
@@ -149,8 +149,11 @@ class FileSpecificationDictionaryEntries:
 
     Type = "/Type"
     FS = "/FS"  # The name of the file system to be used to interpret this file specification
-    F = "/F"  # A file specification string of the form described in Section 3.10.1
+    F = "/F"  # A file specification string of the file as described in Section 3.10.1
+    UF = "/UF"  # A unicode string of the file as described in Section 3.10.1
     EF = "/EF"  # dictionary, containing a subset of the keys F , UF , DOS , Mac , and Unix
+    RF = "/RF"  # dictionary, containing arrays of /EmbeddedFile
+    DESC = "/Desc"  # descriptive text associated with the file specification
 
 
 class StreamAttributes:
diff --git a/pypdf/generic/__init__.py b/pypdf/generic/__init__.py
index 778a9339e..f2eadf079 100644
--- a/pypdf/generic/__init__.py
+++ b/pypdf/generic/__init__.py
@@ -46,15 +46,20 @@
     encode_pdfdocencoding,
 )
 from ._data_structures import (
+    PREFERED_ATTACHMENT,
     ArrayObject,
+    AttachmentBytes,
+    AttachmentBytesDictionary,
     ContentStream,
     DecodedStreamObject,
     Destination,
     DictionaryObject,
     EncodedStreamObject,
     Field,
+    NameTree,
     StreamObject,
     TreeObject,
+    get_from_file_specification,
     read_object,
 )
 from ._fit import Fit
@@ -435,6 +440,8 @@ def link(
     "PAGE_FIT",
     # Data structures
     "ArrayObject",
+    "AttachmentBytes",
+    "AttachmentBytesDictionary",
     "DictionaryObject",
     "TreeObject",
     "StreamObject",
@@ -444,6 +451,9 @@ def link(
     "RectangleObject",
     "Field",
     "Destination",
+    "NameTree",
+    "PREFERED_ATTACHMENT",
+    "get_from_file_specification",
     "ViewerPreferences",
     # --- More specific stuff
     # Outline
diff --git a/pypdf/generic/_base.py b/pypdf/generic/_base.py
index a50bb5faf..d429f2724 100644
---
a/pypdf/generic/_base.py
+++ b/pypdf/generic/_base.py
@@ -314,6 +314,18 @@ def get_object(self) -> Optional["PdfObject"]:
             return None
         return obj.get_object()
 
+    def replace_object(self, obj: "PdfObject") -> None:
+        """
+        Replace the object this reference points to with obj (resolved first).
+        Raises TypeError if the owning pdf has no _replace_object (non PdfWriter).
+        """
+        obj = cast("PdfObject", obj.get_object())
+        pdf = self.pdf
+        if not hasattr(pdf, "_replace_object"):
+            raise TypeError("Trying to replace Object in a non PdfWriter")
+        pdf._replace_object(self.idnum, obj)
+        obj.indirect_reference = self
+
     def __repr__(self) -> str:
         return f"IndirectObject({self.idnum!r}, {self.generation!r}, {id(self.pdf)})"
 
diff --git a/pypdf/generic/_data_structures.py b/pypdf/generic/_data_structures.py
index 4ee13a13b..9261eafc3 100644
--- a/pypdf/generic/_data_structures.py
+++ b/pypdf/generic/_data_structures.py
@@ -37,10 +37,11 @@
     Callable,
     Dict,
     Iterable,
+    Iterator,
     List,
+    Mapping,
     Optional,
     Sequence,
-    Set,
     Tuple,
     Union,
     cast,
@@ -188,7 +189,6 @@ def clone(
             except Exception:
                 pass
 
-        visited: Set[Tuple[int, int]] = set()  # (idnum, generation)
         d__ = cast(
             "DictionaryObject",
             self._reference_clone(self.__class__(), pdf_dest, force_duplicate),
@@ -196,7 +196,7 @@ def clone(
         if ignore_fields is None:
             ignore_fields = []
         if len(d__.keys()) == 0:
-            d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
+            d__._clone(self, pdf_dest, force_duplicate, ignore_fields)
         return d__
 
     def _clone(
@@ -205,7 +205,6 @@ def _clone(
         pdf_dest: PdfWriterProtocol,
         force_duplicate: bool,
         ignore_fields: Optional[Sequence[Union[str, int]]],
-        visited: Set[Tuple[int, int]],  # (idnum, generation)
     ) -> None:
         """
         Update the object from src.
@@ -273,14 +272,6 @@ def _clone(
                                 cur_obj.__class__(), pdf_dest, force_duplicate
                             ),
                         )
-                        # check to see if we've previously processed our item
-                        if clon.indirect_reference is not None:
-                            idnum = clon.indirect_reference.idnum
-                            generation = clon.indirect_reference.generation
-                            if (idnum, generation) in visited:
-                                cur_obj = None
-                                break
-                            visited.add((idnum, generation))
                         objs.append((cur_obj, clon))
                         assert prev_obj is not None
                         prev_obj[NameObject(k)] = clon.indirect_reference
@@ -293,9 +284,7 @@ def _clone(
                         except Exception:
                             cur_obj = None
                     for s, c in objs:
-                        c._clone(
-                            s, pdf_dest, force_duplicate, ignore_fields, visited
-                        )
+                        c._clone(s, pdf_dest, force_duplicate, ignore_fields)
 
         for k, v in src.items():
             if k not in ignore_fields:
@@ -811,7 +800,6 @@ def _clone(
         pdf_dest: PdfWriterProtocol,
         force_duplicate: bool,
         ignore_fields: Optional[Sequence[Union[str, int]]],
-        visited: Set[Tuple[int, int]],
     ) -> None:
         """
         Update the object from src.
@@ -834,7 +822,7 @@ def _clone(
                 )
             except Exception:
                 pass
-        super()._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)
+        super()._clone(src, pdf_dest, force_duplicate, ignore_fields)
 
     def get_data(self) -> Union[bytes, str]:
         return self._data
@@ -1062,7 +1050,6 @@ def clone(
             except Exception:
                 pass
 
-        visited: Set[Tuple[int, int]] = set()
         d__ = cast(
             "ContentStream",
             self._reference_clone(
@@ -1071,7 +1058,7 @@ def clone(
         )
         if ignore_fields is None:
             ignore_fields = []
-        d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
+        d__._clone(self, pdf_dest, force_duplicate, ignore_fields)
         return d__
 
     def _clone(
@@ -1080,7 +1067,6 @@ def _clone(
         pdf_dest: PdfWriterProtocol,
         force_duplicate: bool,
         ignore_fields: Optional[Sequence[Union[str, int]]],
-        visited: Set[Tuple[int, int]],
     ) -> None:
         """
         Update the object from src.
@@ -1097,7 +1083,7 @@ def _clone(
             self._operations = list(src_cs._operations)
             self.forced_encoding = src_cs.forced_encoding
         # no need to call DictionaryObjection or anything
-        # like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)
+        # like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields)
 
     def _parse_content_stream(self, stream: StreamType) -> None:
         # 7.8.2 Content Streams
@@ -1467,6 +1453,484 @@ def additionalActions(self) -> Optional[DictionaryObject]:  # deprecated
         return self.additional_actions
 
 
+class AttachmentBytes(bytes):
+    """
+    Extension of the bytes class, adding File Specification dedicated properties.
+
+    The bytes value is the data of the embedded file referenced by the /EF
+    entry of the File Specification (empty if there is no such entry).
+    """
+
+    source_object: Optional[IndirectObject] = None
+    """
+    Pointer to the File Specification entry associated;
+    None, if created from a bytes or StreamObject
+    """
+    within_page: Optional[IndirectObject] = None
+    """
+    Page where the File Specification is referenced, else None
+    This is relevant only for file attachment annotations
+    note: this property should be initialized manually out of the constructor
+    """
+
+    def __new__(
+        cls,
+        src: Optional[
+            Union[bytes, IndirectObject, StreamObject, DictionaryObject]
+        ] = None,
+    ) -> "AttachmentBytes":
+        """
+        Object Constructor.
+
+        Args:
+            src: source of the new object, depending on its type:
+
+                * DictionaryObject: File Specification; the data is read
+                  from its /EF entry and source_object is set
+                * bytes / StreamObject: the data itself (source_object is None)
+                * IndirectObject: pointer to one of the objects above
+                * None: similar to src = b""
+        """
+        inp: Optional[IndirectObject] = None
+        obj: Any = src
+        v: Union[str, bytes] = b""
+        if isinstance(obj, IndirectObject):
+            obj = obj.get_object()
+        if isinstance(obj, bytes):
+            v = obj
+        elif isinstance(obj, StreamObject):
+            v = obj.get_data()
+        elif isinstance(obj, DictionaryObject) and "/EF" in obj:
+            inp = getattr(obj, "indirect_reference", None)
+            ef = cast(DictionaryObject, obj["/EF"])
+            stream = get_from_file_specification(ef).get_object()
+            # an /EF dictionary without any usable entry gives empty data
+            if isinstance(stream, StreamObject):
+                v = stream.get_data()
+        if isinstance(v, str):
+            v = v.encode()
+        out = bytes.__new__(cls, v)
+        # inp is already the reference to the File Specification (or None)
+        out.source_object = inp
+        out.within_page = None  # has to be set by program
+        return out
+
+    @property
+    def name(self) -> Optional[str]:
+        """
+        Returns:
+            The (best) name from the File Specification; None if the object is
+            not associated with a File Specification or no name is found.
+        """
+        o: Any = self.source_object
+        if o is None:
+            return None
+        o = cast(DictionaryObject, o.get_object())
+        name = get_from_file_specification(o)
+        # get_from_file_specification falls back to an empty dictionary
+        return None if isinstance(name, DictionaryObject) else cast(str, name)
+
+    def list_rf_names(self) -> List[str]:
+        """
+        Returns:
+            List of filenames stored in the /RF field;
+            empty list if no /RF field exists
+
+        Note:
+            does not contain the "" entry (used for /EF)
+        """
+        o: Any = self.source_object
+        if o is None:
+            return []
+        o = cast(DictionaryObject, o.get_object())
+        if "/RF" not in o:
+            return []
+        rf = cast(DictionaryObject, o["/RF"])
+        # the related files array may be referenced indirectly
+        arr = get_from_file_specification(rf).get_object()
+        if not isinstance(arr, ArrayObject):
+            return []
+        return [cast(str, arr[i]) for i in range(0, len(arr), 2)]
+
+    def get_embeddedfile(self, subfile: str = "") -> Optional[StreamObject]:
+        """
+        Returns the EmbeddedFile (StreamObject) containing the data bytes.
+
+        Args:
+            subfile: filename of the EmbeddedFile to be returned;
+                "" returns the EmbeddedFile from the /EF field
+
+        Returns:
+            StreamObject; None if the object is not associated with a File
+            Specification or if subfile is not found
+
+        Note:
+            o == o.get_embeddedfile("").get_data()
+        """
+        o: Any = self.source_object
+        if o is None:
+            return None
+        o = cast(DictionaryObject, o.get_object())
+        if subfile == "":
+            if "/EF" not in o:
+                return None
+            ef = cast(DictionaryObject, o["/EF"])
+            stream = get_from_file_specification(ef).get_object()
+            return stream if isinstance(stream, StreamObject) else None
+        if "/RF" not in o:
+            return None
+        rf = cast(DictionaryObject, o["/RF"])
+        arr = get_from_file_specification(rf).get_object()
+        if not isinstance(arr, ArrayObject):
+            return None
+        # the array is a sequence of (name, stream) pairs
+        for i in range(0, len(arr) - 1, 2):
+            if arr[i] == subfile:
+                return cast(StreamObject, arr[i + 1].get_object())
+        return None
+
+    @property
+    def all_files(self) -> Dict[str, bytes]:
+        """
+        Returns:
+            a dictionary filename/data bytes;
+            {} if the object is not associated with a File Specification.
+
+        Note:
+            the result also contains the /EF data, stored behind the "" key
+        """
+        o: Any = self.source_object
+        if o is None:
+            return {}
+        filespec = cast(DictionaryObject, o.get_object())
+        out: Dict[str, bytes] = {}
+
+        def _data(stream: Any) -> bytes:
+            v = stream.get_data()
+            return v.encode() if isinstance(v, str) else v
+
+        if "/EF" in filespec:
+            ef = cast(DictionaryObject, filespec["/EF"])
+            stream = get_from_file_specification(ef).get_object()
+            if isinstance(stream, StreamObject):
+                out[""] = _data(stream)
+        # /RF is an entry of the File Specification, not of the /EF dictionary
+        if "/RF" in filespec:
+            rf = cast(DictionaryObject, filespec["/RF"])
+            arr = get_from_file_specification(rf).get_object()
+            if isinstance(arr, ArrayObject):
+                for i in range(0, len(arr) - 1, 2):
+                    out[arr[i]] = _data(arr[i + 1].get_object())
+        return out
+
+
+class NameTree(DictionaryObject):
+    """
+    Name Tree Structure
+    Allow to list, get and set objects in a Name Tree
+    """
+
+    def __init__(self, obj: Optional[PdfObject] = None) -> None:
+        """
+        Args:
+            obj: dictionary (with a /Names or a /Kids entry) whose entries are
+                copied into the new object; None creates an empty tree.
+
+        Raises:
+            ValueError: obj is not a dictionary with a /Names or /Kids entry
+        """
+        DictionaryObject.__init__(self)
+        if obj is None:
+            self[NameObject("/Names")] = ArrayObject()
+            return
+        if not isinstance(obj, DictionaryObject) or all(
+            x not in obj for x in ("/Names", "/Kids")
+        ):
+            raise ValueError("source object is not a valid source object")
+        self.update(obj)
+        if hasattr(obj, "indirect_reference"):
+            self.indirect_reference = obj.indirect_reference
+
+    def list_keys(self) -> List[str]:
+        """
+        Provides the list of keys of the items in the Name Tree
+
+        Returns:
+            Sorted list of str keys
+        """
+
+        def _list(o: Optional[PdfObject]) -> List[str]:
+            if o is None:
+                return []
+            o = cast(DictionaryObject, o)
+            _l = o.get("/Names", None)
+            a = o.get("/Kids", None)
+            _l = _l.get_object() if _l else []
+            a = a.get_object() if a else []
+            ll = [v for v in _l if isinstance(v, str)]
+            for x in a:
+                ll.extend(_list(x.get_object()))
+            return ll
+
+        return sorted(_list(self))
+
+    def list_items(self) -> Mapping[str, List[PdfObject]]:
+        """
+        Provides the Name Tree entries as a dictionary
+
+        Returns:
+            dictionary mapping each key to the list of the objects stored
+            under this key (more than one object if the key is duplicated)
+        """
+        out: Dict[str, List[Any]] = {}
+
+        def _collect(o: Optional[PdfObject]) -> None:
+            if o is None:
+                return
+            o = cast(DictionaryObject, o)
+            _l = o.get("/Names", None)
+            a = o.get("/Kids", None)
+            _l = _l.get_object() if _l else []
+            a = a.get_object() if a else []
+            for i, k in enumerate(_l):
+                if not isinstance(k, str):
+                    continue
+                # a key without a following object is stored with None
+                v = _l[i + 1] if i + 1 < len(_l) else None
+                out.setdefault(k, []).append(None if isinstance(v, str) else v)
+            for x in a:
+                _collect(x.get_object())
+
+        _collect(self)
+        return out
+
+    def list_get(self, key: str) -> Optional[PdfObject]:
+        """
+        Get the entry from the Name Tree
+
+        Args:
+            key: searched entry
+
+        Returns:
+            matching PdfObject; None if the key is not found
+        """
+
+        def _get(key: str, o: Optional[PdfObject]) -> Optional[PdfObject]:
+            if o is None:
+                return None
+            o = cast(DictionaryObject, o)
+            _l = o.get("/Names", None)
+            a = o.get("/Kids", None)
+            _l = _l.get_object() if _l else []
+            a = a.get_object() if a else []
+            # stop before the last item: a match needs a following object
+            for i in range(len(_l) - 1):
+                if _l[i] == key:
+                    return _l[i + 1]
+            for x in a:
+                # kids are resolved as in list_keys / list_items
+                v = _get(key, x.get_object())
+                if v is not None:
+                    return v
+            return None  # if we arrive here, it means nothing matched
+
+        return _get(key, self)
+
+    def list_add(
+        self,
+        key: Union[str, TextStringObject],
+        data: PdfObject,
+        overwrite: bool = False,
+    ) -> Optional[IndirectObject]:
+        """
+        Add the data entry into the Name Tree
+
+        Args:
+            key: entry
+            data: PdfObject (it will be added to the list of objects)
+            overwrite: allow to overwrite existing key
+
+        Returns:
+            reference to the object stored under key (the existing one if
+            overwrite is False and key already exists);
+            None if the entry could not be stored
+
+        Raises:
+            TypeError: the Name Tree does not belong to a PdfWriter
+        """
+        ref = getattr(self, "indirect_reference", None)
+        writer = getattr(ref, "pdf", None)
+        if writer is None or not hasattr(writer, "_add_object"):
+            raise TypeError("Object does not belong to a PdfWriter")
+        if not isinstance(key, TextStringObject):
+            key = TextStringObject(key)
+
+        def _update_limits(
+            obj: DictionaryObject,
+            lo: Optional[Union[str, TextStringObject]],
+            hi: Optional[Union[str, TextStringObject]],
+        ) -> bool:
+            """Extend /Limits of obj (if present) to include lo and/or hi."""
+            if "/Limits" not in obj:
+                return False
+            a = cast("ArrayObject", obj["/Limits"])
+            updated = False
+            if lo is not None and lo < a[0]:
+                a[0] = lo if isinstance(lo, TextStringObject) else TextStringObject(lo)
+                updated = True
+            if hi is not None and hi > a[1]:
+                a[1] = hi if isinstance(hi, TextStringObject) else TextStringObject(hi)
+                updated = True
+            return updated
+
+        def _upper_limit(node: DictionaryObject) -> Optional[Any]:
+            """Highest key of node: from /Limits, else from its /Names array."""
+            if "/Limits" in node:
+                return cast(ArrayObject, node["/Limits"])[1]
+            names = node.get("/Names", None)
+            names = names.get_object() if names else []
+            return names[-2] if len(names) >= 2 else None
+
+        def _add_in(o: Optional[PdfObject]) -> Optional[PdfObject]:
+            if o is None:
+                return None
+            o = cast(DictionaryObject, o.get_object())
+            if "/Names" in o:
+                _l = cast(ArrayObject, o["/Names"])
+                i = 0
+                while i < len(_l):
+                    if _l[i] == key:
+                        d = _l[i + 1]
+                        if not overwrite:
+                            return d
+                        if isinstance(d, IndirectObject):
+                            d.replace_object(data)
+                        else:  # pragma: no cover
+                            # should not occur iaw pdf spec
+                            _l[i + 1] = data
+                        return _l[i + 1]
+                    elif key < _l[i]:
+                        _l.insert(i, key)
+                        _l.insert(i + 1, writer._add_object(data))
+                        _update_limits(o, key, None)
+                        return _l[i + 1]
+                    i += 2
+                _l.append(key)
+                _l.append(writer._add_object(data))
+                # appended at the end: the upper limit is the one to extend
+                _update_limits(o, None, key)
+                return _l[-1]
+            if "/Kids" not in o:
+                return None
+            ar = cast(ArrayObject, o["/Kids"])
+            for idx, x in enumerate(ar):
+                node = cast(DictionaryObject, x.get_object())
+                hi = _upper_limit(node)
+                # first kid able to hold key; keys above all limits go to
+                # the last kid
+                if idx == len(ar) - 1 or (hi is not None and key <= hi):
+                    r = _add_in(node)
+                    if r is not None:
+                        _update_limits(o, key, key)
+                    return r
+            return None
+
+        o = _add_in(self)
+        if o is None or isinstance(o, IndirectObject):
+            return o
+        return getattr(o, "indirect_reference", None)
+
+
+PREFERED_ATTACHMENT = "/DOS"
+
+
+def get_from_file_specification(_a: DictionaryObject) -> PdfObject:
+    """
+    Get the preferred entry of a File Specification related dictionary.
+
+    The keys are tried in the following order: /UF, /F, PREFERED_ATTACHMENT,
+    /DOS, /Unix, /Mac; missing or falsy (e.g. empty) entries are skipped.
+
+    Args:
+        _a: File Specification, or its /EF or /RF dictionary
+
+    Returns:
+        The value of the first matching key (callers resolve it with
+        get_object()); an empty DictionaryObject if there is none
+    """
+    return (
+        _a.get("/UF")
+        or _a.get("/F")
+        or _a.get(PREFERED_ATTACHMENT)
+        or _a.get("/DOS")
+        or _a.get("/Unix")
+        or _a.get("/Mac")
+        or DictionaryObject()
+    )
+
+
+class AttachmentBytesDictionary(Mapping[str, AttachmentBytes]):
+    """
+    Dict[str, AttachmentBytes]
+    Ease access to Dictionary of Object
+    """
+
+    root: Optional[NameTree]
+    names: List[str]
+
+    def __init__(
+        self, root: Optional[Union[NameTree, DictionaryObject]] = None
+    ) -> None:
+        """
+        Args:
+            root: the /EmbeddedFiles name tree (or a reference to it);
+                None gives an empty mapping.
+        """
+        if isinstance(root, IndirectObject):
+            root = cast(DictionaryObject, root.get_object())
+        if root is not None:
+            self.root = root if isinstance(root, NameTree) else NameTree(root)
+            # a key must be reported once, even if duplicated in the tree
+            self.names = list(dict.fromkeys(self.root.list_keys()))
+        else:
+            self.root = None
+            self.names = []
+
+    def keys(self) -> List[str]:  # type: ignore[override]
+        return self.names
+
+    def __len__(self) -> int:
+        return len(self.names)
+
+    def __iter__(self) -> Iterator[str]:  # type: ignore
+        yield from self.names
+
+    def items(self) -> Iterable[Tuple[str, AttachmentBytes]]:  # type: ignore[override]
+        if self.root is None:
+            return
+        for k, v in self.root.list_items().items():
+            if len(v) > 1:
+                logger_warning(
+                    "Unexpected amount of entries in attachments, "
+                    "please report "
+                    "and share the file for analysis with pypdf dev team",
+                    __name__,
+                )
+            # AttachmentBytes resolves indirect references by itself
+            yield (k, AttachmentBytes(cast(Any, v[0])))
+
+    def __getitem__(self, k: str) -> AttachmentBytes:
+        if self.root is None or k not in self.names:
+            raise KeyError(k)
+        v = self.root.list_get(k)
+        if v is None:
+            raise KeyError(k)
+        return AttachmentBytes(cast(DictionaryObject, v.get_object()))
+
+    def __repr__(self) -> str:
+        return "{ " + ", ".join(["'" + x + "': ..." for x in self.names]) + "}"
+
+
 class Destination(TreeObject):
     """
     A class representing a destination within a PDF file.
diff --git a/tests/test_generic.py b/tests/test_generic.py
index 0e0fff677..5f776d5ed 100644
--- a/tests/test_generic.py
+++ b/tests/test_generic.py
@@ -19,6 +19,7 @@
     FloatObject,
     IndirectObject,
     NameObject,
+    NameTree,
     NullObject,
     NumberObject,
     OutlineItem,
@@ -1235,3 +1236,18 @@ def test_encodedstream_set_data():
     assert cc["/Filter"] == ["/FlateDecode", "/FlateDecode", "/FlateDecode"]
     assert str(cc["/DecodeParms"]) == "[NullObject, NullObject, NullObject]"
     assert cc[NameObject("/Test")] == "/MyTest"
+
+
+def test_replace_object():
+    writer = PdfWriter(clone_from=RESOURCE_ROOT / "crazyones.pdf")
+    i = writer.pages[0]["/Contents"][0].idnum
+    writer.pages[0]["/Contents"][0].replace_object(NullObject())
+    assert writer.pages[0]["/Contents"][0].idnum == i
+    assert isinstance(writer.pages[0]["/Contents"][0].get_object(), NullObject)
+
+
+def test_nametree():
+    writer = PdfWriter(clone_from=RESOURCE_ROOT / "crazyones.pdf")
+    with pytest.raises(ValueError):
+        NameTree(writer._root_object)
+    writer._root_object[NameObject("/Names")] = DictionaryObject()
diff --git a/tests/test_reader.py b/tests/test_reader.py
index b252e48f9..f43f11df6 100644
--- a/tests/test_reader.py
+++ b/tests/test_reader.py
@@ -1466,3 +1466,10 @@ def test_xyz_with_missing_param():
     assert reader.outline[0]["/Top"] == 0
     assert reader.outline[1]["/Left"] == 0
     assert reader.outline[0]["/Top"] == 0
+
+
+def test_embedded_files_no_ef():
+    reader = PdfReader(RESOURCE_ROOT / "crazyones.pdf")
+    reader.trailer["/Root"][NameObject("/Names")] = DictionaryObject()
+    assert reader.attachments_names == []
+    assert reader.attachments == {}
diff --git a/tests/test_writer.py b/tests/test_writer.py
index 281232c4b..1a8a545f0 100644
--- a/tests/test_writer.py
+++ b/tests/test_writer.py
@@ -1297,8 +1297,8 @@ def test_attachments():
     reader = PdfReader(b)
     b = None
     assert reader.attachments == {}
-    assert reader._list_attachments() == []
-    assert reader._get_attachments() == {}
+    # assert reader._list_attachments() == []
+    # assert reader._get_attachments() == {}
     to_add = [
         ("foobar.txt", b"foobarcontent"),
         ("foobar2.txt", b"foobarcontent2"),
@@ -1313,25 +1313,52 @@ def test_attachments():
     reader = PdfReader(b)
     b = None
     assert sorted(reader.attachments.keys()) == sorted({name for name, _ in to_add})
-    assert str(reader.attachments) == "LazyDict(keys=['foobar.txt', 'foobar2.txt'])"
-    assert reader._list_attachments() == [name for name, _ in to_add]
+    assert dict(reader.attachments.items()) == {
+        "foobar.txt": b"foobarcontent",
+        "foobar2.txt": b"2nd_foobarcontent",
+    }
+    writer.add_attachment("foobar2.txt", b"overwrite_ignored", overwrite=False)
+    assert dict(reader.attachments.items()) == {
+        "foobar.txt": b"foobarcontent",
+        "foobar2.txt": b"2nd_foobarcontent",
+    }
+    assert dict(writer.attachments.items()) == {
+        "foobar.txt": b"foobarcontent",
+        "foobar2.txt": b"2nd_foobarcontent",
+    }
+    # _l = list({name for name, _ in to_add})
+    # _l.sort()
+    # assert reader._list_attachments() == _l
+    # assert writer._list_attachments() == _l
 
     # We've added the same key twice - hence only 2 and not 3:
-    att = reader._get_attachments()
-    assert len(att) == 2  # we have 2 keys, but 3 attachments!
+    # att = reader._get_attachments()
+    # assert len(att) == 2
 
     # The content for foobar.txt is clear and just a single value:
-    assert att["foobar.txt"] == b"foobarcontent"
+    # assert att["foobar.txt"] == b"foobarcontent"
+
+    # Not applicable for writer
+    # att = writer._get_attachments()
+    # assert len(att) == 2  # we have 2 keys only
+    # assert att["foobar.txt"] == b"foobarcontent"
 
     # The content for foobar2.txt is a list!
-    att = reader._get_attachments("foobar2.txt")
-    assert len(att) == 1
-    assert att["foobar2.txt"] == [b"foobarcontent2", b"2nd_foobarcontent"]
+    # att = reader._get_attachments("foobar2.txt")
+    # assert len(att) == 1
+    # assert att["foobar2.txt"] == [b"2nd_foobarcontent"]
+
+    # Same check for the writer (disabled as well):
+    # att = writer._get_attachments("foobar2.txt")
+    # assert len(att) == 1
+    # assert att["foobar2.txt"] == [b"2nd_foobarcontent"]
 
     # Let's do both cases with the public interface:
-    assert reader.attachments["foobar.txt"][0] == b"foobarcontent"
-    assert reader.attachments["foobar2.txt"][0] == b"foobarcontent2"
-    assert reader.attachments["foobar2.txt"][1] == b"2nd_foobarcontent"
+    assert reader.attachments["foobar.txt"] == b"foobarcontent"
+    assert reader.attachments["foobar2.txt"] == b"2nd_foobarcontent"
+
+    assert writer.attachments["foobar.txt"] == b"foobarcontent"
+    assert writer.attachments["foobar2.txt"] == b"2nd_foobarcontent"
 
 
 @pytest.mark.enable_socket()