diff --git a/cmd/zoekt-sourcegraph-indexserver/cleanup.go b/cmd/zoekt-sourcegraph-indexserver/cleanup.go
index ef01a4f6..ea5f3cd2 100644
--- a/cmd/zoekt-sourcegraph-indexserver/cleanup.go
+++ b/cmd/zoekt-sourcegraph-indexserver/cleanup.go
@@ -1,7 +1,9 @@
 package main
 
 import (
+	"errors"
 	"fmt"
+	"io/fs"
 	"log"
 	"os"
 	"os/exec"
@@ -92,23 +94,7 @@ func cleanup(indexDir string, repos []uint32, now time.Time, shardMerging bool)
 		}
 		log.Printf("removing shards for %v due to multiple repository names: %s", repo, strings.Join(paths, " "))
 
-		// We may be in both normal and compound shards in this case. First
-		// tombstone the compound shards so we don't just rm them.
-		simple := shards[:0]
-		for _, s := range shards {
-			if shardMerging && maybeSetTombstone([]shard{s}, repo) {
-				shardsLog(indexDir, "tombname", []shard{s})
-			} else {
-				simple = append(simple, s)
-			}
-		}
-
-		if len(simple) == 0 {
-			continue
-		}
-
-		removeAll(simple...)
-		shardsLog(indexDir, "removename", simple)
+		deleteOrTombstone(indexDir, repo, shardMerging, shards...)
 	}
 
 	// index: Move missing repos from trash into index
@@ -172,6 +158,38 @@ func cleanup(indexDir string, repos []uint32, now time.Time, shardMerging bool)
 		}
 	}
 
+	// remove any Zoekt metadata files in the given dir that don't have an
+	// associated shard file
+	metaFiles, err := filepath.Glob(filepath.Join(indexDir, "*.meta"))
+	if err != nil {
+		log.Printf("failed to glob %q for stranded metadata files: %s", indexDir, err)
+	} else {
+		for _, metaFile := range metaFiles {
+			shard := strings.TrimSuffix(metaFile, ".meta")
+			_, err := os.Stat(shard)
+			if err == nil {
+				// the metadata file has an associated shard
+				continue
+			}
+
+			if !errors.Is(err, fs.ErrNotExist) {
+				log.Printf("failed to stat metadata file %q: %s", metaFile, err)
+				continue
+			}
+
+			// the metadata file doesn't have an associated shard, so remove it
+
+			err = os.Remove(metaFile)
+			if err != nil {
+				log.Printf("failed to remove stranded metadata file %q: %s", metaFile, err)
+				continue
+			} else {
+				log.Printf("removed stranded metadata file: %s", metaFile)
+			}
+
+		}
+	}
+
 	metricCleanupDuration.Observe(time.Since(start).Seconds())
 }
 
@@ -292,11 +310,43 @@ func removeAll(shards ...shard) {
 	// potential for a partial index for a repo. However, this should be
 	// exceedingly rare due to it being a mix of partial failure on something in
 	// trash + an admin re-adding a repository.
-	for _, shard := range shards {
+
+	if len(shards) == 0 {
+		return
+	}
+
+	// Delete the shard paths in reverse sorted order so that Zoekt's repository <-> shard matching logic
+	// keeps working and we don't leave partial state behind if we fail partway through.
+	//
+	// Example: - repoA_v16.00002.zoekt
+	//          - repoA_v16.00001.zoekt
+	//          - repoA_v16.00000.zoekt
+	//
+	// zoekt-indexserver checks whether it has indexed "repoA" by first checking to see if the 0th shard
+	// is present (repoA_v16.00000.zoekt).
+	// - If it's present, it then gathers the rest of the shard names in ascending order (...00001.zoekt, ...00002.zoekt).
+	// - If it's missing, then zoekt assumes that it never indexed "repoA" (the remaining data from shards 1 & 2 is effectively invisible).
+	//
+	// If this function were to crash while deleting repoA and we had only deleted the 0th shard, then:
+	// - zoekt would think that there is no data for that repository (despite the partial data from shards 1 and 2 still being on disk)
+	// - it's possible for zoekt to show inconsistent state when re-indexing the repository (zoekt incorrectly
+	//   associates the data from shards 1 and 2 with the "new" shard 0 data (from a newer commit))
+	//
+	// Deleting shards in reverse sorted order (2 -> 1 -> 0) ensures that we never leave an inconsistent
+	// state behind, even if we crash.
+
+	sortedShards := append([]shard{}, shards...)
+
+	sort.Slice(sortedShards, func(i, j int) bool {
+		return sortedShards[i].Path > sortedShards[j].Path
+	})
+
+	for _, shard := range sortedShards {
 		paths, err := zoekt.IndexFilePaths(shard.Path)
 		if err != nil {
 			debug.Printf("failed to remove shard %s: %v", shard.Path, err)
 		}
+
 		for _, p := range paths {
 			if err := os.Remove(p); err != nil {
 				debug.Printf("failed to remove shard file %s: %v", p, err)
@@ -515,3 +565,26 @@ func removeTombstones(fn string) ([]*zoekt.Repository, error) {
 	}
 	return tombstones, nil
 }
+
+// deleteOrTombstone deletes the provided shards in indexDir that are associated with
+// the given repoID.
+//
+// If one of the provided shards is a compound shard and the repository is contained
+// within it, the repository is tombstoned instead of being deleted.
+func deleteOrTombstone(indexDir string, repoID uint32, shardMerging bool, shards ...shard) {
+	var simple []shard
+	for _, s := range shards {
+		if shardMerging && maybeSetTombstone([]shard{s}, repoID) {
+			shardsLog(indexDir, "tombname", []shard{s})
+		} else {
+			simple = append(simple, s)
+		}
+	}
+
+	if len(simple) == 0 {
+		return
+	}
+
+	removeAll(simple...)
+ shardsLog(indexDir, "removename", simple) +} diff --git a/cmd/zoekt-sourcegraph-indexserver/cleanup_test.go b/cmd/zoekt-sourcegraph-indexserver/cleanup_test.go index f6e53280..96540413 100644 --- a/cmd/zoekt-sourcegraph-indexserver/cleanup_test.go +++ b/cmd/zoekt-sourcegraph-indexserver/cleanup_test.go @@ -19,6 +19,7 @@ import ( ) func TestCleanup(t *testing.T) { + mk := func(name string, n int, mtime time.Time) shard { return shard{ RepoID: fakeID(name), @@ -28,6 +29,11 @@ func TestCleanup(t *testing.T) { RepoTombstone: false, } } + + mkMeta := func(name string, n int) string { + return fmt.Sprintf("%s_v%d.%05d.zoekt.meta", url.QueryEscape(name), 15, n) + } + // We don't use getShards so that we have two implementations of the same // thing (ie pick up bugs in one) glob := func(pattern string) []shard { @@ -56,14 +62,16 @@ func TestCleanup(t *testing.T) { recent := now.Add(-time.Hour) old := now.Add(-25 * time.Hour) cases := []struct { - name string - repos []string - index []shard - trash []shard - tmps []string - - wantIndex []shard - wantTrash []shard + name string + repos []string + indexMetaFiles []string + index []shard + trash []shard + tmps []string + + wantIndex []shard + wantIndexMetaFiles []string + wantTrash []shard }{{ name: "noop", }, { @@ -96,6 +104,13 @@ func TestCleanup(t *testing.T) { index: []shard{mk("foo", 0, recent), mk("bar", 0, recent)}, wantIndex: []shard{mk("foo", 0, recent)}, wantTrash: []shard{mk("bar", 0, now)}, + }, { + name: "remove metafiles with no associated shards", + repos: []string{"foo", "bar"}, + index: []shard{mk("foo", 0, recent), mk("bar", 0, recent)}, + indexMetaFiles: []string{mkMeta("foo", 0), mkMeta("foo", 1), mkMeta("bar", 0)}, + wantIndex: []shard{mk("foo", 0, recent), mk("bar", 0, recent)}, + wantIndexMetaFiles: []string{mkMeta("foo", 0), mkMeta("bar", 0)}, }, { name: "clean old .tmp files", tmps: []string{"recent.tmp", "old.tmp"}, @@ -134,6 +149,12 @@ func TestCleanup(t *testing.T) { t.Fatal(err) } } + for _, f := range tt.indexMetaFiles { + path := filepath.Join(dir, f) + if _, err := os.Create(path); err != nil { + t.Fatal(err) + } + } var repoIDs []uint32 for _, name := range tt.repos { @@ -141,12 +162,41 @@ func TestCleanup(t *testing.T) { } cleanup(dir, repoIDs, now, false) - if d := cmp.Diff(tt.wantIndex, glob(filepath.Join(dir, "*.zoekt"))); d != "" { + actualIndexShards := glob(filepath.Join(dir, "*.zoekt")) + + sort.Slice(actualIndexShards, func(i, j int) bool { + return actualIndexShards[i].Path < actualIndexShards[j].Path + }) + sort.Slice(tt.wantIndex, func(i, j int) bool { + return tt.wantIndex[i].Path < tt.wantIndex[j].Path + }) + + if d := cmp.Diff(tt.wantIndex, actualIndexShards); d != "" { t.Errorf("unexpected index (-want, +got):\n%s", d) } - if d := cmp.Diff(tt.wantTrash, glob(filepath.Join(dir, ".trash", "*.zoekt"))); d != "" { + + actualTrashShards := glob(filepath.Join(dir, ".trash", "*.zoekt")) + + sort.Slice(actualTrashShards, func(i, j int) bool { + return actualTrashShards[i].Path < actualTrashShards[j].Path + }) + + sort.Slice(tt.wantTrash, func(i, j int) bool { + return tt.wantTrash[i].Path < tt.wantTrash[j].Path + }) + if d := cmp.Diff(tt.wantTrash, actualTrashShards); d != "" { t.Errorf("unexpected trash (-want, +got):\n%s", d) } + + actualIndexMetaFiles := globBase(filepath.Join(dir, "*.meta")) + + sort.Strings(actualIndexMetaFiles) + sort.Strings(tt.wantIndexMetaFiles) + + if d := cmp.Diff(tt.wantIndexMetaFiles, actualIndexMetaFiles, cmpopts.EquateEmpty()); d != "" { + t.Errorf("unexpected metadata files 
(-want, +got):\n%s", d)
+			}
+
 			if tmps := globBase(filepath.Join(dir, "*.tmp")); len(tmps) > 0 {
 				t.Errorf("unexpected tmps: %v", tmps)
 			}
@@ -455,6 +505,122 @@ func TestCleanupCompoundShards(t *testing.T) {
 	}
 }
 
+func TestDeleteShards(t *testing.T) {
+	remainingRepoA := zoekt.Repository{ID: 1, Name: "A"}
+	remainingRepoB := zoekt.Repository{ID: 2, Name: "B"}
+	repositoryToDelete := zoekt.Repository{ID: 99, Name: "DELETE_ME"}
+
+	t.Run("delete repository from set of normal shards", func(t *testing.T) {
+		indexDir := t.TempDir()
+
+		// map of repoID -> list of paths for associated shard files + metadata files
+		shardFilesMap := make(map[uint32][]string)
+
+		// map of repoID -> list of associated shard structs
+		shardStructMap := make(map[uint32][]shard)
+
+		// setup: create shards for each repository, and populate both maps
+		for _, r := range []zoekt.Repository{
+			remainingRepoA,
+			remainingRepoB,
+			repositoryToDelete,
+		} {
+			shardPaths := createTestNormalShard(t, indexDir, r, 3)
+
+			for _, p := range shardPaths {
+				// create stub meta file
+				metaFile := p + ".meta"
+				f, err := os.Create(metaFile)
+				if err != nil {
+					t.Fatalf("creating metadata file %q: %s", metaFile, err)
+				}
+
+				f.Close()
+
+				shardFilesMap[r.ID] = append(shardFilesMap[r.ID], p, metaFile)
+				shardStructMap[r.ID] = append(shardStructMap[r.ID], shard{
+					RepoID:   r.ID,
+					RepoName: r.Name,
+					Path:     p,
+				})
+			}
+		}
+
+		// run test: delete repository
+		deleteOrTombstone(indexDir, repositoryToDelete.ID, false, shardStructMap[repositoryToDelete.ID]...)
+
+		// run assertions: gather all the shard + meta files that remain and
+		// check that only the files associated with the "remaining" repositories
+		// are present
+		var actualShardFiles []string
+
+		for _, pattern := range []string{"*.zoekt", "*.meta"} {
+			files, err := filepath.Glob(filepath.Join(indexDir, pattern))
+			if err != nil {
+				t.Fatalf("globbing indexDir: %s", err)
+			}
+
+			actualShardFiles = append(actualShardFiles, files...)
+		}
+
+		var expectedShardFiles []string
+		expectedShardFiles = append(expectedShardFiles, shardFilesMap[remainingRepoA.ID]...)
+		expectedShardFiles = append(expectedShardFiles, shardFilesMap[remainingRepoB.ID]...)
+
+		sort.Strings(actualShardFiles)
+		sort.Strings(expectedShardFiles)
+
+		if diff := cmp.Diff(expectedShardFiles, actualShardFiles); diff != "" {
+			t.Errorf("unexpected diff in list of shard files (-want +got):\n%s", diff)
+		}
+	})
+
+	t.Run("delete repository from compound shard", func(t *testing.T) {
+		indexDir := t.TempDir()
+
+		// setup: enable shard merging for compound shards
+		t.Setenv("SRC_ENABLE_SHARD_MERGING", "1")
+
+		// setup: create a compound shard containing all of the repositories
+		repositories := []zoekt.Repository{remainingRepoA, remainingRepoB, repositoryToDelete}
+		compoundShard := createTestCompoundShard(t, indexDir, repositories)
+
+		s := shard{
+			RepoID:   repositoryToDelete.ID,
+			RepoName: repositoryToDelete.Name,
+			Path:     compoundShard,
+		}
+		deleteOrTombstone(indexDir, repositoryToDelete.ID, true, s)
+
+		// verify: read the compound shard, and ensure that only the repositories
+		// we expect to remain are alive (i.e. the deleted one has been tombstoned)
+		actualRepositories, _, err := zoekt.ReadMetadataPathAlive(compoundShard)
+		if err != nil {
+			t.Fatalf("reading repository metadata from shard: %s", err)
+		}
+
+		expectedRepositories := []*zoekt.Repository{&remainingRepoA, &remainingRepoB}
+
+		sort.Slice(actualRepositories, func(i, j int) bool {
+			return actualRepositories[i].ID < actualRepositories[j].ID
+		})
+
+		sort.Slice(expectedRepositories, func(i, j int) bool {
+			return expectedRepositories[i].ID < expectedRepositories[j].ID
+		})
+
+		opts := []cmp.Option{
+			cmpopts.IgnoreUnexported(zoekt.Repository{}),
+			cmpopts.IgnoreFields(zoekt.Repository{}, "IndexOptions", "HasSymbols"),
+			cmpopts.EquateEmpty(),
+		}
+		if diff := cmp.Diff(expectedRepositories, actualRepositories, opts...); diff != "" {
+			t.Errorf("unexpected diff in list of repositories (-want +got):\n%s", diff)
+		}
+	})
+
+}
+
 // createCompoundShard returns a path to a compound shard containing repos with
 // ids. Use optsFns to overwrite fields of zoekt.Repository for all repos.
 func createCompoundShard(t *testing.T, dir string, ids []uint32, optFns ...func(in *zoekt.Repository)) string {
diff --git a/cmd/zoekt-sourcegraph-indexserver/debug.go b/cmd/zoekt-sourcegraph-indexserver/debug.go
index c407e6a3..512bed75 100644
--- a/cmd/zoekt-sourcegraph-indexserver/debug.go
+++ b/cmd/zoekt-sourcegraph-indexserver/debug.go
@@ -95,6 +95,15 @@ func debugCmd() *ffcli.Command {
 		"wget -q -O - http://localhost:6072/metrics -sS | grep index_shard_merging_running".
 		It is only possible to trigger one merge operation at a time.
 
+	wget -q -O - http://localhost:6072/debug/delete?id=[REPOSITORY_ID]
+		delete all of the shards associated with the given repository id.
+
+		You can find the id associated with a repository via the "/debug/indexed" route.
+		If you need to delete multiple repositories at once, you can build a small shell pipeline. The following
+		example deletes the first repository listed by the "/debug/indexed" route:
+
+		> wget -q -O - http://localhost:6072/debug/indexed | awk '{print $1}' | tail -n +2 | head -n 1 | xargs -I {} -- wget -q -O - "http://localhost:6072/debug/delete?id={}"
+
 	wget -q -O - http://localhost:6072/debug/queue
 		list the repositories in the indexing queue, sorted by descending priority.
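
Since the /debug/delete route is plain HTTP, it can also be driven from Go rather than wget. Below is a minimal sketch of such a caller; it is not part of this change, and the helper name deleteRepoShards and the base URL http://localhost:6072 are assumptions to adapt to your deployment:

	package main

	import (
		"fmt"
		"io"
		"net/http"
		"strings"
	)

	// deleteRepoShards asks the index server's debug endpoint to delete all
	// shards associated with the given repository ID. Unparseable or unknown
	// IDs are reported with HTTP 400 (see handleDebugDelete in main.go below).
	func deleteRepoShards(baseURL string, repoID uint32) error {
		resp, err := http.Get(fmt.Sprintf("%s/debug/delete?id=%d", baseURL, repoID))
		if err != nil {
			return err
		}
		defer resp.Body.Close()

		body, _ := io.ReadAll(resp.Body)
		if resp.StatusCode != http.StatusOK {
			return fmt.Errorf("delete failed: %s: %s", resp.Status, strings.TrimSpace(string(body)))
		}
		return nil
	}

	func main() {
		// assumed address of a locally running zoekt-sourcegraph-indexserver
		if err := deleteRepoShards("http://localhost:6072", 42); err != nil {
			fmt.Println(err)
		}
	}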
diff --git a/cmd/zoekt-sourcegraph-indexserver/main.go b/cmd/zoekt-sourcegraph-indexserver/main.go index 23b52159..91bef9c9 100644 --- a/cmd/zoekt-sourcegraph-indexserver/main.go +++ b/cmd/zoekt-sourcegraph-indexserver/main.go @@ -646,6 +646,7 @@ func (s *Server) addDebugHandlers(mux *http.ServeMux) { // on "/". mux.Handle("/", http.HandlerFunc(s.handleReIndex)) + mux.Handle("/debug/delete", http.HandlerFunc(s.handleDebugDelete)) mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed)) mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList)) mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge)) @@ -774,6 +775,42 @@ func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) { _, _ = w.Write([]byte("merging enqueued\n")) } +func (s *Server) handleDebugDelete(w http.ResponseWriter, r *http.Request) { + rawID := r.URL.Query().Get("id") + if rawID == "" { + http.Error(w, "URL parameter 'id' must be specified", http.StatusBadRequest) + return + } + + id64, err := strconv.ParseUint(rawID, 10, 32) + if err != nil { + http.Error(w, fmt.Sprintf("failed to parse repository id %q as uint32: %s", rawID, err), http.StatusBadRequest) + return + } + + repoID := uint32(id64) + + repositoryNotFound := false + s.muIndexDir.Global(func() { + shardMap := getShards(s.IndexDir) + shards, ok := shardMap[repoID] + if !ok { + repositoryNotFound = true + return + } + + deleteOrTombstone(s.IndexDir, repoID, s.shardMerging, shards...) + }) + + if repositoryNotFound { + http.Error(w, fmt.Sprintf("repository id %q not found", rawID), http.StatusBadRequest) + return + } + + _, _ = w.Write([]byte(fmt.Sprintf("deleted repository %q\n", rawID))) + +} + func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) { indexed := listIndexed(s.IndexDir) diff --git a/cmd/zoekt-sourcegraph-indexserver/main_test.go b/cmd/zoekt-sourcegraph-indexserver/main_test.go index 5ec4f248..77867ebb 100644 --- a/cmd/zoekt-sourcegraph-indexserver/main_test.go +++ b/cmd/zoekt-sourcegraph-indexserver/main_test.go @@ -4,18 +4,21 @@ import ( "context" "flag" "fmt" - sglog "github.com/sourcegraph/log" - "github.com/sourcegraph/log/logtest" "io" "log" "net/http" "net/http/httptest" "net/url" "os" + "path/filepath" + "strconv" "strings" "testing" "github.com/google/go-cmp/cmp" + sglog "github.com/sourcegraph/log" + "github.com/sourcegraph/log/logtest" + "github.com/sourcegraph/zoekt/build" "github.com/sourcegraph/zoekt" ) @@ -124,6 +127,109 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } +func createTestNormalShard(t *testing.T, indexDir string, r zoekt.Repository, numShards int, optFns ...func(options *build.Options)) []string { + t.Helper() + + if err := os.MkdirAll(filepath.Dir(indexDir), 0700); err != nil { + t.Fatal(err) + } + + o := build.Options{ + IndexDir: indexDir, + RepositoryDescription: r, + ShardMax: 75, // create a new shard every 75 bytes + } + o.SetDefaults() + + for _, fn := range optFns { + fn(&o) + } + + b, err := build.NewBuilder(o) + if err != nil { + t.Fatalf("NewBuilder: %v", err) + } + + if numShards == 0 { + // We have to make at least 1 shard. + numShards = 1 + } + + for i := 0; i < numShards; i++ { + // Create entries (file + contents) that are ~100 bytes each. + // This (along with our shardMax setting of 75 bytes) means that each shard + // will contain at most one of these. 
+ fileName := strconv.Itoa(i) + document := zoekt.Document{Name: fileName, Content: []byte(strings.Repeat("A", 100))} + for _, branch := range o.RepositoryDescription.Branches { + document.Branches = append(document.Branches, branch.Name) + } + + err := b.Add(document) + if err != nil { + t.Fatalf("failed to add file %q to builder: %s", fileName, err) + } + } + + if err := b.Finish(); err != nil { + t.Fatalf("Finish: %v", err) + } + + return o.FindAllShards() +} + +func createTestCompoundShard(t *testing.T, indexDir string, repositories []zoekt.Repository, optFns ...func(options *build.Options)) string { + t.Helper() + + var shardNames []string + + for _, r := range repositories { + // create an isolated scratch space to store normal shards for this repository + scratchDir := t.TempDir() + + // create shards that'll be merged later + createTestNormalShard(t, scratchDir, r, 1, optFns...) + + // discover file names for all the normal shards we created + // note: this only looks in the immediate 'scratchDir' folder and doesn't recurse + shards, err := filepath.Glob(filepath.Join(scratchDir, "*.zoekt")) + if err != nil { + t.Fatalf("while globbing %q to find normal shards: %s", scratchDir, err) + } + + shardNames = append(shardNames, shards...) + } + + // load the normal shards that we created + var files []zoekt.IndexFile + for _, shard := range shardNames { + f, err := os.Open(shard) + if err != nil { + t.Fatalf("opening shard file: %s", err) + } + defer f.Close() + + indexFile, err := zoekt.NewIndexFile(f) + if err != nil { + t.Fatalf("creating index file: %s", err) + } + defer indexFile.Close() + + files = append(files, indexFile) + } + + // merge all the simple shards into a compound shard + tmpName, dstName, err := zoekt.Merge(indexDir, files...) + if err != nil { + t.Fatalf("merging index files into compound shard: %s", err) + } + if err := os.Rename(tmpName, dstName); err != nil { + t.Fatal(err) + } + + return dstName +} + func TestCreateEmptyShard(t *testing.T) { dir := t.TempDir()
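
To make the deletion-ordering argument from removeAll concrete, here is a self-contained sketch (standard library only, hypothetical shard names taken from the comment in cleanup.go; not part of the patch). Because the paths are deleted in reverse sorted order, the 0th shard, the one zoekt consults to decide whether a repository is indexed at all, is always removed last, so the shards surviving a crash always form a readable prefix instead of invisible stranded data:

	package main

	import (
		"fmt"
		"sort"
	)

	func main() {
		// hypothetical shard files for one repository, in whatever order a
		// directory listing might return them
		shards := []string{
			"repoA_v16.00001.zoekt",
			"repoA_v16.00000.zoekt",
			"repoA_v16.00002.zoekt",
		}

		// reverse sorted order, mirroring removeAll: the 00000 shard is deleted
		// last, so at any crash point the surviving shards form a prefix
		// (00000..n) that zoekt can still read consistently
		sort.Sort(sort.Reverse(sort.StringSlice(shards)))

		for _, p := range shards {
			fmt.Println("removing", p) // the real code calls os.Remove here
		}
		// Output:
		// removing repoA_v16.00002.zoekt
		// removing repoA_v16.00001.zoekt
		// removing repoA_v16.00000.zoekt
	}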