From a27c24a397f91048677c38e48ee31213f278f640 Mon Sep 17 00:00:00 2001
From: shreyas-goenka <88374338+shreyas-goenka@users.noreply.github.com>
Date: Wed, 4 Sep 2024 16:41:47 +0530
Subject: [PATCH 1/2] Add prompt when a pipeline recreation happens (#1672)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Changes
DLT pipeline recreations are destructive. They can lead to lost history of
previous updates, temporary unavailability of the underlying tables, and are
potentially computationally expensive. We therefore make a breaking change:
a prompt is shown to the user if their configuration changes would lead to a
DLT pipeline recreation. Users can skip the prompt by specifying the
`--auto-approve` flag.

This PR also fixes an issue with our test runner where logs from the
cmdio.Logger would not get propagated to the reader returned by our cobra
test runner.

## Tests
Manually, and new unit and integration tests.
```
➜  bundle-playground-3 cli bundle deploy
Uploading bundle files to /Users/63ec021d-b0c6-49c0-93a0-5123953a1cb2/.bundle/test/development/files...
The following DLT pipelines will be recreated. Underlying tables will be
unavailable for a transient period until the newly recreated pipelines are
run once successfully. History of previous pipeline update runs will be lost
because of recreation:
 recreate pipeline foo
Would you like to proceed? [y/n]: n
Deployment cancelled!
```
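
For background (an illustrative, standalone sketch, not part of this patch,
using only public `terraform-json` helpers): Terraform models a recreation as
a delete+create action pair, which `tfjson.Actions.Replace()` detects, while
`Delete()` matches only a pure delete. This is why `parseTerraformActions`
below can distinguish the two cases with a simple switch.

```go
package main

import (
	"fmt"

	tfjson "github.com/hashicorp/terraform-json"
)

func main() {
	// A pure delete: exactly one planned action.
	deleteOnly := tfjson.Actions{tfjson.ActionDelete}
	fmt.Println(deleteOnly.Delete(), deleteOnly.Replace()) // true false

	// A recreation: Terraform plans a delete followed by a create,
	// which terraform-json classifies as a replace.
	recreate := tfjson.Actions{tfjson.ActionDelete, tfjson.ActionCreate}
	fmt.Println(recreate.Delete(), recreate.Replace()) // false true
}
```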
---
 bundle/phases/deploy.go                       | 92 +++++++++++++------
 bundle/phases/deploy_test.go                  | 67 ++++++++++++++
 .../databricks_template_schema.json           |  8 ++
 .../template/databricks.yml.tmpl              | 25 +++++
 .../bundles/recreate_pipeline/template/nb.sql |  2 +
 internal/bundle/deploy_test.go                | 91 +++++++++++++++++-
 6 files changed, 258 insertions(+), 27 deletions(-)
 create mode 100644 bundle/phases/deploy_test.go
 create mode 100644 internal/bundle/bundles/recreate_pipeline/databricks_template_schema.json
 create mode 100644 internal/bundle/bundles/recreate_pipeline/template/databricks.yml.tmpl
 create mode 100644 internal/bundle/bundles/recreate_pipeline/template/nb.sql

diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go
index ca967c321a..49544227ec 100644
--- a/bundle/phases/deploy.go
+++ b/bundle/phases/deploy.go
@@ -19,55 +19,95 @@ import (
 	"github.com/databricks/cli/bundle/scripts"
 	"github.com/databricks/cli/libs/cmdio"
 	terraformlib "github.com/databricks/cli/libs/terraform"
+	tfjson "github.com/hashicorp/terraform-json"
 )
 
-func approvalForUcSchemaDelete(ctx context.Context, b *bundle.Bundle) (bool, error) {
-	tf := b.Terraform
-	if tf == nil {
-		return false, fmt.Errorf("terraform not initialized")
-	}
-
-	// read plan file
-	plan, err := tf.ShowPlanFile(ctx, b.Plan.Path)
-	if err != nil {
-		return false, err
-	}
-
-	actions := make([]terraformlib.Action, 0)
-	for _, rc := range plan.ResourceChanges {
-		// We only care about destructive actions on UC schema resources.
-		if rc.Type != "databricks_schema" {
+func parseTerraformActions(changes []*tfjson.ResourceChange, toInclude func(typ string, actions tfjson.Actions) bool) []terraformlib.Action {
+	res := make([]terraformlib.Action, 0)
+	for _, rc := range changes {
+		if !toInclude(rc.Type, rc.Change.Actions) {
 			continue
 		}
 
 		var actionType terraformlib.ActionType
-
 		switch {
 		case rc.Change.Actions.Delete():
 			actionType = terraformlib.ActionTypeDelete
 		case rc.Change.Actions.Replace():
 			actionType = terraformlib.ActionTypeRecreate
 		default:
-			// We don't need a prompt for non-destructive actions like creating
-			// or updating a schema.
+			// No use case for other action types yet.
 			continue
 		}
 
-		actions = append(actions, terraformlib.Action{
+		res = append(res, terraformlib.Action{
 			Action:       actionType,
 			ResourceType: rc.Type,
 			ResourceName: rc.Name,
 		})
 	}
 
-	// No restricted actions planned. No need for approval.
-	if len(actions) == 0 {
+	return res
+}
+
+func approvalForDeploy(ctx context.Context, b *bundle.Bundle) (bool, error) {
+	tf := b.Terraform
+	if tf == nil {
+		return false, fmt.Errorf("terraform not initialized")
+	}
+
+	// read plan file
+	plan, err := tf.ShowPlanFile(ctx, b.Plan.Path)
+	if err != nil {
+		return false, err
+	}
+
+	schemaActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool {
+		// Filter in only UC schema resources.
+		if typ != "databricks_schema" {
+			return false
+		}
+
+		// We only display prompts for destructive actions like deleting or
+		// recreating a schema.
+		return actions.Delete() || actions.Replace()
+	})
+
+	dltActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool {
+		// Filter in only DLT pipeline resources.
+		if typ != "databricks_pipeline" {
+			return false
+		}
+
+		// Recreating a DLT pipeline leads to metadata loss, and for a transient
+		// period the underlying tables will be unavailable.
+		return actions.Replace() || actions.Delete()
+	})
+
+	// We don't need to display any prompts in this case.
+	if len(dltActions) == 0 && len(schemaActions) == 0 {
 		return true, nil
 	}
 
-	cmdio.LogString(ctx, "The following UC schemas will be deleted or recreated. Any underlying data may be lost:")
-	for _, action := range actions {
-		cmdio.Log(ctx, action)
+	// One or more UC schema resources will be deleted or recreated.
+	if len(schemaActions) != 0 {
+		cmdio.LogString(ctx, "The following UC schemas will be deleted or recreated. Any underlying data may be lost:")
+		for _, action := range schemaActions {
+			cmdio.Log(ctx, action)
+		}
+	}
+
+	// One or more DLT pipelines are being recreated.
+	if len(dltActions) != 0 {
+		msg := `
+This action will result in the deletion or recreation of the following DLT Pipelines along with the
+Streaming Tables (STs) and Materialized Views (MVs) managed by them. Recreating the Pipelines will
+restore the defined STs and MVs through full refresh. Note that recreation is necessary when pipeline
+properties such as the 'catalog' or 'storage' are changed:`
+		cmdio.LogString(ctx, msg)
+		for _, action := range dltActions {
+			cmdio.Log(ctx, action)
+		}
 	}
 
 	if b.AutoApprove {
@@ -126,7 +166,7 @@ func Deploy() bundle.Mutator {
 		terraform.CheckRunningResource(),
 		terraform.Plan(terraform.PlanGoal("deploy")),
 		bundle.If(
-			approvalForUcSchemaDelete,
+			approvalForDeploy,
 			deployCore,
 			bundle.LogString("Deployment cancelled!"),
 		),
diff --git a/bundle/phases/deploy_test.go b/bundle/phases/deploy_test.go
new file mode 100644
index 0000000000..e00370b380
--- /dev/null
+++ b/bundle/phases/deploy_test.go
@@ -0,0 +1,67 @@
+package phases
+
+import (
+	"testing"
+
+	terraformlib "github.com/databricks/cli/libs/terraform"
+	tfjson "github.com/hashicorp/terraform-json"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestParseTerraformActions(t *testing.T) {
+	changes := []*tfjson.ResourceChange{
+		{
+			Type: "databricks_pipeline",
+			Change: &tfjson.Change{
+				Actions: tfjson.Actions{tfjson.ActionCreate},
+			},
+			Name: "create pipeline",
+		},
+		{
+			Type: "databricks_pipeline",
+			Change: &tfjson.Change{
+				Actions: tfjson.Actions{tfjson.ActionDelete},
+			},
+			Name: "delete pipeline",
+		},
+		{
+			Type: "databricks_pipeline",
+			Change: &tfjson.Change{
+				Actions: tfjson.Actions{tfjson.ActionDelete, tfjson.ActionCreate},
+			},
+			Name: "recreate pipeline",
+		},
+		{
+			Type: "databricks_whatever",
+			Change: &tfjson.Change{
+				Actions: tfjson.Actions{tfjson.ActionDelete, tfjson.ActionCreate},
+			},
+			Name: "recreate whatever",
+		},
+	}
+
+	res := parseTerraformActions(changes, func(typ string, actions tfjson.Actions) bool {
+		if typ != "databricks_pipeline" {
+			return false
+		}
+
+		if actions.Delete() || actions.Replace() {
+			return true
+		}
+
+		return false
+	})
+
+	assert.Equal(t, []terraformlib.Action{
+		{
+			Action:       terraformlib.ActionTypeDelete,
+			ResourceType: "databricks_pipeline",
+			ResourceName: "delete pipeline",
+		},
+		{
+			Action:       terraformlib.ActionTypeRecreate,
+			ResourceType: "databricks_pipeline",
+			ResourceName: "recreate pipeline",
+		},
+	}, res)
+}
diff --git a/internal/bundle/bundles/recreate_pipeline/databricks_template_schema.json b/internal/bundle/bundles/recreate_pipeline/databricks_template_schema.json
new file mode 100644
index 0000000000..762f4470c2
--- /dev/null
+++ b/internal/bundle/bundles/recreate_pipeline/databricks_template_schema.json
@@ -0,0 +1,8 @@
+{
+    "properties": {
+        "unique_id": {
+            "type": "string",
+            "description": "Unique ID for the schema and pipeline names"
+        }
+    }
+}
diff --git a/internal/bundle/bundles/recreate_pipeline/template/databricks.yml.tmpl b/internal/bundle/bundles/recreate_pipeline/template/databricks.yml.tmpl
new file mode 100644
index 0000000000..10350f13e9
--- /dev/null
+++ b/internal/bundle/bundles/recreate_pipeline/template/databricks.yml.tmpl
@@ -0,0 +1,25 @@
+bundle:
+  name: "bundle-playground"
+
+variables:
+  catalog:
+    description: The catalog the DLT pipeline should use.
+    default: main
+
+
+resources:
+  pipelines:
+    foo:
+      name: test-pipeline-{{.unique_id}}
+      libraries:
+        - notebook:
+            path: ./nb.sql
+      development: true
+      catalog: ${var.catalog}
+
+include:
+  - "*.yml"
+
+targets:
+  development:
+    default: true
diff --git a/internal/bundle/bundles/recreate_pipeline/template/nb.sql b/internal/bundle/bundles/recreate_pipeline/template/nb.sql
new file mode 100644
index 0000000000..199ff50788
--- /dev/null
+++ b/internal/bundle/bundles/recreate_pipeline/template/nb.sql
@@ -0,0 +1,2 @@
+-- Databricks notebook source
+select 1
diff --git a/internal/bundle/deploy_test.go b/internal/bundle/deploy_test.go
index 269b7c80a0..736c880dbc 100644
--- a/internal/bundle/deploy_test.go
+++ b/internal/bundle/deploy_test.go
@@ -120,8 +120,97 @@ func TestAccBundleDeployUcSchemaFailsWithoutAutoApprove(t *testing.T) {
 	t.Setenv("BUNDLE_ROOT", bundleRoot)
 	t.Setenv("TERM", "dumb")
 	c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock")
-	stdout, _, err := c.Run()
+	stdout, stderr, err := c.Run()
+
+	assert.EqualError(t, err, root.ErrAlreadyPrinted.Error())
+	assert.Contains(t, stderr.String(), "The following UC schemas will be deleted or recreated. Any underlying data may be lost:\n delete schema bar")
+	assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed")
+}
+
+func TestAccBundlePipelineDeleteWithoutAutoApprove(t *testing.T) {
+	ctx, wt := acc.WorkspaceTest(t)
+	w := wt.W
+
+	nodeTypeId := internal.GetNodeTypeId(env.Get(ctx, "CLOUD_ENV"))
+	uniqueId := uuid.New().String()
+	bundleRoot, err := initTestTemplate(t, ctx, "deploy_then_remove_resources", map[string]any{
+		"unique_id":     uniqueId,
+		"node_type_id":  nodeTypeId,
+		"spark_version": defaultSparkVersion,
+	})
+	require.NoError(t, err)
+
+	// deploy pipeline
+	err = deployBundle(t, ctx, bundleRoot)
+	require.NoError(t, err)
+
+	// assert pipeline is created
+	pipelineName := "test-bundle-pipeline-" + uniqueId
+	pipeline, err := w.Pipelines.GetByName(ctx, pipelineName)
+	require.NoError(t, err)
+	assert.Equal(t, pipeline.Name, pipelineName)
+
+	// assert job is created
+	jobName := "test-bundle-job-" + uniqueId
+	job, err := w.Jobs.GetBySettingsName(ctx, jobName)
+	require.NoError(t, err)
+	assert.Equal(t, job.Settings.Name, jobName)
+
+	// delete resources.yml
+	err = os.Remove(filepath.Join(bundleRoot, "resources.yml"))
+	require.NoError(t, err)
+
+	// Redeploy the bundle. Expect it to fail because deleting the pipeline requires --auto-approve.
+	t.Setenv("BUNDLE_ROOT", bundleRoot)
+	t.Setenv("TERM", "dumb")
+	c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock")
+	stdout, stderr, err := c.Run()
+
+	assert.EqualError(t, err, root.ErrAlreadyPrinted.Error())
+	assert.Contains(t, stderr.String(), `This action will result in the deletion or recreation of the following DLT Pipelines along with the
+Streaming Tables (STs) and Materialized Views (MVs) managed by them. Recreating the Pipelines will
+restore the defined STs and MVs through full refresh. Note that recreation is necessary when pipeline
+properties such as the 'catalog' or 'storage' are changed:
+ delete pipeline bar`)
+	assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed")
+
+}
+
+func TestAccBundlePipelineRecreateWithoutAutoApprove(t *testing.T) {
+	ctx, wt := acc.UcWorkspaceTest(t)
+	w := wt.W
+	uniqueId := uuid.New().String()
+
+	bundleRoot, err := initTestTemplate(t, ctx, "recreate_pipeline", map[string]any{
+		"unique_id": uniqueId,
+	})
+	require.NoError(t, err)
+
+	err = deployBundle(t, ctx, bundleRoot)
+	require.NoError(t, err)
+
+	t.Cleanup(func() {
+		destroyBundle(t, ctx, bundleRoot)
+	})
+
+	// Assert the pipeline is created
+	pipelineName := "test-pipeline-" + uniqueId
+	pipeline, err := w.Pipelines.GetByName(ctx, pipelineName)
+	require.NoError(t, err)
+	require.Equal(t, pipelineName, pipeline.Name)
+
+	// Redeploy the bundle, pointing the DLT pipeline to a different UC catalog.
+	t.Setenv("BUNDLE_ROOT", bundleRoot)
+	t.Setenv("TERM", "dumb")
+	c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock", "--var=\"catalog=whatever\"")
+	stdout, stderr, err := c.Run()
+
+	assert.EqualError(t, err, root.ErrAlreadyPrinted.Error())
+	assert.Contains(t, stderr.String(), `This action will result in the deletion or recreation of the following DLT Pipelines along with the
+Streaming Tables (STs) and Materialized Views (MVs) managed by them. Recreating the Pipelines will
+restore the defined STs and MVs through full refresh. Note that recreation is necessary when pipeline
+properties such as the 'catalog' or 'storage' are changed:
+ recreate pipeline foo`)
 	assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed")
 }

From ca6332a5a4325aff1be848536f45d13bd74d93b3 Mon Sep 17 00:00:00 2001
From: Andrew Nester
Date: Wed, 4 Sep 2024 13:24:55 +0200
Subject: [PATCH 2/2] Fixed complex variables are not being correctly merged
 from include files (#1746)

## Changes
Fixes an `Error: no value assigned to required variable .` failure that
occurred when the main complex variable definition is in one file but the
target override is defined in a separate file that is included from the
main one.

## Tests
Added regression test
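
To make the new behavior concrete, here is a hypothetical, standalone sketch
(values invented for illustration; it assumes the `dyn.V` constructor from the
CLI's `libs/dyn` package, which is not shown in this patch, alongside the
`dyn` helpers the patch does use) of the shape change `rewriteShorthands` now
applies when a shorthand override arrives without a resolvable `type` field:

```go
package main

import (
	"fmt"

	"github.com/databricks/cli/libs/dyn"
)

func main() {
	// A target override such as `cluster: {spark_version: ...}` arrives as a
	// plain map, while its `type: complex` declaration lives in another file
	// that has not been merged yet. (Illustrative values; dyn.V is assumed.)
	override := dyn.V(map[string]dyn.Value{
		"spark_version": dyn.V("14.2.x-scala2.11"),
	})

	// Previously the override was returned unchanged, so variable resolution
	// later failed with "no value assigned to required variable". It is now
	// wrapped under "default", the canonical shape of a variable definition.
	wrapped := dyn.NewValue(map[string]dyn.Value{
		"default": override,
	}, override.Locations())

	fmt.Println(wrapped.Kind() == dyn.KindMap) // true
}
```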
---
 bundle/config/root.go                          | 12 +++++++++++-
 bundle/tests/complex_variables_test.go         | 19 +++++++++++++++++++
 .../complex_multiple_files/databricks.yml      | 17 +++++++++++++++++
 .../variables/clusters.yml                     | 11 +++++++++++
 4 files changed, 58 insertions(+), 1 deletion(-)
 create mode 100644 bundle/tests/variables/complex_multiple_files/databricks.yml
 create mode 100644 bundle/tests/variables/complex_multiple_files/variables/clusters.yml

diff --git a/bundle/config/root.go b/bundle/config/root.go
index 86dc33921d..281c5c2a37 100644
--- a/bundle/config/root.go
+++ b/bundle/config/root.go
@@ -433,10 +433,20 @@ func rewriteShorthands(v dyn.Value) (dyn.Value, error) {
 		}, variable.Locations()), nil
 
 	case dyn.KindMap, dyn.KindSequence:
+		lookup, err := dyn.Get(variable, "lookup")
+		// If lookup is set, we don't want to rewrite the variable; return it as is.
+		if err == nil && lookup.Kind() != dyn.KindInvalid {
+			return variable, nil
+		}
+
 		// Check if the original definition of variable has a type field.
+		// Type might not be found if the variable is overridden in a separate
+		// file and the configuration is not merged yet.
 		typeV, err := dyn.GetByPath(v, p.Append(dyn.Key("type")))
 		if err != nil {
-			return variable, nil
+			return dyn.NewValue(map[string]dyn.Value{
+				"default": variable,
+			}, variable.Locations()), nil
 		}
 
 		if typeV.MustString() == "complex" {
diff --git a/bundle/tests/complex_variables_test.go b/bundle/tests/complex_variables_test.go
index 1badea6dfb..d46d8d8c10 100644
--- a/bundle/tests/complex_variables_test.go
+++ b/bundle/tests/complex_variables_test.go
@@ -68,3 +68,22 @@ func TestComplexVariablesOverride(t *testing.T) {
 	require.Equal(t, "", b.Config.Resources.Jobs["my_job"].JobClusters[0].NewCluster.SparkConf["spark.random"])
 	require.Equal(t, "", b.Config.Resources.Jobs["my_job"].JobClusters[0].NewCluster.PolicyId)
 }
+
+func TestComplexVariablesOverrideWithMultipleFiles(t *testing.T) {
+	b, diags := loadTargetWithDiags("variables/complex_multiple_files", "dev")
+	require.Empty(t, diags)
+
+	diags = bundle.Apply(context.Background(), b, bundle.Seq(
+		mutator.SetVariables(),
+		mutator.ResolveVariableReferencesInComplexVariables(),
+		mutator.ResolveVariableReferences(
+			"variables",
+		),
+	))
+	require.NoError(t, diags.Error())
+
+	require.Equal(t, "14.2.x-scala2.11", b.Config.Resources.Jobs["my_job"].JobClusters[0].NewCluster.SparkVersion)
+	require.Equal(t, "Standard_DS3_v2", b.Config.Resources.Jobs["my_job"].JobClusters[0].NewCluster.NodeTypeId)
+	require.Equal(t, 4, b.Config.Resources.Jobs["my_job"].JobClusters[0].NewCluster.NumWorkers)
+	require.Equal(t, "false", b.Config.Resources.Jobs["my_job"].JobClusters[0].NewCluster.SparkConf["spark.speculation"])
+}
diff --git a/bundle/tests/variables/complex_multiple_files/databricks.yml b/bundle/tests/variables/complex_multiple_files/databricks.yml
new file mode 100644
index 0000000000..cb5d243957
--- /dev/null
+++ b/bundle/tests/variables/complex_multiple_files/databricks.yml
@@ -0,0 +1,17 @@
+bundle:
+  name: complex-variables-multiple-files
+
+resources:
+  jobs:
+    my_job:
+      job_clusters:
+        - job_cluster_key: key
+          new_cluster: ${var.cluster}
+
+variables:
+  cluster:
+    type: complex
+    description: "A cluster definition"
+
+include:
+  - ./variables/*.yml
diff --git a/bundle/tests/variables/complex_multiple_files/variables/clusters.yml b/bundle/tests/variables/complex_multiple_files/variables/clusters.yml
new file mode 100644
index 0000000000..badd451648
--- /dev/null
+++ b/bundle/tests/variables/complex_multiple_files/variables/clusters.yml
@@ -0,0 +1,11 @@
+targets:
+  default:
+    dev:
+      variables:
+        cluster:
+          spark_version: "14.2.x-scala2.11"
+          node_type_id: "Standard_DS3_v2"
+          num_workers: 4
+          spark_conf:
+            spark.speculation: false
+            spark.databricks.delta.retentionDurationCheck.enabled: false