Skip to content

Commit

Permalink
feat: new json format for output (#3980)
Browse files Browse the repository at this point in the history
  • Loading branch information
mastersans authored Aug 8, 2024
1 parent 0881252 commit 83b22b9
Show file tree
Hide file tree
Showing 5 changed files with 370 additions and 23 deletions.
9 changes: 5 additions & 4 deletions cve_bin_tool/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def main(argv=None):
note: don't use spaces between comma (',') and the output formats.
"""
),
metavar="{csv,json,console,html,pdf}",
metavar="{csv,json,json2,console,html,pdf}",
default="console",
)
output_group.add_argument(
Expand Down Expand Up @@ -565,7 +565,8 @@ def main(argv=None):
configs = conf.parse_config()

args = ChainMap(args, configs, defaults)
if args["generate_config"] != "":
organized_arguments = {}
if args["format"] == "json2" or args["generate_config"] != "":
store = parser.parse_args(argv[1:])
arg_groups = {}
for grp in parser._action_groups:
Expand All @@ -574,7 +575,6 @@ def main(argv=None):
}
arg_groups[grp.title] = argparse.Namespace(**grp_dict)

organized_arguments = {}
for group_title, group_args in arg_groups.items():
group_title = group_title.replace(" ", "_")
organized_arguments[group_title] = {}
Expand Down Expand Up @@ -883,7 +883,7 @@ def main(argv=None):

output_formats = set(args["format"].split(","))
output_formats = [output_format.strip() for output_format in output_formats]
extensions = ["csv", "json", "console", "html", "pdf"]
extensions = ["csv", "json", "console", "html", "pdf", "json2"]
for output_format in output_formats:
if output_format not in extensions:
LOGGER.error(
Expand Down Expand Up @@ -1180,6 +1180,7 @@ def main(argv=None):
affected_versions=args["affected_versions"],
exploits=args["exploits"],
metrics=metrics,
organized_arguements=organized_arguments,
detailed=args["detailed"],
vex_filename=args["vex_output"],
vex_type=args["vex_type"],
Expand Down
41 changes: 23 additions & 18 deletions cve_bin_tool/output_engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@
from datetime import datetime
from logging import Logger
from pathlib import Path
from typing import IO

from cve_bin_tool.cve_scanner import CVEData
from cve_bin_tool.cvedb import CVEDB
from cve_bin_tool.error_handler import ErrorHandler, ErrorMode
from cve_bin_tool.log import LOGGER
from cve_bin_tool.output_engine.console import output_console
from cve_bin_tool.output_engine.html import output_html
from cve_bin_tool.output_engine.json_output import output_json, output_json2
from cve_bin_tool.output_engine.util import (
ProductInfo,
Remarks,
VersionInfo,
add_extension_if_not,
format_output,
format_path,
Expand All @@ -29,26 +32,10 @@
intermediate_output,
)
from cve_bin_tool.sbom_manager.generate import SBOMGenerate
from cve_bin_tool.util import ProductInfo, Remarks, VersionInfo
from cve_bin_tool.version import VERSION
from cve_bin_tool.vex_manager.generate import VEXGenerate


def output_json(
all_cve_data: dict[ProductInfo, CVEData],
all_cve_version_info: dict[str, VersionInfo] | None,
outfile: IO,
detailed: bool = False,
affected_versions: int = 0,
metrics: bool = False,
):
"""Output a JSON of CVEs"""
formatted_output = format_output(
all_cve_data, all_cve_version_info, detailed, affected_versions, metrics
)
json.dump(formatted_output, outfile, indent=" ")


def save_intermediate(
all_cve_data: dict[ProductInfo, CVEData],
filename: str,
Expand Down Expand Up @@ -684,6 +671,7 @@ def __init__(
vex_type: str = "",
vex_product_info: dict[str, str] = {},
offline: bool = False,
organized_arguements: dict = None,
):
"""Constructor for OutputEngine class."""
self.logger = logger or LOGGER.getChild(self.__class__.__name__)
Expand Down Expand Up @@ -711,6 +699,7 @@ def __init__(
self.sbom_format = sbom_format
self.sbom_root = sbom_root
self.offline = offline
self.organized_arguements = organized_arguements
self.sbom_packages = {}
self.vex_type = vex_type
self.vex_product_info = vex_product_info
Expand All @@ -730,6 +719,18 @@ def output_cves(self, outfile, output_type="console"):
self.affected_versions,
self.metrics,
)
elif output_type == "json2":
output_json2(
self.all_cve_data,
self.all_cve_version_info,
self.time_of_last_update,
outfile,
self.affected_versions,
self.organized_arguements,
self.detailed,
self.exploits,
self.metrics,
)
elif output_type == "csv":
output_csv(
self.all_cve_data,
Expand Down Expand Up @@ -868,7 +869,9 @@ def output_file(self, output_type="console"):
"Switching Back to Default Naming Convention"
)
self.filename = generate_filename(output_type)

# if extension is set to .json2 due to current code logic make it .json
if self.filename.endswith(".json2"):
self.filename = self.filename[:-1]
# Log the filename generated
self.logger.info(f"{output_type.upper()} report stored at {self.filename}")

Expand All @@ -883,6 +886,8 @@ def output_file(self, output_type="console"):
def check_file_path(self, filepath: str, output_type: str, prefix: str = "output"):
"""Generate a new filename if file already exists."""
# check if the file already exists
if filepath.endswith(".json2"):
filepath = filepath[:-1]
if Path(filepath).is_file():
self.logger.warning(f"Failed to write at '{filepath}'. File already exists")
self.logger.info("Generating a new filename with Default Naming Convention")
Expand Down
126 changes: 126 additions & 0 deletions cve_bin_tool/output_engine/json_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import annotations

import json
from datetime import datetime
from typing import IO

from cve_bin_tool.cvedb import CVEDB
from cve_bin_tool.util import CVEData, ProductInfo, VersionInfo
from cve_bin_tool.version import VERSION

from .util import format_output, get_cve_summary


def vulnerabilities_builder(
all_cve_data, exploits, all_cve_version_info, detailed, affected_versions, metrics
):
"""
Builds a dictionary of vulnerabilities based on the provided inputs.
"""
vulnerabilities = {}
vulnerabilities["summary"] = get_cve_summary(all_cve_data, exploits)
vulnerability_reports = []
source_entries_map = {}
formatted_cve_data = format_output(
all_cve_data, all_cve_version_info, detailed, affected_versions, metrics
)
for cve_entry in formatted_cve_data:
source = cve_entry["source"]
if source not in source_entries_map:
source_entries_map[source] = [cve_entry]
else:
source_entries_map[source].append(cve_entry)

for source, entries in source_entries_map.items():
report = {"datasource": source, "entries": entries}
vulnerability_reports.append(report)
vulnerabilities["report"] = vulnerability_reports
return vulnerabilities


def db_entries_count():
"""
Retrieves the count of CVE entries from the database grouped by data source.
Returns:
dict: A dictionary containing the count of CVE entries for each data source.
"""
instance = CVEDB()
cursor = instance.db_open_and_get_cursor()
cve_entries_check = "SELECT data_source, COUNT(*) as number FROM cve_severity GROUP BY data_source ORDER BY number DESC"
cursor.execute(cve_entries_check)
data_entries = {}
rows = cursor.fetchall()
for row in rows:
source = row[0]
entries = row[1]
data_entries[source] = entries
instance.db_close()
return data_entries


def metadata_builder(organized_parameters):
"""
Builds metadata dictionary based on the organized parameters.
"""
metadata = {}
metadata["tool"] = {"name": "cve-bin-tool", "version": f"{VERSION}"}
metadata["generation_date"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
parameter = {}
for key, value in organized_parameters.items():
parameter_values = {}
for k, v in value.items():
val = v["arg_value"]
parameter_values[k] = val
if parameter_values:
parameter[key.lower()] = parameter_values
metadata["parameter"] = parameter
return metadata


def output_json(
all_cve_data: dict[ProductInfo, CVEData],
all_cve_version_info: dict[str, VersionInfo],
outfile: IO,
detailed: bool = False,
affected_versions: int = 0,
metrics: bool = False,
):
"""Output a JSON of CVEs"""
formatted_output = format_output(
all_cve_data, all_cve_version_info, detailed, affected_versions, metrics
)
json.dump(formatted_output, outfile, indent=2)


def output_json2(
all_cve_data: dict[ProductInfo, CVEData],
all_cve_version_info: dict[str, VersionInfo],
time_of_last_update: datetime,
outfile: IO,
affected_versions: int,
organized_parameters: dict,
detailed: bool = False,
exploits: bool = False,
metrics: bool = False,
):
"""Output a JSON of CVEs in JSON2 format"""
output = {}
output["$schema"] = ""
output["metadata"] = metadata_builder(organized_parameters)
output["database_info"] = {
"last_updated": time_of_last_update.strftime("%Y-%m-%d %H:%M:%S"),
"total_entries": db_entries_count(),
}
output["vulnerabilities"] = vulnerabilities_builder(
all_cve_data,
exploits,
all_cve_version_info,
detailed,
affected_versions,
metrics,
)
json.dump(output, outfile, indent=2)
Loading

0 comments on commit 83b22b9

Please sign in to comment.