Skip to content

Commit

Permalink
plugins/semgrep: add csmock semgrep plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
rhyw committed Jan 11, 2024
1 parent 509e087 commit 117a232
Showing 1 changed file with 189 additions and 0 deletions.
189 changes: 189 additions & 0 deletions py/plugins/semgrep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
# Copyright (C) 2014 Red Hat, Inc.
#
# This file is part of csmock.
#
# csmock is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# csmock is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with csmock. If not, see <http://www.gnu.org/licenses/>.

import os

# do not send usage metrics to the semgrep cloud unless explicitly requested
SEMGREP_SEND_METRICS = "off"

# default value of the --semgrep-verbose option
SEMGREP_VERBOSE = True

# FIXME: should we use a dedicated repo to maintain semgrep rules?
SEMGREP_RULES_REPO = "https://git.prodsec.redhat.com/prodsec-services/masssast"

# scratch area that holds both the rules cache and the semgrep installation
_CSMOCK_TMP_DIR = "/var/tmp/csmock"

# the paths below are later prefixed with the mock chroot root path
_CHROOT_BUILDDIR = "/builddir"

# cloned copy of the rules repository
SEMGREP_RULES_CACHE_DIR = f"{_CSMOCK_TMP_DIR}/semgrep"

# where semgrep cli and its dependencies are installed (pip --target)
SEMGREP_LIB_TARGET_DIR = f"{_CSMOCK_TMP_DIR}/semgrep_lib"

# directory tree handed to `semgrep scan`
SEMGREP_SCAN_DIR = f"{_CHROOT_BUILDDIR}/build/BUILD"

# SARIF report produced by the scan
SEMGREP_SCAN_OUTPUT = f"{_CHROOT_BUILDDIR}/semgrep-scan-results.sarif"

# stderr of the scan is redirected here
SEMGREP_SCAN_LOG = f"{_CHROOT_BUILDDIR}/semgrep-scan.log"


class PluginProps:
    """Static properties of the semgrep plug-in: description and stability."""

    def __init__(self):
        summary = (
            "A fast, open-source, static analysis engine for finding bugs, ",
            "detecting dependency vulnerabilities, ",
            "and enforcing code standards.",
        )
        self.description = "".join(summary)

        # include this plug-in in `csmock --all-tools`
        self.stable = True


class Plugin:
    """csmock plug-in that runs `semgrep scan` on the unpacked build tree.

    When enabled, `handle_args()` registers three hooks on `props`:

    * a pre-mock hook that installs the semgrep cli with pip, fetches (or
      reuses) the rules repository and records the semgrep version,
    * a post-install hook that runs the scan on the chroot's build directory,
    * a post-process hook that converts the SARIF report with csgrep.
    """

    # textual values of --semgrep-verbose that switch verbose mode off
    _FALSE_WORDS = frozenset({"", "0", "false", "no", "off"})

    def __init__(self):
        self.enabled = False

    def get_props(self):
        """Return a fresh `PluginProps` describing this plug-in."""
        return PluginProps()

    def enable(self):
        """Mark the plug-in as enabled so that `handle_args()` registers hooks."""
        self.enabled = True

    @staticmethod
    def _parse_bool(value):
        """Convert a command-line value to bool.

        Only the words in `_FALSE_WORDS` (case-insensitive) yield False; any
        other value yields True, which is what every non-empty value meant
        before this conversion was introduced.
        """
        return str(value).strip().lower() not in Plugin._FALSE_WORDS

    @staticmethod
    def _extend_search_path(current, entry):
        """Return the search path `current` with `entry` appended.

        `current` may be None or empty (variable not set), in which case
        `entry` alone is returned instead of a string starting with "None".
        An entry that is already listed is not appended again.
        """
        if not current:
            return entry
        if entry in current.split(os.pathsep):
            return current
        return f"{current}{os.pathsep}{entry}"

    def init_parser(self, parser):
        """Register the --semgrep-* command-line options on `parser`."""
        parser.add_argument(
            "--semgrep-metrics", default=SEMGREP_SEND_METRICS,
            help="configures how usage metrics are sent to the Semgrep server")

        parser.add_argument(
            "--semgrep-rules-repo", default=SEMGREP_RULES_REPO,
            help="downstream semgrep rules repo")

        parser.add_argument(
            "--semgrep-rules-refresh", action="store_true",
            help="force clone/refresh latest rules from downstream repo")

        # the option keeps taking a value, but the value is now interpreted,
        # so that `--semgrep-verbose false` really disables verbose output
        parser.add_argument(
            "--semgrep-verbose", default=SEMGREP_VERBOSE, type=self._parse_bool,
            help="show more details about what rules are running, which files failed to parse, etc."
                 " (enabled by default, pass 'false' to disable)")

        parser.add_argument(
            "--semgrep-scan-opts",
            help="space-separated list of additional options passed to the 'semgrep scan' command")

    def handle_args(self, parser, args, props):
        """Register the semgrep hooks on `props` if the plug-in is enabled.

        `parser` is unused.  `args` is the parsed command line providing the
        semgrep_* options.  `props` must provide the `pre_mock_hooks`,
        `post_install_hooks` and `post_process_hooks` lists; its
        `mock_profile` and `copy_out_files` are used when the hooks run.
        """
        if not self.enabled:
            return

        def populate_semgrep_runtime_env():
            # Update PYTHONPATH and PATH, so that semgrep cli and its dependent libraries can be found
            # Note that this change doesn't persist across sessions.
            os.environ['PYTHONPATH'] = self._extend_search_path(
                os.environ.get('PYTHONPATH'), SEMGREP_LIB_TARGET_DIR)
            os.environ['PATH'] = self._extend_search_path(
                os.environ.get('PATH'), f"{SEMGREP_LIB_TARGET_DIR}/bin")

        # install semgrep and download semgrep rules from downstream repo
        def fetch_semgrep_rules_hook(results, props):
            try:
                # make sure the cache directory exists
                os.makedirs(SEMGREP_RULES_CACHE_DIR, mode=0o755, exist_ok=True)
                os.makedirs(SEMGREP_LIB_TARGET_DIR, mode=0o755, exist_ok=True)
            except OSError:
                results.error("failed to create semgrep rules/lib directory")
                return 1

            # install semgrep cli using pip
            cmd = f"python3 -m pip install --target={SEMGREP_LIB_TARGET_DIR} semgrep"
            ec = results.exec_cmd(cmd, shell=True)
            if 0 != ec:
                # reported but not fatal here: the version query below fails
                # the hook if no usable semgrep can be executed
                results.error("failed to install semgrep cli using pip")
            else:
                # add semgrep install target directory to PYTHONPATH
                populate_semgrep_runtime_env()

            # check whether we can reuse previously downloaded semgrep rules
            if not args.semgrep_rules_refresh and os.listdir(SEMGREP_RULES_CACHE_DIR):
                results.print_with_ts(f"reusing previously downloaded semgrep rules: {SEMGREP_RULES_CACHE_DIR}")
            else:
                # Remove the whole cache directory: a "dir/*" glob is not
                # expanded without a shell (and would skip ".git" anyway),
                # which made git refuse to clone into the non-empty directory.
                results.exec_cmd(["rm", "-rf", SEMGREP_RULES_CACHE_DIR])

                # fetch semgrep rules from downstream repo (git re-creates the directory)
                ec = results.exec_cmd(
                    ["git", "clone", args.semgrep_rules_repo, SEMGREP_RULES_CACHE_DIR])
                if 0 != ec:
                    results.error("failed to fetch semgrep rules from downstream repo")
                    return ec

            # query version of semgrep
            ec, output = results.get_cmd_output(["semgrep", "--version"], shell=False)
            if 0 != ec:
                results.error("failed to query semgrep cli version", ec=ec)
                return ec

            # parse and record the version of semgrep cli
            version = output.rstrip("\n")
            results.ini_writer.append("analyzer-version-semgrep-cli", version)

            # get the results out of the chroot
            props.copy_out_files += [SEMGREP_SCAN_OUTPUT, SEMGREP_SCAN_LOG]
            return 0

        props.pre_mock_hooks += [fetch_semgrep_rules_hook]

        def get_chroot_root_path(results, props):
            # get the path to the chroot root directory, None if mock fails
            cmd = f"mock -r {props.mock_profile} --print-root-path"
            ec, output = results.get_cmd_output(cmd)
            if 0 != ec:
                return None
            return output.rstrip("/\n")

        def scan_hook(results, mock, props):
            # without the chroot root path the scan would hit host paths
            chroot_root_path = get_chroot_root_path(results, props)
            if chroot_root_path is None:
                results.error("semgrep: failed to query the mock chroot root path")
                return 0

            # command to run semgrep scan
            cmd = (f"semgrep scan --metrics={args.semgrep_metrics} --sarif"
                   f" --config={SEMGREP_RULES_CACHE_DIR}/rules")

            if args.semgrep_verbose:
                cmd += " --verbose"

            # append additional options passed to the 'semgrep scan' command
            if args.semgrep_scan_opts:
                cmd += f" {args.semgrep_scan_opts}"

            # eventually append the target directory to be scanned
            cmd += (f" --output={chroot_root_path}{SEMGREP_SCAN_OUTPUT} {chroot_root_path}{SEMGREP_SCAN_DIR}"
                    f" 2>{chroot_root_path}{SEMGREP_SCAN_LOG}")

            # run semgrep scan
            ec = results.exec_cmd(cmd, shell=True)

            # exit codes documented for the semgrep scan command
            known_failures = {
                123: "semgrep: Indiscriminate errors reported on standard error.",
                124: "semgrep: Command line parsing errors.",
                125: "semgrep: Unexpected internal errors (bugs).",
            }
            message = known_failures.get(ec)
            if message is not None:
                results.error(message)
            elif 0 != ec:
                # leave a trace of other failures without raising an error
                results.print_with_ts(f"semgrep scan exited with status {ec}")

            # a failed scan is reported above but never fails the hook
            return 0

        # run semgrep scan after successful build
        props.post_install_hooks += [scan_hook]

        # convert the results into the csdiff's JSON format
        def filter_hook(results):
            src = results.dbgdir_raw + SEMGREP_SCAN_OUTPUT
            if not os.path.exists(src):
                # no report was copied out, nothing to convert
                return 0
            dst = f"{results.dbgdir_uni}/semgrep-scan-results.json"
            cmd = f"csgrep '{src}' --mode=json --prepend-path-prefix={SEMGREP_SCAN_DIR}/ > '{dst}'"
            return results.exec_cmd(cmd, shell=True)

        props.post_process_hooks += [filter_hook]

0 comments on commit 117a232

Please sign in to comment.