Skip to content

Commit

Permalink
Allow to customize the high redis memory water mark
Browse files Browse the repository at this point in the history
  • Loading branch information
git-hulk committed Nov 17, 2023
1 parent cbfe528 commit 8d3282e
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
12 changes: 12 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,17 @@ type SecondaryStorage struct {
Spanner *SpannerConfig
// max number of jobs that storage pumps per batch
MaxJobPumpBatchSize int64
// range from 0 to 1, when the redis memory usage is greater than this value,
// the storage won't pump jobs to redis anymore until the memory usage is lower than this value.
//
// Default is 1, means no limit
HighRedisMemoryWatermark float64
}

func (storage *SecondaryStorage) validate() error {
if storage.HighRedisMemoryWatermark < 0 || storage.HighRedisMemoryWatermark > 1 {
return fmt.Errorf("invalid HighRedisMemoryWatermark: %f, should be between 0 and 1", storage.HighRedisMemoryWatermark)
}
return storage.Spanner.validate()
}

Expand Down Expand Up @@ -156,6 +164,10 @@ func MustLoad(path string) (*Config, error) {
if err := conf.SecondaryStorage.validate(); err != nil {
return nil, err
}
if conf.SecondaryStorage.HighRedisMemoryWatermark == 0 {
// default to 1
conf.SecondaryStorage.HighRedisMemoryWatermark = 1
}
}
return conf, nil
}
11 changes: 7 additions & 4 deletions storage/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ const (

addJobSuccessStatus = "success"
addJobFailedStatus = "failed"

redisMemoryUsageWatermark = 0.8 // used_memory / max_memory
)

type Manager struct {
cfg *config.Config
wg sync.WaitGroup
mu sync.Mutex
pools map[string]engine.Engine
Expand Down Expand Up @@ -76,6 +75,7 @@ func NewManger(cfg *config.Config) (*Manager, error) {
return nil, fmt.Errorf("create redis client err: %w", err)
}
return &Manager{
cfg: cfg,
redisCli: redisCli,
storage: storage,
pools: make(map[string]engine.Engine),
Expand All @@ -87,7 +87,7 @@ func NewManger(cfg *config.Config) (*Manager, error) {
func (m *Manager) PumpFn(name string, pool engine.Engine, threshold int64) func() bool {
return func() bool {
logger := log.Get().WithField("pool", name)
if isHighRedisMemUsage(m.redisCli) {
if isHighRedisMemUsage(m.redisCli, m.cfg.SecondaryStorage.HighRedisMemoryWatermark) {
logger.Error("High redis usage, storage stops pumping data")
return false
}
Expand Down Expand Up @@ -183,7 +183,10 @@ func (m *Manager) Shutdown() {
m.storage.Close()
}

func isHighRedisMemUsage(cli *redis.Client) bool {
func isHighRedisMemUsage(cli *redis.Client, redisMemoryUsageWatermark float64) bool {
if redisMemoryUsageWatermark == 0 || redisMemoryUsageWatermark >= 1 {
return false
}
memoryInfo, err := cli.Info(context.TODO(), "memory").Result()
if err != nil {
return false
Expand Down

0 comments on commit 8d3282e

Please sign in to comment.