diff --git a/api/v1/types.go b/api/v1/types.go index 8bdd67cd..b389e719 100644 --- a/api/v1/types.go +++ b/api/v1/types.go @@ -101,7 +101,8 @@ func (e ExternalID) WhereClause(db *gorm.DB) *gorm.DB { type ConfigDeleteReason string var ( - // DeletedReasonStale is used when a config item doesn't get updated on a scrape run. + // DeletedReasonStale is used when a config item doesn't get updated + // for a period of `staleItemAge`. DeletedReasonStale ConfigDeleteReason = "STALE" // DeletedReasonFromAttribute is used when a deletion field (& reason) diff --git a/db/config.go b/db/config.go index 3693cd99..ce2bbd14 100644 --- a/db/config.go +++ b/db/config.go @@ -17,6 +17,7 @@ import ( "github.com/google/uuid" "github.com/lib/pq" "github.com/ohler55/ojg/oj" + "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -211,3 +212,15 @@ func FindConfigChangesByItemID(ctx api.ScrapeContext, configItemID string) ([]du return ci, nil } + +func SoftDeleteConfigItems(ctx context.Context, ids []string) error { + if len(ids) == 0 { + return nil + } + + return ctx.DB(). + Model(&models.ConfigItem{}). + Where("id IN (?)", ids). + Update("deleted_at", gorm.Expr("NOW()")). + Error +} diff --git a/scrapers/cron.go b/scrapers/cron.go index eda6209f..e99a4fce 100644 --- a/scrapers/cron.go +++ b/scrapers/cron.go @@ -208,10 +208,12 @@ func ConsumeKubernetesWatchEventsJobFunc(sc api.ScrapeContext, config v1.Kuberne ID: fmt.Sprintf("%s/%s", sc.ScrapeConfig().Namespace, sc.ScrapeConfig().Name), ResourceType: job.ResourceTypeScraper, Fn: func(ctx job.JobRuntime) error { - ch, ok := kubernetes.WatchEventBuffers[config.Hash()] + _ch, ok := kubernetes.WatchEventBuffers.Load(config.Hash()) if !ok { return fmt.Errorf("no watcher found for config (scrapeconfig: %s) %s", scrapeConfig.GetUID(), config.Hash()) } + + ch := _ch.(chan v1.KubernetesEvent) events, _, _, _ := lo.Buffer(ch, len(ch)) cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History) @@ -273,10 +275,11 @@ func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kube ID: fmt.Sprintf("%s/%s", sc.ScrapeConfig().Namespace, sc.ScrapeConfig().Name), ResourceType: job.ResourceTypeScraper, Fn: func(ctx job.JobRuntime) error { - ch, ok := kubernetes.WatchResourceBuffer[config.Hash()] + _ch, ok := kubernetes.WatchResourceBuffer.Load(config.Hash()) if !ok { return fmt.Errorf("no resource watcher channel found for config (scrapeconfig: %s)", config.Hash()) } + ch := _ch.(chan *unstructured.Unstructured) objs, _, _, _ := lo.Buffer(ch, len(ch)) // NOTE: The resource watcher can return multiple objects for the same NEW resource. @@ -308,6 +311,19 @@ func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kube } } + _deleteCh, ok := kubernetes.DeleteResourceBuffer.Load(config.Hash()) + if !ok { + return fmt.Errorf("no resource watcher channel found for config (scrapeconfig: %s)", config.Hash()) + } + deletChan := _deleteCh.(chan string) + + if len(deletChan) > 0 { + deletedResourcesIDs, _, _, _ := lo.Buffer(deletChan, len(deletChan)) + if err := db.SoftDeleteConfigItems(ctx.Context, deletedResourcesIDs); err != nil { + return fmt.Errorf("failed to delete %d resources: %w", len(deletedResourcesIDs), err) + } + } + return nil }, } diff --git a/scrapers/kubernetes/events_watch.go b/scrapers/kubernetes/events_watch.go index e801e9dd..e9c94a9d 100644 --- a/scrapers/kubernetes/events_watch.go +++ b/scrapers/kubernetes/events_watch.go @@ -3,6 +3,7 @@ package kubernetes import ( "fmt" "strings" + "sync" "github.com/flanksource/commons/logger" "github.com/flanksource/duty/types" @@ -24,18 +25,25 @@ var ( BufferSize = 5000 // WatchEventBuffers stores a sync buffer per kubernetes config - WatchEventBuffers = make(map[string]chan v1.KubernetesEvent) + WatchEventBuffers = sync.Map{} WatchResourceBufferSize = 5000 // WatchEventBuffers stores a sync buffer per kubernetes config - WatchResourceBuffer = make(map[string]chan *unstructured.Unstructured) + WatchResourceBuffer = sync.Map{} + + // DeleteResourceBuffer stores a buffer per kubernetes config + // that contains the ids of resources that have been deleted. + DeleteResourceBuffer = sync.Map{} ) // WatchResources watches Kubernetes resources func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error { buffer := make(chan *unstructured.Unstructured, ctx.DutyContext().Properties().Int("kubernetes.watch.resources.bufferSize", WatchResourceBufferSize)) - WatchResourceBuffer[config.Hash()] = buffer + WatchResourceBuffer.Store(config.Hash(), buffer) + + deleteBuffer := make(chan string, WatchResourceBufferSize) + DeleteResourceBuffer.Store(config.Hash(), deleteBuffer) var restConfig *rest.Config var err error @@ -70,7 +78,11 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error { for watchEvent := range utils.MergeChannels(channels...) { obj, ok := watchEvent.Object.(*unstructured.Unstructured) if ok { - buffer <- obj + if watchEvent.Type == watch.Deleted { + deleteBuffer <- string(obj.GetUID()) + } else { + buffer <- obj + } } } @@ -81,7 +93,7 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error { // the referenced config items in batches. func WatchEvents(ctx api.ScrapeContext, config v1.Kubernetes) error { buffer := make(chan v1.KubernetesEvent, ctx.DutyContext().Properties().Int("kubernetes.watch.events.bufferSize", BufferSize)) - WatchEventBuffers[config.Hash()] = buffer + WatchEventBuffers.Store(config.Hash(), buffer) if config.Kubeconfig != nil { var err error diff --git a/scrapers/kubernetes/kubernetes.go b/scrapers/kubernetes/kubernetes.go index 7fb60149..2dfcfef7 100644 --- a/scrapers/kubernetes/kubernetes.go +++ b/scrapers/kubernetes/kubernetes.go @@ -401,14 +401,8 @@ func ExtractResults(ctx context.Context, config v1.Kubernetes, objs []*unstructu resourceHealth = &health.HealthStatus{} } - createdAt := obj.GetCreationTimestamp().Time var deletedAt *time.Time var deleteReason v1.ConfigDeleteReason - if !obj.GetDeletionTimestamp().IsZero() { - deletedAt = &obj.GetDeletionTimestamp().Time - deleteReason = v1.DeletedReasonFromAttribute - } - // Evicted Pods must be considered deleted if obj.GetKind() == "Pod" { if objStatus, ok := obj.Object["status"].(map[string]any); ok { @@ -437,7 +431,7 @@ func ExtractResults(ctx context.Context, config v1.Kubernetes, objs []*unstructu Health: models.Health(resourceHealth.Health), Ready: resourceHealth.Ready, Description: resourceHealth.Message, - CreatedAt: &createdAt, + CreatedAt: lo.ToPtr(obj.GetCreationTimestamp().Time), DeletedAt: deletedAt, DeleteReason: deleteReason, Config: configObj,