From d3ec050b155ded5c4137fdc2b9c007c63daca822 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Thu, 6 Jun 2024 17:31:10 +0100 Subject: [PATCH] job-runner: move async one level up in the call stack * The job_runner was converted to async, its tasks run in the background (i.e. calls complete before the code has finished running, you cannot await them from synchronous code). * This jiggered up some `finally` blocks which were being invoked too soon. * This PR moves the async code up one level in the stack, bringing these finally blocks within the lifecycle of the job runner async code allowing them to function correctly. * Closes #2784 --- metomi/rose/config_processors/fileinstall.py | 38 ++++++++++++-- metomi/rose/job_runner.py | 45 ++++++++-------- t/workflow-file-install/00-install.t | 54 ++++++++++++++++++++ t/workflow-file-install/test_header | 1 + 4 files changed, 110 insertions(+), 28 deletions(-) create mode 100644 t/workflow-file-install/00-install.t create mode 120000 t/workflow-file-install/test_header diff --git a/metomi/rose/config_processors/fileinstall.py b/metomi/rose/config_processors/fileinstall.py index e87045ce90..ae4241e671 100644 --- a/metomi/rose/config_processors/fileinstall.py +++ b/metomi/rose/config_processors/fileinstall.py @@ -17,6 +17,7 @@ """Process "file:*" sections in node of a metomi.rose.config_tree.ConfigTree. """ +import asyncio from contextlib import suppress from fnmatch import fnmatch from glob import glob @@ -47,6 +48,13 @@ from metomi.rose.scheme_handler import SchemeHandlersManager +# set containing references to "background" coroutines that are not referenced +# from any code (i.e. 
are not directly awaited), adding them to this list +# avoids the potential for garbage collection to delete them whilst they are +# running +_BACKGROUND_TASKS = set() + + class ConfigProcessorForFile(ConfigProcessorBase): """Processor for [file:*] in node of a ConfigTree.""" @@ -89,12 +97,33 @@ def process( if not nodes: return + loop = asyncio.get_event_loop() + loop.set_exception_handler(self.handle_event) + coro = self.__process(conf_tree, nodes, **kwargs) + + try: + # event loop is not running (e.g. rose CLI use) + loop.run_until_complete(coro) + except RuntimeError: + # event loop is already running (e.g. cylc CLI use) + # WARNING: this starts the file installation running, but it + # doesn't wait for it to finish, that's your problem :( + task = loop.create_task(coro) + # reference this task from a global variable to prevent it from + # being garbage collected + _BACKGROUND_TASKS.add(task) + # tidy up afterwards + task.add_done_callback(_BACKGROUND_TASKS.discard) + + async def __process(self, conf_tree, nodes, **kwargs): + """Helper for self._process.""" # Create database to store information for incremental updates, # if it does not already exist. 
loc_dao = LocDAO() loc_dao.create() cwd = os.getcwd() + file_install_root = conf_tree.node.get_value( ["file-install-root"], os.getenv("ROSE_FILE_INSTALL_ROOT", None) ) @@ -102,8 +131,9 @@ def process( file_install_root = env_var_process(file_install_root) self.manager.fs_util.makedirs(file_install_root) self.manager.fs_util.chdir(file_install_root) + try: - self._process(conf_tree, nodes, loc_dao, **kwargs) + await self._process(conf_tree, nodes, loc_dao, **kwargs) finally: if cwd != os.getcwd(): self.manager.fs_util.chdir(cwd) @@ -111,7 +141,7 @@ def process( with suppress(Exception): loc_dao.conn.close() - def _process(self, conf_tree, nodes, loc_dao, **kwargs): + async def _process(self, conf_tree, nodes, loc_dao, **kwargs): """Helper for self.process.""" # Ensure that everything is overwritable # Ensure that container directories exist @@ -360,7 +390,9 @@ def _process(self, conf_tree, nodes, loc_dao, **kwargs): if nproc_str is not None: nproc = int(nproc_str) job_runner = JobRunner(self, nproc) - job_runner(JobManager(jobs), conf_tree, loc_dao, work_dir) + await job_runner( + JobManager(jobs), conf_tree, loc_dao, work_dir + ) except ValueError as exc: if exc.args and exc.args[0] in jobs: job = jobs[exc.args[0]] diff --git a/metomi/rose/job_runner.py b/metomi/rose/job_runner.py index 86247a8176..ea55f40d0e 100644 --- a/metomi/rose/job_runner.py +++ b/metomi/rose/job_runner.py @@ -21,13 +21,6 @@ from metomi.rose.reporter import Event -# set containing references to "background" coroutines that are not referenced -# from any code (i.e. 
are not directly awaited), adding them to this list -# avoids the potential for garbage collection to delete them whilst they are -# running -_BACKGROUND_TASKS = set() - - class JobEvent(Event): """Event raised when a job completes.""" @@ -171,7 +164,14 @@ def __init__(self, job_processor, nproc=None): """ self.job_processor = job_processor - def run(self, job_manager, *args, concurrency=6): + async def run( + self, + job_manager, + conf_tree, + loc_dao, + work_dir, + concurrency=6, + ): """Start the job runner with an instance of JobManager. Args: @@ -183,28 +183,23 @@ def run(self, job_manager, *args, concurrency=6): The maximum number of jobs to run concurrently. """ - loop = asyncio.get_event_loop() - loop.set_exception_handler(self.job_processor.handle_event) - coro = self._run(job_manager, *args, concurrency=concurrency) - try: - # event loop is not running (e.g. rose CLI use) - loop.run_until_complete(coro) - except RuntimeError: - # event loop is already running (e.g. cylc CLI use) - # WARNING: this starts the file installation running, but it - # doesn't wait for it to finish, that's your problem :( - task = loop.create_task(coro) - # reference this task from a global variable to prevent it from - # being garbage collected - _BACKGROUND_TASKS.add(task) - # tidy up afterwards - task.add_done_callback(_BACKGROUND_TASKS.discard) + await self._run( + job_manager, conf_tree, loc_dao, work_dir, concurrency=concurrency + ) dead_jobs = job_manager.get_dead_jobs() if dead_jobs: raise JobRunnerNotCompletedError(dead_jobs) - async def _run(self, job_manager, *args, concurrency=6): + async def _run( + self, + job_manager, + conf_tree, + loc_dao, + work_dir, + concurrency=6, + ): running = [] + args = (conf_tree, loc_dao, work_dir) await asyncio.gather( self._run_jobs(running, job_manager, args, concurrency), self._post_process_jobs(running, job_manager, args), diff --git a/t/workflow-file-install/00-install.t b/t/workflow-file-install/00-install.t new file mode 100644 
index 0000000000..78cdce5260 --- /dev/null +++ b/t/workflow-file-install/00-install.t @@ -0,0 +1,54 @@ +#!/usr/bin/env bash +#------------------------------------------------------------------------------- +# Copyright (C) British Crown (Met Office) & Contributors. +# +# This file is part of Rose, a framework for meteorological suites. +# +# Rose is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Rose is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Rose. If not, see <http://www.gnu.org/licenses/>. +#------------------------------------------------------------------------------- +# Test Rose file installation under "cylc install" +# * Rose file installation is tested under "rose app-run" / "rose task-run", +# however, the way the async code is called is different when file +# installation is performed under "cylc install". +# * See https://github.com/metomi/rose/issues/2784 +#------------------------------------------------------------------------------- +. $(dirname $0)/test_header +#------------------------------------------------------------------------------- +tests 3 + +# install a file from the Rose github repository +cat >rose-suite.conf <<__CONF__ +[file:README.md] +source=git:git@github.com:metomi/rose::README.md::master +__CONF__ + +touch flow.cylc +get_reg + +# install the workflow +TEST_KEY="${TEST_KEY_BASE}-install" +run_pass "$TEST_KEY" \ + cylc install \ + --workflow-name="${FLOW}" \ + --no-run-name \ + . + +# ensure no error was produced during file installation +file_cmp "$TEST_KEY.err" "$TEST_KEY.err" </dev/null