Skip to content

Commit

Permalink
feat: scraper scoped lookup by external ids
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Jul 12, 2024
1 parent 319e6ea commit 286bdab
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 49 deletions.
46 changes: 24 additions & 22 deletions api/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,55 +9,58 @@ import (
"github.com/flanksource/duty/context"
"github.com/google/uuid"
"github.com/lib/pq"
"github.com/samber/lo"
)

// TempCache is a temporary cache of config items used to speed up config item lookups during a scrape, when all config items for a scraper are looked up anyway.
type TempCache struct {
ctx context.Context
items map[string]models.ConfigItem
aliases map[string]string
}

func (t TempCache) FindExternal(ext v1.ExternalID) (*models.ConfigItem, error) {
return t.Find(ext.ConfigType, ext.ExternalID[0])
}

func (t TempCache) FindExternalID(ext v1.ExternalID) (string, error) {
if item, err := t.Find(ext.ConfigType, ext.ExternalID[0]); err != nil {
func (t TempCache) FindExternalID(ctx ScrapeContext, ext v1.ExternalID) (string, error) {
if item, err := t.Find(ctx, ext); err != nil {
return "", err
} else if item != nil {
return item.ID, nil
}
return "", nil
}

func (t TempCache) Find(typ, id string) (*models.ConfigItem, error) {
typ = strings.ToLower(typ)
func (t *TempCache) Find(ctx ScrapeContext, lookup v1.ExternalID) (*models.ConfigItem, error) {
configType := strings.ToLower(lookup.ConfigType)
externalID := lookup.ExternalID[0]
scraperID := lo.CoalesceOrEmpty(lookup.ScraperID, string(ctx.ScrapeConfig().GetUID()))

if strings.HasPrefix(typ, "kubernetes::") && uuid.Validate(id) == nil {
if strings.HasPrefix(configType, "kubernetes::") && uuid.Validate(externalID) == nil {
// kubernetes external ids are the same as the config ids
return t.Get(id)
return t.Get(ctx, externalID)
}

if t.aliases == nil {
t.aliases = make(map[string]string)
}

if alias, ok := t.aliases[typ+id]; ok {
return t.Get(alias)
if alias, ok := t.aliases[scraperID+configType+externalID]; ok {
return t.Get(ctx, alias)
}

result := models.ConfigItem{}
if err := t.ctx.DB().Limit(1).Find(&result, "lower(type) = ? and external_id @> ?", typ, pq.StringArray{id}).Error; err != nil {
var result models.ConfigItem
query := ctx.DB().Limit(1).Where("type = ? and external_id @> ?", lookup.ConfigType, pq.StringArray{externalID})
query = query.Where("scraper_id = ?", scraperID)
if err := query.Find(&result).Error; err != nil {
return nil, err
}

if result.ID != "" {
t.Insert(result)
return &result, nil
}

return nil, nil
}

func (t TempCache) Insert(item models.ConfigItem) {
func (t *TempCache) Insert(item models.ConfigItem) {
if t.aliases == nil {
t.aliases = make(map[string]string)
}
Expand All @@ -67,16 +70,16 @@ func (t TempCache) Insert(item models.ConfigItem) {

for _, id := range item.ExternalID {
if item.Type != nil {
t.aliases[strings.ToLower(*item.Type)+id] = item.ID
t.aliases[item.ScraperID.String()+strings.ToLower(*item.Type)+id] = item.ID
} else {
t.aliases[strings.ToLower(id)] = item.ID
t.aliases[item.ScraperID.String()+strings.ToLower(id)] = item.ID
}
}

t.items[strings.ToLower(item.ID)] = item
}

func (t TempCache) Get(id string) (*models.ConfigItem, error) {
func (t *TempCache) Get(ctx ScrapeContext, id string) (*models.ConfigItem, error) {
id = strings.ToLower(id)
if id == "" {
return nil, nil
Expand All @@ -89,7 +92,7 @@ func (t TempCache) Get(id string) (*models.ConfigItem, error) {
}

result := models.ConfigItem{}
if err := t.ctx.DB().Limit(1).Find(&result, "id = ? ", id).Error; err != nil {
if err := ctx.DB().Limit(1).Find(&result, "id = ? ", id).Error; err != nil {
return nil, err
}
if result.ID != "" {
Expand All @@ -105,12 +108,11 @@ func QueryCache(ctx context.Context, query string, args ...interface{}) (*TempCa
return nil, fmt.Errorf("no db configured")
}
t := TempCache{
ctx: ctx,
items: make(map[string]models.ConfigItem),
aliases: make(map[string]string),
}
items := []models.ConfigItem{}
if err := t.ctx.DB().Table("config_items").Where(query, args...).Find(&items).Error; err != nil {
if err := ctx.DB().Table("config_items").Where(query, args...).Find(&items).Error; err != nil {
return nil, err
}
for _, item := range items {
Expand Down
4 changes: 1 addition & 3 deletions api/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ type ScrapeContext struct {
func NewScrapeContext(ctx dutyCtx.Context) ScrapeContext {
return ScrapeContext{
Context: ctx.WithKubernetes(KubernetesClient),
temp: &TempCache{
ctx: ctx,
},
temp: &TempCache{},
}
}

Expand Down
18 changes: 6 additions & 12 deletions api/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ import (
"strings"

"github.com/flanksource/config-db/utils"
"github.com/lib/pq"

"gorm.io/gorm"
)

var AllScraperConfigs = map[string]any{
Expand Down Expand Up @@ -80,24 +77,21 @@ func (c ScraperSpec) IsDebug() bool {
// ExternalID identifies a config item by its type and external ids,
// optionally scoped to a scraper.
type ExternalID struct {
// ConfigType is the type of the config item being referenced.
ConfigType string
// ExternalID holds the external ids of the config item being referenced.
ExternalID []string

// Scraper id of the config
// If left empty, the scraper id is the requester's scraper id.
// Use `all` to disregard scraper id.
// NOTE(review): confirm that lookups special-case `all` instead of
// filtering on it as a literal scraper id.
ScraperID string
}

func (e ExternalID) String() string {
return fmt.Sprintf("type=%s externalids=%s", e.ConfigType, strings.Join(e.ExternalID, ","))
return fmt.Sprintf("scraper_id=%s type=%s externalids=%s", e.ScraperID, e.ConfigType, strings.Join(e.ExternalID, ","))
}

// IsEmpty reports whether the reference is unusable for a lookup: either the
// config type is blank or no external ids were supplied.
func (e ExternalID) IsEmpty() bool {
	if e.ConfigType == "" {
		return true
	}
	return len(e.ExternalID) == 0
}

func (e ExternalID) CacheKey() string {
return fmt.Sprintf("external_id:%s:%s", e.ConfigType, strings.Join(e.ExternalID, ","))
}

func (e ExternalID) WhereClause(db *gorm.DB) *gorm.DB {
return db.Where("lower(type) = ? AND external_id @> ?", strings.ToLower(e.ConfigType), pq.StringArray(e.ExternalID))
}

type ConfigDeleteReason string

var (
Expand Down
16 changes: 8 additions & 8 deletions db/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.C
if change.ConfigID == "" && change.GetExternalID().IsEmpty() && ci != nil {
change.ConfigID = ci.ID
} else if !change.GetExternalID().IsEmpty() {
if ci, err := ctx.TempCache().FindExternalID(change.GetExternalID()); err != nil {
if ci, err := ctx.TempCache().FindExternalID(ctx, change.GetExternalID()); err != nil {
return nil, nil, changeSummary, fmt.Errorf("failed to get config from change (externalID=%s): %w", change.GetExternalID(), err)
} else if ci != "" {
change.ConfigID = ci
Expand Down Expand Up @@ -292,7 +292,7 @@ func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.C

func upsertAnalysis(ctx api.ScrapeContext, result *v1.ScrapeResult) error {
analysis := result.AnalysisResult.ToConfigAnalysis()
ciID, err := ctx.TempCache().Find(analysis.ConfigType, analysis.ExternalID)
ciID, err := ctx.TempCache().Find(ctx, v1.ExternalID{ConfigType: analysis.ConfigType, ExternalID: []string{analysis.ExternalID}})
if err != nil {
return err
}
Expand Down Expand Up @@ -581,7 +581,7 @@ func relationshipResultHandler(ctx api.ScrapeContext, relationships v1.Relations
if relationship.ConfigID != "" {
configID = relationship.ConfigID
} else {
configID, err = ctx.TempCache().FindExternalID(relationship.ConfigExternalID)
configID, err = ctx.TempCache().FindExternalID(ctx, relationship.ConfigExternalID)
if err != nil {
logger.Errorf("error fetching config item(id=%s): %v", relationship.ConfigExternalID, err)
continue
Expand All @@ -596,7 +596,7 @@ func relationshipResultHandler(ctx api.ScrapeContext, relationships v1.Relations
if relationship.RelatedConfigID != "" {
relatedID = relationship.RelatedConfigID
} else {
relatedID, err = ctx.TempCache().FindExternalID(relationship.RelatedExternalID)
relatedID, err = ctx.TempCache().FindExternalID(ctx, relationship.RelatedExternalID)
if err != nil {
logger.Errorf("error fetching external config item(id=%s): %v", relationship.RelatedExternalID, err)
continue
Expand Down Expand Up @@ -675,11 +675,11 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime

existing := &models.ConfigItem{}
if ci.ID != "" {
if existing, err = ctx.TempCache().Get(ci.ID); err != nil {
if existing, err = ctx.TempCache().Get(ctx, ci.ID); err != nil {
return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("unable to lookup existing config(%s): %w", ci, err)
}
} else {
if existing, err = ctx.TempCache().Find(*ci.Type, ci.ExternalID[0]); err != nil {
if existing, err = ctx.TempCache().Find(ctx, v1.ExternalID{ConfigType: *ci.Type, ExternalID: []string{ci.ExternalID[0]}}); err != nil {
return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("unable to lookup external id(%s): %w", ci, err)
}
}
Expand Down Expand Up @@ -779,7 +779,7 @@ func setConfigParents(ctx api.ScrapeContext, parentTypeToConfigMap map[configExt
break
}

if found, err := ctx.TempCache().Find(parent.Type, parent.ExternalID); err != nil {
if found, err := ctx.TempCache().Find(ctx, v1.ExternalID{ConfigType: parent.Type, ExternalID: []string{parent.ExternalID}}); err != nil {
return err
} else if found != nil {
ci.ParentID = &found.ID
Expand Down Expand Up @@ -823,7 +823,7 @@ func generatePartialTree(ctx api.ScrapeContext, tree graph.Graph[string, string]
continue
}

parent, err := ctx.TempCache().Get(*c.ParentID)
parent, err := ctx.TempCache().Get(ctx, *c.ParentID)
if err != nil {
return fmt.Errorf("unable to get parent(%s): %w", c, err)
}
Expand Down
6 changes: 3 additions & 3 deletions scrapers/kubernetes/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin
return
}

ctx.Logger.V(2).Infof("added: %s %s %s", u.GetUID(), u.GetKind(), u.GetName())
ctx.Logger.V(3).Infof("added: %s %s %s", u.GetUID(), u.GetKind(), u.GetName())
buffer <- u
},
UpdateFunc: func(oldObj any, newObj any) {
Expand All @@ -69,7 +69,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin
return
}

ctx.Logger.V(2).Infof("updated: %s %s %s", u.GetUID(), u.GetKind(), u.GetName())
ctx.Logger.V(3).Infof("updated: %s %s %s", u.GetUID(), u.GetKind(), u.GetName())
buffer <- u
},
DeleteFunc: func(obj any) {
Expand All @@ -79,7 +79,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin
return
}

ctx.Logger.V(2).Infof("deleted: %s %s %s", u.GetUID(), u.GetKind(), u.GetName())
ctx.Logger.V(3).Infof("deleted: %s %s %s", u.GetUID(), u.GetKind(), u.GetName())
deleteBuffer <- string(u.GetUID())
},
})
Expand Down
2 changes: 1 addition & 1 deletion scrapers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func getObjectChangeExclusionAnnotations(ctx api.ScrapeContext, id string, exclu
// The requested object was not found in the scraped objects.
// This happens on incremental scraper.
// We query the object from the db to get the annotations;
item, err := ctx.TempCache().Get(id)
item, err := ctx.TempCache().Get(ctx, id)
if err != nil {
return "", "", err
} else if item != nil && item.Labels != nil {
Expand Down

0 comments on commit 286bdab

Please sign in to comment.