Skip to content

Commit

Permalink
feat: support parallerl writer in sync & scan mode
Browse files Browse the repository at this point in the history
  • Loading branch information
OxalisCu committed Sep 17, 2024
1 parent 94d36b0 commit 245d6b5
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 40 deletions.
7 changes: 5 additions & 2 deletions cmd/redis-shake/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,18 @@ func main() {

log.Infof("start syncing...")

ch := theReader.StartRead(ctx)
chr := theReader.StartRead(ctx)

theWriter.StartWrite(ctx)

go waitShutdown(cancel)

ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
Loop:
for {
select {
case e, ok := <-ch:
case e, ok := <-chr:
if !ok {
// ch has been closed, exit the loop
break Loop
Expand Down
2 changes: 2 additions & 0 deletions internal/writer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package writer
import (
"RedisShake/internal/entry"
"RedisShake/internal/status"
"context"
)

type Writer interface {
status.Statusable
Write(entry *entry.Entry)
StartWrite(ctx context.Context) (ch chan *entry.Entry)
Close()
}
61 changes: 44 additions & 17 deletions internal/writer/redis_cluster_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package writer

import (
"context"
"sync"

"RedisShake/internal/entry"
"RedisShake/internal/log"
Expand All @@ -14,18 +15,22 @@ type RedisClusterWriter struct {
addresses []string
writers []Writer
router [KeySlots]Writer

stat []interface{}
ch chan *entry.Entry
chWg sync.WaitGroup
stat []interface{}
}

func NewRedisClusterWriter(ctx context.Context, opts *RedisWriterOptions) Writer {
rw := new(RedisClusterWriter)
rw.loadClusterNodes(ctx, opts)
rw.ch = make(chan *entry.Entry, 1024)
log.Infof("redisClusterWriter connected to redis cluster successful. addresses=%v", rw.addresses)
return rw
}

func (r *RedisClusterWriter) Close() {
r.chWg.Wait()
close(r.ch)
for _, writer := range r.writers {
writer.Close()
}
Expand Down Expand Up @@ -54,24 +59,46 @@ func (r *RedisClusterWriter) loadClusterNodes(ctx context.Context, opts *RedisWr
}
}

func (r *RedisClusterWriter) Write(entry *entry.Entry) {
if len(entry.Slots) == 0 {
for _, writer := range r.writers {
writer.Write(entry)
}
return
func (r *RedisClusterWriter) StartWrite(ctx context.Context) chan *entry.Entry {
chs := make(map[string]chan *entry.Entry, len(r.writers))
for _, w := range r.writers {
stat := w.Status().(struct {
Name string `json:"name"`
UnansweredBytes int64 `json:"unanswered_bytes"`
UnansweredEntries int64 `json:"unanswered_entries"`
})
chs[stat.Name] = w.StartWrite(ctx)
}

lastSlot := -1
for _, slot := range entry.Slots {
if lastSlot == -1 {
lastSlot = slot
}
if slot != lastSlot {
log.Panicf("CROSSSLOT Keys in request don't hash to the same slot. argv=%v", entry.Argv)
r.chWg = sync.WaitGroup{}
r.chWg.Add(1)
go func() {
for entry := range r.ch {
if len(entry.Slots) == 0 {
for _, writer := range r.writers {
writer.Write(entry)
}
continue
}
lastSlot := -1
for _, slot := range entry.Slots {
if lastSlot == -1 {
lastSlot = slot
}
if slot != lastSlot {
log.Panicf("CROSSSLOT Keys in request don't hash to the same slot. argv=%v", entry.Argv)
}
}
r.router[lastSlot].Write(entry)
}
}
r.router[lastSlot].Write(entry)
r.chWg.Done()
}()

return r.ch
}

func (r *RedisClusterWriter) Write(entry *entry.Entry) {
r.ch <- entry
}

func (r *RedisClusterWriter) Consistent() bool {
Expand Down
59 changes: 38 additions & 21 deletions internal/writer/redis_standalone_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ type redisStandaloneWriter struct {
DbId int

chWaitReply chan *entry.Entry
chWg sync.WaitGroup
chWaitWg sync.WaitGroup
offReply bool
ch chan *entry.Entry
chWg sync.WaitGroup

stat struct {
Name string `json:"name"`
Expand All @@ -49,43 +51,58 @@ func NewRedisStandaloneWriter(ctx context.Context, opts *RedisWriterOptions) Wri
rw.address = opts.Address
rw.stat.Name = "writer_" + strings.Replace(opts.Address, ":", "_", -1)
rw.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, false)
rw.ch = make(chan *entry.Entry, 1024)
if opts.OffReply {
log.Infof("turn off the reply of write")
rw.offReply = true
rw.client.Send("CLIENT", "REPLY", "OFF")
} else {
rw.chWaitReply = make(chan *entry.Entry, config.Opt.Advanced.PipelineCountLimit)
rw.chWg.Add(1)
rw.chWaitWg.Add(1)
go rw.processReply()
}
return rw
}

func (w *redisStandaloneWriter) Close() {
if !w.offReply {
close(w.chWaitReply)
close(w.ch)
w.chWg.Wait()
close(w.chWaitReply)
w.chWaitWg.Wait()
}
}

func (w *redisStandaloneWriter) Write(e *entry.Entry) {
// switch db if we need
if w.DbId != e.DbId {
w.switchDbTo(e.DbId)
}
func (w *redisStandaloneWriter) StartWrite(ctx context.Context) chan *entry.Entry {
w.chWg = sync.WaitGroup{}
w.chWg.Add(1)
go func() {
for e := range w.ch {
// switch db if we need
if w.DbId != e.DbId {
w.switchDbTo(e.DbId)
}
// send
bytes := e.Serialize()
for e.SerializedSize+atomic.LoadInt64(&w.stat.UnansweredBytes) > config.Opt.Advanced.TargetRedisClientMaxQuerybufLen {
time.Sleep(1 * time.Nanosecond)
}
log.Debugf("[%s] send cmd. cmd=[%s]", w.stat.Name, e.String())
if !w.offReply {
w.chWaitReply <- e
atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize)
atomic.AddInt64(&w.stat.UnansweredEntries, 1)
}
w.client.SendBytes(bytes)
}
w.chWg.Done()
}()

// send
bytes := e.Serialize()
for e.SerializedSize+atomic.LoadInt64(&w.stat.UnansweredBytes) > config.Opt.Advanced.TargetRedisClientMaxQuerybufLen {
time.Sleep(1 * time.Nanosecond)
}
log.Debugf("[%s] send cmd. cmd=[%s]", w.stat.Name, e.String())
if !w.offReply {
w.chWaitReply <- e
atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize)
atomic.AddInt64(&w.stat.UnansweredEntries, 1)
}
w.client.SendBytes(bytes)
return w.ch
}

func (w *redisStandaloneWriter) Write(e *entry.Entry) {
w.ch <- e
}

func (w *redisStandaloneWriter) switchDbTo(newDbId int) {
Expand Down Expand Up @@ -124,7 +141,7 @@ func (w *redisStandaloneWriter) processReply() {
atomic.AddInt64(&w.stat.UnansweredBytes, -e.SerializedSize)
atomic.AddInt64(&w.stat.UnansweredEntries, -1)
}
w.chWg.Done()
w.chWaitWg.Done()
}

func (w *redisStandaloneWriter) Status() interface{} {
Expand Down

0 comments on commit 245d6b5

Please sign in to comment.