Skip to content

Commit

Permalink
Combine rss and oom_score checker in fm_runner
Browse files Browse the repository at this point in the history
Calling process.children is noticeably slow when the running quick
forward model steps.
Combined the two so that children only gets called once.
  • Loading branch information
JHolba committed Aug 29, 2024
1 parent 26d5376 commit 51f40c7
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 31 deletions.
58 changes: 32 additions & 26 deletions src/_ert_forward_model_runner/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from datetime import datetime as dt
from pathlib import Path
from subprocess import Popen, run
from typing import Optional
from typing import Optional, Tuple

from psutil import AccessDenied, NoSuchProcess, Process, TimeoutExpired, ZombieProcess

Expand Down Expand Up @@ -179,7 +179,7 @@ def ensure_file_handles_closed():

max_memory_usage = 0
while exit_code is None:
memory_rss = _get_rss_for_processtree(process)
(memory_rss, oom_score) = _get_rss_and_oom_score_for_processtree(process)
max_memory_usage = max(memory_rss, max_memory_usage)
yield Running(
self,
Expand All @@ -188,7 +188,7 @@ def ensure_file_handles_closed():
max_rss=max_memory_usage,
fm_step_id=self.index,
fm_step_name=self.job_data.get("name"),
oom_score=_get_oom_score_for_processtree(process),
oom_score=oom_score,
),
)

Expand Down Expand Up @@ -349,23 +349,9 @@ def _check_target_file_is_written(self, target_file_mtime: int, timeout=5):
return f"Could not find target_file:{target_file}"


def _get_rss_for_processtree(process: Process) -> int:
"""Sum the memory measure RSS (resident set size) for a process and all
its descendants."""
try:
memory_rss = process.memory_info().rss + sum(
child.memory_info().rss for child in process.children(recursive=True)
)
except (NoSuchProcess, AccessDenied, ZombieProcess):
# In case of a process that has died and is in some transitional
# state, we ignore any failures. Only seen on OSX thus far.
#
# See https://github.com/giampaolo/psutil/issues/1044#issuecomment-298745532 # noqa
memory_rss = 0
return memory_rss


def _get_oom_score_for_processtree(process: Process) -> Optional[int]:
def _get_rss_and_oom_score_for_processtree(
process: Process,
) -> Tuple[int, Optional[int]]:
"""Obtain the oom_score (the Linux kernel uses this number to
decide which process to kill first in out-of-memory siturations).
Expand All @@ -376,24 +362,44 @@ def _get_oom_score_for_processtree(process: Process) -> Optional[int]:
oom_score defaults to 0 in Linux, but varies between -1000 and 1000.
If returned value is None, then there is no information, e.g. if run
on an OS not providing /proc/<pid>/oom_score
Sum the memory measure RSS (resident set size) for a process and all
its descendants.
"""

oom_score = None
memory_rss = 0
# A value of None means that we have no information.
with contextlib.suppress(ValueError, FileNotFoundError):
oom_score = int(
Path(f"/proc/{process.pid}/oom_score").read_text(encoding="utf-8")
)
with contextlib.suppress(
ValueError, NoSuchProcess, AccessDenied, ZombieProcess, ProcessLookupError
):
memory_rss = process.memory_info().rss

with contextlib.suppress(
NoSuchProcess, AccessDenied, ZombieProcess, ProcessLookupError
):
for child in process.children(recursive=True):
with contextlib.suppress(ValueError, FileNotFoundError):
with contextlib.suppress(
ValueError,
FileNotFoundError,
NoSuchProcess,
AccessDenied,
ZombieProcess,
ProcessLookupError,
):
oom_score_child = int(
Path(f"/proc/{child.pid}/oom_score").read_text(encoding="utf-8")
)
if oom_score is None:
oom_score = oom_score_child
else:
oom_score = max(oom_score, oom_score_child)
return oom_score
oom_score = (
max(oom_score, oom_score_child)
if oom_score is not None
else oom_score_child
)
with contextlib.suppress(NoSuchProcess, AccessDenied, ZombieProcess):
memory_rss += process.memory_info().rss

return (memory_rss, oom_score)
11 changes: 6 additions & 5 deletions tests/unit_tests/forward_model_runner/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional
from unittest.mock import PropertyMock, patch
from unittest.mock import MagicMock, PropertyMock, patch

import numpy as np
import pytest

from _ert_forward_model_runner.job import Job, _get_oom_score_for_processtree
from _ert_forward_model_runner.job import Job, _get_rss_and_oom_score_for_processtree
from _ert_forward_model_runner.reporting.message import Exited, Running, Start


Expand Down Expand Up @@ -53,12 +53,12 @@ def test_memory_usage_counts_grandchildren():
import sys
import time
counter = int(sys.argv[1])
counter = int(sys.argv[-1])
numbers = list(range(int(1e6)))
if counter > 0:
parent = os.fork()
if not parent:
os.execv(sys.argv[0], [sys.argv[0], str(counter - 1)])
os.execv(sys.argv[-2], [sys.argv[-2], str(counter - 1)])
time.sleep(0.3)""" # Too low sleep will make the test faster but flaky
)
)
Expand Down Expand Up @@ -158,6 +158,7 @@ class MockedProcess:
"""A very lightweight mocked psutil.Process object"""

pid: int
memory_info = MagicMock()

def children(self, recursive: bool = True):
if self.pid == 123:
Expand All @@ -171,7 +172,7 @@ def read_text_side_effect(self: pathlib.Path, *args, **kwargs):

with patch("pathlib.Path.read_text", autospec=True) as mocked_read_text:
mocked_read_text.side_effect = read_text_side_effect
oom_score = _get_oom_score_for_processtree(MockedProcess(123))
(_, oom_score) = _get_rss_and_oom_score_for_processtree(MockedProcess(123))

assert oom_score == 456

Expand Down

0 comments on commit 51f40c7

Please sign in to comment.