Skip to content

Commit

Permalink
refactor: batch saving
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed May 12, 2024
1 parent 6f39bd3 commit 9bc2411
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 109 deletions.
10 changes: 0 additions & 10 deletions db/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,16 +180,6 @@ func NewConfigItemFromResult(ctx api.ScrapeContext, result v1.ScrapeResult) (*mo
ci.DeleteReason = result.DeleteReason
}

if result.ParentExternalID != "" && result.ParentType != "" {
if found, err := ctx.TempCache().Find(result.ParentType, result.ParentExternalID); err != nil {
return nil, err
} else if found != nil {
ci.ParentID = &found.ID
} else {
ctx.DutyContext().Infof("[%s] parent %s/%s not found", ci, result.ParentType, result.ParentExternalID)
}
}

return ci, nil
}

Expand Down
213 changes: 114 additions & 99 deletions db/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,55 +96,10 @@ func mapEqual(a, b map[string]any) bool {
return true
}

func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult) (*models.ConfigItem, bool, error) {
db := ctx.DutyContext().DB()
ci, err := NewConfigItemFromResult(ctx, result)
if err != nil {
return nil, false, errors.Wrapf(err, "unable to create config item: %s", result)
}

ci.ScraperID = ctx.ScrapeConfig().GetPersistedID()
existing := &models.ConfigItem{}

if len(ci.ExternalID) == 0 {
return nil, false, fmt.Errorf("config item %s has no external id", ci)
}

if ci.ID != "" {
if err := uuid.Validate(ci.ID); err == nil {
if existing, err = ctx.TempCache().Get(ci.ID); err != nil {
return nil, false, errors.Wrapf(err, "unable to lookup existing config: %s", ci)
}
}
} else {
if existing, err = ctx.TempCache().Find(*ci.Type, ci.ExternalID[0]); err != nil {
return nil, false, errors.Wrapf(err, "unable to lookup external id: %s", ci)
}
}

if existing == nil || existing.ID == "" {
if err := ctx.DB().Clauses(clause.OnConflict{UpdateAll: true}).Create(ci).Error; err != nil {
return nil, false, fmt.Errorf("failed to create config: %w", err)
}

return ci, false, nil
}

func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult, ci, existing *models.ConfigItem) ([]*models.ConfigChange, error) {
ci.ID = existing.ID

if ci.ParentID != nil {
if parent, err := ctx.TempCache().Get(*ci.ParentID); err != nil {
return nil, false, errors.Wrapf(err, "unable to find parent %s", *ci.ParentID)
} else if parent == nil {
return nil, false, fmt.Errorf("parent %s not found", *ci.ParentID)
} else if parent.Path != "" {
ci.Path = parent.Path + "." + ci.ID
} else {
ci.Path = parent.ID + "." + ci.ID
}
}

updates := make(map[string]interface{})
changes := make([]*models.ConfigChange, 0)

// In case a resource was marked as deleted but is un-deleted now
// we set an update flag as gorm ignores nil pointers
Expand All @@ -159,9 +114,12 @@ func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult) (*models.ConfigItem
} else if changeResult != nil {
ctx.Logger.V(3).Infof("[%s/%s] detected changes", *ci.Type, ci.ExternalID[0])
result.Changes = []v1.ChangeResult{*changeResult}
if err := saveChanges(ctx, &result, ci); err != nil {
return nil, false, fmt.Errorf("[%s] failed to save %d changes: %w", ci, len(result.Changes), err)
if newChanges, _, err := getChanges(ctx, &result, ci); err != nil {
return nil, err
} else {
changes = append(changes, newChanges...)
}

updates["config"] = *ci.Config
updates["updated_at"] = gorm.Expr("NOW()")
}
Expand Down Expand Up @@ -223,16 +181,15 @@ func updateCI(ctx api.ScrapeContext, result v1.ScrapeResult) (*models.ConfigItem
}

if len(updates) == 0 {
return ci, false, nil
return changes, nil
}

updates["last_scraped_time"] = gorm.Expr("NOW()")

if err := db.Model(ci).Updates(updates).Error; err != nil {
return nil, false, errors.Wrapf(err, "unable to update config item: %s", ci)
if err := ctx.DutyContext().DB().Model(ci).Updates(updates).Error; err != nil {
return nil, errors.Wrapf(err, "unable to update config item: %s", ci)
}

return ci, true, nil
return changes, nil
}

func shouldExcludeChange(result *v1.ScrapeResult, changeResult v1.ChangeResult) (bool, error) {
Expand All @@ -254,11 +211,13 @@ func shouldExcludeChange(result *v1.ScrapeResult, changeResult v1.ChangeResult)
return false, nil
}

func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.ConfigItem) error {
changes.ProcessRules(result, result.BaseScraper.Transform.Change.Mapping...)

db := ctx.DB()
func getChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.ConfigItem) ([]*models.ConfigChange, []*models.ConfigChange, error) {
var (
newOnes = []*models.ConfigChange{}
updates = []*models.ConfigChange{}
)

changes.ProcessRules(result, result.BaseScraper.Transform.Change.Mapping...)
for _, changeResult := range result.Changes {
if changeResult.Action == v1.Ignore {
continue
Expand All @@ -273,7 +232,7 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf

if changeResult.Action == v1.Delete {
if err := deleteChangeHandler(ctx, changeResult); err != nil {
return fmt.Errorf("failed to delete config from change: %w", err)
return nil, nil, fmt.Errorf("failed to delete config from change: %w", err)
}
}

Expand All @@ -282,7 +241,7 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf
if change.CreatedBy != nil {
person, err := FindPersonByEmail(ctx, ptr.ToString(change.CreatedBy))
if err != nil {
return fmt.Errorf("error finding person by email: %w", err)
return nil, nil, fmt.Errorf("error finding person by email: %w", err)
} else if person != nil {
change.CreatedBy = ptr.String(person.ID.String())
} else {
Expand All @@ -295,7 +254,7 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf
change.ConfigID = ci.ID
} else if !change.GetExternalID().IsEmpty() {
if ci, err := ctx.TempCache().FindExternalID(change.GetExternalID()); err != nil {
return fmt.Errorf("failed to get config from change (externalID=%s): %w", change.GetExternalID(), err)
return nil, nil, fmt.Errorf("failed to get config from change (externalID=%s): %w", change.GetExternalID(), err)
} else if ci != "" {
change.ConfigID = ci
} else if ci == "" {
Expand All @@ -305,17 +264,13 @@ func saveChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.Conf
}

if changeResult.UpdateExisting {
if err := db.Save(change).Error; err != nil {
return fmt.Errorf("failed to update config change: %w", err)
}
updates = append(updates, change)
} else {
if err := db.Create(change).Error; err != nil {
return fmt.Errorf("failed to create config change: %w", err)
}
newOnes = append(newOnes, change)
}
}

return nil
return newOnes, updates, nil
}

func upsertAnalysis(ctx api.ScrapeContext, result *v1.ScrapeResult) error {
Expand Down Expand Up @@ -376,32 +331,50 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error {

// TODO: Sort the results so that parents are inserted first

var toTouch []string
for _, result := range results {
result.LastScrapedTime = &startTime
newConfigs, configsToUpdate, newChanges, changesToUpdate, err := evaluateResultsToConfigsAndChanges(ctx, startTime, results)
if err != nil {
return fmt.Errorf("failed to setup parent and path: %w", err)
}
ctx.Logger.V(2).Infof("%d new configs, %d configs to update, %d new changes & %d changes to update",
len(newConfigs), len(configsToUpdate), len(newChanges), len(changesToUpdate))

var ci *models.ConfigItem
if result.Config != nil {
var updated bool
if ci, updated, err = updateCI(ctx, result); err != nil {
return fmt.Errorf("failed to save config item (%s): %w", result, err)
} else if !updated {
toTouch = append(toTouch, ci.ID)
}
if len(newConfigs) != 0 {
if err := ctx.DB().CreateInBatches(newConfigs, 200).Error; err != nil {
return fmt.Errorf("failed to create config items: %w", err)
}
}

// TODO: Try this in batches as well
for _, config := range configsToUpdate {
if diffChanges, err := updateCI(ctx, config.Result, config.New, config.Existing); err != nil {
return fmt.Errorf("failed to update config item (%s): %w", config.Existing, err)
} else if len(diffChanges) != 0 {
newChanges = append(newChanges, diffChanges...)
}
}

if len(newChanges) != 0 {
if err := ctx.DB().CreateInBatches(newChanges, 200).Error; err != nil {
return fmt.Errorf("failed to create config changes: %w", err)
}
}

if len(changesToUpdate) != 0 {
if err := ctx.DB().Save(changesToUpdate).Error; err != nil {
return fmt.Errorf("failed to update config changes: %w", err)
}
}

for _, result := range results {
result.LastScrapedTime = &startTime

if result.AnalysisResult != nil {
if err := upsertAnalysis(ctx, &result); err != nil {
return fmt.Errorf("failed to analysis (%s): %w", result, err)
}
}

if err := saveChanges(ctx, &result, ci); err != nil {
return fmt.Errorf("failed to changes (%s): %w", result, err)
}

relationshipToForm = append(relationshipToForm, result.RelationshipResults...)

if len(result.RelationshipSelectors) != 0 {
resultsWithRelationshipSelectors = append(resultsWithRelationshipSelectors, result)
}
Expand All @@ -424,21 +397,6 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error {
}
}

if len(toTouch) > 0 {
for i := 0; i < len(toTouch); i = i + 5000 {
end := i + 5000
if end > len(toTouch) {
end = len(toTouch)
}

if err := ctx.DB().Table("config_items").
Where("id in (?)", toTouch[i:end]).
Update("last_scraped_time", gorm.Expr("NOW()")).Error; err != nil {
return fmt.Errorf("error updating last scraped time: %w", err)
}
}
}

ctx.Logger.V(3).Infof("saved %d results.", len(results))
return nil
}
Expand Down Expand Up @@ -594,3 +552,60 @@ func relationshipResultHandler(ctx api.ScrapeContext, relationships v1.Relations

return UpdateConfigRelatonships(ctx, configItemRelationships)
}

// updateConfigArgs groups the inputs needed to update one already-persisted
// config item: the scrape result, the stored item and its freshly built
// replacement.
type updateConfigArgs struct {
	// Result is the scrape result that New was built from.
	Result v1.ScrapeResult

	// Existing is the config item that was found for New's ID, while New is
	// the config item constructed from Result during the current scrape.
	Existing, New *models.ConfigItem
}

// evaluateResultsToConfigsAndChanges sorts the scrape results into the work
// that has to be done to persist them. It only reads from the temp cache and
// delegates to getChanges; the returned slices are meant to be written by the
// caller.
//
// The return values are, in order:
//  1. config items with no existing counterpart, to be inserted,
//  2. config items that already exist, each paired with the result it came
//     from and the existing item,
//  3. config changes to be created,
//  4. config changes that replace an existing change,
//  5. the first error encountered, in which case all slices are nil.
//
// An error is returned when a config item cannot be built from a result, when
// a built config item has no external id, when the lookup of the existing
// item fails, or when getChanges fails.
func evaluateResultsToConfigsAndChanges(ctx api.ScrapeContext, currentTime time.Time, results []v1.ScrapeResult) ([]*models.ConfigItem, []*updateConfigArgs, []*models.ConfigChange, []*models.ConfigChange, error) {
	var (
		inserts = make([]*models.ConfigItem, 0, len(results))
		updates = make([]*updateConfigArgs, 0, len(results))

		newChanges      = make([]*models.ConfigChange, 0, len(results))
		changesToUpdate = make([]*models.ConfigChange, 0, len(results))
	)

	for _, result := range results {
		// result is a per-iteration copy, so stamping it here does not touch
		// the caller's slice; the stamped copy is what gets stored in updates.
		result.LastScrapedTime = &currentTime

		// ci stays nil for results that carry no config. Such results can
		// still carry changes, so they must not be skipped entirely.
		var ci *models.ConfigItem
		if result.Config != nil {
			var err error
			ci, err = NewConfigItemFromResult(ctx, result)
			if err != nil {
				return nil, nil, nil, nil, fmt.Errorf("unable to create config item: %s: %w", result, err)
			}

			ci.ScraperID = ctx.ScrapeConfig().GetPersistedID()
			if len(ci.ExternalID) == 0 {
				return nil, nil, nil, nil, fmt.Errorf("config item %s has no external id", ci)
			}

			existing, err := ctx.TempCache().Get(ci.ID)
			if err != nil {
				return nil, nil, nil, nil, fmt.Errorf("unable to lookup existing config(%s): %w", ci, err)
			}

			if existing == nil || existing.ID == "" {
				inserts = append(inserts, ci)
			} else {
				updates = append(updates, &updateConfigArgs{
					Result:   result,
					Existing: existing,
					New:      ci,
				})
			}
		}

		// NOTE(review): ci may be nil here. The previous per-result save loop
		// also passed a nil config item for config-less results; confirm that
		// getChanges still tolerates it.
		toCreate, toUpdate, err := getChanges(ctx, &result, ci)
		if err != nil {
			return nil, nil, nil, nil, fmt.Errorf("failed to extract changes (%s): %w", result, err)
		}

		newChanges = append(newChanges, toCreate...)
		changesToUpdate = append(changesToUpdate, toUpdate...)
	}

	return inserts, updates, newChanges, changesToUpdate, nil
}
29 changes: 29 additions & 0 deletions scrapers/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/robfig/cron/v3"
"github.com/samber/lo"
"github.com/sethvargo/go-retry"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
)

var (
Expand Down Expand Up @@ -239,6 +241,23 @@ func consumeKubernetesWatchResourcesJobKey(id string) string {
return id + "-consume-kubernetes-watch-resources"
}

// dedup returns at most one object per UID, keeping the one that appears last
// in objs. The result is ordered from the end of objs towards the start, i.e.
// in reverse of the input order. It returns nil when objs is empty.
func dedup(objs []*unstructured.Unstructured) []*unstructured.Unstructured {
	var latest []*unstructured.Unstructured
	visited := map[types.UID]struct{}{}

	// Walk from the back so that the last occurrence of each UID is the one
	// that gets kept.
	for offset := range objs {
		obj := objs[len(objs)-1-offset]
		uid := obj.GetUID()

		if _, dup := visited[uid]; !dup {
			visited[uid] = struct{}{}
			latest = append(latest, obj)
		}
	}

	return latest
}

// ConsumeKubernetesWatchEventsJobFunc returns a job that consumes kubernetes watch events
// for the given config of the scrapeconfig.
func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kubernetes) *job.Job {
Expand All @@ -261,6 +280,16 @@ func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kube
}
objs, _, _, _ := lo.Buffer(ch, len(ch))

// NOTE: The resource watcher can return multiple objects for the same NEW resource.
// Example: if a new pod is created, we'll get that pod object multiple times for different events.
// Hence, we need to use the latest one otherwise saving fails
// as we'll be trying to INSERT multiple config items with the same id.
//
// In the process, we will lose diff changes though.
// If diff changes are necessary, then we can split up the results in such
// a way that no two objects in a batch have the same id.
objs = dedup(objs)

cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History)
results, err := RunK8ObjScraper(cc, config, objs)
if err != nil {
Expand Down

0 comments on commit 9bc2411

Please sign in to comment.