From 9f15eff8245de00378b4e1b368c828774135b449 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Thu, 19 Sep 2024 21:02:30 +0545 Subject: [PATCH] feat: support shared informer for any GVK --- scrapers/kubernetes/informers.go | 60 +++++++++++--------------------- 1 file changed, 21 insertions(+), 39 deletions(-) diff --git a/scrapers/kubernetes/informers.go b/scrapers/kubernetes/informers.go index 38f43d57..69982ed8 100644 --- a/scrapers/kubernetes/informers.go +++ b/scrapers/kubernetes/informers.go @@ -13,12 +13,13 @@ import ( "github.com/google/uuid" "github.com/samber/lo" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" ) type informerCacheData struct { - informer cache.SharedInformer + informer informers.GenericInformer stopper chan (struct{}) } @@ -53,7 +54,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1 ctx.Context = ctx.WithName("watch." + ctx.ScrapeConfig().Name) ctx.Logger.V(1).Infof("registering shared informer for: %v", watchResource) - _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + _, err := informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { u, err := getUnstructuredFromInformedObj(watchResource, obj) if err != nil { @@ -96,14 +97,14 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1 } go func() { - informer.Run(stopper) + informer.Informer().Run(stopper) ctx.Logger.V(1).Infof("stopped shared informer for: %v", watchResource) }() return nil } // getOrCreate returns an existing shared informer instance or creates & returns a new one. -func (t *SharedInformerManager) getOrCreate(ctx api.ScrapeContext, apiVersion, kind string) (cache.SharedInformer, chan struct{}, bool) { +func (t *SharedInformerManager) getOrCreate(ctx api.ScrapeContext, apiVersion, kind string) (informers.GenericInformer, chan struct{}, bool) { t.mu.Lock() defer t.mu.Unlock() @@ -119,7 +120,10 @@ func (t *SharedInformerManager) getOrCreate(ctx api.ScrapeContext, apiVersion, k factory := informers.NewSharedInformerFactory(ctx.Kubernetes(), 0) stopper := make(chan struct{}) - informer := getInformer(factory, apiVersion, kind) + informer, err := getInformer(factory, apiVersion, kind) + if err != nil { + return nil, nil, false + } ctx.Gauge("kubernetes_active_shared_informers").Add(1) cacheValue := &informerCacheData{ @@ -149,7 +153,7 @@ func (t *SharedInformerManager) stop(ctx api.ScrapeContext, kubeconfig string, e if !lo.Contains(exception, key) { ctx.Logger.V(1).Infof("stopping informer for %s", key) - cached.informer.IsStopped() + cached.informer.Informer().IsStopped() ctx.Gauge("kubernetes_active_shared_informers").Sub(1) toDelete = append(toDelete, key) @@ -163,41 +167,15 @@ func (t *SharedInformerManager) stop(ctx api.ScrapeContext, kubeconfig string, e } } -func getInformer(factory informers.SharedInformerFactory, apiVersion, kind string) cache.SharedInformer { - // TODO: need to populate this - - kind = strings.ToLower(kind) - switch strings.ToLower(apiVersion) { - case "v1": - switch kind { - case "pod": - return factory.Core().V1().Pods().Informer() - case "node": - return factory.Core().V1().Nodes().Informer() - } - - case "apps/v1": - switch kind { - case "deployment": - return factory.Apps().V1().Deployments().Informer() - case "daemonset": - return factory.Apps().V1().DaemonSets().Informer() - case "replicaset": - return factory.Apps().V1().ReplicaSets().Informer() - case "statefulset": - return factory.Apps().V1().StatefulSets().Informer() - } - - case "batch/v1": - switch kind { - case "cronjob": - return factory.Batch().V1().CronJobs().Informer() - case "job": - return factory.Batch().V1().Jobs().Informer() - } +func getInformer(factory informers.SharedInformerFactory, apiVersion, kind string) (informers.GenericInformer, error) { + gvk := schema.FromAPIVersionAndKind(apiVersion, kind) + gvr := schema.GroupVersionResource{ + Group: gvk.Group, + Version: gvk.Version, + Resource: KindToResource(gvk.Kind), } - return nil + return factory.ForResource(gvr) } // logToJobHistory logs any failures in saving a playbook run to the job history. @@ -210,3 +188,7 @@ func logToJobHistory(ctx context.Context, job string, scraperID *uuid.UUID, err logger.Errorf("error persisting job history: %v", err) } } + +func KindToResource(kind string) string { + return strings.ToLower(kind) + "s" +}