diff --git a/tiny_gnupg/tiny_gnupg.py b/tiny_gnupg/tiny_gnupg.py index 84565be..97087e2 100644 --- a/tiny_gnupg/tiny_gnupg.py +++ b/tiny_gnupg/tiny_gnupg.py @@ -24,9 +24,20 @@ class GnuPG: + """ + GnuPG - A linux specific, small, simple & intuitive wrapper for + creating, using and managing GnuPG's Ed-25519 curve keys. This class + favors reducing code size & complexity with strong, bias defaults + over flexibility in the api. It's designed to turn the complex, + legacy, but powerful gnupg system into a fun tool to develop with. + """ def __init__( self, username="", email="", passphrase="", torify=False ): + """ + Initialize an instance intended to create, manage, or represent + a single key in the local package gnupg keyring + """ self.set_homedir() self.email = email self.username = username @@ -36,14 +47,17 @@ def __init__( self.set_network_variables() def set_homedir(self, path=HOME_PATH): + """Initialize a home directory to store gpg2 binary & data""" self.home = self.format_homedir(path) self.executable = str(Path(self.home).absolute() / "gpg2") self.set_home_permissions(self.home) def format_homedir(self, path=HOME_PATH): + """Return an absolute path string for the home directory""" return str(Path(path).absolute()) def set_home_permissions(self, home): + """Set safer permissions on the home directory""" try: home = str(Path(home).absolute()) command = ["chmod", "-R", "700", home] @@ -52,6 +66,7 @@ def set_home_permissions(self, home): print(f"Invalid permission to modify home folder: {home}") def set_base_command(self, torify=False): + """Contruct the default commands used to call gnupg2""" torify = ["torify"] if torify else [] self.base_passphrase_command = torify + [ self.executable, @@ -79,6 +94,7 @@ def set_base_command(self, torify=False): ] def set_fingerprint(self, uid=""): + """Populate `fingerprint` attribute for persistent user""" try: self.fingerprint = self.key_fingerprint(uid) except: @@ -91,6 +107,7 @@ def set_network_variables( keyserver="http://zkaan2xfbuxia2wpf7ofnkbz6r5zdbbvxbunvp5g2iebopbfc4iqmbad.onion", search="search?q=", ): + """Set network variables for adaptable implementations""" self.port = port self.tor_port = tor_port self._keyserver = keyserver @@ -100,22 +117,27 @@ def set_network_variables( @property def keyserver(self): + """Autoconstruct keyserver URL with adaptable port number""" return f"{self._keyserver}:{self.port}/" @property def keyserver_export_api(self): + """Autoconstruct specific keyserver key upload api URL""" return self.keyserver + "vks/v1/upload" @property def keyserver_verify_api(self): + """Autoconstruct specific keyserver key verification api URL""" return self.keyserver + "vks/v1/request-verify" @property def searchserver(self): + """Autoconstruct specific keyserver search URL""" return f"{self.keyserver}{self._search_string}" @property def connector(self): + """Autoconstruct an aiohttp_socks.SocksConnector instance""" return self._connector( socks_ver=SocksVer.SOCKS5, host="127.0.0.1", @@ -125,10 +147,12 @@ def connector(self): @property def session(self): + """Autoconstruct an aiohttp.ClientSession instance""" return self._session(connector=self.connector) @async_contextmanager async def network_get(self, url="", **kw): + """Opens a aiohttp.ClientSession.get context manager""" try: session = await self.session.__aenter__() yield await session.get(url, **kw) @@ -137,6 +161,7 @@ async def network_get(self, url="", **kw): @async_contextmanager async def network_post(self, url="", **kw): + """Opens a aiohttp.ClientSession.post context manager""" try: session = await self.session.__aenter__() yield await session.post(url, **kw) @@ -144,23 +169,28 @@ async def network_post(self, url="", **kw): await session.close() async def get(self, url="", **kw): + """Returns text of an aiohttp.ClientSession.get request""" async with self.network_get(url, **kw) as response: return await response.text() async def post(self, url="", **kw): + """Returns text of an aiohttp.ClientSession.post request""" async with self.network_post(url, **kw) as response: return await response.text() def command(self, *options, with_passphrase=False): + """Autoformats gpg2 commands soley from additional options""" if with_passphrase: return self.base_passphrase_command + [*options] else: return self.base_command + [*options] def encode_inputs(self, *inputs): + """Prepares inputs *X for subprocess.check_output(input=*X)""" return ("\n".join(inputs) + "\n").encode() def read_output(self, command=(), inputs=b"", shell=False): + """Quotes terminal escape characters & runs user commands""" return check_output( [quote(part) for part in command], input=inputs, @@ -168,6 +198,13 @@ def read_output(self, command=(), inputs=b"", shell=False): ).decode() def gen_key(self): + """ + Generates a set of ed25519 keys with isolated roles: + Main Key - Certification + Subkey - Signing + Subkey - Authentication + Subkey - Encryption + """ command = [ self.executable, "--yes", @@ -204,6 +241,13 @@ def gen_key(self): return self.add_subkeys(self.fingerprint) def add_subkeys(self, uid=""): + """ + Adds three subkeys with isolated roles to key matching `uid`: + `uid` Key + Subkey - Signing + Subkey - Authentication + Subkey - Encryption + """ command = self.command( "--command-fd", "0", @@ -234,6 +278,7 @@ def add_subkeys(self, uid=""): return self.read_output(command, inputs) def delete(self, uid=""): + """Deletes secret & public key matching `uid` from keyring""" uid = self.key_fingerprint(uid) # avoid non-fingerprint uid crash try: command = self.command( @@ -251,6 +296,7 @@ def delete(self, uid=""): return self.read_output(command, inputs) def revoke(self, uid=""): + """Imports & generates revocation cert for key matching `uid`""" command = self.command( "--command-fd", "0", @@ -264,6 +310,7 @@ def revoke(self, uid=""): return self.text_import(revoke_cert) def trust(self, uid="", level=5): + """Sets trust `level` to key matching `uid` in the keyring""" level = str(int(level)) if not 1 <= int(level) <= 5: raise ValueError("Trust levels must be between 1 and 5.") @@ -272,6 +319,11 @@ def trust(self, uid="", level=5): return self.read_output(command, inputs) def encrypt(self, message="", uid="", sign=True, local_user=""): + """ + Encrypts `message` to key matching `uid` & signs with key + matching `local_user` or defaults to instance key. Optionally + doesn't sign `message`. + """ uid = self.key_fingerprint(uid) # avoid wkd lookups command = self.command( "--command-fd", @@ -290,11 +342,17 @@ def encrypt(self, message="", uid="", sign=True, local_user=""): return self.read_output(command, inputs[:-1]) def decrypt(self, message=""): + """Decrypts `message` autodetecting correct key from keyring""" command = self.command("-d", with_passphrase=True) inputs = self.encode_inputs(self.passphrase, message) return self.read_output(command, inputs) def sign(self, target="", local_user="", *, key=False): + """ + Signs key matching `target` uid with a key matching `local_user` + uid or the instance default. Optionally signs `target` message + if `key`==False. + """ if key == True: # avoid truthiness command = self.command( "--local-user", @@ -317,11 +375,16 @@ def sign(self, target="", local_user="", *, key=False): return self.read_output(command, inputs) def verify(self, message=""): + """ + Verifies signed `message` if the corresponding public key is in + the local keyring + """ command = self.command("--verify") inputs = self.encode_inputs(message) return self.read_output(command, inputs) def raw_list_keys(self, uid=""): + """Returns the terminal output of the --list-keys `uid` option""" if uid: command = self.command("--list-keys", uid) else: @@ -329,6 +392,10 @@ def raw_list_keys(self, uid=""): return self.read_output(command) def format_list_keys(self, raw_list_keys_terminal_output): + """ + Returns a dict of fingerprints & email addresses scraped from + the terminal output of the --list-keys option + """ keys = raw_list_keys_terminal_output.split("\npub ") fingerprints = [ part[part.find("\nuid") - 40 : part.find("\nuid")] @@ -342,9 +409,14 @@ def format_list_keys(self, raw_list_keys_terminal_output): return dict(zip(fingerprints, emails)) def list_keys(self, uid=""): + """ + Returns a dict of fingerprints & email addresses of all keys in + the local keyring, or optionally the key matching `uid`. + """ return self.format_list_keys(self.raw_list_keys(uid)) def key_email(self, uid=""): + """Returns the email address on the key matching `uid`""" parts = self.raw_list_keys(uid).replace(" ", "") for part in parts.split("\nuid"): if "@" in part and "]" in part: @@ -354,15 +426,18 @@ def key_email(self, uid=""): return part def key_fingerprint(self, uid=""): + """Returns the fingerprint on the key matching `uid`""" key = self.list_keys(uid) return next(iter(key)) def key_trust(self, uid=""): + """Returns the current trust level on the key matching `uid`""" key = self.raw_list_keys(uid).replace(" ", "") trust = key[key.find("\nuid[") + 5 :] return trust[: trust.find("]")] def reset_daemon(self): + """Resets the gpg-agent daemon""" command = [ "gpgconf", "--homedir", @@ -376,11 +451,13 @@ def reset_daemon(self): return kill_output, reset_output async def raw_search(self, query=""): + """Returns HTML of keyserver key search matching `query` uid""" url = f"{self.searchserver}{query}" print(f"querying: {url}") return await self.get(url) async def search(self, query=""): + """Returns keyserver URL of the key found from `query` uid""" query = query.replace("@", "%40") response = await self.raw_search(query) if "We found an entry" not in response: @@ -389,6 +466,7 @@ async def search(self, query=""): return part[: part.find("")] async def network_import(self, uid=""): + """Imports the key matching `uid` from the keyserver.""" key_url = await self.search(uid) if not key_url: raise FileNotFoundError("No key found on server.") @@ -398,11 +476,13 @@ async def network_import(self, uid=""): return self.text_import(key) async def file_import(self, path="", mode="r"): + """Imports a key from the file located at `path`""" async with aiofiles.open(path, mode) as keyfile: key = await keyfile.read() return self.text_import(key) def text_import(self, key=""): + """Imports the `key` string into the local keyring""" command_bugfix = self.command( "--import-options", "import-drop-uids", "--import" ) @@ -416,6 +496,15 @@ def text_import(self, key=""): return self.read_output(command, inputs) async def raw_api_export(self, uid=""): + """ + Uploads the key matching `uid` to the keyserver. Returns a json + string + '''{ + "key-fpr": self.fingerprint, + "status": {self.email: "unpublished"}, + "token": api_token, + }''' + """ key = self.text_export(uid) url = self.keyserver_export_api print(f"contacting: {url}") @@ -424,11 +513,18 @@ async def raw_api_export(self, uid=""): return await self.post(url, json=payload) async def raw_api_verify(self, payload=""): + """ + Prompts the keyserver to verify the list of email addresses in + `payload`["addresses"] with the api_token in `payload`["token"]. + The keyserver then sends a confirmation email asking for consent + to publish the uid information with the key that was uploaded + """ url = self.keyserver_verify_api print(f"sending verification to: {url}") return await self.post(url, json=payload) async def network_export(self, uid=""): + """Exports the key matching `uid` to the keyserver""" response = json.loads(await self.raw_api_export(uid)) payload = { "addresses": [self.key_email(uid)], @@ -441,6 +537,10 @@ async def network_export(self, uid=""): async def file_export( self, path="", uid="", mode="w+", *, secret=False ): + """ + Exports the public key matching `uid` to the `path` directory. + If `secret`==True then exports the secret key that matches `uid` + """ key = self.text_export(uid, secret=secret) fingerprint = self.key_fingerprint(uid) filename = Path(path).absolute() / (fingerprint + ".asc") @@ -448,6 +548,11 @@ async def file_export( return await keyfile.write(key) def text_export(self, uid="", *, secret=False): + """ + Returns a public key string that matches `uid`. Optionally, + returns the secret key as a string that matches `uid` if + `secret`==True + """ if secret == True: # avoid truthiness command = self.command( "-a", "--export-secret-keys", uid, with_passphrase=True