Skip to content

Commit

Permalink
reduce source pressure when use scan (#823)
Browse files Browse the repository at this point in the history
* update else if to switch

* update else if to switch

* 添加动态识别库号

* 性能优化

* 添加scan时候的一次扫描次数控制配置,防止源端cpu高

* rename batch to count

* edit ...string to ...interface

* performance optimization

* fix r.db length is 0
  • Loading branch information
blight19 authored Jun 13, 2024
1 parent 0be518f commit aaeedea
Showing 1 changed file with 19 additions and 19 deletions.
38 changes: 19 additions & 19 deletions internal/reader/scan_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,15 @@ func (r *scanStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry

func (r *scanStandaloneReader) subscript() {
c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
c.Send("psubscribe", "__keyevent@*__:*")
// filter dbs
dbIDmap := make(map[int]struct{})
for _, db := range r.dbs {
dbIDmap[db] = struct{}{}
if len(r.dbs) == 0 {
c.Send("psubscribe", "__keyevent@*__:*")
} else {
strs := make([]string, len(r.dbs))
for i, v := range r.dbs {
strs[i] = strconv.Itoa(v)
}
s := fmt.Sprintf("__keyevent@[%v]__:*", strings.Join(strs, ","))
c.Send("psubscribe", s)
}
_, err := c.Receive()
if err != nil {
Expand All @@ -130,20 +134,16 @@ func (r *scanStandaloneReader) subscript() {
if err != nil {
log.Panicf(err.Error())
}
// if the db is not in the dbs, ignore it
if _, ok := dbIDmap[dbIdInt]; ok {
// handle del action
eventSlice := strings.Split(respSlice[2].(string), ":")
if eventSlice[1] == "del" {
e := entry.NewEntry()
e.DbId = dbIdInt
e.Argv = []string{"DEL", key}
r.ch <- e
continue
}

r.needDumpQueue.Put(dbKey{db: dbIdInt, key: key})
// handle del action
eventSlice := strings.Split(respSlice[2].(string), ":")
if eventSlice[1] == "del" {
e := entry.NewEntry()
e.DbId = dbIdInt
e.Argv = []string{"DEL", key}
r.ch <- e
continue
}
r.needDumpQueue.Put(dbKey{db: dbIdInt, key: key})
}
}
}
Expand Down Expand Up @@ -200,7 +200,7 @@ func (r *scanStandaloneReader) dump() {
r.dumpClient.Do("READONLY")
log.Infof("running dump() in read-only mode")
}

for item := range r.needDumpQueue.Ch {
r.stat.NeedUpdateCount = int64(r.needDumpQueue.Len())
dbId := item.(dbKey).db
Expand Down

0 comments on commit aaeedea

Please sign in to comment.