Skip to content

Commit

Permalink
Merge pull request #155 from PalNilsson/next
Browse files Browse the repository at this point in the history
3.9.3.2
  • Loading branch information
PalNilsson authored Dec 11, 2024
2 parents 95f7af1 + 258f159 commit 21d86a5
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 12 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
-3.9.2.41
+3.9.3.2
8 changes: 5 additions & 3 deletions pilot/control/payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,16 @@ def execute_payloads(queues: namedtuple, traces: Any, args: object): # noqa: C9
job.t0 = os.times()
exit_code, diagnostics = payload_executor.run()
if exit_code and exit_code > 1000: # pilot error code, add to list
-logger.debug(f'pilot error code received (code={exit_code}, diagnostics=\n{diagnostics})')
+logger.warning(f'pilot error code received (code={exit_code}, diagnostics=\n{diagnostics})')
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exit_code, msg=diagnostics)

logger.debug(f'run() returned exit_code={exit_code}')
set_cpu_consumption_time(job)
job.transexitcode = exit_code % 255
-out.close()
-err.close()
+if out:
+out.close()
+if err:
+err.close()

# some HPO jobs will produce new output files (following lfn name pattern), discover those and replace the job.outdata list
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
Expand Down
4 changes: 2 additions & 2 deletions pilot/info/jobdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,8 @@ def collect_zombies(self, depth: int = None):
for zombie in self.zombies:
try:
logger.info(f"zombie collector waiting for pid {zombie}")
-_id, _ = os.waitpid(zombie, os.WNOHANG if current_depth else 0)
+# dangerous, can lead to blocking : _id, _ = os.waitpid(zombie, os.WNOHANG if current_depth else 0)
+_id, _ = os.waitpid(zombie, os.WNOHANG)
except OSError as exc:
logger.info(f"harmless exception when collecting zombies: {exc}")
zombies_to_remove.append(zombie)
Expand All @@ -1099,7 +1100,6 @@ def collect_zombies(self, depth: int = None):
# Remove collected zombies from the list
for zombie in zombies_to_remove:
self.zombies.remove(zombie)

if current_depth == 0:
break

Expand Down
2 changes: 1 addition & 1 deletion pilot/user/atlas/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ def get_memory_monitor_info_path(workdir: str, allowtxtfile: bool = False) -> st
if os.path.exists(init_path):
path = init_path
else:
-logger.info(f"neither {path}, nor {init_path} exist")
+logger.debug(f"neither {path}, nor {init_path} exist")
path = ""

if path == "" and allowtxtfile:
Expand Down
4 changes: 2 additions & 2 deletions pilot/util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
# Pilot version
RELEASE = '3' # released number should be fixed at 3 for Pilot 3
VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates
-REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates
-BUILD = '41' # build number should be reset to '1' for every new development cycle
+REVISION = '3' # revision number should be reset to '0' for every new version release, increased for small updates
+BUILD = '2' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
Expand Down
7 changes: 5 additions & 2 deletions pilot/util/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def execute(executable: Any, **kwargs: dict) -> Any: # noqa: C901

def read_output(stream, queue):
while True:
+sleep(0.01)
try:
line = stream.readline()
if not line:
Expand All @@ -126,8 +127,10 @@ def read_output(stream, queue):
break
else:
raise

-queue.put(line)
+try:
+queue.put_nowait(line)
+except queue.Full:
+sleep(0.01) # Sleep for a short interval to avoid busy waiting

stdout_thread = threading.Thread(target=read_output, args=(process.stdout, stdout_queue))
stderr_thread = threading.Thread(target=read_output, args=(process.stderr, stderr_queue))
Expand Down
2 changes: 1 addition & 1 deletion pilot/util/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def job_monitor_tasks(job: JobData, mt: MonitoringTime, args: object) -> tuple[i
return exit_code, diagnostics

# update the OOM process info to prevent killing processes in the wrong order in case the job is killed (once)
-update_oom_info(job.pid, job.transformation)
+# update_oom_info(job.pid, job.transformation)

# should the pilot abort the payload?
exit_code, diagnostics = should_abort_payload(current_time, mt)
Expand Down

0 comments on commit 21d86a5

Please sign in to comment.