fix: prevent infinite re-queuing of events with missing involved objects
For events of temporary Kubernetes resources (like a pod created by a
canary and immediately deleted), the involved object is never found. The
event would then be re-queued again and again.

This resulted in an ever-growing queue and a high volume of DB calls.

* fix: mark the scrape context as incremental
* fix: change labels on temp cache metrics
adityathebe committed Jan 2, 2025
1 parent 58ad407 commit e094558
Showing 2 changed files with 25 additions and 20 deletions.
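To make the failure mode described in the commit message concrete, here is a minimal, self-contained sketch (the types and the lookup are illustrative, not the scraper's real ones) of an event whose involved object never resolves being re-queued on every pass:

package main

import "fmt"

// queueItem is an illustrative stand-in for a watch-event queue entry.
type queueItem struct {
	eventUID string
}

// lookupInvolvedObject simulates the temp-cache/DB lookup; for a pod created
// by a canary and deleted immediately, the involved object is never found.
func lookupInvolvedObject(uid string) bool { return false }

func main() {
	queue := []queueItem{{eventUID: "event-for-deleted-pod"}}
	dbCalls := 0

	// Capped at 5 passes for the demo; the real consumer runs indefinitely.
	for i := 0; i < 5 && len(queue) > 0; i++ {
		item := queue[0]
		queue = queue[1:]

		dbCalls++
		if !lookupInvolvedObject(item.eventUID) {
			// Unconditional re-enqueue: the same event comes straight back,
			// so the queue never drains and every pass costs a DB call.
			queue = append(queue, item)
		}
	}
	fmt.Printf("queue length: %d, db calls: %d\n", len(queue), dbCalls)
}

The guard added in scrapers/incremental.go below breaks this cycle by re-enqueuing an event at most once.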
33 changes: 17 additions & 16 deletions api/cache.go
@@ -2,7 +2,6 @@ package api
 
 import (
 	"fmt"
-	"strconv"
 	"strings"
 
 	"github.com/flanksource/commons/logger"
@@ -59,7 +58,7 @@ func (t *TempCache) Find(ctx ScrapeContext, lookup v1.ExternalID) (*models.Confi
 	if _, ok := t.notFound[lookup.Key()]; ok {
 		ctx.Counter("temp_cache_missing_hit",
 			"scraper_id", ctx.ScraperID(),
-			"incremental", strconv.FormatBool(ctx.IsIncrementalScrape()),
+			"scrape_type", lo.Ternary(ctx.IsIncrementalScrape(), "incremental", "full"),
 			"query_type", "external_id",
 		).Add(1)
 		return nil, nil
@@ -74,17 +73,18 @@ func (t *TempCache) Find(ctx ScrapeContext, lookup v1.ExternalID) (*models.Confi
 		return t.Get(ctx, alias)
 	}
 
-	ctx.Counter("temp_cache_miss",
-		"scraper_id", ctx.ScraperID(),
-		"incremental", strconv.FormatBool(ctx.IsIncrementalScrape()),
-		"query_type", "external_id",
-	).Add(1)
-
 	var result models.ConfigItem
 	if err := lookup.Find(ctx.DB()).Find(&result).Error; err != nil {
 		return nil, err
 	}
 
+	ctx.Counter("temp_cache_miss",
+		"scraper_id", ctx.ScraperID(),
+		"scrape_type", lo.Ternary(ctx.IsIncrementalScrape(), "incremental", "full"),
+		"query_type", "external_id",
+		"config_type", result.Type,
+	).Add(1)
+
 	if result.ID == "" {
 		t.notFound[lookup.Key()] = struct{}{}
 		return nil, nil
@@ -130,7 +130,7 @@ func (t *TempCache) Get(ctx ScrapeContext, id string, opts ...CacheOption) (*mod
 	if _, notFound := t.notFound[id]; notFound && !optMap[IgnoreNotFound] {
 		ctx.Counter("temp_cache_missing_hit",
 			"scraper_id", ctx.ScraperID(),
-			"incremental", strconv.FormatBool(ctx.IsIncrementalScrape()),
+			"scrape_type", lo.Ternary(ctx.IsIncrementalScrape(), "incremental", "full"),
 			"query_type", "id",
 		).Add(1)
 		return nil, nil
@@ -139,18 +139,12 @@ func (t *TempCache) Get(ctx ScrapeContext, id string, opts ...CacheOption) (*mod
 	if item, ok := t.items[id]; ok {
 		ctx.Counter("temp_cache_hit",
 			"scraper_id", ctx.ScraperID(),
-			"incremental", strconv.FormatBool(ctx.IsIncrementalScrape()),
+			"scrape_type", lo.Ternary(ctx.IsIncrementalScrape(), "incremental", "full"),
 			"query_type", "id",
 		).Add(1)
 		return &item, nil
 	}
 
-	ctx.Counter("temp_cache_miss",
-		"scraper_id", ctx.ScraperID(),
-		"incremental", strconv.FormatBool(ctx.IsIncrementalScrape()),
-		"query_type", "id",
-	).Add(1)
-
 	result := models.ConfigItem{}
 
 	if uuid.Validate(id) == nil {
@@ -167,6 +161,13 @@ func (t *TempCache) Get(ctx ScrapeContext, id string, opts ...CacheOption) (*mod
 		}
 	}
 
+	ctx.Counter("temp_cache_miss",
+		"scraper_id", ctx.ScraperID(),
+		"scrape_type", lo.Ternary(ctx.IsIncrementalScrape(), "incremental", "full"),
+		"query_type", "id",
+		"config_type", result.Type,
+	).Add(1)
+
 	if result.ID != "" {
 		t.Insert(result)
 		return &result, nil
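For reference, the relabeling above replaces a stringified boolean ("incremental": "true"/"false", via strconv.FormatBool) with a readable "scrape_type" label produced by lo.Ternary from github.com/samber/lo. A minimal sketch of that helper in isolation (variable names are illustrative):

package main

import (
	"fmt"
	"strconv"

	"github.com/samber/lo"
)

func main() {
	for _, isIncremental := range []bool{true, false} {
		// Old label value: "true" / "false"
		oldLabel := strconv.FormatBool(isIncremental)
		// New label value: "incremental" / "full"
		scrapeType := lo.Ternary(isIncremental, "incremental", "full")
		fmt.Printf("incremental=%s -> scrape_type=%s\n", oldLabel, scrapeType)
	}
}

Moving the temp_cache_miss counter after the DB lookup is what makes the extra "config_type" label possible, since result.Type is only known once the query has run.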
12 changes: 8 additions & 4 deletions scrapers/incremental.go
@@ -51,7 +51,7 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q
 
 	config := config.DeepCopy()
 
-	sc := sc.WithScrapeConfig(sc.ScrapeConfig(), plugins...)
+	sc := sc.WithScrapeConfig(sc.ScrapeConfig(), plugins...).AsIncrementalScrape()
 	config.BaseScraper = config.BaseScraper.ApplyPlugins(plugins...)
 
 	// Use temp cache if it already exists for scraper
@@ -110,11 +110,15 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q
 		ci, err := sc.TempCache().Get(sc, string(involvedObject.UID), api.IgnoreNotFound)
 		if err != nil {
 			return fmt.Errorf("failed to fetch from cache: %w", err)
-		}
-		if ci == nil {
-			queue.EnqueueWithDelay(kubernetes.NewQueueItem(obj, kubernetes.QueueItemOperationReEnqueue), 30*time.Second)
+		} else if ci == nil {
 			// Remove event object from objects list
 			objs = lo.DropRight(objs, 1)
+
+			// Re-enqueue the event with a delay but only if it wasn't previously enqueued
+			// else the event gets recycled indefinitely
+			if queueItem.Operation != kubernetes.QueueItemOperationReEnqueue {
+				queue.EnqueueWithDelay(kubernetes.NewQueueItem(obj, kubernetes.QueueItemOperationReEnqueue), 30*time.Second)
+			}
 		}
 
 		objectsFromEvents = append(objectsFromEvents, involvedObject)
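The new condition above only re-enqueues an event that was not itself the product of a re-enqueue. A standalone sketch of the same at-most-once retry pattern, using illustrative types (item, opReEnqueue) rather than the kubernetes package's real ones:

package main

import (
	"fmt"
	"time"
)

type operation string

const (
	opAdd       operation = "add"
	opReEnqueue operation = "re-enqueue"
)

type item struct {
	uid string
	op  operation
}

// maybeReEnqueue mirrors the fix: an item that is already marked as a
// re-enqueue is dropped instead of being put back, so a missing involved
// object costs at most one retry rather than an endless cycle.
func maybeReEnqueue(q chan item, it item, delay time.Duration) {
	if it.op == opReEnqueue {
		return
	}
	go func() {
		time.Sleep(delay)
		q <- item{uid: it.uid, op: opReEnqueue}
	}()
}

func main() {
	q := make(chan item, 10)
	maybeReEnqueue(q, item{uid: "event-1", op: opAdd}, 10*time.Millisecond)

	retried := <-q
	fmt.Println("retried once:", retried.uid)

	// The retried item is not re-enqueued a second time.
	maybeReEnqueue(q, retried, 10*time.Millisecond)
	select {
	case it := <-q:
		fmt.Println("unexpected second retry:", it.uid)
	case <-time.After(50 * time.Millisecond):
		fmt.Println("no further retries")
	}
}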
