Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run plans in parallel #3265

Draft
wants to merge 1 commit into
base: poc-split-plan-by-max
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 109 additions & 23 deletions tmt/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import tmt.log
import tmt.plugins
import tmt.plugins.plan_shapers
import tmt.queue
import tmt.result
import tmt.steps
import tmt.steps.discover
Expand All @@ -58,6 +59,7 @@
import tmt.utils.jira
from tmt.checks import Check
from tmt.lint import LinterOutcome, LinterReturn
from tmt.queue import Queue
from tmt.result import Result, ResultInterpret
from tmt.utils import (
Command,
Expand All @@ -82,6 +84,8 @@
import tmt.steps.discover
import tmt.steps.provision.local

from ._compat.typing import Self


T = TypeVar('T')

Expand Down Expand Up @@ -1700,6 +1704,9 @@ class Plan(
'gate',
]

def step_logger(self, step_name: str) -> tmt.log.Logger:
return self._logger.descend(logger_name=step_name)

def __init__(
self,
*,
Expand Down Expand Up @@ -1752,32 +1759,32 @@ def __init__(

# Initialize test steps
self.discover = tmt.steps.discover.Discover(
logger=logger.descend(logger_name='discover'),
logger=self.step_logger('discover'),
plan=self,
data=self.node.get('discover')
)
self.provision = tmt.steps.provision.Provision(
logger=logger.descend(logger_name='provision'),
logger=self.step_logger('provision'),
plan=self,
data=self.node.get('provision')
)
self.prepare = tmt.steps.prepare.Prepare(
logger=logger.descend(logger_name='prepare'),
logger=self.step_logger('prepare'),
plan=self,
data=self.node.get('prepare')
)
self.execute = tmt.steps.execute.Execute(
logger=logger.descend(logger_name='execute'),
logger=self.step_logger('execute'),
plan=self,
data=self.node.get('execute')
)
self.report = tmt.steps.report.Report(
logger=logger.descend(logger_name='report'),
logger=self.step_logger('report'),
plan=self,
data=self.node.get('report')
)
self.finish = tmt.steps.finish.Finish(
logger=logger.descend(logger_name='finish'),
logger=self.step_logger('finish'),
plan=self,
data=self.node.get('finish')
)
Expand Down Expand Up @@ -3409,6 +3416,63 @@ class RunData(SerializableContainer):
)


@dataclasses.dataclass
class PlanTask(tmt.queue.GuestlessTask[None]):
""" A task to run a plan """

plans: list[Plan]

#: Plan that was executed.
plan: Optional[Plan]

# Custom yet trivial `__init__` is necessary, see note in `tmt.queue.Task`.
def __init__(self, logger: tmt.log.Logger, plans: list[Plan], **kwargs: Any) -> None:
super().__init__(logger, **kwargs)

self.plans = plans
self.plan = None

@property
def name(self) -> str:
return cast(str, fmf.utils.listed([plan.name for plan in self.plans]))

def go(self) -> Iterator['Self']:
"""
Perform the task.

Called by :py:class:`Queue` machinery to accomplish the task. It expects
the child class would implement :py:meth:`run`, with ``go`` taking care
of task/queue interaction.

:yields: instances of the same class, describing invocations of the
task and their outcome. For each guest, one instance would be
yielded.
"""

def inject_logger(task: 'Self', plan: Plan, logger: tmt.log.Logger) -> None:
plan.inject_logger(logger)

for step_name in tmt.steps.STEPS:
getattr(plan, step_name).inject_logger(plan.step_logger(step_name))

yield from tmt.queue.execute_units(
self,
self.plans,
lambda task, plan:
plan.name,
inject_logger,
lambda task, plan, logger, executor:
executor.submit(plan.go),
lambda task, plan, logger, result:
dataclasses.replace(self, result=result, exc=None, requested_exit=None, plan=plan),
lambda task, plan, logger, exc:
dataclasses.replace(self, result=None, exc=exc, requested_exit=None, plan=plan),
lambda task, plan, logger, exc:
dataclasses.replace(self, result=None, exc=None, requested_exit=exc, plan=plan),
self.logger
)


class Run(tmt.utils.Common):
""" Test run, a container of plans """

Expand Down Expand Up @@ -3618,9 +3682,9 @@ def plans(self) -> Sequence[Plan]:
return self._plans

@functools.cached_property
def plan_queue(self) -> Sequence[Plan]:
def plan_staging_queue(self) -> Sequence[Plan]:
"""
A list of plans remaining to be executed.
A list of plans remaining to be queued by run and executed.

It is being populated via :py:attr:`plans`, but eventually,
:py:meth:`go` will remove plans from it as they get processed.
Expand All @@ -3639,7 +3703,7 @@ def swap_plans(self, plan: Plan, *others: Plan) -> None:
"""

plans = cast(list[Plan], self.plans)
plan_queue = cast(list[Plan], self.plan_queue)
plan_queue = cast(list[Plan], self.plan_staging_queue)

if plan in plan_queue:
plan_queue.remove(plan)
Expand Down Expand Up @@ -3808,27 +3872,49 @@ def go(self) -> None:
self.verbose(f"Found {listed(self.plans, 'plan')}.")
self.save()

# Iterate over plans
crashed_plans: list[tuple[Plan, Exception]] = []
queue: Queue[PlanTask] = Queue(
'plans',
self._logger.descend(logger_name=f'{self}.queue'))

while self.plan_queue:
plan = cast(list[Plan], self.plan_queue).pop(0)
failed_tasks: list[PlanTask] = []

try:
plan.go()
def _enqueue_new_plans() -> None:
staging_queue = self.plan_staging_queue[:]

except Exception as exc:
if self.opt('on-plan-error') == 'quit':
raise tmt.utils.GeneralError(
'plan failed',
causes=[exc])
if not staging_queue:
return

crashed_plans.append((plan, exc))
queue.enqueue_task(PlanTask(
self._logger,
cast(list[Plan], staging_queue)
))

if crashed_plans:
for plan in staging_queue:
cast(list[Plan], self.plan_staging_queue).remove(plan)

_enqueue_new_plans()

for outcome in queue.run(stop_on_error=False):
_enqueue_new_plans()

if outcome.exc:
outcome.logger.fail(str(outcome.exc))

failed_tasks.append(outcome)
continue

if failed_tasks:
raise tmt.utils.GeneralError(
'plan failed',
causes=[exc for _, exc in crashed_plans])
causes=[outcome.exc for outcome in failed_tasks if outcome.exc is not None])

# crashed_plans: list[tuple[Plan, Exception]] = []

# except Exception as exc:
# if self.opt('on-plan-error') == 'quit':
# raise tmt.utils.GeneralError(
# 'plan failed',
# causes=[exc])

# Update the last run id at the very end
# (override possible runs created during execution)
Expand Down
Loading
Loading