Skip to content

Commit

Permalink
Merge pull request kubernetes#96421 from dgrisonnet/fix-apiservice-av…
Browse files Browse the repository at this point in the history
…ailability

Fix aggregator_unavailable_apiservice gauge
  • Loading branch information
k8s-ci-robot authored Nov 26, 2020
2 parents c1f36fa + b525f9e commit 5ed4b76
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ go_library(

go_test(
name = "go_default_test",
srcs = ["available_controller_test.go"],
srcs = [
"available_controller_test.go",
"metrics_test.go",
],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
Expand All @@ -50,6 +53,7 @@ go_test(
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/fake:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1:go_default_library",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/transport"
"k8s.io/client-go/util/workqueue"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
Expand All @@ -52,6 +53,9 @@ import (
"k8s.io/kube-aggregator/pkg/controllers"
)

// making sure we only register metrics once into legacy registry
var registerIntoLegacyRegistryOnce sync.Once

type certKeyFunc func() ([]byte, []byte)

// ServiceResolver knows how to convert a service reference into an actual location.
Expand Down Expand Up @@ -93,6 +97,9 @@ type AvailableConditionController struct {
// NOTE: the cache works because we assume that the transports constructed
// by the controller only vary on the dynamic cert/key.
tlsCache *tlsTransportCache

// metrics registered into legacy registry
metrics *availabilityMetrics
}

type tlsTransportCache struct {
Expand Down Expand Up @@ -163,6 +170,7 @@ func NewAvailableConditionController(
"AvailableConditionController"),
proxyCurrentCertKeyContent: proxyCurrentCertKeyContent,
tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)},
metrics: newAvailabilityMetrics(),
}

if egressSelector != nil {
Expand Down Expand Up @@ -203,12 +211,22 @@ func NewAvailableConditionController(

c.syncFn = c.sync

// TODO: decouple from legacyregistry
var err error
registerIntoLegacyRegistryOnce.Do(func() {
err = c.metrics.Register(legacyregistry.Register, legacyregistry.CustomRegister)
})
if err != nil {
return nil, err
}

return c, nil
}

func (c *AvailableConditionController) sync(key string) error {
originalAPIService, err := c.apiServiceLister.Get(key)
if apierrors.IsNotFound(err) {
c.metrics.ForgetAPIService(key)
return nil
}
if err != nil {
Expand Down Expand Up @@ -259,7 +277,7 @@ func (c *AvailableConditionController) sync(key string) error {
// local API services are always considered available
if apiService.Spec.Service == nil {
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, apiregistrationv1apihelper.NewLocalAvailableAPIServiceCondition())
_, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService)
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
return err
}

Expand All @@ -269,14 +287,14 @@ func (c *AvailableConditionController) sync(key string) error {
availableCondition.Reason = "ServiceNotFound"
availableCondition.Message = fmt.Sprintf("service/%s in %q is not present", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace)
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
_, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService)
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
return err
} else if err != nil {
availableCondition.Status = apiregistrationv1.ConditionUnknown
availableCondition.Reason = "ServiceAccessError"
availableCondition.Message = fmt.Sprintf("service/%s in %q cannot be checked due to: %v", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, err)
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
_, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService)
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
return err
}

Expand All @@ -297,7 +315,7 @@ func (c *AvailableConditionController) sync(key string) error {
availableCondition.Reason = "ServicePortError"
availableCondition.Message = fmt.Sprintf("service/%s in %q is not listening on port %d", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, *apiService.Spec.Service.Port)
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
_, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService)
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
return err
}

Expand All @@ -307,14 +325,14 @@ func (c *AvailableConditionController) sync(key string) error {
availableCondition.Reason = "EndpointsNotFound"
availableCondition.Message = fmt.Sprintf("cannot find endpoints for service/%s in %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace)
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
_, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService)
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
return err
} else if err != nil {
availableCondition.Status = apiregistrationv1.ConditionUnknown
availableCondition.Reason = "EndpointsAccessError"
availableCondition.Message = fmt.Sprintf("service/%s in %q cannot be checked due to: %v", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, err)
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
_, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService)
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
return err
}
hasActiveEndpoints := false
Expand All @@ -335,7 +353,7 @@ func (c *AvailableConditionController) sync(key string) error {
availableCondition.Reason = "MissingEndpoints"
availableCondition.Message = fmt.Sprintf("endpoints for service/%s in %q have no addresses with port name %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, portName)
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
_, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService)
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
return err
}
}
Expand Down Expand Up @@ -413,7 +431,7 @@ func (c *AvailableConditionController) sync(key string) error {
availableCondition.Reason = "FailedDiscoveryCheck"
availableCondition.Message = lastError.Error()
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
_, updateErr := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService)
_, updateErr := c.updateAPIServiceStatus(originalAPIService, apiService)
if updateErr != nil {
return updateErr
}
Expand All @@ -426,26 +444,26 @@ func (c *AvailableConditionController) sync(key string) error {
availableCondition.Reason = "Passed"
availableCondition.Message = "all checks passed"
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
_, err = updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService)
_, err = c.updateAPIServiceStatus(originalAPIService, apiService)
return err
}

// updateAPIServiceStatus only issues an update if a change is detected. We have a tight resync loop to quickly detect dead
// apiservices. Doing that means we don't want to quickly issue no-op updates.
func updateAPIServiceStatus(client apiregistrationclient.APIServicesGetter, originalAPIService, newAPIService *apiregistrationv1.APIService) (*apiregistrationv1.APIService, error) {
func (c *AvailableConditionController) updateAPIServiceStatus(originalAPIService, newAPIService *apiregistrationv1.APIService) (*apiregistrationv1.APIService, error) {
// update this metric on every sync operation to reflect the actual state
setUnavailableGauge(newAPIService)
c.setUnavailableGauge(newAPIService)

if equality.Semantic.DeepEqual(originalAPIService.Status, newAPIService.Status) {
return newAPIService, nil
}

newAPIService, err := client.APIServices().UpdateStatus(context.TODO(), newAPIService, metav1.UpdateOptions{})
newAPIService, err := c.apiServiceClient.APIServices().UpdateStatus(context.TODO(), newAPIService, metav1.UpdateOptions{})
if err != nil {
return nil, err
}

setUnavailableCounter(originalAPIService, newAPIService)
c.setUnavailableCounter(originalAPIService, newAPIService)
return newAPIService, nil
}

Expand Down Expand Up @@ -630,17 +648,17 @@ func (c *AvailableConditionController) deleteEndpoints(obj interface{}) {
}

// setUnavailableGauge set the metrics so that it reflect the current state base on availability of the given service
func setUnavailableGauge(newAPIService *apiregistrationv1.APIService) {
func (c *AvailableConditionController) setUnavailableGauge(newAPIService *apiregistrationv1.APIService) {
if apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available) {
unavailableGauge.WithLabelValues(newAPIService.Name).Set(0.0)
c.metrics.SetAPIServiceAvailable(newAPIService.Name)
return
}

unavailableGauge.WithLabelValues(newAPIService.Name).Set(1.0)
c.metrics.SetAPIServiceUnavailable(newAPIService.Name)
}

// setUnavailableCounter increases the metrics only if the given service is unavailable and its APIServiceCondition has changed
func setUnavailableCounter(originalAPIService, newAPIService *apiregistrationv1.APIService) {
func (c *AvailableConditionController) setUnavailableCounter(originalAPIService, newAPIService *apiregistrationv1.APIService) {
wasAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(originalAPIService, apiregistrationv1.Available)
isAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available)
statusChanged := isAvailable != wasAvailable
Expand All @@ -650,6 +668,6 @@ func setUnavailableCounter(originalAPIService, newAPIService *apiregistrationv1.
if newCondition := apiregistrationv1apihelper.GetAPIServiceConditionByType(newAPIService, apiregistrationv1.Available); newCondition != nil {
reason = newCondition.Reason
}
unavailableCounter.WithLabelValues(newAPIService.Name, reason).Inc()
c.metrics.UnavailableCounter(newAPIService.Name, reason).Inc()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func setupAPIServices(apiServices []*apiregistration.APIService) (*AvailableCond
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
"AvailableConditionController"),
tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)},
metrics: newAvailabilityMetrics(),
}
for _, svc := range apiServices {
c.addAPIService(svc)
Expand Down Expand Up @@ -408,6 +409,7 @@ func TestSync(t *testing.T) {
serviceResolver: &fakeServiceResolver{url: testServer.URL},
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)},
metrics: newAvailabilityMetrics(),
}
c.sync(tc.apiServiceName)

Expand Down Expand Up @@ -457,13 +459,18 @@ func TestUpdateAPIServiceStatus(t *testing.T) {
bar := &apiregistration.APIService{Status: apiregistration.APIServiceStatus{Conditions: []apiregistration.APIServiceCondition{{Type: "bar"}}}}

fakeClient := fake.NewSimpleClientset()
updateAPIServiceStatus(fakeClient.ApiregistrationV1().(apiregistrationclient.APIServicesGetter), foo, foo)
c := AvailableConditionController{
apiServiceClient: fakeClient.ApiregistrationV1().(apiregistrationclient.APIServicesGetter),
metrics: newAvailabilityMetrics(),
}

c.updateAPIServiceStatus(foo, foo)
if e, a := 0, len(fakeClient.Actions()); e != a {
t.Error(spew.Sdump(fakeClient.Actions()))
}

fakeClient.ClearActions()
updateAPIServiceStatus(fakeClient.ApiregistrationV1().(apiregistrationclient.APIServicesGetter), foo, bar)
c.updateAPIServiceStatus(foo, bar)
if e, a := 1, len(fakeClient.Actions()); e != a {
t.Error(spew.Sdump(fakeClient.Actions()))
}
Expand Down
132 changes: 114 additions & 18 deletions staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ limitations under the License.
package apiserver

import (
"sync"

"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)

/*
Expand All @@ -30,25 +31,120 @@ import (
* the metric stability policy.
*/
var (
unavailableCounter = metrics.NewCounterVec(
&metrics.CounterOpts{
Name: "aggregator_unavailable_apiservice_total",
Help: "Counter of APIServices which are marked as unavailable broken down by APIService name and reason.",
StabilityLevel: metrics.ALPHA,
},
[]string{"name", "reason"},
)
unavailableGauge = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Name: "aggregator_unavailable_apiservice",
Help: "Gauge of APIServices which are marked as unavailable broken down by APIService name.",
StabilityLevel: metrics.ALPHA,
},
unavailableGaugeDesc = metrics.NewDesc(
"aggregator_unavailable_apiservice",
"Gauge of APIServices which are marked as unavailable broken down by APIService name.",
[]string{"name"},
nil,
metrics.ALPHA,
"",
)
)

func init() {
legacyregistry.MustRegister(unavailableCounter)
legacyregistry.MustRegister(unavailableGauge)
type availabilityMetrics struct {
unavailableCounter *metrics.CounterVec

*availabilityCollector
}

func newAvailabilityMetrics() *availabilityMetrics {
return &availabilityMetrics{
unavailableCounter: metrics.NewCounterVec(
&metrics.CounterOpts{
Name: "aggregator_unavailable_apiservice_total",
Help: "Counter of APIServices which are marked as unavailable broken down by APIService name and reason.",
StabilityLevel: metrics.ALPHA,
},
[]string{"name", "reason"},
),
availabilityCollector: newAvailabilityCollector(),
}
}

// Register registers apiservice availability metrics.
func (m *availabilityMetrics) Register(
registrationFunc func(metrics.Registerable) error,
customRegistrationFunc func(metrics.StableCollector) error,
) error {
err := registrationFunc(m.unavailableCounter)
if err != nil {
return err
}

err = customRegistrationFunc(m.availabilityCollector)
if err != nil {
return err
}

return nil
}

// UnavailableCounter returns a counter to track apiservices marked as unavailable.
func (m *availabilityMetrics) UnavailableCounter(apiServiceName, reason string) metrics.CounterMetric {
return m.unavailableCounter.WithLabelValues(apiServiceName, reason)
}

type availabilityCollector struct {
metrics.BaseStableCollector

mtx sync.RWMutex
availabilities map[string]bool
}

// Check if apiServiceStatusCollector implements necessary interface.
var _ metrics.StableCollector = &availabilityCollector{}

func newAvailabilityCollector() *availabilityCollector {
return &availabilityCollector{
availabilities: make(map[string]bool),
}
}

// DescribeWithStability implements the metrics.StableCollector interface.
func (c *availabilityCollector) DescribeWithStability(ch chan<- *metrics.Desc) {
ch <- unavailableGaugeDesc
}

// CollectWithStability implements the metrics.StableCollector interface.
func (c *availabilityCollector) CollectWithStability(ch chan<- metrics.Metric) {
c.mtx.RLock()
defer c.mtx.RUnlock()

for apiServiceName, isAvailable := range c.availabilities {
gaugeValue := 1.0
if isAvailable {
gaugeValue = 0.0
}
ch <- metrics.NewLazyConstMetric(
unavailableGaugeDesc,
metrics.GaugeValue,
gaugeValue,
apiServiceName,
)
}
}

// SetAPIServiceAvailable sets the given apiservice availability gauge to available.
func (c *availabilityCollector) SetAPIServiceAvailable(apiServiceKey string) {
c.setAPIServiceAvailability(apiServiceKey, true)
}

// SetAPIServiceUnavailable sets the given apiservice availability gauge to unavailable.
func (c *availabilityCollector) SetAPIServiceUnavailable(apiServiceKey string) {
c.setAPIServiceAvailability(apiServiceKey, false)
}

func (c *availabilityCollector) setAPIServiceAvailability(apiServiceKey string, availability bool) {
c.mtx.Lock()
defer c.mtx.Unlock()

c.availabilities[apiServiceKey] = availability
}

// ForgetAPIService removes the availability gauge of the given apiservice.
func (c *availabilityCollector) ForgetAPIService(apiServiceKey string) {
c.mtx.Lock()
defer c.mtx.Unlock()

delete(c.availabilities, apiServiceKey)
}
Loading

0 comments on commit 5ed4b76

Please sign in to comment.