Skip to content

Commit

Permalink
Run plans in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
happz committed Oct 22, 2024
1 parent 8a551a9 commit 30438d3
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 104 deletions.
132 changes: 109 additions & 23 deletions tmt/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import tmt.log
import tmt.plugins
import tmt.plugins.plan_shapers
import tmt.queue
import tmt.result
import tmt.steps
import tmt.steps.discover
Expand All @@ -58,6 +59,7 @@
import tmt.utils.jira
from tmt.checks import Check
from tmt.lint import LinterOutcome, LinterReturn
from tmt.queue import Queue
from tmt.result import Result, ResultInterpret
from tmt.utils import (
Command,
Expand All @@ -82,6 +84,8 @@
import tmt.steps.discover
import tmt.steps.provision.local

from ._compat.typing import Self


T = TypeVar('T')

Expand Down Expand Up @@ -1700,6 +1704,9 @@ class Plan(
'gate',
]

def step_logger(self, step_name: str) -> tmt.log.Logger:
return self._logger.descend(logger_name=step_name)

def __init__(
self,
*,
Expand Down Expand Up @@ -1752,32 +1759,32 @@ def __init__(

# Initialize test steps
self.discover = tmt.steps.discover.Discover(
logger=logger.descend(logger_name='discover'),
logger=self.step_logger('discover'),
plan=self,
data=self.node.get('discover')
)
self.provision = tmt.steps.provision.Provision(
logger=logger.descend(logger_name='provision'),
logger=self.step_logger('provision'),
plan=self,
data=self.node.get('provision')
)
self.prepare = tmt.steps.prepare.Prepare(
logger=logger.descend(logger_name='prepare'),
logger=self.step_logger('prepare'),
plan=self,
data=self.node.get('prepare')
)
self.execute = tmt.steps.execute.Execute(
logger=logger.descend(logger_name='execute'),
logger=self.step_logger('execute'),
plan=self,
data=self.node.get('execute')
)
self.report = tmt.steps.report.Report(
logger=logger.descend(logger_name='report'),
logger=self.step_logger('report'),
plan=self,
data=self.node.get('report')
)
self.finish = tmt.steps.finish.Finish(
logger=logger.descend(logger_name='finish'),
logger=self.step_logger('finish'),
plan=self,
data=self.node.get('finish')
)
Expand Down Expand Up @@ -3409,6 +3416,63 @@ class RunData(SerializableContainer):
)


@dataclasses.dataclass
class PlanTask(tmt.queue.GuestlessTask[None]):
""" A task to run a plan """

plans: list[Plan]

#: Plan that was executed.
plan: Optional[Plan]

# Custom yet trivial `__init__` is necessary, see note in `tmt.queue.Task`.
def __init__(self, logger: tmt.log.Logger, plans: list[Plan], **kwargs: Any) -> None:
super().__init__(logger, **kwargs)

self.plans = plans
self.plan = None

@property
def name(self) -> str:
return cast(str, fmf.utils.listed([plan.name for plan in self.plans]))

def go(self) -> Iterator['Self']:
"""
Perform the task.
Called by :py:class:`Queue` machinery to accomplish the task. It expects
the child class would implement :py:meth:`run`, with ``go`` taking care
of task/queue interaction.
:yields: instances of the same class, describing invocations of the
task and their outcome. For each guest, one instance would be
yielded.
"""

def inject_logger(task: 'Self', plan: Plan, logger: tmt.log.Logger) -> None:
plan.inject_logger(logger)

for step_name in tmt.steps.STEPS:
getattr(plan, step_name).inject_logger(plan.step_logger(step_name))

yield from tmt.queue.execute_units(
self,
self.plans,
lambda task, plan:
plan.name,
inject_logger,
lambda task, plan, logger, executor:
executor.submit(plan.go),
lambda task, plan, logger, result:
dataclasses.replace(self, result=result, exc=None, requested_exit=None, plan=plan),
lambda task, plan, logger, exc:
dataclasses.replace(self, result=None, exc=exc, requested_exit=None, plan=plan),
lambda task, plan, logger, exc:
dataclasses.replace(self, result=None, exc=None, requested_exit=exc, plan=plan),
self.logger
)


class Run(tmt.utils.Common):
""" Test run, a container of plans """

Expand Down Expand Up @@ -3618,9 +3682,9 @@ def plans(self) -> Sequence[Plan]:
return self._plans

@functools.cached_property
def plan_queue(self) -> Sequence[Plan]:
def plan_staging_queue(self) -> Sequence[Plan]:
"""
A list of plans remaining to be executed.
A list of plans remaining to be queued by run and executed.
It is being populated via :py:attr:`plans`, but eventually,
:py:meth:`go` will remove plans from it as they get processed.
Expand All @@ -3639,7 +3703,7 @@ def swap_plans(self, plan: Plan, *others: Plan) -> None:
"""

plans = cast(list[Plan], self.plans)
plan_queue = cast(list[Plan], self.plan_queue)
plan_queue = cast(list[Plan], self.plan_staging_queue)

if plan in plan_queue:
plan_queue.remove(plan)
Expand Down Expand Up @@ -3808,27 +3872,49 @@ def go(self) -> None:
self.verbose(f"Found {listed(self.plans, 'plan')}.")
self.save()

# Iterate over plans
crashed_plans: list[tuple[Plan, Exception]] = []
queue: Queue[PlanTask] = Queue(
'plans',
self._logger.descend(logger_name=f'{self}.queue'))

while self.plan_queue:
plan = cast(list[Plan], self.plan_queue).pop(0)
failed_tasks: list[PlanTask] = []

try:
plan.go()
def _enqueue_new_plans() -> None:
staging_queue = self.plan_staging_queue[:]

except Exception as exc:
if self.opt('on-plan-error') == 'quit':
raise tmt.utils.GeneralError(
'plan failed',
causes=[exc])
if not staging_queue:
return

crashed_plans.append((plan, exc))
queue.enqueue_task(PlanTask(
self._logger,
cast(list[Plan], staging_queue)
))

if crashed_plans:
for plan in staging_queue:
cast(list[Plan], self.plan_staging_queue).remove(plan)

_enqueue_new_plans()

for outcome in queue.run(stop_on_error=False):
_enqueue_new_plans()

if outcome.exc:
outcome.logger.fail(str(outcome.exc))

failed_tasks.append(outcome)
continue

if failed_tasks:
raise tmt.utils.GeneralError(
'plan failed',
causes=[exc for _, exc in crashed_plans])
causes=[outcome.exc for outcome in failed_tasks if outcome.exc is not None])

# crashed_plans: list[tuple[Plan, Exception]] = []

# except Exception as exc:
# if self.opt('on-plan-error') == 'quit':
# raise tmt.utils.GeneralError(
# 'plan failed',
# causes=[exc])

# Update the last run id at the very end
# (override possible runs created during execution)
Expand Down
Loading

0 comments on commit 30438d3

Please sign in to comment.