diff --git a/github_app_geo_project/module/audit/__init__.py b/github_app_geo_project/module/audit/__init__.py index 311929dc306..2dbf825ad03 100644 --- a/github_app_geo_project/module/audit/__init__.py +++ b/github_app_geo_project/module/audit/__init__.py @@ -57,6 +57,8 @@ class _EventData(BaseModel): def _get_process_output( context: module.ProcessContext[configuration.AuditConfiguration, _EventData, _TransversalStatus], issue_check: module_utils.DashboardIssue, + short_message: list[str], + success: bool, ) -> module.ProcessOutput[_EventData, _TransversalStatus]: assert context.module_event_data.type is not None issue_check.set_check(context.module_event_data.type, False) @@ -64,6 +66,8 @@ def _get_process_output( return module.ProcessOutput( dashboard=issue_check.to_string(), transversal_status=context.transversal_status, + success=success, + output={"summary": "\n".join(short_message)} if short_message else {}, ) @@ -73,8 +77,9 @@ def _process_error( issue_check: module_utils.DashboardIssue, error_message: list[str | models.OutputData] | None = None, message: str | None = None, -) -> None: +) -> str | None: full_repo = f"{context.github_project.owner}/{context.github_project.repository}" + output_url = None if error_message: logs_url = urllib.parse.urljoin(context.service_url, f"logs/{context.job_id}") output_id = module_utils.add_output( @@ -113,6 +118,7 @@ def _process_error( del context.transversal_status.repositories[full_repo].types[key] if not context.transversal_status.repositories[full_repo].types: del context.transversal_status.repositories[full_repo] + return output_url def _process_outdated( @@ -140,7 +146,10 @@ def _process_outdated( async def _process_snyk_dpkg( context: module.ProcessContext[configuration.AuditConfiguration, _EventData, _TransversalStatus], issue_check: module_utils.DashboardIssue, -) -> None: +) -> tuple[list[str], bool]: + short_message: list[str] = [] + success = True + key = f"Undefined {context.module_event_data.version}" new_branch = f"ghci/audit/{context.module_event_data.type}/{context.module_event_data.version}" if context.module_event_data.type == "snyk": @@ -168,7 +177,7 @@ async def _process_snyk_dpkg( # Checkout the right branch on a temporary directory with tempfile.TemporaryDirectory() as tmpdirname: os.chdir(tmpdirname) - success = module_utils.git_clone(context.github_project, branch) + success &= module_utils.git_clone(context.github_project, branch) local_config: configuration.AuditConfiguration = {} if context.module_event_data.type in ("snyk", "dpkg"): @@ -199,16 +208,19 @@ async def _process_snyk_dpkg( message.title = "Setting the Python version" _LOGGER.debug(message) - result, body, short_message = await audit_utils.snyk( + result, body, short_message, new_success = await audit_utils.snyk( branch, context.module_config.get("snyk", {}), local_config.get("snyk", {}) ) - _process_error( + success &= new_success + output_url = _process_error( context, key, issue_check, [{"title": m.title, "children": [m.to_html("no-title")]} for m in result], - short_message, + ", ".join(short_message), ) + if output_url is not None: + short_message.append(f"[See also]({output_url})") if context.module_event_data.type == "dpkg": body = module_utils.HtmlMessage("Update dpkg packages") @@ -232,10 +244,11 @@ async def _process_snyk_dpkg( else: repo = context.github_project.repo - success, pull_request = module_utils.create_commit_pull_request( + new_success, pull_request = module_utils.create_commit_pull_request( branch, new_branch, f"Audit {key}", body.to_markdown(), repo ) - if not success: + success &= new_success + if not new_success: _LOGGER.error("Error while create commit or pull request") else: if pull_request is not None: @@ -244,6 +257,8 @@ async def _process_snyk_dpkg( except Exception: # pylint: disable=broad-except _LOGGER.exception("Audit %s error", key) + return short_message, success + class Audit(module.Module[configuration.AuditConfiguration, _EventData, _TransversalStatus]): """The audit module.""" @@ -328,6 +343,8 @@ async def process( """ issue_check = module_utils.DashboardIssue(context.issue_data) repo = context.github_project.repo + short_message: list[str] = [] + success = True module_utils.manage_updated_separated( context.transversal_status.updated, @@ -426,9 +443,9 @@ async def process( ) return ProcessOutput(actions=actions) else: - await _process_snyk_dpkg(context, issue_check) + short_message, success = await _process_snyk_dpkg(context, issue_check) - return _get_process_output(context, issue_check) + return _get_process_output(context, issue_check, short_message, success) def get_json_schema(self) -> dict[str, Any]: """Get the JSON schema of the module configuration.""" diff --git a/github_app_geo_project/module/audit/utils.py b/github_app_geo_project/module/audit/utils.py index 207b9ce07c3..2a79f97a835 100644 --- a/github_app_geo_project/module/audit/utils.py +++ b/github_app_geo_project/module/audit/utils.py @@ -24,12 +24,18 @@ async def snyk( branch: str, config: configuration.SnykConfiguration, local_config: configuration.SnykConfiguration -) -> tuple[list[module_utils.Message], module_utils.Message, str | None]: +) -> tuple[list[module_utils.Message], module_utils.Message, list[str], bool]: """ Audit the code with Snyk. + + Return: + ------ + the output messages (Install errors, high of upgradable vulnerabilities), + the message of the fix commit, + the dashboard's message (with resume of the vulnerabilities), + is on success (errors: vulnerability that can be fixed by upgrading the dependency). """ result = [] - return_message = None proc = subprocess.run( # nosec # pylint: disable=subprocess-run-check ["echo"], @@ -242,104 +248,103 @@ async def snyk( _LOGGER.debug(message) else: _LOGGER.error("Snyk test JSON returned nothing on project %s branch %s", os.getcwd(), branch) + test_json = json.loads(test_json_str) if test_json_str else [] if not isinstance(test_json, list): test_json = [test_json] - vulnerabilities = False - error = False + high_vulnerabilities: dict[str, int] = {} + fixable_vulnerabilities: dict[str, int] = {} for row in test_json: - result.append( - module_utils.HtmlMessage( - "\n".join( - [ - f"Package manager: {row.get('packageManager', '-')}", - f"Target file: {row.get('displayTargetFile', '-')}", - f"Project path: {row.get('path', '-')}", - row.get("summary", ""), - ] - ) + message = module_utils.HtmlMessage( + "\n".join( + [ + f"Package manager: {row.get('packageManager', '-')}", + f"Target file: {row.get('displayTargetFile', '-')}", + f"Project path: {row.get('path', '-')}", + row.get("summary", ""), + ] ) ) - - # TODO: Example message: - # Pin idna@3.3 to idna@3.7 to fix - # ✗ Resource Exhaustion (new) [Medium Severity][https://security.snyk.io/vuln/SNYK-PYTHON-IDNA-6597975] in idna@3.3 - # introduced by requests@2.31.0 > idna@3.3 and 6 other path(s) + _LOGGER.info(message) for vuln in row["vulnerabilities"]: - if vuln.get("isUpgradable", False) or vuln.get("isPatchable", False): - vulnerabilities = True - result.append( - module_utils.HtmlMessage( - "\n".join( - [ - vuln["id"], - " > ".join(vuln["from"]), - *[ - f"{identifier_type} {', '.join(identifiers)}" - for identifier_type, identifiers in vuln["identifiers"].items() - ], - *[ - f"[{reference['title']}]({reference['url']})" - for reference in vuln["references"] - ], - "", - markdown.markdown(vuln["description"]), - ] - ), - f"[{vuln['severity'].upper()}] {vuln['packageName']}@{vuln['version']}: {vuln['title']}, fixed in {', '.join(vuln['fixedIn'])}", - ) + fixable = vuln.get("isUpgradable", False) or vuln.get("isPatchable", False) + severity = vuln["severity"] + display = False + if fixable: + fixable_vulnerabilities[severity] = fixable_vulnerabilities.get(severity, 0) + 1 + display = True + if severity in ("high", "critical"): + high_vulnerabilities[severity] = high_vulnerabilities.get(severity, 0) + 1 + display = True + if not display: + continue + title = " ".join( + [ + f"[{vuln['severity'].upper()}]", + f"{vuln['packageName']}@{vuln['version']}:", + f'{vuln["title"]}', + ] + ) + if vuln.get("isUpgradable", False): + title += " [Fixed in: " + ", ".join(vuln["fixedIn"]) + "]." + elif vuln.get("isPatchable", False): + title += " [Patch available]." + else: + title += "." + message = module_utils.HtmlMessage( + "\n".join( + [ + vuln["id"], + " > ".join(vuln["from"]), + *[ + f"{identifier_type} {', '.join(identifiers)}" + for identifier_type, identifiers in vuln["identifiers"].items() + ], + *[f"[{reference['title']}]({reference['url']})" for reference in vuln["references"]], + "", + markdown.markdown(vuln["description"]), + ] + ), + title, ) + _LOGGER.warning(message) + result.append(message) - if error: - command = ["snyk", "test"] + config.get("test-arguments", configuration.SNYK_TEST_ARGUMENTS_DEFAULT) + snyk_fix_success = True + if fixable_vulnerabilities: + command = ["snyk", "fix"] + config.get("fix-arguments", configuration.SNYK_FIX_ARGUMENTS_DEFAULT) async with asyncio.timeout(300): - test_proc = await asyncio.create_subprocess_exec( - *command, env=env, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + snyk_fix_proc = await asyncio.create_subprocess_exec( + *command, env=env_no_debug, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) - stdout, stderr = await test_proc.communicate() - assert test_proc.returncode is not None - dashboard_message = module_utils.AnsiProcessMessage( - command, test_proc.returncode, stdout.decode(), stderr.decode() - ) - return_message = "Error while testing the project" - dashboard_message.title = return_message - _LOGGER.error(dashboard_message) - - if vulnerabilities: - message = module_utils.HtmlMessage(" ".join(m.to_html() for m in result)) - message.title = "Vulnerabilities found" - _LOGGER.warning(message) - - command = ["snyk", "fix"] + config.get("fix-arguments", configuration.SNYK_FIX_ARGUMENTS_DEFAULT) - async with asyncio.timeout(300): - snyk_fix_proc = await asyncio.create_subprocess_exec( - *command, env=env_no_debug, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE - ) - stdout, stderr = await snyk_fix_proc.communicate() - assert snyk_fix_proc.returncode is not None - snyk_fix_message = module_utils.AnsiMessage(stdout.decode().strip()) - message = module_utils.AnsiProcessMessage( - command, snyk_fix_proc.returncode, stdout.decode(), stderr.decode() - ) - if snyk_fix_proc.returncode != 0: - message.title = "Error while fixing the project" - _LOGGER.warning(message) - result.append(message) - else: - message.title = "Snyk fix applied" - _LOGGER.debug(message) - - if snyk_fix_proc.returncode != 0 and vulnerabilities: - result.append( - module_utils.AnsiProcessMessage( + stdout, stderr = await snyk_fix_proc.communicate() + assert snyk_fix_proc.returncode is not None + snyk_fix_message = module_utils.AnsiMessage(stdout.decode().strip()) + message = module_utils.AnsiProcessMessage( command, snyk_fix_proc.returncode, stdout.decode(), stderr.decode() ) - ) + snyk_fix_success = snyk_fix_proc.returncode == 0 + if snyk_fix_proc.returncode != 0: + message.title = "Error while fixing the project" + _LOGGER.error(message) + result.append(message) + else: + message.title = "Snyk fix applied" + _LOGGER.debug(message) + + return_message = [ + *[f"{number} {severity} vulnerabilities" for severity, number in high_vulnerabilities.items()], + *[ + f"{number} {severity} vulnerabilities can be fixed" + for severity, number in fixable_vulnerabilities.items() + ], + *([] if snyk_fix_success else ["Error while fixing the vulnerabilities"]), + ] - return result, snyk_fix_message, return_message + return result, snyk_fix_message, return_message, snyk_fix_success def outdated_versions(