From b7b332b319873ec70b553e59bbee566028972ec2 Mon Sep 17 00:00:00 2001 From: OxalisCu <2127298698@qq.com> Date: Tue, 24 Sep 2024 12:49:30 +0800 Subject: [PATCH] feat: parallel parse, filter, function Signed-off-by: OxalisCu <2127298698@qq.com> --- cmd/redis-shake/main.go | 62 ++++++++++++++--------- internal/reader/aof_reader.go | 4 +- internal/reader/interface.go | 4 +- internal/reader/rdb_reader.go | 5 +- internal/reader/scan_cluster_reader.go | 21 ++------ internal/reader/scan_standalone_reader.go | 4 +- internal/reader/sync_cluster_reader.go | 21 ++------ internal/reader/sync_standalone_reader.go | 4 +- 8 files changed, 60 insertions(+), 65 deletions(-) diff --git a/cmd/redis-shake/main.go b/cmd/redis-shake/main.go index 2e35afc7..9d36eca0 100644 --- a/cmd/redis-shake/main.go +++ b/cmd/redis-shake/main.go @@ -121,39 +121,55 @@ func main() { log.Infof("start syncing...") - chr := theReader.StartRead(ctx) + go waitShutdown(cancel) + + chrs := theReader.StartRead(ctx) theWriter.StartWrite(ctx) - go waitShutdown(cancel) + readerDone := make(chan bool) + + for _, chr := range chrs { + go func(ch chan *entry.Entry) { + for e := range ch { + // calc arguments + e.Parse() + status.AddReadCount(e.CmdName) + + // filter + if !filter.Filter(e) { + log.Debugf("skip command: %v", e) + continue + } + + // run lua function + log.Debugf("function before: %v", e) + entries := luaRuntime.RunFunction(e) + log.Debugf("function after: %v", entries) + + // write + for _, theEntry := range entries { + theEntry.Parse() + theWriter.Write(theEntry) + status.AddWriteCount(theEntry.CmdName) + } + } + readerDone <- true + }(chr) + } ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() + readerCnt := len(chrs) Loop: for { select { - case e, ok := <-chr: - if !ok { - // ch has been closed, exit the loop - break Loop + case done := <-readerDone: + if done { + readerCnt-- } - // calc arguments - e.Parse() - status.AddReadCount(e.CmdName) - - // filter - if !filter.Filter(e) { - log.Debugf("skip command: %v", e) - continue - } - log.Debugf("function before: %v", e) - entries := luaRuntime.RunFunction(e) - log.Debugf("function after: %v", entries) - - for _, theEntry := range entries { - theEntry.Parse() - theWriter.Write(theEntry) - status.AddWriteCount(theEntry.CmdName) + if readerCnt == 0 { + break Loop } case <-ticker.C: pingEntry := entry.NewEntry() diff --git a/internal/reader/aof_reader.go b/internal/reader/aof_reader.go index d21d1443..ea3a5fb6 100644 --- a/internal/reader/aof_reader.go +++ b/internal/reader/aof_reader.go @@ -66,7 +66,7 @@ func NewAOFReader(opts *AOFReaderOptions) Reader { return r } -func (r *aofReader) StartRead(ctx context.Context) chan *entry.Entry { +func (r *aofReader) StartRead(ctx context.Context) []chan *entry.Entry { //init entry r.ch = make(chan *entry.Entry, 1024) @@ -101,5 +101,5 @@ func (r *aofReader) StartRead(ctx context.Context) chan *entry.Entry { }() - return r.ch + return []chan *entry.Entry{r.ch} } diff --git a/internal/reader/interface.go b/internal/reader/interface.go index 9a84104d..5502bf3e 100644 --- a/internal/reader/interface.go +++ b/internal/reader/interface.go @@ -8,5 +8,5 @@ import ( type Reader interface { status.Statusable - StartRead(ctx context.Context) chan *entry.Entry -} \ No newline at end of file + StartRead(ctx context.Context) []chan *entry.Entry +} diff --git a/internal/reader/rdb_reader.go b/internal/reader/rdb_reader.go index f99b49ce..63569e3f 100644 --- a/internal/reader/rdb_reader.go +++ b/internal/reader/rdb_reader.go @@ -8,6 +8,7 @@ import ( "RedisShake/internal/log" "RedisShake/internal/rdb" "RedisShake/internal/utils" + "github.com/dustin/go-humanize" ) @@ -41,7 +42,7 @@ func NewRDBReader(opts *RdbReaderOptions) Reader { return r } -func (r *rdbReader) StartRead(ctx context.Context) chan *entry.Entry { +func (r *rdbReader) StartRead(ctx context.Context) []chan *entry.Entry { log.Infof("[%s] start read", r.stat.Name) r.ch = make(chan *entry.Entry, 1024) updateFunc := func(offset int64) { @@ -58,7 +59,7 @@ func (r *rdbReader) StartRead(ctx context.Context) chan *entry.Entry { close(r.ch) }() - return r.ch + return []chan *entry.Entry{r.ch} } func (r *rdbReader) Status() interface{} { diff --git a/internal/reader/scan_cluster_reader.go b/internal/reader/scan_cluster_reader.go index 42f6f1e6..4b2b59af 100644 --- a/internal/reader/scan_cluster_reader.go +++ b/internal/reader/scan_cluster_reader.go @@ -3,7 +3,6 @@ package reader import ( "context" "fmt" - "sync" "RedisShake/internal/entry" "RedisShake/internal/utils" @@ -26,23 +25,13 @@ func NewScanClusterReader(ctx context.Context, opts *ScanReaderOptions) Reader { return rd } -func (rd *scanClusterReader) StartRead(ctx context.Context) chan *entry.Entry { - ch := make(chan *entry.Entry, 1024) - var wg sync.WaitGroup +func (rd *scanClusterReader) StartRead(ctx context.Context) []chan *entry.Entry { + chs := make([]chan *entry.Entry, 0) for _, r := range rd.readers { - wg.Add(1) - go func(r Reader) { - for e := range r.StartRead(ctx) { - ch <- e - } - wg.Done() - }(r) + ch := r.StartRead(ctx) + chs = append(chs, ch[0]) } - go func() { - wg.Wait() - close(ch) - }() - return ch + return chs } func (rd *scanClusterReader) Status() interface{} { diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index a4bea7d2..355edc34 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -85,7 +85,7 @@ func NewScanStandaloneReader(ctx context.Context, opts *ScanReaderOptions) Reade return r } -func (r *scanStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry { +func (r *scanStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entry { r.ctx = ctx if r.opts.Scan { go r.scan() @@ -95,7 +95,7 @@ func (r *scanStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry } go r.dump() go r.restore() - return r.ch + return []chan *entry.Entry{r.ch} } func (r *scanStandaloneReader) subscript() { diff --git a/internal/reader/sync_cluster_reader.go b/internal/reader/sync_cluster_reader.go index 89986dad..c30d2db8 100644 --- a/internal/reader/sync_cluster_reader.go +++ b/internal/reader/sync_cluster_reader.go @@ -3,7 +3,6 @@ package reader import ( "context" "fmt" - "sync" "RedisShake/internal/entry" "RedisShake/internal/log" @@ -30,23 +29,13 @@ func NewSyncClusterReader(ctx context.Context, opts *SyncReaderOptions) Reader { return rd } -func (rd *syncClusterReader) StartRead(ctx context.Context) chan *entry.Entry { - ch := make(chan *entry.Entry, 1024) - var wg sync.WaitGroup +func (rd *syncClusterReader) StartRead(ctx context.Context) []chan *entry.Entry { + chs := make([]chan *entry.Entry, 0) for _, r := range rd.readers { - wg.Add(1) - go func(r Reader) { - defer wg.Done() - for e := range r.StartRead(ctx) { - ch <- e - } - }(r) + ch := r.StartRead(ctx) + chs = append(chs, ch[0]) } - go func() { - wg.Wait() - close(ch) - }() - return ch + return chs } func (rd *syncClusterReader) Status() interface{} { diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index 77542f7d..27b83293 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -93,7 +93,7 @@ func NewSyncStandaloneReader(ctx context.Context, opts *SyncReaderOptions) Reade return r } -func (r *syncStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry { +func (r *syncStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entry { r.ctx = ctx r.ch = make(chan *entry.Entry, 1024) go func() { @@ -113,7 +113,7 @@ func (r *syncStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry close(r.ch) }() - return r.ch + return []chan *entry.Entry{r.ch} } func (r *syncStandaloneReader) sendReplconfListenPort() {