Skip to content

Commit

Permalink
feat: support parallel writer (#847)
Browse files Browse the repository at this point in the history
* feat: support parallerl writer in sync & scan mode

* feat: parallel parse, filter, function

Signed-off-by: OxalisCu <[email protected]>

* feat: parallel entry allocation

Signed-off-by: OxalisCu <[email protected]>

* feat: seperate ops log and status monitoring

---------

Signed-off-by: OxalisCu <[email protected]>
  • Loading branch information
OxalisCu authored Oct 17, 2024
1 parent 0f42759 commit 66ca8e7
Show file tree
Hide file tree
Showing 13 changed files with 167 additions and 116 deletions.
100 changes: 77 additions & 23 deletions cmd/redis-shake/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
_ "net/http/pprof"
"os"
"os/signal"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -116,41 +117,94 @@ func main() {
default:
log.Panicf("no writer config entry found")
}

// create status
status.Init(theReader, theWriter)
if config.Opt.Advanced.StatusPort != 0 {
status.Init(theReader, theWriter)
}
// create log entry count
logEntryCount := status.EntryCount{
ReadCount: 0,
WriteCount: 0,
}

log.Infof("start syncing...")

ch := theReader.StartRead(ctx)
go waitShutdown(cancel)

chrs := theReader.StartRead(ctx)

theWriter.StartWrite(ctx)

readerDone := make(chan bool)

for _, chr := range chrs {
go func(ch chan *entry.Entry) {
for e := range ch {
// calc arguments
e.Parse()

// update reader status
if config.Opt.Advanced.StatusPort != 0 {
status.AddReadCount(e.CmdName)
}
// update log entry count
atomic.AddUint64(&logEntryCount.ReadCount, 1)

// filter
if !filter.Filter(e) {
log.Debugf("skip command: %v", e)
continue
}

// run lua function
log.Debugf("function before: %v", e)
entries := luaRuntime.RunFunction(e)
log.Debugf("function after: %v", entries)

// write
for _, theEntry := range entries {
theEntry.Parse()
theWriter.Write(theEntry)

// update writer status
if config.Opt.Advanced.StatusPort != 0 {
status.AddWriteCount(theEntry.CmdName)
}
// update log entry count
atomic.AddUint64(&logEntryCount.WriteCount, 1)
}
}
readerDone <- true
}(chr)
}

// caluate ops and log to screen
go func() {
if config.Opt.Advanced.LogInterval <= 0 {
log.Infof("log interval is 0, will not log to screen")
return
}
ticker := time.NewTicker(time.Duration(config.Opt.Advanced.LogInterval) * time.Second)
defer ticker.Stop()
for range ticker.C {
logEntryCount.UpdateOPS()
log.Infof("%s, %s", logEntryCount.String(), theReader.StatusString())
}
}()

ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
readerCnt := len(chrs)
Loop:
for {
select {
case e, ok := <-ch:
if !ok {
// ch has been closed, exit the loop
break Loop
}
// calc arguments
e.Parse()
status.AddReadCount(e.CmdName)

// filter
if !filter.Filter(e) {
log.Debugf("skip command: %v", e)
continue
case done := <-readerDone:
if done {
readerCnt--
}
log.Debugf("function before: %v", e)
entries := luaRuntime.RunFunction(e)
log.Debugf("function after: %v", entries)

for _, theEntry := range entries {
theEntry.Parse()
theWriter.Write(theEntry)
status.AddWriteCount(theEntry.CmdName)
if readerCnt == 0 {
break Loop
}
case <-ticker.C:
pingEntry := entry.NewEntry()
Expand Down
4 changes: 2 additions & 2 deletions internal/reader/aof_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewAOFReader(opts *AOFReaderOptions) Reader {
return r
}

func (r *aofReader) StartRead(ctx context.Context) chan *entry.Entry {
func (r *aofReader) StartRead(ctx context.Context) []chan *entry.Entry {
//init entry
r.ch = make(chan *entry.Entry, 1024)

Expand Down Expand Up @@ -101,5 +101,5 @@ func (r *aofReader) StartRead(ctx context.Context) chan *entry.Entry {

}()

return r.ch
return []chan *entry.Entry{r.ch}
}
4 changes: 2 additions & 2 deletions internal/reader/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ import (

type Reader interface {
status.Statusable
StartRead(ctx context.Context) chan *entry.Entry
}
StartRead(ctx context.Context) []chan *entry.Entry
}
5 changes: 3 additions & 2 deletions internal/reader/rdb_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"RedisShake/internal/log"
"RedisShake/internal/rdb"
"RedisShake/internal/utils"

"github.com/dustin/go-humanize"
)

Expand Down Expand Up @@ -41,7 +42,7 @@ func NewRDBReader(opts *RdbReaderOptions) Reader {
return r
}

func (r *rdbReader) StartRead(ctx context.Context) chan *entry.Entry {
func (r *rdbReader) StartRead(ctx context.Context) []chan *entry.Entry {
log.Infof("[%s] start read", r.stat.Name)
r.ch = make(chan *entry.Entry, 1024)
updateFunc := func(offset int64) {
Expand All @@ -58,7 +59,7 @@ func (r *rdbReader) StartRead(ctx context.Context) chan *entry.Entry {
close(r.ch)
}()

return r.ch
return []chan *entry.Entry{r.ch}
}

func (r *rdbReader) Status() interface{} {
Expand Down
21 changes: 5 additions & 16 deletions internal/reader/scan_cluster_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package reader
import (
"context"
"fmt"
"sync"

"RedisShake/internal/entry"
"RedisShake/internal/utils"
Expand All @@ -26,23 +25,13 @@ func NewScanClusterReader(ctx context.Context, opts *ScanReaderOptions) Reader {
return rd
}

func (rd *scanClusterReader) StartRead(ctx context.Context) chan *entry.Entry {
ch := make(chan *entry.Entry, 1024)
var wg sync.WaitGroup
func (rd *scanClusterReader) StartRead(ctx context.Context) []chan *entry.Entry {
chs := make([]chan *entry.Entry, 0)
for _, r := range rd.readers {
wg.Add(1)
go func(r Reader) {
for e := range r.StartRead(ctx) {
ch <- e
}
wg.Done()
}(r)
ch := r.StartRead(ctx)
chs = append(chs, ch[0])
}
go func() {
wg.Wait()
close(ch)
}()
return ch
return chs
}

func (rd *scanClusterReader) Status() interface{} {
Expand Down
4 changes: 2 additions & 2 deletions internal/reader/scan_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func NewScanStandaloneReader(ctx context.Context, opts *ScanReaderOptions) Reade
return r
}

func (r *scanStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry {
func (r *scanStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entry {
r.ctx = ctx
if r.opts.Scan {
go r.scan()
Expand All @@ -95,7 +95,7 @@ func (r *scanStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry
}
go r.dump()
go r.restore()
return r.ch
return []chan *entry.Entry{r.ch}
}

func (r *scanStandaloneReader) subscript() {
Expand Down
21 changes: 5 additions & 16 deletions internal/reader/sync_cluster_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package reader
import (
"context"
"fmt"
"sync"

"RedisShake/internal/entry"
"RedisShake/internal/log"
Expand All @@ -30,23 +29,13 @@ func NewSyncClusterReader(ctx context.Context, opts *SyncReaderOptions) Reader {
return rd
}

func (rd *syncClusterReader) StartRead(ctx context.Context) chan *entry.Entry {
ch := make(chan *entry.Entry, 1024)
var wg sync.WaitGroup
func (rd *syncClusterReader) StartRead(ctx context.Context) []chan *entry.Entry {
chs := make([]chan *entry.Entry, 0)
for _, r := range rd.readers {
wg.Add(1)
go func(r Reader) {
defer wg.Done()
for e := range r.StartRead(ctx) {
ch <- e
}
}(r)
ch := r.StartRead(ctx)
chs = append(chs, ch[0])
}
go func() {
wg.Wait()
close(ch)
}()
return ch
return chs
}

func (rd *syncClusterReader) Status() interface{} {
Expand Down
4 changes: 2 additions & 2 deletions internal/reader/sync_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func NewSyncStandaloneReader(ctx context.Context, opts *SyncReaderOptions) Reade
return r
}

func (r *syncStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry {
func (r *syncStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entry {
r.ctx = ctx
r.ch = make(chan *entry.Entry, 1024)
go func() {
Expand All @@ -113,7 +113,7 @@ func (r *syncStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry
close(r.ch)
}()

return r.ch
return []chan *entry.Entry{r.ch}
}

func (r *syncStandaloneReader) sendReplconfListenPort() {
Expand Down
2 changes: 1 addition & 1 deletion internal/status/entry_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type EntryCount struct {
}

// call this function every second
func (e *EntryCount) updateOPS() {
func (e *EntryCount) UpdateOPS() {
nowTimestampSec := float64(time.Now().UnixNano()) / 1e9
if e.lastUpdateTimestampSec != 0 {
timeIntervalSec := nowTimestampSec - e.lastUpdateTimestampSec
Expand Down
33 changes: 7 additions & 26 deletions internal/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package status

import (
"time"

"RedisShake/internal/config"
"RedisShake/internal/log"
)

type Statusable interface {
Expand Down Expand Up @@ -32,9 +29,6 @@ var theWriter Statusable

func AddReadCount(cmd string) {
ch <- func() {
if stat.PerCmdEntriesCount == nil {
stat.PerCmdEntriesCount = make(map[string]EntryCount)
}
cmdEntryCount, ok := stat.PerCmdEntriesCount[cmd]
if !ok {
cmdEntryCount = EntryCount{}
Expand All @@ -48,9 +42,6 @@ func AddReadCount(cmd string) {

func AddWriteCount(cmd string) {
ch <- func() {
if stat.PerCmdEntriesCount == nil {
stat.PerCmdEntriesCount = make(map[string]EntryCount)
}
cmdEntryCount, ok := stat.PerCmdEntriesCount[cmd]
if !ok {
cmdEntryCount = EntryCount{}
Expand All @@ -68,6 +59,11 @@ func Init(r Statusable, w Statusable) {
setStatusPort()
stat.Time = time.Now().Format("2006-01-02 15:04:05")

// init per cmd entries count
if stat.PerCmdEntriesCount == nil {
stat.PerCmdEntriesCount = make(map[string]EntryCount)
}

// for update reader/writer stat
go func() {
ticker := time.NewTicker(1 * time.Second)
Expand All @@ -81,29 +77,14 @@ func Init(r Statusable, w Statusable) {
stat.Consistent = lastConsistent && theReader.StatusConsistent() && theWriter.StatusConsistent()
lastConsistent = stat.Consistent
// update OPS
stat.TotalEntriesCount.updateOPS()
stat.TotalEntriesCount.UpdateOPS()
for _, cmdEntryCount := range stat.PerCmdEntriesCount {
cmdEntryCount.updateOPS()
cmdEntryCount.UpdateOPS()
}
}
}
}()

// for log to screen
go func() {
if config.Opt.Advanced.LogInterval <= 0 {
log.Infof("log interval is 0, will not log to screen")
return
}
ticker := time.NewTicker(time.Duration(config.Opt.Advanced.LogInterval) * time.Second)
defer ticker.Stop()
for range ticker.C {
ch <- func() {
log.Infof("%s, %s", stat.TotalEntriesCount.String(), theReader.StatusString())
}
}
}()

// run all func in ch
go func() {
for f := range ch {
Expand Down
2 changes: 2 additions & 0 deletions internal/writer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package writer
import (
"RedisShake/internal/entry"
"RedisShake/internal/status"
"context"
)

type Writer interface {
status.Statusable
Write(entry *entry.Entry)
StartWrite(ctx context.Context) (ch chan *entry.Entry)
Close()
}
Loading

0 comments on commit 66ca8e7

Please sign in to comment.