Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(engine_secret): adjust executions in Windows agents #283

Merged
merged 11 commits into from
Jan 7, 2025
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ def run_tool_secret_scan(self,
agent_os: str,
agent_work_folder: str,
repository_name: str,
config_tool: DeserializeConfigTool,
config_tool,
cajlopezor marked this conversation as resolved.
Show resolved Hide resolved
secret_tool,
secret_external_checks,
agent_tem_dir:str) -> str:
agent_tem_dir:str,
tool) -> str:
"run tool secret scan"
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import re
from devsecops_engine_tools.engine_core.src.domain.model.input_core import InputCore
from devsecops_engine_tools.engine_sast.engine_secret.src.domain.model.DeserializeConfigTool import (
cajlopezor marked this conversation as resolved.
Show resolved Hide resolved
DeserializeConfigTool,
)

from devsecops_engine_tools.engine_sast.engine_secret.src.domain.model.gateway.tool_gateway import (
ToolGateway,
)
Expand All @@ -29,22 +26,25 @@ def __init__(
self.tool_deserialize = tool_deserialize
self.git_gateway = git_gateway

def process(self, skip_tool, config_tool, secret_tool, dict_args):
def process(self, skip_tool, config_tool, secret_tool, dict_args, tool):
tool = str(tool).lower()
finding_list = []
file_path_findings = ""
secret_external_checks=dict_args["token_external_checks"]
files_pullrequest = None if dict_args["folder_path"] is None else [dict_args["folder_path"]]
cajlopezor marked this conversation as resolved.
Show resolved Hide resolved
cajlopezor marked this conversation as resolved.
Show resolved Hide resolved
if skip_tool == False:
self.tool_gateway.install_tool(self.devops_platform_gateway.get_variable("os"), self.devops_platform_gateway.get_variable("temp_directory"), config_tool.tool_version)
files_pullrequest = self.git_gateway.get_files_pull_request(
self.devops_platform_gateway.get_variable("path_directory"),
self.devops_platform_gateway.get_variable("target_branch"),
config_tool.target_branches,
self.devops_platform_gateway.get_variable("source_branch"),
self.devops_platform_gateway.get_variable("access_token"),
self.devops_platform_gateway.get_variable("organization"),
self.devops_platform_gateway.get_variable("project_name"),
self.devops_platform_gateway.get_variable("repository"),
self.devops_platform_gateway.get_variable("repository_provider"))
self.tool_gateway.install_tool(self.devops_platform_gateway.get_variable("os"), self.devops_platform_gateway.get_variable("temp_directory"), config_tool[tool]["VERSION"])
if files_pullrequest is None:
files_pullrequest = self.git_gateway.get_files_pull_request(
self.devops_platform_gateway.get_variable("path_directory"),
self.devops_platform_gateway.get_variable("target_branch"),
config_tool["TARGET_BRANCHES"],
self.devops_platform_gateway.get_variable("source_branch"),
self.devops_platform_gateway.get_variable("access_token"),
self.devops_platform_gateway.get_variable("organization"),
self.devops_platform_gateway.get_variable("project_name"),
self.devops_platform_gateway.get_variable("repository"),
self.devops_platform_gateway.get_variable("repository_provider"))
findings, file_path_findings = self.tool_gateway.run_tool_secret_scan(
files_pullrequest,
self.devops_platform_gateway.get_variable("os"),
Expand All @@ -53,7 +53,8 @@ def process(self, skip_tool, config_tool, secret_tool, dict_args):
config_tool,
secret_tool,
secret_external_checks,
self.devops_platform_gateway.get_variable("temp_directory"))
self.devops_platform_gateway.get_variable("temp_directory"),
tool)
finding_list = self.tool_deserialize.get_list_vulnerability(
findings,
self.devops_platform_gateway.get_variable("os"),
Expand All @@ -69,12 +70,11 @@ def complete_config_tool(self, dict_args, tool):
init_config_tool = self.devops_platform_gateway.get_remote_config(
dict_args["remote_config_repo"], "engine_sast/engine_secret/ConfigTool.json", dict_args["remote_config_branch"]
)
config_tool = DeserializeConfigTool(json_data=init_config_tool, tool=tool)
config_tool.scope_pipeline = self.devops_platform_gateway.get_variable("pipeline_name")
init_config_tool['SCOPE_PIPELINE'] = self.devops_platform_gateway.get_variable("pipeline_name")

skip_tool = bool(re.match(config_tool.ignore_search_pattern, config_tool.scope_pipeline, re.IGNORECASE))
skip_tool = bool(re.match(init_config_tool["IGNORE_SEARCH_PATTERN"], init_config_tool["SCOPE_PIPELINE"], re.IGNORECASE))

return config_tool, skip_tool
return init_config_tool, skip_tool

def skip_from_exclusion(self, exclusions, skip_tool_isp):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,17 @@
from devsecops_engine_tools.engine_core.src.domain.model.gateway.devops_platform_gateway import (
DevopsPlatformGateway,
)
from devsecops_engine_tools.engine_sast.engine_secret.src.domain.model.DeserializeConfigTool import (
DeserializeConfigTool,
)
from devsecops_engine_tools.engine_core.src.domain.model.exclusions import Exclusions
from devsecops_engine_tools.engine_utilities.utils.utils import Utils

from devsecops_engine_tools.engine_core.src.domain.model.threshold import Threshold

class SetInputCore:
def __init__(
self,
tool_remote: DevopsPlatformGateway,
dict_args,
tool,
config_tool: DeserializeConfigTool,
config_tool,
):
self.tool_remote = tool_remote
self.dict_args = dict_args
Expand Down Expand Up @@ -80,12 +77,12 @@ def set_input_core(self, finding_list):
),
threshold_defined=Utils.update_threshold(
self,
self.config_tool.level_compliance,
Threshold(self.config_tool['THRESHOLD']),
exclusions_config,
self.config_tool.scope_pipeline,
self.config_tool["SCOPE_PIPELINE"],
),
path_file_results=finding_list,
custom_message_break_build=self.config_tool.message_info_engine_secret,
scope_pipeline=self.config_tool.scope_pipeline,
custom_message_break_build=self.config_tool["MESSAGE_INFO_ENGINE_SECRET"],
scope_pipeline=self.config_tool["SCOPE_PIPELINE"],
stage_pipeline=self.tool_remote.get_variable("stage").capitalize(),
)
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
from devsecops_engine_tools.engine_sast.engine_secret.src.domain.model.gateway.tool_gateway import (
ToolGateway,
)
from devsecops_engine_tools.engine_utilities.github.infrastructure.github_api import (
GithubApi,
)

from devsecops_engine_tools.engine_utilities.utils.logger_info import MyLogger
from devsecops_engine_tools.engine_utilities import settings
from devsecops_engine_tools.engine_utilities.utils.utils import Utils

logger = MyLogger.__call__(**settings.SETTING_LOGGER).get_logger()

Expand Down Expand Up @@ -44,7 +43,7 @@ def install_tool(self, agent_os, agent_temp_dir, tool_version) -> any:

def run_install(self, tool_version):
command = f"curl -sSfL https://raw.githubusercontent.com/trufflesecurity/trufflehog/main/scripts/install.sh | sh -s -- -b /usr/local/bin v{tool_version}"
res = subprocess.run(command, capture_output=True, shell=True)
subprocess.run(command, capture_output=True, shell=True)

def run_install_win(self, agent_temp_dir, tool_version):
command_complete = f"powershell -Command [Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; [Net.ServicePointManager]::SecurityProtocol; New-Item -Path {agent_temp_dir} -ItemType Directory -Force; Invoke-WebRequest -Uri 'https://github.com/trufflesecurity/trufflehog/releases/download/v{tool_version}/trufflehog_{tool_version}_windows_amd64.tar.gz' -OutFile {agent_temp_dir}/trufflehog.tar.gz -UseBasicParsing; tar -xzf {agent_temp_dir}/trufflehog.tar.gz -C {agent_temp_dir}; Remove-Item {agent_temp_dir}/trufflehog.tar.gz; $env:Path += '; + {agent_temp_dir}'; & {agent_temp_dir}/trufflehog.exe --version"
Expand All @@ -62,35 +61,20 @@ def run_tool_secret_scan(
config_tool,
secret_tool,
secret_external_checks,
agent_temp_dir
agent_temp_dir,
tool
):
trufflehog_command = "trufflehog"
if "Windows" in agent_os:
trufflehog_command = f"{agent_temp_dir}/trufflehog.exe"
with open(f"{agent_work_folder}/excludedPath.txt", "w") as file:
file.write("\n".join(config_tool.exclude_path))
file.write("\n".join(config_tool[tool]["EXCLUDE_PATH"]))
exclude_path = f"{agent_work_folder}/excludedPath.txt"
include_paths = self.config_include_path(files_commits, agent_work_folder, agent_os)
enable_custom_rules = config_tool.enable_custom_rules.lower()
secret = None
github_api = GithubApi()

if secret_tool is not None:
secret_tmp = secret_tool
secret = github_api.get_installation_access_token(
secret_tmp["github_token"],
config_tool.app_id_github,
config_tool.installation_id_github
)
elif secret_external_checks is not None:
secret = secret_external_checks.split("github:")[1] if "github" in secret_external_checks else None
enable_custom_rules = config_tool[tool]["ENABLE_CUSTOM_RULES"].lower()
Utils().configurate_external_checks(tool, config_tool, secret_tool, secret_external_checks, agent_work_folder)
cajlopezor marked this conversation as resolved.
Show resolved Hide resolved

if enable_custom_rules == "true" and secret is not None:
self.configurate_external_checks(config_tool, secret)
else: #In case that remote config from tool is enable but in the args dont send any type of secrets. So dont modified command
enable_custom_rules = "false"

with concurrent.futures.ThreadPoolExecutor(max_workers=config_tool.number_threads) as executor:
with concurrent.futures.ThreadPoolExecutor(max_workers=config_tool[tool]["NUMBER_THREADS"]) as executor:
results = executor.map(
self.run_trufflehog,
[trufflehog_command] * len(include_paths),
Expand All @@ -99,8 +83,9 @@ def run_tool_secret_scan(
include_paths,
[repository_name] * len(include_paths),
[enable_custom_rules] * len(include_paths),
[agent_os] * len(include_paths)
)
findings, file_findings = self.create_file(self.decode_output(results), agent_work_folder, config_tool)
findings, file_findings = self.create_file(self.decode_output(results), agent_work_folder, config_tool, tool)
return findings, file_findings

def config_include_path(self, files, agent_work_folder, agent_os):
Expand Down Expand Up @@ -130,12 +115,15 @@ def run_trufflehog(
exclude_path,
include_path,
repository_name,
enable_custom_rules
enable_custom_rules,
agent_os
):
command = f"{trufflehog_command} filesystem {agent_work_folder + '/' + repository_name} --include-paths {include_path} --exclude-paths {exclude_path} --no-verification --no-update --json"

if str(enable_custom_rules).lower() == "true":
command = command.replace("--no-verification --no-update --json", "--config /tmp/rules/trufflehog/custom-rules.yaml --no-verification --no-update --json")
if str(enable_custom_rules).lower() == "true" and "Windows" in agent_os:
cajlopezor marked this conversation as resolved.
Show resolved Hide resolved
command = command.replace("--no-verification --no-update --json", f"--config {agent_work_folder}//rules//trufflehog//custom-rules.yaml --no-verification --no-update --json")
if str(enable_custom_rules).lower() == "true" and "Linux" in agent_os:
command = command.replace("--no-verification --no-update --json", f"--config /tmp/rules/trufflehog/custom-rules.yaml --no-verification --no-update --json")

result = subprocess.run(command, capture_output=True, shell=True, text=True, encoding='utf-8')
return result.stdout.strip()
Expand All @@ -150,7 +138,7 @@ def decode_output(self, results):
result.append(json_obj)
return result

def create_file(self, findings, agent_work_folder, config_tool):
def create_file(self, findings, agent_work_folder, config_tool, tool):
file_findings = os.path.join(agent_work_folder, "secret_scan_result.json")
with open(file_findings, "w") as file:
for find in findings:
Expand All @@ -159,20 +147,8 @@ def create_file(self, findings, agent_work_folder, config_tool):
where_text = original_where.replace(agent_work_folder, "")
find["SourceMetadata"]["Data"]["Filesystem"]["file"] = where_text
find["Id"] = "MISCONFIGURATION_SCANNING" if "exposure" in find["Raw"] else "SECRET_SCANNING"
find["References"] = config_tool.extradata_rules[find["Id"]]["References"] if "SECRET_SCANNING" not in find["Id"] else "N.A"
find["Mitigation"] = config_tool.extradata_rules[find["Id"]]["Mitigation"] if "SECRET_SCANNING" not in find["Id"] else "N.A"
find["References"] = config_tool[tool]["RULES"][find["Id"]]["References"] if "SECRET_SCANNING" not in find["Id"] else "N.A"
find["Mitigation"] = config_tool[tool]["RULES"][find["Id"]]["Mitigation"] if "SECRET_SCANNING" not in find["Id"] else "N.A"
json_str = json.dumps(find)
file.write(json_str + '\n')
return findings, file_findings

def configurate_external_checks(self, config_tool, secret):
try:
github_api = GithubApi()
github_api.download_latest_release_assets(
config_tool.external_dir_owner,
config_tool.external_dir_repo,
secret,
"/tmp",
)
except Exception as ex:
logger.error(f"An error ocurred download external checks {ex}")
return findings, file_findings
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ def engine_secret_scan(devops_platform_gateway, tool_gateway, dict_args, tool, t
secret_scan = SecretScan(tool_gateway, devops_platform_gateway, tool_deserealizator, git_gateway)
config_tool, skip_tool_isp = secret_scan.complete_config_tool(dict_args, tool)
skip_tool = secret_scan.skip_from_exclusion(exclusions, skip_tool_isp)
finding_list, file_path_findings = secret_scan.process(skip_tool, config_tool, secret_tool, dict_args)
finding_list, file_path_findings = secret_scan.process(skip_tool, config_tool, secret_tool, dict_args, tool)
input_core = SetInputCore(devops_platform_gateway, dict_args, tool, config_tool)
return finding_list, input_core.set_input_core(file_path_findings)
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def test_process(
mock_dict_args = {
"remote_config_repo": "example_repo",
"remote_config_branch": "",
"folder_path": ".",
"folder_path": None,
"environment": "test",
"platform": "local",
"token_external_checks": "fake_github_token",
Expand All @@ -83,15 +83,14 @@ def test_process(
"vulnerability_data"
]

obj_config_tool = DeserializeConfigTool(json_config, 'trufflehog')
mock_devops_gateway_instance.get_remote_config.return_value = json_config
mock_devops_gateway_instance.get_variable.return_value = "example_pipeline"
mock_tool_gateway_instance.run_tool_secret_scan.return_value = (
"vulnerability_data", "path/findings"
)

finding_list, file_path_findings = secret_scan.process(
False, obj_config_tool, secret_tool, mock_dict_args
False, json_config, secret_tool, mock_dict_args, "trufflehog"
)

self.assertEqual(finding_list, ["vulnerability_data"])
Expand Down Expand Up @@ -137,13 +136,12 @@ def test_process_empty(

mock_deserialize_gateway_instance.get_list_vulnerability.return_value = []

obj_config_tool = DeserializeConfigTool(json_config, 'trufflehog')
mock_devops_gateway_instance.get_remote_config.return_value = json_config
mock_devops_gateway_instance.get_variable.return_value = "example_pipeline"
mock_tool_gateway_instance.run_tool_secret_scan.return_value = "", ""

finding_list, file_path_findings = secret_scan.process(
False, obj_config_tool, secret_tool, mock_dict_args
False, json_config, secret_tool, mock_dict_args, "trufflehog"
)

self.assertEqual(finding_list, [])
Expand Down Expand Up @@ -277,7 +275,7 @@ def test_complete_config_tool(
{"remote_config_repo": "repository", "remote_config_branch": ""}, "TRUFFLEHOG"
)

self.assertEqual(config_tool_instance.scope_pipeline, "example_pipeline")
self.assertEqual(config_tool_instance["SCOPE_PIPELINE"], "example_pipeline")

if __name__ == "__main__":
unittest.main()
Loading
Loading