Skip to content

Commit

Permalink
feature: Add keyspace notifications support to scan_reader
Browse files Browse the repository at this point in the history
  • Loading branch information
suxb201 committed Aug 22, 2023
1 parent eac40bd commit 037dbe3
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 110 deletions.
7 changes: 2 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@ require (
github.com/dustin/go-humanize v1.0.1
github.com/go-stack/stack v1.8.1
github.com/mcuadros/go-defaults v1.2.0
github.com/redis/go-redis/v9 v9.0.4
github.com/rs/zerolog v1.28.0
github.com/spf13/viper v1.15.0
github.com/theckman/go-flock v0.8.1
github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand All @@ -29,8 +26,8 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
golang.org/x/sys v0.3.0 // indirect
golang.org/x/text v0.5.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
16 changes: 4 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@ cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3f
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
Expand All @@ -54,8 +50,6 @@ github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -164,8 +158,6 @@ github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qR
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/redis/go-redis/v9 v9.0.4 h1:FC82T+CHJ/Q/PdyLW++GeCO+Ol59Y4T7R4jbgjvktgc=
github.com/redis/go-redis/v9 v9.0.4/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
Expand Down Expand Up @@ -335,17 +327,17 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM=
golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
10 changes: 10 additions & 0 deletions internal/client/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ func (r *Redis) DoWithStringReply(args ...string) string {
return reply
}

func (r *Redis) Do(args ...string) interface{} {
r.Send(args...)

reply, err := r.Receive()
if err != nil {
log.Panicf(err.Error())
}
return reply
}

func (r *Redis) Send(args ...string) {
argsInterface := make([]interface{}, len(args))
for inx, item := range args {
Expand Down
210 changes: 117 additions & 93 deletions internal/reader/scan_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,14 @@ import (
"RedisShake/internal/config"
"RedisShake/internal/entry"
"RedisShake/internal/log"
"RedisShake/internal/utils"
"fmt"
"math/bits"
"regexp"
"strconv"
"strings"
)

type dbKey struct {
db int
key string
isSelect bool
}

type scanStandaloneReader struct {
isCluster bool
ch chan *entry.Entry

// client for scan keys
clientScan *client.Redis

// client for dump keys
keysNeedFetch chan *dbKey
clientDump *client.Redis
clientDumpDbid int

stat struct {
Name string `json:"name"`
Finished bool `json:"finished"`
DbId int `json:"dbId"`
Cursor uint64 `json:"cursor"`
PercentByDbId string `json:"percent"`
}
}

type ScanReaderOptions struct {
Cluster bool `mapstructure:"cluster" default:"false"`
Address string `mapstructure:"address" default:""`
Expand All @@ -48,108 +23,157 @@ type ScanReaderOptions struct {
KSN bool `mapstructure:"ksn" default:"false"`
}

type dbKey struct {
db int
key string
}

type scanStandaloneReader struct {
dbs []int
opts *ScanReaderOptions
ch chan *entry.Entry
keyQueue *utils.UniqueQueue

stat struct {
Name string `json:"name"`
ScanFinished bool `json:"scan_finished"`
ScanDbId int `json:"scan_dbId"`
ScanCursor uint64 `json:"scan_cursor"`
ScanPercentByDbId string `json:"scan_percent"`
NeedUpdateCount int64 `json:"need_update_count"`
}
}

func NewScanStandaloneReader(opts *ScanReaderOptions) Reader {
r := new(scanStandaloneReader)
// dbs
c := client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls)
if c.IsCluster() { // not use opts.Cluster, because user may use standalone mode to scan a cluster node
r.dbs = []int{0}
} else {
r.dbs = []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
}
r.opts = opts
r.ch = make(chan *entry.Entry, 1024)
r.stat.Name = "reader_" + strings.Replace(opts.Address, ":", "_", -1)
r.clientScan = client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls)
r.clientDump = client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls)
r.isCluster = r.clientScan.IsCluster() // not use opts.Cluster, because user may use standalone mode to scan a cluster node
r.keyQueue = utils.NewUniqueQueue(100000) // cache 100000 keys
return r
}

func (r *scanStandaloneReader) StartRead() chan *entry.Entry {
r.ch = make(chan *entry.Entry, 1024)
r.keysNeedFetch = make(chan *dbKey, 1024)
r.subscript()
go r.scan()
go r.fetch()
return r.ch
}

func (r *scanStandaloneReader) scan() {
scanDbIdUpper := 15
if r.isCluster {
log.Infof("scanStandaloneReader node are in cluster mode, only scan db 0")
scanDbIdUpper = 0
func (r *scanStandaloneReader) subscript() {
if !r.opts.KSN {
return
}
for dbId := 0; dbId <= scanDbIdUpper; dbId++ {
if !r.isCluster {
reply := r.clientScan.DoWithStringReply("SELECT", strconv.Itoa(dbId))
c := client.NewRedisClient(r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
c.Send("psubscribe", "__keyevent@*__:*")

go func() {
_, err := c.Receive()
if err != nil {
log.Panicf(err.Error())
}
regex := regexp.MustCompile(`\d+`)
for {
resp, err := c.Receive()
if err != nil {
log.Panicf(err.Error())
}
key := resp.([]interface{})[3].(string)
dbId := regex.FindString(resp.([]interface{})[2].(string))
dbIdInt, err := strconv.Atoi(dbId)
if err != nil {
log.Panicf(err.Error())
}
r.keyQueue.Put(dbKey{db: dbIdInt, key: key})
}
}()
}

func (r *scanStandaloneReader) scan() {
c := client.NewRedisClient(r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
for dbId := range r.dbs {
if dbId != 0 {
reply := c.DoWithStringReply("SELECT", strconv.Itoa(dbId))
if reply != "OK" {
log.Panicf("scanStandaloneReader select db failed. db=[%d]", dbId)
}

r.clientDump.Send("SELECT", strconv.Itoa(dbId))
r.keysNeedFetch <- &dbKey{dbId, "", true}
}

var cursor uint64 = 0
for {
var keys []string
cursor, keys = r.clientScan.Scan(cursor)
cursor, keys = c.Scan(cursor)
for _, key := range keys {
r.clientDump.Send("DUMP", key)
r.clientDump.Send("PTTL", key)
r.keysNeedFetch <- &dbKey{dbId, key, false}
r.keyQueue.Put(dbKey{dbId, key}) // pass value not pointer
}

// stat
r.stat.Cursor = cursor
r.stat.DbId = dbId
r.stat.PercentByDbId = fmt.Sprintf("%.2f%%", float64(bits.Reverse64(cursor))/float64(^uint(0))*100)
r.stat.ScanCursor = cursor
r.stat.ScanDbId = dbId
r.stat.ScanPercentByDbId = fmt.Sprintf("%.2f%%", float64(bits.Reverse64(cursor))/float64(^uint(0))*100)

if cursor == 0 {
break
}
}
}
r.stat.Finished = true
close(r.keysNeedFetch)
r.stat.ScanFinished = true
if !r.opts.KSN {
r.keyQueue.Close()
}
}

func (r *scanStandaloneReader) fetch() {
var id uint64 = 0
for item := range r.keysNeedFetch {
if item.isSelect {
// select
receive, err := client.String(r.clientDump.Receive())
if err != nil {
log.Panicf("scanStandaloneReader select db failed. db=[%d], err=[%v]", item.db, err)
}
if receive != "OK" {
log.Panicf("scanStandaloneReader select db failed. db=[%d]", item.db)
nowDbId := 0
c := client.NewRedisClient(r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
for item := range r.keyQueue.Ch {
r.stat.NeedUpdateCount = int64(r.keyQueue.Len())
dbId := item.(dbKey).db
key := item.(dbKey).key
if nowDbId != dbId {
reply := c.DoWithStringReply("SELECT", strconv.Itoa(dbId))
if reply != "OK" {
log.Panicf("scanStandaloneReader select db failed. db=[%d]", dbId)
}
nowDbId = dbId
}
// dump
c.Send("DUMP", key)
c.Send("PTTL", key)
iDump, err1 := c.Receive()
iPttl, err2 := c.Receive()
if err1 == proto.Nil {
continue // key not exist
} else if err1 != nil {
log.Panicf(err1.Error())
} else if err2 != nil {
log.Panicf(err2.Error())
}
dump := iDump.(string)
pttl := int(iPttl.(int64))
if pttl == -2 {
continue // key not exist
}
if pttl == -1 {
pttl = 0 // -1 means no expire
}
if uint64(len(dump)) > config.Opt.Advanced.TargetRedisProtoMaxBulkLen {
log.Panicf("not support large key. key=[%s], len=[%d]", key, len(dump))
} else {
// dump
receive, err := client.String(r.clientDump.Receive())
if err != proto.Nil && err != nil { // error!
log.Panicf(err.Error())
}

// pttl
pttl, pttlErr := client.Int64(r.clientDump.Receive())
if pttlErr != nil { // error!
log.Panicf(pttlErr.Error())
}
if pttl < 0 {
pttl = 0
}

if err == proto.Nil { // key not exist
continue
}

id += 1
argv := []string{"RESTORE", item.key, strconv.FormatInt(pttl, 10), receive}
if config.Opt.Advanced.RDBRestoreCommandBehavior == "rewrite" {
argv = append(argv, "replace")
}
log.Debugf("[%s] send command: [%v], dbid: [%v]", r.stat.Name, argv, item.db)
r.ch <- &entry.Entry{
DbId: item.db,
Argv: argv,
DbId: dbId,
Argv: []string{"RESTORE", key, strconv.Itoa(pttl), dump},
}
}
}

log.Infof("[%s] scanStandaloneReader fetch finished.", r.stat.Name)
close(r.ch)
}
Expand All @@ -159,12 +183,12 @@ func (r *scanStandaloneReader) Status() interface{} {
}

func (r *scanStandaloneReader) StatusString() string {
if r.stat.Finished {
return fmt.Sprintf("finished")
if r.stat.ScanFinished {
return fmt.Sprintf("need_update_count=[%d]", r.stat.NeedUpdateCount)
}
return fmt.Sprintf("dbid=[%d], percent=[%s]", r.stat.DbId, r.stat.PercentByDbId)
return fmt.Sprintf("scan_dbid=[%d], scan_percent=[%s], need_update_count=[%d]", r.stat.ScanDbId, r.stat.ScanPercentByDbId, r.stat.NeedUpdateCount)
}

func (r *scanStandaloneReader) StatusConsistent() bool {
return r.stat.Finished
return r.stat.ScanFinished && r.stat.NeedUpdateCount == 0
}
Loading

0 comments on commit 037dbe3

Please sign in to comment.