From 11984454c59e746cc1cf7d30ad8f603e6502e2c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20H=C3=B6sel?= Date: Sun, 7 Apr 2024 20:47:21 +0200 Subject: [PATCH] add get_certificate_info helper (#400) Move duplicate code from certificate and certificate_info to a common py file so that we can reuse it in the future (such as for a lookup plugin). --- plugins/module_utils/helpers.py | 64 ++++++++++++++++++++++++ plugins/modules/step_ca_certificate.py | 26 ++++------ plugins/modules/step_certificate_info.py | 49 ++++++++---------- 3 files changed, 96 insertions(+), 43 deletions(-) create mode 100644 plugins/module_utils/helpers.py diff --git a/plugins/module_utils/helpers.py b/plugins/module_utils/helpers.py new file mode 100644 index 00000000..69e521d9 --- /dev/null +++ b/plugins/module_utils/helpers.py @@ -0,0 +1,64 @@ +from dataclasses import dataclass +import json +from pathlib import Path +from typing import Dict, Any + +from ansible.module_utils.basic import AnsibleModule +from ..module_utils.cli_wrapper import CliCommand, StepCliExecutable + + +@dataclass +class CertificateInfo: + data: Dict[str, Any] + valid: bool + invalid_reason: str = "" + + +def get_certificate_info( + executable: StepCliExecutable, module: AnsibleModule, path: Path, + bundle: bool = False, insecure: bool = False, server_name: str = "", roots: str = "" +) -> CertificateInfo: + """Retrieve information about a certificate and return step-cli json-formatted information + + Args: + executable (StepCliExecutable): The executable to run this command with + module (AnsibleModule): The Ansible module + path (Path): Path to the certificate + bundle (bool, optional): See step-cli docs. Defaults to False. + insecure (bool, optional): See step-cli docs. Defaults to False. + server_name (str, optional): See step-cli docs. Defaults to "". + roots (str, optional): See step-cli docs. Defaults to "". + + Returns: + CertificateInfo: The JSON information as output by step-cli as well as validity information + """ + inspect_args = ["certificate", "inspect", path, "--format", "json"] + if bundle: + inspect_args.append("--bundle") + if insecure: + inspect_args.append("--insecure") + if server_name: + inspect_args.extend(["--server-name", server_name]) + if roots: + inspect_args.extend(["--roots", roots]) + + inspect_cmd = CliCommand(executable, inspect_args, run_in_check_mode=True) + inspect_res = inspect_cmd.run(module) + # The docs say inspect outputs to stderr, but my shell says otherwise: + # https://github.com/smallstep/cli/issues/1032 + try: + data = json.loads(inspect_res.stdout) + except json.JSONDecodeError as e: + module.fail_json(f"Unable to decode returned certificate information. Error: {e}") + + verify_args = ["certificate", "verify", path] + if server_name: + verify_args.extend(["--server-name", server_name]) + if roots: + verify_args.extend(["--roots", roots]) + verify_cmd = CliCommand(executable, verify_args, run_in_check_mode=True, fail_on_error=False) + verify_res = verify_cmd.run(module) + valid = verify_res.rc == 0 + invalid_reason = "" if valid else verify_res.stderr + + return CertificateInfo(data, valid, invalid_reason) diff --git a/plugins/modules/step_ca_certificate.py b/plugins/modules/step_ca_certificate.py index 63c37ea5..428dcad9 100644 --- a/plugins/modules/step_ca_certificate.py +++ b/plugins/modules/step_ca_certificate.py @@ -260,7 +260,6 @@ state: absent revoke_on_delete: true """ -import json from pathlib import Path from typing import cast, Dict, Any @@ -269,6 +268,7 @@ from ..module_utils.params.ca_connection import CaConnectionParams from ..module_utils.cli_wrapper import CliCommand, StepCliExecutable +from ..module_utils import helpers from ..module_utils.constants import DEFAULT_STEP_CLI_EXECUTABLE # maps the kty cli parameter to inspect outputs subject_key_info.key_algorithm.name @@ -312,29 +312,25 @@ def cert_needs_recreation(executable: StepCliExecutable, module: AnsibleModule) str: Reason for certificate recreation, or empty string if no recreation is needed """ module_params = cast(Dict, module.params) - verify_args = ["certificate", "verify", module_params["crt_file"]] - if module_params["verify_roots"]: - verify_args.extend(["--roots", module_params["verify_roots"]]) - verify_cmd = CliCommand(executable, verify_args, fail_on_error=False, run_in_check_mode=True) - res = verify_cmd.run(module) - if res.rc != 0: - return res.stderr + cert_info = helpers.get_certificate_info( + executable, module, module_params["crt_file"], roots=module_params["verify_roots"]) - info_cmd = CliCommand(executable, ["certificate", "inspect", module_params["crt_file"], - "--format", "json"], run_in_check_mode=True) - info_res = info_cmd.run(module) + # certificate is invalid + if not cert_info.valid: + return cert_info.invalid_reason - cert_info = json.loads(info_res.stdout) - key_info = cert_info["subject_key_info"] + key_info = cert_info.data["subject_key_info"] current_kty = key_info["key_algorithm"]["name"] + # ensure SANs match if module_params["san"]: desired_sans = sorted(list(set([module_params["name"]] + module_params["san"]))) - current_sans = sorted(cert_info["names"]) + current_sans = sorted(cert_info.data["names"]) if current_sans != desired_sans: - return f"Certificate names have changed from {cert_info['names']} to {desired_sans}" + return f"Certificate names have changed from {cert_info.data['names']} to {desired_sans}" + # Ensure key type matches if module_params["kty"]: if current_kty != CERTINFO_KEY_TYPES[module_params["kty"]]: return f"Key type has changed from {current_kty} to {CERTINFO_KEY_TYPES[module_params['kty']]}" diff --git a/plugins/modules/step_certificate_info.py b/plugins/modules/step_certificate_info.py index ce3f386c..f22020f0 100644 --- a/plugins/modules/step_certificate_info.py +++ b/plugins/modules/step_certificate_info.py @@ -97,16 +97,15 @@ type: str returned: When I(valid=false) """ -import json from typing import cast, Dict, Any from ansible.module_utils.basic import AnsibleModule from ..module_utils.cli_wrapper import StepCliExecutable, CliCommand +from ..module_utils import helpers from ..module_utils.constants import DEFAULT_STEP_CLI_EXECUTABLE FORMAT_CLIARGS = { - "json": ["--format", "json"], "pem": ["--format", "pem"], "text": ["--format", "text"], "text-short": ["--format", "text", "--short"], @@ -119,23 +118,17 @@ } -def verify(executable: StepCliExecutable, module: AnsibleModule, path: str) -> Dict[str, Any]: - verify_cliarg_map = { - "server_name": "--server-name", - "roots": "--roots", - } - cmd = CliCommand(executable, ["certificate", "verify", path], verify_cliarg_map, fail_on_error=False) - res = cmd.run(module) - - return {"valid": True} if res.rc == 0 else { - "valid": False, - "validity_fail_reason": res.stderr - } +def inspect_non_json(executable: StepCliExecutable, module: AnsibleModule) -> str: + """Run step-cli certificate inspect and return data for a non-json format + Args: + executable (StepCliExecutable): Executable to run with + module (AnsibleModule): ansible module -def inspect(executable: StepCliExecutable, module: AnsibleModule) -> Dict[str, Any]: + Returns: + str: stdout data from step-cli certificate inspect + """ module_params = cast(Dict, module.params) - result = {} certificate_info_cliarg_map = { "bundle": "--bundle", "insecure": "--insecure", @@ -148,13 +141,7 @@ def inspect(executable: StepCliExecutable, module: AnsibleModule) -> Dict[str, A # The docs say inspect outputs to stderr, but my shell says otherwise: # https://github.com/smallstep/cli/issues/1032 res = cmd.run(module) - if module_params["format"] == "json": - data = json.loads(res.stdout) - else: - data = res.stdout - - result[RESULT_FORMAT_KEYNAME[module_params["format"]]] = data - return result + return res.stdout def main(): @@ -175,11 +162,17 @@ def main(): executable = StepCliExecutable(module, module_params["step_cli_executable"]) - try: - result.update(inspect(executable, module)) - except json.JSONDecodeError as e: - module.fail_json(f"Unable to decode returned certificate information. Error: {e}") - result.update(verify(executable, module, module_params["path"])) + cert_info = helpers.get_certificate_info(executable, module, module_params["path"], + bundle=module_params["bundle"], + insecure=module_params["insecure"], + server_name=module_params["server_name"], + roots=module_params["roots"]) + data = cert_info.data if module_params["format"] == "json" else inspect_non_json(executable, module) + result.update({ + "valid": cert_info.valid, + "validity_fail_reason": cert_info.invalid_reason, + RESULT_FORMAT_KEYNAME[module_params["format"]]: data + }) module.exit_json(**result)