pickfirst: Register a health listener when used as a leaf policy #7832

Merged
merged 11 commits on Dec 5, 2024
146 changes: 112 additions & 34 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
@@ -58,6 +58,13 @@
// It is changed to "pick_first" in init() if this balancer is to be
// registered as the default pickfirst.
Name = "pick_first_leaf"

// enableHealthListenerKey is the resolver state attributes key for making
// pickfirst listen to health updates when it's under a petiole policy.
enableHealthListenerKey = "generic_health_listener_enabled"
// enableHealthListenerValue is the resolver state attributes value for making
// pickfirst listen to health updates when it's under a petiole policy.
enableHealthListenerValue = &struct{}{}
)

const (
@@ -85,7 +92,8 @@
cc: cc,
addressList: addressList{},
subConns: resolver.NewAddressMap(),
state: connectivity.Connecting,
connectivityState: connectivity.Connecting,
concludedState: connectivity.Connecting,
mu: sync.Mutex{},
cancelConnectionTimer: func() {},
}
@@ -105,6 +113,13 @@
return cfg, nil
}

// EnableHealthListener updates the state to configure pickfirst for using a
// generic health listener.
func EnableHealthListener(state balancer.ClientConnState) balancer.ClientConnState {
state.ResolverState.Attributes = state.ResolverState.Attributes.WithValue(enableHealthListenerKey, enableHealthListenerValue)
return state
}
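As a usage sketch (not part of this diff), a petiole policy that wraps pick_first could opt its child into health updates by calling this helper before forwarding resolver state. The parent type and field names below are hypothetical; only `pickfirstleaf.EnableHealthListener` comes from this PR.

```go
package petiole // hypothetical package, for illustration only

import (
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
)

// petioleBalancer is a hypothetical parent policy that delegates to a
// pick_first leaf child.
type petioleBalancer struct {
	child balancer.Balancer // the pick_first leaf balancer
}

func (p *petioleBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
	// Tag the resolver state so the leaf pick_first registers a generic
	// health listener on its READY SubConn instead of reporting raw
	// connectivity state directly.
	state = pickfirstleaf.EnableHealthListener(state)
	return p.child.UpdateClientConnState(state)
}
```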

type pfConfig struct {
serviceconfig.LoadBalancingConfig `json:"-"`

@@ -122,15 +137,15 @@
subConn balancer.SubConn
addr resolver.Address

state connectivity.State
connectivityState connectivity.State
lastErr error
connectionFailedInFirstPass bool
}

func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
sd := &scData{
state: connectivity.Idle,
addr: addr,
connectivityState: connectivity.Idle,
addr: addr,
}
sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{
StateListener: func(state balancer.SubConnState) {
@@ -153,14 +168,18 @@
// The mutex is used to ensure synchronization of updates triggered
// from the idle picker and the already serialized resolver,
// SubConn state updates.
mu sync.Mutex
state connectivity.State
mu sync.Mutex
connectivityState connectivity.State
Member: I think this deserves a comment, too, since there are now two very similar fields.

Contributor Author: Added a comment.

Member: Shouldn't this kind of tracking be done in the subconn struct (scData) and not here? I would expect the LB policy to have only the concludedState, and each subchannel to track its real state and its effective state, accounting for sticky-TF and health reporting. It seems confusing to me that the LB policy itself is tracking two different states, but I'm willing to believe it's simpler this way if you tried it the other way already.

Contributor Author: I was referring to the Java implementation of pick_first, which handles the sticky TF behaviour in the LB policy. I don't see any issue with handling sticky TF in the subchannel state. I've updated the PR to reflect the suggestions.

// State reported to the channel. This will be the health state when pickfirst
// is used as a leaf policy and connectivityState is READY.
concludedState connectivity.State
// scData for active subconns mapped by address.
subConns *resolver.AddressMap
addressList addressList
firstPass bool
numTF int
cancelConnectionTimer func()
healthCheckingEnabled bool
}
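Following the review thread above, here is a rough sketch of the per-SubConn alternative the reviewer suggests, where each subchannel tracks its raw and its effective state and applies sticky TRANSIENT_FAILURE itself. The names are hypothetical and this is not the code in this diff.

```go
package example // illustrative only

import "google.golang.org/grpc/connectivity"

// scState is a hypothetical per-SubConn tracker: it keeps the raw
// connectivity state and the effective state the SubConn reports upward.
type scState struct {
	// rawConnectivityState is the last state delivered by the SubConn's
	// StateListener.
	rawConnectivityState connectivity.State
	// effectiveState is the state this SubConn contributes upward after
	// applying sticky TRANSIENT_FAILURE (gRFC A62) and, when enabled,
	// health updates.
	effectiveState connectivity.State
}

// updateRaw records a raw connectivity update and derives the effective state.
func (s *scState) updateRaw(newState connectivity.State) {
	s.rawConnectivityState = newState
	// Sticky TF: once in TRANSIENT_FAILURE, stay there until READY is seen.
	if s.effectiveState == connectivity.TransientFailure && newState != connectivity.Ready {
		return
	}
	s.effectiveState = newState
}
```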

// ResolverError is called by the ClientConn when the name resolver produces
@@ -179,14 +198,15 @@
// The picker will not change since the balancer does not currently
// report an error. If the balancer hasn't received a single good resolver
// update yet, transition to TRANSIENT_FAILURE.
if b.state != connectivity.TransientFailure && b.addressList.size() > 0 {
if b.connectivityState != connectivity.TransientFailure && b.addressList.size() > 0 {
if b.logger.V(2) {
b.logger.Infof("Ignoring resolver error because balancer is using a previous good update.")
}
return
}

b.cc.UpdateState(balancer.State{
b.connectivityState = connectivity.TransientFailure
b.updateConcludedStateLocked(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
})
@@ -199,12 +219,13 @@
if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
// Cleanup state pertaining to the previous resolver state.
// Treat an empty address list like an error by calling b.ResolverError.
b.state = connectivity.TransientFailure
b.connectivityState = connectivity.TransientFailure
b.closeSubConnsLocked()
b.addressList.updateAddrs(nil)
b.resolverErrorLocked(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
b.healthCheckingEnabled = state.ResolverState.Attributes.Value(enableHealthListenerKey) == enableHealthListenerValue
cfg, ok := state.BalancerConfig.(pfConfig)
if state.BalancerConfig != nil && !ok {
return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v: %w", state.BalancerConfig, state.BalancerConfig, balancer.ErrBadResolverState)
@@ -256,7 +277,7 @@
prevAddr := b.addressList.currentAddress()
prevAddrsCount := b.addressList.size()
b.addressList.updateAddrs(newAddrs)
if b.state == connectivity.Ready && b.addressList.seekTo(prevAddr) {
if b.connectivityState == connectivity.Ready && b.addressList.seekTo(prevAddr) {
return nil
}

@@ -268,15 +289,15 @@
// we should still enter CONNECTING because the sticky TF behaviour
// mentioned in A62 applies only when the TRANSIENT_FAILURE is reported
// due to connectivity failures.
if b.state == connectivity.Ready || b.state == connectivity.Connecting || prevAddrsCount == 0 {
if b.connectivityState == connectivity.Ready || b.connectivityState == connectivity.Connecting || prevAddrsCount == 0 {
// Start connection attempt at first address.
b.state = connectivity.Connecting
b.cc.UpdateState(balancer.State{
b.connectivityState = connectivity.Connecting
b.forceUpdateConcludedStateLocked(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
b.startFirstPassLocked()
} else if b.state == connectivity.TransientFailure {
} else if b.connectivityState == connectivity.TransientFailure {
// If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until
// we're READY. See A62.
b.startFirstPassLocked()
@@ -295,7 +316,7 @@
defer b.mu.Unlock()
b.closeSubConnsLocked()
b.cancelConnectionTimer()
b.state = connectivity.Shutdown
b.connectivityState = connectivity.Shutdown
}

// ExitIdle moves the balancer out of idle state. It can be called concurrently
Expand All @@ -304,7 +325,7 @@
func (b *pickfirstBalancer) ExitIdle() {
b.mu.Lock()
defer b.mu.Unlock()
if b.state == connectivity.Idle {
if b.connectivityState == connectivity.Idle {
b.startFirstPassLocked()
}
}
@@ -470,7 +491,7 @@
}

scd := sd.(*scData)
switch scd.state {
switch scd.connectivityState {
case connectivity.Idle:
scd.subConn.Connect()
b.scheduleNextConnectionLocked()
@@ -488,7 +509,7 @@
b.scheduleNextConnectionLocked()
return
default:
b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", scd.state)
b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", scd.connectivityState)

return

}
@@ -531,12 +552,12 @@
func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.SubConnState) {
b.mu.Lock()
defer b.mu.Unlock()
oldState := sd.state
oldState := sd.connectivityState
// Record a connection attempt when exiting CONNECTING.
if newState.ConnectivityState == connectivity.TransientFailure {
sd.connectionFailedInFirstPass = true
}
sd.state = newState.ConnectivityState
sd.connectivityState = newState.ConnectivityState
// Previously relevant SubConns can still callback with state updates.
// To prevent pickers from returning these obsolete SubConns, this logic
// is included to check if the current list of active SubConns includes this
@@ -556,11 +577,20 @@
b.logger.Errorf("Address %q not found in address list %v", sd.addr, b.addressList.addresses)
return
}
b.state = connectivity.Ready
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Ready,
Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
})
b.connectivityState = connectivity.Ready
if !b.healthCheckingEnabled {
b.updateConcludedStateLocked(balancer.State{
ConnectivityState: connectivity.Ready,
Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
})
} else {
if b.logger.V(2) {
b.logger.Infof("SubConn %p reported connectivity state READY. Registering health listener.", sd.subConn)
}

sd.subConn.RegisterHealthListener(func(scs balancer.SubConnState) {
b.updateSubConnHealthState(sd, scs)
})
}
return
}

@@ -570,13 +600,13 @@
// a transport is successfully created, but the connection fails
// before the SubConn can send the notification for READY. We treat
// this as a successful connection and transition to IDLE.
if (b.state == connectivity.Ready && newState.ConnectivityState != connectivity.Ready) || (oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.Idle) {
if (b.connectivityState == connectivity.Ready && newState.ConnectivityState != connectivity.Ready) || (oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.Idle) {
// Once a transport fails, the balancer enters IDLE and starts from
// the first address when the picker is used.
b.shutdownRemainingLocked(sd)
b.state = connectivity.Idle
b.connectivityState = connectivity.Idle
b.addressList.reset()
b.cc.UpdateState(balancer.State{
b.updateConcludedStateLocked(balancer.State{
ConnectivityState: connectivity.Idle,
Picker: &idlePicker{exitIdle: sync.OnceFunc(b.ExitIdle)},
})
@@ -590,9 +620,9 @@
// TRANSIENT_FAILURE. If it's in TRANSIENT_FAILURE, stay in
// TRANSIENT_FAILURE until it's READY. See A62.
// If the balancer is already in CONNECTING, no update is needed.
if b.state == connectivity.Idle {
b.state = connectivity.Connecting
b.cc.UpdateState(balancer.State{
if b.connectivityState == connectivity.Idle {
b.connectivityState = connectivity.Connecting
b.updateConcludedStateLocked(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
@@ -625,7 +655,7 @@
b.numTF = (b.numTF + 1) % b.subConns.Len()
sd.lastErr = newState.ConnectionError
if b.numTF%b.subConns.Len() == 0 {
b.cc.UpdateState(balancer.State{
b.updateConcludedStateLocked(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: newState.ConnectionError},
})
@@ -655,7 +685,7 @@
}
}
b.firstPass = false
b.state = connectivity.TransientFailure
b.connectivityState = connectivity.TransientFailure

b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
@@ -664,12 +694,60 @@
// Start re-connecting all the SubConns that are already in IDLE.
for _, v := range b.subConns.Values() {
sd := v.(*scData)
if sd.state == connectivity.Idle {
if sd.connectivityState == connectivity.Idle {
sd.subConn.Connect()
}
}
}

func (b *pickfirstBalancer) updateSubConnHealthState(sd *scData, state balancer.SubConnState) {
b.mu.Lock()
defer b.mu.Unlock()
// Previously relevant SubConns can still callback with state updates.
// To prevent pickers from returning these obsolete SubConns, this logic
// is included to check if the current list of active SubConns includes
// this SubConn.
if activeSD, found := b.subConns.Get(sd.addr); !found || activeSD != sd {
return
}

switch state.ConnectivityState {
case connectivity.Ready:
b.updateConcludedStateLocked(balancer.State{
ConnectivityState: connectivity.Ready,
Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
})
case connectivity.TransientFailure:
b.updateConcludedStateLocked(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("health check failure: %v", state.ConnectionError)},
})
case connectivity.Connecting:
b.updateConcludedStateLocked(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
default:
b.logger.Errorf("Got unexpected health update for SubConn %p: %v", sd.subConn, state)

}
}

func (b *pickfirstBalancer) updateConcludedStateLocked(newState balancer.State) {
// Optimization to avoid sending duplicate CONNECTING and IDLE updates.
if newState.ConnectivityState == b.concludedState && (b.concludedState == connectivity.Connecting || b.concludedState == connectivity.Idle) {
return
}

b.forceUpdateConcludedStateLocked(newState)
}

// A separate function to force-update the ClientConn state is required to send
// the first CONNECTING update, since the channel doesn't correctly assume that
// LB policies start in CONNECTING and relies on the LB policy to send an initial
// CONNECTING update.
func (b *pickfirstBalancer) forceUpdateConcludedStateLocked(newState balancer.State) {
b.concludedState = newState.ConnectivityState
b.cc.UpdateState(newState)
}
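To make the split between the two helpers concrete, here is a standalone sketch (hypothetical names, not part of this diff) of which updates reach the channel: duplicate CONNECTING and IDLE reports are suppressed, while the initial CONNECTING must be forced because the channel does not assume the LB policy starts in CONNECTING.

```go
package main // illustrative only; the real balancer forwards to balancer.ClientConn.UpdateState

import (
	"fmt"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/connectivity"
)

// gate is a hypothetical, stripped-down copy of the concluded-state logic.
type gate struct {
	concluded connectivity.State
	send      func(balancer.State)
}

func (g *gate) update(s balancer.State) {
	// Suppress duplicate CONNECTING and IDLE updates; READY and
	// TRANSIENT_FAILURE always go through because their pickers may differ.
	if s.ConnectivityState == g.concluded &&
		(g.concluded == connectivity.Connecting || g.concluded == connectivity.Idle) {
		return
	}
	g.forceUpdate(s)
}

func (g *gate) forceUpdate(s balancer.State) {
	g.concluded = s.ConnectivityState
	g.send(s)
}

func main() {
	g := &gate{
		concluded: connectivity.Connecting, // LB policies start in CONNECTING
		send: func(s balancer.State) {
			fmt.Println("channel sees:", s.ConnectivityState)
		},
	}
	// Pickers are omitted here for brevity. The very first CONNECTING must be
	// forced: update() alone would treat it as a duplicate and suppress it.
	g.forceUpdate(balancer.State{ConnectivityState: connectivity.Connecting}) // forwarded
	g.update(balancer.State{ConnectivityState: connectivity.Connecting})      // suppressed
	g.update(balancer.State{ConnectivityState: connectivity.Ready})           // forwarded
}
```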

type picker struct {
result balancer.PickResult
err error