From 97859935be02f22c303630afa6ad08e429578ae1 Mon Sep 17 00:00:00 2001 From: Dylan Pulver Date: Wed, 31 Jul 2024 10:26:58 -0400 Subject: [PATCH] testing: second batch of new tests --- .../ecosystems/python/test_dependencies.py | 55 +++++++++++ tests/scan/ecosystems/python/test_main.py | 95 +++++++++++++++++-- tests/scan/ecosystems/test_target.py | 52 ++++++++++ tests/scan/finder/test_file_finder.py | 83 ++++++++++++++++ tests/scan/finder/test_handlers.py | 64 +++++++++++++ 5 files changed, 341 insertions(+), 8 deletions(-) create mode 100644 tests/scan/ecosystems/python/test_dependencies.py create mode 100644 tests/scan/ecosystems/test_target.py create mode 100644 tests/scan/finder/test_file_finder.py create mode 100644 tests/scan/finder/test_handlers.py diff --git a/tests/scan/ecosystems/python/test_dependencies.py b/tests/scan/ecosystems/python/test_dependencies.py new file mode 100644 index 0000000..53e719d --- /dev/null +++ b/tests/scan/ecosystems/python/test_dependencies.py @@ -0,0 +1,55 @@ +import unittest +from unittest.mock import MagicMock, mock_open, patch +from pathlib import Path +from collections import defaultdict + +from packaging.specifiers import SpecifierSet +from packaging.version import parse as parse_version + +from safety.scan.ecosystems.python.main import ( + get_closest_ver, is_pinned_requirement) +from safety.scan.ecosystems.python.dependencies import ( + find_version,is_supported_by_parser, parse_requirement, read_requirements, + read_dependencies, read_virtual_environment_dependencies, + get_dependencies +) +from safety_schemas.models import PythonDependency, PythonSpecification, FileType +from safety.scan.ecosystems.base import InspectableFile +from dparse import filetypes + + +class TestEcosystemsPython(unittest.TestCase): + + def test_get_closest_ver(self): + versions = ["1.0.0", "1.2.0", "2.0.0"] + spec = SpecifierSet(">=1.0.0") + version = "1.1.0" + result = get_closest_ver(versions, version, spec) + self.assertEqual(result, {'upper': parse_version("1.2.0"), 'lower': parse_version("1.0.0")}) + + + def test_is_pinned_requirement(self): + spec = SpecifierSet("==1.0.0") + self.assertTrue(is_pinned_requirement(spec)) + spec = SpecifierSet(">=1.0.0") + self.assertFalse(is_pinned_requirement(spec)) + + def test_find_version(self): + specs = [MagicMock(spec=PythonSpecification)] + specs[0].specifier = SpecifierSet("==1.0.0") + self.assertEqual(find_version(specs), "1.0.0") + + def test_is_supported_by_parser(self): + self.assertTrue(is_supported_by_parser("requirements.txt")) + self.assertFalse(is_supported_by_parser("not_supported_file.md")) + + def test_parse_requirement(self): + dep = "test_package>=1.0.0" + found = "path/to/requirements.txt" + result = parse_requirement(dep, found) + self.assertIsInstance(result, PythonSpecification) + self.assertEqual(result.found, Path(found).resolve()) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/scan/ecosystems/python/test_main.py b/tests/scan/ecosystems/python/test_main.py index 35917cd..40806e8 100644 --- a/tests/scan/ecosystems/python/test_main.py +++ b/tests/scan/ecosystems/python/test_main.py @@ -1,25 +1,43 @@ import unittest -from unittest.mock import MagicMock -from safety.scan.ecosystems.python.main import should_fail, VulnerabilitySeverityLabels +from unittest.mock import MagicMock, patch +from datetime import datetime +from packaging.specifiers import SpecifierSet + +from safety.scan.ecosystems.python.main import ( + should_fail, VulnerabilitySeverityLabels, + ignore_vuln_if_needed, get_vulnerability, PythonFile +) +from safety_schemas.models import ( + ConfigModel, Vulnerability, PythonDependency, PythonSpecification, + FileType, IgnoredItems, IgnoredItemDetail, IgnoreCodes +) +from safety.models import Severity class TestMain(unittest.TestCase): def setUp(self): self.config = MagicMock() self.vulnerability = MagicMock() + self.dependency = MagicMock(spec=PythonDependency) + self.file_type = MagicMock(spec=FileType) + self.vuln_id = "vuln_id" + self.cve = MagicMock() + self.ignore_vulns = {} + self.specification = MagicMock(spec=PythonSpecification) + self.config.dependency_vulnerability = MagicMock() def test_fail_on_disabled(self): - self.config.depedendency_vulnerability.fail_on.enabled = False + self.config.dependency_vulnerability.fail_on.enabled = False result = should_fail(self.config, self.vulnerability) self.assertFalse(result) def test_severity_none(self): - self.config.depedendency_vulnerability.fail_on.enabled = True + self.config.dependency_vulnerability.fail_on.enabled = True self.vulnerability.severity = None result = should_fail(self.config, self.vulnerability) self.assertFalse(result) def test_severity_none_with_fail_on_unknow_none(self): - self.config.depedendency_vulnerability.fail_on.enabled = True + self.config.depedendency_vulnerability.fail_on.enabled = True self.vulnerability.severity = None self.config.depedendency_vulnerability.fail_on.cvss_severity = [VulnerabilitySeverityLabels.UNKNOWN] @@ -28,17 +46,17 @@ def test_severity_none_with_fail_on_unknow_none(self): self.config.depedendency_vulnerability.fail_on.cvss_severity = [VulnerabilitySeverityLabels.NONE] self.assertTrue(should_fail(self.config, self.vulnerability)) - self.config.depedendency_vulnerability.fail_on.cvss_severity = [VulnerabilitySeverityLabels.UNKNOWN, + self.config.depedendency_vulnerability.fail_on.cvss_severity = [VulnerabilitySeverityLabels.UNKNOWN, VulnerabilitySeverityLabels.NONE] self.assertTrue(should_fail(self.config, self.vulnerability)) - self.config.depedendency_vulnerability.fail_on.cvss_severity = [VulnerabilitySeverityLabels.LOW, + self.config.depedendency_vulnerability.fail_on.cvss_severity = [VulnerabilitySeverityLabels.LOW, VulnerabilitySeverityLabels.MEDIUM] self.assertFalse(should_fail(self.config, self.vulnerability)) self.vulnerability.severity = MagicMock() self.vulnerability.severity.cvssv3 = {"base_severity": "NONE"} - + self.config.depedendency_vulnerability.fail_on.cvss_severity = [VulnerabilitySeverityLabels.NONE] self.assertTrue(should_fail(self.config, self.vulnerability)) @@ -63,3 +81,64 @@ def test_unexpected_severity_with_warning(self): result = should_fail(self.config, self.vulnerability) self.assertIn("Unexpected base severity value", log.output[0]) self.assertFalse(result) + + def test_ignore_vuln_if_needed_ignore_environment(self): + self.file_type = FileType.VIRTUAL_ENVIRONMENT + ignore_vuln_if_needed( + dependency=self.dependency, file_type=self.file_type, + vuln_id=self.vuln_id, cve=self.cve, + ignore_vulns=self.ignore_vulns, ignore_unpinned=False, + ignore_environment=True, specification=self.specification + ) + self.assertIn(self.vuln_id, self.ignore_vulns) + self.assertEqual(self.ignore_vulns[self.vuln_id].code, IgnoreCodes.environment_dependency) + + + def test_python_file_init(self): + file_type = FileType.VIRTUAL_ENVIRONMENT + file = MagicMock() + python_file = PythonFile(file_type, file) + self.assertEqual(python_file.ecosystem, file_type.ecosystem) + self.assertEqual(python_file.file_type, file_type) + + + @patch('safety.scan.ecosystems.python.main.get_from_cache', return_value={}) + def test_python_file_remediate_no_db_full(self, mock_get_from_cache): + file_type = FileType.VIRTUAL_ENVIRONMENT + file = MagicMock() + python_file = PythonFile(file_type, file) + python_file.dependency_results = MagicMock() + python_file.remediate() + mock_get_from_cache.assert_called_once_with(db_name="insecure_full.json", skip_time_verification=True) + + @patch('safety.scan.ecosystems.python.main.get_from_cache') + def test_python_file_remediate_with_db_full(self, mock_get_from_cache): + mock_get_from_cache.return_value = { + 'vulnerable_packages': { + 'dependency_name': [ + { + 'type': 'pyup', + 'ids': [{'type': 'pyup', 'id': 'vuln_id'}], + 'affected_versions': ['1.0.0'] + } + ] + } + } + file_type = FileType.VIRTUAL_ENVIRONMENT + file = MagicMock() + python_file = PythonFile(file_type, file) + dependency = MagicMock(spec=PythonDependency) + dependency.name = "dependency_name" + dependency.specifications = [MagicMock(spec=PythonSpecification)] + dependency.secure_versions = ["1.0.1"] + python_file.dependency_results = MagicMock() + python_file.dependency_results.get_affected_dependencies.return_value = [dependency] + + # Mock vulnerabilities attribute + for spec in dependency.specifications: + spec.vulnerabilities = [] + + python_file.remediate() + + mock_get_from_cache.assert_called_with(db_name="insecure_full.json", skip_time_verification=True) + self.assertEqual(dependency.secure_versions, ["1.0.1"]) diff --git a/tests/scan/ecosystems/test_target.py b/tests/scan/ecosystems/test_target.py new file mode 100644 index 0000000..a91ab58 --- /dev/null +++ b/tests/scan/ecosystems/test_target.py @@ -0,0 +1,52 @@ +import unittest +from unittest.mock import MagicMock, mock_open, patch +from pathlib import Path +from typer import FileTextWrite +from safety_schemas.models import Ecosystem, FileType +from safety.scan.ecosystems.python.main import PythonFile +from safety.scan.ecosystems.target import InspectableFileContext, TargetFile + +class TestInspectableFileContext(unittest.TestCase): + def setUp(self): + self.file_path = Path("/fake/path/to/requirements.txt") + self.file_type = MagicMock(spec=FileType) + self.file_type.ecosystem = Ecosystem.PYTHON + + @patch("builtins.open", new_callable=mock_open, read_data="data") + def test_enter_success(self, mock_open): + with InspectableFileContext(self.file_path, self.file_type) as inspectable_file: + self.assertIsInstance(inspectable_file, PythonFile) + mock_open.assert_called_once_with(self.file_path, mode='r+') + + @patch("builtins.open", new_callable=mock_open) + def test_enter_failure(self, mock_open): + mock_open.side_effect = IOError("Permission denied") + with InspectableFileContext(self.file_path, self.file_type) as inspectable_file: + self.assertIsNone(inspectable_file) + mock_open.assert_called_once_with(self.file_path, mode='r+') + + @patch("builtins.open", new_callable=mock_open, read_data="data") + def test_exit(self, mock_open): + with InspectableFileContext(self.file_path, self.file_type) as inspectable_file: + pass + inspectable_file.file.close.assert_called_once() + +class TestTargetFile(unittest.TestCase): + def setUp(self): + self.file = MagicMock(spec=FileTextWrite) + self.file_type_python = MagicMock(spec=FileType) + self.file_type_python.ecosystem = Ecosystem.PYTHON + + def test_create_python_file(self): + result = TargetFile.create(file_type=self.file_type_python, file=self.file) + self.assertIsInstance(result, PythonFile) + + def test_create_unsupported_ecosystem(self): + file_type_unknown = MagicMock(spec=FileType) + file_type_unknown.ecosystem = "UNKNOWN" + file_type_unknown.value = "unsupported_value" + with self.assertRaises(ValueError): + TargetFile.create(file_type=file_type_unknown, file=self.file) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/scan/finder/test_file_finder.py b/tests/scan/finder/test_file_finder.py new file mode 100644 index 0000000..deaba21 --- /dev/null +++ b/tests/scan/finder/test_file_finder.py @@ -0,0 +1,83 @@ +import unittest +from unittest.mock import MagicMock, patch +from pathlib import Path +from safety_schemas.models import Ecosystem, FileType +from safety.scan.finder.file_finder import FileFinder, should_exclude +from safety.scan.finder.handlers import FileHandler + +class TestShouldExclude(unittest.TestCase): + def test_should_exclude_absolute(self): + excludes = {Path("/path/to/exclude")} + to_analyze = Path("/path/to/exclude/file.txt") + self.assertTrue(should_exclude(excludes, to_analyze)) + + def test_should_exclude_relative(self): + excludes = {Path("exclude")} + to_analyze = Path("exclude/file.txt").resolve() + self.assertTrue(should_exclude(excludes, to_analyze)) + + def test_should_not_exclude(self): + excludes = {Path("/path/to/exclude")} + to_analyze = Path("/path/to/include/file.txt") + self.assertFalse(should_exclude(excludes, to_analyze)) + +class TestFileFinder(unittest.TestCase): + def setUp(self): + self.max_level = 2 + self.ecosystems = [Ecosystem.PYTHON] + self.target = Path("/path/to/target") + self.console = MagicMock() + self.live_status = MagicMock() + + self.handler = MagicMock(spec=FileHandler) + self.handler.can_handle.return_value = FileType.REQUIREMENTS_TXT + self.handlers = {self.handler} + + @patch('safety.scan.finder.file_finder.os.walk') + @patch('safety.scan.finder.handlers.ECOSYSTEM_HANDLER_MAPPING', {'PYTHON': lambda: self.handler}) + def test_process_directory(self, mock_os_walk): + mock_os_walk.return_value = [ + ("/path/to/target", ["subdir"], ["file1.txt", "file2.py"]), + ("/path/to/target/subdir", [], ["file3.txt"]) + ] + + finder = FileFinder( + max_level=self.max_level, ecosystems=self.ecosystems, + target=self.target, console=self.console, + live_status=self.live_status, handlers=self.handlers + ) + + dir_path, files = finder.process_directory(self.target) + self.assertEqual(str(dir_path), str(self.target)) # Convert dir_path to string + self.assertIn(FileType.REQUIREMENTS_TXT.value, files) # Use the actual file type + self.assertEqual(len(files[FileType.REQUIREMENTS_TXT.value]), 3) # Adjust based on the actual expected filetype + + @patch('safety.scan.finder.file_finder.os.walk') + @patch('safety.scan.finder.handlers.ECOSYSTEM_HANDLER_MAPPING', {'PYTHON': lambda: self.handler}) + def test_search(self, mock_os_walk): + mock_os_walk.return_value = [ + ("/path/to/target", ["subdir"], ["file1.txt", "file2.py"]), + ("/path/to/target/subdir", [], ["file3.txt"]) + ] + + finder = FileFinder( + max_level=self.max_level, ecosystems=self.ecosystems, + target=self.target, console=self.console, + live_status=self.live_status, handlers=self.handlers + ) + + dir_path, files = finder.search() + self.assertEqual(str(dir_path), str(self.target)) # Convert dir_path to string + self.assertIn(FileType.REQUIREMENTS_TXT.value, files) # Use the actual file type + self.assertEqual(len(files[FileType.REQUIREMENTS_TXT.value]), 3) # Adjust based on the actual expected filetype + + def test_should_exclude(self): + excludes = {Path("/exclude/this")} + path_to_analyze = Path("/exclude/this/file") + self.assertTrue(should_exclude(excludes, path_to_analyze)) + + path_to_analyze = Path("/do/not/exclude/this/file") + self.assertFalse(should_exclude(excludes, path_to_analyze)) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/scan/finder/test_handlers.py b/tests/scan/finder/test_handlers.py new file mode 100644 index 0000000..1104dcb --- /dev/null +++ b/tests/scan/finder/test_handlers.py @@ -0,0 +1,64 @@ +import unittest +from unittest.mock import MagicMock, patch +from pathlib import Path +from typing import Dict, List + +from safety_schemas.models import Ecosystem, FileType +from safety.scan.finder.handlers import FileHandler, PythonFileHandler, SafetyProjectFileHandler, ECOSYSTEM_HANDLER_MAPPING + +# Concrete subclass for testing +class TestableFileHandler(FileHandler): + def download_required_assets(self, session): + return {} + +class TestFileHandler(unittest.TestCase): + + def setUp(self): + self.handler = TestableFileHandler() + self.handler.ecosystem = MagicMock(spec=Ecosystem) + self.handler.ecosystem.file_types = [FileType.REQUIREMENTS_TXT] + + def test_cannot_handle(self): + root = "/path/to" + file_name = "unknown_file.xyz" + include_files: Dict[FileType, List[Path]] = {} + result = self.handler.can_handle(root, file_name, include_files) + self.assertIsNone(result) + + def test_download_required_assets(self): + self.assertEqual(self.handler.download_required_assets(None), {}) + + +class TestPythonFileHandler(unittest.TestCase): + + def setUp(self): + self.handler = PythonFileHandler() + + @patch('safety.safety.fetch_database') + def test_download_required_assets(self, mock_fetch_database): + session = MagicMock() + self.handler.download_required_assets(session) + self.assertEqual(mock_fetch_database.call_count, 2) + + +class TestSafetyProjectFileHandler(unittest.TestCase): + + def setUp(self): + self.handler = SafetyProjectFileHandler() + + def test_download_required_assets(self): + session = MagicMock() + self.handler.download_required_assets(session) + # Since the function does nothing, we just check it runs without error + self.assertTrue(True) + + +class TestEcosystemHandlerMapping(unittest.TestCase): + + def test_mapping(self): + self.assertIsInstance(ECOSYSTEM_HANDLER_MAPPING[Ecosystem.PYTHON](), PythonFileHandler) + self.assertIsInstance(ECOSYSTEM_HANDLER_MAPPING[Ecosystem.SAFETY_PROJECT](), SafetyProjectFileHandler) + + +if __name__ == '__main__': + unittest.main()