Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental Landlock based sandboxing #597

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ lz4 = "^4.3.2"
lief = "^0.15.1"
cryptography = ">=41.0,<44.0"
treelib = "^1.7.0"
unblob-native = "^0.1.1"
unblob-native = "^0.1.2"
jefferson = "^0.4.5"
rich = "^13.3.5"
pyfatfs = "^1.0.5"
Expand Down
24 changes: 15 additions & 9 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pathlib import Path
from typing import Iterable, List, Optional, Type
from typing import Iterable, List, Optional, Tuple, Type
from unittest import mock

import pytest
Expand All @@ -13,6 +13,7 @@
from unblob.processing import (
DEFAULT_DEPTH,
DEFAULT_PROCESS_NUM,
DEFAULT_SKIP_EXTENSION,
DEFAULT_SKIP_MAGIC,
ExtractionConfig,
)
Expand Down Expand Up @@ -269,6 +270,7 @@ def test_archive_success(
handlers=BUILTIN_HANDLERS,
verbose=expected_verbosity,
progress_reporter=expected_progress_reporter,
sandbox_access_restrictions=mock.ANY,
)
process_file_mock.assert_called_once_with(config, in_path, None)
logger_config_mock.assert_called_once_with(expected_verbosity, tmp_path, log_path)
Expand Down Expand Up @@ -310,16 +312,16 @@ def test_keep_extracted_chunks(


@pytest.mark.parametrize(
"skip_extension, extracted_files_count",
"skip_extension, expected_skip_extensions",
[
pytest.param([], 5, id="skip-extension-empty"),
pytest.param([""], 5, id="skip-zip-extension-empty-suffix"),
pytest.param([".zip"], 1, id="skip-extension-zip"),
pytest.param([".rlib"], 5, id="skip-extension-rlib"),
pytest.param((), DEFAULT_SKIP_EXTENSION, id="skip-extension-empty"),
pytest.param(("",), ("",), id="skip-zip-extension-empty-suffix"),
pytest.param((".zip",), (".zip",), id="skip-extension-zip"),
pytest.param((".rlib",), (".rlib",), id="skip-extension-rlib"),
],
)
def test_skip_extension(
skip_extension: List[str], extracted_files_count: int, tmp_path: Path
skip_extension: List[str], expected_skip_extensions: Tuple[str, ...], tmp_path: Path
):
runner = CliRunner()
in_path = (
Expand All @@ -335,8 +337,12 @@ def test_skip_extension(
for suffix in skip_extension:
args += ["--skip-extension", suffix]
params = [*args, "--extract-dir", str(tmp_path), str(in_path)]
result = runner.invoke(unblob.cli.cli, params)
assert extracted_files_count == len(list(tmp_path.rglob("*")))
process_file_mock = mock.MagicMock()
with mock.patch.object(unblob.cli, "process_file", process_file_mock):
result = runner.invoke(unblob.cli.cli, params)
assert (
process_file_mock.call_args.args[0].skip_extension == expected_skip_extensions
)
assert result.exit_code == 0


Expand Down
23 changes: 23 additions & 0 deletions unblob/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from rich.style import Style
from rich.table import Table
from structlog import get_logger
from unblob_native.sandbox import AccessFS

from unblob.models import DirectoryHandlers, Handlers, ProcessResult
from unblob.plugins import UnblobPluginManager
Expand Down Expand Up @@ -281,6 +282,27 @@ def cli(
extra_magics_to_skip = () if clear_skip_magics else DEFAULT_SKIP_MAGIC
skip_magic = tuple(sorted(set(skip_magic).union(extra_magics_to_skip)))

sandbox_access_restrictions = [
# Python, shared libraries and so on
AccessFS.read("/"),
# Multiprocessing
AccessFS.read_write("/dev/shm"), # noqa: S108
# Extracted contents
AccessFS.read_write(extract_root.as_posix()),
AccessFS.make_dir(extract_root.parent.as_posix()),
]

if report_file:
sandbox_access_restrictions += [
AccessFS.read_write(report_file),
AccessFS.make_reg(report_file.parent),
]
vlaci marked this conversation as resolved.
Show resolved Hide resolved
if log_path:
sandbox_access_restrictions += [
AccessFS.read_write(log_path),
AccessFS.make_reg(log_path.parent),
]

config = ExtractionConfig(
extract_root=extract_root,
force_extract=force,
Expand All @@ -298,6 +320,7 @@ def cli(
progress_reporter=NullProgressReporter
if verbose
else RichConsoleProgressReporter,
sandbox_access_restrictions=sandbox_access_restrictions,
)

logger.info("Start processing file", file=file)
Expand Down
33 changes: 33 additions & 0 deletions unblob/processing.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import functools
import multiprocessing
import shutil
from operator import attrgetter
Expand All @@ -9,6 +10,11 @@
import plotext as plt
from structlog import get_logger
from unblob_native import math_tools as mt
from unblob_native.sandbox import ( # type: ignore
AccessFS,
SandboxError,
restrict_access,
)

from unblob.handlers import BUILTIN_DIR_HANDLERS, BUILTIN_HANDLERS, Handlers

Expand Down Expand Up @@ -98,6 +104,7 @@ class ExtractionConfig:
dir_handlers: DirectoryHandlers = BUILTIN_DIR_HANDLERS
verbose: int = 1
progress_reporter: Type[ProgressReporter] = NullProgressReporter
sandbox_access_restrictions: List[AccessFS] = []

def get_extract_dir_for(self, path: Path) -> Path:
"""Return extraction dir under root with the name of path."""
Expand All @@ -111,6 +118,30 @@ def get_extract_dir_for(self, path: Path) -> Path:
return extract_dir.expanduser().resolve()


def call_once(fn):
fn.__called_once__ = False

@functools.wraps(fn)
def wrapper(*args, **kwargs):
if fn.__called_once__:
return
fn.__called_once__ = True
fn(*args, **kwargs)

return wrapper


@call_once
def try_enter_sandbox(config: ExtractionConfig):
if not config.sandbox_access_restrictions:
return
try:
restrict_access(*config.sandbox_access_restrictions)
except SandboxError:
logger.warning("Sandboxing FS access is unavailable on this system, skipping.")
restrict_access(*config.sandbox_access_restrictions)
qkaiser marked this conversation as resolved.
Show resolved Hide resolved


@terminate_gracefully
def process_file(
config: ExtractionConfig, input_path: Path, report_file: Optional[Path] = None
Expand All @@ -135,6 +166,8 @@ def process_file(
)
return ProcessResult()

try_enter_sandbox(config)

process_result = _process_task(config, task)

if not config.skip_extraction:
Expand Down
Loading