From 76eec50fdc9cf509c0bc1b99d71ee3e4ebd995a7 Mon Sep 17 00:00:00 2001 From: Julie Tibshirani Date: Thu, 16 Nov 2023 13:22:59 -0800 Subject: [PATCH] Indexing: limit shard parallelism when index concurrency is set --- cmd/zoekt-sourcegraph-indexserver/main.go | 13 +++-- .../main_test.go | 56 ++++++++++++++++++- 2 files changed, 61 insertions(+), 8 deletions(-) diff --git a/cmd/zoekt-sourcegraph-indexserver/main.go b/cmd/zoekt-sourcegraph-indexserver/main.go index 80539c4f0..ace600c3e 100644 --- a/cmd/zoekt-sourcegraph-indexserver/main.go +++ b/cmd/zoekt-sourcegraph-indexserver/main.go @@ -641,11 +641,12 @@ func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field { } func (s *Server) indexArgs(opts IndexOptions) *indexArgs { + parallelism := s.CPUCount / s.IndexConcurrency return &indexArgs{ IndexOptions: opts, IndexDir: s.IndexDir, - Parallelism: s.CPUCount, + Parallelism: parallelism, Incremental: true, @@ -1409,15 +1410,17 @@ func newServer(conf rootConfig) (*Server, error) { } } - if conf.indexConcurrency < 1 { - conf.indexConcurrency = 1 - } - cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) if cpuCount < 1 { cpuCount = 1 } + if conf.indexConcurrency < 1 { + conf.indexConcurrency = 1 + } else if conf.indexConcurrency > int64(cpuCount) { + conf.indexConcurrency = int64(cpuCount) + } + q := NewQueue(conf.backoffDuration, conf.maxBackoffDuration, logger) return &Server{ diff --git a/cmd/zoekt-sourcegraph-indexserver/main_test.go b/cmd/zoekt-sourcegraph-indexserver/main_test.go index 0d3f65cd3..380da80f7 100644 --- a/cmd/zoekt-sourcegraph-indexserver/main_test.go +++ b/cmd/zoekt-sourcegraph-indexserver/main_test.go @@ -33,9 +33,10 @@ func TestServer_defaultArgs(t *testing.T) { } s := &Server{ - Sourcegraph: newSourcegraphClient(root, "", WithBatchSize(0)), - IndexDir: "/testdata/index", - CPUCount: 6, + Sourcegraph: newSourcegraphClient(root, "", WithBatchSize(0)), + IndexDir: "/testdata/index", + CPUCount: 6, + IndexConcurrency: 1, } want := &indexArgs{ IndexOptions: IndexOptions{ @@ -52,6 +53,55 @@ func TestServer_defaultArgs(t *testing.T) { } } +func TestServer_parallelism(t *testing.T) { + root, err := url.Parse("http://api.test") + if err != nil { + t.Fatal(err) + } + + cases := []struct { + name string + cpuCount int + indexConcurrency int + wantParallelism int + }{ + { + name: "CPU count divides evenly", + cpuCount: 16, + indexConcurrency: 2, + wantParallelism: 8, + }, + { + name: "round parallelism down", + cpuCount: 4, + indexConcurrency: 3, + wantParallelism: 1, + }, + { + name: "no shard level parallelism", + cpuCount: 4, + indexConcurrency: 4, + wantParallelism: 1, + }, + } + + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + s := &Server{ + Sourcegraph: newSourcegraphClient(root, "", WithBatchSize(0)), + IndexDir: "/testdata/index", + CPUCount: tt.cpuCount, + IndexConcurrency: tt.indexConcurrency, + } + + got := s.indexArgs(IndexOptions{Name: "testName"}) + if !cmp.Equal(got.Parallelism, tt.wantParallelism) { + t.Errorf("mismatch (-want +got):\n%s", cmp.Diff(tt.wantParallelism, got)) + } + }) + } +} + func TestListRepoIDs(t *testing.T) { t.Run("gRPC", func(t *testing.T) {