Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename job.py to fm_step.py and Job to ForwardModelStep #9022

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/_ert/forward_model_runner/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""_ert.forward_model_runner is called by ert to run jobs in the runpath.
"""_ert.forward_model_runner is called by ert to run forward model steps in the runpath.

It is split into its own package for performance reasons,
simply importing ert can take several seconds, which is not ideal when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def killed_by_oom(pids: Sequence[int]) -> bool:
return False


class Job:
class ForwardModelStep:
MEMORY_POLL_PERIOD = 5 # Seconds between memory polls

def __init__(
Expand Down
16 changes: 8 additions & 8 deletions src/_ert/forward_model_runner/reporting/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import psutil

if TYPE_CHECKING:
from _ert.forward_model_runner.job import Job
from _ert.forward_model_runner.forward_model_step import ForwardModelStep

class _ChecksumDictBase(TypedDict):
type: Literal["file"]
Expand Down Expand Up @@ -71,7 +71,7 @@ def __repr__(cls):
class Message(metaclass=_MetaMessage):
def __init__(self, job=None):
self.timestamp = dt.now()
self.job: Optional[Job] = job
self.job: Optional[ForwardModelStep] = job
self.error_message: Optional[str] = None

def __repr__(self):
Expand Down Expand Up @@ -116,19 +116,19 @@ def __init__(self):


class Start(Message):
def __init__(self, job: "Job"):
super().__init__(job)
def __init__(self, fm_step: "ForwardModelStep"):
super().__init__(fm_step)


class Running(Message):
def __init__(self, job: "Job", memory_status: ProcessTreeStatus):
super().__init__(job)
def __init__(self, fm_step: "ForwardModelStep", memory_status: ProcessTreeStatus):
super().__init__(fm_step)
self.memory_status = memory_status


class Exited(Message):
def __init__(self, job, exit_code: int):
super().__init__(job)
def __init__(self, fm_step, exit_code: int):
super().__init__(fm_step)
self.exit_code = exit_code


Expand Down
6 changes: 3 additions & 3 deletions src/_ert/forward_model_runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pathlib import Path
from typing import List

from _ert.forward_model_runner.job import Job
from _ert.forward_model_runner.forward_model_step import ForwardModelStep
from _ert.forward_model_runner.reporting.message import Checksum, Finish, Init


Expand All @@ -21,9 +21,9 @@ def __init__(self, jobs_data):
if self.simulation_id is not None:
os.environ["ERT_RUN_ID"] = self.simulation_id

self.jobs: List[Job] = []
self.jobs: List[ForwardModelStep] = []
for index, job_data in enumerate(job_data_list):
self.jobs.append(Job(job_data, index))
self.jobs.append(ForwardModelStep(job_data, index))

self._set_environment()

Expand Down
98 changes: 59 additions & 39 deletions tests/ert/unit_tests/forward_model_runner/test_event_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
ClientConnectionClosedOK,
ClientConnectionError,
)
from _ert.forward_model_runner.job import Job
from _ert.forward_model_runner.forward_model_step import ForwardModelStep
from _ert.forward_model_runner.reporting import Event
from _ert.forward_model_runner.reporting.message import (
Exited,
Expand All @@ -41,11 +41,13 @@ def test_report_with_successful_start_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Start(job1))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Start(fmstep1))
reporter.report(Finish())

assert len(lines) == 1
Expand All @@ -63,13 +65,15 @@ def test_report_with_failed_start_message_argument(unused_tcp_port):
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)

job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))

msg = Start(job1).with_error("massive_failure")
msg = Start(fmstep1).with_error("massive_failure")

reporter.report(msg)
reporter.report(Finish())
Expand All @@ -84,12 +88,14 @@ def test_report_with_successful_exit_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Exited(job1, 0))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Exited(fmstep1, 0))
reporter.report(Finish().with_error("failed"))

assert len(lines) == 1
Expand All @@ -101,12 +107,14 @@ def test_report_with_failed_exit_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Exited(job1, 1).with_error("massive_failure"))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Exited(fmstep1, 1).with_error("massive_failure"))
reporter.report(Finish())

assert len(lines) == 1
Expand All @@ -119,12 +127,14 @@ def test_report_with_running_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Finish())

assert len(lines) == 1
Expand All @@ -138,12 +148,14 @@ def test_report_only_job_running_for_successful_run(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Finish())

assert len(lines) == 1
Expand All @@ -153,12 +165,14 @@ def test_report_with_failed_finish_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Finish().with_error("massive_failure"))

assert len(lines) == 1
Expand Down Expand Up @@ -195,16 +209,18 @@ def mock_send(msg):
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
reporter._reporter_timeout = 4
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
with patch(
"_ert.forward_model_runner.client.Client.send", lambda x, y: mock_send(y)
):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=1100, rss=10)))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=1100, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=1100, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=1100, rss=10)))
# set _stop_timestamp
reporter.report(Finish())
if reporter._event_publisher_thread.is_alive():
Expand Down Expand Up @@ -234,16 +250,18 @@ def send_func(msg):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
with patch("_ert.forward_model_runner.client.Client.send") as patched_send:
patched_send.side_effect = send_func

reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=200, rss=10)))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=300, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=200, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=300, rss=10)))

_wait_until(
condition=lambda: patched_send.call_count == 3,
Expand Down Expand Up @@ -275,12 +293,14 @@ def mock_send(msg):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = ForwardModelStep(
{"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
)
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=200, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=200, rss=10)))

# sleep until both Running events have been received
_wait_until(
Expand All @@ -292,20 +312,20 @@ def mock_send(msg):
with patch(
"_ert.forward_model_runner.client.Client.send", lambda x, y: mock_send(y)
):
reporter.report(Running(job1, ProcessTreeStatus(max_rss=300, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=300, rss=10)))
# Make sure the publisher thread exits because it got
# ClientConnectionClosedOK. If it hangs it could indicate that the
# exception is not caught/handled correctly
if reporter._event_publisher_thread.is_alive():
reporter._event_publisher_thread.join()

reporter.report(Running(job1, ProcessTreeStatus(max_rss=400, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=400, rss=10)))
reporter.report(Finish())

# set _stop_timestamp was not set to None since the reporter finished on time
assert reporter._timeout_timestamp is not None

# The Running(job1, 300, 10) is popped from the queue, but never sent.
# The Running(fmstep1, 300, 10) is popped from the queue, but never sent.
# The following Running is added to queue along with the sentinel
assert reporter._event_queue.qsize() == 2
# None of the messages after ClientConnectionClosedOK was raised, has been sent
Expand Down
Loading