Skip to content

Commit

Permalink
v0.4.6 update - all the docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
rmlibre committed Dec 17, 2019
1 parent 4ae9289 commit 0d8920d
Showing 1 changed file with 105 additions and 0 deletions.
105 changes: 105 additions & 0 deletions tiny_gnupg/tiny_gnupg.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,20 @@


class GnuPG:
"""
GnuPG - A linux specific, small, simple & intuitive wrapper for
creating, using and managing GnuPG's Ed-25519 curve keys. This class
favors reducing code size & complexity with strong, bias defaults
over flexibility in the api. It's designed to turn the complex,
legacy, but powerful gnupg system into a fun tool to develop with.
"""
def __init__(
self, username="", email="", passphrase="", torify=False
):
"""
Initialize an instance intended to create, manage, or represent
a single key in the local package gnupg keyring
"""
self.set_homedir()
self.email = email
self.username = username
Expand All @@ -36,14 +47,17 @@ def __init__(
self.set_network_variables()

def set_homedir(self, path=HOME_PATH):
"""Initialize a home directory to store gpg2 binary & data"""
self.home = self.format_homedir(path)
self.executable = str(Path(self.home).absolute() / "gpg2")
self.set_home_permissions(self.home)

def format_homedir(self, path=HOME_PATH):
"""Return an absolute path string for the home directory"""
return str(Path(path).absolute())

def set_home_permissions(self, home):
"""Set safer permissions on the home directory"""
try:
home = str(Path(home).absolute())
command = ["chmod", "-R", "700", home]
Expand All @@ -52,6 +66,7 @@ def set_home_permissions(self, home):
print(f"Invalid permission to modify home folder: {home}")

def set_base_command(self, torify=False):
"""Contruct the default commands used to call gnupg2"""
torify = ["torify"] if torify else []
self.base_passphrase_command = torify + [
self.executable,
Expand Down Expand Up @@ -79,6 +94,7 @@ def set_base_command(self, torify=False):
]

def set_fingerprint(self, uid=""):
"""Populate `fingerprint` attribute for persistent user"""
try:
self.fingerprint = self.key_fingerprint(uid)
except:
Expand All @@ -91,6 +107,7 @@ def set_network_variables(
keyserver="http://zkaan2xfbuxia2wpf7ofnkbz6r5zdbbvxbunvp5g2iebopbfc4iqmbad.onion",
search="search?q=",
):
"""Set network variables for adaptable implementations"""
self.port = port
self.tor_port = tor_port
self._keyserver = keyserver
Expand All @@ -100,22 +117,27 @@ def set_network_variables(

@property
def keyserver(self):
"""Autoconstruct keyserver URL with adaptable port number"""
return f"{self._keyserver}:{self.port}/"

@property
def keyserver_export_api(self):
"""Autoconstruct specific keyserver key upload api URL"""
return self.keyserver + "vks/v1/upload"

@property
def keyserver_verify_api(self):
"""Autoconstruct specific keyserver key verification api URL"""
return self.keyserver + "vks/v1/request-verify"

@property
def searchserver(self):
"""Autoconstruct specific keyserver search URL"""
return f"{self.keyserver}{self._search_string}"

@property
def connector(self):
"""Autoconstruct an aiohttp_socks.SocksConnector instance"""
return self._connector(
socks_ver=SocksVer.SOCKS5,
host="127.0.0.1",
Expand All @@ -125,10 +147,12 @@ def connector(self):

@property
def session(self):
"""Autoconstruct an aiohttp.ClientSession instance"""
return self._session(connector=self.connector)

@async_contextmanager
async def network_get(self, url="", **kw):
"""Opens a aiohttp.ClientSession.get context manager"""
try:
session = await self.session.__aenter__()
yield await session.get(url, **kw)
Expand All @@ -137,37 +161,50 @@ async def network_get(self, url="", **kw):

@async_contextmanager
async def network_post(self, url="", **kw):
"""Opens a aiohttp.ClientSession.post context manager"""
try:
session = await self.session.__aenter__()
yield await session.post(url, **kw)
finally:
await session.close()

async def get(self, url="", **kw):
"""Returns text of an aiohttp.ClientSession.get request"""
async with self.network_get(url, **kw) as response:
return await response.text()

async def post(self, url="", **kw):
"""Returns text of an aiohttp.ClientSession.post request"""
async with self.network_post(url, **kw) as response:
return await response.text()

def command(self, *options, with_passphrase=False):
"""Autoformats gpg2 commands soley from additional options"""
if with_passphrase:
return self.base_passphrase_command + [*options]
else:
return self.base_command + [*options]

def encode_inputs(self, *inputs):
"""Prepares inputs *X for subprocess.check_output(input=*X)"""
return ("\n".join(inputs) + "\n").encode()

def read_output(self, command=(), inputs=b"", shell=False):
"""Quotes terminal escape characters & runs user commands"""
return check_output(
[quote(part) for part in command],
input=inputs,
shell=shell,
).decode()

def gen_key(self):
"""
Generates a set of ed25519 keys with isolated roles:
Main Key - Certification
Subkey - Signing
Subkey - Authentication
Subkey - Encryption
"""
command = [
self.executable,
"--yes",
Expand Down Expand Up @@ -204,6 +241,13 @@ def gen_key(self):
return self.add_subkeys(self.fingerprint)

def add_subkeys(self, uid=""):
"""
Adds three subkeys with isolated roles to key matching `uid`:
`uid` Key
Subkey - Signing
Subkey - Authentication
Subkey - Encryption
"""
command = self.command(
"--command-fd",
"0",
Expand Down Expand Up @@ -234,6 +278,7 @@ def add_subkeys(self, uid=""):
return self.read_output(command, inputs)

def delete(self, uid=""):
"""Deletes secret & public key matching `uid` from keyring"""
uid = self.key_fingerprint(uid) # avoid non-fingerprint uid crash
try:
command = self.command(
Expand All @@ -251,6 +296,7 @@ def delete(self, uid=""):
return self.read_output(command, inputs)

def revoke(self, uid=""):
"""Imports & generates revocation cert for key matching `uid`"""
command = self.command(
"--command-fd",
"0",
Expand All @@ -264,6 +310,7 @@ def revoke(self, uid=""):
return self.text_import(revoke_cert)

def trust(self, uid="", level=5):
"""Sets trust `level` to key matching `uid` in the keyring"""
level = str(int(level))
if not 1 <= int(level) <= 5:
raise ValueError("Trust levels must be between 1 and 5.")
Expand All @@ -272,6 +319,11 @@ def trust(self, uid="", level=5):
return self.read_output(command, inputs)

def encrypt(self, message="", uid="", sign=True, local_user=""):
"""
Encrypts `message` to key matching `uid` & signs with key
matching `local_user` or defaults to instance key. Optionally
doesn't sign `message`.
"""
uid = self.key_fingerprint(uid) # avoid wkd lookups
command = self.command(
"--command-fd",
Expand All @@ -290,11 +342,17 @@ def encrypt(self, message="", uid="", sign=True, local_user=""):
return self.read_output(command, inputs[:-1])

def decrypt(self, message=""):
"""Decrypts `message` autodetecting correct key from keyring"""
command = self.command("-d", with_passphrase=True)
inputs = self.encode_inputs(self.passphrase, message)
return self.read_output(command, inputs)

def sign(self, target="", local_user="", *, key=False):
"""
Signs key matching `target` uid with a key matching `local_user`
uid or the instance default. Optionally signs `target` message
if `key`==False.
"""
if key == True: # avoid truthiness
command = self.command(
"--local-user",
Expand All @@ -317,18 +375,27 @@ def sign(self, target="", local_user="", *, key=False):
return self.read_output(command, inputs)

def verify(self, message=""):
"""
Verifies signed `message` if the corresponding public key is in
the local keyring
"""
command = self.command("--verify")
inputs = self.encode_inputs(message)
return self.read_output(command, inputs)

def raw_list_keys(self, uid=""):
"""Returns the terminal output of the --list-keys `uid` option"""
if uid:
command = self.command("--list-keys", uid)
else:
command = self.command("--list-keys")
return self.read_output(command)

def format_list_keys(self, raw_list_keys_terminal_output):
"""
Returns a dict of fingerprints & email addresses scraped from
the terminal output of the --list-keys option
"""
keys = raw_list_keys_terminal_output.split("\npub ")
fingerprints = [
part[part.find("\nuid") - 40 : part.find("\nuid")]
Expand All @@ -342,9 +409,14 @@ def format_list_keys(self, raw_list_keys_terminal_output):
return dict(zip(fingerprints, emails))

def list_keys(self, uid=""):
"""
Returns a dict of fingerprints & email addresses of all keys in
the local keyring, or optionally the key matching `uid`.
"""
return self.format_list_keys(self.raw_list_keys(uid))

def key_email(self, uid=""):
"""Returns the email address on the key matching `uid`"""
parts = self.raw_list_keys(uid).replace(" ", "")
for part in parts.split("\nuid"):
if "@" in part and "]" in part:
Expand All @@ -354,15 +426,18 @@ def key_email(self, uid=""):
return part

def key_fingerprint(self, uid=""):
"""Returns the fingerprint on the key matching `uid`"""
key = self.list_keys(uid)
return next(iter(key))

def key_trust(self, uid=""):
"""Returns the current trust level on the key matching `uid`"""
key = self.raw_list_keys(uid).replace(" ", "")
trust = key[key.find("\nuid[") + 5 :]
return trust[: trust.find("]")]

def reset_daemon(self):
"""Resets the gpg-agent daemon"""
command = [
"gpgconf",
"--homedir",
Expand All @@ -376,11 +451,13 @@ def reset_daemon(self):
return kill_output, reset_output

async def raw_search(self, query=""):
"""Returns HTML of keyserver key search matching `query` uid"""
url = f"{self.searchserver}{query}"
print(f"querying: {url}")
return await self.get(url)

async def search(self, query=""):
"""Returns keyserver URL of the key found from `query` uid"""
query = query.replace("@", "%40")
response = await self.raw_search(query)
if "We found an entry" not in response:
Expand All @@ -389,6 +466,7 @@ async def search(self, query=""):
return part[: part.find("</a>")]

async def network_import(self, uid=""):
"""Imports the key matching `uid` from the keyserver."""
key_url = await self.search(uid)
if not key_url:
raise FileNotFoundError("No key found on server.")
Expand All @@ -398,11 +476,13 @@ async def network_import(self, uid=""):
return self.text_import(key)

async def file_import(self, path="", mode="r"):
"""Imports a key from the file located at `path`"""
async with aiofiles.open(path, mode) as keyfile:
key = await keyfile.read()
return self.text_import(key)

def text_import(self, key=""):
"""Imports the `key` string into the local keyring"""
command_bugfix = self.command(
"--import-options", "import-drop-uids", "--import"
)
Expand All @@ -416,6 +496,15 @@ def text_import(self, key=""):
return self.read_output(command, inputs)

async def raw_api_export(self, uid=""):
"""
Uploads the key matching `uid` to the keyserver. Returns a json
string
'''{
"key-fpr": self.fingerprint,
"status": {self.email: "unpublished"},
"token": api_token,
}'''
"""
key = self.text_export(uid)
url = self.keyserver_export_api
print(f"contacting: {url}")
Expand All @@ -424,11 +513,18 @@ async def raw_api_export(self, uid=""):
return await self.post(url, json=payload)

async def raw_api_verify(self, payload=""):
"""
Prompts the keyserver to verify the list of email addresses in
`payload`["addresses"] with the api_token in `payload`["token"].
The keyserver then sends a confirmation email asking for consent
to publish the uid information with the key that was uploaded
"""
url = self.keyserver_verify_api
print(f"sending verification to: {url}")
return await self.post(url, json=payload)

async def network_export(self, uid=""):
"""Exports the key matching `uid` to the keyserver"""
response = json.loads(await self.raw_api_export(uid))
payload = {
"addresses": [self.key_email(uid)],
Expand All @@ -441,13 +537,22 @@ async def network_export(self, uid=""):
async def file_export(
self, path="", uid="", mode="w+", *, secret=False
):
"""
Exports the public key matching `uid` to the `path` directory.
If `secret`==True then exports the secret key that matches `uid`
"""
key = self.text_export(uid, secret=secret)
fingerprint = self.key_fingerprint(uid)
filename = Path(path).absolute() / (fingerprint + ".asc")
async with aiofiles.open(filename, mode) as keyfile:
return await keyfile.write(key)

def text_export(self, uid="", *, secret=False):
"""
Returns a public key string that matches `uid`. Optionally,
returns the secret key as a string that matches `uid` if
`secret`==True
"""
if secret == True: # avoid truthiness
command = self.command(
"-a", "--export-secret-keys", uid, with_passphrase=True
Expand Down

0 comments on commit 0d8920d

Please sign in to comment.