From 4c1cd77770d21010abd6feffda80d4899a14704e Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Wed, 5 Jun 2024 09:32:41 +0545 Subject: [PATCH] feat: use shared informers to watch resource --- scrapers/kubernetes/events_watch.go | 125 +++++++++++++++++++++++----- utils/kube/kube.go | 4 +- utils/kube/kube_test.go | 2 +- 3 files changed, 107 insertions(+), 24 deletions(-) diff --git a/scrapers/kubernetes/events_watch.go b/scrapers/kubernetes/events_watch.go index e9c94a9d..4e9a9dba 100644 --- a/scrapers/kubernetes/events_watch.go +++ b/scrapers/kubernetes/events_watch.go @@ -1,6 +1,7 @@ package kubernetes import ( + "encoding/json" "fmt" "strings" "sync" @@ -10,13 +11,13 @@ import ( "github.com/samber/lo" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "github.com/flanksource/config-db/api" v1 "github.com/flanksource/config-db/api/v1" - "github.com/flanksource/config-db/utils" "github.com/flanksource/config-db/utils/kube" ) @@ -37,6 +38,24 @@ var ( DeleteResourceBuffer = sync.Map{} ) +func getUnstructuredFromInformedObj(resource v1.KubernetesResourceToWatch, obj any) (*unstructured.Unstructured, error) { + b, err := json.Marshal(obj) + if err != nil { + return nil, fmt.Errorf("failed to marshal: %v", err) + } + + var m map[string]any + if err := json.Unmarshal(b, &m); err != nil { + return nil, fmt.Errorf("failed to unmarshal on add func: %v", err) + } + + // The object returned by the informers do not have kind and apiversion set + m["kind"] = resource.Kind + m["apiVersion"] = resource.ApiVersion + + return &unstructured.Unstructured{Object: m}, nil +} + // WatchResources watches Kubernetes resources func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error { buffer := make(chan *unstructured.Unstructured, ctx.DutyContext().Properties().Int("kubernetes.watch.resources.bufferSize", WatchResourceBufferSize)) @@ -59,30 +78,94 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error { } } - var channels []<-chan watch.Event - for _, k := range lo.Uniq(config.Watch) { - client, err := kube.GetClientByGroupVersionKind(restConfig, k.ApiVersion, k.Kind) - if err != nil { - return fmt.Errorf("failed to create client for kind(%s): %v", k, err) - } + clientset, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return fmt.Errorf("failed to create kubernetes clientset from rest config: %w", err) + } - watcher, err := client.Watch(ctx, metav1.ListOptions{}) - if err != nil { - return fmt.Errorf("failed to create watcher for kind(%s): %v", k, err) + factory := informers.NewSharedInformerFactory(clientset, 0) + stopper := make(chan struct{}) + defer close(stopper) + + var wg sync.WaitGroup + for _, watchResource := range lo.Uniq(config.Watch) { + wg.Add(1) + + informer := getInformer(factory, watchResource.ApiVersion, watchResource.Kind) + if informer == nil { + return fmt.Errorf("could not find informer for: apiVersion=%v kind=%v", watchResource.ApiVersion, watchResource.Kind) } - defer watcher.Stop() - channels = append(channels, watcher.ResultChan()) + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + u, err := getUnstructuredFromInformedObj(watchResource, obj) + if err != nil { + logger.Errorf("failed to get unstructured from new object: %v", err) + return + } + + buffer <- u + }, + UpdateFunc: func(oldObj any, newObj any) { + u, err := getUnstructuredFromInformedObj(watchResource, newObj) + if err != nil { + logger.Errorf("failed to get unstructured from updated object: %v", err) + return + } + + buffer <- u + }, + DeleteFunc: func(obj any) { + u, err := getUnstructuredFromInformedObj(watchResource, obj) + if err != nil { + logger.Errorf("failed to get unstructured from deleted object: %v", err) + return + } + + deleteBuffer <- string(u.GetUID()) + }, + }) + + go informer.Run(stopper) } - for watchEvent := range utils.MergeChannels(channels...) { - obj, ok := watchEvent.Object.(*unstructured.Unstructured) - if ok { - if watchEvent.Type == watch.Deleted { - deleteBuffer <- string(obj.GetUID()) - } else { - buffer <- obj - } + ctx.Logger.V(1).Infof("waiting for informers: %w", err) + wg.Wait() + + return nil +} + +func getInformer(factory informers.SharedInformerFactory, apiVersion, kind string) cache.SharedInformer { + // TODO: need to populate this + + kind = strings.ToLower(kind) + switch strings.ToLower(apiVersion) { + case "v1": + switch kind { + case "pod": + return factory.Core().V1().Pods().Informer() + case "node": + return factory.Core().V1().Nodes().Informer() + } + + case "apps/v1": + switch kind { + case "deployment": + return factory.Apps().V1().Deployments().Informer() + case "daemonset": + return factory.Apps().V1().DaemonSets().Informer() + case "replicaset": + return factory.Apps().V1().ReplicaSets().Informer() + case "statefulset": + return factory.Apps().V1().StatefulSets().Informer() + } + + case "batch/v1": + switch kind { + case "cronjob": + return factory.Batch().V1().CronJobs().Informer() + case "job": + return factory.Batch().V1().Jobs().Informer() } } diff --git a/utils/kube/kube.go b/utils/kube/kube.go index 83b2c408..c41bbfc5 100644 --- a/utils/kube/kube.go +++ b/utils/kube/kube.go @@ -53,7 +53,7 @@ func getRestMapper(config *rest.Config) (meta.RESTMapper, error) { return restmapper.NewDeferredDiscoveryRESTMapper(cache), nil } -func getGroupVersion(apiVersion string) (string, string) { +func GetGroupVersion(apiVersion string) (string, string) { split := strings.Split(apiVersion, "/") if len(split) == 1 { return "", apiVersion @@ -73,7 +73,7 @@ func GetClientByGroupVersionKind(cfg *rest.Config, apiVersion, kind string) (dyn return nil, err } - group, version := getGroupVersion(apiVersion) + group, version := GetGroupVersion(apiVersion) gvk, err := rm.KindFor(schema.GroupVersionResource{Group: group, Version: version, Resource: kind}) if err != nil { return nil, err diff --git a/utils/kube/kube_test.go b/utils/kube/kube_test.go index 103046a5..023c3b06 100644 --- a/utils/kube/kube_test.go +++ b/utils/kube/kube_test.go @@ -25,7 +25,7 @@ func TestGetGroupVersion(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - group, version := getGroupVersion(tc.apiVersion) + group, version := GetGroupVersion(tc.apiVersion) if group != tc.expectedGroup || version != tc.expectedVersion { t.Errorf("getGroupVersion(%q) = %q, %q; expected %q, %q", tc.apiVersion, group, version, tc.expectedGroup, tc.expectedVersion)