Audit: Refactor snyk method
sbrunner committed Jun 12, 2024
1 parent abe0550 commit 618c8cb
Showing 1 changed file with 86 additions and 15 deletions.
101 changes: 86 additions & 15 deletions github_app_geo_project/module/audit/utils.py
Expand Up @@ -44,6 +44,50 @@ async def snyk(
env["PATH"] = f'{env["HOME"]}/.local/bin:{env["PATH"]}'
_LOGGER.debug("Updated path: %s", env["PATH"])

await _install_requirements_dependencies(config, local_config, result, env)
await _install_pipenv_dependencies(config, local_config, result, env)
await _install_poetry_dependencies(config, local_config, result, env)

env = {**os.environ}
env["FORCE_COLOR"] = "true"
env_no_debug = {**env}
env["DEBUG"] = "*snyk*" # debug mode

await _snyk_monitor(branch, config, local_config, result, env)

high_vulnerabilities, fixable_vulnerabilities, fixable_vulnerabilities_summary = await _snyk_test(
branch, config, local_config, result, env_no_debug
)

snyk_fix_success, snyk_fix_message = await _snyk_fix(
branch,
config,
local_config,
logs_url,
result,
env_no_debug,
fixable_vulnerabilities,
fixable_vulnerabilities_summary,
)

return_message = [
*[f"{number} {severity} vulnerabilities" for severity, number in high_vulnerabilities.items()],
*[
f"{number} {severity} vulnerabilities can be fixed"
for severity, number in fixable_vulnerabilities.items()
],
*([] if snyk_fix_success else ["Error while fixing the vulnerabilities"]),
]

return result, snyk_fix_message, return_message, snyk_fix_success


async def _install_requirements_dependencies(
config: configuration.SnykConfiguration,
local_config: configuration.SnykConfiguration,
result: list[str],
env: dict[str, str],
) -> None:
proc = subprocess.run( # nosec # pylint: disable=subprocess-run-check
["git", "ls-files", "requirements.txt", "*/requirements.txt"],
capture_output=True,
Expand Down Expand Up @@ -98,6 +142,13 @@ async def snyk(
message.title = f"Dependencies installed from {file}"
_LOGGER.debug(message)


async def _install_pipenv_dependencies(
config: configuration.SnykConfiguration,
local_config: configuration.SnykConfiguration,
result: list[str],
env: dict[str, str],
) -> None:
proc = subprocess.run( # nosec # pylint: disable=subprocess-run-check
["git", "ls-files", "Pipfile", "*/Pipfile"], capture_output=True, encoding="utf-8", timeout=30
)
Expand Down Expand Up @@ -152,6 +203,13 @@ async def snyk(
message.title = f"Dependencies installed from {file}"
_LOGGER.debug(message)


async def _install_poetry_dependencies(
config: configuration.SnykConfiguration,
local_config: configuration.SnykConfiguration,
result: list[str],
env: dict[str, str],
) -> None:
proc = subprocess.run( # nosec # pylint: disable=subprocess-run-check
["git", "ls-files", "poetry.lock", "*/poetry.lock"],
capture_output=True,
Expand Down Expand Up @@ -209,11 +267,14 @@ async def snyk(
message.title = f"Dependencies installed from {file}"
_LOGGER.debug(message)

env = {**os.environ}
env["FORCE_COLOR"] = "true"
env_no_debug = {**env}
env["DEBUG"] = "*snyk*" # debug mode

async def _snyk_monitor(
branch: str,
config: configuration.SnykConfiguration,
local_config: configuration.SnykConfiguration,
result: list[str],
env: dict[str, str],
) -> None:
command = [
"snyk",
"monitor",
Expand Down Expand Up @@ -261,6 +322,14 @@ async def snyk(
message.title = "Project monitored"
_LOGGER.debug(message)


async def _snyk_test(
branch: str,
config: configuration.SnykConfiguration,
local_config: configuration.SnykConfiguration,
result: list[str],
env_no_debug: dict[str, str],
) -> tuple[dict[str, int], dict[str, int], dict[str, str]]:
command = [
"snyk",
"test",
Expand Down Expand Up @@ -352,7 +421,19 @@ async def snyk(
)
_LOGGER.warning(message)
result.append(message)
return high_vulnerabilities, fixable_vulnerabilities, fixable_vulnerabilities_summary


async def _snyk_fix(
branch: str,
config: configuration.SnykConfiguration,
local_config: configuration.SnykConfiguration,
logs_url: str,
result: list[str],
env_no_debug: dict[str, str],
fixable_vulnerabilities: list[str],
fixable_vulnerabilities_summary: list[str],
) -> tuple[bool, module_utils.Message | None]:
snyk_fix_success = True
snyk_fix_message = None
if fixable_vulnerabilities:
Expand Down Expand Up @@ -392,17 +473,7 @@ async def snyk(
else:
message.title = "Snyk fix applied"
_LOGGER.debug(message)

return_message = [
*[f"{number} {severity} vulnerabilities" for severity, number in high_vulnerabilities.items()],
*[
f"{number} {severity} vulnerabilities can be fixed"
for severity, number in fixable_vulnerabilities.items()
],
*([] if snyk_fix_success else ["Error while fixing the vulnerabilities"]),
]

return result, snyk_fix_message, return_message, snyk_fix_success
return snyk_fix_success, snyk_fix_message


def outdated_versions(
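Review bot: The parameter annotations of the new `_snyk_fix` helper disagree with what it is handed: `_snyk_test` is declared to return `tuple[dict[str, int], dict[str, int], dict[str, str]]` and `snyk` iterates `fixable_vulnerabilities.items()` when building `return_message`, yet `_snyk_fix` annotates `fixable_vulnerabilities: list[str]` and `fixable_vulnerabilities_summary: list[str]`; the two lines should read `fixable_vulnerabilities: dict[str, int],` and `fixable_vulnerabilities_summary: dict[str, str],` so a type checker can catch a mismatch instead of silently passing it through. The six extracted coroutines (`_install_requirements_dependencies`, `_install_pipenv_dependencies`, `_install_poetry_dependencies`, `_snyk_monitor`, `_snyk_test`, `_snyk_fix`) also carry no docstring; a one-line summary on each stating which lockfile or snyk subcommand it drives, plus a note that `result` is appended to in place, would make the split readable without opening the bodies. It is also worth a comment at the `env = {**os.environ}` line in `snyk` that the environment is deliberately rebuilt from scratch after the install helpers, since the `~/.local/bin` PATH prefix set at the top of the function does not survive that rebuild — if the snyk binary relies on it, that is presumably an existing issue rather than one this refactor introduces, but the new helper boundary makes it easy to miss. The bodies of all helpers continue outside the visible hunks.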
