Skip to content

Commit

Permalink
Add safeguards for constant subprocess commands (#420)
Browse files Browse the repository at this point in the history
  • Loading branch information
drdavella authored Apr 2, 2024
1 parent 92629a8 commit c6b96a8
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 53 deletions.
26 changes: 15 additions & 11 deletions integration_tests/test_process_sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,29 @@ class TestProcessSandbox(BaseIntegrationTest):
original_code = """
import subprocess
subprocess.run("echo 'hi'", shell=True)
subprocess.run(["ls", "-l"])
cmd = " ".join(["ls"])
subprocess.run(cmd, shell=True)
subprocess.run([cmd, "-l"])
subprocess.call("echo 'hi'", shell=True)
subprocess.call(["ls", "-l"])
subprocess.call(cmd, shell=True)
subprocess.call([cmd, "-l"])
subprocess.check_output(["ls", "-l"])
subprocess.check_output([cmd, "-l"])
subprocess.run("ls -l", shell=True)
var = "hello"
"""
replacement_lines = [
(2, """from security import safe_command\n\n"""),
(3, """safe_command.run(subprocess.run, "echo 'hi'", shell=True)\n"""),
(4, """safe_command.run(subprocess.run, ["ls", "-l"])\n"""),
(6, """safe_command.run(subprocess.call, "echo 'hi'", shell=True)\n"""),
(7, """safe_command.run(subprocess.call, ["ls", "-l"])\n"""),
(5, """safe_command.run(subprocess.run, cmd, shell=True)\n"""),
(6, """safe_command.run(subprocess.run, [cmd, "-l"])\n"""),
(8, """safe_command.run(subprocess.call, cmd, shell=True)\n"""),
(9, """safe_command.run(subprocess.call, [cmd, "-l"])\n"""),
]
expected_diff = '--- \n+++ \n@@ -1,10 +1,11 @@\n import subprocess\n+from security import safe_command\n \n-subprocess.run("echo \'hi\'", shell=True)\n-subprocess.run(["ls", "-l"])\n+safe_command.run(subprocess.run, "echo \'hi\'", shell=True)\n+safe_command.run(subprocess.run, ["ls", "-l"])\n \n-subprocess.call("echo \'hi\'", shell=True)\n-subprocess.call(["ls", "-l"])\n+safe_command.run(subprocess.call, "echo \'hi\'", shell=True)\n+safe_command.run(subprocess.call, ["ls", "-l"])\n \n subprocess.check_output(["ls", "-l"])\n \n'
expected_line_change = "3"
expected_diff = '--- \n+++ \n@@ -1,12 +1,13 @@\n import subprocess\n+from security import safe_command\n \n cmd = " ".join(["ls"])\n \n-subprocess.run(cmd, shell=True)\n-subprocess.run([cmd, "-l"])\n+safe_command.run(subprocess.run, cmd, shell=True)\n+safe_command.run(subprocess.run, [cmd, "-l"])\n \n-subprocess.call(cmd, shell=True)\n-subprocess.call([cmd, "-l"])\n+safe_command.run(subprocess.call, cmd, shell=True)\n+safe_command.run(subprocess.call, [cmd, "-l"])\n \n subprocess.check_output([cmd, "-l"])\n \n'
expected_line_change = "5"
num_changes = 4
num_changed_files = 2
change_description = ProcessSandbox.change_description
Expand Down
23 changes: 11 additions & 12 deletions src/core_codemods/process_creation_sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,20 @@ class ProcessSandbox(SimpleCodemod):
rules:
- pattern-either:
- patterns:
- pattern: subprocess.run(...)
- pattern: subprocess.$FUNC(...)
- pattern-not: subprocess.$FUNC("...", ...)
- pattern-not: subprocess.$FUNC(["...", ...], ...)
- metavariable-pattern:
metavariable: $FUNC
patterns:
- pattern-either:
- pattern: run
- pattern: call
- pattern: Popen
- pattern-inside: |
import subprocess
...
- patterns:
- pattern: subprocess.call(...)
- pattern-inside: |
import subprocess
...
- patterns:
- pattern: subprocess.Popen(...)
- pattern-inside: |
import subprocess
...
"""
"""

def on_result_found(self, original_node, updated_node):
self.add_needed_import("security", "safe_command")
Expand Down
140 changes: 110 additions & 30 deletions tests/codemods/test_process_creation_sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@ def test_import_subprocess(self, adds_dependency, tmpdir):
input_code = """
import subprocess
subprocess.run("echo 'hi'", shell=True)
var = "hello"
def foo(cmd):
subprocess.run(cmd, shell=True)
var = "hello"
"""
expected = """
import subprocess
from security import safe_command
safe_command.run(subprocess.run, "echo 'hi'", shell=True)
var = "hello"
def foo(cmd):
safe_command.run(subprocess.run, cmd, shell=True)
var = "hello"
"""
self.run_and_assert(tmpdir, input_code, expected)
adds_dependency.assert_called_once_with(Security)
Expand All @@ -34,15 +36,17 @@ def test_import_alias(self, adds_dependency, tmpdir):
input_code = """
import subprocess as sub
sub.run("echo 'hi'", shell=True)
var = "hello"
def foo(cmd):
sub.run(cmd, shell=True)
var = "hello"
"""
expected = """
import subprocess as sub
from security import safe_command
safe_command.run(sub.run, "echo 'hi'", shell=True)
var = "hello"
def foo(cmd):
safe_command.run(sub.run, cmd, shell=True)
var = "hello"
"""
self.run_and_assert(tmpdir, input_code, expected)
adds_dependency.assert_called_once_with(Security)
Expand All @@ -51,22 +55,25 @@ def test_from_subprocess(self, adds_dependency, tmpdir):
input_code = """
from subprocess import run
run("echo 'hi'", shell=True)
var = "hello"
def foo(cmd):
run(cmd, shell=True)
var = "hello"
"""
expected = """
from subprocess import run
from security import safe_command
safe_command.run(run, "echo 'hi'", shell=True)
var = "hello"
def foo(cmd):
safe_command.run(run, cmd, shell=True)
var = "hello"
"""
self.run_and_assert(tmpdir, input_code, expected)
adds_dependency.assert_called_once_with(Security)

def test_subprocess_nameerror(self, _, tmpdir):
input_code = """
subprocess.run("echo 'hi'", shell=True)
def foo(cmd):
subprocess.run(cmd, shell=True)
import subprocess
"""
Expand All @@ -80,32 +87,36 @@ def test_subprocess_nameerror(self, _, tmpdir):
"""
import subprocess
import csv
subprocess.run("echo 'hi'", shell=True)
csv.excel
def foo(cmd):
subprocess.run(cmd, shell=True)
csv.excel
""",
"""
import subprocess
import csv
from security import safe_command
safe_command.run(subprocess.run, "echo 'hi'", shell=True)
csv.excel
def foo(cmd):
safe_command.run(subprocess.run, cmd, shell=True)
csv.excel
""",
),
(
"""
import subprocess
from csv import excel
subprocess.run("echo 'hi'", shell=True)
excel
def foo(cmd):
subprocess.run(cmd, shell=True)
excel
""",
"""
import subprocess
from csv import excel
from security import safe_command
safe_command.run(subprocess.run, "echo 'hi'", shell=True)
excel
def foo(cmd):
safe_command.run(subprocess.run, cmd, shell=True)
excel
""",
),
],
Expand All @@ -123,38 +134,62 @@ def test_multifunctions(self, adds_dependency, tmpdir):
input_code = """
import subprocess
subprocess.run("echo 'hi'", shell=True)
subprocess.check_output(["ls", "-l"])"""
def foo(cmd, cmd2):
subprocess.run(cmd, shell=True)
subprocess.check_output([cmd2, "-l"])"""

expected = """
import subprocess
from security import safe_command
safe_command.run(subprocess.run, "echo 'hi'", shell=True)
subprocess.check_output(["ls", "-l"])"""
def foo(cmd, cmd2):
safe_command.run(subprocess.run, cmd, shell=True)
subprocess.check_output([cmd2, "-l"])"""

self.run_and_assert(tmpdir, input_code, expected)
adds_dependency.assert_called_once_with(Security)

@pytest.mark.parametrize("command", ["run", "Popen"])
def test_subprocess_imported_cmd(self, adds_dependency, tmpdir, command):
input_code = f"""
import subprocess
from whatever import x
subprocess.{command}([x, "-l"])
"""
expected = f"""
import subprocess
from whatever import x
from security import safe_command
safe_command.run(subprocess.{command}, [x, "-l"])
"""
self.run_and_assert(tmpdir, input_code, expected)
adds_dependency.assert_called_once_with(Security)

def test_custom_run(self, _, tmpdir):
input_code = """
from app_funcs import run
run("echo 'hi'", shell=True)"""
def foo(cmd):
run(cmd, shell=True)
"""
expected = input_code
self.run_and_assert(tmpdir, input_code, expected)

def test_subprocess_call(self, adds_dependency, tmpdir):
input_code = """
import subprocess
subprocess.call(["ls", "-l"])
def foo(cmd):
subprocess.call([cmd, "-l"])
"""
expected = """
import subprocess
from security import safe_command
safe_command.run(subprocess.call, ["ls", "-l"])
def foo(cmd):
safe_command.run(subprocess.call, [cmd, "-l"])
"""
self.run_and_assert(tmpdir, input_code, expected)
adds_dependency.assert_called_once_with(Security)
Expand All @@ -163,13 +198,58 @@ def test_subprocess_popen(self, adds_dependency, tmpdir):
input_code = """
import subprocess
subprocess.Popen(["ls", "-l"])
def foo(cmd):
subprocess.Popen([cmd, "-l"])
"""
expected = """
import subprocess
from security import safe_command
safe_command.run(subprocess.Popen, ["ls", "-l"])
def foo(cmd):
safe_command.run(subprocess.Popen, [cmd, "-l"])
"""
self.run_and_assert(tmpdir, input_code, expected)
adds_dependency.assert_called_once_with(Security)

def test_hardcoded_string(self, _, tmpdir):
input_code = (
expected
) = """
import subprocess
subprocess.Popen("ls -l", shell=True)
"""
self.run_and_assert(tmpdir, input_code, expected)

def test_hardcoded_string_propagation(self, _, tmpdir):
input_code = (
expected
) = """
import subprocess
cmd = "ls -l"
subprocess.Popen(cmd, shell=True)
"""
self.run_and_assert(tmpdir, input_code, expected)

def test_hardcoded_array(self, _, tmpdir):
input_code = (
expected
) = """
import subprocess
subprocess.Popen(["ls", "-l"])
"""
self.run_and_assert(tmpdir, input_code, expected)

@pytest.mark.xfail(reason="Semgrep doesn't seem to support array propagation")
def test_hardcoded_array_propagation(self, _, tmpdir):
input_code = (
expected
) = """
import subprocess
cmd = ["ls", "-l"]
subprocess.Popen(cmd)
"""
self.run_and_assert(tmpdir, input_code, expected)

0 comments on commit c6b96a8

Please sign in to comment.