Skip to content

Commit

Permalink
move logic to cleanup.go
Browse files Browse the repository at this point in the history
  • Loading branch information
ggilmore committed Nov 28, 2022
1 parent 828806c commit e53fada
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 210 deletions.
71 changes: 71 additions & 0 deletions cmd/zoekt-sourcegraph-indexserver/cleanup.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package main

import (
"errors"
"fmt"
"io/fs"
"log"
"os"
"os/exec"
Expand All @@ -13,6 +15,7 @@ import (
"github.com/grafana/regexp"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/sourcegraph/zoekt/build"
"gopkg.in/natefinch/lumberjack.v2"

"github.com/sourcegraph/zoekt"
Expand Down Expand Up @@ -515,3 +518,71 @@ func removeTombstones(fn string) ([]*zoekt.Repository, error) {
}
return tombstones, nil
}

// deleteShards deletes all the shards that are associated with the repository specified
// in the build options.
//
// Users must hold the indexDir lock for this repository before calling deleteShards.
func deleteShards(options *build.Options) error {
shardPaths := options.FindAllShards()

// Ensure that the paths are in reverse sorted order to ensure that Zoekt's repository <-> shard matching logic
// works correctly.
//
// Example: - repoA_v16.00002.zoekt
// - repoA_v16.00001.zoekt
// - repoA_v16.00000.zoekt
//
// zoekt-indexserver checks whether it has indexed "repoA" by first checking to see if the 0th shard
// is present (repoA_v16.00000.zoekt). If it's present, then it gathers all rest of the shards names in ascending order
// (...00001.zoekt, ...00002.zoekt). If it's missing, then zoekt assumes that it never indexed "repoA".
//
// If this function were to crash while deleting repoA, and we only deleted the 0th shard, then shard's 1 & 2 would never
// be cleaned up by Zoekt indexserver (since the 0th shard is the only shard that's tested).
//
// Deleting shards in reverse sorted order (2 -> 1 -> 0) always ensures that we don't leave an inconsistent
// state behind even if we crash.

sort.Slice(shardPaths, func(i, j int) bool {
return shardPaths[i] > shardPaths[j]
})

for _, shard := range shardPaths {
// Is this repository inside a compound shard? If so, set a tombstone
// instead of deleting the shard outright.
if zoekt.ShardMergingEnabled() && strings.HasPrefix(filepath.Base(shard), "compound-") {
if !strings.HasSuffix(shard, ".zoekt") {
continue
}

err := zoekt.SetTombstone(shard, options.RepositoryDescription.ID)
if err != nil {
return fmt.Errorf("setting tombstone in shard %q: %w", shard, err)
}

continue
}

err := os.Remove(shard)
if err != nil {
return fmt.Errorf("deleting shard %q: %w", shard, err)
}

// remove the metadata file associated with the shard (if any)
metaFile := shard + ".meta"
if _, err := os.Stat(metaFile); err != nil {
if errors.Is(err, fs.ErrNotExist) {
continue
}

return fmt.Errorf("'stat'ing metadata file %q: %w", metaFile, err)
}

err = os.Remove(metaFile)
if err != nil {
return fmt.Errorf("deleting metadata file %q: %w", metaFile, err)
}
}

return nil
}
121 changes: 121 additions & 0 deletions cmd/zoekt-sourcegraph-indexserver/cleanup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,127 @@ func TestCleanupCompoundShards(t *testing.T) {
}
}

func TestDeleteShards(t *testing.T) {
remainingRepoA := zoekt.Repository{ID: 1, Name: "A"}
remainingRepoB := zoekt.Repository{ID: 2, Name: "B"}
repositoryToDelete := zoekt.Repository{ID: 99, Name: "DELETE_ME"}

t.Run("delete repository from set of normal shards", func(t *testing.T) {
indexDir := t.TempDir()

// map of repoID -> list of associated shard paths + metadata paths
shardMap := make(map[uint32][]string)

// setup: create shards for each repository, and populate the shard map
for _, r := range []zoekt.Repository{
remainingRepoA,
remainingRepoB,
repositoryToDelete,
} {
shards := createTestNormalShard(t, indexDir, r, 3)

for _, shard := range shards {
// create stub meta file
metaFile := shard + ".meta"
f, err := os.Create(metaFile)
if err != nil {
t.Fatalf("creating metadata file %q: %s", metaFile, err)
}

f.Close()

shardMap[r.ID] = append(shardMap[r.ID], shard, metaFile)
}
}

// run test: delete repository
options := &build.Options{
IndexDir: indexDir,
RepositoryDescription: repositoryToDelete,
}
options.SetDefaults()

err := deleteShards(options)
if err != nil {
t.Errorf("unexpected error when deleting shards: %s", err)
}

// run assertions: gather all the shards + meta files that remain and
// check to see that only the files associated with the "remaining" repositories
// are present
var actualShardFiles []string

for _, pattern := range []string{"*.zoekt", "*.meta"} {
files, err := filepath.Glob(filepath.Join(indexDir, pattern))
if err != nil {
t.Fatalf("globbing indexDir: %s", err)
}

actualShardFiles = append(actualShardFiles, files...)
}

var expectedShardFiles []string
expectedShardFiles = append(expectedShardFiles, shardMap[remainingRepoA.ID]...)
expectedShardFiles = append(expectedShardFiles, shardMap[remainingRepoB.ID]...)

sort.Strings(actualShardFiles)
sort.Strings(expectedShardFiles)

if diff := cmp.Diff(expectedShardFiles, actualShardFiles); diff != "" {
t.Errorf("unexpected diff in list of shard files (-want +got):\n%s", diff)
}
})

t.Run("delete repository from compound shard", func(t *testing.T) {
indexDir := t.TempDir()

// setup: enable shard merging for compound shards
t.Setenv("SRC_ENABLE_SHARD_MERGING", "1")

// setup: create compound shard with all repositories
repositories := []zoekt.Repository{remainingRepoA, remainingRepoB, repositoryToDelete}
shard := createTestCompoundShard(t, indexDir, repositories)

// run test: delete repository
options := &build.Options{
IndexDir: indexDir,
RepositoryDescription: repositoryToDelete,
}
options.SetDefaults()

err := deleteShards(options)
if err != nil {
t.Errorf("unexpected error when deleting shards: %s", err)
}

// verify: read the compound shard, and ensure that only
// the repositories that we expect are in the shard (and the deleted one has been tombstoned)
actualRepositories, _, err := zoekt.ReadMetadataPathAlive(shard)
if err != nil {
t.Fatalf("reading repository metadata from shard: %s", err)
}

expectedRepositories := []*zoekt.Repository{&remainingRepoA, &remainingRepoB}

sort.Slice(actualRepositories, func(i, j int) bool {
return actualRepositories[i].ID < actualRepositories[j].ID
})

sort.Slice(expectedRepositories, func(i, j int) bool {
return expectedRepositories[i].ID < expectedRepositories[j].ID
})

opts := []cmp.Option{
cmpopts.IgnoreUnexported(zoekt.Repository{}),
cmpopts.IgnoreFields(zoekt.Repository{}, "IndexOptions", "HasSymbols"),
cmpopts.EquateEmpty(),
}
if diff := cmp.Diff(expectedRepositories, actualRepositories, opts...); diff != "" {
t.Errorf("unexpected diff in list of repositories (-want +got):\n%s", diff)
}
})
}

// createCompoundShard returns a path to a compound shard containing repos with
// ids. Use optsFns to overwrite fields of zoekt.Repository for all repos.
func createCompoundShard(t *testing.T, dir string, ids []uint32, optFns ...func(in *zoekt.Repository)) string {
Expand Down
104 changes: 17 additions & 87 deletions cmd/zoekt-sourcegraph-indexserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"html/template"
"io"
"io/fs"
"log"
"math"
"math/rand"
Expand Down Expand Up @@ -760,6 +758,23 @@ func (s *Server) handleDebugList(w http.ResponseWriter, r *http.Request) {
}
}

// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
// can run this command during periods of low usage (evenings, weekends) to
// trigger an initial merge run. In the steady-state, merges happen rarely, even
// on busy instances, and users can rely on automatic merging instead.
func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {

// A merge operation can take very long, depending on the number merges and the
// target size of the compound shards. We run the merge in the background and
// return immediately to the user.
//
// We track the status of the merge with metricShardMergingRunning.
go func() {
s.doMerge()
}()
_, _ = w.Write([]byte("merging enqueued\n"))
}

func (s *Server) handleDebugDelete(w http.ResponseWriter, r *http.Request) {
rawID := r.URL.Query().Get("id")
if rawID == "" {
Expand Down Expand Up @@ -798,91 +813,6 @@ func (s *Server) handleDebugDelete(w http.ResponseWriter, r *http.Request) {
}
}

// deleteShards deletes all the shards that are associated with the repository specified
// in the build options.
//
// Users must hold the indexDir lock for this repository before calling deleteShards.
func deleteShards(options *build.Options) error {
shardPaths := options.FindAllShards()

// Ensure that the paths are in reverse sorted order to ensure that Zoekt's repository <-> shard matching logic
// works correctly.
//
// Example: - repoA_v16.00002.zoekt
// - repoA_v16.00001.zoekt
// - repoA_v16.00000.zoekt
//
// zoekt-indexserver checks whether it has indexed "repoA" by first checking to see if the 0th shard
// is present (repoA_v16.00000.zoekt). If it's present, then it gathers all rest of the shards names in ascending order
// (...00001.zoekt, ...00002.zoekt). If it's missing, then zoekt assumes that it never indexed "repoA".
//
// If this function were to crash while deleting repoA, and we only deleted the 0th shard, then shard's 1 & 2 would never
// be cleaned up by Zoekt indexserver (since the 0th shard is the only shard that's tested).
//
// Deleting shards in reverse sorted order (2 -> 1 -> 0) always ensures that we don't leave an inconsistent
// state behind even if we crash.

sort.Slice(shardPaths, func(i, j int) bool {
return shardPaths[i] > shardPaths[j]
})

for _, shard := range shardPaths {
// Is this repository inside a compound shard? If so, set a tombstone
// instead of deleting the shard outright.
if zoekt.ShardMergingEnabled() && strings.HasPrefix(filepath.Base(shard), "compound-") {
if !strings.HasSuffix(shard, ".zoekt") {
continue
}

err := zoekt.SetTombstone(shard, options.RepositoryDescription.ID)
if err != nil {
return fmt.Errorf("setting tombstone in shard %q: %w", shard, err)
}

continue
}

err := os.Remove(shard)
if err != nil {
return fmt.Errorf("deleting shard %q: %w", shard, err)
}

// remove the metadata file associated with the shard (if any)
metaFile := shard + ".meta"
if _, err := os.Stat(metaFile); err != nil {
if errors.Is(err, fs.ErrNotExist) {
continue
}

return fmt.Errorf("'stat'ing metadata file %q: %w", metaFile, err)
}

err = os.Remove(metaFile)
if err != nil {
return fmt.Errorf("deleting metadata file %q: %w", metaFile, err)
}
}

return nil
}

// handleDebugMerge triggers a merge even if shard merging is not enabled. Users
// can run this command during periods of low usage (evenings, weekends) to
// trigger an initial merge run. In the steady-state, merges happen rarely, even
// on busy instances, and users can rely on automatic merging instead.
func (s *Server) handleDebugMerge(w http.ResponseWriter, _ *http.Request) {

// A merge operation can take very long, depending on the number merges and the
// target size of the compound shards. We run the merge in the background and
// return immediately to the user.
//
// We track the status of the merge with metricShardMergingRunning.
go func() {
s.doMerge()
}()
_, _ = w.Write([]byte("merging enqueued\n"))
}

func (s *Server) handleDebugIndexed(w http.ResponseWriter, r *http.Request) {
indexed := listIndexed(s.IndexDir)

Expand Down
Loading

0 comments on commit e53fada

Please sign in to comment.