From 61de0f5b1ed393c880844b9a3ad40bdd3afd02af Mon Sep 17 00:00:00 2001 From: Radek Vykydal Date: Fri, 6 Dec 2024 09:24:51 +0100 Subject: [PATCH] edns: implement support for --category Support only 'global' category for now. Run 'trust anchor' on the certificate dumped into file. If --path is defined the certificate is dumped to the path, if not predefined runtime path is used. If the trust tool is missing, raise exception unless in pre-install phase where it is expected. NOTE: it is possible that in the future the import tool (trust) will be run with parameter (derived from) --category value --- dracut/parse-kickstart | 37 +++++- .../modules/common/structures/security.py | 10 ++ .../security/certificates/certificates.py | 9 +- .../security/certificates/installation.py | 58 +++++++- scripts/anaconda-import-initramfs-certs | 3 + .../security/test_module_certificates.py | 124 +++++++++++++++++- .../modules/security/test_module_security.py | 4 +- 7 files changed, 231 insertions(+), 14 deletions(-) diff --git a/dracut/parse-kickstart b/dracut/parse-kickstart index d823818476e..75c28db5d14 100755 --- a/dracut/parse-kickstart +++ b/dracut/parse-kickstart @@ -32,6 +32,7 @@ os.urandom = ks_random import sys import logging import glob +import subprocess from pykickstart.parser import KickstartParser, preprocessKickstart from pykickstart.sections import NullSection from pykickstart.version import returnClassForVersion @@ -429,6 +430,19 @@ def _dump_certificate(cert, root="/", dump_dir=None): f.write('\n') +def _import_certificate(path): + """Import the certificate into global store.""" + log.debug("Importing certificate %s.", path) + + CA_IMPORT_TOOL = '/usr/sbin/trust' + if not os.path.exists(CA_IMPORT_TOOL): + msg = "{} is missing. Cannot import certificate.".format(CA_IMPORT_TOOL) + log.error(msg) + return + + subprocess.check_call([CA_IMPORT_TOOL, "anchor", path]) + + def process_certificates(handler): """Import certificates defined in %certificate sections.""" for cert in handler.certificates: @@ -439,9 +453,26 @@ def process_certificates(handler): log.error("Missing certificate file name, skipping.") continue - _dump_certificate(cert) - # Dump for transport to switchroot - _dump_certificate(cert, root=CERT_TRANSPORT_DIR+"/path/") + category = None + if hasattr(cert, "category") and cert.category: + category = cert.category + + if category is None: + _dump_certificate(cert) + # Dump for transport to switchroot + _dump_certificate(cert, root=CERT_TRANSPORT_DIR+"/path/") + elif category == "global": + if cert.dir: + _dump_certificate(cert) + # Dump for transport to switchroot + _dump_certificate(cert, root=CERT_TRANSPORT_DIR+"/path/") + # Dump for import and transport to switchroot + import_dir = CERT_TRANSPORT_DIR+"/category/"+category + _dump_certificate(cert, dump_dir=import_dir) + _import_certificate(import_dir) + else: + log.error("Invalid category %s, skipping", category) + continue def process_kickstart(ksfile): diff --git a/pyanaconda/modules/common/structures/security.py b/pyanaconda/modules/common/structures/security.py index c0c93629ba1..18f82c24395 100644 --- a/pyanaconda/modules/common/structures/security.py +++ b/pyanaconda/modules/common/structures/security.py @@ -29,6 +29,7 @@ def __init__(self): self._filename = "" self._cert = "" self._dir = "" + self._category = "" @property def filename(self) -> Str: @@ -56,3 +57,12 @@ def dir(self) -> Str: @dir.setter def dir(self, value: Str) -> None: self._dir = value + + @property + def category(self) -> Str: + """The certificate category.""" + return self._category + + @category.setter + def category(self, value: Str) -> None: + self._category = value diff --git a/pyanaconda/modules/security/certificates/certificates.py b/pyanaconda/modules/security/certificates/certificates.py index 0ca989b00fa..95b3ff45285 100644 --- a/pyanaconda/modules/security/certificates/certificates.py +++ b/pyanaconda/modules/security/certificates/certificates.py @@ -54,13 +54,20 @@ def process_kickstart(self, data): cert_data.cert = cert.cert if cert.dir: cert_data.dir = cert.dir + if cert.category: + cert_data.category = cert.category certificates.append(cert_data) self.set_certificates(certificates) def setup_kickstart(self, data): """Setup the kickstart data.""" for cert in self._certificates: - cert_ksdata = Certificate(cert=cert.cert, filename=cert.filename, dir=cert.dir) + cert_ksdata = Certificate( + cert=cert.cert, + filename=cert.filename, + dir=cert.dir, + category=cert.category, + ) data.certificates.append(cert_ksdata) @property diff --git a/pyanaconda/modules/security/certificates/installation.py b/pyanaconda/modules/security/certificates/installation.py index f71576f4dfe..02429917fec 100644 --- a/pyanaconda/modules/security/certificates/installation.py +++ b/pyanaconda/modules/security/certificates/installation.py @@ -20,18 +20,24 @@ from pyanaconda.anaconda_loggers import get_module_logger from pyanaconda.modules.common.task import Task from pyanaconda.modules.common.errors.installation import SecurityInstallationError +from pyanaconda.core import util from pyanaconda.core.path import make_directories, join_paths from pyanaconda.core.constants import PAYLOAD_TYPE_DNF, INSTALLATION_PHASE_PREINSTALL log = get_module_logger(__name__) +CA_IMPORT_TOOL = "/usr/bin/trust" + class ImportCertificatesTask(Task): """Task for importing certificates into a system. - Currently it just dumps the file to the path. + Dump the certificate into the specified file and directory and/or + import the certificate using a tool. """ + CERT_DIR_CATEGORY_GLOBAL = "/run/install/certificates/category/global" + def __init__(self, sysroot, certificates, payload_type=None, phase=None): """Create a new certificates import task. @@ -50,15 +56,18 @@ def __init__(self, sysroot, certificates, payload_type=None, phase=None): def name(self): return "Import CA certificates" - def _dump_certificate(self, cert, root): + def _dump_certificate(self, cert, root, dir=None): """Dump the certificate into specified file and directory.""" - if not cert.dir: + cert_dir = dir or cert.dir + + if not cert_dir: raise SecurityInstallationError( "Certificate destination is missing for {}".format(cert.filename) ) - dst_dir = join_paths(root, cert.dir) + dst_dir = join_paths(root, cert_dir) + log.debug("Dumping certificate %s into %s.", cert.filename, dst_dir) if not os.path.exists(dst_dir): log.debug("Path %s for certificate does not exist, creating.", dst_dir) make_directories(dst_dir) @@ -72,10 +81,32 @@ def _dump_certificate(self, cert, root): f.write(cert.cert) f.write('\n') + def _import_certificate(self, root, path): + """Import the certificate into the global store.""" + log.debug("Importing certificate %s in root %s.", path, root) + + if not os.path.lexists(root + CA_IMPORT_TOOL): + msg = "{} is missing. Cannot import certificate.".format(CA_IMPORT_TOOL) + if self._phase != INSTALLATION_PHASE_PREINSTALL: + raise SecurityInstallationError(msg) + else: + log.error(msg) + return + + util.execWithRedirect( + CA_IMPORT_TOOL, + ["anchor", path], + root=self._sysroot + ) + def run(self): """Import CA certificates. Dump the certificates into specified files and directories + and / or run the import tool depending on the specified category. + + Supported categories: + global - imports to the global CA trust store """ if self._phase == INSTALLATION_PHASE_PREINSTALL: if self._payload_type != PAYLOAD_TYPE_DNF: @@ -85,4 +116,21 @@ def run(self): for cert in self._certificates: log.debug("Importing certificate %s", cert) - self._dump_certificate(cert, self._sysroot) + + if not cert.category: + self._dump_certificate( + cert, + self._sysroot + ) + elif cert.category == "global": + cert_dir = cert.dir or self.CERT_DIR_CATEGORY_GLOBAL + self._dump_certificate( + cert, + self._sysroot, dir=cert_dir + ) + self._import_certificate( + self._sysroot, + join_paths(cert_dir, cert.filename), + ) + else: + log.warning("Invalid category %s, skipping", cert.category) diff --git a/scripts/anaconda-import-initramfs-certs b/scripts/anaconda-import-initramfs-certs index bc7dc275668..1b537080df6 100755 --- a/scripts/anaconda-import-initramfs-certs +++ b/scripts/anaconda-import-initramfs-certs @@ -4,3 +4,6 @@ # certificates dumped to the specified file are copied to root cp -rv /run/install/certificates/path/* / + +# 'global' category certificates are iported to root global store +trust anchor /run/install/certificates/category/global/* diff --git a/tests/unit_tests/pyanaconda_tests/modules/security/test_module_certificates.py b/tests/unit_tests/pyanaconda_tests/modules/security/test_module_certificates.py index 41bf9959a58..af4fcc943ad 100644 --- a/tests/unit_tests/pyanaconda_tests/modules/security/test_module_certificates.py +++ b/tests/unit_tests/pyanaconda_tests/modules/security/test_module_certificates.py @@ -28,11 +28,13 @@ from pyanaconda.modules.common.constants.objects import CERTIFICATES from pyanaconda.core.constants import PAYLOAD_TYPE_DNF, PAYLOAD_TYPE_LIVE_OS, \ INSTALLATION_PHASE_PREINSTALL +from pyanaconda.core.path import join_paths from pyanaconda.modules.common.structures.security import CertificateData from pyanaconda.modules.common.errors.installation import SecurityInstallationError from pyanaconda.modules.security.certificates.certificates import CertificatesModule from pyanaconda.modules.security.certificates.certificates_interface import CertificatesInterface -from pyanaconda.modules.security.certificates.installation import ImportCertificatesTask +from pyanaconda.modules.security.certificates.installation import ImportCertificatesTask, \ + CA_IMPORT_TOOL from tests.unit_tests.pyanaconda_tests import check_dbus_property, check_task_creation, \ patch_dbus_publish_object @@ -84,11 +86,13 @@ def test_certificates_property(self): 'cert': get_variant(Str, CERT1_CERT), 'filename': get_variant(Str, 'rvtest.pem'), 'dir': get_variant(Str, '/etc/pki/ca-trust/extracted/pem'), + 'category': get_variant(Str, ''), }, { 'cert': get_variant(Str, CERT2_CERT), 'filename': get_variant(Str, 'rvtest2.pem'), 'dir': get_variant(Str, ''), + 'category': get_variant(Str, 'global'), } ] self._check_dbus_property( @@ -196,8 +200,9 @@ def test_import_certificates_task_files(self): self._check_cert_file(cert1, sysroot) self._check_cert_file(cert2, sysroot) - def _check_cert_file(self, cert, sysroot, missing=False): - cert_file = sysroot + cert.dir + "/" + cert.filename + def _check_cert_file(self, cert, sysroot, missing=False, dir=''): + cert_dir = dir or cert.dir + cert_file = sysroot + cert_dir + "/" + cert.filename if missing: assert os.path.exists(cert_file) is False else: @@ -265,3 +270,116 @@ def test_import_certificates_pre_nondnf_payload(self): phase=INSTALLATION_PHASE_PREINSTALL ).run() self._check_cert_file(cert2, sysroot) + + @patch('pyanaconda.core.util.execWithRedirect') + def test_import_certificates_category_unknown(self, execWithRedirect): + """Test the ImportCertificatesTask for unknown category""" + cert = CertificateData() + cert.cert = CERT1_CERT + cert.filename = "cert.pem" + cert.category = "unknown" + cert.dir = "/dir/to/dump/cert" + + with tempfile.TemporaryDirectory() as sysroot: + ImportCertificatesTask( + sysroot=sysroot, + certificates=[cert], + ).run() + # There is no exception + # The certificate is not dumped + self._check_cert_file(cert, sysroot, missing=True) + # The tool is not called + execWithRedirect.assert_not_called() + + @patch('pyanaconda.modules.security.certificates.installation.os.path.lexists') + @patch('pyanaconda.core.util.execWithRedirect') + def test_import_certificates_category_global(self, execWithRedirect, mock_lexists): + """Test the ImportCertificatesTask for category global""" + cert = CertificateData() + cert.cert = CERT1_CERT + cert.filename = "cert.pem" + cert.category = "global" + + mock_lexists.return_value = True + + with tempfile.TemporaryDirectory() as sysroot: + ImportCertificatesTask( + sysroot=sysroot, + certificates=[cert], + ).run() + cert_dir = ImportCertificatesTask.CERT_DIR_CATEGORY_GLOBAL + # The certificate is dumped into runtime directory + self._check_cert_file(cert, sysroot, + dir=cert_dir) + # The import tool is called + cert_dir = join_paths(cert_dir, cert.filename) + execWithRedirect.assert_called_once_with( + CA_IMPORT_TOOL, + ['anchor', cert_dir], + root=sysroot, + ) + + @patch('pyanaconda.modules.security.certificates.installation.os.path.lexists') + @patch('pyanaconda.core.util.execWithRedirect') + def test_import_certificates_category_global_and_dir(self, execWithRedirect, mock_lexists): + """Test the ImportCertificatesTask for category global with defined dir""" + cert = CertificateData() + cert.cert = CERT1_CERT + cert.filename = "cert.pem" + cert.category = "global" + cert.dir = "/dir/to/dump/cert" + + mock_lexists.return_value = True + + with tempfile.TemporaryDirectory() as sysroot: + ImportCertificatesTask( + sysroot=sysroot, + certificates=[cert], + ).run() + # The certificate is dumped accrding to --dir + self._check_cert_file(cert, sysroot) + # The import tool is called + cert_dir = join_paths(cert.dir, cert.filename) + execWithRedirect.assert_called_once_with( + CA_IMPORT_TOOL, + ['anchor', cert_dir], + root=sysroot, + ) + + @patch('pyanaconda.modules.security.certificates.installation.os.path.lexists') + @patch('pyanaconda.core.util.execWithRedirect') + def test_import_certificates_category_global_missing_tool(self, execWithRedirect, mock_lexists): + """Test the ImportCertificatesTask for category global when tool is missing""" + cert = CertificateData() + cert.cert = CERT1_CERT + cert.filename = "cert.pem" + cert.category = "global" + cert.dir = "/dir/to/dump/cert" + + mock_lexists.return_value = False + + # pre-install phase: no exception, cert dumped + with tempfile.TemporaryDirectory() as sysroot: + ImportCertificatesTask( + sysroot=sysroot, + certificates=[cert], + phase=INSTALLATION_PHASE_PREINSTALL, + payload_type=PAYLOAD_TYPE_DNF, + ).run() + # The file is dumped + self._check_cert_file(cert, sysroot) + # The import tool is not called + execWithRedirect.assert_not_called() + + # non pre-install phase: exception is raised + with tempfile.TemporaryDirectory() as sysroot: + with self.assertRaises(SecurityInstallationError): + ImportCertificatesTask( + sysroot=sysroot, + certificates=[cert], + ).run() + + # The file was dumped + self._check_cert_file(cert, sysroot) + # The import tool is not called + execWithRedirect.assert_not_called() diff --git a/tests/unit_tests/pyanaconda_tests/modules/security/test_module_security.py b/tests/unit_tests/pyanaconda_tests/modules/security/test_module_security.py index e37edb636c4..d06ca28d7a8 100644 --- a/tests/unit_tests/pyanaconda_tests/modules/security/test_module_security.py +++ b/tests/unit_tests/pyanaconda_tests/modules/security/test_module_security.py @@ -173,7 +173,7 @@ def test_certificates_kickstart(self): -----END CERTIFICATE----- %end - %certificate --filename=rvtest2.pem --dir=/cert_dir2 + %certificate --filename=rvtest2.pem --dir=/cert_dir2 --category=global -----BEGIN CERTIFICATE----- MIIBkTCCATegAwIBAgIUN6r4TjFJqP/TS6U25iOGL2Wt/6kwCgYIKoZIzj0EAwIw FjEUMBIGA1UEAwwLUlZURVNUIDIgQ0EwHhcNMjQxMTIwMTQwMzIxWhcNMzQxMTE4 @@ -202,7 +202,7 @@ def test_certificates_kickstart(self): -----END CERTIFICATE----- %end - %certificate --filename=rvtest2.pem --dir=/cert_dir2 + %certificate --filename=rvtest2.pem --dir=/cert_dir2 --category=global -----BEGIN CERTIFICATE----- MIIBkTCCATegAwIBAgIUN6r4TjFJqP/TS6U25iOGL2Wt/6kwCgYIKoZIzj0EAwIw FjEUMBIGA1UEAwwLUlZURVNUIDIgQ0EwHhcNMjQxMTIwMTQwMzIxWhcNMzQxMTE4