Skip to content

Commit

Permalink
Merge pull request #158 from PanDAWMS/next
Browse files Browse the repository at this point in the history
3.9.4.16
  • Loading branch information
PalNilsson authored Dec 18, 2024
2 parents fd44667 + 0b4b127 commit 9e76a9f
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 34 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.9.3.2
3.9.4.15
2 changes: 1 addition & 1 deletion pilot/api/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1057,7 +1057,7 @@ def check_availablespace(self, files: list):
else:
if disk_space:
available_space = convert_mb_to_b(disk_space)
self.logger.info("locally available space: {available_space} B")
self.logger.info(f"locally available space: {available_space} B")

# are we within the limit?
if totalsize > available_space:
Expand Down
31 changes: 14 additions & 17 deletions pilot/control/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import re

from collections import namedtuple
from os import environ, getuid
from os import environ, getuid, getpid
from subprocess import (
Popen,
PIPE
Expand All @@ -53,6 +53,7 @@
get_local_oidc_token_info,
update_local_oidc_token_info
)
from pilot.util.psutils import get_process_info
from pilot.util.queuehandling import (
abort_jobs_in_queues,
get_maxwalltime_from_job,
Expand Down Expand Up @@ -84,8 +85,8 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901
last_token_check = t_0

# for CPU usage debugging
# cpuchecktime = int(config.Pilot.cpu_check)
# tcpu = t_0
cpuchecktime = int(config.Pilot.cpu_check)
tcpu = t_0
last_minute_check = t_0

queuedata = get_queuedata_from_job(queues)
Expand Down Expand Up @@ -158,15 +159,15 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901
time.sleep(1)

# time to check the CPU usage?
# if is_pilot_check(check='cpu_usage'):
# if int(time.time() - tcpu) > cpuchecktime:
# processes = get_process_info('python3 pilot3/pilot.py', pid=getpid())
# if processes:
# logger.info(f'PID={getpid()} has CPU usage={processes[0]}% CMD={processes[2]}')
# nproc = processes[3]
# if nproc > 1:
# logger.info(f'.. there are {nproc} such processes running')
# tcpu = time.time()
if is_pilot_check(check='cpu_usage'):
if int(time.time() - tcpu) > cpuchecktime:
processes = get_process_info('python3 pilot3/pilot.py', pid=getpid())
if processes:
logger.info(f'PID={getpid()} has CPU usage={processes[0]}% CMD={processes[2]}')
nproc = processes[3]
if nproc > 1:
logger.info(f'.. there are {nproc} such processes running')
tcpu = time.time()

# proceed with running the other checks
run_checks(queues, args)
Expand Down Expand Up @@ -283,11 +284,7 @@ def reached_maxtime_abort(args: Any):
args.graceful_stop.set()


#def log_lifetime(sig, frame, traces):
# logger.info('lifetime: %i used, %i maximum', int(time.time() - traces.pilot['lifetime_start']), traces.pilot['lifetime_max'])


def get_process_info(cmd: str, user: str = "", args: str = 'aufx', pid: int = 0) -> list:
def get_process_info_old(cmd: str, user: str = "", args: str = 'aufx', pid: int = 0) -> list:
"""
Return process info for given command.
Expand Down
3 changes: 2 additions & 1 deletion pilot/user/atlas/loopingjob_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ def remove_unwanted_files(workdir: str, files: list[str]) -> list[str]:
"memory_" in _file or
"mem." in _file or
"docs/" in _file or
"DBRelease-" in _file):
"DBRelease-" in _file or
_file == ""):
_files.append(_file)

return _files
4 changes: 2 additions & 2 deletions pilot/util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
# Pilot version
RELEASE = '3' # released number should be fixed at 3 for Pilot 3
VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '3' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '2' # build number should be reset to '1' for every new development cycle
REVISION = '4' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '15' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
Expand Down
106 changes: 95 additions & 11 deletions pilot/util/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import logging
import queue
import re
import select
import shlex
import signal
import threading
Expand All @@ -51,6 +52,86 @@
def execute(executable: Any, **kwargs: dict) -> Any:  # noqa: C901
    """
    Execute the given command in a subprocess, with time-out handling.

    The command is run either through '/bin/bash -c' (default) or, when mode='python',
    through '/usr/bin/python'. If usecontainer is set, the command is first passed to
    containerise_executable().

    Recognised kwargs (all optional): usecontainer (bool), job (object with imagename and
    usecontainer attributes), obscure (str), returnproc (bool), mute (bool), timeout,
    mode (str), stdout, stderr, cwd (str).

    :param executable: command to be executed (str or list)
    :param kwargs: kwargs (dict)
    :return: exit code (int), stdout (str) and stderr (str) (or process if requested via returnproc argument).
             If containerisation yields no command, a tuple (None or -1, "", diagnostics) is returned.
    """
    usecontainer = kwargs.get('usecontainer', False)
    job = kwargs.get('job')
    obscure = kwargs.get('obscure', '')  # if this string is set, hide it in the log message

    # Convert executable to string if it is a list
    if isinstance(executable, list):
        executable = ' '.join(executable)

    # A job with an image name whose command contains "runcontainer" is not containerised here
    if job and job.imagename != "" and "runcontainer" in executable:
        usecontainer = False
        job.usecontainer = usecontainer

    if usecontainer:
        executable, diagnostics = containerise_executable(executable, **kwargs)
        if not executable:
            # NOTE: this is always a 3-tuple; only the first element depends on returnproc
            return None if kwargs.get('returnproc', False) else -1, "", diagnostics

    if not kwargs.get('mute', False):
        print_executable(executable, obscure=obscure)

    timeout = get_timeout(kwargs.get('timeout', None))
    exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable]

    process = None
    try:
        # The subprocess is created while holding execute_lock
        with execute_lock:
            process = subprocess.Popen(
                exe,
                bufsize=-1,
                stdout=kwargs.get('stdout', subprocess.PIPE),
                stderr=kwargs.get('stderr', subprocess.PIPE),
                cwd=kwargs.get('cwd', getcwd()),
                start_new_session=True,  # child runs in its own session
                encoding='utf-8',
                errors='replace'
            )
            if kwargs.get('returnproc', False):
                # The caller takes ownership of the process; no cleanup is done here
                return process

        # Use communicate() to read stdout and stderr reliably
        try:
            logger.debug(f'subprocess.communicate() will use timeout {timeout} s')
            stdout, stderr = process.communicate(timeout=timeout)
        except subprocess.TimeoutExpired as exc:
            # Timeout handling
            stderr = f'subprocess communicate sent TimeoutExpired: {exc}'
            logger.warning(stderr)
            exit_code = errors.COMMANDTIMEDOUT
            stderr = kill_all(process, stderr)
            return exit_code, "", stderr
        except Exception as exc:
            logger.warning(f'exception caused when executing command: {executable}: {exc}')
            exit_code = errors.UNKNOWNEXCEPTION
            stderr = kill_all(process, str(exc))
            return exit_code, "", stderr

        exit_code = process.poll()
        # Drop a single trailing newline from the captured stdout
        if stdout and stdout.endswith('\n'):
            stdout = stdout[:-1]

        return exit_code, stdout, stderr

    finally:
        # Ensure the process is cleaned up (unless it was handed over to the caller)
        if process and not kwargs.get('returnproc', False):
            # Best-effort wait; a failure here must not prevent the pipes from being closed
            try:
                process.wait(timeout=60)
            except Exception as exc:
                logger.debug(f'failed to wait for subprocess during cleanup: {exc}')

            # Close each pipe on its own: Popen.stdout/stderr is None when the caller supplied
            # its own handle, and a problem with one stream must not leave the other one open
            for stream in (process.stdout, process.stderr):
                if stream is None:
                    continue
                try:
                    stream.close()
                except Exception as exc:
                    logger.debug(f'failed to close subprocess stream during cleanup: {exc}')


def execute_old3(executable: Any, **kwargs: dict) -> Any: # noqa: C901
"""
Executes the command with its options in the provided executable list using subprocess time-out handler.
The function also determines whether the command should be executed within a container.
Expand Down Expand Up @@ -113,24 +194,26 @@ def execute(executable: Any, **kwargs: dict) -> Any: # noqa: C901

def read_output(stream, queue):
while True:
sleep(0.01)
try:
line = stream.readline()
if not line:
break
# Use select to wait for the stream to be ready for reading
ready, _, _ = select.select([stream], [], [], 1.0)
if ready:
line = stream.readline()
if not line:
break
try:
queue.put_nowait(line)
except queue.Full:
pass # Handle the case where the queue is full
else:
sleep(0.01) # Sleep for a short interval to avoid busy waiting
except (AttributeError, ValueError):
# Handle the case where stream is None (AttributeError) or closed (ValueError)
break
except OSError as e:
if e.errno == errno.EBADF:
# Handle the case where the file descriptor is bad
break
else:
raise
try:
queue.put_nowait(line)
except queue.Full:
sleep(0.01) # Sleep for a short interval to avoid busy waiting

stdout_thread = threading.Thread(target=read_output, args=(process.stdout, stdout_queue))
stderr_thread = threading.Thread(target=read_output, args=(process.stderr, stderr_queue))
Expand All @@ -155,7 +238,8 @@ def read_output(stream, queue):
exit_code = errors.UNKNOWNEXCEPTION
stderr = kill_all(process, str(exc))
else:
exit_code = process.poll()
#exit_code = process.poll()
exit_code = process.returncode

# Wait for the threads to finish reading
try:
Expand Down
39 changes: 38 additions & 1 deletion pilot/util/psutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.
#
# Authors:
# - Paul Nilsson, [email protected], 2023
# - Paul Nilsson, [email protected], 2023-24

import logging
import os
Expand Down Expand Up @@ -373,3 +373,40 @@ def check_cpu_load():
else:
logger.info("system load is normal")
return False


def get_process_info(cmd: str, user: str = "", pid: int = 0) -> list:
    """
    Look up CPU and memory usage for the process with the given pid running the given command.

    All processes reported by psutil are scanned. Every process whose joined command line
    contains cmd (and, if user is set, is owned by that user) is counted. The returned list
    describes the counted process whose pid equals the pid argument, followed by the total count.

    :param cmd: command to look for, e.g. python3 pilot3/pilot.py (str)
    :param user: only consider processes owned by this user; empty string means any user (str)
    :param pid: process id of the process to report on (int)
    :return: [cpu usage (%), mem usage (%), command (str), number of matching processes], or an
             empty list if psutil is unavailable or no matching process has the given pid (list).
    """
    if not _is_psutil_available:
        logger.warning('psutil not available, cannot check pilot CPU load')
        return []

    attributes = ['pid', 'username', 'cpu_percent', 'memory_percent', 'cmdline']
    selected = []
    matches = 0

    for candidate in psutil.process_iter(attributes):
        try:
            info = candidate.info

            # skip processes owned by someone else when a user filter is given
            if user and info['username'] != user:
                continue

            # skip processes without a command line or running a different command
            arguments = info['cmdline']
            if not arguments:
                continue
            full_command = ' '.join(arguments)
            if cmd not in full_command:
                continue

            matches += 1
            if info['pid'] == pid:
                selected = [info['cpu_percent'], info['memory_percent'], full_command]
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            continue

    # the count is only reported together with the info of the requested process
    if not selected:
        return selected

    selected.append(matches)
    return selected

0 comments on commit 9e76a9f

Please sign in to comment.