Skip to content

Commit

Permalink
support turn off server reply
Browse files Browse the repository at this point in the history
  • Loading branch information
jjz921024 authored and suxb201 committed Dec 22, 2023
1 parent 6aaab51 commit c160449
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 9 deletions.
3 changes: 3 additions & 0 deletions cmd/redis-shake/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func main() {
if err != nil {
log.Panicf("failed to read the RedisStandaloneWriter config entry. err: %v", err)
}
if opts.OffReply && config.Opt.Advanced.RDBRestoreCommandBehavior == "panic" {
log.Panicf("the RDBRestoreCommandBehavior can't be 'panic' when the server not reply to commands")
}
if opts.Cluster {
theWriter = writer.NewRedisClusterWriter(opts)
log.Infof("create RedisClusterWriter: %v", opts.Address)
Expand Down
28 changes: 20 additions & 8 deletions internal/writer/redis_standalone_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type RedisWriterOptions struct {
Username string `mapstructure:"username" default:""`
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
OffReply bool `mapstructure:"off_reply" default:"false"`
}

type redisStandaloneWriter struct {
Expand All @@ -30,6 +31,7 @@ type redisStandaloneWriter struct {

chWaitReply chan *entry.Entry
chWg sync.WaitGroup
offReply bool

stat struct {
Name string `json:"name"`
Expand All @@ -43,9 +45,15 @@ func NewRedisStandaloneWriter(opts *RedisWriterOptions) Writer {
rw.address = opts.Address
rw.stat.Name = "writer_" + strings.Replace(opts.Address, ":", "_", -1)
rw.client = client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls)
rw.chWaitReply = make(chan *entry.Entry, config.Opt.Advanced.PipelineCountLimit)
rw.chWg.Add(1)
go rw.processReply()
if opts.OffReply {
log.Infof("turn off the reply of write")
rw.offReply = true
rw.client.Send("CLIENT", "REPLY", "OFF")
} else {
rw.chWaitReply = make(chan *entry.Entry, config.Opt.Advanced.PipelineCountLimit)
rw.chWg.Add(1)
go rw.processReply()
}
return rw
}

Expand All @@ -57,8 +65,10 @@ func (w *redisStandaloneWriter) Flush() {
}

func (w *redisStandaloneWriter) Close() {
close(w.chWaitReply)
w.chWg.Wait()
if !w.offReply {
close(w.chWaitReply)
w.chWg.Wait()
}
}

func (w *redisStandaloneWriter) Write(e *entry.Entry) {
Expand All @@ -73,9 +83,11 @@ func (w *redisStandaloneWriter) Write(e *entry.Entry) {
time.Sleep(1 * time.Nanosecond)
}
log.Debugf("[%s] send cmd. cmd=[%s]", w.stat.Name, e.String())
w.chWaitReply <- e
atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize)
atomic.AddInt64(&w.stat.UnansweredEntries, 1)
if !w.offReply {
w.chWaitReply <- e
atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize)
atomic.AddInt64(&w.stat.UnansweredEntries, 1)
}
w.client.SendBytes(bytes)
}

Expand Down
2 changes: 1 addition & 1 deletion shake.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ address = "127.0.0.1:6380" # when cluster is true, set address to one of the clu
username = "" # keep empty if not using ACL
password = "" # keep empty if no authentication is required
tls = false

off_reply = false # ture off the server reply

[advanced]
dir = "data"
Expand Down

0 comments on commit c160449

Please sign in to comment.