Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Dec 10, 2024
1 parent 956bd64 commit b126155
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 49 deletions.
66 changes: 29 additions & 37 deletions balancer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,18 +420,32 @@ func (acbw *acBalancerWrapper) closeProducers() {
}
}

// HealthCheckOptions are the options to configure the health check producer.
//
// # Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type HealthCheckOptions struct {
// ServiceName of the gRPC service running on the server for reporting health state.
ServiceName string
// Listener is called when the health update is received from the health
// service running on the server.
Listener func(balancer.SubConnState)
// healthProducerRegisterFn is a type alias for the health producer's function
// for registering listeners.
type healthProducerRegisterFn = func(context.Context, balancer.SubConn, string, func(balancer.SubConnState)) func()

// healthServiceOpts returns the options for client side health checking.
// It returns a nil registerHealthListenerFn if client side health checks are
// disabled.
// Client side health checking is enabled when all the following
// conditions are satisfied:
// 1. Health checking is not disabled using the dial option.
// 2. The health package is imported.
// 3. The health check config is present in the service config.
func (acbw *acBalancerWrapper) healthServiceOpts() (string, healthProducerRegisterFn) {
if acbw.ccb.cc.dopts.disableHealthCheck {
return "", nil
}
regHealthLisFn := internal.RegisterClientHealthCheckListener
if regHealthLisFn == nil {
// The health package is not imported.
return "", nil
}
cfg := acbw.ac.cc.healthCheckConfig()
if cfg == nil {
return "", nil
}
return cfg.ServiceName, regHealthLisFn.(healthProducerRegisterFn)
}

// RegisterHealthListener accepts a health listener from the LB policy. It sends
Expand All @@ -458,24 +472,7 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub
return
}

// Client side health checking is enabled when all the following
// conditions are satisfied:
// 1. Health checking is not disabled using the dial option.
// 2. The health package is imported.
// 3. The health check config is present in the service config.
healthCheckEnabled := !acbw.ccb.cc.dopts.disableHealthCheck
regHealthLisFn := internal.RegisterClientHealthCheckListener
if regHealthLisFn == nil {
// The health package is not imported.
healthCheckEnabled = false
}
var cfg *healthCheckConfig
if healthCheckEnabled {
// Avoid acquiring cc.mu unless necessary.
cfg = acbw.ac.cc.healthCheckConfig()
healthCheckEnabled = cfg != nil
}

serviceName, registerFn := acbw.healthServiceOpts()
acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
if ctx.Err() != nil || acbw.ccb.balancer == nil {
return
Expand All @@ -486,7 +483,7 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub
if acbw.healthData != hd {
return
}
if !healthCheckEnabled {
if registerFn == nil {
listener(balancer.SubConnState{ConnectivityState: connectivity.Ready})
return
}
Expand All @@ -506,11 +503,6 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub
})
}

healthOpts := HealthCheckOptions{
ServiceName: cfg.ServiceName,
Listener: listenerWrapper,
}
fn := regHealthLisFn.(func(context.Context, balancer.SubConn, HealthCheckOptions) func())
hd.closeHealthProducer = fn(ctx, acbw, healthOpts)
hd.closeHealthProducer = registerFn(ctx, acbw, serviceName, listenerWrapper)
})
}
12 changes: 6 additions & 6 deletions health/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,41 +70,41 @@ type healthServiceProducer struct {

// registerClientSideHealthCheckListener accepts a listener to provide server
// health state via the health service.
func registerClientSideHealthCheckListener(ctx context.Context, sc balancer.SubConn, opts grpc.HealthCheckOptions) func() {
func registerClientSideHealthCheckListener(ctx context.Context, sc balancer.SubConn, serviceName string, listener func(balancer.SubConnState)) func() {
pr, closeFn := sc.GetOrBuildProducer(producerBuilderSingleton)
p := pr.(*healthServiceProducer)
p.mu.Lock()
defer p.mu.Unlock()
p.cancel()
<-p.cancelDone
if opts.Listener == nil {
if listener == nil {
return closeFn
}

Check warning on line 82 in health/producer.go

View check run for this annotation

Codecov / codecov/patch

health/producer.go#L81-L82

Added lines #L81 - L82 were not covered by tests

p.cancelDone = make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
p.cancel = cancel

go p.startHealthCheck(ctx, sc, opts, p.cancelDone)
go p.startHealthCheck(ctx, sc, serviceName, listener, p.cancelDone)
return closeFn
}

func (p *healthServiceProducer) startHealthCheck(ctx context.Context, sc balancer.SubConn, opts grpc.HealthCheckOptions, closeCh chan struct{}) {
func (p *healthServiceProducer) startHealthCheck(ctx context.Context, sc balancer.SubConn, serviceName string, listener func(balancer.SubConnState), closeCh chan struct{}) {
defer close(closeCh)
newStream := func(method string) (any, error) {
return p.cc.NewStream(ctx, &grpc.StreamDesc{ServerStreams: true}, method)
}

setConnectivityState := func(state connectivity.State, err error) {
opts.Listener(balancer.SubConnState{
listener(balancer.SubConnState{
ConnectivityState: state,
ConnectionError: err,
})
}

// Call the function through the internal variable as tests use it for
// mocking.
err := internal.HealthCheckFunc(ctx, newStream, setConnectivityState, opts.ServiceName)
err := internal.HealthCheckFunc(ctx, newStream, setConnectivityState, serviceName)
if err == nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var (
// RegisterClientHealthCheckListener is used to provide a listener for
// updates from the client-side health checking service. It returns a
// function that can be called to stop the health producer.
RegisterClientHealthCheckListener any // func(context.Context, balancer.SubConn, grpc.HealthCheckOptions) func()
RegisterClientHealthCheckListener any // func(ctx context.Context, sc balancer.SubConn, serviceName string, listener func(balancer.SubConnState)) func()
// BalancerUnregister is exported by package balancer to unregister a balancer.
BalancerUnregister func(name string)
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
Expand Down
10 changes: 5 additions & 5 deletions test/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1250,11 +1250,11 @@ func (s) TestHealthCheckUnregisterHealthListener(t *testing.T) {

_, r := setupClient(t, nil)
svcCfg := fmt.Sprintf(`{
"healthCheckConfig": {
"serviceName": "foo"
},
"loadBalancingConfig": [{"%s":{}}]
}`, t.Name())
"healthCheckConfig": {
"serviceName": "foo"
},
"loadBalancingConfig": [{"%s":{}}]
}`, t.Name())
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: parseServiceConfig(t, r, svcCfg)})
Expand Down

0 comments on commit b126155

Please sign in to comment.