Skip to content

Commit

Permalink
fix running state hooks in a dag template can not refer to output of …
Browse files Browse the repository at this point in the history
…earlier tasks

Signed-off-by: joey <[email protected]>
  • Loading branch information
chengjoey committed Sep 22, 2024
1 parent dc731d0 commit 25be232
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 2 deletions.
2 changes: 1 addition & 1 deletion workflow/controller/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (woc *wfOperationCtx) executeTmplLifeCycleHook(ctx context.Context, scope *
woc.log.WithField("lifeCycleHook", hookName).WithField("node", hookNodeName).WithField("hookName", hookName).Info("Running hooks")
resolvedArgs := hook.Arguments
var err error
if !resolvedArgs.IsEmpty() && outputs != nil {
if !resolvedArgs.IsEmpty() {
resolvedArgs, err = woc.resolveExitTmplArgument(hook.Arguments, prefix, outputs, scope)
if err != nil {
return false, err
Expand Down
122 changes: 121 additions & 1 deletion workflow/controller/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ spec:
assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
}

func TestWfTemplHookWfWaitForTriggeredHook(t *testing.T) {
func TestWfTmplHookWfWaitForTriggeredHook(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down Expand Up @@ -1258,3 +1258,123 @@ spec:
assert.Nil(t, node.NodeFlag)
assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
}

func TestWfTmplHookReferToOutputsOfEarlierTask(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(`apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: hooks-test
spec:
entrypoint: main
templates:
- name: main
dag:
tasks:
- name: task-a
template: make-outputs
arguments:
parameters:
- name: message
value: "A message from task A"
- name: task-b
dependencies: [task-a]
template: echo
arguments:
parameters:
- name: message
value: "A message from task B"
hooks:
running:
expression: tasks["task-b"].status == "Running"
template: echo
arguments:
parameters:
- name: message
value: "A running hook in a DAG task refer to earlier task(A): {{tasks.task-a.outputs.parameters.result}}"
- name: make-outputs
outputs:
parameters:
- name: result
valueFrom:
path: /tmp/value
script:
image: alpine:latest
command: [sh, -c]
source: |
echo "Some output" > /tmp/value
- name: echo
inputs:
parameters:
- name: message
script:
image: bash
command: [bash]
source: |
echo "{{inputs.parameters.message}}"
`)

// Setup
cancel, controller := newController(wf)
defer cancel()
ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
makePodsPhase(ctx, woc, apiv1.PodRunning)

// Make task-a pod completed and add outputs
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)
makePodsPhase(ctx, woc, apiv1.PodSucceeded, withOutputs(wfv1.Outputs{
Parameters: []wfv1.Parameter{{Name: "result", Value: wfv1.AnyStringPtr("Some output")}},
}))
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)
assert.Equal(t, wfv1.Progress("1/2"), woc.wf.Status.Progress)
node := woc.wf.Status.Nodes.FindByDisplayName("task-a")
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)

// Make task-b running
podcs := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.GetNamespace())
pods, err := podcs.List(ctx, metav1.ListOptions{})
assert.NoError(t, err)

Check failure on line 1341 in workflow/controller/hooks_test.go

View workflow job for this annotation

GitHub Actions / Lint

require-error: for error assertions use require (testifylint)
for _, pod := range pods.Items {
if pod.Annotations["workflows.argoproj.io/node-name"] == "hooks-test.task-b" {
pod.Status.Phase = apiv1.PodRunning
_, _ = podcs.Update(ctx, &pod, metav1.UpdateOptions{})
}
}
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)
node = woc.wf.Status.Nodes.FindByDisplayName("task-b.hooks.running")
assert.NotNil(t, node)
assert.Equal(t, wfv1.NodePending, node.Phase)

// Check running hook arg refer to earlier task outputs has been resolved
assert.Equal(t, "A running hook in a DAG task refer to earlier task(A): Some output", node.Inputs.Parameters[0].Value.String())
podcs = woc.controller.kubeclientset.CoreV1().Pods(woc.wf.GetNamespace())
pods, err = podcs.List(ctx, metav1.ListOptions{})

Check failure on line 1357 in workflow/controller/hooks_test.go

View workflow job for this annotation

GitHub Actions / Lint

ineffectual assignment to err (ineffassign)
var runningHookPod *apiv1.Pod
for _, pod := range pods.Items {
if pod.Annotations["workflows.argoproj.io/node-name"] == "hooks-test.task-b.hooks.running" {
runningHookPod = &pod
}
}
assert.NotNil(t, runningHookPod)
var tmpl wfv1.Template
var tmplEnv string
for _, env := range runningHookPod.Spec.Containers[0].Env {
if env.Name == common.EnvVarTemplate {
tmplEnv = env.Value
}
}
wfv1.MustUnmarshal(tmplEnv, &tmpl)
assert.Equal(t, "echo \"A running hook in a DAG task refer to earlier task(A): Some output\"\n", tmpl.Script.Source)

// Make task-b and running hook pod completed
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)
assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
}

0 comments on commit 25be232

Please sign in to comment.