Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support parallel architecture #847

Merged
merged 4 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 77 additions & 23 deletions cmd/redis-shake/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
_ "net/http/pprof"
"os"
"os/signal"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -116,41 +117,94 @@ func main() {
default:
log.Panicf("no writer config entry found")
}

// create status
status.Init(theReader, theWriter)
if config.Opt.Advanced.StatusPort != 0 {
status.Init(theReader, theWriter)
}
// create log entry count
logEntryCount := status.EntryCount{
ReadCount: 0,
WriteCount: 0,
}

log.Infof("start syncing...")

ch := theReader.StartRead(ctx)
go waitShutdown(cancel)

chrs := theReader.StartRead(ctx)

theWriter.StartWrite(ctx)

readerDone := make(chan bool)

for _, chr := range chrs {
go func(ch chan *entry.Entry) {
for e := range ch {
// calc arguments
e.Parse()

// update reader status
if config.Opt.Advanced.StatusPort != 0 {
status.AddReadCount(e.CmdName)
}
// update log entry count
atomic.AddUint64(&logEntryCount.ReadCount, 1)

// filter
if !filter.Filter(e) {
log.Debugf("skip command: %v", e)
continue
}

// run lua function
log.Debugf("function before: %v", e)
entries := luaRuntime.RunFunction(e)
log.Debugf("function after: %v", entries)

// write
for _, theEntry := range entries {
theEntry.Parse()
theWriter.Write(theEntry)

// update writer status
if config.Opt.Advanced.StatusPort != 0 {
status.AddWriteCount(theEntry.CmdName)
}
// update log entry count
atomic.AddUint64(&logEntryCount.WriteCount, 1)
}
}
readerDone <- true
}(chr)
}

// caluate ops and log to screen
go func() {
if config.Opt.Advanced.LogInterval <= 0 {
log.Infof("log interval is 0, will not log to screen")
return
}
ticker := time.NewTicker(time.Duration(config.Opt.Advanced.LogInterval) * time.Second)
defer ticker.Stop()
for range ticker.C {
logEntryCount.UpdateOPS()
log.Infof("%s, %s", logEntryCount.String(), theReader.StatusString())
}
}()

ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
readerCnt := len(chrs)
Loop:
for {
select {
case e, ok := <-ch:
if !ok {
// ch has been closed, exit the loop
break Loop
}
// calc arguments
e.Parse()
status.AddReadCount(e.CmdName)

// filter
if !filter.Filter(e) {
log.Debugf("skip command: %v", e)
continue
case done := <-readerDone:
if done {
readerCnt--
}
log.Debugf("function before: %v", e)
entries := luaRuntime.RunFunction(e)
log.Debugf("function after: %v", entries)

for _, theEntry := range entries {
theEntry.Parse()
theWriter.Write(theEntry)
status.AddWriteCount(theEntry.CmdName)
if readerCnt == 0 {
break Loop
}
case <-ticker.C:
pingEntry := entry.NewEntry()
Expand Down
4 changes: 2 additions & 2 deletions internal/reader/aof_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewAOFReader(opts *AOFReaderOptions) Reader {
return r
}

func (r *aofReader) StartRead(ctx context.Context) chan *entry.Entry {
func (r *aofReader) StartRead(ctx context.Context) []chan *entry.Entry {
//init entry
r.ch = make(chan *entry.Entry, 1024)

Expand Down Expand Up @@ -101,5 +101,5 @@ func (r *aofReader) StartRead(ctx context.Context) chan *entry.Entry {

}()

return r.ch
return []chan *entry.Entry{r.ch}
}
4 changes: 2 additions & 2 deletions internal/reader/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ import (

type Reader interface {
status.Statusable
StartRead(ctx context.Context) chan *entry.Entry
}
StartRead(ctx context.Context) []chan *entry.Entry
}
5 changes: 3 additions & 2 deletions internal/reader/rdb_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"RedisShake/internal/log"
"RedisShake/internal/rdb"
"RedisShake/internal/utils"

"github.com/dustin/go-humanize"
)

Expand Down Expand Up @@ -41,7 +42,7 @@ func NewRDBReader(opts *RdbReaderOptions) Reader {
return r
}

func (r *rdbReader) StartRead(ctx context.Context) chan *entry.Entry {
func (r *rdbReader) StartRead(ctx context.Context) []chan *entry.Entry {
log.Infof("[%s] start read", r.stat.Name)
r.ch = make(chan *entry.Entry, 1024)
updateFunc := func(offset int64) {
Expand All @@ -58,7 +59,7 @@ func (r *rdbReader) StartRead(ctx context.Context) chan *entry.Entry {
close(r.ch)
}()

return r.ch
return []chan *entry.Entry{r.ch}
}

func (r *rdbReader) Status() interface{} {
Expand Down
21 changes: 5 additions & 16 deletions internal/reader/scan_cluster_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package reader
import (
"context"
"fmt"
"sync"

"RedisShake/internal/entry"
"RedisShake/internal/utils"
Expand All @@ -26,23 +25,13 @@ func NewScanClusterReader(ctx context.Context, opts *ScanReaderOptions) Reader {
return rd
}

func (rd *scanClusterReader) StartRead(ctx context.Context) chan *entry.Entry {
ch := make(chan *entry.Entry, 1024)
var wg sync.WaitGroup
func (rd *scanClusterReader) StartRead(ctx context.Context) []chan *entry.Entry {
chs := make([]chan *entry.Entry, 0)
for _, r := range rd.readers {
wg.Add(1)
go func(r Reader) {
for e := range r.StartRead(ctx) {
ch <- e
}
wg.Done()
}(r)
ch := r.StartRead(ctx)
chs = append(chs, ch[0])
}
go func() {
wg.Wait()
close(ch)
}()
return ch
return chs
}

func (rd *scanClusterReader) Status() interface{} {
Expand Down
4 changes: 2 additions & 2 deletions internal/reader/scan_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func NewScanStandaloneReader(ctx context.Context, opts *ScanReaderOptions) Reade
return r
}

func (r *scanStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry {
func (r *scanStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entry {
r.ctx = ctx
if r.opts.Scan {
go r.scan()
Expand All @@ -95,7 +95,7 @@ func (r *scanStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry
}
go r.dump()
go r.restore()
return r.ch
return []chan *entry.Entry{r.ch}
}

func (r *scanStandaloneReader) subscript() {
Expand Down
21 changes: 5 additions & 16 deletions internal/reader/sync_cluster_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package reader
import (
"context"
"fmt"
"sync"

"RedisShake/internal/entry"
"RedisShake/internal/log"
Expand All @@ -30,23 +29,13 @@ func NewSyncClusterReader(ctx context.Context, opts *SyncReaderOptions) Reader {
return rd
}

func (rd *syncClusterReader) StartRead(ctx context.Context) chan *entry.Entry {
ch := make(chan *entry.Entry, 1024)
var wg sync.WaitGroup
func (rd *syncClusterReader) StartRead(ctx context.Context) []chan *entry.Entry {
chs := make([]chan *entry.Entry, 0)
for _, r := range rd.readers {
wg.Add(1)
go func(r Reader) {
defer wg.Done()
for e := range r.StartRead(ctx) {
ch <- e
}
}(r)
ch := r.StartRead(ctx)
chs = append(chs, ch[0])
}
go func() {
wg.Wait()
close(ch)
}()
return ch
return chs
}

func (rd *syncClusterReader) Status() interface{} {
Expand Down
4 changes: 2 additions & 2 deletions internal/reader/sync_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func NewSyncStandaloneReader(ctx context.Context, opts *SyncReaderOptions) Reade
return r
}

func (r *syncStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry {
func (r *syncStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entry {
r.ctx = ctx
r.ch = make(chan *entry.Entry, 1024)
go func() {
Expand All @@ -113,7 +113,7 @@ func (r *syncStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry
close(r.ch)
}()

return r.ch
return []chan *entry.Entry{r.ch}
}

func (r *syncStandaloneReader) sendReplconfListenPort() {
Expand Down
2 changes: 1 addition & 1 deletion internal/status/entry_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type EntryCount struct {
}

// call this function every second
func (e *EntryCount) updateOPS() {
func (e *EntryCount) UpdateOPS() {
nowTimestampSec := float64(time.Now().UnixNano()) / 1e9
if e.lastUpdateTimestampSec != 0 {
timeIntervalSec := nowTimestampSec - e.lastUpdateTimestampSec
Expand Down
33 changes: 7 additions & 26 deletions internal/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package status

import (
"time"

"RedisShake/internal/config"
"RedisShake/internal/log"
)

type Statusable interface {
Expand Down Expand Up @@ -32,9 +29,6 @@ var theWriter Statusable

func AddReadCount(cmd string) {
ch <- func() {
if stat.PerCmdEntriesCount == nil {
stat.PerCmdEntriesCount = make(map[string]EntryCount)
}
cmdEntryCount, ok := stat.PerCmdEntriesCount[cmd]
if !ok {
cmdEntryCount = EntryCount{}
Expand All @@ -48,9 +42,6 @@ func AddReadCount(cmd string) {

func AddWriteCount(cmd string) {
ch <- func() {
if stat.PerCmdEntriesCount == nil {
stat.PerCmdEntriesCount = make(map[string]EntryCount)
}
cmdEntryCount, ok := stat.PerCmdEntriesCount[cmd]
if !ok {
cmdEntryCount = EntryCount{}
Expand All @@ -68,6 +59,11 @@ func Init(r Statusable, w Statusable) {
setStatusPort()
stat.Time = time.Now().Format("2006-01-02 15:04:05")

// init per cmd entries count
if stat.PerCmdEntriesCount == nil {
stat.PerCmdEntriesCount = make(map[string]EntryCount)
}

// for update reader/writer stat
go func() {
ticker := time.NewTicker(1 * time.Second)
Expand All @@ -81,29 +77,14 @@ func Init(r Statusable, w Statusable) {
stat.Consistent = lastConsistent && theReader.StatusConsistent() && theWriter.StatusConsistent()
lastConsistent = stat.Consistent
// update OPS
stat.TotalEntriesCount.updateOPS()
stat.TotalEntriesCount.UpdateOPS()
for _, cmdEntryCount := range stat.PerCmdEntriesCount {
cmdEntryCount.updateOPS()
cmdEntryCount.UpdateOPS()
}
}
}
}()

// for log to screen
go func() {
if config.Opt.Advanced.LogInterval <= 0 {
log.Infof("log interval is 0, will not log to screen")
return
}
ticker := time.NewTicker(time.Duration(config.Opt.Advanced.LogInterval) * time.Second)
defer ticker.Stop()
for range ticker.C {
ch <- func() {
log.Infof("%s, %s", stat.TotalEntriesCount.String(), theReader.StatusString())
}
}
}()

// run all func in ch
go func() {
for f := range ch {
Expand Down
2 changes: 2 additions & 0 deletions internal/writer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package writer
import (
"RedisShake/internal/entry"
"RedisShake/internal/status"
"context"
)

type Writer interface {
status.Statusable
Write(entry *entry.Entry)
StartWrite(ctx context.Context) (ch chan *entry.Entry)
Close()
}
Loading
Loading