diff --git a/cmd/redis-shake/main.go b/cmd/redis-shake/main.go index c23159e2..34afa2bf 100644 --- a/cmd/redis-shake/main.go +++ b/cmd/redis-shake/main.go @@ -28,6 +28,9 @@ func main() { utils.SetPprofPort() luaRuntime := function.New(config.Opt.Function) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // create reader var theReader reader.Reader if v.IsSet("sync_reader") { @@ -38,10 +41,10 @@ func main() { log.Panicf("failed to read the SyncReader config entry. err: %v", err) } if opts.Cluster { - theReader = reader.NewSyncClusterReader(opts) + theReader = reader.NewSyncClusterReader(ctx, opts) log.Infof("create SyncClusterReader: %v", opts.Address) } else { - theReader = reader.NewSyncStandaloneReader(opts) + theReader = reader.NewSyncStandaloneReader(ctx, opts) log.Infof("create SyncStandaloneReader: %v", opts.Address) } } else if v.IsSet("scan_reader") { @@ -52,10 +55,10 @@ func main() { log.Panicf("failed to read the ScanReader config entry. err: %v", err) } if opts.Cluster { - theReader = reader.NewScanClusterReader(opts) + theReader = reader.NewScanClusterReader(ctx, opts) log.Infof("create ScanClusterReader: %v", opts.Address) } else { - theReader = reader.NewScanStandaloneReader(opts) + theReader = reader.NewScanStandaloneReader(ctx, opts) log.Infof("create ScanStandaloneReader: %v", opts.Address) } } else if v.IsSet("rdb_reader") { @@ -93,10 +96,10 @@ func main() { log.Panicf("the RDBRestoreCommandBehavior can't be 'panic' when the server not reply to commands") } if opts.Cluster { - theWriter = writer.NewRedisClusterWriter(opts) + theWriter = writer.NewRedisClusterWriter(ctx, opts) log.Infof("create RedisClusterWriter: %v", opts.Address) } else { - theWriter = writer.NewRedisStandaloneWriter(opts) + theWriter = writer.NewRedisStandaloneWriter(ctx, opts) log.Infof("create RedisStandaloneWriter: %v", opts.Address) } if config.Opt.Advanced.EmptyDBBeforeSync { @@ -114,7 +117,6 @@ func main() { log.Infof("start syncing...") - ctx, cancel := context.WithCancel(context.Background()) ch := theReader.StartRead(ctx) go waitShutdown(cancel) diff --git a/internal/client/redis.go b/internal/client/redis.go index 5672bf5c..924b2193 100644 --- a/internal/client/redis.go +++ b/internal/client/redis.go @@ -2,6 +2,7 @@ package client import ( "bufio" + "context" "crypto/tls" "net" "strconv" @@ -19,16 +20,22 @@ type Redis struct { protoWriter *proto.Writer } -func NewRedisClient(address string, username string, password string, Tls bool) *Redis { +func NewRedisClient(ctx context.Context, address string, username string, password string, Tls bool) *Redis { r := new(Redis) var conn net.Conn - var dialer net.Dialer + var dialer = &net.Dialer{ + Timeout: 5 * time.Minute, + KeepAlive: 5 * time.Minute, + } var err error - dialer.Timeout = 3 * time.Second if Tls { - conn, err = tls.DialWithDialer(&dialer, "tcp", address, &tls.Config{InsecureSkipVerify: true}) + tlsDialer := &tls.Dialer{ + NetDialer: dialer, + Config: &tls.Config{InsecureSkipVerify: true}, + } + conn, err = tlsDialer.DialContext(ctx, "tcp", address) } else { - conn, err = dialer.Dial("tcp", address) + conn, err = dialer.DialContext(ctx, "tcp", address) } if err != nil { log.Panicf("dial failed. address=[%s], tls=[%v], err=[%v]", address, Tls, err) diff --git a/internal/reader/scan_cluster_reader.go b/internal/reader/scan_cluster_reader.go index dcb53788..42f6f1e6 100644 --- a/internal/reader/scan_cluster_reader.go +++ b/internal/reader/scan_cluster_reader.go @@ -14,14 +14,14 @@ type scanClusterReader struct { statusId int } -func NewScanClusterReader(opts *ScanReaderOptions) Reader { - addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica) +func NewScanClusterReader(ctx context.Context, opts *ScanReaderOptions) Reader { + addresses, _ := utils.GetRedisClusterNodes(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica) rd := &scanClusterReader{} for _, address := range addresses { theOpts := *opts theOpts.Address = address - rd.readers = append(rd.readers, NewScanStandaloneReader(&theOpts)) + rd.readers = append(rd.readers, NewScanStandaloneReader(ctx, &theOpts)) } return rd } diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index 2e103b84..aca1566a 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -50,10 +50,10 @@ type scanStandaloneReader struct { } } -func NewScanStandaloneReader(opts *ScanReaderOptions) Reader { +func NewScanStandaloneReader(ctx context.Context, opts *ScanReaderOptions) Reader { r := new(scanStandaloneReader) // dbs - c := client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls) + c := client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls) if c.IsCluster() { // not use opts.Cluster, because user may use standalone mode to scan a cluster node r.dbs = []int{0} } else { @@ -82,7 +82,7 @@ func (r *scanStandaloneReader) subscript() { if !r.opts.KSN { return } - c := client.NewRedisClient(r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) + c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) c.Send("psubscribe", "__keyevent@*__:*") go func() { @@ -114,7 +114,7 @@ func (r *scanStandaloneReader) subscript() { } func (r *scanStandaloneReader) scan() { - c := client.NewRedisClient(r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) + c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) defer c.Close() for _, dbId := range r.dbs { if dbId != 0 { @@ -150,7 +150,7 @@ func (r *scanStandaloneReader) scan() { func (r *scanStandaloneReader) fetch() { nowDbId := 0 - c := client.NewRedisClient(r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) + c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) defer c.Close() for item := range r.keyQueue.Ch { r.stat.NeedUpdateCount = int64(r.keyQueue.Len()) diff --git a/internal/reader/sync_cluster_reader.go b/internal/reader/sync_cluster_reader.go index 18c4935a..89986dad 100644 --- a/internal/reader/sync_cluster_reader.go +++ b/internal/reader/sync_cluster_reader.go @@ -15,8 +15,8 @@ type syncClusterReader struct { statusId int } -func NewSyncClusterReader(opts *SyncReaderOptions) Reader { - addresses, _ := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica) +func NewSyncClusterReader(ctx context.Context, opts *SyncReaderOptions) Reader { + addresses, _ := utils.GetRedisClusterNodes(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica) log.Debugf("get redis cluster nodes:") for _, address := range addresses { log.Debugf("%s", address) @@ -25,7 +25,7 @@ func NewSyncClusterReader(opts *SyncReaderOptions) Reader { for _, address := range addresses { theOpts := *opts theOpts.Address = address - rd.readers = append(rd.readers, NewSyncStandaloneReader(&theOpts)) + rd.readers = append(rd.readers, NewSyncStandaloneReader(ctx, &theOpts)) } return rd } diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index b78c5639..8c2eae17 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -1,8 +1,8 @@ package reader import ( - "context" "bufio" + "context" "fmt" "io" "os" @@ -77,10 +77,10 @@ type syncStandaloneReader struct { } } -func NewSyncStandaloneReader(opts *SyncReaderOptions) Reader { +func NewSyncStandaloneReader(ctx context.Context, opts *SyncReaderOptions) Reader { r := new(syncStandaloneReader) r.opts = opts - r.client = client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls) + r.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls) r.rd = r.client.BufioReader() r.stat.Name = "reader_" + strings.Replace(opts.Address, ":", "_", -1) r.stat.Address = opts.Address diff --git a/internal/utils/cluster_nodes.go b/internal/utils/cluster_nodes.go index be909ef3..cf33b207 100644 --- a/internal/utils/cluster_nodes.go +++ b/internal/utils/cluster_nodes.go @@ -1,6 +1,7 @@ package utils import ( + "context" "fmt" "strconv" "strings" @@ -9,8 +10,8 @@ import ( "RedisShake/internal/log" ) -func GetRedisClusterNodes(address string, username string, password string, Tls bool, perferReplica bool) (addresses []string, slots [][]int) { - c := client.NewRedisClient(address, username, password, Tls) +func GetRedisClusterNodes(ctx context.Context, address string, username string, password string, Tls bool, perferReplica bool) (addresses []string, slots [][]int) { + c := client.NewRedisClient(ctx, address, username, password, Tls) reply := c.DoWithStringReply("cluster", "nodes") reply = strings.TrimSpace(reply) slotsCount := 0 diff --git a/internal/writer/redis_cluster_writer.go b/internal/writer/redis_cluster_writer.go index e5d17dab..1fc22b54 100644 --- a/internal/writer/redis_cluster_writer.go +++ b/internal/writer/redis_cluster_writer.go @@ -1,6 +1,8 @@ package writer import ( + "context" + "RedisShake/internal/entry" "RedisShake/internal/log" "RedisShake/internal/utils" @@ -16,9 +18,9 @@ type RedisClusterWriter struct { stat []interface{} } -func NewRedisClusterWriter(opts *RedisWriterOptions) Writer { +func NewRedisClusterWriter(ctx context.Context, opts *RedisWriterOptions) Writer { rw := new(RedisClusterWriter) - rw.loadClusterNodes(opts) + rw.loadClusterNodes(ctx, opts) log.Infof("redisClusterWriter connected to redis cluster successful. addresses=%v", rw.addresses) return rw } @@ -29,13 +31,13 @@ func (r *RedisClusterWriter) Close() { } } -func (r *RedisClusterWriter) loadClusterNodes(opts *RedisWriterOptions) { - addresses, slots := utils.GetRedisClusterNodes(opts.Address, opts.Username, opts.Password, opts.Tls, false) +func (r *RedisClusterWriter) loadClusterNodes(ctx context.Context, opts *RedisWriterOptions) { + addresses, slots := utils.GetRedisClusterNodes(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, false) r.addresses = addresses for i, address := range addresses { theOpts := *opts theOpts.Address = address - redisWriter := NewRedisStandaloneWriter(&theOpts) + redisWriter := NewRedisStandaloneWriter(ctx, &theOpts) r.writers = append(r.writers, redisWriter) for _, s := range slots[i] { if r.router[s] != nil { diff --git a/internal/writer/redis_standalone_writer.go b/internal/writer/redis_standalone_writer.go index 4e636a4a..8fc25fe2 100644 --- a/internal/writer/redis_standalone_writer.go +++ b/internal/writer/redis_standalone_writer.go @@ -1,6 +1,7 @@ package writer import ( + "context" "fmt" "strconv" "strings" @@ -40,11 +41,11 @@ type redisStandaloneWriter struct { } } -func NewRedisStandaloneWriter(opts *RedisWriterOptions) Writer { +func NewRedisStandaloneWriter(ctx context.Context, opts *RedisWriterOptions) Writer { rw := new(redisStandaloneWriter) rw.address = opts.Address rw.stat.Name = "writer_" + strings.Replace(opts.Address, ":", "_", -1) - rw.client = client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls) + rw.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls) if opts.OffReply { log.Infof("turn off the reply of write") rw.offReply = true