Skip to content

Commit

Permalink
Add prompt when a pipeline recreation happens (#1672)
Browse files Browse the repository at this point in the history
## Changes
DLT pipeline recreations are destructive. They can lead to lost history
of previous updates, outage of the tables temporarily and are
potentially computationally expensive. Thus we make a breaking change
where a prompt is shown to the user if there configuration changes will
lead to a DLT recreation.

Users can skip the prompt by specifying  the `--auto-approve` flag.

This PR also fixes an issue with our test runner where logs from the
cmdio.Logger would not get propagated to the reader returned by our
cobra test runner.

## Tests
Manually, and new unit and integration tests.

```
➜  bundle-playground-3 cli bundle deploy
Uploading bundle files to /Users/63ec021d-b0c6-49c0-93a0-5123953a1cb2/.bundle/test/development/files...
The following DLT pipelines will be recreated. Underlying tables will be unavailable for a transient period until the newly recreated pipelines are run once successfully. History of previous pipeline update runs will be lost because of recreation:
  recreate pipeline foo

Would you like to proceed? [y/n]: n
Deployment cancelled!
```
  • Loading branch information
shreyas-goenka authored Sep 4, 2024
1 parent 35cdf40 commit a27c24a
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 27 deletions.
92 changes: 66 additions & 26 deletions bundle/phases/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,55 +19,95 @@ import (
"github.com/databricks/cli/bundle/scripts"
"github.com/databricks/cli/libs/cmdio"
terraformlib "github.com/databricks/cli/libs/terraform"
tfjson "github.com/hashicorp/terraform-json"
)

func approvalForUcSchemaDelete(ctx context.Context, b *bundle.Bundle) (bool, error) {
tf := b.Terraform
if tf == nil {
return false, fmt.Errorf("terraform not initialized")
}

// read plan file
plan, err := tf.ShowPlanFile(ctx, b.Plan.Path)
if err != nil {
return false, err
}

actions := make([]terraformlib.Action, 0)
for _, rc := range plan.ResourceChanges {
// We only care about destructive actions on UC schema resources.
if rc.Type != "databricks_schema" {
func parseTerraformActions(changes []*tfjson.ResourceChange, toInclude func(typ string, actions tfjson.Actions) bool) []terraformlib.Action {
res := make([]terraformlib.Action, 0)
for _, rc := range changes {
if !toInclude(rc.Type, rc.Change.Actions) {
continue
}

var actionType terraformlib.ActionType

switch {
case rc.Change.Actions.Delete():
actionType = terraformlib.ActionTypeDelete
case rc.Change.Actions.Replace():
actionType = terraformlib.ActionTypeRecreate
default:
// We don't need a prompt for non-destructive actions like creating
// or updating a schema.
// No use case for other action types yet.
continue
}

actions = append(actions, terraformlib.Action{
res = append(res, terraformlib.Action{
Action: actionType,
ResourceType: rc.Type,
ResourceName: rc.Name,
})
}

// No restricted actions planned. No need for approval.
if len(actions) == 0 {
return res
}

func approvalForDeploy(ctx context.Context, b *bundle.Bundle) (bool, error) {
tf := b.Terraform
if tf == nil {
return false, fmt.Errorf("terraform not initialized")
}

// read plan file
plan, err := tf.ShowPlanFile(ctx, b.Plan.Path)
if err != nil {
return false, err
}

schemaActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool {
// Filter in only UC schema resources.
if typ != "databricks_schema" {
return false
}

// We only display prompts for destructive actions like deleting or
// recreating a schema.
return actions.Delete() || actions.Replace()
})

dltActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool {
// Filter in only DLT pipeline resources.
if typ != "databricks_pipeline" {
return false
}

// Recreating DLT pipeline leads to metadata loss and for a transient period
// the underling tables will be unavailable.
return actions.Replace() || actions.Delete()
})

// We don't need to display any prompts in this case.
if len(dltActions) == 0 && len(schemaActions) == 0 {
return true, nil
}

cmdio.LogString(ctx, "The following UC schemas will be deleted or recreated. Any underlying data may be lost:")
for _, action := range actions {
cmdio.Log(ctx, action)
// One or more UC schema resources will be deleted or recreated.
if len(schemaActions) != 0 {
cmdio.LogString(ctx, "The following UC schemas will be deleted or recreated. Any underlying data may be lost:")
for _, action := range schemaActions {
cmdio.Log(ctx, action)
}
}

// One or more DLT pipelines is being recreated.
if len(dltActions) != 0 {
msg := `
This action will result in the deletion or recreation of the following DLT Pipelines along with the
Streaming Tables (STs) and Materialized Views (MVs) managed by them. Recreating the Pipelines will
restore the defined STs and MVs through full refresh. Note that recreation is necessary when pipeline
properties such as the 'catalog' or 'storage' are changed:`
cmdio.LogString(ctx, msg)
for _, action := range dltActions {
cmdio.Log(ctx, action)
}
}

if b.AutoApprove {
Expand Down Expand Up @@ -126,7 +166,7 @@ func Deploy() bundle.Mutator {
terraform.CheckRunningResource(),
terraform.Plan(terraform.PlanGoal("deploy")),
bundle.If(
approvalForUcSchemaDelete,
approvalForDeploy,
deployCore,
bundle.LogString("Deployment cancelled!"),
),
Expand Down
67 changes: 67 additions & 0 deletions bundle/phases/deploy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package phases

import (
"testing"

terraformlib "github.com/databricks/cli/libs/terraform"
tfjson "github.com/hashicorp/terraform-json"
"github.com/stretchr/testify/assert"
)

func TestParseTerraformActions(t *testing.T) {
changes := []*tfjson.ResourceChange{
{
Type: "databricks_pipeline",
Change: &tfjson.Change{
Actions: tfjson.Actions{tfjson.ActionCreate},
},
Name: "create pipeline",
},
{
Type: "databricks_pipeline",
Change: &tfjson.Change{
Actions: tfjson.Actions{tfjson.ActionDelete},
},
Name: "delete pipeline",
},
{
Type: "databricks_pipeline",
Change: &tfjson.Change{
Actions: tfjson.Actions{tfjson.ActionDelete, tfjson.ActionCreate},
},
Name: "recreate pipeline",
},
{
Type: "databricks_whatever",
Change: &tfjson.Change{
Actions: tfjson.Actions{tfjson.ActionDelete, tfjson.ActionCreate},
},
Name: "recreate whatever",
},
}

res := parseTerraformActions(changes, func(typ string, actions tfjson.Actions) bool {
if typ != "databricks_pipeline" {
return false
}

if actions.Delete() || actions.Replace() {
return true
}

return false
})

assert.Equal(t, []terraformlib.Action{
{
Action: terraformlib.ActionTypeDelete,
ResourceType: "databricks_pipeline",
ResourceName: "delete pipeline",
},
{
Action: terraformlib.ActionTypeRecreate,
ResourceType: "databricks_pipeline",
ResourceName: "recreate pipeline",
},
}, res)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"properties": {
"unique_id": {
"type": "string",
"description": "Unique ID for the schema and pipeline names"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
bundle:
name: "bundle-playground"

variables:
catalog:
description: The catalog the DLT pipeline should use.
default: main


resources:
pipelines:
foo:
name: test-pipeline-{{.unique_id}}
libraries:
- notebook:
path: ./nb.sql
development: true
catalog: ${var.catalog}

include:
- "*.yml"

targets:
development:
default: true
2 changes: 2 additions & 0 deletions internal/bundle/bundles/recreate_pipeline/template/nb.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Databricks notebook source
select 1
91 changes: 90 additions & 1 deletion internal/bundle/deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,97 @@ func TestAccBundleDeployUcSchemaFailsWithoutAutoApprove(t *testing.T) {
t.Setenv("BUNDLE_ROOT", bundleRoot)
t.Setenv("TERM", "dumb")
c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock")
stdout, _, err := c.Run()
stdout, stderr, err := c.Run()

assert.EqualError(t, err, root.ErrAlreadyPrinted.Error())
assert.Contains(t, stderr.String(), "The following UC schemas will be deleted or recreated. Any underlying data may be lost:\n delete schema bar")
assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed")
}

func TestAccBundlePipelineDeleteWithoutAutoApprove(t *testing.T) {
ctx, wt := acc.WorkspaceTest(t)
w := wt.W

nodeTypeId := internal.GetNodeTypeId(env.Get(ctx, "CLOUD_ENV"))
uniqueId := uuid.New().String()
bundleRoot, err := initTestTemplate(t, ctx, "deploy_then_remove_resources", map[string]any{
"unique_id": uniqueId,
"node_type_id": nodeTypeId,
"spark_version": defaultSparkVersion,
})
require.NoError(t, err)

// deploy pipeline
err = deployBundle(t, ctx, bundleRoot)
require.NoError(t, err)

// assert pipeline is created
pipelineName := "test-bundle-pipeline-" + uniqueId
pipeline, err := w.Pipelines.GetByName(ctx, pipelineName)
require.NoError(t, err)
assert.Equal(t, pipeline.Name, pipelineName)

// assert job is created
jobName := "test-bundle-job-" + uniqueId
job, err := w.Jobs.GetBySettingsName(ctx, jobName)
require.NoError(t, err)
assert.Equal(t, job.Settings.Name, jobName)

// delete resources.yml
err = os.Remove(filepath.Join(bundleRoot, "resources.yml"))
require.NoError(t, err)

// Redeploy the bundle. Expect it to fail because deleting the pipeline requires --auto-approve.
t.Setenv("BUNDLE_ROOT", bundleRoot)
t.Setenv("TERM", "dumb")
c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock")
stdout, stderr, err := c.Run()

assert.EqualError(t, err, root.ErrAlreadyPrinted.Error())
assert.Contains(t, stderr.String(), `This action will result in the deletion or recreation of the following DLT Pipelines along with the
Streaming Tables (STs) and Materialized Views (MVs) managed by them. Recreating the Pipelines will
restore the defined STs and MVs through full refresh. Note that recreation is necessary when pipeline
properties such as the 'catalog' or 'storage' are changed:
delete pipeline bar`)
assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed")

}

func TestAccBundlePipelineRecreateWithoutAutoApprove(t *testing.T) {
ctx, wt := acc.UcWorkspaceTest(t)
w := wt.W
uniqueId := uuid.New().String()

bundleRoot, err := initTestTemplate(t, ctx, "recreate_pipeline", map[string]any{
"unique_id": uniqueId,
})
require.NoError(t, err)

err = deployBundle(t, ctx, bundleRoot)
require.NoError(t, err)

t.Cleanup(func() {
destroyBundle(t, ctx, bundleRoot)
})

// Assert the pipeline is created
pipelineName := "test-pipeline-" + uniqueId
pipeline, err := w.Pipelines.GetByName(ctx, pipelineName)
require.NoError(t, err)
require.Equal(t, pipelineName, pipeline.Name)

// Redeploy the bundle, pointing the DLT pipeline to a different UC catalog.
t.Setenv("BUNDLE_ROOT", bundleRoot)
t.Setenv("TERM", "dumb")
c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock", "--var=\"catalog=whatever\"")
stdout, stderr, err := c.Run()

assert.EqualError(t, err, root.ErrAlreadyPrinted.Error())
assert.Contains(t, stderr.String(), `This action will result in the deletion or recreation of the following DLT Pipelines along with the
Streaming Tables (STs) and Materialized Views (MVs) managed by them. Recreating the Pipelines will
restore the defined STs and MVs through full refresh. Note that recreation is necessary when pipeline
properties such as the 'catalog' or 'storage' are changed:
recreate pipeline foo`)
assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed")
}

Expand Down

0 comments on commit a27c24a

Please sign in to comment.