Merge pull request #153 from PalNilsson/next
3.9.2.41
PalNilsson authored Dec 5, 2024
2 parents 3440240 + 2d4e5b4 commit 9edf4e8
Showing 21 changed files with 397 additions and 61 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
@@ -1 +1 @@
3.9.2.23b
3.9.2.41
5 changes: 4 additions & 1 deletion pilot/api/data.py
@@ -1072,7 +1072,7 @@ class StageOutClient(StagingClient):

mode = "stage-out"

def prepare_destinations(self, files: list, activities: list or str, alt_exclude: list = []) -> list:
def prepare_destinations(self, files: list, activities: list or str, alt_exclude: list = None) -> list:
"""
Resolve destination RSE (filespec.ddmendpoint) for each entry from `files` according to requested `activities`.
@@ -1083,6 +1083,9 @@ def prepare_destinations(self, files: list, activities: list or str, alt_exclude
:param alt_exclude: global list of destinations that should be excluded / not used for alternative stage-out
:return: updated fspec entries (list).
"""
if alt_exclude is None: # to bypass pylint complaint if declared as [] above
alt_exclude = []

if not self.infosys.queuedata: # infosys is not initialized: not able to fix destination if need, nothing to do
return files

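A note on the `alt_exclude` change: a default of `[]` is evaluated once at function definition time and then shared across all calls, which is why pylint flags it (W0102, dangerous-default-value). A minimal sketch of the pitfall and the `None`-sentinel idiom used above (the `collect_*` functions are hypothetical, not pilot code):

def collect_buggy(item, bucket=[]):
    # The same list object is reused on every call that omits `bucket`,
    # so items from earlier calls leak into later ones.
    bucket.append(item)
    return bucket

def collect_fixed(item, bucket=None):
    # A None sentinel means a fresh list is created on each call.
    if bucket is None:
        bucket = []
    bucket.append(item)
    return bucket

print(collect_buggy(1), collect_buggy(2))  # [1, 2] [1, 2] - shared state
print(collect_fixed(1), collect_fixed(2))  # [1] [2] - independent lists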
2 changes: 2 additions & 0 deletions pilot/control/data.py
@@ -328,6 +328,8 @@ def _stage_in(args: object, job: JobData) -> bool:
logger.info(" -- lfn=%s, status_code=%s, status=%s", infile.lfn, infile.status_code, status)

# write time stamps to pilot timing file

# MOVE THIS TO AFTER REMOTE FILE OPEN HAS BEEN VERIFIED (actually just before the payload starts)
add_to_pilot_timing(job.jobid, PILOT_POST_STAGEIN, time.time(), args)

remain_files = [infile for infile in job.indata if infile.status not in ['remote_io', 'transferred', 'no_transfer']]
9 changes: 6 additions & 3 deletions pilot/control/job.py
@@ -1684,10 +1684,13 @@ def get_job_definition_from_server(args: Any, taskid: str = "") -> str:
cmd = https.get_server_command(args.url, args.port)
if cmd != "":
logger.info(f'executing server command: {cmd}')
res = https.request2(cmd, data=data, panda=True) # will be a dictionary
logger.debug(f"request2 response: {res}") # should be StatusCode=0 if all is ok
if not res: # fallback to curl solution
if "curlgetjob" in infosys.queuedata.catchall:
res = https.request(cmd, data=data)
else:
res = https.request2(cmd, data=data, panda=True) # will be a dictionary
logger.debug(f"request2 response: {res}") # should be StatusCode=0 if all is ok
if not res: # fallback to curl solution
res = https.request(cmd, data=data)

return res

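The reworked logic above lets a queue force the curl-based transport by adding 'curlgetjob' to its catchall field, while the Python-native request2() path keeps curl as a failure fallback. A stripped-down sketch of the dispatch (the two request helpers are stand-ins, not the pilot's https module):

def native_request(cmd: str, data: dict) -> dict:
    # Stand-in for https.request2(); pretend the native path failed here.
    return {}

def curl_request(cmd: str, data: dict) -> dict:
    # Stand-in for the curl-based https.request().
    return {"StatusCode": 0, "via": "curl"}

def fetch_job_definition(cmd: str, data: dict, catchall: str) -> dict:
    if "curlgetjob" in catchall:
        return curl_request(cmd, data)  # forced curl path
    res = native_request(cmd, data)     # preferred native path
    if not res:
        res = curl_request(cmd, data)   # fall back on failure
    return res

print(fetch_job_definition("getJob", {}, catchall=""))  # falls back to curl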
25 changes: 24 additions & 1 deletion pilot/control/payload.py
@@ -744,7 +744,30 @@ def scan_for_memory_errors(subprocesses: list) -> str:
if search_str in line:
diagnostics = line[line.find(search_str):]
logger.warning(f'found memory error: {diagnostics}')
break

# make sure that this message is for a true subprocess of the pilot
# extract the pid from the message and compare it to the subprocesses list
match = search(r'Killed process (\d+)', diagnostics)
if match:
try:
found_pid = int(match.group(1))
logger.info(f"extracted PID: {found_pid}")

# is it a known subprocess?
if found_pid in subprocesses:
logger.info("PID found in the list of subprocesses")
break
else:
logger.warning("the extracted PID is not a known subprocess of the payload")
diagnostics = ""
# is the extracted PID a subprocess of the main pilot process itself?

except (ValueError, TypeError, AttributeError) as e:
logger.warning(f"failed to extract PID from the message: {e}")
diagnostics = ""
else:
logger.warning("PID could not be extracted from the message")
diagnostics = ""

if diagnostics:
break
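The added PID check prevents the pilot from attributing another process's OOM kill to its own payload: the PID is pulled out of the kernel message and compared against the known subprocess list. A self-contained sketch of the same extract-and-verify step (the dmesg line below is a fabricated example of the OOM-killer format):

import re

def verify_oom_victim(message: str, known_pids: list) -> str:
    # Return the diagnostics only if the killed PID belongs to a known
    # payload subprocess; otherwise discard the message.
    match = re.search(r"Killed process (\d+)", message)
    if not match:
        return ""  # no PID found, cannot attribute the kill
    pid = int(match.group(1))
    return message if pid in known_pids else ""

line = "Out of memory: Killed process 4242 (athena) total-vm:9000000kB"
print(verify_oom_victim(line, known_pids=[4242]))  # attributed to the payload
print(verify_oom_victim(line, known_pids=[1111]))  # "" - not our process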
40 changes: 25 additions & 15 deletions pilot/control/payloads/generic.py
@@ -36,7 +36,10 @@
from pilot.common.errorcodes import ErrorCodes
from pilot.control.job import send_state
from pilot.info import JobData
from pilot.util.auxiliary import set_pilot_state # , show_memory_usage
from pilot.util.auxiliary import (
set_pilot_state, # , show_memory_usage
list_items
)
from pilot.util.config import config
from pilot.util.container import execute
from pilot.util.constants import (
@@ -55,7 +58,11 @@
write_file,
read_file
)
from pilot.util.processes import kill_processes
from pilot.util.processes import (
kill_process,
kill_processes,
)
from pilot.util.psutils import find_lingering_processes
from pilot.util.timing import (
add_to_pilot_timing,
get_time_measurement
@@ -108,8 +115,6 @@ def pre_setup(self, job: JobData):
"""
# write time stamps to pilot timing file
update_time = time.time()
logger.debug(f"setting pre-setup time to {update_time} s")
logger.debug(f"gmtime is {time.gmtime(update_time)}")
add_to_pilot_timing(job.jobid, PILOT_PRE_SETUP, update_time, self.__args)

def post_setup(self, job: JobData, update_time: bool = None):
@@ -488,8 +493,6 @@ def pre_payload(self, job: JobData):
"""
# write time stamps to pilot timing file
update_time = time.time()
logger.debug(f"setting pre-payload time to {update_time} s")
logger.debug(f"gmtime is {time.gmtime(update_time)}")
add_to_pilot_timing(job.jobid, PILOT_PRE_PAYLOAD, update_time, self.__args)

def post_payload(self, job: JobData):
@@ -502,8 +505,6 @@ def post_payload(self, job: JobData):
"""
# write time stamps to pilot timing file
update_time = time.time()
logger.debug(f"setting post-payload time to {update_time} s")
logger.debug(f"gmtime is {time.gmtime(update_time)}")
add_to_pilot_timing(job.jobid, PILOT_POST_PAYLOAD, update_time, self.__args)

def run_command(self, cmd: str, label: str = "") -> Any:
@@ -691,11 +692,10 @@ def wait_graceful(self, args: object, proc: Any) -> int:

return exit_code

def get_payload_command(self, job: JobData) -> str:
def get_payload_command(self) -> str:
"""
Return the payload command string.
:param job: job object (JobData)
:return: command (str).
"""
cmd = ""
@@ -705,14 +705,14 @@ def get_payload_command(self, job: JobData) -> str:
user = __import__(
f"pilot.user.{pilot_user}.common", globals(), locals(), [pilot_user], 0
)
cmd = user.get_payload_command(job) # + 'sleep 900' # to test looping jobs
cmd = user.get_payload_command(self.__job, args=self.__args) # + 'sleep 900' # to test looping jobs
except PilotException as error:
self.post_setup(job)
self.post_setup(self.__job)
logger.error(traceback.format_exc())
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(
self.__job.piloterrorcodes, self.__job.piloterrordiags = errors.add_error_code(
error.get_error_code()
)
self.__traces.pilot["error_code"] = job.piloterrorcodes[0]
self.__traces.pilot["error_code"] = self.__job.piloterrorcodes[0]
logger.fatal(
f"could not define payload command (traces error set to: {self.__traces.pilot['error_code']})"
)
@@ -799,7 +799,7 @@ def run(self) -> tuple[int, str]: # noqa: C901
self.pre_setup(self.__job)

# get the user defined payload command
cmd = self.get_payload_command(self.__job)
cmd = self.get_payload_command()
if not cmd:
logger.warning("aborting run() since payload command could not be defined")
return errors.UNKNOWNPAYLOADFAILURE, "undefined payload command"
@@ -982,6 +982,16 @@ def run(self) -> tuple[int, str]: # noqa: C901
if self.__job.utilities != {}:
self.stop_utilities()

# make sure there are no lingering processes
items = find_lingering_processes(os.getpid())
if items:
logger.warning("found lingering processes - will now be removed")
list_items(items)
for item in items:
kill_process(item, hardkillonly=True)
else:
logger.info("found no lingering processes")

if self.__job.is_hpo and state != "failed":
# in case there are more hyper-parameter points, move away the previous log files
# self.rename_log_files(iteration)
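find_lingering_processes() comes from the repo's own pilot.util.psutils module; the sketch below shows one plausible psutil-based implementation of the discovery step (an assumption about the helper's behavior, not its actual code):

import os
import psutil

def find_lingering_processes(parent_pid: int) -> list:
    # After the payload has finished, any descendant of the pilot process
    # that is still running is by definition lingering.
    try:
        parent = psutil.Process(parent_pid)
    except psutil.NoSuchProcess:
        return []
    return [child.pid for child in parent.children(recursive=True)
            if child.is_running()]

for pid in find_lingering_processes(os.getpid()):
    print(f"lingering process: {pid}")  # candidates for a hard kill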
1 change: 1 addition & 0 deletions pilot/info/jobdata.py
@@ -133,6 +133,7 @@ class JobData(BaseData):
prodproxy = "" # to keep track of production proxy on unified queues
completed = False # True when job has finished or failed, used by https::send_update()
lsetuptime = 0 # payload setup time (lsetup)
runningstart = None # time when the payload started running (only for internal monitoring purposes, not the actual start time)

# time variable used for on-the-fly cpu consumption time measurements done by job monitoring
t0 = None # payload startup time
22 changes: 19 additions & 3 deletions pilot/user/atlas/common.py
@@ -26,6 +26,7 @@
import logging
import os
import re
import time

from collections import defaultdict
from functools import reduce
@@ -56,7 +57,9 @@
UTILITY_AFTER_PAYLOAD_FINISHED,
UTILITY_AFTER_PAYLOAD_STARTED2,
UTILITY_BEFORE_STAGEIN,
UTILITY_AFTER_PAYLOAD_FINISHED2
UTILITY_AFTER_PAYLOAD_FINISHED2,
PILOT_PRE_REMOTEIO,
PILOT_POST_REMOTEIO
)
from pilot.util.container import execute
from pilot.util.filehandling import (
@@ -81,6 +84,7 @@
get_trimmed_dictionary,
is_child
)
from pilot.util.timing import add_to_pilot_timing
from pilot.util.tracereport import TraceReport
from .container import (
create_root_container_command,
@@ -487,14 +491,17 @@ def get_nthreads(catchall: str) -> int:
return _nthreads if _nthreads else 1


def get_payload_command(job: JobData) -> str:
def get_payload_command(job: JobData, args: object = None) -> str:
"""
Return the full command for executing the payload, including the sourcing of all setup files and setting of environment variables.
:param job: job object (JobData)
:return: command (str).
:param args: pilot arguments (object)
:return: command (str)
:raises TrfDownloadFailure: in case of download failure.
"""
if not args: # bypass pylint complaint
pass
# Should the pilot do the setup or does jobPars already contain the information?
preparesetup = should_pilot_prepare_setup(job.noexecstrcnv, job.jobparams)

@@ -515,6 +522,8 @@ def get_payload_command(job: JobData) -> str:
exitcode = 0
diagnostics = ""

t0 = int(time.time())
add_to_pilot_timing(job.jobid, PILOT_PRE_REMOTEIO, t0, args)
try:
exitcode, diagnostics, not_opened_turls, lsetup_time = open_remote_files(job.indata, job.workdir, get_nthreads(catchall))
except Exception as exc:
@@ -532,8 +541,15 @@ def get_payload_command(job: JobData) -> str:
else:
process_remote_file_traces(path, job, not_opened_turls) # ignore PyCharm warning, path is str

t1 = int(time.time())
add_to_pilot_timing(job.jobid, PILOT_POST_REMOTEIO, t1, args)
dt = t1 - t0
logger.info(f'remote file verification finished in {dt} s')

# fail the job if the remote files could not be verified
if exitcode != 0:
# improve the error diagnostics
diagnostics = errors.format_diagnostics(exitcode, diagnostics)
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exitcode, msg=diagnostics)
raise PilotException(diagnostics, code=exitcode)
else:
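The PILOT_PRE_REMOTEIO/PILOT_POST_REMOTEIO pair brackets the remote file verification so its duration shows up in the pilot timing file. The bracketing pattern itself can be captured in a small context manager; the sketch below is a generic illustration with an in-memory store, not the pilot's add_to_pilot_timing() implementation:

import time
from contextlib import contextmanager

TIMING: dict = {}  # stand-in for the pilot timing file, keyed by job id

def add_timing(jobid: str, event: str, timestamp: int) -> None:
    # Stand-in for add_to_pilot_timing(): record one named event per job.
    TIMING.setdefault(jobid, {})[event] = timestamp

@contextmanager
def timed_step(jobid: str, pre_event: str, post_event: str):
    t0 = int(time.time())
    add_timing(jobid, pre_event, t0)
    try:
        yield
    finally:
        t1 = int(time.time())
        add_timing(jobid, post_event, t1)
        print(f"{post_event}: step finished in {t1 - t0} s")

with timed_step("12345", "PILOT_PRE_REMOTEIO", "PILOT_POST_REMOTEIO"):
    time.sleep(1)  # placeholder for open_remote_files(...)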
112 changes: 112 additions & 0 deletions pilot/user/atlas/container.py
@@ -27,6 +27,7 @@
import logging
import os
import re
import select
import shlex
import subprocess
import time
@@ -838,6 +839,117 @@ def execute_remote_file_open(path: str, python_script_timeout: int) -> tuple[int
python_completed = False # Flag to track completion of 'python3' process
lsetup_completed_at = None

with open(new_path, "w", encoding='utf-8') as file:
while True:
# Check for timeout (once per second)
if time.time() - start_time > lsetup_timeout and not lsetup_completed:
logger.warning("timeout for 'lsetup' exceeded - killing script")
exit_code = 2 # 'lsetup' timeout
process.kill()
break

# Use select to check if there is data to read (to bypass any blocking operation that would prevent time-out checks)
ready, _, _ = select.select([process.stdout], [], [], 1.0)
if ready:
output = process.stdout.readline() # Read bytes directly
if output:
output = output.decode().strip()
file.write(output + "\n")
stdout += output + "\n"

# Check for LSETUP_COMPLETED message
if output == "LSETUP_COMPLETED":
lsetup_completed = True
lsetup_completed_at = time.time()
start_time = time.time() # Reset start time for 'python3' timeout

# Check for PYTHON_COMPLETED message
if "PYTHON_COMPLETED" in output:
python_completed = True
match = re.search(r"\d+$", output)
if match:
exit_code = int(match.group())
logger.info(f"python remote open command has completed with exit code {exit_code}")
else:
logger.info("python remote open command has completed without any exit code")
break

# Timeout for python script after LSETUP_COMPLETED
if lsetup_completed and ((time.time() - lsetup_completed_at) > python_script_timeout):
logger.warning(f"(1) timeout for 'python3' subscript exceeded - killing script "
f"({time.time()} - {lsetup_completed_at} > {python_script_timeout})")
exit_code = 3
process.kill()
break

# Timeout for python script after LSETUP_COMPLETED
if lsetup_completed and ((time.time() - start_time) > python_script_timeout):
logger.warning(f"(2) timeout for 'python3' subscript exceeded - killing script "
f"({time.time()} - {start_time} > {python_script_timeout})")
exit_code = 3
process.kill()
break

if python_completed:
logger.info('aborting since python command has finished')
return_code = process.poll()
if return_code:
logger.warning(f"script execution completed with return code: {return_code}")
# exit_code = return_code
break

# Check if script has completed normally
return_code = process.poll()
if return_code is not None:
pass
# logger.info(f"script execution completed with return code: {return_code}")
# exit_code = return_code
# break

time.sleep(0.5)

# Ensure process is terminated
if process.poll() is None:
process.terminate()

# Check if 'lsetup' was completed
lsetup_time = int(lsetup_completed_at - lsetup_start_time) if lsetup_completed_at else 0

return exit_code, stdout, lsetup_time


def execute_remote_file_open_old(path: str, python_script_timeout: int) -> tuple[int, str, int]: # noqa: C901
"""
Execute the remote file open script.
:param path: path to container script (str)
:param python_script_timeout: timeout (int)
:return: exit code (int), stdout (str), lsetup time (int) (tuple).
"""
lsetup_timeout = 600 # Timeout for 'lsetup' step
exit_code = 1
stdout = ""

# Start the Bash script process with non-blocking I/O
try:
process = subprocess.Popen(["bash", path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=0)
fcntl.fcntl(process.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) # Set non-blocking
except OSError as e:
logger.warning(f"error starting subprocess: {e}")
return exit_code, "", 0

# Split the path at the last dot
filename, _ = path.rsplit(".", 1)

# Create the new path with the desired suffix
new_path = f"{filename}.stdout"

start_time = time.time() # Track execution start time
lsetup_start_time = start_time
lsetup_completed = False # Flag to track completion of 'lsetup' process
python_completed = False # Flag to track completion of 'python3' process
lsetup_completed_at = None

with open(new_path, "w", encoding='utf-8') as file:
while True:
# Check for timeout (once per second)
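The core of the rewritten execute_remote_file_open() is the select-based read loop: waiting at most one second for stdout data keeps the loop responsive, so the lsetup and python3 deadlines are checked even when the script produces no output. A stripped-down sketch of the pattern with a single deadline and a hypothetical sentinel (Unix-only, since select on pipes does not work on Windows):

import select
import subprocess
import time

def run_with_deadline(cmd: list, sentinel: str, timeout: float) -> int:
    # Run cmd, watching stdout for a sentinel line; kill on timeout.
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT, bufsize=0)
    start = time.time()
    while True:
        if time.time() - start > timeout:
            proc.kill()  # deadline exceeded: hard-stop the script
            return 2
        # Wait at most 1 s for data so the timeout check keeps running.
        ready, _, _ = select.select([proc.stdout], [], [], 1.0)
        if ready:
            line = proc.stdout.readline().decode().strip()
            if line == sentinel:
                return 0  # the watched phase completed in time
        if proc.poll() is not None:
            return proc.returncode  # script exited on its own

print(run_with_deadline(["bash", "-c", "sleep 1; echo DONE"], "DONE", 5.0))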