Skip to content

Commit

Permalink
Indexing: configure CPU count through index options (#704)
Browse files Browse the repository at this point in the history
This change allows the shard concurrency to be set through index options. This
is much more convenient than the current way to limit CPU through the server
flag `-cpu_fraction`.

In a follow-up we'll expose shard concurrency through Sourcegraph site config,
and pass it through these index options. Maybe we can deprecate and move away
from `-cpu_fraction` in favor of this approach.
  • Loading branch information
jtibshirani authored Nov 22, 2023
1 parent 0959170 commit d982320
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 124 deletions.
4 changes: 4 additions & 0 deletions cmd/zoekt-sourcegraph-indexserver/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ type IndexOptions struct {

// Map from language to scip-ctags, universal-ctags, or neither
LanguageMap ctags.LanguageMap

// The number of threads to use for indexing shards. Defaults to the number of available
// CPUs. If the server flag -cpu_fraction is set, then this value overrides it.
ShardConcurrency int32
}

// indexArgs represents the arguments we pass to zoekt-git-index
Expand Down
37 changes: 30 additions & 7 deletions cmd/zoekt-sourcegraph-indexserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ type Server struct {

// Interval is how often we sync with Sourcegraph.
Interval time.Duration
// CPUCount is the amount of parallelism to use when indexing a
// repository.

// CPUCount is the number of CPUs to use for indexing shards.
CPUCount int

queue Queue
Expand Down Expand Up @@ -641,20 +641,43 @@ func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
}

// indexArgs builds the *indexArgs value for the given index options, filling in
// the server-level settings: the index directory and the parallelism. Incremental
// indexing is always enabled and the file size limit is fixed at 1 MB.
//
// NOTE(review): this span is rendered from a diff, so both the pre-change and
// post-change `parallelism` / `Parallelism` lines appear below; only one of each
// pair exists in the real file.
func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
parallelism := math.Ceil(float64(s.CPUCount) / float64(s.IndexConcurrency))
parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
return &indexArgs{
IndexOptions: opts,

IndexDir: s.IndexDir,
Parallelism: int(parallelism),

Parallelism: parallelism,
Incremental: true,

// 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
FileLimit: 1 << 20,
}
}

// parallelism returns the amount of parallelism to use when indexing shards,
// combining the server settings with the per-repository index options.
//
// A positive opts.ShardConcurrency takes precedence over the server's CPUCount;
// zero or negative values are ignored and CPUCount is used instead. maxProcs is
// the number of CPUs considered available and is used to bound the result.
func (s *Server) parallelism(opts IndexOptions, maxProcs int) int {
	threads := s.CPUCount
	if opts.ShardConcurrency > 0 {
		threads = int(opts.ShardConcurrency)
	}

	// Guard against accidental misconfiguration: never use more than 4 times
	// the available CPUs.
	if limit := 4 * maxProcs; threads > limit {
		threads = limit
	}

	// With at most one repo indexed at a time there is nothing to share.
	if s.IndexConcurrency <= 1 {
		return threads
	}

	// Several repos are indexed concurrently, so split the budget between
	// them, rounding up.
	return int(math.Ceil(float64(threads) / float64(s.IndexConcurrency)))
}

func createEmptyShard(args *indexArgs) error {
bo := args.BuildOptions()
bo.SetDefaults()
Expand Down Expand Up @@ -1210,7 +1233,7 @@ type rootConfig struct {
func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of concurrent index jobs to run.")
fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use")
fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
Expand Down
56 changes: 43 additions & 13 deletions cmd/zoekt-sourcegraph-indexserver/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,25 +63,38 @@ func TestServer_parallelism(t *testing.T) {
name string
cpuCount int
indexConcurrency int
wantParallelism int
options IndexOptions
want int
}{
{
name: "CPU count divides evenly",
cpuCount: 16,
indexConcurrency: 2,
wantParallelism: 8,
},
{
name: "round parallelism up",
cpuCount: 4,
indexConcurrency: 3,
wantParallelism: 2,
indexConcurrency: 8,
want: 2,
},
{
name: "no shard level parallelism",
cpuCount: 4,
indexConcurrency: 4,
wantParallelism: 1,
want: 1,
},
{
name: "index option overrides server flag",
cpuCount: 2,
indexConcurrency: 1,
options: IndexOptions {
ShardConcurrency: 1,
},
want: 1,
},
{
name: "ignore invalid index option",
cpuCount: 8,
indexConcurrency: 2,
options: IndexOptions {
ShardConcurrency: -1,
},
want: 4,
},
}

Expand All @@ -94,12 +107,29 @@ func TestServer_parallelism(t *testing.T) {
IndexConcurrency: tt.indexConcurrency,
}

got := s.indexArgs(IndexOptions{Name: "testName"})
if !cmp.Equal(got.Parallelism, tt.wantParallelism) {
t.Errorf("mismatch, want: %d, got: %d", tt.wantParallelism, got.Parallelism)
maxProcs := 16
got := s.parallelism(tt.options, maxProcs)
if tt.want != got{
t.Errorf("mismatch, want: %d, got: %d", tt.want, got)
}
})
}

t.Run("index option is limited by available CPU", func(t *testing.T) {
s := &Server{
Sourcegraph: newSourcegraphClient(root, "", WithBatchSize(0)),
IndexDir: "/testdata/index",
IndexConcurrency: 1,
}

got := s.indexArgs(IndexOptions {
ShardConcurrency: 2048, // Some number that's way too high
})

if got.Parallelism >= 2048 {
t.Errorf("parallelism should be limited by available CPUs, instead got %d", got.Parallelism)
}
})
}

func TestListRepoIDs(t *testing.T) {
Expand Down
Loading

0 comments on commit d982320

Please sign in to comment.