diff --git a/oarepo_c4gh/key/gpg_agent.py b/oarepo_c4gh/key/gpg_agent.py index 185c4a1..ef7b949 100644 --- a/oarepo_c4gh/key/gpg_agent.py +++ b/oarepo_c4gh/key/gpg_agent.py @@ -147,24 +147,22 @@ def ensure_public_key(self): key_dgram = client.recv(4096) key_line, key_rest = line_from_dgram(key_dgram) key_struct = parse_binary_sexp(key_line[2:]) - if key_struct is None: - continue - if len(key_struct) < 2: - continue - if key_struct[0] != b"public-key": - continue - if len(key_struct[1]) < 1: - continue - if key_struct[1][0] != b"ecc": + if ( + (key_struct is None) + or (len(key_struct) < 2) + or (key_struct[0] != b"public-key") + or (len(key_struct[1])) < 1 + or (key_struct[1][0] != b"ecc") + ): continue curve_struct = next( v for v in key_struct[1][1:] if v[0] == b"curve" ) - if curve_struct is None: - continue - if len(curve_struct) < 2: - continue - if curve_struct[1] != b"Curve25519": + if ( + (curve_struct is None) + or (len(curve_struct) < 2) + or (curve_struct[1] != b"Curve25519") + ): continue q_struct = next(v for v in key_struct[1][1:] if v[0] == b"q") if q_struct is None: diff --git a/tests/test_key_gpg_agent.py b/tests/test_key_gpg_agent.py index 3a32d5f..9ce5716 100644 --- a/tests/test_key_gpg_agent.py +++ b/tests/test_key_gpg_agent.py @@ -6,6 +6,8 @@ compute_run_gnupg_base, expect_assuan_OK, parse_binary_sexp, + encode_assuan_buffer, + decode_assuan_buffer, ) from oarepo_c4gh.exceptions import Crypt4GHKeyException import socket @@ -30,6 +32,8 @@ def test_assuan_expect(self): self.assertRaises(Crypt4GHKeyException, lambda: expect_assuan_OK(r)) w.send(b"OK\nnoise") self.assertRaises(Crypt4GHKeyException, lambda: expect_assuan_OK(r)) + r.close() + w.close() def test_assuan_binary_sexps(self): assert ( @@ -42,6 +46,25 @@ def test_assuan_binary_sexps(self): parse_binary_sexp(b":") is None ), "Error handling malformed S-Expression 2" + def test_assuan_binary_encoding(self): + line = b"25%\r\n" + eline = encode_assuan_buffer(line) + assert eline == b"25%25%0D%0A", "Assuan encoding mismatch" + reline = decode_assuan_buffer(eline) + assert reline == line, "Assuan encoding round-trip mismatch" + + def test_connection_error(self): + key = GPGAgentKey(socket_path="/dev/null") + self.assertRaises(Crypt4GHKeyException, lambda: key.public_key) + + def test_empty_keygrips(self): + tempdir = tempfile.TemporaryDirectory() + homedir = tempdir.name + os.system(f"gpg-agent --homedir {homedir} --daemon") + key = GPGAgentKey(home_dir=homedir) + self.assertRaises(Crypt4GHKeyException, lambda: key.public_key) + tempdir.cleanup() + def test_everything(self): tempdir = tempfile.TemporaryDirectory() homedir = tempdir.name @@ -51,6 +74,7 @@ def test_everything(self): os.system(f"gpg-agent --homedir {homedir} --daemon") key = GPGAgentKey(home_dir=homedir) key.public_key + tempdir.cleanup() if __name__ == "__main__":