From 6b3e723bb3b7a00a606b680ef03fda71b97e778a Mon Sep 17 00:00:00 2001 From: suxb201 Date: Thu, 31 Aug 2023 16:22:29 +0800 Subject: [PATCH] docs: add function documents --- .gitignore | 1 + docs/src/zh/function/best_practices.md | 4 +++ docs/src/zh/function/introduction.md | 47 +++++++++++++++++++++++++- docs/src/zh/reader/scan_reader.md | 13 +++++-- docs/src/zh/reader/sync_reader.md | 9 +++-- docs/src/zh/writer/redis_writer.md | 13 +++++-- internal/commands/keys.go | 4 ++- internal/commands/keys_test.go | 8 ++--- internal/entry/entry.go | 11 +++--- internal/function/function.go | 6 ++++ 10 files changed, 98 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index b83fe407..def000a1 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ __pycache__/ bin/ dist/ tmp/ +data/ *.log *.rdb *.aof diff --git a/docs/src/zh/function/best_practices.md b/docs/src/zh/function/best_practices.md index 9b69f622..9be902f2 100644 --- a/docs/src/zh/function/best_practices.md +++ b/docs/src/zh/function/best_practices.md @@ -4,4 +4,8 @@ outline: deep # 最佳实践 +## 修改 + +### 修改 Key 的前缀 + TODO diff --git a/docs/src/zh/function/introduction.md b/docs/src/zh/function/introduction.md index 50e0f7d4..02e84ecb 100644 --- a/docs/src/zh/function/introduction.md +++ b/docs/src/zh/function/introduction.md @@ -4,5 +4,50 @@ outline: deep # 什么是 function -TODO +RedisShake 通过提供 function 功能,实现了全面的 [ETL(提取-转换-加载)](https://en.wikipedia.org/wiki/Extract,_transform,_load) 功能,以便对数据进行处理。通过利用 function 功能,可以实现以下几种操作: +* 更改数据所属的 db,比如将源端的 `db` 0 写入到目的端的 `db` 1。 +* 对数据进行筛选,例如,只将 key 以 `user:` 开头的源数据写入到目标端。 +* 改变 Key 的前缀,例如,将源端的 key `prefix_old_key` 写入到目标端的 key `prefix_new_key`。 +* ... +要使用 function 功能,只需编写一份 lua 脚本。RedisShake 在从源端获取数据后,会将数据转换为 Redis 命令。然后,它会处理这些命令,从中解析出 `KEYS`、`ARGV`、`SLOTS`、`GROUP` 等信息,并将这些信息传递给 lua 脚本。lua 脚本会处理这些数据,并返回处理后的命令。最后,RedisShake 会将处理后的数据写入到目标端。 + +以下是一个具体的例子: +```toml +function = """ +shake.log(DB) +if DB == 0 +then + return +end +shake.call(DB, ARGV) +""" + +[sync_reader] +address = "127.0.0.1:6379" + +[redis_writer] +address = "127.0.0.1:6380" +``` +`DB` 是 RedisShake 提供的信息,表示当前数据所属的 db。`shake.log` 用于打印日志,`shake.call` 用于调用 Redis 命令。上述脚本的目的是丢弃源端 `db` 0 的数据,将其他 `db` 的数据写入到目标端。 + +除了 `DB`,还有其他信息如 `KEYS`、`ARGV`、`SLOTS`、`GROUP` 等,可供调用的函数有 `shake.log` 和 `shake.call`,具体请参考 [function API](#function-api)。 + +关于更多的示例,可以参考 [最佳实践](./best_practices.md)。 + +## function API + +### 变量 +| 变量 | 类型 | 示例 | 描述 | +|-|-|-|-----| +| DB | number | 1 | 命令所属的 `db` | +| GROUP | string | "LIST" | 命令所属的 `group`,符合 [Command key specifications](https://redis.io/docs/reference/key-specs/),可以在 [commands](https://github.com/tair-opensource/RedisShake/tree/v4/scripts/commands) 中查询每个命令的 `group` 字段 | +| CMD | string | "XGROUP-DELCONSUMER" | 命令的名称 | +| KEYS | table | \{"key1", "key2"\} | 命令的所有 Key | +| KEY_INDEXES | table | \{2, 4\} | 命令的所有 Key 在 `ARGV` 中的索引 | +| SLOTS | table | \{9189, 4998\} | 当前命令的所有 Key 所属的 [slot](https://redis.io/docs/reference/cluster-spec/#key-distribution-model) | +| ARGV | table | \{"mset", "key1", "value1", "key2", "value2"\} | 命令的所有参数 | + +### 函数 +* `shake.call(DB, ARGV)`:返回一个 Redis 命令,RedisShake 会将该命令写入目标端。 +* `shake.log(DB, ARGV)`:打印日志。 \ No newline at end of file diff --git a/docs/src/zh/reader/scan_reader.md b/docs/src/zh/reader/scan_reader.md index ea99e26d..ea61718e 100644 --- a/docs/src/zh/reader/scan_reader.md +++ b/docs/src/zh/reader/scan_reader.md @@ -22,13 +22,20 @@ cluster = false # set to true if source is a redis cluster address = "127.0.0.1:6379" # when cluster is true, set address to one of the cluster node username = "" # keep empty if not using ACL password = "" # keep empty if no authentication is required -ksn = false # set to true to enabled Redis keyspace notifications (KSN) subscription tls = false +ksn = false # set to true to enabled Redis keyspace notifications (KSN) subscription ``` -* 当源端为集群时,配置 cluster 为 true,address 为集群中的任意一个节点即可。`scan_reader` 会通过 `cluster nodes` 命令自动获取集群中的所有节点,并建立连接获取数据。 -* 开启 `ksn` 参数后 RedisShake 会在 `SCAN` 之前使用 [Redis keyspace notifications](https://redis.io/docs/manual/keyspace-notifications/) +* `cluster`:源端是否为集群 +* `address`:源端地址, 当源端为集群时,`address` 为集群中的任意一个节点即可 +* 鉴权: + * 当源端使用 ACL 账号时,配置 `username` 和 `password` + * 当源端使用传统账号时,仅配置 `password` + * 当源端无鉴权时,不配置 `username` 和 `password` +* `tls`:源端是否开启 TLS/SSL,不需要配置证书因为 RedisShake 没有校验服务器证书 +* `ksn`:开启 `ksn` 参数后 RedisShake 会在 `SCAN` 之前使用 [Redis keyspace notifications](https://redis.io/docs/manual/keyspace-notifications/) 能力来订阅 Key 的变化。当 Key 发生变化时,RedisShake 会使用 `DUMP` 与 `RESTORE` 命令来从源端读取 Key 的内容,并写入目标端。 + ::: warning Redis keyspace notifications 不会感知到 `FLUSHALL` 与 `FLUSHDB` 命令,因此在使用 `ksn` 参数时,需要确保源端数据库不会执行这两个命令。 ::: diff --git a/docs/src/zh/reader/sync_reader.md b/docs/src/zh/reader/sync_reader.md index b4407dc5..a60001ff 100644 --- a/docs/src/zh/reader/sync_reader.md +++ b/docs/src/zh/reader/sync_reader.md @@ -22,5 +22,10 @@ password = "" # keep empty if no authentication is required tls = false ``` -* 当源端为集群时,配置 `cluster` 为 true,`address` 为集群中的任意一个节点即可。`sync_reader` 会通过 `cluster nodes` 命令获取集群中的所有节点信息,并建立连接获取数据。 - +* `cluster`:源端是否为集群 +* `address`:源端地址, 当源端为集群时,`address` 为集群中的任意一个节点即可 +* 鉴权: + * 当源端使用 ACL 账号时,配置 `username` 和 `password` + * 当源端使用传统账号时,仅配置 `password` + * 当源端无鉴权时,不配置 `username` 和 `password` +* `tls`:源端是否开启 TLS/SSL,不需要配置证书因为 RedisShake 没有校验服务器证书 \ No newline at end of file diff --git a/docs/src/zh/writer/redis_writer.md b/docs/src/zh/writer/redis_writer.md index f9f23ca9..72063c80 100644 --- a/docs/src/zh/writer/redis_writer.md +++ b/docs/src/zh/writer/redis_writer.md @@ -15,5 +15,14 @@ password = "" # keep empty if no authentication is required tls = false ``` -* 当目的端为集群时,配置 cluster 为 true,address 为集群中的任意一个节点即可。`redis_writer` 会通过 `cluster nodes` 命令获取集群中的所有节点,并建立连接。 -* 当目的端为集群时,应保证源端发过来的命令满足 [Key 的哈希值属于同一个 slot](https://redis.io/docs/reference/cluster-spec/#implemented-subset)。 \ No newline at end of file +* `cluster`:源端是否为集群 +* `address`:源端地址, 当源端为集群时,`address` 为集群中的任意一个节点即可 +* 鉴权: + * 当源端使用 ACL 账号时,配置 `username` 和 `password` + * 当源端使用传统账号时,仅配置 `password` + * 当源端无鉴权时,不配置 `username` 和 `password` +* `tls`:源端是否开启 TLS/SSL,不需要配置证书因为 RedisShake 没有校验服务器证书 + +注意事项: +1. 当目的端为集群时,应保证源端发过来的命令满足 [Key 的哈希值属于同一个 slot](https://redis.io/docs/reference/cluster-spec/#implemented-subset)。 +2. 应尽量保证目的端版本大于等于源端版本,否则可能会出现不支持的命令。如确实需要降低版本,可以设置 `target_redis_proto_max_bulk_len` 为 0,来避免使用 `restore` 命令恢复数据。 diff --git a/internal/commands/keys.go b/internal/commands/keys.go index 6a2d4545..7d13d21d 100644 --- a/internal/commands/keys.go +++ b/internal/commands/keys.go @@ -10,7 +10,7 @@ import ( ) // CalcKeys https://redis.io/docs/reference/key-specs/ -func CalcKeys(argv []string) (cmaName string, group string, keys []string) { +func CalcKeys(argv []string) (cmaName string, group string, keys []string, keysIndexes []int) { argc := len(argv) group = "unknown" cmaName = strings.ToUpper(argv[0]) @@ -64,6 +64,7 @@ func CalcKeys(argv []string) (cmaName string, group string, keys []string) { keyStep := spec.findKeysRangeKeyStep for inx := begin; inx <= lastKeyInx && limitCount > 0; inx += keyStep { keys = append(keys, argv[inx]) + keysIndexes = append(keysIndexes, inx+1) limitCount -= 1 } case "keynum": @@ -79,6 +80,7 @@ func CalcKeys(argv []string) (cmaName string, group string, keys []string) { step := spec.findKeysKeynumKeyStep for inx := begin + firstKey; keyCount > 0; inx += step { keys = append(keys, argv[inx]) + keysIndexes = append(keysIndexes, inx+1) keyCount -= 1 } default: diff --git a/internal/commands/keys_test.go b/internal/commands/keys_test.go index 84ba5050..e6c7bd74 100644 --- a/internal/commands/keys_test.go +++ b/internal/commands/keys_test.go @@ -18,25 +18,25 @@ func testEq(a, b []string) bool { func TestCalcKeys(t *testing.T) { // SET - cmd, group, keys := CalcKeys([]string{"SET", "key", "value"}) + cmd, group, keys, _ := CalcKeys([]string{"SET", "key", "value"}) if cmd != "SET" || group != "STRING" || !testEq(keys, []string{"key"}) { t.Errorf("CalcKeys(SET key value) failed. cmd=%s, group=%s, keys=%v", cmd, group, keys) } // MSET - cmd, group, keys = CalcKeys([]string{"MSET", "key1", "value1", "key2", "value2"}) + cmd, group, keys, _ = CalcKeys([]string{"MSET", "key1", "value1", "key2", "value2"}) if cmd != "MSET" || group != "STRING" || !testEq(keys, []string{"key1", "key2"}) { t.Errorf("CalcKeys(MSET key1 value1 key2 value2) failed. cmd=%s, group=%s, keys=%v", cmd, group, keys) } // XADD - cmd, group, keys = CalcKeys([]string{"XADD", "key", "*", "field1", "value1", "field2", "value2"}) + cmd, group, keys, _ = CalcKeys([]string{"XADD", "key", "*", "field1", "value1", "field2", "value2"}) if cmd != "XADD" || group != "STREAM" || !testEq(keys, []string{"key"}) { t.Errorf("CalcKeys(XADD key * field1 value1 field2 value2) failed. cmd=%s, group=%s, keys=%v", cmd, group, keys) } // ZUNIONSTORE - cmd, group, keys = CalcKeys([]string{"ZUNIONSTORE", "key", "2", "key1", "key2"}) + cmd, group, keys, _ = CalcKeys([]string{"ZUNIONSTORE", "key", "2", "key1", "key2"}) if cmd != "ZUNIONSTORE" || group != "SORTED_SET" || !testEq(keys, []string{"key", "key1", "key2"}) { t.Errorf("CalcKeys(ZUNIONSTORE key 2 key1 key2) failed. cmd=%s, group=%s, keys=%v", cmd, group, keys) } diff --git a/internal/entry/entry.go b/internal/entry/entry.go index 15399418..e81a51d8 100644 --- a/internal/entry/entry.go +++ b/internal/entry/entry.go @@ -12,10 +12,11 @@ type Entry struct { DbId int // required Argv []string // required - CmdName string - Group string - Keys []string - Slots []int + CmdName string + Group string + Keys []string + KeyIndexes []int + Slots []int // for stat SerializedSize int64 @@ -51,6 +52,6 @@ func (e *Entry) Serialize() []byte { } func (e *Entry) Parse() { - e.CmdName, e.Group, e.Keys = commands.CalcKeys(e.Argv) + e.CmdName, e.Group, e.Keys, e.KeyIndexes = commands.CalcKeys(e.Argv) e.Slots = commands.CalcSlots(e.Keys) } diff --git a/internal/function/function.go b/internal/function/function.go index 19054762..4a13393d 100644 --- a/internal/function/function.go +++ b/internal/function/function.go @@ -23,6 +23,7 @@ func Init() { // GROUP // CMD // KEYS +// KEY_INDEXES // SLOTS // ARGV @@ -49,6 +50,11 @@ func RunFunction(e *entry.Entry) []*entry.Entry { for _, slot := range e.Slots { slots.Append(lua.LNumber(slot)) } + keyIndexes := L.NewTable() + for _, keyIndex := range e.KeyIndexes { + keyIndexes.Append(lua.LNumber(keyIndex)) + } + L.SetGlobal("KEY_INDEXES", keyIndexes) L.SetGlobal("SLOTS", slots) argv := L.NewTable() for _, arg := range e.Argv {