Skip to content

Commit

Permalink
New version of execute()
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Nilsson committed Dec 18, 2024
1 parent f479fe9 commit c14a16b
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 24 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.9.4.9
3.9.4.15
2 changes: 1 addition & 1 deletion pilot/util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
RELEASE = '3' # released number should be fixed at 3 for Pilot 3
VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '4' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '9' # build number should be reset to '1' for every new development cycle
BUILD = '15' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
Expand Down
105 changes: 83 additions & 22 deletions pilot/util/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,86 @@
def execute(executable: Any, **kwargs: dict) -> Any: # noqa: C901
"""
Executes the command with its options in the provided executable list using subprocess time-out handler.
:param executable: command to be executed (str or list)
:param kwargs: kwargs (dict)
:return: exit code (int), stdout (str) and stderr (str) (or process if requested via returnproc argument).
"""
usecontainer = kwargs.get('usecontainer', False)
job = kwargs.get('job')
obscure = kwargs.get('obscure', '') # if this string is set, hide it in the log message

# Convert executable to string if it is a list
if isinstance(executable, list):
executable = ' '.join(executable)

if job and job.imagename != "" and "runcontainer" in executable:
usecontainer = False
job.usecontainer = usecontainer

if usecontainer:
executable, diagnostics = containerise_executable(executable, **kwargs)
if not executable:
return None if kwargs.get('returnproc', False) else -1, "", diagnostics

if not kwargs.get('mute', False):
print_executable(executable, obscure=obscure)

timeout = get_timeout(kwargs.get('timeout', None))
exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable]

process = None
try:
with execute_lock:
process = subprocess.Popen(
exe,
bufsize=-1,
stdout=kwargs.get('stdout', subprocess.PIPE),
stderr=kwargs.get('stderr', subprocess.PIPE),
cwd=kwargs.get('cwd', getcwd()),
start_new_session=True,
encoding='utf-8',
errors='replace'
)
if kwargs.get('returnproc', False):
return process

# Use communicate() to read stdout and stderr reliably
try:
logger.debug(f'subprocess.communicate() will use timeout {timeout} s')
stdout, stderr = process.communicate(timeout=timeout)
except subprocess.TimeoutExpired as exc:
# Timeout handling
stderr = f'subprocess communicate sent TimeoutExpired: {exc}'
logger.warning(stderr)
exit_code = errors.COMMANDTIMEDOUT
stderr = kill_all(process, stderr)
return exit_code, "", stderr
except Exception as exc:
logger.warning(f'exception caused when executing command: {executable}: {exc}')
exit_code = errors.UNKNOWNEXCEPTION
stderr = kill_all(process, str(exc))
return exit_code, "", stderr

exit_code = process.poll()
if stdout and stdout.endswith('\n'):
stdout = stdout[:-1]

return exit_code, stdout, stderr

finally:
# Ensure the process is cleaned up
if process and not kwargs.get('returnproc', False):
try:
process.wait(timeout=60)
process.stdout.close()
process.stderr.close()
except Exception:
pass


def execute_old3(executable: Any, **kwargs: dict) -> Any: # noqa: C901
"""
Executes the command with its options in the provided executable list using subprocess time-out handler.
The function also determines whether the command should be executed within a container.
Expand Down Expand Up @@ -125,35 +205,15 @@ def read_output(stream, queue):
queue.put_nowait(line)
except queue.Full:
pass # Handle the case where the queue is full
except (AttributeError, ValueError):
break
except OSError as e:
if e.errno == errno.EBADF:
break
else:
raise

def read_output_old(stream, queue):
while True:
#sleep(1)
try:
line = stream.readline()
if not line:
break
sleep(0.01) # Sleep for a short interval to avoid busy waiting
except (AttributeError, ValueError):
# Handle the case where stream is None (AttributeError) or closed (ValueError)
break
except OSError as e:
if e.errno == errno.EBADF:
# Handle the case where the file descriptor is bad
break
else:
raise
try:
queue.put_nowait(line)
except queue.Full:
pass
#sleep(0.01) # Sleep for a short interval to avoid busy waiting

stdout_thread = threading.Thread(target=read_output, args=(process.stdout, stdout_queue))
stderr_thread = threading.Thread(target=read_output, args=(process.stderr, stderr_queue))
Expand All @@ -178,7 +238,8 @@ def read_output_old(stream, queue):
exit_code = errors.UNKNOWNEXCEPTION
stderr = kill_all(process, str(exc))
else:
exit_code = process.poll()
#exit_code = process.poll()
exit_code = process.returncode

# Wait for the threads to finish reading
try:
Expand Down

0 comments on commit c14a16b

Please sign in to comment.