diff --git a/bundle/config/mutator/apply_presets.go b/bundle/config/mutator/apply_presets.go new file mode 100644 index 0000000000..42e6ab95f5 --- /dev/null +++ b/bundle/config/mutator/apply_presets.go @@ -0,0 +1,209 @@ +package mutator + +import ( + "context" + "path" + "slices" + "sort" + "strings" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/cli/libs/textutil" + "github.com/databricks/databricks-sdk-go/service/catalog" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/ml" +) + +type applyPresets struct{} + +// Apply all presets, e.g. the prefix presets that +// adds a prefix to all names of all resources. +func ApplyPresets() *applyPresets { + return &applyPresets{} +} + +type Tag struct { + Key string + Value string +} + +func (m *applyPresets) Name() string { + return "ApplyPresets" +} + +func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { + if d := validatePauseStatus(b); d != nil { + return d + } + + r := b.Config.Resources + t := b.Config.Presets + prefix := t.NamePrefix + tags := toTagArray(t.Tags) + + // Jobs presets: Prefix, Tags, JobsMaxConcurrentRuns, TriggerPauseStatus + for _, j := range r.Jobs { + j.Name = prefix + j.Name + if j.Tags == nil { + j.Tags = make(map[string]string) + } + for _, tag := range tags { + if j.Tags[tag.Key] == "" { + j.Tags[tag.Key] = tag.Value + } + } + if j.MaxConcurrentRuns == 0 { + j.MaxConcurrentRuns = t.JobsMaxConcurrentRuns + } + if t.TriggerPauseStatus != "" { + paused := jobs.PauseStatusPaused + if t.TriggerPauseStatus == config.Unpaused { + paused = jobs.PauseStatusUnpaused + } + + if j.Schedule != nil && j.Schedule.PauseStatus == "" { + j.Schedule.PauseStatus = paused + } + if j.Continuous != nil && j.Continuous.PauseStatus == "" { + j.Continuous.PauseStatus = paused + } + if j.Trigger != nil && j.Trigger.PauseStatus == "" { + j.Trigger.PauseStatus = paused + } + } + } + + // Pipelines presets: Prefix, PipelinesDevelopment + for i := range r.Pipelines { + r.Pipelines[i].Name = prefix + r.Pipelines[i].Name + if config.IsExplicitlyEnabled(t.PipelinesDevelopment) { + r.Pipelines[i].Development = true + } + if t.TriggerPauseStatus == config.Paused { + r.Pipelines[i].Continuous = false + } + + // As of 2024-06, pipelines don't yet support tags + } + + // Models presets: Prefix, Tags + for _, m := range r.Models { + m.Name = prefix + m.Name + for _, t := range tags { + exists := slices.ContainsFunc(m.Tags, func(modelTag ml.ModelTag) bool { + return modelTag.Key == t.Key + }) + if !exists { + // Only add this tag if the resource didn't include any tag that overrides its value. + m.Tags = append(m.Tags, ml.ModelTag{Key: t.Key, Value: t.Value}) + } + } + } + + // Experiments presets: Prefix, Tags + for _, e := range r.Experiments { + filepath := e.Name + dir := path.Dir(filepath) + base := path.Base(filepath) + if dir == "." { + e.Name = prefix + base + } else { + e.Name = dir + "/" + prefix + base + } + for _, t := range tags { + exists := false + for _, experimentTag := range e.Tags { + if experimentTag.Key == t.Key { + exists = true + break + } + } + if !exists { + e.Tags = append(e.Tags, ml.ExperimentTag{Key: t.Key, Value: t.Value}) + } + } + } + + // Model serving endpoint presets: Prefix + for i := range r.ModelServingEndpoints { + r.ModelServingEndpoints[i].Name = normalizePrefix(prefix) + r.ModelServingEndpoints[i].Name + + // As of 2024-06, model serving endpoints don't yet support tags + } + + // Registered models presets: Prefix + for i := range r.RegisteredModels { + r.RegisteredModels[i].Name = normalizePrefix(prefix) + r.RegisteredModels[i].Name + + // As of 2024-06, registered models don't yet support tags + } + + // Quality monitors presets: Prefix + if t.TriggerPauseStatus == config.Paused { + for i := range r.QualityMonitors { + // Remove all schedules from monitors, since they don't support pausing/unpausing. + // Quality monitors might support the "pause" property in the future, so at the + // CLI level we do respect that property if it is set to "unpaused." + if r.QualityMonitors[i].Schedule != nil && r.QualityMonitors[i].Schedule.PauseStatus != catalog.MonitorCronSchedulePauseStatusUnpaused { + r.QualityMonitors[i].Schedule = nil + } + } + } + + // Schemas: Prefix + for i := range r.Schemas { + prefix = "dev_" + b.Config.Workspace.CurrentUser.ShortName + "_" + r.Schemas[i].Name = prefix + r.Schemas[i].Name + // HTTP API for schemas doesn't yet support tags. It's only supported in + // the Databricks UI and via the SQL API. + } + + return nil +} + +func validatePauseStatus(b *bundle.Bundle) diag.Diagnostics { + p := b.Config.Presets.TriggerPauseStatus + if p == "" || p == config.Paused || p == config.Unpaused { + return nil + } + return diag.Diagnostics{{ + Summary: "Invalid value for trigger_pause_status, should be PAUSED or UNPAUSED", + Severity: diag.Error, + Locations: []dyn.Location{b.Config.GetLocation("presets.trigger_pause_status")}, + }} +} + +// toTagArray converts a map of tags to an array of tags. +// We sort tags so ensure stable ordering. +func toTagArray(tags map[string]string) []Tag { + var tagArray []Tag + if tags == nil { + return tagArray + } + for key, value := range tags { + tagArray = append(tagArray, Tag{Key: key, Value: value}) + } + sort.Slice(tagArray, func(i, j int) bool { + return tagArray[i].Key < tagArray[j].Key + }) + return tagArray +} + +// normalizePrefix prefixes strings like '[dev lennart] ' to 'dev_lennart_'. +// We leave unicode letters and numbers but remove all "special characters." +func normalizePrefix(prefix string) string { + prefix = strings.ReplaceAll(prefix, "[", "") + prefix = strings.Trim(prefix, " ") + + // If the prefix ends with a ']', we add an underscore to the end. + // This makes sure that we get names like "dev_user_endpoint" instead of "dev_userendpoint" + suffix := "" + if strings.HasSuffix(prefix, "]") { + suffix = "_" + } + + return textutil.NormalizeString(prefix) + suffix +} diff --git a/bundle/config/mutator/apply_presets_test.go b/bundle/config/mutator/apply_presets_test.go new file mode 100644 index 0000000000..35dac1f7dc --- /dev/null +++ b/bundle/config/mutator/apply_presets_test.go @@ -0,0 +1,196 @@ +package mutator_test + +import ( + "context" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/mutator" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/require" +) + +func TestApplyPresetsPrefix(t *testing.T) { + tests := []struct { + name string + prefix string + job *resources.Job + want string + }{ + { + name: "add prefix to job", + prefix: "prefix-", + job: &resources.Job{ + JobSettings: &jobs.JobSettings{ + Name: "job1", + }, + }, + want: "prefix-job1", + }, + { + name: "add empty prefix to job", + prefix: "", + job: &resources.Job{ + JobSettings: &jobs.JobSettings{ + Name: "job1", + }, + }, + want: "job1", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job1": tt.job, + }, + }, + Presets: config.Presets{ + NamePrefix: tt.prefix, + }, + }, + } + + ctx := context.Background() + diag := bundle.Apply(ctx, b, mutator.ApplyPresets()) + + if diag.HasError() { + t.Fatalf("unexpected error: %v", diag) + } + + require.Equal(t, tt.want, b.Config.Resources.Jobs["job1"].Name) + }) + } +} + +func TestApplyPresetsTags(t *testing.T) { + tests := []struct { + name string + tags map[string]string + job *resources.Job + want map[string]string + }{ + { + name: "add tags to job", + tags: map[string]string{"env": "dev"}, + job: &resources.Job{ + JobSettings: &jobs.JobSettings{ + Name: "job1", + Tags: nil, + }, + }, + want: map[string]string{"env": "dev"}, + }, + { + name: "merge tags with existing job tags", + tags: map[string]string{"env": "dev"}, + job: &resources.Job{ + JobSettings: &jobs.JobSettings{ + Name: "job1", + Tags: map[string]string{"team": "data"}, + }, + }, + want: map[string]string{"env": "dev", "team": "data"}, + }, + { + name: "don't override existing job tags", + tags: map[string]string{"env": "dev"}, + job: &resources.Job{ + JobSettings: &jobs.JobSettings{ + Name: "job1", + Tags: map[string]string{"env": "prod"}, + }, + }, + want: map[string]string{"env": "prod"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job1": tt.job, + }, + }, + Presets: config.Presets{ + Tags: tt.tags, + }, + }, + } + + ctx := context.Background() + diag := bundle.Apply(ctx, b, mutator.ApplyPresets()) + + if diag.HasError() { + t.Fatalf("unexpected error: %v", diag) + } + + tags := b.Config.Resources.Jobs["job1"].Tags + require.Equal(t, tt.want, tags) + }) + } +} + +func TestApplyPresetsJobsMaxConcurrentRuns(t *testing.T) { + tests := []struct { + name string + job *resources.Job + setting int + want int + }{ + { + name: "set max concurrent runs", + job: &resources.Job{ + JobSettings: &jobs.JobSettings{ + Name: "job1", + MaxConcurrentRuns: 0, + }, + }, + setting: 5, + want: 5, + }, + { + name: "do not override existing max concurrent runs", + job: &resources.Job{ + JobSettings: &jobs.JobSettings{ + Name: "job1", + MaxConcurrentRuns: 3, + }, + }, + setting: 5, + want: 3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job1": tt.job, + }, + }, + Presets: config.Presets{ + JobsMaxConcurrentRuns: tt.setting, + }, + }, + } + ctx := context.Background() + diag := bundle.Apply(ctx, b, mutator.ApplyPresets()) + + if diag.HasError() { + t.Fatalf("unexpected error: %v", diag) + } + + require.Equal(t, tt.want, b.Config.Resources.Jobs["job1"].MaxConcurrentRuns) + }) + } +} diff --git a/bundle/config/mutator/process_target_mode.go b/bundle/config/mutator/process_target_mode.go index 9db97907dd..92ed286899 100644 --- a/bundle/config/mutator/process_target_mode.go +++ b/bundle/config/mutator/process_target_mode.go @@ -2,17 +2,14 @@ package mutator import ( "context" - "path" "strings" "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/libs/auth" "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" "github.com/databricks/cli/libs/log" - "github.com/databricks/databricks-sdk-go/service/catalog" - "github.com/databricks/databricks-sdk-go/service/jobs" - "github.com/databricks/databricks-sdk-go/service/ml" ) type processTargetMode struct{} @@ -30,102 +27,75 @@ func (m *processTargetMode) Name() string { // Mark all resources as being for 'development' purposes, i.e. // changing their their name, adding tags, and (in the future) // marking them as 'hidden' in the UI. -func transformDevelopmentMode(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { +func transformDevelopmentMode(ctx context.Context, b *bundle.Bundle) { if !b.Config.Bundle.Deployment.Lock.IsExplicitlyEnabled() { log.Infof(ctx, "Development mode: disabling deployment lock since bundle.deployment.lock.enabled is not set to true") disabled := false b.Config.Bundle.Deployment.Lock.Enabled = &disabled } - r := b.Config.Resources + t := &b.Config.Presets shortName := b.Config.Workspace.CurrentUser.ShortName - prefix := "[dev " + shortName + "] " - - // Generate a normalized version of the short name that can be used as a tag value. - tagValue := b.Tagging.NormalizeValue(shortName) - - for i := range r.Jobs { - r.Jobs[i].Name = prefix + r.Jobs[i].Name - if r.Jobs[i].Tags == nil { - r.Jobs[i].Tags = make(map[string]string) - } - r.Jobs[i].Tags["dev"] = tagValue - if r.Jobs[i].MaxConcurrentRuns == 0 { - r.Jobs[i].MaxConcurrentRuns = developmentConcurrentRuns - } - - // Pause each job. As an exception, we don't pause jobs that are explicitly - // marked as "unpaused". This allows users to override the default behavior - // of the development mode. - if r.Jobs[i].Schedule != nil && r.Jobs[i].Schedule.PauseStatus != jobs.PauseStatusUnpaused { - r.Jobs[i].Schedule.PauseStatus = jobs.PauseStatusPaused - } - if r.Jobs[i].Continuous != nil && r.Jobs[i].Continuous.PauseStatus != jobs.PauseStatusUnpaused { - r.Jobs[i].Continuous.PauseStatus = jobs.PauseStatusPaused - } - if r.Jobs[i].Trigger != nil && r.Jobs[i].Trigger.PauseStatus != jobs.PauseStatusUnpaused { - r.Jobs[i].Trigger.PauseStatus = jobs.PauseStatusPaused - } - } - for i := range r.Pipelines { - r.Pipelines[i].Name = prefix + r.Pipelines[i].Name - r.Pipelines[i].Development = true - // (pipelines don't yet support tags) + if t.NamePrefix == "" { + t.NamePrefix = "[dev " + shortName + "] " } - for i := range r.Models { - r.Models[i].Name = prefix + r.Models[i].Name - r.Models[i].Tags = append(r.Models[i].Tags, ml.ModelTag{Key: "dev", Value: tagValue}) - } - - for i := range r.Experiments { - filepath := r.Experiments[i].Name - dir := path.Dir(filepath) - base := path.Base(filepath) - if dir == "." { - r.Experiments[i].Name = prefix + base - } else { - r.Experiments[i].Name = dir + "/" + prefix + base - } - r.Experiments[i].Tags = append(r.Experiments[i].Tags, ml.ExperimentTag{Key: "dev", Value: tagValue}) + if t.Tags == nil { + t.Tags = map[string]string{} } - - for i := range r.ModelServingEndpoints { - prefix = "dev_" + b.Config.Workspace.CurrentUser.ShortName + "_" - r.ModelServingEndpoints[i].Name = prefix + r.ModelServingEndpoints[i].Name - // (model serving doesn't yet support tags) + _, exists := t.Tags["dev"] + if !exists { + t.Tags["dev"] = b.Tagging.NormalizeValue(shortName) } - for i := range r.RegisteredModels { - prefix = "dev_" + b.Config.Workspace.CurrentUser.ShortName + "_" - r.RegisteredModels[i].Name = prefix + r.RegisteredModels[i].Name - // (registered models in Unity Catalog don't yet support tags) + if t.JobsMaxConcurrentRuns == 0 { + t.JobsMaxConcurrentRuns = developmentConcurrentRuns } - for i := range r.QualityMonitors { - // Remove all schedules from monitors, since they don't support pausing/unpausing. - // Quality monitors might support the "pause" property in the future, so at the - // CLI level we do respect that property if it is set to "unpaused". - if r.QualityMonitors[i].Schedule != nil && r.QualityMonitors[i].Schedule.PauseStatus != catalog.MonitorCronSchedulePauseStatusUnpaused { - r.QualityMonitors[i].Schedule = nil - } + if t.TriggerPauseStatus == "" { + t.TriggerPauseStatus = config.Paused } - for i := range r.Schemas { - prefix = "dev_" + b.Config.Workspace.CurrentUser.ShortName + "_" - r.Schemas[i].Name = prefix + r.Schemas[i].Name - // HTTP API for schemas doesn't yet support tags. It's only supported in - // the Databricks UI and via the SQL API. + if !config.IsExplicitlyDisabled(t.PipelinesDevelopment) { + enabled := true + t.PipelinesDevelopment = &enabled } - - return nil } func validateDevelopmentMode(b *bundle.Bundle) diag.Diagnostics { + p := b.Config.Presets + u := b.Config.Workspace.CurrentUser + + // Make sure presets don't set the trigger status to UNPAUSED; + // this could be surprising since most users (and tools) expect triggers + // to be paused in development. + // (Note that there still is an exceptional case where users set the trigger + // status to UNPAUSED at the level of an individual object, whic hwas + // historically allowed.) + if p.TriggerPauseStatus == config.Unpaused { + return diag.Diagnostics{{ + Severity: diag.Error, + Summary: "target with 'mode: development' cannot set trigger pause status to UNPAUSED by default", + Locations: []dyn.Location{b.Config.GetLocation("presets.trigger_pause_status")}, + }} + } + + // Make sure this development copy has unique names and paths to avoid conflicts if path := findNonUserPath(b); path != "" { return diag.Errorf("%s must start with '~/' or contain the current username when using 'mode: development'", path) } + if p.NamePrefix != "" && !strings.Contains(p.NamePrefix, u.ShortName) && !strings.Contains(p.NamePrefix, u.UserName) { + // Resources such as pipelines require a unique name, e.g. '[dev steve] my_pipeline'. + // For this reason we require the name prefix to contain the current username; + // it's a pitfall for users if they don't include it and later find out that + // only a single user can do development deployments. + return diag.Diagnostics{{ + Severity: diag.Error, + Summary: "prefix should contain the current username or ${workspace.current_user.short_name} to ensure uniqueness when using 'mode: development'", + Locations: []dyn.Location{b.Config.GetLocation("presets.name_prefix")}, + }} + } return nil } @@ -182,10 +152,11 @@ func (m *processTargetMode) Apply(ctx context.Context, b *bundle.Bundle) diag.Di switch b.Config.Bundle.Mode { case config.Development: diags := validateDevelopmentMode(b) - if diags != nil { + if diags.HasError() { return diags } - return transformDevelopmentMode(ctx, b) + transformDevelopmentMode(ctx, b) + return diags case config.Production: isPrincipal := auth.IsServicePrincipal(b.Config.Workspace.CurrentUser.UserName) return validateProductionMode(ctx, b, isPrincipal) diff --git a/bundle/config/mutator/process_target_mode_test.go b/bundle/config/mutator/process_target_mode_test.go index f0c8aee9ea..1c8671b4c5 100644 --- a/bundle/config/mutator/process_target_mode_test.go +++ b/bundle/config/mutator/process_target_mode_test.go @@ -9,6 +9,7 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/tags" sdkconfig "github.com/databricks/databricks-sdk-go/config" "github.com/databricks/databricks-sdk-go/service/catalog" @@ -51,6 +52,7 @@ func mockBundle(mode config.Mode) *bundle.Bundle { Schedule: &jobs.CronSchedule{ QuartzCronExpression: "* * * * *", }, + Tags: map[string]string{"existing": "tag"}, }, }, "job2": { @@ -82,7 +84,7 @@ func mockBundle(mode config.Mode) *bundle.Bundle { }, }, Pipelines: map[string]*resources.Pipeline{ - "pipeline1": {PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1"}}, + "pipeline1": {PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1", Continuous: true}}, }, Experiments: map[string]*resources.MlflowExperiment{ "experiment1": {Experiment: &ml.Experiment{Name: "/Users/lennart.kats@databricks.com/experiment1"}}, @@ -129,12 +131,13 @@ func mockBundle(mode config.Mode) *bundle.Bundle { func TestProcessTargetModeDevelopment(t *testing.T) { b := mockBundle(config.Development) - m := ProcessTargetMode() + m := bundle.Seq(ProcessTargetMode(), ApplyPresets()) diags := bundle.Apply(context.Background(), b, m) require.NoError(t, diags.Error()) // Job 1 assert.Equal(t, "[dev lennart] job1", b.Config.Resources.Jobs["job1"].Name) + assert.Equal(t, b.Config.Resources.Jobs["job1"].Tags["existing"], "tag") assert.Equal(t, b.Config.Resources.Jobs["job1"].Tags["dev"], "lennart") assert.Equal(t, b.Config.Resources.Jobs["job1"].Schedule.PauseStatus, jobs.PauseStatusPaused) @@ -145,6 +148,7 @@ func TestProcessTargetModeDevelopment(t *testing.T) { // Pipeline 1 assert.Equal(t, "[dev lennart] pipeline1", b.Config.Resources.Pipelines["pipeline1"].Name) + assert.Equal(t, false, b.Config.Resources.Pipelines["pipeline1"].Continuous) assert.True(t, b.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development) // Experiment 1 @@ -182,7 +186,8 @@ func TestProcessTargetModeDevelopmentTagNormalizationForAws(t *testing.T) { }) b.Config.Workspace.CurrentUser.ShortName = "Héllö wörld?!" - diags := bundle.Apply(context.Background(), b, ProcessTargetMode()) + m := bundle.Seq(ProcessTargetMode(), ApplyPresets()) + diags := bundle.Apply(context.Background(), b, m) require.NoError(t, diags.Error()) // Assert that tag normalization took place. @@ -196,7 +201,8 @@ func TestProcessTargetModeDevelopmentTagNormalizationForAzure(t *testing.T) { }) b.Config.Workspace.CurrentUser.ShortName = "Héllö wörld?!" - diags := bundle.Apply(context.Background(), b, ProcessTargetMode()) + m := bundle.Seq(ProcessTargetMode(), ApplyPresets()) + diags := bundle.Apply(context.Background(), b, m) require.NoError(t, diags.Error()) // Assert that tag normalization took place (Azure allows more characters than AWS). @@ -210,17 +216,53 @@ func TestProcessTargetModeDevelopmentTagNormalizationForGcp(t *testing.T) { }) b.Config.Workspace.CurrentUser.ShortName = "Héllö wörld?!" - diags := bundle.Apply(context.Background(), b, ProcessTargetMode()) + m := bundle.Seq(ProcessTargetMode(), ApplyPresets()) + diags := bundle.Apply(context.Background(), b, m) require.NoError(t, diags.Error()) // Assert that tag normalization took place. assert.Equal(t, "Hello_world", b.Config.Resources.Jobs["job1"].Tags["dev"]) } +func TestValidateDevelopmentMode(t *testing.T) { + // Test with a valid development mode bundle + b := mockBundle(config.Development) + diags := validateDevelopmentMode(b) + require.NoError(t, diags.Error()) + + // Test with a bundle that has a non-user path + b.Config.Workspace.RootPath = "/Shared/.bundle/x/y/state" + diags = validateDevelopmentMode(b) + require.ErrorContains(t, diags.Error(), "root_path") + + // Test with a bundle that has an unpaused trigger pause status + b = mockBundle(config.Development) + b.Config.Presets.TriggerPauseStatus = config.Unpaused + diags = validateDevelopmentMode(b) + require.ErrorContains(t, diags.Error(), "UNPAUSED") + + // Test with a bundle that has a prefix not containing the username or short name + b = mockBundle(config.Development) + b.Config.Presets.NamePrefix = "[prod]" + diags = validateDevelopmentMode(b) + require.Len(t, diags, 1) + assert.Equal(t, diag.Error, diags[0].Severity) + assert.Contains(t, diags[0].Summary, "") + + // Test with a bundle that has valid user paths + b = mockBundle(config.Development) + b.Config.Workspace.RootPath = "/Users/lennart@company.com/.bundle/x/y/state" + b.Config.Workspace.StatePath = "/Users/lennart@company.com/.bundle/x/y/state" + b.Config.Workspace.FilePath = "/Users/lennart@company.com/.bundle/x/y/files" + b.Config.Workspace.ArtifactPath = "/Users/lennart@company.com/.bundle/x/y/artifacts" + diags = validateDevelopmentMode(b) + require.NoError(t, diags.Error()) +} + func TestProcessTargetModeDefault(t *testing.T) { b := mockBundle("") - m := ProcessTargetMode() + m := bundle.Seq(ProcessTargetMode(), ApplyPresets()) diags := bundle.Apply(context.Background(), b, m) require.NoError(t, diags.Error()) assert.Equal(t, "job1", b.Config.Resources.Jobs["job1"].Name) @@ -306,7 +348,7 @@ func TestAllResourcesMocked(t *testing.T) { func TestAllResourcesRenamed(t *testing.T) { b := mockBundle(config.Development) - m := ProcessTargetMode() + m := bundle.Seq(ProcessTargetMode(), ApplyPresets()) diags := bundle.Apply(context.Background(), b, m) require.NoError(t, diags.Error()) @@ -336,8 +378,7 @@ func TestDisableLocking(t *testing.T) { ctx := context.Background() b := mockBundle(config.Development) - err := bundle.Apply(ctx, b, ProcessTargetMode()) - require.Nil(t, err) + transformDevelopmentMode(ctx, b) assert.False(t, b.Config.Bundle.Deployment.Lock.IsEnabled()) } @@ -347,7 +388,97 @@ func TestDisableLockingDisabled(t *testing.T) { explicitlyEnabled := true b.Config.Bundle.Deployment.Lock.Enabled = &explicitlyEnabled - err := bundle.Apply(ctx, b, ProcessTargetMode()) - require.Nil(t, err) + transformDevelopmentMode(ctx, b) assert.True(t, b.Config.Bundle.Deployment.Lock.IsEnabled(), "Deployment lock should remain enabled in development mode when explicitly enabled") } + +func TestPrefixAlreadySet(t *testing.T) { + b := mockBundle(config.Development) + b.Config.Presets.NamePrefix = "custom_lennart_deploy_" + + m := bundle.Seq(ProcessTargetMode(), ApplyPresets()) + diags := bundle.Apply(context.Background(), b, m) + require.NoError(t, diags.Error()) + + assert.Equal(t, "custom_lennart_deploy_job1", b.Config.Resources.Jobs["job1"].Name) +} + +func TestTagsAlreadySet(t *testing.T) { + b := mockBundle(config.Development) + b.Config.Presets.Tags = map[string]string{ + "custom": "tag", + "dev": "foo", + } + + m := bundle.Seq(ProcessTargetMode(), ApplyPresets()) + diags := bundle.Apply(context.Background(), b, m) + require.NoError(t, diags.Error()) + + assert.Equal(t, "tag", b.Config.Resources.Jobs["job1"].Tags["custom"]) + assert.Equal(t, "foo", b.Config.Resources.Jobs["job1"].Tags["dev"]) +} + +func TestTagsNil(t *testing.T) { + b := mockBundle(config.Development) + b.Config.Presets.Tags = nil + + m := bundle.Seq(ProcessTargetMode(), ApplyPresets()) + diags := bundle.Apply(context.Background(), b, m) + require.NoError(t, diags.Error()) + + assert.Equal(t, "lennart", b.Config.Resources.Jobs["job2"].Tags["dev"]) +} + +func TestTagsEmptySet(t *testing.T) { + b := mockBundle(config.Development) + b.Config.Presets.Tags = map[string]string{} + + m := bundle.Seq(ProcessTargetMode(), ApplyPresets()) + diags := bundle.Apply(context.Background(), b, m) + require.NoError(t, diags.Error()) + + assert.Equal(t, "lennart", b.Config.Resources.Jobs["job2"].Tags["dev"]) +} + +func TestJobsMaxConcurrentRunsAlreadySet(t *testing.T) { + b := mockBundle(config.Development) + b.Config.Presets.JobsMaxConcurrentRuns = 10 + + m := bundle.Seq(ProcessTargetMode(), ApplyPresets()) + diags := bundle.Apply(context.Background(), b, m) + require.NoError(t, diags.Error()) + + assert.Equal(t, 10, b.Config.Resources.Jobs["job1"].MaxConcurrentRuns) +} + +func TestJobsMaxConcurrentRunsDisabled(t *testing.T) { + b := mockBundle(config.Development) + b.Config.Presets.JobsMaxConcurrentRuns = 1 + + m := bundle.Seq(ProcessTargetMode(), ApplyPresets()) + diags := bundle.Apply(context.Background(), b, m) + require.NoError(t, diags.Error()) + + assert.Equal(t, 1, b.Config.Resources.Jobs["job1"].MaxConcurrentRuns) +} + +func TestTriggerPauseStatusWhenUnpaused(t *testing.T) { + b := mockBundle(config.Development) + b.Config.Presets.TriggerPauseStatus = config.Unpaused + + m := bundle.Seq(ProcessTargetMode(), ApplyPresets()) + diags := bundle.Apply(context.Background(), b, m) + require.ErrorContains(t, diags.Error(), "target with 'mode: development' cannot set trigger pause status to UNPAUSED by default") +} + +func TestPipelinesDevelopmentDisabled(t *testing.T) { + b := mockBundle(config.Development) + notEnabled := false + b.Config.Presets.PipelinesDevelopment = ¬Enabled + + m := bundle.Seq(ProcessTargetMode(), ApplyPresets()) + diags := bundle.Apply(context.Background(), b, m) + require.NoError(t, diags.Error()) + + assert.False(t, b.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development) +} diff --git a/bundle/config/presets.go b/bundle/config/presets.go new file mode 100644 index 0000000000..61009a2521 --- /dev/null +++ b/bundle/config/presets.go @@ -0,0 +1,32 @@ +package config + +const Paused = "PAUSED" +const Unpaused = "UNPAUSED" + +type Presets struct { + // NamePrefix to prepend to all resource names. + NamePrefix string `json:"name_prefix,omitempty"` + + // PipelinesDevelopment is the default value for the development field of pipelines. + PipelinesDevelopment *bool `json:"pipelines_development,omitempty"` + + // TriggerPauseStatus is the default value for the pause status of all triggers and schedules. + // Either config.Paused, config.Unpaused, or empty. + TriggerPauseStatus string `json:"trigger_pause_status,omitempty"` + + // JobsMaxConcurrentRuns is the default value for the max concurrent runs of jobs. + JobsMaxConcurrentRuns int `json:"jobs_max_concurrent_runs,omitempty"` + + // Tags to add to all resources. + Tags map[string]string `json:"tags,omitempty"` +} + +// IsExplicitlyEnabled tests whether this feature is explicitly enabled. +func IsExplicitlyEnabled(feature *bool) bool { + return feature != nil && *feature +} + +// IsExplicitlyDisabled tests whether this feature is explicitly disabled. +func IsExplicitlyDisabled(feature *bool) bool { + return feature != nil && !*feature +} diff --git a/bundle/config/root.go b/bundle/config/root.go index 2c6fe1a4aa..86dc33921d 100644 --- a/bundle/config/root.go +++ b/bundle/config/root.go @@ -60,6 +60,10 @@ type Root struct { // RunAs section allows to define an execution identity for jobs and pipelines runs RunAs *jobs.JobRunAs `json:"run_as,omitempty"` + // Presets applies preset transformations throughout the bundle, e.g. + // adding a name prefix to deployed resources. + Presets Presets `json:"presets,omitempty"` + Experimental *Experimental `json:"experimental,omitempty"` // Permissions section allows to define permissions which will be @@ -307,6 +311,7 @@ func (r *Root) MergeTargetOverrides(name string) error { "resources", "sync", "permissions", + "presets", } { if root, err = mergeField(root, target, f); err != nil { return err diff --git a/bundle/config/target.go b/bundle/config/target.go index acc493574b..a2ef4d7356 100644 --- a/bundle/config/target.go +++ b/bundle/config/target.go @@ -20,6 +20,10 @@ type Target struct { // development purposes. Mode Mode `json:"mode,omitempty"` + // Mutator configurations that e.g. change the + // name prefix of deployed resources. + Presets Presets `json:"presets,omitempty"` + // Overrides the compute used for jobs and other supported assets. ComputeID string `json:"compute_id,omitempty"` diff --git a/bundle/config/validate/all_resources_have_values.go b/bundle/config/validate/all_resources_have_values.go index 019fe48a21..7f96e529a7 100644 --- a/bundle/config/validate/all_resources_have_values.go +++ b/bundle/config/validate/all_resources_have_values.go @@ -3,6 +3,7 @@ package validate import ( "context" "fmt" + "slices" "strings" "github.com/databricks/cli/bundle" @@ -21,27 +22,36 @@ func (m *allResourcesHaveValues) Name() string { } func (m *allResourcesHaveValues) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - rv := b.Config.Value().Get("resources") - - // Skip if there are no resources block defined, or the resources block is empty. - if rv.Kind() == dyn.KindInvalid || rv.Kind() == dyn.KindNil { - return nil - } + diags := diag.Diagnostics{} _, err := dyn.MapByPattern( - rv, - dyn.NewPattern(dyn.AnyKey(), dyn.AnyKey()), + b.Config.Value(), + dyn.NewPattern(dyn.Key("resources"), dyn.AnyKey(), dyn.AnyKey()), func(p dyn.Path, v dyn.Value) (dyn.Value, error) { - if v.Kind() == dyn.KindInvalid || v.Kind() == dyn.KindNil { - // Type of the resource, stripped of the trailing 's' to make it - // singular. - rType := strings.TrimSuffix(p[0].Key(), "s") - - rName := p[1].Key() - return v, fmt.Errorf("%s %s is not defined", rType, rName) + if v.Kind() != dyn.KindNil { + return v, nil } + + // Type of the resource, stripped of the trailing 's' to make it + // singular. + rType := strings.TrimSuffix(p[1].Key(), "s") + + // Name of the resource. Eg: "foo" in "jobs.foo". + rName := p[2].Key() + + diags = append(diags, diag.Diagnostic{ + Severity: diag.Error, + Summary: fmt.Sprintf("%s %s is not defined", rType, rName), + Locations: v.Locations(), + Paths: []dyn.Path{slices.Clone(p)}, + }) + return v, nil }, ) - return diag.FromErr(err) + if err != nil { + diags = append(diags, diag.FromErr(err)...) + } + + return diags } diff --git a/bundle/libraries/local_path.go b/bundle/libraries/local_path.go index 5b5ec6c076..3e32adfde2 100644 --- a/bundle/libraries/local_path.go +++ b/bundle/libraries/local_path.go @@ -66,6 +66,11 @@ func IsLibraryLocal(dep string) bool { } func isPackage(name string) bool { + // If the dependency has ==, it's a package with version + if strings.Contains(name, "==") { + return true + } + // If the dependency has no extension, it's a PyPi package name return path.Ext(name) == "" } diff --git a/bundle/libraries/local_path_test.go b/bundle/libraries/local_path_test.go index be4028d522..7299cdc934 100644 --- a/bundle/libraries/local_path_test.go +++ b/bundle/libraries/local_path_test.go @@ -54,6 +54,7 @@ func TestIsLibraryLocal(t *testing.T) { {path: "-r /Workspace/my_project/requirements.txt", expected: false}, {path: "s3://mybucket/path/to/package", expected: false}, {path: "dbfs:/mnt/path/to/package", expected: false}, + {path: "beautifulsoup4==4.12.3", expected: false}, } for i, tc := range testCases { diff --git a/bundle/phases/initialize.go b/bundle/phases/initialize.go index 25591a0a8e..8039a4f131 100644 --- a/bundle/phases/initialize.go +++ b/bundle/phases/initialize.go @@ -58,6 +58,7 @@ func Initialize() bundle.Mutator { mutator.SetRunAs(), mutator.OverrideCompute(), mutator.ProcessTargetMode(), + mutator.ApplyPresets(), mutator.DefaultQueueing(), mutator.ExpandPipelineGlobPaths(), diff --git a/bundle/tests/environments_job_and_pipeline_test.go b/bundle/tests/environments_job_and_pipeline_test.go index 0abeb487c6..218d2e4709 100644 --- a/bundle/tests/environments_job_and_pipeline_test.go +++ b/bundle/tests/environments_job_and_pipeline_test.go @@ -1,7 +1,6 @@ package config_tests import ( - "path/filepath" "testing" "github.com/databricks/cli/bundle/config" @@ -15,8 +14,6 @@ func TestJobAndPipelineDevelopmentWithEnvironment(t *testing.T) { assert.Len(t, b.Config.Resources.Pipelines, 1) p := b.Config.Resources.Pipelines["nyc_taxi_pipeline"] - l := b.Config.GetLocation("resources.pipelines.nyc_taxi_pipeline") - assert.Equal(t, "environments_job_and_pipeline/databricks.yml", filepath.ToSlash(l.File)) assert.Equal(t, b.Config.Bundle.Mode, config.Development) assert.True(t, p.Development) require.Len(t, p.Libraries, 1) @@ -30,8 +27,6 @@ func TestJobAndPipelineStagingWithEnvironment(t *testing.T) { assert.Len(t, b.Config.Resources.Pipelines, 1) p := b.Config.Resources.Pipelines["nyc_taxi_pipeline"] - l := b.Config.GetLocation("resources.pipelines.nyc_taxi_pipeline") - assert.Equal(t, "environments_job_and_pipeline/databricks.yml", filepath.ToSlash(l.File)) assert.False(t, p.Development) require.Len(t, p.Libraries, 1) assert.Equal(t, "./dlt/nyc_taxi_loader", p.Libraries[0].Notebook.Path) @@ -44,16 +39,12 @@ func TestJobAndPipelineProductionWithEnvironment(t *testing.T) { assert.Len(t, b.Config.Resources.Pipelines, 1) p := b.Config.Resources.Pipelines["nyc_taxi_pipeline"] - pl := b.Config.GetLocation("resources.pipelines.nyc_taxi_pipeline") - assert.Equal(t, "environments_job_and_pipeline/databricks.yml", filepath.ToSlash(pl.File)) assert.False(t, p.Development) require.Len(t, p.Libraries, 1) assert.Equal(t, "./dlt/nyc_taxi_loader", p.Libraries[0].Notebook.Path) assert.Equal(t, "nyc_taxi_production", p.Target) j := b.Config.Resources.Jobs["pipeline_schedule"] - jl := b.Config.GetLocation("resources.jobs.pipeline_schedule") - assert.Equal(t, "environments_job_and_pipeline/databricks.yml", filepath.ToSlash(jl.File)) assert.Equal(t, "Daily refresh of production pipeline", j.Name) require.Len(t, j.Tasks, 1) assert.NotEmpty(t, j.Tasks[0].PipelineTask.PipelineId) diff --git a/bundle/tests/presets/databricks.yml b/bundle/tests/presets/databricks.yml new file mode 100644 index 0000000000..d83d318015 --- /dev/null +++ b/bundle/tests/presets/databricks.yml @@ -0,0 +1,22 @@ +bundle: + name: presets + +presets: + tags: + prod: true + team: finance + pipelines_development: true + +targets: + dev: + presets: + name_prefix: "myprefix" + pipelines_development: true + trigger_pause_status: PAUSED + jobs_max_concurrent_runs: 10 + tags: + dev: true + prod: false + prod: + presets: + pipelines_development: false diff --git a/bundle/tests/presets_test.go b/bundle/tests/presets_test.go new file mode 100644 index 0000000000..5fcb5d95b5 --- /dev/null +++ b/bundle/tests/presets_test.go @@ -0,0 +1,28 @@ +package config_tests + +import ( + "testing" + + "github.com/databricks/cli/bundle/config" + "github.com/stretchr/testify/assert" +) + +func TestPresetsDev(t *testing.T) { + b := loadTarget(t, "./presets", "dev") + + assert.Equal(t, "myprefix", b.Config.Presets.NamePrefix) + assert.Equal(t, config.Paused, b.Config.Presets.TriggerPauseStatus) + assert.Equal(t, 10, b.Config.Presets.JobsMaxConcurrentRuns) + assert.Equal(t, true, *b.Config.Presets.PipelinesDevelopment) + assert.Equal(t, "true", b.Config.Presets.Tags["dev"]) + assert.Equal(t, "finance", b.Config.Presets.Tags["team"]) + assert.Equal(t, "false", b.Config.Presets.Tags["prod"]) +} + +func TestPresetsProd(t *testing.T) { + b := loadTarget(t, "./presets", "prod") + + assert.Equal(t, false, *b.Config.Presets.PipelinesDevelopment) + assert.Equal(t, "finance", b.Config.Presets.Tags["team"]) + assert.Equal(t, "true", b.Config.Presets.Tags["prod"]) +} diff --git a/bundle/tests/undefined_job/databricks.yml b/bundle/tests/undefined_job/databricks.yml deleted file mode 100644 index 12c19f946f..0000000000 --- a/bundle/tests/undefined_job/databricks.yml +++ /dev/null @@ -1,8 +0,0 @@ -bundle: - name: undefined-job - -resources: - jobs: - undefined: - test: - name: "Test Job" diff --git a/bundle/tests/undefined_job_test.go b/bundle/tests/undefined_job_test.go deleted file mode 100644 index 4596f20695..0000000000 --- a/bundle/tests/undefined_job_test.go +++ /dev/null @@ -1,22 +0,0 @@ -package config_tests - -import ( - "context" - "testing" - - "github.com/databricks/cli/bundle" - "github.com/databricks/cli/bundle/config/validate" - "github.com/stretchr/testify/assert" -) - -func TestUndefinedJobLoadsWithError(t *testing.T) { - b := load(t, "./undefined_job") - diags := bundle.Apply(context.Background(), b, validate.AllResourcesHaveValues()) - assert.ErrorContains(t, diags.Error(), "job undefined is not defined") -} - -func TestUndefinedPipelineLoadsWithError(t *testing.T) { - b := load(t, "./undefined_pipeline") - diags := bundle.Apply(context.Background(), b, validate.AllResourcesHaveValues()) - assert.ErrorContains(t, diags.Error(), "pipeline undefined is not defined") -} diff --git a/bundle/tests/undefined_pipeline/databricks.yml b/bundle/tests/undefined_pipeline/databricks.yml deleted file mode 100644 index a52fda38c4..0000000000 --- a/bundle/tests/undefined_pipeline/databricks.yml +++ /dev/null @@ -1,8 +0,0 @@ -bundle: - name: undefined-pipeline - -resources: - pipelines: - undefined: - test: - name: "Test Pipeline" diff --git a/bundle/tests/undefined_resources/databricks.yml b/bundle/tests/undefined_resources/databricks.yml new file mode 100644 index 0000000000..ffc0e46da9 --- /dev/null +++ b/bundle/tests/undefined_resources/databricks.yml @@ -0,0 +1,14 @@ +bundle: + name: undefined-job + +resources: + jobs: + undefined-job: + test: + name: "Test Job" + + experiments: + undefined-experiment: + + pipelines: + undefined-pipeline: diff --git a/bundle/tests/undefined_resources_test.go b/bundle/tests/undefined_resources_test.go new file mode 100644 index 0000000000..3dbacbc254 --- /dev/null +++ b/bundle/tests/undefined_resources_test.go @@ -0,0 +1,50 @@ +package config_tests + +import ( + "context" + "path/filepath" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config/validate" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/stretchr/testify/assert" +) + +func TestUndefinedResourcesLoadWithError(t *testing.T) { + b := load(t, "./undefined_resources") + diags := bundle.Apply(context.Background(), b, validate.AllResourcesHaveValues()) + + assert.Len(t, diags, 3) + assert.Contains(t, diags, diag.Diagnostic{ + Severity: diag.Error, + Summary: "job undefined-job is not defined", + Locations: []dyn.Location{{ + File: filepath.FromSlash("undefined_resources/databricks.yml"), + Line: 6, + Column: 19, + }}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.undefined-job")}, + }) + assert.Contains(t, diags, diag.Diagnostic{ + Severity: diag.Error, + Summary: "experiment undefined-experiment is not defined", + Locations: []dyn.Location{{ + File: filepath.FromSlash("undefined_resources/databricks.yml"), + Line: 11, + Column: 26, + }}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.experiments.undefined-experiment")}, + }) + assert.Contains(t, diags, diag.Diagnostic{ + Severity: diag.Error, + Summary: "pipeline undefined-pipeline is not defined", + Locations: []dyn.Location{{ + File: filepath.FromSlash("undefined_resources/databricks.yml"), + Line: 14, + Column: 24, + }}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.pipelines.undefined-pipeline")}, + }) +} diff --git a/libs/template/templates/default-sql/databricks_template_schema.json b/libs/template/templates/default-sql/databricks_template_schema.json index 329f919629..113cbef642 100644 --- a/libs/template/templates/default-sql/databricks_template_schema.json +++ b/libs/template/templates/default-sql/databricks_template_schema.json @@ -13,7 +13,7 @@ "type": "string", "pattern": "^/sql/.\\../warehouses/[a-z0-9]+$", "pattern_match_failure_message": "Path must be of the form /sql/1.0/warehouses/", - "description": "\nPlease provide the HTTP Path of the SQL warehouse you would like to use with dbt during development.\nYou can find this path by clicking on \"Connection details\" for your SQL warehouse.\nhttp_path [example: /sql/1.0/warehouses/abcdef1234567890]", + "description": "\nPlease provide the HTTP Path of the SQL warehouse you would like to use during development.\nYou can find this path by clicking on \"Connection details\" for your SQL warehouse.\nhttp_path [example: /sql/1.0/warehouses/abcdef1234567890]", "order": 2 }, "default_catalog": {