diff --git a/api/handler/batch.go b/api/handler/batch.go index 035eeaa1..3c945d61 100644 --- a/api/handler/batch.go +++ b/api/handler/batch.go @@ -9,6 +9,7 @@ import ( "github.com/ovh/utask/models/task" "github.com/ovh/utask/models/tasktemplate" "github.com/ovh/utask/pkg/taskutils" + "github.com/ovh/utask/pkg/utils" ) type createBatchIn struct { @@ -34,6 +35,10 @@ func CreateBatch(c *gin.Context, in *createBatchIn) (*task.Batch, error) { return nil, err } + if err := utils.ValidateTags(in.Tags); err != nil { + return nil, err + } + if err := dbp.Tx(); err != nil { return nil, err } diff --git a/api/handler/task.go b/api/handler/task.go index bd95d135..d54053e1 100644 --- a/api/handler/task.go +++ b/api/handler/task.go @@ -16,6 +16,7 @@ import ( "github.com/ovh/utask/models/tasktemplate" "github.com/ovh/utask/pkg/auth" "github.com/ovh/utask/pkg/taskutils" + "github.com/ovh/utask/pkg/utils" ) type createTaskIn struct { @@ -51,6 +52,10 @@ func CreateTask(c *gin.Context, in *createTaskIn) (*task.Task, error) { return nil, err } + if err := utils.ValidateTags(in.Tags); err != nil { + return nil, err + } + t, err := taskutils.CreateTask(c, dbp, tt, in.WatcherUsernames, []string{}, in.Input, nil, in.Comment, in.Delay, in.Tags) if err != nil { dbp.Rollback() @@ -264,6 +269,10 @@ func UpdateTask(c *gin.Context, in *updateTaskIn) (*task.Task, error) { t.SetInput(clearInput) t.SetWatcherUsernames(in.WatcherUsernames) + + if err := utils.ValidateTags(in.Tags); err != nil { + return nil, err + } t.SetTags(in.Tags, nil) if err := t.Update(dbp, diff --git a/engine/engine.go b/engine/engine.go index 16c833cf..e1383e1a 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -25,6 +25,7 @@ import ( "github.com/ovh/utask/models/runnerinstance" "github.com/ovh/utask/models/task" "github.com/ovh/utask/models/tasktemplate" + "github.com/ovh/utask/pkg/constants" "github.com/ovh/utask/pkg/jsonschema" "github.com/ovh/utask/pkg/now" "github.com/ovh/utask/pkg/utils" @@ -530,6 +531,42 @@ func resolve(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task, sm } utask.ReleaseExecutionSlot() + if err := resumeParentTask(dbp, t, sm, debugLogger); err != nil { + debugLogger.WithError(err).Debugf("Engine: resolver(): failed to resume parent task: %s", err) + } +} + +func resumeParentTask(dbp zesty.DBProvider, currentTask *task.Task, sm *semaphore.Weighted, debugLogger *logrus.Entry) error { + switch currentTask.State { + case task.StateDone, task.StateWontfix, task.StateCancelled: + default: + return nil + } + if currentTask.Tags == nil { + return nil + } + parentTaskID, ok := currentTask.Tags[constants.SubtaskTagParentTaskID] + if !ok { + return nil + } + + parentTask, err := task.LoadFromPublicID(dbp, parentTaskID) + if err != nil { + return err + } + switch parentTask.State { + case task.StateBlocked, task.StateRunning: + default: + // not allowed to resume a parent task that is not either Running or Blocked. + // Todo state should not be runned as it might need manual resolution from a granted resolver + return nil + } + if parentTask.Resolution == nil { + return nil + } + + debugLogger.Debugf("resuming parent task %q resolution %q", parentTask.PublicID, *parentTask.Resolution) + return GetEngine().Resolve(*parentTask.Resolution, sm) } func commit(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task) error { diff --git a/engine/engine_test.go b/engine/engine_test.go index a5333a46..8e3bf1f0 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -24,7 +24,7 @@ import ( "github.com/ovh/utask/db/pgjuju" "github.com/ovh/utask/engine" "github.com/ovh/utask/engine/functions" - "github.com/ovh/utask/engine/functions/runner" + functionrunner "github.com/ovh/utask/engine/functions/runner" "github.com/ovh/utask/engine/step" "github.com/ovh/utask/engine/values" "github.com/ovh/utask/models/resolution" @@ -33,6 +33,7 @@ import ( "github.com/ovh/utask/pkg/now" "github.com/ovh/utask/pkg/plugins/builtin/echo" "github.com/ovh/utask/pkg/plugins/builtin/script" + pluginsubtask "github.com/ovh/utask/pkg/plugins/builtin/subtask" ) const ( @@ -67,6 +68,7 @@ func TestMain(m *testing.M) { step.RegisterRunner(echo.Plugin.PluginName(), echo.Plugin) step.RegisterRunner(script.Plugin.PluginName(), script.Plugin) + step.RegisterRunner(pluginsubtask.Plugin.PluginName(), pluginsubtask.Plugin) os.Exit(m.Run()) } @@ -859,3 +861,72 @@ func TestBaseBaseConfiguration(t *testing.T) { assert.NotEqual(t, res.Steps["stepOne"].Error, res.Steps["stepTwo"].Error) } + +func TestResolveSubTask(t *testing.T) { + dbp, err := zesty.NewDBProvider(utask.DBName) + require.Nil(t, err) + + tt, err := templateFromYAML(dbp, "variables.yaml") + require.Nil(t, err) + tt.Normalize() + assert.Equal(t, "variableeval", tt.Name) + require.Nil(t, tt.Valid()) + + err = dbp.DB().Insert(tt) + require.Nil(t, err) + + res, err := createResolution("subtask.yaml", map[string]interface{}{}, nil) + require.Nil(t, err, "failed to create resolution: %s", err) + + res, err = runResolution(res) + require.Nil(t, err) + require.NotNil(t, res) + assert.Equal(t, resolution.StateError, res.State) + + subtaskCreationOutput := res.Steps["subtaskCreation"].Output.(map[string]interface{}) + subtaskPublicID := subtaskCreationOutput["id"].(string) + + subtask, err := task.LoadFromPublicID(dbp, subtaskPublicID) + require.Nil(t, err) + assert.Equal(t, task.StateTODO, subtask.State) + + subtaskResolution, err := resolution.Create(dbp, subtask, nil, "", false, nil) + require.Nil(t, err) + + subtaskResolution, err = runResolution(subtaskResolution) + require.Nil(t, err) + assert.Equal(t, task.StateDone, subtaskResolution.State) + for k, v := range subtaskResolution.Steps { + assert.Equal(t, step.StateDone, v.State, "not valid state for step %s", k) + } + + // checking if the parent task is picked up after that the subtask is resolved. + // need to sleep a bit because the parent task is resumed asynchronously + ti := time.Second + i := time.Duration(0) + for i < ti { + res, err = resolution.LoadFromPublicID(dbp, res.PublicID) + require.Nil(t, err) + if res.State != resolution.StateError { + break + } + + time.Sleep(time.Millisecond * 10) + i += time.Millisecond * 10 + } + + ti = time.Second + i = time.Duration(0) + for i < ti { + res, err = resolution.LoadFromPublicID(dbp, res.PublicID) + require.Nil(t, err) + if res.State != resolution.StateRunning { + break + } + + time.Sleep(time.Millisecond * 10) + i += time.Millisecond * 10 + + } + assert.Equal(t, resolution.StateDone, res.State) +} diff --git a/engine/templates_tests/subtask.yaml b/engine/templates_tests/subtask.yaml new file mode 100644 index 00000000..677fb0d2 --- /dev/null +++ b/engine/templates_tests/subtask.yaml @@ -0,0 +1,19 @@ +name: subtaskTemplate +description: Template that spawns a subtask +title_format: "[test] subtask template test" +steps: + subtaskCreation: + description: creating a subtask + action: + type: subtask + configuration: + template: variableeval + echoOK: + description: everything is OK + action: + type: echo + configuration: + output: + foo: OK +result_format: + foo: "{{.step.echoOK.output.foo}}" diff --git a/models/tasktemplate/template.go b/models/tasktemplate/template.go index 6d89db72..588a3f0e 100644 --- a/models/tasktemplate/template.go +++ b/models/tasktemplate/template.go @@ -308,6 +308,10 @@ func (tt *TaskTemplate) Valid() (err error) { } } + if err := utils.ValidateTags(tt.Tags); err != nil { + return err + } + if tt.LongDescription != nil { if err := utils.ValidText("template long description", *tt.LongDescription); err != nil { return err diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go new file mode 100644 index 00000000..5f88a313 --- /dev/null +++ b/pkg/constants/constants.go @@ -0,0 +1,8 @@ +package constants + +const ( + // SubtaskTagParentTaskID is the tag key that utask will use to lookup + // if a completed task has a parent task, and that parent task should be + // resumed. + SubtaskTagParentTaskID = "_utask_parent_task_id" +) diff --git a/pkg/plugins/builtin/subtask/subtask.go b/pkg/plugins/builtin/subtask/subtask.go index ffea255c..cadf1169 100644 --- a/pkg/plugins/builtin/subtask/subtask.go +++ b/pkg/plugins/builtin/subtask/subtask.go @@ -11,6 +11,7 @@ import ( "github.com/ovh/utask/models/task" "github.com/ovh/utask/models/tasktemplate" "github.com/ovh/utask/pkg/auth" + "github.com/ovh/utask/pkg/constants" "github.com/ovh/utask/pkg/plugins/taskplugin" "github.com/ovh/utask/pkg/taskutils" "github.com/ovh/utask/pkg/templateimport" @@ -39,12 +40,14 @@ type SubtaskConfig struct { // SubtaskContext is the metadata inherited from the "parent" task" type SubtaskContext struct { + ParentTaskID string `json:"parent_task_id"` TaskID string `json:"task_id"` RequesterUsername string `json:"requester_username"` } func ctx(stepName string) interface{} { return &SubtaskContext{ + ParentTaskID: "{{ .task.task_id }}", TaskID: fmt.Sprintf("{{ if (index .step `%s` ) }}{{ if (index .step `%s` `output`) }}{{ index .step `%s` `output` `id` }}{{ end }}{{ end }}", stepName, stepName, stepName), RequesterUsername: "{{.task.requester_username}}", } @@ -53,16 +56,24 @@ func ctx(stepName string) interface{} { func validConfig(config interface{}) error { cfg := config.(*SubtaskConfig) + if err := utils.ValidateTags(cfg.Tags); err != nil { + return err + } + dbp, err := zesty.NewDBProvider(utask.DBName) if err != nil { return fmt.Errorf("can't retrieve connexion to DB: %s", err) } _, err = tasktemplate.LoadFromName(dbp, cfg.Template) - if err != nil && !errors.IsNotFound(err) { + if err == nil { + return nil + } + if !errors.IsNotFound(err) { return fmt.Errorf("can't load template from name: %s", err) } + // searching into currently imported templates templates := templateimport.GetTemplates() for _, template := range templates { if template == cfg.Template { @@ -117,6 +128,10 @@ func exec(stepName string, config interface{}, ctx interface{}) (interface{}, in // TODO inherit watchers from parent task ctx := auth.WithIdentity(context.Background(), stepContext.RequesterUsername) + if cfg.Tags == nil { + cfg.Tags = map[string]string{} + } + cfg.Tags[constants.SubtaskTagParentTaskID] = stepContext.ParentTaskID t, err = taskutils.CreateTask(ctx, dbp, tt, watcherUsernames, resolverUsernames, cfg.Input, nil, "Auto created subtask", cfg.Delay, cfg.Tags) if err != nil { dbp.Rollback() diff --git a/pkg/plugins/builtin/tag/tag.go b/pkg/plugins/builtin/tag/tag.go index bed2126a..750a5887 100644 --- a/pkg/plugins/builtin/tag/tag.go +++ b/pkg/plugins/builtin/tag/tag.go @@ -2,6 +2,7 @@ package plugintag import ( "github.com/ovh/utask/pkg/plugins/taskplugin" + "github.com/ovh/utask/pkg/utils" ) // The tag plugin allow to update the tags of a task. @@ -17,7 +18,13 @@ type Config struct { Tags map[string]string `json:"tags"` } -func validConfig(_ interface{}) error { +func validConfig(config interface{}) error { + cfg := config.(*Config) + + if err := utils.ValidateTags(cfg.Tags); err != nil { + return err + } + return nil } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 19d91ad9..756d6222 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/ovh/utask" + "github.com/ovh/utask/pkg/constants" "github.com/juju/errors" ) @@ -39,6 +40,18 @@ func NormalizeName(s string) string { return strings.ToLower(strings.TrimSpace(s)) } +func ValidateTags(tags map[string]string) error { + if tags == nil { + return nil + } + for k := range tags { + if k == constants.SubtaskTagParentTaskID { + return errors.BadRequestf("tag name %q not allowed", k) + } + } + return nil +} + // ListContainsString asserts that a string slice contains a given string func ListContainsString(list []string, item string) bool { if list != nil {