diff --git a/github_app_geo_project/module/audit/utils.py b/github_app_geo_project/module/audit/utils.py index 15c63e78777..2f88a4d9816 100644 --- a/github_app_geo_project/module/audit/utils.py +++ b/github_app_geo_project/module/audit/utils.py @@ -44,6 +44,50 @@ async def snyk( env["PATH"] = f'{env["HOME"]}/.local/bin:{env["PATH"]}' _LOGGER.debug("Updated path: %s", env["PATH"]) + await _install_requirements_dependencies(config, local_config, result, env) + await _install_pipenv_dependencies(config, local_config, result, env) + await _install_poetry_dependencies(config, local_config, result, env) + + env = {**os.environ} + env["FORCE_COLOR"] = "true" + env_no_debug = {**env} + env["DEBUG"] = "*snyk*" # debug mode + + await _snyk_monitor(branch, config, local_config, result, env) + + high_vulnerabilities, fixable_vulnerabilities, fixable_vulnerabilities_summary = await _snyk_test( + branch, config, local_config, result, env_no_debug + ) + + snyk_fix_success, snyk_fix_message = await _snyk_fix( + branch, + config, + local_config, + logs_url, + result, + env_no_debug, + fixable_vulnerabilities, + fixable_vulnerabilities_summary, + ) + + return_message = [ + *[f"{number} {severity} vulnerabilities" for severity, number in high_vulnerabilities.items()], + *[ + f"{number} {severity} vulnerabilities can be fixed" + for severity, number in fixable_vulnerabilities.items() + ], + *([] if snyk_fix_success else ["Error while fixing the vulnerabilities"]), + ] + + return result, snyk_fix_message, return_message, snyk_fix_success + + +async def _install_requirements_dependencies( + config: configuration.SnykConfiguration, + local_config: configuration.SnykConfiguration, + result: list[str], + env: dict[str, str], +) -> None: proc = subprocess.run( # nosec # pylint: disable=subprocess-run-check ["git", "ls-files", "requirements.txt", "*/requirements.txt"], capture_output=True, @@ -98,6 +142,13 @@ async def snyk( message.title = f"Dependencies installed from {file}" _LOGGER.debug(message) + +async def _install_pipenv_dependencies( + config: configuration.SnykConfiguration, + local_config: configuration.SnykConfiguration, + result: list[str], + env: dict[str, str], +) -> None: proc = subprocess.run( # nosec # pylint: disable=subprocess-run-check ["git", "ls-files", "Pipfile", "*/Pipfile"], capture_output=True, encoding="utf-8", timeout=30 ) @@ -152,6 +203,13 @@ async def snyk( message.title = f"Dependencies installed from {file}" _LOGGER.debug(message) + +async def _install_poetry_dependencies( + config: configuration.SnykConfiguration, + local_config: configuration.SnykConfiguration, + result: list[str], + env: dict[str, str], +) -> None: proc = subprocess.run( # nosec # pylint: disable=subprocess-run-check ["git", "ls-files", "poetry.lock", "*/poetry.lock"], capture_output=True, @@ -209,11 +267,14 @@ async def snyk( message.title = f"Dependencies installed from {file}" _LOGGER.debug(message) - env = {**os.environ} - env["FORCE_COLOR"] = "true" - env_no_debug = {**env} - env["DEBUG"] = "*snyk*" # debug mode +async def _snyk_monitor( + branch: str, + config: configuration.SnykConfiguration, + local_config: configuration.SnykConfiguration, + result: list[str], + env: dict[str, str], +) -> None: command = [ "snyk", "monitor", @@ -261,6 +322,14 @@ async def snyk( message.title = "Project monitored" _LOGGER.debug(message) + +async def _snyk_test( + branch: str, + config: configuration.SnykConfiguration, + local_config: configuration.SnykConfiguration, + result: list[str], + env_no_debug: dict[str, str], +) -> tuple[dict[str, int], dict[str, int], dict[str, str]]: command = [ "snyk", "test", @@ -352,7 +421,19 @@ async def snyk( ) _LOGGER.warning(message) result.append(message) + return high_vulnerabilities, fixable_vulnerabilities, fixable_vulnerabilities_summary + +async def _snyk_fix( + branch: str, + config: configuration.SnykConfiguration, + local_config: configuration.SnykConfiguration, + logs_url: str, + result: list[str], + env_no_debug: dict[str, str], + fixable_vulnerabilities: list[str], + fixable_vulnerabilities_summary: list[str], +) -> tuple[bool, module_utils.Message | None]: snyk_fix_success = True snyk_fix_message = None if fixable_vulnerabilities: @@ -392,17 +473,7 @@ async def snyk( else: message.title = "Snyk fix applied" _LOGGER.debug(message) - - return_message = [ - *[f"{number} {severity} vulnerabilities" for severity, number in high_vulnerabilities.items()], - *[ - f"{number} {severity} vulnerabilities can be fixed" - for severity, number in fixable_vulnerabilities.items() - ], - *([] if snyk_fix_success else ["Error while fixing the vulnerabilities"]), - ] - - return result, snyk_fix_message, return_message, snyk_fix_success + return snyk_fix_success, snyk_fix_message def outdated_versions(