diff --git a/db/config.go b/db/config.go index bb05abd2..d2ead428 100644 --- a/db/config.go +++ b/db/config.go @@ -180,16 +180,6 @@ func NewConfigItemFromResult(ctx api.ScrapeContext, result v1.ScrapeResult) (*mo ci.DeleteReason = result.DeleteReason } - if result.ParentExternalID != "" && result.ParentType != "" { - if found, err := ctx.TempCache().Find(result.ParentType, result.ParentExternalID); err != nil { - return nil, err - } else if found != nil { - ci.ParentID = &found.ID - } else { - ctx.DutyContext().Infof("[%s] parent %s/%s not found", ci, result.ParentType, result.ParentExternalID) - } - } - return ci, nil } diff --git a/db/update.go b/db/update.go index f6815f65..0d505620 100644 --- a/db/update.go +++ b/db/update.go @@ -30,6 +30,8 @@ import ( "gorm.io/gorm/clause" ) +const configItemsBulkInsertSize = 200 + func deleteChangeHandler(ctx api.ScrapeContext, change v1.ChangeResult) error { var deletedAt interface{} if change.CreatedAt != nil && !change.CreatedAt.IsZero() { @@ -96,55 +98,10 @@ func mapEqual(a, b map[string]any) bool { return true } -func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult) (*models.ConfigItem, bool, error) { - db := ctx.DutyContext().DB() - ci, err := NewConfigItemFromResult(ctx, result) - if err != nil { - return nil, false, errors.Wrapf(err, "unable to create config item: %s", result) - } - - ci.ScraperID = ctx.ScrapeConfig().GetPersistedID() - existing := &models.ConfigItem{} - - if len(ci.ExternalID) == 0 { - return nil, false, fmt.Errorf("config item %s has no external id", ci) - } - - if ci.ID != "" { - if err := uuid.Validate(ci.ID); err == nil { - if existing, err = ctx.TempCache().Get(ci.ID); err != nil { - return nil, false, errors.Wrapf(err, "unable to lookup existing config: %s", ci) - } - } - } else { - if existing, err = ctx.TempCache().Find(*ci.Type, ci.ExternalID[0]); err != nil { - return nil, false, errors.Wrapf(err, "unable to lookup external id: %s", ci) - } - } - - if existing == nil || existing.ID == "" { - if err := ctx.DB().Clauses(clause.OnConflict{UpdateAll: true}).Create(ci).Error; err != nil { - return nil, false, fmt.Errorf("failed to create config: %w", err) - } - - return ci, false, nil - } - +func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult, ci, existing *models.ConfigItem) ([]*models.ConfigChange, error) { ci.ID = existing.ID - - if ci.ParentID != nil { - if parent, err := ctx.TempCache().Get(*ci.ParentID); err != nil { - return nil, false, errors.Wrapf(err, "unable to find parent %s", *ci.ParentID) - } else if parent == nil { - return nil, false, fmt.Errorf("parent %s not found", *ci.ParentID) - } else if parent.Path != "" { - ci.Path = parent.Path + "." + ci.ID - } else { - ci.Path = parent.ID + "." + ci.ID - } - } - updates := make(map[string]interface{}) + changes := make([]*models.ConfigChange, 0) // In case a resource was marked as deleted but is un-deleted now // we set an update flag as gorm ignores nil pointers @@ -159,9 +116,12 @@ func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult) (*models.ConfigItem } else if changeResult != nil { ctx.Logger.V(3).Infof("[%s/%s] detected changes", *ci.Type, ci.ExternalID[0]) result.Changes = []v1.ChangeResult{*changeResult} - if err := saveChanges(ctx, &result, ci); err != nil { - return nil, false, fmt.Errorf("[%s] failed to save %d changes: %w", ci, len(result.Changes), err) + if newChanges, _, err := extractChanges(ctx, &result, ci); err != nil { + return nil, err + } else { + changes = append(changes, newChanges...) } + updates["config"] = *ci.Config updates["updated_at"] = gorm.Expr("NOW()") } @@ -223,16 +183,15 @@ func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult) (*models.ConfigItem } if len(updates) == 0 { - return ci, false, nil + return changes, nil } updates["last_scraped_time"] = gorm.Expr("NOW()") - - if err := db.Model(ci).Updates(updates).Error; err != nil { - return nil, false, errors.Wrapf(err, "unable to update config item: %s", ci) + if err := ctx.DutyContext().DB().Model(ci).Updates(updates).Error; err != nil { + return nil, errors.Wrapf(err, "unable to update config item: %s", ci) } - return ci, true, nil + return changes, nil } func shouldExcludeChange(result *v1.ScrapeResult, changeResult v1.ChangeResult) (bool, error) { @@ -254,11 +213,13 @@ func shouldExcludeChange(result *v1.ScrapeResult, changeResult v1.ChangeResult) return false, nil } -func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.ConfigItem) error { - changes.ProcessRules(result, result.BaseScraper.Transform.Change.Mapping...) - - db := ctx.DB() +func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.ConfigItem) ([]*models.ConfigChange, []*models.ConfigChange, error) { + var ( + newOnes = []*models.ConfigChange{} + updates = []*models.ConfigChange{} + ) + changes.ProcessRules(result, result.BaseScraper.Transform.Change.Mapping...) for _, changeResult := range result.Changes { if changeResult.Action == v1.Ignore { continue @@ -273,7 +234,7 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf if changeResult.Action == v1.Delete { if err := deleteChangeHandler(ctx, changeResult); err != nil { - return fmt.Errorf("failed to delete config from change: %w", err) + return nil, nil, fmt.Errorf("failed to delete config from change: %w", err) } } @@ -282,7 +243,7 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf if change.CreatedBy != nil { person, err := FindPersonByEmail(ctx, ptr.ToString(change.CreatedBy)) if err != nil { - return fmt.Errorf("error finding person by email: %w", err) + return nil, nil, fmt.Errorf("error finding person by email: %w", err) } else if person != nil { change.CreatedBy = ptr.String(person.ID.String()) } else { @@ -295,7 +256,7 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf change.ConfigID = ci.ID } else if !change.GetExternalID().IsEmpty() { if ci, err := ctx.TempCache().FindExternalID(change.GetExternalID()); err != nil { - return fmt.Errorf("failed to get config from change (externalID=%s): %w", change.GetExternalID(), err) + return nil, nil, fmt.Errorf("failed to get config from change (externalID=%s): %w", change.GetExternalID(), err) } else if ci != "" { change.ConfigID = ci } else if ci == "" { @@ -305,17 +266,13 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf } if changeResult.UpdateExisting { - if err := db.Save(change).Error; err != nil { - return fmt.Errorf("failed to update config change: %w", err) - } + updates = append(updates, change) } else { - if err := db.Create(change).Error; err != nil { - return fmt.Errorf("failed to create config change: %w", err) - } + newOnes = append(newOnes, change) } } - return nil + return newOnes, updates, nil } func upsertAnalysis(ctx api.ScrapeContext, result *v1.ScrapeResult) error { @@ -376,32 +333,44 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { // TODO:: Sort the results so that parents are inserted first - var toTouch []string - for _, result := range results { - result.LastScrapedTime = &startTime - - var ci *models.ConfigItem - if result.Config != nil { - var updated bool - if ci, updated, err = updateCI(ctx, result); err != nil { - return fmt.Errorf("failed to save config item (%s): %w", result, err) - } else if !updated { - toTouch = append(toTouch, ci.ID) - } + newConfigs, configsToUpdate, newChanges, changesToUpdate, err := extractConfigsAndChangesFromResults(ctx, startTime, results) + if err != nil { + return fmt.Errorf("failed to extract configs & changes from results: %w", err) + } + ctx.Logger.V(2).Infof("%d new configs, %d configs to update, %d new changes & %d changes to update", + len(newConfigs), len(configsToUpdate), len(newChanges), len(changesToUpdate)) + + if err := ctx.DB().CreateInBatches(newConfigs, configItemsBulkInsertSize).Error; err != nil { + return fmt.Errorf("failed to create config items: %w", err) + } + + // TODO: Try this in batches as well + for _, config := range configsToUpdate { + if diffChanges, err := updateCI(ctx, config.Result, config.New, config.Existing); err != nil { + return fmt.Errorf("failed to update config item (%s): %w", config.Existing, err) + } else if len(diffChanges) != 0 { + newChanges = append(newChanges, diffChanges...) } + } + if err := ctx.DB().CreateInBatches(newChanges, 200).Error; err != nil { + return fmt.Errorf("failed to create config changes: %w", err) + } + + if len(changesToUpdate) != 0 { + if err := ctx.DB().Save(changesToUpdate).Error; err != nil { + return fmt.Errorf("failed to update config changes: %w", err) + } + } + + for _, result := range results { if result.AnalysisResult != nil { if err := upsertAnalysis(ctx, &result); err != nil { return fmt.Errorf("failed to analysis (%s): %w", result, err) } } - if err := saveChanges(ctx, &result, ci); err != nil { - return fmt.Errorf("failed to changes (%s): %w", result, err) - } - relationshipToForm = append(relationshipToForm, result.RelationshipResults...) - if len(result.RelationshipSelectors) != 0 { resultsWithRelationshipSelectors = append(resultsWithRelationshipSelectors, result) } @@ -424,21 +393,6 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { } } - if len(toTouch) > 0 { - for i := 0; i < len(toTouch); i = i + 5000 { - end := i + 5000 - if end > len(toTouch) { - end = len(toTouch) - } - - if err := ctx.DB().Table("config_items"). - Where("id in (?)", toTouch[i:end]). - Update("last_scraped_time", gorm.Expr("NOW()")).Error; err != nil { - return fmt.Errorf("error updating last scraped time: %w", err) - } - } - } - ctx.Logger.V(3).Infof("saved %d results.", len(results)) return nil } @@ -594,3 +548,117 @@ func relationshipResultHandler(ctx api.ScrapeContext, relationships v1.Relations return UpdateConfigRelatonships(ctx, configItemRelationships) } + +type updateConfigArgs struct { + Result v1.ScrapeResult + Existing *models.ConfigItem + New *models.ConfigItem +} + +type parentExternalKey struct { + externalID string + parentType string +} + +func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime time.Time, results []v1.ScrapeResult) ([]*models.ConfigItem, []*updateConfigArgs, []*models.ConfigChange, []*models.ConfigChange, error) { + var ( + inserts = make([]*models.ConfigItem, 0, len(results)) + updates = make([]*updateConfigArgs, 0, len(results)) + + newChanges = make([]*models.ConfigChange, 0, len(results)) + changesToUpdate = make([]*models.ConfigChange, 0, len(results)) + + configToExternalParentMap = make(map[string]parentExternalKey) + parentToConfigIDMap = make(map[parentExternalKey]string) + ) + + for _, result := range results { + if result.Config == nil { + continue + } + + result.LastScrapedTime = &scrapeStartTime + ci, err := NewConfigItemFromResult(ctx, result) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("unable to create config item: %s: %w", result, err) + } + + if result.ParentExternalID != "" && result.ParentType != "" { + configToExternalParentMap[ci.ID] = parentExternalKey{ + externalID: result.ParentExternalID, + parentType: result.ParentType, + } + } + + parentToConfigIDMap[parentExternalKey{ + externalID: ci.ExternalID[0], + parentType: lo.FromPtr(ci.Type), + }] = ci.ID + + ci.ScraperID = ctx.ScrapeConfig().GetPersistedID() + if len(ci.ExternalID) == 0 { + return nil, nil, nil, nil, fmt.Errorf("config item %s has no external id", ci) + } + + existing := &models.ConfigItem{} + if ci.ID != "" { + if existing, err = ctx.TempCache().Get(ci.ID); err != nil { + return nil, nil, nil, nil, fmt.Errorf("unable to lookup existing config: %s: %w", ci, err) + } + } else { + if existing, err = ctx.TempCache().Find(*ci.Type, ci.ExternalID[0]); err != nil { + return nil, nil, nil, nil, fmt.Errorf("unable to lookup external id: %s: %w", ci, err) + } + } + + if existing == nil || existing.ID == "" { + inserts = append(inserts, ci) + } else { + updates = append(updates, &updateConfigArgs{ + Result: result, + Existing: existing, + New: ci, + }) + } + + if toCreate, toUpdate, err := extractChanges(ctx, &result, ci); err != nil { + return nil, nil, nil, nil, err + } else { + newChanges = append(newChanges, toCreate...) + changesToUpdate = append(changesToUpdate, toUpdate...) + } + } + + // Calculate the parents only after we have all the config items. + // This is because, on the first run, we don't have any configs at all in the DB. + // So, all the parent lookups will return empty result and no parent will be set. + // This way, we can first look for the parents within the result set. + for i := range inserts { + ci := inserts[i] + externalParent, ok := configToExternalParentMap[ci.ID] + if !ok { + continue + } + + if parentID, found := parentToConfigIDMap[parentExternalKey{ + externalID: externalParent.externalID, + parentType: externalParent.parentType, + }]; found { + ci.ParentID = &parentID + continue + } + + if found, err := ctx.TempCache().Find(externalParent.parentType, externalParent.externalID); err != nil { + return nil, nil, nil, nil, err + } else if found != nil { + ci.ParentID = &found.ID + } else { + ctx.DutyContext().Infof("[%s] parent %s/%s not found", ci, externalParent.parentType, externalParent.externalID) + } + } + + // TODO: Sort inserts by path + // i.e. parents should come first. + + return inserts, updates, newChanges, changesToUpdate, nil +} diff --git a/scrapers/cron.go b/scrapers/cron.go index db120390..df363edc 100644 --- a/scrapers/cron.go +++ b/scrapers/cron.go @@ -17,6 +17,8 @@ import ( "github.com/robfig/cron/v3" "github.com/samber/lo" "github.com/sethvargo/go-retry" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" ) var ( @@ -142,6 +144,7 @@ func scheduleScraperJob(sc api.ScrapeContext) error { Schedule: schedule, Singleton: true, JobHistory: true, + RunNow: true, // TODO: remove this Retention: job.RetentionBalanced, ResourceID: sc.ScrapeConfig().GetPersistedID().String(), ResourceType: job.ResourceTypeScraper, @@ -167,8 +170,9 @@ func scheduleScraperJob(sc api.ScrapeContext) error { config.Watch = v1.DefaultWatchKinds } - go watchKubernetesEventsWithRetry(sc, config) - go watchKubernetesResourcesWithRetry(sc, config) + // TODO: Uncomment + // go watchKubernetesEventsWithRetry(sc, config) + // go watchKubernetesResourcesWithRetry(sc, config) eventsWatchJob := ConsumeKubernetesWatchEventsJobFunc(sc, config) if err := eventsWatchJob.AddToScheduler(scrapeJobScheduler); err != nil { @@ -239,6 +243,23 @@ func consumeKubernetesWatchResourcesJobKey(id string) string { return id + "-consume-kubernetes-watch-resources" } +func dedup(objs []*unstructured.Unstructured) []*unstructured.Unstructured { + var output []*unstructured.Unstructured + var seen = make(map[types.UID]struct{}) + + // Iterate in reverse, cuz we want the latest + for i := len(objs) - 1; i >= 0; i-- { + if _, ok := seen[objs[i].GetUID()]; ok { + continue + } + + seen[objs[i].GetUID()] = struct{}{} + output = append(output, objs[i]) + } + + return output +} + // ConsumeKubernetesWatchEventsJobFunc returns a job that consumes kubernetes watch events // for the given config of the scrapeconfig. func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kubernetes) *job.Job { @@ -261,6 +282,16 @@ func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kube } objs, _, _, _ := lo.Buffer(ch, len(ch)) + // NOTE: The resource watcher can return multiple objects for the same NEW resource. + // Example: if a new pod is created, we'll get that pod object multiple times for different events. + // Hence, we need to use the latest one otherwise saving fails + // as we'll be trying to INSERT multiple config items with the same id. + // + // In the process, we will lose diff changes though. + // If diff changes are necessary, then we can split up the results in such + // a way that no two objects in a batch have the same id. + objs = dedup(objs) + cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History) results, err := RunK8ObjScraper(cc, config, objs) if err != nil {