Skip to content

Commit

Permalink
feature: Add support for function
Browse files Browse the repository at this point in the history
  • Loading branch information
suxb201 committed Aug 17, 2023
1 parent acd3e2e commit 945819f
Show file tree
Hide file tree
Showing 15 changed files with 191 additions and 187 deletions.
4 changes: 2 additions & 2 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ BIN_DIR=$(pwd)/bin/
rm -rf "$BIN_DIR"
mkdir -p "$BIN_DIR"

cp -r configs/* "$BIN_DIR"
cp shake.toml "$BIN_DIR"

dist() {
echo "try build GOOS=$1 GOARCH=$2"
Expand All @@ -20,7 +20,7 @@ dist() {
echo "build success GOOS=$1 GOARCH=$2"

cd "$BIN_DIR"
tar -czvf ./redis-shake-"$1"-"$2".tar.gz ./sync.toml ./scan.toml ./restore.toml ./redis-shake ./filters ./cluster_helper
tar -czvf ./redis-shake-"$1"-"$2".tar.gz ./shake.toml
cd ..
}

Expand Down
22 changes: 11 additions & 11 deletions cmd/redis-shake/main.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package main

import (
"RedisShake/internal/commands"
"RedisShake/internal/config"
"RedisShake/internal/function"
"RedisShake/internal/log"
"RedisShake/internal/reader"
"RedisShake/internal/status"
"RedisShake/internal/transform"
"RedisShake/internal/utils"
"RedisShake/internal/writer"
"github.com/mcuadros/go-defaults"
Expand All @@ -20,7 +19,7 @@ func main() {
utils.ChdirAndAcquireFileLock()
utils.SetNcpu()
utils.SetPprofPort()
transform.Init()
function.Init()

// create reader
var theReader reader.Reader
Expand Down Expand Up @@ -103,18 +102,19 @@ func main() {
ch := theReader.StartRead()
for e := range ch {
// calc arguments
e.CmdName, e.Group, e.Keys = commands.CalcKeys(e.Argv)
e.Slots = commands.CalcSlots(e.Keys)
e.Preprocess()

// filter
code := transform.Transform(e)
if code == transform.Allow {
theWriter.Write(e)
status.AddEntryCount(e.CmdName, true)
} else if code == transform.Disallow {
log.Debugf("function before: %v", e)
entries := function.RunFunction(e)
log.Debugf("function after: %v", entries)
if len(entries) == 0 {
status.AddEntryCount(e.CmdName, false)
} else {
log.Panicf("error when run lua filter. entry: %s", e.String())
for _, entry := range entries {
theWriter.Write(entry)
status.AddEntryCount(entry.CmdName, true)
}
}
}

Expand Down
8 changes: 0 additions & 8 deletions configs/transform/aliyun.lua

This file was deleted.

8 changes: 0 additions & 8 deletions configs/transform/aws.lua

This file was deleted.

12 changes: 0 additions & 12 deletions configs/transform/key_prefix.lua

This file was deleted.

24 changes: 0 additions & 24 deletions configs/transform/print.lua

This file was deleted.

8 changes: 0 additions & 8 deletions configs/transform/skip_scripts.lua

This file was deleted.

12 changes: 0 additions & 12 deletions configs/transform/swap_db.lua

This file was deleted.

6 changes: 3 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
type AdvancedOptions struct {
Dir string `mapstructure:"dir" default:"data"`

Ncpu int `mapstructure:"ncpu" default:"4"`
Ncpu int `mapstructure:"ncpu" default:"0"`

PprofPort int `mapstructure:"pprof_port" default:"0"`
StatusPort int `mapstructure:"status_port" default:"6479"`
Expand Down Expand Up @@ -42,8 +42,8 @@ func (opt *AdvancedOptions) GetPSyncCommand(address string) string {
}

type ShakeOptions struct {
Transform string `mapstructure:"transform" default:""`
Advanced AdvancedOptions
Function string `mapstructure:"function" default:""`
Advanced AdvancedOptions
}

var Opt ShakeOptions
Expand Down
10 changes: 8 additions & 2 deletions internal/entry/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package entry

import (
"RedisShake/internal/client/proto"
"RedisShake/internal/commands"
"RedisShake/internal/log"
"bytes"
"strings"
)

type Entry struct {
DbId int
Argv []string
DbId int // required
Argv []string // required

CmdName string
Group string
Expand Down Expand Up @@ -48,3 +49,8 @@ func (e *Entry) Serialize() []byte {
e.SerializedSize = int64(buf.Len())
return buf.Bytes()
}

func (e *Entry) Preprocess() {
e.CmdName, e.Group, e.Keys = commands.CalcKeys(e.Argv)
e.Slots = commands.CalcSlots(e.Keys)
}
84 changes: 84 additions & 0 deletions internal/function/function.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package function

import (
"RedisShake/internal/config"
"RedisShake/internal/entry"
"RedisShake/internal/log"
lua "github.com/yuin/gopher-lua"
"strings"
)

var luaString string

func Init() {
luaString = config.Opt.Function
luaString = strings.TrimSpace(luaString)
if len(luaString) == 0 {
log.Infof("no function script")
return
}
}

// DB
// GROUP
// CMD
// KEYS
// SLOTS
// ARGV

// shake.call(DB, ARGV)
// shake.log()

func RunFunction(e *entry.Entry) []*entry.Entry {
entries := make([]*entry.Entry, 0)
if len(luaString) == 0 {
entries = append(entries, e)
return entries
}

L := lua.NewState()
L.SetGlobal("DB", lua.LNumber(e.DbId))
L.SetGlobal("GROUP", lua.LString(e.Group))
L.SetGlobal("CMD", lua.LString(e.CmdName))
keys := L.NewTable()
for _, key := range e.Keys {
keys.Append(lua.LString(key))
}
L.SetGlobal("KEYS", keys)
slots := L.NewTable()
for _, slot := range e.Slots {
slots.Append(lua.LNumber(slot))
}
L.SetGlobal("SLOTS", slots)
argv := L.NewTable()
for _, arg := range e.Argv {
argv.Append(lua.LString(arg))
}
L.SetGlobal("ARGV", argv)
shake := L.NewTypeMetatable("shake")
L.SetGlobal("shake", shake)

L.SetField(shake, "call", L.NewFunction(func(ls *lua.LState) int {
db := ls.ToInt(1)
argv := ls.ToTable(2)
var argvStrings []string
for i := 1; i <= argv.Len(); i++ {
argvStrings = append(argvStrings, argv.RawGetInt(i).String())
}
entries = append(entries, &entry.Entry{
DbId: db,
Argv: argvStrings,
})
return 0
}))
L.SetField(shake, "log", L.NewFunction(func(ls *lua.LState) int {
log.Infof("lua log: %v", ls.ToString(1))
return 0
}))
err := L.DoString(luaString)
if err != nil {
log.Panicf("load function script failed: %v", err)
}

return entries
}
2 changes: 1 addition & 1 deletion internal/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Statusable interface {
type Stat struct {
Time string `json:"start_time"`
Consistent bool `json:"consistent"`
// transform
// function
TotalEntriesCount EntryCount `json:"total_entries_count"`
PerCmdEntriesCount map[string]EntryCount `json:"per_cmd_entries_count"`
// reader
Expand Down
63 changes: 0 additions & 63 deletions internal/transform/transform.go

This file was deleted.

Loading

0 comments on commit 945819f

Please sign in to comment.