Skip to content

Commit

Permalink
Indexing: configure CPU count through index options
Browse files Browse the repository at this point in the history
  • Loading branch information
jtibshirani committed Nov 21, 2023
1 parent 0959170 commit bcee53e
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 114 deletions.
4 changes: 4 additions & 0 deletions cmd/zoekt-sourcegraph-indexserver/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ type IndexOptions struct {

// Map from language to scip-ctags, universal-ctags, or neither
LanguageMap ctags.LanguageMap

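// The number of CPUs to use for indexing. Defaults to using all available CPUs. If
// the server flag -cpu_fraction is set, then this value overrides it. Values that
// are zero or negative are ignored in favor of the server setting, and values
// larger than the number of available CPUs are capped to that number.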
CPUCount int32
}

// indexArgs represents the arguments we pass to zoekt-git-index
Expand Down
21 changes: 15 additions & 6 deletions cmd/zoekt-sourcegraph-indexserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ type Server struct {

// Interval is how often we sync with Sourcegraph.
Interval time.Duration
// CPUCount is the amount of parallelism to use when indexing a
// repository.

// CPUCount is the number of CPUs to use for indexing.
CPUCount int

queue Queue
Expand Down Expand Up @@ -641,20 +641,29 @@ func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
}

// indexArgs builds the arguments passed to zoekt-git-index for the given index
// options, filling in the server-level settings (index directory and
// parallelism).
func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
// NOTE(review): this view is a rendered diff, so the statement directly below
// appears to be the pre-change computation that the next two statements
// replace. Confirm against the actual file; both cannot coexist.
parallelism := math.Ceil(float64(s.CPUCount) / float64(s.IndexConcurrency))
// Resolve the CPU budget from the index options and the server setting.
cpuCount := s.cpuCount(opts)
// Split the CPU budget across the concurrent index jobs, rounding up.
parallelism := math.Ceil(float64(cpuCount) / float64(s.IndexConcurrency))
return &indexArgs{
IndexOptions: opts,

IndexDir: s.IndexDir,
Parallelism: int(parallelism),

Incremental: true,

// 1 MB; match https://sourcegraph.sgdev.org/github.com/sourcegraph/sourcegraph/-/blob/cmd/symbols/internal/symbols/search.go#L22
FileLimit: 1 << 20,
}
}

// cpuCount resolves how many CPUs an indexing job may use. A positive
// opts.CPUCount takes precedence over the server's CPUCount, but is capped at
// runtime.GOMAXPROCS(0). When the index option is zero or negative, the
// server's CPUCount is returned unchanged.
func (s *Server) cpuCount(opts IndexOptions) int {
	requested := int(opts.CPUCount)
	if requested <= 0 {
		// Unset or invalid index option: fall back to the server setting.
		return s.CPUCount
	}

	// Never hand out more CPUs than the runtime will actually schedule on.
	if available := runtime.GOMAXPROCS(0); requested > available {
		return available
	}
	return requested
}

func createEmptyShard(args *indexArgs) error {
bo := args.BuildOptions()
bo.SetDefaults()
Expand Down Expand Up @@ -1210,7 +1219,7 @@ type rootConfig struct {
func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.")
fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often")
fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of concurrent index jobs to run.")
fs.Int64Var(&rc.indexConcurrency, "index_concurrency", getEnvWithDefaultInt64("SRC_INDEX_CONCURRENCY", 1), "the number of repos to index concurrently")
fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use")
fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.")
fs.StringVar(&rc.hostname, "hostname", zoekt.HostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.")
Expand Down
45 changes: 40 additions & 5 deletions cmd/zoekt-sourcegraph-indexserver/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestServer_parallelism(t *testing.T) {
name string
cpuCount int
indexConcurrency int
options IndexOptions
wantParallelism int
}{
{
Expand All @@ -72,17 +73,35 @@ func TestServer_parallelism(t *testing.T) {
wantParallelism: 8,
},
{
name: "round parallelism up",
name: "no shard level parallelism",
cpuCount: 4,
indexConcurrency: 3,
wantParallelism: 2,
indexConcurrency: 4,
wantParallelism: 1,
},
{
name: "no shard level parallelism",
cpuCount: 4,
indexConcurrency: 4,
wantParallelism: 1,
},
{
name: "index option overrides server flag",
cpuCount: 8,
indexConcurrency: 2,
options: IndexOptions {
CPUCount: 6,
},
wantParallelism: 3,
},
{
name: "ignore invalid index option",
cpuCount: 8,
indexConcurrency: 2,
options: IndexOptions {
CPUCount: -1,
},
wantParallelism: 4,
},
}

for _, tt := range cases {
Expand All @@ -94,12 +113,28 @@ func TestServer_parallelism(t *testing.T) {
IndexConcurrency: tt.indexConcurrency,
}

got := s.indexArgs(IndexOptions{Name: "testName"})
if !cmp.Equal(got.Parallelism, tt.wantParallelism) {
got := s.indexArgs(tt.options)
if tt.wantParallelism != got.Parallelism {
t.Errorf("mismatch, want: %d, got: %d", tt.wantParallelism, got.Parallelism)
}
})
}

t.Run("index option is limited by available CPU", func(t *testing.T) {
s := &Server{
Sourcegraph: newSourcegraphClient(root, "", WithBatchSize(0)),
IndexDir: "/testdata/index",
IndexConcurrency: 1,
}

got := s.indexArgs(IndexOptions {
CPUCount: 2048, // Some number that's way too high
})

if got.Parallelism >= 2048 {
t.Errorf("parallelism should be limited by available CPUs, instead got %d", got.Parallelism)
}
})
}

func TestListRepoIDs(t *testing.T) {
Expand Down
Loading

0 comments on commit bcee53e

Please sign in to comment.