job-runner: move async one level up in the call stack
* The job_runner was converted to async; its tasks run in the background
  (i.e. the call returns before the work has finished, and synchronous
  code cannot await it).
* This broke some `finally` blocks, which were being invoked too soon.
* This PR moves the async code up one level in the call stack, bringing
  those `finally` blocks within the lifecycle of the job runner's async
  code so that they run at the right time (sketched below).
* Closes #2784
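
For reference, a minimal runnable sketch of the ordering problem described above (`install_files`, `process_old` and `process_new` are illustrative names, not code from this repository):

```python
import asyncio


async def install_files():
    """Stand-in for the job runner's async work."""
    await asyncio.sleep(0.1)
    print("files installed")


def process_old(loop):
    """Before: synchronous code can only schedule the work as a background
    task, so its `finally` fires before the work has actually finished."""
    try:
        loop.create_task(install_files())
    finally:
        print("clean-up (too soon)")


async def process_new():
    """After: the async boundary sits one level up the call stack, so the
    `finally` only runs once the awaited work has completed."""
    try:
        await install_files()
    finally:
        print("clean-up (after installation)")


async def main():
    process_old(asyncio.get_running_loop())  # prints the clean-up first
    await asyncio.sleep(0.2)                 # let the stray task finish
    await process_new()                      # prints the clean-up last


asyncio.run(main())
```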
oliver-sanders committed Jun 13, 2024
1 parent b4f653f commit d3ec050
Showing 4 changed files with 110 additions and 28 deletions.
38 changes: 35 additions & 3 deletions metomi/rose/config_processors/fileinstall.py
@@ -17,6 +17,7 @@
"""Process "file:*" sections in node of a metomi.rose.config_tree.ConfigTree.
"""

import asyncio
from contextlib import suppress
from fnmatch import fnmatch
from glob import glob
@@ -47,6 +48,13 @@
from metomi.rose.scheme_handler import SchemeHandlersManager


# set containing references to "background" coroutines that are not referenced
# from any code (i.e. are not directly awaited), adding them to this list
# avoids the potential for garbage collection to delete them whilst they are
# running
_BACKGROUND_TASKS = set()


class ConfigProcessorForFile(ConfigProcessorBase):
"""Processor for [file:*] in node of a ConfigTree."""

@@ -89,29 +97,51 @@ def process(
if not nodes:
return

loop = asyncio.get_event_loop()
loop.set_exception_handler(self.handle_event)
coro = self.__process(conf_tree, nodes, **kwargs)

try:
# event loop is not running (e.g. rose CLI use)
loop.run_until_complete(coro)
except RuntimeError:
# event loop is already running (e.g. cylc CLI use)
# WARNING: this starts the file installation running, but it
# doesn't wait for it to finish, that's your problem :(
task = loop.create_task(coro)
# reference this task from a global variable to prevent it from
# being garbage collected
_BACKGROUND_TASKS.add(task)
# tidy up afterwards
task.add_done_callback(_BACKGROUND_TASKS.discard)

async def __process(self, conf_tree, nodes, **kwargs):
"""Helper for self._process."""
# Create database to store information for incremental updates,
# if it does not already exist.
loc_dao = LocDAO()
loc_dao.create()

cwd = os.getcwd()

file_install_root = conf_tree.node.get_value(
["file-install-root"], os.getenv("ROSE_FILE_INSTALL_ROOT", None)
)
if file_install_root:
file_install_root = env_var_process(file_install_root)
self.manager.fs_util.makedirs(file_install_root)
self.manager.fs_util.chdir(file_install_root)

try:
self._process(conf_tree, nodes, loc_dao, **kwargs)
await self._process(conf_tree, nodes, loc_dao, **kwargs)
finally:
if cwd != os.getcwd():
self.manager.fs_util.chdir(cwd)
if loc_dao.conn:
with suppress(Exception):
loc_dao.conn.close()

def _process(self, conf_tree, nodes, loc_dao, **kwargs):
async def _process(self, conf_tree, nodes, loc_dao, **kwargs):
"""Helper for self.process."""
# Ensure that everything is overwritable
# Ensure that container directories exist
@@ -360,7 +390,9 @@ def _process(self, conf_tree, nodes, loc_dao, **kwargs):
if nproc_str is not None:
nproc = int(nproc_str)
job_runner = JobRunner(self, nproc)
job_runner(JobManager(jobs), conf_tree, loc_dao, work_dir)
await job_runner(
JobManager(jobs), conf_tree, loc_dao, work_dir
)
except ValueError as exc:
if exc.args and exc.args[0] in jobs:
job = jobs[exc.args[0]]
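Condensed from the hunks above, the dispatch pattern now living in `ConfigProcessorForFile.process`; `dispatch` and `do_work` are illustrative stand-ins for the surrounding method and for `self.__process(conf_tree, nodes, **kwargs)`:

```python
import asyncio

_BACKGROUND_TASKS = set()  # strong references stop scheduled tasks being GC'd


async def do_work():
    """Illustrative stand-in for self.__process(conf_tree, nodes, **kwargs)."""
    await asyncio.sleep(0.01)
    print("work done")


def dispatch(coro):
    """Mirror of the dispatch logic now in ConfigProcessorForFile.process."""
    loop = asyncio.get_event_loop()
    try:
        # no event loop running (e.g. rose CLI use): block until complete
        loop.run_until_complete(coro)
    except RuntimeError:
        # a loop is already running (e.g. cylc CLI use): schedule the work
        # as a task and hold a reference so it is not garbage collected
        task = loop.create_task(coro)
        _BACKGROUND_TASKS.add(task)
        task.add_done_callback(_BACKGROUND_TASKS.discard)


async def main():
    # called with a loop already running, dispatch() returns immediately;
    # the sleep below stands in for the rest of the event loop's work
    dispatch(do_work())
    await asyncio.sleep(0.1)


asyncio.run(main())
```

Keeping a strong reference to the scheduled task follows the asyncio documentation's advice: the event loop only holds weak references to tasks, so an otherwise unreferenced task can be garbage collected before it completes.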
45 changes: 20 additions & 25 deletions metomi/rose/job_runner.py
@@ -21,13 +21,6 @@
from metomi.rose.reporter import Event


# set containing references to "background" coroutines that are not referenced
# from any code (i.e. are not directly awaited), adding them to this list
# avoids the potential for garbage collection to delete them whilst they are
# running
_BACKGROUND_TASKS = set()


class JobEvent(Event):
"""Event raised when a job completes."""

@@ -171,7 +164,14 @@ def __init__(self, job_processor, nproc=None):
"""
self.job_processor = job_processor

def run(self, job_manager, *args, concurrency=6):
async def run(
self,
job_manager,
conf_tree,
loc_dao,
work_dir,
concurrency=6,
):
"""Start the job runner with an instance of JobManager.
Args:
@@ -183,28 +183,23 @@ def run(self, job_manager, *args, concurrency=6):
The maximum number of jobs to run concurrently.
"""
loop = asyncio.get_event_loop()
loop.set_exception_handler(self.job_processor.handle_event)
coro = self._run(job_manager, *args, concurrency=concurrency)
try:
# event loop is not running (e.g. rose CLI use)
loop.run_until_complete(coro)
except RuntimeError:
# event loop is already running (e.g. cylc CLI use)
# WARNING: this starts the file installation running, but it
# doesn't wait for it to finish, that's your problem :(
task = loop.create_task(coro)
# reference this task from a global variable to prevent it from
# being garbage collected
_BACKGROUND_TASKS.add(task)
# tidy up afterwards
task.add_done_callback(_BACKGROUND_TASKS.discard)
await self._run(
job_manager, conf_tree, loc_dao, work_dir, concurrency=concurrency
)
dead_jobs = job_manager.get_dead_jobs()
if dead_jobs:
raise JobRunnerNotCompletedError(dead_jobs)

async def _run(self, job_manager, *args, concurrency=6):
async def _run(
self,
job_manager,
conf_tree,
loc_dao,
work_dir,
concurrency=6,
):
running = []
args = (conf_tree, loc_dao, work_dir)
await asyncio.gather(
self._run_jobs(running, job_manager, args, concurrency),
self._post_process_jobs(running, job_manager, args),
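Tying the two files together, a toy model of the new call chain (class and job names are illustrative, not rose code): because the caller awaits the runner, the dead-job check after the gather and the caller's `finally` only execute once every job has finished.

```python
import asyncio


class ToyRunner:
    """Illustrative stand-in for JobRunner after this change."""

    async def run(self, jobs, concurrency=6):
        sem = asyncio.Semaphore(concurrency)

        async def run_one(name):
            async with sem:
                await asyncio.sleep(0.01)
                print(f"{name} done")

        # awaited directly by the caller; nothing runs "in the background"
        await asyncio.gather(*(run_one(name) for name in jobs))
        # only meaningful because the gather above has actually finished
        print("checking for dead jobs")

    __call__ = run


async def process():
    try:
        await ToyRunner()(["job-a", "job-b", "job-c"])
    finally:
        # runs after every job has completed and been checked
        print("restore cwd, close database connection")


asyncio.run(process())
```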
54 changes: 54 additions & 0 deletions t/workflow-file-install/00-install.t
@@ -0,0 +1,54 @@
#!/usr/bin/env bash
#-------------------------------------------------------------------------------
# Copyright (C) British Crown (Met Office) & Contributors.
#
# This file is part of Rose, a framework for meteorological suites.
#
# Rose is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Rose is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Rose. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------
# Test Rose file installation under "cylc install"
# * Rose file installation is tested under "rose app-run" / "rose task-run",
# however, the way the async code is called is different when file
# installation is performed under "cylc install".
# * See https://github.com/metomi/rose/issues/2784
#-------------------------------------------------------------------------------
. $(dirname $0)/test_header
#-------------------------------------------------------------------------------
tests 3

# install a file from the Rose github repository
cat >rose-suite.conf <<__CONF__
[file:README.md]
source=git:git@github.com:metomi/rose::README.md::master
__CONF__

touch flow.cylc
get_reg

# install the workflow
TEST_KEY="${TEST_KEY_BASE}-install"
run_pass "$TEST_KEY" \
cylc install \
--workflow-name="${FLOW}" \
--no-run-name \
.

# ensure no error was produced during file installation
file_cmp "$TEST_KEY.err" "$TEST_KEY.err" </dev/null

# check the README file was produced
run_pass "${TEST_KEY_BASE}-foo" stat $HOME/cylc-run/$FLOW/README.md

purge
exit 0
1 change: 1 addition & 0 deletions t/workflow-file-install/test_header
