Skip to content

Commit

Permalink
feat: use graphs to determine the config path
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe authored and moshloop committed May 13, 2024
1 parent 698be6e commit 9a1cf43
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 41 deletions.
131 changes: 95 additions & 36 deletions db/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/aws/smithy-go/ptr"
"github.com/dominikbraun/graph"
jsonpatch "github.com/evanphx/json-patch"
"github.com/flanksource/commons/logger"
"github.com/flanksource/config-db/api"
Expand Down Expand Up @@ -320,19 +321,6 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error {
return fmt.Errorf("unable to get current db time: %w", err)
}

var (
// Keep note of the all the relationships in each of the results
// so we can create them once the all the configs are saved.
relationshipToForm []v1.RelationshipResult

// resultsWithRelationshipSelectors is a list of scraped results that have
// relationship selectors. These selectors are stored here to be processed
// once the all the scraped results are saved.
resultsWithRelationshipSelectors []v1.ScrapeResult
)

// TODO:: Sort the results so that parents are inserted first

newConfigs, configsToUpdate, newChanges, changesToUpdate, err := extractConfigsAndChangesFromResults(ctx, startTime, results)
if err != nil {
return fmt.Errorf("failed to extract configs & changes from results: %w", err)
Expand All @@ -353,7 +341,7 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error {
}
}

if err := ctx.DB().CreateInBatches(newChanges, 200).Error; err != nil {
if err := ctx.DB().CreateInBatches(newChanges, configItemsBulkInsertSize).Error; err != nil {
return fmt.Errorf("failed to create config changes: %w", err)
}

Expand All @@ -363,6 +351,16 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error {
}
}

var (
// Keep note of the all the relationships in each of the results
// so we can create them once the all the configs are saved.
relationshipToForm []v1.RelationshipResult

// resultsWithRelationshipSelectors is a list of scraped results that have
// relationship selectors. These selectors are stored here to be processed
// once the all the scraped results are saved.
resultsWithRelationshipSelectors []v1.ScrapeResult
)
for _, result := range results {
if result.AnalysisResult != nil {
if err := upsertAnalysis(ctx, &result); err != nil {
Expand Down Expand Up @@ -560,27 +558,45 @@ type parentExternalKey struct {
parentType string
}

func isTreeRoot(configType string) bool {
switch configType {
case "AWS::::Account", "Kubernetes::Cluster":
return true
}

return false
}

func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime time.Time, results []v1.ScrapeResult) ([]*models.ConfigItem, []*updateConfigArgs, []*models.ConfigChange, []*models.ConfigChange, error) {
var (
inserts = make([]*models.ConfigItem, 0, len(results))
updates = make([]*updateConfigArgs, 0, len(results))
newConfigs = make([]*models.ConfigItem, 0, len(results))
configsToUpdate = make([]*updateConfigArgs, 0, len(results))

newChanges = make([]*models.ConfigChange, 0, len(results))
changesToUpdate = make([]*models.ConfigChange, 0, len(results))

root string
tree = graph.New(graph.StringHash, graph.Rooted(), graph.PreventCycles())
allConfigs = make([]*models.ConfigItem, 0, len(results))

configToExternalParentMap = make(map[string]parentExternalKey)
parentToConfigIDMap = make(map[parentExternalKey]string)
)

for _, result := range results {
if result.Config == nil {
continue
}

result.LastScrapedTime = &scrapeStartTime
ci, err := NewConfigItemFromResult(ctx, result)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("unable to create config item: %s: %w", result, err)
return nil, nil, nil, nil, fmt.Errorf("unable to create config item(%s): %w", result, err)
}

if isTreeRoot(lo.FromPtr(ci.Type)) {
root = ci.ID
}

if err := tree.AddVertex(ci.ID); err != nil {
return nil, nil, nil, nil, fmt.Errorf("unable to add vertex(%s): %w", ci, err)
}

if result.ParentExternalID != "" && result.ParentType != "" {
Expand All @@ -603,22 +619,25 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime
existing := &models.ConfigItem{}
if ci.ID != "" {
if existing, err = ctx.TempCache().Get(ci.ID); err != nil {
return nil, nil, nil, nil, fmt.Errorf("unable to lookup existing config: %s: %w", ci, err)
return nil, nil, nil, nil, fmt.Errorf("unable to lookup existing config(%s): %w", ci, err)
}
} else {
if existing, err = ctx.TempCache().Find(*ci.Type, ci.ExternalID[0]); err != nil {
return nil, nil, nil, nil, fmt.Errorf("unable to lookup external id: %s: %w", ci, err)
return nil, nil, nil, nil, fmt.Errorf("unable to lookup external id(%s): %w", ci, err)
}
}

if existing == nil || existing.ID == "" {
inserts = append(inserts, ci)
} else {
updates = append(updates, &updateConfigArgs{
Result: result,
Existing: existing,
New: ci,
})
allConfigs = append(allConfigs, ci)
if result.Config != nil {
if existing == nil || existing.ID == "" {
newConfigs = append(newConfigs, ci)
} else {
configsToUpdate = append(configsToUpdate, &updateConfigArgs{
Result: result,
Existing: existing,
New: ci,
})
}
}

if toCreate, toUpdate, err := extractChanges(ctx, &result, ci); err != nil {
Expand All @@ -633,8 +652,9 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime
// This is because, on the first run, we don't have any configs at all in the DB.
// So, all the parent lookups will return empty result and no parent will be set.
// This way, we can first look for the parents within the result set.
for i := range inserts {
ci := inserts[i]
for i := range allConfigs {
ci := allConfigs[i]

externalParent, ok := configToExternalParentMap[ci.ID]
if !ok {
continue
Expand All @@ -653,12 +673,51 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime
} else if found != nil {
ci.ParentID = &found.ID
} else {
ctx.DutyContext().Infof("[%s] parent %s/%s not found", ci, externalParent.parentType, externalParent.externalID)
ctx.Logger.V(0).Infof("[%s] parent %s/%s not found", ci, externalParent.parentType, externalParent.externalID)
}
}

// TODO: Sort inserts by path
// i.e. parents should come first.
if root != "" {
// Only work with the Tree if the result set has the root node.
// Incremental scrapers only have partial result set.
if err := setConfigPaths(ctx, tree, root, allConfigs); err != nil {
return nil, nil, nil, nil, fmt.Errorf("unable to set config paths: %w", err)
}
}

// We sort the new config items such that parents are always first.
// This avoids foreign key constraint errors.
slices.SortFunc(newConfigs, func(a, b *models.ConfigItem) int {
if len(a.Path) < len(b.Path) {
return -1
}

return inserts, updates, newChanges, changesToUpdate, nil
if len(a.Path) > len(b.Path) {
return 1
}

return 0
})

return newConfigs, configsToUpdate, newChanges, changesToUpdate, nil
}

func setConfigPaths(ctx api.ScrapeContext, tree graph.Graph[string, string], root string, allConfigs []*models.ConfigItem) error {
for _, c := range allConfigs {
if c.ParentID != nil {
if err := tree.AddEdge(*c.ParentID, c.ID); err != nil {
return fmt.Errorf("unable to add edge(%s): %w", c, err)
}
}
}

for _, c := range allConfigs {
if paths, err := graph.ShortestPath(tree, root, c.ID); err != nil {
ctx.Logger.V(3).Infof("unable to get the path for config(%s): %v", c, err)
} else if len(paths) > 0 {
c.Path = strings.Join(paths, ".")
}
}

return nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/dominikbraun/graph v0.23.0 // indirect
github.com/eko/gocache/lib/v4 v4.1.5 // indirect
github.com/eko/gocache/store/go_cache/v4 v4.2.1 // indirect
github.com/evanphx/json-patch/v5 v5.7.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,8 @@ github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5
github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI=
github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/dominikbraun/graph v0.23.0 h1:TdZB4pPqCLFxYhdyMFb1TBdFxp8XLcJfTTBQucVPgCo=
github.com/dominikbraun/graph v0.23.0/go.mod h1:yOjYyogZLY1LSG9E33JWZJiq5k83Qy2C6POAuiViluc=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eko/gocache/lib/v4 v4.1.5 h1:CeMQmdIzwBKKLRjk3FCDXzNFsQTyqJ01JLI7Ib0C9r8=
github.com/eko/gocache/lib/v4 v4.1.5/go.mod h1:XaNfCwW8KYW1bRZ/KoHA1TugnnkMz0/gT51NDIu7LSY=
Expand Down
9 changes: 4 additions & 5 deletions scrapers/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ func scheduleScraperJob(sc api.ScrapeContext) error {
Schedule: schedule,
Singleton: true,
JobHistory: true,
RunNow: true, // TODO: remove this
Retention: job.RetentionBalanced,
ResourceID: sc.ScrapeConfig().GetPersistedID().String(),
ResourceType: job.ResourceTypeScraper,
Expand All @@ -170,9 +169,8 @@ func scheduleScraperJob(sc api.ScrapeContext) error {
config.Watch = v1.DefaultWatchKinds
}

// TODO: Uncomment
// go watchKubernetesEventsWithRetry(sc, config)
// go watchKubernetesResourcesWithRetry(sc, config)
go watchKubernetesEventsWithRetry(sc, config)
go watchKubernetesResourcesWithRetry(sc, config)

eventsWatchJob := ConsumeKubernetesWatchEventsJobFunc(sc, config)
if err := eventsWatchJob.AddToScheduler(scrapeJobScheduler); err != nil {
Expand Down Expand Up @@ -284,8 +282,9 @@ func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kube

// NOTE: The resource watcher can return multiple objects for the same NEW resource.
// Example: if a new pod is created, we'll get that pod object multiple times for different events.
// All those resource objects are seen as distinct new config items.
// Hence, we need to use the latest one otherwise saving fails
// as we'll be trying to INSERT multiple config items with the same id.
// as we'll be trying to BATCH INSERT multiple config items with the same id.
//
// In the process, we will lose diff changes though.
// If diff changes are necessary, then we can split up the results in such
Expand Down

0 comments on commit 9a1cf43

Please sign in to comment.