diff --git a/db/config.go b/db/config.go index bb05abd2..93914355 100644 --- a/db/config.go +++ b/db/config.go @@ -137,6 +137,9 @@ func NewConfigItemFromResult(ctx api.ScrapeContext, result v1.ScrapeResult) (*mo Config: &dataStr, Ready: result.Ready, LastScrapedTime: result.LastScrapedTime, + + ParentExternalID: result.ParentExternalID, + ParentType: result.ParentType, } if parsed, err := result.Tags.AsMap(); err != nil { @@ -180,16 +183,6 @@ func NewConfigItemFromResult(ctx api.ScrapeContext, result v1.ScrapeResult) (*mo ci.DeleteReason = result.DeleteReason } - if result.ParentExternalID != "" && result.ParentType != "" { - if found, err := ctx.TempCache().Find(result.ParentType, result.ParentExternalID); err != nil { - return nil, err - } else if found != nil { - ci.ParentID = &found.ID - } else { - ctx.DutyContext().Infof("[%s] parent %s/%s not found", ci, result.ParentType, result.ParentExternalID) - } - } - return ci, nil } diff --git a/db/models/config_item.go b/db/models/config_item.go index 2ee46889..a2619231 100644 --- a/db/models/config_item.go +++ b/db/models/config_item.go @@ -41,7 +41,10 @@ type ConfigItem struct { DeletedAt *time.Time `gorm:"column:deleted_at" json:"deleted_at"` LastScrapedTime *time.Time `gorm:"column:last_scraped_time" json:"last_scraped_time"` DeleteReason v1.ConfigDeleteReason `gorm:"column:delete_reason" json:"delete_reason"` - TouchDeletedAt bool `gorm:"-" json:"-"` + + ParentExternalID string `gorm:"-" json:"-"` + ParentType string `gorm:"-" json:"-"` + TouchDeletedAt bool `gorm:"-" json:"-"` } func (ci ConfigItem) String() string { diff --git a/db/update.go b/db/update.go index fb1c4aba..42a60abb 100644 --- a/db/update.go +++ b/db/update.go @@ -9,6 +9,7 @@ import ( "time" "github.com/aws/smithy-go/ptr" + "github.com/dominikbraun/graph" jsonpatch "github.com/evanphx/json-patch" "github.com/flanksource/commons/logger" "github.com/flanksource/config-db/api" @@ -30,6 +31,8 @@ import ( "gorm.io/gorm/clause" ) +const configItemsBulkInsertSize = 200 + func deleteChangeHandler(ctx api.ScrapeContext, change v1.ChangeResult) error { var deletedAt interface{} if change.CreatedAt != nil && !change.CreatedAt.IsZero() { @@ -96,54 +99,10 @@ func mapEqual(a, b map[string]any) bool { return true } -func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult) (*models.ConfigItem, bool, error) { - db := ctx.DutyContext().DB() - ci, err := NewConfigItemFromResult(ctx, result) - if err != nil { - return nil, false, errors.Wrapf(err, "unable to create config item: %s", result) - } - - ci.ScraperID = ctx.ScrapeConfig().GetPersistedID() - existing := &models.ConfigItem{} - - if len(ci.ExternalID) == 0 { - return nil, false, fmt.Errorf("config item %s has no external id", ci) - } - if ci.ID != "" { - if err := uuid.Validate(ci.ID); err == nil { - if existing, err = ctx.TempCache().Get(ci.ID); err != nil { - return nil, false, errors.Wrapf(err, "unable to lookup existing config: %s", ci) - } - } - } else { - if existing, err = ctx.TempCache().Find(*ci.Type, ci.ExternalID[0]); err != nil { - return nil, false, errors.Wrapf(err, "unable to lookup external id: %s", ci) - } - } - - if existing == nil || existing.ID == "" { - if err := ctx.DB().Clauses(clause.OnConflict{UpdateAll: true}).Create(ci).Error; err != nil { - return nil, false, err - } - - return ci, false, nil - } - +func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult, ci, existing *models.ConfigItem) ([]*models.ConfigChange, error) { ci.ID = existing.ID - - if ci.ParentID != nil { - if parent, err := ctx.TempCache().Get(*ci.ParentID); err != nil { - return nil, false, errors.Wrapf(err, "unable to find parent %s", *ci.ParentID) - } else if parent == nil { - return nil, false, fmt.Errorf("parent %s not found", *ci.ParentID) - } else if parent.Path != "" { - ci.Path = parent.Path + "." + ci.ID - } else { - ci.Path = parent.ID + "." + ci.ID - } - } - updates := make(map[string]interface{}) + changes := make([]*models.ConfigChange, 0) // In case a resource was marked as deleted but is un-deleted now // we set an update flag as gorm ignores nil pointers @@ -158,9 +117,12 @@ func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult) (*models.ConfigItem } else if changeResult != nil { ctx.Logger.V(3).Infof("[%s/%s] detected changes", *ci.Type, ci.ExternalID[0]) result.Changes = []v1.ChangeResult{*changeResult} - if err := saveChanges(ctx, &result, ci); err != nil { - return nil, false, fmt.Errorf("[%s] failed to save %d changes: %w", ci, len(result.Changes), err) + if newChanges, _, err := extractChanges(ctx, &result, ci); err != nil { + return nil, err + } else { + changes = append(changes, newChanges...) } + updates["config"] = *ci.Config updates["updated_at"] = gorm.Expr("NOW()") } @@ -222,16 +184,15 @@ func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult) (*models.ConfigItem } if len(updates) == 0 { - return ci, false, nil + return changes, nil } updates["last_scraped_time"] = gorm.Expr("NOW()") - - if err := db.Model(ci).Updates(updates).Error; err != nil { - return nil, false, errors.Wrapf(err, "unable to update config item: %s", ci) + if err := ctx.DutyContext().DB().Model(ci).Updates(updates).Error; err != nil { + return nil, errors.Wrapf(err, "unable to update config item: %s", ci) } - return ci, true, nil + return changes, nil } func shouldExcludeChange(result *v1.ScrapeResult, changeResult v1.ChangeResult) (bool, error) { @@ -253,11 +214,13 @@ func shouldExcludeChange(result *v1.ScrapeResult, changeResult v1.ChangeResult) return false, nil } -func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.ConfigItem) error { - changes.ProcessRules(result, result.BaseScraper.Transform.Change.Mapping...) - - db := ctx.DB() +func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.ConfigItem) ([]*models.ConfigChange, []*models.ConfigChange, error) { + var ( + newOnes = []*models.ConfigChange{} + updates = []*models.ConfigChange{} + ) + changes.ProcessRules(result, result.BaseScraper.Transform.Change.Mapping...) for _, changeResult := range result.Changes { if changeResult.Action == v1.Ignore { continue @@ -272,7 +235,7 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf if changeResult.Action == v1.Delete { if err := deleteChangeHandler(ctx, changeResult); err != nil { - return err + return nil, nil, fmt.Errorf("failed to delete config from change: %w", err) } } @@ -281,7 +244,7 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf if change.CreatedBy != nil { person, err := FindPersonByEmail(ctx, ptr.ToString(change.CreatedBy)) if err != nil { - return fmt.Errorf("error finding person by email: %w", err) + return nil, nil, fmt.Errorf("error finding person by email: %w", err) } else if person != nil { change.CreatedBy = ptr.String(person.ID.String()) } else { @@ -294,7 +257,7 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf change.ConfigID = ci.ID } else if !change.GetExternalID().IsEmpty() { if ci, err := ctx.TempCache().FindExternalID(change.GetExternalID()); err != nil { - return err + return nil, nil, fmt.Errorf("failed to get config from change (externalID=%s): %w", change.GetExternalID(), err) } else if ci != "" { change.ConfigID = ci } else if ci == "" { @@ -304,17 +267,13 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf } if changeResult.UpdateExisting { - if err := db.Save(change).Error; err != nil { - return err - } + updates = append(updates, change) } else { - if err := db.Create(change).Error; err != nil { - return err - } + newOnes = append(newOnes, change) } } - return nil + return newOnes, updates, nil } func upsertAnalysis(ctx api.ScrapeContext, result *v1.ScrapeResult) error { @@ -362,6 +321,36 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { return fmt.Errorf("unable to get current db time: %w", err) } + newConfigs, configsToUpdate, newChanges, changesToUpdate, err := extractConfigsAndChangesFromResults(ctx, startTime, results) + if err != nil { + return fmt.Errorf("failed to extract configs & changes from results: %w", err) + } + ctx.Logger.V(2).Infof("%d new configs, %d configs to update, %d new changes & %d changes to update", + len(newConfigs), len(configsToUpdate), len(newChanges), len(changesToUpdate)) + + if err := ctx.DB().CreateInBatches(newConfigs, configItemsBulkInsertSize).Error; err != nil { + return fmt.Errorf("failed to create config items: %w", err) + } + + // TODO: Try this in batches as well + for _, config := range configsToUpdate { + if diffChanges, err := updateCI(ctx, config.Result, config.New, config.Existing); err != nil { + return fmt.Errorf("failed to update config item (%s): %w", config.Existing, err) + } else if len(diffChanges) != 0 { + newChanges = append(newChanges, diffChanges...) + } + } + + if err := ctx.DB().CreateInBatches(newChanges, configItemsBulkInsertSize).Error; err != nil { + return fmt.Errorf("failed to create config changes: %w", err) + } + + if len(changesToUpdate) != 0 { + if err := ctx.DB().Save(changesToUpdate).Error; err != nil { + return fmt.Errorf("failed to update config changes: %w", err) + } + } + var ( // Keep note of the all the relationships in each of the results // so we can create them once the all the configs are saved. @@ -372,33 +361,14 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { // once the all the scraped results are saved. resultsWithRelationshipSelectors []v1.ScrapeResult ) - - var toTouch []string for _, result := range results { - result.LastScrapedTime = &startTime - - var ci *models.ConfigItem - if result.Config != nil { - var updated bool - if ci, updated, err = updateCI(ctx, result); err != nil { - return err - } else if !updated { - toTouch = append(toTouch, ci.ID) - } - } - if result.AnalysisResult != nil { if err := upsertAnalysis(ctx, &result); err != nil { - return err + return fmt.Errorf("failed to analysis (%s): %w", result, err) } } - if err := saveChanges(ctx, &result, ci); err != nil { - return err - } - relationshipToForm = append(relationshipToForm, result.RelationshipResults...) - if len(result.RelationshipSelectors) != 0 { resultsWithRelationshipSelectors = append(resultsWithRelationshipSelectors, result) } @@ -421,21 +391,6 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { } } - if len(toTouch) > 0 { - for i := 0; i < len(toTouch); i = i + 5000 { - end := i + 5000 - if end > len(toTouch) { - end = len(toTouch) - } - - if err := ctx.DB().Table("config_items"). - Where("id in (?)", toTouch[i:end]). - Update("last_scraped_time", gorm.Expr("NOW()")).Error; err != nil { - return err - } - } - } - ctx.Logger.V(3).Infof("saved %d results.", len(results)) return nil } @@ -591,3 +546,172 @@ func relationshipResultHandler(ctx api.ScrapeContext, relationships v1.Relations return UpdateConfigRelatonships(ctx, configItemRelationships) } + +type updateConfigArgs struct { + Result v1.ScrapeResult + Existing *models.ConfigItem + New *models.ConfigItem +} + +type configExternalKey struct { + externalID string + parentType string +} + +func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime time.Time, results []v1.ScrapeResult) ([]*models.ConfigItem, []*updateConfigArgs, []*models.ConfigChange, []*models.ConfigChange, error) { + var ( + newConfigs = make([]*models.ConfigItem, 0, len(results)) + configsToUpdate = make([]*updateConfigArgs, 0, len(results)) + + newChanges = make([]*models.ConfigChange, 0, len(results)) + changesToUpdate = make([]*models.ConfigChange, 0, len(results)) + + root string + tree = graph.New(graph.StringHash, graph.Rooted(), graph.PreventCycles()) + allConfigs = make([]*models.ConfigItem, 0, len(results)) + + parentTypeToConfigMap = make(map[configExternalKey]string) + ) + + for _, result := range results { + result.LastScrapedTime = &scrapeStartTime + ci, err := NewConfigItemFromResult(ctx, result) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("unable to create config item(%s): %w", result, err) + } + + ci.ScraperID = ctx.ScrapeConfig().GetPersistedID() + if len(ci.ExternalID) == 0 { + return nil, nil, nil, nil, fmt.Errorf("config item %s has no external id", ci) + } + + if isTreeRoot(lo.FromPtr(ci.Type)) { + root = ci.ID + } + + if err := tree.AddVertex(ci.ID); err != nil { + return nil, nil, nil, nil, fmt.Errorf("unable to add vertex(%s): %w", ci, err) + } + + parentExternalKey := configExternalKey{externalID: ci.ExternalID[0], parentType: lo.FromPtr(ci.Type)} + parentTypeToConfigMap[parentExternalKey] = ci.ID + + existing := &models.ConfigItem{} + if ci.ID != "" { + if existing, err = ctx.TempCache().Get(ci.ID); err != nil { + return nil, nil, nil, nil, fmt.Errorf("unable to lookup existing config(%s): %w", ci, err) + } + } else { + if existing, err = ctx.TempCache().Find(*ci.Type, ci.ExternalID[0]); err != nil { + return nil, nil, nil, nil, fmt.Errorf("unable to lookup external id(%s): %w", ci, err) + } + } + + allConfigs = append(allConfigs, ci) + if result.Config != nil { + if existing == nil || existing.ID == "" { + newConfigs = append(newConfigs, ci) + } else { + configsToUpdate = append(configsToUpdate, &updateConfigArgs{ + Result: result, + Existing: existing, + New: ci, + }) + } + } + + if toCreate, toUpdate, err := extractChanges(ctx, &result, ci); err != nil { + return nil, nil, nil, nil, err + } else { + newChanges = append(newChanges, toCreate...) + changesToUpdate = append(changesToUpdate, toUpdate...) + } + } + + // Calculate the parents only after we have all the config items. + // This is because, on the first run, we don't have any configs at all in the DB. + // So, all the parent lookups will return empty result and no parent will be set. + // This way, we can first look for the parents within the result set. + if err := setConfigParents(ctx, parentTypeToConfigMap, allConfigs); err != nil { + return nil, nil, nil, nil, fmt.Errorf("unable to setup parents: %w", err) + } + + if root != "" { + // Only work with the Tree if the result set has the root node. + // Incremental scrapers only have partial result set. + if err := setConfigPaths(ctx, tree, root, allConfigs); err != nil { + return nil, nil, nil, nil, fmt.Errorf("unable to set config paths: %w", err) + } + } + + // We sort the new config items such that parents are always first. + // This avoids foreign key constraint errors. + slices.SortFunc(newConfigs, func(a, b *models.ConfigItem) int { + if len(a.Path) < len(b.Path) { + return -1 + } + + if len(a.Path) > len(b.Path) { + return 1 + } + + return 0 + }) + + return newConfigs, configsToUpdate, newChanges, changesToUpdate, nil +} + +func setConfigParents(ctx api.ScrapeContext, parentTypeToConfigMap map[configExternalKey]string, allConfigs []*models.ConfigItem) error { + for _, ci := range allConfigs { + if ci.ParentExternalID == "" || ci.ParentType == "" { + continue + } + + if parentID, found := parentTypeToConfigMap[configExternalKey{ + externalID: ci.ParentExternalID, + parentType: ci.ParentType, + }]; found { + ci.ParentID = &parentID + continue + } + + if found, err := ctx.TempCache().Find(ci.ParentType, ci.ParentExternalID); err != nil { + return err + } else if found != nil { + ci.ParentID = &found.ID + } else { + ctx.Logger.V(0).Infof("[%s] parent %s/%s not found", ci, ci.ParentType, ci.ParentExternalID) + } + } + + return nil +} + +func setConfigPaths(ctx api.ScrapeContext, tree graph.Graph[string, string], root string, allConfigs []*models.ConfigItem) error { + for _, c := range allConfigs { + if c.ParentID != nil { + if err := tree.AddEdge(*c.ParentID, c.ID); err != nil { + return fmt.Errorf("unable to add edge(%s): %w", c, err) + } + } + } + + for _, c := range allConfigs { + if paths, err := graph.ShortestPath(tree, root, c.ID); err != nil { + ctx.Logger.V(3).Infof("unable to get the path for config(%s): %v", c, err) + } else if len(paths) > 0 { + c.Path = strings.Join(paths, ".") + } + } + + return nil +} + +func isTreeRoot(configType string) bool { + switch configType { + case "AWS::::Account", "Kubernetes::Cluster": + return true + } + + return false +} diff --git a/go.mod b/go.mod index 22051762..26cc1de3 100644 --- a/go.mod +++ b/go.mod @@ -99,6 +99,7 @@ require ( github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/distribution/reference v0.5.0 // indirect + github.com/dominikbraun/graph v0.23.0 // indirect github.com/eko/gocache/lib/v4 v4.1.5 // indirect github.com/eko/gocache/store/go_cache/v4 v4.2.1 // indirect github.com/evanphx/json-patch/v5 v5.7.0 // indirect diff --git a/go.sum b/go.sum index d4b5c02d..dca65bd4 100644 --- a/go.sum +++ b/go.sum @@ -817,6 +817,8 @@ github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5 github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= +github.com/dominikbraun/graph v0.23.0 h1:TdZB4pPqCLFxYhdyMFb1TBdFxp8XLcJfTTBQucVPgCo= +github.com/dominikbraun/graph v0.23.0/go.mod h1:yOjYyogZLY1LSG9E33JWZJiq5k83Qy2C6POAuiViluc= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eko/gocache/lib/v4 v4.1.5 h1:CeMQmdIzwBKKLRjk3FCDXzNFsQTyqJ01JLI7Ib0C9r8= github.com/eko/gocache/lib/v4 v4.1.5/go.mod h1:XaNfCwW8KYW1bRZ/KoHA1TugnnkMz0/gT51NDIu7LSY= diff --git a/scrapers/cron.go b/scrapers/cron.go index db120390..9cc39734 100644 --- a/scrapers/cron.go +++ b/scrapers/cron.go @@ -17,6 +17,8 @@ import ( "github.com/robfig/cron/v3" "github.com/samber/lo" "github.com/sethvargo/go-retry" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" ) var ( @@ -239,6 +241,23 @@ func consumeKubernetesWatchResourcesJobKey(id string) string { return id + "-consume-kubernetes-watch-resources" } +func dedup(objs []*unstructured.Unstructured) []*unstructured.Unstructured { + var output []*unstructured.Unstructured + var seen = make(map[types.UID]struct{}) + + // Iterate in reverse, cuz we want the latest + for i := len(objs) - 1; i >= 0; i-- { + if _, ok := seen[objs[i].GetUID()]; ok { + continue + } + + seen[objs[i].GetUID()] = struct{}{} + output = append(output, objs[i]) + } + + return output +} + // ConsumeKubernetesWatchEventsJobFunc returns a job that consumes kubernetes watch events // for the given config of the scrapeconfig. func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kubernetes) *job.Job { @@ -261,6 +280,17 @@ func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kube } objs, _, _, _ := lo.Buffer(ch, len(ch)) + // NOTE: The resource watcher can return multiple objects for the same NEW resource. + // Example: if a new pod is created, we'll get that pod object multiple times for different events. + // All those resource objects are seen as distinct new config items. + // Hence, we need to use the latest one otherwise saving fails + // as we'll be trying to BATCH INSERT multiple config items with the same id. + // + // In the process, we will lose diff changes though. + // If diff changes are necessary, then we can split up the results in such + // a way that no two objects in a batch have the same id. + objs = dedup(objs) + cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History) results, err := RunK8ObjScraper(cc, config, objs) if err != nil {