From 4ce69044955cd4badb82c74a8e5a687d0309f641 Mon Sep 17 00:00:00 2001 From: James Maroney Date: Fri, 6 Oct 2017 17:56:09 -0400 Subject: [PATCH] when `output` is an object with a `write` method, write directly to it --- gnupg/_meta.py | 40 ++++++++++++++++------------------- gnupg/gnupg.py | 15 +++++++++++--- gnupg/test/test_gnupg.py | 45 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 75 insertions(+), 25 deletions(-) diff --git a/gnupg/_meta.py b/gnupg/_meta.py index 9df6163..c6b55ab 100644 --- a/gnupg/_meta.py +++ b/gnupg/_meta.py @@ -685,7 +685,12 @@ def _read_data(self, stream, result): data = stream.read(1024) if len(data) == 0: break - chunks.append(data) + + if hasattr(result, 'writer'): + result.writer.write(data) + else: + chunks.append(data) + log.debug("Read %4d bytes" % len(data)) # Join using b'' or '', as appropriate @@ -913,7 +918,7 @@ def _encrypt(self, data, recipients, warnings. (default: True) :type output: str or file-like object - :param output: The output file to write to. If not specified, the + :param output: The output file or file-like object to write to. If not specified, the encrypted output is returned, and thus should be stored as an object in Python. For example: @@ -965,20 +970,14 @@ def _encrypt(self, data, recipients, ## FIXME: GnuPG appears to ignore the --output directive when being ## programmatically driven. We'll handle the IO ourselves to fix this ## for now. - output_filename = None - if output: - if getattr(output, 'fileno', None) is not None: - ## avoid overwrite confirmation message - if getattr(output, 'name', None) is not None: - output_filename = output.name - if os.path.exists(output.name): - os.remove(output.name) - #args.append('--output %s' % output.name) - else: - output_filename = output - if os.path.exists(output): - os.remove(output) - #args.append('--output %s' % output) + close_output_when_done = False + if output and not hasattr(output, 'write'): + output = str(output) + if os.path.exists(output): + os.remove(output) + args.append('--output %s' % output) + output = open(output, 'wb') + close_output_when_done = True if armor: args.append('--armor') if always_trust: args.append('--always-trust') @@ -1038,6 +1037,7 @@ def _encrypt(self, data, recipients, % recipients) result = self._result_map['crypt'](self) + result.writer = output log.debug("Got data '%s' with type '%s'." % (data, type(data))) self._handle_io(args, data, result, passphrase=passphrase, binary=True) # Avoid writing raw encrypted bytes to terminal loggers and breaking @@ -1045,12 +1045,8 @@ def _encrypt(self, data, recipients, if armor: log.debug("\n%s" % result.data) - if output_filename: - log.info("Writing encrypted output to file: %s" % output_filename) - with open(output_filename, 'wb') as fh: - fh.write(result.data) - fh.flush() - log.info("Encrypted output written successfully.") + if close_output_when_done: + output.close() return result diff --git a/gnupg/gnupg.py b/gnupg/gnupg.py index e0fdaee..0b52c3d 100644 --- a/gnupg/gnupg.py +++ b/gnupg/gnupg.py @@ -1072,7 +1072,7 @@ def decrypt(self, message, **kwargs): :param message: A string or file-like object to decrypt. :param bool always_trust: Instruct GnuPG to ignore trust checks. :param str passphrase: The passphrase for the secret key used for decryption. - :param str output: A filename to write the decrypted output to. + :param str output: A filename or file-like object to write the decrypted output to. """ stream = _make_binary_stream(message, self._encoding) result = self.decrypt_file(stream, **kwargs) @@ -1089,17 +1089,26 @@ def decrypt_file(self, filename, always_trust=False, passphrase=None, :param str output: A filename to write the decrypted output to. """ args = ["--decrypt"] - if output: # write the output to a file with the specified name + output_writer = None + if output and isinstance(output, str): # write the output to a file with the specified name if os.path.exists(output): - os.remove(output) # to avoid overwrite confirmation message + os.remove(output) # to avoid overwrite confirmation message args.append('--output %s' % output) + elif output and hasattr(output, 'write'): + output_writer = output + if always_trust: args.append("--always-trust") + result = self._result_map['crypt'](self) + if output_writer: + result.writer = output_writer + self._handle_io(args, filename, result, passphrase, binary=True) log.debug('decrypt result: %r', result.data) return result + class GPGUtilities(object): """Extra tools for working with GnuPG.""" diff --git a/gnupg/test/test_gnupg.py b/gnupg/test/test_gnupg.py index dc518c7..ef67f1b 100755 --- a/gnupg/test/test_gnupg.py +++ b/gnupg/test/test_gnupg.py @@ -32,6 +32,7 @@ from codecs import open as open from functools import wraps from glob import glob +from io import BytesIO from time import localtime from time import mktime @@ -1179,6 +1180,33 @@ def test_encryption_multi_recipient(self): self.assertIsNotNone(encrypted) self.assertGreater(len(encrypted), 0) + def test_decryption_with_bytes_writeable_output(self): + """Test decryption""" + key = self.generate_key("Rück", "rü.ck", passphrase="ruck") + ruck_fpr = key.fingerprint + ruck = self.gpg.export_keys(key.fingerprint) + self.gpg.import_keys(ruck) + + message = """ +In 2010 Riggio and Sicari presented a practical application of homomorphic +encryption to a hybrid wireless sensor/mesh network. The system enables +transparent multi-hop wireless backhauls that are able to perform statistical +analysis of different kinds of data (temperature, humidity, etc.) coming from +a WSN while ensuring both end-to-end encryption and hop-by-hop +authentication.""" + + encrypted = str(self.gpg.encrypt(message, ruck_fpr)) + + output = BytesIO() + self.gpg.decrypt(encrypted, passphrase="ruck", output=output) + decrypted = output.getvalue().decode('utf-8') + + if message != decrypted: + log.debug("was: %r" % message) + log.debug("new: %r" % decrypted) + + self.assertEqual(message, decrypted) + def test_decryption(self): """Test decryption""" key = self.generate_key("Frey", "fr.ey", passphrase="frey") @@ -1490,6 +1518,23 @@ def test_encryption_with_output(self): encrypted_message = fh.read() self.assertTrue(b"-----BEGIN PGP MESSAGE-----" in encrypted_message) + def test_encryption_with_bytes_writeable_output(self): + """Test that ``encrypt('foo', ..., output=writeable_object)`` is successful.""" + data = "Test Message" + output = BytesIO() + kwargs = dict(passphrase='speedtest', + symmetric=True, + cipher_algo='AES256', + encrypt=False, + output=output) + encrypted = self.gpg.encrypt(data, None, **kwargs) + self.assertTrue(encrypted.ok) + + encrypted_message = output.getvalue() + self.assertGreater(len(encrypted_message), 0) + print(encrypted_message) + self.assertIn(b"-----BEGIN PGP MESSAGE-----", encrypted_message) + def test_key_expiration(self): """Test that changing key expiration date succeeds.""" today = datetime.date.today()