From 286bdab17ad3f676f0114bee476ad32dc355d3c3 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Fri, 12 Jul 2024 11:00:38 +0545 Subject: [PATCH] feat: scraper scoped lookup by external ids --- api/cache.go | 46 ++++++++++++++++--------------- api/context.go | 4 +-- api/v1/types.go | 18 ++++-------- db/update.go | 16 +++++------ scrapers/kubernetes/informers.go | 6 ++-- scrapers/kubernetes/kubernetes.go | 2 +- 6 files changed, 43 insertions(+), 49 deletions(-) diff --git a/api/cache.go b/api/cache.go index 311f89ed3..f7d2e3be0 100644 --- a/api/cache.go +++ b/api/cache.go @@ -9,21 +9,17 @@ import ( "github.com/flanksource/duty/context" "github.com/google/uuid" "github.com/lib/pq" + "github.com/samber/lo" ) // TempCache is a temporary cache of config items that is used to speed up config item lookups during scrape, when all config items for a scraper are looked up any way type TempCache struct { - ctx context.Context items map[string]models.ConfigItem aliases map[string]string } -func (t TempCache) FindExternal(ext v1.ExternalID) (*models.ConfigItem, error) { - return t.Find(ext.ConfigType, ext.ExternalID[0]) -} - -func (t TempCache) FindExternalID(ext v1.ExternalID) (string, error) { - if item, err := t.Find(ext.ConfigType, ext.ExternalID[0]); err != nil { +func (t TempCache) FindExternalID(ctx ScrapeContext, ext v1.ExternalID) (string, error) { + if item, err := t.Find(ctx, ext); err != nil { return "", err } else if item != nil { return item.ID, nil @@ -31,33 +27,40 @@ func (t TempCache) FindExternalID(ext v1.ExternalID) (string, error) { return "", nil } -func (t TempCache) Find(typ, id string) (*models.ConfigItem, error) { - typ = strings.ToLower(typ) +func (t *TempCache) Find(ctx ScrapeContext, lookup v1.ExternalID) (*models.ConfigItem, error) { + configType := strings.ToLower(lookup.ConfigType) + externalID := lookup.ExternalID[0] + scraperID := lo.CoalesceOrEmpty(lookup.ScraperID, string(ctx.ScrapeConfig().GetUID())) - if strings.HasPrefix(typ, "kubernetes::") && uuid.Validate(id) == nil { + if strings.HasPrefix(configType, "kubernetes::") && uuid.Validate(externalID) == nil { // kubernetes external ids are stored are the same as the config ids - return t.Get(id) + return t.Get(ctx, externalID) } + if t.aliases == nil { t.aliases = make(map[string]string) } - if alias, ok := t.aliases[typ+id]; ok { - return t.Get(alias) + if alias, ok := t.aliases[scraperID+configType+externalID]; ok { + return t.Get(ctx, alias) } - result := models.ConfigItem{} - if err := t.ctx.DB().Limit(1).Find(&result, "lower(type) = ? and external_id @> ?", typ, pq.StringArray{id}).Error; err != nil { + var result models.ConfigItem + query := ctx.DB().Limit(1).Where("type = ? and external_id @> ?", lookup.ConfigType, pq.StringArray{externalID}) + query = query.Where("scraper_id = ?", scraperID) + if err := query.Find(&result).Error; err != nil { return nil, err } + if result.ID != "" { t.Insert(result) return &result, nil } + return nil, nil } -func (t TempCache) Insert(item models.ConfigItem) { +func (t *TempCache) Insert(item models.ConfigItem) { if t.aliases == nil { t.aliases = make(map[string]string) } @@ -67,16 +70,16 @@ func (t TempCache) Insert(item models.ConfigItem) { for _, id := range item.ExternalID { if item.Type != nil { - t.aliases[strings.ToLower(*item.Type)+id] = item.ID + t.aliases[item.ScraperID.String()+strings.ToLower(*item.Type)+id] = item.ID } else { - t.aliases[strings.ToLower(id)] = item.ID + t.aliases[item.ScraperID.String()+strings.ToLower(id)] = item.ID } } t.items[strings.ToLower(item.ID)] = item } -func (t TempCache) Get(id string) (*models.ConfigItem, error) { +func (t *TempCache) Get(ctx ScrapeContext, id string) (*models.ConfigItem, error) { id = strings.ToLower(id) if id == "" { return nil, nil @@ -89,7 +92,7 @@ func (t TempCache) Get(id string) (*models.ConfigItem, error) { } result := models.ConfigItem{} - if err := t.ctx.DB().Limit(1).Find(&result, "id = ? ", id).Error; err != nil { + if err := ctx.DB().Limit(1).Find(&result, "id = ? ", id).Error; err != nil { return nil, err } if result.ID != "" { @@ -105,12 +108,11 @@ func QueryCache(ctx context.Context, query string, args ...interface{}) (*TempCa return nil, fmt.Errorf("no db configured") } t := TempCache{ - ctx: ctx, items: make(map[string]models.ConfigItem), aliases: make(map[string]string), } items := []models.ConfigItem{} - if err := t.ctx.DB().Table("config_items").Where(query, args...).Find(&items).Error; err != nil { + if err := ctx.DB().Table("config_items").Where(query, args...).Find(&items).Error; err != nil { return nil, err } for _, item := range items { diff --git a/api/context.go b/api/context.go index 41494f104..4515651e3 100644 --- a/api/context.go +++ b/api/context.go @@ -21,9 +21,7 @@ type ScrapeContext struct { func NewScrapeContext(ctx dutyCtx.Context) ScrapeContext { return ScrapeContext{ Context: ctx.WithKubernetes(KubernetesClient), - temp: &TempCache{ - ctx: ctx, - }, + temp: &TempCache{}, } } diff --git a/api/v1/types.go b/api/v1/types.go index b389e719e..e709ac36d 100644 --- a/api/v1/types.go +++ b/api/v1/types.go @@ -5,9 +5,6 @@ import ( "strings" "github.com/flanksource/config-db/utils" - "github.com/lib/pq" - - "gorm.io/gorm" ) var AllScraperConfigs = map[string]any{ @@ -80,24 +77,21 @@ func (c ScraperSpec) IsDebug() bool { type ExternalID struct { ConfigType string ExternalID []string + + // Scraper id of the config + // If left empty, the scraper id is the requester's scraper id. + // Use `all` to disregard scraper id. + ScraperID string } func (e ExternalID) String() string { - return fmt.Sprintf("type=%s externalids=%s", e.ConfigType, strings.Join(e.ExternalID, ",")) + return fmt.Sprintf("scraper_id=%s type=%s externalids=%s", e.ScraperID, e.ConfigType, strings.Join(e.ExternalID, ",")) } func (e ExternalID) IsEmpty() bool { return e.ConfigType == "" || len(e.ExternalID) == 0 } -func (e ExternalID) CacheKey() string { - return fmt.Sprintf("external_id:%s:%s", e.ConfigType, strings.Join(e.ExternalID, ",")) -} - -func (e ExternalID) WhereClause(db *gorm.DB) *gorm.DB { - return db.Where("lower(type) = ? AND external_id @> ?", strings.ToLower(e.ConfigType), pq.StringArray(e.ExternalID)) -} - type ConfigDeleteReason string var ( diff --git a/db/update.go b/db/update.go index 46c82f59c..5de96f7c3 100644 --- a/db/update.go +++ b/db/update.go @@ -261,7 +261,7 @@ func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.C if change.ConfigID == "" && change.GetExternalID().IsEmpty() && ci != nil { change.ConfigID = ci.ID } else if !change.GetExternalID().IsEmpty() { - if ci, err := ctx.TempCache().FindExternalID(change.GetExternalID()); err != nil { + if ci, err := ctx.TempCache().FindExternalID(ctx, change.GetExternalID()); err != nil { return nil, nil, changeSummary, fmt.Errorf("failed to get config from change (externalID=%s): %w", change.GetExternalID(), err) } else if ci != "" { change.ConfigID = ci @@ -292,7 +292,7 @@ func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.C func upsertAnalysis(ctx api.ScrapeContext, result *v1.ScrapeResult) error { analysis := result.AnalysisResult.ToConfigAnalysis() - ciID, err := ctx.TempCache().Find(analysis.ConfigType, analysis.ExternalID) + ciID, err := ctx.TempCache().Find(ctx, v1.ExternalID{ConfigType: analysis.ConfigType, ExternalID: []string{analysis.ExternalID}}) if err != nil { return err } @@ -581,7 +581,7 @@ func relationshipResultHandler(ctx api.ScrapeContext, relationships v1.Relations if relationship.ConfigID != "" { configID = relationship.ConfigID } else { - configID, err = ctx.TempCache().FindExternalID(relationship.ConfigExternalID) + configID, err = ctx.TempCache().FindExternalID(ctx, relationship.ConfigExternalID) if err != nil { logger.Errorf("error fetching config item(id=%s): %v", relationship.ConfigExternalID, err) continue @@ -596,7 +596,7 @@ func relationshipResultHandler(ctx api.ScrapeContext, relationships v1.Relations if relationship.RelatedConfigID != "" { relatedID = relationship.RelatedConfigID } else { - relatedID, err = ctx.TempCache().FindExternalID(relationship.RelatedExternalID) + relatedID, err = ctx.TempCache().FindExternalID(ctx, relationship.RelatedExternalID) if err != nil { logger.Errorf("error fetching external config item(id=%s): %v", relationship.RelatedExternalID, err) continue @@ -675,11 +675,11 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime existing := &models.ConfigItem{} if ci.ID != "" { - if existing, err = ctx.TempCache().Get(ci.ID); err != nil { + if existing, err = ctx.TempCache().Get(ctx, ci.ID); err != nil { return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("unable to lookup existing config(%s): %w", ci, err) } } else { - if existing, err = ctx.TempCache().Find(*ci.Type, ci.ExternalID[0]); err != nil { + if existing, err = ctx.TempCache().Find(ctx, v1.ExternalID{ConfigType: *ci.Type, ExternalID: []string{ci.ExternalID[0]}}); err != nil { return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("unable to lookup external id(%s): %w", ci, err) } } @@ -779,7 +779,7 @@ func setConfigParents(ctx api.ScrapeContext, parentTypeToConfigMap map[configExt break } - if found, err := ctx.TempCache().Find(parent.Type, parent.ExternalID); err != nil { + if found, err := ctx.TempCache().Find(ctx, v1.ExternalID{ConfigType: parent.Type, ExternalID: []string{parent.ExternalID}}); err != nil { return err } else if found != nil { ci.ParentID = &found.ID @@ -823,7 +823,7 @@ func generatePartialTree(ctx api.ScrapeContext, tree graph.Graph[string, string] continue } - parent, err := ctx.TempCache().Get(*c.ParentID) + parent, err := ctx.TempCache().Get(ctx, *c.ParentID) if err != nil { return fmt.Errorf("unable to get parent(%s): %w", c, err) } diff --git a/scrapers/kubernetes/informers.go b/scrapers/kubernetes/informers.go index 57e03b800..32b8817f1 100644 --- a/scrapers/kubernetes/informers.go +++ b/scrapers/kubernetes/informers.go @@ -59,7 +59,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin return } - ctx.Logger.V(2).Infof("added: %s %s %s", u.GetUID(), u.GetKind(), u.GetName()) + ctx.Logger.V(3).Infof("added: %s %s %s", u.GetUID(), u.GetKind(), u.GetName()) buffer <- u }, UpdateFunc: func(oldObj any, newObj any) { @@ -69,7 +69,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin return } - ctx.Logger.V(2).Infof("updated: %s %s %s", u.GetUID(), u.GetKind(), u.GetName()) + ctx.Logger.V(3).Infof("updated: %s %s %s", u.GetUID(), u.GetKind(), u.GetName()) buffer <- u }, DeleteFunc: func(obj any) { @@ -79,7 +79,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin return } - ctx.Logger.V(2).Infof("deleted: %s %s %s", u.GetUID(), u.GetKind(), u.GetName()) + ctx.Logger.V(3).Infof("deleted: %s %s %s", u.GetUID(), u.GetKind(), u.GetName()) deleteBuffer <- string(u.GetUID()) }, }) diff --git a/scrapers/kubernetes/kubernetes.go b/scrapers/kubernetes/kubernetes.go index 23c32db8b..54be7f3d0 100644 --- a/scrapers/kubernetes/kubernetes.go +++ b/scrapers/kubernetes/kubernetes.go @@ -207,7 +207,7 @@ func getObjectChangeExclusionAnnotations(ctx api.ScrapeContext, id string, exclu // The requested object was not found in the scraped objects. // This happens on incremental scraper. // We query the object from the db to get the annotations; - item, err := ctx.TempCache().Get(id) + item, err := ctx.TempCache().Get(ctx, id) if err != nil { return "", "", err } else if item != nil && item.Labels != nil {