Skip to content

Commit

Permalink
Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Dec 6, 2024
1 parent 5d1d853 commit 956bd64
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 105 deletions.
2 changes: 1 addition & 1 deletion balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1495,7 +1495,7 @@ func subConnAddresses(ctx context.Context, cc *testutils.BalancerClientConn, sub
return addresses, nil
}

// stateStoringBalancer stores the state of the subconns being created.
// stateStoringBalancer stores the state of the SubConns being created.
type stateStoringBalancer struct {
balancer.Balancer
mu sync.Mutex
Expand Down
57 changes: 33 additions & 24 deletions balancer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,17 @@ type healthData struct {
// to the LB policy. This is stored to avoid sending updates when the
// SubConn has already exited connectivity state READY.
connectivityState connectivity.State
// closeHealthProducer stores function to close the ref counted health
// producer. The health producer is automatically closed when the SubConn
// state changes.
closeHealthProducer func()
}

func newHealthData(s connectivity.State) *healthData {
return &healthData{connectivityState: s}
return &healthData{
connectivityState: s,
closeHealthProducer: func() {},
}
}

// updateState is invoked by grpc to push a subConn state update to the
Expand Down Expand Up @@ -420,10 +427,11 @@ func (acbw *acBalancerWrapper) closeProducers() {
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type HealthCheckOptions struct {
// Name of the gRPC service running on the server for reporting health state.
// If the service name is empty, client side health checking will be disabled.
HealthServiceName string
Listener func(balancer.SubConnState)
// ServiceName of the gRPC service running on the server for reporting health state.
ServiceName string
// Listener is called when the health update is received from the health
// service running on the server.
Listener func(balancer.SubConnState)
}

// RegisterHealthListener accepts a health listener from the LB policy. It sends
Expand All @@ -434,6 +442,7 @@ type HealthCheckOptions struct {
func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) {
acbw.healthMu.Lock()
defer acbw.healthMu.Unlock()
acbw.healthData.closeHealthProducer()
// listeners should not be registered when the connectivity state
// isn't Ready. This may happen when the balancer registers a listener
// after the connectivityState is updated, but before it is notified
Expand All @@ -445,14 +454,21 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub
// registered health listeners.
hd := newHealthData(connectivity.Ready)
acbw.healthData = hd
if listener == nil {
return
}

// Client side health checking is enabled when all the following
// conditions are satisfied:
// 1. The health check config is present in the service config.
// 2. Health checking is not disabled using the dial option.
// 3. The health package is imported.
// 1. Health checking is not disabled using the dial option.
// 2. The health package is imported.
// 3. The health check config is present in the service config.
healthCheckEnabled := !acbw.ccb.cc.dopts.disableHealthCheck
regHealthLisFn := internal.RegisterClientHealthCheckListener
healthCheckEnabled := !acbw.ccb.cc.dopts.disableHealthCheck && regHealthLisFn != nil
if regHealthLisFn == nil {
// The health package is not imported.
healthCheckEnabled = false
}
var cfg *healthCheckConfig
if healthCheckEnabled {
// Avoid acquiring cc.mu unless necessary.
Expand All @@ -467,14 +483,11 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub
// Don't send updates if a new listener is registered.
acbw.healthMu.Lock()
defer acbw.healthMu.Unlock()
curHD := acbw.healthData
if curHD != hd {
if acbw.healthData != hd {
return
}
if !healthCheckEnabled {
if listener != nil {
listener(balancer.SubConnState{ConnectivityState: connectivity.Ready})
}
listener(balancer.SubConnState{ConnectivityState: connectivity.Ready})
return
}
// Serialize the health updates from the health producer with
Expand All @@ -485,23 +498,19 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub
return
}
acbw.healthMu.Lock()
curHD := acbw.healthData
acbw.healthMu.Unlock()
if curHD != hd {
defer acbw.healthMu.Unlock()
if acbw.healthData != hd {
return
}
listener(scs)
})
}

if listener == nil {
listenerWrapper = nil
}

healthOpts := HealthCheckOptions{
HealthServiceName: cfg.ServiceName,
Listener: listenerWrapper,
ServiceName: cfg.ServiceName,
Listener: listenerWrapper,
}
regHealthLisFn.(func(context.Context, balancer.SubConn, HealthCheckOptions))(ctx, acbw, healthOpts)
fn := regHealthLisFn.(func(context.Context, balancer.SubConn, HealthCheckOptions) func())
hd.closeHealthProducer = fn(ctx, acbw, healthOpts)
})
}
19 changes: 7 additions & 12 deletions health/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

func init() {
producerBuilderSingleton = &producerBuilder{}
internal.RegisterClientHealthCheckListener = RegisterClientSideHealthCheckListener
internal.RegisterClientHealthCheckListener = registerClientSideHealthCheckListener
}

type producerBuilder struct{}
Expand Down Expand Up @@ -68,34 +68,29 @@ type healthServiceProducer struct {
cancelDone chan (struct{})
}

// RegisterClientSideHealthCheckListener accepts a listener to provide server
// registerClientSideHealthCheckListener accepts a listener to provide server
// health state via the health service.
//
// # Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
func RegisterClientSideHealthCheckListener(ctx context.Context, sc balancer.SubConn, opts grpc.HealthCheckOptions) {
pr, _ := sc.GetOrBuildProducer(producerBuilderSingleton)
func registerClientSideHealthCheckListener(ctx context.Context, sc balancer.SubConn, opts grpc.HealthCheckOptions) func() {
pr, closeFn := sc.GetOrBuildProducer(producerBuilderSingleton)
p := pr.(*healthServiceProducer)
p.mu.Lock()
defer p.mu.Unlock()
p.cancel()
<-p.cancelDone
if opts.Listener == nil {
return
return closeFn
}

Check warning on line 82 in health/producer.go

View check run for this annotation

Codecov / codecov/patch

health/producer.go#L81-L82

Added lines #L81 - L82 were not covered by tests

p.cancelDone = make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
p.cancel = cancel

go p.startHealthCheck(ctx, sc, opts, p.cancelDone)
return closeFn
}

func (p *healthServiceProducer) startHealthCheck(ctx context.Context, sc balancer.SubConn, opts grpc.HealthCheckOptions, closeCh chan struct{}) {
defer close(closeCh)
serviceName := opts.HealthServiceName
newStream := func(method string) (any, error) {
return p.cc.NewStream(ctx, &grpc.StreamDesc{ServerStreams: true}, method)
}
Expand All @@ -109,7 +104,7 @@ func (p *healthServiceProducer) startHealthCheck(ctx context.Context, sc balance

// Call the function through the internal variable as tests use it for
// mocking.
err := internal.HealthCheckFunc(ctx, newStream, setConnectivityState, serviceName)
err := internal.HealthCheckFunc(ctx, newStream, setConnectivityState, opts.ServiceName)
if err == nil {
return
}
Expand Down
5 changes: 3 additions & 2 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ var (
// HealthCheckFunc is used to provide client-side LB channel health checking
HealthCheckFunc HealthChecker
// RegisterClientHealthCheckListener is used to provide a listener for
// updates from the client-side health checking service.
RegisterClientHealthCheckListener any // func(context.Context, balancer.SubConn, grpc.HealthCheckOptions)
// updates from the client-side health checking service. It returns a
// function that can be called to stop the health producer.
RegisterClientHealthCheckListener any // func(context.Context, balancer.SubConn, grpc.HealthCheckOptions) func()
// BalancerUnregister is exported by package balancer to unregister a balancer.
BalancerUnregister func(name string)
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
Expand Down
Loading

0 comments on commit 956bd64

Please sign in to comment.