diff --git a/db/changes.go b/db/changes.go index 68a2a0f1..d097d9d2 100644 --- a/db/changes.go +++ b/db/changes.go @@ -5,8 +5,8 @@ import ( "time" sw "github.com/RussellLuo/slidingwindow" - "github.com/google/uuid" + "github.com/flanksource/commons/collections" "github.com/flanksource/config-db/api" "github.com/flanksource/config-db/db/models" "github.com/flanksource/config-db/pkg/ratelimit" @@ -15,6 +15,8 @@ import ( const ( rateLimitWindow = time.Hour * 4 maxChangesInWindow = 100 + + ChangeTypeTooManyChanges = "TooManyChanges" ) func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error) { @@ -29,52 +31,44 @@ func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error var ( scraperLocks = sync.Map{} configRateLimiters = map[string]*sw.Limiter{} + + // List of configs that have been rate limited. + // It's used to avoid inserting mutliple "TooManyChanges" changes for the same config. + rateLimitedConfigsPerScraper = sync.Map{} ) -func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange) ([]*models.ConfigChange, error) { - if len(newChanges) == 0 { - return nil, nil +func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange) ([]*models.ConfigChange, []string, error) { + if len(newChanges) == 0 || ctx.ScrapeConfig().GetPersistedID() == nil { + return newChanges, nil, nil } - lock, loaded := scraperLocks.LoadOrStore(ctx.ScrapeConfig().GetPersistedID(), &sync.Mutex{}) + window := ctx.Properties().Duration("changes.max.window", rateLimitWindow) + max := ctx.Properties().Int("changes.max.count", maxChangesInWindow) + scraperID := ctx.ScrapeConfig().GetPersistedID().String() + + lock, loaded := scraperLocks.LoadOrStore(scraperID, &sync.Mutex{}) lock.(*sync.Mutex).Lock() defer lock.(*sync.Mutex).Unlock() - window := ctx.Properties().Duration("changes.max.window", rateLimitWindow) - max := ctx.Properties().Int("changes.max.count", maxChangesInWindow) + _rateLimitedConfigs, _ := rateLimitedConfigsPerScraper.LoadOrStore(scraperID, map[string]struct{}{}) + rateLimitedConfigs := _rateLimitedConfigs.(map[string]struct{}) if !loaded { - // populate the rate limit window for the scraper - query := `SELECT config_id, COUNT(*), min(created_at) FROM config_changes - WHERE change_type != 'TooManyChanges' - AND NOW() - created_at <= ? GROUP BY config_id` - rows, err := ctx.DB().Raw(query, window).Rows() - if err != nil { - return nil, err + // This is the initial sync of the rate limiter with the database. + // Happens only once for each scrape config. + if err := syncWindow(ctx, max, window); err != nil { + return nil, nil, err } - for rows.Next() { - var configID string - var count int - var earliest time.Time - if err := rows.Scan(&configID, &count, &earliest); err != nil { - return nil, err - } - - rateLimiter, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) { - win, stopper := ratelimit.NewLocalWindow() - if count > 0 { - win.SetStart(earliest) - win.AddCount(int64(count)) - } - return win, stopper - }) - configRateLimiters[configID] = rateLimiter + if rlConfigs, err := syncCurrentlyRateLimitedConfigs(ctx, window); err != nil { + return nil, nil, err + } else { + rateLimitedConfigs = rlConfigs } } + rateLimitedThisRun := map[string]struct{}{} passingNewChanges := make([]*models.ConfigChange, 0, len(newChanges)) - rateLimited := map[string]struct{}{} for _, change := range newChanges { rateLimiter, ok := configRateLimiters[change.ConfigID] if !ok { @@ -86,23 +80,102 @@ func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange) } if !rateLimiter.Allow() { - ctx.Logger.V(2).Infof("change rate limited (config=%s)", change.ConfigID) - rateLimited[change.ConfigID] = struct{}{} + ctx.Logger.V(1).Infof("change rate limited (config=%s, external_id=%s, type=%s)", change.ConfigID, change.ExternalChangeId, change.ChangeType) + rateLimitedThisRun[change.ConfigID] = struct{}{} continue + } else { + delete(rateLimitedConfigs, change.ConfigID) } passingNewChanges = append(passingNewChanges, change) } - // For all the rate limited configs, we add a new "TooManyChanges" change - for configID := range rateLimited { - passingNewChanges = append(passingNewChanges, &models.ConfigChange{ - ConfigID: configID, - Summary: "Changes on this config has been rate limited", - ChangeType: "TooManyChanges", - ExternalChangeId: uuid.New().String(), + var newlyRateLimited []string + for configID := range rateLimitedThisRun { + if _, ok := rateLimitedConfigs[configID]; !ok { + newlyRateLimited = append(newlyRateLimited, configID) + } + } + + rateLimitedConfigs = collections.MergeMap(rateLimitedConfigs, rateLimitedThisRun) + rateLimitedConfigsPerScraper.Store(scraperID, rateLimitedConfigs) + + return passingNewChanges, newlyRateLimited, nil +} + +func syncCurrentlyRateLimitedConfigs(ctx api.ScrapeContext, window time.Duration) (map[string]struct{}, error) { + query := `WITH latest_changes AS ( + SELECT + DISTINCT ON (config_id) config_id, + change_type + FROM + config_changes + LEFT JOIN config_items ON config_items.id = config_changes.config_id + WHERE + scraper_id = ? + AND NOW() - config_changes.created_at <= ? + ORDER BY + config_id, + config_changes.created_at DESC + ) SELECT config_id FROM latest_changes WHERE change_type = ?` + rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), window, ChangeTypeTooManyChanges).Rows() + if err != nil { + return nil, err + } + + output := make(map[string]struct{}) + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + return nil, err + } + + ctx.Logger.V(1).Infof("config %s is currently found to be rate limited", id) + output[id] = struct{}{} + } + + return output, rows.Err() +} + +// syncWindow syncs the rate limit window for the scraper with the changes in the db. +func syncWindow(ctx api.ScrapeContext, max int, window time.Duration) error { + query := `SELECT + config_id, + COUNT(*), + MIN(config_changes.created_at) AS min_created_at + FROM + config_changes + LEFT JOIN config_items ON config_items.id = config_changes.config_id + WHERE + scraper_id = ? + AND change_type != ? + AND NOW() - config_changes.created_at <= ? + GROUP BY + config_id` + rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), ChangeTypeTooManyChanges, window).Rows() + if err != nil { + return err + } + + for rows.Next() { + var configID string + var count int + var earliest time.Time + if err := rows.Scan(&configID, &count, &earliest); err != nil { + return err + } + ctx.Logger.V(3).Infof("config %s has %d changes in the last %s", configID, count, window) + + rateLimiter, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) { + win, stopper := ratelimit.NewLocalWindow() + if count > 0 { + win.SetStart(earliest) + win.AddCount(int64(count)) + } + return win, stopper }) + configRateLimiters[configID] = rateLimiter } - return passingNewChanges, nil + return rows.Err() } diff --git a/db/update.go b/db/update.go index be186927..5af25d87 100644 --- a/db/update.go +++ b/db/update.go @@ -352,7 +352,7 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { } } - newChanges, err = rateLimitChanges(ctx, newChanges) + newChanges, rateLimitedThisRun, err := rateLimitChanges(ctx, newChanges) if err != nil { return fmt.Errorf("failed to rate limit changes: %w", err) } @@ -361,6 +361,24 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { return fmt.Errorf("failed to create config changes: %w", err) } + // For all the rate limited configs, we add a new "TooManyChanges" change. + // This is intentionally inserted in a different batch from the new changes + // as "ChangeTypeTooManyChanges" will have the same created_at timestamp. + // We want these changes to be newer than the actual changes. + var rateLimitedChanges []*models.ConfigChange + for _, configID := range rateLimitedThisRun { + rateLimitedChanges = append(rateLimitedChanges, &models.ConfigChange{ + ConfigID: configID, + Summary: "Changes on this config has been rate limited", + ChangeType: ChangeTypeTooManyChanges, + ExternalChangeId: uuid.New().String(), + }) + } + + if err := ctx.DB().CreateInBatches(rateLimitedChanges, configItemsBulkInsertSize).Error; err != nil { + return fmt.Errorf("failed to create rate limited config changes: %w", err) + } + if len(changesToUpdate) != 0 { if err := ctx.DB().Save(changesToUpdate).Error; err != nil { return fmt.Errorf("failed to update config changes: %w", err)