From 460cdfde7f6fbbb6352fac15e0ba602e49ea07b1 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Fri, 12 Jul 2024 11:00:38 +0545 Subject: [PATCH] feat: scraper scoped lookup by external ids --- Makefile | 4 +++ api/cache.go | 48 +++++++++++++++++-------------- api/context.go | 4 +-- api/v1/types.go | 19 ++++++------ db/update.go | 30 ++++++++++--------- scrapers/kubernetes/informers.go | 6 ++-- scrapers/kubernetes/kubernetes.go | 8 ++++-- 7 files changed, 63 insertions(+), 56 deletions(-) diff --git a/Makefile b/Makefile index 4a20775a..471f6edb 100644 --- a/Makefile +++ b/Makefile @@ -52,6 +52,10 @@ generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and resources: fmt manifests test: manifests generate fmt vet envtest ## Run tests. + $(MAKE) gotest + +.PHONY: gotest +gotest: KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test ./... -coverprofile cover.out fmt: diff --git a/api/cache.go b/api/cache.go index 311f89ed..0efd2228 100644 --- a/api/cache.go +++ b/api/cache.go @@ -9,21 +9,17 @@ import ( "github.com/flanksource/duty/context" "github.com/google/uuid" "github.com/lib/pq" + "github.com/samber/lo" ) // TempCache is a temporary cache of config items that is used to speed up config item lookups during scrape, when all config items for a scraper are looked up any way type TempCache struct { - ctx context.Context items map[string]models.ConfigItem aliases map[string]string } -func (t TempCache) FindExternal(ext v1.ExternalID) (*models.ConfigItem, error) { - return t.Find(ext.ConfigType, ext.ExternalID[0]) -} - -func (t TempCache) FindExternalID(ext v1.ExternalID) (string, error) { - if item, err := t.Find(ext.ConfigType, ext.ExternalID[0]); err != nil { +func (t *TempCache) FindExternalID(ctx ScrapeContext, ext v1.ExternalID) (string, error) { + if item, err := t.Find(ctx, ext); err != nil { return "", err } else if item != nil { return item.ID, nil @@ -31,33 +27,42 @@ func (t TempCache) FindExternalID(ext v1.ExternalID) (string, error) { return "", nil } -func (t TempCache) Find(typ, id string) (*models.ConfigItem, error) { - typ = strings.ToLower(typ) +func (t *TempCache) Find(ctx ScrapeContext, lookup v1.ExternalID) (*models.ConfigItem, error) { + configType := strings.ToLower(lookup.ConfigType) + externalID := lookup.ExternalID[0] + scraperID := lo.CoalesceOrEmpty(lookup.ScraperID, string(ctx.ScrapeConfig().GetUID())) - if strings.HasPrefix(typ, "kubernetes::") && uuid.Validate(id) == nil { + if strings.HasPrefix(configType, "kubernetes::") && uuid.Validate(externalID) == nil { // kubernetes external ids are stored are the same as the config ids - return t.Get(id) + return t.Get(ctx, externalID) } + if t.aliases == nil { t.aliases = make(map[string]string) } - if alias, ok := t.aliases[typ+id]; ok { - return t.Get(alias) + if alias, ok := t.aliases[scraperID+configType+externalID]; ok { + return t.Get(ctx, alias) } - result := models.ConfigItem{} - if err := t.ctx.DB().Limit(1).Find(&result, "lower(type) = ? and external_id @> ?", typ, pq.StringArray{id}).Error; err != nil { + var result models.ConfigItem + query := ctx.DB().Limit(1).Where("type = ? and external_id @> ?", lookup.ConfigType, pq.StringArray{externalID}) + if scraperID != "all" && scraperID != "" { + query = query.Where("scraper_id = ?", scraperID) + } + if err := query.Find(&result).Error; err != nil { return nil, err } + if result.ID != "" { t.Insert(result) return &result, nil } + return nil, nil } -func (t TempCache) Insert(item models.ConfigItem) { +func (t *TempCache) Insert(item models.ConfigItem) { if t.aliases == nil { t.aliases = make(map[string]string) } @@ -67,16 +72,16 @@ func (t TempCache) Insert(item models.ConfigItem) { for _, id := range item.ExternalID { if item.Type != nil { - t.aliases[strings.ToLower(*item.Type)+id] = item.ID + t.aliases[lo.FromPtr(item.ScraperID).String()+strings.ToLower(*item.Type)+id] = item.ID } else { - t.aliases[strings.ToLower(id)] = item.ID + t.aliases[lo.FromPtr(item.ScraperID).String()+strings.ToLower(id)] = item.ID } } t.items[strings.ToLower(item.ID)] = item } -func (t TempCache) Get(id string) (*models.ConfigItem, error) { +func (t *TempCache) Get(ctx ScrapeContext, id string) (*models.ConfigItem, error) { id = strings.ToLower(id) if id == "" { return nil, nil @@ -89,7 +94,7 @@ func (t TempCache) Get(id string) (*models.ConfigItem, error) { } result := models.ConfigItem{} - if err := t.ctx.DB().Limit(1).Find(&result, "id = ? ", id).Error; err != nil { + if err := ctx.DB().Limit(1).Find(&result, "id = ? ", id).Error; err != nil { return nil, err } if result.ID != "" { @@ -105,12 +110,11 @@ func QueryCache(ctx context.Context, query string, args ...interface{}) (*TempCa return nil, fmt.Errorf("no db configured") } t := TempCache{ - ctx: ctx, items: make(map[string]models.ConfigItem), aliases: make(map[string]string), } items := []models.ConfigItem{} - if err := t.ctx.DB().Table("config_items").Where(query, args...).Find(&items).Error; err != nil { + if err := ctx.DB().Table("config_items").Where(query, args...).Find(&items).Error; err != nil { return nil, err } for _, item := range items { diff --git a/api/context.go b/api/context.go index 41494f10..4515651e 100644 --- a/api/context.go +++ b/api/context.go @@ -21,9 +21,7 @@ type ScrapeContext struct { func NewScrapeContext(ctx dutyCtx.Context) ScrapeContext { return ScrapeContext{ Context: ctx.WithKubernetes(KubernetesClient), - temp: &TempCache{ - ctx: ctx, - }, + temp: &TempCache{}, } } diff --git a/api/v1/types.go b/api/v1/types.go index b389e719..95552f28 100644 --- a/api/v1/types.go +++ b/api/v1/types.go @@ -5,9 +5,6 @@ import ( "strings" "github.com/flanksource/config-db/utils" - "github.com/lib/pq" - - "gorm.io/gorm" ) var AllScraperConfigs = map[string]any{ @@ -80,9 +77,17 @@ func (c ScraperSpec) IsDebug() bool { type ExternalID struct { ConfigType string ExternalID []string + + // Scraper id of the config + // If left empty, the scraper id is the requester's scraper id. + // Use `all` to disregard scraper id. + ScraperID string } func (e ExternalID) String() string { + if e.ScraperID != "" { + return fmt.Sprintf("scraper_id=%s type=%s externalids=%s", e.ScraperID, e.ConfigType, strings.Join(e.ExternalID, ",")) + } return fmt.Sprintf("type=%s externalids=%s", e.ConfigType, strings.Join(e.ExternalID, ",")) } @@ -90,14 +95,6 @@ func (e ExternalID) IsEmpty() bool { return e.ConfigType == "" || len(e.ExternalID) == 0 } -func (e ExternalID) CacheKey() string { - return fmt.Sprintf("external_id:%s:%s", e.ConfigType, strings.Join(e.ExternalID, ",")) -} - -func (e ExternalID) WhereClause(db *gorm.DB) *gorm.DB { - return db.Where("lower(type) = ? AND external_id @> ?", strings.ToLower(e.ConfigType), pq.StringArray(e.ExternalID)) -} - type ConfigDeleteReason string var ( diff --git a/db/update.go b/db/update.go index 46c82f59..c5db0506 100644 --- a/db/update.go +++ b/db/update.go @@ -261,7 +261,7 @@ func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.C if change.ConfigID == "" && change.GetExternalID().IsEmpty() && ci != nil { change.ConfigID = ci.ID } else if !change.GetExternalID().IsEmpty() { - if ci, err := ctx.TempCache().FindExternalID(change.GetExternalID()); err != nil { + if ci, err := ctx.TempCache().FindExternalID(ctx, change.GetExternalID()); err != nil { return nil, nil, changeSummary, fmt.Errorf("failed to get config from change (externalID=%s): %w", change.GetExternalID(), err) } else if ci != "" { change.ConfigID = ci @@ -276,7 +276,7 @@ func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.C // Some scrapers can generate changes for config items that don't exist on our db. // Example: Cloudtrail scraper reporting changes for a resource that has been excluded. changeSummary.AddOrphaned(changeResult.ChangeType) - ctx.Logger.V(1).Infof("(type=%s source=%s external_id=%s) change doesn't have an associated config", change.ChangeType, change.Source, change.GetExternalID()) + ctx.Logger.V(2).Infof("change doesn't have an associated config (type=%s source=%s external_id=%s)", change.ChangeType, change.Source, change.GetExternalID()) continue } @@ -292,7 +292,7 @@ func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.C func upsertAnalysis(ctx api.ScrapeContext, result *v1.ScrapeResult) error { analysis := result.AnalysisResult.ToConfigAnalysis() - ciID, err := ctx.TempCache().Find(analysis.ConfigType, analysis.ExternalID) + ciID, err := ctx.TempCache().Find(ctx, v1.ExternalID{ConfigType: analysis.ConfigType, ExternalID: []string{analysis.ExternalID}}) if err != nil { return err } @@ -581,13 +581,13 @@ func relationshipResultHandler(ctx api.ScrapeContext, relationships v1.Relations if relationship.ConfigID != "" { configID = relationship.ConfigID } else { - configID, err = ctx.TempCache().FindExternalID(relationship.ConfigExternalID) + configID, err = ctx.TempCache().FindExternalID(ctx, relationship.ConfigExternalID) if err != nil { logger.Errorf("error fetching config item(id=%s): %v", relationship.ConfigExternalID, err) continue } if configID == "" { - ctx.Logger.V(2).Infof("unable to form relationship. failed to find the parent config %s for config %s", relationship.ConfigExternalID, cUtils.Coalesce(relationship.RelatedConfigID, relationship.RelatedExternalID.String())) + ctx.Logger.V(2).Infof("unable to form relationship. failed to find the parent config (%s) for config (%s)", relationship.ConfigExternalID, cUtils.Coalesce(relationship.RelatedConfigID, relationship.RelatedExternalID.String())) continue } } @@ -596,13 +596,13 @@ func relationshipResultHandler(ctx api.ScrapeContext, relationships v1.Relations if relationship.RelatedConfigID != "" { relatedID = relationship.RelatedConfigID } else { - relatedID, err = ctx.TempCache().FindExternalID(relationship.RelatedExternalID) + relatedID, err = ctx.TempCache().FindExternalID(ctx, relationship.RelatedExternalID) if err != nil { logger.Errorf("error fetching external config item(id=%s): %v", relationship.RelatedExternalID, err) continue } if relatedID == "" { - ctx.Logger.V(2).Infof("unable to form relationship. failed to find related config %s for config %s", relationship.RelatedExternalID, configID) + ctx.Logger.V(2).Infof("unable to form relationship. failed to find related config (%s) for config (%s)", relationship.RelatedExternalID, configID) continue } } @@ -675,11 +675,11 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime existing := &models.ConfigItem{} if ci.ID != "" { - if existing, err = ctx.TempCache().Get(ci.ID); err != nil { + if existing, err = ctx.TempCache().Get(ctx, ci.ID); err != nil { return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("unable to lookup existing config(%s): %w", ci, err) } } else { - if existing, err = ctx.TempCache().Find(*ci.Type, ci.ExternalID[0]); err != nil { + if existing, err = ctx.TempCache().Find(ctx, v1.ExternalID{ConfigType: *ci.Type, ExternalID: []string{ci.ExternalID[0]}}); err != nil { return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("unable to lookup external id(%s): %w", ci, err) } } @@ -779,7 +779,7 @@ func setConfigParents(ctx api.ScrapeContext, parentTypeToConfigMap map[configExt break } - if found, err := ctx.TempCache().Find(parent.Type, parent.ExternalID); err != nil { + if found, err := ctx.TempCache().Find(ctx, v1.ExternalID{ConfigType: parent.Type, ExternalID: []string{parent.ExternalID}}); err != nil { return err } else if found != nil { ci.ParentID = &found.ID @@ -813,17 +813,19 @@ func generatePartialTree(ctx api.ScrapeContext, tree graph.Graph[string, string] for _, c := range allConfigs { if c.ParentID == nil { - // We aren't supposed to hit this point, except when an incremental scraper runs before a full scrape - // + // We aren't supposed to hit this point. + // Happens if + // - an incremental scraper runs before a full scrape + // - the full scrape didn't scrape the parent for some reason. // We fail early here than failing on db insert. - return fmt.Errorf("a non root config found without a parent %s", c) + return fmt.Errorf("encountered an unexpected situation: a non-root config found without a parent (%s) (parents' external ids: %v)", c, c.Parents) } if _, found := configIDs[*c.ParentID]; found { continue } - parent, err := ctx.TempCache().Get(*c.ParentID) + parent, err := ctx.TempCache().Get(ctx, *c.ParentID) if err != nil { return fmt.Errorf("unable to get parent(%s): %w", c, err) } diff --git a/scrapers/kubernetes/informers.go b/scrapers/kubernetes/informers.go index 57e03b80..32b8817f 100644 --- a/scrapers/kubernetes/informers.go +++ b/scrapers/kubernetes/informers.go @@ -59,7 +59,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin return } - ctx.Logger.V(2).Infof("added: %s %s %s", u.GetUID(), u.GetKind(), u.GetName()) + ctx.Logger.V(3).Infof("added: %s %s %s", u.GetUID(), u.GetKind(), u.GetName()) buffer <- u }, UpdateFunc: func(oldObj any, newObj any) { @@ -69,7 +69,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin return } - ctx.Logger.V(2).Infof("updated: %s %s %s", u.GetUID(), u.GetKind(), u.GetName()) + ctx.Logger.V(3).Infof("updated: %s %s %s", u.GetUID(), u.GetKind(), u.GetName()) buffer <- u }, DeleteFunc: func(obj any) { @@ -79,7 +79,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin return } - ctx.Logger.V(2).Infof("deleted: %s %s %s", u.GetUID(), u.GetKind(), u.GetName()) + ctx.Logger.V(3).Infof("deleted: %s %s %s", u.GetUID(), u.GetKind(), u.GetName()) deleteBuffer <- string(u.GetUID()) }, }) diff --git a/scrapers/kubernetes/kubernetes.go b/scrapers/kubernetes/kubernetes.go index 23c32db8..4106131c 100644 --- a/scrapers/kubernetes/kubernetes.go +++ b/scrapers/kubernetes/kubernetes.go @@ -124,7 +124,7 @@ func (kubernetes KubernetesScraper) IncrementalScrape(ctx api.ScrapeContext, con ctx.DutyContext().Logger.V(5).Infof("fetching resource namespace=%s name=%s kind=%s apiVersion=%s", resource.Namespace, resource.Name, resource.Kind, resource.APIVersion) obj, err := kclient.Namespace(resource.Namespace).Get(ctx, resource.Name, metav1.GetOptions{}) if err != nil { - ctx.DutyContext().Errorf("failed to get resource (Kind=%s, Name=%s, Namespace=%s): %v", resource.Kind, resource.Name, resource.Namespace, err) + ctx.DutyContext().Logger.Warnf("failed to get resource (Kind=%s, Name=%s, Namespace=%s): %v", resource.Kind, resource.Name, resource.Namespace, err) continue } else if obj != nil { seenObjects[cacheKey] = struct{}{} // mark it as seen so we don't run ketall.KetOne again (in this run) @@ -207,7 +207,7 @@ func getObjectChangeExclusionAnnotations(ctx api.ScrapeContext, id string, exclu // The requested object was not found in the scraped objects. // This happens on incremental scraper. // We query the object from the db to get the annotations; - item, err := ctx.TempCache().Get(id) + item, err := ctx.TempCache().Get(ctx, id) if err != nil { return "", "", err } else if item != nil && item.Labels != nil { @@ -393,8 +393,10 @@ func ExtractResults(ctx api.ScrapeContext, config v1.Kubernetes, objs []*unstruc if spec["nodeName"] != nil { nodeName = spec["nodeName"].(string) nodeID := resourceIDMap[""]["Node"][nodeName] + nodeExternalID := lo.CoalesceOrEmpty(nodeID, getKubernetesAlias("Node", "", nodeName)) + relationships = append(relationships, v1.RelationshipResult{ - ConfigExternalID: v1.ExternalID{ExternalID: []string{nodeID}, ConfigType: ConfigTypePrefix + "Node"}, + ConfigExternalID: v1.ExternalID{ExternalID: []string{nodeExternalID}, ConfigType: ConfigTypePrefix + "Node"}, RelatedExternalID: v1.ExternalID{ExternalID: []string{string(obj.GetUID())}, ConfigType: ConfigTypePrefix + "Pod"}, Relationship: "NodePod", })