diff --git a/CHANGES.rst b/CHANGES.rst index c7cc502..5a3bf83 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -61,6 +61,34 @@ ============= +Changes for version 0.7.5 +========================= + + +Major Changes +------------- + +- New ``Terminal``, ``MessageBus`` & ``Error`` classes were created to + assist in some heavy refactorings of the codebase. Separating error + handling logic & sending commands to the terminal into their own + classes & methods. + + +Minor Changes +------------- + +- Removed the ``import-drop-uids`` option from the package's import + commands for several reasons. First, this option doesn't work on most + systems. Second, if it did work, the result would be problematic, as + that would mean all uid information would always be dropped from + imported keys. This option was intended to keep GnuPG from crashing + when importing keys which don't have uid information, but it's an + unideal hack around the root problem. +- Some changes to signatures for a better ux, & various code cleanups. + + + + Changes for version 0.7.4 ========================= diff --git a/README.rst b/README.rst index 903de4f..5e391a4 100644 --- a/README.rst +++ b/README.rst @@ -576,6 +576,34 @@ After a user no longer considers a key useful, or wants to dissociate from the k ============= +Changes for version 0.7.5 +========================= + + +Major Changes +------------- + +- New ``Terminal``, ``MessageBus`` & ``Error`` classes were created to + assist in some heavy refactorings of the codebase. Separating error + handling logic & sending commands to the terminal into their own + classes & methods. + + +Minor Changes +------------- + +- Removed the ``import-drop-uids`` option from the package's import + commands for several reasons. First, this option doesn't work on most + systems. Second, if it did work, the result would be problematic, as + that would mean all uid information would always be dropped from + imported keys. This option was intended to keep GnuPG from crashing + when importing keys which don't have uid information, but it's an + unideal hack around the root problem. +- Some changes to signatures for a better ux, & various code cleanups. + + + + Changes for version 0.7.4 ========================= diff --git a/tiny_gnupg.egg-info/PKG-INFO b/tiny_gnupg.egg-info/PKG-INFO index 19dac77..ce04f56 100644 --- a/tiny_gnupg.egg-info/PKG-INFO +++ b/tiny_gnupg.egg-info/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: tiny-gnupg -Version: 0.7.4 +Version: 0.7.5 Summary: tiny_gnupg - A small-as-possible solution for handling GnuPG ed25519 ECC keys. Home-page: https://github.com/rmlibre/tiny_gnupg Author: Gonzo Investigative Journalism Agency, LLC @@ -586,6 +586,34 @@ Description: tiny_gnupg - A small-as-possible solution for handling GnuPG ed2551 ============= + Changes for version 0.7.5 + ========================= + + + Major Changes + ------------- + + - New ``Terminal``, ``MessageBus`` & ``Error`` classes were created to + assist in some heavy refactorings of the codebase. Separating error + handling logic & sending commands to the terminal into their own + classes & methods. + + + Minor Changes + ------------- + + - Removed the ``import-drop-uids`` option from the package's import + commands for several reasons. First, this option doesn't work on most + systems. Second, if it did work, the result would be problematic, as + that would mean all uid information would always be dropped from + imported keys. This option was intended to keep GnuPG from crashing + when importing keys which don't have uid information, but it's an + unideal hack around the root problem. + - Some changes to signatures for a better ux, & various code cleanups. + + + + Changes for version 0.7.4 ========================= diff --git a/tiny_gnupg/__init__.py b/tiny_gnupg/__init__.py index f666eab..794147f 100644 --- a/tiny_gnupg/__init__.py +++ b/tiny_gnupg/__init__.py @@ -9,7 +9,7 @@ # -__version__ = "0.7.4" +__version__ = "0.7.5" __license__ = "GPLv3" @@ -21,5 +21,6 @@ ) -from .tiny_gnupg import GnuPG, Network, run, __all__ +from .tiny_gnupg import * +from .tiny_gnupg import __all__ diff --git a/tiny_gnupg/tiny_gnupg.py b/tiny_gnupg/tiny_gnupg.py index 6c0b9d9..9cb6181 100644 --- a/tiny_gnupg/tiny_gnupg.py +++ b/tiny_gnupg/tiny_gnupg.py @@ -9,7 +9,16 @@ # -__all__ = ["GnuPG", "Network", "run"] +__all__ = [ + "__all__", + "GnuPG", + "User", + "Network", + "Terminal", + "MessageBus", + "Error", + "run", +] __doc__ = ( @@ -113,6 +122,316 @@ async def post(self, url, **kw): return await response.text() +class MessageBus: + """ + This type carries values that can be added to & queried from an + instance through dotted or bracketed syntax. It's used in the + `tiny_gnupg` package to carry values between user code & the output + of commands sent to the terminal. + """ + def __init__(self, mapping={}, **kwargs): + kw = mapping if mapping.__class__ == dict else json.loads(mapping) + self.__dict__.update({**kw, **kwargs}) + + def __setitem__(self, key, value): + self.__dict__[key] = value + + def __delitem__(self, key): + del self.__dict__[key] + + def __getitem__(self, key): + try: + return self.__dict__[key] + except KeyError: + return getattr(self, key) + + +class Terminal: + """ + This type functions as a helpful & pythonic abstraction for sending + commands to a terminal, reading their outputs, & handling errors + that arise from those calls. + + Usage Example: + + from subprocess import STDOUT + from subprocess import CalledProcessError + + def error_handler(terminal, error): + if not isinstance(error, CalledProcessError): + raise error + print(error.output) + # Do some error handling in this function + + def teardown_logic(terminal): + print(terminal.bus.result) + # Code in this function is always run after a context + + + handlers = dict(if_exception=error_handler, finally_run=teardown_logic) + + with Terminal(**handlers) as terminal: + terminal.bus.result = terminal.enter(["ls", "-al"], stderr=STDOUT) + """ + + @staticmethod + def _if_exception(self, error, *a, **kw): + """ + A placeholder method which is run after an execption is raised + within the class' context manager if another function isn't + specified. + """ + raise error + + @staticmethod + def _finally_run(self, *a, **kw): + """ + A placeholder method which is run in a finally block after the + class' context manager is finished if another function isn't + specified. + """ + return + + @staticmethod + def enter(command=(), inputs=b"", **kw): + """ + Quotes terminal escape characters & runs user commands. + """ + return check_output( + [quote(part) for part in command], input=inputs, **kw + ).decode() + + def __init__(self, *, if_exception=None, finally_run=None): + """ + Inserts methods into the instance which will be run after the + class' context manager, one if an execption occurs, & the other + in a finally block. + """ + self.bus = MessageBus() + self.if_exception = ( + if_exception if if_exception else self._if_exception + ) + self.finally_run = finally_run if finally_run else self._finally_run + + def __enter__(self): + """ + Opens a context manager which catches execptions that will be + run from within the `__exit__` method. + """ + return self + + def __exit__(self, exc_type=None, exc_value=None, traceback=None): + """ + Runs the instance's `if_exception` & `finally_run` methods after + an exception is raised or the context closes. If the return + value of the `if_exception` method is `True` then all exceptions + within the context will be surpressed. + """ + try: + if exc_type: + error = exc_value if exc_value else exc_type + return self.if_exception(self, error) + finally: + self.finally_run(self) + + +class Error: + """ + This type helps to separate the error handling logic from the GnuPG + class with pythonic abstractions. This class' methods are instructed + to be run after exceptions are raised within a `Terminal` class' + context manager. + """ + _BAD_PASSPHRASE_OR_KEY = ( + "Passphrase wrong, inexistent key, or invalid rights to access " + "secret key." + ) + _KEY_ISNT_IMPORTABLE = ( + "_UID_ key isn't importable. See https://dev.gnupg.org/T4393" + ) + _PACKETS_PROTECTED_BY_SECRET_KEY = ( + "Can't decrypt all packets without secret key." + ) + _INVALID_TARGET_OPENPGP_DATA = ( + "``target`` doesn't seem to be valid OpenPGP data." + ) + _KEY_WITH_UID_NOT_IN_KEYRING = ( + "UID '_UID_' is not in package _SECRET_keyring" + ) + _SIGNATURES_PUBLIC_KEY_ISNT_IN_LOCAL_KEYRING = ( + f"UID '_UID_' not in the instance's keyring." + ) + _MESSAGE_IS_UNVERIFIABLE = "``message`` is unverifiable." + + @classmethod + def _key_isnt_importable(cls, uid): + """ + Inserts the uid of key causing the error into a static string + that is reported to the user. + """ + return ( + cls._KEY_ISNT_IMPORTABLE.replace("_UID_", uid) + ) + + @classmethod + def _key_with_uid_not_in_keyring(cls, uid, secret): + """ + Inserts the uid of key causing the error into a static string + that is reported to the user. Also inserts a conditional string + to signal if the key was being checked for among the instance's + secret keys. + """ + return ( + cls._KEY_WITH_UID_NOT_IN_KEYRING + .replace("_UID_", uid) + .replace("_SECRET_", secret) + ) + + @classmethod + def _signatures_public_key_isnt_in_local_keyring(cls, uid): + """ + Inserts the uid of key causing the error into a static string + that is reported to the user. + """ + return ( + cls._SIGNATURES_PUBLIC_KEY_ISNT_IN_LOCAL_KEYRING + .replace("_UID_", uid) + ) + + @staticmethod + def _raise_unexpected_error(error): + """ + Raises any non-`CalledProcessError` so it isn't processed any + further by an error handling method. + """ + if error.__class__ != CalledProcessError: + raise error + + @classmethod + def no_permission(cls, terminal, error): + """ + If either the user's passphrase is wrong, or the key they're + wanting to use isn't owned by their current instance, or the key + isn't in the package's keyring, then an error is raised. This + method runs a command again after failure to catch the error's + outputs to inform the user. + """ + cls._raise_unexpected_error(error) + try: + bus = terminal.bus + bus.kw.pop("stderr") if "stderr" in bus.kw else 0 + terminal.enter( + bus.command, bus.inputs, stderr=STDOUT, **bus.kw + ) + except CalledProcessError as permissions_check: + warning = PermissionError(cls._BAD_PASSPHRASE_OR_KEY) + warning.inputs = bus.inputs + warning.command = bus.command + warning.output = permissions_check.output.decode() + bad_passphrase = "Bad passphrase" in warning.output + missing_key = "No secret key" in warning.output + raise warning if (bad_passphrase or missing_key) else error + + @classmethod + def secret_packets(cls, terminal, error): + """ + If openpgp data packets are encrypted it causes an error. This + method is run after a read failure to inform the user the data + needs to be decrypted successfully before reading its openpgp + data. + """ + cls._raise_unexpected_error(error) + warning = KeyError(cls._PACKETS_PROTECTED_BY_SECRET_KEY) + warning.value = error.output.decode() + raise warning if "No secret key" in warning.value else error + + @classmethod + def invalid_pgp_packets(cls, terminal, error): + """ + If some target data doesn't contain correctly formatted openpgp + data packets it causes an error. This method is run after a read + failure to inform the user something is wrong with the data that + was provided. + """ + if error.__class__ == KeyError: + terminal.bus.packets = error.value + return True + else: + cls._raise_unexpected_error(error) + exception = TypeError(cls._INVALID_TARGET_OPENPGP_DATA) + exception.value = terminal.bus.target + exception.output = error.output + raise exception + + @classmethod + def cannot_list_key(cls, terminal, error): + """ + Searching for a uid which isn't contained by any key in the + package's keyring causes an error. This method is run after a + search failure to inform the user the uid they provided doesn't + match a key in their keyring. + """ + cls._raise_unexpected_error(error) + uid, secret = terminal.bus.uid, terminal.bus.secret + warning = LookupError(cls._key_with_uid_not_in_keyring(uid, secret)) + warning.value = terminal.bus.uid + raise warning + + @classmethod + def no_signature_key(cls, terminal, error): + """ + Decrypting an openpgp message which contains a signature causes + an error if the public key associated with the signature isn't + in the package's keyring. An error is also possible if the user + doesn't have the correct passphrase or decryption key. This + method runs the decryption again after a failure to determine + the issue & correctly inform the user. + """ + cls._raise_unexpected_error(error) + try: + bus = terminal.bus + terminal.enter(bus.command, bus.inputs, stderr=STDOUT) + except CalledProcessError as exception: + sentinel = "gpg: using" + error_lines = exception.output.decode().strip().split("\n") + uid = [line[-40:] for line in error_lines if sentinel in line] + uid = uid[-1] if uid else bus.fingerprint + warning = LookupError( + cls._signatures_public_key_isnt_in_local_keyring(uid) + if uid not in bus.keys() + else cls._BAD_PASSPHRASE_OR_KEY + ) + warning.value = uid + raise warning + + @classmethod + def unverifiable_message(cls, terminal, error): + """ + If some provided data isn't a valid signature then an error is + caused when verification is attempted. This method is run after + a verification error to inform the user. + """ + cls._raise_unexpected_error(error) + exception = PermissionError(cls._MESSAGE_IS_UNVERIFIABLE) + exception.value = terminal.bus.fingerprint + raise exception + + @classmethod + def key_isnt_importable(cls, terminal, error): + """ + Since gnupg can't import keys without user ID's, this method is + run after an import failure to inform the user of this bug in + GnuPG if it was the cause of the error. + """ + cls._raise_unexpected_error(error) + warning = KeyError( + cls._key_isnt_importable(terminal.bus.fingerprint) + ) + warning.value = terminal.bus.key + warning.output = error.output.decode() + raise warning if "no user ID" in warning.output else error + + class GnuPG: """ GnuPG - A linux-specific, small, simple & intuitive wrapper for @@ -319,15 +638,15 @@ def _searchserver(self): """ return self._keyserver + self._search_prefix - async def _raw_search(self, query=""): + async def _raw_search(self, query): """ Returns HTML of keyserver key search matching ``query`` uid. """ - url = f"{self._searchserver}{query}" + url = self._searchserver + query print(f"querying: {url}") return await self.network.get(url) - async def search(self, query=""): + async def search(self, query): """ Returns keyserver URL of the key found from ``query`` uid. """ @@ -366,29 +685,11 @@ def read_output(self, command=(), inputs=b"", **kw): """ Quotes terminal escape characters & runs user commands. """ - try: - return check_output( - [quote(part) for part in command], input=inputs, **kw - ).decode() - except Exception as source: - error = source - try: - kw.pop("stderr") if "stderr" in kw else 0 - check_output( - [quote(part) for part in command], - input=inputs, - stderr=STDOUT, - **kw, - ).decode() - except CalledProcessError as permissions_check: - notice = "Passphrase wrong, inexistent key, or invalid rights " - notice += "to access secret key." - warning = PermissionError(notice) - warning.inputs = inputs - warning.command = command - warning.output = permissions_check.output.decode() - print(warning.output) - raise warning if "Bad passphrase" in warning.output else error + with Terminal(if_exception=Error.no_permission) as terminal: + terminal.bus.kw = kw + terminal.bus.inputs = inputs + terminal.bus.command = command + return terminal.enter(command, inputs, **kw) def _reset_daemon(self): """ @@ -487,12 +788,12 @@ def delete(self, uid=""): "--command-fd", "0", "--delete-secret-keys", uid ) inputs = self.encode_inputs("y", "y") - self.read_output(command, inputs) + Terminal.enter(command, inputs) command = self.encode_command( "--command-fd", "0", "--delete-key", uid ) inputs = self.encode_inputs("y") - return self.read_output(command, inputs) + return Terminal.enter(command, inputs) def revoke(self, uid=""): """ @@ -526,46 +827,31 @@ def _raw_packets(self, target=""): ) command.remove("--batch") inputs = self.encode_inputs(target) - try: - return self.read_output(command, inputs, stderr=STDOUT) - except CalledProcessError as error: - notice = "Can't decrypt all packets without secret key." - warning = KeyError(notice) - warning.value = error.output.decode() - raise warning if "No secret key" in warning.value else error + with Terminal(if_exception=Error.secret_packets) as terminal: + return terminal.enter(command, inputs, stderr=STDOUT) def _list_packets(self, target=""): """ Returns OpenPGP metadata from ``target`` in list format. """ - try: - packets = self._raw_packets(target).strip().split("\n\t") - except KeyError as warning: - packets = warning.value.split("\n\t") - except CalledProcessError as warning: - notice = f"``target`` doesn't seem to be valid OpenPGP data." - error = TypeError(notice) - error.value = target - error.output = warning.output - raise error - return [packet.strip().split("\n") for packet in packets] + with Terminal(if_exception=Error.invalid_pgp_packets) as terminal: + terminal.bus.target = target + terminal.bus.packets = self._raw_packets(target) + return [ + packet.strip().split("\n") + for packet in terminal.bus.packets.strip().split("\n\t") + ] def _packet_fingerprint(self, target=""): """ Returns the sender's key fingerprint scraped from ``target``, a gpg message, key or signature. """ - try: - packets = self._raw_packets(target).replace(")", "") - except KeyError as warning: - packets = warning.value.replace(")", "") - except CalledProcessError as warning: - notice = f"``target`` doesn't seem to be valid OpenPGP data." - error = TypeError(notice) - error.value = target - error.output = warning.output - raise error - packets = packets.replace("key ID", "keyid") + with Terminal(if_exception=Error.invalid_pgp_packets) as terminal: + bus = terminal.bus + bus.target = target + bus.packets = self._raw_packets(target) + packets = bus.packets.replace("key ID", "keyid").replace(")", "") if "(issuer fpr" in packets: size = slice(-40, None) sentinel = "(issuer fpr" @@ -580,18 +866,17 @@ def _raw_list_keys(self, uid="", secret=False): """ Returns the terminal output of the --list-keys ``uid`` option. """ + if secret is not True and secret is not False: + raise TypeError(f"``secret`` != bool, {type(secret)} given.") secret = "secret-" if secret == True else "" if uid: command = self.encode_command(f"--list-{secret}keys", uid) else: command = self.encode_command(f"--list-{secret}keys") - try: - return self.read_output(command) - except CalledProcessError: - notice = f"UID '{uid}' not in package {secret}keyring" - warning = LookupError(notice) - warning.value = uid - raise warning + with Terminal(if_exception=Error.cannot_list_key) as terminal: + terminal.bus.uid = uid + terminal.bus.secret = secret + return terminal.enter(command) def _format_list_keys(self, raw_list_keys_terminal_output, secret): """ @@ -628,10 +913,10 @@ def key_email(self, uid=""): parts = self._raw_list_keys(uid).replace(" ", "") for part in parts.split("\nuid"): if "@" in part and "]" in part: - part = part[part.find("]") + 1 :] - if "<" in part and ">" in part: - part = part[part.find("<") + 1 : part.find(">")] - return part + email = part[part.find("]") + 1 :] + if "<" in email and ">" in email: + email = email[email.find("<") + 1 : email.find(">")] + return email def key_fingerprint(self, uid=""): """ @@ -663,9 +948,9 @@ def set_key_trust(self, uid="", level=5): "--edit-key", "--command-fd", "0", uid ) inputs = self.encode_inputs("trust", str(level), "y", "save") - return self.read_output(command, inputs) + return Terminal.enter(command, inputs) - def encrypt(self, message="", uid="", sign=True, local_user=""): + def encrypt(self, message="", uid="", *, sign=True, local_user=""): """ Encrypts ``message`` to key matching ``uid`` & signs with key matching ``local_user`` or defaults to instance key. Optionally, @@ -690,55 +975,55 @@ def encrypt(self, message="", uid="", sign=True, local_user=""): return self.read_output(command, inputs[:-1]) async def auto_encrypt( - self, message="", uid="", sign=True, local_user="" + self, message="", uid="", *, sign=True, local_user="" ): """ Queries keyserver before encryption if recipient's ``uid`` key isn't in the local keyring. """ try: - return self.encrypt(message, uid, sign, local_user) + return self.encrypt( + message, uid, sign=sign, local_user=local_user + ) except LookupError as uid: await self.network_import(uid.value) - return self.encrypt(message, uid.value, sign, local_user) + return self.encrypt( + message, uid, sign=sign, local_user=local_user + ) - def decrypt(self, message=""): + def decrypt(self, message="", *, local_user=None): """ Decrypts ``message`` autodetecting correct key from keyring. """ self._reset_daemon() fingerprint = self._packet_fingerprint(message) fingerprint = self.key_fingerprint(fingerprint) - try: - command = self.encode_command("-d", with_passphrase=True) - inputs = self.encode_inputs(self.user.passphrase, message) - return self.read_output(command, inputs) - except CalledProcessError: - pass - try: - self.read_output(command, inputs, stderr=STDOUT) - except CalledProcessError as error: - error_lines = error.output.decode().strip().split("\n") - sentinel = "gpg: using" - uid = [line[-40:] for line in error_lines if sentinel in line] - uid = uid[-1] if uid else "" - notice = f"UID '{uid}' not in the instance's keyring." - warning = LookupError(notice) - warning.value = uid if uid else fingerprint - raise warning + command = self.encode_command( + "--local-user", + local_user if local_user else self.fingerprint, + "-d", + with_passphrase=True, + ) + inputs = self.encode_inputs(self.user.passphrase, message) + with Terminal(if_exception=Error.no_signature_key) as terminal: + terminal.bus.inputs = inputs + terminal.bus.command = command + terminal.bus.keys = self.list_keys + terminal.bus.fingerprint = fingerprint + return terminal.enter(command, inputs) - async def auto_decrypt(self, message=""): + async def auto_decrypt(self, message="", *, local_user=None): """ Queries keyserver before decryption if ``message`` signature key isn't in the local keyring. """ try: - return self.decrypt(message) + return self.decrypt(message, local_user=local_user) except LookupError as fingerprint: await self.network_import(fingerprint.value) - return self.decrypt(message) + return self.decrypt(message, local_user=local_user) - def sign(self, target="", local_user="", *, key=False): + def sign(self, target="", *, local_user="", key=False): """ Signs key matching ``target`` uid with a key matching ``local_user`` uid or the instance default. Optionally signs ``target`` message @@ -774,15 +1059,11 @@ def verify(self, message=""): self._reset_daemon() fingerprint = self._packet_fingerprint(message) fingerprint = self.key_fingerprint(fingerprint) - try: - command = self.encode_command("--verify") - inputs = self.encode_inputs(message) - return self.read_output(command, inputs) - except CalledProcessError: - notice = f"``message`` is unverifiable." - error = PermissionError(notice) - error.value = fingerprint - raise error + command = self.encode_command("--verify") + inputs = self.encode_inputs(message) + with Terminal(if_exception=Error.unverifiable_message) as terminal: + terminal.bus.fingerprint = fingerprint + return terminal.enter(command, inputs) async def auto_verify(self, message=""): """ @@ -819,29 +1100,12 @@ def text_import(self, key=""): """ Imports the ``key`` string into the local keyring. """ - command_bugfix = self.encode_command( - "--import-options", "import-drop-uids", "--import" - ) - # "--import-options", "import-drop-uids" needed to allow import - # of keys without uids from Hagrid-like keyservers. Doesn't work - # b/c of a bug in GnuPG. Pass the option to allow the patch to - # take effect if/when one is available. - command = self.encode_command("--import") inputs = self.encode_inputs(key) - try: - fingerprint = self._packet_fingerprint(key) - return self.read_output(command_bugfix, inputs, stderr=STDOUT) - except CalledProcessError: - pass - try: - return self.read_output(command, inputs, stderr=STDOUT) - except CalledProcessError as error: - notice = f"{fingerprint} key isn't importable." - notice += " See https://dev.gnupg.org/T4393" - warning = KeyError(notice) - warning.value = key - warning.output = error.output.decode() - raise warning if "no user ID" in warning.output else error + command = self.encode_command("--import") + with Terminal(if_exception=Error.key_isnt_importable) as terminal: + terminal.bus.key = key + terminal.bus.fingerprint = self._packet_fingerprint(key) + return terminal.enter(command, inputs, stderr=STDOUT) async def _raw_api_export(self, uid=""): """ @@ -884,9 +1148,7 @@ async def network_export(self, uid=""): print(f"check {payload['addresses'][0]} for confirmation.") return response - def file_export( - self, path="", uid="", *, secret=False - ): + def file_export(self, path="", uid="", *, secret=False): """ Exports the public key matching ``uid`` to the ``path`` directory. If ``secret`` == True then exports the secret key that matches @@ -915,5 +1177,5 @@ def text_export(self, uid="", *, secret=False): command = self.encode_command("-a", "--export", uid) return self.read_output(command) else: - raise TypeError(f"``secret`` != boolean, {type(secret)} given") + raise TypeError(f"``secret`` != bool, {type(secret)} given.")