From 6f39bd3b2ace9430ccc284599bfb1b041aabad2a Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Fri, 10 May 2024 17:36:29 +0545 Subject: [PATCH 1/4] chore: improve errors when saving config results --- db/update.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/db/update.go b/db/update.go index fb1c4aba..f6815f65 100644 --- a/db/update.go +++ b/db/update.go @@ -109,6 +109,7 @@ func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult) (*models.ConfigItem if len(ci.ExternalID) == 0 { return nil, false, fmt.Errorf("config item %s has no external id", ci) } + if ci.ID != "" { if err := uuid.Validate(ci.ID); err == nil { if existing, err = ctx.TempCache().Get(ci.ID); err != nil { @@ -123,7 +124,7 @@ func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult) (*models.ConfigItem if existing == nil || existing.ID == "" { if err := ctx.DB().Clauses(clause.OnConflict{UpdateAll: true}).Create(ci).Error; err != nil { - return nil, false, err + return nil, false, fmt.Errorf("failed to create config: %w", err) } return ci, false, nil @@ -272,7 +273,7 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf if changeResult.Action == v1.Delete { if err := deleteChangeHandler(ctx, changeResult); err != nil { - return err + return fmt.Errorf("failed to delete config from change: %w", err) } } @@ -294,7 +295,7 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf change.ConfigID = ci.ID } else if !change.GetExternalID().IsEmpty() { if ci, err := ctx.TempCache().FindExternalID(change.GetExternalID()); err != nil { - return err + return fmt.Errorf("failed to get config from change (externalID=%s): %w", change.GetExternalID(), err) } else if ci != "" { change.ConfigID = ci } else if ci == "" { @@ -305,11 +306,11 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf if changeResult.UpdateExisting { if err := db.Save(change).Error; err != nil { - return err + return fmt.Errorf("failed to update config change: %w", err) } } else { if err := db.Create(change).Error; err != nil { - return err + return fmt.Errorf("failed to create config change: %w", err) } } } @@ -373,6 +374,8 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { resultsWithRelationshipSelectors []v1.ScrapeResult ) + // TODO:: Sort the results so that parents are inserted first + var toTouch []string for _, result := range results { result.LastScrapedTime = &startTime @@ -381,7 +384,7 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { if result.Config != nil { var updated bool if ci, updated, err = updateCI(ctx, result); err != nil { - return err + return fmt.Errorf("failed to save config item (%s): %w", result, err) } else if !updated { toTouch = append(toTouch, ci.ID) } @@ -389,12 +392,12 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { if result.AnalysisResult != nil { if err := upsertAnalysis(ctx, &result); err != nil { - return err + return fmt.Errorf("failed to analysis (%s): %w", result, err) } } if err := saveChanges(ctx, &result, ci); err != nil { - return err + return fmt.Errorf("failed to changes (%s): %w", result, err) } relationshipToForm = append(relationshipToForm, result.RelationshipResults...) @@ -431,7 +434,7 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { if err := ctx.DB().Table("config_items"). Where("id in (?)", toTouch[i:end]). Update("last_scraped_time", gorm.Expr("NOW()")).Error; err != nil { - return err + return fmt.Errorf("error updating last scraped time: %w", err) } } } From 43a9d231106bd0016afd627f7b3ce105f686956d Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Sun, 12 May 2024 13:46:43 +0545 Subject: [PATCH 2/4] refactor: batch saving [skip ci] --- db/config.go | 10 -- db/update.go | 268 +++++++++++++++++++++++++++++------------------ scrapers/cron.go | 35 ++++++- 3 files changed, 201 insertions(+), 112 deletions(-) diff --git a/db/config.go b/db/config.go index bb05abd2..d2ead428 100644 --- a/db/config.go +++ b/db/config.go @@ -180,16 +180,6 @@ func NewConfigItemFromResult(ctx api.ScrapeContext, result v1.ScrapeResult) (*mo ci.DeleteReason = result.DeleteReason } - if result.ParentExternalID != "" && result.ParentType != "" { - if found, err := ctx.TempCache().Find(result.ParentType, result.ParentExternalID); err != nil { - return nil, err - } else if found != nil { - ci.ParentID = &found.ID - } else { - ctx.DutyContext().Infof("[%s] parent %s/%s not found", ci, result.ParentType, result.ParentExternalID) - } - } - return ci, nil } diff --git a/db/update.go b/db/update.go index f6815f65..0d505620 100644 --- a/db/update.go +++ b/db/update.go @@ -30,6 +30,8 @@ import ( "gorm.io/gorm/clause" ) +const configItemsBulkInsertSize = 200 + func deleteChangeHandler(ctx api.ScrapeContext, change v1.ChangeResult) error { var deletedAt interface{} if change.CreatedAt != nil && !change.CreatedAt.IsZero() { @@ -96,55 +98,10 @@ func mapEqual(a, b map[string]any) bool { return true } -func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult) (*models.ConfigItem, bool, error) { - db := ctx.DutyContext().DB() - ci, err := NewConfigItemFromResult(ctx, result) - if err != nil { - return nil, false, errors.Wrapf(err, "unable to create config item: %s", result) - } - - ci.ScraperID = ctx.ScrapeConfig().GetPersistedID() - existing := &models.ConfigItem{} - - if len(ci.ExternalID) == 0 { - return nil, false, fmt.Errorf("config item %s has no external id", ci) - } - - if ci.ID != "" { - if err := uuid.Validate(ci.ID); err == nil { - if existing, err = ctx.TempCache().Get(ci.ID); err != nil { - return nil, false, errors.Wrapf(err, "unable to lookup existing config: %s", ci) - } - } - } else { - if existing, err = ctx.TempCache().Find(*ci.Type, ci.ExternalID[0]); err != nil { - return nil, false, errors.Wrapf(err, "unable to lookup external id: %s", ci) - } - } - - if existing == nil || existing.ID == "" { - if err := ctx.DB().Clauses(clause.OnConflict{UpdateAll: true}).Create(ci).Error; err != nil { - return nil, false, fmt.Errorf("failed to create config: %w", err) - } - - return ci, false, nil - } - +func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult, ci, existing *models.ConfigItem) ([]*models.ConfigChange, error) { ci.ID = existing.ID - - if ci.ParentID != nil { - if parent, err := ctx.TempCache().Get(*ci.ParentID); err != nil { - return nil, false, errors.Wrapf(err, "unable to find parent %s", *ci.ParentID) - } else if parent == nil { - return nil, false, fmt.Errorf("parent %s not found", *ci.ParentID) - } else if parent.Path != "" { - ci.Path = parent.Path + "." + ci.ID - } else { - ci.Path = parent.ID + "." + ci.ID - } - } - updates := make(map[string]interface{}) + changes := make([]*models.ConfigChange, 0) // In case a resource was marked as deleted but is un-deleted now // we set an update flag as gorm ignores nil pointers @@ -159,9 +116,12 @@ func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult) (*models.ConfigItem } else if changeResult != nil { ctx.Logger.V(3).Infof("[%s/%s] detected changes", *ci.Type, ci.ExternalID[0]) result.Changes = []v1.ChangeResult{*changeResult} - if err := saveChanges(ctx, &result, ci); err != nil { - return nil, false, fmt.Errorf("[%s] failed to save %d changes: %w", ci, len(result.Changes), err) + if newChanges, _, err := extractChanges(ctx, &result, ci); err != nil { + return nil, err + } else { + changes = append(changes, newChanges...) } + updates["config"] = *ci.Config updates["updated_at"] = gorm.Expr("NOW()") } @@ -223,16 +183,15 @@ func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult) (*models.ConfigItem } if len(updates) == 0 { - return ci, false, nil + return changes, nil } updates["last_scraped_time"] = gorm.Expr("NOW()") - - if err := db.Model(ci).Updates(updates).Error; err != nil { - return nil, false, errors.Wrapf(err, "unable to update config item: %s", ci) + if err := ctx.DutyContext().DB().Model(ci).Updates(updates).Error; err != nil { + return nil, errors.Wrapf(err, "unable to update config item: %s", ci) } - return ci, true, nil + return changes, nil } func shouldExcludeChange(result *v1.ScrapeResult, changeResult v1.ChangeResult) (bool, error) { @@ -254,11 +213,13 @@ func shouldExcludeChange(result *v1.ScrapeResult, changeResult v1.ChangeResult) return false, nil } -func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.ConfigItem) error { - changes.ProcessRules(result, result.BaseScraper.Transform.Change.Mapping...) - - db := ctx.DB() +func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.ConfigItem) ([]*models.ConfigChange, []*models.ConfigChange, error) { + var ( + newOnes = []*models.ConfigChange{} + updates = []*models.ConfigChange{} + ) + changes.ProcessRules(result, result.BaseScraper.Transform.Change.Mapping...) for _, changeResult := range result.Changes { if changeResult.Action == v1.Ignore { continue @@ -273,7 +234,7 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf if changeResult.Action == v1.Delete { if err := deleteChangeHandler(ctx, changeResult); err != nil { - return fmt.Errorf("failed to delete config from change: %w", err) + return nil, nil, fmt.Errorf("failed to delete config from change: %w", err) } } @@ -282,7 +243,7 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf if change.CreatedBy != nil { person, err := FindPersonByEmail(ctx, ptr.ToString(change.CreatedBy)) if err != nil { - return fmt.Errorf("error finding person by email: %w", err) + return nil, nil, fmt.Errorf("error finding person by email: %w", err) } else if person != nil { change.CreatedBy = ptr.String(person.ID.String()) } else { @@ -295,7 +256,7 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf change.ConfigID = ci.ID } else if !change.GetExternalID().IsEmpty() { if ci, err := ctx.TempCache().FindExternalID(change.GetExternalID()); err != nil { - return fmt.Errorf("failed to get config from change (externalID=%s): %w", change.GetExternalID(), err) + return nil, nil, fmt.Errorf("failed to get config from change (externalID=%s): %w", change.GetExternalID(), err) } else if ci != "" { change.ConfigID = ci } else if ci == "" { @@ -305,17 +266,13 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf } if changeResult.UpdateExisting { - if err := db.Save(change).Error; err != nil { - return fmt.Errorf("failed to update config change: %w", err) - } + updates = append(updates, change) } else { - if err := db.Create(change).Error; err != nil { - return fmt.Errorf("failed to create config change: %w", err) - } + newOnes = append(newOnes, change) } } - return nil + return newOnes, updates, nil } func upsertAnalysis(ctx api.ScrapeContext, result *v1.ScrapeResult) error { @@ -376,32 +333,44 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { // TODO:: Sort the results so that parents are inserted first - var toTouch []string - for _, result := range results { - result.LastScrapedTime = &startTime - - var ci *models.ConfigItem - if result.Config != nil { - var updated bool - if ci, updated, err = updateCI(ctx, result); err != nil { - return fmt.Errorf("failed to save config item (%s): %w", result, err) - } else if !updated { - toTouch = append(toTouch, ci.ID) - } + newConfigs, configsToUpdate, newChanges, changesToUpdate, err := extractConfigsAndChangesFromResults(ctx, startTime, results) + if err != nil { + return fmt.Errorf("failed to extract configs & changes from results: %w", err) + } + ctx.Logger.V(2).Infof("%d new configs, %d configs to update, %d new changes & %d changes to update", + len(newConfigs), len(configsToUpdate), len(newChanges), len(changesToUpdate)) + + if err := ctx.DB().CreateInBatches(newConfigs, configItemsBulkInsertSize).Error; err != nil { + return fmt.Errorf("failed to create config items: %w", err) + } + + // TODO: Try this in batches as well + for _, config := range configsToUpdate { + if diffChanges, err := updateCI(ctx, config.Result, config.New, config.Existing); err != nil { + return fmt.Errorf("failed to update config item (%s): %w", config.Existing, err) + } else if len(diffChanges) != 0 { + newChanges = append(newChanges, diffChanges...) } + } + if err := ctx.DB().CreateInBatches(newChanges, 200).Error; err != nil { + return fmt.Errorf("failed to create config changes: %w", err) + } + + if len(changesToUpdate) != 0 { + if err := ctx.DB().Save(changesToUpdate).Error; err != nil { + return fmt.Errorf("failed to update config changes: %w", err) + } + } + + for _, result := range results { if result.AnalysisResult != nil { if err := upsertAnalysis(ctx, &result); err != nil { return fmt.Errorf("failed to analysis (%s): %w", result, err) } } - if err := saveChanges(ctx, &result, ci); err != nil { - return fmt.Errorf("failed to changes (%s): %w", result, err) - } - relationshipToForm = append(relationshipToForm, result.RelationshipResults...) - if len(result.RelationshipSelectors) != 0 { resultsWithRelationshipSelectors = append(resultsWithRelationshipSelectors, result) } @@ -424,21 +393,6 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { } } - if len(toTouch) > 0 { - for i := 0; i < len(toTouch); i = i + 5000 { - end := i + 5000 - if end > len(toTouch) { - end = len(toTouch) - } - - if err := ctx.DB().Table("config_items"). - Where("id in (?)", toTouch[i:end]). - Update("last_scraped_time", gorm.Expr("NOW()")).Error; err != nil { - return fmt.Errorf("error updating last scraped time: %w", err) - } - } - } - ctx.Logger.V(3).Infof("saved %d results.", len(results)) return nil } @@ -594,3 +548,117 @@ func relationshipResultHandler(ctx api.ScrapeContext, relationships v1.Relations return UpdateConfigRelatonships(ctx, configItemRelationships) } + +type updateConfigArgs struct { + Result v1.ScrapeResult + Existing *models.ConfigItem + New *models.ConfigItem +} + +type parentExternalKey struct { + externalID string + parentType string +} + +func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime time.Time, results []v1.ScrapeResult) ([]*models.ConfigItem, []*updateConfigArgs, []*models.ConfigChange, []*models.ConfigChange, error) { + var ( + inserts = make([]*models.ConfigItem, 0, len(results)) + updates = make([]*updateConfigArgs, 0, len(results)) + + newChanges = make([]*models.ConfigChange, 0, len(results)) + changesToUpdate = make([]*models.ConfigChange, 0, len(results)) + + configToExternalParentMap = make(map[string]parentExternalKey) + parentToConfigIDMap = make(map[parentExternalKey]string) + ) + + for _, result := range results { + if result.Config == nil { + continue + } + + result.LastScrapedTime = &scrapeStartTime + ci, err := NewConfigItemFromResult(ctx, result) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("unable to create config item: %s: %w", result, err) + } + + if result.ParentExternalID != "" && result.ParentType != "" { + configToExternalParentMap[ci.ID] = parentExternalKey{ + externalID: result.ParentExternalID, + parentType: result.ParentType, + } + } + + parentToConfigIDMap[parentExternalKey{ + externalID: ci.ExternalID[0], + parentType: lo.FromPtr(ci.Type), + }] = ci.ID + + ci.ScraperID = ctx.ScrapeConfig().GetPersistedID() + if len(ci.ExternalID) == 0 { + return nil, nil, nil, nil, fmt.Errorf("config item %s has no external id", ci) + } + + existing := &models.ConfigItem{} + if ci.ID != "" { + if existing, err = ctx.TempCache().Get(ci.ID); err != nil { + return nil, nil, nil, nil, fmt.Errorf("unable to lookup existing config: %s: %w", ci, err) + } + } else { + if existing, err = ctx.TempCache().Find(*ci.Type, ci.ExternalID[0]); err != nil { + return nil, nil, nil, nil, fmt.Errorf("unable to lookup external id: %s: %w", ci, err) + } + } + + if existing == nil || existing.ID == "" { + inserts = append(inserts, ci) + } else { + updates = append(updates, &updateConfigArgs{ + Result: result, + Existing: existing, + New: ci, + }) + } + + if toCreate, toUpdate, err := extractChanges(ctx, &result, ci); err != nil { + return nil, nil, nil, nil, err + } else { + newChanges = append(newChanges, toCreate...) + changesToUpdate = append(changesToUpdate, toUpdate...) + } + } + + // Calculate the parents only after we have all the config items. + // This is because, on the first run, we don't have any configs at all in the DB. + // So, all the parent lookups will return empty result and no parent will be set. + // This way, we can first look for the parents within the result set. + for i := range inserts { + ci := inserts[i] + externalParent, ok := configToExternalParentMap[ci.ID] + if !ok { + continue + } + + if parentID, found := parentToConfigIDMap[parentExternalKey{ + externalID: externalParent.externalID, + parentType: externalParent.parentType, + }]; found { + ci.ParentID = &parentID + continue + } + + if found, err := ctx.TempCache().Find(externalParent.parentType, externalParent.externalID); err != nil { + return nil, nil, nil, nil, err + } else if found != nil { + ci.ParentID = &found.ID + } else { + ctx.DutyContext().Infof("[%s] parent %s/%s not found", ci, externalParent.parentType, externalParent.externalID) + } + } + + // TODO: Sort inserts by path + // i.e. parents should come first. + + return inserts, updates, newChanges, changesToUpdate, nil +} diff --git a/scrapers/cron.go b/scrapers/cron.go index db120390..df363edc 100644 --- a/scrapers/cron.go +++ b/scrapers/cron.go @@ -17,6 +17,8 @@ import ( "github.com/robfig/cron/v3" "github.com/samber/lo" "github.com/sethvargo/go-retry" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" ) var ( @@ -142,6 +144,7 @@ func scheduleScraperJob(sc api.ScrapeContext) error { Schedule: schedule, Singleton: true, JobHistory: true, + RunNow: true, // TODO: remove this Retention: job.RetentionBalanced, ResourceID: sc.ScrapeConfig().GetPersistedID().String(), ResourceType: job.ResourceTypeScraper, @@ -167,8 +170,9 @@ func scheduleScraperJob(sc api.ScrapeContext) error { config.Watch = v1.DefaultWatchKinds } - go watchKubernetesEventsWithRetry(sc, config) - go watchKubernetesResourcesWithRetry(sc, config) + // TODO: Uncomment + // go watchKubernetesEventsWithRetry(sc, config) + // go watchKubernetesResourcesWithRetry(sc, config) eventsWatchJob := ConsumeKubernetesWatchEventsJobFunc(sc, config) if err := eventsWatchJob.AddToScheduler(scrapeJobScheduler); err != nil { @@ -239,6 +243,23 @@ func consumeKubernetesWatchResourcesJobKey(id string) string { return id + "-consume-kubernetes-watch-resources" } +func dedup(objs []*unstructured.Unstructured) []*unstructured.Unstructured { + var output []*unstructured.Unstructured + var seen = make(map[types.UID]struct{}) + + // Iterate in reverse, cuz we want the latest + for i := len(objs) - 1; i >= 0; i-- { + if _, ok := seen[objs[i].GetUID()]; ok { + continue + } + + seen[objs[i].GetUID()] = struct{}{} + output = append(output, objs[i]) + } + + return output +} + // ConsumeKubernetesWatchEventsJobFunc returns a job that consumes kubernetes watch events // for the given config of the scrapeconfig. func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kubernetes) *job.Job { @@ -261,6 +282,16 @@ func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kube } objs, _, _, _ := lo.Buffer(ch, len(ch)) + // NOTE: The resource watcher can return multiple objects for the same NEW resource. + // Example: if a new pod is created, we'll get that pod object multiple times for different events. + // Hence, we need to use the latest one otherwise saving fails + // as we'll be trying to INSERT multiple config items with the same id. + // + // In the process, we will lose diff changes though. + // If diff changes are necessary, then we can split up the results in such + // a way that no two objects in a batch have the same id. + objs = dedup(objs) + cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History) results, err := RunK8ObjScraper(cc, config, objs) if err != nil { From f3fb2eaab788a2c6242bf62a089ea9b051524592 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Mon, 13 May 2024 08:04:47 +0545 Subject: [PATCH 3/4] feat: use graphs to determine the config path --- db/update.go | 131 ++++++++++++++++++++++++++++++++++------------- go.mod | 1 + go.sum | 2 + scrapers/cron.go | 9 ++-- 4 files changed, 102 insertions(+), 41 deletions(-) diff --git a/db/update.go b/db/update.go index 0d505620..33207abe 100644 --- a/db/update.go +++ b/db/update.go @@ -9,6 +9,7 @@ import ( "time" "github.com/aws/smithy-go/ptr" + "github.com/dominikbraun/graph" jsonpatch "github.com/evanphx/json-patch" "github.com/flanksource/commons/logger" "github.com/flanksource/config-db/api" @@ -320,19 +321,6 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { return fmt.Errorf("unable to get current db time: %w", err) } - var ( - // Keep note of the all the relationships in each of the results - // so we can create them once the all the configs are saved. - relationshipToForm []v1.RelationshipResult - - // resultsWithRelationshipSelectors is a list of scraped results that have - // relationship selectors. These selectors are stored here to be processed - // once the all the scraped results are saved. - resultsWithRelationshipSelectors []v1.ScrapeResult - ) - - // TODO:: Sort the results so that parents are inserted first - newConfigs, configsToUpdate, newChanges, changesToUpdate, err := extractConfigsAndChangesFromResults(ctx, startTime, results) if err != nil { return fmt.Errorf("failed to extract configs & changes from results: %w", err) @@ -353,7 +341,7 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { } } - if err := ctx.DB().CreateInBatches(newChanges, 200).Error; err != nil { + if err := ctx.DB().CreateInBatches(newChanges, configItemsBulkInsertSize).Error; err != nil { return fmt.Errorf("failed to create config changes: %w", err) } @@ -363,6 +351,16 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { } } + var ( + // Keep note of the all the relationships in each of the results + // so we can create them once the all the configs are saved. + relationshipToForm []v1.RelationshipResult + + // resultsWithRelationshipSelectors is a list of scraped results that have + // relationship selectors. These selectors are stored here to be processed + // once the all the scraped results are saved. + resultsWithRelationshipSelectors []v1.ScrapeResult + ) for _, result := range results { if result.AnalysisResult != nil { if err := upsertAnalysis(ctx, &result); err != nil { @@ -560,27 +558,45 @@ type parentExternalKey struct { parentType string } +func isTreeRoot(configType string) bool { + switch configType { + case "AWS::::Account", "Kubernetes::Cluster": + return true + } + + return false +} + func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime time.Time, results []v1.ScrapeResult) ([]*models.ConfigItem, []*updateConfigArgs, []*models.ConfigChange, []*models.ConfigChange, error) { var ( - inserts = make([]*models.ConfigItem, 0, len(results)) - updates = make([]*updateConfigArgs, 0, len(results)) + newConfigs = make([]*models.ConfigItem, 0, len(results)) + configsToUpdate = make([]*updateConfigArgs, 0, len(results)) newChanges = make([]*models.ConfigChange, 0, len(results)) changesToUpdate = make([]*models.ConfigChange, 0, len(results)) + root string + tree = graph.New(graph.StringHash, graph.Rooted(), graph.PreventCycles()) + allConfigs = make([]*models.ConfigItem, 0, len(results)) + configToExternalParentMap = make(map[string]parentExternalKey) parentToConfigIDMap = make(map[parentExternalKey]string) ) for _, result := range results { - if result.Config == nil { - continue - } result.LastScrapedTime = &scrapeStartTime ci, err := NewConfigItemFromResult(ctx, result) if err != nil { - return nil, nil, nil, nil, fmt.Errorf("unable to create config item: %s: %w", result, err) + return nil, nil, nil, nil, fmt.Errorf("unable to create config item(%s): %w", result, err) + } + + if isTreeRoot(lo.FromPtr(ci.Type)) { + root = ci.ID + } + + if err := tree.AddVertex(ci.ID); err != nil { + return nil, nil, nil, nil, fmt.Errorf("unable to add vertex(%s): %w", ci, err) } if result.ParentExternalID != "" && result.ParentType != "" { @@ -603,22 +619,25 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime existing := &models.ConfigItem{} if ci.ID != "" { if existing, err = ctx.TempCache().Get(ci.ID); err != nil { - return nil, nil, nil, nil, fmt.Errorf("unable to lookup existing config: %s: %w", ci, err) + return nil, nil, nil, nil, fmt.Errorf("unable to lookup existing config(%s): %w", ci, err) } } else { if existing, err = ctx.TempCache().Find(*ci.Type, ci.ExternalID[0]); err != nil { - return nil, nil, nil, nil, fmt.Errorf("unable to lookup external id: %s: %w", ci, err) + return nil, nil, nil, nil, fmt.Errorf("unable to lookup external id(%s): %w", ci, err) } } - if existing == nil || existing.ID == "" { - inserts = append(inserts, ci) - } else { - updates = append(updates, &updateConfigArgs{ - Result: result, - Existing: existing, - New: ci, - }) + allConfigs = append(allConfigs, ci) + if result.Config != nil { + if existing == nil || existing.ID == "" { + newConfigs = append(newConfigs, ci) + } else { + configsToUpdate = append(configsToUpdate, &updateConfigArgs{ + Result: result, + Existing: existing, + New: ci, + }) + } } if toCreate, toUpdate, err := extractChanges(ctx, &result, ci); err != nil { @@ -633,8 +652,9 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime // This is because, on the first run, we don't have any configs at all in the DB. // So, all the parent lookups will return empty result and no parent will be set. // This way, we can first look for the parents within the result set. - for i := range inserts { - ci := inserts[i] + for i := range allConfigs { + ci := allConfigs[i] + externalParent, ok := configToExternalParentMap[ci.ID] if !ok { continue @@ -653,12 +673,51 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime } else if found != nil { ci.ParentID = &found.ID } else { - ctx.DutyContext().Infof("[%s] parent %s/%s not found", ci, externalParent.parentType, externalParent.externalID) + ctx.Logger.V(0).Infof("[%s] parent %s/%s not found", ci, externalParent.parentType, externalParent.externalID) } } - // TODO: Sort inserts by path - // i.e. parents should come first. + if root != "" { + // Only work with the Tree if the result set has the root node. + // Incremental scrapers only have partial result set. + if err := setConfigPaths(ctx, tree, root, allConfigs); err != nil { + return nil, nil, nil, nil, fmt.Errorf("unable to set config paths: %w", err) + } + } + + // We sort the new config items such that parents are always first. + // This avoids foreign key constraint errors. + slices.SortFunc(newConfigs, func(a, b *models.ConfigItem) int { + if len(a.Path) < len(b.Path) { + return -1 + } - return inserts, updates, newChanges, changesToUpdate, nil + if len(a.Path) > len(b.Path) { + return 1 + } + + return 0 + }) + + return newConfigs, configsToUpdate, newChanges, changesToUpdate, nil +} + +func setConfigPaths(ctx api.ScrapeContext, tree graph.Graph[string, string], root string, allConfigs []*models.ConfigItem) error { + for _, c := range allConfigs { + if c.ParentID != nil { + if err := tree.AddEdge(*c.ParentID, c.ID); err != nil { + return fmt.Errorf("unable to add edge(%s): %w", c, err) + } + } + } + + for _, c := range allConfigs { + if paths, err := graph.ShortestPath(tree, root, c.ID); err != nil { + ctx.Logger.V(3).Infof("unable to get the path for config(%s): %v", c, err) + } else if len(paths) > 0 { + c.Path = strings.Join(paths, ".") + } + } + + return nil } diff --git a/go.mod b/go.mod index 22051762..26cc1de3 100644 --- a/go.mod +++ b/go.mod @@ -99,6 +99,7 @@ require ( github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/distribution/reference v0.5.0 // indirect + github.com/dominikbraun/graph v0.23.0 // indirect github.com/eko/gocache/lib/v4 v4.1.5 // indirect github.com/eko/gocache/store/go_cache/v4 v4.2.1 // indirect github.com/evanphx/json-patch/v5 v5.7.0 // indirect diff --git a/go.sum b/go.sum index d4b5c02d..dca65bd4 100644 --- a/go.sum +++ b/go.sum @@ -817,6 +817,8 @@ github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5 github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= +github.com/dominikbraun/graph v0.23.0 h1:TdZB4pPqCLFxYhdyMFb1TBdFxp8XLcJfTTBQucVPgCo= +github.com/dominikbraun/graph v0.23.0/go.mod h1:yOjYyogZLY1LSG9E33JWZJiq5k83Qy2C6POAuiViluc= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eko/gocache/lib/v4 v4.1.5 h1:CeMQmdIzwBKKLRjk3FCDXzNFsQTyqJ01JLI7Ib0C9r8= github.com/eko/gocache/lib/v4 v4.1.5/go.mod h1:XaNfCwW8KYW1bRZ/KoHA1TugnnkMz0/gT51NDIu7LSY= diff --git a/scrapers/cron.go b/scrapers/cron.go index df363edc..9cc39734 100644 --- a/scrapers/cron.go +++ b/scrapers/cron.go @@ -144,7 +144,6 @@ func scheduleScraperJob(sc api.ScrapeContext) error { Schedule: schedule, Singleton: true, JobHistory: true, - RunNow: true, // TODO: remove this Retention: job.RetentionBalanced, ResourceID: sc.ScrapeConfig().GetPersistedID().String(), ResourceType: job.ResourceTypeScraper, @@ -170,9 +169,8 @@ func scheduleScraperJob(sc api.ScrapeContext) error { config.Watch = v1.DefaultWatchKinds } - // TODO: Uncomment - // go watchKubernetesEventsWithRetry(sc, config) - // go watchKubernetesResourcesWithRetry(sc, config) + go watchKubernetesEventsWithRetry(sc, config) + go watchKubernetesResourcesWithRetry(sc, config) eventsWatchJob := ConsumeKubernetesWatchEventsJobFunc(sc, config) if err := eventsWatchJob.AddToScheduler(scrapeJobScheduler); err != nil { @@ -284,8 +282,9 @@ func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kube // NOTE: The resource watcher can return multiple objects for the same NEW resource. // Example: if a new pod is created, we'll get that pod object multiple times for different events. + // All those resource objects are seen as distinct new config items. // Hence, we need to use the latest one otherwise saving fails - // as we'll be trying to INSERT multiple config items with the same id. + // as we'll be trying to BATCH INSERT multiple config items with the same id. // // In the process, we will lose diff changes though. // If diff changes are necessary, then we can split up the results in such From 6c07fbf2df4fe118d1fca06b16dc786ae2b8048a Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Mon, 13 May 2024 09:24:06 +0545 Subject: [PATCH 4/4] chore: cleanup --- db/config.go | 3 ++ db/models/config_item.go | 5 +- db/update.go | 98 +++++++++++++++++++--------------------- 3 files changed, 53 insertions(+), 53 deletions(-) diff --git a/db/config.go b/db/config.go index d2ead428..93914355 100644 --- a/db/config.go +++ b/db/config.go @@ -137,6 +137,9 @@ func NewConfigItemFromResult(ctx api.ScrapeContext, result v1.ScrapeResult) (*mo Config: &dataStr, Ready: result.Ready, LastScrapedTime: result.LastScrapedTime, + + ParentExternalID: result.ParentExternalID, + ParentType: result.ParentType, } if parsed, err := result.Tags.AsMap(); err != nil { diff --git a/db/models/config_item.go b/db/models/config_item.go index 2ee46889..a2619231 100644 --- a/db/models/config_item.go +++ b/db/models/config_item.go @@ -41,7 +41,10 @@ type ConfigItem struct { DeletedAt *time.Time `gorm:"column:deleted_at" json:"deleted_at"` LastScrapedTime *time.Time `gorm:"column:last_scraped_time" json:"last_scraped_time"` DeleteReason v1.ConfigDeleteReason `gorm:"column:delete_reason" json:"delete_reason"` - TouchDeletedAt bool `gorm:"-" json:"-"` + + ParentExternalID string `gorm:"-" json:"-"` + ParentType string `gorm:"-" json:"-"` + TouchDeletedAt bool `gorm:"-" json:"-"` } func (ci ConfigItem) String() string { diff --git a/db/update.go b/db/update.go index 33207abe..42a60abb 100644 --- a/db/update.go +++ b/db/update.go @@ -553,20 +553,11 @@ type updateConfigArgs struct { New *models.ConfigItem } -type parentExternalKey struct { +type configExternalKey struct { externalID string parentType string } -func isTreeRoot(configType string) bool { - switch configType { - case "AWS::::Account", "Kubernetes::Cluster": - return true - } - - return false -} - func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime time.Time, results []v1.ScrapeResult) ([]*models.ConfigItem, []*updateConfigArgs, []*models.ConfigChange, []*models.ConfigChange, error) { var ( newConfigs = make([]*models.ConfigItem, 0, len(results)) @@ -579,18 +570,21 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime tree = graph.New(graph.StringHash, graph.Rooted(), graph.PreventCycles()) allConfigs = make([]*models.ConfigItem, 0, len(results)) - configToExternalParentMap = make(map[string]parentExternalKey) - parentToConfigIDMap = make(map[parentExternalKey]string) + parentTypeToConfigMap = make(map[configExternalKey]string) ) for _, result := range results { - result.LastScrapedTime = &scrapeStartTime ci, err := NewConfigItemFromResult(ctx, result) if err != nil { return nil, nil, nil, nil, fmt.Errorf("unable to create config item(%s): %w", result, err) } + ci.ScraperID = ctx.ScrapeConfig().GetPersistedID() + if len(ci.ExternalID) == 0 { + return nil, nil, nil, nil, fmt.Errorf("config item %s has no external id", ci) + } + if isTreeRoot(lo.FromPtr(ci.Type)) { root = ci.ID } @@ -599,22 +593,8 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime return nil, nil, nil, nil, fmt.Errorf("unable to add vertex(%s): %w", ci, err) } - if result.ParentExternalID != "" && result.ParentType != "" { - configToExternalParentMap[ci.ID] = parentExternalKey{ - externalID: result.ParentExternalID, - parentType: result.ParentType, - } - } - - parentToConfigIDMap[parentExternalKey{ - externalID: ci.ExternalID[0], - parentType: lo.FromPtr(ci.Type), - }] = ci.ID - - ci.ScraperID = ctx.ScrapeConfig().GetPersistedID() - if len(ci.ExternalID) == 0 { - return nil, nil, nil, nil, fmt.Errorf("config item %s has no external id", ci) - } + parentExternalKey := configExternalKey{externalID: ci.ExternalID[0], parentType: lo.FromPtr(ci.Type)} + parentTypeToConfigMap[parentExternalKey] = ci.ID existing := &models.ConfigItem{} if ci.ID != "" { @@ -652,29 +632,8 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime // This is because, on the first run, we don't have any configs at all in the DB. // So, all the parent lookups will return empty result and no parent will be set. // This way, we can first look for the parents within the result set. - for i := range allConfigs { - ci := allConfigs[i] - - externalParent, ok := configToExternalParentMap[ci.ID] - if !ok { - continue - } - - if parentID, found := parentToConfigIDMap[parentExternalKey{ - externalID: externalParent.externalID, - parentType: externalParent.parentType, - }]; found { - ci.ParentID = &parentID - continue - } - - if found, err := ctx.TempCache().Find(externalParent.parentType, externalParent.externalID); err != nil { - return nil, nil, nil, nil, err - } else if found != nil { - ci.ParentID = &found.ID - } else { - ctx.Logger.V(0).Infof("[%s] parent %s/%s not found", ci, externalParent.parentType, externalParent.externalID) - } + if err := setConfigParents(ctx, parentTypeToConfigMap, allConfigs); err != nil { + return nil, nil, nil, nil, fmt.Errorf("unable to setup parents: %w", err) } if root != "" { @@ -702,6 +661,32 @@ func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime return newConfigs, configsToUpdate, newChanges, changesToUpdate, nil } +func setConfigParents(ctx api.ScrapeContext, parentTypeToConfigMap map[configExternalKey]string, allConfigs []*models.ConfigItem) error { + for _, ci := range allConfigs { + if ci.ParentExternalID == "" || ci.ParentType == "" { + continue + } + + if parentID, found := parentTypeToConfigMap[configExternalKey{ + externalID: ci.ParentExternalID, + parentType: ci.ParentType, + }]; found { + ci.ParentID = &parentID + continue + } + + if found, err := ctx.TempCache().Find(ci.ParentType, ci.ParentExternalID); err != nil { + return err + } else if found != nil { + ci.ParentID = &found.ID + } else { + ctx.Logger.V(0).Infof("[%s] parent %s/%s not found", ci, ci.ParentType, ci.ParentExternalID) + } + } + + return nil +} + func setConfigPaths(ctx api.ScrapeContext, tree graph.Graph[string, string], root string, allConfigs []*models.ConfigItem) error { for _, c := range allConfigs { if c.ParentID != nil { @@ -721,3 +706,12 @@ func setConfigPaths(ctx api.ScrapeContext, tree graph.Graph[string, string], roo return nil } + +func isTreeRoot(configType string) bool { + switch configType { + case "AWS::::Account", "Kubernetes::Cluster": + return true + } + + return false +}