Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…ne-tools into feature/gitleaks
  • Loading branch information
jcamilomolinar committed Jan 17, 2025
2 parents a1b2e77 + 0b659cf commit b9236c9
Show file tree
Hide file tree
Showing 16 changed files with 632 additions and 125 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
[![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=bancolombia_devsecops-engine-tools&metric=alert_status)](https://sonarcloud.io/summary/new_code?id=bancolombia_devsecops-engine-tools)
[![Coverage](https://sonarcloud.io/api/project_badges/measure?project=bancolombia_devsecops-engine-tools&metric=coverage)](https://sonarcloud.io/summary/new_code?id=bancolombia_devsecops-engine-tools)
[![Python Version](https://img.shields.io/badge/python%20-%203.8%20%7C%203.9%20%7C%203.10%20%7C%203.11%20%7C%203.12%20-blue)](#)
[![PyPI](https://img.shields.io/pypi/v/devsecops-engine-tools)](https://pypi.org/project/devsecops-engine-tools/)
[![Docker Pulls](https://img.shields.io/docker/pulls/bancolombia/devsecops-engine-tools
)](https://hub.docker.com/r/bancolombia/devsecops-engine-tools)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Engagement,
Product,
Component,
FindingExclusion
)
from devsecops_engine_tools.engine_core.src.domain.model.exclusions import Exclusions
from devsecops_engine_tools.engine_core.src.domain.model.report import Report
Expand Down Expand Up @@ -42,9 +43,11 @@
@dataclass
class DefectDojoPlatform(VulnerabilityManagementGateway):

RISK_ACCEPTED = "Risk Accepted"
OUT_OF_SCOPE = "Out of Scope"
FALSE_POSITIVE = "False Positive"
TRANSFERRED_FINDING = "Transferred Finding"
ON_WHITELIST = "On Whitelist"

def send_vulnerability_management(
self, vulnerability_management: VulnerabilityManagement
Expand Down Expand Up @@ -204,6 +207,11 @@ def get_findings_excepted(self, service, dict_args, secret_tool, config_tool):
"tags": tool,
"limit": dd_limits_query,
}
white_list_query_params = {
"risk_status": self.ON_WHITELIST,
"tags": tool,
"limit": dd_limits_query,
}

exclusions_risk_accepted = self._get_findings_with_exclusions(
session_manager,
Expand All @@ -212,7 +220,7 @@ def get_findings_excepted(self, service, dict_args, secret_tool, config_tool):
risk_accepted_query_params,
tool,
self._format_date_to_dd_format,
"Risk Accepted",
self.RISK_ACCEPTED,
)

exclusions_false_positive = self._get_findings_with_exclusions(
Expand Down Expand Up @@ -245,11 +253,29 @@ def get_findings_excepted(self, service, dict_args, secret_tool, config_tool):
self.TRANSFERRED_FINDING,
)

white_list = self._get_finding_exclusion(
session_manager, dd_max_retries, {
"type": "white_list",
}
)

exclusions_white_list = self._get_findings_with_exclusions(
session_manager,
service,
dd_max_retries,
white_list_query_params,
tool,
self._format_date_to_dd_format,
self.ON_WHITELIST,
white_list=white_list,
)

return (
list(exclusions_risk_accepted)
+ list(exclusions_false_positive)
+ list(exclusions_out_of_scope)
+ list(exclusions_transfer_finding)
+ list(exclusions_white_list)
)
except Exception as ex:
raise ExceptionFindingsExcepted(
Expand All @@ -273,8 +299,10 @@ def get_all(self, service, dict_args, secret_tool, config_tool):
"HOST_DEFECT_DOJO"
]

session_manager = self._get_session_manager(dict_args, secret_tool, config_tool)

findings = self._get_findings(
self._get_session_manager(dict_args, secret_tool, config_tool),
session_manager,
service,
max_retries,
all_findings_query_params,
Expand All @@ -287,8 +315,14 @@ def get_all(self, service, dict_args, secret_tool, config_tool):
)
)

white_list = self._get_finding_exclusion(
session_manager, max_retries, {
"type": "white_list",
}
)

all_exclusions = self._get_report_exclusions(
all_findings, self._format_date_to_dd_format, host_dd=host_dd
all_findings, self._format_date_to_dd_format, host_dd=host_dd, white_list=white_list
)

return all_findings, all_exclusions
Expand Down Expand Up @@ -462,25 +496,25 @@ def _get_session_manager(self, dict_args, secret_tool, config_tool):
config_tool["VULNERABILITY_MANAGER"]["DEFECT_DOJO"]["HOST_DEFECT_DOJO"],
)

def _get_report_exclusions(self, total_findings, date_fn, host_dd):
def _get_report_exclusions(self, total_findings, date_fn, host_dd, **kwargs):
exclusions = []
for finding in total_findings:
if finding.risk_accepted:
exclusions.append(
self._create_report_exclusion(
finding, date_fn, "engine_risk", "Risk Accepted", host_dd
finding, date_fn, "engine_risk", self.RISK_ACCEPTED, host_dd, **kwargs
)
)
elif finding.false_p:
exclusions.append(
self._create_report_exclusion(
finding, date_fn, "engine_risk", self.FALSE_POSITIVE, host_dd
finding, date_fn, "engine_risk", self.FALSE_POSITIVE, host_dd, **kwargs
)
)
elif finding.out_of_scope:
exclusions.append(
self._create_report_exclusion(
finding, date_fn, "engine_risk", self.OUT_OF_SCOPE, host_dd
finding, date_fn, "engine_risk", self.OUT_OF_SCOPE, host_dd, **kwargs
)
)
elif finding.risk_status == "Transfer Accepted":
Expand All @@ -491,18 +525,26 @@ def _get_report_exclusions(self, total_findings, date_fn, host_dd):
"engine_risk",
self.TRANSFERRED_FINDING,
host_dd,
**kwargs
)
)
elif finding.risk_status == self.ON_WHITELIST:
exclusions.append(
self._create_report_exclusion(
finding, date_fn, "engine_risk", self.ON_WHITELIST, host_dd, **kwargs
)
)
return exclusions

def _get_findings_with_exclusions(
self, session_manager, service, max_retries, query_params, tool, date_fn, reason
self, session_manager, service, max_retries, query_params, tool, date_fn, reason, **kwargs
):
findings = self._get_findings(
session_manager, service, max_retries, query_params
)

return map(
partial(self._create_exclusion, date_fn=date_fn, tool=tool, reason=reason),
partial(self._create_exclusion, date_fn=date_fn, tool=tool, reason=reason, **kwargs),
findings,
)

Expand All @@ -513,6 +555,14 @@ def request_func():
).results

return self._retries_requests(request_func, max_retries, retry_delay=5)

def _get_finding_exclusion(self, session_manager, max_retries, query_params):
def request_func():
return FindingExclusion.get_finding_exclusion(
session=session_manager, **query_params
).results

return self._retries_requests(request_func, max_retries, retry_delay=5)

def _retries_requests(self, request_func, max_retries, retry_delay):
for attempt in range(max_retries):
Expand All @@ -527,23 +577,34 @@ def _retries_requests(self, request_func, max_retries, retry_delay):
logger.error("Maximum number of retries reached, aborting.")
raise e

def _date_reason_based(self, finding, date_fn, reason):
if reason in [self.FALSE_POSITIVE, self.OUT_OF_SCOPE]:
create_date = date_fn(finding.last_status_update)
expired_date = date_fn(None)
elif reason == self.TRANSFERRED_FINDING:
create_date = date_fn(finding.transfer_finding.date)
expired_date = date_fn(finding.transfer_finding.expiration_date)
else:
last_accepted_risk = finding.accepted_risks[-1]
create_date = date_fn(last_accepted_risk["created"])
expired_date = date_fn(last_accepted_risk["expiration_date"])

return create_date, expired_date
def _date_reason_based(self, finding, date_fn, reason, tool, **kwargs):
def get_vuln_id(finding, tool):
if tool == "engine_risk":
return finding.id[0]["vulnerability_id"] if finding.id else finding.vuln_id_from_tool
else:
return finding.vulnerability_ids[0]["vulnerability_id"] if finding.vulnerability_ids else finding.vuln_id_from_tool

def get_dates_from_whitelist(vuln_id, white_list):
matching_finding = next(filter(lambda x: x.unique_id_from_tool == vuln_id, white_list), None)
if matching_finding:
return date_fn(matching_finding.create_date), date_fn(matching_finding.expiration_date)
return date_fn(None), date_fn(None)

reason_to_dates = {
self.FALSE_POSITIVE: lambda: (date_fn(finding.last_status_update), date_fn(None)),
self.OUT_OF_SCOPE: lambda: (date_fn(finding.last_status_update), date_fn(None)),
self.TRANSFERRED_FINDING: lambda: (date_fn(finding.transfer_finding.date), date_fn(finding.transfer_finding.expiration_date)),
self.RISK_ACCEPTED: lambda: (date_fn(finding.accepted_risks[-1]["created"]), date_fn(finding.accepted_risks[-1]["expiration_date"])),
self.ON_WHITELIST: lambda: get_dates_from_whitelist(get_vuln_id(finding, tool), kwargs.get("white_list", [])),
}

def _create_exclusion(self, finding, date_fn, tool, reason):
create_date, expired_date = self._date_reason_based(finding, date_fn, reason)
create_date, expired_date = reason_to_dates.get(reason, lambda: (date_fn(None), date_fn(None)))()
return create_date, expired_date

def _create_exclusion(self, finding, date_fn, tool, reason, **kwargs):
create_date, expired_date = self._date_reason_based(finding, date_fn, reason, tool, **kwargs)

return Exclusions(
id=(
finding.vuln_id_from_tool
Expand All @@ -561,8 +622,8 @@ def _create_exclusion(self, finding, date_fn, tool, reason):
reason=reason,
)

def _create_report_exclusion(self, finding, date_fn, tool, reason, host_dd):
create_date, expired_date = self._date_reason_based(finding, date_fn, reason)
def _create_report_exclusion(self, finding, date_fn, tool, reason, host_dd, **kwargs):
create_date, expired_date = self._date_reason_based(finding, date_fn, reason, tool, **kwargs)

return Exclusions(
id=(
Expand Down
Loading

0 comments on commit b9236c9

Please sign in to comment.