Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add some detached <-> scheduler edge cases #8898

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions src/everest/detached/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from seba_sqlite.exceptions import ObjectNotFoundError
from seba_sqlite.snapshot import SebaSnapshot

from ert import BatchContext, BatchSimulator
from ert import BatchContext, BatchSimulator, JobState
from ert.config import ErtConfig, QueueSystem
from everest.config import EverestConfig
from everest.config_keys import ConfigKeys as CK
Expand Down Expand Up @@ -180,11 +180,32 @@ def wait_for_server(
)
# Job queueing may fail:
if context is not None and context.has_job_failed(0):
path = context.job_progress(0).steps[0].std_err_file
for err in extract_errors_from_file(path):
update_everserver_status(config, ServerStatus.failed, message=err)
logging.error(err)
raise SystemExit("Failed to start Everest server.")
job_progress = context.job_progress(0)

if job_progress is not None:
path = context.job_progress(0).steps[0].std_err_file
for err in extract_errors_from_file(path):
update_everserver_status(
config, ServerStatus.failed, message=err
)
logging.error(err)
raise SystemExit("Failed to start Everest server.")
else:
try:
state = context.get_job_state(0)

if state == JobState.WAITING:
# Job did fail, but is now in WAITING
logging.error(
"Race condition in wait_for_server, job did fail but is now in WAITING"
)
except IndexError as e:
# Job is no longer registered in scheduler
logging.error(
f"Race condition in wait_for_server, failed job removed from scheduler\n{e}"
)
raise SystemExit("Failed to start Everest server.") from e

sleep_time = sleep_time_increment * (2**retry_count)
time.sleep(sleep_time)
if server_is_running(config):
Expand Down
51 changes: 51 additions & 0 deletions tests/everest/test_detached.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pytest
import requests

from ert import JobState
from ert.config import ErtConfig, QueueSystem
from ert.storage import open_storage
from everest.config import EverestConfig
Expand Down Expand Up @@ -183,6 +184,56 @@ def test_wait_for_server(
assert server_status["message"] == expected_error_msg


@patch("everest.detached.server_is_running", return_value=False)
@tmpdir(relpath("test_data", "detached"))
def test_wait_for_handles_failed_job_race_condition_failed_job_to_waiting(
    server_is_running_mock, caplog
):
    """Check the error logged when a failed job has no progress but reports WAITING.

    The context stub returns ``None`` from ``job_progress`` and
    ``JobState.WAITING`` from ``get_job_state``; ``wait_for_server`` is then
    expected to raise and to have logged the race-condition message.
    """

    class _WaitingJobContext(MockContext):
        # No progress information is available for the job.
        job_progress = staticmethod(lambda *args: None)
        # The scheduler reports the job as WAITING.
        get_job_state = staticmethod(lambda *args: JobState.WAITING)

    expected_message = (
        "Race condition in wait_for_server, job did fail but is now in WAITING"
    )
    everest_config = EverestConfig.load_file("valid_yaml_config.yml")

    with caplog.at_level(logging.ERROR):
        with pytest.raises(Exception):
            wait_for_server(
                everest_config, timeout=1, context=_WaitingJobContext()
            )

    assert expected_message in caplog.messages


@patch("everest.detached.server_is_running", return_value=False)
@tmpdir(relpath("test_data", "detached"))
def test_wait_for_handles_failed_job_race_condition_failed_job_removed_from_scheduler(
    server_is_running_mock, caplog
):
    """Check that a failed job missing from the scheduler aborts with SystemExit.

    The context stub returns ``None`` from ``job_progress`` and raises
    ``IndexError`` from ``get_job_state``; ``wait_for_server`` is expected to
    raise ``SystemExit`` and to log the race-condition message.
    """
    config = EverestConfig.load_file("valid_yaml_config.yml")

    class _MockContext(MockContext):
        @staticmethod
        def job_progress(*args):
            return None

        @staticmethod
        def get_job_state(*args):
            raise IndexError("Some trackback")

    with caplog.at_level(logging.ERROR), pytest.raises(SystemExit):
        wait_for_server(config, timeout=1, context=_MockContext())

    # Substring match: the logged message carries the exception text after
    # this prefix. The membership test (`in x`) is essential -- iterating over
    # the bare string literal would be truthy for any non-empty log and would
    # not check the message at all.
    assert any(
        "Race condition in wait_for_server, failed job removed from scheduler" in x
        for x in caplog.messages
    )


def _get_reference_config():
everest_config = EverestConfig.load_file("config_minimal.yml")
reference_config = ErtConfig.read_site_config()
Expand Down
Loading