Skip to content

Commit

Permalink
fix: ensure that nodes complete when workflow fails with parallelism …
Browse files Browse the repository at this point in the history
…and failFast enabled. Fixes #13806

Signed-off-by: oninowang <[email protected]>
  • Loading branch information
jswxstw authored and oninowang committed Oct 28, 2024
1 parent 3dfea6d commit a9fb455
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 69 deletions.
26 changes: 26 additions & 0 deletions test/e2e/expectedfailures/parallelism-dag-fail-fast.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: parallelism-dag-fail-fast
spec:
entrypoint: main
templates:
- name: main
parallelism: 2
failFast: true
dag:
tasks:
- name: task1
template: fail
- name: task2
template: sleep
- name: fail
container:
image: alpine:latest
command: [ sh, -c ]
args: ["exit 1"]
- name: sleep
container:
image: alpine:latest
command: [sh, -c]
args: ["sleep 5"]
38 changes: 0 additions & 38 deletions test/e2e/expectedfailures/parallelism-dag-failure.yaml

This file was deleted.

25 changes: 25 additions & 0 deletions test/e2e/expectedfailures/parallelism-step-fail-fast.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: parallelism-step-fail-fast
spec:
entrypoint: main
templates:
- name: main
parallelism: 2
failFast: true
steps:
- - name: step1
template: fail
- name: step2
template: sleep
- name: fail
container:
image: alpine:latest
command: [sh, -c]
args: ["exit 1"]
- name: sleep
container:
image: alpine:latest
command: [ sh, -c ]
args: [ "sleep 5" ]
28 changes: 0 additions & 28 deletions test/e2e/expectedfailures/parallelism-step-failure.yaml

This file was deleted.

31 changes: 31 additions & 0 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1415,3 +1415,34 @@ func (s *FunctionalSuite) TestWorkflowExitHandlerCrashEnsureNodeIsPresent() {
assert.NotNil(t, hookNode.Inputs.Parameters[0].Value)
})
}

func (s *FunctionalSuite) TestWorkflowParallelismStepFailFast() {
s.Given().
Workflow("@expectedfailures/parallelism-step-fail-fast.yaml").
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeRunning).
WaitForWorkflow(fixtures.ToBeFailed).
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, "template has failed or errored children and failFast enabled", status.Message)
assert.Equal(t, wfv1.NodeFailed, status.Nodes.FindByDisplayName("[0]").Phase)
assert.Equal(t, wfv1.NodeFailed, status.Nodes.FindByDisplayName("step1").Phase)
assert.Equal(t, wfv1.NodeSucceeded, status.Nodes.FindByDisplayName("step2").Phase)
})
}

func (s *FunctionalSuite) TestWorkflowParallelismDAGFailFast() {
s.Given().
Workflow("@expectedfailures/parallelism-dag-fail-fast.yaml").
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeRunning).
WaitForWorkflow(fixtures.ToBeFailed).
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, "template has failed or errored children and failFast enabled", status.Message)
assert.Equal(t, wfv1.NodeFailed, status.Nodes.FindByDisplayName("task1").Phase)
assert.Equal(t, wfv1.NodeSucceeded, status.Nodes.FindByDisplayName("task2").Phase)
})
}
39 changes: 36 additions & 3 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2770,6 +2770,25 @@ func (woc *wfOperationCtx) markNodeWaitingForLock(nodeName string, lockName stri
return node, nil
}

func (woc *wfOperationCtx) findLeafNodeWithType(boundaryID string, nodeType wfv1.NodeType) *wfv1.NodeStatus {
var leafNode *wfv1.NodeStatus
var dfs func(nodeID string)
dfs = func(nodeID string) {
node, err := woc.wf.Status.Nodes.Get(nodeID)
if err != nil {
return
}
if node.Type == nodeType {
leafNode = node
}
for _, childID := range node.Children {
dfs(childID)
}
}
dfs(boundaryID)
return leafNode
}

// checkParallelism checks if the given template is able to be executed, considering the current active pods and workflow/template parallelism
func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.NodeStatus, boundaryID string) error {
if woc.execWf.Spec.Parallelism != nil && woc.activePods >= *woc.execWf.Spec.Parallelism {
Expand All @@ -2781,7 +2800,14 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node
if node != nil && (tmpl.GetType() == wfv1.TemplateTypeDAG || tmpl.GetType() == wfv1.TemplateTypeSteps) {
// Check failFast
if tmpl.IsFailFast() && woc.getUnsuccessfulChildren(node.ID) > 0 {
woc.markNodePhase(node.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled")
if woc.getActivePods(node.ID) == 0 {
if tmpl.GetType() == wfv1.TemplateTypeSteps {
if leafStepGroupNode := woc.findLeafNodeWithType(node.ID, wfv1.NodeTypeStepGroup); leafStepGroupNode != nil {
woc.markNodePhase(leafStepGroupNode.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled")
}
}
woc.markNodePhase(node.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled")
}
return ErrParallelismReached
}

Expand All @@ -2792,7 +2818,7 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node
}
}

// if we are about to execute a pod, make sure our parent hasn't reached it's limit
// if we are about to execute a pod, make sure our parent hasn't reached its limit
if boundaryID != "" && (node == nil || (node.Phase != wfv1.NodePending && node.Phase != wfv1.NodeRunning)) {
boundaryNode, err := woc.wf.Status.Nodes.Get(boundaryID)
if err != nil {
Expand All @@ -2810,7 +2836,14 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node

// Check failFast
if boundaryTemplate.IsFailFast() && woc.getUnsuccessfulChildren(boundaryID) > 0 {
woc.markNodePhase(boundaryNode.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled")
if woc.getActivePods(boundaryID) == 0 {
if boundaryTemplate.GetType() == wfv1.TemplateTypeSteps {
if leafStepGroupNode := woc.findLeafNodeWithType(boundaryID, wfv1.NodeTypeStepGroup); leafStepGroupNode != nil {
woc.markNodePhase(leafStepGroupNode.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled")
}
}
woc.markNodePhase(boundaryNode.Name, wfv1.NodeFailed, "template has failed or errored children and failFast enabled")
}
return ErrParallelismReached
}

Expand Down

0 comments on commit a9fb455

Please sign in to comment.