Skip to content

Commit

Permalink
fix: handle concurrent access on tempcache
Browse files Browse the repository at this point in the history
this happens when a incremental scrape runs along with a full scrape
  • Loading branch information
adityathebe authored and moshloop committed Jan 9, 2025
1 parent 461ff36 commit 4f710e3
Showing 1 changed file with 22 additions and 24 deletions.
46 changes: 22 additions & 24 deletions api/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"fmt"
"strings"
"sync"

"github.com/flanksource/commons/logger"
v1 "github.com/flanksource/config-db/api/v1"
Expand All @@ -19,25 +20,22 @@ func init() {

// TempCache is a temporary cache of config items that is used to speed up config item lookups during scrape, when all config items for a scraper are looked up any way
type TempCache struct {
items map[string]models.ConfigItem
aliases map[string]string
notFound map[string]struct{}
// map[string]models.ConfigItem
items sync.Map
aliases sync.Map
notFound sync.Map
scraperID string
}

func NewTempCache() *TempCache {
return &TempCache{
items: make(map[string]models.ConfigItem),
aliases: make(map[string]string),
notFound: make(map[string]struct{}),
}
return &TempCache{}
}

func (t *TempCache) SetScraperID(id string) {
t.scraperID = id
}

func (t TempCache) GetScraperID() string {
func (t *TempCache) GetScraperID() string {
return t.scraperID
}

Expand All @@ -55,7 +53,7 @@ func (t *TempCache) Find(ctx ScrapeContext, lookup v1.ExternalID) (*models.Confi
lookup.ScraperID = ctx.ScraperID()
}

if _, ok := t.notFound[lookup.Key()]; ok {
if _, ok := t.notFound.Load(lookup.Key()); ok {
ctx.Counter("temp_cache_missing_hit",
"scraper_id", ctx.ScraperID(),
"scrape_type", lo.Ternary(ctx.IsIncrementalScrape(), "incremental", "full"),
Expand All @@ -69,8 +67,8 @@ func (t *TempCache) Find(ctx ScrapeContext, lookup v1.ExternalID) (*models.Confi
return t.Get(ctx, uid)
}

if alias, ok := t.aliases[lookup.Key()]; ok {
return t.Get(ctx, alias)
if alias, ok := t.aliases.Load(lookup.Key()); ok {
return t.Get(ctx, alias.(string))
}

var result models.ConfigItem
Expand All @@ -86,7 +84,7 @@ func (t *TempCache) Find(ctx ScrapeContext, lookup v1.ExternalID) (*models.Confi
).Add(1)

if result.ID == "" {
t.notFound[lookup.Key()] = struct{}{}
t.notFound.Store(lookup.Key(), struct{}{})
return nil, nil
}

Expand All @@ -101,21 +99,19 @@ func (t *TempCache) Insert(item models.ConfigItem) {
}
for _, extID := range item.ExternalID {
key := v1.ExternalID{ConfigType: item.Type, ExternalID: extID, ScraperID: scraperID}.Key()
t.aliases[key] = item.ID
t.aliases.Store(key, item.ID)

// Remove from nonFound cache
delete(t.notFound, key)
t.notFound.Delete(key)
}

t.items[strings.ToLower(item.ID)] = item
delete(t.notFound, strings.ToLower(item.ID))
t.items.Store(strings.ToLower(item.ID), item)
t.notFound.Delete(strings.ToLower(item.ID))
}

type CacheOption string

var (
IgnoreNotFound CacheOption = "IgnoreNotFound"
)
var IgnoreNotFound CacheOption = "IgnoreNotFound"

func (t *TempCache) Get(ctx ScrapeContext, id string, opts ...CacheOption) (*models.ConfigItem, error) {
id = strings.ToLower(id)
Expand All @@ -127,7 +123,7 @@ func (t *TempCache) Get(ctx ScrapeContext, id string, opts ...CacheOption) (*mod
return opt, true
})

if _, notFound := t.notFound[id]; notFound && !optMap[IgnoreNotFound] {
if _, notFound := t.notFound.Load(id); notFound && !optMap[IgnoreNotFound] {
ctx.Counter("temp_cache_missing_hit",
"scraper_id", ctx.ScraperID(),
"scrape_type", lo.Ternary(ctx.IsIncrementalScrape(), "incremental", "full"),
Expand All @@ -136,13 +132,15 @@ func (t *TempCache) Get(ctx ScrapeContext, id string, opts ...CacheOption) (*mod
return nil, nil
}

if item, ok := t.items[id]; ok {
if item, ok := t.items.Load(id); ok {
ctx.Counter("temp_cache_hit",
"scraper_id", ctx.ScraperID(),
"scrape_type", lo.Ternary(ctx.IsIncrementalScrape(), "incremental", "full"),
"query_type", "id",
).Add(1)
return &item, nil

config := item.(models.ConfigItem)
return &config, nil
}

result := models.ConfigItem{}
Expand Down Expand Up @@ -172,7 +170,7 @@ func (t *TempCache) Get(ctx ScrapeContext, id string, opts ...CacheOption) (*mod
t.Insert(result)
return &result, nil
} else {
t.notFound[id] = struct{}{}
t.notFound.Store(id, struct{}{})
}

return nil, nil
Expand Down

0 comments on commit 4f710e3

Please sign in to comment.