Skip to content

Commit

Permalink
Rename job.py to fm_step.py and Job to ForwardModelStep
Browse files Browse the repository at this point in the history
Plus follow-up in test code.
  • Loading branch information
berland committed Oct 23, 2024
1 parent a867d62 commit 65f58d3
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def killed_by_oom(pids: Sequence[int]) -> bool:
return False


class Job:
class ForwardModelStep:
MEMORY_POLL_PERIOD = 5 # Seconds between memory polls

def __init__(
Expand Down
8 changes: 4 additions & 4 deletions src/_ert/forward_model_runner/reporting/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import psutil

if TYPE_CHECKING:
from _ert.forward_model_runner.job import Job
from _ert.forward_model_runner.fm_step import ForwardModelStep

class _ChecksumDictBase(TypedDict):
type: Literal["file"]
Expand Down Expand Up @@ -71,7 +71,7 @@ def __repr__(cls):
class Message(metaclass=_MetaMessage):
def __init__(self, job=None):
self.timestamp = dt.now()
self.job: Optional[Job] = job
self.job: Optional[ForwardModelStep] = job
self.error_message: Optional[str] = None

def __repr__(self):
Expand Down Expand Up @@ -116,12 +116,12 @@ def __init__(self):


class Start(Message):
def __init__(self, job: "Job"):
def __init__(self, job: "ForwardModelStep"):
super().__init__(job)


class Running(Message):
def __init__(self, job: "Job", memory_status: ProcessTreeStatus):
def __init__(self, job: "ForwardModelStep", memory_status: ProcessTreeStatus):
super().__init__(job)
self.memory_status = memory_status

Expand Down
6 changes: 3 additions & 3 deletions src/_ert/forward_model_runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pathlib import Path
from typing import List

from _ert.forward_model_runner.job import Job
from _ert.forward_model_runner.fm_step import ForwardModelStep
from _ert.forward_model_runner.reporting.message import Checksum, Finish, Init


Expand All @@ -21,9 +21,9 @@ def __init__(self, jobs_data):
if self.simulation_id is not None:
os.environ["ERT_RUN_ID"] = self.simulation_id

self.jobs: List[Job] = []
self.jobs: List[ForwardModelStep] = []
for index, job_data in enumerate(job_data_list):
self.jobs.append(Job(job_data, index))
self.jobs.append(ForwardModelStep(job_data, index))

self._set_environment()

Expand Down
98 changes: 59 additions & 39 deletions tests/ert/unit_tests/forward_model_runner/test_event_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
ClientConnectionClosedOK,
ClientConnectionError,
)
from _ert.forward_model_runner.job import Job
from _ert.forward_model_runner.fm_step import ForwardModelStep
from _ert.forward_model_runner.reporting import Event
from _ert.forward_model_runner.reporting.message import (
Exited,
Expand All @@ -41,11 +41,13 @@ def test_report_with_successful_start_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Start(job1))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Start(fmstep1))
reporter.report(Finish())

assert len(lines) == 1
Expand All @@ -63,13 +65,15 @@ def test_report_with_failed_start_message_argument(unused_tcp_port):
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)

job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))

msg = Start(job1).with_error("massive_failure")
msg = Start(fmstep1).with_error("massive_failure")

reporter.report(msg)
reporter.report(Finish())
Expand All @@ -84,12 +88,14 @@ def test_report_with_successful_exit_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Exited(job1, 0))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Exited(fmstep1, 0))
reporter.report(Finish().with_error("failed"))

assert len(lines) == 1
Expand All @@ -101,12 +107,14 @@ def test_report_with_failed_exit_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Exited(job1, 1).with_error("massive_failure"))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Exited(fmstep1, 1).with_error("massive_failure"))
reporter.report(Finish())

assert len(lines) == 1
Expand All @@ -119,12 +127,14 @@ def test_report_with_running_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Finish())

assert len(lines) == 1
Expand All @@ -138,12 +148,14 @@ def test_report_only_job_running_for_successful_run(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Finish())

assert len(lines) == 1
Expand All @@ -153,12 +165,14 @@ def test_report_with_failed_finish_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Finish().with_error("massive_failure"))

assert len(lines) == 1
Expand Down Expand Up @@ -195,16 +209,18 @@ def mock_send(msg):
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
reporter._reporter_timeout = 4
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
with patch(
"_ert.forward_model_runner.client.Client.send", lambda x, y: mock_send(y)
):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=1100, rss=10)))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=1100, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=1100, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=1100, rss=10)))
# set _stop_timestamp
reporter.report(Finish())
if reporter._event_publisher_thread.is_alive():
Expand Down Expand Up @@ -234,16 +250,18 @@ def send_func(msg):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
with patch("_ert.forward_model_runner.client.Client.send") as patched_send:
patched_send.side_effect = send_func

reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=200, rss=10)))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=300, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=200, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=300, rss=10)))

_wait_until(
condition=lambda: patched_send.call_count == 3,
Expand Down Expand Up @@ -275,12 +293,14 @@ def mock_send(msg):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=200, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=200, rss=10)))

# sleep until both Running events have been received
_wait_until(
Expand All @@ -292,20 +312,20 @@ def mock_send(msg):
with patch(
"_ert.forward_model_runner.client.Client.send", lambda x, y: mock_send(y)
):
reporter.report(Running(job1, ProcessTreeStatus(max_rss=300, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=300, rss=10)))
# Make sure the publisher thread exits because it got
# ClientConnectionClosedOK. If it hangs it could indicate that the
# exception is not caught/handled correctly
if reporter._event_publisher_thread.is_alive():
reporter._event_publisher_thread.join()

reporter.report(Running(job1, ProcessTreeStatus(max_rss=400, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=400, rss=10)))
reporter.report(Finish())

# _timeout_timestamp was not set to None since the reporter finished on time
assert reporter._timeout_timestamp is not None

# The Running(job1, 300, 10) is popped from the queue, but never sent.
# The Running(fmstep1, 300, 10) is popped from the queue, but never sent.
# The following Running is added to queue along with the sentinel
assert reporter._event_queue.qsize() == 2
# None of the messages reported after ClientConnectionClosedOK was raised have been sent
Expand Down
Loading

0 comments on commit 65f58d3

Please sign in to comment.