Skip to content

Commit

Permalink
shuffle targets during parse phase (#343)
Browse files Browse the repository at this point in the history
* shuffle targets during parse phase

* don't use global rand
  • Loading branch information
dedelala committed Jul 30, 2024
1 parent bcb261e commit 4ef44fa
Showing 1 changed file with 29 additions and 19 deletions.
48 changes: 29 additions & 19 deletions go/vt/vtgateproxy/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type JSONGateResolverBuilder struct {
targets map[string][]targetHost
resolvers []*JSONGateResolver

rand *rand.Rand
sorter *shuffleSorter
ticker *time.Ticker
checksum []byte
}
Expand Down Expand Up @@ -114,6 +114,7 @@ func RegisterJSONGateResolver(
poolTypeField: poolTypeField,
affinityField: affinityField,
affinityValue: affinityValue,
sorter: newShuffleSorter(),
}

resolver.Register(jsonDiscovery)
Expand All @@ -133,9 +134,6 @@ func (*JSONGateResolverBuilder) Scheme() string { return "vtgate" }

// Parse and validate the format of the file and start watching for changes
func (b *JSONGateResolverBuilder) start() error {

b.rand = rand.New(rand.NewSource(time.Now().UnixNano()))

// Perform the initial parse
_, err := b.parse()
if err != nil {
Expand Down Expand Up @@ -288,11 +286,7 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) {
}

for poolType := range targets {
if b.affinityField != "" {
sort.Slice(targets[poolType], func(i, j int) bool {
return b.affinityValue == targets[poolType][i].Affinity
})
}
b.sorter.shuffleSort(targets[poolType], b.affinityField, b.affinityValue)
if len(targets[poolType]) > *numConnections {
targets[poolType] = targets[poolType][:*numConnections]
}
Expand Down Expand Up @@ -324,32 +318,48 @@ func (b *JSONGateResolverBuilder) getTargets(poolType string) []targetHost {
targets = append(targets, b.targets[poolType]...)
b.mu.RUnlock()

// Shuffle to ensure every host has a different order to iterate through, putting
// the affinity matching (e.g. same az) hosts at the front and the non-matching ones
// at the end.
//
// Only need to do n-1 swaps since the last host is always in the right place.
b.sorter.shuffleSort(targets, b.affinityField, b.affinityValue)

return targets
}

type shuffleSorter struct {
rand *rand.Rand
mu *sync.Mutex
}

func newShuffleSorter() *shuffleSorter {
return &shuffleSorter{
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
mu: &sync.Mutex{},
}
}

// shuffleSort shuffles a slice of targetHost to ensure every host has a
// different order to iterate through, putting the affinity matching (e.g. same
// az) hosts at the front and the non-matching ones at the end.
func (s *shuffleSorter) shuffleSort(targets []targetHost, affinityField, affinityValue string) {
n := len(targets)
head := 0
// Only need to do n-1 swaps since the last host is always in the right place.
tail := n - 1
for i := 0; i < n-1; i++ {
j := head + b.rand.Intn(tail-head+1)
s.mu.Lock()
j := head + s.rand.Intn(tail-head+1)
s.mu.Unlock()

if *affinityField != "" && *affinityValue == targets[j].Affinity {
if affinityField != "" && affinityValue == targets[j].Affinity {
targets[head], targets[j] = targets[j], targets[head]
head++
} else {
targets[tail], targets[j] = targets[j], targets[tail]
tail--
}
}

return targets
}

// Update the current list of hosts for the given resolver
func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) {

log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections)

targets := b.getTargets(r.poolType)
Expand Down

0 comments on commit 4ef44fa

Please sign in to comment.