Skip to content

Commit

Permalink
Add resource for UC schemas to DABs (#1413)
Browse files Browse the repository at this point in the history
## Changes
This PR adds support for UC Schemas to DABs. This allows users to define
schemas for tables and other assets their pipelines/workflows create as
part of the DAB, thus managing the life-cycle in the DAB.

The first version has a couple of intentional limitations:
1. The owner of the schema will be the deployment user. Changing the
owner of the schema is not allowed (yet). `run_as` will not be
restricted for DABs containing UC schemas. Let's limit the scope of
run_as to the compute identity used instead of ownership of data assets
like UC schemas.
2. API fields that are present in the update API but not the create API are
not yet supported. For example: enabling predictive optimization is not
supported in the create schema API and thus is not available in DABs at the moment.

## Tests
Tested manually and with an integration test. Manually verified that the following work:
1. Development mode adds a "dev_" prefix.
2. Modified status is correctly computed in the `bundle summary`
command.
3. Grants work as expected, for assigning privileges.
4. Variable interpolation works for the schema ID.
  • Loading branch information
shreyas-goenka authored Jul 31, 2024
1 parent 5afcc25 commit 89c0af5
Show file tree
Hide file tree
Showing 20 changed files with 540 additions and 13 deletions.
7 changes: 7 additions & 0 deletions bundle/config/mutator/process_target_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ func transformDevelopmentMode(ctx context.Context, b *bundle.Bundle) diag.Diagno
}
}

for i := range r.Schemas {
prefix = "dev_" + b.Config.Workspace.CurrentUser.ShortName + "_"
r.Schemas[i].Name = prefix + r.Schemas[i].Name
// HTTP API for schemas doesn't yet support tags. It's only supported in
// the Databricks UI and via the SQL API.
}

return nil
}

Expand Down
6 changes: 6 additions & 0 deletions bundle/config/mutator/process_target_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ func mockBundle(mode config.Mode) *bundle.Bundle {
},
},
},
Schemas: map[string]*resources.Schema{
"schema1": {CreateSchema: &catalog.CreateSchema{Name: "schema1"}},
},
},
},
// Use AWS implementation for testing.
Expand Down Expand Up @@ -167,6 +170,9 @@ func TestProcessTargetModeDevelopment(t *testing.T) {
assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName)
assert.Nil(t, b.Config.Resources.QualityMonitors["qualityMonitor2"].Schedule)
assert.Equal(t, catalog.MonitorCronSchedulePauseStatusUnpaused, b.Config.Resources.QualityMonitors["qualityMonitor3"].Schedule.PauseStatus)

// Schema 1
assert.Equal(t, "dev_lennart_schema1", b.Config.Resources.Schemas["schema1"].Name)
}

func TestProcessTargetModeDevelopmentTagNormalizationForAws(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions bundle/config/mutator/run_as_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func allResourceTypes(t *testing.T) []string {
"pipelines",
"quality_monitors",
"registered_models",
"schemas",
},
resourceTypes,
)
Expand Down Expand Up @@ -136,6 +137,7 @@ func TestRunAsErrorForUnsupportedResources(t *testing.T) {
"models",
"registered_models",
"experiments",
"schemas",
}

base := config.Root{
Expand Down
1 change: 1 addition & 0 deletions bundle/config/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Resources struct {
ModelServingEndpoints map[string]*resources.ModelServingEndpoint `json:"model_serving_endpoints,omitempty"`
RegisteredModels map[string]*resources.RegisteredModel `json:"registered_models,omitempty"`
QualityMonitors map[string]*resources.QualityMonitor `json:"quality_monitors,omitempty"`
Schemas map[string]*resources.Schema `json:"schemas,omitempty"`
}

type resource struct {
Expand Down
27 changes: 27 additions & 0 deletions bundle/config/resources/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package resources

import (
"github.com/databricks/databricks-sdk-go/marshal"
"github.com/databricks/databricks-sdk-go/service/catalog"
)

// Schema defines a Unity Catalog schema resource managed as part of a bundle.
// It embeds the SDK's create-schema request and adds bundle-specific metadata.
type Schema struct {
	// List of grants to apply on this schema.
	Grants []Grant `json:"grants,omitempty"`

	// Full name of the schema (catalog_name.schema_name). This value is read from
	// the terraform state after deployment succeeds.
	ID string `json:"id,omitempty" bundle:"readonly"`

	// Embedded create request; its fields (name, catalog_name, comment,
	// properties, storage_root, ...) are exposed directly on this struct.
	*catalog.CreateSchema

	// ModifiedStatus is set while reconciling bundle configuration with the
	// terraform state (e.g. created/deleted); it is internal to the bundle
	// machinery and not read from user configuration.
	ModifiedStatus ModifiedStatus `json:"modified_status,omitempty" bundle:"internal"`
}

// UnmarshalJSON implements json.Unmarshaler by delegating to the Databricks
// SDK marshal package, which handles the embedded *catalog.CreateSchema.
func (s *Schema) UnmarshalJSON(b []byte) error {
	return marshal.Unmarshal(b, s)
}

// MarshalJSON implements json.Marshaler by delegating to the Databricks
// SDK marshal package, which handles the embedded *catalog.CreateSchema.
func (s Schema) MarshalJSON() ([]byte, error) {
	return marshal.Marshal(s)
}
21 changes: 19 additions & 2 deletions bundle/deploy/terraform/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ func convGrants(acl []resources.Grant) *schema.ResourceGrants {
// BundleToTerraform converts resources in a bundle configuration
// to the equivalent Terraform JSON representation.
//
// NOTE: THIS IS CURRENTLY A HACK. WE NEED A BETTER WAY TO
// CONVERT TO/FROM TERRAFORM COMPATIBLE FORMAT.
// Note: This function is an older implementation of the conversion logic. It is
// no longer used in any code paths. It is kept around to be used in tests.
// New resources do not need to modify this function and can instead define
// the conversion logic in the tfdyn package.
func BundleToTerraform(config *config.Root) *schema.Root {
tfroot := schema.NewRoot()
tfroot.Provider = schema.NewProviders()
Expand Down Expand Up @@ -382,6 +384,16 @@ func TerraformToBundle(state *resourcesState, config *config.Root) error {
}
cur.ID = instance.Attributes.ID
config.Resources.QualityMonitors[resource.Name] = cur
case "databricks_schema":
if config.Resources.Schemas == nil {
config.Resources.Schemas = make(map[string]*resources.Schema)
}
cur := config.Resources.Schemas[resource.Name]
if cur == nil {
cur = &resources.Schema{ModifiedStatus: resources.ModifiedStatusDeleted}
}
cur.ID = instance.Attributes.ID
config.Resources.Schemas[resource.Name] = cur
case "databricks_permissions":
case "databricks_grants":
// Ignore; no need to pull these back into the configuration.
Expand Down Expand Up @@ -426,6 +438,11 @@ func TerraformToBundle(state *resourcesState, config *config.Root) error {
src.ModifiedStatus = resources.ModifiedStatusCreated
}
}
for _, src := range config.Resources.Schemas {
if src.ModifiedStatus == "" && src.ID == "" {
src.ModifiedStatus = resources.ModifiedStatusCreated
}
}

return nil
}
57 changes: 57 additions & 0 deletions bundle/deploy/terraform/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,14 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) {
{Attributes: stateInstanceAttributes{ID: "1"}},
},
},
{
Type: "databricks_schema",
Mode: "managed",
Name: "test_schema",
Instances: []stateResourceInstance{
{Attributes: stateInstanceAttributes{ID: "1"}},
},
},
},
}
err := TerraformToBundle(&tfState, &config)
Expand All @@ -681,6 +689,9 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) {
assert.Equal(t, "1", config.Resources.QualityMonitors["test_monitor"].ID)
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.QualityMonitors["test_monitor"].ModifiedStatus)

assert.Equal(t, "1", config.Resources.Schemas["test_schema"].ID)
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Schemas["test_schema"].ModifiedStatus)

AssertFullResourceCoverage(t, &config)
}

Expand Down Expand Up @@ -736,6 +747,13 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) {
},
},
},
Schemas: map[string]*resources.Schema{
"test_schema": {
CreateSchema: &catalog.CreateSchema{
Name: "test_schema",
},
},
},
},
}
var tfState = resourcesState{
Expand Down Expand Up @@ -765,6 +783,9 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) {
assert.Equal(t, "", config.Resources.QualityMonitors["test_monitor"].ID)
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.QualityMonitors["test_monitor"].ModifiedStatus)

assert.Equal(t, "", config.Resources.Schemas["test_schema"].ID)
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema"].ModifiedStatus)

AssertFullResourceCoverage(t, &config)
}

Expand Down Expand Up @@ -855,6 +876,18 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
},
},
},
Schemas: map[string]*resources.Schema{
"test_schema": {
CreateSchema: &catalog.CreateSchema{
Name: "test_schema",
},
},
"test_schema_new": {
CreateSchema: &catalog.CreateSchema{
Name: "test_schema_new",
},
},
},
},
}
var tfState = resourcesState{
Expand Down Expand Up @@ -971,6 +1004,22 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
{Attributes: stateInstanceAttributes{ID: "test_monitor_old"}},
},
},
{
Type: "databricks_schema",
Mode: "managed",
Name: "test_schema",
Instances: []stateResourceInstance{
{Attributes: stateInstanceAttributes{ID: "1"}},
},
},
{
Type: "databricks_schema",
Mode: "managed",
Name: "test_schema_old",
Instances: []stateResourceInstance{
{Attributes: stateInstanceAttributes{ID: "2"}},
},
},
},
}
err := TerraformToBundle(&tfState, &config)
Expand Down Expand Up @@ -1024,6 +1073,14 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.QualityMonitors["test_monitor_old"].ModifiedStatus)
assert.Equal(t, "", config.Resources.QualityMonitors["test_monitor_new"].ID)
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.QualityMonitors["test_monitor_new"].ModifiedStatus)

assert.Equal(t, "1", config.Resources.Schemas["test_schema"].ID)
assert.Equal(t, "", config.Resources.Schemas["test_schema"].ModifiedStatus)
assert.Equal(t, "2", config.Resources.Schemas["test_schema_old"].ID)
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Schemas["test_schema_old"].ModifiedStatus)
assert.Equal(t, "", config.Resources.Schemas["test_schema_new"].ID)
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema_new"].ModifiedStatus)

AssertFullResourceCoverage(t, &config)
}

Expand Down
2 changes: 2 additions & 0 deletions bundle/deploy/terraform/interpolate.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func (m *interpolateMutator) Apply(ctx context.Context, b *bundle.Bundle) diag.D
path = dyn.NewPath(dyn.Key("databricks_registered_model")).Append(path[2:]...)
case dyn.Key("quality_monitors"):
path = dyn.NewPath(dyn.Key("databricks_quality_monitor")).Append(path[2:]...)
case dyn.Key("schemas"):
path = dyn.NewPath(dyn.Key("databricks_schema")).Append(path[2:]...)
default:
// Trigger "key not found" for unknown resource types.
return dyn.GetByPath(root, path)
Expand Down
2 changes: 2 additions & 0 deletions bundle/deploy/terraform/interpolate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestInterpolate(t *testing.T) {
"other_experiment": "${resources.experiments.other_experiment.id}",
"other_model_serving": "${resources.model_serving_endpoints.other_model_serving.id}",
"other_registered_model": "${resources.registered_models.other_registered_model.id}",
"other_schema": "${resources.schemas.other_schema.id}",
},
Tasks: []jobs.Task{
{
Expand Down Expand Up @@ -65,6 +66,7 @@ func TestInterpolate(t *testing.T) {
assert.Equal(t, "${databricks_mlflow_experiment.other_experiment.id}", j.Tags["other_experiment"])
assert.Equal(t, "${databricks_model_serving.other_model_serving.id}", j.Tags["other_model_serving"])
assert.Equal(t, "${databricks_registered_model.other_registered_model.id}", j.Tags["other_registered_model"])
assert.Equal(t, "${databricks_schema.other_schema.id}", j.Tags["other_schema"])

m := b.Config.Resources.Models["my_model"]
assert.Equal(t, "my_model", m.Model.Name)
Expand Down
53 changes: 53 additions & 0 deletions bundle/deploy/terraform/tfdyn/convert_schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package tfdyn

import (
"context"
"fmt"

"github.com/databricks/cli/bundle/internal/tf/schema"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/convert"
"github.com/databricks/cli/libs/log"
)

// convertSchemaResource normalizes a bundle schema value to the Terraform
// databricks_schema resource shape and unconditionally enables force_destroy.
func convertSchemaResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
	// Normalize the input value against the target Terraform resource schema,
	// logging (but not failing on) any normalization diagnostics.
	normalized, diags := convert.Normalize(schema.ResourceSchema{}, vin)
	for _, d := range diags {
		log.Debugf(ctx, "schema normalization diagnostic: %s", d.Summary)
	}

	// We always set force destroy as it allows DABs to manage the lifecycle
	// of the schema. It's the responsibility of the CLI to ensure the user
	// is adequately warned when they try to delete a UC schema.
	out, err := dyn.SetByPath(normalized, dyn.MustPathFromString("force_destroy"), dyn.V(true))
	if err != nil {
		return dyn.InvalidValue, err
	}
	return out, nil
}

// schemaConverter translates a bundle schema resource into its Terraform
// representation (a databricks_schema plus optional databricks_grants).
type schemaConverter struct{}

// Convert converts the schema resource keyed by its bundle name and stores
// the result in out. Grants, when present, reference the schema's ID.
func (schemaConverter) Convert(ctx context.Context, key string, vin dyn.Value, out *schema.Resources) error {
	converted, err := convertSchemaResource(ctx, vin)
	if err != nil {
		return err
	}

	// Register the converted schema resource under its bundle key.
	out.Schema[key] = converted.AsAny()

	// Configure grants for this resource, pointing them at the schema's
	// Terraform-computed ID.
	if grants := convertGrantsResource(ctx, vin); grants != nil {
		grants.Schema = fmt.Sprintf("${databricks_schema.%s.id}", key)
		out.Grants["schema_"+key] = grants
	}

	return nil
}

// Register the schema converter for resources defined under "schemas".
func init() {
	registerConverter("schemas", schemaConverter{})
}
75 changes: 75 additions & 0 deletions bundle/deploy/terraform/tfdyn/convert_schema_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package tfdyn

import (
"context"
"testing"

"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/bundle/internal/tf/schema"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/convert"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestConvertSchema verifies that a bundle schema resource (including grants)
// converts to the expected databricks_schema and databricks_grants resources,
// and that force_destroy is always enabled on the schema.
func TestConvertSchema(t *testing.T) {
	input := resources.Schema{
		CreateSchema: &catalog.CreateSchema{
			Name:        "name",
			CatalogName: "catalog",
			Comment:     "comment",
			Properties: map[string]string{
				"k1": "v1",
				"k2": "v2",
			},
			StorageRoot: "root",
		},
		Grants: []resources.Grant{
			{
				Privileges: []string{"EXECUTE"},
				Principal:  "[email protected]",
			},
			{
				Privileges: []string{"RUN"},
				Principal:  "[email protected]",
			},
		},
	}

	value, err := convert.FromTyped(input, dyn.NilValue)
	require.NoError(t, err)

	out := schema.NewResources()
	require.NoError(t, schemaConverter{}.Convert(context.Background(), "my_schema", value, out))

	// The grants resource references the schema's Terraform ID and carries
	// one entry per configured grant.
	assert.Equal(t, &schema.ResourceGrants{
		Schema: "${databricks_schema.my_schema.id}",
		Grant: []schema.ResourceGrantsGrant{
			{
				Privileges: []string{"EXECUTE"},
				Principal:  "[email protected]",
			},
			{
				Privileges: []string{"RUN"},
				Principal:  "[email protected]",
			},
		},
	}, out.Grants["schema_my_schema"])

	// The schema resource mirrors the input fields, with force_destroy set.
	assert.Equal(t, map[string]any{
		"name":         "name",
		"catalog_name": "catalog",
		"comment":      "comment",
		"properties": map[string]any{
			"k1": "v1",
			"k2": "v2",
		},
		"force_destroy": true,
		"storage_root":  "root",
	}, out.Schema["my_schema"])
}
Loading

0 comments on commit 89c0af5

Please sign in to comment.