Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.9.4.15 #158

Merged
merged 9 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.9.3.2
3.9.4.15
2 changes: 1 addition & 1 deletion pilot/api/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1057,7 +1057,7 @@ def check_availablespace(self, files: list):
else:
if disk_space:
available_space = convert_mb_to_b(disk_space)
self.logger.info("locally available space: {available_space} B")
self.logger.info(f"locally available space: {available_space} B")

# are we within the limit?
if totalsize > available_space:
Expand Down
31 changes: 14 additions & 17 deletions pilot/control/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import re

from collections import namedtuple
from os import environ, getuid
from os import environ, getuid, getpid
from subprocess import (
Popen,
PIPE
Expand All @@ -53,6 +53,7 @@
get_local_oidc_token_info,
update_local_oidc_token_info
)
from pilot.util.psutils import get_process_info
from pilot.util.queuehandling import (
abort_jobs_in_queues,
get_maxwalltime_from_job,
Expand Down Expand Up @@ -84,8 +85,8 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901
last_token_check = t_0

# for CPU usage debugging
# cpuchecktime = int(config.Pilot.cpu_check)
# tcpu = t_0
cpuchecktime = int(config.Pilot.cpu_check)
tcpu = t_0
last_minute_check = t_0

queuedata = get_queuedata_from_job(queues)
Expand Down Expand Up @@ -158,15 +159,15 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901
time.sleep(1)

# time to check the CPU usage?
# if is_pilot_check(check='cpu_usage'):
# if int(time.time() - tcpu) > cpuchecktime:
# processes = get_process_info('python3 pilot3/pilot.py', pid=getpid())
# if processes:
# logger.info(f'PID={getpid()} has CPU usage={processes[0]}% CMD={processes[2]}')
# nproc = processes[3]
# if nproc > 1:
# logger.info(f'.. there are {nproc} such processes running')
# tcpu = time.time()
if is_pilot_check(check='cpu_usage'):
if int(time.time() - tcpu) > cpuchecktime:
processes = get_process_info('python3 pilot3/pilot.py', pid=getpid())
if processes:
logger.info(f'PID={getpid()} has CPU usage={processes[0]}% CMD={processes[2]}')
nproc = processes[3]
if nproc > 1:
logger.info(f'.. there are {nproc} such processes running')
tcpu = time.time()

# proceed with running the other checks
run_checks(queues, args)
Expand Down Expand Up @@ -283,11 +284,7 @@ def reached_maxtime_abort(args: Any):
args.graceful_stop.set()


#def log_lifetime(sig, frame, traces):
# logger.info('lifetime: %i used, %i maximum', int(time.time() - traces.pilot['lifetime_start']), traces.pilot['lifetime_max'])


def get_process_info(cmd: str, user: str = "", args: str = 'aufx', pid: int = 0) -> list:
def get_process_info_old(cmd: str, user: str = "", args: str = 'aufx', pid: int = 0) -> list:
"""
Return process info for given command.

Expand Down
3 changes: 2 additions & 1 deletion pilot/user/atlas/loopingjob_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ def remove_unwanted_files(workdir: str, files: list[str]) -> list[str]:
"memory_" in _file or
"mem." in _file or
"docs/" in _file or
"DBRelease-" in _file):
"DBRelease-" in _file or
_file == ""):
_files.append(_file)

return _files
4 changes: 2 additions & 2 deletions pilot/util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
# Pilot version
RELEASE = '3' # released number should be fixed at 3 for Pilot 3
VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '3' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '2' # build number should be reset to '1' for every new development cycle
REVISION = '4' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '15' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
Expand Down
106 changes: 95 additions & 11 deletions pilot/util/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import logging
import queue
import re
import select
import shlex
import signal
import threading
Expand All @@ -51,6 +52,86 @@
def execute(executable: Any, **kwargs: dict) -> Any: # noqa: C901
"""
Executes the command with its options in the provided executable list using subprocess time-out handler.
:param executable: command to be executed (str or list)
:param kwargs: kwargs (dict)
:return: exit code (int), stdout (str) and stderr (str) (or process if requested via returnproc argument).
"""
usecontainer = kwargs.get('usecontainer', False)
job = kwargs.get('job')
obscure = kwargs.get('obscure', '') # if this string is set, hide it in the log message

# Convert executable to string if it is a list
if isinstance(executable, list):
executable = ' '.join(executable)

if job and job.imagename != "" and "runcontainer" in executable:
usecontainer = False
job.usecontainer = usecontainer

if usecontainer:
executable, diagnostics = containerise_executable(executable, **kwargs)
if not executable:
return None if kwargs.get('returnproc', False) else -1, "", diagnostics

if not kwargs.get('mute', False):
print_executable(executable, obscure=obscure)

timeout = get_timeout(kwargs.get('timeout', None))
exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable]

process = None
try:
with execute_lock:
process = subprocess.Popen(
exe,
bufsize=-1,
stdout=kwargs.get('stdout', subprocess.PIPE),
stderr=kwargs.get('stderr', subprocess.PIPE),
cwd=kwargs.get('cwd', getcwd()),
start_new_session=True,
encoding='utf-8',
errors='replace'
)
if kwargs.get('returnproc', False):
return process

# Use communicate() to read stdout and stderr reliably
try:
logger.debug(f'subprocess.communicate() will use timeout {timeout} s')
stdout, stderr = process.communicate(timeout=timeout)
except subprocess.TimeoutExpired as exc:
# Timeout handling
stderr = f'subprocess communicate sent TimeoutExpired: {exc}'
logger.warning(stderr)
exit_code = errors.COMMANDTIMEDOUT
stderr = kill_all(process, stderr)
return exit_code, "", stderr
except Exception as exc:
logger.warning(f'exception caused when executing command: {executable}: {exc}')
exit_code = errors.UNKNOWNEXCEPTION
stderr = kill_all(process, str(exc))
return exit_code, "", stderr

exit_code = process.poll()
if stdout and stdout.endswith('\n'):
stdout = stdout[:-1]

return exit_code, stdout, stderr

finally:
# Ensure the process is cleaned up
if process and not kwargs.get('returnproc', False):
try:
process.wait(timeout=60)
process.stdout.close()
process.stderr.close()
except Exception:
pass


def execute_old3(executable: Any, **kwargs: dict) -> Any: # noqa: C901
"""
Executes the command with its options in the provided executable list using subprocess time-out handler.

The function also determines whether the command should be executed within a container.

Expand Down Expand Up @@ -113,24 +194,26 @@ def execute(executable: Any, **kwargs: dict) -> Any: # noqa: C901

def read_output(stream, queue):
while True:
sleep(0.01)
try:
line = stream.readline()
if not line:
break
# Use select to wait for the stream to be ready for reading
ready, _, _ = select.select([stream], [], [], 1.0)
if ready:
line = stream.readline()
if not line:
break
try:
queue.put_nowait(line)
except queue.Full:
pass # Handle the case where the queue is full
else:
sleep(0.01) # Sleep for a short interval to avoid busy waiting
except (AttributeError, ValueError):
# Handle the case where stream is None (AttributeError) or closed (ValueError)
break
except OSError as e:
if e.errno == errno.EBADF:
# Handle the case where the file descriptor is bad
break
else:
raise
try:
queue.put_nowait(line)
except queue.Full:
sleep(0.01) # Sleep for a short interval to avoid busy waiting

stdout_thread = threading.Thread(target=read_output, args=(process.stdout, stdout_queue))
stderr_thread = threading.Thread(target=read_output, args=(process.stderr, stderr_queue))
Expand All @@ -155,7 +238,8 @@ def read_output(stream, queue):
exit_code = errors.UNKNOWNEXCEPTION
stderr = kill_all(process, str(exc))
else:
exit_code = process.poll()
#exit_code = process.poll()
exit_code = process.returncode

# Wait for the threads to finish reading
try:
Expand Down
39 changes: 38 additions & 1 deletion pilot/util/psutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.
#
# Authors:
# - Paul Nilsson, [email protected], 2023
# - Paul Nilsson, [email protected], 2023-24

import logging
import os
Expand Down Expand Up @@ -373,3 +373,40 @@ def check_cpu_load():
else:
logger.info("system load is normal")
return False


def get_process_info(cmd: str, user: str = "", pid: int = 0) -> list:
"""
Return process info for given command.

The function returns a list with format [cpu, mem, command, number of commands] for
a given command (e.g. python3 pilot3/pilot.py).

:param cmd: command (str)
:param user: user (str)
:param pid: process id (int)
:return: list with process info (l[0]=cpu usage(%), l[1]=mem usage(%), l[2]=command(string)) (list).
"""
if not _is_psutil_available:
logger.warning('psutil not available, cannot check pilot CPU load')
return []

processes = []
num = 0

for proc in psutil.process_iter(['pid', 'username', 'cpu_percent', 'memory_percent', 'cmdline']):
try:
if user and proc.info['username'] != user:
continue
cmdline = proc.info['cmdline']
if cmdline and cmd in ' '.join(cmdline):
num += 1
if proc.info['pid'] == pid:
processes = [proc.info['cpu_percent'], proc.info['memory_percent'], ' '.join(cmdline)]
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue

if processes:
processes.append(num)

return processes
Loading