diff --git a/internal/reader/scan_cluster_reader.go b/internal/reader/scan_cluster_reader.go index f3a39dc3..dcb53788 100644 --- a/internal/reader/scan_cluster_reader.go +++ b/internal/reader/scan_cluster_reader.go @@ -15,7 +15,7 @@ type scanClusterReader struct { } func NewScanClusterReader(opts *ScanReaderOptions) Reader { - addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls) + addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica) rd := &scanClusterReader{} for _, address := range addresses { diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index 9ebf5567..2e103b84 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -18,13 +18,14 @@ import ( ) type ScanReaderOptions struct { - Cluster bool `mapstructure:"cluster" default:"false"` - Address string `mapstructure:"address" default:""` - Username string `mapstructure:"username" default:""` - Password string `mapstructure:"password" default:""` - Tls bool `mapstructure:"tls" default:"false"` - KSN bool `mapstructure:"ksn" default:"false"` - DBS []int `mapstructure:"dbs"` + Cluster bool `mapstructure:"cluster" default:"false"` + Address string `mapstructure:"address" default:""` + Username string `mapstructure:"username" default:""` + Password string `mapstructure:"password" default:""` + Tls bool `mapstructure:"tls" default:"false"` + KSN bool `mapstructure:"ksn" default:"false"` + DBS []int `mapstructure:"dbs"` + PreferReplica bool `mapstructure:"prefer_replica" default:"false"` } type dbKey struct { diff --git a/internal/reader/sync_cluster_reader.go b/internal/reader/sync_cluster_reader.go index 9dafaccc..18c4935a 100644 --- a/internal/reader/sync_cluster_reader.go +++ b/internal/reader/sync_cluster_reader.go @@ -16,7 +16,7 @@ type syncClusterReader struct { } func NewSyncClusterReader(opts *SyncReaderOptions) Reader { - addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls) + addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica) log.Debugf("get redis cluster nodes:") for _, address := range addresses { log.Debugf("%s", address) diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index 4cdef2ef..17ce90d9 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -23,13 +23,14 @@ import ( ) type SyncReaderOptions struct { - Cluster bool `mapstructure:"cluster" default:"false"` - Address string `mapstructure:"address" default:""` - Username string `mapstructure:"username" default:""` - Password string `mapstructure:"password" default:""` - Tls bool `mapstructure:"tls" default:"false"` - SyncRdb bool `mapstructure:"sync_rdb" default:"true"` - SyncAof bool `mapstructure:"sync_aof" default:"true"` + Cluster bool `mapstructure:"cluster" default:"false"` + Address string `mapstructure:"address" default:""` + Username string `mapstructure:"username" default:""` + Password string `mapstructure:"password" default:""` + Tls bool `mapstructure:"tls" default:"false"` + SyncRdb bool `mapstructure:"sync_rdb" default:"true"` + SyncAof bool `mapstructure:"sync_aof" default:"true"` + PreferReplica bool `mapstructure:"prefer_replica" default:"false"` } type State string diff --git a/internal/utils/cluster_nodes.go b/internal/utils/cluster_nodes.go index 91a0849d..be909ef3 100644 --- a/internal/utils/cluster_nodes.go +++ b/internal/utils/cluster_nodes.go @@ -9,18 +9,22 @@ import ( "RedisShake/internal/log" ) -func GetRedisClusterNodes(address string, username string, password string, Tls bool) (addresses []string, slots [][]int) { +func GetRedisClusterNodes(address string, username string, password string, Tls bool, perferReplica bool) (addresses []string, slots [][]int) { c := client.NewRedisClient(address, username, password, Tls) reply := c.DoWithStringReply("cluster", "nodes") reply = strings.TrimSpace(reply) slotsCount := 0 + // map of master's nodeId to address + masters := make(map[string]string) + // map of master's nodeId to replica addresses + replicas := make(map[string][]string) + // keep nodeID sort by slots + nodeIds := make([]string, 0) log.Infof("address=%v, reply=%v", address, reply) for _, line := range strings.Split(reply, "\n") { line = strings.TrimSpace(line) words := strings.Split(line, " ") - if !strings.Contains(words[2], "master") { - continue - } + isMaster := strings.Contains(words[2], "master") if len(words) < 8 { log.Panicf("invalid cluster nodes line: %s", line) } @@ -36,11 +40,23 @@ func GetRedisClusterNodes(address string, username string, password string, Tls ipv6Addr := strings.Join(tok[:len(tok)-1], ":") address = fmt.Sprintf("[%s]:%s", ipv6Addr, port) } - if len(words) < 9 { + if isMaster && len(words) < 9 { log.Warnf("the current master node does not hold any slots. address=[%v]", address) continue } - addresses = append(addresses, address) + + nodeId := words[0] + if isMaster { + masters[nodeId] = address + } else { + // execlude invalid replicas node + if strings.Contains(words[2], "fail") || strings.Contains(words[2], "noaddr") { + continue + } + masterId := words[3] + replicas[masterId] = append(replicas[masterId], address) + continue + } // parse slots slot := make([]int, 0) @@ -77,9 +93,21 @@ func GetRedisClusterNodes(address string, username string, password string, Tls } } slots = append(slots, slot) + nodeIds = append(nodeIds, nodeId) } if slotsCount != 16384 { log.Panicf("invalid cluster nodes slots. slots_count=%v, address=%v", slotsCount, address) } + + for _, id := range nodeIds { + if replicaAddr, exist := replicas[id]; exist && perferReplica { + addresses = append(addresses, replicaAddr[0]) + } else if masterAddr, exist := masters[id]; exist { + addresses = append(addresses, masterAddr) + } else { + log.Panicf("unknown id=%s", id) + } + } + return addresses, slots } diff --git a/internal/writer/redis_cluster_writer.go b/internal/writer/redis_cluster_writer.go index 76853907..e5d17dab 100644 --- a/internal/writer/redis_cluster_writer.go +++ b/internal/writer/redis_cluster_writer.go @@ -30,7 +30,7 @@ func (r *RedisClusterWriter) Close() { } func (r *RedisClusterWriter) loadClusterNodes(opts *RedisWriterOptions) { - addresses, slots := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls) + addresses, slots := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, false) r.addresses = addresses for i, address := range addresses { theOpts := *opts diff --git a/shake.toml b/shake.toml index 776b537e..99d52c61 100644 --- a/shake.toml +++ b/shake.toml @@ -9,6 +9,7 @@ password = "" # keep empty if no authentication is required tls = false sync_rdb = true # set to false if you don't want to sync rdb sync_aof = true # set to false if you don't want to sync aof +prefer_replica = true # set to true if you want to sync from replica node # [scan_reader] # cluster = false # set to true if source is a redis cluster @@ -18,6 +19,7 @@ sync_aof = true # set to false if you don't want to sync aof # ksn = false # set to true to enabled Redis keyspace notifications (KSN) subscription # tls = false # dbs = [] # set you want to scan dbs such as [1,5,7], if you don't want to scan all +# prefer_replica = true # set to true if you want to sync from replica node # [rdb_reader] # filepath = "/tmp/dump.rdb"