From 1b39557a476f2177f3d25cd958ad4c9288fe5c7d Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Thu, 2 Jan 2025 14:36:01 +0545 Subject: [PATCH] fix: prevent infinite re-queuing of events with missing involved objects For events of temporary kubernetes resources (like a pod created by a canary and immediately deleted), we'll not find the involved ever. The event would then be requeued again & again. This resulted in ever increasing queue and high db calls. * fix: mark the scrape context as incremental * fix: change labels on temp cache metrics --- api/cache.go | 33 +++++++++++++++++---------------- scrapers/incremental.go | 12 ++++++++---- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/api/cache.go b/api/cache.go index 27abaa3f..64d497fb 100644 --- a/api/cache.go +++ b/api/cache.go @@ -2,7 +2,6 @@ package api import ( "fmt" - "strconv" "strings" "github.com/flanksource/commons/logger" @@ -59,7 +58,7 @@ func (t *TempCache) Find(ctx ScrapeContext, lookup v1.ExternalID) (*models.Confi if _, ok := t.notFound[lookup.Key()]; ok { ctx.Counter("temp_cache_missing_hit", "scraper_id", ctx.ScraperID(), - "incremental", strconv.FormatBool(ctx.IsIncrementalScrape()), + "scrape_type", lo.Ternary(ctx.IsIncrementalScrape(), "incremental", "full"), "query_type", "external_id", ).Add(1) return nil, nil @@ -74,17 +73,18 @@ func (t *TempCache) Find(ctx ScrapeContext, lookup v1.ExternalID) (*models.Confi return t.Get(ctx, alias) } - ctx.Counter("temp_cache_miss", - "scraper_id", ctx.ScraperID(), - "incremental", strconv.FormatBool(ctx.IsIncrementalScrape()), - "query_type", "external_id", - ).Add(1) - var result models.ConfigItem if err := lookup.Find(ctx.DB()).Find(&result).Error; err != nil { return nil, err } + ctx.Counter("temp_cache_miss", + "scraper_id", ctx.ScraperID(), + "scrape_type", lo.Ternary(ctx.IsIncrementalScrape(), "incremental", "full"), + "query_type", "external_id", + "config_type", result.Type, + ).Add(1) + if result.ID == "" { t.notFound[lookup.Key()] = struct{}{} return nil, nil @@ -130,7 +130,7 @@ func (t *TempCache) Get(ctx ScrapeContext, id string, opts ...CacheOption) (*mod if _, notFound := t.notFound[id]; notFound && !optMap[IgnoreNotFound] { ctx.Counter("temp_cache_missing_hit", "scraper_id", ctx.ScraperID(), - "incremental", strconv.FormatBool(ctx.IsIncrementalScrape()), + "scrape_type", lo.Ternary(ctx.IsIncrementalScrape(), "incremental", "full"), "query_type", "id", ).Add(1) return nil, nil @@ -139,18 +139,12 @@ func (t *TempCache) Get(ctx ScrapeContext, id string, opts ...CacheOption) (*mod if item, ok := t.items[id]; ok { ctx.Counter("temp_cache_hit", "scraper_id", ctx.ScraperID(), - "incremental", strconv.FormatBool(ctx.IsIncrementalScrape()), + "scrape_type", lo.Ternary(ctx.IsIncrementalScrape(), "incremental", "full"), "query_type", "id", ).Add(1) return &item, nil } - ctx.Counter("temp_cache_miss", - "scraper_id", ctx.ScraperID(), - "incremental", strconv.FormatBool(ctx.IsIncrementalScrape()), - "query_type", "id", - ).Add(1) - result := models.ConfigItem{} if uuid.Validate(id) == nil { @@ -167,6 +161,13 @@ func (t *TempCache) Get(ctx ScrapeContext, id string, opts ...CacheOption) (*mod } } + ctx.Counter("temp_cache_miss", + "scraper_id", ctx.ScraperID(), + "scrape_type", lo.Ternary(ctx.IsIncrementalScrape(), "incremental", "full"), + "query_type", "id", + "config_type", result.Type, + ).Add(1) + if result.ID != "" { t.Insert(result) return &result, nil diff --git a/scrapers/incremental.go b/scrapers/incremental.go index ba09cfb8..127976f6 100644 --- a/scrapers/incremental.go +++ b/scrapers/incremental.go @@ -51,7 +51,7 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q config := config.DeepCopy() - sc := sc.WithScrapeConfig(sc.ScrapeConfig(), plugins...) + sc := sc.WithScrapeConfig(sc.ScrapeConfig(), plugins...).AsIncrementalScrape() config.BaseScraper = config.BaseScraper.ApplyPlugins(plugins...) // Use temp cache if it already exists for scraper @@ -110,11 +110,15 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q ci, err := sc.TempCache().Get(sc, string(involvedObject.UID), api.IgnoreNotFound) if err != nil { return fmt.Errorf("failed to fetch from cache: %w", err) - } - if ci == nil { - queue.EnqueueWithDelay(kubernetes.NewQueueItem(obj, kubernetes.QueueItemOperationReEnqueue), 30*time.Second) + } else if ci == nil { // Remove event object from objects list objs = lo.DropRight(objs, 1) + + // Re-enqueue the event with a delay but only if it wasn't previously enqueued + // else the event gets recycled indefinitely + if queueItem.Operation != kubernetes.QueueItemOperationReEnqueue { + queue.EnqueueWithDelay(kubernetes.NewQueueItem(obj, kubernetes.QueueItemOperationReEnqueue), 30*time.Second) + } } objectsFromEvents = append(objectsFromEvents, involvedObject)