From 961376f384cab416b0b2586a34316d9c9a3a2ebb Mon Sep 17 00:00:00 2001 From: skyjiang <470623352@qq.com> Date: Wed, 13 Dec 2023 17:11:49 +0800 Subject: [PATCH 1/2] add feature about perfer connect to replica node --- internal/reader/scan_cluster_reader.go | 2 +- internal/reader/scan_standalone_reader.go | 15 ++++----- internal/reader/sync_cluster_reader.go | 2 +- internal/reader/sync_standalone_reader.go | 15 ++++----- internal/utils/cluster_nodes.go | 38 +++++++++++++++++++---- internal/writer/redis_cluster_writer.go | 2 +- shake.toml | 2 ++ 7 files changed, 53 insertions(+), 23 deletions(-) diff --git a/internal/reader/scan_cluster_reader.go b/internal/reader/scan_cluster_reader.go index f3a39dc3..60c5f8a8 100644 --- a/internal/reader/scan_cluster_reader.go +++ b/internal/reader/scan_cluster_reader.go @@ -15,7 +15,7 @@ type scanClusterReader struct { } func NewScanClusterReader(opts *ScanReaderOptions) Reader { - addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls) + addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PerferReplica) rd := &scanClusterReader{} for _, address := range addresses { diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index 9ebf5567..bfb3f933 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -18,13 +18,14 @@ import ( ) type ScanReaderOptions struct { - Cluster bool `mapstructure:"cluster" default:"false"` - Address string `mapstructure:"address" default:""` - Username string `mapstructure:"username" default:""` - Password string `mapstructure:"password" default:""` - Tls bool `mapstructure:"tls" default:"false"` - KSN bool `mapstructure:"ksn" default:"false"` - DBS []int `mapstructure:"dbs"` + Cluster bool `mapstructure:"cluster" default:"false"` + Address string `mapstructure:"address" default:""` + Username string `mapstructure:"username" default:""` + Password string `mapstructure:"password" default:""` + Tls bool `mapstructure:"tls" default:"false"` + KSN bool `mapstructure:"ksn" default:"false"` + DBS []int `mapstructure:"dbs"` + PerferReplica bool `mapstructure:"perfer_replica" default:"true"` } type dbKey struct { diff --git a/internal/reader/sync_cluster_reader.go b/internal/reader/sync_cluster_reader.go index 9dafaccc..6b6ff8a6 100644 --- a/internal/reader/sync_cluster_reader.go +++ b/internal/reader/sync_cluster_reader.go @@ -16,7 +16,7 @@ type syncClusterReader struct { } func NewSyncClusterReader(opts *SyncReaderOptions) Reader { - addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls) + addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PerferReplica) log.Debugf("get redis cluster nodes:") for _, address := range addresses { log.Debugf("%s", address) diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index 4cdef2ef..84d0af60 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -23,13 +23,14 @@ import ( ) type SyncReaderOptions struct { - Cluster bool `mapstructure:"cluster" default:"false"` - Address string `mapstructure:"address" default:""` - Username string `mapstructure:"username" default:""` - Password string `mapstructure:"password" default:""` - Tls bool `mapstructure:"tls" default:"false"` - SyncRdb bool `mapstructure:"sync_rdb" default:"true"` - SyncAof bool `mapstructure:"sync_aof" default:"true"` + Cluster bool `mapstructure:"cluster" default:"false"` + Address string `mapstructure:"address" default:""` + Username string `mapstructure:"username" default:""` + Password string `mapstructure:"password" default:""` + Tls bool `mapstructure:"tls" default:"false"` + SyncRdb bool `mapstructure:"sync_rdb" default:"true"` + SyncAof bool `mapstructure:"sync_aof" default:"true"` + PerferReplica bool `mapstructure:"perfer_replica" default:"true"` } type State string diff --git a/internal/utils/cluster_nodes.go b/internal/utils/cluster_nodes.go index 91a0849d..daf59591 100644 --- a/internal/utils/cluster_nodes.go +++ b/internal/utils/cluster_nodes.go @@ -9,18 +9,18 @@ import ( "RedisShake/internal/log" ) -func GetRedisClusterNodes(address string, username string, password string, Tls bool) (addresses []string, slots [][]int) { +func GetRedisClusterNodes(address string, username string, password string, Tls bool, perferReplica bool) (addresses []string, slots [][]int) { c := client.NewRedisClient(address, username, password, Tls) reply := c.DoWithStringReply("cluster", "nodes") reply = strings.TrimSpace(reply) slotsCount := 0 log.Infof("address=%v, reply=%v", address, reply) + masters := make(map[string]string) + replicas := make(map[string][]string) for _, line := range strings.Split(reply, "\n") { line = strings.TrimSpace(line) words := strings.Split(line, " ") - if !strings.Contains(words[2], "master") { - continue - } + isMaster := strings.Contains(words[2], "master") if len(words) < 8 { log.Panicf("invalid cluster nodes line: %s", line) } @@ -36,11 +36,22 @@ func GetRedisClusterNodes(address string, username string, password string, Tls ipv6Addr := strings.Join(tok[:len(tok)-1], ":") address = fmt.Sprintf("[%s]:%s", ipv6Addr, port) } - if len(words) < 9 { + if isMaster && len(words) < 9 { log.Warnf("the current master node does not hold any slots. address=[%v]", address) continue } - addresses = append(addresses, address) + + nodeId := words[0] + if isMaster { + masters[nodeId] = address + } else { + if strings.Contains(words[2], "fail") || strings.Contains(words[2], "noaddr") { + continue + } + masterId := words[3] + replicas[masterId] = append(replicas[masterId], address) + continue + } // parse slots slot := make([]int, 0) @@ -81,5 +92,20 @@ func GetRedisClusterNodes(address string, username string, password string, Tls if slotsCount != 16384 { log.Panicf("invalid cluster nodes slots. slots_count=%v, address=%v", slotsCount, address) } + + if perferReplica && len(replicas) > 0 { + for masterId, replicaAddr := range replicas { + if len(replicaAddr) > 0 { + addresses = append(addresses, replicaAddr[0]) + } else { + addresses = append(addresses, masters[masterId]) + } + } + } else { + for _, v := range masters { + addresses = append(addresses, v) + } + } + return addresses, slots } diff --git a/internal/writer/redis_cluster_writer.go b/internal/writer/redis_cluster_writer.go index 76853907..e5d17dab 100644 --- a/internal/writer/redis_cluster_writer.go +++ b/internal/writer/redis_cluster_writer.go @@ -30,7 +30,7 @@ func (r *RedisClusterWriter) Close() { } func (r *RedisClusterWriter) loadClusterNodes(opts *RedisWriterOptions) { - addresses, slots := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls) + addresses, slots := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, false) r.addresses = addresses for i, address := range addresses { theOpts := *opts diff --git a/shake.toml b/shake.toml index 776b537e..99d52c61 100644 --- a/shake.toml +++ b/shake.toml @@ -9,6 +9,7 @@ password = "" # keep empty if no authentication is required tls = false sync_rdb = true # set to false if you don't want to sync rdb sync_aof = true # set to false if you don't want to sync aof +prefer_replica = true # set to true if you want to sync from replica node # [scan_reader] # cluster = false # set to true if source is a redis cluster @@ -18,6 +19,7 @@ sync_aof = true # set to false if you don't want to sync aof # ksn = false # set to true to enabled Redis keyspace notifications (KSN) subscription # tls = false # dbs = [] # set you want to scan dbs such as [1,5,7], if you don't want to scan all +# prefer_replica = true # set to true if you want to sync from replica node # [rdb_reader] # filepath = "/tmp/dump.rdb" From c2430feeb29ba5341dd1ec59175c6f06efbaf7eb Mon Sep 17 00:00:00 2001 From: skyjiang <470623352@qq.com> Date: Wed, 27 Dec 2023 10:27:03 +0800 Subject: [PATCH 2/2] fix: the address corresponds to the slot --- internal/reader/scan_cluster_reader.go | 2 +- internal/reader/scan_standalone_reader.go | 2 +- internal/reader/sync_cluster_reader.go | 2 +- internal/reader/sync_standalone_reader.go | 2 +- internal/utils/cluster_nodes.go | 26 ++++++++++++----------- 5 files changed, 18 insertions(+), 16 deletions(-) diff --git a/internal/reader/scan_cluster_reader.go b/internal/reader/scan_cluster_reader.go index 60c5f8a8..dcb53788 100644 --- a/internal/reader/scan_cluster_reader.go +++ b/internal/reader/scan_cluster_reader.go @@ -15,7 +15,7 @@ type scanClusterReader struct { } func NewScanClusterReader(opts *ScanReaderOptions) Reader { - addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PerferReplica) + addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica) rd := &scanClusterReader{} for _, address := range addresses { diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index bfb3f933..2e103b84 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -25,7 +25,7 @@ type ScanReaderOptions struct { Tls bool `mapstructure:"tls" default:"false"` KSN bool `mapstructure:"ksn" default:"false"` DBS []int `mapstructure:"dbs"` - PerferReplica bool `mapstructure:"perfer_replica" default:"true"` + PreferReplica bool `mapstructure:"prefer_replica" default:"false"` } type dbKey struct { diff --git a/internal/reader/sync_cluster_reader.go b/internal/reader/sync_cluster_reader.go index 6b6ff8a6..18c4935a 100644 --- a/internal/reader/sync_cluster_reader.go +++ b/internal/reader/sync_cluster_reader.go @@ -16,7 +16,7 @@ type syncClusterReader struct { } func NewSyncClusterReader(opts *SyncReaderOptions) Reader { - addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PerferReplica) + addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica) log.Debugf("get redis cluster nodes:") for _, address := range addresses { log.Debugf("%s", address) diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index 84d0af60..17ce90d9 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -30,7 +30,7 @@ type SyncReaderOptions struct { Tls bool `mapstructure:"tls" default:"false"` SyncRdb bool `mapstructure:"sync_rdb" default:"true"` SyncAof bool `mapstructure:"sync_aof" default:"true"` - PerferReplica bool `mapstructure:"perfer_replica" default:"true"` + PreferReplica bool `mapstructure:"prefer_replica" default:"false"` } type State string diff --git a/internal/utils/cluster_nodes.go b/internal/utils/cluster_nodes.go index daf59591..be909ef3 100644 --- a/internal/utils/cluster_nodes.go +++ b/internal/utils/cluster_nodes.go @@ -14,9 +14,13 @@ func GetRedisClusterNodes(address string, username string, password string, Tls reply := c.DoWithStringReply("cluster", "nodes") reply = strings.TrimSpace(reply) slotsCount := 0 - log.Infof("address=%v, reply=%v", address, reply) + // map of master's nodeId to address masters := make(map[string]string) + // map of master's nodeId to replica addresses replicas := make(map[string][]string) + // keep nodeID sort by slots + nodeIds := make([]string, 0) + log.Infof("address=%v, reply=%v", address, reply) for _, line := range strings.Split(reply, "\n") { line = strings.TrimSpace(line) words := strings.Split(line, " ") @@ -45,6 +49,7 @@ func GetRedisClusterNodes(address string, username string, password string, Tls if isMaster { masters[nodeId] = address } else { + // execlude invalid replicas node if strings.Contains(words[2], "fail") || strings.Contains(words[2], "noaddr") { continue } @@ -88,22 +93,19 @@ func GetRedisClusterNodes(address string, username string, password string, Tls } } slots = append(slots, slot) + nodeIds = append(nodeIds, nodeId) } if slotsCount != 16384 { log.Panicf("invalid cluster nodes slots. slots_count=%v, address=%v", slotsCount, address) } - if perferReplica && len(replicas) > 0 { - for masterId, replicaAddr := range replicas { - if len(replicaAddr) > 0 { - addresses = append(addresses, replicaAddr[0]) - } else { - addresses = append(addresses, masters[masterId]) - } - } - } else { - for _, v := range masters { - addresses = append(addresses, v) + for _, id := range nodeIds { + if replicaAddr, exist := replicas[id]; exist && perferReplica { + addresses = append(addresses, replicaAddr[0]) + } else if masterAddr, exist := masters[id]; exist { + addresses = append(addresses, masterAddr) + } else { + log.Panicf("unknown id=%s", id) } }