From 24c68d50776c75e46cbb9d51d2ff5ca38575a312 Mon Sep 17 00:00:00 2001 From: Arnob Kumar Saha Date: Fri, 6 Oct 2023 02:51:55 +0600 Subject: [PATCH] Refresh metrics in the background, handle scanner-opa removal (#259) Signed-off-by: Arnob kumar saha Signed-off-by: Tamal Saha Co-authored-by: Tamal Saha --- pkg/apiserver/apiserver.go | 4 + pkg/graph/setup.go | 28 ++++-- pkg/metricshandler/ancestor.go | 17 ++-- pkg/metricshandler/handler.go | 135 +++++++++++++++++--------- pkg/metricshandler/handler_policy.go | 24 +++-- pkg/metricshandler/handler_scanner.go | 23 +++-- 6 files changed, 142 insertions(+), 89 deletions(-) diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index f96061f55..c2f3c0f2f 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -377,5 +377,9 @@ func (c completedConfig) New(ctx context.Context) (*UIServer, error) { } m.Install(genericServer.Handler.NonGoRestfulMux) } + if err := mgr.Add(manager.RunnableFunc(metricshandler.StartMetricsCollector(mgr))); err != nil { + setupLog.Error(err, "unable to start metrics collector") + os.Exit(1) + } return s, nil } diff --git a/pkg/graph/setup.go b/pkg/graph/setup.go index 81fff7b14..bec01412b 100644 --- a/pkg/graph/setup.go +++ b/pkg/graph/setup.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "strings" + "sync/atomic" "time" scannerapi "kubeops.dev/scanner/apis/scanner/v1alpha1" @@ -56,6 +57,8 @@ func PollNewResourceTypes(cfg *restclient.Config, pqr *projectquotacontroller.Pr klog.ErrorS(err, "failed to list server preferred resources") return false, nil } + opaInstalled := false + scannerInstalled := false for _, rsList := range rsLists { for _, rs := range rsList.APIResources { // skip sub resource @@ -85,6 +88,15 @@ func PollNewResourceTypes(cfg *restclient.Config, pqr *projectquotacontroller.Pr Kind: rs.Kind, Scope: scope, } + + // As we are generating metrics based on these variables, we need to update them timely. + if rid.Group == "templates.gatekeeper.sh" && rid.Kind == "ConstraintTemplate" { + opaInstalled = true + } + if rid.Group == scannerapi.SchemeGroupVersion.Group && rid.Kind == scannerapi.ResourceKindImageScanRequest { + scannerInstalled = true + } + if _, found := resourceTracker[gvk]; !found { resourceTracker[gvk] = rid resourceChannel <- rid @@ -94,6 +106,10 @@ func PollNewResourceTypes(cfg *restclient.Config, pqr *projectquotacontroller.Pr } } } + + OPAInstalled.Store(opaInstalled) + ScannerInstalled.Store(scannerInstalled) + return false, nil }, ctx.Done()) if err != nil { @@ -106,13 +122,10 @@ func PollNewResourceTypes(cfg *restclient.Config, pqr *projectquotacontroller.Pr } var ( - opaInstalled bool - scannerInstalled bool + OPAInstalled atomic.Bool + ScannerInstalled atomic.Bool ) -func OPAInstalled() bool { return opaInstalled } -func ScannerInstalled() bool { return scannerInstalled } - func SetupGraphReconciler(mgr manager.Manager) func(ctx context.Context) error { return func(ctx context.Context) error { for rid := range resourceChannel { @@ -124,13 +137,8 @@ func SetupGraphReconciler(mgr manager.Manager) func(ctx context.Context) error { return err } - if rid.Group == "templates.gatekeeper.sh" && rid.Kind == "ConstraintTemplate" { - opaInstalled = true - } - if rid.Group == scannerapi.SchemeGroupVersion.Group && rid.Kind == scannerapi.ResourceKindImageScanRequest { - scannerInstalled = true if err := (&scannercontrollers.WorkloadReconciler{ Client: mgr.GetClient(), }).SetupWithManager(mgr); err != nil { diff --git a/pkg/metricshandler/ancestor.go b/pkg/metricshandler/ancestor.go index d2775ecd0..313861cf0 100644 --- a/pkg/metricshandler/ancestor.go +++ b/pkg/metricshandler/ancestor.go @@ -20,38 +20,35 @@ import ( "context" "kubeops.dev/ui-server/pkg/graph" - "kubeops.dev/ui-server/pkg/metricsstore" core "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kube-state-metrics/v2/pkg/metric" - generator "k8s.io/kube-state-metrics/v2/pkg/metric_generator" kmapi "kmodules.xyz/client-go/api/v1" "kmodules.xyz/resource-metadata/apis/meta/v1alpha1" - "sigs.k8s.io/controller-runtime/pkg/client" ) -func collectPodAncestorMetrics(kc client.Client, generators []generator.FamilyGenerator, store *metricsstore.MetricsStore) error { +func (mc *Collector) collectPodAncestorMetrics() error { var pods core.PodList - err := kc.List(context.TODO(), &pods) + err := mc.kc.List(context.TODO(), &pods) if err != nil { return err } - family := generators[0].Generate(nil) + family := mc.generators[0].Generate(nil) for _, pod := range pods.Items { - g, err := getResourceGraph(kc, pod.ObjectMeta) + g, err := mc.getResourceGraph(pod.ObjectMeta) if err != nil { return err } family.Metrics = append(family.Metrics, getMetricsForSinglePod(g, pod.Name)...) } - store.Add(family) + mc.store.Add(family) return nil } -func getResourceGraph(kc client.Client, podMeta metav1.ObjectMeta) (*v1alpha1.ResourceGraphResponse, error) { +func (mc *Collector) getResourceGraph(podMeta metav1.ObjectMeta) (*v1alpha1.ResourceGraphResponse, error) { src := kmapi.ObjectID{ Group: "", Kind: "Pod", @@ -59,7 +56,7 @@ func getResourceGraph(kc client.Client, podMeta metav1.ObjectMeta) (*v1alpha1.Re Name: podMeta.Name, } - return graph.ResourceGraph(kc.RESTMapper(), src, []kmapi.EdgeLabel{ + return graph.ResourceGraph(mc.kc.RESTMapper(), src, []kmapi.EdgeLabel{ kmapi.EdgeLabelOffshoot, }) } diff --git a/pkg/metricshandler/handler.go b/pkg/metricshandler/handler.go index 5efce9902..f0a94ae4c 100644 --- a/pkg/metricshandler/handler.go +++ b/pkg/metricshandler/handler.go @@ -17,23 +17,33 @@ limitations under the License. package metricshandler import ( - "io" + "context" "net/http" + "sync" + "time" "kubeops.dev/ui-server/pkg/graph" "kubeops.dev/ui-server/pkg/metricsstore" "github.com/prometheus/client_golang/prometheus/promhttp" "k8s.io/apiserver/pkg/server/mux" + "k8s.io/klog/v2" "k8s.io/kube-state-metrics/v2/pkg/metric" generator "k8s.io/kube-state-metrics/v2/pkg/metric_generator" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" ) const ( - MetricsPath = "/metrics" - scannerMetricPrefix = "scanner_appscode_com_" - policyMetricPrefix = "policy_appscode_com_" + MetricsPath = "/metrics" + scannerMetricPrefix = "scanner_appscode_com_" + policyMetricPrefix = "policy_appscode_com_" + MetricsRefreshPeriod = 2 * time.Second +) + +var ( + mu sync.RWMutex + store *metricsstore.MetricsStore ) // MetricsHandler struct contains Stores which store the metrics to serve in the /metrics path @@ -41,20 +51,36 @@ type MetricsHandler struct { client.Client } +type Collector struct { + kc client.Client + opaInstalled bool + scannerInstalled bool + + generators []generator.FamilyGenerator + store *metricsstore.MetricsStore +} + // ServeHTTP serves the request for /metrics path -func (m *MetricsHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) { +func (h *MetricsHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) { resHeader := w.Header() resHeader.Set("Content-Type", `text/plain; version=`+"0.0.4") - err := collectMetrics(m.Client, w) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return + + mu.RLock() + defer mu.RUnlock() + if store != nil { + err := store.WriteAll(w) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + } else { + _, _ = w.Write([]byte("")) } } // Install adds the MetricsWithReset handler -func (m *MetricsHandler) Install(c *mux.PathRecorderMux) { - var next http.Handler = m +func (h *MetricsHandler) Install(c *mux.PathRecorderMux) { + var next http.Handler = h next = promhttp.InstrumentHandlerCounter(httpRequestsTotal, next) next = promhttp.InstrumentHandlerDuration(requestDuration, next) next = promhttp.InstrumentHandlerInFlight(inFlight, next) @@ -63,44 +89,66 @@ func (m *MetricsHandler) Install(c *mux.PathRecorderMux) { c.Handle(MetricsPath, next) } -func collectMetrics(kc client.Client, w io.Writer) error { - generators := getFamilyGenerators() - if len(generators) == 0 { - _, err := w.Write([]byte("")) - return err +func StartMetricsCollector(mgr manager.Manager) func(ctx context.Context) error { + return func(ctx context.Context) error { + klog.Infoln("Starts the Metrics Collector") + for { + collector := &Collector{ + kc: mgr.GetClient(), + opaInstalled: graph.OPAInstalled.Load(), + scannerInstalled: graph.ScannerInstalled.Load(), + } + collector.init() + err := collector.collectMetrics() + if err != nil { + klog.Errorf("Error occurred while collecting metrics : %s \n", err.Error()) + continue + } + + mu.Lock() + store = collector.store + mu.Unlock() + + time.Sleep(MetricsRefreshPeriod) + } } +} - // Generate the headers for the resources metrics - headers := generator.ExtractMetricFamilyHeaders(generators) - store := metricsstore.NewMetricsStore(headers) +func (mc *Collector) init() { + mc.initFamilyGenerators() + headers := generator.ExtractMetricFamilyHeaders(mc.generators) + mc.store = metricsstore.NewMetricsStore(headers) +} - err := collectPodAncestorMetrics(kc, generators, store) +func (mc *Collector) collectMetrics() error { + err := mc.collectPodAncestorMetrics() if err != nil { return err } offset := 1 - if graph.ScannerInstalled() { - err := collectScannerMetrics(kc, generators, store, offset) + if mc.scannerInstalled { + err := mc.collectScannerMetrics(offset) if err != nil { return err } offset = offset + 9 // # of scanner metrics families } - if graph.OPAInstalled() { - err := collectPolicyMetrics(kc, generators, store, offset) + if mc.opaInstalled { + err := mc.collectPolicyMetrics(offset) if err != nil { return err } } - return store.WriteAll(w) + + return nil } -func getFamilyGenerators() []generator.FamilyGenerator { +func (mc *Collector) initFamilyGenerators() { fn := func(obj interface{}) *metric.Family { return new(metric.Family) } - generators := make([]generator.FamilyGenerator, 0, 14) + mc.generators = make([]generator.FamilyGenerator, 0, 14) - generators = append(generators, generator.FamilyGenerator{ + mc.generators = append(mc.generators, generator.FamilyGenerator{ Name: "k8s_appscode_com_pod_ancestor", Help: "Pod Ancestor statistics", Type: metric.Gauge, @@ -108,43 +156,43 @@ func getFamilyGenerators() []generator.FamilyGenerator { GenerateFunc: fn, }) - if graph.ScannerInstalled() { - generators = append(generators, generator.FamilyGenerator{ + if mc.scannerInstalled { + mc.generators = append(mc.generators, generator.FamilyGenerator{ Name: scannerMetricPrefix + "cluster_cve_occurrence", Help: "CVE occurrence statistics", Type: metric.Gauge, DeprecatedVersion: "", GenerateFunc: fn, }) - generators = append(generators, generator.FamilyGenerator{ + mc.generators = append(mc.generators, generator.FamilyGenerator{ Name: scannerMetricPrefix + "cluster_cve_occurrence_total", Help: "Cluster total CVE occurrence", Type: metric.Gauge, DeprecatedVersion: "", GenerateFunc: fn, }) - generators = append(generators, generator.FamilyGenerator{ + mc.generators = append(mc.generators, generator.FamilyGenerator{ Name: scannerMetricPrefix + "cluster_cve_count_total", Help: "Cluster total unique CVE count", Type: metric.Gauge, DeprecatedVersion: "", GenerateFunc: fn, }) - generators = append(generators, generator.FamilyGenerator{ + mc.generators = append(mc.generators, generator.FamilyGenerator{ Name: scannerMetricPrefix + "namespace_cve_occurrence", Help: "Namespace CVE occurrence statistics", Type: metric.Gauge, DeprecatedVersion: "", GenerateFunc: fn, }) - generators = append(generators, generator.FamilyGenerator{ + mc.generators = append(mc.generators, generator.FamilyGenerator{ Name: scannerMetricPrefix + "namespace_cve_occurrence_total", Help: "Namespace total CVE occurrence", Type: metric.Gauge, DeprecatedVersion: "", GenerateFunc: fn, }) - generators = append(generators, generator.FamilyGenerator{ + mc.generators = append(mc.generators, generator.FamilyGenerator{ Name: scannerMetricPrefix + "namespace_cve_count_total", Help: "Namespace total unique CVE count", Type: metric.Gauge, @@ -152,21 +200,21 @@ func getFamilyGenerators() []generator.FamilyGenerator { GenerateFunc: fn, }) - generators = append(generators, generator.FamilyGenerator{ + mc.generators = append(mc.generators, generator.FamilyGenerator{ Name: scannerMetricPrefix + "image_cve_occurrence_total", Help: "Image total CVE occurrence", Type: metric.Gauge, DeprecatedVersion: "", GenerateFunc: fn, }) - generators = append(generators, generator.FamilyGenerator{ + mc.generators = append(mc.generators, generator.FamilyGenerator{ Name: scannerMetricPrefix + "image_cve_count_total", Help: "Image total unique CVE count", Type: metric.Gauge, DeprecatedVersion: "", GenerateFunc: fn, }) - generators = append(generators, generator.FamilyGenerator{ + mc.generators = append(mc.generators, generator.FamilyGenerator{ Name: scannerMetricPrefix + "image_lineage", Help: "Image Lineage", Type: metric.Gauge, @@ -175,30 +223,30 @@ func getFamilyGenerators() []generator.FamilyGenerator { }) } - if graph.OPAInstalled() { + if mc.opaInstalled { // Policy related metrics - generators = append(generators, generator.FamilyGenerator{ + mc.generators = append(mc.generators, generator.FamilyGenerator{ Name: policyMetricPrefix + "cluster_violation_occurrence_total", Help: "Cluster-wide Violation Occurrence statistics", Type: metric.Gauge, DeprecatedVersion: "", GenerateFunc: fn, }) - generators = append(generators, generator.FamilyGenerator{ + mc.generators = append(mc.generators, generator.FamilyGenerator{ Name: policyMetricPrefix + "cluster_violation_occurrence_by_constraint_type", Help: "Cluster-wide Violation Occurrence statistics by constraint type", Type: metric.Gauge, DeprecatedVersion: "", GenerateFunc: fn, }) - generators = append(generators, generator.FamilyGenerator{ + mc.generators = append(mc.generators, generator.FamilyGenerator{ Name: policyMetricPrefix + "namespace_violation_occurrence_total", Help: "Namespace-wise total Violation Occurrence statistics", Type: metric.Gauge, DeprecatedVersion: "", GenerateFunc: fn, }) - generators = append(generators, generator.FamilyGenerator{ + mc.generators = append(mc.generators, generator.FamilyGenerator{ Name: policyMetricPrefix + "namespace_violation_occurrence_by_constraint_type", Help: "Namespace-wise Violation Occurrence statistics by constraint type", Type: metric.Gauge, @@ -206,5 +254,4 @@ func getFamilyGenerators() []generator.FamilyGenerator { GenerateFunc: fn, }) } - return generators } diff --git a/pkg/metricshandler/handler_policy.go b/pkg/metricshandler/handler_policy.go index 9e867bb58..887c95385 100644 --- a/pkg/metricshandler/handler_policy.go +++ b/pkg/metricshandler/handler_policy.go @@ -19,36 +19,34 @@ package metricshandler import ( "context" - "kubeops.dev/ui-server/pkg/metricsstore" policystorage "kubeops.dev/ui-server/pkg/registry/policy/reports" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/kube-state-metrics/v2/pkg/metric" generator "k8s.io/kube-state-metrics/v2/pkg/metric_generator" - "sigs.k8s.io/controller-runtime/pkg/client" ) -func collectPolicyMetrics(kc client.Client, generators []generator.FamilyGenerator, store *metricsstore.MetricsStore, offset int) error { - if clTotal, clByType, err := collectForCluster(kc, generators[offset], generators[offset+1]); err != nil { +func (mc *Collector) collectPolicyMetrics(offset int) error { + if clTotal, clByType, err := mc.collectForCluster(mc.generators[offset], mc.generators[offset+1]); err != nil { return err } else { - store.Add(clTotal, clByType) + mc.store.Add(clTotal, clByType) } - if nsTotal, nsByType, err := collectForNamespace(kc, generators[offset+2], generators[offset+3]); err != nil { + if nsTotal, nsByType, err := mc.collectForNamespace(mc.generators[offset+2], mc.generators[offset+3]); err != nil { return err } else { - store.Add(nsTotal, nsByType) + mc.store.Add(nsTotal, nsByType) } return nil } -func collectForCluster(kc client.Client, genTotal generator.FamilyGenerator, genByType generator.FamilyGenerator) (*metric.Family, *metric.Family, error) { +func (mc *Collector) collectForCluster(genTotal generator.FamilyGenerator, genByType generator.FamilyGenerator) (*metric.Family, *metric.Family, error) { fTotal := genTotal.Generate(nil) fByType := genByType.Generate(nil) - templates, err := policystorage.ListTemplates(context.TODO(), kc) + templates, err := policystorage.ListTemplates(context.TODO(), mc.kc) if err != nil { return nil, nil, err } @@ -58,7 +56,7 @@ func collectForCluster(kc client.Client, genTotal generator.FamilyGenerator, gen if err != nil { return nil, nil, err } - constraints, err := policystorage.ListConstraints(context.TODO(), kc, constraintKind) + constraints, err := policystorage.ListConstraints(context.TODO(), mc.kc, constraintKind) if err != nil { return nil, nil, err } @@ -92,11 +90,11 @@ func collectForCluster(kc client.Client, genTotal generator.FamilyGenerator, gen return fTotal, fByType, nil } -func collectForNamespace(kc client.Client, genTotal generator.FamilyGenerator, genByType generator.FamilyGenerator) (*metric.Family, *metric.Family, error) { +func (mc *Collector) collectForNamespace(genTotal generator.FamilyGenerator, genByType generator.FamilyGenerator) (*metric.Family, *metric.Family, error) { fTotal := genTotal.Generate(nil) fByType := genByType.Generate(nil) - templates, err := policystorage.ListTemplates(context.TODO(), kc) + templates, err := policystorage.ListTemplates(context.TODO(), mc.kc) if err != nil { return nil, nil, err } @@ -105,7 +103,7 @@ func collectForNamespace(kc client.Client, genTotal generator.FamilyGenerator, g if err != nil { return nil, nil, err } - constraints, err := policystorage.ListConstraints(context.TODO(), kc, cKind) + constraints, err := policystorage.ListConstraints(context.TODO(), mc.kc, cKind) if err != nil { return nil, nil, err } diff --git a/pkg/metricshandler/handler_scanner.go b/pkg/metricshandler/handler_scanner.go index 70d01b1f0..8e563ca99 100755 --- a/pkg/metricshandler/handler_scanner.go +++ b/pkg/metricshandler/handler_scanner.go @@ -20,7 +20,6 @@ import ( "context" scannerapi "kubeops.dev/scanner/apis/scanner/v1alpha1" - "kubeops.dev/ui-server/pkg/metricsstore" "kubeops.dev/ui-server/pkg/shared" "golang.org/x/sync/errgroup" @@ -36,11 +35,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -func collectScannerMetrics(kc client.Client, generators []generator.FamilyGenerator, store *metricsstore.MetricsStore, offset int) error { +func (mc *Collector) collectScannerMetrics(offset int) error { var list unstructured.UnstructuredList list.SetAPIVersion("v1") list.SetKind("Pod") - if err := kc.List(context.TODO(), &list); err != nil { + if err := mc.kc.List(context.TODO(), &list); err != nil { return err } pods := list.Items @@ -52,21 +51,21 @@ func collectScannerMetrics(kc client.Client, generators []generator.FamilyGenera if err := runtime.DefaultUnstructuredConverter.FromUnstructured(p.UnstructuredContent(), &pod); err != nil { return err } - images, err = au.CollectImageInfo(kc, &pod, images, true) + images, err = au.CollectImageInfo(mc.kc, &pod, images, true) if err != nil { return err } } - results, err := collectReports(context.TODO(), kc, images) + results, err := mc.collectReports(context.TODO(), images) if err != nil { return err } - store.Add(collectClusterCVEMetrics(results, generators[offset], generators[offset+1], generators[offset+2])) - store.Add(collectNamespaceCVEMetrics(images, results, generators[offset+3], generators[offset+4], generators[offset+5])) - store.Add(collectImageCVEMetrics(results, generators[offset+6], generators[offset+7])) - store.Add(collectLineageMetrics(images, generators[offset+8])) + mc.store.Add(collectClusterCVEMetrics(results, mc.generators[offset], mc.generators[offset+1], mc.generators[offset+2])) + mc.store.Add(collectNamespaceCVEMetrics(images, results, mc.generators[offset+3], mc.generators[offset+4], mc.generators[offset+5])) + mc.store.Add(collectImageCVEMetrics(results, mc.generators[offset+6], mc.generators[offset+7])) + mc.store.Add(collectLineageMetrics(images, mc.generators[offset+8])) return nil } @@ -86,7 +85,7 @@ var severities = []string{ } // Based on https://pkg.go.dev/golang.org/x/sync@v0.1.0/errgroup#example-Group-Pipeline -func collectReports(ctx context.Context, kc client.Client, images map[string]kmapi.ImageInfo) (map[string]result, error) { +func (mc *Collector) collectReports(ctx context.Context, images map[string]kmapi.ImageInfo) (map[string]result, error) { // ctx is canceled when g.Wait() returns. When this version of MD5All returns // - even in case of error! - we know that all of the goroutines have finished // and the memory they were using can be garbage-collected. @@ -120,11 +119,11 @@ func collectReports(ctx context.Context, kc client.Client, images map[string]kma g.Go(func() error { for req := range requests { var report scannerapi.ImageScanReport - err := kc.Get(ctx, client.ObjectKey{Name: scannerapi.GetReportName(req.Image)}, &report) + err := mc.kc.Get(ctx, client.ObjectKey{Name: scannerapi.GetReportName(req.Image)}, &report) if client.IgnoreNotFound(err) != nil { return err } else if apierrors.IsNotFound(err) { - _ = shared.SendScanRequest(ctx, kc, req.Image, kmapi.PullCredentials{ + _ = shared.SendScanRequest(ctx, mc.kc, req.Image, kmapi.PullCredentials{ Namespace: req.Namespace, SecretRefs: req.PullSecrets, ServiceAccountName: req.ServiceAccountName,