Skip to content

Commit

Permalink
feature: Add parallel writer support
Browse files Browse the repository at this point in the history
  • Loading branch information
suxb201 committed Dec 22, 2023
1 parent 4e467c2 commit 11c9986
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 78 deletions.
44 changes: 30 additions & 14 deletions cmd/redis-shake/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
_ "net/http/pprof"
"sync"

"RedisShake/internal/config"
"RedisShake/internal/function"
Expand Down Expand Up @@ -102,28 +103,43 @@ func main() {
status.Init(theReader, theWriter)

if config.Opt.Advanced.EmptyDBBeforeSync {
log.Warnf("empty db before sync...")
theWriter.Flush()
}

log.Infof("start syncing...")

ch := theReader.StartRead()
for e := range ch {
// calc arguments
e.Parse()
status.AddReadCount(e.CmdName)
channels := theReader.StartRead()
wg := sync.WaitGroup{}
for _, ch := range channels {
ch := ch
wg.Add(1)
go func() {
for e := range ch {
// calc arguments
e.Parse()
status.AddReadCount(e.CmdName)

// filter
log.Debugf("function before: %v", e)
entries := function.RunFunction(e)
log.Debugf("function after: %v", entries)
// filter
if function.IsFunctionEnabled() {
log.Debugf("function before: %v", e)
entries := function.RunFunction(e)
log.Debugf("function after: %v", entries)

for _, entry := range entries {
entry.Parse()
theWriter.Write(entry)
status.AddWriteCount(entry.CmdName)
}
for _, entry := range entries {
entry.Parse()
theWriter.Write(entry)
status.AddWriteCount(entry.CmdName)
}
} else {
theWriter.Write(e)
status.AddWriteCount(e.CmdName)
}
}
wg.Done()
}()
}
wg.Wait()

theWriter.Close() // Wait for all writing operations to complete
utils.ReleaseFileLock() // Release file lock
Expand Down
4 changes: 4 additions & 0 deletions internal/function/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ func Init() {
}
}

func IsFunctionEnabled() bool {
return len(luaString) != 0
}

// DB
// GROUP
// CMD
Expand Down
4 changes: 2 additions & 2 deletions internal/reader/aof_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewAOFReader(opts *AOFReaderOptions) Reader {
return r
}

func (r *aofReader) StartRead() chan *entry.Entry {
func (r *aofReader) StartRead() []chan *entry.Entry {
//init entry
r.ch = make(chan *entry.Entry, 1024)

Expand Down Expand Up @@ -101,5 +101,5 @@ func (r *aofReader) StartRead() chan *entry.Entry {

}()

return r.ch
return []chan *entry.Entry{r.ch}
}
2 changes: 1 addition & 1 deletion internal/reader/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ import (

type Reader interface {
status.Statusable
StartRead() chan *entry.Entry
StartRead() []chan *entry.Entry
}
4 changes: 2 additions & 2 deletions internal/reader/rdb_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewRDBReader(opts *RdbReaderOptions) Reader {
return r
}

func (r *rdbReader) StartRead() chan *entry.Entry {
func (r *rdbReader) StartRead() []chan *entry.Entry {
log.Infof("[%s] start read", r.stat.Name)
r.ch = make(chan *entry.Entry, 1024)
updateFunc := func(offset int64) {
Expand All @@ -58,7 +58,7 @@ func (r *rdbReader) StartRead() chan *entry.Entry {
close(r.ch)
}()

return r.ch
return []chan *entry.Entry{r.ch}
}

func (r *rdbReader) Status() interface{} {
Expand Down
23 changes: 5 additions & 18 deletions internal/reader/scan_cluster_reader.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package reader

import (
"fmt"
"sync"

"RedisShake/internal/entry"
"RedisShake/internal/utils"
"fmt"
)

type scanClusterReader struct {
Expand All @@ -25,23 +23,12 @@ func NewScanClusterReader(opts *ScanReaderOptions) Reader {
return rd
}

func (rd *scanClusterReader) StartRead() chan *entry.Entry {
ch := make(chan *entry.Entry, 1024)
var wg sync.WaitGroup
func (rd *scanClusterReader) StartRead() []chan *entry.Entry {
channels := make([]chan *entry.Entry, 0)
for _, r := range rd.readers {
wg.Add(1)
go func(r Reader) {
for e := range r.StartRead() {
ch <- e
}
wg.Done()
}(r)
channels = append(channels, r.StartRead()...)
}
go func() {
wg.Wait()
close(ch)
}()
return ch
return channels
}

func (rd *scanClusterReader) Status() interface{} {
Expand Down
4 changes: 2 additions & 2 deletions internal/reader/scan_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ func NewScanStandaloneReader(opts *ScanReaderOptions) Reader {
return r
}

func (r *scanStandaloneReader) StartRead() chan *entry.Entry {
func (r *scanStandaloneReader) StartRead() []chan *entry.Entry {
r.subscript()
go r.scan()
go r.fetch()
return r.ch
return []chan *entry.Entry{r.ch}
}

func (r *scanStandaloneReader) subscript() {
Expand Down
23 changes: 5 additions & 18 deletions internal/reader/sync_cluster_reader.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package reader

import (
"fmt"
"sync"

"RedisShake/internal/entry"
"RedisShake/internal/log"
"RedisShake/internal/utils"
"fmt"
)

type syncClusterReader struct {
Expand All @@ -29,23 +27,12 @@ func NewSyncClusterReader(opts *SyncReaderOptions) Reader {
return rd
}

func (rd *syncClusterReader) StartRead() chan *entry.Entry {
ch := make(chan *entry.Entry, 1024)
var wg sync.WaitGroup
func (rd *syncClusterReader) StartRead() []chan *entry.Entry {
channels := make([]chan *entry.Entry, 0)
for _, r := range rd.readers {
wg.Add(1)
go func(r Reader) {
defer wg.Done()
for e := range r.StartRead() {
ch <- e
}
}(r)
channels = append(channels, r.StartRead()...)
}
go func() {
wg.Wait()
close(ch)
}()
return ch
return channels
}

func (rd *syncClusterReader) Status() interface{} {
Expand Down
4 changes: 2 additions & 2 deletions internal/reader/sync_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func NewSyncStandaloneReader(opts *SyncReaderOptions) Reader {
return r
}

func (r *syncStandaloneReader) StartRead() chan *entry.Entry {
func (r *syncStandaloneReader) StartRead() []chan *entry.Entry {
r.ch = make(chan *entry.Entry, 1024)
go func() {
r.sendReplconfListenPort()
Expand All @@ -106,7 +106,7 @@ func (r *syncStandaloneReader) StartRead() chan *entry.Entry {
close(r.ch)
}()

return r.ch
return []chan *entry.Entry{r.ch}
}

func (r *syncStandaloneReader) sendReplconfListenPort() {
Expand Down
48 changes: 29 additions & 19 deletions internal/writer/redis_standalone_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type redisStandaloneWriter struct {
client *client.Redis
DbId int

chWaitSend chan *entry.Entry
chWaitReply chan *entry.Entry
chWg sync.WaitGroup
offReply bool
Expand All @@ -54,6 +55,9 @@ func NewRedisStandaloneWriter(opts *RedisWriterOptions) Writer {
rw.chWg.Add(1)
go rw.processReply()
}
rw.chWaitSend = make(chan *entry.Entry, 1024)
rw.chWg.Add(1)
go rw.doWrite()
return rw
}

Expand All @@ -65,30 +69,36 @@ func (w *redisStandaloneWriter) Flush() {
}

func (w *redisStandaloneWriter) Close() {
if !w.offReply {
close(w.chWaitReply)
w.chWg.Wait()
}
close(w.chWaitSend)
close(w.chWaitReply)
w.chWg.Wait()
}

func (w *redisStandaloneWriter) Write(e *entry.Entry) {
// switch db if we need
if w.DbId != e.DbId {
w.switchDbTo(e.DbId)
}
w.chWaitSend <- e
}

// send
bytes := e.Serialize()
for e.SerializedSize+atomic.LoadInt64(&w.stat.UnansweredBytes) > config.Opt.Advanced.TargetRedisClientMaxQuerybufLen {
time.Sleep(1 * time.Nanosecond)
}
log.Debugf("[%s] send cmd. cmd=[%s]", w.stat.Name, e.String())
if !w.offReply {
w.chWaitReply <- e
atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize)
atomic.AddInt64(&w.stat.UnansweredEntries, 1)
func (w *redisStandaloneWriter) doWrite() {
for e := range w.chWaitSend {
// switch db if we need
if w.DbId != e.DbId {
w.switchDbTo(e.DbId)
}

// send
bytes := e.Serialize()
for e.SerializedSize+atomic.LoadInt64(&w.stat.UnansweredBytes) > config.Opt.Advanced.TargetRedisClientMaxQuerybufLen {
time.Sleep(1 * time.Nanosecond)
}
log.Debugf("[%s] send cmd. cmd=[%s]", w.stat.Name, e.String())
if !w.offReply {
w.chWaitReply <- e
atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize)
atomic.AddInt64(&w.stat.UnansweredEntries, 1)
}
w.client.SendBytes(bytes)
}
w.client.SendBytes(bytes)
w.chWg.Done()
}

func (w *redisStandaloneWriter) switchDbTo(newDbId int) {
Expand Down

0 comments on commit 11c9986

Please sign in to comment.