Skip to content

Commit

Permalink
Merge branch 'main' into init_on_databricks
Browse files Browse the repository at this point in the history
  • Loading branch information
fjakobs authored Sep 4, 2024
2 parents 320182d + ca6332a commit c409696
Show file tree
Hide file tree
Showing 10 changed files with 316 additions and 28 deletions.
12 changes: 11 additions & 1 deletion bundle/config/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,10 +433,20 @@ func rewriteShorthands(v dyn.Value) (dyn.Value, error) {
}, variable.Locations()), nil

case dyn.KindMap, dyn.KindSequence:
lookup, err := dyn.Get(variable, "lookup")
// If lookup is set, we don't want to rewrite the variable and return it as is.
if err == nil && lookup.Kind() != dyn.KindInvalid {
return variable, nil
}

// Check if the original definition of variable has a type field.
// Type might not be found if the variable overriden in a separate file
// and configuration is not merged yet.
typeV, err := dyn.GetByPath(v, p.Append(dyn.Key("type")))
if err != nil {
return variable, nil
return dyn.NewValue(map[string]dyn.Value{
"default": variable,
}, variable.Locations()), nil
}

if typeV.MustString() == "complex" {
Expand Down
92 changes: 66 additions & 26 deletions bundle/phases/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,55 +19,95 @@ import (
"github.com/databricks/cli/bundle/scripts"
"github.com/databricks/cli/libs/cmdio"
terraformlib "github.com/databricks/cli/libs/terraform"
tfjson "github.com/hashicorp/terraform-json"
)

func approvalForUcSchemaDelete(ctx context.Context, b *bundle.Bundle) (bool, error) {
tf := b.Terraform
if tf == nil {
return false, fmt.Errorf("terraform not initialized")
}

// read plan file
plan, err := tf.ShowPlanFile(ctx, b.Plan.Path)
if err != nil {
return false, err
}

actions := make([]terraformlib.Action, 0)
for _, rc := range plan.ResourceChanges {
// We only care about destructive actions on UC schema resources.
if rc.Type != "databricks_schema" {
func parseTerraformActions(changes []*tfjson.ResourceChange, toInclude func(typ string, actions tfjson.Actions) bool) []terraformlib.Action {
res := make([]terraformlib.Action, 0)
for _, rc := range changes {
if !toInclude(rc.Type, rc.Change.Actions) {
continue
}

var actionType terraformlib.ActionType

switch {
case rc.Change.Actions.Delete():
actionType = terraformlib.ActionTypeDelete
case rc.Change.Actions.Replace():
actionType = terraformlib.ActionTypeRecreate
default:
// We don't need a prompt for non-destructive actions like creating
// or updating a schema.
// No use case for other action types yet.
continue
}

actions = append(actions, terraformlib.Action{
res = append(res, terraformlib.Action{
Action: actionType,
ResourceType: rc.Type,
ResourceName: rc.Name,
})
}

// No restricted actions planned. No need for approval.
if len(actions) == 0 {
return res
}

func approvalForDeploy(ctx context.Context, b *bundle.Bundle) (bool, error) {
tf := b.Terraform
if tf == nil {
return false, fmt.Errorf("terraform not initialized")
}

// read plan file
plan, err := tf.ShowPlanFile(ctx, b.Plan.Path)
if err != nil {
return false, err
}

schemaActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool {
// Filter in only UC schema resources.
if typ != "databricks_schema" {
return false
}

// We only display prompts for destructive actions like deleting or
// recreating a schema.
return actions.Delete() || actions.Replace()
})

dltActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool {
// Filter in only DLT pipeline resources.
if typ != "databricks_pipeline" {
return false
}

// Recreating DLT pipeline leads to metadata loss and for a transient period
// the underling tables will be unavailable.
return actions.Replace() || actions.Delete()
})

// We don't need to display any prompts in this case.
if len(dltActions) == 0 && len(schemaActions) == 0 {
return true, nil
}

cmdio.LogString(ctx, "The following UC schemas will be deleted or recreated. Any underlying data may be lost:")
for _, action := range actions {
cmdio.Log(ctx, action)
// One or more UC schema resources will be deleted or recreated.
if len(schemaActions) != 0 {
cmdio.LogString(ctx, "The following UC schemas will be deleted or recreated. Any underlying data may be lost:")
for _, action := range schemaActions {
cmdio.Log(ctx, action)
}
}

// One or more DLT pipelines is being recreated.
if len(dltActions) != 0 {
msg := `
This action will result in the deletion or recreation of the following DLT Pipelines along with the
Streaming Tables (STs) and Materialized Views (MVs) managed by them. Recreating the Pipelines will
restore the defined STs and MVs through full refresh. Note that recreation is necessary when pipeline
properties such as the 'catalog' or 'storage' are changed:`
cmdio.LogString(ctx, msg)
for _, action := range dltActions {
cmdio.Log(ctx, action)
}
}

if b.AutoApprove {
Expand Down Expand Up @@ -126,7 +166,7 @@ func Deploy() bundle.Mutator {
terraform.CheckRunningResource(),
terraform.Plan(terraform.PlanGoal("deploy")),
bundle.If(
approvalForUcSchemaDelete,
approvalForDeploy,
deployCore,
bundle.LogString("Deployment cancelled!"),
),
Expand Down
67 changes: 67 additions & 0 deletions bundle/phases/deploy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package phases

import (
"testing"

terraformlib "github.com/databricks/cli/libs/terraform"
tfjson "github.com/hashicorp/terraform-json"
"github.com/stretchr/testify/assert"
)

func TestParseTerraformActions(t *testing.T) {
changes := []*tfjson.ResourceChange{
{
Type: "databricks_pipeline",
Change: &tfjson.Change{
Actions: tfjson.Actions{tfjson.ActionCreate},
},
Name: "create pipeline",
},
{
Type: "databricks_pipeline",
Change: &tfjson.Change{
Actions: tfjson.Actions{tfjson.ActionDelete},
},
Name: "delete pipeline",
},
{
Type: "databricks_pipeline",
Change: &tfjson.Change{
Actions: tfjson.Actions{tfjson.ActionDelete, tfjson.ActionCreate},
},
Name: "recreate pipeline",
},
{
Type: "databricks_whatever",
Change: &tfjson.Change{
Actions: tfjson.Actions{tfjson.ActionDelete, tfjson.ActionCreate},
},
Name: "recreate whatever",
},
}

res := parseTerraformActions(changes, func(typ string, actions tfjson.Actions) bool {
if typ != "databricks_pipeline" {
return false
}

if actions.Delete() || actions.Replace() {
return true
}

return false
})

assert.Equal(t, []terraformlib.Action{
{
Action: terraformlib.ActionTypeDelete,
ResourceType: "databricks_pipeline",
ResourceName: "delete pipeline",
},
{
Action: terraformlib.ActionTypeRecreate,
ResourceType: "databricks_pipeline",
ResourceName: "recreate pipeline",
},
}, res)
}
19 changes: 19 additions & 0 deletions bundle/tests/complex_variables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,22 @@ func TestComplexVariablesOverride(t *testing.T) {
require.Equal(t, "", b.Config.Resources.Jobs["my_job"].JobClusters[0].NewCluster.SparkConf["spark.random"])
require.Equal(t, "", b.Config.Resources.Jobs["my_job"].JobClusters[0].NewCluster.PolicyId)
}

func TestComplexVariablesOverrideWithMultipleFiles(t *testing.T) {
b, diags := loadTargetWithDiags("variables/complex_multiple_files", "dev")
require.Empty(t, diags)

diags = bundle.Apply(context.Background(), b, bundle.Seq(
mutator.SetVariables(),
mutator.ResolveVariableReferencesInComplexVariables(),
mutator.ResolveVariableReferences(
"variables",
),
))
require.NoError(t, diags.Error())

require.Equal(t, "14.2.x-scala2.11", b.Config.Resources.Jobs["my_job"].JobClusters[0].NewCluster.SparkVersion)
require.Equal(t, "Standard_DS3_v2", b.Config.Resources.Jobs["my_job"].JobClusters[0].NewCluster.NodeTypeId)
require.Equal(t, 4, b.Config.Resources.Jobs["my_job"].JobClusters[0].NewCluster.NumWorkers)
require.Equal(t, "false", b.Config.Resources.Jobs["my_job"].JobClusters[0].NewCluster.SparkConf["spark.speculation"])
}
17 changes: 17 additions & 0 deletions bundle/tests/variables/complex_multiple_files/databricks.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
bundle:
name: complex-variables-multiple-files

resources:
jobs:
my_job:
job_clusters:
- job_cluster_key: key
new_cluster: ${var.cluster}

variables:
cluster:
type: complex
description: "A cluster definition"

include:
- ./variables/*.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
targets:
default:
dev:
variables:
cluster:
spark_version: "14.2.x-scala2.11"
node_type_id: "Standard_DS3_v2"
num_workers: 4
spark_conf:
spark.speculation: false
spark.databricks.delta.retentionDurationCheck.enabled: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"properties": {
"unique_id": {
"type": "string",
"description": "Unique ID for the schema and pipeline names"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
bundle:
name: "bundle-playground"

variables:
catalog:
description: The catalog the DLT pipeline should use.
default: main


resources:
pipelines:
foo:
name: test-pipeline-{{.unique_id}}
libraries:
- notebook:
path: ./nb.sql
development: true
catalog: ${var.catalog}

include:
- "*.yml"

targets:
development:
default: true
2 changes: 2 additions & 0 deletions internal/bundle/bundles/recreate_pipeline/template/nb.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Databricks notebook source
select 1
Loading

0 comments on commit c409696

Please sign in to comment.