Skip to content

Commit

Permalink
Fix certificate symlink handling
Browse files Browse the repository at this point in the history
In response to the identified flaws in the originally delivered fix, for
feature enabling http repositories, this commit addresses the following
issues:

1. Previously, files installed via RPMs that were originally symlinks
   were being switched to standard files. This issue has been resolved
   by preserving symlinks within the /etc/pki directory. Any symlink
   pointing to a file within the /etc/pki directory (whether present in
   the source system or installed by a package in the container) will be
   present in the container, ensuring changes to certificates are
   properly propagated.

2. Lists of trusted CAs were not being updated, as the update-ca-trust
   call was missing inside the container. This commit now includes the
   necessary update-ca-trust call.

The solution specification has been modified as follows:

    - Certificate _files_ in /etc/pki (excluding symlinks) are copied to
      the container as in the original solution.
    - Files installed by packages within the container are preserved and
      given higher priority.
    - Handling of symlinks is enhanced, ensuring that symlinks within
      the /etc/pki directory are preserved, while any symlink pointing
      outside the /etc/pki directory will be copied as a file.
    - Certificates are updated using `update-ca-trust`.
  • Loading branch information
dkubek committed Oct 31, 2023
1 parent 17c88d9 commit 77609aa
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,53 @@ def _get_files_owned_by_rpms(context, dirpath, pkgs=None, recursive=False):
return files_owned_by_rpms


def _copy_decouple(srcdir, dstdir):
for root, _, files in os.walk(srcdir):
for filename in files:
relpath = os.path.relpath(root, srcdir)
source_filepath = os.path.join(root, filename)
target_filepath = os.path.join(dstdir, relpath, filename)

# Skip and report broken symlinks (this follows any possible chain of symlinks)
if not os.path.exists(source_filepath):
api.current_logger().warning('File {} is a broken symlink!'.format(source_filepath))
continue

# Copy symlinks to the target userspace
pointee = None
if os.path.islink(source_filepath):
pointee = os.readlink(source_filepath)

# If source file is a symlink within `src` then preserve it,
# otherwise resolve and copy it as a file it points to
if pointee is not None and not pointee.startswith(srcdir):
# Follow the path until we hit a file or get back to /etc/pki
while not pointee.startswith(srcdir) and os.path.islink(pointee):
pointee = os.readlink(pointee)

# Pointee points to a file outside /etc/pki so we copy it instead
if not pointee.startswith(srcdir) and not os.path.islink(pointee):
source_filepath = pointee
else:
# pointee points back to /etc/pki
pass

# Ensure parent directory exists
parent_dir = os.path.dirname(target_filepath)
if not os.path.exists(parent_dir):
os.makedirs(parent_dir, exist_ok=True)

if os.path.islink(source_filepath):
# Translate pointee to target /etc/pki
target_pointee = os.path.join(dstdir, os.path.relpath(pointee, root))
# TODO(dkubek): Preserve the owner and permissions of the original symlink
run(['ln', '-s', target_pointee, target_filepath])
run(['chmod' '--reference={}'.format(source_filepath), target_filepath])
continue

run(['cp', '-a', source_filepath, target_filepath])


def _copy_certificates(context, target_userspace):
"""
Copy the needed certificates into the container, but preserve original ones
Expand All @@ -360,36 +407,51 @@ def _copy_certificates(context, target_userspace):
files_owned_by_rpms = _get_files_owned_by_rpms(target_context, '/etc/pki', recursive=True)
api.current_logger().debug('Files owned by rpms: {}'.format(' '.join(files_owned_by_rpms)))

# Backup container /etc/pki
run(['mv', target_pki, backup_pki])
context.copytree_from('/etc/pki', target_pki)

# Copy source /etc/pki to the container
_copy_decouple('/etc/pki', target_pki)

# Assertion: If skip_broken_symlinks is True
# => no broken symlinks exist in /etc/pki
# So any new broken symlinks created will be by the installed packages.

# Recover installed packages as they always get precedence
for filepath in files_owned_by_rpms:
src_path = os.path.join(backup_pki, filepath)
dst_path = os.path.join(target_pki, filepath)

# Resolve and skip any broken symlinks
is_broken_symlink = False
while os.path.islink(src_path):
# The symlink points to a path relative to the target userspace so
# we need to readjust it
next_path = os.path.join(target_userspace, os.readlink(src_path)[1:])
if not os.path.exists(next_path):
is_broken_symlink = True

# The path original path of the broken symlink in the container
report_path = os.path.join(target_pki, os.path.relpath(src_path, backup_pki))
api.current_logger().warning('File {} is a broken symlink!'.format(report_path))
break

src_path = next_path
pointee = None
if os.path.islink(src_path):
pointee = os.path.join(target_userspace, os.readlink(src_path)[1:])

while os.path.islink(pointee):
# The symlink points to a path relative to the target userspace so
# we need to readjust it
pointee = os.path.join(target_userspace, os.readlink(src_path)[1:])
if not os.path.exists(pointee):
is_broken_symlink = True

# The path original path of the broken symlink in the container
report_path = os.path.join(target_pki, os.path.relpath(src_path, backup_pki))
api.current_logger().warning('File {} is a broken symlink!'.format(report_path))
break

if is_broken_symlink:
continue

# Cleanup conflicting files
run(['rm', '-rf', dst_path])

# Ensure destination exists
parent_dir = os.path.dirname(dst_path)
run(['mkdir', '-p', parent_dir])
run(['cp', '-a', src_path, dst_path])

# Copy the new file
run(['cp', '-R', '--preserve=all', src_path, dst_path])


def _prep_repository_access(context, target_userspace):
Expand All @@ -401,6 +463,7 @@ def _prep_repository_access(context, target_userspace):
backup_yum_repos_d = os.path.join(target_etc, 'yum.repos.d.backup')

_copy_certificates(context, target_userspace)
context.call(['update-ca-trust', 'extract'])

if not rhsm.skip_rhsm():
run(['rm', '-rf', os.path.join(target_etc, 'rhsm')])
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os
import subprocess
from collections import namedtuple
from pathlib import Path

import pytest

Expand Down Expand Up @@ -48,6 +50,82 @@ def __exit__(self, exception_type, exception_value, traceback):
pass


def traverse_structure(structure, root=Path('/')):
for filename, links_to in structure.items():
filepath = root / filename

if isinstance(links_to, dict):
yield from traverse_structure(links_to, filepath)
else:
yield (filepath, links_to)


def assert_directory_structure_matches(root, initial, expected):
for filepath, links_to in traverse_structure(expected, root=root / 'expected'):
assert filepath.exists()

if links_to is None:
assert filepath.is_file()
continue

assert filepath.is_symlink()
assert os.readlink(filepath) == str(root / 'expected' / links_to.lstrip('/'))


@pytest.fixture
def temp_directory_layout(tmp_path, initial_structure):
for filepath, links_to in traverse_structure(initial_structure, root=tmp_path / 'initial'):
file_path = tmp_path / filepath
file_path.parent.mkdir(parents=True, exist_ok=True)

if links_to is None:
file_path.touch()
continue

file_path.symlink_to(tmp_path / 'initial' / links_to.lstrip('/'))

(tmp_path / 'expected').mkdir()
assert (tmp_path / 'expected').exists()

return tmp_path


# The semantics of initial_structure and expected_structure are as follows:
#
# 1. The outermost dictionary encodes the root of a directory structure
#
# 2. Depending on the value for a key in a dict, each key in the dictionary
# denotes the name of either a:
# a) directory -- if value is dict
# b) regular file -- if value is None
# c) symlink -- if a value is str
#
# 3. The value of a symlink entry is a absolut path to a file in the context of

Check failure on line 103 in repos/system_upgrade/common/actors/targetuserspacecreator/tests/unit_test_targetuserspacecreator.py

View workflow job for this annotation

GitHub Actions / Check for spelling errors

absolut ==> absolute
# the structure.
#
@pytest.mark.parametrize('initial_structure,expected_structure', [
({'dir': {'fileA': None}}, {'dir': {'fileA': None}}),
({'dir': {'fileA': 'nonexistent'}}, {'dir': {}}),
({'dir': {'fileA': '/dir/fileB', 'fileB': None}}, {'dir': {'fileA': '/dir/fileB', 'fileB': None}}),
({'dir': {'fileA': '/dir/fileB', 'fileB': 'nonexistent'}},
{'dir': {}}),
({'dir': {'fileA': '/dir/fileB', 'fileB': '/dir/fileC', 'fileC': None}},
{'dir': {'fileA': '/dir/fileB', 'fileB': '/dir/fileC', 'fileC': None}}),
({'dir': {'fileA': '/dir/fileB', 'fileB': '/dir/fileC', 'fileC': '/outside/fileOut', 'fileE': None},
'outside': {'fileOut': '/outside/fileD', 'fileD': '/dir/fileE'}},
{'dir': {'fileA': '/dir/fileB', 'fileB': '/dir/fileC', 'fileC': '/dir/fileE'}}),
]
)
def test_copy_decouple(monkeypatch, temp_directory_layout, initial_structure, expected_structure):
monkeypatch.setattr(userspacegen, 'run', subprocess.run)
userspacegen._copy_decouple(
str(temp_directory_layout / 'initial' / 'dir'),
str(temp_directory_layout / 'expected' / 'dir'),
)

assert_directory_structure_matches(temp_directory_layout, initial_structure, expected_structure)


@pytest.mark.parametrize('result,dst_ver,arch,prod_type', [
(os.path.join(_CERTS_PATH, '8.1', '479.pem'), '8.1', architecture.ARCH_X86_64, 'ga'),
(os.path.join(_CERTS_PATH, '8.1', '419.pem'), '8.1', architecture.ARCH_ARM64, 'ga'),
Expand Down

0 comments on commit 77609aa

Please sign in to comment.