Skip to content

Commit

Permalink
fix: handle duplicate "TooManyChanges" change insertion
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed May 16, 2024
1 parent a7358dd commit fe21c62
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 43 deletions.
157 changes: 115 additions & 42 deletions db/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"time"

sw "github.com/RussellLuo/slidingwindow"
"github.com/google/uuid"

"github.com/flanksource/commons/collections"
"github.com/flanksource/config-db/api"
"github.com/flanksource/config-db/db/models"
"github.com/flanksource/config-db/pkg/ratelimit"
Expand All @@ -15,6 +15,8 @@ import (
const (
rateLimitWindow = time.Hour * 4
maxChangesInWindow = 100

ChangeTypeTooManyChanges = "TooManyChanges"
)

func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error) {
Expand All @@ -29,52 +31,44 @@ func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error
var (
scraperLocks = sync.Map{}
configRateLimiters = map[string]*sw.Limiter{}

// List of configs that have been rate limited.
// It's used to avoid inserting mutliple "TooManyChanges" changes for the same config.
rateLimitedConfigsPerScraper = sync.Map{}
)

func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange) ([]*models.ConfigChange, error) {
if len(newChanges) == 0 {
return nil, nil
func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange) ([]*models.ConfigChange, []string, error) {
if len(newChanges) == 0 || ctx.ScrapeConfig().GetPersistedID() == nil {
return newChanges, nil, nil
}

lock, loaded := scraperLocks.LoadOrStore(ctx.ScrapeConfig().GetPersistedID(), &sync.Mutex{})
window := ctx.Properties().Duration("changes.max.window", rateLimitWindow)
max := ctx.Properties().Int("changes.max.count", maxChangesInWindow)
scraperID := ctx.ScrapeConfig().GetPersistedID().String()

lock, loaded := scraperLocks.LoadOrStore(scraperID, &sync.Mutex{})
lock.(*sync.Mutex).Lock()
defer lock.(*sync.Mutex).Unlock()

window := ctx.Properties().Duration("changes.max.window", rateLimitWindow)
max := ctx.Properties().Int("changes.max.count", maxChangesInWindow)
_rateLimitedConfigs, _ := rateLimitedConfigsPerScraper.LoadOrStore(scraperID, map[string]struct{}{})
rateLimitedConfigs := _rateLimitedConfigs.(map[string]struct{})

if !loaded {
// populate the rate limit window for the scraper
query := `SELECT config_id, COUNT(*), min(created_at) FROM config_changes
WHERE change_type != 'TooManyChanges'
AND NOW() - created_at <= ? GROUP BY config_id`
rows, err := ctx.DB().Raw(query, window).Rows()
if err != nil {
return nil, err
// This is the initial sync of the rate limiter with the database.
// Happens only once for each scrape config.
if err := syncWindow(ctx, max, window); err != nil {
return nil, nil, err
}

for rows.Next() {
var configID string
var count int
var earliest time.Time
if err := rows.Scan(&configID, &count, &earliest); err != nil {
return nil, err
}

rateLimiter, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) {
win, stopper := ratelimit.NewLocalWindow()
if count > 0 {
win.SetStart(earliest)
win.AddCount(int64(count))
}
return win, stopper
})
configRateLimiters[configID] = rateLimiter
if rlConfigs, err := syncCurrentlyRateLimitedConfigs(ctx, window); err != nil {
return nil, nil, err
} else {
rateLimitedConfigs = rlConfigs
}
}

rateLimitedThisRun := map[string]struct{}{}
passingNewChanges := make([]*models.ConfigChange, 0, len(newChanges))
rateLimited := map[string]struct{}{}
for _, change := range newChanges {
rateLimiter, ok := configRateLimiters[change.ConfigID]
if !ok {
Expand All @@ -86,23 +80,102 @@ func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange)
}

if !rateLimiter.Allow() {
ctx.Logger.V(2).Infof("change rate limited (config=%s)", change.ConfigID)
rateLimited[change.ConfigID] = struct{}{}
ctx.Logger.V(1).Infof("change rate limited (config=%s, external_id=%s, type=%s)", change.ConfigID, change.ExternalChangeId, change.ChangeType)
rateLimitedThisRun[change.ConfigID] = struct{}{}
continue
} else {
delete(rateLimitedConfigs, change.ConfigID)
}

passingNewChanges = append(passingNewChanges, change)
}

// For all the rate limited configs, we add a new "TooManyChanges" change
for configID := range rateLimited {
passingNewChanges = append(passingNewChanges, &models.ConfigChange{
ConfigID: configID,
Summary: "Changes on this config has been rate limited",
ChangeType: "TooManyChanges",
ExternalChangeId: uuid.New().String(),
var newlyRateLimited []string
for configID := range rateLimitedThisRun {
if _, ok := rateLimitedConfigs[configID]; !ok {
newlyRateLimited = append(newlyRateLimited, configID)
}
}

rateLimitedConfigs = collections.MergeMap(rateLimitedConfigs, rateLimitedThisRun)
rateLimitedConfigsPerScraper.Store(scraperID, rateLimitedConfigs)

return passingNewChanges, newlyRateLimited, nil
}

func syncCurrentlyRateLimitedConfigs(ctx api.ScrapeContext, window time.Duration) (map[string]struct{}, error) {
query := `WITH latest_changes AS (
SELECT
DISTINCT ON (config_id) config_id,
change_type
FROM
config_changes
LEFT JOIN config_items ON config_items.id = config_changes.config_id
WHERE
scraper_id = ?
AND NOW() - config_changes.created_at <= ?
ORDER BY
config_id,
config_changes.created_at DESC
) SELECT config_id FROM latest_changes WHERE change_type = ?`
rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), window, ChangeTypeTooManyChanges).Rows()
if err != nil {
return nil, err
}

output := make(map[string]struct{})
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
return nil, err
}

ctx.Logger.V(1).Infof("config %s is currently found to be rate limited", id)
output[id] = struct{}{}
}

return output, rows.Err()
}

// syncWindow syncs the rate limit window for the scraper with the changes in the db.
func syncWindow(ctx api.ScrapeContext, max int, window time.Duration) error {
query := `SELECT
config_id,
COUNT(*),
MIN(config_changes.created_at) AS min_created_at
FROM
config_changes
LEFT JOIN config_items ON config_items.id = config_changes.config_id
WHERE
scraper_id = ?
AND change_type != ?
AND NOW() - config_changes.created_at <= ?
GROUP BY
config_id`
rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), ChangeTypeTooManyChanges, window).Rows()
if err != nil {
return err
}

for rows.Next() {
var configID string
var count int
var earliest time.Time
if err := rows.Scan(&configID, &count, &earliest); err != nil {
return err
}
ctx.Logger.V(3).Infof("config %s has %d changes in the last %s", configID, count, window)

rateLimiter, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) {
win, stopper := ratelimit.NewLocalWindow()
if count > 0 {
win.SetStart(earliest)
win.AddCount(int64(count))
}
return win, stopper
})
configRateLimiters[configID] = rateLimiter
}

return passingNewChanges, nil
return rows.Err()
}
20 changes: 19 additions & 1 deletion db/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error {
}
}

newChanges, err = rateLimitChanges(ctx, newChanges)
newChanges, rateLimitedThisRun, err := rateLimitChanges(ctx, newChanges)
if err != nil {
return fmt.Errorf("failed to rate limit changes: %w", err)
}
Expand All @@ -361,6 +361,24 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error {
return fmt.Errorf("failed to create config changes: %w", err)
}

// For all the rate limited configs, we add a new "TooManyChanges" change.
// This is intentionally inserted in a different batch from the new changes
// as "ChangeTypeTooManyChanges" will have the same created_at timestamp.
// We want these changes to be newer than the actual changes.
var rateLimitedChanges []*models.ConfigChange
for _, configID := range rateLimitedThisRun {
rateLimitedChanges = append(rateLimitedChanges, &models.ConfigChange{
ConfigID: configID,
Summary: "Changes on this config has been rate limited",
ChangeType: ChangeTypeTooManyChanges,
ExternalChangeId: uuid.New().String(),
})
}

if err := ctx.DB().CreateInBatches(rateLimitedChanges, configItemsBulkInsertSize).Error; err != nil {
return fmt.Errorf("failed to create rate limited config changes: %w", err)
}

if len(changesToUpdate) != 0 {
if err := ctx.DB().Save(changesToUpdate).Error; err != nil {
return fmt.Errorf("failed to update config changes: %w", err)
Expand Down

0 comments on commit fe21c62

Please sign in to comment.