diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/BUILD b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/BUILD index 443aee80b157f..6a3197382f090 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/BUILD +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/BUILD @@ -41,7 +41,10 @@ go_library( go_test( name = "go_default_test", - srcs = ["available_controller_test.go"], + srcs = [ + "available_controller_test.go", + "metrics_test.go", + ], embed = [":go_default_library"], deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", @@ -50,6 +53,7 @@ go_test( "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", + "//staging/src/k8s.io/component-base/metrics/testutil:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/fake:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1:go_default_library", diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go index 525d8c98c6344..5d91c85fa66fb 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go @@ -43,6 +43,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/transport" "k8s.io/client-go/util/workqueue" + "k8s.io/component-base/metrics/legacyregistry" "k8s.io/klog/v2" apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" @@ -52,6 +53,9 @@ import ( "k8s.io/kube-aggregator/pkg/controllers" ) +// making sure we only register metrics once into legacy registry +var registerIntoLegacyRegistryOnce sync.Once + type certKeyFunc func() ([]byte, []byte) // ServiceResolver knows how to convert a service reference into an actual location. @@ -93,6 +97,9 @@ type AvailableConditionController struct { // NOTE: the cache works because we assume that the transports constructed // by the controller only vary on the dynamic cert/key. tlsCache *tlsTransportCache + + // metrics registered into legacy registry + metrics *availabilityMetrics } type tlsTransportCache struct { @@ -163,6 +170,7 @@ func NewAvailableConditionController( "AvailableConditionController"), proxyCurrentCertKeyContent: proxyCurrentCertKeyContent, tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)}, + metrics: newAvailabilityMetrics(), } if egressSelector != nil { @@ -203,12 +211,22 @@ func NewAvailableConditionController( c.syncFn = c.sync + // TODO: decouple from legacyregistry + var err error + registerIntoLegacyRegistryOnce.Do(func() { + err = c.metrics.Register(legacyregistry.Register, legacyregistry.CustomRegister) + }) + if err != nil { + return nil, err + } + return c, nil } func (c *AvailableConditionController) sync(key string) error { originalAPIService, err := c.apiServiceLister.Get(key) if apierrors.IsNotFound(err) { + c.metrics.ForgetAPIService(key) return nil } if err != nil { @@ -259,7 +277,7 @@ func (c *AvailableConditionController) sync(key string) error { // local API services are always considered available if apiService.Spec.Service == nil { apiregistrationv1apihelper.SetAPIServiceCondition(apiService, apiregistrationv1apihelper.NewLocalAvailableAPIServiceCondition()) - _, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService) + _, err := c.updateAPIServiceStatus(originalAPIService, apiService) return err } @@ -269,14 +287,14 @@ func (c *AvailableConditionController) sync(key string) error { availableCondition.Reason = "ServiceNotFound" availableCondition.Message = fmt.Sprintf("service/%s in %q is not present", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace) apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition) - _, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService) + _, err := c.updateAPIServiceStatus(originalAPIService, apiService) return err } else if err != nil { availableCondition.Status = apiregistrationv1.ConditionUnknown availableCondition.Reason = "ServiceAccessError" availableCondition.Message = fmt.Sprintf("service/%s in %q cannot be checked due to: %v", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, err) apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition) - _, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService) + _, err := c.updateAPIServiceStatus(originalAPIService, apiService) return err } @@ -297,7 +315,7 @@ func (c *AvailableConditionController) sync(key string) error { availableCondition.Reason = "ServicePortError" availableCondition.Message = fmt.Sprintf("service/%s in %q is not listening on port %d", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, *apiService.Spec.Service.Port) apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition) - _, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService) + _, err := c.updateAPIServiceStatus(originalAPIService, apiService) return err } @@ -307,14 +325,14 @@ func (c *AvailableConditionController) sync(key string) error { availableCondition.Reason = "EndpointsNotFound" availableCondition.Message = fmt.Sprintf("cannot find endpoints for service/%s in %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace) apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition) - _, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService) + _, err := c.updateAPIServiceStatus(originalAPIService, apiService) return err } else if err != nil { availableCondition.Status = apiregistrationv1.ConditionUnknown availableCondition.Reason = "EndpointsAccessError" availableCondition.Message = fmt.Sprintf("service/%s in %q cannot be checked due to: %v", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, err) apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition) - _, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService) + _, err := c.updateAPIServiceStatus(originalAPIService, apiService) return err } hasActiveEndpoints := false @@ -335,7 +353,7 @@ func (c *AvailableConditionController) sync(key string) error { availableCondition.Reason = "MissingEndpoints" availableCondition.Message = fmt.Sprintf("endpoints for service/%s in %q have no addresses with port name %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, portName) apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition) - _, err := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService) + _, err := c.updateAPIServiceStatus(originalAPIService, apiService) return err } } @@ -413,7 +431,7 @@ func (c *AvailableConditionController) sync(key string) error { availableCondition.Reason = "FailedDiscoveryCheck" availableCondition.Message = lastError.Error() apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition) - _, updateErr := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService) + _, updateErr := c.updateAPIServiceStatus(originalAPIService, apiService) if updateErr != nil { return updateErr } @@ -426,26 +444,26 @@ func (c *AvailableConditionController) sync(key string) error { availableCondition.Reason = "Passed" availableCondition.Message = "all checks passed" apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition) - _, err = updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService) + _, err = c.updateAPIServiceStatus(originalAPIService, apiService) return err } // updateAPIServiceStatus only issues an update if a change is detected. We have a tight resync loop to quickly detect dead // apiservices. Doing that means we don't want to quickly issue no-op updates. -func updateAPIServiceStatus(client apiregistrationclient.APIServicesGetter, originalAPIService, newAPIService *apiregistrationv1.APIService) (*apiregistrationv1.APIService, error) { +func (c *AvailableConditionController) updateAPIServiceStatus(originalAPIService, newAPIService *apiregistrationv1.APIService) (*apiregistrationv1.APIService, error) { // update this metric on every sync operation to reflect the actual state - setUnavailableGauge(newAPIService) + c.setUnavailableGauge(newAPIService) if equality.Semantic.DeepEqual(originalAPIService.Status, newAPIService.Status) { return newAPIService, nil } - newAPIService, err := client.APIServices().UpdateStatus(context.TODO(), newAPIService, metav1.UpdateOptions{}) + newAPIService, err := c.apiServiceClient.APIServices().UpdateStatus(context.TODO(), newAPIService, metav1.UpdateOptions{}) if err != nil { return nil, err } - setUnavailableCounter(originalAPIService, newAPIService) + c.setUnavailableCounter(originalAPIService, newAPIService) return newAPIService, nil } @@ -630,17 +648,17 @@ func (c *AvailableConditionController) deleteEndpoints(obj interface{}) { } // setUnavailableGauge set the metrics so that it reflect the current state base on availability of the given service -func setUnavailableGauge(newAPIService *apiregistrationv1.APIService) { +func (c *AvailableConditionController) setUnavailableGauge(newAPIService *apiregistrationv1.APIService) { if apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available) { - unavailableGauge.WithLabelValues(newAPIService.Name).Set(0.0) + c.metrics.SetAPIServiceAvailable(newAPIService.Name) return } - unavailableGauge.WithLabelValues(newAPIService.Name).Set(1.0) + c.metrics.SetAPIServiceUnavailable(newAPIService.Name) } // setUnavailableCounter increases the metrics only if the given service is unavailable and its APIServiceCondition has changed -func setUnavailableCounter(originalAPIService, newAPIService *apiregistrationv1.APIService) { +func (c *AvailableConditionController) setUnavailableCounter(originalAPIService, newAPIService *apiregistrationv1.APIService) { wasAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(originalAPIService, apiregistrationv1.Available) isAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available) statusChanged := isAvailable != wasAvailable @@ -650,6 +668,6 @@ func setUnavailableCounter(originalAPIService, newAPIService *apiregistrationv1. if newCondition := apiregistrationv1apihelper.GetAPIServiceConditionByType(newAPIService, apiregistrationv1.Available); newCondition != nil { reason = newCondition.Reason } - unavailableCounter.WithLabelValues(newAPIService.Name, reason).Inc() + c.metrics.UnavailableCounter(newAPIService.Name, reason).Inc() } } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go index 1bb0b403f7f85..46dc74877ff20 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go @@ -135,6 +135,7 @@ func setupAPIServices(apiServices []*apiregistration.APIService) (*AvailableCond workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second), "AvailableConditionController"), tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)}, + metrics: newAvailabilityMetrics(), } for _, svc := range apiServices { c.addAPIService(svc) @@ -408,6 +409,7 @@ func TestSync(t *testing.T) { serviceResolver: &fakeServiceResolver{url: testServer.URL}, proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() }, tlsCache: &tlsTransportCache{transports: make(map[tlsCacheKey]http.RoundTripper)}, + metrics: newAvailabilityMetrics(), } c.sync(tc.apiServiceName) @@ -457,13 +459,18 @@ func TestUpdateAPIServiceStatus(t *testing.T) { bar := &apiregistration.APIService{Status: apiregistration.APIServiceStatus{Conditions: []apiregistration.APIServiceCondition{{Type: "bar"}}}} fakeClient := fake.NewSimpleClientset() - updateAPIServiceStatus(fakeClient.ApiregistrationV1().(apiregistrationclient.APIServicesGetter), foo, foo) + c := AvailableConditionController{ + apiServiceClient: fakeClient.ApiregistrationV1().(apiregistrationclient.APIServicesGetter), + metrics: newAvailabilityMetrics(), + } + + c.updateAPIServiceStatus(foo, foo) if e, a := 0, len(fakeClient.Actions()); e != a { t.Error(spew.Sdump(fakeClient.Actions())) } fakeClient.ClearActions() - updateAPIServiceStatus(fakeClient.ApiregistrationV1().(apiregistrationclient.APIServicesGetter), foo, bar) + c.updateAPIServiceStatus(foo, bar) if e, a := 1, len(fakeClient.Actions()); e != a { t.Error(spew.Sdump(fakeClient.Actions())) } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics.go index 3a601a9ad9d0b..524a8edb9aa9b 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics.go @@ -17,8 +17,9 @@ limitations under the License. package apiserver import ( + "sync" + "k8s.io/component-base/metrics" - "k8s.io/component-base/metrics/legacyregistry" ) /* @@ -30,25 +31,120 @@ import ( * the metric stability policy. */ var ( - unavailableCounter = metrics.NewCounterVec( - &metrics.CounterOpts{ - Name: "aggregator_unavailable_apiservice_total", - Help: "Counter of APIServices which are marked as unavailable broken down by APIService name and reason.", - StabilityLevel: metrics.ALPHA, - }, - []string{"name", "reason"}, - ) - unavailableGauge = metrics.NewGaugeVec( - &metrics.GaugeOpts{ - Name: "aggregator_unavailable_apiservice", - Help: "Gauge of APIServices which are marked as unavailable broken down by APIService name.", - StabilityLevel: metrics.ALPHA, - }, + unavailableGaugeDesc = metrics.NewDesc( + "aggregator_unavailable_apiservice", + "Gauge of APIServices which are marked as unavailable broken down by APIService name.", []string{"name"}, + nil, + metrics.ALPHA, + "", ) ) -func init() { - legacyregistry.MustRegister(unavailableCounter) - legacyregistry.MustRegister(unavailableGauge) +type availabilityMetrics struct { + unavailableCounter *metrics.CounterVec + + *availabilityCollector +} + +func newAvailabilityMetrics() *availabilityMetrics { + return &availabilityMetrics{ + unavailableCounter: metrics.NewCounterVec( + &metrics.CounterOpts{ + Name: "aggregator_unavailable_apiservice_total", + Help: "Counter of APIServices which are marked as unavailable broken down by APIService name and reason.", + StabilityLevel: metrics.ALPHA, + }, + []string{"name", "reason"}, + ), + availabilityCollector: newAvailabilityCollector(), + } +} + +// Register registers apiservice availability metrics. +func (m *availabilityMetrics) Register( + registrationFunc func(metrics.Registerable) error, + customRegistrationFunc func(metrics.StableCollector) error, +) error { + err := registrationFunc(m.unavailableCounter) + if err != nil { + return err + } + + err = customRegistrationFunc(m.availabilityCollector) + if err != nil { + return err + } + + return nil +} + +// UnavailableCounter returns a counter to track apiservices marked as unavailable. +func (m *availabilityMetrics) UnavailableCounter(apiServiceName, reason string) metrics.CounterMetric { + return m.unavailableCounter.WithLabelValues(apiServiceName, reason) +} + +type availabilityCollector struct { + metrics.BaseStableCollector + + mtx sync.RWMutex + availabilities map[string]bool +} + +// Check if apiServiceStatusCollector implements necessary interface. +var _ metrics.StableCollector = &availabilityCollector{} + +func newAvailabilityCollector() *availabilityCollector { + return &availabilityCollector{ + availabilities: make(map[string]bool), + } +} + +// DescribeWithStability implements the metrics.StableCollector interface. +func (c *availabilityCollector) DescribeWithStability(ch chan<- *metrics.Desc) { + ch <- unavailableGaugeDesc +} + +// CollectWithStability implements the metrics.StableCollector interface. +func (c *availabilityCollector) CollectWithStability(ch chan<- metrics.Metric) { + c.mtx.RLock() + defer c.mtx.RUnlock() + + for apiServiceName, isAvailable := range c.availabilities { + gaugeValue := 1.0 + if isAvailable { + gaugeValue = 0.0 + } + ch <- metrics.NewLazyConstMetric( + unavailableGaugeDesc, + metrics.GaugeValue, + gaugeValue, + apiServiceName, + ) + } +} + +// SetAPIServiceAvailable sets the given apiservice availability gauge to available. +func (c *availabilityCollector) SetAPIServiceAvailable(apiServiceKey string) { + c.setAPIServiceAvailability(apiServiceKey, true) +} + +// SetAPIServiceUnavailable sets the given apiservice availability gauge to unavailable. +func (c *availabilityCollector) SetAPIServiceUnavailable(apiServiceKey string) { + c.setAPIServiceAvailability(apiServiceKey, false) +} + +func (c *availabilityCollector) setAPIServiceAvailability(apiServiceKey string, availability bool) { + c.mtx.Lock() + defer c.mtx.Unlock() + + c.availabilities[apiServiceKey] = availability +} + +// ForgetAPIService removes the availability gauge of the given apiservice. +func (c *availabilityCollector) ForgetAPIService(apiServiceKey string) { + c.mtx.Lock() + defer c.mtx.Unlock() + + delete(c.availabilities, apiServiceKey) } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics_test.go new file mode 100644 index 0000000000000..8205bb010c2da --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics_test.go @@ -0,0 +1,57 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "strings" + "testing" + + "k8s.io/component-base/metrics/testutil" +) + +func TestAPIServiceAvailabilityCollection(t *testing.T) { + collector := newAvailabilityCollector() + + availableAPIService := "available" + unavailableAPIService := "unavailable" + + collector.SetAPIServiceAvailable(availableAPIService) + collector.SetAPIServiceUnavailable(unavailableAPIService) + + err := testutil.CustomCollectAndCompare(collector, strings.NewReader(` + # HELP aggregator_unavailable_apiservice [ALPHA] Gauge of APIServices which are marked as unavailable broken down by APIService name. + # TYPE aggregator_unavailable_apiservice gauge + aggregator_unavailable_apiservice{name="available"} 0 + aggregator_unavailable_apiservice{name="unavailable"} 1 + `)) + if err != nil { + t.Fatal(err) + } + + collector.ClearState() + + collector.ForgetAPIService(availableAPIService) + collector.ForgetAPIService(unavailableAPIService) + + err = testutil.CustomCollectAndCompare(collector, strings.NewReader(` + # HELP aggregator_unavailable_apiservice [ALPHA] Gauge of APIServices which are marked as unavailable broken down by APIService name. + # TYPE aggregator_unavailable_apiservice gauge + `)) + if err != nil { + t.Fatal(err) + } +}