From c7947653bd80cacca8486c56036a3840586dfa09 Mon Sep 17 00:00:00 2001 From: Meet Soni <92802561+inosmeet@users.noreply.github.com> Date: Wed, 19 Jun 2024 03:30:50 +0530 Subject: [PATCH] refactor: modified language parsers for purl2cpe support (#4188) changed all the language parsers according to the purl2cpe database and made the database query universal for all parsers removed 'slf4j-simple' and 'slf4j-api' products from fail_pom.xml file as they can be found now with purl2cpe Signed-off-by: Meet Soni --- cve_bin_tool/parsers/__init__.py | 70 +++++++++++++++++------------- cve_bin_tool/parsers/dart.py | 10 +++-- cve_bin_tool/parsers/go.py | 11 ++--- cve_bin_tool/parsers/java.py | 20 ++++++--- cve_bin_tool/parsers/javascript.py | 15 +++++-- cve_bin_tool/parsers/perl.py | 11 +++-- cve_bin_tool/parsers/php.py | 14 ++++-- cve_bin_tool/parsers/python.py | 8 ++-- cve_bin_tool/parsers/r.py | 10 +++-- cve_bin_tool/parsers/ruby.py | 15 ++++--- cve_bin_tool/parsers/rust.py | 13 +++--- cve_bin_tool/parsers/swift.py | 10 +++-- test/language_data/fail_pom.xml | 13 ------ test/test_language_scanner.py | 7 ++- 14 files changed, 137 insertions(+), 90 deletions(-) diff --git a/cve_bin_tool/parsers/__init__.py b/cve_bin_tool/parsers/__init__.py index 843b210fb1..88cb7332d5 100644 --- a/cve_bin_tool/parsers/__init__.py +++ b/cve_bin_tool/parsers/__init__.py @@ -85,7 +85,7 @@ def find_vendor(self, product, version): ) return vendorlist - def generate_purl(self, product, vendor, qualifier={}, subpath=None): + def generate_purl(self, product, vendor="", qualifier={}, subpath=None): """Generate purl string based on various components.""" purl = PackageURL( type=self.purl_pkg_type, @@ -104,36 +104,48 @@ def find_vendor_from_purl(self, purl, ver) -> Tuple[List[ScanInfo], bool]: It then decodes the CPE data to extract vendor, product, and version information. If the version matches the provided version, it constructs a ScanInfo object for each matching entry and returns a list of these objects. """ - - query = "SELECT cpe from purl2cpe WHERE purl=?" - cursor = self.db_open_and_get_cursor() - cursor.execute(query, [str(purl)]) - cpeList = cursor.fetchall() - vendorlist: list[ScanInfo] = [] - vendors = set() - - if cpeList != []: - for item in cpeList: - vendor, product, version = self.decode_cpe23(str(item)) - vendors.add((vendor, product)) - else: - return vendorlist, False - purl_with_ver = f"{str(purl)}@{ver}" - for vendor, product in vendors: - vendorlist.append( - ScanInfo( - ProductInfo( - vendor, - product, - ver, - "/usr/local/bin/product", - purl=purl_with_ver, - ), - self.filename, + try: + purl = purl.to_dict() + param1 = f"pkg:{purl['type']}/{purl['name']}" + param2 = f"pkg:{purl['type']}/%/{purl['name']}" + + query = """ + SELECT cpe from purl2cpe WHERE purl LIKE ? + UNION + SELECT cpe from purl2cpe WHERE purl LIKE ? + """ + cursor = self.db_open_and_get_cursor() + cursor.execute(query, (param1, param2)) + cpeList = cursor.fetchall() + vendorlist: list[ScanInfo] = [] + vendors = set() + + if cpeList != []: + for item in cpeList: + vendor, _, _ = self.decode_cpe23(str(item)) + vendors.add((vendor, purl["name"])) + else: + return vendorlist, False + + purl_with_ver = f"{str(purl)}@{ver}" + for vendor, product in vendors: + vendorlist.append( + ScanInfo( + ProductInfo( + vendor, + product, + ver, + "/usr/local/bin/product", + purl_with_ver, + ), + self.filename, + ) ) - ) - return vendorlist, True + return vendorlist, True + except Exception as e: + self.logger.error(f"Error occurred: {e}") + return [], False def db_open_and_get_cursor(self) -> sqlite3.Cursor: """Opens connection to sqlite database, returns cursor object.""" diff --git a/cve_bin_tool/parsers/dart.py b/cve_bin_tool/parsers/dart.py index b84847395a..41e7b0b077 100644 --- a/cve_bin_tool/parsers/dart.py +++ b/cve_bin_tool/parsers/dart.py @@ -19,15 +19,14 @@ def __init__(self, cve_db, logger): super().__init__(cve_db, logger) self.purl_pkg_type = "pub" - def generate_purl(self, product, vendor, qualifier={}, subpath=None): + def generate_purl(self, product, vendor="", qualifier={}, subpath=None): """ Generates PURL after normalizing all components. pubspec: https://dart.dev/tools/pub/pubspec#name purl-spec for pub: https://github.com/package-url/purl-spec/blob/master/PURL-TYPES.rst#pub """ - # Normalize product and vendor for Dart packages + # Normalize product for Dart packages product = re.sub(r"[^a-zA-Z0-9_]", "", product).lower() - vendor = "UNKNOWN" # The vendor is not explicitly defined for pub packages if not product: return None @@ -50,7 +49,10 @@ def run_checker(self, filename): for package_name, package_detail in data.get("packages", {}).items(): product = package_name version = package_detail.get("version").replace('"', "") - vendor = self.find_vendor(product, version) + purl = self.generate_purl(product) + vendor, result = self.find_vendor_from_purl(purl, version) + if not result: + vendor = self.find_vendor(product, version) if vendor: yield from vendor self.logger.debug(f"Done scanning file: {self.filename}") diff --git a/cve_bin_tool/parsers/go.py b/cve_bin_tool/parsers/go.py index d94751a725..6dbf3d4542 100644 --- a/cve_bin_tool/parsers/go.py +++ b/cve_bin_tool/parsers/go.py @@ -29,16 +29,13 @@ def __init__(self, cve_db, logger): super().__init__(cve_db, logger) self.purl_pkg_type = "golang" - def generate_purl(self, product, vendor, qualifier={}, subpath=None): + def generate_purl(self, product, vendor="", qualifier={}, subpath=None): """Generates PURL after normalizing all components.""" product = re.sub(r"[^a-zA-Z0-9_-]", "", product) - vendor = re.sub(r"^[^a-zA-Z_]|[^a-zA-Z0-9_-]", "", vendor) if not re.match(r"^[a-zA-Z0-9_-]", product): return - if vendor == "": - vendor = "UNKNOWN" purl = super().generate_purl( product, @@ -73,7 +70,11 @@ def run_checker(self, filename): if len(parts) >= 2: product = line.split(" ")[0].split("/")[-1] version = line.split(" ")[1][1:].split("-")[0].split("+")[0] - vendors = self.find_vendor(product, version) + purl = self.generate_purl(product) + vendors, result = self.find_vendor_from_purl(purl, version) + + if not result: + vendors = self.find_vendor(product, version) if vendors is not None: yield from vendors self.logger.debug(f"Done scanning file: {self.filename}") diff --git a/cve_bin_tool/parsers/java.py b/cve_bin_tool/parsers/java.py index 9677c49b3c..81ab59ea15 100644 --- a/cve_bin_tool/parsers/java.py +++ b/cve_bin_tool/parsers/java.py @@ -1,3 +1,4 @@ +# NOTE: DONE # Copyright (C) 2022 Intel Corporation # SPDX-License-Identifier: GPL-3.0-or-later """Script containing all functionalities relating to parsing of Java-based files.""" @@ -18,11 +19,10 @@ def __init__(self, cve_db, logger, validate=True): self.validate = validate self.purl_pkg_type = "maven" - def generate_purl(self, product, vendor, qualifier={}, subpath=None): + def generate_purl(self, product, vendor="", qualifier={}, subpath=None): """Generates PURL after normalizing all components of a Maven package.""" - # Normalize product and vendor + # Normalize product product = re.sub(r"[^a-zA-Z0-9._-]", "", product).lower() - vendor = re.sub(r"[^a-zA-Z0-9._-]", "", vendor).lower() if vendor else "UNKNOWN" if not product: return None @@ -97,7 +97,10 @@ def run_checker(self, filename): if product is None and parent is not None: product = parent.find(schema + "artifactId").text if product is not None and version is not None: - product_info = self.find_vendor(product, version) + purl = self.generate_purl(product) + product_info, result = self.find_vendor_from_purl(purl, version) + if not result: + product_info = self.find_vendor(product, version) if product_info is not None: yield from product_info @@ -130,7 +133,14 @@ def run_checker(self, filename): self.logger.debug(f"{file_path} {product.text} {version}") if version[0].isdigit(): # Valid version identifier - product_info = self.find_vendor(product.text, version) + purl = self.generate_purl(product.text) + product_info, result = self.find_vendor_from_purl( + purl, version + ) + if not result: + product_info = self.find_vendor( + product.text, version + ) if product_info is not None: yield from product_info self.logger.debug(f"Done scanning file: {filename}") diff --git a/cve_bin_tool/parsers/javascript.py b/cve_bin_tool/parsers/javascript.py index 023eddc8d3..6f58230315 100644 --- a/cve_bin_tool/parsers/javascript.py +++ b/cve_bin_tool/parsers/javascript.py @@ -15,10 +15,9 @@ def __init__(self, cve_db, logger): super().__init__(cve_db, logger) self.purl_pkg_type = "npm" - def generate_purl(self, product, vendor, qualifier={}, subpath=None): + def generate_purl(self, product, vendor="", qualifier={}, subpath=None): """Generates PURL after normalizing all components.""" product = re.sub(r"[^a-zA-Z0-9._-]", "", product).lower() - vendor = "UNKNOWN" # Typically, the vendor is not explicitly defined for npm packages if not product: return None @@ -44,7 +43,11 @@ def run_checker(self, filename): if "name" in data and "version" in data: product = data["name"] version = data["version"] - vendor = self.find_vendor(product, version) + purl = self.generate_purl(product) + vendor, result = self.find_vendor_from_purl(purl, version) + + if not result: + vendor = self.find_vendor(product, version) else: vendor = None if vendor is not None: @@ -93,7 +96,11 @@ def run_checker(self, filename): product_version_mapping.append((product, version)) for product, version in product_version_mapping: - vendor = self.find_vendor(product, version) + purl = self.generate_purl(product, "") + vendor, result = self.find_vendor_from_purl(purl, version) + + if not result: + vendor = self.find_vendor(product, version) if vendor is not None: yield from vendor self.logger.debug(f"Done scanning file: {self.filename}") diff --git a/cve_bin_tool/parsers/perl.py b/cve_bin_tool/parsers/perl.py index 90d6160cf8..566e0fcdc9 100644 --- a/cve_bin_tool/parsers/perl.py +++ b/cve_bin_tool/parsers/perl.py @@ -13,11 +13,10 @@ def __init__(self, cve_db, logger): super().__init__(cve_db, logger) self.purl_pkg_type = "cpan" - def generate_purl(self, product, vendor, qualifier={}, subpath=None): + def generate_purl(self, product, vendor="", qualifier={}, subpath=None): """Generates PURL after normalizing all components.""" # Normalize product and vendor for Perl packages product = re.sub(r"[^a-zA-Z0-9._-]", "", product).lower() - vendor = "UNKNOWN" # Typically, the vendor is not explicitly defined for CPAN packages if not product: return None @@ -53,7 +52,13 @@ def run_checker(self, filename): # Print the extracted dependencies for dependency in dependencies: - vendor = self.find_vendor(dependency[0], dependency[1]) + product = dependency[0] + version = dependency[1] + purl = self.generate_purl(product) + vendor, result = self.find_vendor_from_purl(purl, version) + + if not result: + vendor = self.find_vendor(product, version) if vendor is not None: yield from vendor self.logger.debug(f"Done scanning file: {self.filename}") diff --git a/cve_bin_tool/parsers/php.py b/cve_bin_tool/parsers/php.py index a486d4e5ce..c39f8ff644 100644 --- a/cve_bin_tool/parsers/php.py +++ b/cve_bin_tool/parsers/php.py @@ -1,3 +1,6 @@ +# NOTE: remains not complete + + # Copyright (C) 2024 Intel Corporation # SPDX-License-Identifier: GPL-3.0-or-later """Python script containing all functionalities related to parsing of php's composer.lock files.""" @@ -19,12 +22,11 @@ def __init__(self, cve_db, logger): super().__init__(cve_db, logger) self.purl_pkg_type = "composer" - def generate_purl(self, product, vendor, qualifier={}, subpath=None): + def generate_purl(self, product, vendor="", qualifier={}, subpath=None): """Generates PURL after normalizing all components.""" - vendor = re.sub(r"[^a-zA-Z0-9._-]", "", vendor).lower() product = re.sub(r"[^a-zA-Z0-9._-]", "", product).lower() - if not vendor or not product: + if not product: return None purl = super().generate_purl( @@ -51,7 +53,11 @@ def run_checker(self, filename): version = version[1:] if "dev" in version: continue - vendor = self.find_vendor(product, version) + purl = self.generate_purl(product) + vendor, result = self.find_vendor_from_purl(purl, version) + + if not result: + vendor = self.find_vendor(product, version) if vendor is not None: yield from vendor self.logger.debug(f"Done scanning file: {self.filename}") diff --git a/cve_bin_tool/parsers/python.py b/cve_bin_tool/parsers/python.py index 534cb55285..a403389bc7 100644 --- a/cve_bin_tool/parsers/python.py +++ b/cve_bin_tool/parsers/python.py @@ -25,7 +25,7 @@ def __init__(self, cve_db, logger): super().__init__(cve_db, logger) self.purl_pkg_type = "pypi" - def generate_purl(self, product, vendor, qualifier={}, subpath=None): + def generate_purl(self, product, vendor="", qualifier={}, subpath=None): """Generates PURL after normalizing all components.""" product = re.sub(r"[^a-zA-Z0-9._-]", "", product).lower() @@ -96,7 +96,7 @@ def run_checker(self, filename): for line in lines["install"]: product = line["metadata"]["name"] version = line["metadata"]["version"] - purl = self.generate_purl(product, "") + purl = self.generate_purl(product) vendor, result = self.find_vendor_from_purl(purl, version) if not result: @@ -119,7 +119,7 @@ def __init__(self, cve_db, logger): super().__init__(cve_db, logger) self.purl_pkg_type = "pypi" - def generate_purl(self, product, vendor, qualifier={}, subpath=None): + def generate_purl(self, product, vendor="", qualifier={}, subpath=None): """Generates PURL after normalizing all components.""" product = re.sub(r"[^a-zA-Z0-9._-]", "", product).lower() @@ -147,7 +147,7 @@ def run_checker(self, filename): try: product = search(compile(r"^Name: (.+)$", MULTILINE), lines).group(1) version = search(compile(r"^Version: (.+)$", MULTILINE), lines).group(1) - purl = self.generate_purl(product, "") + purl = self.generate_purl(product) vendor, result = self.find_vendor_from_purl(purl, version) if vendor is not None: diff --git a/cve_bin_tool/parsers/r.py b/cve_bin_tool/parsers/r.py index cbaac0d548..a3e5da6d9a 100644 --- a/cve_bin_tool/parsers/r.py +++ b/cve_bin_tool/parsers/r.py @@ -30,11 +30,10 @@ def __init__(self, cve_db, logger): super().__init__(cve_db, logger) self.purl_pkg_type = "cran" - def generate_purl(self, product, vendor, qualifier={}, subpath=None): + def generate_purl(self, product, vendor="", qualifier={}, subpath=None): """Generates PURL after normalizing all components.""" product = re.sub(r"[^a-zA-Z0-9.-]", "", product) - vendor = "UNKNOWN" if not re.match(r"^[a-zA-Z0-9_-]", product): return @@ -57,7 +56,12 @@ def run_checker(self, filename): for package in content["Packages"]: product = content["Packages"][package]["Package"] version = content["Packages"][package]["Version"] - vendor = self.find_vendor(product, version) + purl = self.generate_purl(product) + vendor, result = self.find_vendor_from_purl(purl, version) + + if not result: + vendor = self.find_vendor(product, version) + if vendor is not None: yield from vendor self.logger.debug(f"Done scanning file: {self.filename}") diff --git a/cve_bin_tool/parsers/ruby.py b/cve_bin_tool/parsers/ruby.py index b59b53dd2c..e904b9a638 100644 --- a/cve_bin_tool/parsers/ruby.py +++ b/cve_bin_tool/parsers/ruby.py @@ -1,4 +1,6 @@ -# Copyright (C) 2022 Intel Corporation +# Copyright (C) 2024 Intel Corporation + + # SPDX-License-Identifier: GPL-3.0-or-later import re @@ -29,16 +31,13 @@ def __init__(self, cve_db, logger): super().__init__(cve_db, logger) self.purl_pkg_type = "gem" - def generate_purl(self, product, vendor, qualifier={}, subpath=None): + def generate_purl(self, product, vendor="", qualifier={}, subpath=None): """Generates PURL after normalizing all components.""" product = re.sub(r"^[^a-z]|[^a-z0-9_-]", "", product) - vendor = re.sub(r"^[^a-z]|[^a-z0-9_-]", "", vendor) if not re.match(r"^[a-z]|[a-z0-9_-]", product): return - if vendor == "": - vendor = "UNKNOWN" purl = super().generate_purl( product, @@ -69,7 +68,11 @@ def run_checker(self, filename): ): product = line.strip().split()[0] version = line.strip().split("(")[1][:-1] - vendors = self.find_vendor(product, version) + purl = self.generate_purl(product) + vendors, result = self.find_vendor_from_purl(purl, version) + + if not result: + vendors = self.find_vendor(product, version) if vendors is not None: yield from vendors self.logger.debug(f"Done scanning file: {self.filename}") diff --git a/cve_bin_tool/parsers/rust.py b/cve_bin_tool/parsers/rust.py index 46e1c00fd1..f7b7e25a97 100644 --- a/cve_bin_tool/parsers/rust.py +++ b/cve_bin_tool/parsers/rust.py @@ -28,16 +28,13 @@ def __init__(self, cve_db, logger): super().__init__(cve_db, logger) self.purl_pkg_type = "cargo" - def generate_purl(self, product, vendor, qualifier={}, subpath=None): + def generate_purl(self, product, vendor="", qualifier={}, subpath=None): """Generates PURL after normalizing all components.""" product = re.sub(r"^[^a-zA-Z_]|[^a-zA-Z0-9_-]", "", product) - vendor = re.sub(r"^[^a-zA-Z_]|[^a-zA-Z0-9_-]", "", vendor) if not re.match(r"^[a-zA-Z_]|[a-zA-Z0-9_-]", product): return - if vendor == "": - vendor = "UNKNOWN" purl = super().generate_purl( product, @@ -63,7 +60,13 @@ def run_checker(self, filename): else: if product == "" and version == "": continue - vendors = self.find_vendor(product, version) + + purl = self.generate_purl(product) + vendors, result = self.find_vendor_from_purl(purl, version) + + if not result: + vendors = self.find_vendor(product, version) + if vendors is not None: yield from vendors product = "" diff --git a/cve_bin_tool/parsers/swift.py b/cve_bin_tool/parsers/swift.py index 3e3b8e130f..c983826a4a 100644 --- a/cve_bin_tool/parsers/swift.py +++ b/cve_bin_tool/parsers/swift.py @@ -32,15 +32,13 @@ def __init__(self, cve_db, logger): super().__init__(cve_db, logger) self.purl_pkg_type = "swift" - def generate_purl(self, product, vendor, qualifier={}, subpath=None): + def generate_purl(self, product, vendor="", qualifier={}, subpath=None): """Generates PURL after normalizing all components.""" product = re.sub(r"[^a-zA-Z0-9_-]", "", product) if not re.match(r"[a-zA-Z0-9_-]", product): return - if not vendor: - vendor = "UNKNOWN" purl = super().generate_purl( product, @@ -71,7 +69,11 @@ def run_checker(self, filename): domain = parse.netloc self.logger.debug(domain) - vendors = self.find_vendor(product, version) + purl = self.generate_purl(product) + vendors, result = self.find_vendor_from_purl(purl, version) + + if not result: + vendors = self.find_vendor(product, version) if vendors is not None: yield from vendors self.logger.debug(f"Done scanning file: {self.filename}") diff --git a/test/language_data/fail_pom.xml b/test/language_data/fail_pom.xml index 42ff70065a..e6b7c4387f 100644 --- a/test/language_data/fail_pom.xml +++ b/test/language_data/fail_pom.xml @@ -22,7 +22,6 @@ 5.8.2 4.2.0 UTF-8 - 2.0.0-alpha5 @@ -80,18 +79,6 @@ ${mockito.version} test - - org.slf4j - slf4j-api - ${slf4j.version} - test - - - org.slf4j - slf4j-simple - ${slf4j.version} - test - diff --git a/test/test_language_scanner.py b/test/test_language_scanner.py index d30dfb325e..b1ce977f48 100644 --- a/test/test_language_scanner.py +++ b/test/test_language_scanner.py @@ -169,7 +169,12 @@ def setup_class(cls): @pytest.mark.parametrize( "filename, product_list", - (((str(TEST_FILE_PATH / "pom.xml")), ["jmeter", "hamcrest"]),), + ( + ( + (str(TEST_FILE_PATH / "pom.xml")), + ["jmeter", "hamcrest", "slf4j-simple", "slf4j-api"], + ), + ), ) def test_java_package(self, filename: str, product_list: set[str]) -> None: """Test against a valid pom.xml file for Java packages"""