diff --git a/db/config.go b/db/config.go index ce2bbd140..6b10f3ddd 100644 --- a/db/config.go +++ b/db/config.go @@ -213,14 +213,10 @@ func FindConfigChangesByItemID(ctx api.ScrapeContext, configItemID string) ([]du return ci, nil } -func SoftDeleteConfigItems(ctx context.Context, ids []string) error { - if len(ids) == 0 { - return nil - } - +func SoftDeleteConfigItem(ctx context.Context, id string) error { return ctx.DB(). Model(&models.ConfigItem{}). - Where("id IN (?)", ids). + Where("id = ?", id). Update("deleted_at", gorm.Expr("NOW()")). Error } diff --git a/scrapers/cron.go b/scrapers/cron.go index 38e24bedf..8d1a5aebf 100644 --- a/scrapers/cron.go +++ b/scrapers/cron.go @@ -296,19 +296,6 @@ func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kube } } - _deleteCh, ok := kubernetes.DeleteResourceBuffer.Load(config.Hash()) - if !ok { - return fmt.Errorf("no resource watcher channel found for config (scrapeconfig: %s)", config.Hash()) - } - deletChan := _deleteCh.(chan string) - - if len(deletChan) > 0 { - deletedResourcesIDs, _, _, _ := lo.Buffer(deletChan, len(deletChan)) - if err := db.SoftDeleteConfigItems(ctx.Context, deletedResourcesIDs); err != nil { - return fmt.Errorf("failed to delete %d resources: %w", len(deletedResourcesIDs), err) - } - } - return nil }, } diff --git a/scrapers/kubernetes/events_watch.go b/scrapers/kubernetes/events_watch.go index 6f7e8b504..7d7ba1928 100644 --- a/scrapers/kubernetes/events_watch.go +++ b/scrapers/kubernetes/events_watch.go @@ -16,6 +16,7 @@ import ( "github.com/flanksource/config-db/api" v1 "github.com/flanksource/config-db/api/v1" + "github.com/flanksource/config-db/db" "github.com/flanksource/config-db/utils/kube" ) @@ -30,10 +31,6 @@ var ( // WatchEventBuffers stores a sync buffer per kubernetes config WatchResourceBuffer = sync.Map{} - - // DeleteResourceBuffer stores a buffer per kubernetes config - // that contains the ids of resources that have been deleted. - DeleteResourceBuffer = sync.Map{} ) func getUnstructuredFromInformedObj(resource v1.KubernetesResourceToWatch, obj any) (*unstructured.Unstructured, error) { @@ -59,9 +56,6 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error { buffer := make(chan *unstructured.Unstructured, ctx.DutyContext().Properties().Int("kubernetes.watch.resources.bufferSize", WatchResourceBufferSize)) WatchResourceBuffer.Store(config.Hash(), buffer) - deleteBuffer := make(chan string, WatchResourceBufferSize) - DeleteResourceBuffer.Store(config.Hash(), deleteBuffer) - var err error if config.Kubeconfig != nil { ctx, _, err = applyKubeconfig(ctx, *config.Kubeconfig) @@ -76,7 +70,7 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error { } for _, watchResource := range lo.Uniq(config.Watch) { - if err := globalSharedInformerManager.Register(ctx, config.Kubeconfig.ValueStatic, watchResource, buffer, deleteBuffer); err != nil { + if err := globalSharedInformerManager.Register(ctx, config.Kubeconfig.ValueStatic, watchResource, buffer, db.SoftDeleteConfigItem); err != nil { return fmt.Errorf("failed to register informer: %w", err) } } diff --git a/scrapers/kubernetes/informers.go b/scrapers/kubernetes/informers.go index 171b5b19a..59286a601 100644 --- a/scrapers/kubernetes/informers.go +++ b/scrapers/kubernetes/informers.go @@ -8,6 +8,9 @@ import ( "github.com/flanksource/commons/logger" "github.com/flanksource/config-db/api" v1 "github.com/flanksource/config-db/api/v1" + "github.com/flanksource/duty/context" + "github.com/flanksource/duty/models" + "github.com/google/uuid" "github.com/samber/lo" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/client-go/informers" @@ -31,7 +34,9 @@ type SharedInformerManager struct { cache map[string]map[string]*informerCacheData } -func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig string, watchResource v1.KubernetesResourceToWatch, buffer chan<- *unstructured.Unstructured, deleteBuffer chan<- string) error { +type DeleteObjHandler func(ctx context.Context, id string) error + +func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig string, watchResource v1.KubernetesResourceToWatch, buffer chan<- *unstructured.Unstructured, deleteHandler DeleteObjHandler) error { apiVersion, kind := watchResource.ApiVersion, watchResource.Kind informer, stopper, isNew := t.get(ctx, kubeconfig, apiVersion, kind) @@ -74,8 +79,10 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin return } - logger.Infof("deleted:%s %s", u.GetKind(), u.GetName()) - deleteBuffer <- string(u.GetUID()) + if err := deleteHandler(ctx.Context, string(u.GetUID())); err != nil { + logToJobHistory(ctx.DutyContext(), "DeleteK8sWatchResource", ctx.ScrapeConfig().GetPersistedID(), "failed to delete %s/%s/%s resources: %v", u.GetKind(), u.GetNamespace(), u.GetName(), err) + return + } }, }) if err != nil { @@ -185,3 +192,14 @@ func getInformer(factory informers.SharedInformerFactory, apiVersion, kind strin return nil } + +// logToJobHistory logs any failures in saving a playbook run to the job history. +func logToJobHistory(ctx context.Context, job string, scraperID *uuid.UUID, err string, args ...any) { + jobHistory := models.NewJobHistory(ctx.Logger, job, "", lo.FromPtr(scraperID).String()) + jobHistory.Start() + jobHistory.AddErrorf(err, args...) + + if err := jobHistory.End().Persist(ctx.DB()); err != nil { + logger.Errorf("error persisting job history: %v", err) + } +}