Skip to content

Commit

Permalink
Added databricks bundle generate job command (#1043)
Browse files Browse the repository at this point in the history
## Changes
Now it's possible to generate bundle configuration for an existing job.
For now it only supports jobs with notebook tasks.

It will download the notebooks referenced in the job's tasks and generate
bundle YAML config for this job, which can be included in a larger bundle.

## Tests
Running command manually

Example of generated config
```
resources:
  jobs:
    job_128737545467921:
      name: Notebook job
      format: MULTI_TASK
      tasks:
        - task_key: as_notebook
          existing_cluster_id: 0704-xxxxxx-yyyyyyy
          notebook_task:
            base_parameters:
              bundle_root: /Users/[email protected]/.bundle/job_with_module_imports/development/files
            notebook_path: ./entry_notebook.py
            source: WORKSPACE
          run_if: ALL_SUCCESS
      max_concurrent_runs: 1
```

## Tests
Manual (on our last 100 jobs) + added end-to-end test

```
--- PASS: TestAccGenerateFromExistingJobAndDeploy (50.91s)
PASS
coverage: 61.5% of statements in ./...
ok github.com/databricks/cli/internal/bundle 51.209s coverage: 61.5% of
statements in ./...
```
  • Loading branch information
andrewnester authored Jan 17, 2024
1 parent 9847769 commit 70fe0e3
Show file tree
Hide file tree
Showing 20 changed files with 984 additions and 23 deletions.
34 changes: 34 additions & 0 deletions bundle/config/generate/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package generate

import (
	"fmt"

	"github.com/databricks/cli/libs/dyn"
	"github.com/databricks/cli/libs/dyn/yamlsaver"
	"github.com/databricks/databricks-sdk-go/service/jobs"
)

// Key orderings used when rendering jobs and tasks to YAML; keys not listed
// here are emitted after the listed ones.
var (
	jobOrder  = yamlsaver.NewOrder([]string{"name", "job_clusters", "compute", "tasks"})
	taskOrder = yamlsaver.NewOrder([]string{"task_key", "depends_on", "existing_cluster_id", "new_cluster", "job_cluster_key"})
)

// ConvertJobToValue converts a job fetched from the Jobs API into a dyn.Value
// suitable for saving as bundle YAML configuration.
//
// Tasks are converted individually (see convertTaskToValue); the "format",
// "new_cluster" and "existing_cluster_id" fields of the job settings are
// skipped at the top level. Returns an error if the job carries no settings,
// instead of panicking on a nil dereference.
func ConvertJobToValue(job *jobs.Job) (dyn.Value, error) {
	if job == nil || job.Settings == nil {
		return dyn.NilValue, fmt.Errorf("job has no settings")
	}

	value := make(map[string]dyn.Value)

	if job.Settings.Tasks != nil {
		tasks := make([]dyn.Value, 0, len(job.Settings.Tasks))
		for _, task := range job.Settings.Tasks {
			v, err := convertTaskToValue(task, taskOrder)
			if err != nil {
				return dyn.NilValue, err
			}
			tasks = append(tasks, v)
		}
		// We're using location lines to define the order of keys in exported YAML.
		value["tasks"] = dyn.NewValue(tasks, dyn.Location{Line: jobOrder.Get("tasks")})
	}

	return yamlsaver.ConvertToMapValue(job.Settings, jobOrder, []string{"format", "new_cluster", "existing_cluster_id"}, value)
}

// convertTaskToValue renders a single job task as a dyn.Value map, ordering
// keys according to the given order and skipping the "format" field.
func convertTaskToValue(task jobs.Task, order *yamlsaver.Order) (dyn.Value, error) {
	out := map[string]dyn.Value{}
	return yamlsaver.ConvertToMapValue(task, order, []string{"format"}, out)
}
11 changes: 2 additions & 9 deletions bundle/config/mutator/populate_current_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package mutator
import (
"context"
"strings"
"unicode"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/libs/tags"
"github.com/databricks/cli/libs/textutil"
)

type populateCurrentUser struct{}
Expand Down Expand Up @@ -43,17 +43,10 @@ func (m *populateCurrentUser) Apply(ctx context.Context, b *bundle.Bundle) error
return nil
}

func replaceNonAlphanumeric(r rune) rune {
if unicode.IsLetter(r) || unicode.IsDigit(r) {
return r
}
return '_'
}

// Get a short-form username, based on the user's primary email address.
// We leave the full range of unicode letters in tact, but remove all "special" characters,
// including dots, which are not supported in e.g. experiment names.
func getShortUserName(emailAddress string) string {
local, _, _ := strings.Cut(emailAddress, "@")
return strings.Map(replaceNonAlphanumeric, local)
return textutil.NormalizeString(local)
}
1 change: 1 addition & 0 deletions cmd/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ func New() *cobra.Command {
cmd.AddCommand(newTestCommand())
cmd.AddCommand(newValidateCommand())
cmd.AddCommand(newInitCommand())
cmd.AddCommand(newGenerateCommand())
return cmd
}
18 changes: 18 additions & 0 deletions cmd/bundle/generate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package bundle

import (
"github.com/databricks/cli/cmd/bundle/generate"
"github.com/spf13/cobra"
)

// newGenerateCommand returns the parent "bundle generate" command.
// The actual work happens in its subcommands; this command only groups them.
func newGenerateCommand() *cobra.Command {
	generateCmd := &cobra.Command{
		Use:     "generate",
		Short:   "Generate bundle configuration",
		Long:    "Generate bundle configuration",
		PreRunE: ConfigureBundleWithVariables,
	}

	// Register subcommands.
	generateCmd.AddCommand(generate.NewGenerateJobCommand())

	return generateCmd
}
91 changes: 91 additions & 0 deletions cmd/bundle/generate/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package generate

import (
"fmt"
"os"
"path/filepath"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config/generate"
"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/yamlsaver"
"github.com/databricks/cli/libs/textutil"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/spf13/cobra"
)

// NewGenerateJobCommand returns the "bundle generate job" command.
//
// It fetches an existing job by ID, downloads the notebooks referenced by its
// tasks into --source-dir, and writes bundle YAML configuration for the job
// into --config-dir.
func NewGenerateJobCommand() *cobra.Command {
	var configDir string
	var sourceDir string
	var jobId int64
	var force bool

	cmd := &cobra.Command{
		Use:     "job",
		Short:   "Generate bundle configuration for a job",
		PreRunE: root.MustConfigureBundle,
	}

	cmd.Flags().Int64Var(&jobId, "existing-job-id", 0, `Job ID of the job to generate config for`)
	cmd.MarkFlagRequired("existing-job-id")

	// Default output locations are relative to the current working directory.
	wd, err := os.Getwd()
	if err != nil {
		wd = "."
	}

	cmd.Flags().StringVarP(&configDir, "config-dir", "d", filepath.Join(wd, "resources"), `Dir path where the output config will be stored`)
	cmd.Flags().StringVarP(&sourceDir, "source-dir", "s", filepath.Join(wd, "src"), `Dir path where the downloaded files will be stored`)
	cmd.Flags().BoolVarP(&force, "force", "f", false, `Force overwrite existing files in the output directory`)

	cmd.RunE = func(cmd *cobra.Command, args []string) error {
		ctx := cmd.Context()
		b := bundle.Get(ctx)
		w := b.WorkspaceClient()

		job, err := w.Jobs.Get(ctx, jobs.GetJobRequest{JobId: jobId})
		if err != nil {
			return err
		}

		// Rewrites each notebook task's path to be relative to the config dir;
		// the mutation reaches job.Settings through the shared NotebookTask pointer.
		downloader := newNotebookDownloader(w, sourceDir, configDir)
		for _, task := range job.Settings.Tasks {
			err := downloader.MarkForDownload(ctx, &task)
			if err != nil {
				return err
			}
		}

		v, err := generate.ConvertJobToValue(job)
		if err != nil {
			return err
		}

		// Derive the resource key from the job name; fall back to the job ID
		// so jobs whose name normalizes to an empty string don't all collapse
		// to the key "job_" (and the filename "job_.yml").
		name := textutil.NormalizeString(job.Settings.Name)
		if name == "" {
			name = fmt.Sprint(jobId)
		}
		jobKey := fmt.Sprintf("job_%s", name)

		result := map[string]dyn.Value{
			"resources": dyn.V(map[string]dyn.Value{
				"jobs": dyn.V(map[string]dyn.Value{
					jobKey: v,
				}),
			}),
		}

		// Download notebooks before writing config so the config never
		// references files that failed to download.
		err = downloader.FlushToDisk(ctx, force)
		if err != nil {
			return err
		}

		filename := filepath.Join(configDir, fmt.Sprintf("%s.yml", jobKey))
		err = yamlsaver.SaveAsYAML(result, filename, force)
		if err != nil {
			return err
		}

		cmdio.LogString(ctx, fmt.Sprintf("Job configuration successfully saved to %s", filename))
		return nil
	}

	return cmd
}
107 changes: 107 additions & 0 deletions cmd/bundle/generate/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package generate

import (
"context"
"fmt"
"io"
"os"
"path"
"path/filepath"

"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/notebook"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/jobs"
"golang.org/x/sync/errgroup"
)

// notebookDownloader collects the workspace notebooks referenced by job tasks
// (via MarkForDownload) and later downloads them to disk (via FlushToDisk).
type notebookDownloader struct {
	// notebooks maps local target file path -> workspace notebook path.
	notebooks map[string]string
	w         *databricks.WorkspaceClient
	// sourceDir is the directory the downloaded notebook files are written to.
	sourceDir string
	// configDir is the directory the generated config lives in; notebook paths
	// in the config are rewritten relative to it.
	configDir string
}

// MarkForDownload records the notebook referenced by the task (if any) for a
// later FlushToDisk, and rewrites the task's notebook path to be relative to
// the config directory. Tasks without a notebook task are a no-op.
//
// Returns an error if two different workspace notebooks would map to the same
// local file (same base name); previously the second one silently overwrote
// the first in the download map while both tasks pointed at one local file.
func (n *notebookDownloader) MarkForDownload(ctx context.Context, task *jobs.Task) error {
	if task.NotebookTask == nil {
		return nil
	}

	info, err := n.w.Workspace.GetStatusByPath(ctx, task.NotebookTask.NotebookPath)
	if err != nil {
		return err
	}

	ext := notebook.GetExtensionByLanguage(info)

	filename := path.Base(task.NotebookTask.NotebookPath) + ext
	targetPath := filepath.Join(n.sourceDir, filename)

	if existing, ok := n.notebooks[targetPath]; ok && existing != task.NotebookTask.NotebookPath {
		return fmt.Errorf("notebooks %s and %s both map to %s", existing, task.NotebookTask.NotebookPath, targetPath)
	}
	n.notebooks[targetPath] = task.NotebookTask.NotebookPath

	// Update the notebook path to be relative to the config dir
	rel, err := filepath.Rel(n.configDir, targetPath)
	if err != nil {
		return err
	}

	task.NotebookTask.NotebookPath = rel
	return nil
}

// FlushToDisk downloads all notebooks previously marked via MarkForDownload
// into the source directory, in parallel. With force set, existing files are
// overwritten; otherwise any pre-existing target file aborts the operation
// before anything is written.
func (n *notebookDownloader) FlushToDisk(ctx context.Context, force bool) error {
	err := os.MkdirAll(n.sourceDir, 0755)
	if err != nil {
		return err
	}

	// First check that all files can be written
	for targetPath := range n.notebooks {
		info, err := os.Stat(targetPath)
		if err == nil {
			if info.IsDir() {
				return fmt.Errorf("%s is a directory", targetPath)
			}
			if !force {
				return fmt.Errorf("%s already exists. Use --force to overwrite", targetPath)
			}
		}
	}

	errs, errCtx := errgroup.WithContext(ctx)
	for k, v := range n.notebooks {
		targetPath := k
		notebookPath := v
		errs.Go(func() error {
			reader, err := n.w.Workspace.Download(errCtx, notebookPath)
			if err != nil {
				return err
			}
			// Close the reader on every path; previously it leaked when
			// os.Create or io.Copy failed.
			defer reader.Close()

			file, err := os.Create(targetPath)
			if err != nil {
				return err
			}
			defer file.Close()

			if _, err := io.Copy(file, reader); err != nil {
				return err
			}

			cmdio.LogString(errCtx, fmt.Sprintf("Notebook successfully saved to %s", targetPath))
			return nil
		})
	}

	return errs.Wait()
}

// newNotebookDownloader constructs an empty downloader that will place
// notebook files under sourceDir and rewrite paths relative to configDir.
func newNotebookDownloader(w *databricks.WorkspaceClient, sourceDir string, configDir string) *notebookDownloader {
	d := &notebookDownloader{
		notebooks: map[string]string{},
		w:         w,
		sourceDir: sourceDir,
		configDir: configDir,
	}
	return d
}
16 changes: 2 additions & 14 deletions cmd/workspace/workspace/export_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/cli/libs/notebook"
"github.com/databricks/databricks-sdk-go/service/workspace"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -47,20 +48,7 @@ func (opts exportDirOptions) callback(ctx context.Context, workspaceFiler filer.
return err
}
objectInfo := info.Sys().(workspace.ObjectInfo)
if objectInfo.ObjectType == workspace.ObjectTypeNotebook {
switch objectInfo.Language {
case workspace.LanguagePython:
targetPath += ".py"
case workspace.LanguageR:
targetPath += ".r"
case workspace.LanguageScala:
targetPath += ".scala"
case workspace.LanguageSql:
targetPath += ".sql"
default:
// Do not add any extension to the file name
}
}
targetPath += notebook.GetExtensionByLanguage(&objectInfo)

// Skip file if a file already exists in path.
// os.Stat returns a fs.ErrNotExist if a file does not exist at path.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"properties": {
"unique_id": {
"type": "string",
"description": "Unique ID for bundle"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
bundle:
name: with_includes

workspace:
root_path: "~/.bundle/{{.unique_id}}"

includes:
- resources/*yml
Loading

0 comments on commit 70fe0e3

Please sign in to comment.