Skip to content

Commit

Permalink
[Fix] Remove single-node validation from jobs clusters (#4216)
Browse files Browse the repository at this point in the history
## Changes
Fixes databricks/cli#1896. Introducing this
validation caused a regression for both DABs and TF customers. This
PR removes the validation for job clusters.

## Tests
Unit tests.
  • Loading branch information
shreyas-goenka authored Nov 13, 2024
1 parent e3b2561 commit 8f68baa
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 125 deletions.
16 changes: 0 additions & 16 deletions clusters/clusters_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,22 +434,6 @@ type Cluster struct {
ClusterMounts []MountInfo `json:"cluster_mount_infos,omitempty" tf:"alias:cluster_mount_info"`
}

// TODO: Remove this once all the resources using clusters are migrated to Go SDK.
// They would then be using Validate(cluster compute.CreateCluster) defined in resource_cluster.go that is a duplicate of this method but uses Go SDK.
func (cluster Cluster) Validate() error {
// TODO: rewrite with CustomizeDiff
if cluster.NumWorkers > 0 || cluster.Autoscale != nil {
return nil
}
profile := cluster.SparkConf["spark.databricks.cluster.profile"]
master := cluster.SparkConf["spark.master"]
resourceClass := cluster.CustomTags["ResourceClass"]
if profile == "singleNode" && strings.HasPrefix(master, "local") && resourceClass == "SingleNode" {
return nil
}
return errors.New(numWorkerErr)
}

// TODO: Remove this once all the resources using clusters are migrated to Go SDK.
// They would then be using ModifyRequestOnInstancePool(cluster *compute.CreateCluster) defined in resource_cluster.go that is a duplicate of this method but uses Go SDK.
// ModifyRequestOnInstancePool helps remove all request fields that should not be submitted when instance pool is selected.
Expand Down
2 changes: 0 additions & 2 deletions clusters/resource_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,6 @@ func ZoneDiffSuppress(k, old, new string, d *schema.ResourceData) bool {
return false
}

// This method is a duplicate of Validate() in clusters/clusters_api.go that uses Go SDK.
// Long term, Validate() in clusters_api.go will be removed once all the resources using clusters are migrated to Go SDK.
func Validate(cluster any) error {
var profile, master, resourceClass string
switch c := cluster.(type) {
Expand Down
18 changes: 7 additions & 11 deletions jobs/jobs_api_go_sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,8 @@ func (c controlRunStateLifecycleManagerGoSdk) OnUpdate(ctx context.Context) erro
return StopActiveRun(jobID, c.d.Timeout(schema.TimeoutUpdate), w, ctx)
}

func updateAndValidateJobClusterSpec(clusterSpec *compute.ClusterSpec, d *schema.ResourceData) error {
err := clusters.Validate(*clusterSpec)
if err != nil {
return err
}
err = clusters.ModifyRequestOnInstancePool(clusterSpec)
func updateJobClusterSpec(clusterSpec *compute.ClusterSpec, d *schema.ResourceData) error {
err := clusters.ModifyRequestOnInstancePool(clusterSpec)
if err != nil {
return err
}
Expand All @@ -178,21 +174,21 @@ func updateAndValidateJobClusterSpec(clusterSpec *compute.ClusterSpec, d *schema

func prepareJobSettingsForUpdateGoSdk(d *schema.ResourceData, js *JobSettingsResource) error {
if js.NewCluster != nil {
err := updateAndValidateJobClusterSpec(js.NewCluster, d)
err := updateJobClusterSpec(js.NewCluster, d)
if err != nil {
return err
}
}
for _, task := range js.Tasks {
if task.NewCluster != nil {
err := updateAndValidateJobClusterSpec(task.NewCluster, d)
err := updateJobClusterSpec(task.NewCluster, d)
if err != nil {
return err
}
}
}
for i := range js.JobClusters {
err := updateAndValidateJobClusterSpec(&js.JobClusters[i].NewCluster, d)
err := updateJobClusterSpec(&js.JobClusters[i].NewCluster, d)
if err != nil {
return err
}
Expand All @@ -205,14 +201,14 @@ func prepareJobSettingsForCreateGoSdk(d *schema.ResourceData, jc *JobCreateStruc
// Before the go-sdk migration, the field `num_workers` was required, so we always sent it.
for _, task := range jc.Tasks {
if task.NewCluster != nil {
err := updateAndValidateJobClusterSpec(task.NewCluster, d)
err := updateJobClusterSpec(task.NewCluster, d)
if err != nil {
return err
}
}
}
for i := range jc.JobClusters {
err := updateAndValidateJobClusterSpec(&jc.JobClusters[i].NewCluster, d)
err := updateJobClusterSpec(&jc.JobClusters[i].NewCluster, d)
if err != nil {
return err
}
Expand Down
13 changes: 0 additions & 13 deletions jobs/resource_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1068,19 +1068,6 @@ func ResourceJob() common.Resource {
return fmt.Errorf("`control_run_state` must be specified only with `max_concurrent_runs = 1`")
}
}
for _, task := range js.Tasks {
if task.NewCluster == nil {
continue
}
if err := clusters.Validate(*task.NewCluster); err != nil {
return fmt.Errorf("task %s invalid: %w", task.TaskKey, err)
}
}
if js.NewCluster != nil {
if err := clusters.Validate(*js.NewCluster); err != nil {
return fmt.Errorf("invalid job cluster: %w", err)
}
}
return nil
},
Create: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
Expand Down
104 changes: 21 additions & 83 deletions jobs/resource_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,14 @@ func TestResourceJobCreate_JobClusters(t *testing.T) {
NotebookPath: "/Stuff",
},
},
{
TaskKey: "c",
NewCluster: &clusters.Cluster{
SparkVersion: "d",
NodeTypeID: "e",
NumWorkers: 0,
},
},
},
MaxConcurrentRuns: 1,
JobClusters: []JobCluster{
Expand All @@ -839,7 +847,7 @@ func TestResourceJobCreate_JobClusters(t *testing.T) {
NewCluster: &clusters.Cluster{
SparkVersion: "x",
NodeTypeID: "y",
NumWorkers: 9,
NumWorkers: 0,
},
},
},
Expand Down Expand Up @@ -883,7 +891,7 @@ func TestResourceJobCreate_JobClusters(t *testing.T) {
job_cluster {
job_cluster_key = "k"
new_cluster {
num_workers = 9
num_workers = 0
spark_version = "x"
node_type_id = "y"
}
Expand All @@ -910,7 +918,17 @@ func TestResourceJobCreate_JobClusters(t *testing.T) {
notebook_task {
notebook_path = "/Stuff"
}
}`,
}
task {
task_key = "c"
new_cluster {
spark_version = "d"
node_type_id = "e"
num_workers = 0
}
}
`,
}.Apply(t)
assert.NoError(t, err)
assert.Equal(t, "17", d.Id())
Expand Down Expand Up @@ -2031,48 +2049,6 @@ func TestResourceJobCreateFromGitSourceWithoutProviderFail(t *testing.T) {
}.ExpectError(t, "git source is not empty but Git Provider is not specified and cannot be guessed by url &{GitBranch: GitCommit: GitProvider: GitSnapshot:<nil> GitTag:0.4.8 GitUrl:https://custom.git.hosting.com/databricks/terraform-provider-databricks JobSource:<nil> ForceSendFields:[]}")
}

func TestResourceJobCreateSingleNode_Fail(t *testing.T) {
_, err := qa.ResourceFixture{
Create: true,
Resource: ResourceJob(),
HCL: `new_cluster {
num_workers = 0
spark_version = "7.3.x-scala2.12"
node_type_id = "Standard_DS3_v2"
}
max_concurrent_runs = 1
max_retries = 3
min_retry_interval_millis = 5000
name = "Featurizer"
retry_on_timeout = true
spark_jar_task {
main_class_name = "com.labs.BarMain"
}
library {
jar = "dbfs://aa/bb/cc.jar"
}
library {
jar = "dbfs://ff/gg/hh.jar"
}`,
}.Apply(t)
assert.ErrorContains(t, err, `num_workers may be 0 only for single-node clusters. To create a single node
cluster please include the following configuration in your cluster configuration:
spark_conf = {
"spark.databricks.cluster.profile" : "singleNode"
"spark.master" : "local[*]"
}
custom_tags = {
"ResourceClass" = "SingleNode"
}
Please note that the Databricks Terraform provider cannot detect if the above configuration
is defined in a policy used by the cluster. Please define this in the cluster configuration
itself to create a single node cluster.`)
}

func TestResourceJobRead(t *testing.T) {
d, err := qa.ResourceFixture{
Fixtures: []qa.HTTPFixture{
Expand Down Expand Up @@ -2938,44 +2914,6 @@ func TestResourceJobDelete(t *testing.T) {
assert.Equal(t, "789", d.Id())
}

func TestResourceJobUpdate_FailNumWorkersZero(t *testing.T) {
_, err := qa.ResourceFixture{
ID: "789",
Update: true,
Resource: ResourceJob(),
HCL: `new_cluster {
num_workers = 0
spark_version = "7.3.x-scala2.12"
node_type_id = "Standard_DS3_v2"
}
max_concurrent_runs = 1
max_retries = 3
min_retry_interval_millis = 5000
name = "Featurizer New"
retry_on_timeout = true
spark_jar_task {
main_class_name = "com.labs.BarMain"
parameters = ["--cleanup", "full"]
}`,
}.Apply(t)
assert.ErrorContains(t, err, `num_workers may be 0 only for single-node clusters. To create a single node
cluster please include the following configuration in your cluster configuration:
spark_conf = {
"spark.databricks.cluster.profile" : "singleNode"
"spark.master" : "local[*]"
}
custom_tags = {
"ResourceClass" = "SingleNode"
}
Please note that the Databricks Terraform provider cannot detect if the above configuration
is defined in a policy used by the cluster. Please define this in the cluster configuration
itself to create a single node cluster.`)
}

func TestJobsAPIList(t *testing.T) {
qa.HTTPFixturesApply(t, []qa.HTTPFixture{
{
Expand Down

0 comments on commit 8f68baa

Please sign in to comment.