diff --git a/api/context.go b/api/context.go index edac89ea..5059745d 100644 --- a/api/context.go +++ b/api/context.go @@ -18,6 +18,8 @@ type ScrapeContext struct { temp *TempCache + isIncremental bool + namespace string jobHistory *models.JobHistory @@ -124,6 +126,15 @@ func (ctx ScrapeContext) JobHistory() *models.JobHistory { return h } +func (ctx ScrapeContext) AsIncrementalScrape() ScrapeContext { + ctx.isIncremental = true + return ctx +} + +func (ctx ScrapeContext) IsIncrementalScrape() bool { + return ctx.isIncremental +} + func (ctx ScrapeContext) ScrapeConfig() *v1.ScrapeConfig { return ctx.scrapeConfig } diff --git a/db/update.go b/db/update.go index d2aad01d..8a984ba1 100644 --- a/db/update.go +++ b/db/update.go @@ -32,6 +32,7 @@ import ( "github.com/lib/pq" "github.com/patrickmn/go-cache" "github.com/pkg/errors" + "github.com/robfig/cron/v3" "github.com/samber/lo" "gorm.io/gorm" "gorm.io/gorm/clause" @@ -40,6 +41,15 @@ import ( const configItemsBulkInsertSize = 200 +// parentCache stores child -> parent relationship +// derived from children lookup hooks. +// +// This is to cater to cases where only the parent has the knowledge of its direct child. +// During incremental scrape, the child can look into this cache to find its parent +// (that would have been set in a full scrape). + +var parentCache = cache.New(time.Hour*24, time.Hour*24) + func deleteChangeHandler(ctx api.ScrapeContext, change v1.ChangeResult) error { var deletedAt interface{} if change.CreatedAt != nil && !change.CreatedAt.IsZero() { @@ -1035,6 +1045,16 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime return nil, fmt.Errorf("unable to set children: %w", err) } + if ctx.IsIncrementalScrape() { + // This is to preserve the child-parent hard relationship + // when the child doesn't know about its parent.
+ for _, c := range allConfigs { + if parentID, ok := parentCache.Get(c.ID); ok { + c.ParentID = lo.ToPtr(parentID.(string)) + } + } + } + // We sort the new config items such that parents are always first. // This avoids foreign key constraint errors. slices.SortFunc(extractResult.newConfigs, func(a, b *models.ConfigItem) int { @@ -1093,6 +1113,16 @@ func setConfigProbableParents(ctx api.ScrapeContext, parentTypeToConfigMap map[c } func setParentForChildren(ctx api.ScrapeContext, allConfigs models.ConfigItems) error { + var cacheExpiry time.Duration + + // Attempt to get a fixed interval from the schedule so we can set the appropriate cache expiry. + if parsedSchedule, err := cron.ParseStandard(ctx.ScrapeConfig().Spec.Schedule); err == nil { + next := parsedSchedule.Next(time.Now()) + cacheExpiry = (next.Sub(time.Now())) * 2 + } else { + cacheExpiry = 2 * time.Hour + } + for _, ci := range allConfigs { if len(ci.Children) == 0 { // No action required @@ -1114,6 +1144,7 @@ func setParentForChildren(ctx api.ScrapeContext, allConfigs models.ConfigItems) childRef := allConfigs.GetByID(found.ID) if childRef != nil { + parentCache.Set(childRef.ID, ci.ID, cacheExpiry) childRef.ParentID = &ci.ID } } diff --git a/scrapers/cron.go b/scrapers/cron.go index 803fb695..e1555159 100644 --- a/scrapers/cron.go +++ b/scrapers/cron.go @@ -300,7 +300,7 @@ func ConsumeKubernetesWatchEventsJobFunc(sc api.ScrapeContext, config v1.Kuberne ch := _ch.(chan v1.KubernetesEvent) events, _, _, _ := lo.Buffer(ch, len(ch)) - cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History) + cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History).AsIncrementalScrape() cc.Context = cc.Context.WithoutName().WithName(fmt.Sprintf("%s/%s", ctx.GetNamespace(), ctx.GetName())) results, err := RunK8IncrementalScraper(cc, config, events) if err != nil { @@ -380,7 +380,7 @@ func ConsumeKubernetesWatchResourcesJobFunc(sc 
api.ScrapeContext, config v1.Kube // a way that no two objects in a batch have the same id. objs = dedup(objs) - cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History) + cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History).AsIncrementalScrape() cc.Context = cc.Context.WithoutName().WithName(fmt.Sprintf("watch[%s/%s]", cc.GetNamespace(), cc.GetName())) results, err := RunK8ObjScraper(cc, config, objs) if err != nil { diff --git a/scrapers/kubernetes/context.go b/scrapers/kubernetes/context.go index 5aa3bd28..e02dec3f 100644 --- a/scrapers/kubernetes/context.go +++ b/scrapers/kubernetes/context.go @@ -22,14 +22,17 @@ type KubernetesContext struct { // globalLabels are common labels for any kubernetes resource globalLabels map[string]string logSkipped, logExclusions, logNoResourceId bool - isIncremental bool cluster v1.ScrapeResult resourceIDMap *ResourceIDMapContainer exclusionByType map[string]string exclusionBySeverity map[string]string } -func newKubernetesContext(ctx api.ScrapeContext, config v1.Kubernetes) *KubernetesContext { +func newKubernetesContext(ctx api.ScrapeContext, isIncremental bool, config v1.Kubernetes) *KubernetesContext { + if isIncremental { + ctx = ctx.AsIncrementalScrape() + } + return &KubernetesContext{ ScrapeContext: ctx, config: config, diff --git a/scrapers/kubernetes/kubernetes.go b/scrapers/kubernetes/kubernetes.go index 11ecc3fb..c1846742 100644 --- a/scrapers/kubernetes/kubernetes.go +++ b/scrapers/kubernetes/kubernetes.go @@ -60,7 +60,7 @@ func (kubernetes KubernetesScraper) IncrementalScrape( config v1.Kubernetes, objects []*unstructured.Unstructured, ) v1.ScrapeResults { - return ExtractResults(newKubernetesContext(ctx, config), objects) + return ExtractResults(newKubernetesContext(ctx, true, config), objects) } func (kubernetes KubernetesScraper) IncrementalEventScrape( @@ -147,7 +147,7 @@ func (kubernetes KubernetesScraper) 
IncrementalEventScrape( return nil } - return ExtractResults(newKubernetesContext(ctx, config), objects) + return ExtractResults(newKubernetesContext(ctx, true, config), objects) } func (kubernetes KubernetesScraper) Scrape(ctx api.ScrapeContext) v1.ScrapeResults { @@ -166,7 +166,7 @@ func (kubernetes KubernetesScraper) Scrape(ctx api.ScrapeContext) v1.ScrapeResul return results.Errorf(err, "error running ketall") } - extracted := ExtractResults(newKubernetesContext(ctx, config), objs) + extracted := ExtractResults(newKubernetesContext(ctx, false, config), objs) results = append(results, extracted...) } @@ -196,7 +196,7 @@ func ExtractResults(ctx *KubernetesContext, objs []*unstructured.Unstructured) v results = append(results, cluster) ctx.Load(objs) - if ctx.isIncremental { + if ctx.IsIncrementalScrape() { // On incremental scrape, we do not have all the data in the resource ID map. // we use it from the cached resource id map. ctx.resourceIDMap.data = resourceIDMapPerCluster.MergeAndUpdate( @@ -502,7 +502,7 @@ func ExtractResults(ctx *KubernetesContext, objs []*unstructured.Unstructured) v } results = append(results, changeResults...) - if ctx.isIncremental { + if ctx.IsIncrementalScrape() { results = append([]v1.ScrapeResult{ctx.cluster}, results...) }