diff --git a/tasks/workflow/consul_test.go b/tasks/workflow/consul_test.go index 6730cf4ce..53ec2e891 100644 --- a/tasks/workflow/consul_test.go +++ b/tasks/workflow/consul_test.go @@ -79,6 +79,9 @@ func TestRunConsulWorkflowPackageTests(t *testing.T) { t.Run("TestRunWorkflowStepReplay", func(t *testing.T) { testRunWorkflowStepReplay(t, srv, client) }) + t.Run("testConcurrentTaskExecutionsForNextStep", func(t *testing.T) { + testConcurrentTaskExecutionsForNextStep(t, srv, client) + }) }) } diff --git a/tasks/workflow/task_execution.go b/tasks/workflow/task_execution.go index 86dcaf350..5601a5d6e 100644 --- a/tasks/workflow/task_execution.go +++ b/tasks/workflow/task_execution.go @@ -19,6 +19,7 @@ import ( "fmt" "path" "strconv" + "strings" "time" "github.com/hashicorp/consul/api" @@ -141,6 +142,7 @@ func numberOfRunningExecutionsForTask(cc *api.Client, taskID string) (*consuluti func (t *taskExecution) notifyEnd() error { log.Debugf("notifyEnd for taskExecution %q", t.id) execPath := path.Join(consulutil.TasksPrefix, t.taskID, ".runningExecutions") + registeredPath := path.Join(consulutil.TasksPrefix, t.taskID, ".registeredExecutions") l, e, err := numberOfRunningExecutionsForTask(t.cc, t.taskID) if err != nil { return err @@ -149,10 +151,28 @@ func (t *taskExecution) notifyEnd() error { kv := t.cc.KV() // Delete our execID + ops := api.KVTxnOps{ + &api.KVTxnOp{ + Verb: api.KVDelete, + Key: path.Join(execPath, t.id), + }, + &api.KVTxnOp{ + Verb: api.KVDelete, + Key: path.Join(registeredPath, t.step), + }, + } log.Debugf("Deleting runningExecutions with id %q for task %q", t.id, t.taskID) - _, err = kv.Delete(path.Join(execPath, t.id), nil) + ok, response, _, err := kv.Txn(ops, nil) if err != nil { - return errors.Wrap(err, consulutil.ConsulGenericErrMsg) + return errors.Wrap(err, "Failed to execute transaction") + } + if !ok { + // Check the response + var errs []string + for _, e := range response.Errors { + errs = append(errs, e.What) + } + return errors.Errorf("Failed to execute transaction: %s", strings.Join(errs, ", ")) } if e <= 1 && t.finalFunction != nil { log.Debugf("notifyEnd running finalFunction for taskExecution %q", t.id) diff --git a/tasks/workflow/testdata/workflow.yaml b/tasks/workflow/testdata/workflow.yaml index 89600bd64..3055696a4 100644 --- a/tasks/workflow/testdata/workflow.yaml +++ b/tasks/workflow/testdata/workflow.yaml @@ -79,6 +79,22 @@ topology_template: protocol: tcp network_name: PRIVATE initiator: source + + Compute_2: + type: ystia.yorc.tests.nodes.WFCompute + capabilities: + scalable: + properties: + min_instances: 1 + max_instances: 1 + default_instances: 1 + endpoint: + properties: + secure: true + protocol: tcp + network_name: PRIVATE + initiator: source + JobNode: type: ystia.yorc.tests.nodes.JobNode workflows: @@ -142,6 +158,12 @@ topology_template: on_success: - WFNode_hostedOnComputeHost_add_target - WFNode_creating + Compute_2_install: + target: Compute_2 + activities: + - delegate: install + on_success: + - WFNode_creating WFNode_initial: target: WFNode activities: diff --git a/tasks/workflow/worker_test.go b/tasks/workflow/worker_test.go index 29502746e..d88ac5daa 100644 --- a/tasks/workflow/worker_test.go +++ b/tasks/workflow/worker_test.go @@ -233,6 +233,81 @@ func testRunWorkflowStepReplay(t *testing.T, srv *testutil.TestServer, client *a require.Equal(t, true, foundStep, "Did not find step %s in next steps to execute", expectedNextStep) } +func testConcurrentTaskExecutionsForNextStep(t *testing.T, srv *testutil.TestServer, client *api.Client) { + myWorker := &worker{ + consulClient: client, + cfg: config.Configuration{ + // Ensure we are not deleting filesystem files elsewhere + WorkingDirectory: "./testdata/work/", + }, + } + myWorker2 := &worker{ + consulClient: client, + cfg: config.Configuration{ + // Ensure we are not deleting filesystem files elsewhere + WorkingDirectory: "./testdata/work/", + }, + } + deploymentID := "TestRunWf" + taskID := "tWorkflow" + topologyPath := "testdata/workflow.yaml" + ctx := context.Background() + err := deployments.StoreDeploymentDefinition(ctx, deploymentID, topologyPath) + require.NoError(t, err, "Unexpected error storing %s", topologyPath) + + // Registering test data with the first step of the workflow already done + srv.PopulateKV(t, testData(deploymentID)) + + wfName := "install" + + srv.PopulateKV(t, testData(deploymentID)) + deployments.SetDeploymentStatus(context.Background(), deploymentID, deployments.DEPLOYMENT_IN_PROGRESS) + + mockExecutor := &mockExecutor{} + registry.GetRegistry().RegisterDelegates([]string{"ystia.yorc.tests.nodes.WFCompute"}, mockExecutor, "tests") + + var myTaskExecution taskExecution + myTaskExecution.cc = client + myTaskExecution.targetID = deploymentID + myTaskExecution.taskID = taskID + myTaskExecution.step = "Compute_install" + + var myTaskExecution2 taskExecution + myTaskExecution2.cc = client + myTaskExecution2.targetID = deploymentID + myTaskExecution2.taskID = taskID + myTaskExecution2.step = "Compute_2_install" + + expectedNextStep := "WFNode_creating" + + // Test for duplicated task execution + // When Compute_install and Compute_2_install steps complete, they continue to register the next step WFNode_creating + // but only one task execution should be registered for the step WFNode_creating + srv.SetKV(t, path.Join(consulutil.WorkflowsPrefix, taskID, "Compute_install"), []byte("initial")) + srv.SetKV(t, path.Join(consulutil.WorkflowsPrefix, taskID, "Compute_2_install"), []byte("DONE")) + srv.SetKV(t, path.Join(consulutil.WorkflowsPrefix, taskID, "WFNode_initial"), []byte("DONE")) + + err = myWorker.runWorkflowStep(context.Background(), &myTaskExecution, wfName, false) + require.NoError(t, err) + err = myWorker2.runWorkflowStep(context.Background(), &myTaskExecution2, wfName, false) + require.NoError(t, err) + + // Test that consul contains an execution for next step + execKeys, _, err := consulutil.GetKV().Keys(consulutil.ExecutionsTaskPrefix+"/", "/", nil) + require.NoError(t, err) + foundStep := 0 + for _, execKey := range execKeys { + execID := path.Base(execKey) + execPath := path.Join(consulutil.ExecutionsTaskPrefix, execID) + found, value, err := consulutil.GetStringValue(path.Join(execPath, "step")) + require.NoError(t, err) + if found && value == expectedNextStep { + foundStep++ + } + } + require.Equal(t, 1, foundStep, "Found more than one task execution is registered for one step %s", expectedNextStep) +} + // Construct key/value to initialise KV before running test func testData(deploymentID string) map[string][]byte { return map[string][]byte{ diff --git a/tasks/workflow/workflow.go b/tasks/workflow/workflow.go index 097732d05..a99d246c6 100644 --- a/tasks/workflow/workflow.go +++ b/tasks/workflow/workflow.go @@ -49,6 +49,14 @@ func createWorkflowStepsOperations(taskID string, steps []*step) (api.KVTxnOps, ops := make(api.KVTxnOps, 0) var stepOps api.KVTxnOps for _, step := range steps { + exist, err := existTaskExecutionForStep(taskID, step.Name) + if err != nil { + return ops, errors.Wrapf(err, "Failed to check registered task execution for TaskID:%q, step:%q", taskID, step.Name) + } + if exist { + log.Debugf("Step:%q in TaskID:%q has already been registered for execution", step.Name, taskID) + continue + } // Add execution key for initial steps only u, err := uuid.NewV4() if err != nil { @@ -73,6 +81,11 @@ func createWorkflowStepsOperations(taskID string, steps []*step) (api.KVTxnOps, Key: path.Join(consulutil.TasksPrefix, taskID, ".runningExecutions", execID), Value: []byte(""), }, + &api.KVTxnOp{ + Verb: api.KVSet, + Key: path.Join(consulutil.TasksPrefix, taskID, ".registeredExecutions", step.Name), + Value: []byte(""), + }, } ops = append(ops, stepOps...) log.Debugf("Will store runningExecutions with id %q in txn for task %q", execID, taskID) @@ -80,6 +93,14 @@ func createWorkflowStepsOperations(taskID string, steps []*step) (api.KVTxnOps, return ops, nil } +func existTaskExecutionForStep(taskID string, stepName string) (bool, error) { + exist, _, err := consulutil.GetValue(path.Join(consulutil.TasksPrefix, taskID, ".registeredExecutions", stepName)) + if err != nil { + return false, err + } + return exist, nil +} + func getCallOperationsFromStep(s *step) []string { ops := make([]string, 0) for _, a := range s.Activities {