Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

indexserver: add debug endpoint for deleting repository shards #485

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
4 changes: 4 additions & 0 deletions cmd/zoekt-sourcegraph-indexserver/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func debugCmd() *ffcli.Command {
"wget -q -O - http://localhost:6072/metrics -sS | grep index_shard_merging_running". It is only possible
to trigger one merge operation at a time.

wget -q -O - http://localhost:6072/debug/delete?id=[REPOSITORY_ID]
delete all of the shards associated with the given repository id. You can find the id associated with a
repository via the "/debug/indexed" route.

wget -q -O - http://localhost:6072/debug/queue
list the repositories in the indexing queue, sorted by descending priority.

Expand Down
100 changes: 100 additions & 0 deletions cmd/zoekt-sourcegraph-indexserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ func (s *Server) addDebugHandlers(mux *http.ServeMux) {
// on "/".
mux.Handle("/", http.HandlerFunc(s.handleReIndex))

mux.Handle("/debug/delete", http.HandlerFunc(s.handleDebugDelete))
mux.Handle("/debug/indexed", http.HandlerFunc(s.handleDebugIndexed))
mux.Handle("/debug/list", http.HandlerFunc(s.handleDebugList))
mux.Handle("/debug/merge", http.HandlerFunc(s.handleDebugMerge))
Expand Down Expand Up @@ -757,6 +758,105 @@ func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
}
}

func (s *Server) handleDebugDelete(w http.ResponseWriter, r *http.Request) {
rawID := r.URL.Query().Get("id")
if rawID == "" {
http.Error(w, "URL parameter 'id' must be specified", http.StatusBadRequest)
return
}

id64, err := strconv.ParseUint(rawID, 10, 32)
if err != nil {
http.Error(w, fmt.Sprintf("failed to parse repository id %q as uint32: %s", rawID, err), http.StatusBadRequest)
return
}

repoID := uint32(id64)

s.queue.mu.Lock()
defer s.queue.mu.Unlock()
ggilmore marked this conversation as resolved.
Show resolved Hide resolved

item := s.queue.get(repoID)
if item == nil {
http.Error(w, fmt.Sprintf("no repository found for id %q", rawID), http.StatusBadRequest)
return
}

var deletionError error

repoName := item.opts.Name
s.muIndexDir.With(repoName, func() {
o := s.indexArgs(item.opts).BuildOptions()
deletionError = deleteShards(o)
})

if deletionError != nil {
http.Error(w, fmt.Sprintf("while deleting shards for repository id %q: %s", rawID, deletionError), http.StatusInternalServerError)
return
}
ggilmore marked this conversation as resolved.
Show resolved Hide resolved
}

// deleteShards deletes all the shards that are associated with the repository specified
// in the build options.
//
// Users must hold the indexDir lock for this repository before calling deleteShards.
func deleteShards(options *build.Options) error {
shardPaths := options.FindAllShards()

// Ensure that the paths are in reverse sorted order to ensure that Zoekt's repository <-> shard matching logic
// works correctly.
//
// Example: - repoA_v16.00002.zoekt
// - repoA_v16.00001.zoekt
// - repoA_v16.00000.zoekt
//
// zoekt-indexserver checks whether it has indexed "repoA" by first checking to see if the 0th shard
// is present (repoA_v16.00000.zoekt). If it's present, then it gathers all rest of the shards names in ascending order
// (...00001.zoekt, ...00002.zoekt). If it's missing, then zoekt assumes that it never indexed "repoA".
//
// If this function were to crash while deleting repoA and we only deleted the 0th shard, then shard's 1 & 2 would never
// be cleaned up by Zoekt indexserver (since the 0th shard is the only shard that's tested).
//
// Ensuring that we delete shards in reverse sorted order (2 -> 1 -> 0) always ensures that we don't leave an inconsistent
// state behind even if we crash.

sort.Slice(shardPaths, func(i, j int) bool {
return shardPaths[i] > shardPaths[j]
})

for _, shard := range shardPaths {
// Is this repository inside a compound shard? If so, set a tombstone
// instead of deleting the shard outright.
if zoekt.ShardMergingEnabled() && strings.HasPrefix(filepath.Base(shard), "compound-") {
if !strings.HasSuffix(shard, ".zoekt") {
continue
}

err := zoekt.SetTombstone(shard, options.RepositoryDescription.ID)
if err != nil {
return fmt.Errorf("setting tombstone in shard %q: %w", shard, err)
ggilmore marked this conversation as resolved.
Show resolved Hide resolved
}

continue
}

metaFile := shard + ".meta"
if _, err := os.Stat(metaFile); err == nil {
err := os.Remove(metaFile)
ggilmore marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("deleting metadata file %q: %w", metaFile, err)
}
}

err := os.Remove(shard)
if err != nil {
return fmt.Errorf("deleting shard %q: %w", shard, err)
}
}

return nil
}

// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
// can run this command during periods of low usage (evenings, weekends) to
// trigger an initial merge run. In the steady-state, merges happen rarely, even
Expand Down