Skip to content

Commit

Permalink
refactor: Manage all redis client into context.Context (#745)
Browse files Browse the repository at this point in the history
* refactor: Manage all redis client into `context.Context`

* Update code

---------

Co-authored-by: suxb201 <[email protected]>
  • Loading branch information
Zheaoli and suxb201 authored Jan 19, 2024
1 parent 871c038 commit 0f374b1
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 35 deletions.
16 changes: 9 additions & 7 deletions cmd/redis-shake/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ func main() {
utils.SetPprofPort()
luaRuntime := function.New(config.Opt.Function)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// create reader
var theReader reader.Reader
if v.IsSet("sync_reader") {
Expand All @@ -38,10 +41,10 @@ func main() {
log.Panicf("failed to read the SyncReader config entry. err: %v", err)
}
if opts.Cluster {
theReader = reader.NewSyncClusterReader(opts)
theReader = reader.NewSyncClusterReader(ctx, opts)
log.Infof("create SyncClusterReader: %v", opts.Address)
} else {
theReader = reader.NewSyncStandaloneReader(opts)
theReader = reader.NewSyncStandaloneReader(ctx, opts)
log.Infof("create SyncStandaloneReader: %v", opts.Address)
}
} else if v.IsSet("scan_reader") {
Expand All @@ -52,10 +55,10 @@ func main() {
log.Panicf("failed to read the ScanReader config entry. err: %v", err)
}
if opts.Cluster {
theReader = reader.NewScanClusterReader(opts)
theReader = reader.NewScanClusterReader(ctx, opts)
log.Infof("create ScanClusterReader: %v", opts.Address)
} else {
theReader = reader.NewScanStandaloneReader(opts)
theReader = reader.NewScanStandaloneReader(ctx, opts)
log.Infof("create ScanStandaloneReader: %v", opts.Address)
}
} else if v.IsSet("rdb_reader") {
Expand Down Expand Up @@ -93,10 +96,10 @@ func main() {
log.Panicf("the RDBRestoreCommandBehavior can't be 'panic' when the server not reply to commands")
}
if opts.Cluster {
theWriter = writer.NewRedisClusterWriter(opts)
theWriter = writer.NewRedisClusterWriter(ctx, opts)
log.Infof("create RedisClusterWriter: %v", opts.Address)
} else {
theWriter = writer.NewRedisStandaloneWriter(opts)
theWriter = writer.NewRedisStandaloneWriter(ctx, opts)
log.Infof("create RedisStandaloneWriter: %v", opts.Address)
}
if config.Opt.Advanced.EmptyDBBeforeSync {
Expand All @@ -114,7 +117,6 @@ func main() {

log.Infof("start syncing...")

ctx, cancel := context.WithCancel(context.Background())
ch := theReader.StartRead(ctx)
go waitShutdown(cancel)

Expand Down
17 changes: 12 additions & 5 deletions internal/client/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"bufio"
"context"
"crypto/tls"
"net"
"strconv"
Expand All @@ -19,16 +20,22 @@ type Redis struct {
protoWriter *proto.Writer
}

func NewRedisClient(address string, username string, password string, Tls bool) *Redis {
func NewRedisClient(ctx context.Context, address string, username string, password string, Tls bool) *Redis {
r := new(Redis)
var conn net.Conn
var dialer net.Dialer
var dialer = &net.Dialer{
Timeout: 5 * time.Minute,
KeepAlive: 5 * time.Minute,
}
var err error
dialer.Timeout = 3 * time.Second
if Tls {
conn, err = tls.DialWithDialer(&dialer, "tcp", address, &tls.Config{InsecureSkipVerify: true})
tlsDialer := &tls.Dialer{
NetDialer: dialer,
Config: &tls.Config{InsecureSkipVerify: true},
}
conn, err = tlsDialer.DialContext(ctx, "tcp", address)
} else {
conn, err = dialer.Dial("tcp", address)
conn, err = dialer.DialContext(ctx, "tcp", address)
}
if err != nil {
log.Panicf("dial failed. address=[%s], tls=[%v], err=[%v]", address, Tls, err)
Expand Down
6 changes: 3 additions & 3 deletions internal/reader/scan_cluster_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ type scanClusterReader struct {
statusId int
}

func NewScanClusterReader(opts *ScanReaderOptions) Reader {
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica)
func NewScanClusterReader(ctx context.Context, opts *ScanReaderOptions) Reader {
addresses, _ := utils.GetRedisClusterNodes(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica)

rd := &scanClusterReader{}
for _, address := range addresses {
theOpts := *opts
theOpts.Address = address
rd.readers = append(rd.readers, NewScanStandaloneReader(&theOpts))
rd.readers = append(rd.readers, NewScanStandaloneReader(ctx, &theOpts))
}
return rd
}
Expand Down
10 changes: 5 additions & 5 deletions internal/reader/scan_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ type scanStandaloneReader struct {
}
}

func NewScanStandaloneReader(opts *ScanReaderOptions) Reader {
func NewScanStandaloneReader(ctx context.Context, opts *ScanReaderOptions) Reader {
r := new(scanStandaloneReader)
// dbs
c := client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls)
c := client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls)
if c.IsCluster() { // not use opts.Cluster, because user may use standalone mode to scan a cluster node
r.dbs = []int{0}
} else {
Expand Down Expand Up @@ -82,7 +82,7 @@ func (r *scanStandaloneReader) subscript() {
if !r.opts.KSN {
return
}
c := client.NewRedisClient(r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
c.Send("psubscribe", "__keyevent@*__:*")

go func() {
Expand Down Expand Up @@ -114,7 +114,7 @@ func (r *scanStandaloneReader) subscript() {
}

func (r *scanStandaloneReader) scan() {
c := client.NewRedisClient(r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
defer c.Close()
for _, dbId := range r.dbs {
if dbId != 0 {
Expand Down Expand Up @@ -150,7 +150,7 @@ func (r *scanStandaloneReader) scan() {

func (r *scanStandaloneReader) fetch() {
nowDbId := 0
c := client.NewRedisClient(r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls)
defer c.Close()
for item := range r.keyQueue.Ch {
r.stat.NeedUpdateCount = int64(r.keyQueue.Len())
Expand Down
6 changes: 3 additions & 3 deletions internal/reader/sync_cluster_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ type syncClusterReader struct {
statusId int
}

func NewSyncClusterReader(opts *SyncReaderOptions) Reader {
addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica)
func NewSyncClusterReader(ctx context.Context, opts *SyncReaderOptions) Reader {
addresses, _ := utils.GetRedisClusterNodes(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica)
log.Debugf("get redis cluster nodes:")
for _, address := range addresses {
log.Debugf("%s", address)
Expand All @@ -25,7 +25,7 @@ func NewSyncClusterReader(opts *SyncReaderOptions) Reader {
for _, address := range addresses {
theOpts := *opts
theOpts.Address = address
rd.readers = append(rd.readers, NewSyncStandaloneReader(&theOpts))
rd.readers = append(rd.readers, NewSyncStandaloneReader(ctx, &theOpts))
}
return rd
}
Expand Down
6 changes: 3 additions & 3 deletions internal/reader/sync_standalone_reader.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package reader

import (
"context"
"bufio"
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -77,10 +77,10 @@ type syncStandaloneReader struct {
}
}

func NewSyncStandaloneReader(opts *SyncReaderOptions) Reader {
func NewSyncStandaloneReader(ctx context.Context, opts *SyncReaderOptions) Reader {
r := new(syncStandaloneReader)
r.opts = opts
r.client = client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls)
r.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls)
r.rd = r.client.BufioReader()
r.stat.Name = "reader_" + strings.Replace(opts.Address, ":", "_", -1)
r.stat.Address = opts.Address
Expand Down
5 changes: 3 additions & 2 deletions internal/utils/cluster_nodes.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package utils

import (
"context"
"fmt"
"strconv"
"strings"
Expand All @@ -9,8 +10,8 @@ import (
"RedisShake/internal/log"
)

func GetRedisClusterNodes(address string, username string, password string, Tls bool, perferReplica bool) (addresses []string, slots [][]int) {
c := client.NewRedisClient(address, username, password, Tls)
func GetRedisClusterNodes(ctx context.Context, address string, username string, password string, Tls bool, perferReplica bool) (addresses []string, slots [][]int) {
c := client.NewRedisClient(ctx, address, username, password, Tls)
reply := c.DoWithStringReply("cluster", "nodes")
reply = strings.TrimSpace(reply)
slotsCount := 0
Expand Down
12 changes: 7 additions & 5 deletions internal/writer/redis_cluster_writer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package writer

import (
"context"

"RedisShake/internal/entry"
"RedisShake/internal/log"
"RedisShake/internal/utils"
Expand All @@ -16,9 +18,9 @@ type RedisClusterWriter struct {
stat []interface{}
}

func NewRedisClusterWriter(opts *RedisWriterOptions) Writer {
func NewRedisClusterWriter(ctx context.Context, opts *RedisWriterOptions) Writer {
rw := new(RedisClusterWriter)
rw.loadClusterNodes(opts)
rw.loadClusterNodes(ctx, opts)
log.Infof("redisClusterWriter connected to redis cluster successful. addresses=%v", rw.addresses)
return rw
}
Expand All @@ -29,13 +31,13 @@ func (r *RedisClusterWriter) Close() {
}
}

func (r *RedisClusterWriter) loadClusterNodes(opts *RedisWriterOptions) {
addresses, slots := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, false)
func (r *RedisClusterWriter) loadClusterNodes(ctx context.Context, opts *RedisWriterOptions) {
addresses, slots := utils.GetRedisClusterNodes(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, false)
r.addresses = addresses
for i, address := range addresses {
theOpts := *opts
theOpts.Address = address
redisWriter := NewRedisStandaloneWriter(&theOpts)
redisWriter := NewRedisStandaloneWriter(ctx, &theOpts)
r.writers = append(r.writers, redisWriter)
for _, s := range slots[i] {
if r.router[s] != nil {
Expand Down
5 changes: 3 additions & 2 deletions internal/writer/redis_standalone_writer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package writer

import (
"context"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -40,11 +41,11 @@ type redisStandaloneWriter struct {
}
}

func NewRedisStandaloneWriter(opts *RedisWriterOptions) Writer {
func NewRedisStandaloneWriter(ctx context.Context, opts *RedisWriterOptions) Writer {
rw := new(redisStandaloneWriter)
rw.address = opts.Address
rw.stat.Name = "writer_" + strings.Replace(opts.Address, ":", "_", -1)
rw.client = client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls)
rw.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls)
if opts.OffReply {
log.Infof("turn off the reply of write")
rw.offReply = true
Expand Down

0 comments on commit 0f374b1

Please sign in to comment.