From 2d30da2b402de2587e8a61fc0eaa75d34c5a7c54 Mon Sep 17 00:00:00 2001 From: skyjiang <470623352@qq.com> Date: Wed, 13 Dec 2023 17:11:49 +0800 Subject: [PATCH] add feature about perfer connect to replica node --- internal/reader/scan_cluster_reader.go | 2 +- internal/reader/scan_standalone_reader.go | 15 ++++----- internal/reader/sync_cluster_reader.go | 2 +- internal/reader/sync_standalone_reader.go | 15 ++++----- internal/utils/cluster_nodes.go | 38 +++++++++++++++++++---- internal/writer/redis_cluster_writer.go | 2 +- shake.toml | 2 ++ 7 files changed, 53 insertions(+), 23 deletions(-) diff --git a/internal/reader/scan_cluster_reader.go b/internal/reader/scan_cluster_reader.go index f3a39dc3..60c5f8a8 100644 --- a/internal/reader/scan_cluster_reader.go +++ b/internal/reader/scan_cluster_reader.go @@ -15,7 +15,7 @@ type scanClusterReader struct { } func NewScanClusterReader(opts *ScanReaderOptions) Reader { - addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls) + addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PerferReplica) rd := &scanClusterReader{} for _, address := range addresses { diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index 9ebf5567..bfb3f933 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -18,13 +18,14 @@ import ( ) type ScanReaderOptions struct { - Cluster bool `mapstructure:"cluster" default:"false"` - Address string `mapstructure:"address" default:""` - Username string `mapstructure:"username" default:""` - Password string `mapstructure:"password" default:""` - Tls bool `mapstructure:"tls" default:"false"` - KSN bool `mapstructure:"ksn" default:"false"` - DBS []int `mapstructure:"dbs"` + Cluster bool `mapstructure:"cluster" default:"false"` + Address string `mapstructure:"address" default:""` + Username string `mapstructure:"username" default:""` + Password string `mapstructure:"password" default:""` + Tls bool `mapstructure:"tls" default:"false"` + KSN bool `mapstructure:"ksn" default:"false"` + DBS []int `mapstructure:"dbs"` + PerferReplica bool `mapstructure:"perfer_replica" default:"true"` } type dbKey struct { diff --git a/internal/reader/sync_cluster_reader.go b/internal/reader/sync_cluster_reader.go index 9dafaccc..6b6ff8a6 100644 --- a/internal/reader/sync_cluster_reader.go +++ b/internal/reader/sync_cluster_reader.go @@ -16,7 +16,7 @@ type syncClusterReader struct { } func NewSyncClusterReader(opts *SyncReaderOptions) Reader { - addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls) + addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PerferReplica) log.Debugf("get redis cluster nodes:") for _, address := range addresses { log.Debugf("%s", address) diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index 4cdef2ef..84d0af60 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -23,13 +23,14 @@ import ( ) type SyncReaderOptions struct { - Cluster bool `mapstructure:"cluster" default:"false"` - Address string `mapstructure:"address" default:""` - Username string `mapstructure:"username" default:""` - Password string `mapstructure:"password" default:""` - Tls bool `mapstructure:"tls" default:"false"` - SyncRdb bool `mapstructure:"sync_rdb" default:"true"` - SyncAof bool `mapstructure:"sync_aof" default:"true"` + Cluster bool `mapstructure:"cluster" default:"false"` + Address string `mapstructure:"address" default:""` + Username string `mapstructure:"username" default:""` + Password string `mapstructure:"password" default:""` + Tls bool `mapstructure:"tls" default:"false"` + SyncRdb bool `mapstructure:"sync_rdb" default:"true"` + SyncAof bool `mapstructure:"sync_aof" default:"true"` + PerferReplica bool `mapstructure:"perfer_replica" default:"true"` } type State string diff --git a/internal/utils/cluster_nodes.go b/internal/utils/cluster_nodes.go index 91a0849d..daf59591 100644 --- a/internal/utils/cluster_nodes.go +++ b/internal/utils/cluster_nodes.go @@ -9,18 +9,18 @@ import ( "RedisShake/internal/log" ) -func GetRedisClusterNodes(address string, username string, password string, Tls bool) (addresses []string, slots [][]int) { +func GetRedisClusterNodes(address string, username string, password string, Tls bool, perferReplica bool) (addresses []string, slots [][]int) { c := client.NewRedisClient(address, username, password, Tls) reply := c.DoWithStringReply("cluster", "nodes") reply = strings.TrimSpace(reply) slotsCount := 0 log.Infof("address=%v, reply=%v", address, reply) + masters := make(map[string]string) + replicas := make(map[string][]string) for _, line := range strings.Split(reply, "\n") { line = strings.TrimSpace(line) words := strings.Split(line, " ") - if !strings.Contains(words[2], "master") { - continue - } + isMaster := strings.Contains(words[2], "master") if len(words) < 8 { log.Panicf("invalid cluster nodes line: %s", line) } @@ -36,11 +36,22 @@ func GetRedisClusterNodes(address string, username string, password string, Tls ipv6Addr := strings.Join(tok[:len(tok)-1], ":") address = fmt.Sprintf("[%s]:%s", ipv6Addr, port) } - if len(words) < 9 { + if isMaster && len(words) < 9 { log.Warnf("the current master node does not hold any slots. address=[%v]", address) continue } - addresses = append(addresses, address) + + nodeId := words[0] + if isMaster { + masters[nodeId] = address + } else { + if strings.Contains(words[2], "fail") || strings.Contains(words[2], "noaddr") { + continue + } + masterId := words[3] + replicas[masterId] = append(replicas[masterId], address) + continue + } // parse slots slot := make([]int, 0) @@ -81,5 +92,20 @@ func GetRedisClusterNodes(address string, username string, password string, Tls if slotsCount != 16384 { log.Panicf("invalid cluster nodes slots. slots_count=%v, address=%v", slotsCount, address) } + + if perferReplica && len(replicas) > 0 { + for masterId, replicaAddr := range replicas { + if len(replicaAddr) > 0 { + addresses = append(addresses, replicaAddr[0]) + } else { + addresses = append(addresses, masters[masterId]) + } + } + } else { + for _, v := range masters { + addresses = append(addresses, v) + } + } + return addresses, slots } diff --git a/internal/writer/redis_cluster_writer.go b/internal/writer/redis_cluster_writer.go index 76853907..e5d17dab 100644 --- a/internal/writer/redis_cluster_writer.go +++ b/internal/writer/redis_cluster_writer.go @@ -30,7 +30,7 @@ func (r *RedisClusterWriter) Close() { } func (r *RedisClusterWriter) loadClusterNodes(opts *RedisWriterOptions) { - addresses, slots := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls) + addresses, slots := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, false) r.addresses = addresses for i, address := range addresses { theOpts := *opts diff --git a/shake.toml b/shake.toml index 776b537e..99d52c61 100644 --- a/shake.toml +++ b/shake.toml @@ -9,6 +9,7 @@ password = "" # keep empty if no authentication is required tls = false sync_rdb = true # set to false if you don't want to sync rdb sync_aof = true # set to false if you don't want to sync aof +prefer_replica = true # set to true if you want to sync from replica node # [scan_reader] # cluster = false # set to true if source is a redis cluster @@ -18,6 +19,7 @@ sync_aof = true # set to false if you don't want to sync aof # ksn = false # set to true to enabled Redis keyspace notifications (KSN) subscription # tls = false # dbs = [] # set you want to scan dbs such as [1,5,7], if you don't want to scan all +# prefer_replica = true # set to true if you want to sync from replica node # [rdb_reader] # filepath = "/tmp/dump.rdb"