Skip to content

Commit

Permalink
refactor: batch saving
Browse files Browse the repository at this point in the history
[skip ci]
  • Loading branch information
adityathebe committed May 12, 2024
1 parent 6f39bd3 commit 43a9d23
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 112 deletions.
10 changes: 0 additions & 10 deletions db/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,16 +180,6 @@ func NewConfigItemFromResult(ctx api.ScrapeContext, result v1.ScrapeResult) (*mo
ci.DeleteReason = result.DeleteReason
}

if result.ParentExternalID != "" && result.ParentType != "" {
if found, err := ctx.TempCache().Find(result.ParentType, result.ParentExternalID); err != nil {
return nil, err
} else if found != nil {
ci.ParentID = &found.ID
} else {
ctx.DutyContext().Infof("[%s] parent %s/%s not found", ci, result.ParentType, result.ParentExternalID)
}
}

return ci, nil
}

Expand Down
268 changes: 168 additions & 100 deletions db/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"gorm.io/gorm/clause"
)

const configItemsBulkInsertSize = 200

func deleteChangeHandler(ctx api.ScrapeContext, change v1.ChangeResult) error {
var deletedAt interface{}
if change.CreatedAt != nil && !change.CreatedAt.IsZero() {
Expand Down Expand Up @@ -96,55 +98,10 @@ func mapEqual(a, b map[string]any) bool {
return true
}

func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult) (*models.ConfigItem, bool, error) {
db := ctx.DutyContext().DB()
ci, err := NewConfigItemFromResult(ctx, result)
if err != nil {
return nil, false, errors.Wrapf(err, "unable to create config item: %s", result)
}

ci.ScraperID = ctx.ScrapeConfig().GetPersistedID()
existing := &models.ConfigItem{}

if len(ci.ExternalID) == 0 {
return nil, false, fmt.Errorf("config item %s has no external id", ci)
}

if ci.ID != "" {
if err := uuid.Validate(ci.ID); err == nil {
if existing, err = ctx.TempCache().Get(ci.ID); err != nil {
return nil, false, errors.Wrapf(err, "unable to lookup existing config: %s", ci)
}
}
} else {
if existing, err = ctx.TempCache().Find(*ci.Type, ci.ExternalID[0]); err != nil {
return nil, false, errors.Wrapf(err, "unable to lookup external id: %s", ci)
}
}

if existing == nil || existing.ID == "" {
if err := ctx.DB().Clauses(clause.OnConflict{UpdateAll: true}).Create(ci).Error; err != nil {
return nil, false, fmt.Errorf("failed to create config: %w", err)
}

return ci, false, nil
}

func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult, ci, existing *models.ConfigItem) ([]*models.ConfigChange, error) {
ci.ID = existing.ID

if ci.ParentID != nil {
if parent, err := ctx.TempCache().Get(*ci.ParentID); err != nil {
return nil, false, errors.Wrapf(err, "unable to find parent %s", *ci.ParentID)
} else if parent == nil {
return nil, false, fmt.Errorf("parent %s not found", *ci.ParentID)
} else if parent.Path != "" {
ci.Path = parent.Path + "." + ci.ID
} else {
ci.Path = parent.ID + "." + ci.ID
}
}

updates := make(map[string]interface{})
changes := make([]*models.ConfigChange, 0)

// In case a resource was marked as deleted but is un-deleted now
// we set an update flag as gorm ignores nil pointers
Expand All @@ -159,9 +116,12 @@ func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult) (*models.ConfigItem
} else if changeResult != nil {
ctx.Logger.V(3).Infof("[%s/%s] detected changes", *ci.Type, ci.ExternalID[0])
result.Changes = []v1.ChangeResult{*changeResult}
if err := saveChanges(ctx, &result, ci); err != nil {
return nil, false, fmt.Errorf("[%s] failed to save %d changes: %w", ci, len(result.Changes), err)
if newChanges, _, err := extractChanges(ctx, &result, ci); err != nil {
return nil, err
} else {
changes = append(changes, newChanges...)
}

updates["config"] = *ci.Config
updates["updated_at"] = gorm.Expr("NOW()")
}
Expand Down Expand Up @@ -223,16 +183,15 @@ func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult) (*models.ConfigItem
}

if len(updates) == 0 {
return ci, false, nil
return changes, nil
}

updates["last_scraped_time"] = gorm.Expr("NOW()")

if err := db.Model(ci).Updates(updates).Error; err != nil {
return nil, false, errors.Wrapf(err, "unable to update config item: %s", ci)
if err := ctx.DutyContext().DB().Model(ci).Updates(updates).Error; err != nil {
return nil, errors.Wrapf(err, "unable to update config item: %s", ci)
}

return ci, true, nil
return changes, nil
}

func shouldExcludeChange(result *v1.ScrapeResult, changeResult v1.ChangeResult) (bool, error) {
Expand All @@ -254,11 +213,13 @@ func shouldExcludeChange(result *v1.ScrapeResult, changeResult v1.ChangeResult)
return false, nil
}

func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.ConfigItem) error {
changes.ProcessRules(result, result.BaseScraper.Transform.Change.Mapping...)

db := ctx.DB()
func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.ConfigItem) ([]*models.ConfigChange, []*models.ConfigChange, error) {
var (
newOnes = []*models.ConfigChange{}
updates = []*models.ConfigChange{}
)

changes.ProcessRules(result, result.BaseScraper.Transform.Change.Mapping...)
for _, changeResult := range result.Changes {
if changeResult.Action == v1.Ignore {
continue
Expand All @@ -273,7 +234,7 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf

if changeResult.Action == v1.Delete {
if err := deleteChangeHandler(ctx, changeResult); err != nil {
return fmt.Errorf("failed to delete config from change: %w", err)
return nil, nil, fmt.Errorf("failed to delete config from change: %w", err)
}
}

Expand All @@ -282,7 +243,7 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf
if change.CreatedBy != nil {
person, err := FindPersonByEmail(ctx, ptr.ToString(change.CreatedBy))
if err != nil {
return fmt.Errorf("error finding person by email: %w", err)
return nil, nil, fmt.Errorf("error finding person by email: %w", err)
} else if person != nil {
change.CreatedBy = ptr.String(person.ID.String())
} else {
Expand All @@ -295,7 +256,7 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf
change.ConfigID = ci.ID
} else if !change.GetExternalID().IsEmpty() {
if ci, err := ctx.TempCache().FindExternalID(change.GetExternalID()); err != nil {
return fmt.Errorf("failed to get config from change (externalID=%s): %w", change.GetExternalID(), err)
return nil, nil, fmt.Errorf("failed to get config from change (externalID=%s): %w", change.GetExternalID(), err)
} else if ci != "" {
change.ConfigID = ci
} else if ci == "" {
Expand All @@ -305,17 +266,13 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf
}

if changeResult.UpdateExisting {
if err := db.Save(change).Error; err != nil {
return fmt.Errorf("failed to update config change: %w", err)
}
updates = append(updates, change)
} else {
if err := db.Create(change).Error; err != nil {
return fmt.Errorf("failed to create config change: %w", err)
}
newOnes = append(newOnes, change)
}
}

return nil
return newOnes, updates, nil
}

func upsertAnalysis(ctx api.ScrapeContext, result *v1.ScrapeResult) error {
Expand Down Expand Up @@ -376,32 +333,44 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error {

// TODO:: Sort the results so that parents are inserted first

var toTouch []string
for _, result := range results {
result.LastScrapedTime = &startTime

var ci *models.ConfigItem
if result.Config != nil {
var updated bool
if ci, updated, err = updateCI(ctx, result); err != nil {
return fmt.Errorf("failed to save config item (%s): %w", result, err)
} else if !updated {
toTouch = append(toTouch, ci.ID)
}
newConfigs, configsToUpdate, newChanges, changesToUpdate, err := extractConfigsAndChangesFromResults(ctx, startTime, results)
if err != nil {
return fmt.Errorf("failed to extract configs & changes from results: %w", err)
}
ctx.Logger.V(2).Infof("%d new configs, %d configs to update, %d new changes & %d changes to update",
len(newConfigs), len(configsToUpdate), len(newChanges), len(changesToUpdate))

if err := ctx.DB().CreateInBatches(newConfigs, configItemsBulkInsertSize).Error; err != nil {
return fmt.Errorf("failed to create config items: %w", err)
}

// TODO: Try this in batches as well
for _, config := range configsToUpdate {
if diffChanges, err := updateCI(ctx, config.Result, config.New, config.Existing); err != nil {
return fmt.Errorf("failed to update config item (%s): %w", config.Existing, err)
} else if len(diffChanges) != 0 {
newChanges = append(newChanges, diffChanges...)
}
}

if err := ctx.DB().CreateInBatches(newChanges, 200).Error; err != nil {
return fmt.Errorf("failed to create config changes: %w", err)
}

if len(changesToUpdate) != 0 {
if err := ctx.DB().Save(changesToUpdate).Error; err != nil {
return fmt.Errorf("failed to update config changes: %w", err)
}
}

for _, result := range results {
if result.AnalysisResult != nil {
if err := upsertAnalysis(ctx, &result); err != nil {
return fmt.Errorf("failed to analysis (%s): %w", result, err)
}
}

if err := saveChanges(ctx, &result, ci); err != nil {
return fmt.Errorf("failed to changes (%s): %w", result, err)
}

relationshipToForm = append(relationshipToForm, result.RelationshipResults...)

if len(result.RelationshipSelectors) != 0 {
resultsWithRelationshipSelectors = append(resultsWithRelationshipSelectors, result)
}
Expand All @@ -424,21 +393,6 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error {
}
}

if len(toTouch) > 0 {
for i := 0; i < len(toTouch); i = i + 5000 {
end := i + 5000
if end > len(toTouch) {
end = len(toTouch)
}

if err := ctx.DB().Table("config_items").
Where("id in (?)", toTouch[i:end]).
Update("last_scraped_time", gorm.Expr("NOW()")).Error; err != nil {
return fmt.Errorf("error updating last scraped time: %w", err)
}
}
}

ctx.Logger.V(3).Infof("saved %d results.", len(results))
return nil
}
Expand Down Expand Up @@ -594,3 +548,117 @@ func relationshipResultHandler(ctx api.ScrapeContext, relationships v1.Relations

return UpdateConfigRelatonships(ctx, configItemRelationships)
}

type updateConfigArgs struct {
Result v1.ScrapeResult
Existing *models.ConfigItem
New *models.ConfigItem
}

type parentExternalKey struct {
externalID string
parentType string
}

func extractConfigsAndChangesFromResults(ctx api.ScrapeContext, scrapeStartTime time.Time, results []v1.ScrapeResult) ([]*models.ConfigItem, []*updateConfigArgs, []*models.ConfigChange, []*models.ConfigChange, error) {
var (
inserts = make([]*models.ConfigItem, 0, len(results))
updates = make([]*updateConfigArgs, 0, len(results))

newChanges = make([]*models.ConfigChange, 0, len(results))
changesToUpdate = make([]*models.ConfigChange, 0, len(results))

configToExternalParentMap = make(map[string]parentExternalKey)
parentToConfigIDMap = make(map[parentExternalKey]string)
)

for _, result := range results {
if result.Config == nil {
continue
}

result.LastScrapedTime = &scrapeStartTime
ci, err := NewConfigItemFromResult(ctx, result)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("unable to create config item: %s: %w", result, err)
}

if result.ParentExternalID != "" && result.ParentType != "" {
configToExternalParentMap[ci.ID] = parentExternalKey{
externalID: result.ParentExternalID,
parentType: result.ParentType,
}
}

parentToConfigIDMap[parentExternalKey{
externalID: ci.ExternalID[0],
parentType: lo.FromPtr(ci.Type),
}] = ci.ID

ci.ScraperID = ctx.ScrapeConfig().GetPersistedID()
if len(ci.ExternalID) == 0 {
return nil, nil, nil, nil, fmt.Errorf("config item %s has no external id", ci)
}

existing := &models.ConfigItem{}
if ci.ID != "" {
if existing, err = ctx.TempCache().Get(ci.ID); err != nil {
return nil, nil, nil, nil, fmt.Errorf("unable to lookup existing config: %s: %w", ci, err)
}
} else {
if existing, err = ctx.TempCache().Find(*ci.Type, ci.ExternalID[0]); err != nil {
return nil, nil, nil, nil, fmt.Errorf("unable to lookup external id: %s: %w", ci, err)
}
}

if existing == nil || existing.ID == "" {
inserts = append(inserts, ci)
} else {
updates = append(updates, &updateConfigArgs{
Result: result,
Existing: existing,
New: ci,
})
}

if toCreate, toUpdate, err := extractChanges(ctx, &result, ci); err != nil {
return nil, nil, nil, nil, err
} else {
newChanges = append(newChanges, toCreate...)
changesToUpdate = append(changesToUpdate, toUpdate...)
}
}

// Calculate the parents only after we have all the config items.
// This is because, on the first run, we don't have any configs at all in the DB.
// So, all the parent lookups will return empty result and no parent will be set.
// This way, we can first look for the parents within the result set.
for i := range inserts {
ci := inserts[i]
externalParent, ok := configToExternalParentMap[ci.ID]
if !ok {
continue
}

if parentID, found := parentToConfigIDMap[parentExternalKey{
externalID: externalParent.externalID,
parentType: externalParent.parentType,
}]; found {
ci.ParentID = &parentID
continue
}

if found, err := ctx.TempCache().Find(externalParent.parentType, externalParent.externalID); err != nil {
return nil, nil, nil, nil, err
} else if found != nil {
ci.ParentID = &found.ID
} else {
ctx.DutyContext().Infof("[%s] parent %s/%s not found", ci, externalParent.parentType, externalParent.externalID)
}
}

// TODO: Sort inserts by path
// i.e. parents should come first.

return inserts, updates, newChanges, changesToUpdate, nil
}
Loading

0 comments on commit 43a9d23

Please sign in to comment.