Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add feature about perfer connect to replica node #725

Merged
merged 2 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/reader/scan_cluster_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type scanClusterReader struct {
}

func NewScanClusterReader(opts *ScanReaderOptions) Reader {
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls)
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PerferReplica)

rd := &scanClusterReader{}
for _, address := range addresses {
Expand Down
15 changes: 8 additions & 7 deletions internal/reader/scan_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ import (
)

type ScanReaderOptions struct {
Cluster bool `mapstructure:"cluster" default:"false"`
Address string `mapstructure:"address" default:""`
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
KSN bool `mapstructure:"ksn" default:"false"`
DBS []int `mapstructure:"dbs"`
Cluster bool `mapstructure:"cluster" default:"false"`
Address string `mapstructure:"address" default:""`
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
KSN bool `mapstructure:"ksn" default:"false"`
DBS []int `mapstructure:"dbs"`
PerferReplica bool `mapstructure:"perfer_replica" default:"true"`
jjz921024 marked this conversation as resolved.
Show resolved Hide resolved
}

type dbKey struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/reader/sync_cluster_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type syncClusterReader struct {
}

func NewSyncClusterReader(opts *SyncReaderOptions) Reader {
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls)
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PerferReplica)
log.Debugf("get redis cluster nodes:")
for _, address := range addresses {
log.Debugf("%s", address)
Expand Down
15 changes: 8 additions & 7 deletions internal/reader/sync_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ import (
)

type SyncReaderOptions struct {
Cluster bool `mapstructure:"cluster" default:"false"`
Address string `mapstructure:"address" default:""`
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
SyncRdb bool `mapstructure:"sync_rdb" default:"true"`
SyncAof bool `mapstructure:"sync_aof" default:"true"`
Cluster bool `mapstructure:"cluster" default:"false"`
Address string `mapstructure:"address" default:""`
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
SyncRdb bool `mapstructure:"sync_rdb" default:"true"`
SyncAof bool `mapstructure:"sync_aof" default:"true"`
PerferReplica bool `mapstructure:"perfer_replica" default:"true"`
suxb201 marked this conversation as resolved.
Show resolved Hide resolved
}

type State string
Expand Down
38 changes: 32 additions & 6 deletions internal/utils/cluster_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ import (
"RedisShake/internal/log"
)

func GetRedisClusterNodes(address string, username string, password string, Tls bool) (addresses []string, slots [][]int) {
func GetRedisClusterNodes(address string, username string, password string, Tls bool, perferReplica bool) (addresses []string, slots [][]int) {
c := client.NewRedisClient(address, username, password, Tls)
reply := c.DoWithStringReply("cluster", "nodes")
reply = strings.TrimSpace(reply)
slotsCount := 0
log.Infof("address=%v, reply=%v", address, reply)
masters := make(map[string]string)
replicas := make(map[string][]string)
for _, line := range strings.Split(reply, "\n") {
line = strings.TrimSpace(line)
words := strings.Split(line, " ")
if !strings.Contains(words[2], "master") {
continue
}
isMaster := strings.Contains(words[2], "master")
if len(words) < 8 {
log.Panicf("invalid cluster nodes line: %s", line)
}
Expand All @@ -36,11 +36,22 @@ func GetRedisClusterNodes(address string, username string, password string, Tls
ipv6Addr := strings.Join(tok[:len(tok)-1], ":")
address = fmt.Sprintf("[%s]:%s", ipv6Addr, port)
}
if len(words) < 9 {
if isMaster && len(words) < 9 {
log.Warnf("the current master node does not hold any slots. address=[%v]", address)
continue
}
addresses = append(addresses, address)

nodeId := words[0]
if isMaster {
masters[nodeId] = address
} else {
if strings.Contains(words[2], "fail") || strings.Contains(words[2], "noaddr") {
continue
}
masterId := words[3]
replicas[masterId] = append(replicas[masterId], address)
continue
}

// parse slots
slot := make([]int, 0)
Expand Down Expand Up @@ -81,5 +92,20 @@ func GetRedisClusterNodes(address string, username string, password string, Tls
if slotsCount != 16384 {
log.Panicf("invalid cluster nodes slots. slots_count=%v, address=%v", slotsCount, address)
}

if perferReplica && len(replicas) > 0 {
for masterId, replicaAddr := range replicas {
if len(replicaAddr) > 0 {
addresses = append(addresses, replicaAddr[0])
suxb201 marked this conversation as resolved.
Show resolved Hide resolved
} else {
addresses = append(addresses, masters[masterId])
}
}
} else {
for _, v := range masters {
addresses = append(addresses, v)
}
}

return addresses, slots
}
2 changes: 1 addition & 1 deletion internal/writer/redis_cluster_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (r *RedisClusterWriter) Close() {
}

func (r *RedisClusterWriter) loadClusterNodes(opts *RedisWriterOptions) {
addresses, slots := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls)
addresses, slots := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, false)
r.addresses = addresses
for i, address := range addresses {
theOpts := *opts
Expand Down
2 changes: 2 additions & 0 deletions shake.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ password = "" # keep empty if no authentication is required
tls = false
sync_rdb = true # set to false if you don't want to sync rdb
sync_aof = true # set to false if you don't want to sync aof
prefer_replica = true # set to true if you want to sync from replica node
suxb201 marked this conversation as resolved.
Show resolved Hide resolved

# [scan_reader]
# cluster = false # set to true if source is a redis cluster
Expand All @@ -18,6 +19,7 @@ sync_aof = true # set to false if you don't want to sync aof
# ksn = false # set to true to enabled Redis keyspace notifications (KSN) subscription
# tls = false
# dbs = [] # set you want to scan dbs such as [1,5,7], if you don't want to scan all
# prefer_replica = true # set to true if you want to sync from replica node
suxb201 marked this conversation as resolved.
Show resolved Hide resolved

# [rdb_reader]
# filepath = "/tmp/dump.rdb"
Expand Down
Loading