Skip to content

Commit

Permalink
Revert "shards: respect scheduler and use smarter synchronization for…
Browse files Browse the repository at this point in the history
… List (#750)" (#752)

This reverts commit 931974d.

After monitoring production we are seeing increased latencies in List
calls. Additionally we are seeing many more scheduler transitions into
interactive queued. The only realistic cause of this was this commit, so
we are reverting for now until further investigation.

Test Plan: go test

(cherry picked from commit 8cf8887)
  • Loading branch information
keegancsmith committed Mar 27, 2024
1 parent 31adf4c commit 6364497
Showing 1 changed file with 28 additions and 64 deletions.
92 changes: 28 additions & 64 deletions shards/shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"sync"
"time"

"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -896,19 +895,20 @@ type shardListResult struct {
err error
}

func listOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.ListOptions) (result *zoekt.RepoList, _ error) {
func listOneShard(ctx context.Context, s zoekt.Searcher, q query.Q, opts *zoekt.ListOptions, sink chan shardListResult) {
metricListShardRunning.Inc()
defer func() {
metricListShardRunning.Dec()
// If we panic, we log the panic and set Crashes (but do not return an
// error).
if r := recover(); r != nil {
log.Printf("crashed shard: %s: %s, %s", s.String(), r, debug.Stack())
result = &zoekt.RepoList{Crashes: 1}
sink <- shardListResult{
&zoekt.RepoList{Crashes: 1}, nil,
}
}
}()

return s.List(ctx, q, opts)
ms, err := s.List(ctx, q, opts)
sink <- shardListResult{ms, err}
}

func (ss *shardedSearcher) List(ctx context.Context, q query.Q, opts *zoekt.ListOptions) (rl *zoekt.RepoList, err error) {
Expand Down Expand Up @@ -969,65 +969,34 @@ func (ss *shardedSearcher) List(ctx context.Context, q query.Q, opts *zoekt.List
return &agg, nil
}

// We use an errgroup so that when an error is encountered we can stop
// feeder and report the first error seen.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
g, ctx := errgroup.WithContext(ctx)

// Bound work by number of CPUs.
workers := min(runtime.GOMAXPROCS(0), len(shards))

var (
feeder = make(chan zoekt.Searcher, workers)
all = make(chan *zoekt.RepoList, workers)
)

// Send shards to feeder until context is canceled.
g.Go(func() error {
defer close(feeder)
for _, s := range shards {
// If context is canceled we stop consuming from shards and cancel the
// errgroup.
if err := proc.Yield(ctx); err != nil {
return err
}
feeder <- s
}
return nil
})
shardCount := len(shards)
all := make(chan shardListResult, shardCount)
feeder := make(chan zoekt.Searcher, len(shards))
for _, s := range shards {
feeder <- s
}
close(feeder)

// Start up workers goroutines to consume feeder, do listing of a shard and
// send results down all. If an error is encountered we cancel the errgroup.
for range workers {
g.Go(func() error {
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
go func() {
for s := range feeder {
result, err := listOneShard(ctx, s, q, opts)
if err != nil {
return err
}
all <- result
listOneShard(ctx, s, q, opts, all)
}
return nil
})
}()
}

// Once all goroutines in errgroup is done, we know nothing more will be
// sent to all so close it. We rely on this sync point such that workersErr
// will be written to before we are finished reading from all.
var workersErr error
go func() {
workersErr = g.Wait()
close(all)
}()

// Aggregate results from all.
uniq := map[string]*zoekt.RepoListEntry{}
for rl := range all {
agg.Crashes += rl.Crashes
agg.Stats.Add(&rl.Stats)

for _, r := range rl.Repos {
for range shards {
r := <-all
if r.err != nil {
return nil, r.err
}

agg.Crashes += r.rl.Crashes
agg.Stats.Add(&r.rl.Stats)

for _, r := range r.rl.Repos {
prev, ok := uniq[r.Repository.Name]
if !ok {
cp := *r // We need to copy because we mutate r.Stats when merging duplicates
Expand All @@ -1037,19 +1006,14 @@ func (ss *shardedSearcher) List(ctx context.Context, q query.Q, opts *zoekt.List
}
}

for id, r := range rl.ReposMap {
for id, r := range r.rl.ReposMap {
_, ok := agg.ReposMap[id]
if !ok {
agg.ReposMap[id] = r
}
}
}

// workersErr will now be set since all is closed.
if workersErr != nil {
return nil, workersErr
}

agg.Repos = make([]*zoekt.RepoListEntry, 0, len(uniq))
for _, r := range uniq {
agg.Repos = append(agg.Repos, r)
Expand Down

0 comments on commit 6364497

Please sign in to comment.