Skip to content

Commit

Permalink
Indexing: limit shard parallelism when index concurrency is set
Browse files Browse the repository at this point in the history
  • Loading branch information
jtibshirani committed Nov 16, 2023
1 parent 8801747 commit 76eec50
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 8 deletions.
13 changes: 8 additions & 5 deletions cmd/zoekt-sourcegraph-indexserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,11 +641,12 @@ func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
}

func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
parallelism := s.CPUCount / s.IndexConcurrency
return &indexArgs{
IndexOptions: opts,

IndexDir: s.IndexDir,
Parallelism: s.CPUCount,
Parallelism: parallelism,

Incremental: true,

Expand Down Expand Up @@ -1409,15 +1410,17 @@ func newServer(conf rootConfig) (*Server, error) {
}
}

if conf.indexConcurrency < 1 {
conf.indexConcurrency = 1
}

cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction)))
if cpuCount < 1 {
cpuCount = 1
}

if conf.indexConcurrency < 1 {
conf.indexConcurrency = 1
} else if conf.indexConcurrency > int64(cpuCount) {
conf.indexConcurrency = int64(cpuCount)
}

q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger)

return &Server{
Expand Down
56 changes: 53 additions & 3 deletions cmd/zoekt-sourcegraph-indexserver/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ func TestServer_defaultArgs(t *testing.T) {
}

s := &Server{
Sourcegraph: newSourcegraphClient(root, "", WithBatchSize(0)),
IndexDir: "/testdata/index",
CPUCount: 6,
Sourcegraph: newSourcegraphClient(root, "", WithBatchSize(0)),
IndexDir: "/testdata/index",
CPUCount: 6,
IndexConcurrency: 1,
}
want := &indexArgs{
IndexOptions: IndexOptions{
Expand All @@ -52,6 +53,55 @@ func TestServer_defaultArgs(t *testing.T) {
}
}

func TestServer_parallelism(t *testing.T) {
root, err := url.Parse("http://api.test")
if err != nil {
t.Fatal(err)
}

cases := []struct {
name string
cpuCount int
indexConcurrency int
wantParallelism int
}{
{
name: "CPU count divides evenly",
cpuCount: 16,
indexConcurrency: 2,
wantParallelism: 8,
},
{
name: "round parallelism down",
cpuCount: 4,
indexConcurrency: 3,
wantParallelism: 1,
},
{
name: "no shard level parallelism",
cpuCount: 4,
indexConcurrency: 4,
wantParallelism: 1,
},
}

for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
s := &Server{
Sourcegraph: newSourcegraphClient(root, "", WithBatchSize(0)),
IndexDir: "/testdata/index",
CPUCount: tt.cpuCount,
IndexConcurrency: tt.indexConcurrency,
}

got := s.indexArgs(IndexOptions{Name: "testName"})
if !cmp.Equal(got.Parallelism, tt.wantParallelism) {
t.Errorf("mismatch (-want +got):\n%s", cmp.Diff(tt.wantParallelism, got))
}
})
}
}

func TestListRepoIDs(t *testing.T) {
t.Run("gRPC", func(t *testing.T) {

Expand Down

0 comments on commit 76eec50

Please sign in to comment.