diff --git a/balancer_wrapper.go b/balancer_wrapper.go index 0cc738f6305b..9daef60ec170 100644 --- a/balancer_wrapper.go +++ b/balancer_wrapper.go @@ -420,18 +420,32 @@ func (acbw *acBalancerWrapper) closeProducers() { } } -// HealthCheckOptions are the options to configure the health check producer. -// -// # Experimental -// -// Notice: This type is EXPERIMENTAL and may be changed or removed in a -// later release. -type HealthCheckOptions struct { - // ServiceName of the gRPC service running on the server for reporting health state. - ServiceName string - // Listener is called when the health update is received from the health - // service running on the server. - Listener func(balancer.SubConnState) +// healthProducerRegisterFn is a type alias for the health producer's function +// for registering listeners. +type healthProducerRegisterFn = func(context.Context, balancer.SubConn, string, func(balancer.SubConnState)) func() + +// healthServiceOpts returns the options for client side health checking. +// It returns a nil registerHealthListenerFn if client side health checks are +// disabled. +// Client side health checking is enabled when all the following +// conditions are satisfied: +// 1. Health checking is not disabled using the dial option. +// 2. The health package is imported. +// 3. The health check config is present in the service config. +func (acbw *acBalancerWrapper) healthServiceOpts() (string, healthProducerRegisterFn) { + if acbw.ccb.cc.dopts.disableHealthCheck { + return "", nil + } + regHealthLisFn := internal.RegisterClientHealthCheckListener + if regHealthLisFn == nil { + // The health package is not imported. + return "", nil + } + cfg := acbw.ac.cc.healthCheckConfig() + if cfg == nil { + return "", nil + } + return cfg.ServiceName, regHealthLisFn.(healthProducerRegisterFn) } // RegisterHealthListener accepts a health listener from the LB policy. It sends @@ -458,24 +472,7 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub return } - // Client side health checking is enabled when all the following - // conditions are satisfied: - // 1. Health checking is not disabled using the dial option. - // 2. The health package is imported. - // 3. The health check config is present in the service config. - healthCheckEnabled := !acbw.ccb.cc.dopts.disableHealthCheck - regHealthLisFn := internal.RegisterClientHealthCheckListener - if regHealthLisFn == nil { - // The health package is not imported. - healthCheckEnabled = false - } - var cfg *healthCheckConfig - if healthCheckEnabled { - // Avoid acquiring cc.mu unless necessary. - cfg = acbw.ac.cc.healthCheckConfig() - healthCheckEnabled = cfg != nil - } - + serviceName, registerFn := acbw.healthServiceOpts() acbw.ccb.serializer.TrySchedule(func(ctx context.Context) { if ctx.Err() != nil || acbw.ccb.balancer == nil { return @@ -486,7 +483,7 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub if acbw.healthData != hd { return } - if !healthCheckEnabled { + if registerFn == nil { listener(balancer.SubConnState{ConnectivityState: connectivity.Ready}) return } @@ -506,11 +503,6 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub }) } - healthOpts := HealthCheckOptions{ - ServiceName: cfg.ServiceName, - Listener: listenerWrapper, - } - fn := regHealthLisFn.(func(context.Context, balancer.SubConn, HealthCheckOptions) func()) - hd.closeHealthProducer = fn(ctx, acbw, healthOpts) + hd.closeHealthProducer = registerFn(ctx, acbw, serviceName, listenerWrapper) }) } diff --git a/health/producer.go b/health/producer.go index 3eacae76030c..ebe67ee51ce2 100644 --- a/health/producer.go +++ b/health/producer.go @@ -70,14 +70,14 @@ type healthServiceProducer struct { // registerClientSideHealthCheckListener accepts a listener to provide server // health state via the health service. -func registerClientSideHealthCheckListener(ctx context.Context, sc balancer.SubConn, opts grpc.HealthCheckOptions) func() { +func registerClientSideHealthCheckListener(ctx context.Context, sc balancer.SubConn, serviceName string, listener func(balancer.SubConnState)) func() { pr, closeFn := sc.GetOrBuildProducer(producerBuilderSingleton) p := pr.(*healthServiceProducer) p.mu.Lock() defer p.mu.Unlock() p.cancel() <-p.cancelDone - if opts.Listener == nil { + if listener == nil { return closeFn } @@ -85,18 +85,18 @@ func registerClientSideHealthCheckListener(ctx context.Context, sc balancer.SubC ctx, cancel := context.WithCancel(ctx) p.cancel = cancel - go p.startHealthCheck(ctx, sc, opts, p.cancelDone) + go p.startHealthCheck(ctx, sc, serviceName, listener, p.cancelDone) return closeFn } -func (p *healthServiceProducer) startHealthCheck(ctx context.Context, sc balancer.SubConn, opts grpc.HealthCheckOptions, closeCh chan struct{}) { +func (p *healthServiceProducer) startHealthCheck(ctx context.Context, sc balancer.SubConn, serviceName string, listener func(balancer.SubConnState), closeCh chan struct{}) { defer close(closeCh) newStream := func(method string) (any, error) { return p.cc.NewStream(ctx, &grpc.StreamDesc{ServerStreams: true}, method) } setConnectivityState := func(state connectivity.State, err error) { - opts.Listener(balancer.SubConnState{ + listener(balancer.SubConnState{ ConnectivityState: state, ConnectionError: err, }) @@ -104,7 +104,7 @@ func (p *healthServiceProducer) startHealthCheck(ctx context.Context, sc balance // Call the function through the internal variable as tests use it for // mocking. - err := internal.HealthCheckFunc(ctx, newStream, setConnectivityState, opts.ServiceName) + err := internal.HealthCheckFunc(ctx, newStream, setConnectivityState, serviceName) if err == nil { return } diff --git a/internal/internal.go b/internal/internal.go index 12f9916c7350..c17b98194b3c 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -34,7 +34,7 @@ var ( // RegisterClientHealthCheckListener is used to provide a listener for // updates from the client-side health checking service. It returns a // function that can be called to stop the health producer. - RegisterClientHealthCheckListener any // func(context.Context, balancer.SubConn, grpc.HealthCheckOptions) func() + RegisterClientHealthCheckListener any // func(ctx context.Context, sc balancer.SubConn, serviceName string, listener func(balancer.SubConnState)) func() // BalancerUnregister is exported by package balancer to unregister a balancer. BalancerUnregister func(name string) // KeepaliveMinPingTime is the minimum ping interval. This must be 10s by diff --git a/test/healthcheck_test.go b/test/healthcheck_test.go index 7178e7a59d8d..fac565240ab7 100644 --- a/test/healthcheck_test.go +++ b/test/healthcheck_test.go @@ -1250,11 +1250,11 @@ func (s) TestHealthCheckUnregisterHealthListener(t *testing.T) { _, r := setupClient(t, nil) svcCfg := fmt.Sprintf(`{ - "healthCheckConfig": { - "serviceName": "foo" - }, - "loadBalancingConfig": [{"%s":{}}] - }`, t.Name()) + "healthCheckConfig": { + "serviceName": "foo" + }, + "loadBalancingConfig": [{"%s":{}}] + }`, t.Name()) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseServiceConfig(t, r, svcCfg)})