Skip to content

Commit

Permalink
feat: support shared informer for any GVK
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe authored and moshloop committed Sep 20, 2024
1 parent 0d9a726 commit 9f15eff
Showing 1 changed file with 21 additions and 39 deletions.
60 changes: 21 additions & 39 deletions scrapers/kubernetes/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ import (
"github.com/google/uuid"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)

type informerCacheData struct {
informer cache.SharedInformer
informer informers.GenericInformer
stopper chan (struct{})
}

Expand Down Expand Up @@ -53,7 +54,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1
ctx.Context = ctx.WithName("watch." + ctx.ScrapeConfig().Name)

ctx.Logger.V(1).Infof("registering shared informer for: %v", watchResource)
_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err := informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
u, err := getUnstructuredFromInformedObj(watchResource, obj)
if err != nil {
Expand Down Expand Up @@ -96,14 +97,14 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1
}

go func() {
informer.Run(stopper)
informer.Informer().Run(stopper)
ctx.Logger.V(1).Infof("stopped shared informer for: %v", watchResource)
}()
return nil
}

// getOrCreate returns an existing shared informer instance or creates & returns a new one.
func (t *SharedInformerManager) getOrCreate(ctx api.ScrapeContext, apiVersion, kind string) (cache.SharedInformer, chan struct{}, bool) {
func (t *SharedInformerManager) getOrCreate(ctx api.ScrapeContext, apiVersion, kind string) (informers.GenericInformer, chan struct{}, bool) {
t.mu.Lock()
defer t.mu.Unlock()

Expand All @@ -119,7 +120,10 @@ func (t *SharedInformerManager) getOrCreate(ctx api.ScrapeContext, apiVersion, k
factory := informers.NewSharedInformerFactory(ctx.Kubernetes(), 0)
stopper := make(chan struct{})

informer := getInformer(factory, apiVersion, kind)
informer, err := getInformer(factory, apiVersion, kind)
if err != nil {
return nil, nil, false
}
ctx.Gauge("kubernetes_active_shared_informers").Add(1)

cacheValue := &informerCacheData{
Expand Down Expand Up @@ -149,7 +153,7 @@ func (t *SharedInformerManager) stop(ctx api.ScrapeContext, kubeconfig string, e
if !lo.Contains(exception, key) {
ctx.Logger.V(1).Infof("stopping informer for %s", key)

cached.informer.IsStopped()
cached.informer.Informer().IsStopped()
ctx.Gauge("kubernetes_active_shared_informers").Sub(1)

toDelete = append(toDelete, key)
Expand All @@ -163,41 +167,15 @@ func (t *SharedInformerManager) stop(ctx api.ScrapeContext, kubeconfig string, e
}
}

func getInformer(factory informers.SharedInformerFactory, apiVersion, kind string) cache.SharedInformer {
// TODO: need to populate this

kind = strings.ToLower(kind)
switch strings.ToLower(apiVersion) {
case "v1":
switch kind {
case "pod":
return factory.Core().V1().Pods().Informer()
case "node":
return factory.Core().V1().Nodes().Informer()
}

case "apps/v1":
switch kind {
case "deployment":
return factory.Apps().V1().Deployments().Informer()
case "daemonset":
return factory.Apps().V1().DaemonSets().Informer()
case "replicaset":
return factory.Apps().V1().ReplicaSets().Informer()
case "statefulset":
return factory.Apps().V1().StatefulSets().Informer()
}

case "batch/v1":
switch kind {
case "cronjob":
return factory.Batch().V1().CronJobs().Informer()
case "job":
return factory.Batch().V1().Jobs().Informer()
}
func getInformer(factory informers.SharedInformerFactory, apiVersion, kind string) (informers.GenericInformer, error) {
gvk := schema.FromAPIVersionAndKind(apiVersion, kind)
gvr := schema.GroupVersionResource{
Group: gvk.Group,
Version: gvk.Version,
Resource: KindToResource(gvk.Kind),
}

return nil
return factory.ForResource(gvr)
}

// logToJobHistory logs any failures in saving a playbook run to the job history.
Expand All @@ -210,3 +188,7 @@ func logToJobHistory(ctx context.Context, job string, scraperID *uuid.UUID, err
logger.Errorf("error persisting job history: %v", err)
}
}

func KindToResource(kind string) string {
return strings.ToLower(kind) + "s"
}

0 comments on commit 9f15eff

Please sign in to comment.