Skip to content

Commit

Permalink
Wrong scan policy name check (RedHatProductSecurity#266)
Browse files Browse the repository at this point in the history
Raise an error if a wrong scan policy name is provided

See: RedHatProductSecurity#266
  • Loading branch information
ccronca authored Nov 27, 2024
1 parent b55025e commit 7927bec
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 3 deletions.
6 changes: 5 additions & 1 deletion rapidast.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ def run_scanner(name, config, args, scan_exporter):
return 1

# Part 2: setup the environment (e.g.: spawn a server)
scanner.setup()
try:
scanner.setup()
except Exception as excp: # pylint: disable=W0718
logging.error(f"Failed to set up the scanner: {excp}")
scanner.state = scanners.State.ERROR

logging.debug(scanner)

Expand Down
60 changes: 60 additions & 0 deletions scanners/zap/zap.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import re
import shutil
import tarfile
import xml.etree.ElementTree as ET
from base64 import urlsafe_b64encode
from collections import namedtuple
from pathlib import Path

import yaml

Expand Down Expand Up @@ -637,6 +639,10 @@ def _setup_active_scan(self):
if not job["parameters"].get("policy"):
job["parameters"]["policy"] = "API-scan-minimal"

validate_active_scan_policy(
policy_path=Path(MODULE_DIR) / "policies" / f"{job['parameters']['policy']}.policy",
)

self.automation_config["jobs"].append(job)

def _construct_report_af(self, report_format):
Expand Down Expand Up @@ -1026,3 +1032,57 @@ def ensure_list(entry):
automation_config["env"]["contexts"] = []
automation_config["env"]["contexts"].append({"name": context})
return ensure_default(automation_config["env"]["contexts"][-1])


class PolicyFileNotFoundError(FileNotFoundError):
"""Raised when the policy file is not found."""


class MissingConfigurationNodeError(RuntimeError):
"""Raised when the root <configuration> node is missing"""


class MissingPolicyNodeError(RuntimeError):
"""Raised when the <policy> node inside <configuration> is missing"""


class MismatchedPolicyNameError(RuntimeError):
"""Raised when the <policy> node content does not match the filename"""


class InvalidXMLFileError(RuntimeError):
"""Raised when the policy file is not a valid XML"""


def validate_active_scan_policy(policy_path: Path):
policy_name = policy_path.stem

logging.info(f"Starting validation of ZAP active scan policy: '{policy_path}'")

if not policy_path.is_file():
raise PolicyFileNotFoundError(
f"Policy '{policy_name}' not found in '{policy_path.parent}' directory. "
f"Please check the policy name in the configuration"
)

try:
tree = ET.parse(policy_path)
root = tree.getroot()

if not root.tag or root.tag != "configuration":
raise MissingConfigurationNodeError(f"Missing <configuration> node in '{policy_name}.policy'")

policy_node = root.find("policy")
if policy_node is None:
raise MissingPolicyNodeError(f"Missing <policy> node inside <configuration> in '{policy_name}.policy'")

if policy_node.text.strip() != policy_name:
raise MismatchedPolicyNameError(
f"The <policy> node in '{policy_name}' does not match the filename. "
f"Expected '{policy_name}', but found '{policy_node.text.strip()}'"
)

except ET.ParseError as exc:
raise InvalidXMLFileError(f"Policy file '{policy_path}' is not a valid XML file") from exc

logging.info(f"Validation successful for policy file: '{policy_path}'")
12 changes: 10 additions & 2 deletions tests/scanners/zap/test_setup.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import os
from pathlib import Path
from unittest.mock import MagicMock
from unittest.mock import patch

import pytest
import requests

import configmodel.converter
import scanners
from scanners.zap.zap import find_context
from scanners.zap.zap import MODULE_DIR
from scanners.zap.zap_none import ZapNone

# from pytest_mock import mocker
Expand Down Expand Up @@ -232,9 +236,9 @@ def test_setup_include_urls(test_config):
assert "def" in find_context(test_zap.automation_config)["includePaths"]


def test_setup_active_scan(test_config):
@patch("scanners.zap.zap.validate_active_scan_policy")
def test_setup_active_scan(mock_validate_active_scan_policy, test_config):
test_config.set("scanners.zap.activeScan.maxRuleDurationInMins", 10)

test_zap = ZapNone(config=test_config)
test_zap.setup()

Expand All @@ -244,6 +248,10 @@ def test_setup_active_scan(test_config):
assert item["parameters"]["maxRuleDurationInMins"] == 10
assert item["parameters"]["context"] == "Default Context"
assert item["parameters"]["user"] == ""
mock_validate_active_scan_policy.assert_called_once_with(
policy_path=Path(f"{MODULE_DIR}/policies/API-scan-minimal.policy")
)

break
else:
assert False
Expand Down
85 changes: 85 additions & 0 deletions tests/scanners/zap/test_setup_activescan_policy_validatio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from pathlib import Path

import pytest

from scanners.zap.zap import InvalidXMLFileError
from scanners.zap.zap import MismatchedPolicyNameError
from scanners.zap.zap import MissingConfigurationNodeError
from scanners.zap.zap import MissingPolicyNodeError
from scanners.zap.zap import PolicyFileNotFoundError
from scanners.zap.zap import validate_active_scan_policy


@pytest.fixture
def valid_policy_file(tmp_path):
policy_name = "policy1"
valid_xml_content = """<configuration>
<policy>policy1</policy>
<setting name="max_connections">100</setting>
</configuration>"""
file_path = tmp_path / f"{policy_name}.policy"
file_path.write_text(valid_xml_content)
return file_path


@pytest.fixture
def invalid_xml_file(tmp_path):
policy_name = "policy1"
invalid_xml_content = """<configuration>
<policy>policy2</policy>
</configuration>""" # Mismatched policy name
file_path = tmp_path / f"{policy_name}.policy"
file_path.write_text(invalid_xml_content)
return file_path


@pytest.fixture
def missing_policy_file():
return Path("/non/existent/path/policy1.policy")


def test_valid_policy(valid_policy_file):
validate_active_scan_policy(valid_policy_file)


def test_missing_policy_file(missing_policy_file):
with pytest.raises(PolicyFileNotFoundError):
validate_active_scan_policy(missing_policy_file)


def test_invalid_xml_file(invalid_xml_file):
with pytest.raises(MismatchedPolicyNameError):
validate_active_scan_policy(invalid_xml_file)


def test_invalid_xml_parse(invalid_xml_file):
invalid_xml_content = """<configuration>
<policy>policy1</policy>
</configuration""" # Missing closing tag
file_path = invalid_xml_file
file_path.write_text(invalid_xml_content)

with pytest.raises(InvalidXMLFileError):
validate_active_scan_policy(file_path)


def test_missing_configuration_node(invalid_xml_file):
invalid_xml_content = """<as>
<policy>policy1</policy>
</as>"""
file_path = invalid_xml_file
file_path.write_text(invalid_xml_content)

with pytest.raises(MissingConfigurationNodeError):
validate_active_scan_policy(file_path)


def test_missing_policy_node(invalid_xml_file):
invalid_xml_content = """<configuration>
<po>policy1</po>
</configuration>"""
file_path = invalid_xml_file
file_path.write_text(invalid_xml_content)

with pytest.raises(MissingPolicyNodeError):
validate_active_scan_policy(file_path)
61 changes: 61 additions & 0 deletions tests/test_rapidast_run_scanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from unittest.mock import MagicMock
from unittest.mock import patch

import rapidast
from rapidast import scanners


@patch("rapidast.scanners.str_to_scanner")
def test_run_scanner_setup_failure(mock_str_to_scanner):
"""
Test that if an exception occurs during `scanner.setup`, the `run_scanner` method
catches the exception, returns 1, and updates the scanner's state to 'ERROR'
"""

mock_config = MagicMock()
mock_args = MagicMock()
mock_scan_exporter = MagicMock()

mock_scanner = MagicMock()
mock_str_to_scanner.return_value = lambda config, name: mock_scanner

mock_scanner.setup.side_effect = Exception("Setup failed")

result = rapidast.run_scanner("mock_name", mock_config, mock_args, mock_scan_exporter)

assert result == 1
mock_scanner.setup.assert_called_once()
assert mock_scanner.state == scanners.State.ERROR


@patch("rapidast.scanners.str_to_scanner")
def test_run_scanner_setup_success(mock_str_to_scanner):
"""
Test that if `scanner.setup` is successful, `run_scanner` continues as expected.
Subsequent actions are mocked to focus on ensuring `run_scanner` returns a successful
result (0)
"""

def update_state(state):
mock_scanner.state = state

def update_state_ready():
update_state(scanners.State.READY)

def update_state_processed():
update_state(scanners.State.PROCESSED)

mock_config = MagicMock()
mock_args = MagicMock()
mock_scan_exporter = MagicMock()

mock_scanner = MagicMock()
mock_str_to_scanner.return_value = lambda config, name: mock_scanner

mock_scanner.setup.side_effect = update_state_ready
mock_scanner.postprocess.side_effect = update_state_processed

result = rapidast.run_scanner("mock_name", mock_config, mock_args, mock_scan_exporter)

assert result == 0
mock_scanner.setup.assert_called_once()

0 comments on commit 7927bec

Please sign in to comment.