Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rockylinux advisories #1535

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from vulnerabilities.pipelines import nvd_importer
from vulnerabilities.pipelines import pypa_importer
from vulnerabilities.pipelines import pysec_importer
from vulnerabilities.pipelines import rockylinux_importer

IMPORTERS_REGISTRY = [
alpine_linux.AlpineImporter,
Expand Down Expand Up @@ -77,6 +78,7 @@
gitlab_importer.GitLabImporterPipeline,
github_importer.GitHubAPIImporterPipeline,
nvd_importer.NVDImporterPipeline,
rockylinux_importer.RockylinuxImporterPipeline,
pysec_importer.PyPIImporterPipeline,
]

Expand Down
1 change: 1 addition & 0 deletions vulnerabilities/improvers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
valid_versions.RubyImprover,
valid_versions.GithubOSVImprover,
vulnerability_status.VulnerabilityStatusImprover,
valid_versions.RockyLinuxImprover,
valid_versions.CurlImprover,
flag_ghost_packages.FlagGhostPackagePipeline,
enhance_with_kev.VulnerabilityKevPipeline,
Expand Down
6 changes: 6 additions & 0 deletions vulnerabilities/improvers/valid_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from vulnerabilities.pipelines.gitlab_importer import GitLabImporterPipeline
from vulnerabilities.pipelines.nginx_importer import NginxImporterPipeline
from vulnerabilities.pipelines.npm_importer import NpmImporterPipeline
from vulnerabilities.pipelines.rockylinux_importer import RockylinuxImporterPipeline
from vulnerabilities.utils import AffectedPackage as LegacyAffectedPackage
from vulnerabilities.utils import clean_nginx_git_tag
from vulnerabilities.utils import get_affected_packages_by_patched_package
Expand Down Expand Up @@ -478,6 +479,11 @@ class GithubOSVImprover(ValidVersionImprover):
ignorable_versions = []


class RockyLinuxImprover(ValidVersionImprover):
importer = RockylinuxImporterPipeline
ignorable_versions = []


class CurlImprover(ValidVersionImprover):
importer = CurlImporter
ignorable_versions = []
277 changes: 277 additions & 0 deletions vulnerabilities/pipelines/rockylinux_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import logging
import re
from typing import Dict
from typing import Iterable
from typing import List

import requests
from cwe2.database import Database
from dateutil import parser as dateparser
from packageurl import PackageURL
from univers.version_range import RpmVersionRange

from vulnerabilities import severity_systems
from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
from vulnerabilities.importer import Reference
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline
from vulnerabilities.rpm_utils import rpm_to_purl
from vulnerabilities.utils import get_cwe_id
from vulnerabilities.utils import requests_with_5xx_retry

logger = logging.getLogger(__name__)
requests_session = requests_with_5xx_retry(max_retries=5, backoff_factor=1)


class RockylinuxImporterPipeline(VulnerableCodeBaseImporterPipeline):
pipeline_id = "rockylinux_importer"

spdx_license_expression = "CC-BY-4.0"
license_url = "https://access.redhat.com/security/data"
importer_name = "Rockylinux Importer"

@classmethod
def steps(cls):
return (cls.fetch, cls.collect_and_store_advisories, cls.import_new_advisories)

def fetch(self) -> Iterable[List[Dict]]:
page_no = 0
self.advisory_data = []

while True:
current_url = f"https://errata.rockylinux.org/api/v2/advisories?filters.product=&filters.fetchRelated=true&page={page_no}&limit=100"
try:
response = requests_session.get(current_url)
if response.status_code != requests.codes.ok:
logger.error(f"Failed to fetch RedHat CVE results from {current_url}")
break
cve_data = response.json().get("advisories") or []
self.advisory_data.extend(cve_data)
except Exception as e:
logger.error(f"Failed to fetch rockylinux CVE results from {current_url} {e}")
break
if not cve_data:
break
page_no += 1

def advisories_count(self):
return len(self.advisory_data)

def collect_advisories(self) -> Iterable[AdvisoryData]:
for rl_advisory in self.advisory_data:
yield to_advisory(rl_advisory)


class VersionParsingError(Exception):
pass


def to_advisory(advisory_data) -> AdvisoryData:

"""
Convert Rockylinux advisory data into an AdvisoryData object.

Args:
advisory_data (dict): A dictionary containing advisory information.

Returns:
AdvisoryData: An instance of AdvisoryData with processed information.

Example:
>>> advisory_data = {
... "name": "RLSA-2021:4364",
... "publishedAt": "2021-11-09T09:11:20Z",
... "description": "The binutils packages provide a collection of binary utilities for the manipulation",
... "affectedProducts": ["Rocky Linux 8"],
... "rpms": {
... "Rocky Linux 8": {
... "nvras": [
... "gfs2-utils-0:3.2.0-11.el8.aarch64.rpm",
... "gfs2-utils-0:3.2.0-11.el8.src.rpm",
... "gfs2-utils-0:3.2.0-11.el8.x86_64.rpm",
... "gfs2-utils-debuginfo-0:3.2.0-11.el8.aarch64.rpm",
... "gfs2-utils-debuginfo-0:3.2.0-11.el8.x86_64.rpm",
... "gfs2-utils-debugsource-0:3.2.0-11.el8.aarch64.rpm",
... "gfs2-utils-debugsource-0:3.2.0-11.el8.x86_64.rpm"
... ]
... }
... },
... "fixes": [
... {
... "ticket": "1942434",
... "sourceBy": "Red Hat",
... "sourceLink": "https://bugzilla.redhat.com/show_bug.cgi?id=1942434",
... "description": ""
... }
... ],
... "cves": [
... {
... "name": "CVE-2021-3487",
... "sourceBy": "MITRE",
... "sourceLink": "https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-3487",
... "cvss3ScoringVector": "CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:U/C:N/I:N/A:H",
... "cvss3BaseScore": "6.5",
... "cwe": "CWE-20->CWE-400"
... }
... ]
... }
>>> advisory = to_advisory(advisory_data)
>>> advisory.aliases
'RLSA-2021:4364'
>>> advisory.date_published.year
2021
>>> len(advisory.affected_packages)
7
>>> len(advisory.references)
2
>>> advisory.weaknesses
[400, 20]
"""

aliases = advisory_data.get("name") or ""
date_published = dateparser.parse(advisory_data.get("publishedAt", ""))

summary = advisory_data.get("description") or ""
affected_products = advisory_data.get("affectedProducts") or []
affected_packages = []
for products in affected_products:
rpms = advisory_data.get("rpms", {})
packages = rpms.get(products, {}).get("nvras", [])
affected_packages.extend(packages)
processed_affected_packages: List[AffectedPackage] = []
for rpm in affected_packages:
purl = rpm_to_purl(rpm_string=rpm.rsplit(".rpm", 1)[0] or "", namespace="rocky-linux")
if purl:
try:
affected_version_range = RpmVersionRange.from_versions(sequence=[purl.version])
processed_affected_packages.append(
AffectedPackage(
package=PackageURL(
type=purl.type,
name=purl.name,
namespace=purl.namespace,
qualifiers=purl.qualifiers,
subpath=purl.subpath,
),
affected_version_range=affected_version_range,
fixed_version=None,
)
)
except VersionParsingError as e:
logger.error(f"Failed to parse version {purl.version} for {purl} {e}")

references = [
Reference(
severities=[], url=fix.get("sourceLink") or "", reference_id=fix.get("ticket") or ""
)
for fix in advisory_data["fixes"]
]

for ref in advisory_data.get("cves", []):

name = ref.get("name", "")
if not isinstance(name, str):
logger.error(f"Invalid advisory type {name}")
continue

if ref.get("sourceLink", ""):
name_upper = name.upper()
cvss_vector = ref.get("cvss3ScoringVector", "")

if "CVE" in name_upper and cvss_vector != "UNKNOWN":
base_score = ref.get("cvss3BaseScore", "")

if base_score and cvss_vector:
severities = [
VulnerabilitySeverity(
system=severity_systems.CVSSV31,
value=base_score,
scoring_elements=cvss_vector,
)
]
else:
severities = []

references.append(
Reference(
severities=severities, url=ref.get("sourceLink", ""), reference_id=name
)
)

return AdvisoryData(
aliases=aliases,
summary=summary,
affected_packages=processed_affected_packages,
references=references,
date_published=date_published,
weaknesses=get_cwes_from_rockylinux_advisory(advisory_data),
url=f"https://errata.rockylinux.org/{aliases}",
)


def get_cwes_from_rockylinux_advisory(advisory_data) -> [int]:
"""
Extract CWE IDs from advisory data and validate them against a database.

:param advisory_data: Dictionary containing CVE information.
:return: List of valid CWE IDs.

>>> advisory_data = {"cves": [
... {
... "name": "CVE-2022-24999",
... "sourceBy": "MITRE",
... "sourceLink": "https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-24999",
... "cvss3ScoringVector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:N/A:H",
... "cvss3BaseScore": "7.5",
... "cwe": "CWE-1321"
... },
... {
... "name": "CVE-2022-3517",
... "sourceBy": "MITRE",
... "sourceLink": "https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-3517",
... "cvss3ScoringVector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:N/A:H",
... "cvss3BaseScore": "7.5",
... "cwe": "CWE-400"
... },
... {
... "name": "CVE-2022-43548",
... "sourceBy": "MITRE",
... "sourceLink": "https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-43548",
... "cvss3ScoringVector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:H/A:N",
... "cvss3BaseScore": "7.5",
... "cwe": "CWE-20 -> CWE-400"
... }
... ]}
>>> get_cwes_from_rockylinux_advisory(advisory_data)
[400, 1321, 20]
>>> get_cwes_from_rockylinux_advisory({"cves": [{"name": "CVE-1234-1234","cwe": "None"}]})
[]
"""

cwe_ids = []
for cve in advisory_data.get("cves", []):
cwe_pattern = r"CWE-\d+"
cwe_id_list = re.findall(cwe_pattern, cve.get("cwe", ""))
cwe_ids.extend(cwe_id_list)
weaknesses = []
db = Database()
for cwe_string in cwe_ids:
if cwe_string:
cwe_id = get_cwe_id(cwe_string)
try:
db.get(cwe_id)
weaknesses.append(cwe_id)
except ValueError:
logger.error("Invalid CWE id")
unique_set = set(weaknesses)
return list(unique_set)
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import os
from pathlib import Path
from unittest import TestCase
from unittest.mock import patch

from packageurl import PackageURL

from vulnerabilities.pipelines.rockylinux_importer import get_cwes_from_rockylinux_advisory
from vulnerabilities.pipelines.rockylinux_importer import to_advisory
from vulnerabilities.rpm_utils import rpm_to_purl
from vulnerabilities.utils import load_json

TEST_DATA = Path(__file__).parent.parent / "test_data" / "rockylinux"


class TestRockylinuxImporterPipeline(TestCase):
def test_to_advisory1(self):
test1 = os.path.join(TEST_DATA, "rockylinux_test1.json")
mock_response = load_json(test1)
expected_result = load_json(os.path.join(TEST_DATA, "rockylinux_expected1.json"))
# print(f"1st is {to_advisory(mock_response).to_dict()}")
assert to_advisory(mock_response).to_dict() == expected_result

def test_to_advisory2(self):
test2 = os.path.join(TEST_DATA, "rockylinux_test2.json")
mock_response2 = load_json(test2)
expected_result2 = load_json(os.path.join(TEST_DATA, "rockylinux_expected2.json"))
assert to_advisory(mock_response2).to_dict() == expected_result2
# print(f"2nd is {to_advisory(mock_response2).to_dict()}")

def test_rpm_to_purl(self):
assert rpm_to_purl("foobar", "rocky-linux") is None
assert rpm_to_purl("foo-bar-devel-0:sys76", "rocky-linux") is None
assert rpm_to_purl("cockpit-0:264.1-1.el8.aarch64", "rocky-linux") == PackageURL(
type="rpm",
namespace="rocky-linux",
name="cockpit",
version="264.1-1.el8",
qualifiers={"arch": "aarch64"},
)

def test_get_cwes_from_rockylinux_advisory(self):
advisory_data = {
"cves": [
{
"name": "CVE-2022-3140",
"sourceBy": "MITRE",
"sourceLink": "https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-3140",
"cvss3ScoringVector": "CVSS:3.1/AV:L/AC:L/PR:N/UI:R/S:U/C:L/I:L/A:L",
"cvss3BaseScore": "5.3",
"cwe": "CWE-88->CWE-20",
}
]
}
assert get_cwes_from_rockylinux_advisory(advisory_data=advisory_data) == [88, 20]
Loading
Loading