Commit 77cb02a

Run plans in parallel

happz committed Oct 22, 2024
1 parent 8a551a9 commit 77cb02a

Showing 3 changed files with 175 additions and 30 deletions.
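
In short, the commit stops iterating over plans sequentially and instead submits them to a thread pool, collecting outcomes as they finish. The following standalone sketch illustrates the underlying ThreadPoolExecutor/as_completed pattern; the Plan class and function names here are simplified stand-ins, not tmt's actual API.

# Standalone sketch of the concurrency pattern adopted by this commit: each
# plan's go() is submitted to a thread pool, outcomes are collected in
# completion order. Plan and run_plans_in_parallel are illustrative only.
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Optional


class Plan:
    """ Stand-in for tmt.base.Plan, reduced to a name and a go() method. """

    def __init__(self, name: str) -> None:
        self.name = name

    def go(self) -> None:
        print(f'{self.name}: running')


def run_plans_in_parallel(plans: list[Plan]) -> list[tuple[Plan, Optional[Exception]]]:
    outcomes: list[tuple[Plan, Optional[Exception]]] = []

    with ThreadPoolExecutor(max_workers=len(plans)) as executor:
        # Submit every plan, remembering which future belongs to which plan.
        futures = {executor.submit(plan.go): plan for plan in plans}

        # Collect outcomes as the plans finish, keeping any raised exception.
        for future in as_completed(futures):
            plan = futures[future]

            try:
                future.result()
            except Exception as exc:
                outcomes.append((plan, exc))
            else:
                outcomes.append((plan, None))

    return outcomes


if __name__ == '__main__':
    run_plans_in_parallel([Plan('/plans/smoke'), Plan('/plans/full')])
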
172 changes: 149 additions & 23 deletions tmt/base.py
@@ -13,6 +13,7 @@
import tempfile
import time
from collections.abc import Iterable, Iterator, Sequence
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import (
TYPE_CHECKING,
Any,
@@ -44,6 +45,7 @@
import tmt.log
import tmt.plugins
import tmt.plugins.plan_shapers
import tmt.queue
import tmt.result
import tmt.steps
import tmt.steps.discover
@@ -58,6 +60,7 @@
import tmt.utils.jira
from tmt.checks import Check
from tmt.lint import LinterOutcome, LinterReturn
from tmt.queue import Queue
from tmt.result import Result, ResultInterpret
from tmt.utils import (
Command,
@@ -82,6 +85,8 @@
import tmt.steps.discover
import tmt.steps.provision.local

from ._compat.typing import Self


T = TypeVar('T')

@@ -1700,6 +1705,9 @@ class Plan(
'gate',
]

def step_logger(self, step_name: str) -> tmt.log.Logger:
return self._logger.descend(logger_name=step_name)

def __init__(
self,
*,
@@ -1752,32 +1760,32 @@ def __init__(

# Initialize test steps
self.discover = tmt.steps.discover.Discover(
- logger=logger.descend(logger_name='discover'),
+ logger=self.step_logger('discover'),
plan=self,
data=self.node.get('discover')
)
self.provision = tmt.steps.provision.Provision(
- logger=logger.descend(logger_name='provision'),
+ logger=self.step_logger('provision'),
plan=self,
data=self.node.get('provision')
)
self.prepare = tmt.steps.prepare.Prepare(
- logger=logger.descend(logger_name='prepare'),
+ logger=self.step_logger('prepare'),
plan=self,
data=self.node.get('prepare')
)
self.execute = tmt.steps.execute.Execute(
- logger=logger.descend(logger_name='execute'),
+ logger=self.step_logger('execute'),
plan=self,
data=self.node.get('execute')
)
self.report = tmt.steps.report.Report(
- logger=logger.descend(logger_name='report'),
+ logger=self.step_logger('report'),
plan=self,
data=self.node.get('report')
)
self.finish = tmt.steps.finish.Finish(
- logger=logger.descend(logger_name='finish'),
+ logger=self.step_logger('finish'),
plan=self,
data=self.node.get('finish')
)
@@ -3409,6 +3417,102 @@ class RunData(SerializableContainer):
)


@dataclasses.dataclass
class PlanTask(tmt.queue.GuestlessTask[None]):
""" A task to run a plan """

plans: list[Plan]

#: Plan that was executed.
plan: Optional[Plan]

# Custom yet trivial `__init__` is necessary, see note in `tmt.queue.Task`.
def __init__(self, logger: tmt.log.Logger, plans: list[Plan], **kwargs: Any) -> None:
super().__init__(logger, **kwargs)

self.plans = plans
self.plan = None

@property
def name(self) -> str:
return fmf.utils.listed([plan.name for plan in self.plans])

def go(self) -> Iterator['Self']:
"""
Perform the task.
Called by :py:class:`Queue` machinery to accomplish the task. It expects
the child class would implement :py:meth:`run`, with ``go`` taking care
of task/queue interaction.
:yields: instances of the same class, describing invocations of the
task and their outcome. For each guest, one instance would be
yielded.
"""

multiple_plans = len(self.plans) > 1

new_loggers = tmt.queue.prepare_loggers(self.logger, [plan.name for plan in self.plans])
old_loggers: dict[str, tmt.log.Logger] = {}

with ThreadPoolExecutor(max_workers=len(self.plans)) as executor:
futures: dict[Future[None], Plan] = {}

for plan in self.plans:
old_loggers[plan.name] = plan._logger
new_logger = new_loggers[plan.name]

plan.inject_logger(new_logger)

for step_name in tmt.steps.STEPS:
getattr(plan, step_name).inject_logger(plan.step_logger(step_name))

if multiple_plans:
new_logger.info('started', color='cyan')

# Submit each task/guest combination (save the guest & logger
# for later)...
futures[executor.submit(plan.go)] = plan

# ... and then sit and wait as the plans get delivered to us as they
# finish. Unpack the plan and its loggers, so we can preserve logging
# and prepare the right outcome package.
for future in as_completed(futures):
plan = futures[future]

old_logger = old_loggers[plan.name]
new_logger = new_loggers[plan.name]

if multiple_plans:
new_logger.info('finished', color='cyan')

# `Future.result()` will either 1. reraise an exception the
# callable raised, if any, or 2. return whatever the callable
# returned - which is `None` in our case, therefore we can
# ignore the return value.
try:
result = future.result()

except SystemExit as exc:
task = dataclasses.replace(self, result=None, exc=None, requested_exit=exc)

except Exception as exc:
task = dataclasses.replace(self, result=None, exc=exc, requested_exit=None)

else:
task = dataclasses.replace(self, result=result, exc=None, requested_exit=None)

task.plan = plan

yield task

# Don't forget to restore the original logger.
plan.inject_logger(old_logger)

for step_name in tmt.steps.STEPS:
getattr(plan, step_name).inject_logger(plan.step_logger(step_name))


class Run(tmt.utils.Common):
""" Test run, a container of plans """

@@ -3618,9 +3722,9 @@ def plans(self) -> Sequence[Plan]:
return self._plans

@functools.cached_property
- def plan_queue(self) -> Sequence[Plan]:
+ def plan_staging_queue(self) -> Sequence[Plan]:
"""
- A list of plans remaining to be executed.
+ A list of plans remaining to be enqueued and executed by the run.
It is being populated via :py:attr:`plans`, but eventually,
:py:meth:`go` will remove plans from it as they get processed.
@@ -3639,7 +3743,7 @@ def swap_plans(self, plan: Plan, *others: Plan) -> None:
"""

plans = cast(list[Plan], self.plans)
- plan_queue = cast(list[Plan], self.plan_queue)
+ plan_queue = cast(list[Plan], self.plan_staging_queue)

if plan in plan_queue:
plan_queue.remove(plan)
@@ -3808,27 +3912,49 @@ def go(self) -> None:
self.verbose(f"Found {listed(self.plans, 'plan')}.")
self.save()

- # Iterate over plans
- crashed_plans: list[tuple[Plan, Exception]] = []
+ queue: Queue[PlanTask] = Queue(
+     'plans',
+     self._logger.descend(logger_name=f'{self}.queue'))

- while self.plan_queue:
-     plan = cast(list[Plan], self.plan_queue).pop(0)
+ failed_tasks: list[PlanTask] = []

-     try:
-         plan.go()
+ def _enqueue_new_plans() -> None:
+     staging_queue = self.plan_staging_queue[:]

-     except Exception as exc:
-         if self.opt('on-plan-error') == 'quit':
-             raise tmt.utils.GeneralError(
-                 'plan failed',
-                 causes=[exc])
+     if not staging_queue:
+         return

+     queue.enqueue_task(PlanTask(
+         self._logger,
+         cast(list[Plan], staging_queue)
+         ))

+     for plan in staging_queue:
+         cast(list[Plan], self.plan_staging_queue).remove(plan)

-         crashed_plans.append((plan, exc))
+ _enqueue_new_plans()

- if crashed_plans:
+ for outcome in queue.run(stop_on_error=False):
+     _enqueue_new_plans()

+     if outcome.exc:
+         outcome.logger.fail(str(outcome.exc))

+         failed_tasks.append(outcome)
+         continue

+ if failed_tasks:
      raise tmt.utils.GeneralError(
          'plan failed',
-         causes=[exc for _, exc in crashed_plans])
+         causes=[outcome.exc for outcome in failed_tasks if outcome.exc is not None])

+ # crashed_plans: list[tuple[Plan, Exception]] = []

+     # except Exception as exc:
+     #     if self.opt('on-plan-error') == 'quit':
+     #         raise tmt.utils.GeneralError(
+     #             'plan failed',
+     #             causes=[exc])

# Update the last run id at the very end
# (override possible runs created during execution)
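
The reworked Run.go() above drains a staging list into a task queue and re-checks it after every outcome, so plans created mid-run (for example by plan shapers) are still picked up. Below is a simplified sketch of that control flow; all names are illustrative, and unlike the commit, which batches all staged plans into a single PlanTask, this sketch queues plans one by one.

# Simplified sketch of the staging-queue flow in Run.go(): drain the staging
# list into a task queue, run tasks, and re-check the staging list after each
# outcome so late additions are picked up. Names and types are stand-ins.
import queue
from typing import Callable, Optional


def run_with_staging(
        staging: list[str],
        execute: Callable[[str], Optional[Exception]]) -> list[Exception]:
    tasks: queue.Queue[str] = queue.Queue()
    failures: list[Exception] = []

    def enqueue_new_plans() -> None:
        # Move everything currently staged into the task queue.
        for plan in staging[:]:
            tasks.put(plan)
            staging.remove(plan)

    enqueue_new_plans()

    while True:
        try:
            plan = tasks.get_nowait()
        except queue.Empty:
            break

        exc = execute(plan)

        # New plans may have been staged while this one was running.
        enqueue_new_plans()

        if exc is not None:
            failures.append(exc)

    return failures


# Example: the second plan fails, but the run keeps going and reports it.
results = run_with_staging(
    ['/plans/smoke', '/plans/full'],
    lambda plan: None if plan == '/plans/smoke' else RuntimeError(f'{plan} failed'))
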
29 changes: 22 additions & 7 deletions tmt/queue.py
@@ -1,5 +1,7 @@
import dataclasses
import functools
import itertools
import queue
from collections.abc import Iterator
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar
@@ -33,6 +35,8 @@ class Task(Generic[TaskResultT]):
to their defaults "manually".
"""

id: Optional[int]

#: A logger to use for logging events related to the outcome.
logger: Logger

@@ -292,38 +296,49 @@ def go(self) -> Iterator['Self']:
guest.inject_logger(old_logger)


- class Queue(list[TaskT]):
+ class Queue(queue.Queue[TaskT]):
""" Queue class for running tasks """

_task_counter: 'itertools.count[int]'

def __init__(self, name: str, logger: Logger) -> None:
super().__init__()

self.name = name
self._logger = logger
self._task_counter = itertools.count(start=1)

def enqueue_task(self, task: TaskT) -> None:
""" Put new task into a queue """

- self.append(task)
+ task.id = next(self._task_counter)
+
+ self.put(task)

self._logger.info(
- f'queued {self.name} task #{len(self)}',
+ f'queued {self.name} task #{task.id}',
task.name,
color='cyan')

- def run(self) -> Iterator[TaskT]:
+ def run(self, stop_on_error: bool = True) -> Iterator[TaskT]:
"""
Start crunching the queued tasks.
Tasks are executed in the order, for each task/guest
combination a :py:class:`Task` instance is yielded.
"""

- for i, task in enumerate(self):
+ while True:
+     try:
+         task = self.get_nowait()
+
+     except queue.Empty:
+         return

self._logger.info('')

self._logger.info(
- f'{self.name} task #{i + 1}',
+ f'{self.name} task #{task.id}',
task.name,
color='cyan')

@@ -336,5 +351,5 @@ def run(self) -> Iterator[TaskT]:
yield outcome

# TODO: make this optional
- if failed_tasks:
+ if failed_tasks and stop_on_error:
return
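
The rewritten Queue above switches from a plain list to queue.Queue, assigns task IDs from an itertools.count, and drains itself with get_nowait() until empty, stopping after a failure only when stop_on_error is set. A minimal self-contained sketch of that shape follows; Task and TaskQueue are simplified stand-ins for tmt's classes and need Python 3.9+ for the subscripted queue.Queue base.

# Minimal sketch of a queue.Queue-based task queue with counter-assigned ids
# and an optional stop-on-error drain loop. Not tmt's actual classes.
import dataclasses
import itertools
import queue
from collections.abc import Iterator
from typing import Callable, Optional


@dataclasses.dataclass
class Task:
    """ Simplified stand-in for tmt.queue.Task. """

    name: str
    body: Callable[[], None]
    id: Optional[int] = None
    exc: Optional[Exception] = None


class TaskQueue(queue.Queue[Task]):
    """ Simplified stand-in for tmt.queue.Queue. """

    def __init__(self) -> None:
        super().__init__()
        self._task_counter = itertools.count(start=1)

    def enqueue_task(self, task: Task) -> None:
        # Assign a stable id at enqueue time, as the commit does.
        task.id = next(self._task_counter)
        self.put(task)

    def run(self, stop_on_error: bool = True) -> Iterator[Task]:
        while True:
            try:
                task = self.get_nowait()
            except queue.Empty:
                return

            try:
                task.body()
            except Exception as exc:
                task.exc = exc

            yield task

            if task.exc is not None and stop_on_error:
                return


# Usage: enqueue a task, drain the queue, inspect outcomes.
q = TaskQueue()
q.enqueue_task(Task(name='demo', body=lambda: None))

for finished in q.run():
    print(finished.id, finished.name, finished.exc)
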
4 changes: 4 additions & 0 deletions tmt/steps/provision/__init__.py
@@ -2285,6 +2285,7 @@ def go(self) -> Iterator['ProvisionTask']:

except SystemExit as exc:
yield ProvisionTask(
id=None,
logger=new_logger,
result=None,
guest=None,
@@ -2295,6 +2296,7 @@

except Exception as exc:
yield ProvisionTask(
id=None,
logger=new_logger,
result=None,
guest=None,
@@ -2305,6 +2307,7 @@

else:
yield ProvisionTask(
id=None,
logger=new_logger,
result=None,
guest=phase.guest(),
@@ -2327,6 +2330,7 @@ def enqueue(
phases: list[ProvisionPlugin[ProvisionStepData]],
logger: Logger) -> None:
self.enqueue_task(ProvisionTask(
id=None,
logger=logger,
result=None,
guest=None,
