diff --git a/db/config.go b/db/config.go index ce2bbd14..6b10f3dd 100644 --- a/db/config.go +++ b/db/config.go @@ -213,14 +213,10 @@ func FindConfigChangesByItemID(ctx api.ScrapeContext, configItemID string) ([]du return ci, nil } -func SoftDeleteConfigItems(ctx context.Context, ids []string) error { - if len(ids) == 0 { - return nil - } - +func SoftDeleteConfigItem(ctx context.Context, id string) error { return ctx.DB(). Model(&models.ConfigItem{}). - Where("id IN (?)", ids). + Where("id = ?", id). Update("deleted_at", gorm.Expr("NOW()")). Error } diff --git a/scrapers/cron.go b/scrapers/cron.go index 38e24bed..8d1a5aeb 100644 --- a/scrapers/cron.go +++ b/scrapers/cron.go @@ -296,19 +296,6 @@ func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kube } } - _deleteCh, ok := kubernetes.DeleteResourceBuffer.Load(config.Hash()) - if !ok { - return fmt.Errorf("no resource watcher channel found for config (scrapeconfig: %s)", config.Hash()) - } - deletChan := _deleteCh.(chan string) - - if len(deletChan) > 0 { - deletedResourcesIDs, _, _, _ := lo.Buffer(deletChan, len(deletChan)) - if err := db.SoftDeleteConfigItems(ctx.Context, deletedResourcesIDs); err != nil { - return fmt.Errorf("failed to delete %d resources: %w", len(deletedResourcesIDs), err) - } - } - return nil }, } diff --git a/scrapers/kubernetes/events_watch.go b/scrapers/kubernetes/events_watch.go index 6f7e8b50..7d7ba192 100644 --- a/scrapers/kubernetes/events_watch.go +++ b/scrapers/kubernetes/events_watch.go @@ -16,6 +16,7 @@ import ( "github.com/flanksource/config-db/api" v1 "github.com/flanksource/config-db/api/v1" + "github.com/flanksource/config-db/db" "github.com/flanksource/config-db/utils/kube" ) @@ -30,10 +31,6 @@ var ( // WatchEventBuffers stores a sync buffer per kubernetes config WatchResourceBuffer = sync.Map{} - - // DeleteResourceBuffer stores a buffer per kubernetes config - // that contains the ids of resources that have been deleted. - DeleteResourceBuffer = sync.Map{} ) func getUnstructuredFromInformedObj(resource v1.KubernetesResourceToWatch, obj any) (*unstructured.Unstructured, error) { @@ -59,9 +56,6 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error { buffer := make(chan *unstructured.Unstructured, ctx.DutyContext().Properties().Int("kubernetes.watch.resources.bufferSize", WatchResourceBufferSize)) WatchResourceBuffer.Store(config.Hash(), buffer) - deleteBuffer := make(chan string, WatchResourceBufferSize) - DeleteResourceBuffer.Store(config.Hash(), deleteBuffer) - var err error if config.Kubeconfig != nil { ctx, _, err = applyKubeconfig(ctx, *config.Kubeconfig) @@ -76,7 +70,7 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error { } for _, watchResource := range lo.Uniq(config.Watch) { - if err := globalSharedInformerManager.Register(ctx, config.Kubeconfig.ValueStatic, watchResource, buffer, deleteBuffer); err != nil { + if err := globalSharedInformerManager.Register(ctx, config.Kubeconfig.ValueStatic, watchResource, buffer, db.SoftDeleteConfigItem); err != nil { return fmt.Errorf("failed to register informer: %w", err) } } diff --git a/scrapers/kubernetes/informers.go b/scrapers/kubernetes/informers.go index 171b5b19..6833d79b 100644 --- a/scrapers/kubernetes/informers.go +++ b/scrapers/kubernetes/informers.go @@ -8,6 +8,9 @@ import ( "github.com/flanksource/commons/logger" "github.com/flanksource/config-db/api" v1 "github.com/flanksource/config-db/api/v1" + "github.com/flanksource/duty/context" + "github.com/flanksource/duty/models" + "github.com/google/uuid" "github.com/samber/lo" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/client-go/informers" @@ -31,10 +34,12 @@ type SharedInformerManager struct { cache map[string]map[string]*informerCacheData } -func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig string, watchResource v1.KubernetesResourceToWatch, buffer chan<- *unstructured.Unstructured, deleteBuffer chan<- string) error { +type DeleteObjHandler func(ctx context.Context, id string) error + +func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig string, watchResource v1.KubernetesResourceToWatch, buffer chan<- *unstructured.Unstructured, deleteHandler DeleteObjHandler) error { apiVersion, kind := watchResource.ApiVersion, watchResource.Kind - informer, stopper, isNew := t.get(ctx, kubeconfig, apiVersion, kind) + informer, stopper, isNew := t.getOrCreate(ctx, kubeconfig, apiVersion, kind) if informer == nil { return fmt.Errorf("could not find informer for: apiVersion=%v kind=%v", apiVersion, kind) } @@ -45,7 +50,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin return nil } - ctx.Logger.V(0).Infof("registering shared informer for: %v", watchResource) + ctx.Logger.V(1).Infof("registering shared informer for: %v", watchResource) _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { u, err := getUnstructuredFromInformedObj(watchResource, obj) @@ -54,7 +59,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin return } - logger.Infof("added: %s %s", u.GetKind(), u.GetName()) + ctx.Logger.V(2).Infof("added: %s %s", u.GetKind(), u.GetName()) buffer <- u }, UpdateFunc: func(oldObj any, newObj any) { @@ -64,33 +69,36 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin return } - logger.Infof("updated: %s %s", u.GetKind(), u.GetName()) + ctx.Logger.V(2).Infof("updated: %s %s", u.GetKind(), u.GetName()) buffer <- u }, DeleteFunc: func(obj any) { u, err := getUnstructuredFromInformedObj(watchResource, obj) if err != nil { - logger.Errorf("failed to get unstructured from deleted object: %v", err) + logToJobHistory(ctx.DutyContext(), "DeleteK8sWatchResource", ctx.ScrapeConfig().GetPersistedID(), "failed to get unstructured %v", err) return } - logger.Infof("deleted:%s %s", u.GetKind(), u.GetName()) - deleteBuffer <- string(u.GetUID()) + if err := deleteHandler(ctx.Context, string(u.GetUID())); err != nil { + logToJobHistory(ctx.DutyContext(), "DeleteK8sWatchResource", ctx.ScrapeConfig().GetPersistedID(), + "failed to delete (%s) %s/%s/%s resources: %v", u.GetUID(), u.GetKind(), u.GetNamespace(), u.GetName(), err) + return + } }, }) if err != nil { - return fmt.Errorf("failed to add informent event handlers: %w", err) + return fmt.Errorf("failed to add informer event handlers: %w", err) } go func() { informer.Run(stopper) - ctx.Logger.V(0).Infof("stopped shared informer for: %v", watchResource) + ctx.Logger.V(1).Infof("stopped shared informer for: %v", watchResource) }() return nil } -// get returns an existing shared informer instance or creates & returns a new one. -func (t *SharedInformerManager) get(ctx api.ScrapeContext, kubeconfig, apiVersion, kind string) (cache.SharedInformer, chan struct{}, bool) { +// getOrCreate returns an existing shared informer instance or creates & returns a new one. +func (t *SharedInformerManager) getOrCreate(ctx api.ScrapeContext, kubeconfig, apiVersion, kind string) (cache.SharedInformer, chan struct{}, bool) { t.mu.Lock() defer t.mu.Unlock() @@ -133,7 +141,7 @@ func (t *SharedInformerManager) stop(ctx api.ScrapeContext, kubeconfig string, e if informers, ok := t.cache[kubeconfig]; ok { for key, cached := range informers { if !lo.Contains(exception, key) { - ctx.Logger.V(0).Infof("stopping informer for %s", key) + ctx.Logger.V(1).Infof("stopping informer for %s", key) cached.informer.IsStopped() ctx.Gauge("kubernetes_active_shared_informers").Sub(1) @@ -185,3 +193,14 @@ func getInformer(factory informers.SharedInformerFactory, apiVersion, kind strin return nil } + +// logToJobHistory logs any failures in saving a playbook run to the job history. +func logToJobHistory(ctx context.Context, job string, scraperID *uuid.UUID, err string, args ...any) { + jobHistory := models.NewJobHistory(ctx.Logger, job, "", lo.FromPtr(scraperID).String()) + jobHistory.Start() + jobHistory.AddErrorf(err, args...) + + if err := jobHistory.End().Persist(ctx.DB()); err != nil { + logger.Errorf("error persisting job history: %v", err) + } +}