Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add feature about perfer connect to replica node #725

Merged
merged 2 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/reader/scan_cluster_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type scanClusterReader struct {
}

func NewScanClusterReader(opts *ScanReaderOptions) Reader {
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls)
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica)

rd := &scanClusterReader{}
for _, address := range addresses {
Expand Down
15 changes: 8 additions & 7 deletions internal/reader/scan_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ import (
)

type ScanReaderOptions struct {
Cluster bool `mapstructure:"cluster" default:"false"`
Address string `mapstructure:"address" default:""`
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
KSN bool `mapstructure:"ksn" default:"false"`
DBS []int `mapstructure:"dbs"`
Cluster bool `mapstructure:"cluster" default:"false"`
Address string `mapstructure:"address" default:""`
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
KSN bool `mapstructure:"ksn" default:"false"`
DBS []int `mapstructure:"dbs"`
PreferReplica bool `mapstructure:"prefer_replica" default:"false"`
}

type dbKey struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/reader/sync_cluster_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type syncClusterReader struct {
}

func NewSyncClusterReader(opts *SyncReaderOptions) Reader {
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls)
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica)
log.Debugf("get redis cluster nodes:")
for _, address := range addresses {
log.Debugf("%s", address)
Expand Down
15 changes: 8 additions & 7 deletions internal/reader/sync_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ import (
)

type SyncReaderOptions struct {
Cluster bool `mapstructure:"cluster" default:"false"`
Address string `mapstructure:"address" default:""`
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
SyncRdb bool `mapstructure:"sync_rdb" default:"true"`
SyncAof bool `mapstructure:"sync_aof" default:"true"`
Cluster bool `mapstructure:"cluster" default:"false"`
Address string `mapstructure:"address" default:""`
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
SyncRdb bool `mapstructure:"sync_rdb" default:"true"`
SyncAof bool `mapstructure:"sync_aof" default:"true"`
PreferReplica bool `mapstructure:"prefer_replica" default:"false"`
}

type State string
Expand Down
40 changes: 34 additions & 6 deletions internal/utils/cluster_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,22 @@ import (
"RedisShake/internal/log"
)

func GetRedisClusterNodes(address string, username string, password string, Tls bool) (addresses []string, slots [][]int) {
func GetRedisClusterNodes(address string, username string, password string, Tls bool, perferReplica bool) (addresses []string, slots [][]int) {
c := client.NewRedisClient(address, username, password, Tls)
reply := c.DoWithStringReply("cluster", "nodes")
reply = strings.TrimSpace(reply)
slotsCount := 0
// map of master's nodeId to address
masters := make(map[string]string)
// map of master's nodeId to replica addresses
replicas := make(map[string][]string)
// keep nodeID sort by slots
nodeIds := make([]string, 0)
log.Infof("address=%v, reply=%v", address, reply)
for _, line := range strings.Split(reply, "\n") {
line = strings.TrimSpace(line)
words := strings.Split(line, " ")
if !strings.Contains(words[2], "master") {
continue
}
isMaster := strings.Contains(words[2], "master")
if len(words) < 8 {
log.Panicf("invalid cluster nodes line: %s", line)
}
Expand All @@ -36,11 +40,23 @@ func GetRedisClusterNodes(address string, username string, password string, Tls
ipv6Addr := strings.Join(tok[:len(tok)-1], ":")
address = fmt.Sprintf("[%s]:%s", ipv6Addr, port)
}
if len(words) < 9 {
if isMaster && len(words) < 9 {
log.Warnf("the current master node does not hold any slots. address=[%v]", address)
continue
}
addresses = append(addresses, address)

nodeId := words[0]
if isMaster {
masters[nodeId] = address
} else {
// execlude invalid replicas node
if strings.Contains(words[2], "fail") || strings.Contains(words[2], "noaddr") {
continue
}
masterId := words[3]
replicas[masterId] = append(replicas[masterId], address)
continue
}

// parse slots
slot := make([]int, 0)
Expand Down Expand Up @@ -77,9 +93,21 @@ func GetRedisClusterNodes(address string, username string, password string, Tls
}
}
slots = append(slots, slot)
nodeIds = append(nodeIds, nodeId)
}
if slotsCount != 16384 {
log.Panicf("invalid cluster nodes slots. slots_count=%v, address=%v", slotsCount, address)
}

for _, id := range nodeIds {
if replicaAddr, exist := replicas[id]; exist && perferReplica {
addresses = append(addresses, replicaAddr[0])
} else if masterAddr, exist := masters[id]; exist {
addresses = append(addresses, masterAddr)
} else {
log.Panicf("unknown id=%s", id)
}
}

return addresses, slots
}
2 changes: 1 addition & 1 deletion internal/writer/redis_cluster_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (r *RedisClusterWriter) Close() {
}

func (r *RedisClusterWriter) loadClusterNodes(opts *RedisWriterOptions) {
addresses, slots := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls)
addresses, slots := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, false)
r.addresses = addresses
for i, address := range addresses {
theOpts := *opts
Expand Down
2 changes: 2 additions & 0 deletions shake.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ password = "" # keep empty if no authentication is required
tls = false
sync_rdb = true # set to false if you don't want to sync rdb
sync_aof = true # set to false if you don't want to sync aof
prefer_replica = true # set to true if you want to sync from replica node
suxb201 marked this conversation as resolved.
Show resolved Hide resolved

# [scan_reader]
# cluster = false # set to true if source is a redis cluster
Expand All @@ -18,6 +19,7 @@ sync_aof = true # set to false if you don't want to sync aof
# ksn = false # set to true to enabled Redis keyspace notifications (KSN) subscription
# tls = false
# dbs = [] # set you want to scan dbs such as [1,5,7], if you don't want to scan all
# prefer_replica = true # set to true if you want to sync from replica node
suxb201 marked this conversation as resolved.
Show resolved Hide resolved

# [rdb_reader]
# filepath = "/tmp/dump.rdb"
Expand Down
Loading