diff --git a/pypdf/_writer.py b/pypdf/_writer.py index 0ac1524bc..ffb8df31a 100644 --- a/pypdf/_writer.py +++ b/pypdf/_writer.py @@ -235,9 +235,9 @@ def _get_clone_from( or Path(str(fileobj)).stat().st_size == 0 ): cloning = False - if isinstance(fileobj, (IO, BytesIO)): + if isinstance(fileobj, (IOBase, BytesIO)): t = fileobj.tell() - fileobj.seek(-1, 2) + fileobj.seek(0, 2) if fileobj.tell() == 0: cloning = False fileobj.seek(t, 0) @@ -249,7 +249,7 @@ def _get_clone_from( # to prevent overwriting self.temp_fileobj = fileobj self.fileobj = "" - self.with_as_usage = False + self.cloned = False # The root of our page tree node. pages = DictionaryObject() pages.update( @@ -267,6 +267,7 @@ def _get_clone_from( if not isinstance(clone_from, PdfReader): clone_from = PdfReader(clone_from) self.clone_document_from_reader(clone_from) + self.cloned = True else: self._pages = self._add_object(pages) # root object @@ -353,10 +354,11 @@ def xmp_metadata(self, value: Optional[XmpInformation]) -> None: return self.root_object.xmp_metadata # type: ignore def __enter__(self) -> "PdfWriter": - """Store that writer is initialized by 'with'.""" + """Store how writer is initialized by 'with'.""" + c: bool = self.cloned t = self.temp_fileobj self.__init__() # type: ignore - self.with_as_usage = True + self.cloned = c self.fileobj = t # type: ignore return self @@ -367,7 +369,7 @@ def __exit__( traceback: Optional[TracebackType], ) -> None: """Write data to the fileobj.""" - if self.fileobj: + if self.fileobj and not self.cloned: self.write(self.fileobj) def _repr_mimebundle_( @@ -1388,13 +1390,14 @@ def write(self, stream: Union[Path, StrByteType]) -> Tuple[bool, IO[Any]]: if isinstance(stream, (str, Path)): stream = FileIO(stream, "wb") - self.with_as_usage = True # my_file = True self.write_stream(stream) - if self.with_as_usage: + if my_file: stream.close() + else: + stream.flush() return my_file, stream diff --git a/tests/test_writer.py b/tests/test_writer.py index 0cd2d03f8..d4e176834 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -2480,3 +2480,31 @@ def test_append_pdf_with_dest_without_page(caplog): writer.append(reader) assert "/__WKANCHOR_8" not in writer.named_destinations assert len(writer.named_destinations) == 3 + + +def test_stream_not_closed(): + """Tests for #2905""" + src = RESOURCE_ROOT / "pdflatex-outline.pdf" + with NamedTemporaryFile(suffix=".pdf") as tmp: + with PdfReader(src) as reader, PdfWriter() as writer: + writer.add_page(reader.pages[0]) + writer.write(tmp) + assert not tmp.file.closed + + with NamedTemporaryFile(suffix=".pdf") as target: + with PdfWriter(target.file) as writer: + writer.add_blank_page(100, 100) + assert not target.file.closed + + with open(src, "rb") as fileobj: + with PdfWriter(fileobj) as writer: + pass + assert not fileobj.closed + + +def test_auto_write(tmp_path): + """Another test for #2905""" + target = tmp_path / "out.pdf" + with PdfWriter(target) as writer: + writer.add_blank_page(100, 100) + assert target.stat().st_size > 0