From df0a98066acb8346b5902eb6a80224160b45feff Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Mon, 18 Nov 2024 15:51:58 +0100 Subject: [PATCH 1/6] Add validation for single node clusters --- bundle/config/validate/single_node_cluster.go | 174 +++++++ .../validate/single_node_cluster_test.go | 470 ++++++++++++++++++ bundle/config/validate/validate.go | 1 + 3 files changed, 645 insertions(+) create mode 100644 bundle/config/validate/single_node_cluster.go create mode 100644 bundle/config/validate/single_node_cluster_test.go diff --git a/bundle/config/validate/single_node_cluster.go b/bundle/config/validate/single_node_cluster.go new file mode 100644 index 0000000000..f5b8befaca --- /dev/null +++ b/bundle/config/validate/single_node_cluster.go @@ -0,0 +1,174 @@ +package validate + +import ( + "context" + "fmt" + "strings" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/databricks-sdk-go/service/compute" + "github.com/databricks/databricks-sdk-go/service/pipelines" +) + +// Validates that any single node clusters defined in the bundle are correctly configured. +func SingleNodeCluster() bundle.ReadOnlyMutator { + return &singleNodeCluster{} +} + +type singleNodeCluster struct{} + +func (m *singleNodeCluster) Name() string { + return "validate:SingleNodeCluster" +} + +const singleNodeWarningDetail = `num_workers should be 0 only for single-node clusters. To create a +valid single node cluster please ensure that the following properties +are correctly set in the cluster specification: + + spark_conf: + spark.databricks.cluster.profile: singleNode + spark.master: local[*] + + custom_tags: + ResourceClass: SingleNode + ` + +const singleNodeWarningSummary = `Single node cluster is not correctly configured` + +func validateSingleNodeCluster(spec *compute.ClusterSpec, l []dyn.Location, p dyn.Path) *diag.Diagnostic { + if spec == nil { + return nil + } + + if spec.NumWorkers > 0 || spec.Autoscale != nil { + return nil + } + + if spec.PolicyId != "" { + return nil + } + + invalidSingleNodeWarning := &diag.Diagnostic{ + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: l, + Paths: []dyn.Path{p}, + } + profile, ok := spec.SparkConf["spark.databricks.cluster.profile"] + if !ok { + return invalidSingleNodeWarning + } + master, ok := spec.SparkConf["spark.master"] + if !ok { + return invalidSingleNodeWarning + } + resourceClass, ok := spec.CustomTags["ResourceClass"] + if !ok { + return invalidSingleNodeWarning + } + + if profile == "singleNode" && strings.HasPrefix(master, "local") && resourceClass == "SingleNode" { + return nil + } + + return invalidSingleNodeWarning +} + +func validateSingleNodePipelineCluster(spec pipelines.PipelineCluster, l []dyn.Location, p dyn.Path) *diag.Diagnostic { + if spec.NumWorkers > 0 || spec.Autoscale != nil { + return nil + } + + if spec.PolicyId != "" { + return nil + } + + invalidSingleNodeWarning := &diag.Diagnostic{ + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: l, + Paths: []dyn.Path{p}, + } + profile, ok := spec.SparkConf["spark.databricks.cluster.profile"] + if !ok { + return invalidSingleNodeWarning + } + master, ok := spec.SparkConf["spark.master"] + if !ok { + return invalidSingleNodeWarning + } + resourceClass, ok := spec.CustomTags["ResourceClass"] + if !ok { + return invalidSingleNodeWarning + } + + if profile == "singleNode" && strings.HasPrefix(master, "local") && resourceClass == "SingleNode" { + return nil + } + + return invalidSingleNodeWarning +} + +func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics { + diags := diag.Diagnostics{} + + // Interactive clusters + for k, r := range rb.Config().Resources.Clusters { + p := dyn.NewPath(dyn.Key("resources"), dyn.Key("clusters"), dyn.Key(k)) + l := rb.Config().GetLocations("resources.clusters." + k) + + d := validateSingleNodeCluster(r.ClusterSpec, l, p) + if d != nil { + diags = append(diags, *d) + } + } + + // Job clusters + for jobK, jobV := range rb.Config().Resources.Jobs { + for i, clusterV := range jobV.JobSettings.JobClusters { + p := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"), dyn.Key(jobK), dyn.Key("job_clusters"), dyn.Index(i)) + l := rb.Config().GetLocations(fmt.Sprintf("resources.jobs.%s.job_clusters[%d]", jobK, i)) + + d := validateSingleNodeCluster(&clusterV.NewCluster, l, p) + if d != nil { + diags = append(diags, *d) + } + } + } + + // Job task clusters + for jobK, jobV := range rb.Config().Resources.Jobs { + for i, taskV := range jobV.JobSettings.Tasks { + if taskV.NewCluster == nil { + continue + } + + p := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"), dyn.Key(jobK), dyn.Key("tasks"), dyn.Index(i), dyn.Key("new_cluster")) + l := rb.Config().GetLocations(fmt.Sprintf("resources.jobs.%s.tasks[%d].new_cluster", jobK, i)) + + d := validateSingleNodeCluster(taskV.NewCluster, l, p) + if d != nil { + diags = append(diags, *d) + } + } + } + + // Pipeline clusters + for pipelineK, pipelineV := range rb.Config().Resources.Pipelines { + for i, clusterV := range pipelineV.PipelineSpec.Clusters { + p := dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(pipelineK), dyn.Key("clusters"), dyn.Index(i)) + l := rb.Config().GetLocations(fmt.Sprintf("resources.pipelines.%s.clusters[%d]", pipelineK, i)) + + d := validateSingleNodePipelineCluster(clusterV, l, p) + if d != nil { + diags = append(diags, *d) + } + } + } + + return diags +} diff --git a/bundle/config/validate/single_node_cluster_test.go b/bundle/config/validate/single_node_cluster_test.go new file mode 100644 index 0000000000..d9fea6cfd1 --- /dev/null +++ b/bundle/config/validate/single_node_cluster_test.go @@ -0,0 +1,470 @@ +package validate + +import ( + "context" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/bundle/internal/bundletest" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/databricks-sdk-go/service/compute" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/stretchr/testify/assert" +) + +func TestValidateSingleNodeClusterFail(t *testing.T) { + failCases := []struct { + name string + spec *compute.ClusterSpec + }{ + { + name: "no tags or conf", + spec: &compute.ClusterSpec{ + ClusterName: "foo", + }, + }, + { + name: "no tags", + spec: &compute.ClusterSpec{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + }, + }, + { + name: "no conf", + spec: &compute.ClusterSpec{ + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "invalid spark cluster profile", + spec: &compute.ClusterSpec{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "invalid", + "spark.master": "local[*]", + }, + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "invalid spark.master", + spec: &compute.ClusterSpec{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "invalid", + }, + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "invalid tags", + spec: &compute.ClusterSpec{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + CustomTags: map[string]string{ + "ResourceClass": "invalid", + }, + }, + }, + } + + ctx := context.Background() + + // Test interactive clusters. + for _, tc := range failCases { + t.Run("interactive_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Clusters: map[string]*resources.Cluster{ + "foo": { + ClusterSpec: tc.spec, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.clusters.foo", []dyn.Location{{File: "a.yml", Line: 1, Column: 1}}) + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: []dyn.Location{{File: "a.yml", Line: 1, Column: 1}}, + Paths: []dyn.Path{dyn.NewPath(dyn.Key("resources"), dyn.Key("clusters"), dyn.Key("foo"))}, + }, + }, diags) + }) + } + + // Test new job clusters. + for _, tc := range failCases { + t.Run("job_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + JobClusters: []jobs.JobCluster{ + { + NewCluster: *tc.spec, + }, + }, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.jobs.foo.job_clusters[0]", []dyn.Location{{File: "b.yml", Line: 1, Column: 1}}) + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: []dyn.Location{{File: "b.yml", Line: 1, Column: 1}}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.job_clusters[0]")}, + }, + }, diags) + + }) + } + + // Test job task clusters. + for _, tc := range failCases { + t.Run("task_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + NewCluster: tc.spec, + }, + }, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.jobs.foo.tasks[0]", []dyn.Location{{File: "c.yml", Line: 1, Column: 1}}) + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: []dyn.Location{{File: "c.yml", Line: 1, Column: 1}}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].new_cluster")}, + }, + }, diags) + }) + } +} + +func TestValidateSingleNodeClusterPass(t *testing.T) { + passCases := []struct { + name string + spec *compute.ClusterSpec + }{ + { + name: "single node cluster", + spec: &compute.ClusterSpec{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "num workers is not zero", + spec: &compute.ClusterSpec{ + NumWorkers: 1, + }, + }, + { + name: "autoscale is not nil", + spec: &compute.ClusterSpec{ + Autoscale: &compute.AutoScale{ + MinWorkers: 1, + }, + }, + }, + { + name: "policy id is not empty", + spec: &compute.ClusterSpec{ + PolicyId: "policy-abc", + }, + }, + } + + ctx := context.Background() + + // Test interactive clusters. + for _, tc := range passCases { + t.Run("interactive_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Clusters: map[string]*resources.Cluster{ + "foo": { + ClusterSpec: tc.spec, + }, + }, + }, + }, + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } + + // Test new job clusters. + for _, tc := range passCases { + t.Run("job_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + JobClusters: []jobs.JobCluster{ + { + NewCluster: *tc.spec, + }, + }, + }, + }, + }, + }, + }, + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } + + // Test job task clusters. + for _, tc := range passCases { + t.Run("task_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + NewCluster: tc.spec, + }, + }, + }, + }, + }, + }, + }, + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } +} + +func TestValidateSingleNodePipelineClustersFail(t *testing.T) { + failCases := []struct { + name string + spec pipelines.PipelineCluster + }{ + { + name: "no tags or conf", + spec: pipelines.PipelineCluster{ + DriverInstancePoolId: "abcd", + }, + }, + { + name: "no tags", + spec: pipelines.PipelineCluster{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + }, + }, + { + name: "no conf", + spec: pipelines.PipelineCluster{ + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "invalid spark cluster profile", + spec: pipelines.PipelineCluster{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "invalid", + "spark.master": "local[*]", + }, + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "invalid spark.master", + spec: pipelines.PipelineCluster{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "invalid", + }, + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "invalid tags", + spec: pipelines.PipelineCluster{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + CustomTags: map[string]string{ + "ResourceClass": "invalid", + }, + }, + }, + } + + ctx := context.Background() + + for _, tc := range failCases { + t.Run(tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Pipelines: map[string]*resources.Pipeline{ + "foo": { + PipelineSpec: &pipelines.PipelineSpec{ + Clusters: []pipelines.PipelineCluster{ + tc.spec, + }, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.pipelines.foo.clusters[0]", []dyn.Location{{File: "d.yml", Line: 1, Column: 1}}) + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: []dyn.Location{{File: "d.yml", Line: 1, Column: 1}}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.pipelines.foo.clusters[0]")}, + }, + }, diags) + }) + } +} + +func TestValidateSingleNodePipelineClustersPass(t *testing.T) { + passCases := []struct { + name string + spec pipelines.PipelineCluster + }{ + { + name: "single node cluster", + spec: pipelines.PipelineCluster{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "num workers is not zero", + spec: pipelines.PipelineCluster{ + NumWorkers: 1, + }, + }, + { + name: "autoscale is not nil", + spec: pipelines.PipelineCluster{ + Autoscale: &pipelines.PipelineClusterAutoscale{ + MaxWorkers: 3, + }, + }, + }, + { + name: "policy id is not empty", + spec: pipelines.PipelineCluster{ + PolicyId: "policy-abc", + }, + }, + } + + ctx := context.Background() + + for _, tc := range passCases { + t.Run(tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Pipelines: map[string]*resources.Pipeline{ + "foo": { + PipelineSpec: &pipelines.PipelineSpec{ + Clusters: []pipelines.PipelineCluster{ + tc.spec, + }, + }, + }, + }, + }, + }, + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } +} diff --git a/bundle/config/validate/validate.go b/bundle/config/validate/validate.go index 440477e654..eb4c3c3cd2 100644 --- a/bundle/config/validate/validate.go +++ b/bundle/config/validate/validate.go @@ -36,6 +36,7 @@ func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics ValidateSyncPatterns(), JobTaskClusterSpec(), ValidateFolderPermissions(), + SingleNodeCluster(), )) } From 96a0a3ec272a19f82f53bd99f637c88151f54627 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 19 Nov 2024 23:06:43 +0100 Subject: [PATCH 2/6] address comments --- bundle/config/validate/single_node_cluster.go | 177 +++----- .../validate/single_node_cluster_test.go | 418 ++++++++---------- 2 files changed, 263 insertions(+), 332 deletions(-) diff --git a/bundle/config/validate/single_node_cluster.go b/bundle/config/validate/single_node_cluster.go index f5b8befaca..e1104f1164 100644 --- a/bundle/config/validate/single_node_cluster.go +++ b/bundle/config/validate/single_node_cluster.go @@ -2,14 +2,13 @@ package validate import ( "context" - "fmt" "strings" "github.com/databricks/cli/bundle" "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/dyn" - "github.com/databricks/databricks-sdk-go/service/compute" - "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/databricks/cli/libs/dyn/convert" + "github.com/databricks/cli/libs/log" ) // Validates that any single node clusters defined in the bundle are correctly configured. @@ -37,138 +36,100 @@ are correctly set in the cluster specification: const singleNodeWarningSummary = `Single node cluster is not correctly configured` -func validateSingleNodeCluster(spec *compute.ClusterSpec, l []dyn.Location, p dyn.Path) *diag.Diagnostic { - if spec == nil { - return nil +func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool { + // Check if the user has explicitly set the num_workers to 0. Skip the warning + // if that's not the case. + numWorkers, ok := v.Get("num_workers").AsInt() + if !ok || numWorkers > 0 { + return false } - if spec.NumWorkers > 0 || spec.Autoscale != nil { - return nil + // Convenient type that contains the common fields from compute.ClusterSpec and + // pipelines.PipelineCluster that we are interested in. + type ClusterConf struct { + SparkConf map[string]string `json:"spark_conf"` + CustomTags map[string]string `json:"custom_tags"` + PolicyId string `json:"policy_id"` } - if spec.PolicyId != "" { - return nil + conf := &ClusterConf{} + err := convert.ToTyped(conf, v) + if err != nil { + return false } - invalidSingleNodeWarning := &diag.Diagnostic{ - Severity: diag.Warning, - Summary: singleNodeWarningSummary, - Detail: singleNodeWarningDetail, - Locations: l, - Paths: []dyn.Path{p}, - } - profile, ok := spec.SparkConf["spark.databricks.cluster.profile"] - if !ok { - return invalidSingleNodeWarning - } - master, ok := spec.SparkConf["spark.master"] - if !ok { - return invalidSingleNodeWarning - } - resourceClass, ok := spec.CustomTags["ResourceClass"] - if !ok { - return invalidSingleNodeWarning - } - - if profile == "singleNode" && strings.HasPrefix(master, "local") && resourceClass == "SingleNode" { - return nil + // If the policy id is set, we don't want to show the warning. This is because + // the user might have configured `spark_conf` and `custom_tags` correctly + // in their cluster policy. + if conf.PolicyId != "" { + return false } - return invalidSingleNodeWarning -} - -func validateSingleNodePipelineCluster(spec pipelines.PipelineCluster, l []dyn.Location, p dyn.Path) *diag.Diagnostic { - if spec.NumWorkers > 0 || spec.Autoscale != nil { - return nil + profile, ok := conf.SparkConf["spark.databricks.cluster.profile"] + if !ok { + log.Warnf(ctx, "spark_conf spark.databricks.cluster.profile not found in single-node cluster spec") + return true } - - if spec.PolicyId != "" { - return nil + if profile != "singleNode" { + log.Warnf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec") + return true } - invalidSingleNodeWarning := &diag.Diagnostic{ - Severity: diag.Warning, - Summary: singleNodeWarningSummary, - Detail: singleNodeWarningDetail, - Locations: l, - Paths: []dyn.Path{p}, - } - profile, ok := spec.SparkConf["spark.databricks.cluster.profile"] + master, ok := conf.SparkConf["spark.master"] if !ok { - return invalidSingleNodeWarning + log.Warnf(ctx, "spark_conf spark.master not found in single-node cluster spec") + return true } - master, ok := spec.SparkConf["spark.master"] - if !ok { - return invalidSingleNodeWarning + if !strings.HasPrefix(master, "local") { + log.Warnf(ctx, "spark_conf spark.master is not local in single-node cluster spec") + return true } - resourceClass, ok := spec.CustomTags["ResourceClass"] + + resourceClass, ok := conf.CustomTags["ResourceClass"] if !ok { - return invalidSingleNodeWarning + log.Warnf(ctx, "custom_tag ResourceClass not found in single-node cluster spec") + return true } - - if profile == "singleNode" && strings.HasPrefix(master, "local") && resourceClass == "SingleNode" { - return nil + if resourceClass != "SingleNode" { + log.Warnf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec") + return true } - return invalidSingleNodeWarning + return false } func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics { diags := diag.Diagnostics{} - // Interactive clusters - for k, r := range rb.Config().Resources.Clusters { - p := dyn.NewPath(dyn.Key("resources"), dyn.Key("clusters"), dyn.Key(k)) - l := rb.Config().GetLocations("resources.clusters." + k) - - d := validateSingleNodeCluster(r.ClusterSpec, l, p) - if d != nil { - diags = append(diags, *d) - } - } - - // Job clusters - for jobK, jobV := range rb.Config().Resources.Jobs { - for i, clusterV := range jobV.JobSettings.JobClusters { - p := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"), dyn.Key(jobK), dyn.Key("job_clusters"), dyn.Index(i)) - l := rb.Config().GetLocations(fmt.Sprintf("resources.jobs.%s.job_clusters[%d]", jobK, i)) - - d := validateSingleNodeCluster(&clusterV.NewCluster, l, p) - if d != nil { - diags = append(diags, *d) - } - } - } - - // Job task clusters - for jobK, jobV := range rb.Config().Resources.Jobs { - for i, taskV := range jobV.JobSettings.Tasks { - if taskV.NewCluster == nil { - continue - } - - p := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"), dyn.Key(jobK), dyn.Key("tasks"), dyn.Index(i), dyn.Key("new_cluster")) - l := rb.Config().GetLocations(fmt.Sprintf("resources.jobs.%s.tasks[%d].new_cluster", jobK, i)) - - d := validateSingleNodeCluster(taskV.NewCluster, l, p) - if d != nil { - diags = append(diags, *d) + patterns := []dyn.Pattern{ + // Interactive clusters + dyn.NewPattern(dyn.Key("resources"), dyn.Key("clusters"), dyn.AnyKey()), + // Job clusters + dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("job_clusters"), dyn.AnyIndex(), dyn.Key("new_cluster")), + // Job task clusters + dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("new_cluster")), + // Pipeline clusters + dyn.NewPattern(dyn.Key("resources"), dyn.Key("pipelines"), dyn.AnyKey(), dyn.Key("clusters"), dyn.AnyIndex()), + } + + for _, p := range patterns { + _, err := dyn.MapByPattern(rb.Config().Value(), p, func(p dyn.Path, v dyn.Value) (dyn.Value, error) { + warning := diag.Diagnostic{ + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: v.Locations(), + Paths: []dyn.Path{p}, } - } - } - // Pipeline clusters - for pipelineK, pipelineV := range rb.Config().Resources.Pipelines { - for i, clusterV := range pipelineV.PipelineSpec.Clusters { - p := dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(pipelineK), dyn.Key("clusters"), dyn.Index(i)) - l := rb.Config().GetLocations(fmt.Sprintf("resources.pipelines.%s.clusters[%d]", pipelineK, i)) - - d := validateSingleNodePipelineCluster(clusterV, l, p) - if d != nil { - diags = append(diags, *d) + if showSingleNodeClusterWarning(ctx, v) { + diags = append(diags, warning) } + return v, nil + }) + if err != nil { + log.Debugf(ctx, "Error while applying single node cluster validation: %s", err) } } - return diags } diff --git a/bundle/config/validate/single_node_cluster_test.go b/bundle/config/validate/single_node_cluster_test.go index d9fea6cfd1..9f76c968d7 100644 --- a/bundle/config/validate/single_node_cluster_test.go +++ b/bundle/config/validate/single_node_cluster_test.go @@ -18,73 +18,75 @@ import ( func TestValidateSingleNodeClusterFail(t *testing.T) { failCases := []struct { - name string - spec *compute.ClusterSpec + name string + sparkConf map[string]string + customTags map[string]string }{ { name: "no tags or conf", - spec: &compute.ClusterSpec{ - ClusterName: "foo", - }, }, { name: "no tags", - spec: &compute.ClusterSpec{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "local[*]", - }, + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", }, }, { - name: "no conf", - spec: &compute.ClusterSpec{ - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, - }, + name: "no conf", + customTags: map[string]string{"ResourceClass": "SingleNode"}, }, { name: "invalid spark cluster profile", - spec: &compute.ClusterSpec{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "invalid", - "spark.master": "local[*]", - }, - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "invalid", + "spark.master": "local[*]", }, + customTags: map[string]string{"ResourceClass": "SingleNode"}, }, { name: "invalid spark.master", - spec: &compute.ClusterSpec{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "invalid", - }, - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "invalid", }, + customTags: map[string]string{"ResourceClass": "SingleNode"}, }, { name: "invalid tags", - spec: &compute.ClusterSpec{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "local[*]", - }, - CustomTags: map[string]string{ - "ResourceClass": "invalid", - }, + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", }, + customTags: map[string]string{"ResourceClass": "invalid"}, + }, + { + name: "missing ResourceClass tag", + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + customTags: map[string]string{"what": "ever"}, + }, + { + name: "missing spark.master", + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + }, + customTags: map[string]string{"ResourceClass": "SingleNode"}, + }, + { + name: "missing spark.databricks.cluster.profile", + sparkConf: map[string]string{ + "spark.master": "local[*]", + }, + customTags: map[string]string{"ResourceClass": "SingleNode"}, }, } ctx := context.Background() - // Test interactive clusters. + // Interactive clusters. for _, tc := range failCases { t.Run("interactive_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ @@ -92,7 +94,10 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { Resources: config.Resources{ Clusters: map[string]*resources.Cluster{ "foo": { - ClusterSpec: tc.spec, + ClusterSpec: &compute.ClusterSpec{ + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + }, }, }, }, @@ -101,6 +106,11 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { bundletest.SetLocation(b, "resources.clusters.foo", []dyn.Location{{File: "a.yml", Line: 1, Column: 1}}) + // We can't set num_workers to 0 explicitly in the typed configuration. + // Do it on the dyn.Value directly. + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(0)) + }) diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Equal(t, diag.Diagnostics{ { @@ -114,7 +124,7 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { }) } - // Test new job clusters. + // Job clusters. for _, tc := range failCases { t.Run("job_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ @@ -125,7 +135,11 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { JobSettings: &jobs.JobSettings{ JobClusters: []jobs.JobCluster{ { - NewCluster: *tc.spec, + NewCluster: compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + }, }, }, }, @@ -135,7 +149,13 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { }, } - bundletest.SetLocation(b, "resources.jobs.foo.job_clusters[0]", []dyn.Location{{File: "b.yml", Line: 1, Column: 1}}) + bundletest.SetLocation(b, "resources.jobs.foo.job_clusters[0].new_cluster", []dyn.Location{{File: "b.yml", Line: 1, Column: 1}}) + + // We can't set num_workers to 0 explicitly in the typed configuration. + // Do it on the dyn.Value directly. + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(0)) + }) diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Equal(t, diag.Diagnostics{ @@ -144,14 +164,14 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { Summary: singleNodeWarningSummary, Detail: singleNodeWarningDetail, Locations: []dyn.Location{{File: "b.yml", Line: 1, Column: 1}}, - Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.job_clusters[0]")}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.job_clusters[0].new_cluster")}, }, }, diags) }) } - // Test job task clusters. + // Job task clusters. for _, tc := range failCases { t.Run("task_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ @@ -162,7 +182,11 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { JobSettings: &jobs.JobSettings{ Tasks: []jobs.Task{ { - NewCluster: tc.spec, + NewCluster: &compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + }, }, }, }, @@ -172,7 +196,13 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { }, } - bundletest.SetLocation(b, "resources.jobs.foo.tasks[0]", []dyn.Location{{File: "c.yml", Line: 1, Column: 1}}) + bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].new_cluster", []dyn.Location{{File: "c.yml", Line: 1, Column: 1}}) + + // We can't set num_workers to 0 explicitly in the typed configuration. + // Do it on the dyn.Value directly. + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(0)) + }) diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Equal(t, diag.Diagnostics{ @@ -186,50 +216,91 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { }, diags) }) } + + // Pipeline clusters. + for _, tc := range failCases { + t.Run("pipeline_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Pipelines: map[string]*resources.Pipeline{ + "foo": { + PipelineSpec: &pipelines.PipelineSpec{ + Clusters: []pipelines.PipelineCluster{ + { + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + }, + }, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.pipelines.foo.clusters[0]", []dyn.Location{{File: "d.yml", Line: 1, Column: 1}}) + + // We can't set num_workers to 0 explicitly in the typed configuration. + // Do it on the dyn.Value directly. + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(0)) + }) + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: []dyn.Location{{File: "d.yml", Line: 1, Column: 1}}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.pipelines.foo.clusters[0]")}, + }, + }, diags) + }) + } + } func TestValidateSingleNodeClusterPass(t *testing.T) { + zero := 0 + one := 1 + passCases := []struct { - name string - spec *compute.ClusterSpec + name string + numWorkers *int + sparkConf map[string]string + customTags map[string]string + policyId string }{ { name: "single node cluster", - spec: &compute.ClusterSpec{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "local[*]", - }, - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + customTags: map[string]string{ + "ResourceClass": "SingleNode", }, + numWorkers: &zero, }, { - name: "num workers is not zero", - spec: &compute.ClusterSpec{ - NumWorkers: 1, - }, + name: "num workers is not zero", + numWorkers: &one, }, { - name: "autoscale is not nil", - spec: &compute.ClusterSpec{ - Autoscale: &compute.AutoScale{ - MinWorkers: 1, - }, - }, + name: "num workers is not set", }, { - name: "policy id is not empty", - spec: &compute.ClusterSpec{ - PolicyId: "policy-abc", - }, + name: "policy id is not empty", + policyId: "policy-abc", + numWorkers: &zero, }, } ctx := context.Background() - // Test interactive clusters. + // Interactive clusters. for _, tc := range passCases { t.Run("interactive_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ @@ -237,19 +308,29 @@ func TestValidateSingleNodeClusterPass(t *testing.T) { Resources: config.Resources{ Clusters: map[string]*resources.Cluster{ "foo": { - ClusterSpec: tc.spec, + ClusterSpec: &compute.ClusterSpec{ + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + PolicyId: tc.policyId, + }, }, }, }, }, } + if tc.numWorkers != nil { + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(*tc.numWorkers)) + }) + } + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Empty(t, diags) }) } - // Test new job clusters. + // Job clusters. for _, tc := range passCases { t.Run("job_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ @@ -260,7 +341,12 @@ func TestValidateSingleNodeClusterPass(t *testing.T) { JobSettings: &jobs.JobSettings{ JobClusters: []jobs.JobCluster{ { - NewCluster: *tc.spec, + NewCluster: compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + PolicyId: tc.policyId, + }, }, }, }, @@ -270,12 +356,18 @@ func TestValidateSingleNodeClusterPass(t *testing.T) { }, } + if tc.numWorkers != nil { + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(*tc.numWorkers)) + }) + } + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Empty(t, diags) }) } - // Test job task clusters. + // Job task clusters. for _, tc := range passCases { t.Run("task_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ @@ -286,7 +378,12 @@ func TestValidateSingleNodeClusterPass(t *testing.T) { JobSettings: &jobs.JobSettings{ Tasks: []jobs.Task{ { - NewCluster: tc.spec, + NewCluster: &compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + PolicyId: tc.policyId, + }, }, }, }, @@ -296,157 +393,20 @@ func TestValidateSingleNodeClusterPass(t *testing.T) { }, } - diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) - assert.Empty(t, diags) - }) - } -} - -func TestValidateSingleNodePipelineClustersFail(t *testing.T) { - failCases := []struct { - name string - spec pipelines.PipelineCluster - }{ - { - name: "no tags or conf", - spec: pipelines.PipelineCluster{ - DriverInstancePoolId: "abcd", - }, - }, - { - name: "no tags", - spec: pipelines.PipelineCluster{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "local[*]", - }, - }, - }, - { - name: "no conf", - spec: pipelines.PipelineCluster{ - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, - }, - }, - { - name: "invalid spark cluster profile", - spec: pipelines.PipelineCluster{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "invalid", - "spark.master": "local[*]", - }, - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, - }, - }, - { - name: "invalid spark.master", - spec: pipelines.PipelineCluster{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "invalid", - }, - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, - }, - }, - { - name: "invalid tags", - spec: pipelines.PipelineCluster{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "local[*]", - }, - CustomTags: map[string]string{ - "ResourceClass": "invalid", - }, - }, - }, - } - - ctx := context.Background() - - for _, tc := range failCases { - t.Run(tc.name, func(t *testing.T) { - b := &bundle.Bundle{ - Config: config.Root{ - Resources: config.Resources{ - Pipelines: map[string]*resources.Pipeline{ - "foo": { - PipelineSpec: &pipelines.PipelineSpec{ - Clusters: []pipelines.PipelineCluster{ - tc.spec, - }, - }, - }, - }, - }, - }, + if tc.numWorkers != nil { + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(*tc.numWorkers)) + }) } - bundletest.SetLocation(b, "resources.pipelines.foo.clusters[0]", []dyn.Location{{File: "d.yml", Line: 1, Column: 1}}) - diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) - assert.Equal(t, diag.Diagnostics{ - { - Severity: diag.Warning, - Summary: singleNodeWarningSummary, - Detail: singleNodeWarningDetail, - Locations: []dyn.Location{{File: "d.yml", Line: 1, Column: 1}}, - Paths: []dyn.Path{dyn.MustPathFromString("resources.pipelines.foo.clusters[0]")}, - }, - }, diags) + assert.Empty(t, diags) }) } -} - -func TestValidateSingleNodePipelineClustersPass(t *testing.T) { - passCases := []struct { - name string - spec pipelines.PipelineCluster - }{ - { - name: "single node cluster", - spec: pipelines.PipelineCluster{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "local[*]", - }, - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, - }, - }, - { - name: "num workers is not zero", - spec: pipelines.PipelineCluster{ - NumWorkers: 1, - }, - }, - { - name: "autoscale is not nil", - spec: pipelines.PipelineCluster{ - Autoscale: &pipelines.PipelineClusterAutoscale{ - MaxWorkers: 3, - }, - }, - }, - { - name: "policy id is not empty", - spec: pipelines.PipelineCluster{ - PolicyId: "policy-abc", - }, - }, - } - - ctx := context.Background() + // Pipeline clusters. for _, tc := range passCases { - t.Run(tc.name, func(t *testing.T) { + t.Run("pipeline_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -454,7 +414,11 @@ func TestValidateSingleNodePipelineClustersPass(t *testing.T) { "foo": { PipelineSpec: &pipelines.PipelineSpec{ Clusters: []pipelines.PipelineCluster{ - tc.spec, + { + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + PolicyId: tc.policyId, + }, }, }, }, @@ -463,6 +427,12 @@ func TestValidateSingleNodePipelineClustersPass(t *testing.T) { }, } + if tc.numWorkers != nil { + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(*tc.numWorkers)) + }) + } + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Empty(t, diags) }) From 4233a7c2923c3acc5fc81e7cb6f0e45c4967dc2a Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 19 Nov 2024 23:11:03 +0100 Subject: [PATCH 3/6] better warn --- bundle/config/validate/single_node_cluster.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bundle/config/validate/single_node_cluster.go b/bundle/config/validate/single_node_cluster.go index e1104f1164..c4fc3a2314 100644 --- a/bundle/config/validate/single_node_cluster.go +++ b/bundle/config/validate/single_node_cluster.go @@ -71,7 +71,7 @@ func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool { return true } if profile != "singleNode" { - log.Warnf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec") + log.Warnf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec: %s", profile) return true } @@ -81,7 +81,7 @@ func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool { return true } if !strings.HasPrefix(master, "local") { - log.Warnf(ctx, "spark_conf spark.master is not local in single-node cluster spec") + log.Warnf(ctx, "spark_conf spark.master does not start with local in single-node cluster spec: %s", master) return true } @@ -91,7 +91,7 @@ func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool { return true } if resourceClass != "SingleNode" { - log.Warnf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec") + log.Warnf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec: %s", resourceClass) return true } From 3ac1bb1853f125f706830ba99fb1d44be16a0ed5 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Fri, 22 Nov 2024 14:53:58 +0100 Subject: [PATCH 4/6] warn -> debug log --- bundle/config/validate/single_node_cluster.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/bundle/config/validate/single_node_cluster.go b/bundle/config/validate/single_node_cluster.go index c4fc3a2314..8e0ad6da05 100644 --- a/bundle/config/validate/single_node_cluster.go +++ b/bundle/config/validate/single_node_cluster.go @@ -67,31 +67,31 @@ func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool { profile, ok := conf.SparkConf["spark.databricks.cluster.profile"] if !ok { - log.Warnf(ctx, "spark_conf spark.databricks.cluster.profile not found in single-node cluster spec") + log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile not found in single-node cluster spec") return true } if profile != "singleNode" { - log.Warnf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec: %s", profile) + log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec: %s", profile) return true } master, ok := conf.SparkConf["spark.master"] if !ok { - log.Warnf(ctx, "spark_conf spark.master not found in single-node cluster spec") + log.Debugf(ctx, "spark_conf spark.master not found in single-node cluster spec") return true } if !strings.HasPrefix(master, "local") { - log.Warnf(ctx, "spark_conf spark.master does not start with local in single-node cluster spec: %s", master) + log.Debugf(ctx, "spark_conf spark.master does not start with local in single-node cluster spec: %s", master) return true } resourceClass, ok := conf.CustomTags["ResourceClass"] if !ok { - log.Warnf(ctx, "custom_tag ResourceClass not found in single-node cluster spec") + log.Debugf(ctx, "custom_tag ResourceClass not found in single-node cluster spec") return true } if resourceClass != "SingleNode" { - log.Warnf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec: %s", resourceClass) + log.Debugf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec: %s", resourceClass) return true } From 8128cc390cceca32bfe859dfbc314ef4fe52d862 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Fri, 22 Nov 2024 15:10:12 +0100 Subject: [PATCH 5/6] separate test function --- .../validate/single_node_cluster_test.go | 71 +++++++++++++------ 1 file changed, 50 insertions(+), 21 deletions(-) diff --git a/bundle/config/validate/single_node_cluster_test.go b/bundle/config/validate/single_node_cluster_test.go index 9f76c968d7..4fff3f8a8e 100644 --- a/bundle/config/validate/single_node_cluster_test.go +++ b/bundle/config/validate/single_node_cluster_test.go @@ -16,8 +16,12 @@ import ( "github.com/stretchr/testify/assert" ) -func TestValidateSingleNodeClusterFail(t *testing.T) { - failCases := []struct { +func failCases() []struct { + name string + sparkConf map[string]string + customTags map[string]string +} { + return []struct { name string sparkConf map[string]string customTags map[string]string @@ -83,11 +87,12 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { customTags: map[string]string{"ResourceClass": "SingleNode"}, }, } +} +func TestValidateSingleNodeClusterFailForInteractiveClusters(t *testing.T) { ctx := context.Background() - // Interactive clusters. - for _, tc := range failCases { + for _, tc := range failCases() { t.Run("interactive_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ @@ -123,9 +128,12 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { }, diags) }) } +} + +func TestValidateSingleNodeClusterFailForJobClusters(t *testing.T) { + ctx := context.Background() - // Job clusters. - for _, tc := range failCases { + for _, tc := range failCases() { t.Run("job_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ @@ -170,9 +178,12 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { }) } +} - // Job task clusters. - for _, tc := range failCases { +func TestValidateSingleNodeClusterFailForJobTaskClusters(t *testing.T) { + ctx := context.Background() + + for _, tc := range failCases() { t.Run("task_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ @@ -216,9 +227,12 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { }, diags) }) } +} - // Pipeline clusters. - for _, tc := range failCases { +func TestValidateSingleNodeClusterFailForPipelineClusters(t *testing.T) { + ctx := context.Background() + + for _, tc := range failCases() { t.Run("pipeline_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ @@ -259,14 +273,19 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { }, diags) }) } - } -func TestValidateSingleNodeClusterPass(t *testing.T) { +func passCases() []struct { + name string + numWorkers *int + sparkConf map[string]string + customTags map[string]string + policyId string +} { zero := 0 one := 1 - passCases := []struct { + return []struct { name string numWorkers *int sparkConf map[string]string @@ -297,11 +316,12 @@ func TestValidateSingleNodeClusterPass(t *testing.T) { numWorkers: &zero, }, } +} +func TestValidateSingleNodeClusterPassInteractiveClusters(t *testing.T) { ctx := context.Background() - // Interactive clusters. - for _, tc := range passCases { + for _, tc := range passCases() { t.Run("interactive_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ @@ -329,9 +349,12 @@ func TestValidateSingleNodeClusterPass(t *testing.T) { assert.Empty(t, diags) }) } +} - // Job clusters. - for _, tc := range passCases { +func TestValidateSingleNodeClusterPassJobClusters(t *testing.T) { + ctx := context.Background() + + for _, tc := range passCases() { t.Run("job_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ @@ -366,9 +389,12 @@ func TestValidateSingleNodeClusterPass(t *testing.T) { assert.Empty(t, diags) }) } +} - // Job task clusters. - for _, tc := range passCases { +func TestValidateSingleNodeClusterPassJobTaskClusters(t *testing.T) { + ctx := context.Background() + + for _, tc := range passCases() { t.Run("task_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ @@ -403,9 +429,12 @@ func TestValidateSingleNodeClusterPass(t *testing.T) { assert.Empty(t, diags) }) } +} + +func TestValidateSingleNodeClusterPassPipelineClusters(t *testing.T) { + ctx := context.Background() - // Pipeline clusters. - for _, tc := range passCases { + for _, tc := range passCases() { t.Run("pipeline_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ From 382a4efd6efb98adbc7532a3286d81521c899843 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Fri, 22 Nov 2024 15:28:00 +0100 Subject: [PATCH 6/6] add validation for foreach task clusters as well --- bundle/config/validate/single_node_cluster.go | 2 + .../validate/single_node_cluster_test.go | 113 ++++++++++++++++-- 2 files changed, 107 insertions(+), 8 deletions(-) diff --git a/bundle/config/validate/single_node_cluster.go b/bundle/config/validate/single_node_cluster.go index 8e0ad6da05..7c159f61ab 100644 --- a/bundle/config/validate/single_node_cluster.go +++ b/bundle/config/validate/single_node_cluster.go @@ -108,6 +108,8 @@ func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("job_clusters"), dyn.AnyIndex(), dyn.Key("new_cluster")), // Job task clusters dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("new_cluster")), + // Job for each task clusters + dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("for_each_task"), dyn.Key("task"), dyn.Key("new_cluster")), // Pipeline clusters dyn.NewPattern(dyn.Key("resources"), dyn.Key("pipelines"), dyn.AnyKey(), dyn.Key("clusters"), dyn.AnyIndex()), } diff --git a/bundle/config/validate/single_node_cluster_test.go b/bundle/config/validate/single_node_cluster_test.go index 4fff3f8a8e..18771cc00d 100644 --- a/bundle/config/validate/single_node_cluster_test.go +++ b/bundle/config/validate/single_node_cluster_test.go @@ -93,7 +93,7 @@ func TestValidateSingleNodeClusterFailForInteractiveClusters(t *testing.T) { ctx := context.Background() for _, tc := range failCases() { - t.Run("interactive_"+tc.name, func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -134,7 +134,7 @@ func TestValidateSingleNodeClusterFailForJobClusters(t *testing.T) { ctx := context.Background() for _, tc := range failCases() { - t.Run("job_"+tc.name, func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -184,7 +184,7 @@ func TestValidateSingleNodeClusterFailForJobTaskClusters(t *testing.T) { ctx := context.Background() for _, tc := range failCases() { - t.Run("task_"+tc.name, func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -233,7 +233,7 @@ func TestValidateSingleNodeClusterFailForPipelineClusters(t *testing.T) { ctx := context.Background() for _, tc := range failCases() { - t.Run("pipeline_"+tc.name, func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -275,6 +275,59 @@ func TestValidateSingleNodeClusterFailForPipelineClusters(t *testing.T) { } } +func TestValidateSingleNodeClusterFailForJobForEachTaskCluster(t *testing.T) { + ctx := context.Background() + + for _, tc := range failCases() { + t.Run(tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + ForEachTask: &jobs.ForEachTask{ + Task: jobs.Task{ + NewCluster: &compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster", []dyn.Location{{File: "e.yml", Line: 1, Column: 1}}) + + // We can't set num_workers to 0 explicitly in the typed configuration. + // Do it on the dyn.Value directly. + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(0)) + }) + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: []dyn.Location{{File: "e.yml", Line: 1, Column: 1}}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].for_each_task.task.new_cluster")}, + }, + }, diags) + }) + } +} + func passCases() []struct { name string numWorkers *int @@ -322,7 +375,7 @@ func TestValidateSingleNodeClusterPassInteractiveClusters(t *testing.T) { ctx := context.Background() for _, tc := range passCases() { - t.Run("interactive_"+tc.name, func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -355,7 +408,7 @@ func TestValidateSingleNodeClusterPassJobClusters(t *testing.T) { ctx := context.Background() for _, tc := range passCases() { - t.Run("job_"+tc.name, func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -395,7 +448,7 @@ func TestValidateSingleNodeClusterPassJobTaskClusters(t *testing.T) { ctx := context.Background() for _, tc := range passCases() { - t.Run("task_"+tc.name, func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -435,7 +488,7 @@ func TestValidateSingleNodeClusterPassPipelineClusters(t *testing.T) { ctx := context.Background() for _, tc := range passCases() { - t.Run("pipeline_"+tc.name, func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -467,3 +520,47 @@ func TestValidateSingleNodeClusterPassPipelineClusters(t *testing.T) { }) } } + +func TestValidateSingleNodeClusterPassJobForEachTaskCluster(t *testing.T) { + ctx := context.Background() + + for _, tc := range passCases() { + t.Run(tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + ForEachTask: &jobs.ForEachTask{ + Task: jobs.Task{ + NewCluster: &compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + PolicyId: tc.policyId, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + if tc.numWorkers != nil { + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(*tc.numWorkers)) + }) + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } +}