Skip to content

Commit

Permalink
Add _verify_checksum lock in job.py
Browse the repository at this point in the history
This removes the usage of `forward_model_ok_lock` when sleeping until
`checksum_path` exists.
  • Loading branch information
jonathan-eq committed Oct 25, 2024
1 parent 2cb3ea0 commit e8bc176
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 24 deletions.
37 changes: 21 additions & 16 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ async def run(
self,
sem: asyncio.BoundedSemaphore,
forward_model_ok_lock: asyncio.Lock,
checksum_lock: asyncio.Lock,
max_submit: int = 1,
) -> None:
self._requested_max_submit = max_submit
Expand All @@ -154,9 +155,9 @@ async def run(
break

if self.returncode.result() == 0:
if self._scheduler._manifest_queue is not None:
await self._verify_checksum(checksum_lock)
async with forward_model_ok_lock:
if self._scheduler._manifest_queue is not None:
await self._verify_checksum()
await self._handle_finished_forward_model()
break

Expand Down Expand Up @@ -184,7 +185,9 @@ async def _max_runtime_task(self) -> None:
)
self.returncode.cancel()

async def _verify_checksum(self, timeout: int = 120) -> None:
async def _verify_checksum(
self, checksum_lock: asyncio.Lock, timeout: int = 120
) -> None:
# Wait for job runpath to be in the checksum dictionary
runpath = self.real.run_arg.runpath
while runpath not in self._scheduler.checksum:
Expand Down Expand Up @@ -213,20 +216,22 @@ async def _verify_checksum(self, timeout: int = 120) -> None:
timeout -= 1
logger.debug("Waiting for disk synchronization")
await asyncio.sleep(1)

for info in valid_checksums:
file_path = Path(info["path"])
expected_md5sum = info.get("md5sum")
if file_path.exists() and expected_md5sum:
actual_md5sum = hashlib.md5(file_path.read_bytes()).hexdigest()
if expected_md5sum == actual_md5sum:
logger.debug(f"File {file_path} checksum successful.")
async with checksum_lock:
for info in valid_checksums:
file_path = Path(info["path"])
expected_md5sum = info.get("md5sum")
if file_path.exists() and expected_md5sum:
actual_md5sum = hashlib.md5(file_path.read_bytes()).hexdigest()
if expected_md5sum == actual_md5sum:
logger.debug(f"File {file_path} checksum successful.")
else:
logger.warning(
f"File {file_path} checksum verification failed."
)
elif file_path.exists() and expected_md5sum is None:
logger.warning(f"Checksum not received for file {file_path}")
else:
logger.warning(f"File {file_path} checksum verification failed.")
elif file_path.exists() and expected_md5sum is None:
logger.warning(f"Checksum not received for file {file_path}")
else:
logger.error(f"Disk synchronization failed for {file_path}")
logger.error(f"Disk synchronization failed for {file_path}")

async def _handle_finished_forward_model(self) -> None:
callback_status, status_msg = await forward_model_ok(self.real.run_arg)
Expand Down
8 changes: 7 additions & 1 deletion src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,16 @@ async def execute(
# this lock is to assure that no more than 1 task
# does internalization at a time
forward_model_ok_lock = asyncio.Lock()
verify_checksum_lock = asyncio.Lock()
for iens, job in self._jobs.items():
if job.state != JobState.ABORTED:
self._job_tasks[iens] = asyncio.create_task(
job.run(sem, forward_model_ok_lock, self._max_submit),
job.run(
sem,
forward_model_ok_lock,
verify_checksum_lock,
self._max_submit,
),
name=f"job-{iens}_task",
)
else:
Expand Down
16 changes: 9 additions & 7 deletions tests/ert/unit_tests/scheduler/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ async def load_result(_):
job.started.set()

job_run_task = asyncio.create_task(
job.run(asyncio.Semaphore(), asyncio.Lock(), max_submit=max_submit)
job.run(
asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=max_submit
)
)

for attempt in range(max_submit):
Expand Down Expand Up @@ -177,7 +179,7 @@ async def test_num_cpu_is_propagated_to_driver(realization: Realization):
scheduler = create_scheduler()
job = Job(scheduler, realization)
job_run_task = asyncio.create_task(
job.run(asyncio.Semaphore(), asyncio.Lock(), max_submit=1)
job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1)
)
job.started.set()
job.returncode.set_result(0)
Expand All @@ -200,7 +202,7 @@ async def test_realization_memory_is_propagated_to_driver(realization: Realizati
scheduler = create_scheduler()
job = Job(scheduler, realization)
job_run_task = asyncio.create_task(
job.run(asyncio.Semaphore(), asyncio.Lock(), max_submit=1)
job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1)
)
job.started.set()
job.returncode.set_result(0)
Expand Down Expand Up @@ -238,7 +240,7 @@ async def test_when_waiting_for_disk_sync_times_out_an_error_is_logged(

with captured_logs(log_msgs, logging.ERROR):
job_run_task = asyncio.create_task(
job.run(asyncio.Semaphore(), asyncio.Lock(), max_submit=1)
job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1)
)
job.started.set()
job.returncode.set_result(0)
Expand Down Expand Up @@ -269,7 +271,7 @@ async def test_when_files_in_manifest_are_not_created_an_error_is_logged(

with captured_logs(log_msgs, logging.ERROR):
job_run_task = asyncio.create_task(
job.run(asyncio.Semaphore(), asyncio.Lock(), max_submit=1)
job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1)
)
job.started.set()
job.returncode.set_result(0)
Expand Down Expand Up @@ -303,7 +305,7 @@ async def test_when_checksums_do_not_match_a_warning_is_logged(

with captured_logs(log_msgs, logging.WARNING):
job_run_task = asyncio.create_task(
job.run(asyncio.Semaphore(), asyncio.Lock(), max_submit=1)
job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1)
)
job.started.set()
job.returncode.set_result(0)
Expand Down Expand Up @@ -331,7 +333,7 @@ async def test_when_no_checksum_info_is_received_a_warning_is_logged(

with captured_logs(log_msgs, logging.WARNING):
job_run_task = asyncio.create_task(
job.run(asyncio.Semaphore(), asyncio.Lock(), max_submit=1)
job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1)
)
job.started.set()
job.returncode.set_result(0)
Expand Down

0 comments on commit e8bc176

Please sign in to comment.