From 30438d3ea17de713da786a3ecee9317045bc1a1b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Milo=C5=A1=20Prchl=C3=ADk?=
Date: Mon, 7 Oct 2024 13:48:04 +0200
Subject: [PATCH] Run plans in parallel

---
 tmt/base.py                     | 132 +++++++++++++++----
 tmt/queue.py                    | 219 ++++++++++++++++++++------------
 tmt/steps/__init__.py           |   4 +-
 tmt/steps/provision/__init__.py |   4 +
 4 files changed, 255 insertions(+), 104 deletions(-)

diff --git a/tmt/base.py b/tmt/base.py
index 556b241596..e492456164 100644
--- a/tmt/base.py
+++ b/tmt/base.py
@@ -44,6 +44,7 @@
 import tmt.log
 import tmt.plugins
 import tmt.plugins.plan_shapers
+import tmt.queue
 import tmt.result
 import tmt.steps
 import tmt.steps.discover
@@ -58,6 +59,7 @@
 import tmt.utils.jira
 from tmt.checks import Check
 from tmt.lint import LinterOutcome, LinterReturn
+from tmt.queue import Queue
 from tmt.result import Result, ResultInterpret
 from tmt.utils import (
     Command,
@@ -82,6 +84,8 @@
     import tmt.steps.discover
     import tmt.steps.provision.local
 
+    from ._compat.typing import Self
+
 T = TypeVar('T')
 
@@ -1700,6 +1704,9 @@ class Plan(
         'gate',
     ]
 
+    def step_logger(self, step_name: str) -> tmt.log.Logger:
+        """ Create a logger for the given step of this plan """
+
+        return self._logger.descend(logger_name=step_name)
+
     def __init__(
         self,
         *,
@@ -1752,32 +1759,32 @@ def __init__(
 
         # Initialize test steps
         self.discover = tmt.steps.discover.Discover(
-            logger=logger.descend(logger_name='discover'),
+            logger=self.step_logger('discover'),
             plan=self,
             data=self.node.get('discover')
         )
         self.provision = tmt.steps.provision.Provision(
-            logger=logger.descend(logger_name='provision'),
+            logger=self.step_logger('provision'),
             plan=self,
             data=self.node.get('provision')
         )
         self.prepare = tmt.steps.prepare.Prepare(
-            logger=logger.descend(logger_name='prepare'),
+            logger=self.step_logger('prepare'),
             plan=self,
             data=self.node.get('prepare')
         )
         self.execute = tmt.steps.execute.Execute(
-            logger=logger.descend(logger_name='execute'),
+            logger=self.step_logger('execute'),
             plan=self,
             data=self.node.get('execute')
         )
         self.report = tmt.steps.report.Report(
-            logger=logger.descend(logger_name='report'),
+            logger=self.step_logger('report'),
             plan=self,
             data=self.node.get('report')
         )
         self.finish = tmt.steps.finish.Finish(
-            logger=logger.descend(logger_name='finish'),
+            logger=self.step_logger('finish'),
             plan=self,
             data=self.node.get('finish')
         )
@@ -3409,6 +3416,63 @@ class RunData(SerializableContainer):
     )
 
 
+@dataclasses.dataclass
+class PlanTask(tmt.queue.GuestlessTask[None]):
+    """ A task to run a set of plans """
+
+    plans: list[Plan]
+
+    #: Plan that was executed.
+    plan: Optional[Plan]
+
+    # Custom yet trivial `__init__` is necessary, see note in `tmt.queue.Task`.
+    def __init__(self, logger: tmt.log.Logger, plans: list[Plan], **kwargs: Any) -> None:
+        super().__init__(logger, **kwargs)
+
+        self.plans = plans
+        self.plan = None
+
+    @property
+    def name(self) -> str:
+        return cast(str, fmf.utils.listed([plan.name for plan in self.plans]))
+
+    def go(self) -> Iterator['Self']:
+        """
+        Perform the task.
+
+        Called by :py:class:`Queue` machinery to accomplish the task: all
+        plans are submitted to a thread pool and executed in parallel, with
+        ``go`` taking care of task/queue interaction.
+
+        :yields: instances of the same class, describing invocations of the
+            task and their outcome. For each plan, one instance would be
+            yielded.
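+
+        A rough sketch of how a consumer might drain these outcomes; the
+        names below are illustrative only, the real consumer lives in
+        :py:meth:`Run.go`:
+
+        .. code-block:: python
+
+            queue.enqueue_task(task)
+
+            for outcome in queue.run():
+                # Each yielded instance carries either the finished plan
+                # or the exception that crashed it.
+                if outcome.exc:
+                    outcome.logger.fail(str(outcome.exc))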
+        """

+        def inject_logger(task: 'Self', plan: Plan, logger: tmt.log.Logger) -> None:
+            plan.inject_logger(logger)
+
+            for step_name in tmt.steps.STEPS:
+                getattr(plan, step_name).inject_logger(plan.step_logger(step_name))
+
+        yield from tmt.queue.execute_units(
+            self,
+            self.plans,
+            lambda task, plan:
+                plan.name,
+            inject_logger,
+            lambda task, plan, logger, executor:
+                executor.submit(plan.go),
+            lambda task, plan, logger, result:
+                dataclasses.replace(self, result=result, exc=None, requested_exit=None, plan=plan),
+            lambda task, plan, logger, exc:
+                dataclasses.replace(self, result=None, exc=exc, requested_exit=None, plan=plan),
+            lambda task, plan, logger, exc:
+                dataclasses.replace(self, result=None, exc=None, requested_exit=exc, plan=plan),
+            self.logger
+        )
+
+
 class Run(tmt.utils.Common):
     """ Test run, a container of plans """
 
@@ -3618,9 +3682,9 @@ def plans(self) -> Sequence[Plan]:
         return self._plans
 
     @functools.cached_property
-    def plan_queue(self) -> Sequence[Plan]:
+    def plan_staging_queue(self) -> Sequence[Plan]:
         """
-        A list of plans remaining to be executed.
+        A list of plans waiting to be enqueued and executed by the run.
 
         It is being populated via :py:attr:`plans`, but eventually,
         :py:meth:`go` will remove plans from it as they get processed.
@@ -3639,7 +3703,7 @@ def swap_plans(self, plan: Plan, *others: Plan) -> None:
         """
 
         plans = cast(list[Plan], self.plans)
-        plan_queue = cast(list[Plan], self.plan_queue)
+        plan_queue = cast(list[Plan], self.plan_staging_queue)
 
         if plan in plan_queue:
             plan_queue.remove(plan)
@@ -3808,27 +3872,49 @@ def go(self) -> None:
         self.verbose(f"Found {listed(self.plans, 'plan')}.")
         self.save()
 
-        # Iterate over plans
-        crashed_plans: list[tuple[Plan, Exception]] = []
+        queue: Queue[PlanTask] = Queue(
+            'plans',
+            self._logger.descend(logger_name=f'{self}.queue'))
 
-        while self.plan_queue:
-            plan = cast(list[Plan], self.plan_queue).pop(0)
+        failed_tasks: list[PlanTask] = []
 
-            try:
-                plan.go()
+        def _enqueue_new_plans() -> None:
+            staging_queue = self.plan_staging_queue[:]
 
-            except Exception as exc:
-                if self.opt('on-plan-error') == 'quit':
-                    raise tmt.utils.GeneralError(
-                        'plan failed',
-                        causes=[exc])
+            if not staging_queue:
+                return
 
-                crashed_plans.append((plan, exc))
+            queue.enqueue_task(PlanTask(
+                self._logger,
+                cast(list[Plan], staging_queue)
+            ))
 
-        if crashed_plans:
+            for plan in staging_queue:
+                cast(list[Plan], self.plan_staging_queue).remove(plan)
+
+        _enqueue_new_plans()
+
+        for outcome in queue.run(stop_on_error=False):
+            _enqueue_new_plans()
+
+            if outcome.exc:
+                outcome.logger.fail(str(outcome.exc))
+
+                failed_tasks.append(outcome)
+                continue
+
+        # TODO: reinstate support for `--on-plan-error=quit`: the old
+        # sequential loop could stop right after the first crashed plan,
+        # the parallel version does not implement that behavior yet.
+        if failed_tasks:
             raise tmt.utils.GeneralError(
                 'plan failed',
-                causes=[exc for _, exc in crashed_plans])
+                causes=[outcome.exc for outcome in failed_tasks if outcome.exc is not None])
 
         # Update the last run id at the very end
         # (override possible runs created during execution)
diff --git a/tmt/queue.py b/tmt/queue.py
index 400b902d78..e86a3ef4c2 100644
--- a/tmt/queue.py
+++ b/tmt/queue.py
@@ -1,17 +1,116 @@
 import dataclasses
 import functools
+import itertools
+import queue
 from collections.abc import Iterator
 from concurrent.futures import Future, ThreadPoolExecutor, as_completed
-from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar
+from typing import TYPE_CHECKING, Any, Callable, Generic, Optional, TypeVar
 
 from tmt.log import Logger
 
 if TYPE_CHECKING:
+    import tmt.utils
     from tmt._compat.typing import Self
     from tmt.steps.provision import Guest
 
 TaskResultT = TypeVar('TaskResultT')
+T = TypeVar('T', bound='tmt.utils.Common')
+
+
+def execute_units(
+    task: 'Task[TaskResultT]',
+    units: list[T],
+    get_label: Callable[
+        ['Task[TaskResultT]', T],
+        str
+    ],
+    inject_logger: Callable[
+        ['Task[TaskResultT]', T, Logger],
+        None
+    ],
+    submit: Callable[
+        ['Task[TaskResultT]', T, Logger, ThreadPoolExecutor],
+        Future[TaskResultT]
+    ],
+    on_success: Callable[
+        ['Task[TaskResultT]', T, Logger, TaskResultT],
+        'Task[TaskResultT]'
+    ],
+    on_error: Callable[
+        ['Task[TaskResultT]', T, Logger, Exception],
+        'Task[TaskResultT]'
+    ],
+    on_exit: Callable[
+        ['Task[TaskResultT]', T, Logger, SystemExit],
+        'Task[TaskResultT]'
+    ],
+    logger: Logger
+    ) -> Iterator['Task[TaskResultT]']:
+    """
+    Run a task against a list of units - guests, plans, ... - in parallel.
+
+    A generalization of the original per-guest logic of :py:meth:`Task.go`:
+    each unit is handed to ``submit`` together with a thread pool executor,
+    ``get_label`` and ``inject_logger`` take care of labeled, per-unit
+    loggers, and, as the futures complete, ``on_success``, ``on_error`` or
+    ``on_exit`` wrap each outcome into a new task instance which is then
+    yielded to the caller.
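+
+    A minimal sketch of a caller, mirroring what :py:meth:`Task.go` does
+    below (the callback signatures are the contract, the bodies are
+    illustrative):
+
+    .. code-block:: python
+
+        yield from execute_units(
+            self,
+            self.guests,
+            lambda task, guest: guest.multihost_name,
+            lambda task, guest, logger: guest.inject_logger(logger),
+            lambda task, guest, logger, executor:
+                executor.submit(self.run_on_guest, guest, logger),
+            lambda task, guest, logger, result:
+                dataclasses.replace(self, result=result, exc=None, requested_exit=None),
+            lambda task, guest, logger, exc:
+                dataclasses.replace(self, result=None, exc=exc, requested_exit=None),
+            lambda task, guest, logger, exc:
+                dataclasses.replace(self, result=None, exc=None, requested_exit=exc),
+            self.logger)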
+    """
+
+    multiple_units = len(units) > 1
+
+    new_loggers = prepare_loggers(logger, [get_label(task, unit) for unit in units])
+    old_loggers: dict[str, Logger] = {}
+
+    with ThreadPoolExecutor(max_workers=len(units)) as executor:
+        futures: dict[Future[TaskResultT], T] = {}
+
+        for unit in units:
+            # Swap the unit's logger for the one we prepared, with labels
+            # and stuff.
+            #
+            # We can't do the same for phases - phase is shared among
+            # units, its `self.$loggingmethod()` calls need to be
+            # fixed to use a logger we pass to it through the executor.
+            #
+            # Possibly, the same thing should happen to guest methods as
+            # well, then the phase would pass the given logger to guest
+            # methods when it calls them, propagating the single logger we
+            # prepared...
+            old_loggers[get_label(task, unit)] = unit._logger
+            new_logger = new_loggers[get_label(task, unit)]
+
+            inject_logger(task, unit, new_logger)
+
+            if multiple_units:
+                new_logger.info('started', color='cyan')
+
+            # Submit each task/unit combination (save the unit & logger
+            # for later)...
+            futures[submit(task, unit, new_logger, executor)] = unit
+
+        # ... and then sit and wait as they get delivered to us as they
+        # finish. Unpack the unit and logger, so we could preserve logging
+        # and prepare the right outcome package.
+        for future in as_completed(futures):
+            unit = futures[future]
+
+            old_logger = old_loggers[get_label(task, unit)]
+            new_logger = new_loggers[get_label(task, unit)]
+
+            if multiple_units:
+                new_logger.info('finished', color='cyan')
+
+            # `Future.result()` will either 1. reraise an exception the
+            # callable raised, if any, or 2. return whatever the callable
+            # returned - `on_success` receives it as its `result` argument.
+            try:
+                result = future.result()
+
+            except SystemExit as exc:
+                yield on_exit(task, unit, new_logger, exc)
+
+            except Exception as exc:
+                yield on_error(task, unit, new_logger, exc)
+
+            else:
+                yield on_success(task, unit, new_logger, result)
+
+            # Don't forget to restore the original logger.
+            inject_logger(task, unit, old_logger)
 
 
 @dataclasses.dataclass
@@ -33,6 +132,8 @@ class Task(Generic[TaskResultT]):
     to their defaults "manually".
     """
 
+    #: A sequential number assigned to the task when it is enqueued.
+    id: Optional[int]
+
     #: A logger to use for logging events related to the outcome.
logger: Logger @@ -196,7 +297,7 @@ def __init__(self, logger: Logger, guests: list['Guest'], **kwargs: Any) -> None def guest_ids(self) -> list[str]: return sorted([guest.multihost_name for guest in self.guests]) - def run_on_guest(self, guest: 'Guest', logger: Logger) -> None: + def run_on_guest(self, guest: 'Guest', logger: Logger) -> TaskResultT: """ Perform the task. @@ -222,96 +323,50 @@ def go(self) -> Iterator['Self']: yielded. """ - multiple_guests = len(self.guests) > 1 - - new_loggers = prepare_loggers(self.logger, [guest.multihost_name for guest in self.guests]) - old_loggers: dict[str, Logger] = {} - - with ThreadPoolExecutor(max_workers=len(self.guests)) as executor: - futures: dict[Future[None], Guest] = {} - - for guest in self.guests: - # Swap guest's logger for the one we prepared, with labels - # and stuff. - # - # We can't do the same for phases - phase is shared among - # guests, its `self.$loggingmethod()` calls need to be - # fixed to use a logger we pass to it through the executor. - # - # Possibly, the same thing should happen to guest methods as - # well, then the phase would pass the given logger to guest - # methods when it calls them, propagating the single logger we - # prepared... - old_loggers[guest.multihost_name] = guest._logger - new_logger = new_loggers[guest.multihost_name] - - guest.inject_logger(new_logger) - - if multiple_guests: - new_logger.info('started', color='cyan') - - # Submit each task/guest combination (save the guest & logger - # for later)... - futures[ - executor.submit(self.run_on_guest, guest, new_logger) - ] = guest - - # ... and then sit and wait as they get delivered to us as they - # finish. Unpack the guest and logger, so we could preserve logging - # and prepare the right outcome package. - for future in as_completed(futures): - guest = futures[future] - - old_logger = old_loggers[guest.multihost_name] - new_logger = new_loggers[guest.multihost_name] - - if multiple_guests: - new_logger.info('finished', color='cyan') - - # `Future.result()` will either 1. reraise an exception the - # callable raised, if any, or 2. return whatever the callable - # returned - which is `None` in our case, therefore we can - # ignore the return value. - try: - result = future.result() - - except SystemExit as exc: - task = dataclasses.replace(self, result=None, exc=None, requested_exit=exc) - - except Exception as exc: - task = dataclasses.replace(self, result=None, exc=exc, requested_exit=None) - - else: - task = dataclasses.replace(self, result=result, exc=None, requested_exit=None) - - task.guest = guest - - yield task - - # Don't forget to restore the original logger. 
-                guest.inject_logger(old_logger)
-
-
-class Queue(list[TaskT]):
+        yield from execute_units(
+            self,
+            self.guests,
+            lambda task, guest:
+                guest.multihost_name,
+            lambda task, guest, logger:
+                guest.inject_logger(logger),
+            lambda task, guest, logger, executor:
+                executor.submit(self.run_on_guest, guest, logger),
+            lambda task, guest, logger, result:
+                dataclasses.replace(self, result=result, exc=None, requested_exit=None, guest=guest),
+            lambda task, guest, logger, exc:
+                dataclasses.replace(self, result=None, exc=exc, requested_exit=None, guest=guest),
+            lambda task, guest, logger, exc:
+                dataclasses.replace(self, result=None, exc=None, requested_exit=exc, guest=guest),
+            self.logger
+        )
+
+
+class Queue(queue.Queue[TaskT]):
     """ Queue class for running tasks """
 
+    _task_counter: 'itertools.count[int]'
+
     def __init__(self, name: str, logger: Logger) -> None:
         super().__init__()
 
         self.name = name
         self._logger = logger
+        self._task_counter = itertools.count(start=1)
 
     def enqueue_task(self, task: TaskT) -> None:
         """ Put new task into a queue """
 
-        self.append(task)
+        task.id = next(self._task_counter)
+
+        self.put(task)
 
         self._logger.info(
-            f'queued {self.name} task #{len(self)}',
+            f'queued {self.name} task #{task.id}',
             task.name,
             color='cyan')
 
-    def run(self) -> Iterator[TaskT]:
+    def run(self, stop_on_error: bool = True) -> Iterator[TaskT]:
         """
         Start crunching the queued tasks.
 
@@ -319,11 +374,17 @@ def run(self) -> Iterator[TaskT]:
         combination a :py:class:`Task` instance is yielded.
+
+        :param stop_on_error: if true, the default, no more tasks are
+            started once any task fails; tasks still in the queue are
+            abandoned.
         """
 
-        for i, task in enumerate(self):
+        while True:
+            try:
+                task = self.get_nowait()
+
+            except queue.Empty:
+                return
+
             self._logger.info('')
 
             self._logger.info(
-                f'{self.name} task #{i + 1}',
+                f'{self.name} task #{task.id}',
                 task.name,
                 color='cyan')
 
@@ -336,5 +397,5 @@ def run(self) -> Iterator[TaskT]:
             yield outcome
 
-            # TODO: make this optional
-            if failed_tasks:
+            if failed_tasks and stop_on_error:
                 return
diff --git a/tmt/steps/__init__.py b/tmt/steps/__init__.py
index 14964a0adc..b6f00ce992 100644
--- a/tmt/steps/__init__.py
+++ b/tmt/steps/__init__.py
@@ -2243,8 +2243,8 @@ def phase_name(self) -> str:
     def name(self) -> str:
         return f'{self.phase_name} on {fmf.utils.listed(self.guest_ids)}'
 
-    def run_on_guest(self, guest: 'Guest', logger: tmt.log.Logger) -> None:
-        self.phase.go(guest=guest, logger=logger)
+    def run_on_guest(self, guest: 'Guest', logger: tmt.log.Logger) -> PluginReturnValueT:
+        return self.phase.go(guest=guest, logger=logger)
 
 
 class PhaseQueue(tmt.queue.Queue[Union[ActionTask, PluginTask[StepDataT, PluginReturnValueT]]]):
diff --git a/tmt/steps/provision/__init__.py b/tmt/steps/provision/__init__.py
index de43937e18..80b6bc1d5e 100644
--- a/tmt/steps/provision/__init__.py
+++ b/tmt/steps/provision/__init__.py
@@ -2285,6 +2285,7 @@ def go(self) -> Iterator['ProvisionTask']:
 
             except SystemExit as exc:
                 yield ProvisionTask(
+                    id=None,
                     logger=new_logger,
                     result=None,
                     guest=None,
@@ -2295,6 +2296,7 @@ def go(self) -> Iterator['ProvisionTask']:
 
             except Exception as exc:
                 yield ProvisionTask(
+                    id=None,
                     logger=new_logger,
                     result=None,
                     guest=None,
@@ -2305,6 +2307,7 @@ def go(self) -> Iterator['ProvisionTask']:
 
             else:
                 yield ProvisionTask(
+                    id=None,
                     logger=new_logger,
                     result=None,
                     guest=phase.guest(),
@@ -2327,6 +2330,7 @@ def enqueue(
             phases: list[ProvisionPlugin[ProvisionStepData]],
             logger: Logger) -> None:
         self.enqueue_task(ProvisionTask(
+            id=None,
             logger=logger,
             result=None,
             guest=None,