Skip to content

Commit

Permalink
edns: implement support for --category
Browse files Browse the repository at this point in the history
Support only 'global' category for now.

Run 'trust anchor' on the certificate dumped into file.

If --path is defined the certificate is dumped to the path, if not
predefined runtime path is used.

If the trust tool is missing, raise exception unless in pre-install
phase where it is expected.

NOTE: it is possible that in the future the import tool (trust) will
be run with parameter (derived from) --category value
  • Loading branch information
rvykydal committed Dec 10, 2024
1 parent e564a96 commit 61de0f5
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 14 deletions.
37 changes: 34 additions & 3 deletions dracut/parse-kickstart
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ os.urandom = ks_random
import sys
import logging
import glob
import subprocess
from pykickstart.parser import KickstartParser, preprocessKickstart
from pykickstart.sections import NullSection
from pykickstart.version import returnClassForVersion
Expand Down Expand Up @@ -429,6 +430,19 @@ def _dump_certificate(cert, root="/", dump_dir=None):
f.write('\n')


def _import_certificate(path):
"""Import the certificate into global store."""
log.debug("Importing certificate %s.", path)

CA_IMPORT_TOOL = '/usr/sbin/trust'
if not os.path.exists(CA_IMPORT_TOOL):
msg = "{} is missing. Cannot import certificate.".format(CA_IMPORT_TOOL)
log.error(msg)
return

subprocess.check_call([CA_IMPORT_TOOL, "anchor", path])


def process_certificates(handler):
"""Import certificates defined in %certificate sections."""
for cert in handler.certificates:
Expand All @@ -439,9 +453,26 @@ def process_certificates(handler):
log.error("Missing certificate file name, skipping.")
continue

_dump_certificate(cert)
# Dump for transport to switchroot
_dump_certificate(cert, root=CERT_TRANSPORT_DIR+"/path/")
category = None
if hasattr(cert, "category") and cert.category:
category = cert.category

if category is None:
_dump_certificate(cert)
# Dump for transport to switchroot
_dump_certificate(cert, root=CERT_TRANSPORT_DIR+"/path/")
elif category == "global":
if cert.dir:
_dump_certificate(cert)
# Dump for transport to switchroot
_dump_certificate(cert, root=CERT_TRANSPORT_DIR+"/path/")
# Dump for import and transport to switchroot
import_dir = CERT_TRANSPORT_DIR+"/category/"+category
_dump_certificate(cert, dump_dir=import_dir)
_import_certificate(import_dir)
else:
log.error("Invalid category %s, skipping", category)
continue


def process_kickstart(ksfile):
Expand Down
10 changes: 10 additions & 0 deletions pyanaconda/modules/common/structures/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(self):
self._filename = ""
self._cert = ""
self._dir = ""
self._category = ""

@property
def filename(self) -> Str:
Expand Down Expand Up @@ -56,3 +57,12 @@ def dir(self) -> Str:
@dir.setter
def dir(self, value: Str) -> None:
self._dir = value

@property
def category(self) -> Str:
"""The certificate category."""
return self._category

@category.setter
def category(self, value: Str) -> None:
self._category = value
9 changes: 8 additions & 1 deletion pyanaconda/modules/security/certificates/certificates.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,20 @@ def process_kickstart(self, data):
cert_data.cert = cert.cert
if cert.dir:
cert_data.dir = cert.dir
if cert.category:
cert_data.category = cert.category
certificates.append(cert_data)
self.set_certificates(certificates)

def setup_kickstart(self, data):
"""Setup the kickstart data."""
for cert in self._certificates:
cert_ksdata = Certificate(cert=cert.cert, filename=cert.filename, dir=cert.dir)
cert_ksdata = Certificate(
cert=cert.cert,
filename=cert.filename,
dir=cert.dir,
category=cert.category,
)
data.certificates.append(cert_ksdata)

@property
Expand Down
58 changes: 53 additions & 5 deletions pyanaconda/modules/security/certificates/installation.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,24 @@
from pyanaconda.anaconda_loggers import get_module_logger
from pyanaconda.modules.common.task import Task
from pyanaconda.modules.common.errors.installation import SecurityInstallationError
from pyanaconda.core import util
from pyanaconda.core.path import make_directories, join_paths
from pyanaconda.core.constants import PAYLOAD_TYPE_DNF, INSTALLATION_PHASE_PREINSTALL

log = get_module_logger(__name__)

CA_IMPORT_TOOL = "/usr/bin/trust"


class ImportCertificatesTask(Task):
"""Task for importing certificates into a system.
Currently it just dumps the file to the path.
Dump the certificate into the specified file and directory and/or
import the certificate using a tool.
"""

CERT_DIR_CATEGORY_GLOBAL = "/run/install/certificates/category/global"

def __init__(self, sysroot, certificates, payload_type=None, phase=None):
"""Create a new certificates import task.
Expand All @@ -50,15 +56,18 @@ def __init__(self, sysroot, certificates, payload_type=None, phase=None):
def name(self):
return "Import CA certificates"

def _dump_certificate(self, cert, root):
def _dump_certificate(self, cert, root, dir=None):
"""Dump the certificate into specified file and directory."""

if not cert.dir:
cert_dir = dir or cert.dir

if not cert_dir:
raise SecurityInstallationError(
"Certificate destination is missing for {}".format(cert.filename)
)

dst_dir = join_paths(root, cert.dir)
dst_dir = join_paths(root, cert_dir)
log.debug("Dumping certificate %s into %s.", cert.filename, dst_dir)
if not os.path.exists(dst_dir):
log.debug("Path %s for certificate does not exist, creating.", dst_dir)
make_directories(dst_dir)
Expand All @@ -72,10 +81,32 @@ def _dump_certificate(self, cert, root):
f.write(cert.cert)
f.write('\n')

def _import_certificate(self, root, path):
"""Import the certificate into the global store."""
log.debug("Importing certificate %s in root %s.", path, root)

if not os.path.lexists(root + CA_IMPORT_TOOL):
msg = "{} is missing. Cannot import certificate.".format(CA_IMPORT_TOOL)
if self._phase != INSTALLATION_PHASE_PREINSTALL:
raise SecurityInstallationError(msg)
else:
log.error(msg)
return

util.execWithRedirect(
CA_IMPORT_TOOL,
["anchor", path],
root=self._sysroot
)

def run(self):
"""Import CA certificates.
Dump the certificates into specified files and directories
and / or run the import tool depending on the specified category.
Supported categories:
global - imports to the global CA trust store
"""
if self._phase == INSTALLATION_PHASE_PREINSTALL:
if self._payload_type != PAYLOAD_TYPE_DNF:
Expand All @@ -85,4 +116,21 @@ def run(self):

for cert in self._certificates:
log.debug("Importing certificate %s", cert)
self._dump_certificate(cert, self._sysroot)

if not cert.category:
self._dump_certificate(
cert,
self._sysroot
)
elif cert.category == "global":
cert_dir = cert.dir or self.CERT_DIR_CATEGORY_GLOBAL
self._dump_certificate(
cert,
self._sysroot, dir=cert_dir
)
self._import_certificate(
self._sysroot,
join_paths(cert_dir, cert.filename),
)
else:
log.warning("Invalid category %s, skipping", cert.category)
3 changes: 3 additions & 0 deletions scripts/anaconda-import-initramfs-certs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@

# certificates dumped to the specified file are copied to root
cp -rv /run/install/certificates/path/* /

# 'global' category certificates are iported to root global store
trust anchor /run/install/certificates/category/global/*
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
from pyanaconda.modules.common.constants.objects import CERTIFICATES
from pyanaconda.core.constants import PAYLOAD_TYPE_DNF, PAYLOAD_TYPE_LIVE_OS, \
INSTALLATION_PHASE_PREINSTALL
from pyanaconda.core.path import join_paths
from pyanaconda.modules.common.structures.security import CertificateData
from pyanaconda.modules.common.errors.installation import SecurityInstallationError
from pyanaconda.modules.security.certificates.certificates import CertificatesModule
from pyanaconda.modules.security.certificates.certificates_interface import CertificatesInterface
from pyanaconda.modules.security.certificates.installation import ImportCertificatesTask
from pyanaconda.modules.security.certificates.installation import ImportCertificatesTask, \
CA_IMPORT_TOOL
from tests.unit_tests.pyanaconda_tests import check_dbus_property, check_task_creation, \
patch_dbus_publish_object

Expand Down Expand Up @@ -84,11 +86,13 @@ def test_certificates_property(self):
'cert': get_variant(Str, CERT1_CERT),
'filename': get_variant(Str, 'rvtest.pem'),
'dir': get_variant(Str, '/etc/pki/ca-trust/extracted/pem'),
'category': get_variant(Str, ''),
},
{
'cert': get_variant(Str, CERT2_CERT),
'filename': get_variant(Str, 'rvtest2.pem'),
'dir': get_variant(Str, ''),
'category': get_variant(Str, 'global'),
}
]
self._check_dbus_property(
Expand Down Expand Up @@ -196,8 +200,9 @@ def test_import_certificates_task_files(self):
self._check_cert_file(cert1, sysroot)
self._check_cert_file(cert2, sysroot)

def _check_cert_file(self, cert, sysroot, missing=False):
cert_file = sysroot + cert.dir + "/" + cert.filename
def _check_cert_file(self, cert, sysroot, missing=False, dir=''):
cert_dir = dir or cert.dir
cert_file = sysroot + cert_dir + "/" + cert.filename
if missing:
assert os.path.exists(cert_file) is False
else:
Expand Down Expand Up @@ -265,3 +270,116 @@ def test_import_certificates_pre_nondnf_payload(self):
phase=INSTALLATION_PHASE_PREINSTALL
).run()
self._check_cert_file(cert2, sysroot)

@patch('pyanaconda.core.util.execWithRedirect')
def test_import_certificates_category_unknown(self, execWithRedirect):
"""Test the ImportCertificatesTask for unknown category"""
cert = CertificateData()
cert.cert = CERT1_CERT
cert.filename = "cert.pem"
cert.category = "unknown"
cert.dir = "/dir/to/dump/cert"

with tempfile.TemporaryDirectory() as sysroot:
ImportCertificatesTask(
sysroot=sysroot,
certificates=[cert],
).run()
# There is no exception
# The certificate is not dumped
self._check_cert_file(cert, sysroot, missing=True)
# The tool is not called
execWithRedirect.assert_not_called()

@patch('pyanaconda.modules.security.certificates.installation.os.path.lexists')
@patch('pyanaconda.core.util.execWithRedirect')
def test_import_certificates_category_global(self, execWithRedirect, mock_lexists):
"""Test the ImportCertificatesTask for category global"""
cert = CertificateData()
cert.cert = CERT1_CERT
cert.filename = "cert.pem"
cert.category = "global"

mock_lexists.return_value = True

with tempfile.TemporaryDirectory() as sysroot:
ImportCertificatesTask(
sysroot=sysroot,
certificates=[cert],
).run()
cert_dir = ImportCertificatesTask.CERT_DIR_CATEGORY_GLOBAL
# The certificate is dumped into runtime directory
self._check_cert_file(cert, sysroot,
dir=cert_dir)
# The import tool is called
cert_dir = join_paths(cert_dir, cert.filename)
execWithRedirect.assert_called_once_with(
CA_IMPORT_TOOL,
['anchor', cert_dir],
root=sysroot,
)

@patch('pyanaconda.modules.security.certificates.installation.os.path.lexists')
@patch('pyanaconda.core.util.execWithRedirect')
def test_import_certificates_category_global_and_dir(self, execWithRedirect, mock_lexists):
"""Test the ImportCertificatesTask for category global with defined dir"""
cert = CertificateData()
cert.cert = CERT1_CERT
cert.filename = "cert.pem"
cert.category = "global"
cert.dir = "/dir/to/dump/cert"

mock_lexists.return_value = True

with tempfile.TemporaryDirectory() as sysroot:
ImportCertificatesTask(
sysroot=sysroot,
certificates=[cert],
).run()
# The certificate is dumped accrding to --dir
self._check_cert_file(cert, sysroot)
# The import tool is called
cert_dir = join_paths(cert.dir, cert.filename)
execWithRedirect.assert_called_once_with(
CA_IMPORT_TOOL,
['anchor', cert_dir],
root=sysroot,
)

@patch('pyanaconda.modules.security.certificates.installation.os.path.lexists')
@patch('pyanaconda.core.util.execWithRedirect')
def test_import_certificates_category_global_missing_tool(self, execWithRedirect, mock_lexists):
"""Test the ImportCertificatesTask for category global when tool is missing"""
cert = CertificateData()
cert.cert = CERT1_CERT
cert.filename = "cert.pem"
cert.category = "global"
cert.dir = "/dir/to/dump/cert"

mock_lexists.return_value = False

# pre-install phase: no exception, cert dumped
with tempfile.TemporaryDirectory() as sysroot:
ImportCertificatesTask(
sysroot=sysroot,
certificates=[cert],
phase=INSTALLATION_PHASE_PREINSTALL,
payload_type=PAYLOAD_TYPE_DNF,
).run()
# The file is dumped
self._check_cert_file(cert, sysroot)
# The import tool is not called
execWithRedirect.assert_not_called()

# non pre-install phase: exception is raised
with tempfile.TemporaryDirectory() as sysroot:
with self.assertRaises(SecurityInstallationError):
ImportCertificatesTask(
sysroot=sysroot,
certificates=[cert],
).run()

# The file was dumped
self._check_cert_file(cert, sysroot)
# The import tool is not called
execWithRedirect.assert_not_called()
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def test_certificates_kickstart(self):
-----END CERTIFICATE-----
%end
%certificate --filename=rvtest2.pem --dir=/cert_dir2
%certificate --filename=rvtest2.pem --dir=/cert_dir2 --category=global
-----BEGIN CERTIFICATE-----
MIIBkTCCATegAwIBAgIUN6r4TjFJqP/TS6U25iOGL2Wt/6kwCgYIKoZIzj0EAwIw
FjEUMBIGA1UEAwwLUlZURVNUIDIgQ0EwHhcNMjQxMTIwMTQwMzIxWhcNMzQxMTE4
Expand Down Expand Up @@ -202,7 +202,7 @@ def test_certificates_kickstart(self):
-----END CERTIFICATE-----
%end
%certificate --filename=rvtest2.pem --dir=/cert_dir2
%certificate --filename=rvtest2.pem --dir=/cert_dir2 --category=global
-----BEGIN CERTIFICATE-----
MIIBkTCCATegAwIBAgIUN6r4TjFJqP/TS6U25iOGL2Wt/6kwCgYIKoZIzj0EAwIw
FjEUMBIGA1UEAwwLUlZURVNUIDIgQ0EwHhcNMjQxMTIwMTQwMzIxWhcNMzQxMTE4
Expand Down

0 comments on commit 61de0f5

Please sign in to comment.