Skip to content

Commit

Permalink
feat: hard parent for Argo (#1093)
Browse files Browse the repository at this point in the history
* feat: hard parent for Argo

* chore: refactor extractConfigsAndChangesFromResults

* chore: go mod tidy

* feat: set a parent cache
  • Loading branch information
adityathebe authored Oct 22, 2024
1 parent 61a49c3 commit 2a4cbd5
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 71 deletions.
11 changes: 11 additions & 0 deletions api/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type ScrapeContext struct {

temp *TempCache

isIncremental bool

namespace string

jobHistory *models.JobHistory
Expand Down Expand Up @@ -124,6 +126,15 @@ func (ctx ScrapeContext) JobHistory() *models.JobHistory {
return h
}

// AsIncrementalScrape returns a copy of the context flagged as an
// incremental scrape. The receiver is a value, so the caller's
// original context is left unmodified.
func (ctx ScrapeContext) AsIncrementalScrape() ScrapeContext {
	ctx.isIncremental = true
	return ctx
}

// IsIncrementalScrape reports whether this context was flagged as an
// incremental scrape via AsIncrementalScrape.
func (ctx ScrapeContext) IsIncrementalScrape() bool {
	return ctx.isIncremental
}

// ScrapeConfig returns the scrape configuration associated with this context.
func (ctx ScrapeContext) ScrapeConfig() *v1.ScrapeConfig {
	return ctx.scrapeConfig
}
Expand Down
2 changes: 1 addition & 1 deletion api/v1/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ type ScrapeResult struct {
// List of candidate parents in order of precision.
Parents []ConfigExternalKey `json:"-"`

// List of candidate children in order of precision.
// List of children whose hard parent should be set to this config item.
Children []ConfigExternalKey `json:"-"`

// RelationshipSelectors are used to form relationship of this scraped item with other items.
Expand Down
123 changes: 83 additions & 40 deletions db/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/lib/pq"
"github.com/patrickmn/go-cache"
"github.com/pkg/errors"
"github.com/robfig/cron/v3"
"github.com/samber/lo"
"gorm.io/gorm"
"gorm.io/gorm/clause"
Expand All @@ -40,6 +41,15 @@ import (

const configItemsBulkInsertSize = 200

// parentCache stores child -> parent relationship
// derived from children lookup hooks.
//
// This is to cater to cases where only the parent has the knowledge of its direct child.
// During incremental scrape, the child can look into this cache to find its parent
// (that would have been set in a full scrape).

var parentCache = cache.New(time.Hour*24, time.Hour*24)

func deleteChangeHandler(ctx api.ScrapeContext, change v1.ChangeResult) error {
var deletedAt interface{}
if change.CreatedAt != nil && !change.CreatedAt.IsZero() {
Expand Down Expand Up @@ -560,22 +570,22 @@ func saveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) (v1.ScrapeSum
return summary, fmt.Errorf("unable to get current db time: %w", err)
}

newConfigs, configsToUpdate, newChanges, changesToUpdate, changeSummary, err := extractConfigsAndChangesFromResults(ctx, startTime, results)
extractResult, err := extractConfigsAndChangesFromResults(ctx, startTime, results)
if err != nil {
return summary, fmt.Errorf("failed to extract configs & changes from results: %w", err)
}
for configType, cs := range changeSummary {
for configType, cs := range extractResult.changeSummary {
summary.AddChangeSummary(configType, cs)
}

// NOTE: On duplicate primary key do nothing
// because an incremental scraper might have already inserted the config item.
if err := ctx.DB().
Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "id"}}, DoNothing: true}).
CreateInBatches(newConfigs, configItemsBulkInsertSize).Error; err != nil {
CreateInBatches(extractResult.newConfigs, configItemsBulkInsertSize).Error; err != nil {
return summary, fmt.Errorf("failed to create config items: %w", dutydb.ErrorDetails(err))
}
for _, config := range newConfigs {
for _, config := range extractResult.newConfigs {
summary.AddInserted(config.Type)
}

Expand All @@ -584,7 +594,7 @@ func saveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) (v1.ScrapeSum
var nonUpdatedConfigs []string

// TODO: Try this in batches as well
for _, updateArg := range configsToUpdate {
for _, updateArg := range extractResult.configsToUpdate {
updated, diffChanges, err := updateCI(ctx, &summary, updateArg.Result, updateArg.New, updateArg.Existing)
if err != nil {
return summary, fmt.Errorf("failed to update config item (%s): %w", updateArg.Existing, err)
Expand All @@ -598,7 +608,7 @@ func saveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) (v1.ScrapeSum
}

if len(diffChanges) != 0 {
newChanges = append(newChanges, diffChanges...)
extractResult.newChanges = append(extractResult.newChanges, diffChanges...)
}
}

Expand All @@ -607,14 +617,14 @@ func saveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) (v1.ScrapeSum
}

if ctx.ScrapeConfig().Spec.CRDSync {
allScrapedConfigs := append(newConfigs, lo.Map(configsToUpdate, func(item *updateConfigArgs, _ int) *models.ConfigItem { return item.New })...)
allScrapedConfigs := append(extractResult.newConfigs, lo.Map(extractResult.configsToUpdate, func(item *updateConfigArgs, _ int) *models.ConfigItem { return item.New })...)
if err := syncCRDChanges(ctx, allScrapedConfigs); err != nil {
return summary, fmt.Errorf("failed to sync CRD changes: %w", err)
}
}

dedupWindow := ctx.Properties().Duration("changes.dedup.window", time.Hour)
newChanges, deduped := dedupChanges(dedupWindow, newChanges)
newChanges, deduped := dedupChanges(dedupWindow, extractResult.newChanges)

if err := ctx.DB().CreateInBatches(&newChanges, configItemsBulkInsertSize).Error; err != nil {
return summary, fmt.Errorf("failed to create config changes: %w", dutydb.ErrorDetails(err))
Expand Down Expand Up @@ -648,7 +658,7 @@ func saveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) (v1.ScrapeSum
// Couldn't find a way to do it with gorm.
// Cannot use .Save() because it will try to insert first and then update.
// That'll trigger the .BeforeCreate() hook which doesn't have a ON Conflict clause on the primary key.
for _, changeToUpdate := range changesToUpdate {
for _, changeToUpdate := range extractResult.changesToUpdate {
if err := ctx.DB().Updates(&changeToUpdate).Error; err != nil {
return summary, fmt.Errorf("failed to update config changes: %w", err)
}
Expand Down Expand Up @@ -917,18 +927,25 @@ type configExternalKey struct {
parentType string
}

func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime time.Time, results []v1.ScrapeResult) ([]*models.ConfigItem, []*updateConfigArgs, []*models.ConfigChange, []*models.ConfigChange, map[string]v1.ChangeSummary, error) {
var (
allChangeSummary = make(v1.ChangeSummaryByType)

newConfigs = make([]*models.ConfigItem, 0, len(results))
configsToUpdate = make([]*updateConfigArgs, 0, len(results))

newChanges = make([]*models.ConfigChange, 0, len(results))
changesToUpdate = make([]*models.ConfigChange, 0, len(results))
// extractResult holds the extracted configs & changes from the scrape result
type extractResult struct {
	// newConfigs are config items that were not found in the lookup cache
	// and should be inserted.
	newConfigs []*models.ConfigItem
	// configsToUpdate pairs existing config items with their freshly
	// scraped counterparts so they can be updated in place.
	configsToUpdate []*updateConfigArgs
	// newChanges are config changes to insert.
	newChanges []*models.ConfigChange
	// changesToUpdate are existing config changes to update.
	changesToUpdate []*models.ConfigChange
	// changeSummary accumulates per config-type change ingestion stats.
	changeSummary v1.ChangeSummaryByType
}

allConfigs = make([]*models.ConfigItem, 0, len(results))
// NewExtractResult returns an extractResult with its change summary map
// initialized (the slice fields start nil and grow via append).
//
// NOTE(review): exported constructor returning the unexported type
// *extractResult — consider renaming to newExtractResult; verify no
// external callers first.
func NewExtractResult() *extractResult {
	return &extractResult{
		changeSummary: make(v1.ChangeSummaryByType),
	}
}

func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime time.Time, results []v1.ScrapeResult) (*extractResult, error) {
var (
extractResult = NewExtractResult()
allConfigs = make([]*models.ConfigItem, 0, len(results))
parentTypeToConfigMap = make(map[configExternalKey]string)
)

Expand All @@ -947,11 +964,11 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime
// doesn't have any id.
ci, err = NewConfigItemFromResult(ctx, result)
if err != nil {
return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("unable to create config item(%s): %w", result, err)
return nil, fmt.Errorf("unable to create config item(%s): %w", result, err)
}

if len(ci.ExternalID) == 0 {
return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("config item %s has no external id", ci)
return nil, fmt.Errorf("config item %s has no external id", ci)
}

parentExternalKey := configExternalKey{externalID: ci.ExternalID[0], parentType: ci.Type}
Expand All @@ -960,26 +977,26 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime
existing := &models.ConfigItem{}
if ci.ID != "" {
if existing, err = ctx.TempCache().Get(ctx, ci.ID); err != nil {
return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("unable to lookup existing config(%s): %w", ci, err)
return nil, fmt.Errorf("unable to lookup existing config(%s): %w", ci, err)
}
} else {
if existing, err = ctx.TempCache().Find(ctx, v1.ExternalID{ConfigType: ci.Type, ExternalID: ci.ExternalID[0]}); err != nil {
return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("unable to lookup external id(%s): %w", ci, err)
return nil, fmt.Errorf("unable to lookup external id(%s): %w", ci, err)
}
}

allConfigs = append(allConfigs, ci)
if result.Config != nil {
if existing == nil || existing.ID == "" {
newConfigs = append(newConfigs, ci)
extractResult.newConfigs = append(extractResult.newConfigs, ci)
} else {
// In case, we are not able to derive the path & parent_id
// by forming a tree, we need to use the existing one
// otherwise they'll be updated to empty values
ci.ParentID = existing.ParentID
ci.Path = existing.Path

configsToUpdate = append(configsToUpdate, &updateConfigArgs{
extractResult.configsToUpdate = append(extractResult.configsToUpdate, &updateConfigArgs{
Result: result,
Existing: existing,
New: ci,
Expand All @@ -989,7 +1006,7 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime
}

if toCreate, toUpdate, changeSummary, err := extractChanges(ctx, &result, ci); err != nil {
return nil, nil, nil, nil, allChangeSummary, err
return nil, err
} else {
if !changeSummary.IsEmpty() {
var configType string
Expand All @@ -1003,11 +1020,11 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime
configType = "None"
}

allChangeSummary.Merge(configType, changeSummary)
extractResult.changeSummary.Merge(configType, changeSummary)
}

newChanges = append(newChanges, toCreate...)
changesToUpdate = append(changesToUpdate, toUpdate...)
extractResult.newChanges = append(extractResult.newChanges, toCreate...)
extractResult.changesToUpdate = append(extractResult.changesToUpdate, toUpdate...)
}
}

Expand All @@ -1016,19 +1033,31 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime
// So, all the parent lookups will return empty result and no parent will be set.
// This way, we can first look for the parents within the result set.
if err := setConfigProbableParents(ctx, parentTypeToConfigMap, allConfigs); err != nil {
return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("unable to set parents: %w", err)
}
if err := setConfigChildren(ctx, allConfigs); err != nil {
return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("unable to set children: %w", err)
return nil, fmt.Errorf("unable to set parents: %w", err)
}

if err := setConfigPaths(ctx, allConfigs); err != nil {
return nil, nil, nil, nil, allChangeSummary, fmt.Errorf("unable to set config paths: %w", err)
return nil, fmt.Errorf("unable to set config paths: %w", err)
}

// run this after setting the config path. else whatever the parent is set here will be overwritten by it.
if err := setParentForChildren(ctx, allConfigs); err != nil {
return nil, fmt.Errorf("unable to set children: %w", err)
}

if ctx.IsIncrementalScrape() {
// This is to preserve the child-parent hard relationship
// when the child doesn't know about its parent.
for _, c := range allConfigs {
if parentID, ok := parentCache.Get(c.ID); ok {
c.ParentID = lo.ToPtr(parentID.(string))
}
}
}

// We sort the new config items such that parents are always first.
// This avoids foreign key constraint errors.
slices.SortFunc(newConfigs, func(a, b *models.ConfigItem) int {
slices.SortFunc(extractResult.newConfigs, func(a, b *models.ConfigItem) int {
if len(a.Path) < len(b.Path) {
return -1
}
Expand All @@ -1040,7 +1069,7 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime
return 0
})

return newConfigs, configsToUpdate, newChanges, changesToUpdate, allChangeSummary, nil
return extractResult, nil
}

func setConfigProbableParents(ctx api.ScrapeContext, parentTypeToConfigMap map[configExternalKey]string, allConfigs []*models.ConfigItem) error {
Expand Down Expand Up @@ -1083,7 +1112,17 @@ func setConfigProbableParents(ctx api.ScrapeContext, parentTypeToConfigMap map[c
return nil
}

func setConfigChildren(ctx api.ScrapeContext, allConfigs models.ConfigItems) error {
func setParentForChildren(ctx api.ScrapeContext, allConfigs models.ConfigItems) error {
var cacheExpiry time.Duration

// Attempt to get a fixed interval from the schedule so we can set the appropriate cache expiry.
if parsedSchedule, err := cron.ParseStandard(ctx.ScrapeConfig().Spec.Schedule); err == nil {
next := parsedSchedule.Next(time.Now())
cacheExpiry = time.Until(next) * 2
} else {
cacheExpiry = 2 * time.Hour
}

for _, ci := range allConfigs {
if len(ci.Children) == 0 {
// No action required
Expand All @@ -1098,17 +1137,19 @@ func setConfigChildren(ctx api.ScrapeContext, allConfigs models.ConfigItems) err
found, err := ctx.TempCache().Find(ctx, v1.ExternalID{ConfigType: child.Type, ExternalID: child.ExternalID})
if err != nil {
return err
}
if found == nil {
} else if found == nil {
ctx.Logger.Tracef("child:[%s/%s] not found for config [%s]", child.Type, child.ExternalID, ci)
continue
}

childRef := allConfigs.GetByID(found.ID)
if childRef != nil {
parentCache.Set(childRef.ID, ci.ID, cacheExpiry)
childRef.ParentID = &ci.ID
}
}
}

return nil
}

Expand Down Expand Up @@ -1163,16 +1204,18 @@ func setConfigPaths(ctx api.ScrapeContext, allConfigs []*models.ConfigItem) erro

for _, config := range allConfigs {
for i := 0; i < len(config.ProbableParents); i++ {
// If no cylce is detected, we set path and parentID
// If no cycle is detected, we set path and parentID
path, ok := getPath(ctx, parentMap, config.ID)
if ok {
config.Path = path
// Empty path means root config, where parentID should be nil
if path != "" {
config.ParentID = lo.ToPtr(parentMap[config.ID])
}

break
}

// If a cycle is detected we assume the parent is bad and move to the next
// probable parent and redo path computation
parentMap[config.ID] = config.ProbableParents[i]
Expand Down
7 changes: 0 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,8 @@ require (
github.com/asecurityteam/rolling v2.0.4+incompatible // indirect
github.com/aws/aws-sdk-go-v2/config v1.27.29 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.29 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bmatcuk/doublestar/v4 v4.6.1 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudflare/circl v1.3.7 // indirect
Expand Down Expand Up @@ -158,7 +156,6 @@ require (
github.com/hashicorp/hcl/v2 v2.21.0 // indirect
github.com/henvic/httpretty v0.1.3 // indirect
github.com/hirochachacha/go-smb2 v1.1.0 // indirect
github.com/invopop/jsonschema v0.12.0 // indirect
github.com/itchyny/gojq v0.12.16 // indirect
github.com/itchyny/timefmt-go v0.1.6 // indirect
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 // indirect
Expand Down Expand Up @@ -203,11 +200,7 @@ require (
github.com/vadimi/go-http-ntlm v1.0.3 // indirect
github.com/vadimi/go-http-ntlm/v2 v2.4.1 // indirect
github.com/vadimi/go-ntlm v1.2.1 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/xanzy/ssh-agent v0.3.3 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
Expand Down
Loading

0 comments on commit 2a4cbd5

Please sign in to comment.