From 27fb633bbe45321eecd8225c32a2fd16882633a9 Mon Sep 17 00:00:00 2001 From: Nataliia Solomko Date: Tue, 13 Aug 2024 13:15:15 +0300 Subject: [PATCH] T5743: HTTPS API ability to import PKI certificates --- data/templates/https/nginx.default.j2 | 2 +- python/vyos/configsession.py | 11 +++++ python/vyos/pki.py | 2 +- src/op_mode/pki.py | 33 ++++++++------ src/services/vyos-http-api-server | 62 +++++++++++++++++++++++++++ 5 files changed, 96 insertions(+), 14 deletions(-) diff --git a/data/templates/https/nginx.default.j2 b/data/templates/https/nginx.default.j2 index 4619361e57..1dde66ebf1 100644 --- a/data/templates/https/nginx.default.j2 +++ b/data/templates/https/nginx.default.j2 @@ -48,7 +48,7 @@ server { ssl_ciphers 'ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:DHE-DSS-AES128-GCM-SHA256:kEDH+AESGCM:ECDHE-RSA-AES128-SHA256:ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA:ECDHE-ECDSA-AES128-SHA:ECDHE-RSA-AES256-SHA384:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA:ECDHE-ECDSA-AES256-SHA:DHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA:DHE-DSS-AES128-SHA256:DHE-RSA-AES256-SHA256:DHE-DSS-AES256-SHA:DHE-RSA-AES256-SHA:!aNULL:!eNULL:!EXPORT:!DES:!RC4:!3DES:!MD5:!PSK'; # proxy settings for HTTP API, if enabled; 503, if not - location ~ ^/(retrieve|configure|config-file|image|container-image|generate|show|reboot|reset|poweroff|docs|openapi.json|redoc|graphql) { + location ~ ^/(retrieve|configure|config-file|image|import-pki|container-image|generate|show|reboot|reset|poweroff|docs|openapi.json|redoc|graphql) { {% if api is vyos_defined %} proxy_pass http://unix:/run/api.sock; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; diff --git a/python/vyos/configsession.py b/python/vyos/configsession.py index ccf2ce8f26..7d51b94e40 100644 --- a/python/vyos/configsession.py +++ b/python/vyos/configsession.py @@ -34,6 +34,9 @@ SAVE_CONFIG = ['/usr/libexec/vyos/vyos-save-config.py'] INSTALL_IMAGE = ['/usr/libexec/vyos/op_mode/image_installer.py', '--action', 'add', '--no-prompt', '--image-path'] +IMPORT_PKI = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'import'] +IMPORT_PKI_NO_PROMPT = ['/usr/libexec/vyos/op_mode/pki.py', + '--action', 'import', '--no-prompt'] REMOVE_IMAGE = ['/usr/libexec/vyos/op_mode/image_manager.py', '--action', 'delete', '--no-prompt', '--image-name'] SET_DEFAULT_IMAGE = ['/usr/libexec/vyos/op_mode/image_manager.py', @@ -239,6 +242,14 @@ def remove_image(self, name): out = self.__run_command(REMOVE_IMAGE + [name]) return out + def import_pki(self, path): + out = self.__run_command(IMPORT_PKI + path) + return out + + def import_pki_no_prompt(self, path): + out = self.__run_command(IMPORT_PKI_NO_PROMPT + path) + return out + def set_default_image(self, name): out = self.__run_command(SET_DEFAULT_IMAGE + [name]) return out diff --git a/python/vyos/pki.py b/python/vyos/pki.py index 27fe793a88..5a0e2ddda7 100644 --- a/python/vyos/pki.py +++ b/python/vyos/pki.py @@ -271,7 +271,7 @@ def load_private_key(raw_data, passphrase=None, wrap_tags=True): try: return serialization.load_pem_private_key(bytes(raw_data, 'utf-8'), password=passphrase) - except ValueError: + except (ValueError, TypeError): return False def load_openssh_public_key(raw_data, type): diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py index 84b0800235..b1a42d6c35 100755 --- a/src/op_mode/pki.py +++ b/src/op_mode/pki.py @@ -693,7 +693,7 @@ def generate_wireguard_psk(interface=None, peer=None, install=False): print(f'Pre-shared key: {psk}') # Import functions -def import_ca_certificate(name, path=None, key_path=None): +def import_ca_certificate(name, path=None, key_path=None, no_prompt=False, passphrase=None): if path: if not os.path.exists(path): print(f'File not found: {path}') @@ -717,19 +717,20 @@ def import_ca_certificate(name, path=None, key_path=None): return key = None - passphrase = ask_input('Enter private key passphrase: ') or None + if not no_prompt: + passphrase = ask_input('Enter private key passphrase: ') or None with open(key_path) as f: key_data = f.read() key = load_private_key(key_data, passphrase=passphrase, wrap_tags=False) if not key: - print(f'Invalid private key or passphrase: {path}') + print(f'Invalid private key or passphrase: {key_path}') return install_certificate(name, private_key=key, is_ca=True) -def import_certificate(name, path=None, key_path=None): +def import_certificate(name, path=None, key_path=None, no_prompt=False, passphrase=None): if path: if not os.path.exists(path): print(f'File not found: {path}') @@ -753,14 +754,15 @@ def import_certificate(name, path=None, key_path=None): return key = None - passphrase = ask_input('Enter private key passphrase: ') or None + if not no_prompt: + passphrase = ask_input('Enter private key passphrase: ') or None with open(key_path) as f: key_data = f.read() key = load_private_key(key_data, passphrase=passphrase, wrap_tags=False) if not key: - print(f'Invalid private key or passphrase: {path}') + print(f'Invalid private key or passphrase: {key_path}') return install_certificate(name, private_key=key, is_ca=False) @@ -799,7 +801,7 @@ def import_dh_parameters(name, path): install_dh_parameters(name, dh) -def import_keypair(name, path=None, key_path=None): +def import_keypair(name, path=None, key_path=None, no_prompt=False, passphrase=None): if path: if not os.path.exists(path): print(f'File not found: {path}') @@ -823,14 +825,15 @@ def import_keypair(name, path=None, key_path=None): return key = None - passphrase = ask_input('Enter private key passphrase: ') or None + if not no_prompt: + passphrase = ask_input('Enter private key passphrase: ') or None with open(key_path) as f: key_data = f.read() key = load_private_key(key_data, passphrase=passphrase, wrap_tags=False) if not key: - print(f'Invalid private key or passphrase: {path}') + print(f'Invalid private key or passphrase: {key_path}') return install_keypair(name, None, private_key=key, prompt=False) @@ -1011,6 +1014,9 @@ def show_crl(name=None, pem=False): parser.add_argument('--filename', help='Write certificate into specified filename', action='store') parser.add_argument('--key-filename', help='Write key into specified filename', action='store') + parser.add_argument('--no-prompt', action='store_true', help='Perform action non-interactively') + parser.add_argument('--passphrase', help='A passphrase to decrypt the private key') + args = parser.parse_args() try: @@ -1054,15 +1060,18 @@ def show_crl(name=None, pem=False): generate_wireguard_psk(args.interface, peer=args.peer, install=args.install) elif args.action == 'import': if args.ca: - import_ca_certificate(args.ca, path=args.filename, key_path=args.key_filename) + import_ca_certificate(args.ca, path=args.filename, key_path=args.key_filename, + no_prompt=args.no_prompt, passphrase=args.passphrase) elif args.certificate: - import_certificate(args.certificate, path=args.filename, key_path=args.key_filename) + import_certificate(args.certificate, path=args.filename, key_path=args.key_filename, + no_prompt=args.no_prompt, passphrase=args.passphrase) elif args.crl: import_crl(args.crl, args.filename) elif args.dh: import_dh_parameters(args.dh, args.filename) elif args.keypair: - import_keypair(args.keypair, path=args.filename, key_path=args.key_filename) + import_keypair(args.keypair, path=args.filename, key_path=args.key_filename, + no_prompt=args.no_prompt, passphrase=args.passphrase) elif args.openvpn: import_openvpn_secret(args.openvpn, args.filename) elif args.action == 'show': diff --git a/src/services/vyos-http-api-server b/src/services/vyos-http-api-server index 7f5233c6b5..97633577d3 100755 --- a/src/services/vyos-http-api-server +++ b/src/services/vyos-http-api-server @@ -212,6 +212,22 @@ class ImageModel(ApiModel): } } +class ImportPkiModel(ApiModel): + op: StrictStr + path: List[StrictStr] + passphrase: StrictStr = None + + class Config: + schema_extra = { + "example": { + "key": "id_key", + "op": "import_pki", + "path": ["op", "mode", "path"], + "passphrase": "passphrase", + } + } + + class ContainerImageModel(ApiModel): op: StrictStr name: StrictStr = None @@ -585,6 +601,14 @@ def _configure_op(data: Union[ConfigureModel, ConfigureListModel, return success(msg) +def create_path_import_pki_no_prompt(path): + correct_paths = ['ca', 'certificate', 'key-pair'] + if path[1] not in correct_paths: + return False + path[1] = '--' + path[1].replace('-', '') + path[3] = '--key-filename' + return path[1:] + @app.post('/configure') def configure_op(data: Union[ConfigureModel, ConfigureListModel], @@ -814,6 +838,44 @@ def reset_op(data: ResetModel): return success(res) +@app.post('/import-pki') +def import_pki(data: ImportPkiModel): + session = app.state.vyos_session + + op = data.op + path = data.path + + lock.acquire() + + try: + if op == 'import-pki': + # need to get rid or interactive mode for private key + if len(path) == 5 and path[3] in ['key-file', 'private-key']: + path_no_prompt = create_path_import_pki_no_prompt(path) + if not path_no_prompt: + return error(400, f"Invalid command: {' '.join(path)}") + if data.passphrase: + path_no_prompt += ['--passphrase', data.passphrase] + res = session.import_pki_no_prompt(path_no_prompt) + else: + res = session.import_pki(path) + if not res[0].isdigit(): + return error(400, res) + # commit changes + session.commit() + res = res.split('. ')[0] + else: + return error(400, f"'{op}' is not a valid operation") + except ConfigSessionError as e: + return error(400, str(e)) + except Exception as e: + logger.critical(traceback.format_exc()) + return error(500, "An internal error occured. Check the logs for details.") + finally: + lock.release() + + return success(res) + @app.post('/poweroff') def poweroff_op(data: PoweroffModel): session = app.state.vyos_session