From 945819f50d8a942ccec527390237913c76c300a7 Mon Sep 17 00:00:00 2001 From: suxb201 Date: Thu, 17 Aug 2023 11:18:44 +0800 Subject: [PATCH] feature: Add support for function --- build.sh | 4 +- cmd/redis-shake/main.go | 22 ++++---- configs/transform/aliyun.lua | 8 --- configs/transform/aws.lua | 8 --- configs/transform/key_prefix.lua | 12 ----- configs/transform/print.lua | 24 --------- configs/transform/skip_scripts.lua | 8 --- configs/transform/swap_db.lua | 12 ----- internal/config/config.go | 6 +-- internal/entry/entry.go | 10 +++- internal/function/function.go | 84 ++++++++++++++++++++++++++++++ internal/status/status.go | 2 +- internal/transform/transform.go | 63 ---------------------- shake.toml | 45 +++++----------- tests/cases/function.py | 70 +++++++++++++++++++++++++ 15 files changed, 191 insertions(+), 187 deletions(-) delete mode 100644 configs/transform/aliyun.lua delete mode 100644 configs/transform/aws.lua delete mode 100644 configs/transform/key_prefix.lua delete mode 100644 configs/transform/print.lua delete mode 100644 configs/transform/skip_scripts.lua delete mode 100644 configs/transform/swap_db.lua create mode 100644 internal/function/function.go delete mode 100644 internal/transform/transform.go create mode 100644 tests/cases/function.py diff --git a/build.sh b/build.sh index d4132c12..43325e00 100755 --- a/build.sh +++ b/build.sh @@ -7,7 +7,7 @@ BIN_DIR=$(pwd)/bin/ rm -rf "$BIN_DIR" mkdir -p "$BIN_DIR" -cp -r configs/* "$BIN_DIR" +cp shake.toml "$BIN_DIR" dist() { echo "try build GOOS=$1 GOARCH=$2" @@ -20,7 +20,7 @@ dist() { echo "build success GOOS=$1 GOARCH=$2" cd "$BIN_DIR" - tar -czvf ./redis-shake-"$1"-"$2".tar.gz ./sync.toml ./scan.toml ./restore.toml ./redis-shake ./filters ./cluster_helper + tar -czvf ./redis-shake-"$1"-"$2".tar.gz ./shake.toml cd .. } diff --git a/cmd/redis-shake/main.go b/cmd/redis-shake/main.go index 7b1e5500..ccdd0792 100644 --- a/cmd/redis-shake/main.go +++ b/cmd/redis-shake/main.go @@ -1,12 +1,11 @@ package main import ( - "RedisShake/internal/commands" "RedisShake/internal/config" + "RedisShake/internal/function" "RedisShake/internal/log" "RedisShake/internal/reader" "RedisShake/internal/status" - "RedisShake/internal/transform" "RedisShake/internal/utils" "RedisShake/internal/writer" "github.com/mcuadros/go-defaults" @@ -20,7 +19,7 @@ func main() { utils.ChdirAndAcquireFileLock() utils.SetNcpu() utils.SetPprofPort() - transform.Init() + function.Init() // create reader var theReader reader.Reader @@ -103,18 +102,19 @@ func main() { ch := theReader.StartRead() for e := range ch { // calc arguments - e.CmdName, e.Group, e.Keys = commands.CalcKeys(e.Argv) - e.Slots = commands.CalcSlots(e.Keys) + e.Preprocess() // filter - code := transform.Transform(e) - if code == transform.Allow { - theWriter.Write(e) - status.AddEntryCount(e.CmdName, true) - } else if code == transform.Disallow { + log.Debugf("function before: %v", e) + entries := function.RunFunction(e) + log.Debugf("function after: %v", entries) + if len(entries) == 0 { status.AddEntryCount(e.CmdName, false) } else { - log.Panicf("error when run lua filter. entry: %s", e.String()) + for _, entry := range entries { + theWriter.Write(entry) + status.AddEntryCount(entry.CmdName, true) + } } } diff --git a/configs/transform/aliyun.lua b/configs/transform/aliyun.lua deleted file mode 100644 index 2cab44ce..00000000 --- a/configs/transform/aliyun.lua +++ /dev/null @@ -1,8 +0,0 @@ --- Aliyun Redis 4.0: skip OPINFO command -function transform(id, is_base, group, cmd_name, keys, slots, db_id, timestamp_ms) - if cmd_name == "OPINFO" then - return 1, db_id -- disallow - else - return 0, db_id -- allow - end -end \ No newline at end of file diff --git a/configs/transform/aws.lua b/configs/transform/aws.lua deleted file mode 100644 index afeee287..00000000 --- a/configs/transform/aws.lua +++ /dev/null @@ -1,8 +0,0 @@ --- ElastiCache: skip REPLCONF command -function transform(id, is_base, group, cmd_name, keys, slots, db_id, timestamp_ms) - if cmd_name == "REPLCONF" then - return 1, db_id -- disallow - else - return 0, db_id -- allow - end -end \ No newline at end of file diff --git a/configs/transform/key_prefix.lua b/configs/transform/key_prefix.lua deleted file mode 100644 index e17d18c2..00000000 --- a/configs/transform/key_prefix.lua +++ /dev/null @@ -1,12 +0,0 @@ --- skip keys prefixed with ABC -function filter(id, is_base, group, cmd_name, keys, slots, db_id, timestamp_ms) - if #keys ~= 1 then - return 0, db_id -- allow - end - - if string.sub(keys[1], 0, 3) == "ABC" then - return 1, db_id -- disallow - end - - return 0, db_id -- allow -end \ No newline at end of file diff --git a/configs/transform/print.lua b/configs/transform/print.lua deleted file mode 100644 index ff9e7fc4..00000000 --- a/configs/transform/print.lua +++ /dev/null @@ -1,24 +0,0 @@ ---- function name must be `filter` ---- ---- arguments: ---- @id number: the sequence of the cmd ---- @is_base boolean: whether the command is decoded from dump.rdb file ---- @group string: the group of cmd ---- @cmd_name string: cmd name ---- @keys table: keys of the command ---- @slots table: slots of the command ---- @db_id: database id ---- @timestamp_ms number: timestamp in milliseconds, 0 if not available - ---- return: ---- @code number: ---- * 0: allow ---- * 1: disallow ---- * 2: error occurred ---- @db_id number: redirection database id - -function filter(id, is_base, group, cmd_name, keys, slots, db_id, timestamp_ms) - print(string.format("lua filter. id=[%d], is_base=[%s], db_id=[%d], group=[%s], cmd_name=[%s], keys=[%s], slots=[%s], timestamp_ms=[%d]", - id, tostring(is_base), db_id, group, cmd_name, table.concat(keys, ", "), table.concat(slots, ", "), timestamp_ms)) - return 0, db_id -end \ No newline at end of file diff --git a/configs/transform/skip_scripts.lua b/configs/transform/skip_scripts.lua deleted file mode 100644 index 7e0dafad..00000000 --- a/configs/transform/skip_scripts.lua +++ /dev/null @@ -1,8 +0,0 @@ --- skip all scripts included LUA scripts and Redis Functions. -function filter(id, is_base, group, cmd_name, keys, slots, db_id, timestamp_ms) - if group == "SCRIPTING" then - return 1, db_id -- disallow - else - return 0, db_id -- allow - end -end \ No newline at end of file diff --git a/configs/transform/swap_db.lua b/configs/transform/swap_db.lua deleted file mode 100644 index 2cc6b1e9..00000000 --- a/configs/transform/swap_db.lua +++ /dev/null @@ -1,12 +0,0 @@ ---- dbid: 0 -> 1 ---- dbid: 1 -> 0 ---- dbid: others -> drop -function filter(id, is_base, group, cmd_name, keys, slots, db_id, timestamp_ms) - if db_id == 0 then - return 0, 1 - elseif db_id == 1 then - return 0, 0 - else - return 1, db_id - end -end \ No newline at end of file diff --git a/internal/config/config.go b/internal/config/config.go index 07d5c0f7..09205bd0 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -11,7 +11,7 @@ import ( type AdvancedOptions struct { Dir string `mapstructure:"dir" default:"data"` - Ncpu int `mapstructure:"ncpu" default:"4"` + Ncpu int `mapstructure:"ncpu" default:"0"` PprofPort int `mapstructure:"pprof_port" default:"0"` StatusPort int `mapstructure:"status_port" default:"6479"` @@ -42,8 +42,8 @@ func (opt *AdvancedOptions) GetPSyncCommand(address string) string { } type ShakeOptions struct { - Transform string `mapstructure:"transform" default:""` - Advanced AdvancedOptions + Function string `mapstructure:"function" default:""` + Advanced AdvancedOptions } var Opt ShakeOptions diff --git a/internal/entry/entry.go b/internal/entry/entry.go index 0356be54..c21965d8 100644 --- a/internal/entry/entry.go +++ b/internal/entry/entry.go @@ -2,14 +2,15 @@ package entry import ( "RedisShake/internal/client/proto" + "RedisShake/internal/commands" "RedisShake/internal/log" "bytes" "strings" ) type Entry struct { - DbId int - Argv []string + DbId int // required + Argv []string // required CmdName string Group string @@ -48,3 +49,8 @@ func (e *Entry) Serialize() []byte { e.SerializedSize = int64(buf.Len()) return buf.Bytes() } + +func (e *Entry) Preprocess() { + e.CmdName, e.Group, e.Keys = commands.CalcKeys(e.Argv) + e.Slots = commands.CalcSlots(e.Keys) +} diff --git a/internal/function/function.go b/internal/function/function.go new file mode 100644 index 00000000..19054762 --- /dev/null +++ b/internal/function/function.go @@ -0,0 +1,84 @@ +package function + +import ( + "RedisShake/internal/config" + "RedisShake/internal/entry" + "RedisShake/internal/log" + lua "github.com/yuin/gopher-lua" + "strings" +) + +var luaString string + +func Init() { + luaString = config.Opt.Function + luaString = strings.TrimSpace(luaString) + if len(luaString) == 0 { + log.Infof("no function script") + return + } +} + +// DB +// GROUP +// CMD +// KEYS +// SLOTS +// ARGV + +// shake.call(DB, ARGV) +// shake.log() + +func RunFunction(e *entry.Entry) []*entry.Entry { + entries := make([]*entry.Entry, 0) + if len(luaString) == 0 { + entries = append(entries, e) + return entries + } + + L := lua.NewState() + L.SetGlobal("DB", lua.LNumber(e.DbId)) + L.SetGlobal("GROUP", lua.LString(e.Group)) + L.SetGlobal("CMD", lua.LString(e.CmdName)) + keys := L.NewTable() + for _, key := range e.Keys { + keys.Append(lua.LString(key)) + } + L.SetGlobal("KEYS", keys) + slots := L.NewTable() + for _, slot := range e.Slots { + slots.Append(lua.LNumber(slot)) + } + L.SetGlobal("SLOTS", slots) + argv := L.NewTable() + for _, arg := range e.Argv { + argv.Append(lua.LString(arg)) + } + L.SetGlobal("ARGV", argv) + shake := L.NewTypeMetatable("shake") + L.SetGlobal("shake", shake) + + L.SetField(shake, "call", L.NewFunction(func(ls *lua.LState) int { + db := ls.ToInt(1) + argv := ls.ToTable(2) + var argvStrings []string + for i := 1; i <= argv.Len(); i++ { + argvStrings = append(argvStrings, argv.RawGetInt(i).String()) + } + entries = append(entries, &entry.Entry{ + DbId: db, + Argv: argvStrings, + }) + return 0 + })) + L.SetField(shake, "log", L.NewFunction(func(ls *lua.LState) int { + log.Infof("lua log: %v", ls.ToString(1)) + return 0 + })) + err := L.DoString(luaString) + if err != nil { + log.Panicf("load function script failed: %v", err) + } + + return entries +} diff --git a/internal/status/status.go b/internal/status/status.go index e21b5de2..db013bd2 100644 --- a/internal/status/status.go +++ b/internal/status/status.go @@ -15,7 +15,7 @@ type Statusable interface { type Stat struct { Time string `json:"start_time"` Consistent bool `json:"consistent"` - // transform + // function TotalEntriesCount EntryCount `json:"total_entries_count"` PerCmdEntriesCount map[string]EntryCount `json:"per_cmd_entries_count"` // reader diff --git a/internal/transform/transform.go b/internal/transform/transform.go deleted file mode 100644 index 39697fb0..00000000 --- a/internal/transform/transform.go +++ /dev/null @@ -1,63 +0,0 @@ -package transform - -import ( - "RedisShake/internal/config" - "RedisShake/internal/entry" - "RedisShake/internal/log" - lua "github.com/yuin/gopher-lua" - "strings" -) - -const ( - Allow = 0 - Disallow = 1 - Error = 2 -) - -var luaInstance *lua.LState - -func Init() { - luaString := config.Opt.Transform - luaString = strings.TrimSpace(luaString) - if len(luaString) == 0 { - log.Infof("no transform script") - return - } - luaInstance = lua.NewState() - err := luaInstance.DoString(luaString) - if err != nil { - log.Panicf("load transform script failed: %v", err) - } - log.Infof("load transform script success") -} - -func Transform(e *entry.Entry) int { - if luaInstance == nil { - return Allow - } - - keys := luaInstance.NewTable() - for _, key := range e.Keys { - keys.Append(lua.LString(key)) - } - - slots := luaInstance.NewTable() - for _, slot := range e.Slots { - slots.Append(lua.LNumber(slot)) - } - - f := luaInstance.GetGlobal("filter") - luaInstance.Push(f) - luaInstance.Push(lua.LString(e.Group)) // group - luaInstance.Push(lua.LString(e.CmdName)) // cmd name - luaInstance.Push(keys) // keys - luaInstance.Push(slots) // slots - luaInstance.Push(lua.LNumber(e.DbId)) // dbid - - luaInstance.Call(8, 2) - - code := int(luaInstance.Get(1).(lua.LNumber)) - e.DbId = int(luaInstance.Get(2).(lua.LNumber)) - luaInstance.Pop(2) - return code -} diff --git a/shake.toml b/shake.toml index d94224c0..8062954e 100644 --- a/shake.toml +++ b/shake.toml @@ -1,48 +1,27 @@ -#transform = "" +transform = "" -#[RdbReader] -#filepath = "/data/dump.rdb" - - -#[SyncStandaloneReader] -#address = "127.0.0.1:6379" -#username = "" # keep empty if not using ACL -#password = "" # keep empty if no authentication is required -#tls = false - - -[SyncClusterReader] +[sync_standlone_reader] address = "127.0.0.1:6379" -username = "" # keep empty if not using ACL -password = "" # keep empty if no authentication is required +username = "" # keep empty if not using ACL +password = "" # keep empty if no authentication is required tls = false - -[RedisStandaloneWriter] +[redis_standalone_writer] address = "127.0.0.1:6380" -username = "" # keep empty if not using ACL -password = "" # keep empty if no authentication is required +username = "" # keep empty if not using ACL +password = "" # keep empty if no authentication is required tls = false - -#[RedisClusterWriter] -#address = "127.0.0.1:6380" -#username = "" # keep empty if not using ACL -#password = "" # keep empty if no authentication is required -#tls = false - - [advanced] dir = "data" -ncpu = 3 # runtime.GOMAXPROCS, 0 means use runtime.NumCPU() cpu cores - -pprof_port = 0 # pprof port, 0 means disable +ncpu = 0 # runtime.GOMAXPROCS, 0 means use runtime.NumCPU() cpu cores +pprof_port = 0 # pprof port, 0 means disable status_port = 0 # status port, 0 means disable # log log_file = "shake.log" -log_level = "info" # debug, info or warn -log_interval = 5 # in seconds +log_level = "info" # debug, info or warn +log_interval = 5 # in seconds # redis-shake gets key and value from rdb file, and uses RESTORE command to # create the key in target redis. Redis RESTORE will return a "Target key name @@ -66,4 +45,4 @@ target_redis_client_max_querybuf_len = 1024_000_000 target_redis_proto_max_bulk_len = 512_000_000 # If the source is Elasticache or MemoryDB, you can set this item. -aws_psync = "" \ No newline at end of file +aws_psync = "" diff --git a/tests/cases/function.py b/tests/cases/function.py new file mode 100644 index 00000000..1279d2d7 --- /dev/null +++ b/tests/cases/function.py @@ -0,0 +1,70 @@ +import pybbt as p + +import helpers as h + + +@p.subcase() +def filter_db(): + src = h.Redis() + dst = h.Redis() + + opts = h.ShakeOpts.create_sync_opts(src, dst) + opts["function"] = """ + shake.log(DB) + if DB == 0 + then + return + end + shake.call(DB, ARGV) + """ + p.log(f"opts: {opts}") + shake = h.Shake(opts) + + for db in range(16): + src.do("select", db) + src.do("set", "key", "value") + + # wait sync done + p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent(), timeout=10) + + dst.do("select", 0) + p.ASSERT_EQ(dst.do("get", "key"), None) + for db in range(1, 16): + dst.do("select", db) + p.ASSERT_EQ(dst.do("get", "key"), b"value") + + +@p.subcase() +def split_mset_to_set(): + src = h.Redis() + dst = h.Redis() + opts = h.ShakeOpts.create_sync_opts(src, dst) + opts["function"] = """ + shake.log(KEYS) + if CMD == "MSET" + then + for i = 2, #ARGV, 2 -- MSET k1 v1 k2 v2 k3 v3 ... + do + shake.call(1, {"SET", ARGV[i], ARGV[i+1]}) -- move to db 1 + end + end + """ + p.log(f"opts: {opts}") + shake = h.Shake(opts) + src.do("mset", "k1", "v1", "k2", "v2", "k3", "v3") + # wait sync done + p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent(), timeout=10) + dst.do("select", 1) + p.ASSERT_EQ(dst.do("get", "k1"), b"v1") + p.ASSERT_EQ(dst.do("get", "k2"), b"v2") + p.ASSERT_EQ(dst.do("get", "k3"), b"v3") + + +@p.case(tags=["function"]) +def main(): + filter_db() + split_mset_to_set() + + +if __name__ == '__main__': + main()