diff --git a/db/changes.go b/db/changes.go
index 209aaea3a..68a2a0f13 100644
--- a/db/changes.go
+++ b/db/changes.go
@@ -1,6 +1,21 @@
 package db
 
-import "github.com/flanksource/config-db/api"
+import (
+	"sync"
+	"time"
+
+	sw "github.com/RussellLuo/slidingwindow"
+	"github.com/google/uuid"
+
+	"github.com/flanksource/config-db/api"
+	"github.com/flanksource/config-db/db/models"
+	"github.com/flanksource/config-db/pkg/ratelimit"
+)
+
+const (
+	rateLimitWindow    = time.Hour * 4
+	maxChangesInWindow = 100
+)
 
 func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error) {
 	var count int64
@@ -10,3 +25,138 @@ func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error
 		Error
 	return count, err
 }
+
+var (
+	// scraperLocks maps a scraper's persisted ID to its *scraperRateLimitState.
+	scraperLocks = sync.Map{}
+
+	// configRateLimitersMu guards configRateLimiters: the map is shared by every
+	// scraper, so the per-scraper lock alone does not protect it.
+	configRateLimitersMu sync.Mutex
+	configRateLimiters   = map[string]*sw.Limiter{}
+)
+
+// scraperRateLimitState serialises rate limiting for one scraper and records
+// whether the limiters have been seeded from the database on its behalf.
+type scraperRateLimitState struct {
+	mu     sync.Mutex
+	seeded bool
+}
+
+// rateLimitChanges drops the changes of any config that has exceeded
+// "changes.max.count" changes within "changes.max.window" and appends a single
+// "TooManyChanges" change for every config that was limited in this call.
+// It returns nil when newChanges is empty.
+func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange) ([]*models.ConfigChange, error) {
+	if len(newChanges) == 0 {
+		return nil, nil
+	}
+
+	// NOTE(review): the key is compared by value; if GetPersistedID returns a
+	// pointer, every call yields a distinct key and the state is never reused — confirm.
+	v, _ := scraperLocks.LoadOrStore(ctx.ScrapeConfig().GetPersistedID(), &scraperRateLimitState{})
+	state := v.(*scraperRateLimitState)
+	state.mu.Lock()
+	defer state.mu.Unlock()
+
+	window := ctx.Properties().Duration("changes.max.window", rateLimitWindow)
+	maxCount := int64(ctx.Properties().Int("changes.max.count", maxChangesInWindow))
+
+	// Seed only once per scraper, and only mark it done on success so that a
+	// failed query is retried on the next call instead of being skipped forever.
+	if !state.seeded {
+		if err := seedRateLimiters(ctx, window, maxCount); err != nil {
+			return nil, err
+		}
+		state.seeded = true
+	}
+
+	passingNewChanges := make([]*models.ConfigChange, 0, len(newChanges))
+	rateLimited := map[string]struct{}{}
+	for _, change := range newChanges {
+		if !rateLimiterFor(change.ConfigID, window, maxCount).Allow() {
+			ctx.Logger.V(2).Infof("change rate limited (config=%s)", change.ConfigID)
+			rateLimited[change.ConfigID] = struct{}{}
+			continue
+		}
+
+		passingNewChanges = append(passingNewChanges, change)
+	}
+
+	// For all the rate limited configs, we add a new "TooManyChanges" change
+	for configID := range rateLimited {
+		passingNewChanges = append(passingNewChanges, &models.ConfigChange{
+			ConfigID:         configID,
+			Summary:          "Changes on this config has been rate limited",
+			ChangeType:       "TooManyChanges",
+			ExternalChangeId: uuid.New().String(),
+		})
+	}
+
+	return passingNewChanges, nil
+}
+
+// seedRateLimiters creates a limiter for every config that already has changes
+// recorded within window, pre-loaded with the count and the earliest timestamp
+// of those changes.
+func seedRateLimiters(ctx api.ScrapeContext, window time.Duration, maxCount int64) error {
+	query := `SELECT config_id, COUNT(*), min(created_at) FROM config_changes
+		WHERE change_type != 'TooManyChanges'
+		AND NOW() - created_at <= ? GROUP BY config_id`
+	rows, err := ctx.DB().Raw(query, window).Rows()
+	if err != nil {
+		return err
+	}
+	// Release the result set on every return path, including a failed Scan.
+	defer rows.Close()
+
+	seeded := map[string]*sw.Limiter{}
+	for rows.Next() {
+		var configID string
+		var count int64
+		var earliest time.Time
+		if err := rows.Scan(&configID, &count, &earliest); err != nil {
+			return err
+		}
+
+		// The second result of NewLimiter is the window's stop func, which
+		// ratelimit.NewLocalWindow returns as a no-op, so it is safe to drop.
+		limiter, _ := sw.NewLimiter(window, maxCount, func() (sw.Window, sw.StopFunc) {
+			win, stopper := ratelimit.NewLocalWindow()
+			if count > 0 {
+				win.SetStart(earliest)
+				win.AddCount(count)
+			}
+			return win, stopper
+		})
+		seeded[configID] = limiter
+	}
+	// rows.Next also returns false when iteration fails part-way.
+	if err := rows.Err(); err != nil {
+		return err
+	}
+
+	configRateLimitersMu.Lock()
+	defer configRateLimitersMu.Unlock()
+	for configID, limiter := range seeded {
+		configRateLimiters[configID] = limiter
+	}
+	return nil
+}
+
+// rateLimiterFor returns the limiter registered for configID, registering a
+// fresh one with an empty window on first use.
+func rateLimiterFor(configID string, window time.Duration, maxCount int64) *sw.Limiter {
+	configRateLimitersMu.Lock()
+	defer configRateLimitersMu.Unlock()
+
+	if limiter, ok := configRateLimiters[configID]; ok {
+		return limiter
+	}
+
+	limiter, _ := sw.NewLimiter(window, maxCount, func() (sw.Window, sw.StopFunc) {
+		return sw.NewLocalWindow()
+	})
+	configRateLimiters[configID] = limiter
+	return limiter
+}
diff --git a/db/update.go b/db/update.go
index 1afb1d309..be186927a 100644
--- a/db/update.go
+++ b/db/update.go
@@ -352,6 +352,11 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error {
 		}
 	}
 
+	newChanges, err = rateLimitChanges(ctx, newChanges)
+	if err != nil {
+		return fmt.Errorf("failed to rate limit changes: %w", err)
+	}
+
 	if err := ctx.DB().CreateInBatches(newChanges,
configItemsBulkInsertSize).Error; err != nil { return fmt.Errorf("failed to create config changes: %w", err) } diff --git a/go.mod b/go.mod index 940668619..ba7cda58b 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/subscription/armsubscription v1.1.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/trafficmanager/armtrafficmanager v1.0.0 github.com/Jeffail/gabs/v2 v2.7.0 + github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b github.com/aws/aws-sdk-go-v2 v1.18.0 github.com/aws/aws-sdk-go-v2/config v1.18.25 github.com/aws/aws-sdk-go-v2/credentials v1.13.24 @@ -117,6 +118,7 @@ require ( github.com/ghodss/yaml v1.0.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/inflect v0.19.0 // indirect + github.com/go-redis/redis v6.15.9+incompatible // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/golang-jwt/jwt/v5 v5.0.0 // indirect github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect diff --git a/go.sum b/go.sum index ad1606d12..cbb974ccc 100644 --- a/go.sum +++ b/go.sum @@ -672,6 +672,8 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdko github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= github.com/RaveNoX/go-jsonmerge v1.0.0 h1:2e0nqnadoGUP8rAvcA0hkQelZreVO5X3BHomT2XMrAk= github.com/RaveNoX/go-jsonmerge v1.0.0/go.mod h1:qYM/NA77LhO4h51JJM7Z+xBU3ovqrNIACZe+SkSNVFo= +github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b h1:5/++qT1/z812ZqBvqQt6ToRswSuPZ/B33m6xVHRzADU= +github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b/go.mod h1:4+EPqMRApwwE/6yo6CxiHoSnBzjRr3jsqer7frxP8y4= github.com/TomOnTime/utfutil v0.0.0-20210710122150-437f72b26edf h1:+GdVyvpzTy3UFAS1+hbTqm9Mk0U1Xrocm28s/E2GWz0= github.com/TomOnTime/utfutil v0.0.0-20210710122150-437f72b26edf/go.mod 
h1:FiuynIwe98RFhWI8nZ0dnsldPVsBy9rHH1hn2WYwme4=
 github.com/WinterYukky/gorm-extra-clause-plugin v0.2.0 h1:s1jobT8PlSyG/FXczfoGSt4r46iPiT4ZShe35k5/2y4=
@@ -926,6 +928,8 @@ github.com/go-openapi/swag v0.22.4 h1:QLMzNJnMGPRNDCbySlcj1x01tzU8/9LTTL9hZZZogB
 github.com/go-openapi/swag v0.22.4/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
 github.com/go-pdf/fpdf v0.5.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M=
 github.com/go-pdf/fpdf v0.6.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M=
+github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
+github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
 github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
 github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
 github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
diff --git a/pkg/ratelimit/ratelimit.go b/pkg/ratelimit/ratelimit.go
new file mode 100644
index 000000000..fa106bdf5
--- /dev/null
+++ b/pkg/ratelimit/ratelimit.go
@@ -0,0 +1,56 @@
+package ratelimit
+
+import (
+	"time"
+
+	sw "github.com/RussellLuo/slidingwindow"
+)
+
+// LocalWindow is a purely in-memory window: it keeps its counters locally and
+// performs no synchronisation.
+//
+// NOTE: It mirrors the LocalWindow shipped with RussellLuo/slidingwindow and
+// adds SetStart so that a window can begin at a caller-chosen time.
+type LocalWindow struct {
+	// start is the inclusive lower bound of the window, [start, start + size),
+	// held as a Unix timestamp in nanoseconds.
+	start int64
+
+	// count is the number of events recorded in the window so far.
+	count int64
+}
+
+// NewLocalWindow returns an empty LocalWindow together with a no-op stop func.
+func NewLocalWindow() (*LocalWindow, sw.StopFunc) {
+	noop := func() {}
+	return new(LocalWindow), noop
+}
+
+// SetStart moves the start boundary of the window to s.
+func (lw *LocalWindow) SetStart(s time.Time) {
+	lw.start = s.UnixNano()
+}
+
+// Start reports the start boundary of the window.
+func (lw *LocalWindow) Start() time.Time {
+	return time.Unix(0, lw.start)
+}
+
+// Count reports how many events the window has accumulated.
+func (lw *LocalWindow) Count() int64 {
+	return lw.count
+}
+
+// AddCount adds n to the window's event count.
+func (lw *LocalWindow) AddCount(n int64) {
+	lw.count = lw.count + n
+}
+
+// Reset re-initialises the window with start boundary s and count c.
+func (lw *LocalWindow) Reset(s time.Time, c int64) {
+	lw.SetStart(s)
+	lw.count = c
+}
+
+// Sync does nothing; a LocalWindow has no remote state to reconcile.
+func (lw *LocalWindow) Sync(_ time.Time) {}