Skip to content

Commit

Permalink
fix inefficient blockRanges sorting creating GC thrashing
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Dec 2, 2024
1 parent 02d9c24 commit a5f3eb8
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 32 deletions.
23 changes: 0 additions & 23 deletions block/ranges.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package block

import (
"sort"
"strings"
)

Expand Down Expand Up @@ -46,28 +45,6 @@ func (r Ranges) Contains(input *Range) bool {
return false
}

func (r Ranges) SortAndDedupe() (out Ranges) {
if r == nil {
return nil
}

m := make(map[string]*Range)

for _, rr := range r {
m[rr.String()] = rr
}

out = make(Ranges, len(m))
i := 0
for _, v := range m {
out[i] = v
i++
}

sort.Sort(out)
return
}

func (r Ranges) Merged() (out Ranges) {
if r == nil {
return nil
Expand Down
27 changes: 18 additions & 9 deletions orchestrator/stage/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stage
import (
"context"
"fmt"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -183,24 +184,32 @@ func (s *Stages) UpdateStats() {
s.lastStatUpdate = time.Now()
out := make([]*pbsubstreamsrpc.Stage, len(s.stages))

for i := range s.stages {
for stgIdx := range s.stages {

mods := make([]string, len(s.stages[i].allExecutedModules))
_ = copy(mods, s.stages[i].allExecutedModules)
mods := make([]string, len(s.stages[stgIdx].allExecutedModules))
_ = copy(mods, s.stages[stgIdx].allExecutedModules)

var br []*block.Range
br := make(map[uint64]*block.Range)
for segmentIdx, segment := range s.segmentStates {
state := segment[i]
segmenter := s.stages[i].storeModuleStates[0].segmenter
state := segment[stgIdx]
segmenter := s.stages[stgIdx].storeModuleStates[0].segmenter
if state == UnitCompleted || state == UnitPartialPresent || state == UnitMerging {
if rng := segmenter.Range(segmentIdx + s.segmentOffset); rng != nil {
br = append(br, rng)
br[rng.StartBlock] = rng
}
}
}
blockRanges := block.Ranges(br).SortAndDedupe().Merged()

out[i] = &pbsubstreamsrpc.Stage{
blockRanges := block.Ranges(make([]*block.Range, len(br)))
i := 0
for _, v := range br {
blockRanges[i] = v
i++
}
sort.Sort(blockRanges)
blockRanges = blockRanges.Merged()

out[stgIdx] = &pbsubstreamsrpc.Stage{
Modules: mods,
CompletedRanges: toProtoRanges(blockRanges),
}
Expand Down

0 comments on commit a5f3eb8

Please sign in to comment.