Skip to content

Commit

Permalink
execute() now using thread synchronization to protect against polling…
Browse files Browse the repository at this point in the history
… errors ending up in stderr
  • Loading branch information
Paul Nilsson committed Oct 18, 2024
1 parent 528a6b0 commit 3e04dc6
Showing 1 changed file with 4 additions and 91 deletions.
95 changes: 4 additions & 91 deletions pilot/util/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,10 @@ def read_output(stream, queue):
stdout_thread = threading.Thread(target=read_output, args=(process.stdout, stdout_queue))
stderr_thread = threading.Thread(target=read_output, args=(process.stderr, stderr_queue))

stdout_thread.start()
stderr_thread.start()
# start the threads and use thread synchronization
with threading.Lock():
stdout_thread.start()
stderr_thread.start()

try:
logger.debug(f'subprocess.communicate() will use timeout {timeout} s')
Expand Down Expand Up @@ -183,95 +185,6 @@ def read_output(stream, queue):
return exit_code, stdout, stderr


def execute_old2(executable: Any, **kwargs: dict) -> Any:  # noqa: C901
    """
    Execute a command in a subprocess, draining stdout/stderr with reader threads.

    Recognised keyword arguments (all optional):
        usecontainer (bool): containerise the command via containerise_executable().
        job: job object; if it has a non-empty ``imagename`` and the command
            contains "runcontainer", containerisation is switched off and
            ``job.usecontainer`` is updated accordingly.
        obscure (str): forwarded to print_executable().
        mute (bool): when True, the command is not printed.
        timeout: passed through get_timeout() to obtain the wait time-out.
        mode (str): 'python' runs the command with /usr/bin/python, anything
            else (default 'bash') runs it with ``/bin/bash -c``.
        cwd (str): working directory for the subprocess (default: getcwd()).
        returnproc (bool): when True, return the Popen object right after launch.

    :param executable: command to be executed (string or list of strings).
    :param kwargs: keyword arguments, see above.
    :return: the subprocess.Popen object if ``returnproc`` is set, otherwise the
        tuple (exit_code, stdout, stderr). If containerisation fails, the tuple
        (None or -1, "", diagnostics) is returned.
    """
    usecontainer = kwargs.get('usecontainer', False)
    job = kwargs.get('job')
    obscure = kwargs.get('obscure', '')

    # a list of arguments is flattened into a single command string
    if isinstance(executable, list):
        executable = ' '.join(executable)

    # do not containerise a command that already runs the container itself
    if job and job.imagename != "" and "runcontainer" in executable:
        usecontainer = False
        job.usecontainer = usecontainer

    if usecontainer:
        executable, diagnostics = containerise_executable(executable, **kwargs)
        if not executable:
            # note: always a 3-tuple; only the first element depends on returnproc
            return (None if kwargs.get('returnproc', False) else -1), "", diagnostics

    if not kwargs.get('mute', False):
        print_executable(executable, obscure=obscure)

    timeout = get_timeout(kwargs.get('timeout', None))
    if kwargs.get('mode', 'bash') == 'python':
        exe = ['/usr/bin/python'] + executable.split()
    else:
        exe = ['/bin/bash', '-c', executable]

    exit_code = 0
    stdout = ''
    stderr = ''

    def read_output(pipe, output_list):
        """Append every line read from pipe to output_list, then close the pipe."""
        for line in iter(pipe.readline, ''):
            output_list.append(line)
        pipe.close()

    # process creation is serialised with the module-level lock
    with execute_lock:
        process = subprocess.Popen(exe,
                                   bufsize=-1,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   cwd=kwargs.get('cwd', getcwd()),
                                   preexec_fn=os.setsid,
                                   encoding='utf-8',
                                   errors='replace')
        if kwargs.get('returnproc', False):
            return process

    stdout_lines = []
    stderr_lines = []

    # drain both pipes concurrently so that the child cannot block on a full pipe
    stdout_thread = threading.Thread(target=read_output, args=(process.stdout, stdout_lines))
    stderr_thread = threading.Thread(target=read_output, args=(process.stderr, stderr_lines))

    stdout_thread.start()
    stderr_thread.start()

    try:
        logger.debug(f'subprocess.communicate() will use timeout {timeout} s')
        process.wait(timeout=timeout)
    except subprocess.TimeoutExpired as exc:
        stderr += f'subprocess communicate sent TimeoutExpired: {exc}'
        logger.warning(stderr)
        exit_code = errors.COMMANDTIMEDOUT
        stderr = kill_all(process, stderr)
    except Exception as exc:
        logger.warning(f'exception caused when executing command: {executable}: {exc}')
        exit_code = errors.UNKNOWNEXCEPTION
        stderr = kill_all(process, str(exc))
    else:
        exit_code = process.poll()

    stdout_thread.join()
    stderr_thread.join()

    stdout = ''.join(stdout_lines)
    # append (rather than overwrite) so that time-out / exception diagnostics
    # collected above are not lost when the captured stderr is added
    stderr += ''.join(stderr_lines)

    # make sure the process has really finished before returning
    try:
        process.wait(timeout=60)
    except subprocess.TimeoutExpired:
        logger.warning("process did not complete within the timeout of 60s - terminating")
        process.terminate()

    # strip a single trailing newline from stdout
    if stdout and stdout.endswith('\n'):
        stdout = stdout[:-1]

    return exit_code, stdout, stderr


def execute_nothreads(executable: Any, **kwargs: dict) -> Any:
"""
Execute the command with its options in the provided executable list using subprocess time-out handler.
Expand Down

0 comments on commit 3e04dc6

Please sign in to comment.