diff --git a/api/context.go b/api/context.go index edac89ea..5059745d 100644 --- a/api/context.go +++ b/api/context.go @@ -18,6 +18,8 @@ type ScrapeContext struct { temp *TempCache + isIncremental bool + namespace string jobHistory *models.JobHistory @@ -124,6 +126,15 @@ func (ctx ScrapeContext) JobHistory() *models.JobHistory { return h } +func (ctx ScrapeContext) AsIncrementalScrape() ScrapeContext { + ctx.isIncremental = true + return ctx +} + +func (ctx ScrapeContext) IsIncrementalScrape() bool { + return ctx.isIncremental +} + func (ctx ScrapeContext) ScrapeConfig() *v1.ScrapeConfig { return ctx.scrapeConfig } diff --git a/api/v1/interface.go b/api/v1/interface.go index c039627d..5d94a443 100644 --- a/api/v1/interface.go +++ b/api/v1/interface.go @@ -602,7 +602,7 @@ type ScrapeResult struct { // List of candidate parents in order of precision. Parents []ConfigExternalKey `json:"-"` - // List of candidate children in order of precision. + // List of children whose hard parent should be set to this config item. Children []ConfigExternalKey `json:"-"` // RelationshipSelectors are used to form relationship of this scraped item with other items. diff --git a/db/update.go b/db/update.go index 40e26daa..0fc453ce 100644 --- a/db/update.go +++ b/db/update.go @@ -32,6 +32,7 @@ import ( "github.com/lib/pq" "github.com/patrickmn/go-cache" "github.com/pkg/errors" + "github.com/robfig/cron/v3" "github.com/samber/lo" "gorm.io/gorm" "gorm.io/gorm/clause" @@ -40,6 +41,15 @@ import ( const configItemsBulkInsertSize = 200 +// parentCache stores child -> parent relationship +// derived from children lookup hooks. +// +// This is to cater to cases where only the parent has the knowledge of its direct child. +// During incremental scrape, the child can look into this cache to find its parent +// (that would have been set in a full scrape). 
+ +var parentCache = cache.New(time.Hour*24, time.Hour*24) + func deleteChangeHandler(ctx api.ScrapeContext, change v1.ChangeResult) error { var deletedAt interface{} if change.CreatedAt != nil && !change.CreatedAt.IsZero() { @@ -560,11 +570,11 @@ func saveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) (v1.ScrapeSum return summary, fmt.Errorf("unable to get current db time: %w", err) } - newConfigs, configsToUpdate, newChanges, changesToUpdate, changeSummary, err := extractConfigsAndChangesFromResults(ctx, startTime, results) + extractResult, err := extractConfigsAndChangesFromResults(ctx, startTime, results) if err != nil { return summary, fmt.Errorf("failed to extract configs & changes from results: %w", err) } - for configType, cs := range changeSummary { + for configType, cs := range extractResult.changeSummary { summary.AddChangeSummary(configType, cs) } @@ -572,10 +582,10 @@ func saveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) (v1.ScrapeSum // because an incremental scraper might have already inserted the config item. if err := ctx.DB(). Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "id"}}, DoNothing: true}). 
- CreateInBatches(newConfigs, configItemsBulkInsertSize).Error; err != nil { + CreateInBatches(extractResult.newConfigs, configItemsBulkInsertSize).Error; err != nil { return summary, fmt.Errorf("failed to create config items: %w", dutydb.ErrorDetails(err)) } - for _, config := range newConfigs { + for _, config := range extractResult.newConfigs { summary.AddInserted(config.Type) } @@ -584,7 +594,7 @@ func saveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) (v1.ScrapeSum var nonUpdatedConfigs []string // TODO: Try this in batches as well - for _, updateArg := range configsToUpdate { + for _, updateArg := range extractResult.configsToUpdate { updated, diffChanges, err := updateCI(ctx, &summary, updateArg.Result, updateArg.New, updateArg.Existing) if err != nil { return summary, fmt.Errorf("failed to update config item (%s): %w", updateArg.Existing, err) @@ -598,7 +608,7 @@ func saveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) (v1.ScrapeSum } if len(diffChanges) != 0 { - newChanges = append(newChanges, diffChanges...) + extractResult.newChanges = append(extractResult.newChanges, diffChanges...) } } @@ -607,14 +617,14 @@ func saveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) (v1.ScrapeSum } if ctx.ScrapeConfig().Spec.CRDSync { - allScrapedConfigs := append(newConfigs, lo.Map(configsToUpdate, func(item *updateConfigArgs, _ int) *models.ConfigItem { return item.New })...) + allScrapedConfigs := append(extractResult.newConfigs, lo.Map(extractResult.configsToUpdate, func(item *updateConfigArgs, _ int) *models.ConfigItem { return item.New })...) 
if err := syncCRDChanges(ctx, allScrapedConfigs); err != nil { return summary, fmt.Errorf("failed to sync CRD changes: %w", err) } } dedupWindow := ctx.Properties().Duration("changes.dedup.window", time.Hour) - newChanges, deduped := dedupChanges(dedupWindow, newChanges) + newChanges, deduped := dedupChanges(dedupWindow, extractResult.newChanges) if err := ctx.DB().CreateInBatches(&newChanges, configItemsBulkInsertSize).Error; err != nil { return summary, fmt.Errorf("failed to create config changes: %w", dutydb.ErrorDetails(err)) @@ -648,7 +658,7 @@ func saveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) (v1.ScrapeSum // Couldn't find a way to do it with gorm. // Cannot use .Save() because it will try to insert first and then update. // That'll trigger the .BeforeCreate() hook which doesn't have a ON Conflict clause on the primary key. - for _, changeToUpdate := range changesToUpdate { + for _, changeToUpdate := range extractResult.changesToUpdate { if err := ctx.DB().Updates(&changeToUpdate).Error; err != nil { return summary, fmt.Errorf("failed to update config changes: %w", err) } @@ -917,18 +927,25 @@ type configExternalKey struct { parentType string } -func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime time.Time, results []v1.ScrapeResult) ([]*models.ConfigItem, []*updateConfigArgs, []*models.ConfigChange, []*models.ConfigChange, map[string]v1.ChangeSummary, error) { - var ( - allChangeSummary = make(v1.ChangeSummaryByType) - - newConfigs = make([]*models.ConfigItem, 0, len(results)) - configsToUpdate = make([]*updateConfigArgs, 0, len(results)) - - newChanges = make([]*models.ConfigChange, 0, len(results)) - changesToUpdate = make([]*models.ConfigChange, 0, len(results)) +// extractResult holds the extracted configs & changes from the scrape result +type extractResult struct { + newConfigs []*models.ConfigItem + configsToUpdate []*updateConfigArgs + newChanges []*models.ConfigChange + changesToUpdate 
[]*models.ConfigChange + changeSummary v1.ChangeSummaryByType +} - allConfigs = make([]*models.ConfigItem, 0, len(results)) +func NewExtractResult() *extractResult { + return &extractResult{ + changeSummary: make(v1.ChangeSummaryByType), + } +} +func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime time.Time, results []v1.ScrapeResult) (*extractResult, error) { + var ( + extractResult = NewExtractResult() + allConfigs = make([]*models.ConfigItem, 0, len(results)) parentTypeToConfigMap = make(map[configExternalKey]string) ) @@ -947,11 +964,11 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime // doesn't have any id. ci, err = NewConfigItemFromResult(ctx, result) if err != nil { - return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("unable to create config item(%s): %w", result, err) + return nil, fmt.Errorf("unable to create config item(%s): %w", result, err) } if len(ci.ExternalID) == 0 { - return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("config item %s has no external id", ci) + return nil, fmt.Errorf("config item %s has no external id", ci) } parentExternalKey := configExternalKey{externalID: ci.ExternalID[0], parentType: ci.Type} @@ -960,18 +977,18 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime existing := &models.ConfigItem{} if ci.ID != "" { if existing, err = ctx.TempCache().Get(ctx, ci.ID); err != nil { - return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("unable to lookup existing config(%s): %w", ci, err) + return nil, fmt.Errorf("unable to lookup existing config(%s): %w", ci, err) } } else { if existing, err = ctx.TempCache().Find(ctx, v1.ExternalID{ConfigType: ci.Type, ExternalID: ci.ExternalID[0]}); err != nil { - return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("unable to lookup external id(%s): %w", ci, err) + return nil, fmt.Errorf("unable to lookup external id(%s): %w", ci, err) } } allConfigs = append(allConfigs, ci) if 
result.Config != nil { if existing == nil || existing.ID == "" { - newConfigs = append(newConfigs, ci) + extractResult.newConfigs = append(extractResult.newConfigs, ci) } else { // In case, we are not able to derive the path & parent_id // by forming a tree, we need to use the existing one @@ -979,7 +996,7 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime ci.ParentID = existing.ParentID ci.Path = existing.Path - configsToUpdate = append(configsToUpdate, &updateConfigArgs{ + extractResult.configsToUpdate = append(extractResult.configsToUpdate, &updateConfigArgs{ Result: result, Existing: existing, New: ci, @@ -989,7 +1006,7 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime } if toCreate, toUpdate, changeSummary, err := extractChanges(ctx, &result, ci); err != nil { - return nil, nil, nil, nil, allChangeSummary, err + return nil, err } else { if !changeSummary.IsEmpty() { var configType string @@ -1003,11 +1020,11 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime configType = "None" } - allChangeSummary.Merge(configType, changeSummary) + extractResult.changeSummary.Merge(configType, changeSummary) } - newChanges = append(newChanges, toCreate...) - changesToUpdate = append(changesToUpdate, toUpdate...) + extractResult.newChanges = append(extractResult.newChanges, toCreate...) + extractResult.changesToUpdate = append(extractResult.changesToUpdate, toUpdate...) } } @@ -1016,19 +1033,31 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime // So, all the parent lookups will return empty result and no parent will be set. // This way, we can first look for the parents within the result set. 
if err := setConfigProbableParents(ctx, parentTypeToConfigMap, allConfigs); err != nil { - return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("unable to set parents: %w", err) - } - if err := setConfigChildren(ctx, allConfigs); err != nil { - return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("unable to set children: %w", err) + return nil, fmt.Errorf("unable to set parents: %w", err) } if err := setConfigPaths(ctx, allConfigs); err != nil { - return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("unable to set config paths: %w", err) + return nil, fmt.Errorf("unable to set config paths: %w", err) + } + + // Run this after setting the config paths, else any parent set here would be overwritten by setConfigPaths. + if err := setParentForChildren(ctx, allConfigs); err != nil { + return nil, fmt.Errorf("unable to set children: %w", err) + } + + if ctx.IsIncrementalScrape() { + // This is to preserve the child-parent hard relationship + // when the child doesn't know about its parent. + for _, c := range allConfigs { + if parentID, ok := parentCache.Get(c.ID); ok { + c.ParentID = lo.ToPtr(parentID.(string)) + } + } } // We sort the new config items such that parents are always first. // This avoids foreign key constraint errors. 
- slices.SortFunc(newConfigs, func(a, b *models.ConfigItem) int { + slices.SortFunc(extractResult.newConfigs, func(a, b *models.ConfigItem) int { if len(a.Path) < len(b.Path) { return -1 } @@ -1040,7 +1069,7 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime return 0 }) - return newConfigs, configsToUpdate, newChanges, changesToUpdate, allChangeSummary, nil + return extractResult, nil } func setConfigProbableParents(ctx api.ScrapeContext, parentTypeToConfigMap map[configExternalKey]string, allConfigs []*models.ConfigItem) error { @@ -1083,7 +1112,17 @@ func setConfigProbableParents(ctx api.ScrapeContext, parentTypeToConfigMap map[c return nil } -func setConfigChildren(ctx api.ScrapeContext, allConfigs models.ConfigItems) error { +func setParentForChildren(ctx api.ScrapeContext, allConfigs models.ConfigItems) error { + var cacheExpiry time.Duration + + // Attempt to get a fixed interval from the schedule so we can set the appropriate cache expiry. + if parsedSchedule, err := cron.ParseStandard(ctx.ScrapeConfig().Spec.Schedule); err == nil { + next := parsedSchedule.Next(time.Now()) + cacheExpiry = time.Until(next) * 2 + } else { + cacheExpiry = 2 * time.Hour + } + for _, ci := range allConfigs { if len(ci.Children) == 0 { // No action required @@ -1098,17 +1137,19 @@ func setConfigChildren(ctx api.ScrapeContext, allConfigs models.ConfigItems) err found, err := ctx.TempCache().Find(ctx, v1.ExternalID{ConfigType: child.Type, ExternalID: child.ExternalID}) if err != nil { return err - } - if found == nil { + } else if found == nil { ctx.Logger.Tracef("child:[%s/%s] not found for config [%s]", child.Type, child.ExternalID, ci) continue } + childRef := allConfigs.GetByID(found.ID) if childRef != nil { + parentCache.Set(childRef.ID, ci.ID, cacheExpiry) childRef.ParentID = &ci.ID } } } + return nil } @@ -1163,7 +1204,7 @@ func setConfigPaths(ctx api.ScrapeContext, allConfigs []*models.ConfigItem) erro for _, config := range allConfigs { for 
i := 0; i < len(config.ProbableParents); i++ { - // If no cylce is detected, we set path and parentID + // If no cycle is detected, we set path and parentID path, ok := getPath(ctx, parentMap, config.ID) if ok { config.Path = path @@ -1171,8 +1212,10 @@ func setConfigPaths(ctx api.ScrapeContext, allConfigs []*models.ConfigItem) erro if path != "" { config.ParentID = lo.ToPtr(parentMap[config.ID]) } + break } + // If a cycle is detected we assume the parent is bad and move to the next // probable parent and redo path computation parentMap[config.ID] = config.ProbableParents[i] diff --git a/go.mod b/go.mod index 2aea17d6..fd63c621 100644 --- a/go.mod +++ b/go.mod @@ -115,10 +115,8 @@ require ( github.com/asecurityteam/rolling v2.0.4+incompatible // indirect github.com/aws/aws-sdk-go-v2/config v1.27.29 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.17.29 // indirect - github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bmatcuk/doublestar/v4 v4.6.1 // indirect - github.com/buger/jsonparser v1.1.1 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudflare/circl v1.3.7 // indirect @@ -158,7 +156,6 @@ require ( github.com/hashicorp/hcl/v2 v2.21.0 // indirect github.com/henvic/httpretty v0.1.3 // indirect github.com/hirochachacha/go-smb2 v1.1.0 // indirect - github.com/invopop/jsonschema v0.12.0 // indirect github.com/itchyny/gojq v0.12.16 // indirect github.com/itchyny/timefmt-go v0.1.6 // indirect github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 // indirect @@ -203,11 +200,7 @@ require ( github.com/vadimi/go-http-ntlm v1.0.3 // indirect github.com/vadimi/go-http-ntlm/v2 v2.4.1 // indirect github.com/vadimi/go-ntlm v1.2.1 // indirect - github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/xanzy/ssh-agent v0.3.3 // indirect - github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect - 
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect - github.com/xeipuuv/gojsonschema v1.2.0 // indirect github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect diff --git a/go.sum b/go.sum index 9f35f7bc..d1f5bde1 100644 --- a/go.sum +++ b/go.sum @@ -815,8 +815,6 @@ github.com/aws/aws-sdk-go-v2/service/support v1.24.3 h1:Bbesu6YZvEYACyydELMwUTYY github.com/aws/aws-sdk-go-v2/service/support v1.24.3/go.mod h1:NvXUhACskXZ2tiXzECpC/97xKzyY7/Wcc1ug5rla7kY= github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4= github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= -github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= -github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -829,8 +827,6 @@ github.com/bmatcuk/doublestar/v4 v4.6.1 h1:FH9SifrbvJhnlQpztAx++wlkk70QBf0iBWDwN github.com/bmatcuk/doublestar/v4 v4.6.1/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= -github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= -github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c/go.mod 
h1:l/bIBLeOl9eX+wxJAzxS4TveKRtAqlyDpHjhkfO0MEI= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= @@ -1211,8 +1207,6 @@ github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= -github.com/invopop/jsonschema v0.12.0 h1:6ovsNSuvn9wEQVOyc72aycBMVQFKz7cPdMJn10CvzRI= -github.com/invopop/jsonschema v0.12.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0= github.com/itchyny/gojq v0.12.13/go.mod h1:JzwzAqenfhrPUuwbmEz3nu3JQmFLlQTQMUcOdnu/Sf4= github.com/itchyny/gojq v0.12.16 h1:yLfgLxhIr/6sJNVmYfQjTIv0jGctu6/DgDoivmxTr7g= github.com/itchyny/gojq v0.12.16/go.mod h1:6abHbdC2uB9ogMS38XsErnfqJ94UlngIJGlRAIj4jTM= @@ -1590,16 +1584,8 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo= github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= -github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= -github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM= github.com/xanzy/ssh-agent v0.3.3/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw= -github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f h1:J9EGpcZtP0E/raorCMxlFGSTBrsSlaDGf3jU/qvAE2c= -github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= -github.com/xeipuuv/gojsonreference 
v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0= -github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= -github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74= -github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= github.com/xhit/go-str2duration v1.2.0/go.mod h1:3cPSlfZlUHVlneIVfePFWcJZsuwf+P1v2SRTV4cUmp4= github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU= github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 h1:nIPpBwaJSVYIxUFsDv3M8ofmx9yWTog9BfvIu0q41lo= diff --git a/scrapers/cron.go b/scrapers/cron.go index 803fb695..e1555159 100644 --- a/scrapers/cron.go +++ b/scrapers/cron.go @@ -300,7 +300,7 @@ func ConsumeKubernetesWatchEventsJobFunc(sc api.ScrapeContext, config v1.Kuberne ch := _ch.(chan v1.KubernetesEvent) events, _, _, _ := lo.Buffer(ch, len(ch)) - cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History) + cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History).AsIncrementalScrape() cc.Context = cc.Context.WithoutName().WithName(fmt.Sprintf("%s/%s", ctx.GetNamespace(), ctx.GetName())) results, err := RunK8IncrementalScraper(cc, config, events) if err != nil { @@ -380,7 +380,7 @@ func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kube // a way that no two objects in a batch have the same id. 
objs = dedup(objs) - cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History) + cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History).AsIncrementalScrape() cc.Context = cc.Context.WithoutName().WithName(fmt.Sprintf("watch[%s/%s]", cc.GetNamespace(), cc.GetName())) results, err := RunK8ObjScraper(cc, config, objs) if err != nil { diff --git a/scrapers/kubernetes/context.go b/scrapers/kubernetes/context.go index 5aa3bd28..e02dec3f 100644 --- a/scrapers/kubernetes/context.go +++ b/scrapers/kubernetes/context.go @@ -22,14 +22,17 @@ type KubernetesContext struct { // globalLabels are common labels for any kubernetes resource globalLabels map[string]string logSkipped, logExclusions, logNoResourceId bool - isIncremental bool cluster v1.ScrapeResult resourceIDMap *ResourceIDMapContainer exclusionByType map[string]string exclusionBySeverity map[string]string } -func newKubernetesContext(ctx api.ScrapeContext, config v1.Kubernetes) *KubernetesContext { +func newKubernetesContext(ctx api.ScrapeContext, isIncremental bool, config v1.Kubernetes) *KubernetesContext { + if isIncremental { + ctx = ctx.AsIncrementalScrape() + } + return &KubernetesContext{ ScrapeContext: ctx, config: config, diff --git a/scrapers/kubernetes/kubernetes.go b/scrapers/kubernetes/kubernetes.go index 11ecc3fb..c1846742 100644 --- a/scrapers/kubernetes/kubernetes.go +++ b/scrapers/kubernetes/kubernetes.go @@ -60,7 +60,7 @@ func (kubernetes KubernetesScraper) IncrementalScrape( config v1.Kubernetes, objects []*unstructured.Unstructured, ) v1.ScrapeResults { - return ExtractResults(newKubernetesContext(ctx, config), objects) + return ExtractResults(newKubernetesContext(ctx, true, config), objects) } func (kubernetes KubernetesScraper) IncrementalEventScrape( @@ -147,7 +147,7 @@ func (kubernetes KubernetesScraper) IncrementalEventScrape( return nil } - return ExtractResults(newKubernetesContext(ctx, config), objects) + 
return ExtractResults(newKubernetesContext(ctx, true, config), objects) } func (kubernetes KubernetesScraper) Scrape(ctx api.ScrapeContext) v1.ScrapeResults { @@ -166,7 +166,7 @@ func (kubernetes KubernetesScraper) Scrape(ctx api.ScrapeContext) v1.ScrapeResul return results.Errorf(err, "error running ketall") } - extracted := ExtractResults(newKubernetesContext(ctx, config), objs) + extracted := ExtractResults(newKubernetesContext(ctx, false, config), objs) results = append(results, extracted...) } @@ -196,7 +196,7 @@ func ExtractResults(ctx *KubernetesContext, objs []*unstructured.Unstructured) v results = append(results, cluster) ctx.Load(objs) - if ctx.isIncremental { + if ctx.IsIncrementalScrape() { // On incremental scrape, we do not have all the data in the resource ID map. // we use it from the cached resource id map. ctx.resourceIDMap.data = resourceIDMapPerCluster.MergeAndUpdate( @@ -502,7 +502,7 @@ func ExtractResults(ctx *KubernetesContext, objs []*unstructured.Unstructured) v } results = append(results, changeResults...) - if ctx.isIncremental { + if ctx.IsIncrementalScrape() { results = append([]v1.ScrapeResult{ctx.cluster}, results...) }