diff --git a/CHANGELOG.md b/CHANGELOG.md index 71e7b08b2f89..132cab106c04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ v0.40.3 (2024-03-14) - Fix a bug where structured metadata and parsed field are not passed further in `loki.source.api` (@marchellodev) +- Change `import.git` to use Git pulls rather than fetches to fix scenarios where the local code did not get updated. (@mattdurham) + ### Other changes - Upgrade to Go 1.22.1 (@thampiotr) diff --git a/docs/sources/flow/reference/components/module.git.md b/docs/sources/flow/reference/components/module.git.md index 44bdee36a034..90b8dae130c1 100644 --- a/docs/sources/flow/reference/components/module.git.md +++ b/docs/sources/flow/reference/components/module.git.md @@ -41,12 +41,12 @@ module.git "LABEL" { The following arguments are supported: -Name | Type | Description | Default | Required ----- | ---- | ----------- | ------- | -------- -`repository` | `string` | The Git repository address to retrieve the module from. | | yes -`revision` | `string` | The Git revision to retrieve the module from. | `"HEAD"` | no -`path` | `string` | The path in the repository where the module is stored. | | yes -`pull_frequency` | `duration` | The frequency to pull the repository for updates. | `"60s"` | no +Name | Type | Description | Default | Required +-----------------|------------|---------------------------------------------------------|----------|--------- +`repository` | `string` | The Git repository address to retrieve the module from. | | yes +`revision` | `string` | The Git revision to retrieve the module from. | `"HEAD"` | no +`path` | `string` | The path in the repository where the module is stored. | | yes +`pull_frequency` | `duration` | The frequency to pull the repository for updates. | `"60s"` | no The `repository` attribute must be set to a repository address that would be recognized by Git with a `git clone REPOSITORY_ADDRESS` command, such as diff --git a/internal/vcs/git.go b/internal/vcs/git.go index dece43c10b2f..78857e56d87c 100644 --- a/internal/vcs/git.go +++ b/internal/vcs/git.go @@ -29,7 +29,7 @@ type GitRepo struct { // managed at storagePath. // // If storagePath is empty on disk, NewGitRepo initializes GitRepo by cloning -// the repository. Otherwise, NewGitRepo will do a fetch. +// the repository. Otherwise, NewGitRepo will do a pull. // // After GitRepo is initialized, it checks out to the Revision specified in // GitRepoOptions. @@ -57,13 +57,20 @@ func NewGitRepo(ctx context.Context, storagePath string, opts GitRepoOptions) (* } } - // Fetch the latest contents. This may be a no-op if we just did a clone. - fetchRepoErr := repo.FetchContext(ctx, &git.FetchOptions{ + // Pulls the latest contents. This may be a no-op if we just did a clone. + wt, err := repo.Worktree() + if err != nil { + return nil, DownloadFailedError{ + Repository: opts.Repository, + Inner: err, + } + } + pullRepoErr := wt.PullContext(ctx, &git.PullOptions{ RemoteName: "origin", Force: true, Auth: opts.Auth.Convert(), }) - if fetchRepoErr != nil && !errors.Is(fetchRepoErr, git.NoErrAlreadyUpToDate) { + if pullRepoErr != nil && !errors.Is(pullRepoErr, git.NoErrAlreadyUpToDate) { workTree, err := repo.Worktree() if err != nil { return nil, err @@ -74,7 +81,7 @@ func NewGitRepo(ctx context.Context, storagePath string, opts GitRepoOptions) (* workTree: workTree, }, UpdateFailedError{ Repository: opts.Repository, - Inner: fetchRepoErr, + Inner: pullRepoErr, } } @@ -108,19 +115,19 @@ func isRepoCloned(dir string) bool { return dirError == nil && len(fi) > 0 } -// Update updates the repository by fetching new content and re-checking out to +// Update updates the repository by pulling new content and re-checking out to // latest version of Revision. func (repo *GitRepo) Update(ctx context.Context) error { var err error - fetchRepoErr := repo.repo.FetchContext(ctx, &git.FetchOptions{ + pullRepoErr := repo.workTree.PullContext(ctx, &git.PullOptions{ RemoteName: "origin", Force: true, Auth: repo.opts.Auth.Convert(), }) - if fetchRepoErr != nil && !errors.Is(fetchRepoErr, git.NoErrAlreadyUpToDate) { + if pullRepoErr != nil && !errors.Is(pullRepoErr, git.NoErrAlreadyUpToDate) { return UpdateFailedError{ Repository: repo.opts.Repository, - Inner: fetchRepoErr, + Inner: pullRepoErr, } } diff --git a/pkg/flow/import_test.go b/pkg/flow/import_test.go index fd95eacfc949..768b353b76a9 100644 --- a/pkg/flow/import_test.go +++ b/pkg/flow/import_test.go @@ -4,6 +4,7 @@ import ( "context" "io/fs" "os" + "os/exec" "path/filepath" "strings" "sync" @@ -163,6 +164,101 @@ func TestImportError(t *testing.T) { } } +func TestPullUpdating(t *testing.T) { + // Previously we used fetch instead of pull, which would set the FETCH_HEAD but not HEAD + // This caused changes not to propagate if there were changes, since HEAD was pinned to whatever it was on the initial download. + // Switching to pull removes this problem at the expense of network bandwidth. + // Tried switching to FETCH_HEAD but FETCH_HEAD is only set on fetch and not initial repo clone so we would need to + // remember to always call fetch after clone. + // + // This test ensures we can pull the correct values down if they update no matter what, it works by creating a local + // file based git repo then committing a file, running the component, then updating the file in the repo. + testRepo := t.TempDir() + + contents := `declare "add" { + argument "a" {} + argument "b" {} + + export "sum" { + value = argument.a.value + argument.b.value + } +}` + main := ` +import.git "testImport" { + repository = "` + testRepo + `" + path = "math.river" + pull_frequency = "5s" +} + +testImport.add "cc" { + a = 1 + b = 1 +} +` + init := exec.Command("git", "init", testRepo) + err := init.Run() + require.NoError(t, err) + math := filepath.Join(testRepo, "math.river") + err = os.WriteFile(math, []byte(contents), 0666) + require.NoError(t, err) + add := exec.Command("git", "add", ".") + add.Dir = testRepo + err = add.Run() + require.NoError(t, err) + commit := exec.Command("git", "commit", "-m \"test\"") + commit.Dir = testRepo + err = commit.Run() + require.NoError(t, err) + + defer verifyNoGoroutineLeaks(t) + ctrl, f := setup(t, main) + err = ctrl.LoadSource(f, nil) + require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + + var wg sync.WaitGroup + defer func() { + cancel() + wg.Wait() + }() + + wg.Add(1) + go func() { + defer wg.Done() + ctrl.Run(ctx) + }() + + // Check for initial condition + require.Eventually(t, func() bool { + export := getExport[map[string]interface{}](t, ctrl, "", "testImport.add.cc") + return export["sum"] == 2 + }, 3*time.Second, 10*time.Millisecond) + + contentsMore := `declare "add" { + argument "a" {} + argument "b" {} + + export "sum" { + value = argument.a.value + argument.b.value + 1 + } +}` + err = os.WriteFile(math, []byte(contentsMore), 0666) + require.NoError(t, err) + add2 := exec.Command("git", "add", ".") + add2.Dir = testRepo + add2.Run() + + commit2 := exec.Command("git", "commit", "-m \"test2\"") + commit2.Dir = testRepo + commit2.Run() + + // Check for final condition. + require.Eventually(t, func() bool { + export := getExport[map[string]interface{}](t, ctrl, "", "testImport.add.cc") + return export["sum"] == 3 + }, 20*time.Second, 1*time.Millisecond) +} + func testConfig(t *testing.T, config string, reloadConfig string, update func()) { defer verifyNoGoroutineLeaks(t) ctrl, f := setup(t, config)