Skip to content

Commit

Permalink
feat: scraper scoped lookup by external ids
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe authored and moshloop committed Jul 12, 2024
1 parent ef1f3ea commit 460cdfd
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 56 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and
resources: fmt manifests

test: manifests generate fmt vet envtest ## Run tests.
$(MAKE) gotest

.PHONY: gotest
gotest:
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test ./... -coverprofile cover.out

fmt:
Expand Down
48 changes: 26 additions & 22 deletions api/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,55 +9,60 @@ import (
"github.com/flanksource/duty/context"
"github.com/google/uuid"
"github.com/lib/pq"
"github.com/samber/lo"
)

// TempCache is a temporary cache of config items that is used to speed up config item lookups during scrape, when all config items for a scraper are looked up any way
type TempCache struct {
ctx context.Context
items map[string]models.ConfigItem
aliases map[string]string
}

func (t TempCache) FindExternal(ext v1.ExternalID) (*models.ConfigItem, error) {
return t.Find(ext.ConfigType, ext.ExternalID[0])
}

func (t TempCache) FindExternalID(ext v1.ExternalID) (string, error) {
if item, err := t.Find(ext.ConfigType, ext.ExternalID[0]); err != nil {
func (t *TempCache) FindExternalID(ctx ScrapeContext, ext v1.ExternalID) (string, error) {
if item, err := t.Find(ctx, ext); err != nil {
return "", err
} else if item != nil {
return item.ID, nil
}
return "", nil
}

func (t TempCache) Find(typ, id string) (*models.ConfigItem, error) {
typ = strings.ToLower(typ)
func (t *TempCache) Find(ctx ScrapeContext, lookup v1.ExternalID) (*models.ConfigItem, error) {
configType := strings.ToLower(lookup.ConfigType)
externalID := lookup.ExternalID[0]
scraperID := lo.CoalesceOrEmpty(lookup.ScraperID, string(ctx.ScrapeConfig().GetUID()))

if strings.HasPrefix(typ, "kubernetes::") && uuid.Validate(id) == nil {
if strings.HasPrefix(configType, "kubernetes::") && uuid.Validate(externalID) == nil {
// kubernetes external ids are stored are the same as the config ids
return t.Get(id)
return t.Get(ctx, externalID)
}

if t.aliases == nil {
t.aliases = make(map[string]string)
}

if alias, ok := t.aliases[typ+id]; ok {
return t.Get(alias)
if alias, ok := t.aliases[scraperID+configType+externalID]; ok {
return t.Get(ctx, alias)
}

result := models.ConfigItem{}
if err := t.ctx.DB().Limit(1).Find(&result, "lower(type) = ? and external_id @> ?", typ, pq.StringArray{id}).Error; err != nil {
var result models.ConfigItem
query := ctx.DB().Limit(1).Where("type = ? and external_id @> ?", lookup.ConfigType, pq.StringArray{externalID})
if scraperID != "all" && scraperID != "" {
query = query.Where("scraper_id = ?", scraperID)
}
if err := query.Find(&result).Error; err != nil {
return nil, err
}

if result.ID != "" {
t.Insert(result)
return &result, nil
}

return nil, nil
}

func (t TempCache) Insert(item models.ConfigItem) {
func (t *TempCache) Insert(item models.ConfigItem) {
if t.aliases == nil {
t.aliases = make(map[string]string)
}
Expand All @@ -67,16 +72,16 @@ func (t TempCache) Insert(item models.ConfigItem) {

for _, id := range item.ExternalID {
if item.Type != nil {
t.aliases[strings.ToLower(*item.Type)+id] = item.ID
t.aliases[lo.FromPtr(item.ScraperID).String()+strings.ToLower(*item.Type)+id] = item.ID
} else {
t.aliases[strings.ToLower(id)] = item.ID
t.aliases[lo.FromPtr(item.ScraperID).String()+strings.ToLower(id)] = item.ID
}
}

t.items[strings.ToLower(item.ID)] = item
}

func (t TempCache) Get(id string) (*models.ConfigItem, error) {
func (t *TempCache) Get(ctx ScrapeContext, id string) (*models.ConfigItem, error) {
id = strings.ToLower(id)
if id == "" {
return nil, nil
Expand All @@ -89,7 +94,7 @@ func (t TempCache) Get(id string) (*models.ConfigItem, error) {
}

result := models.ConfigItem{}
if err := t.ctx.DB().Limit(1).Find(&result, "id = ? ", id).Error; err != nil {
if err := ctx.DB().Limit(1).Find(&result, "id = ? ", id).Error; err != nil {
return nil, err
}
if result.ID != "" {
Expand All @@ -105,12 +110,11 @@ func QueryCache(ctx context.Context, query string, args ...interface{}) (*TempCa
return nil, fmt.Errorf("no db configured")
}
t := TempCache{
ctx: ctx,
items: make(map[string]models.ConfigItem),
aliases: make(map[string]string),
}
items := []models.ConfigItem{}
if err := t.ctx.DB().Table("config_items").Where(query, args...).Find(&items).Error; err != nil {
if err := ctx.DB().Table("config_items").Where(query, args...).Find(&items).Error; err != nil {
return nil, err
}
for _, item := range items {
Expand Down
4 changes: 1 addition & 3 deletions api/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ type ScrapeContext struct {
func NewScrapeContext(ctx dutyCtx.Context) ScrapeContext {
return ScrapeContext{
Context: ctx.WithKubernetes(KubernetesClient),
temp: &TempCache{
ctx: ctx,
},
temp: &TempCache{},
}
}

Expand Down
19 changes: 8 additions & 11 deletions api/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ import (
"strings"

"github.com/flanksource/config-db/utils"
"github.com/lib/pq"

"gorm.io/gorm"
)

var AllScraperConfigs = map[string]any{
Expand Down Expand Up @@ -80,24 +77,24 @@ func (c ScraperSpec) IsDebug() bool {
type ExternalID struct {
ConfigType string
ExternalID []string

// Scraper id of the config
// If left empty, the scraper id is the requester's scraper id.
// Use `all` to disregard scraper id.
ScraperID string
}

func (e ExternalID) String() string {
if e.ScraperID != "" {
return fmt.Sprintf("scraper_id=%s type=%s externalids=%s", e.ScraperID, e.ConfigType, strings.Join(e.ExternalID, ","))
}
return fmt.Sprintf("type=%s externalids=%s", e.ConfigType, strings.Join(e.ExternalID, ","))
}

func (e ExternalID) IsEmpty() bool {
return e.ConfigType == "" || len(e.ExternalID) == 0
}

func (e ExternalID) CacheKey() string {
return fmt.Sprintf("external_id:%s:%s", e.ConfigType, strings.Join(e.ExternalID, ","))
}

func (e ExternalID) WhereClause(db *gorm.DB) *gorm.DB {
return db.Where("lower(type) = ? AND external_id @> ?", strings.ToLower(e.ConfigType), pq.StringArray(e.ExternalID))
}

type ConfigDeleteReason string

var (
Expand Down
30 changes: 16 additions & 14 deletions db/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.C
if change.ConfigID == "" && change.GetExternalID().IsEmpty() && ci != nil {
change.ConfigID = ci.ID
} else if !change.GetExternalID().IsEmpty() {
if ci, err := ctx.TempCache().FindExternalID(change.GetExternalID()); err != nil {
if ci, err := ctx.TempCache().FindExternalID(ctx, change.GetExternalID()); err != nil {
return nil, nil, changeSummary, fmt.Errorf("failed to get config from change (externalID=%s): %w", change.GetExternalID(), err)
} else if ci != "" {
change.ConfigID = ci
Expand All @@ -276,7 +276,7 @@ func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.C
// Some scrapers can generate changes for config items that don't exist on our db.
// Example: Cloudtrail scraper reporting changes for a resource that has been excluded.
changeSummary.AddOrphaned(changeResult.ChangeType)
ctx.Logger.V(1).Infof("(type=%s source=%s external_id=%s) change doesn't have an associated config", change.ChangeType, change.Source, change.GetExternalID())
ctx.Logger.V(2).Infof("change doesn't have an associated config (type=%s source=%s external_id=%s)", change.ChangeType, change.Source, change.GetExternalID())
continue
}

Expand All @@ -292,7 +292,7 @@ func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.C

func upsertAnalysis(ctx api.ScrapeContext, result *v1.ScrapeResult) error {
analysis := result.AnalysisResult.ToConfigAnalysis()
ciID, err := ctx.TempCache().Find(analysis.ConfigType, analysis.ExternalID)
ciID, err := ctx.TempCache().Find(ctx, v1.ExternalID{ConfigType: analysis.ConfigType, ExternalID: []string{analysis.ExternalID}})
if err != nil {
return err
}
Expand Down Expand Up @@ -581,13 +581,13 @@ func relationshipResultHandler(ctx api.ScrapeContext, relationships v1.Relations
if relationship.ConfigID != "" {
configID = relationship.ConfigID
} else {
configID, err = ctx.TempCache().FindExternalID(relationship.ConfigExternalID)
configID, err = ctx.TempCache().FindExternalID(ctx, relationship.ConfigExternalID)
if err != nil {
logger.Errorf("error fetching config item(id=%s): %v", relationship.ConfigExternalID, err)
continue
}
if configID == "" {
ctx.Logger.V(2).Infof("unable to form relationship. failed to find the parent config %s for config %s", relationship.ConfigExternalID, cUtils.Coalesce(relationship.RelatedConfigID, relationship.RelatedExternalID.String()))
ctx.Logger.V(2).Infof("unable to form relationship. failed to find the parent config (%s) for config (%s)", relationship.ConfigExternalID, cUtils.Coalesce(relationship.RelatedConfigID, relationship.RelatedExternalID.String()))
continue
}
}
Expand All @@ -596,13 +596,13 @@ func relationshipResultHandler(ctx api.ScrapeContext, relationships v1.Relations
if relationship.RelatedConfigID != "" {
relatedID = relationship.RelatedConfigID
} else {
relatedID, err = ctx.TempCache().FindExternalID(relationship.RelatedExternalID)
relatedID, err = ctx.TempCache().FindExternalID(ctx, relationship.RelatedExternalID)
if err != nil {
logger.Errorf("error fetching external config item(id=%s): %v", relationship.RelatedExternalID, err)
continue
}
if relatedID == "" {
ctx.Logger.V(2).Infof("unable to form relationship. failed to find related config %s for config %s", relationship.RelatedExternalID, configID)
ctx.Logger.V(2).Infof("unable to form relationship. failed to find related config (%s) for config (%s)", relationship.RelatedExternalID, configID)
continue
}
}
Expand Down Expand Up @@ -675,11 +675,11 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime

existing := &models.ConfigItem{}
if ci.ID != "" {
if existing, err = ctx.TempCache().Get(ci.ID); err != nil {
if existing, err = ctx.TempCache().Get(ctx, ci.ID); err != nil {
return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("unable to lookup existing config(%s): %w", ci, err)
}
} else {
if existing, err = ctx.TempCache().Find(*ci.Type, ci.ExternalID[0]); err != nil {
if existing, err = ctx.TempCache().Find(ctx, v1.ExternalID{ConfigType: *ci.Type, ExternalID: []string{ci.ExternalID[0]}}); err != nil {
return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("unable to lookup external id(%s): %w", ci, err)
}
}
Expand Down Expand Up @@ -779,7 +779,7 @@ func setConfigParents(ctx api.ScrapeContext, parentTypeToConfigMap map[configExt
break
}

if found, err := ctx.TempCache().Find(parent.Type, parent.ExternalID); err != nil {
if found, err := ctx.TempCache().Find(ctx, v1.ExternalID{ConfigType: parent.Type, ExternalID: []string{parent.ExternalID}}); err != nil {
return err
} else if found != nil {
ci.ParentID = &found.ID
Expand Down Expand Up @@ -813,17 +813,19 @@ func generatePartialTree(ctx api.ScrapeContext, tree graph.Graph[string, string]

for _, c := range allConfigs {
if c.ParentID == nil {
// We aren't supposed to hit this point, except when an incremental scraper runs before a full scrape
//
// We aren't supposed to hit this point.
// Happens if
// - an incremental scraper runs before a full scrape
// - the full scrape didn't scrape the parent for some reason.
// We fail early here than failing on db insert.
return fmt.Errorf("a non root config found without a parent %s", c)
return fmt.Errorf("encountered an unexpected situation: a non-root config found without a parent (%s) (parents' external ids: %v)", c, c.Parents)
}

if _, found := configIDs[*c.ParentID]; found {
continue
}

parent, err := ctx.TempCache().Get(*c.ParentID)
parent, err := ctx.TempCache().Get(ctx, *c.ParentID)
if err != nil {
return fmt.Errorf("unable to get parent(%s): %w", c, err)
}
Expand Down
6 changes: 3 additions & 3 deletions scrapers/kubernetes/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin
return
}

ctx.Logger.V(2).Infof("added: %s %s %s", u.GetUID(), u.GetKind(), u.GetName())
ctx.Logger.V(3).Infof("added: %s %s %s", u.GetUID(), u.GetKind(), u.GetName())
buffer <- u
},
UpdateFunc: func(oldObj any, newObj any) {
Expand All @@ -69,7 +69,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin
return
}

ctx.Logger.V(2).Infof("updated: %s %s %s", u.GetUID(), u.GetKind(), u.GetName())
ctx.Logger.V(3).Infof("updated: %s %s %s", u.GetUID(), u.GetKind(), u.GetName())
buffer <- u
},
DeleteFunc: func(obj any) {
Expand All @@ -79,7 +79,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin
return
}

ctx.Logger.V(2).Infof("deleted: %s %s %s", u.GetUID(), u.GetKind(), u.GetName())
ctx.Logger.V(3).Infof("deleted: %s %s %s", u.GetUID(), u.GetKind(), u.GetName())
deleteBuffer <- string(u.GetUID())
},
})
Expand Down
8 changes: 5 additions & 3 deletions scrapers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (kubernetes KubernetesScraper) IncrementalScrape(ctx api.ScrapeContext, con
ctx.DutyContext().Logger.V(5).Infof("fetching resource namespace=%s name=%s kind=%s apiVersion=%s", resource.Namespace, resource.Name, resource.Kind, resource.APIVersion)
obj, err := kclient.Namespace(resource.Namespace).Get(ctx, resource.Name, metav1.GetOptions{})
if err != nil {
ctx.DutyContext().Errorf("failed to get resource (Kind=%s, Name=%s, Namespace=%s): %v", resource.Kind, resource.Name, resource.Namespace, err)
ctx.DutyContext().Logger.Warnf("failed to get resource (Kind=%s, Name=%s, Namespace=%s): %v", resource.Kind, resource.Name, resource.Namespace, err)
continue
} else if obj != nil {
seenObjects[cacheKey] = struct{}{} // mark it as seen so we don't run ketall.KetOne again (in this run)
Expand Down Expand Up @@ -207,7 +207,7 @@ func getObjectChangeExclusionAnnotations(ctx api.ScrapeContext, id string, exclu
// The requested object was not found in the scraped objects.
// This happens on incremental scraper.
// We query the object from the db to get the annotations;
item, err := ctx.TempCache().Get(id)
item, err := ctx.TempCache().Get(ctx, id)
if err != nil {
return "", "", err
} else if item != nil && item.Labels != nil {
Expand Down Expand Up @@ -393,8 +393,10 @@ func ExtractResults(ctx api.ScrapeContext, config v1.Kubernetes, objs []*unstruc
if spec["nodeName"] != nil {
nodeName = spec["nodeName"].(string)
nodeID := resourceIDMap[""]["Node"][nodeName]
nodeExternalID := lo.CoalesceOrEmpty(nodeID, getKubernetesAlias("Node", "", nodeName))

relationships = append(relationships, v1.RelationshipResult{
ConfigExternalID: v1.ExternalID{ExternalID: []string{nodeID}, ConfigType: ConfigTypePrefix + "Node"},
ConfigExternalID: v1.ExternalID{ExternalID: []string{nodeExternalID}, ConfigType: ConfigTypePrefix + "Node"},
RelatedExternalID: v1.ExternalID{ExternalID: []string{string(obj.GetUID())}, ConfigType: ConfigTypePrefix + "Pod"},
Relationship: "NodePod",
})
Expand Down

0 comments on commit 460cdfd

Please sign in to comment.