diff --git a/pyproject.toml b/pyproject.toml index 86d11bc484..37e4161032 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,6 +56,7 @@ warn_unused_configs = true [[tool.mypy.overrides]] module = [ + "src.archivematicaCommon.lib.executeOrRunSubProcess", "src.MCPClient.lib.client.*", "src.MCPClient.lib.clientScripts.characterize_file", "src.MCPClient.lib.clientScripts.has_packages", @@ -64,6 +65,7 @@ module = [ "src.MCPClient.lib.clientScripts.policy_check", "src.MCPClient.lib.clientScripts.transcribe_file", "src.MCPClient.lib.clientScripts.validate_file", + "tests.archivematicaCommon.test_execute_functions", "tests.dashboard.fpr.test_views", "tests.MCPClient.conftest", "tests.MCPClient.test_characterize_file", diff --git a/src/archivematicaCommon/lib/executeOrRunSubProcess.py b/src/archivematicaCommon/lib/executeOrRunSubProcess.py index 86a3778906..0b8ada6cf5 100644 --- a/src/archivematicaCommon/lib/executeOrRunSubProcess.py +++ b/src/archivematicaCommon/lib/executeOrRunSubProcess.py @@ -21,16 +21,29 @@ import subprocess import sys import tempfile +from typing import Dict +from typing import List +from typing import Optional +from typing import Tuple +from typing import Union + +from archivematicaFunctions import escape + +Arguments = List[str] +Input = Union[str, bytes, io.IOBase] +Environment = Dict[str, str] +Command = Union[str, List[str]] +Result = Tuple[int, str, str] def launchSubProcess( - command, - stdIn="", - printing=True, - arguments=None, - env_updates=None, - capture_output=False, -): + command: Command, + stdIn: Input = "", + printing: bool = True, + arguments: Optional[Arguments] = None, + env_updates: Optional[Environment] = None, + capture_output: bool = False, +) -> Result: """ Launches a subprocess using ``command``, where ``command`` is either: a) a single string containing a commandline statement, or @@ -89,7 +102,7 @@ def launchSubProcess( stdin_pipe = subprocess.PIPE communicate_input = stdIn elif isinstance(stdIn, io.IOBase): - stdin_pipe = stdIn + stdin_pipe = stdIn.fileno() communicate_input = None else: raise Exception("stdIn must be a string or a file object") @@ -103,8 +116,8 @@ def launchSubProcess( env=my_env, ) std_out, std_error = p.communicate(input=communicate_input) - stdOut = std_out.decode() - stdError = std_error.decode() + stdOut = escape(std_out) + stdError = escape(std_error) else: # Ignore the stdout of the subprocess, capturing only stderr with open(os.devnull, "w") as devnull: @@ -116,7 +129,7 @@ def launchSubProcess( stderr=subprocess.PIPE, ) __, std_error = p.communicate(input=communicate_input) - stdError = std_error.decode() + stdError = escape(std_error) retcode = p.returncode # If we are not capturing output and the subprocess has succeeded, set # its stderr to the empty string. @@ -139,8 +152,13 @@ def launchSubProcess( def createAndRunScript( - text, stdIn="", printing=True, arguments=None, env_updates=None, capture_output=True -): + text: Command, + stdIn: Input = "", + printing: bool = True, + arguments: Optional[Arguments] = None, + env_updates: Optional[Environment] = None, + capture_output: bool = True, +) -> Result: if arguments is None: arguments = [] if env_updates is None: @@ -150,7 +168,10 @@ def createAndRunScript( encoding="utf-8", mode="wt", delete=False ) as tmpfile: os.chmod(tmpfile.name, 0o770) - tmpfile.write(text) + if isinstance(text, str): + tmpfile.write(text) + else: + tmpfile.write(" ".join(text)) tmpfile.close() cmd = [tmpfile.name] cmd.extend(arguments) @@ -168,14 +189,14 @@ def createAndRunScript( def executeOrRun( - type, - text, - stdIn="", - printing=True, - arguments=None, - env_updates=None, - capture_output=True, -): + type: str, + text: Command, + stdIn: Input = "", + printing: bool = True, + arguments: Optional[Arguments] = None, + env_updates: Optional[Environment] = None, + capture_output: bool = True, +) -> Result: """ Attempts to run the provided command on the shell, with the text of "stdIn" passed as standard input if provided. The type parameter @@ -220,7 +241,9 @@ def executeOrRun( capture_output=capture_output, ) if type == "bashScript": - text = "#!/bin/bash\n" + text + if not isinstance(text, str): + raise ValueError("command must be a str") + text = f"#!/bin/bash\n{text}" return createAndRunScript( text, stdIn=stdIn, @@ -230,7 +253,9 @@ def executeOrRun( capture_output=capture_output, ) if type == "pythonScript": - text = "#!/usr/bin/env python\n" + text + if not isinstance(text, str): + raise ValueError("command must be a str") + text = f"#!/usr/bin/env python\n{text}" return createAndRunScript( text, stdIn=stdIn, @@ -248,3 +273,4 @@ def executeOrRun( env_updates=env_updates, capture_output=capture_output, ) + raise ValueError(f"unknown type {type}") diff --git a/tests/archivematicaCommon/test_execute_functions.py b/tests/archivematicaCommon/test_execute_functions.py index 0effb27190..0f1fa41b91 100644 --- a/tests/archivematicaCommon/test_execute_functions.py +++ b/tests/archivematicaCommon/test_execute_functions.py @@ -1,13 +1,16 @@ +import pathlib import shlex import tempfile +from typing import Generator from unittest.mock import ANY +from unittest.mock import Mock from unittest.mock import patch import executeOrRunSubProcess as execsub import pytest -def test_capture_output(): +def test_capture_output() -> None: """Tests behaviour of capture_output when executing sub processes.""" # Test that stdout and stderr are not captured by default @@ -61,7 +64,7 @@ def test_capture_output(): @pytest.fixture -def temp_path(tmp_path): +def temp_path(tmp_path: pathlib.Path) -> Generator[str, None, None]: """Creates custom temp path, yields the value, and resets to original value.""" original_tempdir = tempfile.tempdir @@ -73,7 +76,9 @@ def temp_path(tmp_path): @patch("executeOrRunSubProcess.launchSubProcess") -def test_createAndRunScript_creates_tmpfile_in_custom_dir(launchSubProcess, temp_path): +def test_createAndRunScript_creates_tmpfile_in_custom_dir( + launchSubProcess: Mock, temp_path: str +) -> None: """Tests execution of launchSubProcess when executing createAndRunScript.""" script_content = "#!/bin/bash\necho 'Script output'\nexit 0" @@ -89,3 +94,22 @@ def test_createAndRunScript_creates_tmpfile_in_custom_dir(launchSubProcess, temp ) args, _ = launchSubProcess.call_args assert args[0][0].startswith(temp_path) + + +@patch("subprocess.Popen") +def test_launchSubProcess_replaces_non_utf8_output_with_replacement_characters( + popen: Mock, +) -> None: + communicate_return_code = 0 + communicate_output = b"Output \xae" + communicate_error = b"Error \xae" + popen.return_value = Mock( + returncode=communicate_return_code, + **{"communicate.return_value": (communicate_output, communicate_error)}, + ) + + code, stdout, stderr = execsub.launchSubProcess("mycommand", capture_output=True) + + assert code == communicate_return_code + assert stdout == communicate_output.decode(errors="replace") + assert stderr == communicate_error.decode(errors="replace")