indexserver: add debug endpoint for deleting repository shards #485

Open · wants to merge 11 commits into base: main
109 changes: 91 additions & 18 deletions cmd/zoekt-sourcegraph-indexserver/cleanup.go
@@ -1,7 +1,9 @@
package main

import (
"errors"
"fmt"
"io/fs"
"log"
"os"
"os/exec"
@@ -92,23 +94,7 @@ func cleanup(indexDir string, repos []uint32, now time.Time, shardMerging bool)
}
log.Printf("removing shards for %v due to multiple repository names: %s", repo, strings.Join(paths, " "))

// We may be in both normal and compound shards in this case. First
// tombstone the compound shards so we don't just rm them.
simple := shards[:0]
for _, s := range shards {
if shardMerging && maybeSetTombstone([]shard{s}, repo) {
shardsLog(indexDir, "tombname", []shard{s})
} else {
simple = append(simple, s)
}
}

if len(simple) == 0 {
continue
}

removeAll(simple...)
shardsLog(indexDir, "removename", simple)
deleteOrTombstone(indexDir, repo, shardMerging, shards...)
}

// index: Move missing repos from trash into index
@@ -172,6 +158,38 @@ func cleanup(indexDir string, repos []uint32, now time.Time, shardMerging bool)
}
}

// remove any Zoekt metadata files in the given dir that don't have an
// associated shard file
Comment on lines +161 to +162
Member:

is this unrelated cleanup? IE maybe should be another PR?

Contributor Author:

No, I added this functionality since we switched the implementation. Before, the implementation made sure to delete the metadata file before its associated shard file. Since we switched to using zoekt.IndexFilePaths, that order isn't guaranteed anymore (leaving open the possibility of a metadata file not having an associated shard if we delete the shard and then crash). I added this additional logic to cleanup so that we would have a background process that would reap those "stranded" files.

I am happy to pull this bit into a separate PR, though.

metaFiles, err := filepath.Glob(filepath.Join(indexDir, "*.meta"))
if err != nil {
log.Printf("failed to glob %q for stranded metadata files: %s", indexDir, err)
} else {
for _, metaFile := range metaFiles {
shard := strings.TrimSuffix(metaFile, ".meta")
_, err := os.Stat(shard)
if err == nil {
// metadata file has associated shard
continue
}

if !errors.Is(err, fs.ErrNotExist) {
log.Printf("failed to stat metadata file %q: %s", metaFile, err)
continue
}

// metadata doesn't have an associated shard file, remove the metadata file

err = os.Remove(metaFile)
if err != nil {
log.Printf("failed to remove stranded metadata file %q: %s", metaFile, err)
continue
} else {
log.Printf("removed stranded metadata file: %s", metaFile)
}

}
}

metricCleanupDuration.Observe(time.Since(start).Seconds())
}

@@ -292,11 +310,43 @@ func removeAll(shards ...shard) {
// potential for a partial index for a repo. However, this should be
// exceedingly rare due to it being a mix of partial failure on something in
// trash + an admin re-adding a repository.
for _, shard := range shards {

if len(shards) == 0 {
return
}

// Delete the paths in reverse sorted order to ensure that Zoekt's repository <-> shard matching logic
// works correctly and that we don't leave behind partial state.
//
// Example: - repoA_v16.00002.zoekt
//          - repoA_v16.00001.zoekt
//          - repoA_v16.00000.zoekt
//
// zoekt-indexserver checks whether it has indexed "repoA" by first checking to see if the 0th shard
// is present (repoA_v16.00000.zoekt).
// - If it's present, then it gathers the rest of the shard names in ascending order (...00001.zoekt, ...00002.zoekt).
// - If it's missing, then zoekt assumes that it never indexed "repoA" (the remaining data from shards 1 & 2 is effectively invisible).
//
// If this function were to crash while deleting repoA and had only deleted the 0th shard, then:
// - zoekt would think that there is no data for that repository (despite the partial data remaining in shards 1 and 2)
// - zoekt could show inconsistent state when re-indexing the repository (it would incorrectly
//   associate the data from shards 1 and 2 with the "new" shard 0 data from a newer commit)
//
// Deleting shards in reverse sorted order (2 -> 1 -> 0) ensures that we don't leave an inconsistent
// state behind even if we crash.
Comment on lines +335 to +336
Member:

but if we do crash we will have partial state and be none the wiser anyways. So I think this is just extra complexity without making us any safer than before.

In fact I'd argue deleting 0 first is the best bet, since that is what we actually check in other parts. So if that is missing then we end up deleting the other ones. Then adding a check in cleanup to remove stranded indexes would make sense.

At the end of the day we are relying on the fact that os.Remove is super fast. Not ideal, and really we should introduce some other way of being atomic.

Contributor Author:

Maybe we can add an extra file that acts as a tombstone?

For a given repo, if repo.tombstone.json exists, then for all intents and purposes zoekt should treat the repo as deleted: the webserver should unload all of its associated shards, and cleanup should have a reaper that would eventually delete all of the shards + meta files before deleting the tombstone file.
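
A minimal sketch of that reaper idea, assuming a per-repo marker file next to the shards and cleanup.go's existing imports (log, os, path/filepath, strings); the marker naming and the reapTombstonedRepos helper are hypothetical, not part of this PR:

// reapTombstonedRepos is a hypothetical cleanup pass for the tombstone-file idea
// above: a "<prefix>.tombstone.json" marker means the repo is deleted, so remove
// its shard + meta files first and the marker last. The marker is the source of
// truth, so a crash mid-reap is simply retried on the next cleanup run.
func reapTombstonedRepos(indexDir string) {
	markers, err := filepath.Glob(filepath.Join(indexDir, "*.tombstone.json"))
	if err != nil {
		log.Printf("failed to glob %q for tombstone markers: %s", indexDir, err)
		return
	}
	for _, marker := range markers {
		prefix := strings.TrimSuffix(marker, ".tombstone.json")
		// every shard and meta file for the repo shares the marker's path prefix
		files, err := filepath.Glob(prefix + "*.zoekt*")
		if err != nil {
			log.Printf("failed to glob shards for %q: %s", prefix, err)
			continue
		}
		failed := false
		for _, f := range files {
			if err := os.Remove(f); err != nil {
				log.Printf("failed to remove %q: %s", f, err)
				failed = true
			}
		}
		if failed {
			continue
		}
		// only drop the marker once all shard data is gone
		if err := os.Remove(marker); err != nil {
			log.Printf("failed to remove tombstone marker %q: %s", marker, err)
		}
	}
}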


sortedShards := append([]shard{}, shards...)

sort.Slice(sortedShards, func(i, j int) bool {
return sortedShards[i].Path > sortedShards[j].Path
})

for _, shard := range sortedShards {
paths, err := zoekt.IndexFilePaths(shard.Path)
if err != nil {
debug.Printf("failed to remove shard %s: %v", shard.Path, err)
}

for _, p := range paths {
if err := os.Remove(p); err != nil {
debug.Printf("failed to remove shard file %s: %v", p, err)
@@ -515,3 +565,26 @@ func removeTombstones(fn string) ([]*zoekt.Repository, error) {
}
return tombstones, nil
}

// deleteOrTombstone deletes the provided shards in indexDir that are associated with
// the given repoID.
//
// If one of the provided shards is a compound shard and the repository is contained within it,
// the repository is tombstoned instead.
func deleteOrTombstone(indexDir string, repoID uint32, shardMerging bool, shards ...shard) {
Member:

I see you mentioned you didn't like that we have to pass in indexDir and shardMerging to this func. Maybe instead it should be part of some helper struct that holds that state.

minor nit: I'd make repoID come after the shardMerging param. IE the more "static" something is, the closer to the front of the arg list it should go. Just a habit from functional languages and currying.

Contributor Author:

Got it...I'll play around with passing around some sort of helper to see if that's less awkward. Something like:

type ServerConfig struct {
	shardMerging bool
	indexDir     string
}
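
A sketch of how deleteOrTombstone could hang off such a struct, mirroring the function below; the ServerConfig shape is only the suggestion from this thread, not code in this PR:

// Hypothetical method form: callers stop threading indexDir and shardMerging
// through every call site and use the config struct's state instead.
func (c *ServerConfig) deleteOrTombstone(repoID uint32, shards ...shard) {
	var simple []shard
	for _, s := range shards {
		if c.shardMerging && maybeSetTombstone([]shard{s}, repoID) {
			shardsLog(c.indexDir, "tombname", []shard{s})
		} else {
			simple = append(simple, s)
		}
	}
	if len(simple) == 0 {
		return
	}
	removeAll(simple...)
	shardsLog(c.indexDir, "removename", simple)
}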

var simple []shard
Member:

I see you got rid of the filtering hack, I guess just because now that it is a func it is safer that way?

Contributor Author:

By filtering hack I assume you mean the shardMap := getShards(indexDir) line? Yes, I decided to pass the list of shards associated with the ID directly since cleanup() already has this information. If we're sharing the function implementation, gathering the entire shardMap seemed redundant.

for _, s := range shards {
if shardMerging && maybeSetTombstone([]shard{s}, repoID) {
shardsLog(indexDir, "tombname", []shard{s})
} else {
simple = append(simple, s)
}
}

if len(simple) == 0 {
return
}

removeAll(simple...)
shardsLog(indexDir, "removename", simple)
}
186 changes: 176 additions & 10 deletions cmd/zoekt-sourcegraph-indexserver/cleanup_test.go
@@ -19,6 +19,7 @@ import (
)

func TestCleanup(t *testing.T) {

mk := func(name string, n int, mtime time.Time) shard {
return shard{
RepoID: fakeID(name),
@@ -28,6 +29,11 @@ func TestCleanup(t *testing.T) {
RepoTombstone: false,
}
}

mkMeta := func(name string, n int) string {
return fmt.Sprintf("%s_v%d.%05d.zoekt.meta", url.QueryEscape(name), 15, n)
}

// We don't use getShards so that we have two implementations of the same
// thing (ie pick up bugs in one)
glob := func(pattern string) []shard {
@@ -56,14 +62,16 @@
recent := now.Add(-time.Hour)
old := now.Add(-25 * time.Hour)
cases := []struct {
name string
repos []string
index []shard
trash []shard
tmps []string

wantIndex []shard
wantTrash []shard
name string
repos []string
indexMetaFiles []string
index []shard
trash []shard
tmps []string

wantIndex []shard
wantIndexMetaFiles []string
wantTrash []shard
}{{
name: "noop",
}, {
@@ -96,6 +104,13 @@
index: []shard{mk("foo", 0, recent), mk("bar", 0, recent)},
wantIndex: []shard{mk("foo", 0, recent)},
wantTrash: []shard{mk("bar", 0, now)},
}, {
name: "remove metafiles with no associated shards",
repos: []string{"foo", "bar"},
index: []shard{mk("foo", 0, recent), mk("bar", 0, recent)},
indexMetaFiles: []string{mkMeta("foo", 0), mkMeta("foo", 1), mkMeta("bar", 0)},
wantIndex: []shard{mk("foo", 0, recent), mk("bar", 0, recent)},
wantIndexMetaFiles: []string{mkMeta("foo", 0), mkMeta("bar", 0)},
}, {
name: "clean old .tmp files",
tmps: []string{"recent.tmp", "old.tmp"},
@@ -134,19 +149,54 @@
t.Fatal(err)
}
}
for _, f := range tt.indexMetaFiles {
path := filepath.Join(dir, f)
if _, err := os.Create(path); err != nil {
t.Fatal(err)
}
}

var repoIDs []uint32
for _, name := range tt.repos {
repoIDs = append(repoIDs, fakeID(name))
}
cleanup(dir, repoIDs, now, false)

if d := cmp.Diff(tt.wantIndex, glob(filepath.Join(dir, "*.zoekt"))); d != "" {
actualIndexShards := glob(filepath.Join(dir, "*.zoekt"))

sort.Slice(actualIndexShards, func(i, j int) bool {
return actualIndexShards[i].Path < actualIndexShards[j].Path
})
sort.Slice(tt.wantIndex, func(i, j int) bool {
return tt.wantIndex[i].Path < tt.wantIndex[j].Path
})

if d := cmp.Diff(tt.wantIndex, actualIndexShards); d != "" {
t.Errorf("unexpected index (-want, +got):\n%s", d)
}
if d := cmp.Diff(tt.wantTrash, glob(filepath.Join(dir, ".trash", "*.zoekt"))); d != "" {

actualTrashShards := glob(filepath.Join(dir, ".trash", "*.zoekt"))

sort.Slice(actualTrashShards, func(i, j int) bool {
return actualTrashShards[i].Path < actualTrashShards[j].Path
})

sort.Slice(tt.wantTrash, func(i, j int) bool {
return tt.wantTrash[i].Path < tt.wantTrash[j].Path
})
if d := cmp.Diff(tt.wantTrash, actualTrashShards); d != "" {
t.Errorf("unexpected trash (-want, +got):\n%s", d)
}

actualIndexMetaFiles := globBase(filepath.Join(dir, "*.meta"))

sort.Strings(actualIndexMetaFiles)
sort.Strings(tt.wantIndexMetaFiles)

if d := cmp.Diff(tt.wantIndexMetaFiles, actualIndexMetaFiles, cmpopts.EquateEmpty()); d != "" {
t.Errorf("unexpected metadata files (-want, +got):\n%s", d)
}

if tmps := globBase(filepath.Join(dir, "*.tmp")); len(tmps) > 0 {
t.Errorf("unexpected tmps: %v", tmps)
}
@@ -455,6 +505,122 @@ func TestCleanupCompoundShards(t *testing.T) {
}
}

func TestDeleteShards(t *testing.T) {
remainingRepoA := zoekt.Repository{ID: 1, Name: "A"}
remainingRepoB := zoekt.Repository{ID: 2, Name: "B"}
repositoryToDelete := zoekt.Repository{ID: 99, Name: "DELETE_ME"}

t.Run("delete repository from set of normal shards", func(t *testing.T) {
indexDir := t.TempDir()

// map of repoID -> list of paths for associated shard files + metadata files
shardFilesMap := make(map[uint32][]string)

// map of repoID -> list of associated shard structs
shardStructMap := make(map[uint32][]shard)

// setup: create shards for each repository, and populate the shard map
for _, r := range []zoekt.Repository{
remainingRepoA,
remainingRepoB,
repositoryToDelete,
} {
shardPaths := createTestNormalShard(t, indexDir, r, 3)

for _, p := range shardPaths {
// create stub meta file
metaFile := p + ".meta"
f, err := os.Create(metaFile)
if err != nil {
t.Fatalf("creating metadata file %q: %s", metaFile, err)
}

f.Close()

shardFilesMap[r.ID] = append(shardFilesMap[r.ID], p, metaFile)
shardStructMap[r.ID] = append(shardStructMap[r.ID], shard{
RepoID: r.ID,
RepoName: r.Name,
Path: p,
})
}
}

// run test: delete repository
deleteOrTombstone(indexDir, repositoryToDelete.ID, false, shardStructMap[repositoryToDelete.ID]...)

// run assertions: gather all the shards + meta files that remain and
// check to see that only the files associated with the "remaining" repositories
// are present
var actualShardFiles []string

for _, pattern := range []string{"*.zoekt", "*.meta"} {
files, err := filepath.Glob(filepath.Join(indexDir, pattern))
if err != nil {
t.Fatalf("globbing indexDir: %s", err)
}

actualShardFiles = append(actualShardFiles, files...)
}

var expectedShardFiles []string
expectedShardFiles = append(expectedShardFiles, shardFilesMap[remainingRepoA.ID]...)
expectedShardFiles = append(expectedShardFiles, shardFilesMap[remainingRepoB.ID]...)

sort.Strings(actualShardFiles)
sort.Strings(expectedShardFiles)

if diff := cmp.Diff(expectedShardFiles, actualShardFiles); diff != "" {
t.Errorf("unexpected diff in list of shard files (-want +got):\n%s", diff)
}
})

t.Run("delete repository from compound shard", func(t *testing.T) {
indexDir := t.TempDir()

// setup: enable shard merging for compound shards
t.Setenv("SRC_ENABLE_SHARD_MERGING", "1")

// setup: create compound shard with all repositories
repositories := []zoekt.Repository{remainingRepoA, remainingRepoB, repositoryToDelete}
compoundShard := createTestCompoundShard(t, indexDir, repositories)

s := shard{
RepoID: repositoryToDelete.ID,
RepoName: repositoryToDelete.Name,
Path: compoundShard,
}
deleteOrTombstone(indexDir, repositoryToDelete.ID, true, s)

// verify: read the compound shard, and ensure that only
// the repositories that we expect are in the shard (and the deleted one has been tombstoned)
actualRepositories, _, err := zoekt.ReadMetadataPathAlive(compoundShard)
if err != nil {
t.Fatalf("reading repository metadata from shard: %s", err)
}

expectedRepositories := []*zoekt.Repository{&remainingRepoA, &remainingRepoB}

sort.Slice(actualRepositories, func(i, j int) bool {
return actualRepositories[i].ID < actualRepositories[j].ID
})

sort.Slice(expectedRepositories, func(i, j int) bool {
return expectedRepositories[i].ID < expectedRepositories[j].ID
})

opts := []cmp.Option{
cmpopts.IgnoreUnexported(zoekt.Repository{}),
cmpopts.IgnoreFields(zoekt.Repository{}, "IndexOptions", "HasSymbols"),
cmpopts.EquateEmpty(),
}
if diff := cmp.Diff(expectedRepositories, actualRepositories, opts...); diff != "" {
t.Errorf("unexpected diff in list of repositories (-want +got):\n%s", diff)
}
})

}

// createCompoundShard returns a path to a compound shard containing repos with
// ids. Use optsFns to overwrite fields of zoekt.Repository for all repos.
func createCompoundShard(t *testing.T, dir string, ids []uint32, optFns ...func(in *zoekt.Repository)) string {