Skip to content

Commit

Permalink
bug fix prefer replica in master-slave mode (#828)
Browse files Browse the repository at this point in the history
* update else if to switch

* update else if to switch

* 添加动态识别库号

* 性能优化

* 添加scan时候的一次扫描次数控制配置,防止源端cpu高

* rename batch to count

* edit ...string to ...interface

* performance optimization

* fix r.db length is 0

* fix prefer replica bug

* fix prefer replica bug
  • Loading branch information
blight19 authored Jun 17, 2024
1 parent c3694f0 commit 5456398
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 11 deletions.
71 changes: 68 additions & 3 deletions internal/client/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"context"
"crypto/tls"
"net"
"regexp"
"strconv"
"strings"
"time"

"RedisShake/internal/client/proto"
Expand All @@ -20,11 +22,11 @@ type Redis struct {
protoWriter *proto.Writer
}

func NewSentinelClient(ctx context.Context, address string, Tls bool) *Redis {
return NewRedisClient(ctx, address, "", "", Tls)
func NewSentinelMasterClient(ctx context.Context, address string, Tls bool) *Redis {
return NewRedisClient(ctx, address, "", "", Tls, false)
}

func NewRedisClient(ctx context.Context, address string, username string, password string, Tls bool) *Redis {
func NewRedisClient(ctx context.Context, address string, username string, password string, Tls bool, replica bool) *Redis {
r := new(Redis)
var conn net.Conn
var dialer = &net.Dialer{
Expand Down Expand Up @@ -71,10 +73,73 @@ func NewRedisClient(ctx context.Context, address string, username string, passwo
if reply != "PONG" {
panic("ping failed with reply: " + reply)
}
reply = r.DoWithStringReply("info", "replication")
// get best replica
if replica {
replicaInfo := getReplicaAddr(reply)
log.Infof("best replica: %s", replicaInfo.BestReplica)
r = NewRedisClient(ctx, replicaInfo.BestReplica, username, password, Tls, false)
}

return r
}

type Replica struct {
Addr string
Offset string
}

type RedisReplicaInfo struct {
Role string
BestReplica string
}

func getReplicaAddr(info string) RedisReplicaInfo {
infoReplica := RedisReplicaInfo{}
replicas := make([]Replica, 0)
slaveInfoRegexp := regexp.MustCompile(`slave\d+:ip=.*`)
for _, line := range strings.Split(info, "\n") {
line = strings.TrimSpace(line)
switch {
case strings.HasPrefix(line, "role:slave"):
infoReplica.Role = "slave"
return infoReplica
case strings.HasPrefix(line, "role:master"):
infoReplica.Role = "master"
case slaveInfoRegexp.MatchString(line):
slaveInfo := strings.Split(line, ":")
s1 := slaveInfo[1]
slaveInfo = strings.Split(s1, ",")
replica := Replica{}
var host string
var port string
var offset string
for _, item := range slaveInfo {
if strings.HasPrefix(item, "ip=") {
host = strings.Split(item, "=")[1]
}
if strings.HasPrefix(item, "port=") {
port = strings.Split(item, "=")[1]
}
if strings.HasPrefix(item, "offset=") {
offset = strings.Split(item, "=")[1]
}
}
replica.Addr = host + ":" + port
replica.Offset = offset
replicas = append(replicas, replica)
}
}
best := replicas[0]
for _, replica := range replicas {
if replica.Offset > best.Offset {
best = replica
}
}
infoReplica.BestReplica = best.Addr
return infoReplica
}

func (r *Redis) DoWithStringReply(args ...interface{}) string {
r.Send(args...)

Expand Down
8 changes: 4 additions & 4 deletions internal/reader/scan_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type scanStandaloneReader struct {
func NewScanStandaloneReader(ctx context.Context, opts *ScanReaderOptions) Reader {
r := new(scanStandaloneReader)
// dbs
c := client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls)
c := client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica)
if c.IsCluster() { // not use opts.Cluster, because user may use standalone mode to scan a cluster node
r.dbs = []int{0}
} else {
Expand Down Expand Up @@ -100,7 +100,7 @@ func (r *scanStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry
}

func (r *scanStandaloneReader) subscript() {
c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls, r.opts.PreferReplica)
if len(r.dbs) == 0 {
c.Send("psubscribe", "__keyevent@*__:*")
} else {
Expand Down Expand Up @@ -149,7 +149,7 @@ func (r *scanStandaloneReader) subscript() {
}

func (r *scanStandaloneReader) scan() {
c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls, r.opts.PreferReplica)
defer c.Close()
for _, dbId := range r.dbs {
if dbId != 0 {
Expand Down Expand Up @@ -194,7 +194,7 @@ func (r *scanStandaloneReader) scan() {

func (r *scanStandaloneReader) dump() {
nowDbId := 0
r.dumpClient = client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
r.dumpClient = client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls, r.opts.PreferReplica)
// Support prefer_replica=true in both Cluster and Standalone mode
if r.opts.PreferReplica {
r.dumpClient.Do("READONLY")
Expand Down
2 changes: 1 addition & 1 deletion internal/reader/sync_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type syncStandaloneReader struct {
func NewSyncStandaloneReader(ctx context.Context, opts *SyncReaderOptions) Reader {
r := new(syncStandaloneReader)
r.opts = opts
r.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls)
r.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica)
r.rd = r.client.BufioReader()
r.stat.Name = "reader_" + strings.Replace(opts.Address, ":", "_", -1)
r.stat.Address = opts.Address
Expand Down
2 changes: 1 addition & 1 deletion internal/utils/cluster_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func GetRedisClusterNodes(ctx context.Context, address string, username string, password string, Tls bool, perferReplica bool) (addresses []string, slots [][]int) {
c := client.NewRedisClient(ctx, address, username, password, Tls)
c := client.NewRedisClient(ctx, address, username, password, Tls, false)
reply := c.DoWithStringReply("cluster", "nodes")
reply = strings.TrimSpace(reply)
slotsCount := 0
Expand Down
2 changes: 1 addition & 1 deletion internal/writer/redis_sentinel_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func NewRedisSentinelWriter(ctx context.Context, opts *RedisWriterOptions) Writer {
sentinel := client.NewSentinelClient(ctx, opts.Address, opts.Tls)
sentinel := client.NewSentinelMasterClient(ctx, opts.Address, opts.Tls)
sentinel.Send("SENTINEL", "GET-MASTER-ADDR-BY-NAME", opts.Master)
addr, err := sentinel.Receive()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/writer/redis_standalone_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewRedisStandaloneWriter(ctx context.Context, opts *RedisWriterOptions) Wri
rw := new(redisStandaloneWriter)
rw.address = opts.Address
rw.stat.Name = "writer_" + strings.Replace(opts.Address, ":", "_", -1)
rw.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls)
rw.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, false)
if opts.OffReply {
log.Infof("turn off the reply of write")
rw.offReply = true
Expand Down

0 comments on commit 5456398

Please sign in to comment.