diff --git a/bundle/bundle.go b/bundle/bundle.go index 1dc98656a7..4bd5e560e7 100644 --- a/bundle/bundle.go +++ b/bundle/bundle.go @@ -35,6 +35,10 @@ type Bundle struct { // It is set when we instantiate a new bundle instance. RootPath string + // BundleRoot is a virtual filesystem path pointing at the bundle root. It is set when the bundle is instantiated. + BundleRoot vfs.Path + BundleRootRelative string + Config config.Root // Metadata about the bundle deployment. This is the interface Databricks services @@ -69,7 +73,9 @@ type Bundle struct { func Load(ctx context.Context, path string) (*Bundle, error) { b := &Bundle{ - RootPath: filepath.Clean(path), + RootPath: filepath.Clean(path), + BundleRoot: vfs.MustNew(path), + BundleRootRelative: ".", } configFile, err := config.FileNames.FindInPath(path) if err != nil { diff --git a/bundle/bundle_read_only.go b/bundle/bundle_read_only.go index e4a4f99366..59084f2ace 100644 --- a/bundle/bundle_read_only.go +++ b/bundle/bundle_read_only.go @@ -4,6 +4,7 @@ import ( "context" "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/libs/vfs" "github.com/databricks/databricks-sdk-go" ) @@ -23,6 +24,10 @@ func (r ReadOnlyBundle) RootPath() string { return r.b.RootPath } +func (r ReadOnlyBundle) BundleRoot() vfs.Path { + return r.b.BundleRoot +} + func (r ReadOnlyBundle) WorkspaceClient() *databricks.WorkspaceClient { return r.b.WorkspaceClient() } diff --git a/bundle/config/mutator/configure_wsfs.go b/bundle/config/mutator/configure_wsfs.go new file mode 100644 index 0000000000..bba3beba78 --- /dev/null +++ b/bundle/config/mutator/configure_wsfs.go @@ -0,0 +1,48 @@ +package mutator + +import ( + "context" + "strings" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/env" + "github.com/databricks/cli/libs/filer" + "github.com/databricks/cli/libs/vfs" +) + +type configureWsfs struct{} + +func ConfigureWsfs() bundle.Mutator { + return &configureWsfs{} +} + +func (m *configureWsfs) Name() string { + return "ConfigureWsfs" +} + +func (m *configureWsfs) Apply(ctx
context.Context, b *bundle.Bundle) diag.Diagnostics { + root := b.BundleRoot.Native() + + // The bundle root must be located in /Workspace/ + if !strings.HasPrefix(root, "/Workspace/") { + return nil + } + + // The executable must be running on DBR. + if _, ok := env.Lookup(ctx, "DATABRICKS_RUNTIME_VERSION"); !ok { + return nil + } + + // If so, swap out vfs.Path instance of the sync root with one that + // makes all Workspace File System interactions extension aware. + p, err := vfs.NewFilerPath(ctx, root, func(path string) (filer.Filer, error) { + return filer.NewWorkspaceFilesExtensionsClient(b.WorkspaceClient(), path) + }) + if err != nil { + return diag.FromErr(err) + } + + b.BundleRoot = p + return nil +} diff --git a/bundle/config/mutator/translate_paths.go b/bundle/config/mutator/translate_paths.go index d9ab9e9e89..70fd1a7ee8 100644 --- a/bundle/config/mutator/translate_paths.go +++ b/bundle/config/mutator/translate_paths.go @@ -6,7 +6,6 @@ import ( "fmt" "io/fs" "net/url" - "os" "path" "path/filepath" "strings" @@ -33,9 +32,7 @@ func (err ErrIsNotNotebook) Error() string { return fmt.Sprintf("file at %s is not a notebook", err.path) } -type translatePaths struct { - seen map[string]string -} +type translatePaths struct{} // TranslatePaths converts paths to local notebook files into paths in the workspace file system. func TranslatePaths() bundle.Mutator { @@ -48,6 +45,14 @@ func (m *translatePaths) Name() string { type rewriteFunc func(literal, localFullPath, localRelPath, remotePath string) (string, error) +// rewriteContext is a context for rewriting paths in a config. +// It is freshly instantiated on every mutator apply call.
+type rewriteContext struct { + b *bundle.Bundle + + seen map[string]string +} + // rewritePath converts a given relative path from the loaded config to a new path based on the passed rewriting function // // It takes these arguments: @@ -57,14 +62,13 @@ type rewriteFunc func(literal, localFullPath, localRelPath, remotePath string) ( // This logic is different between regular files or notebooks. // // The function returns an error if it is impossible to rewrite the given relative path. -func (m *translatePaths) rewritePath( +func (r *rewriteContext) rewritePath( dir string, - b *bundle.Bundle, p *string, fn rewriteFunc, ) error { // We assume absolute paths point to a location in the workspace - if path.IsAbs(filepath.ToSlash(*p)) { + if path.IsAbs(*p) { return nil } @@ -80,13 +84,14 @@ func (m *translatePaths) rewritePath( // Local path is relative to the directory the resource was defined in. localPath := filepath.Join(dir, filepath.FromSlash(*p)) - if interp, ok := m.seen[localPath]; ok { + if interp, ok := r.seen[localPath]; ok { *p = interp return nil } - // Remote path must be relative to the bundle root. - localRelPath, err := filepath.Rel(b.RootPath, localPath) + // Local path must be contained in the sync root. + // If it isn't, it won't be synchronized into the workspace. + localRelPath, err := filepath.Rel(r.b.RootPath, localPath) if err != nil { return err } @@ -94,22 +99,26 @@ func (m *translatePaths) rewritePath( return fmt.Errorf("path %s is not contained in bundle root path", localPath) } + // Convert platform-native paths back to slash-separated paths. + localPath = filepath.ToSlash(localPath) + localRelPath = filepath.ToSlash(localRelPath) + // Prefix remote path with its remote root path. - remotePath := path.Join(b.Config.Workspace.FilePath, filepath.ToSlash(localRelPath)) + remotePath := path.Join(r.b.Config.Workspace.FilePath, localRelPath) // Convert local path into workspace path via specified function. 
- interp, err := fn(*p, localPath, localRelPath, filepath.ToSlash(remotePath)) + interp, err := fn(*p, localPath, localRelPath, remotePath) if err != nil { return err } *p = interp - m.seen[localPath] = interp + r.seen[localPath] = interp return nil } -func translateNotebookPath(literal, localFullPath, localRelPath, remotePath string) (string, error) { - nb, _, err := notebook.Detect(localFullPath) +func (r *rewriteContext) translateNotebookPath(literal, localFullPath, localRelPath, remotePath string) (string, error) { + nb, _, err := notebook.DetectWithFS(r.b.BundleRoot, localRelPath) if errors.Is(err, fs.ErrNotExist) { return "", fmt.Errorf("notebook %s not found", literal) } @@ -124,8 +133,8 @@ func translateNotebookPath(literal, localFullPath, localRelPath, remotePath stri return strings.TrimSuffix(remotePath, filepath.Ext(localFullPath)), nil } -func translateFilePath(literal, localFullPath, localRelPath, remotePath string) (string, error) { - nb, _, err := notebook.Detect(localFullPath) +func (r *rewriteContext) translateFilePath(literal, localFullPath, localRelPath, remotePath string) (string, error) { + nb, _, err := notebook.DetectWithFS(r.b.BundleRoot, localRelPath) if errors.Is(err, fs.ErrNotExist) { return "", fmt.Errorf("file %s not found", literal) } @@ -138,8 +147,8 @@ func translateFilePath(literal, localFullPath, localRelPath, remotePath string) return remotePath, nil } -func translateDirectoryPath(literal, localFullPath, localRelPath, remotePath string) (string, error) { - info, err := os.Stat(localFullPath) +func (r *rewriteContext) translateDirectoryPath(literal, localFullPath, localRelPath, remotePath string) (string, error) { + info, err := r.b.BundleRoot.Stat(localRelPath) if err != nil { return "", err } @@ -149,20 +158,20 @@ func translateDirectoryPath(literal, localFullPath, localRelPath, remotePath str return remotePath, nil } -func translateNoOp(literal, localFullPath, localRelPath, remotePath string) (string, error) { +func (r 
*rewriteContext) translateNoOp(literal, localFullPath, localRelPath, remotePath string) (string, error) { return localRelPath, nil } -func translateNoOpWithPrefix(literal, localFullPath, localRelPath, remotePath string) (string, error) { +func (r *rewriteContext) translateNoOpWithPrefix(literal, localFullPath, localRelPath, remotePath string) (string, error) { if !strings.HasPrefix(localRelPath, ".") { localRelPath = "." + string(filepath.Separator) + localRelPath } return localRelPath, nil } -func (m *translatePaths) rewriteValue(b *bundle.Bundle, p dyn.Path, v dyn.Value, fn rewriteFunc, dir string) (dyn.Value, error) { +func (r *rewriteContext) rewriteValue(p dyn.Path, v dyn.Value, fn rewriteFunc, dir string) (dyn.Value, error) { out := v.MustString() - err := m.rewritePath(dir, b, &out, fn) + err := r.rewritePath(dir, &out, fn) if err != nil { if target := (&ErrIsNotebook{}); errors.As(err, target) { return dyn.InvalidValue, fmt.Errorf(`expected a file for "%s" but got a notebook: %w`, p, target) @@ -176,15 +185,15 @@ func (m *translatePaths) rewriteValue(b *bundle.Bundle, p dyn.Path, v dyn.Value, return dyn.NewValue(out, v.Location()), nil } -func (m *translatePaths) rewriteRelativeTo(b *bundle.Bundle, p dyn.Path, v dyn.Value, fn rewriteFunc, dir, fallback string) (dyn.Value, error) { - nv, err := m.rewriteValue(b, p, v, fn, dir) +func (r *rewriteContext) rewriteRelativeTo(p dyn.Path, v dyn.Value, fn rewriteFunc, dir, fallback string) (dyn.Value, error) { + nv, err := r.rewriteValue(p, v, fn, dir) if err == nil { return nv, nil } // If we failed to rewrite the path, try to rewrite it relative to the fallback directory. if fallback != "" { - nv, nerr := m.rewriteValue(b, p, v, fn, fallback) + nv, nerr := r.rewriteValue(p, v, fn, fallback) if nerr == nil { // TODO: Emit a warning that this path should be rewritten. 
return nv, nil @@ -195,16 +204,19 @@ func (m *translatePaths) rewriteRelativeTo(b *bundle.Bundle, p dyn.Path, v dyn.V } func (m *translatePaths) Apply(_ context.Context, b *bundle.Bundle) diag.Diagnostics { - m.seen = make(map[string]string) + r := &rewriteContext{ + b: b, + seen: make(map[string]string), + } err := b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) { var err error - for _, fn := range []func(*bundle.Bundle, dyn.Value) (dyn.Value, error){ - m.applyJobTranslations, - m.applyPipelineTranslations, - m.applyArtifactTranslations, + for _, fn := range []func(dyn.Value) (dyn.Value, error){ + r.applyJobTranslations, + r.applyPipelineTranslations, + r.applyArtifactTranslations, } { - v, err = fn(b, v) + v, err = fn(v) if err != nil { return dyn.InvalidValue, err } diff --git a/bundle/config/mutator/translate_paths_artifacts.go b/bundle/config/mutator/translate_paths_artifacts.go index 7bda04eece..9b6e20c532 100644 --- a/bundle/config/mutator/translate_paths_artifacts.go +++ b/bundle/config/mutator/translate_paths_artifacts.go @@ -3,11 +3,10 @@ package mutator import ( "fmt" - "github.com/databricks/cli/bundle" "github.com/databricks/cli/libs/dyn" ) -func (m *translatePaths) applyArtifactTranslations(b *bundle.Bundle, v dyn.Value) (dyn.Value, error) { +func (r *rewriteContext) applyArtifactTranslations(v dyn.Value) (dyn.Value, error) { var err error // Base pattern to match all artifacts. 
@@ -22,7 +21,7 @@ func (m *translatePaths) applyArtifactTranslations(b *bundle.Bundle, v dyn.Value }{ { base.Append(dyn.Key("path")), - translateNoOp, + r.translateNoOp, }, } { v, err = dyn.MapByPattern(v, t.pattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) { @@ -32,7 +31,7 @@ func (m *translatePaths) applyArtifactTranslations(b *bundle.Bundle, v dyn.Value return dyn.InvalidValue, fmt.Errorf("unable to determine directory for artifact %s: %w", key, err) } - return m.rewriteRelativeTo(b, p, v, t.fn, dir, "") + return r.rewriteRelativeTo(p, v, t.fn, dir, "") }) if err != nil { return dyn.InvalidValue, err diff --git a/bundle/config/mutator/translate_paths_jobs.go b/bundle/config/mutator/translate_paths_jobs.go index 58b5e0fb08..e4d5ab7eb2 100644 --- a/bundle/config/mutator/translate_paths_jobs.go +++ b/bundle/config/mutator/translate_paths_jobs.go @@ -4,7 +4,6 @@ import ( "fmt" "slices" - "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/libraries" "github.com/databricks/cli/libs/dyn" ) @@ -19,42 +18,42 @@ func noSkipRewrite(string) bool { return false } -func rewritePatterns(base dyn.Pattern) []jobRewritePattern { +func rewritePatterns(r *rewriteContext, base dyn.Pattern) []jobRewritePattern { return []jobRewritePattern{ { base.Append(dyn.Key("notebook_task"), dyn.Key("notebook_path")), - translateNotebookPath, + r.translateNotebookPath, noSkipRewrite, }, { base.Append(dyn.Key("spark_python_task"), dyn.Key("python_file")), - translateFilePath, + r.translateFilePath, noSkipRewrite, }, { base.Append(dyn.Key("dbt_task"), dyn.Key("project_directory")), - translateDirectoryPath, + r.translateDirectoryPath, noSkipRewrite, }, { base.Append(dyn.Key("sql_task"), dyn.Key("file"), dyn.Key("path")), - translateFilePath, + r.translateFilePath, noSkipRewrite, }, { base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("whl")), - translateNoOp, + r.translateNoOp, noSkipRewrite, }, { base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("jar")), - 
translateNoOp, + r.translateNoOp, noSkipRewrite, }, } } -func (m *translatePaths) applyJobTranslations(b *bundle.Bundle, v dyn.Value) (dyn.Value, error) { +func (r *rewriteContext) applyJobTranslations(v dyn.Value) (dyn.Value, error) { fallback, err := gatherFallbackPaths(v, "jobs") if err != nil { return dyn.InvalidValue, err @@ -62,7 +61,7 @@ func (m *translatePaths) applyJobTranslations(b *bundle.Bundle, v dyn.Value) (dy // Do not translate job task paths if using Git source var ignore []string - for key, job := range b.Config.Resources.Jobs { + for key, job := range r.b.Config.Resources.Jobs { if job.GitSource != nil { ignore = append(ignore, key) } @@ -90,14 +89,14 @@ func (m *translatePaths) applyJobTranslations(b *bundle.Bundle, v dyn.Value) (dy dyn.Key("dependencies"), dyn.AnyIndex(), ), - translateNoOpWithPrefix, + r.translateNoOpWithPrefix, func(s string) bool { return !libraries.IsEnvironmentDependencyLocal(s) }, }, } - taskPatterns := rewritePatterns(base) - forEachPatterns := rewritePatterns(base.Append(dyn.Key("for_each_task"), dyn.Key("task"))) + taskPatterns := rewritePatterns(r, base) + forEachPatterns := rewritePatterns(r, base.Append(dyn.Key("for_each_task"), dyn.Key("task"))) allPatterns := append(taskPatterns, jobEnvironmentsPatterns...) allPatterns = append(allPatterns, forEachPatterns...) 
@@ -119,7 +118,7 @@ func (m *translatePaths) applyJobTranslations(b *bundle.Bundle, v dyn.Value) (dy if t.skipRewrite(sv) { return v, nil } - return m.rewriteRelativeTo(b, p, v, t.fn, dir, fallback[key]) + return r.rewriteRelativeTo(p, v, t.fn, dir, fallback[key]) }) if err != nil { return dyn.InvalidValue, err diff --git a/bundle/config/mutator/translate_paths_pipelines.go b/bundle/config/mutator/translate_paths_pipelines.go index 5b2a2c346e..427465ef8d 100644 --- a/bundle/config/mutator/translate_paths_pipelines.go +++ b/bundle/config/mutator/translate_paths_pipelines.go @@ -3,11 +3,10 @@ package mutator import ( "fmt" - "github.com/databricks/cli/bundle" "github.com/databricks/cli/libs/dyn" ) -func (m *translatePaths) applyPipelineTranslations(b *bundle.Bundle, v dyn.Value) (dyn.Value, error) { +func (r *rewriteContext) applyPipelineTranslations(v dyn.Value) (dyn.Value, error) { fallback, err := gatherFallbackPaths(v, "pipelines") if err != nil { return dyn.InvalidValue, err @@ -28,11 +27,11 @@ func (m *translatePaths) applyPipelineTranslations(b *bundle.Bundle, v dyn.Value }{ { base.Append(dyn.Key("notebook"), dyn.Key("path")), - translateNotebookPath, + r.translateNotebookPath, }, { base.Append(dyn.Key("file"), dyn.Key("path")), - translateFilePath, + r.translateFilePath, }, } { v, err = dyn.MapByPattern(v, t.pattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) { @@ -42,7 +41,7 @@ func (m *translatePaths) applyPipelineTranslations(b *bundle.Bundle, v dyn.Value return dyn.InvalidValue, fmt.Errorf("unable to determine directory for pipeline %s: %w", key, err) } - return m.rewriteRelativeTo(b, p, v, t.fn, dir, fallback[key]) + return r.rewriteRelativeTo(p, v, t.fn, dir, fallback[key]) }) if err != nil { return dyn.InvalidValue, err diff --git a/bundle/config/mutator/translate_paths_test.go b/bundle/config/mutator/translate_paths_test.go index 29afb99725..1ffcd0a9d2 100644 --- a/bundle/config/mutator/translate_paths_test.go +++ 
b/bundle/config/mutator/translate_paths_test.go @@ -12,6 +12,7 @@ import ( "github.com/databricks/cli/bundle/config/mutator" "github.com/databricks/cli/bundle/config/resources" "github.com/databricks/cli/bundle/internal/bundletest" + "github.com/databricks/cli/libs/vfs" "github.com/databricks/databricks-sdk-go/service/compute" "github.com/databricks/databricks-sdk-go/service/jobs" "github.com/databricks/databricks-sdk-go/service/pipelines" @@ -37,7 +38,9 @@ func touchEmptyFile(t *testing.T, path string) { func TestTranslatePathsSkippedWithGitSource(t *testing.T) { dir := t.TempDir() b := &bundle.Bundle{ - RootPath: dir, + RootPath: dir, + BundleRoot: vfs.MustNew(dir), + BundleRootRelative: ".", Config: config.Root{ Workspace: config.Workspace{ FilePath: "/bundle", @@ -107,7 +110,9 @@ func TestTranslatePaths(t *testing.T) { touchEmptyFile(t, filepath.Join(dir, "dist", "task.jar")) b := &bundle.Bundle{ - RootPath: dir, + RootPath: dir, + BundleRoot: vfs.MustNew(dir), + BundleRootRelative: ".", Config: config.Root{ Workspace: config.Workspace{ FilePath: "/bundle", @@ -274,7 +279,9 @@ func TestTranslatePathsInSubdirectories(t *testing.T) { touchEmptyFile(t, filepath.Join(dir, "job", "my_dbt_project", "dbt_project.yml")) b := &bundle.Bundle{ - RootPath: dir, + RootPath: dir, + BundleRoot: vfs.MustNew(dir), + BundleRootRelative: ".", Config: config.Root{ Workspace: config.Workspace{ FilePath: "/bundle", @@ -368,7 +375,9 @@ func TestTranslatePathsOutsideBundleRoot(t *testing.T) { dir := t.TempDir() b := &bundle.Bundle{ - RootPath: dir, + RootPath: dir, + BundleRoot: vfs.MustNew(dir), + BundleRootRelative: ".", Config: config.Root{ Workspace: config.Workspace{ FilePath: "/bundle", @@ -394,14 +403,16 @@ func TestTranslatePathsOutsideBundleRoot(t *testing.T) { bundletest.SetLocation(b, ".", filepath.Join(dir, "../resource.yml")) diags := bundle.Apply(context.Background(), b, mutator.TranslatePaths()) - assert.ErrorContains(t, diags.Error(), "is not contained in bundle root") 
+ assert.ErrorContains(t, diags.Error(), "is not contained in bundle root path") } func TestJobNotebookDoesNotExistError(t *testing.T) { dir := t.TempDir() b := &bundle.Bundle{ - RootPath: dir, + RootPath: dir, + BundleRoot: vfs.MustNew(dir), + BundleRootRelative: ".", Config: config.Root{ Resources: config.Resources{ Jobs: map[string]*resources.Job{ @@ -431,7 +442,9 @@ func TestJobFileDoesNotExistError(t *testing.T) { dir := t.TempDir() b := &bundle.Bundle{ - RootPath: dir, + RootPath: dir, + BundleRoot: vfs.MustNew(dir), + BundleRootRelative: ".", Config: config.Root{ Resources: config.Resources{ Jobs: map[string]*resources.Job{ @@ -461,7 +474,9 @@ func TestPipelineNotebookDoesNotExistError(t *testing.T) { dir := t.TempDir() b := &bundle.Bundle{ - RootPath: dir, + RootPath: dir, + BundleRoot: vfs.MustNew(dir), + BundleRootRelative: ".", Config: config.Root{ Resources: config.Resources{ Pipelines: map[string]*resources.Pipeline{ @@ -491,7 +506,9 @@ func TestPipelineFileDoesNotExistError(t *testing.T) { dir := t.TempDir() b := &bundle.Bundle{ - RootPath: dir, + RootPath: dir, + BundleRoot: vfs.MustNew(dir), + BundleRootRelative: ".", Config: config.Root{ Resources: config.Resources{ Pipelines: map[string]*resources.Pipeline{ @@ -522,7 +539,9 @@ func TestJobSparkPythonTaskWithNotebookSourceError(t *testing.T) { touchNotebookFile(t, filepath.Join(dir, "my_notebook.py")) b := &bundle.Bundle{ - RootPath: dir, + RootPath: dir, + BundleRoot: vfs.MustNew(dir), + BundleRootRelative: ".", Config: config.Root{ Workspace: config.Workspace{ FilePath: "/bundle", @@ -556,7 +575,9 @@ func TestJobNotebookTaskWithFileSourceError(t *testing.T) { touchEmptyFile(t, filepath.Join(dir, "my_file.py")) b := &bundle.Bundle{ - RootPath: dir, + RootPath: dir, + BundleRoot: vfs.MustNew(dir), + BundleRootRelative: ".", Config: config.Root{ Workspace: config.Workspace{ FilePath: "/bundle", @@ -590,7 +611,9 @@ func TestPipelineNotebookLibraryWithFileSourceError(t *testing.T) { touchEmptyFile(t,
filepath.Join(dir, "my_file.py")) b := &bundle.Bundle{ - RootPath: dir, + RootPath: dir, + BundleRoot: vfs.MustNew(dir), + BundleRootRelative: ".", Config: config.Root{ Workspace: config.Workspace{ FilePath: "/bundle", @@ -624,7 +647,9 @@ func TestPipelineFileLibraryWithNotebookSourceError(t *testing.T) { touchNotebookFile(t, filepath.Join(dir, "my_notebook.py")) b := &bundle.Bundle{ - RootPath: dir, + RootPath: dir, + BundleRoot: vfs.MustNew(dir), + BundleRootRelative: ".", Config: config.Root{ Workspace: config.Workspace{ FilePath: "/bundle", @@ -659,7 +684,9 @@ func TestTranslatePathJobEnvironments(t *testing.T) { touchEmptyFile(t, filepath.Join(dir, "env2.py")) b := &bundle.Bundle{ - RootPath: dir, + RootPath: dir, + BundleRoot: vfs.MustNew(dir), + BundleRootRelative: ".", Config: config.Root{ Resources: config.Resources{ Jobs: map[string]*resources.Job{ diff --git a/bundle/deploy/files/sync.go b/bundle/deploy/files/sync.go index 8d6efdae3d..a308668d3b 100644 --- a/bundle/deploy/files/sync.go +++ b/bundle/deploy/files/sync.go @@ -6,7 +6,6 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/libs/sync" - "github.com/databricks/cli/libs/vfs" ) func GetSync(ctx context.Context, rb bundle.ReadOnlyBundle) (*sync.Sync, error) { @@ -29,7 +28,7 @@ func GetSyncOptions(ctx context.Context, rb bundle.ReadOnlyBundle) (*sync.SyncOp } opts := &sync.SyncOptions{ - LocalPath: vfs.MustNew(rb.RootPath()), + LocalPath: rb.BundleRoot(), RemotePath: rb.Config().Workspace.FilePath, Include: includes, Exclude: rb.Config().Sync.Exclude, diff --git a/bundle/deploy/state_update_test.go b/bundle/deploy/state_update_test.go index dd8a1336ec..a7dfc1fb87 100644 --- a/bundle/deploy/state_update_test.go +++ b/bundle/deploy/state_update_test.go @@ -10,6 +10,7 @@ import ( "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/internal/build" "github.com/databricks/cli/internal/testutil" + "github.com/databricks/cli/libs/vfs" databrickscfg 
"github.com/databricks/databricks-sdk-go/config" "github.com/databricks/databricks-sdk-go/experimental/mocks" "github.com/databricks/databricks-sdk-go/service/iam" @@ -21,8 +22,11 @@ import ( func TestStateUpdate(t *testing.T) { s := &stateUpdate{} + dir := t.TempDir() b := &bundle.Bundle{ - RootPath: t.TempDir(), + RootPath: dir, + BundleRoot: vfs.MustNew(dir), + BundleRootRelative: ".", Config: config.Root{ Bundle: config.Bundle{ Target: "default", @@ -81,8 +85,11 @@ func TestStateUpdate(t *testing.T) { func TestStateUpdateWithExistingState(t *testing.T) { s := &stateUpdate{} + dir := t.TempDir() b := &bundle.Bundle{ - RootPath: t.TempDir(), + RootPath: dir, + BundleRoot: vfs.MustNew(dir), + BundleRootRelative: ".", Config: config.Root{ Bundle: config.Bundle{ Target: "default", diff --git a/bundle/mutator.go b/bundle/mutator.go index 6c9968aacd..64db20e1d2 100644 --- a/bundle/mutator.go +++ b/bundle/mutator.go @@ -20,6 +20,11 @@ type Mutator interface { func Apply(ctx context.Context, b *Bundle, m Mutator) diag.Diagnostics { ctx = log.NewContext(ctx, log.GetLogger(ctx).With("mutator", m.Name())) + log.Tracef(ctx, "mutator:entry") + defer func() { + log.Tracef(ctx, "mutator:exit") + }() + log.Debugf(ctx, "Apply") err := b.Config.MarkMutatorEntry(ctx) diff --git a/bundle/phases/initialize.go b/bundle/phases/initialize.go index ded2e19808..b23ad6f966 100644 --- a/bundle/phases/initialize.go +++ b/bundle/phases/initialize.go @@ -40,6 +40,10 @@ func Initialize() bundle.Mutator { mutator.ProcessTargetMode(), mutator.DefaultQueueing(), mutator.ExpandPipelineGlobPaths(), + + // Configure use of WSFS for reads if the CLI is running on Databricks. 
+ mutator.ConfigureWsfs(), + mutator.TranslatePaths(), python.WrapperWarning(), permissions.ApplyBundlePermissions(), diff --git a/experimental/log2cte/main.go b/experimental/log2cte/main.go new file mode 100644 index 0000000000..19555664bc --- /dev/null +++ b/experimental/log2cte/main.go @@ -0,0 +1,220 @@ +package main + +import ( + "bytes" + "encoding/json" + "io" + "log" + "os" + "strings" + "time" +) + +type TraceEvent struct { + Name string `json:"name"` + Phase string `json:"ph"` + Timestamp int64 `json:"ts"` + PID int `json:"pid"` + TID int `json:"tid"` + Duration int64 `json:"dur,omitempty"` + Args map[string]interface{} `json:"args,omitempty"` +} + +type TraceEvents struct { + TraceEvents []TraceEvent `json:"traceEvents"` +} + +var ( + events []TraceEvent + // processID = 1 // Arbitrary process ID + // threadID = 1 // Arbitrary thread ID + // startTimes = make(map[string]int64) +) + +// func startEvent(name string) { +// startTimes[name] = time.Now().UnixNano() / 1000 // Convert to microseconds +// } + +// func endEvent(name string) { +// startTime, ok := startTimes[name] +// if !ok { +// return // No matching start event +// } +// endTime := time.Now().UnixNano() / 1000 // Convert to microseconds +// duration := endTime - startTime + +// events = append(events, TraceEvent{ +// Name: name, +// Phase: "X", // Complete event +// Timestamp: startTime, +// PID: processID, +// TID: threadID, +// Duration: duration, +// }) +// } + +func writeTraceFile(filename string) error { + trace := TraceEvents{ + TraceEvents: events, + } + file, err := os.Create(filename) + if err != nil { + return err + } + defer file.Close() + encoder := json.NewEncoder(file) + encoder.SetIndent("", " ") + return encoder.Encode(trace) +} + +// DuplicateKeysMap is a custom structure to hold values of duplicate keys +type DuplicateKeysMap map[string][]any + +// UnmarshalJSON custom unmarshaler to handle duplicate keys +func (d *DuplicateKeysMap) UnmarshalJSON(data []byte) error { + dec := 
json.NewDecoder(bytes.NewReader(data)) + _, err := dec.Token() // consume the opening '{' + if err != nil { + return err + } + + if *d == nil { + *d = make(DuplicateKeysMap) + } + + for dec.More() { + t, err := dec.Token() + if err != nil { + return err + } + + key := t.(string) + + var value any + if err := dec.Decode(&value); err != nil { + return err + } + + (*d)[key] = append((*d)[key], value) + } + + _, err = dec.Token() // consume the closing '}' + return err + +} + +func main() { + var entries []DuplicateKeysMap + + f, err := os.Open(os.Args[1]) + if err != nil { + panic(err) + } + + dec := json.NewDecoder(f) + for { + var entry = new(DuplicateKeysMap) + if err := dec.Decode(&entry); err != nil { + if err == io.EOF { + break + } + + panic(err) + } + + entries = append(entries, *entry) + } + + startTimes := make(map[string]int64) + for _, entry := range entries { + var mutators []string + var t time.Time + var msg string + var level string + var sdk bool + + for key, values := range entry { + switch key { + case "mutator": + for _, value := range values { + mutators = append(mutators, value.(string)) + } + case "time": + // Parse the following: + // "2024-05-30T18:19:29.172811239Z" + t, err = time.Parse(time.RFC3339Nano, values[0].(string)) + case "msg": + msg = values[0].(string) + case "level": + level = values[0].(string) + case "sdk": + sdk = values[0].(bool) + } + } + + if len(mutators) == 0 { + log.Printf("No mutators found in entry: %v", entry) + continue + } + + key := strings.Join(mutators, ", ") + if level == "TRACE" { + switch msg { + case "mutator:entry": + startTimes[key] = t.UnixNano() + case "mutator:exit": + startTime, ok := startTimes[key] + if !ok { + panic("No matching start event") + } + durationMicros := (t.UnixNano() - startTime) / 1000 + events = append(events, TraceEvent{ + Name: mutators[len(mutators)-1], + Phase: "X", // Complete event + Timestamp: startTime / 1000, + PID: 1, // Arbitrary process ID + TID: 1, // Arbitrary thread ID + 
Duration: durationMicros, + }) + } + continue + } + + if level == "DEBUG" && sdk { + lines := strings.Split(msg, "\n") + events = append(events, TraceEvent{ + Name: lines[0], + Phase: "i", // Complete event + Timestamp: t.UnixMicro(), + PID: 1, // Arbitrary process ID + TID: 1, // Arbitrary thread ID + }) + } + + // Emit start and stop events + } + + // // fmt.Printf("%d entries!\n", len(entries)) + + // // Start custom event + // startEvent("MyCustomEvent") + + // // Simulate some work + // time.Sleep(1 * time.Second) + + // startEvent("otherevent") + + // time.Sleep(1 * time.Second) + + // endEvent("otherevent") + + // time.Sleep(1 * time.Second) + + // // End custom event + // endEvent("MyCustomEvent") + + // Write trace file + if err := writeTraceFile("trace.json"); err != nil { + panic(err) + } +} diff --git a/libs/filer/fs.go b/libs/filer/fs.go index c6dd21de52..9b2a075568 100644 --- a/libs/filer/fs.go +++ b/libs/filer/fs.go @@ -14,7 +14,7 @@ type filerFS struct { } // NewFS returns an fs.FS backed by a filer. 
-func NewFS(ctx context.Context, filer Filer) fs.FS { +func NewFS(ctx context.Context, filer Filer) *filerFS { return &filerFS{ctx: ctx, filer: filer} } diff --git a/libs/filer/workspace_files_client.go b/libs/filer/workspace_files_client.go index 41e35d9d12..86b8c6c748 100644 --- a/libs/filer/workspace_files_client.go +++ b/libs/filer/workspace_files_client.go @@ -50,7 +50,7 @@ func (info wsfsFileInfo) Size() int64 { func (info wsfsFileInfo) Mode() fs.FileMode { switch info.oi.ObjectType { - case workspace.ObjectTypeDirectory: + case workspace.ObjectTypeDirectory, workspace.ObjectTypeRepo: return fs.ModeDir default: return fs.ModePerm @@ -62,7 +62,7 @@ func (info wsfsFileInfo) ModTime() time.Time { } func (info wsfsFileInfo) IsDir() bool { - return info.oi.ObjectType == workspace.ObjectTypeDirectory + return info.Mode() == fs.ModeDir } func (info wsfsFileInfo) Sys() any { diff --git a/libs/filer/workspace_files_extension_client_test.go b/libs/filer/workspace_files_extension_client_test.go new file mode 100644 index 0000000000..686f3c4edb --- /dev/null +++ b/libs/filer/workspace_files_extension_client_test.go @@ -0,0 +1,75 @@ +package filer_test + +import ( + "context" + "log/slog" + "os" + "testing" + + "github.com/databricks/cli/libs/filer" + "github.com/databricks/cli/libs/log" + "github.com/databricks/cli/libs/log/handler" + "github.com/databricks/cli/libs/sync" + "github.com/databricks/cli/libs/vfs" + "github.com/databricks/databricks-sdk-go" +) + +func initLogger(t *testing.T, ctx context.Context) context.Context { + opts := handler.Options{} + opts.Level = log.LevelTrace + opts.ReplaceAttr = log.ReplaceAttrFunctions{ + log.ReplaceLevelAttr, + log.ReplaceSourceAttr, + }.ReplaceAttr + + h := handler.NewFriendlyHandler(os.Stderr, &opts) + + ctx = log.NewContext(ctx, slog.New(h)) + return ctx +} + +func TestExtension(t *testing.T) { + ctx := initLogger(t, context.Background()) + + root := "/Workspace/Users/jingting.lu@databricks.com/dais-cow-bff-5" + w := 
databricks.Must(databricks.NewWorkspaceClient()) + + // f, err := NewWorkspaceFilesExtensionsClient(w, p) + // if err != nil { + // t.Fatal(err) + // } + + // If so, swap out vfs.Path instance of the sync root with one that + // makes all Workspace File System interactions extension aware. + p, err := vfs.NewFilerPath(ctx, root, func(path string) (filer.Filer, error) { + return filer.NewWorkspaceFilesExtensionsClient(w, path) + }) + if err != nil { + t.Fatal(err) + } + + opts := sync.SyncOptions{ + LocalPath: p, + RemotePath: "/Workspace/foobar", + Host: w.Config.Host, + + Full: false, + + WorkspaceClient: w, + } + + s, err := sync.New(ctx, opts) + if err != nil { + t.Fatal(err) + } + + s.GetFileList(ctx) + + // entries, err := f.ReadDir(ctx, ".") + // if err != nil { + // t.Fatal(err) + // } + // + // t.Log(entries) + +} diff --git a/libs/git/repository.go b/libs/git/repository.go index 86d56a7fcf..f7141f8f28 100644 --- a/libs/git/repository.go +++ b/libs/git/repository.go @@ -109,9 +109,11 @@ func (r *Repository) loadConfig() error { if err != nil { return fmt.Errorf("unable to load user specific gitconfig: %w", err) } - err = config.loadFile(r.root, ".git/config") - if err != nil { - return fmt.Errorf("unable to load repository specific gitconfig: %w", err) + if r.real { + err = config.loadFile(r.root, ".git/config") + if err != nil { + return fmt.Errorf("unable to load repository specific gitconfig: %w", err) + } } r.config = config return nil diff --git a/libs/sync/sync.go b/libs/sync/sync.go index 585e8a887e..2d18034cf8 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -197,23 +197,23 @@ func (s *Sync) GetFileList(ctx context.Context) ([]fileset.File, error) { } all.Add(gitFiles...) - include, err := s.includeFileSet.All() - if err != nil { - log.Errorf(ctx, "cannot list include files: %s", err) - return nil, err - } - - all.Add(include...) 
- - exclude, err := s.excludeFileSet.All() - if err != nil { - log.Errorf(ctx, "cannot list exclude files: %s", err) - return nil, err - } - - for _, f := range exclude { - all.Remove(f) - } + // include, err := s.includeFileSet.All() + // if err != nil { + // log.Errorf(ctx, "cannot list include files: %s", err) + // return nil, err + // } + + // all.Add(include...) + + // exclude, err := s.excludeFileSet.All() + // if err != nil { + // log.Errorf(ctx, "cannot list exclude files: %s", err) + // return nil, err + // } + + // for _, f := range exclude { + // all.Remove(f) + // } return all.Iter(), nil } diff --git a/libs/vfs/filer.go b/libs/vfs/filer.go new file mode 100644 index 0000000000..1d3193ec95 --- /dev/null +++ b/libs/vfs/filer.go @@ -0,0 +1,66 @@ +package vfs + +import ( + "context" + "io/fs" + "path" + + "github.com/databricks/cli/libs/filer" +) + +type filerPath struct { + ctx context.Context + path string + fs FS + + construct func(path string) (filer.Filer, error) +} + +func NewFilerPath(ctx context.Context, path string, construct func(path string) (filer.Filer, error)) (Path, error) { + f, err := construct(path) + if err != nil { + return nil, err + } + + return &filerPath{ + ctx: ctx, + path: path, + fs: filer.NewFS(ctx, f), + + construct: construct, + }, nil +} + +func (f filerPath) Open(name string) (fs.File, error) { + return f.fs.Open(name) +} + +func (f filerPath) Stat(name string) (fs.FileInfo, error) { + return f.fs.Stat(name) +} + +func (f filerPath) ReadDir(name string) ([]fs.DirEntry, error) { + return f.fs.ReadDir(name) +} + +func (f filerPath) ReadFile(name string) ([]byte, error) { + return f.fs.ReadFile(name) +} + +func (f filerPath) Parent() Path { + if f.path == "/" { + return nil + } + + dir := path.Dir(f.path) + nf, err := NewFilerPath(f.ctx, dir, f.construct) + if err != nil { + panic(err) + } + + return nf +} + +func (f filerPath) Native() string { + return f.path +} diff --git a/libs/vfs/filer_test.go b/libs/vfs/filer_test.go new 
file mode 100644 index 0000000000..c842a6e7e2 --- /dev/null +++ b/libs/vfs/filer_test.go @@ -0,0 +1,80 @@ +package vfs + +import ( + "context" + "errors" + "io/fs" + "os" + "path" + "path/filepath" + "strings" + "testing" + + "github.com/databricks/cli/libs/filer" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestFilerPath(t *testing.T) { + ctx := context.Background() + wd, err := os.Getwd() + require.NoError(t, err) + + // Create a new filer-backed path. + p, err := NewFilerPath(ctx, filepath.FromSlash(wd), filer.NewLocalClient) + require.NoError(t, err) + + // Open self. + f, err := p.Open("filer_test.go") + require.NoError(t, err) + defer f.Close() + + // Run stat on self. + s, err := f.Stat() + require.NoError(t, err) + assert.Equal(t, "filer_test.go", s.Name()) + assert.GreaterOrEqual(t, int(s.Size()), 128) + + // Read some bytes. + buf := make([]byte, 1024) + _, err = f.Read(buf) + require.NoError(t, err) + assert.True(t, strings.HasPrefix(string(buf), "package vfs\n")) + + // Open non-existent file. + _, err = p.Open("doesntexist_test.go") + assert.True(t, errors.Is(err, fs.ErrNotExist)) + + // Stat self. + s, err = p.Stat("filer_test.go") + require.NoError(t, err) + assert.Equal(t, "filer_test.go", s.Name()) + assert.GreaterOrEqual(t, int(s.Size()), 128) + + // Stat non-existent file. + _, err = p.Stat("doesntexist_test.go") + assert.True(t, errors.Is(err, fs.ErrNotExist)) + + // ReadDir self. + entries, err := p.ReadDir(".") + require.NoError(t, err) + assert.GreaterOrEqual(t, len(entries), 1) + + // ReadDir non-existent directory. + _, err = p.ReadDir("doesntexist") + assert.True(t, errors.Is(err, fs.ErrNotExist)) + + // ReadFile self. + buf, err = p.ReadFile("filer_test.go") + require.NoError(t, err) + assert.True(t, strings.HasPrefix(string(buf), "package vfs\n")) + + // ReadFile non-existent file. + _, err = p.ReadFile("doesntexist_test.go") + assert.True(t, errors.Is(err, fs.ErrNotExist)) + + // Parent self. 
+ pp := p.Parent() + require.NotNil(t, pp) + assert.Equal(t, path.Join(pp.Native(), "vfs"), p.Native()) +}