Skip to content

Commit

Permalink
fix: terminate workflow should not get throttled Fixes #12778 (#12792)
Browse files Browse the repository at this point in the history
Signed-off-by: Tianchu Zhao <[email protected]>
  • Loading branch information
tczhao authored Mar 26, 2024
1 parent 8b30448 commit a82a689
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
2 changes: 1 addition & 1 deletion workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {

woc := newWorkflowOperationCtx(wf, wfc)

if !wfc.throttler.Admit(key.(string)) {
if !(woc.GetShutdownStrategy().Enabled() && woc.GetShutdownStrategy() == wfv1.ShutdownStrategyTerminate) && !wfc.throttler.Admit(key.(string)) {
log.WithField("key", key).Info("Workflow processing has been postponed due to max parallelism limit")
if woc.wf.Status.Phase == wfv1.WorkflowUnknown {
woc.markWorkflowPhase(ctx, wfv1.WorkflowPending, "Workflow processing has been postponed because too many workflows are already running")
Expand Down
36 changes: 36 additions & 0 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,13 +691,25 @@ spec:
- name: main
container:
image: my-image
`),
wfv1.MustUnmarshalWorkflow(`
metadata:
name: my-wf-2
spec:
shutdown: Terminate
entrypoint: main
templates:
- name: main
container:
image: my-image
`),
f,
)
defer cancel()
ctx := context.Background()
assert.True(t, controller.processNextItem(ctx))
assert.True(t, controller.processNextItem(ctx))
assert.True(t, controller.processNextItem(ctx))

expectWorkflow(ctx, controller, "my-wf-0", func(wf *wfv1.Workflow) {
if assert.NotNil(t, wf) {
Expand All @@ -710,6 +722,11 @@ spec:
assert.Equal(t, "Workflow processing has been postponed because too many workflows are already running", wf.Status.Message)
}
})
expectWorkflow(ctx, controller, "my-wf-2", func(wf *wfv1.Workflow) {
if assert.NotNil(t, wf) {
assert.Equal(t, wfv1.WorkflowSucceeded, wf.Status.Phase)
}
})
})
}
}
Expand Down Expand Up @@ -1164,6 +1181,25 @@ spec:
assert.Len(t, pods.Items, 0)
}

func TestPendingPodWhenTerminate(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(helloWorldWf)
wf.Spec.Shutdown = wfv1.ShutdownStrategyTerminate
wf.Status.Phase = wfv1.WorkflowPending

cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()
assert.True(t, controller.processNextItem(ctx))

woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
for _, node := range woc.wf.Status.Nodes {
assert.Equal(t, wfv1.NodeSkipped, node.Phase)
}
}

func TestWorkflowReferItselfFromExpression(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(fromExrpessingWf)
cancel, controller := newController(wf)
Expand Down

0 comments on commit a82a689

Please sign in to comment.