From c2212831c83c94b5bbc62ab409bb1a26a82f3cd3 Mon Sep 17 00:00:00 2001 From: yanshushuang Date: Wed, 6 Mar 2024 10:46:51 +0800 Subject: [PATCH 1/8] update else if to switch --- cmd/redis-shake/main.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/redis-shake/main.go b/cmd/redis-shake/main.go index 0c98de9a..5b5b4e35 100644 --- a/cmd/redis-shake/main.go +++ b/cmd/redis-shake/main.go @@ -34,7 +34,8 @@ func main() { // create reader var theReader reader.Reader - if v.IsSet("sync_reader") { + switch { + case v.IsSet("sync_reader"): opts := new(reader.SyncReaderOptions) defaults.SetDefaults(opts) err := v.UnmarshalKey("sync_reader", opts) @@ -48,7 +49,7 @@ func main() { theReader = reader.NewSyncStandaloneReader(ctx, opts) log.Infof("create SyncStandaloneReader: %v", opts.Address) } - } else if v.IsSet("scan_reader") { + case v.IsSet("scan_reader"): opts := new(reader.ScanReaderOptions) defaults.SetDefaults(opts) err := v.UnmarshalKey("scan_reader", opts) @@ -62,7 +63,7 @@ func main() { theReader = reader.NewScanStandaloneReader(ctx, opts) log.Infof("create ScanStandaloneReader: %v", opts.Address) } - } else if v.IsSet("rdb_reader") { + case v.IsSet("rdb_reader"): opts := new(reader.RdbReaderOptions) defaults.SetDefaults(opts) err := v.UnmarshalKey("rdb_reader", opts) @@ -71,7 +72,7 @@ func main() { } theReader = reader.NewRDBReader(opts) log.Infof("create RdbReader: %v", opts.Filepath) - } else if v.IsSet("aof_reader") { + case v.IsSet("aof_reader"): opts := new(reader.AOFReaderOptions) defaults.SetDefaults(opts) err := v.UnmarshalKey("aof_reader", opts) @@ -80,10 +81,9 @@ func main() { } theReader = reader.NewAOFReader(opts) log.Infof("create AOFReader: %v", opts.Filepath) - } else { + default: log.Panicf("no reader config entry found") } - // create writer var theWriter writer.Writer if v.IsSet("redis_writer") { From d80c68becff7a0aa646f6e09d2252251ad4c8564 Mon Sep 17 00:00:00 2001 From: yanshushuang Date: Wed, 6 Mar 2024 15:45:07 +0800 Subject: [PATCH 2/8] update else if to switch --- cmd/redis-shake/main.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/redis-shake/main.go b/cmd/redis-shake/main.go index 5b5b4e35..91ab37bc 100644 --- a/cmd/redis-shake/main.go +++ b/cmd/redis-shake/main.go @@ -86,7 +86,8 @@ func main() { } // create writer var theWriter writer.Writer - if v.IsSet("redis_writer") { + switch { + case v.IsSet("redis_writer"): opts := new(writer.RedisWriterOptions) defaults.SetDefaults(opts) err := v.UnmarshalKey("redis_writer", opts) @@ -109,10 +110,9 @@ func main() { entry.Argv = []string{"FLUSHALL"} theWriter.Write(entry) } - } else { + default: log.Panicf("no writer config entry found") } - // create status status.Init(theReader, theWriter) From d84f6ffe088dfd7c92641dd0a886df0d831868e8 Mon Sep 17 00:00:00 2001 From: blight Date: Tue, 19 Mar 2024 09:26:35 +0800 Subject: [PATCH 3/8] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=8A=A8=E6=80=81?= =?UTF-8?q?=E8=AF=86=E5=88=AB=E5=BA=93=E5=8F=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/reader/scan_standalone_reader.go | 7 ++++++- internal/utils/parse.go | 19 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 internal/utils/parse.go diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index 2b131594..43178991 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -58,7 +58,12 @@ func NewScanStandaloneReader(ctx context.Context, opts *ScanReaderOptions) Reade r.dbs = []int{0} } else { if len(opts.DBS) == 0 { - r.dbs = []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15} + c.Send("info", "keyspace") + info, err := c.Receive() + if err != nil { + log.Panicf(err.Error()) + } + r.dbs = utils.ParseDBs(info.(string)) } else { r.dbs = opts.DBS } diff --git a/internal/utils/parse.go b/internal/utils/parse.go new file mode 100644 index 00000000..3109cfa8 --- /dev/null +++ b/internal/utils/parse.go @@ -0,0 +1,19 @@ +package utils + +import ( + "regexp" + "strconv" +) + +func ParseDBs(s string) []int { + dbsString := regexp.MustCompile(`db(\d+):`).FindAllStringSubmatch(s, -1) + if dbsString == nil { + return []int{} + } + dbs := make([]int, len(dbsString)) + for i, dbString := range dbsString { + db, _ := strconv.Atoi(dbString[1]) + dbs[i] = db + } + return dbs +} From 36915f0f2984e73e26be4e2392b38b85fe5a1f02 Mon Sep 17 00:00:00 2001 From: blight Date: Wed, 20 Mar 2024 09:20:38 +0800 Subject: [PATCH 4/8] =?UTF-8?q?=E6=80=A7=E8=83=BD=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/reader/scan_standalone_reader.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index 43178991..e18121a3 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -106,8 +106,9 @@ func (r *scanStandaloneReader) subscript() { if err != nil { log.Panicf(err.Error()) } - key := resp.([]interface{})[3].(string) - dbId := regex.FindString(resp.([]interface{})[2].(string)) + respSlice := resp.([]interface{}) + key := respSlice[3].(string) + dbId := regex.FindString(respSlice[2].(string)) dbIdInt, err := strconv.Atoi(dbId) if err != nil { log.Panicf(err.Error()) From f286da4ea73329404b3b91d354d137f93e0053ca Mon Sep 17 00:00:00 2001 From: blight Date: Wed, 20 Mar 2024 09:22:58 +0800 Subject: [PATCH 5/8] =?UTF-8?q?=E6=B7=BB=E5=8A=A0scan=E6=97=B6=E5=80=99?= =?UTF-8?q?=E7=9A=84=E4=B8=80=E6=AC=A1=E6=89=AB=E6=8F=8F=E6=AC=A1=E6=95=B0?= =?UTF-8?q?=E6=8E=A7=E5=88=B6=E9=85=8D=E7=BD=AE=EF=BC=8C=E9=98=B2=E6=AD=A2?= =?UTF-8?q?=E6=BA=90=E7=AB=AFcpu=E9=AB=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/client/redis.go | 4 ++-- internal/reader/scan_standalone_reader.go | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/client/redis.go b/internal/client/redis.go index c884deeb..2d43ff48 100644 --- a/internal/client/redis.go +++ b/internal/client/redis.go @@ -148,8 +148,8 @@ func (r *Redis) Close() { /* Commands */ -func (r *Redis) Scan(cursor uint64) (newCursor uint64, keys []string) { - r.Send("scan", strconv.FormatUint(cursor, 10), "count", "2048") +func (r *Redis) Scan(cursor uint64, batch string) (newCursor uint64, keys []string) { + r.Send("scan", strconv.FormatUint(cursor, 10), "count", batch) reply, err := r.Receive() if err != nil { log.Panicf(err.Error()) diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index e18121a3..55d66988 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -26,6 +26,7 @@ type ScanReaderOptions struct { KSN bool `mapstructure:"ksn" default:"false"` DBS []int `mapstructure:"dbs"` PreferReplica bool `mapstructure:"prefer_replica" default:"false"` + Batch int `mapstructure:"batch" default:"2048"` } type dbKey struct { @@ -131,9 +132,10 @@ func (r *scanStandaloneReader) scan() { } var cursor uint64 = 0 + batch := strconv.Itoa(r.opts.Batch) for { var keys []string - cursor, keys = c.Scan(cursor) + cursor, keys = c.Scan(cursor, batch) for _, key := range keys { r.keyQueue.Put(dbKey{dbId, key}) // pass value not pointer } From 55b41f51f1c81fe5b4209f58b687d5bf145d41e3 Mon Sep 17 00:00:00 2001 From: blight Date: Wed, 20 Mar 2024 15:35:57 +0800 Subject: [PATCH 6/8] rename batch to count --- internal/client/redis.go | 4 ++-- internal/reader/scan_standalone_reader.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/client/redis.go b/internal/client/redis.go index 2d43ff48..583897ad 100644 --- a/internal/client/redis.go +++ b/internal/client/redis.go @@ -148,8 +148,8 @@ func (r *Redis) Close() { /* Commands */ -func (r *Redis) Scan(cursor uint64, batch string) (newCursor uint64, keys []string) { - r.Send("scan", strconv.FormatUint(cursor, 10), "count", batch) +func (r *Redis) Scan(cursor uint64, count string) (newCursor uint64, keys []string) { + r.Send("scan", strconv.FormatUint(cursor, 10), "count", count) reply, err := r.Receive() if err != nil { log.Panicf(err.Error()) diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index 55d66988..c9c14a75 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -26,7 +26,7 @@ type ScanReaderOptions struct { KSN bool `mapstructure:"ksn" default:"false"` DBS []int `mapstructure:"dbs"` PreferReplica bool `mapstructure:"prefer_replica" default:"false"` - Batch int `mapstructure:"batch" default:"2048"` + Count int `mapstructure:"count" default:"2048"` } type dbKey struct { @@ -132,10 +132,10 @@ func (r *scanStandaloneReader) scan() { } var cursor uint64 = 0 - batch := strconv.Itoa(r.opts.Batch) + count := strconv.Itoa(r.opts.Count) for { var keys []string - cursor, keys = c.Scan(cursor, batch) + cursor, keys = c.Scan(cursor, count) for _, key := range keys { r.keyQueue.Put(dbKey{dbId, key}) // pass value not pointer } From 1ef449dfc99b8895e576d21658b284e6254fe5fa Mon Sep 17 00:00:00 2001 From: blight Date: Wed, 20 Mar 2024 15:57:34 +0800 Subject: [PATCH 7/8] edit ...string to ...interface --- internal/client/redis.go | 8 ++++---- internal/reader/scan_standalone_reader.go | 2 +- internal/reader/sync_standalone_reader.go | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/client/redis.go b/internal/client/redis.go index 583897ad..7d75dc2c 100644 --- a/internal/client/redis.go +++ b/internal/client/redis.go @@ -71,7 +71,7 @@ func NewRedisClient(ctx context.Context, address string, username string, passwo return r } -func (r *Redis) DoWithStringReply(args ...string) string { +func (r *Redis) DoWithStringReply(args ...interface{}) string { r.Send(args...) replyInterface, err := r.Receive() @@ -82,7 +82,7 @@ func (r *Redis) DoWithStringReply(args ...string) string { return reply } -func (r *Redis) Do(args ...string) interface{} { +func (r *Redis) Do(args ...interface{}) interface{} { r.Send(args...) reply, err := r.Receive() @@ -92,7 +92,7 @@ func (r *Redis) Do(args ...string) interface{} { return reply } -func (r *Redis) Send(args ...string) { +func (r *Redis) Send(args ...interface{}) { argsInterface := make([]interface{}, len(args)) for inx, item := range args { argsInterface[inx] = item @@ -148,7 +148,7 @@ func (r *Redis) Close() { /* Commands */ -func (r *Redis) Scan(cursor uint64, count string) (newCursor uint64, keys []string) { +func (r *Redis) Scan(cursor uint64, count int) (newCursor uint64, keys []string) { r.Send("scan", strconv.FormatUint(cursor, 10), "count", count) reply, err := r.Receive() if err != nil { diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index c9c14a75..0f04792d 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -132,7 +132,7 @@ func (r *scanStandaloneReader) scan() { } var cursor uint64 = 0 - count := strconv.Itoa(r.opts.Count) + count := r.opts.Count for { var keys []string cursor, keys = c.Scan(cursor, count) diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index 9dc7a8c4..bab36105 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -116,7 +116,7 @@ func (r *syncStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry func (r *syncStandaloneReader) sendReplconfListenPort() { // use status_port as redis-shake port - argv := []string{"replconf", "listening-port", strconv.Itoa(config.Opt.Advanced.StatusPort)} + argv := []interface{}{"replconf", "listening-port", strconv.Itoa(config.Opt.Advanced.StatusPort)} r.client.Send(argv...) _, err := r.client.Receive() if err != nil { @@ -126,9 +126,9 @@ func (r *syncStandaloneReader) sendReplconfListenPort() { func (r *syncStandaloneReader) sendPSync() { // send PSync - argv := []string{"PSYNC", "?", "-1"} + argv := []interface{}{"PSYNC", "?", "-1"} if config.Opt.Advanced.AwsPSync != "" { - argv = []string{config.Opt.Advanced.GetPSyncCommand(r.stat.Address), "?", "-1"} + argv = []interface{}{config.Opt.Advanced.GetPSyncCommand(r.stat.Address), "?", "-1"} } r.client.Send(argv...) From d4291a7fcc8fb716f39becb3a92a32774c54f539 Mon Sep 17 00:00:00 2001 From: blight Date: Fri, 22 Mar 2024 10:53:23 +0800 Subject: [PATCH 8/8] fix bug:ksn --- internal/reader/scan_standalone_reader.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index 0f04792d..c9a21ab3 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -90,7 +90,11 @@ func (r *scanStandaloneReader) subscript() { } c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) c.Send("psubscribe", "__keyevent@*__:*") - + // filter dbs + dbIDmap := make(map[int]struct{}) + for _, db := range r.dbs { + dbIDmap[db] = struct{}{} + } go func() { _, err := c.Receive() if err != nil { @@ -114,7 +118,10 @@ func (r *scanStandaloneReader) subscript() { if err != nil { log.Panicf(err.Error()) } - r.keyQueue.Put(dbKey{db: dbIdInt, key: key}) + // if the db is not in the dbs, ignore it + if _, ok := dbIDmap[dbIdInt]; ok { + r.keyQueue.Put(dbKey{db: dbIdInt, key: key}) + } } } }()