Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Calculating Risk in VulnerableCode #1593

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions vulnerabilities/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ class Meta:
"latest_non_vulnerable_version",
"affected_by_vulnerabilities",
"fixing_vulnerabilities",
"risk_score",
]


Expand Down
2 changes: 2 additions & 0 deletions vulnerabilities/improvers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from vulnerabilities.improvers import valid_versions
from vulnerabilities.improvers import vulnerability_status
from vulnerabilities.pipelines import VulnerableCodePipeline
from vulnerabilities.pipelines import compute_package_risk
from vulnerabilities.pipelines import enhance_with_exploitdb
from vulnerabilities.pipelines import enhance_with_kev
from vulnerabilities.pipelines import enhance_with_metasploit
Expand Down Expand Up @@ -37,6 +38,7 @@
enhance_with_kev.VulnerabilityKevPipeline,
enhance_with_metasploit.MetasploitImproverPipeline,
enhance_with_exploitdb.ExploitDBImproverPipeline,
compute_package_risk.ComputePackageRiskPipeline,
]

IMPROVERS_REGISTRY = {
Expand Down
23 changes: 23 additions & 0 deletions vulnerabilities/migrations/0075_package_risk_score.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 4.2.16 on 2024-10-29 10:55

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("vulnerabilities", "0074_update_pysec_advisory_created_by"),
]

operations = [
migrations.AddField(
model_name="package",
name="risk_score",
field=models.DecimalField(
decimal_places=2,
help_text="Risk score between 0.00 and 10.00, where higher values indicate greater vulnerability risk for the package.",
max_digits=4,
null=True,
),
),
]
8 changes: 8 additions & 0 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,14 @@ class Package(PackageURLMixin):
help_text="True if the package does not exist in the upstream package manager or its repository.",
)

risk_score = models.DecimalField(
null=True,
max_digits=4,
decimal_places=2,
help_text="Risk score between 0.00 and 10.00, where higher values "
"indicate greater vulnerability risk for the package.",
)

objects = PackageQuerySet.as_manager()

def save(self, *args, **kwargs):
Expand Down
59 changes: 59 additions & 0 deletions vulnerabilities/pipelines/compute_package_risk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from aboutcode.pipeline import LoopProgress

from vulnerabilities.models import Package
from vulnerabilities.pipelines import VulnerableCodePipeline
from vulnerabilities.risk import compute_package_risk


class ComputePackageRiskPipeline(VulnerableCodePipeline):
"""
Risk Assessment Pipeline for Package Vulnerabilities: Iterate through the packages and evaluate their associated risk.
"""

pipeline_id = "compute_package_risk"
license_expression = None

@classmethod
def steps(cls):
return (cls.add_package_risk_score,)

def add_package_risk_score(self):
affected_packages = Package.objects.filter(
affected_by_vulnerabilities__isnull=False
).distinct()

self.log(f"Calculating risk for {affected_packages.count():,d} affected package records")

progress = LoopProgress(total_iterations=affected_packages.count(), logger=self.log)

updatables = []
updated_package_count = 0
batch_size = 5000

for package in progress.iter(affected_packages.paginated()):
risk_score = compute_package_risk(package)
package.risk_score = risk_score
updatables.append(package)

if len(updatables) >= batch_size:
updated_package_count += bulk_update_package_risk_score(
packages=updatables,
logger=self.log,
)
updated_package_count += bulk_update_package_risk_score(
packages=updatables,
logger=self.log,
)
self.log(f"Successfully added risk score for {updated_package_count:,d} package")


def bulk_update_package_risk_score(packages, logger):
package_count = 0
if packages:
try:
Package.objects.bulk_update(objs=packages, fields=["risk_score"])
package_count += len(packages)
except Exception as e:
logger(f"Error updating packages: {e}")
packages.clear()
return package_count
125 changes: 125 additions & 0 deletions vulnerabilities/risk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from pathlib import Path

from vulnerabilities.models import AffectedByPackageRelatedVulnerability
from vulnerabilities.models import Exploit
from vulnerabilities.models import Package
from vulnerabilities.models import Vulnerability
from vulnerabilities.models import VulnerabilityReference
from vulnerabilities.severity_systems import EPSS
from vulnerabilities.utils import load_json

DEFAULT_WEIGHT = 1
WEIGHT_CONFIG_PATH = Path(__file__).parent.parent / "weight_config.json"
WEIGHT_CONFIG = load_json(WEIGHT_CONFIG_PATH)


def get_weighted_severity(severities):
"""
Weighted Severity is the maximum value obtained when each Severity is multiplied
by its associated Weight/10.
Example of Weighted Severity: max(7*(10/10), 8*(3/10), 6*(8/10)) = 7
"""

score_map = {
"low": 3,
"moderate": 6.9,
"medium": 6.9,
"high": 8.9,
"important": 8.9,
"critical": 10.0,
"urgent": 10.0,
}

score_list = []
for severity in severities:
weights = []
for key, value in WEIGHT_CONFIG.items():
if severity.reference.url.startswith(key):
weights.append(value)
continue
weights.append(DEFAULT_WEIGHT)

if not weights:
return 0

max_weight = float(max(weights)) / 10
vul_score = severity.value
try:
vul_score = float(vul_score)
vul_score_value = vul_score * max_weight
except ValueError:
vul_score = vul_score.lower()
vul_score_value = score_map.get(vul_score, 0) * max_weight

score_list.append(vul_score_value)
return max(score_list) if score_list else 0


def get_exploitability_level(exploits, references, severities):
"""
Exploitability refers to the potential or
probability of a software package vulnerability being exploited by
malicious actors to compromise systems, applications, or networks.
It is determined automatically by discovery of exploits.
"""
# no exploit known ( default .5)
exploit_level = 0.5

if exploits:
# Automatable Exploit with PoC script published OR known exploits (KEV) in the wild OR known ransomware
exploit_level = 2

elif severities:
# high EPSS.
epss = severities.filter(
scoring_system=EPSS.identifier,
)
epss = any(float(epss.value) > 0.8 for epss in epss)
if epss:
exploit_level = 2

elif references:
# PoC/Exploit script published
ref_exploits = references.filter(
reference_type=VulnerabilityReference.EXPLOIT,
)
if ref_exploits:
exploit_level = 1

return exploit_level


def compute_vulnerability_risk(vulnerability: Vulnerability):
"""
Risk may be expressed as a number ranging from 0 to 10.
Risk is calculated from weighted severity and exploitability values.
It is the maximum value of (the weighted severity multiplied by its exploitability) or 10

Risk = min(weighted severity * exploitability, 10)
"""
references = vulnerability.references
severities = vulnerability.severities.select_related("reference")
exploits = Exploit.objects.filter(vulnerability=vulnerability)
if references.exists() or severities.exists() or exploits.exists():
weighted_severity = get_weighted_severity(severities)
exploitability = get_exploitability_level(exploits, references, severities)
return min(weighted_severity * exploitability, 10)


def compute_package_risk(package: Package):
"""
Calculate the risk for a package by iterating over all vulnerabilities that affects this package
and determining the associated risk.
"""
ziadhany marked this conversation as resolved.
Show resolved Hide resolved

result = []
for pkg_related_vul in AffectedByPackageRelatedVulnerability.objects.filter(
package=package
).prefetch_related("vulnerability"):
if risk := compute_vulnerability_risk(pkg_related_vul.vulnerability):
result.append(risk)

if not result:
return

return f"{max(result):.2f}"
14 changes: 14 additions & 0 deletions vulnerabilities/templates/package_details.html
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,20 @@
{% endif %}
</td>
</tr>
<tr>
<td class="two-col-left">
<span
class="has-tooltip-multiline has-tooltip-black has-tooltip-arrow has-tooltip-text-left"
data-tooltip="Risk is expressed as a number ranging from 0 to 10. It is calculated based on weighted severity and exploitability values. The risk score is the maximum value of either the weighted severity multiplied by its exploitability or 10.">
Risk
</span>
</td>
<td class="two-col-right">
{% if package.risk_score %}
<a target="_self">{{ package.risk_score }}</a>
{% endif %}
</td>
</tr>
</tbody>
</table>
</div>
Expand Down
24 changes: 24 additions & 0 deletions vulnerabilities/tests/pipelines/test_compute_package_risk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import pytest

from vulnerabilities.models import AffectedByPackageRelatedVulnerability
from vulnerabilities.models import Package
from vulnerabilities.pipelines.compute_package_risk import ComputePackageRiskPipeline
from vulnerabilities.tests.test_risk import vulnerability


@pytest.mark.django_db
def test_simple_risk_pipeline(vulnerability):
pkg = Package.objects.create(type="pypi", name="foo", version="2.3.0")
assert Package.objects.count() == 1

improver = ComputePackageRiskPipeline()
improver.execute()

assert pkg.risk_score is None

AffectedByPackageRelatedVulnerability.objects.create(package=pkg, vulnerability=vulnerability)
improver = ComputePackageRiskPipeline()
improver.execute()

pkg = Package.objects.get(type="pypi", name="foo", version="2.3.0")
assert str(pkg.risk_score) == str(3.11)
1 change: 1 addition & 0 deletions vulnerabilities/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,7 @@ def test_api_with_lesser_and_greater_fixed_by_packages(self):
}
],
"resource_url": "http://testserver/packages/pkg:maven/com.fasterxml.jackson.core/[email protected]",
"risk_score": None,
}

assert response == expected
Expand Down
Loading
Loading