Skip to content

Commit

Permalink
index: enable shard merging by default (#798)
Browse files Browse the repository at this point in the history
This enables shard merging by default for zoekt-sourcegraph-indexserver.

Sourcegraph has been using shard merging in production for several years. We have recently confirmed significant performance improvements for queries which are bound by matchTree construction.

I also remove -merge_max_priority because we have stopped using it.

Use SRC_DISABLE_SHARD_MERGING to disable shard merging.

Test plan:
mostly CI, I did some manual testing to confirm that shard merging is enabled by default for zoekt-sourcegraph-indexserver.
  • Loading branch information
stefanhengl authored Aug 1, 2024
1 parent bbd1fed commit 764fe4f
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 25 deletions.
11 changes: 10 additions & 1 deletion build/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ type Options struct {
changedOrRemovedFiles []string

LanguageMap ctags.LanguageMap

// ShardMerging is true if builder should respect compound shards. This is a
// Sourcegraph specific option.
ShardMerging bool
}

// HashOptions contains only the options in Options that upon modification leads to IndexState of IndexStateMismatch during the next index building.
Expand Down Expand Up @@ -194,6 +198,7 @@ func (o *Options) Flags(fs *flag.FlagSet) {

// Sourcegraph specific
fs.BoolVar(&o.DisableCTags, "disable_ctags", x.DisableCTags, "If set, ctags will not be called.")
fs.BoolVar(&o.ShardMerging, "shard_merging", x.ShardMerging, "If set, builder will respect compound shards.")
}

// Args generates command line arguments for o. It is the "inverse" of Flags.
Expand Down Expand Up @@ -233,6 +238,10 @@ func (o *Options) Args() []string {
args = append(args, "-disable_ctags")
}

if o.ShardMerging {
args = append(args, "-shard_merging")
}

return args
}

Expand Down Expand Up @@ -774,7 +783,7 @@ func (b *Builder) Finish() error {

for p := range toDelete {
// Don't delete compound shards, set tombstones instead.
if zoekt.ShardMergingEnabled() && strings.HasPrefix(filepath.Base(p), "compound-") {
if b.opts.ShardMerging && strings.HasPrefix(filepath.Base(p), "compound-") {
if !strings.HasSuffix(p, ".zoekt") {
continue
}
Expand Down
5 changes: 5 additions & 0 deletions cmd/zoekt-sourcegraph-indexserver/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ type indexArgs struct {
// DeltaShardNumberFallbackThreshold is an upper limit on the number of preexisting shards that can exist
// before attempting a delta build.
DeltaShardNumberFallbackThreshold uint64

// ShardMerging is true if we want zoekt-git-index to respect compound shards.
ShardMerging bool
}

// BuildOptions returns a build.Options represented by indexArgs. Note: it
Expand Down Expand Up @@ -131,6 +134,8 @@ func (o *indexArgs) BuildOptions() *build.Options {
DocumentRanksVersion: o.DocumentRanksVersion,

LanguageMap: o.LanguageMap,

ShardMerging: o.ShardMerging,
}
}

Expand Down
31 changes: 22 additions & 9 deletions cmd/zoekt-sourcegraph-indexserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,8 @@ func (s *Server) indexArgs(opts IndexOptions) *indexArgs {

// 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
FileLimit: 1 << 20,

ShardMerging: s.shardMerging,
}
}

Expand Down Expand Up @@ -1065,6 +1067,18 @@ func srcLogLevelIsDebug() bool {
return strings.EqualFold(lvl, "dbug") || strings.EqualFold(lvl, "debug")
}

func getEnvWithDefaultBool(k string, defaultVal bool) bool {
v := os.Getenv(k)
if v == "" {
return defaultVal
}
b, err := strconv.ParseBool(v)
if err != nil {
log.Fatalf("error parsing ENV %s to int64: %s", k, err)
}
return b
}

func getEnvWithDefaultInt64(k string, defaultVal int64) int64 {
v := os.Getenv(k)
if v == "" {
Expand Down Expand Up @@ -1196,12 +1210,12 @@ type rootConfig struct {
blockProfileRate int

// config values related to shard merging
vacuumInterval time.Duration
mergeInterval time.Duration
targetSize int64
minSize int64
minAgeDays int
maxPriority float64
disableShardMerging bool
vacuumInterval time.Duration
mergeInterval time.Duration
targetSize int64
minSize int64
minAgeDays int

// config values related to backoff indexing repos with one or more consecutive failures
backoffDuration time.Duration
Expand All @@ -1221,12 +1235,12 @@ func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")

// flags related to shard merging
fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging")
fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB")
fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB")
fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
fs.Float64Var(&rc.maxPriority, "merge_max_priority", getEnvWithDefaultFloat64("SRC_MERGE_MAX_PRIORITY", 100), "the maximum priority a shard can have to be considered for merging.")
}

func startServer(conf rootConfig) error {
Expand Down Expand Up @@ -1428,7 +1442,7 @@ func newServer(conf rootConfig) (*Server, error) {
Interval: conf.interval,
CPUCount: cpuCount,
queue: *q,
shardMerging: zoekt.ShardMergingEnabled(),
shardMerging: !conf.disableShardMerging,
deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
Expand All @@ -1439,7 +1453,6 @@ func newServer(conf rootConfig) (*Server, error) {
targetSizeBytes: conf.targetSize * 1024 * 1024,
minSizeBytes: conf.minSize * 1024 * 1024,
minAgeDays: conf.minAgeDays,
maxPriority: conf.maxPriority,
},
timeout: indexingTimeout,
}, err
Expand Down
7 changes: 0 additions & 7 deletions cmd/zoekt-sourcegraph-indexserver/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,6 @@ type mergeOpts struct {
// merging. For example, a value of 7 means that only repos that have been
// inactive for 7 days will be considered for merging.
minAgeDays int

// the MAX priority a shard can have to be considered for merging.
maxPriority float64
}

// isExcluded returns true if a shard should not be merged, false otherwise.
Expand Down Expand Up @@ -213,10 +210,6 @@ func isExcluded(path string, fi os.FileInfo, opts mergeOpts) bool {
return true
}

if priority, err := strconv.ParseFloat(repos[0].RawConfig["priority"], 64); err == nil && priority > opts.maxPriority {
return true
}

return false
}

Expand Down
8 changes: 0 additions & 8 deletions tombstones.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,8 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
)

// ShardMergingEnabled returns true if SRC_ENABLE_SHARD_MERGING is set to true.
func ShardMergingEnabled() bool {
t := os.Getenv("SRC_ENABLE_SHARD_MERGING")
enabled, _ := strconv.ParseBool(t)
return enabled
}

var mockRepos []*Repository

// SetTombstone idempotently sets a tombstone for repoName in .meta.
Expand Down

0 comments on commit 764fe4f

Please sign in to comment.