Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

when output is an object with a write method, write directly to it #210

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 18 additions & 22 deletions gnupg/_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,12 @@ def _read_data(self, stream, result):
data = stream.read(1024)
if len(data) == 0:
break
chunks.append(data)

if hasattr(result, 'writer'):
result.writer.write(data)
else:
chunks.append(data)

log.debug("Read %4d bytes" % len(data))

# Join using b'' or '', as appropriate
Expand Down Expand Up @@ -913,7 +918,7 @@ def _encrypt(self, data, recipients,
warnings. (default: True)

:type output: str or file-like object
:param output: The output file to write to. If not specified, the
:param output: The output file or file-like object to write to. If not specified, the
encrypted output is returned, and thus should be stored
as an object in Python. For example:

Expand Down Expand Up @@ -965,20 +970,14 @@ def _encrypt(self, data, recipients,
## FIXME: GnuPG appears to ignore the --output directive when being
## programmatically driven. We'll handle the IO ourselves to fix this
## for now.
output_filename = None
if output:
if getattr(output, 'fileno', None) is not None:
## avoid overwrite confirmation message
if getattr(output, 'name', None) is not None:
output_filename = output.name
if os.path.exists(output.name):
os.remove(output.name)
#args.append('--output %s' % output.name)
else:
output_filename = output
if os.path.exists(output):
os.remove(output)
#args.append('--output %s' % output)
close_output_when_done = False
if output and not hasattr(output, 'write'):
output = str(output)
if os.path.exists(output):
os.remove(output)
args.append('--output %s' % output)
output = open(output, 'wb')
close_output_when_done = True

if armor: args.append('--armor')
if always_trust: args.append('--always-trust')
Expand Down Expand Up @@ -1038,19 +1037,16 @@ def _encrypt(self, data, recipients,
% recipients)

result = self._result_map['crypt'](self)
result.writer = output
log.debug("Got data '%s' with type '%s'." % (data, type(data)))
self._handle_io(args, data, result, passphrase=passphrase, binary=True)
# Avoid writing raw encrypted bytes to terminal loggers and breaking
# them in that adorable way where they spew hieroglyphics until reset:
if armor:
log.debug("\n%s" % result.data)

if output_filename:
log.info("Writing encrypted output to file: %s" % output_filename)
with open(output_filename, 'wb') as fh:
fh.write(result.data)
fh.flush()
log.info("Encrypted output written successfully.")
if close_output_when_done:
output.close()

return result

Expand Down
15 changes: 12 additions & 3 deletions gnupg/gnupg.py
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,7 @@ def decrypt(self, message, **kwargs):
:param message: A string or file-like object to decrypt.
:param bool always_trust: Instruct GnuPG to ignore trust checks.
:param str passphrase: The passphrase for the secret key used for decryption.
:param str output: A filename to write the decrypted output to.
:param str output: A filename or file-like object to write the decrypted output to.
"""
stream = _make_binary_stream(message, self._encoding)
result = self.decrypt_file(stream, **kwargs)
Expand All @@ -1089,17 +1089,26 @@ def decrypt_file(self, filename, always_trust=False, passphrase=None,
:param str output: A filename to write the decrypted output to.
"""
args = ["--decrypt"]
if output: # write the output to a file with the specified name
output_writer = None
if output and isinstance(output, str): # write the output to a file with the specified name
if os.path.exists(output):
os.remove(output) # to avoid overwrite confirmation message
os.remove(output) # to avoid overwrite confirmation message
args.append('--output %s' % output)
elif output and hasattr(output, 'write'):
output_writer = output

if always_trust:
args.append("--always-trust")

result = self._result_map['crypt'](self)
if output_writer:
result.writer = output_writer

self._handle_io(args, filename, result, passphrase, binary=True)
log.debug('decrypt result: %r', result.data)
return result


class GPGUtilities(object):
"""Extra tools for working with GnuPG."""

Expand Down
45 changes: 45 additions & 0 deletions gnupg/test/test_gnupg.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from codecs import open as open
from functools import wraps
from glob import glob
from io import BytesIO
from time import localtime
from time import mktime

Expand Down Expand Up @@ -1179,6 +1180,33 @@ def test_encryption_multi_recipient(self):
self.assertIsNotNone(encrypted)
self.assertGreater(len(encrypted), 0)

def test_decryption_with_bytes_writeable_output(self):
"""Test decryption"""
key = self.generate_key("Rück", "rü.ck", passphrase="ruck")
ruck_fpr = key.fingerprint
ruck = self.gpg.export_keys(key.fingerprint)
self.gpg.import_keys(ruck)

message = """
In 2010 Riggio and Sicari presented a practical application of homomorphic
encryption to a hybrid wireless sensor/mesh network. The system enables
transparent multi-hop wireless backhauls that are able to perform statistical
analysis of different kinds of data (temperature, humidity, etc.) coming from
a WSN while ensuring both end-to-end encryption and hop-by-hop
authentication."""

encrypted = str(self.gpg.encrypt(message, ruck_fpr))

output = BytesIO()
self.gpg.decrypt(encrypted, passphrase="ruck", output=output)
decrypted = output.getvalue().decode('utf-8')

if message != decrypted:
log.debug("was: %r" % message)
log.debug("new: %r" % decrypted)

self.assertEqual(message, decrypted)

def test_decryption(self):
"""Test decryption"""
key = self.generate_key("Frey", "fr.ey", passphrase="frey")
Expand Down Expand Up @@ -1490,6 +1518,23 @@ def test_encryption_with_output(self):
encrypted_message = fh.read()
self.assertTrue(b"-----BEGIN PGP MESSAGE-----" in encrypted_message)

def test_encryption_with_bytes_writeable_output(self):
"""Test that ``encrypt('foo', ..., output=writeable_object)`` is successful."""
data = "Test Message"
output = BytesIO()
kwargs = dict(passphrase='speedtest',
symmetric=True,
cipher_algo='AES256',
encrypt=False,
output=output)
encrypted = self.gpg.encrypt(data, None, **kwargs)
self.assertTrue(encrypted.ok)

encrypted_message = output.getvalue()
self.assertGreater(len(encrypted_message), 0)
print(encrypted_message)
self.assertIn(b"-----BEGIN PGP MESSAGE-----", encrypted_message)

def test_key_expiration(self):
"""Test that changing key expiration date succeeds."""
today = datetime.date.today()
Expand Down