-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #312 from camptocamp/fix
Audit: Review reporting
- Loading branch information
Showing
2 changed files
with
115 additions
and
93 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,12 +24,18 @@ | |
|
||
async def snyk( | ||
branch: str, config: configuration.SnykConfiguration, local_config: configuration.SnykConfiguration | ||
) -> tuple[list[module_utils.Message], module_utils.Message, str | None]: | ||
) -> tuple[list[module_utils.Message], module_utils.Message, list[str], bool]: | ||
""" | ||
Audit the code with Snyk. | ||
Return: | ||
------ | ||
the output messages (Install errors, high of upgradable vulnerabilities), | ||
the message of the fix commit, | ||
the dashboard's message (with resume of the vulnerabilities), | ||
is on success (errors: vulnerability that can be fixed by upgrading the dependency). | ||
""" | ||
result = [] | ||
return_message = None | ||
|
||
proc = subprocess.run( # nosec # pylint: disable=subprocess-run-check | ||
["echo"], | ||
|
@@ -242,104 +248,103 @@ async def snyk( | |
_LOGGER.debug(message) | ||
else: | ||
_LOGGER.error("Snyk test JSON returned nothing on project %s branch %s", os.getcwd(), branch) | ||
|
||
test_json = json.loads(test_json_str) if test_json_str else [] | ||
|
||
if not isinstance(test_json, list): | ||
test_json = [test_json] | ||
|
||
vulnerabilities = False | ||
error = False | ||
high_vulnerabilities: dict[str, int] = {} | ||
fixable_vulnerabilities: dict[str, int] = {} | ||
for row in test_json: | ||
result.append( | ||
module_utils.HtmlMessage( | ||
"\n".join( | ||
[ | ||
f"Package manager: {row.get('packageManager', '-')}", | ||
f"Target file: {row.get('displayTargetFile', '-')}", | ||
f"Project path: {row.get('path', '-')}", | ||
row.get("summary", ""), | ||
] | ||
) | ||
message = module_utils.HtmlMessage( | ||
"\n".join( | ||
[ | ||
f"Package manager: {row.get('packageManager', '-')}", | ||
f"Target file: {row.get('displayTargetFile', '-')}", | ||
f"Project path: {row.get('path', '-')}", | ||
row.get("summary", ""), | ||
] | ||
) | ||
) | ||
|
||
# TODO: Example message: | ||
# Pin [email protected] to [email protected] to fix | ||
# ✗ Resource Exhaustion (new) [Medium Severity][https://security.snyk.io/vuln/SNYK-PYTHON-IDNA-6597975] in [email protected] | ||
# introduced by [email protected] > [email protected] and 6 other path(s) | ||
_LOGGER.info(message) | ||
|
||
for vuln in row["vulnerabilities"]: | ||
if vuln.get("isUpgradable", False) or vuln.get("isPatchable", False): | ||
vulnerabilities = True | ||
result.append( | ||
module_utils.HtmlMessage( | ||
"\n".join( | ||
[ | ||
vuln["id"], | ||
" > ".join(vuln["from"]), | ||
*[ | ||
f"{identifier_type} {', '.join(identifiers)}" | ||
for identifier_type, identifiers in vuln["identifiers"].items() | ||
], | ||
*[ | ||
f"[{reference['title']}]({reference['url']})" | ||
for reference in vuln["references"] | ||
], | ||
"", | ||
markdown.markdown(vuln["description"]), | ||
] | ||
), | ||
f"[{vuln['severity'].upper()}] {vuln['packageName']}@{vuln['version']}: {vuln['title']}, fixed in {', '.join(vuln['fixedIn'])}", | ||
) | ||
fixable = vuln.get("isUpgradable", False) or vuln.get("isPatchable", False) | ||
severity = vuln["severity"] | ||
display = False | ||
if fixable: | ||
fixable_vulnerabilities[severity] = fixable_vulnerabilities.get(severity, 0) + 1 | ||
display = True | ||
if severity in ("high", "critical"): | ||
high_vulnerabilities[severity] = high_vulnerabilities.get(severity, 0) + 1 | ||
display = True | ||
if not display: | ||
continue | ||
title = " ".join( | ||
[ | ||
f"[{vuln['severity'].upper()}]", | ||
f"{vuln['packageName']}@{vuln['version']}:", | ||
f'<a href="https://security.snyk.io/vuln/{vuln["id"]}">{vuln["title"]}</a>', | ||
] | ||
) | ||
if vuln.get("isUpgradable", False): | ||
title += " [Fixed in: " + ", ".join(vuln["fixedIn"]) + "]." | ||
elif vuln.get("isPatchable", False): | ||
title += " [Patch available]." | ||
else: | ||
title += "." | ||
message = module_utils.HtmlMessage( | ||
"\n".join( | ||
[ | ||
vuln["id"], | ||
" > ".join(vuln["from"]), | ||
*[ | ||
f"{identifier_type} {', '.join(identifiers)}" | ||
for identifier_type, identifiers in vuln["identifiers"].items() | ||
], | ||
*[f"[{reference['title']}]({reference['url']})" for reference in vuln["references"]], | ||
"", | ||
markdown.markdown(vuln["description"]), | ||
] | ||
), | ||
title, | ||
) | ||
_LOGGER.warning(message) | ||
result.append(message) | ||
|
||
if error: | ||
command = ["snyk", "test"] + config.get("test-arguments", configuration.SNYK_TEST_ARGUMENTS_DEFAULT) | ||
snyk_fix_success = True | ||
if fixable_vulnerabilities: | ||
command = ["snyk", "fix"] + config.get("fix-arguments", configuration.SNYK_FIX_ARGUMENTS_DEFAULT) | ||
async with asyncio.timeout(300): | ||
test_proc = await asyncio.create_subprocess_exec( | ||
*command, env=env, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE | ||
snyk_fix_proc = await asyncio.create_subprocess_exec( | ||
*command, env=env_no_debug, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE | ||
) | ||
stdout, stderr = await test_proc.communicate() | ||
assert test_proc.returncode is not None | ||
dashboard_message = module_utils.AnsiProcessMessage( | ||
command, test_proc.returncode, stdout.decode(), stderr.decode() | ||
) | ||
return_message = "Error while testing the project" | ||
dashboard_message.title = return_message | ||
_LOGGER.error(dashboard_message) | ||
|
||
if vulnerabilities: | ||
message = module_utils.HtmlMessage(" ".join(m.to_html() for m in result)) | ||
message.title = "Vulnerabilities found" | ||
_LOGGER.warning(message) | ||
|
||
command = ["snyk", "fix"] + config.get("fix-arguments", configuration.SNYK_FIX_ARGUMENTS_DEFAULT) | ||
async with asyncio.timeout(300): | ||
snyk_fix_proc = await asyncio.create_subprocess_exec( | ||
*command, env=env_no_debug, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE | ||
) | ||
stdout, stderr = await snyk_fix_proc.communicate() | ||
assert snyk_fix_proc.returncode is not None | ||
snyk_fix_message = module_utils.AnsiMessage(stdout.decode().strip()) | ||
message = module_utils.AnsiProcessMessage( | ||
command, snyk_fix_proc.returncode, stdout.decode(), stderr.decode() | ||
) | ||
if snyk_fix_proc.returncode != 0: | ||
message.title = "Error while fixing the project" | ||
_LOGGER.warning(message) | ||
result.append(message) | ||
else: | ||
message.title = "Snyk fix applied" | ||
_LOGGER.debug(message) | ||
|
||
if snyk_fix_proc.returncode != 0 and vulnerabilities: | ||
result.append( | ||
module_utils.AnsiProcessMessage( | ||
stdout, stderr = await snyk_fix_proc.communicate() | ||
assert snyk_fix_proc.returncode is not None | ||
snyk_fix_message = module_utils.AnsiMessage(stdout.decode().strip()) | ||
message = module_utils.AnsiProcessMessage( | ||
command, snyk_fix_proc.returncode, stdout.decode(), stderr.decode() | ||
) | ||
) | ||
snyk_fix_success = snyk_fix_proc.returncode == 0 | ||
if snyk_fix_proc.returncode != 0: | ||
message.title = "Error while fixing the project" | ||
_LOGGER.error(message) | ||
result.append(message) | ||
else: | ||
message.title = "Snyk fix applied" | ||
_LOGGER.debug(message) | ||
|
||
return_message = [ | ||
*[f"{number} {severity} vulnerabilities" for severity, number in high_vulnerabilities.items()], | ||
*[ | ||
f"{number} {severity} vulnerabilities can be fixed" | ||
for severity, number in fixable_vulnerabilities.items() | ||
], | ||
*([] if snyk_fix_success else ["Error while fixing the vulnerabilities"]), | ||
] | ||
|
||
return result, snyk_fix_message, return_message | ||
return result, snyk_fix_message, return_message, snyk_fix_success | ||
|
||
|
||
def outdated_versions( | ||
|