From 5f43ea748591173dccdde6e848594881bc953bf6 Mon Sep 17 00:00:00 2001
From: Aditya Thebe
Date: Sun, 26 May 2024 22:54:08 +0545
Subject: [PATCH] feat: post insertion rate limitation

---
 db/changes.go              | 228 +------------------------------------
 db/changes_rate_limit.go   | 162 ++++++++++++++++++++++++++
 db/models/config_change.go |  11 +-
 db/update.go               |  87 +++++++++++---
 go.mod                     |   3 +-
 go.sum                     |   6 +-
 pkg/ratelimit/ratelimit.go |  48 --------
 7 files changed, 242 insertions(+), 303 deletions(-)
 create mode 100644 db/changes_rate_limit.go
 delete mode 100644 pkg/ratelimit/ratelimit.go

diff --git a/db/changes.go b/db/changes.go
index c4a954cf..209aaea3 100644
--- a/db/changes.go
+++ b/db/changes.go
@@ -1,27 +1,6 @@
 package db
 
-import (
-	"sync"
-	"time"
-
-	sw "github.com/RussellLuo/slidingwindow"
-	"github.com/patrickmn/go-cache"
-	"github.com/samber/lo"
-
-	"github.com/flanksource/commons/collections"
-	"github.com/flanksource/config-db/api"
-	"github.com/flanksource/config-db/db/models"
-	"github.com/flanksource/config-db/pkg/ratelimit"
-)
-
-const (
-	rateLimitWindow = time.Hour * 4
-	maxChangesInWindow = 100
-
-	ChangeTypeTooManyChanges = "TooManyChanges"
-)
-
-var configChangesCache = cache.New(time.Hour*24, time.Hour*24)
+import "github.com/flanksource/config-db/api"
 
 func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error) {
 	var count int64
@@ -31,208 +10,3 @@ func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error
 		Error
 	return count, err
 }
-
-var (
-	scraperLocks = sync.Map{}
-	configRateLimiters = map[string]*sw.Limiter{}
-
-	// List of configs that have been rate limited.
-	// It's used to avoid inserting mutliple "TooManyChanges" changes for the same config.
-	rateLimitedConfigsPerScraper = sync.Map{}
-)
-
-func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange) ([]*models.ConfigChange, []string, error) {
-	if len(newChanges) == 0 || ctx.ScrapeConfig().GetPersistedID() == nil {
-		return newChanges, nil, nil
-	}
-
-	window := ctx.Properties().Duration("changes.max.window", rateLimitWindow)
-	max := ctx.Properties().Int("changes.max.count", maxChangesInWindow)
-	scraperID := ctx.ScrapeConfig().GetPersistedID().String()
-
-	lock, loaded := scraperLocks.LoadOrStore(scraperID, &sync.Mutex{})
-	lock.(*sync.Mutex).Lock()
-	defer lock.(*sync.Mutex).Unlock()
-
-	_rateLimitedConfigs, _ := rateLimitedConfigsPerScraper.LoadOrStore(scraperID, map[string]struct{}{})
-	rateLimitedConfigs := _rateLimitedConfigs.(map[string]struct{})
-
-	if !loaded {
-		// This is the initial sync of the rate limiter with the database.
-		// Happens only once for each scrape config.
-		if err := syncWindow(ctx, max, window); err != nil {
-			return nil, nil, err
-		}
-
-		if rlConfigs, err := syncCurrentlyRateLimitedConfigs(ctx, window); err != nil {
-			return nil, nil, err
-		} else {
-			rateLimitedConfigs = rlConfigs
-		}
-	}
-
-	rateLimitedThisRun := map[string]struct{}{}
-	passingNewChanges := make([]*models.ConfigChange, 0, len(newChanges))
-	for _, change := range newChanges {
-		rateLimiter, ok := configRateLimiters[change.ConfigID]
-		if !ok {
-			rl, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) {
-				return sw.NewLocalWindow()
-			})
-			configRateLimiters[change.ConfigID] = rl
-			rateLimiter = rl
-		}
-
-		if !rateLimiter.Allow() {
-			ctx.Logger.V(1).Infof("change rate limited (config=%s, external_id=%s, type=%s)", change.ConfigID, change.ExternalChangeId, change.ChangeType)
-			rateLimitedThisRun[change.ConfigID] = struct{}{}
-			continue
-		} else {
-			delete(rateLimitedConfigs, change.ConfigID)
-		}
-
-		passingNewChanges = append(passingNewChanges, change)
-	}
-
-	var newlyRateLimited []string
-	for configID := range rateLimitedThisRun {
-		if _, ok := rateLimitedConfigs[configID]; !ok {
-			newlyRateLimited = append(newlyRateLimited, configID)
-		}
-	}
-
-	rateLimitedConfigs = collections.MergeMap(rateLimitedConfigs, rateLimitedThisRun)
-	rateLimitedConfigsPerScraper.Store(scraperID, rateLimitedConfigs)
-
-	return passingNewChanges, newlyRateLimited, nil
-}
-
-func syncCurrentlyRateLimitedConfigs(ctx api.ScrapeContext, window time.Duration) (map[string]struct{}, error) {
-	query := `WITH latest_changes AS (
-		SELECT
-			DISTINCT ON (config_id) config_id,
-			change_type
-		FROM
-			config_changes
-			LEFT JOIN config_items ON config_items.id = config_changes.config_id
-		WHERE
-			scraper_id = ?
-			AND NOW() - config_changes.created_at <= ?
-		ORDER BY
-			config_id,
-			config_changes.created_at DESC
-	) SELECT config_id FROM latest_changes WHERE change_type = ?`
-	rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), window, ChangeTypeTooManyChanges).Rows()
-	if err != nil {
-		return nil, err
-	}
-	defer rows.Close()
-
-	output := make(map[string]struct{})
-	for rows.Next() {
-		var id string
-		if err := rows.Scan(&id); err != nil {
-			return nil, err
-		}
-
-		ctx.Logger.V(1).Infof("config %s is currently found to be rate limited", id)
-		output[id] = struct{}{}
-	}
-
-	return output, rows.Err()
-}
-
-// syncWindow syncs the rate limit window for the scraper with the changes in the db.
-func syncWindow(ctx api.ScrapeContext, max int, window time.Duration) error {
-	query := `SELECT
-		config_id,
-		COUNT(*),
-		MIN(config_changes.created_at) AS min_created_at
-	FROM
-		config_changes
-		LEFT JOIN config_items ON config_items.id = config_changes.config_id
-	WHERE
-		scraper_id = ?
-		AND change_type != ?
-		AND NOW() - config_changes.created_at <= ?
-		GROUP BY
-			config_id`
-	rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), ChangeTypeTooManyChanges, window).Rows()
-	if err != nil {
-		return err
-	}
-	defer rows.Close()
-
-	for rows.Next() {
-		var configID string
-		var count int
-		var earliest time.Time
-		if err := rows.Scan(&configID, &count, &earliest); err != nil {
-			return err
-		}
-		ctx.Logger.V(3).Infof("config %s has %d changes in the last %s", configID, count, window)
-
-		rateLimiter, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) {
-			win, stopper := ratelimit.NewLocalWindow()
-			if count > 0 {
-				win.SetStart(earliest)
-				win.AddCount(int64(count))
-			}
-			return win, stopper
-		})
-		configRateLimiters[configID] = rateLimiter
-	}
-
-	return rows.Err()
-}
-
-// filterOutPersistedChanges returns only those changes that weren't seen in the db.
-func filterOutPersistedChanges(ctx api.ScrapeContext, changes []*models.ConfigChange) ([]*models.ConfigChange, error) {
-	// use cache to filter out ones that we've already seen before
-	changes = lo.Filter(changes, func(c *models.ConfigChange, _ int) bool {
-		_, found := configChangesCache.Get(c.ConfigID + c.ExternalChangeId)
-		if found {
-			_ = found
-		}
-		return !found
-	})
-
-	if len(changes) == 0 {
-		return nil, nil
-	}
-
-	query := `SELECT config_id, external_change_id
-		FROM config_changes
-		WHERE (config_id, external_change_id) IN ?`
-	args := lo.Map(changes, func(c *models.ConfigChange, _ int) []string {
-		return []string{c.ConfigID, c.ExternalChangeId}
-	})
-
-	rows, err := ctx.DB().Raw(query, args).Rows()
-	if err != nil {
-		return nil, err
-	}
-	defer rows.Close()
-
-	existing := make(map[string]struct{})
-	for rows.Next() {
-		var configID, externalChangeID string
-		if err := rows.Scan(&configID, &externalChangeID); err != nil {
-			return nil, err
-		}
-
-		configChangesCache.SetDefault(configID+externalChangeID, struct{}{})
-		existing[configID+externalChangeID] = struct{}{}
-	}
-
-	newOnes := lo.Filter(changes, func(c *models.ConfigChange, _ int) bool {
-		_, found := existing[c.ConfigID+c.ExternalChangeId]
-		return !found
-	})
-
-	if len(newOnes) > 0 {
-		_ = query
-	}
-
-	return newOnes, nil
-}
diff --git a/db/changes_rate_limit.go b/db/changes_rate_limit.go
new file mode 100644
index 00000000..15c41faf
--- /dev/null
+++ b/db/changes_rate_limit.go
@@ -0,0 +1,162 @@
+package db
+
+import (
+	"sync"
+	"time"
+
+	"github.com/flanksource/commons/collections"
+	sw "github.com/flanksource/slidingwindow"
+
+	"github.com/flanksource/config-db/api"
+	"github.com/flanksource/config-db/db/models"
+)
+
+const (
+	rateLimitWindow = time.Hour * 4
+	maxChangesInWindow = 100
+
+	ChangeTypeTooManyChanges = "TooManyChanges"
+)
+
+var (
+	scraperLocks = sync.Map{}
+	configRateLimiters = map[string]*sw.Limiter{}
+
+	// List of configs that are currently being rate limited.
+	// It's used to avoid inserting multiple "TooManyChanges" changes for the same config.
+	currentlyRateLimitedConfigsPerScraper = sync.Map{}
+)
+
+func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange) ([]*models.ConfigChange, []string, error) {
+	if len(newChanges) == 0 || ctx.ScrapeConfig().GetPersistedID() == nil {
+		return newChanges, nil, nil
+	}
+
+	window := ctx.Properties().Duration("changes.max.window", rateLimitWindow)
+	max := ctx.Properties().Int("changes.max.count", maxChangesInWindow)
+	scraperID := ctx.ScrapeConfig().GetPersistedID().String()
+
+	lock, loaded := scraperLocks.LoadOrStore(scraperID, &sync.Mutex{})
+	lock.(*sync.Mutex).Lock()
+	defer lock.(*sync.Mutex).Unlock()
+
+	_rateLimitedConfigs, _ := currentlyRateLimitedConfigsPerScraper.LoadOrStore(scraperID, map[string]struct{}{})
+	rateLimitedConfigs := _rateLimitedConfigs.(map[string]struct{})
+
+	if !loaded {
+		// This is the initial sync of the rate limiter with the database.
+		// Happens only once for each scrape config.
+		if err := syncWindow(ctx, max, window); err != nil {
+			return nil, nil, err
+		}
+
+		if rlConfigs, err := syncCurrentlyRateLimitedConfigs(ctx, window); err != nil {
+			return nil, nil, err
+		} else {
+			rateLimitedConfigs = rlConfigs
+		}
+	}
+
+	var rateLimitedConfigsThisRun = make(map[string]struct{})
+	var passingNewChanges = make([]*models.ConfigChange, 0, len(newChanges))
+	for _, change := range newChanges {
+		if _, ok := rateLimitedConfigs[change.ConfigID]; ok {
+			rateLimitedConfigsThisRun[change.ConfigID] = struct{}{}
+		} else {
+			passingNewChanges = append(passingNewChanges, change)
+		}
+	}
+
+	// Find those changes that were rate limited only this run but
+	// weren't previously in the rate limited state.
+	var newlyRateLimited []string
+	for configID := range rateLimitedConfigsThisRun {
+		if _, ok := rateLimitedConfigs[configID]; !ok {
+			newlyRateLimited = append(newlyRateLimited, configID)
+		}
+	}
+
+	rateLimitedConfigs = collections.MergeMap(rateLimitedConfigs, rateLimitedConfigsThisRun)
+	currentlyRateLimitedConfigsPerScraper.Store(scraperID, rateLimitedConfigs)
+
+	return passingNewChanges, newlyRateLimited, nil
+}
+
+func syncCurrentlyRateLimitedConfigs(ctx api.ScrapeContext, window time.Duration) (map[string]struct{}, error) {
+	query := `WITH latest_changes AS (
+		SELECT
+			DISTINCT ON (config_id) config_id,
+			change_type
+		FROM
+			config_changes
+			LEFT JOIN config_items ON config_items.id = config_changes.config_id
+		WHERE
+			scraper_id = ?
+			AND NOW() - config_changes.created_at <= ?
+		ORDER BY
+			config_id,
+			config_changes.created_at DESC
+	) SELECT config_id FROM latest_changes WHERE change_type = ?`
+	rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), window, ChangeTypeTooManyChanges).Rows()
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	output := make(map[string]struct{})
+	for rows.Next() {
+		var id string
+		if err := rows.Scan(&id); err != nil {
+			return nil, err
+		}
+
+		ctx.Logger.V(3).Infof("config %s is currently found to be rate limited", id)
+		output[id] = struct{}{}
+	}
+
+	return output, rows.Err()
+}
+
+// syncWindow syncs the rate limit window for the scraper with the changes in the db.
+func syncWindow(ctx api.ScrapeContext, max int, window time.Duration) error {
+	query := `SELECT
+		config_id,
+		COUNT(*),
+		MIN(config_changes.created_at) AS min_created_at
+	FROM
+		config_changes
+		LEFT JOIN config_items ON config_items.id = config_changes.config_id
+	WHERE
+		scraper_id = ?
+		AND change_type != ?
+		AND NOW() - config_changes.created_at <= ?
+		GROUP BY
+			config_id`
+	rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), ChangeTypeTooManyChanges, window).Rows()
+	if err != nil {
+		return err
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		var configID string
+		var count int
+		var earliest time.Time
+		if err := rows.Scan(&configID, &count, &earliest); err != nil {
+			return err
+		}
+		ctx.Logger.V(3).Infof("config %s has %d changes in the last %s", configID, count, window)
+
+		rateLimiter, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) {
+			win, stopper := sw.NewLocalWindow()
+			if count > 0 {
+				win.SetStart(earliest)
+				win.AddCount(int64(count))
+			}
+			return win, stopper
+		})
+		configRateLimiters[configID] = rateLimiter
+	}
+
+	return rows.Err()
+}
diff --git a/db/models/config_change.go b/db/models/config_change.go
index ae2534f6..8545db14 100644
--- a/db/models/config_change.go
+++ b/db/models/config_change.go
@@ -5,8 +5,8 @@ import (
 	"time"
 
 	v1 "github.com/flanksource/config-db/api/v1"
-	"github.com/google/uuid"
 	"gorm.io/gorm"
+	"gorm.io/gorm/clause"
 )
 
 // ConfigChange represents the config change database table
@@ -14,8 +14,8 @@ type ConfigChange struct {
 	ExternalID string `gorm:"-"`
 	ConfigType string `gorm:"-"`
 	ExternalChangeId string `gorm:"column:external_change_id" json:"external_change_id"`
-	ID string `gorm:"primaryKey;unique_index;not null;column:id" json:"id"`
-	ConfigID string `gorm:"column:config_id;default:''" json:"config_id"`
+	ID string `gorm:"primaryKey;unique_index;not null;column:id;default:generate_ulid()" json:"id"`
+	ConfigID string `gorm:"column:config_id" json:"config_id"`
 	ChangeType string `gorm:"column:change_type" json:"change_type"`
 	Diff *string `gorm:"column:diff" json:"diff,omitempty"`
 	Severity string `gorm:"column:severity" json:"severity"`
@@ -61,9 +61,6 @@ func NewConfigChangeFromV1(result v1.ScrapeResult, change v1.ChangeResult) *Conf
 }
 
 func (c *ConfigChange) BeforeCreate(tx *gorm.DB) (err error) {
-	if c.ID == "" {
-		c.ID = uuid.New().String()
-	}
-
+	tx.Statement.AddClause(clause.OnConflict{DoNothing: true})
 	return
 }
diff --git a/db/update.go b/db/update.go
index d3de2695..e07fb9c7 100644
--- a/db/update.go
+++ b/db/update.go
@@ -6,6 +6,7 @@ import (
 	"slices"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/aws/smithy-go/ptr"
@@ -21,6 +22,7 @@ import (
 	dutyContext "github.com/flanksource/duty/context"
 	dutyModels "github.com/flanksource/duty/models"
 	"github.com/flanksource/gomplate/v3"
+	sw "github.com/flanksource/slidingwindow"
 	"github.com/google/uuid"
 	"github.com/hexops/gotextdiff"
 	"github.com/hexops/gotextdiff/myers"
@@ -31,7 +33,11 @@ import (
 	"gorm.io/gorm/clause"
 )
 
-const configItemsBulkInsertSize = 200
+const (
+	configItemsBulkInsertSize = 200
+
+	configChangesBulkInsertSize = 200
+)
 
 func deleteChangeHandler(ctx api.ScrapeContext, change v1.ChangeResult) error {
 	var deletedAt interface{}
@@ -209,8 +215,8 @@ func shouldExcludeChange(result *v1.ScrapeResult, changeResult v1.ChangeResult)
 
 func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.ConfigItem) ([]*models.ConfigChange, []*models.ConfigChange, error) {
 	var (
-		toInsert = []*models.ConfigChange{}
-		toUpdate = []*models.ConfigChange{}
+		newOnes = []*models.ConfigChange{}
+		updates = []*models.ConfigChange{}
 	)
 
 	changes.ProcessRules(result, result.BaseScraper.Transform.Change.Mapping...)
@@ -267,19 +273,13 @@ func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.C
 		}
 
 		if changeResult.UpdateExisting {
-			toUpdate = append(toUpdate, change)
+			updates = append(updates, change)
 		} else {
-			toInsert = append(toInsert, change)
+			newOnes = append(newOnes, change)
 		}
 	}
 
-	// Remove the changes that have already been inserted.
-	newOnes, err := filterOutPersistedChanges(ctx, toInsert)
-	if err != nil {
-		return nil, nil, err
-	}
-
-	return newOnes, toUpdate, nil
+	return newOnes, updates, nil
 }
 
 func upsertAnalysis(ctx api.ScrapeContext, result *v1.ScrapeResult) error {
@@ -367,10 +367,46 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error {
 		return fmt.Errorf("failed to rate limit changes: %w", err)
 	}
 
-	if err := ctx.DB().CreateInBatches(newChanges, configItemsBulkInsertSize).Error; err != nil {
-		return fmt.Errorf("failed to create config changes: %w", err)
+	// NOTE: Returning just the needed columns didn't work in gorm for some reason.
+	// So, returning * for now.
+	// returnClause := clause.Returning{Columns: []clause.Column{{Name: "id"}, {Name: "config_id"}, {Name: "external_change_id"}}}
+	returnClause := clause.Returning{}
+
+	// NOTE: Ran into a weird gorm behavior where CreateInBatches combined with a return clause
+	// panics. So, handling batching manually.
+	var rateLimitedAfterInsertion = map[string]struct{}{}
+	for _, batch := range lo.Chunk(newChanges, configChangesBulkInsertSize) {
+		if err := ctx.DB().Clauses(returnClause).Create(&batch).Error; err != nil {
+			return fmt.Errorf("failed to create config changes: %w", err)
+		}
+
+		if len(batch) != 0 {
+			// the `batch` slice now only contains those changes that were actually inserted.
+			// we increase the usage count on the rate limiter for those changes.
+			_l, _ := scraperLocks.LoadOrStore(ctx.ScrapeConfig().GetPersistedID().String(), &sync.Mutex{})
+			lock := _l.(*sync.Mutex)
+			lock.Lock()
+			for _, b := range batch {
+				rateLimiter, ok := configRateLimiters[b.ConfigID]
+				if !ok {
+					window := ctx.Properties().Duration("changes.max.window", rateLimitWindow)
+					max := ctx.Properties().Int("changes.max.count", maxChangesInWindow)
+					rateLimiter, _ = sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) {
+						return sw.NewLocalWindow()
+					})
+					configRateLimiters[b.ConfigID] = rateLimiter
+				}
+
+				if rateLimiter.Allow() {
+					rateLimitedAfterInsertion[b.ConfigID] = struct{}{}
+				}
+			}
+			lock.Unlock()
+		}
 	}
 
+	syncCurrentlyRatelimitedConfigMap(ctx, newChanges, rateLimitedAfterInsertion)
+
 	// For all the rate limited configs, we add a new "TooManyChanges" change.
 	// This is intentionally inserted in a different batch from the new changes
 	// as "ChangeTypeTooManyChanges" will have the same created_at timestamp.
@@ -385,7 +421,7 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error {
 		})
 	}
 
-	if err := ctx.DB().CreateInBatches(rateLimitedChanges, configItemsBulkInsertSize).Error; err != nil {
+	if err := ctx.DB().CreateInBatches(rateLimitedChanges, configChangesBulkInsertSize).Error; err != nil {
 		return fmt.Errorf("failed to create rate limited config changes: %w", err)
 	}
 
@@ -440,6 +476,27 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error {
 	return nil
 }
 
+func syncCurrentlyRatelimitedConfigMap(ctx api.ScrapeContext, insertedChanged []*models.ConfigChange, rateLimitedAfterInsertion map[string]struct{}) {
+	_l, _ := scraperLocks.LoadOrStore(ctx.ScrapeConfig().GetPersistedID().String(), &sync.Mutex{})
+	lock := _l.(*sync.Mutex)
+	lock.Lock()
+
+	m, _ := currentlyRateLimitedConfigsPerScraper.LoadOrStore(ctx.ScrapeConfig().GetPersistedID().String(), map[string]struct{}{})
+	mm := m.(map[string]struct{})
+	for _, c := range insertedChanged {
+		if _, ok := rateLimitedAfterInsertion[c.ConfigID]; !ok {
+			// All the configs that weren't rate limited will need to be removed from the
+			// "currently rate limited" map.
+			delete(mm, c.ConfigID)
+		} else {
+			mm[c.ConfigID] = struct{}{}
+		}
+	}
+
+	currentlyRateLimitedConfigsPerScraper.Store(ctx.ScrapeConfig().GetPersistedID().String(), mm)
+	lock.Unlock()
+}
+
 func updateLastScrapedTime(ctx api.ScrapeContext, ids []string) error {
 	if len(ids) == 0 {
 		return nil
diff --git a/go.mod b/go.mod
index 8403a33e..d79179cc 100644
--- a/go.mod
+++ b/go.mod
@@ -19,7 +19,6 @@
 	github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/subscription/armsubscription v1.1.0
 	github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/trafficmanager/armtrafficmanager v1.0.0
 	github.com/Jeffail/gabs/v2 v2.7.0
-	github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b
 	github.com/aws/aws-sdk-go-v2 v1.18.0
 	github.com/aws/aws-sdk-go-v2/config v1.18.25
 	github.com/aws/aws-sdk-go-v2/credentials v1.13.24
@@ -47,6 +46,7 @@
 	github.com/flanksource/is-healthy v1.0.7
 	github.com/flanksource/ketall v1.1.6
 	github.com/flanksource/mapstructure v1.6.0
+	github.com/flanksource/slidingwindow v0.0.0-20240526171711-1e13c04e057b
 	github.com/go-logr/zapr v1.2.4
 	github.com/gobwas/glob v0.2.3
 	github.com/gomarkdown/markdown v0.0.0-20230322041520-c84983bdbf2a
@@ -118,7 +118,6 @@
 	github.com/ghodss/yaml v1.0.0 // indirect
 	github.com/go-logr/stdr v1.2.2 // indirect
 	github.com/go-openapi/inflect v0.19.0 // indirect
-	github.com/go-redis/redis v6.15.9+incompatible // indirect
 	github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
 	github.com/golang-jwt/jwt/v5 v5.0.0 // indirect
 	github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
diff --git a/go.sum b/go.sum
index 2452d2e3..f080de42 100644
--- a/go.sum
+++ b/go.sum
@@ -672,8 +672,6 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdko
 github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
 github.com/RaveNoX/go-jsonmerge v1.0.0 h1:2e0nqnadoGUP8rAvcA0hkQelZreVO5X3BHomT2XMrAk=
 github.com/RaveNoX/go-jsonmerge v1.0.0/go.mod h1:qYM/NA77LhO4h51JJM7Z+xBU3ovqrNIACZe+SkSNVFo=
-github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b h1:5/++qT1/z812ZqBvqQt6ToRswSuPZ/B33m6xVHRzADU=
-github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b/go.mod h1:4+EPqMRApwwE/6yo6CxiHoSnBzjRr3jsqer7frxP8y4=
 github.com/TomOnTime/utfutil v0.0.0-20210710122150-437f72b26edf h1:+GdVyvpzTy3UFAS1+hbTqm9Mk0U1Xrocm28s/E2GWz0=
 github.com/TomOnTime/utfutil v0.0.0-20210710122150-437f72b26edf/go.mod h1:FiuynIwe98RFhWI8nZ0dnsldPVsBy9rHH1hn2WYwme4=
 github.com/WinterYukky/gorm-extra-clause-plugin v0.2.0 h1:s1jobT8PlSyG/FXczfoGSt4r46iPiT4ZShe35k5/2y4=
@@ -874,6 +872,8 @@ github.com/flanksource/mapstructure v1.6.0 h1:+1kJ+QsO1SxjAgktfLlpZXetsVSJ0uCLhG
 github.com/flanksource/mapstructure v1.6.0/go.mod h1:dttg5+FFE2sp4D/CrcPCVqufNDrBggDaM+08nk5S8Ps=
 github.com/flanksource/postq v0.1.3 h1:eTslG04hwxAvntZv8gIUsXKQPLGeLiRPNkZC+kQdL7c=
 github.com/flanksource/postq v0.1.3/go.mod h1:AAuaPRhpqxvyF7JPs8X1NMsJVenh80ldpJPDVgWvFf8=
+github.com/flanksource/slidingwindow v0.0.0-20240526171711-1e13c04e057b h1:zB2nVrRAUgrZQb2eutgKzWd6ld7syPacrY/XQmz7Wks=
+github.com/flanksource/slidingwindow v0.0.0-20240526171711-1e13c04e057b/go.mod h1:+hHPT8Yx+8I6i4SF6WwvwRso532uHlPJ1T029dtHFak=
 github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
 github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
@@ -924,8 +924,6 @@ github.com/go-openapi/swag v0.22.4 h1:QLMzNJnMGPRNDCbySlcj1x01tzU8/9LTTL9hZZZogB
 github.com/go-openapi/swag v0.22.4/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
 github.com/go-pdf/fpdf v0.5.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M=
 github.com/go-pdf/fpdf v0.6.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M=
-github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
-github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
 github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
 github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
 github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
diff --git a/pkg/ratelimit/ratelimit.go b/pkg/ratelimit/ratelimit.go
deleted file mode 100644
index fa106bdf..00000000
--- a/pkg/ratelimit/ratelimit.go
+++ /dev/null
@@ -1,48 +0,0 @@
-package ratelimit
-
-import (
-	"time"
-
-	sw "github.com/RussellLuo/slidingwindow"
-)
-
-// LocalWindow represents a window that ignores sync behavior entirely
-// and only stores counters in memory.
-//
-// NOTE: It's an exact copy of the LocalWindow provided by RussellLuo/slidingwindow
-// with an added capability of setting a custom start time.
-type LocalWindow struct {
-	// The start boundary (timestamp in nanoseconds) of the window.
-	// [start, start + size)
-	start int64
-
-	// The total count of events happened in the window.
-	count int64
-}
-
-func NewLocalWindow() (*LocalWindow, sw.StopFunc) {
-	return &LocalWindow{}, func() {}
-}
-
-func (w *LocalWindow) SetStart(s time.Time) {
-	w.start = s.UnixNano()
-}
-
-func (w *LocalWindow) Start() time.Time {
-	return time.Unix(0, w.start)
-}
-
-func (w *LocalWindow) Count() int64 {
-	return w.count
-}
-
-func (w *LocalWindow) AddCount(n int64) {
-	w.count += n
-}
-
-func (w *LocalWindow) Reset(s time.Time, c int64) {
-	w.start = s.UnixNano()
-	w.count = c
-}
-
-func (w *LocalWindow) Sync(now time.Time) {}
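
A note on the insertion path: the BeforeCreate hook on ConfigChange now attaches clause.OnConflict{DoNothing: true}, and SaveResults pairs it with clause.Returning{}, so that, per the in-code NOTE, each batch slice ends up holding only the rows Postgres actually inserted. Below is a minimal, self-contained sketch of that combination outside the diff; the Change struct, unique index, and DSN are hypothetical stand-ins and not part of this patch.

package main

import (
	"fmt"

	"github.com/google/uuid"
	"gorm.io/driver/postgres"
	"gorm.io/gorm"
	"gorm.io/gorm/clause"
)

// Change is a hypothetical stand-in for models.ConfigChange; the composite unique index
// plays the role of the (config_id, external_change_id) constraint the patch relies on.
type Change struct {
	ID               string `gorm:"primaryKey"`
	ConfigID         string `gorm:"uniqueIndex:change_key"`
	ExternalChangeId string `gorm:"uniqueIndex:change_key"`
	ChangeType       string
}

// insertBatch mirrors the SaveResults pattern: ON CONFLICT DO NOTHING skips duplicates
// at the database level, and RETURNING feeds the surviving rows back into `batch`.
func insertBatch(db *gorm.DB, batch []*Change) ([]*Change, error) {
	err := db.Clauses(clause.OnConflict{DoNothing: true}, clause.Returning{}).
		Create(&batch).Error
	return batch, err
}

func main() {
	dsn := "host=localhost user=postgres dbname=demo sslmode=disable" // assumed local database
	db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
	if err != nil {
		panic(err)
	}
	if err := db.AutoMigrate(&Change{}); err != nil {
		panic(err)
	}

	first := []*Change{{ID: uuid.NewString(), ConfigID: "cfg-1", ExternalChangeId: "evt-1", ChangeType: "diff"}}
	second := []*Change{
		{ID: uuid.NewString(), ConfigID: "cfg-1", ExternalChangeId: "evt-1", ChangeType: "diff"}, // duplicate
		{ID: uuid.NewString(), ConfigID: "cfg-1", ExternalChangeId: "evt-2", ChangeType: "diff"},
	}

	if _, err := insertBatch(db, first); err != nil {
		panic(err)
	}
	inserted, err := insertBatch(db, second)
	fmt.Println(len(inserted), err) // expected, per the NOTE in SaveResults: 1 <nil>, only evt-2 is new
}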
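
The syncWindow helper seeds each per-config limiter from history so that a scraper restart does not reset the budget: it reads the change count and earliest timestamp inside the window and builds a local window that already starts there with that count. A small illustrative sketch against the forked github.com/flanksource/slidingwindow API used in this patch; the numbers are made up.

package main

import (
	"fmt"
	"time"

	sw "github.com/flanksource/slidingwindow"
)

func main() {
	window := 4 * time.Hour // default for changes.max.window in this patch
	limit := int64(100)     // default for changes.max.count in this patch

	// Pretend the database already holds 95 changes for this config item inside the
	// window, the earliest of which was recorded an hour ago (what syncWindow scans).
	earliest := time.Now().Add(-time.Hour)
	historical := int64(95)

	limiter, _ := sw.NewLimiter(window, limit, func() (sw.Window, sw.StopFunc) {
		win, stop := sw.NewLocalWindow()
		win.SetStart(earliest)   // window begins at the oldest persisted change
		win.AddCount(historical) // slots already consumed in the database
		return win, stop
	})

	// Only 5 of the next 10 changes fit into the remaining budget.
	allowed := 0
	for i := 0; i < 10; i++ {
		if limiter.Allow() {
			allowed++
		}
	}
	fmt.Println("allowed:", allowed) // expected: 5
}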
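
Both rateLimitChanges and the post-insertion bookkeeping in SaveResults read the same two tunables from the scrape context, falling back to the constants declared in changes_rate_limit.go. The helper below is hypothetical and only spells out those knobs; it is not part of the patch.

package db

import (
	"time"

	"github.com/flanksource/config-db/api"
)

// changeRateLimits returns the effective rate limit window and per-config change budget.
// The property keys and defaults are the ones used throughout this patch.
func changeRateLimits(ctx api.ScrapeContext) (time.Duration, int) {
	window := ctx.Properties().Duration("changes.max.window", rateLimitWindow) // default: 4h
	max := ctx.Properties().Int("changes.max.count", maxChangesInWindow)       // default: 100
	return window, max
}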