diff --git a/go.mod b/go.mod index 6c649304..6c90ddca 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ require ( github.com/dustin/go-humanize v1.0.1 github.com/go-stack/stack v1.8.1 github.com/mcuadros/go-defaults v1.2.0 - github.com/redis/go-redis/v9 v9.0.4 github.com/rs/zerolog v1.28.0 github.com/spf13/viper v1.15.0 github.com/theckman/go-flock v0.8.1 @@ -14,8 +13,6 @@ require ( ) require ( - github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/gofrs/flock v0.8.1 // indirect github.com/hashicorp/hcl v1.0.0 // indirect @@ -29,8 +26,8 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.4.2 // indirect - golang.org/x/sys v0.3.0 // indirect - golang.org/x/text v0.5.0 // indirect + golang.org/x/sys v0.5.0 // indirect + golang.org/x/text v0.7.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 411cbddb..23056550 100644 --- a/go.sum +++ b/go.sum @@ -38,11 +38,7 @@ cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3f dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao= -github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -54,8 +50,6 @@ github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -164,8 +158,6 @@ github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qR github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/redis/go-redis/v9 v9.0.4 h1:FC82T+CHJ/Q/PdyLW++GeCO+Ol59Y4T7R4jbgjvktgc= -github.com/redis/go-redis/v9 v9.0.4/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= @@ -335,8 +327,8 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= -golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -344,8 +336,8 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM= -golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/internal/client/redis.go b/internal/client/redis.go index 16861c3f..5c1cb463 100644 --- a/internal/client/redis.go +++ b/internal/client/redis.go @@ -70,6 +70,16 @@ func (r *Redis) DoWithStringReply(args ...string) string { return reply } +func (r *Redis) Do(args ...string) interface{} { + r.Send(args...) + + reply, err := r.Receive() + if err != nil { + log.Panicf(err.Error()) + } + return reply +} + func (r *Redis) Send(args ...string) { argsInterface := make([]interface{}, len(args)) for inx, item := range args { diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index 23d4ce4b..b9211de0 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -6,39 +6,14 @@ import ( "RedisShake/internal/config" "RedisShake/internal/entry" "RedisShake/internal/log" + "RedisShake/internal/utils" "fmt" "math/bits" + "regexp" "strconv" "strings" ) -type dbKey struct { - db int - key string - isSelect bool -} - -type scanStandaloneReader struct { - isCluster bool - ch chan *entry.Entry - - // client for scan keys - clientScan *client.Redis - - // client for dump keys - keysNeedFetch chan *dbKey - clientDump *client.Redis - clientDumpDbid int - - stat struct { - Name string `json:"name"` - Finished bool `json:"finished"` - DbId int `json:"dbId"` - Cursor uint64 `json:"cursor"` - PercentByDbId string `json:"percent"` - } -} - type ScanReaderOptions struct { Cluster bool `mapstructure:"cluster" default:"false"` Address string `mapstructure:"address" default:""` @@ -48,108 +23,157 @@ type ScanReaderOptions struct { KSN bool `mapstructure:"ksn" default:"false"` } +type dbKey struct { + db int + key string +} + +type scanStandaloneReader struct { + dbs []int + opts *ScanReaderOptions + ch chan *entry.Entry + keyQueue *utils.UniqueQueue + + stat struct { + Name string `json:"name"` + ScanFinished bool `json:"scan_finished"` + ScanDbId int `json:"scan_dbId"` + ScanCursor uint64 `json:"scan_cursor"` + ScanPercentByDbId string `json:"scan_percent"` + NeedUpdateCount int64 `json:"need_update_count"` + } +} + func NewScanStandaloneReader(opts *ScanReaderOptions) Reader { r := new(scanStandaloneReader) + // dbs + c := client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls) + if c.IsCluster() { // not use opts.Cluster, because user may use standalone mode to scan a cluster node + r.dbs = []int{0} + } else { + r.dbs = []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15} + } + r.opts = opts + r.ch = make(chan *entry.Entry, 1024) r.stat.Name = "reader_" + strings.Replace(opts.Address, ":", "_", -1) - r.clientScan = client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls) - r.clientDump = client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls) - r.isCluster = r.clientScan.IsCluster() // not use opts.Cluster, because user may use standalone mode to scan a cluster node + r.keyQueue = utils.NewUniqueQueue(100000) // cache 100000 keys return r } func (r *scanStandaloneReader) StartRead() chan *entry.Entry { - r.ch = make(chan *entry.Entry, 1024) - r.keysNeedFetch = make(chan *dbKey, 1024) + r.subscript() go r.scan() go r.fetch() return r.ch } -func (r *scanStandaloneReader) scan() { - scanDbIdUpper := 15 - if r.isCluster { - log.Infof("scanStandaloneReader node are in cluster mode, only scan db 0") - scanDbIdUpper = 0 +func (r *scanStandaloneReader) subscript() { + if !r.opts.KSN { + return } - for dbId := 0; dbId <= scanDbIdUpper; dbId++ { - if !r.isCluster { - reply := r.clientScan.DoWithStringReply("SELECT", strconv.Itoa(dbId)) + c := client.NewRedisClient(r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) + c.Send("psubscribe", "__keyevent@*__:*") + + go func() { + _, err := c.Receive() + if err != nil { + log.Panicf(err.Error()) + } + regex := regexp.MustCompile(`\d+`) + for { + resp, err := c.Receive() + if err != nil { + log.Panicf(err.Error()) + } + key := resp.([]interface{})[3].(string) + dbId := regex.FindString(resp.([]interface{})[2].(string)) + dbIdInt, err := strconv.Atoi(dbId) + if err != nil { + log.Panicf(err.Error()) + } + r.keyQueue.Put(dbKey{db: dbIdInt, key: key}) + } + }() +} + +func (r *scanStandaloneReader) scan() { + c := client.NewRedisClient(r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) + for dbId := range r.dbs { + if dbId != 0 { + reply := c.DoWithStringReply("SELECT", strconv.Itoa(dbId)) if reply != "OK" { log.Panicf("scanStandaloneReader select db failed. db=[%d]", dbId) } - - r.clientDump.Send("SELECT", strconv.Itoa(dbId)) - r.keysNeedFetch <- &dbKey{dbId, "", true} } var cursor uint64 = 0 for { var keys []string - cursor, keys = r.clientScan.Scan(cursor) + cursor, keys = c.Scan(cursor) for _, key := range keys { - r.clientDump.Send("DUMP", key) - r.clientDump.Send("PTTL", key) - r.keysNeedFetch <- &dbKey{dbId, key, false} + r.keyQueue.Put(dbKey{dbId, key}) // pass value not pointer } // stat - r.stat.Cursor = cursor - r.stat.DbId = dbId - r.stat.PercentByDbId = fmt.Sprintf("%.2f%%", float64(bits.Reverse64(cursor))/float64(^uint(0))*100) + r.stat.ScanCursor = cursor + r.stat.ScanDbId = dbId + r.stat.ScanPercentByDbId = fmt.Sprintf("%.2f%%", float64(bits.Reverse64(cursor))/float64(^uint(0))*100) if cursor == 0 { break } } } - r.stat.Finished = true - close(r.keysNeedFetch) + r.stat.ScanFinished = true + if !r.opts.KSN { + r.keyQueue.Close() + } } func (r *scanStandaloneReader) fetch() { - var id uint64 = 0 - for item := range r.keysNeedFetch { - if item.isSelect { - // select - receive, err := client.String(r.clientDump.Receive()) - if err != nil { - log.Panicf("scanStandaloneReader select db failed. db=[%d], err=[%v]", item.db, err) - } - if receive != "OK" { - log.Panicf("scanStandaloneReader select db failed. db=[%d]", item.db) + nowDbId := 0 + c := client.NewRedisClient(r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) + for item := range r.keyQueue.Ch { + r.stat.NeedUpdateCount = int64(r.keyQueue.Len()) + dbId := item.(dbKey).db + key := item.(dbKey).key + if nowDbId != dbId { + reply := c.DoWithStringReply("SELECT", strconv.Itoa(dbId)) + if reply != "OK" { + log.Panicf("scanStandaloneReader select db failed. db=[%d]", dbId) } + nowDbId = dbId + } + // dump + c.Send("DUMP", key) + c.Send("PTTL", key) + iDump, err1 := c.Receive() + iPttl, err2 := c.Receive() + if err1 == proto.Nil { + continue // key not exist + } else if err1 != nil { + log.Panicf(err1.Error()) + } else if err2 != nil { + log.Panicf(err2.Error()) + } + dump := iDump.(string) + pttl := int(iPttl.(int64)) + if pttl == -2 { + continue // key not exist + } + if pttl == -1 { + pttl = 0 // -1 means no expire + } + if uint64(len(dump)) > config.Opt.Advanced.TargetRedisProtoMaxBulkLen { + log.Panicf("not support large key. key=[%s], len=[%d]", key, len(dump)) } else { - // dump - receive, err := client.String(r.clientDump.Receive()) - if err != proto.Nil && err != nil { // error! - log.Panicf(err.Error()) - } - - // pttl - pttl, pttlErr := client.Int64(r.clientDump.Receive()) - if pttlErr != nil { // error! - log.Panicf(pttlErr.Error()) - } - if pttl < 0 { - pttl = 0 - } - - if err == proto.Nil { // key not exist - continue - } - - id += 1 - argv := []string{"RESTORE", item.key, strconv.FormatInt(pttl, 10), receive} - if config.Opt.Advanced.RDBRestoreCommandBehavior == "rewrite" { - argv = append(argv, "replace") - } - log.Debugf("[%s] send command: [%v], dbid: [%v]", r.stat.Name, argv, item.db) r.ch <- &entry.Entry{ - DbId: item.db, - Argv: argv, + DbId: dbId, + Argv: []string{"RESTORE", key, strconv.Itoa(pttl), dump}, } } } + log.Infof("[%s] scanStandaloneReader fetch finished.", r.stat.Name) close(r.ch) } @@ -159,12 +183,12 @@ func (r *scanStandaloneReader) Status() interface{} { } func (r *scanStandaloneReader) StatusString() string { - if r.stat.Finished { - return fmt.Sprintf("finished") + if r.stat.ScanFinished { + return fmt.Sprintf("need_update_count=[%d]", r.stat.NeedUpdateCount) } - return fmt.Sprintf("dbid=[%d], percent=[%s]", r.stat.DbId, r.stat.PercentByDbId) + return fmt.Sprintf("scan_dbid=[%d], scan_percent=[%s], need_update_count=[%d]", r.stat.ScanDbId, r.stat.ScanPercentByDbId, r.stat.NeedUpdateCount) } func (r *scanStandaloneReader) StatusConsistent() bool { - return r.stat.Finished + return r.stat.ScanFinished && r.stat.NeedUpdateCount == 0 } diff --git a/internal/utils/UniqueQueue.go b/internal/utils/UniqueQueue.go new file mode 100644 index 00000000..140f8202 --- /dev/null +++ b/internal/utils/UniqueQueue.go @@ -0,0 +1,50 @@ +package utils + +import ( + "sync" +) + +type UniqueQueue struct { + innerChannel chan interface{} + set map[interface{}]bool + lock sync.Mutex + closed bool + Ch chan interface{} +} + +func NewUniqueQueue(size int) *UniqueQueue { + mc := new(UniqueQueue) + mc.innerChannel = make(chan interface{}, size) + mc.Ch = make(chan interface{}) + mc.set = make(map[interface{}]bool) + go func() { + for item := range mc.innerChannel { + mc.lock.Lock() + delete(mc.set, item) + mc.lock.Unlock() + mc.Ch <- item + } + close(mc.Ch) + }() + return mc +} + +func (mc *UniqueQueue) Put(item interface{}) { + mc.lock.Lock() + if _, ok := mc.set[item]; ok { + mc.lock.Unlock() + return + } else { + mc.set[item] = true + mc.lock.Unlock() + mc.innerChannel <- item + } +} + +func (mc *UniqueQueue) Len() int { + return len(mc.innerChannel) +} + +func (mc *UniqueQueue) Close() { + close(mc.innerChannel) +}