From 4c1cd77770d21010abd6feffda80d4899a14704e Mon Sep 17 00:00:00 2001
From: Aditya Thebe
Date: Wed, 5 Jun 2024 09:32:41 +0545
Subject: [PATCH 1/7] feat: use shared informers to watch resources

---
 scrapers/kubernetes/events_watch.go | 125 +++++++++++++++++++++++-----
 utils/kube/kube.go                  |   4 +-
 utils/kube/kube_test.go             |   2 +-
 3 files changed, 107 insertions(+), 24 deletions(-)

diff --git a/scrapers/kubernetes/events_watch.go b/scrapers/kubernetes/events_watch.go
index e9c94a9d..4e9a9dba 100644
--- a/scrapers/kubernetes/events_watch.go
+++ b/scrapers/kubernetes/events_watch.go
@@ -1,6 +1,7 @@
 package kubernetes
 
 import (
+    "encoding/json"
     "fmt"
     "strings"
     "sync"
@@ -10,13 +11,13 @@ import (
     "github.com/samber/lo"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
-    "k8s.io/apimachinery/pkg/watch"
+    "k8s.io/client-go/informers"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/rest"
+    "k8s.io/client-go/tools/cache"
 
     "github.com/flanksource/config-db/api"
     v1 "github.com/flanksource/config-db/api/v1"
-    "github.com/flanksource/config-db/utils"
     "github.com/flanksource/config-db/utils/kube"
 )
@@ -37,6 +38,24 @@ var (
     DeleteResourceBuffer = sync.Map{}
 )
 
+func getUnstructuredFromInformedObj(resource v1.KubernetesResourceToWatch, obj any) (*unstructured.Unstructured, error) {
+    b, err := json.Marshal(obj)
+    if err != nil {
+        return nil, fmt.Errorf("failed to marshal: %v", err)
+    }
+
+    var m map[string]any
+    if err := json.Unmarshal(b, &m); err != nil {
+        return nil, fmt.Errorf("failed to unmarshal informed object: %v", err)
+    }
+
+    // The objects returned by the informers do not have kind and apiVersion set
+    m["kind"] = resource.Kind
+    m["apiVersion"] = resource.ApiVersion
+
+    return &unstructured.Unstructured{Object: m}, nil
+}
+
 // WatchResources watches Kubernetes resources
 func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
     buffer := make(chan *unstructured.Unstructured, ctx.DutyContext().Properties().Int("kubernetes.watch.resources.bufferSize", WatchResourceBufferSize))
@@ -59,30 +78,94 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
         }
     }
 
-    var channels []<-chan watch.Event
-    for _, k := range lo.Uniq(config.Watch) {
-        client, err := kube.GetClientByGroupVersionKind(restConfig, k.ApiVersion, k.Kind)
-        if err != nil {
-            return fmt.Errorf("failed to create client for kind(%s): %v", k, err)
-        }
+    clientset, err := kubernetes.NewForConfig(restConfig)
+    if err != nil {
+        return fmt.Errorf("failed to create kubernetes clientset from rest config: %w", err)
+    }
 
-        watcher, err := client.Watch(ctx, metav1.ListOptions{})
-        if err != nil {
-            return fmt.Errorf("failed to create watcher for kind(%s): %v", k, err)
+    factory := informers.NewSharedInformerFactory(clientset, 0)
+    stopper := make(chan struct{})
+    defer close(stopper)
+
+    var wg sync.WaitGroup
+    for _, watchResource := range lo.Uniq(config.Watch) {
+        wg.Add(1)
+
+        informer := getInformer(factory, watchResource.ApiVersion, watchResource.Kind)
+        if informer == nil {
+            return fmt.Errorf("could not find informer for: apiVersion=%v kind=%v", watchResource.ApiVersion, watchResource.Kind)
         }
-        defer watcher.Stop()
 
-        channels = append(channels, watcher.ResultChan())
+        informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+            AddFunc: func(obj any) {
+                u, err := getUnstructuredFromInformedObj(watchResource, obj)
+                if err != nil {
+                    logger.Errorf("failed to get unstructured from new object: %v", err)
+                    return
+                }
+
+                buffer <- u
+            },
+            UpdateFunc: func(oldObj any, newObj any) {
+                u, err := getUnstructuredFromInformedObj(watchResource, newObj)
+                if err != nil {
+                    logger.Errorf("failed to get unstructured from updated object: %v", err)
+                    return
+                }
+
+                buffer <- u
+            },
+            DeleteFunc: func(obj any) {
+                u, err := getUnstructuredFromInformedObj(watchResource, obj)
+                if err != nil {
+                    logger.Errorf("failed to get unstructured from deleted object: %v", err)
+                    return
+                }
+
+                deleteBuffer <- string(u.GetUID())
+            },
+        })
+
+        go informer.Run(stopper)
     }
 
-    for watchEvent := range utils.MergeChannels(channels...) {
-        obj, ok := watchEvent.Object.(*unstructured.Unstructured)
-        if ok {
-            if watchEvent.Type == watch.Deleted {
-                deleteBuffer <- string(obj.GetUID())
-            } else {
-                buffer <- obj
-            }
+    ctx.Logger.V(1).Infof("waiting for informers: %w", err)
+    wg.Wait()
+
+    return nil
+}
+
+func getInformer(factory informers.SharedInformerFactory, apiVersion, kind string) cache.SharedInformer {
+    // TODO: need to populate this
+
+    kind = strings.ToLower(kind)
+    switch strings.ToLower(apiVersion) {
+    case "v1":
+        switch kind {
+        case "pod":
+            return factory.Core().V1().Pods().Informer()
+        case "node":
+            return factory.Core().V1().Nodes().Informer()
+        }
+
+    case "apps/v1":
+        switch kind {
+        case "deployment":
+            return factory.Apps().V1().Deployments().Informer()
+        case "daemonset":
+            return factory.Apps().V1().DaemonSets().Informer()
+        case "replicaset":
+            return factory.Apps().V1().ReplicaSets().Informer()
+        case "statefulset":
+            return factory.Apps().V1().StatefulSets().Informer()
+        }
+
+    case "batch/v1":
+        switch kind {
+        case "cronjob":
+            return factory.Batch().V1().CronJobs().Informer()
+        case "job":
+            return factory.Batch().V1().Jobs().Informer()
         }
     }
diff --git a/utils/kube/kube.go b/utils/kube/kube.go
index 83b2c408..c41bbfc5 100644
--- a/utils/kube/kube.go
+++ b/utils/kube/kube.go
@@ -53,7 +53,7 @@ func getRestMapper(config *rest.Config) (meta.RESTMapper, error) {
     return restmapper.NewDeferredDiscoveryRESTMapper(cache), nil
 }
 
-func getGroupVersion(apiVersion string) (string, string) {
+func GetGroupVersion(apiVersion string) (string, string) {
     split := strings.Split(apiVersion, "/")
     if len(split) == 1 {
         return "", apiVersion
@@ -73,7 +73,7 @@ func GetClientByGroupVersionKind(cfg *rest.Config, apiVersion, kind string) (dyn
         return nil, err
     }
 
-    group, version := getGroupVersion(apiVersion)
+    group, version := GetGroupVersion(apiVersion)
     gvk, err := rm.KindFor(schema.GroupVersionResource{Group: group, Version: version, Resource: kind})
     if err != nil {
         return nil, err
diff --git a/utils/kube/kube_test.go b/utils/kube/kube_test.go
index 103046a5..023c3b06 100644
--- a/utils/kube/kube_test.go
+++ b/utils/kube/kube_test.go
@@ -25,7 +25,7 @@ func TestGetGroupVersion(t *testing.T) {
 
     for _, tc := range testCases {
         t.Run(tc.name, func(t *testing.T) {
-            group, version := getGroupVersion(tc.apiVersion)
+            group, version := GetGroupVersion(tc.apiVersion)
             if group != tc.expectedGroup || version != tc.expectedVersion {
                 t.Errorf("getGroupVersion(%q) = %q, %q; expected %q, %q",
                     tc.apiVersion, group, version, tc.expectedGroup, tc.expectedVersion)

From 0c3fe1662e72aac332c5c674e14ffd2863019ff4 Mon Sep 17 00:00:00 2001
From: Aditya Thebe
Date: Wed, 5 Jun 2024 11:02:12 +0545
Subject: [PATCH 2/7] feat: cache kubeconfig & add metrics

---
 scrapers/kubernetes/events_watch.go |  4 +-
 utils/kube/kube.go                  | 81 +++++++++++++++++++++++++++--
 2 files changed, 79 insertions(+), 6 deletions(-)

diff --git a/scrapers/kubernetes/events_watch.go b/scrapers/kubernetes/events_watch.go
index 4e9a9dba..66a1b057 100644
--- a/scrapers/kubernetes/events_watch.go
+++ b/scrapers/kubernetes/events_watch.go
@@ -78,7 +78,7 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
         }
     }
 
-    clientset, err := kubernetes.NewForConfig(restConfig)
+    clientset, err := kube.ClientSetFromRestConfig(restConfig)
     if err != nil {
         return fmt.Errorf("failed to create kubernetes clientset from rest config: %w", err)
     }
@@ -129,7 +129,7 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
         go informer.Run(stopper)
     }
 
-    ctx.Logger.V(1).Infof("waiting for informers: %w", err)
+    ctx.Logger.V(1).Infof("waiting for informers")
     wg.Wait()
 
     return nil
diff --git a/utils/kube/kube.go b/utils/kube/kube.go
index c41bbfc5..8a66eca3 100644
--- a/utils/kube/kube.go
+++ b/utils/kube/kube.go
@@ -14,7 +14,9 @@ limitations under the License.
 package kube
 
 import (
+    "bytes"
     "context"
+    "encoding/gob"
     "fmt"
     "os"
     "path/filepath"
@@ -29,6 +31,8 @@ import (
     clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
 
     "github.com/flanksource/commons/files"
+    "github.com/patrickmn/go-cache"
+    "github.com/prometheus/client_golang/prometheus"
     "gopkg.in/flanksource/yaml.v3"
     "k8s.io/apimachinery/pkg/api/meta"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -38,6 +42,36 @@ import (
     "k8s.io/client-go/util/homedir"
 )
 
+var (
+    kubeClientCreatedCount = prometheus.NewCounterVec(
+        prometheus.CounterOpts{
+            Name: "kube_client_from_rest_count",
+            Help: "The total number of times kubernetes clientsets were created from rest configs",
+        },
+        []string{},
+    )
+
+    kubeClientCacheHitCount = prometheus.NewCounterVec(
+        prometheus.CounterOpts{
+            Name: "kube_client_from_rest_count_cache_hit",
+            Help: "The total number of times kubernetes clientsets were served from the cache",
+        },
+        []string{},
+    )
+
+    kubeClientCreateErrorCount = prometheus.NewCounterVec(
+        prometheus.CounterOpts{
+            Name: "kube_client_from_rest_error_count",
+            Help: "The total number of times kubernetes clientsets failed to be created from rest configs",
+        },
+        []string{},
+    )
+)
+
+func init() {
+    prometheus.MustRegister(kubeClientCreatedCount, kubeClientCacheHitCount, kubeClientCreateErrorCount)
+}
+
 func getRestMapper(config *rest.Config) (meta.RESTMapper, error) {
     // re-use kubectl cache
     host := config.Host
@@ -88,13 +122,52 @@ func GetClientByGroupVersionKind(cfg *rest.Config, apiVersion, kind string) (dyn
     return dc.Resource(mapping.Resource), nil
 }
 
+var clientSetCache = cache.New(time.Hour*24, time.Hour*24)
+
+func ClientSetFromRestConfig(restConfig *rest.Config) (*kubernetes.Clientset, error) {
+    client, cached, err := clientSetFromRestConfigCached(restConfig)
+    if err != nil {
+        kubeClientCreateErrorCount.WithLabelValues().Inc()
+        return nil, err
+    }
+
+    if cached {
+        kubeClientCacheHitCount.WithLabelValues().Inc()
+    } else {
+        kubeClientCreatedCount.WithLabelValues().Inc()
+    }
+
+    return client, nil
+}
+
+func clientSetFromRestConfigCached(restConfig *rest.Config) (*kubernetes.Clientset, bool, error) {
+    // Using the gob encoder because the json encoder returned a type error for the transport wrapper func
+    var b bytes.Buffer
+    if err := gob.NewEncoder(&b).Encode(restConfig); err != nil {
+        return nil, false, fmt.Errorf("failed to gob encode restconfig: %w", err)
+    }
+    key := b.String()
+
+    if val, ok := clientSetCache.Get(key); ok {
+        return val.(*kubernetes.Clientset), true, nil
+    }
+
+    client, err := kubernetes.NewForConfig(restConfig)
+    if err != nil {
+        return nil, false, err
+    }
+    clientSetCache.SetDefault(key, client)
+
+    return client, false, nil
+}
+
 func NewKubeClientWithConfigPath(kubeConfigPath string) (kubernetes.Interface, *rest.Config, error) {
     config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
     if err != nil {
         return fake.NewSimpleClientset(), nil, err
     }
 
-    client, err := kubernetes.NewForConfig(config)
+    client, err := ClientSetFromRestConfig(config)
     return client, config, err
 }
 
@@ -118,7 +191,7 @@ func NewKubeClientWithConfig(kubeConfig string) (kubernetes.Interface, *rest.Con
         return fake.NewSimpleClientset(), nil, err
     }
 
-    client, err := kubernetes.NewForConfig(config)
+    client, err := ClientSetFromRestConfig(config)
     return client, config, err
 }
 
@@ -149,13 +222,13 @@ func NewK8sClient() (kubernetes.Interface, *rest.Config, error) {
         }
     }
 
-    client, err := kubernetes.NewForConfig(restConfig)
+    client, err := ClientSetFromRestConfig(restConfig)
     return client, restConfig, err
 }
 
 // GetClusterName ...
 func GetClusterName(config *rest.Config) string {
-    clientset, err := kubernetes.NewForConfig(config)
+    clientset, err := ClientSetFromRestConfig(config)
     if err != nil {
         return ""
     }

From d45e1a2f149133addc1dceb93e81f55320352563 Mon Sep 17 00:00:00 2001
From: Aditya Thebe
Date: Wed, 5 Jun 2024 11:11:23 +0545
Subject: [PATCH 3/7] feat: add metrics for the number of resource & event watchers we have

---
 scrapers/kubernetes/events_watch.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/scrapers/kubernetes/events_watch.go b/scrapers/kubernetes/events_watch.go
index 66a1b057..91bc9800 100644
--- a/scrapers/kubernetes/events_watch.go
+++ b/scrapers/kubernetes/events_watch.go
@@ -58,6 +58,7 @@ func getUnstructuredFromInformedObj(resource v1.KubernetesResourceToWatch, obj a
 
 // WatchResources watches Kubernetes resources
 func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
+
     buffer := make(chan *unstructured.Unstructured, ctx.DutyContext().Properties().Int("kubernetes.watch.resources.bufferSize", WatchResourceBufferSize))
     WatchResourceBuffer.Store(config.Hash(), buffer)
 
@@ -129,6 +130,7 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
         go informer.Run(stopper)
     }
 
+    ctx.Counter("kubernetes_scraper_resource_watcher", lo.FromPtr(ctx.ScrapeConfig().GetPersistedID()).String()).Add(1)
     ctx.Logger.V(1).Infof("waiting for informers")
     wg.Wait()
 
@@ -193,6 +195,7 @@ func WatchEvents(ctx api.ScrapeContext, config v1.Kubernetes) error {
     }
     defer watcher.Stop()
 
+    ctx.Counter("kubernetes_scraper_event_watcher", lo.FromPtr(ctx.ScrapeConfig().GetPersistedID()).String()).Add(1)
     for watchEvent := range watcher.ResultChan() {
         var event v1.KubernetesEvent
         if err := event.FromObj(watchEvent.Object); err != nil {

From 683894009c76008665b94db18c36fd812deffed7 Mon Sep 17 00:00:00 2001
From: Aditya Thebe
Date: Wed, 5 Jun 2024 11:18:37 +0545
Subject: [PATCH 4/7] fix: lint error

---
 scrapers/kubernetes/events_watch.go | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/scrapers/kubernetes/events_watch.go b/scrapers/kubernetes/events_watch.go
index 91bc9800..1ef27498 100644
--- a/scrapers/kubernetes/events_watch.go
+++ b/scrapers/kubernetes/events_watch.go
@@ -97,7 +97,7 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
             return fmt.Errorf("could not find informer for: apiVersion=%v kind=%v", watchResource.ApiVersion, watchResource.Kind)
         }
 
-        informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+        _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
             AddFunc: func(obj any) {
                 u, err := getUnstructuredFromInformedObj(watchResource, obj)
                 if err != nil {
@@ -126,6 +126,9 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
                 deleteBuffer <- string(u.GetUID())
             },
         })
+        if err != nil {
+            return fmt.Errorf("failed to add informent event handlers: %w", err)
+        }
 
         go informer.Run(stopper)
     }

From 89c657931869508b4ed94b14f1137dcac30e7066 Mon Sep 17 00:00:00 2001
From: Aditya Thebe
Date: Wed, 5 Jun 2024 19:00:49 +0545
Subject: [PATCH 5/7] fix: cache kubeconfig->clientset creation

---
 scrapers/kubernetes/events_watch.go |  22 ++----
 utils/kube/kube.go                  | 106 +++++++++++-----------------
 2 files changed, 49 insertions(+), 79 deletions(-)

diff --git a/scrapers/kubernetes/events_watch.go b/scrapers/kubernetes/events_watch.go
index 1ef27498..647caa08 100644
--- a/scrapers/kubernetes/events_watch.go
+++ b/scrapers/kubernetes/events_watch.go
@@ -58,33 +58,26 @@ func getUnstructuredFromInformedObj(resource v1.KubernetesResourceToWatch, obj a
 
 // WatchResources watches Kubernetes resources
 func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
-
     buffer := make(chan *unstructured.Unstructured, ctx.DutyContext().Properties().Int("kubernetes.watch.resources.bufferSize", WatchResourceBufferSize))
     WatchResourceBuffer.Store(config.Hash(), buffer)
 
     deleteBuffer := make(chan string, WatchResourceBufferSize)
     DeleteResourceBuffer.Store(config.Hash(), deleteBuffer)
 
-    var restConfig *rest.Config
     var err error
     if config.Kubeconfig != nil {
-        ctx, restConfig, err = applyKubeconfig(ctx, *config.Kubeconfig)
+        ctx, _, err = applyKubeconfig(ctx, *config.Kubeconfig)
         if err != nil {
-            return fmt.Errorf("failed to apply kube config")
+            return fmt.Errorf("failed to apply custom kube config(%s): %w", config.Kubeconfig, err)
         }
     } else {
-        restConfig, err = kube.DefaultRestConfig()
+        _, err = kube.DefaultRestConfig()
         if err != nil {
-            return fmt.Errorf("failed to apply kube config")
+            return fmt.Errorf("failed to apply default kube config: %w", err)
        }
     }
 
-    clientset, err := kube.ClientSetFromRestConfig(restConfig)
-    if err != nil {
-        return fmt.Errorf("failed to create kubernetes clientset from rest config: %w", err)
-    }
-
-    factory := informers.NewSharedInformerFactory(clientset, 0)
+    factory := informers.NewSharedInformerFactory(ctx.Kubernetes(), 0)
     stopper := make(chan struct{})
     defer close(stopper)
@@ -133,8 +126,7 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
         go informer.Run(stopper)
     }
 
-    ctx.Counter("kubernetes_scraper_resource_watcher", lo.FromPtr(ctx.ScrapeConfig().GetPersistedID()).String()).Add(1)
-    ctx.Logger.V(1).Infof("waiting for informers")
+    ctx.Counter("kubernetes_scraper_resource_watcher", "scraper_id", lo.FromPtr(ctx.ScrapeConfig().GetPersistedID()).String()).Add(1)
     wg.Wait()
 
     return nil
@@ -198,7 +190,7 @@ func WatchEvents(ctx api.ScrapeContext, config v1.Kubernetes) error {
     }
     defer watcher.Stop()
 
-    ctx.Counter("kubernetes_scraper_event_watcher", lo.FromPtr(ctx.ScrapeConfig().GetPersistedID()).String()).Add(1)
+    ctx.Counter("kubernetes_scraper_event_watcher", "scraper_id", lo.FromPtr(ctx.ScrapeConfig().GetPersistedID()).String()).Add(1)
     for watchEvent := range watcher.ResultChan() {
         var event v1.KubernetesEvent
         if err := event.FromObj(watchEvent.Object); err != nil {
diff --git a/utils/kube/kube.go b/utils/kube/kube.go
index 8a66eca3..6f6d75a3 100644
--- a/utils/kube/kube.go
+++ b/utils/kube/kube.go
@@ -14,9 +14,7 @@ limitations under the License.
 package kube
 
 import (
-    "bytes"
     "context"
-    "encoding/gob"
     "fmt"
     "os"
     "path/filepath"
@@ -42,34 +40,16 @@ import (
     "k8s.io/client-go/util/homedir"
 )
 
-var (
-    kubeClientCreatedCount = prometheus.NewCounterVec(
-        prometheus.CounterOpts{
-            Name: "kube_client_from_rest_count",
-            Help: "The total number of times kubernetes clientsets were created from rest configs",
-        },
-        []string{},
-    )
-
-    kubeClientCacheHitCount = prometheus.NewCounterVec(
-        prometheus.CounterOpts{
-            Name: "kube_client_from_rest_count_cache_hit",
-            Help: "The total number of times kubernetes clientsets were served from the cache",
-        },
-        []string{},
-    )
-
-    kubeClientCreateErrorCount = prometheus.NewCounterVec(
-        prometheus.CounterOpts{
-            Name: "kube_client_from_rest_error_count",
-            Help: "The total number of times kubernetes clientsets failed to be created from rest configs",
-        },
-        []string{},
-    )
+var kubeClientCreatedCount = prometheus.NewCounterVec(
+    prometheus.CounterOpts{
+        Name: "kube_client_created_count",
+        Help: "The total number of times kubernetes clientsets were created from kube configs",
+    },
+    []string{"cached"},
 )
 
 func init() {
-    prometheus.MustRegister(kubeClientCreatedCount, kubeClientCacheHitCount, kubeClientCreateErrorCount)
+    prometheus.MustRegister(kubeClientCreatedCount)
 }
 
 func getRestMapper(config *rest.Config) (meta.RESTMapper, error) {
@@ -122,56 +102,44 @@ func GetClientByGroupVersionKind(cfg *rest.Config, apiVersion, kind string) (dyn
     return dc.Resource(mapping.Resource), nil
 }
 
-var clientSetCache = cache.New(time.Hour*24, time.Hour*24)
+var kubeCache = cache.New(time.Hour, time.Hour)
 
-func ClientSetFromRestConfig(restConfig *rest.Config) (*kubernetes.Clientset, error) {
-    client, cached, err := clientSetFromRestConfigCached(restConfig)
-    if err != nil {
-        kubeClientCreateErrorCount.WithLabelValues().Inc()
-        return nil, err
-    }
-
-    if cached {
-        kubeClientCacheHitCount.WithLabelValues().Inc()
-    } else {
-        kubeClientCreatedCount.WithLabelValues().Inc()
-    }
-
-    return client, nil
+type kubeCacheData struct {
+    Client kubernetes.Interface
+    Config *rest.Config
 }
 
-func clientSetFromRestConfigCached(restConfig *rest.Config) (*kubernetes.Clientset, bool, error) {
-    // Using the gob encoder because the json encoder returned a type error for the transport wrapper func
-    var b bytes.Buffer
-    if err := gob.NewEncoder(&b).Encode(restConfig); err != nil {
-        return nil, false, fmt.Errorf("failed to gob encode restconfig: %w", err)
-    }
-    key := b.String()
-
-    if val, ok := clientSetCache.Get(key); ok {
-        return val.(*kubernetes.Clientset), true, nil
+func NewKubeClientWithConfigPath(kubeConfigPath string) (kubernetes.Interface, *rest.Config, error) {
+    key := fmt.Sprintf("kube-config-path-%s", kubeConfigPath)
+    if val, ok := kubeCache.Get(key); ok {
+        d := val.(*kubeCacheData)
+        kubeClientCreatedCount.WithLabelValues("true").Inc()
+        return d.Client, d.Config, nil
    }
 
-    client, err := kubernetes.NewForConfig(restConfig)
+    config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
     if err != nil {
-        return nil, false, err
+        return fake.NewSimpleClientset(), nil, err
     }
-    clientSetCache.SetDefault(key, client)
-
-    return client, false, nil
-}
 
-func NewKubeClientWithConfigPath(kubeConfigPath string) (kubernetes.Interface, *rest.Config, error) {
-    config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
+    client, err := kubernetes.NewForConfig(config)
     if err != nil {
         return fake.NewSimpleClientset(), nil, err
     }
 
-    client, err := ClientSetFromRestConfig(config)
+    kubeCache.SetDefault(key, &kubeCacheData{Client: client, Config: config})
+    kubeClientCreatedCount.WithLabelValues("false").Inc()
     return client, config, err
 }
 
 func NewKubeClientWithConfig(kubeConfig string) (kubernetes.Interface, *rest.Config, error) {
+    key := fmt.Sprintf("kube-config-%s", kubeConfig)
+    if val, ok := kubeCache.Get(key); ok {
+        kubeClientCreatedCount.WithLabelValues("true").Inc()
+        d := val.(*kubeCacheData)
+        return d.Client, d.Config, nil
+    }
+
     getter := func() (*clientcmdapi.Config, error) {
         clientCfg, err := clientcmd.NewClientConfigFromBytes([]byte(kubeConfig))
         if err != nil {
@@ -191,7 +159,13 @@ func NewKubeClientWithConfig(kubeConfig string) (kubernetes.Interface, *rest.Con
         return fake.NewSimpleClientset(), nil, err
     }
 
-    client, err := ClientSetFromRestConfig(config)
+    client, err := kubernetes.NewForConfig(config)
+    if err != nil {
+        return fake.NewSimpleClientset(), nil, err
+    }
+
+    kubeCache.SetDefault(key, &kubeCacheData{Client: client, Config: config})
+    kubeClientCreatedCount.WithLabelValues("false").Inc()
     return client, config, err
 }
 
@@ -222,13 +196,17 @@ func NewK8sClient() (kubernetes.Interface, *rest.Config, error) {
         }
     }
 
-    client, err := ClientSetFromRestConfig(restConfig)
+    client, err := kubernetes.NewForConfig(restConfig)
+    if err != nil {
+        return fake.NewSimpleClientset(), nil, err
+    }
+
     return client, restConfig, err
 }
 
 // GetClusterName ...
 func GetClusterName(config *rest.Config) string {
-    clientset, err := ClientSetFromRestConfig(config)
+    clientset, err := kubernetes.NewForConfig(config)
     if err != nil {
         return ""
     }

From 639385b15197cb60295c9502843c2eda9de8d132 Mon Sep 17 00:00:00 2001
From: Aditya Thebe
Date: Fri, 7 Jun 2024 07:35:30 +0545
Subject: [PATCH 6/7] feat: use global shared informer manager [skip ci]

---
 scrapers/cron.go                    |  27 +---
 scrapers/kubernetes/events_watch.go |  96 ++-------------
 scrapers/kubernetes/informers.go    | 187 ++++++++++++++++++++++++++++
 3 files changed, 201 insertions(+), 109 deletions(-)
 create mode 100644 scrapers/kubernetes/informers.go

diff --git a/scrapers/cron.go b/scrapers/cron.go
index 85803f1d..38e24bed 100644
--- a/scrapers/cron.go
+++ b/scrapers/cron.go
@@ -87,28 +87,6 @@ func watchKubernetesEventsWithRetry(ctx api.ScrapeContext, config v1.Kubernetes)
     }
 }
 
-func watchKubernetesResourcesWithRetry(ctx api.ScrapeContext, config v1.Kubernetes) {
-    const (
-        timeout                 = time.Minute // how long to keep retrying before we reset and retry again
-        exponentialBaseDuration = time.Second
-    )
-
-    for {
-        backoff := retry.WithMaxDuration(timeout, retry.NewExponential(exponentialBaseDuration))
-        err := retry.Do(ctx, backoff, func(ctxt gocontext.Context) error {
-            ctx := ctxt.(api.ScrapeContext)
-            if err := kubernetes.WatchResources(ctx, config); err != nil {
-                logger.Errorf("failed to watch resources: %v", err)
-                return retry.RetryableError(err)
-            }
-
-            return nil
-        })
-
-        logger.Errorf("failed to watch kubernetes resources. cluster=%s: %v", config.ClusterName, err)
-    }
-}
-
 func SyncScrapeJob(sc api.ScrapeContext) error {
     id := sc.ScrapeConfig().GetPersistedID().String()
@@ -175,7 +153,10 @@ func scheduleScraperJob(sc api.ScrapeContext) error {
         }
 
         go watchKubernetesEventsWithRetry(sc, config)
-        go watchKubernetesResourcesWithRetry(sc, config)
+
+        if err := kubernetes.WatchResources(sc, config); err != nil {
+            return fmt.Errorf("failed to watch kubernetes resources: %v", err)
+        }
 
         eventsWatchJob := ConsumeKubernetesWatchEventsJobFunc(sc, config)
         if err := eventsWatchJob.AddToScheduler(scrapeJobScheduler); err != nil {
diff --git a/scrapers/kubernetes/events_watch.go b/scrapers/kubernetes/events_watch.go
index 647caa08..6f7e8b50 100644
--- a/scrapers/kubernetes/events_watch.go
+++ b/scrapers/kubernetes/events_watch.go
@@ -11,10 +11,8 @@ import (
     "github.com/samber/lo"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
-    "k8s.io/client-go/informers"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/rest"
-    "k8s.io/client-go/tools/cache"
 
     "github.com/flanksource/config-db/api"
     v1 "github.com/flanksource/config-db/api/v1"
@@ -56,7 +54,7 @@ func getUnstructuredFromInformedObj(resource v1.KubernetesResourceToWatch, obj a
     return &unstructured.Unstructured{Object: m}, nil
 }
 
-// WatchResources watches Kubernetes resources
+// WatchResources watches Kubernetes resources with shared informers
 func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
     buffer := make(chan *unstructured.Unstructured, ctx.DutyContext().Properties().Int("kubernetes.watch.resources.bufferSize", WatchResourceBufferSize))
     WatchResourceBuffer.Store(config.Hash(), buffer)
@@ -77,95 +75,21 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
         }
     }
 
-    factory := informers.NewSharedInformerFactory(ctx.Kubernetes(), 0)
-    stopper := make(chan struct{})
-    defer close(stopper)
-
-    var wg sync.WaitGroup
     for _, watchResource := range lo.Uniq(config.Watch) {
-        wg.Add(1)
-
-        informer := getInformer(factory, watchResource.ApiVersion, watchResource.Kind)
-        if informer == nil {
-            return fmt.Errorf("could not find informer for: apiVersion=%v kind=%v", watchResource.ApiVersion, watchResource.Kind)
-        }
-
-        _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-            AddFunc: func(obj any) {
-                u, err := getUnstructuredFromInformedObj(watchResource, obj)
-                if err != nil {
-                    logger.Errorf("failed to get unstructured from new object: %v", err)
-                    return
-                }
-
-                buffer <- u
-            },
-            UpdateFunc: func(oldObj any, newObj any) {
-                u, err := getUnstructuredFromInformedObj(watchResource, newObj)
-                if err != nil {
-                    logger.Errorf("failed to get unstructured from updated object: %v", err)
-                    return
-                }
-
-                buffer <- u
-            },
-            DeleteFunc: func(obj any) {
-                u, err := getUnstructuredFromInformedObj(watchResource, obj)
-                if err != nil {
-                    logger.Errorf("failed to get unstructured from deleted object: %v", err)
-                    return
-                }
-
-                deleteBuffer <- string(u.GetUID())
-            },
-        })
-        if err != nil {
-            return fmt.Errorf("failed to add informent event handlers: %w", err)
+        if err := globalSharedInformerManager.Register(ctx, config.Kubeconfig.ValueStatic, watchResource, buffer, deleteBuffer); err != nil {
+            return fmt.Errorf("failed to register informer: %w", err)
         }
-
-        go informer.Run(stopper)
     }
 
-    ctx.Counter("kubernetes_scraper_resource_watcher", "scraper_id", lo.FromPtr(ctx.ScrapeConfig().GetPersistedID()).String()).Add(1)
-    wg.Wait()
-
-    return nil
-}
-
-func getInformer(factory informers.SharedInformerFactory, apiVersion, kind string) cache.SharedInformer {
-    // TODO: need to populate this
-
-    kind = strings.ToLower(kind)
-    switch strings.ToLower(apiVersion) {
-    case "v1":
-        switch kind {
-        case "pod":
-            return factory.Core().V1().Pods().Informer()
-        case "node":
-            return factory.Core().V1().Nodes().Informer()
-        }
-
-    case "apps/v1":
-        switch kind {
-        case "deployment":
-            return factory.Apps().V1().Deployments().Informer()
-        case "daemonset":
-            return factory.Apps().V1().DaemonSets().Informer()
-        case "replicaset":
-            return factory.Apps().V1().ReplicaSets().Informer()
-        case "statefulset":
-            return factory.Apps().V1().StatefulSets().Informer()
-        }
-
-    case "batch/v1":
-        switch kind {
-        case "cronjob":
-            return factory.Batch().V1().CronJobs().Informer()
-        case "job":
-            return factory.Batch().V1().Jobs().Informer()
-        }
+    // Stop all the other active shared informers, if any, that were previously started
+    // but then removed from the config.
+    var existingWatches []string
+    for _, w := range config.Watch {
+        existingWatches = append(existingWatches, w.ApiVersion+w.Kind)
     }
+    globalSharedInformerManager.stop(ctx, config.Kubeconfig.ValueStatic, existingWatches...)
 
+    ctx.Counter("kubernetes_scraper_resource_watcher", "scraper_id", lo.FromPtr(ctx.ScrapeConfig().GetPersistedID()).String()).Add(1)
     return nil
 }
diff --git a/scrapers/kubernetes/informers.go b/scrapers/kubernetes/informers.go
new file mode 100644
index 00000000..171b5b19
--- /dev/null
+++ b/scrapers/kubernetes/informers.go
@@ -0,0 +1,187 @@
+package kubernetes
+
+import (
+    "fmt"
+    "strings"
+    "sync"
+
+    "github.com/flanksource/commons/logger"
+    "github.com/flanksource/config-db/api"
+    v1 "github.com/flanksource/config-db/api/v1"
+    "github.com/samber/lo"
+    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+    "k8s.io/client-go/informers"
+    "k8s.io/client-go/tools/cache"
+)
+
+type informerCacheData struct {
+    informer cache.SharedInformer
+    stopper  chan (struct{})
+}
+
+// singleton
+var globalSharedInformerManager = SharedInformerManager{
+    cache: make(map[string]map[string]*informerCacheData),
+}
+
+// SharedInformerManager distributes the same shared informer for a given pair of
+// <kubeconfig, apiVersion+kind>
+type SharedInformerManager struct {
+    mu    sync.Mutex
+    cache map[string]map[string]*informerCacheData
+}
+
+func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig string, watchResource v1.KubernetesResourceToWatch, buffer chan<- *unstructured.Unstructured, deleteBuffer chan<- string) error {
+    apiVersion, kind := watchResource.ApiVersion, watchResource.Kind
+
+    informer, stopper, isNew := t.get(ctx, kubeconfig, apiVersion, kind)
+    if informer == nil {
+        return fmt.Errorf("could not find informer for: apiVersion=%v kind=%v", apiVersion, kind)
+    }
+
+    if !isNew {
+        // event handlers have already been set.
+        // nothing left to do.
+        return nil
+    }
+
+    ctx.Logger.V(0).Infof("registering shared informer for: %v", watchResource)
+    _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+        AddFunc: func(obj any) {
+            u, err := getUnstructuredFromInformedObj(watchResource, obj)
+            if err != nil {
+                logger.Errorf("failed to get unstructured from new object: %v", err)
+                return
+            }
+
+            logger.Infof("added: %s %s", u.GetKind(), u.GetName())
+            buffer <- u
+        },
+        UpdateFunc: func(oldObj any, newObj any) {
+            u, err := getUnstructuredFromInformedObj(watchResource, newObj)
+            if err != nil {
+                logger.Errorf("failed to get unstructured from updated object: %v", err)
+                return
+            }
+
+            logger.Infof("updated: %s %s", u.GetKind(), u.GetName())
+            buffer <- u
+        },
+        DeleteFunc: func(obj any) {
+            u, err := getUnstructuredFromInformedObj(watchResource, obj)
+            if err != nil {
+                logger.Errorf("failed to get unstructured from deleted object: %v", err)
+                return
+            }
+
+            logger.Infof("deleted: %s %s", u.GetKind(), u.GetName())
+            deleteBuffer <- string(u.GetUID())
+        },
+    })
+    if err != nil {
+        return fmt.Errorf("failed to add informent event handlers: %w", err)
+    }
+
+    go func() {
+        informer.Run(stopper)
+        ctx.Logger.V(0).Infof("stopped shared informer for: %v", watchResource)
+    }()
+    return nil
+}
+
+// get returns an existing shared informer instance or creates & returns a new one.
+func (t *SharedInformerManager) get(ctx api.ScrapeContext, kubeconfig, apiVersion, kind string) (cache.SharedInformer, chan struct{}, bool) {
+    t.mu.Lock()
+    defer t.mu.Unlock()
+
+    cacheKey := apiVersion + kind
+
+    if val, ok := t.cache[kubeconfig]; ok {
+        if data, ok := val[cacheKey]; ok {
+            return data.informer, data.stopper, false
+        }
+    }
+
+    factory := informers.NewSharedInformerFactory(ctx.Kubernetes(), 0)
+    stopper := make(chan struct{})
+
+    informer := getInformer(factory, apiVersion, kind)
+    ctx.Gauge("kubernetes_active_shared_informers").Add(1)
+
+    cacheValue := &informerCacheData{
+        stopper:  stopper,
+        informer: informer,
+    }
+    if _, ok := t.cache[kubeconfig]; ok {
+        t.cache[kubeconfig][cacheKey] = cacheValue
+    } else {
+        t.cache[kubeconfig] = map[string]*informerCacheData{
+            cacheKey: cacheValue,
+        }
+    }
+
+    return informer, stopper, true
+}
+
+// stop stops all shared informers for the given kubeconfig
+// apart from the ones provided.
+func (t *SharedInformerManager) stop(ctx api.ScrapeContext, kubeconfig string, exception ...string) {
+    t.mu.Lock()
+    defer t.mu.Unlock()
+
+    var toDelete []string
+    if informers, ok := t.cache[kubeconfig]; ok {
+        for key, cached := range informers {
+            if !lo.Contains(exception, key) {
+                ctx.Logger.V(0).Infof("stopping informer for %s", key)
+
+                cached.informer.IsStopped()
+                ctx.Gauge("kubernetes_active_shared_informers").Sub(1)
+
+                toDelete = append(toDelete, key)
+                close(cached.stopper)
+            }
+        }
+    }
+
+    for _, key := range toDelete {
+        delete(t.cache[kubeconfig], key)
+    }
+}
+
+func getInformer(factory informers.SharedInformerFactory, apiVersion, kind string) cache.SharedInformer {
+    // TODO: need to populate this
+
+    kind = strings.ToLower(kind)
+    switch strings.ToLower(apiVersion) {
+    case "v1":
+        switch kind {
+        case "pod":
+            return factory.Core().V1().Pods().Informer()
+        case "node":
+            return factory.Core().V1().Nodes().Informer()
+        }
+
+    case "apps/v1":
+        switch kind {
+        case "deployment":
+            return factory.Apps().V1().Deployments().Informer()
+        case "daemonset":
+            return factory.Apps().V1().DaemonSets().Informer()
+        case "replicaset":
+            return factory.Apps().V1().ReplicaSets().Informer()
+        case "statefulset":
+            return factory.Apps().V1().StatefulSets().Informer()
+        }
+
+    case "batch/v1":
+        switch kind {
+        case "cronjob":
+            return factory.Batch().V1().CronJobs().Informer()
+        case "job":
+            return factory.Batch().V1().Jobs().Informer()
+        }
+    }
+
+    return nil
+}

From 8c65cebd8cf8b8a34225b1385da2ccb07eb867f8 Mon Sep 17 00:00:00 2001
From: Aditya Thebe
Date: Fri, 7 Jun 2024 08:34:17 +0545
Subject: [PATCH 7/7] feat: remove delete buffer

---
 db/config.go                        |  8 ++---
 scrapers/cron.go                    | 13 ---------
 scrapers/kubernetes/events_watch.go | 10 ++-----
 scrapers/kubernetes/informers.go    | 45 ++++++++++++++++++++---------
 4 files changed, 36 insertions(+), 40 deletions(-)

diff --git a/db/config.go b/db/config.go
index ce2bbd14..6b10f3dd 100644
--- a/db/config.go
+++ b/db/config.go
@@ -213,14 +213,10 @@ func FindConfigChangesByItemID(ctx api.ScrapeContext, configItemID string) ([]du
     return ci, nil
 }
 
-func SoftDeleteConfigItems(ctx context.Context, ids []string) error {
-    if len(ids) == 0 {
-        return nil
-    }
-
+func SoftDeleteConfigItem(ctx context.Context, id string) error {
     return ctx.DB().
         Model(&models.ConfigItem{}).
-        Where("id IN (?)", ids).
+        Where("id = ?", id).
         Update("deleted_at", gorm.Expr("NOW()")).
         Error
 }
diff --git a/scrapers/cron.go b/scrapers/cron.go
index 38e24bed..8d1a5aeb 100644
--- a/scrapers/cron.go
+++ b/scrapers/cron.go
@@ -296,19 +296,6 @@ func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kube
                 }
             }
 
-            _deleteCh, ok := kubernetes.DeleteResourceBuffer.Load(config.Hash())
-            if !ok {
-                return fmt.Errorf("no resource watcher channel found for config (scrapeconfig: %s)", config.Hash())
-            }
-            deletChan := _deleteCh.(chan string)
-
-            if len(deletChan) > 0 {
-                deletedResourcesIDs, _, _, _ := lo.Buffer(deletChan, len(deletChan))
-                if err := db.SoftDeleteConfigItems(ctx.Context, deletedResourcesIDs); err != nil {
-                    return fmt.Errorf("failed to delete %d resources: %w", len(deletedResourcesIDs), err)
-                }
-            }
-
             return nil
         },
     }
diff --git a/scrapers/kubernetes/events_watch.go b/scrapers/kubernetes/events_watch.go
index 6f7e8b50..7d7ba192 100644
--- a/scrapers/kubernetes/events_watch.go
+++ b/scrapers/kubernetes/events_watch.go
@@ -16,6 +16,7 @@ import (
 
     "github.com/flanksource/config-db/api"
     v1 "github.com/flanksource/config-db/api/v1"
+    "github.com/flanksource/config-db/db"
     "github.com/flanksource/config-db/utils/kube"
 )
 
@@ -30,10 +31,6 @@ var (
 
     // WatchEventBuffers stores a sync buffer per kubernetes config
     WatchResourceBuffer = sync.Map{}
-
-    // DeleteResourceBuffer stores a buffer per kubernetes config
-    // that contains the ids of resources that have been deleted.
-    DeleteResourceBuffer = sync.Map{}
 )
 
 func getUnstructuredFromInformedObj(resource v1.KubernetesResourceToWatch, obj any) (*unstructured.Unstructured, error) {
@@ -59,9 +56,6 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
     buffer := make(chan *unstructured.Unstructured, ctx.DutyContext().Properties().Int("kubernetes.watch.resources.bufferSize", WatchResourceBufferSize))
     WatchResourceBuffer.Store(config.Hash(), buffer)
 
-    deleteBuffer := make(chan string, WatchResourceBufferSize)
-    DeleteResourceBuffer.Store(config.Hash(), deleteBuffer)
-
     var err error
     if config.Kubeconfig != nil {
         ctx, _, err = applyKubeconfig(ctx, *config.Kubeconfig)
@@ -76,7 +70,7 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
     }
 
     for _, watchResource := range lo.Uniq(config.Watch) {
-        if err := globalSharedInformerManager.Register(ctx, config.Kubeconfig.ValueStatic, watchResource, buffer, deleteBuffer); err != nil {
+        if err := globalSharedInformerManager.Register(ctx, config.Kubeconfig.ValueStatic, watchResource, buffer, db.SoftDeleteConfigItem); err != nil {
             return fmt.Errorf("failed to register informer: %w", err)
         }
     }
diff --git a/scrapers/kubernetes/informers.go b/scrapers/kubernetes/informers.go
index 171b5b19..6833d79b 100644
--- a/scrapers/kubernetes/informers.go
+++ b/scrapers/kubernetes/informers.go
@@ -8,6 +8,9 @@ import (
     "github.com/flanksource/commons/logger"
     "github.com/flanksource/config-db/api"
     v1 "github.com/flanksource/config-db/api/v1"
+    "github.com/flanksource/duty/context"
+    "github.com/flanksource/duty/models"
+    "github.com/google/uuid"
     "github.com/samber/lo"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
     "k8s.io/client-go/informers"
@@ -31,10 +34,12 @@ type SharedInformerManager struct {
     cache map[string]map[string]*informerCacheData
 }
 
+type DeleteObjHandler func(ctx context.Context, id string) error
+
-func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig string, watchResource v1.KubernetesResourceToWatch, buffer chan<- *unstructured.Unstructured, deleteBuffer chan<- string) error {
+func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig string, watchResource v1.KubernetesResourceToWatch, buffer chan<- *unstructured.Unstructured, deleteHandler DeleteObjHandler) error {
     apiVersion, kind := watchResource.ApiVersion, watchResource.Kind
 
-    informer, stopper, isNew := t.get(ctx, kubeconfig, apiVersion, kind)
+    informer, stopper, isNew := t.getOrCreate(ctx, kubeconfig, apiVersion, kind)
     if informer == nil {
         return fmt.Errorf("could not find informer for: apiVersion=%v kind=%v", apiVersion, kind)
     }
@@ -45,7 +50,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin
         return nil
     }
 
-    ctx.Logger.V(0).Infof("registering shared informer for: %v", watchResource)
+    ctx.Logger.V(1).Infof("registering shared informer for: %v", watchResource)
     _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
         AddFunc: func(obj any) {
             u, err := getUnstructuredFromInformedObj(watchResource, obj)
@@ -54,7 +59,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin
                 return
             }
 
-            logger.Infof("added: %s %s", u.GetKind(), u.GetName())
+            ctx.Logger.V(2).Infof("added: %s %s", u.GetKind(), u.GetName())
             buffer <- u
         },
         UpdateFunc: func(oldObj any, newObj any) {
@@ -64,33 +69,36 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin
                 return
             }
 
-            logger.Infof("updated: %s %s", u.GetKind(), u.GetName())
+            ctx.Logger.V(2).Infof("updated: %s %s", u.GetKind(), u.GetName())
             buffer <- u
         },
         DeleteFunc: func(obj any) {
             u, err := getUnstructuredFromInformedObj(watchResource, obj)
             if err != nil {
-                logger.Errorf("failed to get unstructured from deleted object: %v", err)
+                logToJobHistory(ctx.DutyContext(), "DeleteK8sWatchResource", ctx.ScrapeConfig().GetPersistedID(), "failed to get unstructured: %v", err)
                 return
             }
 
-            logger.Infof("deleted: %s %s", u.GetKind(), u.GetName())
-            deleteBuffer <- string(u.GetUID())
+            if err := deleteHandler(ctx.Context, string(u.GetUID())); err != nil {
+                logToJobHistory(ctx.DutyContext(), "DeleteK8sWatchResource", ctx.ScrapeConfig().GetPersistedID(),
+                    "failed to delete (%s) %s/%s/%s resources: %v", u.GetUID(), u.GetKind(), u.GetNamespace(), u.GetName(), err)
+                return
+            }
         },
     })
     if err != nil {
-        return fmt.Errorf("failed to add informent event handlers: %w", err)
+        return fmt.Errorf("failed to add informer event handlers: %w", err)
     }
 
     go func() {
         informer.Run(stopper)
-        ctx.Logger.V(0).Infof("stopped shared informer for: %v", watchResource)
+        ctx.Logger.V(1).Infof("stopped shared informer for: %v", watchResource)
     }()
     return nil
 }
 
-// get returns an existing shared informer instance or creates & returns a new one.
-func (t *SharedInformerManager) get(ctx api.ScrapeContext, kubeconfig, apiVersion, kind string) (cache.SharedInformer, chan struct{}, bool) {
+// getOrCreate returns an existing shared informer instance or creates & returns a new one.
+func (t *SharedInformerManager) getOrCreate(ctx api.ScrapeContext, kubeconfig, apiVersion, kind string) (cache.SharedInformer, chan struct{}, bool) {
     t.mu.Lock()
     defer t.mu.Unlock()
 
@@ -133,7 +141,7 @@ func (t *SharedInformerManager) stop(ctx api.ScrapeContext, kubeconfig string, e
     if informers, ok := t.cache[kubeconfig]; ok {
         for key, cached := range informers {
             if !lo.Contains(exception, key) {
-                ctx.Logger.V(0).Infof("stopping informer for %s", key)
+                ctx.Logger.V(1).Infof("stopping informer for %s", key)
 
                 cached.informer.IsStopped()
                 ctx.Gauge("kubernetes_active_shared_informers").Sub(1)
@@ -185,3 +193,14 @@ func getInformer(factory informers.SharedInformerFactory, apiVersion, kind strin
 
     return nil
 }
+
+// logToJobHistory logs any failures from the informer's event handlers to the job history.
+func logToJobHistory(ctx context.Context, job string, scraperID *uuid.UUID, err string, args ...any) {
+    jobHistory := models.NewJobHistory(ctx.Logger, job, "", lo.FromPtr(scraperID).String())
+    jobHistory.Start()
+    jobHistory.AddErrorf(err, args...)
+
+    if err := jobHistory.End().Persist(ctx.DB()); err != nil {
+        logger.Errorf("error persisting job history: %v", err)
+    }
+}
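
Postscript: the watch pattern this series converges on, reduced to a self-contained sketch. This is illustrative only and not part of the patches; it assumes client-go v0.26 or newer (where AddEventHandler returns a registration handle and an error), and the package name, the Pod informer, the default kubeconfig path, and the channel size are arbitrary choices for the example.

package main

import (
    "fmt"
    "log"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // One rest.Config yields one clientset; the series caches these per kubeconfig.
    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        log.Fatalf("failed to load kubeconfig: %v", err)
    }

    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        log.Fatalf("failed to create clientset: %v", err)
    }

    // A resync period of 0 disables periodic re-delivery: handlers fire only
    // on genuine add/update/delete events, as in the patches.
    factory := informers.NewSharedInformerFactory(clientset, 0)
    podInformer := factory.Core().V1().Pods().Informer()

    // Stands in for the scraper's WatchResourceBuffer.
    upserts := make(chan *corev1.Pod, 1000)

    _, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj any) {
            if pod, ok := obj.(*corev1.Pod); ok {
                upserts <- pod
            }
        },
        UpdateFunc: func(_, newObj any) {
            if pod, ok := newObj.(*corev1.Pod); ok {
                upserts <- pod
            }
        },
        DeleteFunc: func(obj any) {
            // A delete observed late arrives wrapped in a tombstone, so
            // unwrap it before reading the object's fields.
            if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
                obj = tombstone.Obj
            }
            if pod, ok := obj.(*corev1.Pod); ok {
                fmt.Printf("deleted: %s/%s uid=%s\n", pod.Namespace, pod.Name, pod.UID)
            }
        },
    })
    if err != nil {
        log.Fatalf("failed to add event handlers: %v", err)
    }

    stopper := make(chan struct{})
    defer close(stopper)
    go podInformer.Run(stopper)

    for pod := range upserts {
        fmt.Printf("upserted: %s/%s\n", pod.Namespace, pod.Name)
    }
}

One detail the sketch handles explicitly: DeleteFunc can receive a cache.DeletedFinalStateUnknown tombstone instead of the object itself when a deletion was missed by the watch. The patches' getUnstructuredFromInformedObj marshals obj as-is, so in that case the extracted UID would come back empty; unwrapping the tombstone first avoids that.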