Skip to content

Commit

Permalink
fix: the address corresponds to the slot
Browse files Browse the repository at this point in the history
  • Loading branch information
jjz921024 authored and suxb201 committed Dec 27, 2023
1 parent 2d30da2 commit c2aa817
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 16 deletions.
2 changes: 1 addition & 1 deletion internal/reader/scan_cluster_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type scanClusterReader struct {
}

func NewScanClusterReader(opts *ScanReaderOptions) Reader {
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PerferReplica)
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica)

rd := &scanClusterReader{}
for _, address := range addresses {
Expand Down
2 changes: 1 addition & 1 deletion internal/reader/scan_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type ScanReaderOptions struct {
Tls bool `mapstructure:"tls" default:"false"`
KSN bool `mapstructure:"ksn" default:"false"`
DBS []int `mapstructure:"dbs"`
PerferReplica bool `mapstructure:"perfer_replica" default:"true"`
PreferReplica bool `mapstructure:"prefer_replica" default:"false"`
}

type dbKey struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/reader/sync_cluster_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type syncClusterReader struct {
}

func NewSyncClusterReader(opts *SyncReaderOptions) Reader {
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PerferReplica)
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica)
log.Debugf("get redis cluster nodes:")
for _, address := range addresses {
log.Debugf("%s", address)
Expand Down
2 changes: 1 addition & 1 deletion internal/reader/sync_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type SyncReaderOptions struct {
Tls bool `mapstructure:"tls" default:"false"`
SyncRdb bool `mapstructure:"sync_rdb" default:"true"`
SyncAof bool `mapstructure:"sync_aof" default:"true"`
PerferReplica bool `mapstructure:"perfer_replica" default:"true"`
PreferReplica bool `mapstructure:"prefer_replica" default:"false"`
}

type State string
Expand Down
26 changes: 14 additions & 12 deletions internal/utils/cluster_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@ func GetRedisClusterNodes(address string, username string, password string, Tls
reply := c.DoWithStringReply("cluster", "nodes")
reply = strings.TrimSpace(reply)
slotsCount := 0
log.Infof("address=%v, reply=%v", address, reply)
// map of master's nodeId to address
masters := make(map[string]string)
// map of master's nodeId to replica addresses
replicas := make(map[string][]string)
// keep nodeID sort by slots
nodeIds := make([]string, 0)
log.Infof("address=%v, reply=%v", address, reply)
for _, line := range strings.Split(reply, "\n") {
line = strings.TrimSpace(line)
words := strings.Split(line, " ")
Expand Down Expand Up @@ -45,6 +49,7 @@ func GetRedisClusterNodes(address string, username string, password string, Tls
if isMaster {
masters[nodeId] = address
} else {
// execlude invalid replicas node
if strings.Contains(words[2], "fail") || strings.Contains(words[2], "noaddr") {
continue
}
Expand Down Expand Up @@ -88,22 +93,19 @@ func GetRedisClusterNodes(address string, username string, password string, Tls
}
}
slots = append(slots, slot)
nodeIds = append(nodeIds, nodeId)
}
if slotsCount != 16384 {
log.Panicf("invalid cluster nodes slots. slots_count=%v, address=%v", slotsCount, address)
}

if perferReplica && len(replicas) > 0 {
for masterId, replicaAddr := range replicas {
if len(replicaAddr) > 0 {
addresses = append(addresses, replicaAddr[0])
} else {
addresses = append(addresses, masters[masterId])
}
}
} else {
for _, v := range masters {
addresses = append(addresses, v)
for _, id := range nodeIds {
if replicaAddr, exist := replicas[id]; exist && perferReplica {
addresses = append(addresses, replicaAddr[0])
} else if masterAddr, exist := masters[id]; exist {
addresses = append(addresses, masterAddr)
} else {
log.Panicf("unknown id=%s", id)
}
}

Expand Down

0 comments on commit c2aa817

Please sign in to comment.