diff --git a/db/config.go b/db/config.go index ce2bbd14..6b10f3dd 100644 --- a/db/config.go +++ b/db/config.go @@ -213,14 +213,10 @@ func FindConfigChangesByItemID(ctx api.ScrapeContext, configItemID string) ([]du return ci, nil } -func SoftDeleteConfigItems(ctx context.Context, ids []string) error { - if len(ids) == 0 { - return nil - } - +func SoftDeleteConfigItem(ctx context.Context, id string) error { return ctx.DB(). Model(&models.ConfigItem{}). - Where("id IN (?)", ids). + Where("id = ?", id). Update("deleted_at", gorm.Expr("NOW()")). Error } diff --git a/scrapers/cron.go b/scrapers/cron.go index 85803f1d..8d1a5aeb 100644 --- a/scrapers/cron.go +++ b/scrapers/cron.go @@ -87,28 +87,6 @@ func watchKubernetesEventsWithRetry(ctx api.ScrapeContext, config v1.Kubernetes) } } -func watchKubernetesResourcesWithRetry(ctx api.ScrapeContext, config v1.Kubernetes) { - const ( - timeout = time.Minute // how long to keep retrying before we reset and retry again - exponentialBaseDuration = time.Second - ) - - for { - backoff := retry.WithMaxDuration(timeout, retry.NewExponential(exponentialBaseDuration)) - err := retry.Do(ctx, backoff, func(ctxt gocontext.Context) error { - ctx := ctxt.(api.ScrapeContext) - if err := kubernetes.WatchResources(ctx, config); err != nil { - logger.Errorf("failed to watch resources: %v", err) - return retry.RetryableError(err) - } - - return nil - }) - - logger.Errorf("failed to watch kubernetes resources. cluster=%s: %v", config.ClusterName, err) - } -} - func SyncScrapeJob(sc api.ScrapeContext) error { id := sc.ScrapeConfig().GetPersistedID().String() @@ -175,7 +153,10 @@ func scheduleScraperJob(sc api.ScrapeContext) error { } go watchKubernetesEventsWithRetry(sc, config) - go watchKubernetesResourcesWithRetry(sc, config) + + if err := kubernetes.WatchResources(sc, config); err != nil { + return fmt.Errorf("failed to watch kubernetes resources: %v", err) + } eventsWatchJob := ConsumeKubernetesWatchEventsJobFunc(sc, config) if err := eventsWatchJob.AddToScheduler(scrapeJobScheduler); err != nil { @@ -315,19 +296,6 @@ func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kube } } - _deleteCh, ok := kubernetes.DeleteResourceBuffer.Load(config.Hash()) - if !ok { - return fmt.Errorf("no resource watcher channel found for config (scrapeconfig: %s)", config.Hash()) - } - deletChan := _deleteCh.(chan string) - - if len(deletChan) > 0 { - deletedResourcesIDs, _, _, _ := lo.Buffer(deletChan, len(deletChan)) - if err := db.SoftDeleteConfigItems(ctx.Context, deletedResourcesIDs); err != nil { - return fmt.Errorf("failed to delete %d resources: %w", len(deletedResourcesIDs), err) - } - } - return nil }, } diff --git a/scrapers/kubernetes/events_watch.go b/scrapers/kubernetes/events_watch.go index e9c94a9d..7d7ba192 100644 --- a/scrapers/kubernetes/events_watch.go +++ b/scrapers/kubernetes/events_watch.go @@ -1,6 +1,7 @@ package kubernetes import ( + "encoding/json" "fmt" "strings" "sync" @@ -10,13 +11,12 @@ import ( "github.com/samber/lo" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "github.com/flanksource/config-db/api" v1 "github.com/flanksource/config-db/api/v1" - "github.com/flanksource/config-db/utils" + "github.com/flanksource/config-db/db" "github.com/flanksource/config-db/utils/kube" ) @@ -31,61 +31,59 @@ var ( // WatchEventBuffers stores a sync buffer per kubernetes config WatchResourceBuffer = sync.Map{} - - 
// DeleteResourceBuffer stores a buffer per kubernetes config - // that contains the ids of resources that have been deleted. - DeleteResourceBuffer = sync.Map{} ) -// WatchResources watches Kubernetes resources +func getUnstructuredFromInformedObj(resource v1.KubernetesResourceToWatch, obj any) (*unstructured.Unstructured, error) { + b, err := json.Marshal(obj) + if err != nil { + return nil, fmt.Errorf("failed to marshal: %v", err) + } + + var m map[string]any + if err := json.Unmarshal(b, &m); err != nil { + return nil, fmt.Errorf("failed to unmarshal on add func: %v", err) + } + + // The object returned by the informers do not have kind and apiversion set + m["kind"] = resource.Kind + m["apiVersion"] = resource.ApiVersion + + return &unstructured.Unstructured{Object: m}, nil +} + +// WatchResources watches Kubernetes resources with shared informers func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error { buffer := make(chan *unstructured.Unstructured, ctx.DutyContext().Properties().Int("kubernetes.watch.resources.bufferSize", WatchResourceBufferSize)) WatchResourceBuffer.Store(config.Hash(), buffer) - deleteBuffer := make(chan string, WatchResourceBufferSize) - DeleteResourceBuffer.Store(config.Hash(), deleteBuffer) - - var restConfig *rest.Config var err error if config.Kubeconfig != nil { - ctx, restConfig, err = applyKubeconfig(ctx, *config.Kubeconfig) + ctx, _, err = applyKubeconfig(ctx, *config.Kubeconfig) if err != nil { - return fmt.Errorf("failed to apply kube config") + return fmt.Errorf("failed to apply custom kube config(%s): %w", config.Kubeconfig, err) } } else { - restConfig, err = kube.DefaultRestConfig() + _, err = kube.DefaultRestConfig() if err != nil { - return fmt.Errorf("failed to apply kube config") + return fmt.Errorf("failed to apply default kube config: %w", err) } } - var channels []<-chan watch.Event - for _, k := range lo.Uniq(config.Watch) { - client, err := kube.GetClientByGroupVersionKind(restConfig, k.ApiVersion, k.Kind) - if err != nil { - return fmt.Errorf("failed to create client for kind(%s): %v", k, err) - } - - watcher, err := client.Watch(ctx, metav1.ListOptions{}) - if err != nil { - return fmt.Errorf("failed to create watcher for kind(%s): %v", k, err) + for _, watchResource := range lo.Uniq(config.Watch) { + if err := globalSharedInformerManager.Register(ctx, config.Kubeconfig.ValueStatic, watchResource, buffer, db.SoftDeleteConfigItem); err != nil { + return fmt.Errorf("failed to register informer: %w", err) } - defer watcher.Stop() - - channels = append(channels, watcher.ResultChan()) } - for watchEvent := range utils.MergeChannels(channels...) { - obj, ok := watchEvent.Object.(*unstructured.Unstructured) - if ok { - if watchEvent.Type == watch.Deleted { - deleteBuffer <- string(obj.GetUID()) - } else { - buffer <- obj - } - } + // Stop all the other active shared informers, if any, that were previously started + // but then removed from the config. + var existingWatches []string + for _, w := range config.Watch { + existingWatches = append(existingWatches, w.ApiVersion+w.Kind) } + globalSharedInformerManager.stop(ctx, config.Kubeconfig.ValueStatic, existingWatches...) 
+	ctx.Counter("kubernetes_scraper_resource_watcher", "scraper_id", lo.FromPtr(ctx.ScrapeConfig().GetPersistedID()).String()).Add(1)
 	return nil
 }
@@ -110,6 +108,7 @@ func WatchEvents(ctx api.ScrapeContext, config v1.Kubernetes) error {
 	}
 	defer watcher.Stop()
 
+	ctx.Counter("kubernetes_scraper_event_watcher", "scraper_id", lo.FromPtr(ctx.ScrapeConfig().GetPersistedID()).String()).Add(1)
 	for watchEvent := range watcher.ResultChan() {
 		var event v1.KubernetesEvent
 		if err := event.FromObj(watchEvent.Object); err != nil {
diff --git a/scrapers/kubernetes/informers.go b/scrapers/kubernetes/informers.go
new file mode 100644
index 00000000..6833d79b
--- /dev/null
+++ b/scrapers/kubernetes/informers.go
@@ -0,0 +1,206 @@
+package kubernetes
+
+import (
+	"fmt"
+	"strings"
+	"sync"
+
+	"github.com/flanksource/commons/logger"
+	"github.com/flanksource/config-db/api"
+	v1 "github.com/flanksource/config-db/api/v1"
+	"github.com/flanksource/duty/context"
+	"github.com/flanksource/duty/models"
+	"github.com/google/uuid"
+	"github.com/samber/lo"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/tools/cache"
+)
+
+type informerCacheData struct {
+	informer cache.SharedInformer
+	stopper  chan (struct{})
+}
+
+// globalSharedInformerManager is the singleton shared informer manager.
+var globalSharedInformerManager = SharedInformerManager{
+	cache: make(map[string]map[string]*informerCacheData),
+}
+
+// SharedInformerManager distributes the same shared informer for a given pair of
+// <kubeconfig, groupVersionKind>.
+type SharedInformerManager struct {
+	mu    sync.Mutex
+	cache map[string]map[string]*informerCacheData
+}
+
+type DeleteObjHandler func(ctx context.Context, id string) error
+
+func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig string, watchResource v1.KubernetesResourceToWatch, buffer chan<- *unstructured.Unstructured, deleteHandler DeleteObjHandler) error {
+	apiVersion, kind := watchResource.ApiVersion, watchResource.Kind
+
+	informer, stopper, isNew := t.getOrCreate(ctx, kubeconfig, apiVersion, kind)
+	if informer == nil {
+		return fmt.Errorf("could not find informer for: apiVersion=%v kind=%v", apiVersion, kind)
+	}
+
+	if !isNew {
+		// event handlers have already been set.
+		// nothing left to do.
+ return nil + } + + ctx.Logger.V(1).Infof("registering shared informer for: %v", watchResource) + _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + u, err := getUnstructuredFromInformedObj(watchResource, obj) + if err != nil { + logger.Errorf("failed to get unstructured from new object: %v", err) + return + } + + ctx.Logger.V(2).Infof("added: %s %s", u.GetKind(), u.GetName()) + buffer <- u + }, + UpdateFunc: func(oldObj any, newObj any) { + u, err := getUnstructuredFromInformedObj(watchResource, newObj) + if err != nil { + logger.Errorf("failed to get unstructured from updated object: %v", err) + return + } + + ctx.Logger.V(2).Infof("updated: %s %s", u.GetKind(), u.GetName()) + buffer <- u + }, + DeleteFunc: func(obj any) { + u, err := getUnstructuredFromInformedObj(watchResource, obj) + if err != nil { + logToJobHistory(ctx.DutyContext(), "DeleteK8sWatchResource", ctx.ScrapeConfig().GetPersistedID(), "failed to get unstructured %v", err) + return + } + + if err := deleteHandler(ctx.Context, string(u.GetUID())); err != nil { + logToJobHistory(ctx.DutyContext(), "DeleteK8sWatchResource", ctx.ScrapeConfig().GetPersistedID(), + "failed to delete (%s) %s/%s/%s resources: %v", u.GetUID(), u.GetKind(), u.GetNamespace(), u.GetName(), err) + return + } + }, + }) + if err != nil { + return fmt.Errorf("failed to add informer event handlers: %w", err) + } + + go func() { + informer.Run(stopper) + ctx.Logger.V(1).Infof("stopped shared informer for: %v", watchResource) + }() + return nil +} + +// getOrCreate returns an existing shared informer instance or creates & returns a new one. +func (t *SharedInformerManager) getOrCreate(ctx api.ScrapeContext, kubeconfig, apiVersion, kind string) (cache.SharedInformer, chan struct{}, bool) { + t.mu.Lock() + defer t.mu.Unlock() + + cacheKey := apiVersion + kind + + if val, ok := t.cache[kubeconfig]; ok { + if data, ok := val[cacheKey]; ok { + return data.informer, data.stopper, false + } + } + + factory := informers.NewSharedInformerFactory(ctx.Kubernetes(), 0) + stopper := make(chan struct{}) + + informer := getInformer(factory, apiVersion, kind) + ctx.Gauge("kubernetes_active_shared_informers").Add(1) + + cacheValue := &informerCacheData{ + stopper: stopper, + informer: informer, + } + if _, ok := t.cache[kubeconfig]; ok { + t.cache[kubeconfig][cacheKey] = cacheValue + } else { + t.cache[kubeconfig] = map[string]*informerCacheData{ + cacheKey: cacheValue, + } + } + + return informer, stopper, true +} + +// stop stops all shared informers for the given kubeconfig +// apart from the ones provided. 
+func (t *SharedInformerManager) stop(ctx api.ScrapeContext, kubeconfig string, exception ...string) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	var toDelete []string
+	if informers, ok := t.cache[kubeconfig]; ok {
+		for key, cached := range informers {
+			if !lo.Contains(exception, key) {
+				ctx.Logger.V(1).Infof("stopping informer for %s", key)
+
+				cached.informer.IsStopped()
+				ctx.Gauge("kubernetes_active_shared_informers").Sub(1)
+
+				toDelete = append(toDelete, key)
+				close(cached.stopper)
+			}
+		}
+	}
+
+	for _, key := range toDelete {
+		delete(t.cache[kubeconfig], key)
+	}
+}
+
+func getInformer(factory informers.SharedInformerFactory, apiVersion, kind string) cache.SharedInformer {
+	// TODO: add support for more resource kinds here
+
+	kind = strings.ToLower(kind)
+	switch strings.ToLower(apiVersion) {
+	case "v1":
+		switch kind {
+		case "pod":
+			return factory.Core().V1().Pods().Informer()
+		case "node":
+			return factory.Core().V1().Nodes().Informer()
+		}
+
+	case "apps/v1":
+		switch kind {
+		case "deployment":
+			return factory.Apps().V1().Deployments().Informer()
+		case "daemonset":
+			return factory.Apps().V1().DaemonSets().Informer()
+		case "replicaset":
+			return factory.Apps().V1().ReplicaSets().Informer()
+		case "statefulset":
+			return factory.Apps().V1().StatefulSets().Informer()
+		}
+
+	case "batch/v1":
+		switch kind {
+		case "cronjob":
+			return factory.Batch().V1().CronJobs().Informer()
+		case "job":
+			return factory.Batch().V1().Jobs().Informer()
+		}
+	}
+
+	return nil
+}
+
+// logToJobHistory records watch-resource failures from this scraper in the job history table.
+func logToJobHistory(ctx context.Context, job string, scraperID *uuid.UUID, err string, args ...any) {
+	jobHistory := models.NewJobHistory(ctx.Logger, job, "", lo.FromPtr(scraperID).String())
+	jobHistory.Start()
+	jobHistory.AddErrorf(err, args...)
+ + if err := jobHistory.End().Persist(ctx.DB()); err != nil { + logger.Errorf("error persisting job history: %v", err) + } +} diff --git a/utils/kube/kube.go b/utils/kube/kube.go index 83b2c408..6f6d75a3 100644 --- a/utils/kube/kube.go +++ b/utils/kube/kube.go @@ -29,6 +29,8 @@ import ( clientcmdapi "k8s.io/client-go/tools/clientcmd/api" "github.com/flanksource/commons/files" + "github.com/patrickmn/go-cache" + "github.com/prometheus/client_golang/prometheus" "gopkg.in/flanksource/yaml.v3" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,6 +40,18 @@ import ( "k8s.io/client-go/util/homedir" ) +var kubeClientCreatedCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "kube_client_created_count", + Help: "The total number of times kubernetes clientset were created from kube config", + }, + []string{"cached"}, +) + +func init() { + prometheus.MustRegister(kubeClientCreatedCount) +} + func getRestMapper(config *rest.Config) (meta.RESTMapper, error) { // re-use kubectl cache host := config.Host @@ -53,7 +67,7 @@ func getRestMapper(config *rest.Config) (meta.RESTMapper, error) { return restmapper.NewDeferredDiscoveryRESTMapper(cache), nil } -func getGroupVersion(apiVersion string) (string, string) { +func GetGroupVersion(apiVersion string) (string, string) { split := strings.Split(apiVersion, "/") if len(split) == 1 { return "", apiVersion @@ -73,7 +87,7 @@ func GetClientByGroupVersionKind(cfg *rest.Config, apiVersion, kind string) (dyn return nil, err } - group, version := getGroupVersion(apiVersion) + group, version := GetGroupVersion(apiVersion) gvk, err := rm.KindFor(schema.GroupVersionResource{Group: group, Version: version, Resource: kind}) if err != nil { return nil, err @@ -88,17 +102,44 @@ func GetClientByGroupVersionKind(cfg *rest.Config, apiVersion, kind string) (dyn return dc.Resource(mapping.Resource), nil } +var kubeCache = cache.New(time.Hour, time.Hour) + +type kubeCacheData struct { + Client kubernetes.Interface + Config *rest.Config +} + func NewKubeClientWithConfigPath(kubeConfigPath string) (kubernetes.Interface, *rest.Config, error) { + key := fmt.Sprintf("kube-config-path-%s", kubeConfigPath) + if val, ok := kubeCache.Get(key); ok { + d := val.(*kubeCacheData) + kubeClientCreatedCount.WithLabelValues("true").Inc() + return d.Client, d.Config, nil + } + config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath) if err != nil { return fake.NewSimpleClientset(), nil, err } client, err := kubernetes.NewForConfig(config) + if err != nil { + return fake.NewSimpleClientset(), nil, err + } + + kubeCache.SetDefault(key, &kubeCacheData{Client: client, Config: config}) + kubeClientCreatedCount.WithLabelValues("false").Inc() return client, config, err } func NewKubeClientWithConfig(kubeConfig string) (kubernetes.Interface, *rest.Config, error) { + key := fmt.Sprintf("kube-config-%s", kubeConfig) + if val, ok := kubeCache.Get(key); ok { + kubeClientCreatedCount.WithLabelValues("true").Inc() + d := val.(*kubeCacheData) + return d.Client, d.Config, nil + } + getter := func() (*clientcmdapi.Config, error) { clientCfg, err := clientcmd.NewClientConfigFromBytes([]byte(kubeConfig)) if err != nil { @@ -119,6 +160,12 @@ func NewKubeClientWithConfig(kubeConfig string) (kubernetes.Interface, *rest.Con } client, err := kubernetes.NewForConfig(config) + if err != nil { + return fake.NewSimpleClientset(), nil, err + } + + kubeCache.SetDefault(key, &kubeCacheData{Client: client, Config: config}) + 
kubeClientCreatedCount.WithLabelValues("false").Inc() return client, config, err } @@ -150,6 +197,10 @@ func NewK8sClient() (kubernetes.Interface, *rest.Config, error) { } client, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return fake.NewSimpleClientset(), nil, err + } + return client, restConfig, err } diff --git a/utils/kube/kube_test.go b/utils/kube/kube_test.go index 103046a5..023c3b06 100644 --- a/utils/kube/kube_test.go +++ b/utils/kube/kube_test.go @@ -25,7 +25,7 @@ func TestGetGroupVersion(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - group, version := getGroupVersion(tc.apiVersion) + group, version := GetGroupVersion(tc.apiVersion) if group != tc.expectedGroup || version != tc.expectedVersion { t.Errorf("getGroupVersion(%q) = %q, %q; expected %q, %q", tc.apiVersion, group, version, tc.expectedGroup, tc.expectedVersion)
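Reviewer note (not part of the patch): the informers.go changes above replace the per-config Watch API calls with client-go shared informers. For readers new to that machinery, below is a minimal, self-contained sketch of the same pattern under simplifying assumptions: a single hard-coded Pod informer, Add/Update handlers pushing objects onto a buffer, a Delete handler reporting UIDs (analogous to db.SoftDeleteConfigItem), and a stop channel for teardown. The names runPodInformer and main are illustrative only; the real, generic implementation is SharedInformerManager.Register/stop in scrapers/kubernetes/informers.go.

package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

// runPodInformer (illustrative helper, not in the PR) wires a shared informer for Pods,
// pushes added/updated objects onto buffer, reports deleted UIDs via onDelete, and
// returns the stop channel that tears the informer down when closed.
func runPodInformer(client kubernetes.Interface, buffer chan<- *corev1.Pod, onDelete func(uid string)) chan struct{} {
	factory := informers.NewSharedInformerFactory(client, 0) // 0 = no periodic resync
	informer := factory.Core().V1().Pods().Informer()

	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj any) {
			if pod, ok := obj.(*corev1.Pod); ok {
				buffer <- pod
			}
		},
		UpdateFunc: func(_, newObj any) {
			if pod, ok := newObj.(*corev1.Pod); ok {
				buffer <- pod
			}
		},
		DeleteFunc: func(obj any) {
			if pod, ok := obj.(*corev1.Pod); ok {
				onDelete(string(pod.GetUID())) // the PR soft-deletes the config item here
			}
		},
	})

	stopper := make(chan struct{})
	go informer.Run(stopper) // runs until stopper is closed, like SharedInformerManager.stop
	return stopper
}

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	buffer := make(chan *corev1.Pod, 1000)
	stopper := runPodInformer(client, buffer, func(uid string) { fmt.Println("deleted:", uid) })
	defer close(stopper)

	for {
		select {
		case pod := <-buffer:
			fmt.Println("upserted:", pod.Namespace+"/"+pod.Name)
		case <-time.After(time.Minute):
			return
		}
	}
}

Because the informer keeps a local cache and replays the full state on start, consumers see every existing object as an Add event first, which is why the scraper can drop the old watch/retry loop entirely.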
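Reviewer note (not part of the patch): utils/kube now memoizes clientsets in a go-cache keyed by the kubeconfig and counts cache hits/misses with a Prometheus counter. The sketch below condenses that caching pattern; the names ClientFor, entry, and example_kube_client_created_total are illustrative stand-ins for the ones in kube.go, and error handling is simplified (no fake clientset fallback).

package kubecache

import (
	"fmt"
	"time"

	gocache "github.com/patrickmn/go-cache"
	"github.com/prometheus/client_golang/prometheus"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

// clientCreatedCount mirrors the intent of kube_client_created_count in the PR,
// labelled by whether the clientset came from the cache.
var clientCreatedCount = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "example_kube_client_created_total",
		Help: "Number of times a Kubernetes clientset was requested, labelled by cache hit/miss.",
	},
	[]string{"cached"},
)

func init() {
	prometheus.MustRegister(clientCreatedCount)
}

type entry struct {
	client kubernetes.Interface
	config *rest.Config
}

// clients caches clientsets for an hour, keyed by the raw kubeconfig contents,
// the same approach NewKubeClientWithConfig takes in utils/kube.
var clients = gocache.New(time.Hour, time.Hour)

// ClientFor (illustrative helper, not in the PR) returns a cached clientset for the
// given kubeconfig, creating and caching one on a miss.
func ClientFor(kubeconfig string) (kubernetes.Interface, *rest.Config, error) {
	key := fmt.Sprintf("kube-config-%s", kubeconfig)
	if val, ok := clients.Get(key); ok {
		clientCreatedCount.WithLabelValues("true").Inc()
		e := val.(*entry)
		return e.client, e.config, nil
	}

	clientCfg, err := clientcmd.NewClientConfigFromBytes([]byte(kubeconfig))
	if err != nil {
		return nil, nil, fmt.Errorf("failed to parse kubeconfig: %w", err)
	}
	config, err := clientCfg.ClientConfig()
	if err != nil {
		return nil, nil, fmt.Errorf("failed to build rest config: %w", err)
	}
	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create clientset: %w", err)
	}

	clients.SetDefault(key, &entry{client: client, config: config})
	clientCreatedCount.WithLabelValues("false").Inc()
	return client, config, nil
}

Caching by kubeconfig matters here because the shared informer manager creates long-lived watchers per scrape config; without it, every scheduler run would rebuild a clientset and its discovery cache from scratch.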