Skip to content

Commit

Permalink
Testing new execute function
Browse files Browse the repository at this point in the history
  • Loading branch information
PalNilsson committed Sep 25, 2024
1 parent b71b536 commit da69c8e
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 2 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.8.3.1
3.8.3.2
2 changes: 1 addition & 1 deletion pilot/util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
RELEASE = '3' # released number should be fixed at 3 for Pilot 3
VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '3' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '1' # build number should be reset to '1' for every new development cycle
BUILD = '2' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
Expand Down
96 changes: 96 additions & 0 deletions pilot/util/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,102 @@ def execute(executable: Any, **kwargs: dict) -> Any:
stdout = ''
stderr = ''

def read_output(pipe, file):
for line in iter(pipe.readline, ''):
file.write(line)
pipe.close()

process = None
with execute_lock:
process = subprocess.Popen(exe,
bufsize=-1,
stdout=kwargs.get('stdout', subprocess.PIPE),
stderr=kwargs.get('stderr', subprocess.PIPE),
cwd=kwargs.get('cwd', getcwd()),
preexec_fn=os.setsid,
encoding='utf-8',
errors='replace')
if kwargs.get('returnproc', False):
return process

stdout_file = kwargs.get('stdout', subprocess.PIPE)
stderr_file = kwargs.get('stderr', subprocess.PIPE)

stdout_thread = threading.Thread(target=read_output, args=(process.stdout, stdout_file))
stderr_thread = threading.Thread(target=read_output, args=(process.stderr, stderr_file))

stdout_thread.start()
stderr_thread.start()

try:
logger.debug(f'subprocess.communicate() will use timeout {timeout} s')
process.wait(timeout=timeout)
except subprocess.TimeoutExpired as exc:
stderr += f'subprocess communicate sent TimeoutExpired: {exc}'
logger.warning(stderr)
exit_code = errors.COMMANDTIMEDOUT
stderr = kill_all(process, stderr)
except Exception as exc:
logger.warning(f'exception caused when executing command: {executable}: {exc}')
exit_code = errors.UNKNOWNEXCEPTION
stderr = kill_all(process, str(exc))
else:
exit_code = process.poll()

stdout_thread.join()
stderr_thread.join()

if stdout_file != subprocess.PIPE:
stdout_file.flush()
stdout_file.seek(0)
stdout = stdout_file.read()

if stderr_file != subprocess.PIPE:
stderr_file.flush()
stderr_file.seek(0)
stderr = stderr_file.read()

try:
if process:
process.wait(timeout=60)
except subprocess.TimeoutExpired:
if process:
logger.warning("process did not complete within the timeout of 60s - terminating")
process.terminate()

if stdout and stdout.endswith('\n'):
stdout = stdout[:-1]

return exit_code, stdout, stderr


def execute_old2(executable: Any, **kwargs: dict) -> Any:
usecontainer = kwargs.get('usecontainer', False)
job = kwargs.get('job')
obscure = kwargs.get('obscure', '')

if isinstance(executable, list):
executable = ' '.join(executable)

if job and job.imagename != "" and "runcontainer" in executable:
usecontainer = False
job.usecontainer = usecontainer

if usecontainer:
executable, diagnostics = containerise_executable(executable, **kwargs)
if not executable:
return None if kwargs.get('returnproc', False) else -1, "", diagnostics

if not kwargs.get('mute', False):
print_executable(executable, obscure=obscure)

timeout = get_timeout(kwargs.get('timeout', None))
exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable]

exit_code = 0
stdout = ''
stderr = ''

def read_output(pipe, output_list):
for line in iter(pipe.readline, ''):
output_list.append(line)
Expand Down

0 comments on commit da69c8e

Please sign in to comment.