Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CWE support in multiple importers #1526

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions vulnerabilities/importers/apache_httpd.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
#

import logging
import re
import urllib

import requests
from bs4 import BeautifulSoup
from cwe2.database import Database
from packageurl import PackageURL
from univers.version_constraint import VersionConstraint
from univers.version_range import ApacheVersionRange
Expand All @@ -23,6 +25,7 @@
from vulnerabilities.importer import Reference
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.severity_systems import APACHE_HTTPD
from vulnerabilities.utils import get_cwe_id
from vulnerabilities.utils import get_item

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -102,11 +105,14 @@ def to_advisory(self, data):
)
)

weaknesses = get_weaknesses(data)

return AdvisoryData(
aliases=[alias],
summary=description or "",
affected_packages=affected_packages,
references=[reference],
weaknesses=weaknesses,
url=reference.url,
)

Expand Down Expand Up @@ -152,3 +158,70 @@ def fetch_links(url):
continue
links.append(urllib.parse.urljoin(url, link))
return links


def get_weaknesses(cve_data):
"""
Extract CWE IDs from CVE data.

Args:
cve_data (dict): The CVE data in a dictionary format.

Returns:
List[int]: A list of unique CWE IDs.

>>> mock_cve_data = {
... "containers": {
... "cna": {
... "providerMetadata": {
... "orgId": "f0158376-9dc2-43b6-827c-5f631a4d8d09"
... },
... "title": "mod_macro buffer over-read",
... "problemTypes": [
... {
... "descriptions": [
... {
... "description": "CWE-125 Out-of-bounds Read",
... "lang": "en",
... "cweId": "CWE-125",
... "type": "CWE"
... }
... ]
... }
... ]
... }
... }
... }
>>> get_weaknesses(mock_cve_data)
[125]
"""
problem_types = cve_data.get("containers", {}).get("cna", {}).get("problemTypes", [])
descriptions = problem_types[0].get("descriptions", []) if len(problem_types) > 0 else []
cwe_string = descriptions[0].get("cweId", "") if len(descriptions) > 0 else ""
cwe_pattern = r"CWE-\d+"
description = descriptions[0].get("description", "") if len(descriptions) > 0 else ""
matches = re.findall(cwe_pattern, description)
db = Database()
weaknesses = []
cwe_string_from_description = ""
if matches:
cwe_string_from_description = matches[0]
if cwe_string or cwe_string_from_description:
if cwe_string:
cwe_id = get_cwe_id(cwe_string)
try:
db.get(cwe_id)
weaknesses.append(cwe_id)
except Exception:
logger.error("Invalid CWE id")
elif cwe_string_from_description:
cwe_id = get_cwe_id(cwe_string_from_description)
try:
db.get(cwe_id)
weaknesses.append(cwe_id)
except Exception:
logger.error("Invalid CWE id")
ambuj-1211 marked this conversation as resolved.
Show resolved Hide resolved

seen = set()
unique_cwe = [x for x in weaknesses if not (x in seen or seen.add(x))]
return unique_cwe
32 changes: 32 additions & 0 deletions vulnerabilities/importers/debian.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
#

import logging
import re
from typing import Any
from typing import Iterable
from typing import List
from typing import Mapping

import requests
from cwe2.database import Database
from packageurl import PackageURL
from univers.version_range import DebianVersionRange
from univers.versions import DebianVersion
Expand All @@ -23,6 +25,7 @@
from vulnerabilities.importer import Importer
from vulnerabilities.importer import Reference
from vulnerabilities.utils import dedupe
from vulnerabilities.utils import get_cwe_id
from vulnerabilities.utils import get_item

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -93,6 +96,7 @@ def advisory_data(self) -> Iterable[AdvisoryData]:
yield from self.parse(pkg_name, records)

def parse(self, pkg_name: str, records: Mapping[str, Any]) -> Iterable[AdvisoryData]:

for cve_id, record in records.items():
affected_versions = []
fixed_versions = []
Expand Down Expand Up @@ -150,10 +154,38 @@ def parse(self, pkg_name: str, records: Mapping[str, Any]) -> Iterable[AdvisoryD
fixed_version=DebianVersion(fixed_version),
)
)
weaknesses = get_cwe_from_debian_advisory(record)

yield AdvisoryData(
aliases=[cve_id],
summary=record.get("description", ""),
affected_packages=affected_packages,
references=references,
weaknesses=weaknesses,
url=self.api_url,
)


def get_cwe_from_debian_advisory(record):
"""
Extracts CWE ID strings from the given raw_data and returns a list of CWE IDs.

>>> get_cwe_from_debian_advisory({"description":"PEAR HTML_QuickForm version 3.2.14 contains an eval injection (CWE-95) vulnerability in HTML_QuickForm's getSubmitValue method, HTML_QuickForm's validate method, HTML_QuickForm_hierselect's _setOptions method, HTML_QuickForm_element's _findValue method, HTML_QuickForm_element's _prepareValue method. that can result in Possible information disclosure, possible impact on data integrity and execution of arbitrary code. This attack appear to be exploitable via A specially crafted query string could be utilised, e.g. http://www.example.com/admin/add_practice_type_id[1]=fubar%27])%20OR%20die(%27OOK!%27);%20//&mode=live. This vulnerability appears to have been fixed in 3.2.15."})
[95]
>>> get_cwe_from_debian_advisory({"description":"There is no WEAKNESS DATA"})
[]
"""
description = record.get("description") or ""
pattern = r"CWE-\d+"
cwe_strings = re.findall(pattern, description)
weaknesses = []
db = Database()
for cwe_string in cwe_strings:
if cwe_string:
cwe_id = get_cwe_id(cwe_string)
try:
db.get(cwe_id)
weaknesses.append(cwe_id)
except Exception:
logger.error("Invalid CWE id")
return weaknesses
34 changes: 34 additions & 0 deletions vulnerabilities/importers/fireeye.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
from typing import Iterable
from typing import List

from cwe2.database import Database

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import Importer
from vulnerabilities.importer import Reference
from vulnerabilities.utils import build_description
from vulnerabilities.utils import dedupe
from vulnerabilities.utils import get_cwe_id

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -77,10 +80,13 @@ def parse_advisory_data(raw_data, file, base_path) -> AdvisoryData:
disc_credits = md_dict.get("## Discovery Credits") # not used
disc_timeline = md_dict.get("## Disclosure Timeline") # not used
references = md_dict.get("## References") or []
cwe_data = md_dict.get("## Common Weakness Enumeration") or []

return AdvisoryData(
aliases=get_aliases(database_id, cve_ref),
summary=build_description(" ".join(summary), " ".join(description)),
references=get_references(references),
weaknesses=get_weaknesses(cwe_data),
url=advisory_url,
)

Expand Down Expand Up @@ -140,3 +146,31 @@ def md_list_to_dict(md_list):
else:
md_dict[md_key].append(md_line)
return md_dict


def get_weaknesses(cwe_data):
"""
Return the list of CWE IDs as integers from a list of weakness summaries, e.g., [379].
Extract the CWE strings from a list of weakness descriptions,
e.g., ["CWE-379: Creation of Temporary File in Directory with Insecure Permissions"], to obtain CWE IDs like CWE-379.
Remove the "CWE-" prefix from each CWE string and convert it to an integer (e.g., 379).
Then, check if the CWE ID exists in the CWE database.
"""
cwe_list = []
for line in cwe_data:
cwe_ids = re.findall(r"CWE-\d+", line)
cwe_list.extend(cwe_ids)

weaknesses = []
db = Database()

for cwe_string in cwe_list:

if cwe_string:
cwe_id = get_cwe_id(cwe_string)
try:
db.get(cwe_id)
weaknesses.append(cwe_id)
except Exception:
logger.error("Invalid CWE id")
return weaknesses
38 changes: 37 additions & 1 deletion vulnerabilities/tests/test_debian.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
# See https://github.com/nexB/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import json
import os
import re
from unittest.mock import patch

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importers.debian import DebianImporter
from vulnerabilities.importers.debian import get_cwe_from_debian_advisory
from vulnerabilities.improvers.default import DefaultImprover
from vulnerabilities.improvers.valid_versions import DebianBasicImprover
from vulnerabilities.tests import util_tests
Expand Down Expand Up @@ -55,3 +56,38 @@ def test_debian_improver(mock_response):
result.extend(inference)
expected_file = os.path.join(TEST_DATA, f"debian-improver-expected.json")
util_tests.check_results_against_json(result, expected_file)


def test_get_cwe_from_debian_advisories():
record = {
"description": "Legion of the Bouncy Castle Legion of the Bouncy Castle Java Cryptography APIs 1.58 up to but not including 1.60 contains a CWE-580: Use of Externally-Controlled Input to Select Classes or Code ('Unsafe Reflection') vulnerability in XMSS/XMSS^MT private key deserialization that can result in Deserializing an XMSS/XMSS^MT private key can result in the execution of unexpected code. This attack appear to be exploitable via A handcrafted private key can include references to unexpected classes which will be picked up from the class path for the executing application. This vulnerability appears to have been fixed in 1.60 and later.",
"scope": "local",
"releases": {
"bookworm": {
"status": "resolved",
"repositories": {"bookworm": "1.72-2"},
"fixed_version": "1.60-1",
"urgency": "low",
},
"bullseye": {
"status": "resolved",
"repositories": {"bullseye": "1.68-2"},
"fixed_version": "1.60-1",
"urgency": "low",
},
"sid": {
"status": "resolved",
"repositories": {"sid": "1.77-1"},
"fixed_version": "1.60-1",
"urgency": "low",
},
"trixie": {
"status": "resolved",
"repositories": {"trixie": "1.77-1"},
"fixed_version": "1.60-1",
"urgency": "low",
},
},
}
result = get_cwe_from_debian_advisory(record)
assert result == [580]
18 changes: 18 additions & 0 deletions vulnerabilities/tests/test_fireeye.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from vulnerabilities.importer import Reference
from vulnerabilities.importers.fireeye import get_aliases
from vulnerabilities.importers.fireeye import get_references
from vulnerabilities.importers.fireeye import get_weaknesses
from vulnerabilities.importers.fireeye import md_list_to_dict
from vulnerabilities.importers.fireeye import parse_advisory_data
from vulnerabilities.tests import util_tests
Expand Down Expand Up @@ -217,3 +218,20 @@ def test_md_list_to_dict_2(self):
md_list = f.readlines()
md_dict = md_list_to_dict(md_list)
assert md_dict == expected_output

def test_get_weaknesses(self):
assert get_weaknesses(
[
"CWE-379: Creation of Temporary File in Directory with Insecure Permissions",
"CWE-362: Concurrent Execution using Shared Resource with Improper Synchronization ('Race Condition')",
]
) == [379, 362]

assert (
get_weaknesses(
[
"CWE-2345: This cwe id does not exist so it should generate Invalid CWE id error and return empty list."
]
)
== []
)
Loading