feat: set a parent cache
adityathebe committed Oct 21, 2024
1 parent 5fe63f8 commit d2ef799
Showing 5 changed files with 54 additions and 9 deletions.
11 changes: 11 additions & 0 deletions api/context.go
@@ -18,6 +18,8 @@ type ScrapeContext struct {
 
 	temp *TempCache
 
+	isIncremental bool
+
 	namespace string
 
 	jobHistory *models.JobHistory
@@ -124,6 +126,15 @@ func (ctx ScrapeContext) JobHistory() *models.JobHistory {
 	return h
 }
 
+func (ctx ScrapeContext) AsIncrementalScrape() ScrapeContext {
+	ctx.isIncremental = true
+	return ctx
+}
+
+func (ctx ScrapeContext) IsIncrementalScrape() bool {
+	return ctx.isIncremental
+}
+
 func (ctx ScrapeContext) ScrapeConfig() *v1.ScrapeConfig {
 	return ctx.scrapeConfig
 }
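
Note that AsIncrementalScrape is declared on a value receiver: it flips the flag on a copy of the context and returns that copy, so the caller's original context is never mutated, which is what makes the chained calls in scrapers/cron.go below safe. A minimal sketch of the pattern, with the context pared down to just this flag (a simplification, not the real type):

package main

import "fmt"

// Ctx stands in for ScrapeContext, reduced to the one field relevant here.
type Ctx struct {
	isIncremental bool
}

// AsIncrementalScrape mutates only the local copy and returns it,
// leaving the caller's value untouched.
func (c Ctx) AsIncrementalScrape() Ctx {
	c.isIncremental = true
	return c
}

func (c Ctx) IsIncrementalScrape() bool {
	return c.isIncremental
}

func main() {
	base := Ctx{}
	inc := base.AsIncrementalScrape()

	fmt.Println(base.IsIncrementalScrape()) // false: base is unchanged
	fmt.Println(inc.IsIncrementalScrape())  // true: only the copy carries the flag
}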
31 changes: 31 additions & 0 deletions db/update.go
@@ -32,6 +32,7 @@ import (
 	"github.com/lib/pq"
 	"github.com/patrickmn/go-cache"
 	"github.com/pkg/errors"
+	"github.com/robfig/cron/v3"
 	"github.com/samber/lo"
 	"gorm.io/gorm"
 	"gorm.io/gorm/clause"
@@ -40,6 +41,15 @@
 
 const configItemsBulkInsertSize = 200
 
+// parentCache stores the child -> parent relationships
+// derived from children lookup hooks.
+//
+// This caters to cases where only the parent knows about its direct child.
+// During an incremental scrape, the child can look into this cache to find its parent
+// (which would have been set during a full scrape).
+
+var parentCache = cache.New(time.Hour*24, time.Hour*24)
+
 func deleteChangeHandler(ctx api.ScrapeContext, change v1.ChangeResult) error {
 	var deletedAt interface{}
 	if change.CreatedAt != nil && !change.CreatedAt.IsZero() {
@@ -1035,6 +1045,16 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime
 		return nil, fmt.Errorf("unable to set children: %w", err)
 	}
 
+	if ctx.IsIncrementalScrape() {
+		// This is to preserve the child-parent hard relationship
+		// when the child doesn't know about its parent.
+		for _, c := range allConfigs {
+			if parentID, ok := parentCache.Get(c.ID); ok {
+				c.ParentID = lo.ToPtr(parentID.(string))
+			}
+		}
+	}
+
 	// We sort the new config items such that parents are always first.
 	// This avoids foreign key constraint errors.
 	slices.SortFunc(extractResult.newConfigs, func(a, b *models.ConfigItem) int {
@@ -1093,6 +1113,16 @@ func setConfigProbableParents(ctx api.ScrapeContext, parentTypeToConfigMap map[c
 }
 
 func setParentForChildren(ctx api.ScrapeContext, allConfigs models.ConfigItems) error {
+	var cacheExpiry time.Duration
+
+	// Attempt to get a fixed interval from the schedule so we can set an appropriate cache expiry.
+	if parsedSchedule, err := cron.ParseStandard(ctx.ScrapeConfig().Spec.Schedule); err == nil {
+		next := parsedSchedule.Next(time.Now())
+		cacheExpiry = time.Until(next) * 2
+	} else {
+		cacheExpiry = 2 * time.Hour
+	}
+
 	for _, ci := range allConfigs {
 		if len(ci.Children) == 0 {
 			// No action required
@@ -1114,6 +1144,7 @@ func setParentForChildren(ctx api.ScrapeContext, allConfigs models.ConfigItems)
 
 		childRef := allConfigs.GetByID(found.ID)
 		if childRef != nil {
+			parentCache.Set(childRef.ID, ci.ID, cacheExpiry)
 			childRef.ParentID = &ci.ID
 		}
 	}
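
Taken together, the changes to db/update.go work as follows: a full scrape records each child -> parent link in parentCache (in setParentForChildren) with an expiry of roughly two scrape intervals, so the entry survives until the next full scrape refreshes it, and an incremental scrape, which sees the child without its parent, restores the link from the cache. A self-contained sketch of that flow (the schedule string, IDs, and the expiryFromSchedule helper are illustrative, not from the diff):

package main

import (
	"fmt"
	"time"

	"github.com/patrickmn/go-cache"
	"github.com/robfig/cron/v3"
)

// parentCache maps child config ID -> parent config ID, as in db/update.go.
var parentCache = cache.New(24*time.Hour, 24*time.Hour)

// expiryFromSchedule derives a TTL of twice the time until the next scheduled
// run, falling back to a fixed duration when the schedule isn't parseable.
func expiryFromSchedule(schedule string) time.Duration {
	if parsed, err := cron.ParseStandard(schedule); err == nil {
		return time.Until(parsed.Next(time.Now())) * 2
	}
	return 2 * time.Hour
}

func main() {
	ttl := expiryFromSchedule("@every 30m")

	// Full scrape: the parent discovers its child and records the link.
	parentCache.Set("child-config-id", "parent-config-id", ttl)

	// Incremental scrape: the child alone doesn't know its parent,
	// so the link is recovered from the cache.
	if parentID, ok := parentCache.Get("child-config-id"); ok {
		fmt.Println("restored parent:", parentID.(string))
	}
}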
4 changes: 2 additions & 2 deletions scrapers/cron.go
@@ -300,7 +300,7 @@ func ConsumeKubernetesWatchEventsJobFunc(sc api.ScrapeContext, config v1.Kuberne
 	ch := _ch.(chan v1.KubernetesEvent)
 	events, _, _, _ := lo.Buffer(ch, len(ch))
 
-	cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History)
+	cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History).AsIncrementalScrape()
 	cc.Context = cc.Context.WithoutName().WithName(fmt.Sprintf("%s/%s", ctx.GetNamespace(), ctx.GetName()))
 	results, err := RunK8IncrementalScraper(cc, config, events)
 	if err != nil {
@@ -380,7 +380,7 @@ func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kube
 	// a way that no two objects in a batch have the same id.
 	objs = dedup(objs)
 
-	cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History)
+	cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History).AsIncrementalScrape()
 	cc.Context = cc.Context.WithoutName().WithName(fmt.Sprintf("watch[%s/%s]", cc.GetNamespace(), cc.GetName()))
 	results, err := RunK8ObjScraper(cc, config, objs)
 	if err != nil {
7 changes: 5 additions & 2 deletions scrapers/kubernetes/context.go
@@ -22,14 +22,17 @@ type KubernetesContext struct {
 	// globalLabels are common labels for any kubernetes resource
 	globalLabels map[string]string
 	logSkipped, logExclusions, logNoResourceId bool
+	isIncremental bool
 	cluster v1.ScrapeResult
 	resourceIDMap *ResourceIDMapContainer
 	exclusionByType map[string]string
 	exclusionBySeverity map[string]string
 }
 
-func newKubernetesContext(ctx api.ScrapeContext, config v1.Kubernetes) *KubernetesContext {
-
+func newKubernetesContext(ctx api.ScrapeContext, isIncremental bool, config v1.Kubernetes) *KubernetesContext {
+	if isIncremental {
+		ctx = ctx.AsIncrementalScrape()
+	}
 	return &KubernetesContext{
 		ScrapeContext: ctx,
 		config:        config,
10 changes: 5 additions & 5 deletions scrapers/kubernetes/kubernetes.go
@@ -60,7 +60,7 @@ func (kubernetes KubernetesScraper) IncrementalScrape(
 	config v1.Kubernetes,
 	objects []*unstructured.Unstructured,
 ) v1.ScrapeResults {
-	return ExtractResults(newKubernetesContext(ctx, config), objects)
+	return ExtractResults(newKubernetesContext(ctx, true, config), objects)
 }
 
 func (kubernetes KubernetesScraper) IncrementalEventScrape(
@@ -147,7 +147,7 @@ func (kubernetes KubernetesScraper) IncrementalEventScrape(
 		return nil
 	}
 
-	return ExtractResults(newKubernetesContext(ctx, config), objects)
+	return ExtractResults(newKubernetesContext(ctx, true, config), objects)
 }
 
 func (kubernetes KubernetesScraper) Scrape(ctx api.ScrapeContext) v1.ScrapeResults {
@@ -166,7 +166,7 @@ func (kubernetes KubernetesScraper) Scrape(ctx api.ScrapeContext) v1.ScrapeResul
 		return results.Errorf(err, "error running ketall")
 	}
 
-	extracted := ExtractResults(newKubernetesContext(ctx, config), objs)
+	extracted := ExtractResults(newKubernetesContext(ctx, false, config), objs)
 	results = append(results, extracted...)
 }
 
@@ -196,7 +196,7 @@ func ExtractResults(ctx *KubernetesContext, objs []*unstructured.Unstructured) v
 	results = append(results, cluster)
 
 	ctx.Load(objs)
-	if ctx.isIncremental {
+	if ctx.IsIncrementalScrape() {
 		// On an incremental scrape, we do not have all the data in the resource ID map,
 		// so we use the cached resource ID map.
 		ctx.resourceIDMap.data = resourceIDMapPerCluster.MergeAndUpdate(
@@ -502,7 +502,7 @@ func ExtractResults(ctx *KubernetesContext, objs []*unstructured.Unstructured) v
 	}
 
 	results = append(results, changeResults...)
-	if ctx.isIncremental {
+	if ctx.IsIncrementalScrape() {
 		results = append([]v1.ScrapeResult{ctx.cluster}, results...)
 	}
 
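
The incremental branch above leans on a per-cluster cache of the resource ID map built during full scrapes; the call to resourceIDMapPerCluster.MergeAndUpdate is cut off in this hunk, but the underlying idea is a map overlay in which the incremental scrape's partial, fresher data wins. A simplified sketch (types, keys, and the helper are hypothetical; the real container is shared across scrapes and locked accordingly):

package main

import "fmt"

// mergeResourceIDMaps overlays the partial map from an incremental scrape
// onto the map cached from the last full scrape, preferring fresher entries.
func mergeResourceIDMaps(cached, partial map[string]string) map[string]string {
	merged := make(map[string]string, len(cached)+len(partial))
	for k, v := range cached {
		merged[k] = v
	}
	for k, v := range partial {
		merged[k] = v // the incremental scrape's view wins on conflict
	}
	return merged
}

func main() {
	cached := map[string]string{"Pod/ns/a": "id-1", "Pod/ns/b": "id-2"}
	partial := map[string]string{"Pod/ns/b": "id-2b"} // updated via a watch event

	merged := mergeResourceIDMaps(cached, partial)
	fmt.Println(merged["Pod/ns/a"], merged["Pod/ns/b"]) // id-1 id-2b
}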
