Skip to content

Commit

Permalink
fix: make EPSS behave like other data sources (#4125)
Browse files Browse the repository at this point in the history
This will make it so that -d EPSS will actually disable the EPSS data source, and should make it fail more gracefully when the source is not working for any reason.

Signed-off-by: Terri Oda <[email protected]>
  • Loading branch information
terriko authored Jun 26, 2024
1 parent 20a5574 commit 2941eef
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 36 deletions.
5 changes: 5 additions & 0 deletions cve_bin_tool/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from cve_bin_tool.data_sources import (
DataSourceSupport,
curl_source,
epss_source,
gad_source,
nvd_source,
osv_source,
Expand Down Expand Up @@ -722,6 +723,10 @@ def main(argv=None):
source_purl2cpe = purl2cpe_source.PURL2CPE_Source()
enabled_sources.append(source_purl2cpe)

if "EPSS" not in disabled_sources:
source_epss = epss_source.Epss_Source()
enabled_sources.append(source_epss)

if "NVD" not in disabled_sources:
source_nvd = nvd_source.NVD_Source(
nvd_type=nvd_type,
Expand Down
53 changes: 24 additions & 29 deletions cve_bin_tool/cvedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class CVEDB:
LOGGER = LOGGER.getChild("CVEDB")
SOURCES = [
curl_source.Curl_Source,
epss_source.Epss_Source,
osv_source.OSV_Source,
gad_source.GAD_Source,
purl2cpe_source.PURL2CPE_Source,
Expand Down Expand Up @@ -229,8 +230,6 @@ def __init__(
self.cve_count = -1
self.all_cve_entries: list[dict[str, Any]] | None = None

self.epss_data = None

self.exploits_list: list[Any] = []
self.exploit_count = 0

Expand Down Expand Up @@ -520,11 +519,8 @@ def populate_db(self) -> None:
"""

self.populate_metrics()

# EPSS uses metrics table to get the EPSS metric id.
# It can't be run before creation of metrics table.
self.populate_epss()
self.store_epss_data()
self.populate_purl2cpe()

for idx, data in enumerate(self.data):
Expand All @@ -538,22 +534,29 @@ def populate_db(self) -> None:
# if source_name != "NVD" and cve_data[0] is not None:
# cve_data = self.update_vendors(cve_data)

severity_data, affected_data = cve_data
if source_name == "EPSS":
if cve_data is not None:
self.store_epss_data(cve_data)

cursor = self.db_open_and_get_cursor()
else:
severity_data, affected_data = cve_data

if severity_data is not None and len(severity_data) > 0:
self.populate_severity(severity_data, cursor, data_source=source_name)
self.populate_cve_metrics(severity_data, cursor)
if affected_data is not None:
self.populate_affected(
affected_data,
cursor,
data_source=source_name,
)
if self.connection is not None:
self.connection.commit()
self.db_close()
cursor = self.db_open_and_get_cursor()

if severity_data is not None and len(severity_data) > 0:
self.populate_severity(
severity_data, cursor, data_source=source_name
)
self.populate_cve_metrics(severity_data, cursor)
if affected_data is not None:
self.populate_affected(
affected_data,
cursor,
data_source=source_name,
)
if self.connection is not None:
self.connection.commit()
self.db_close()

def populate_severity(self, severity_data, cursor, data_source):
"""Populate the database with CVE severities."""
Expand Down Expand Up @@ -671,14 +674,6 @@ def populate_metrics(self):
self.connection.commit()
self.db_close()

def populate_epss(self):
"""Exploit Prediction Scoring System (EPSS) data to help users evaluate risks
Add EPSS data into the database"""
epss = epss_source.Epss_Source()
cursor = self.db_open_and_get_cursor()
self.epss_data = run_coroutine(epss.update_epss(cursor))
self.db_close()

def metric_finder(self, cursor, cve):
"""
SQL query to retrieve the metrics_name based on the metrics_id
Expand Down Expand Up @@ -908,11 +903,11 @@ def populate_exploit_db(self, exploits):
self.connection.commit()
self.db_close()

def store_epss_data(self):
def store_epss_data(self, epss_data):
"""Insert Exploit Prediction Scoring System (EPSS) data into database."""
insert_cve_metrics = self.INSERT_QUERIES["insert_cve_metrics"]
cursor = self.db_open_and_get_cursor()
cursor.executemany(insert_cve_metrics, self.epss_data)
cursor.executemany(insert_cve_metrics, epss_data)
self.connection.commit()
self.db_close()

Expand Down
32 changes: 26 additions & 6 deletions cve_bin_tool/data_sources/epss_source.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import annotations

import csv
Expand All @@ -18,7 +21,9 @@


class Epss_Source:
SOURCE = "Epss"
"""Data source for downloading and storing epss data"""

SOURCE = "EPSS"
CACHEDIR = DISK_LOCATION_DEFAULT
BACKUPCACHEDIR = DISK_LOCATION_BACKUP
LOGGER = logging.getLogger().getChild("CVEDB")
Expand All @@ -32,6 +37,7 @@ def __init__(self, error_mode=ErrorMode.TruncTrace):
self.epss_path = str(Path(self.cachedir) / "epss")
self.file_name = os.path.join(self.epss_path, "epss_scores-current.csv")
self.epss_metric_id = None
self.source_name = self.SOURCE

async def update_epss(self, cursor):
"""
Expand All @@ -43,14 +49,12 @@ async def update_epss(self, cursor):
- EPSS score
- EPSS percentile
"""
self.EPSS_id_finder(cursor)
await self.download_and_parse_epss()
return self.epss_data
self.LOGGER.debug("Fetching EPSS data...")

async def download_and_parse_epss(self):
"""Downloads and parses the EPSS data from the CSV file."""
self.EPSS_id_finder(cursor)
await self.download_epss_data()
self.epss_data = self.parse_epss_data()
return self.epss_data

async def download_epss_data(self):
"""Downloads the EPSS CSV file and saves it to the local filesystem.
Expand Down Expand Up @@ -138,3 +142,19 @@ def parse_epss_data(self, file_path=None):
(cve_id, self.epss_metric_id, epss_score, epss_percentile)
)
return parsed_data

async def get_cve_data(self):
"""Gets EPSS data.
This function is so that the epss source matches the others api-wise to make for
easier disabling/enabling.
returns (data, "EPSS") so that the source can be identified for storing data
"""

try:
await self.update_epss()
except Exception as e:
self.LOGGER.debug(f"Error while fetching EPSS data: {e}")
self.LOGGER.error("Unable to fetch EPSS, skipping EPSS.")

return self.epss_data, self.SOURCE
2 changes: 1 addition & 1 deletion cve_bin_tool/data_sources/nvd_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def format_data_api2(self, all_cve_entries):
cvss_data = cve_cvss["cvssMetricV2"][0]["cvssData"]
cve["CVSS_version"] = 2
else:
LOGGER.info(f"Unknown CVSS metrics field {cve_item['id']}")
LOGGER.debug(f"Unknown CVSS metrics field {cve_item['id']}")
cvss_available = False
if cvss_available:
cve["severity"] = cvss_data.get("baseSeverity", "UNKNOWN")
Expand Down

0 comments on commit 2941eef

Please sign in to comment.