Skip to content

Commit

Permalink
docs: add function documents
Browse files Browse the repository at this point in the history
  • Loading branch information
suxb201 committed Sep 1, 2023
1 parent 47e5b74 commit 6b3e723
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 18 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ __pycache__/
bin/
dist/
tmp/
data/
*.log
*.rdb
*.aof
4 changes: 4 additions & 0 deletions docs/src/zh/function/best_practices.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,8 @@ outline: deep

# 最佳实践

## 修改

### 修改 Key 的前缀

TODO
47 changes: 46 additions & 1 deletion docs/src/zh/function/introduction.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,50 @@ outline: deep

# 什么是 function

TODO
RedisShake 通过提供 function 功能,实现了全面的 [ETL(提取-转换-加载)](https://en.wikipedia.org/wiki/Extract,_transform,_load) 功能,以便对数据进行处理。通过利用 function 功能,可以实现以下几种操作:
* 更改数据所属的 db,比如将源端的 `db` 0 写入到目的端的 `db` 1。
* 对数据进行筛选,例如,只将 key 以 `user:` 开头的源数据写入到目标端。
* 改变 Key 的前缀,例如,将源端的 key `prefix_old_key` 写入到目标端的 key `prefix_new_key`
* ...

要使用 function 功能,只需编写一份 lua 脚本。RedisShake 在从源端获取数据后,会将数据转换为 Redis 命令。然后,它会处理这些命令,从中解析出 `KEYS``ARGV``SLOTS``GROUP` 等信息,并将这些信息传递给 lua 脚本。lua 脚本会处理这些数据,并返回处理后的命令。最后,RedisShake 会将处理后的数据写入到目标端。

以下是一个具体的例子:
```toml
function = """
shake.log(DB)
if DB == 0
then
return
end
shake.call(DB, ARGV)
"""

[sync_reader]
address = "127.0.0.1:6379"

[redis_writer]
address = "127.0.0.1:6380"
```
`DB` 是 RedisShake 提供的信息,表示当前数据所属的 db。`shake.log` 用于打印日志,`shake.call` 用于调用 Redis 命令。上述脚本的目的是丢弃源端 `db` 0 的数据,将其他 `db` 的数据写入到目标端。

除了 `DB`,还有其他信息如 `KEYS``ARGV``SLOTS``GROUP` 等,可供调用的函数有 `shake.log``shake.call`,具体请参考 [function API](#function-api)

关于更多的示例,可以参考 [最佳实践](./best_practices.md)

## function API

### 变量
| 变量 | 类型 | 示例 | 描述 |
|-|-|-|-----|
| DB | number | 1 | 命令所属的 `db` |
| GROUP | string | "LIST" | 命令所属的 `group`,符合 [Command key specifications](https://redis.io/docs/reference/key-specs/),可以在 [commands](https://github.com/tair-opensource/RedisShake/tree/v4/scripts/commands) 中查询每个命令的 `group` 字段 |
| CMD | string | "XGROUP-DELCONSUMER" | 命令的名称 |
| KEYS | table | \{"key1", "key2"\} | 命令的所有 Key |
| KEY_INDEXES | table | \{2, 4\} | 命令的所有 Key 在 `ARGV` 中的索引 |
| SLOTS | table | \{9189, 4998\} | 当前命令的所有 Key 所属的 [slot](https://redis.io/docs/reference/cluster-spec/#key-distribution-model) |
| ARGV | table | \{"mset", "key1", "value1", "key2", "value2"\} | 命令的所有参数 |

### 函数
* `shake.call(DB, ARGV)`:返回一个 Redis 命令,RedisShake 会将该命令写入目标端。
* `shake.log(DB, ARGV)`:打印日志。
13 changes: 10 additions & 3 deletions docs/src/zh/reader/scan_reader.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,20 @@ cluster = false # set to true if source is a redis cluster
address = "127.0.0.1:6379" # when cluster is true, set address to one of the cluster node
username = "" # keep empty if not using ACL
password = "" # keep empty if no authentication is required
ksn = false # set to true to enabled Redis keyspace notifications (KSN) subscription
tls = false
ksn = false # set to true to enabled Redis keyspace notifications (KSN) subscription
```

* 当源端为集群时,配置 cluster 为 true,address 为集群中的任意一个节点即可。`scan_reader` 会通过 `cluster nodes` 命令自动获取集群中的所有节点,并建立连接获取数据。
* 开启 `ksn` 参数后 RedisShake 会在 `SCAN` 之前使用 [Redis keyspace notifications](https://redis.io/docs/manual/keyspace-notifications/)
* `cluster`:源端是否为集群
* `address`:源端地址, 当源端为集群时,`address` 为集群中的任意一个节点即可
* 鉴权:
* 当源端使用 ACL 账号时,配置 `username``password`
* 当源端使用传统账号时,仅配置 `password`
* 当源端无鉴权时,不配置 `username``password`
* `tls`:源端是否开启 TLS/SSL,不需要配置证书因为 RedisShake 没有校验服务器证书
* `ksn`:开启 `ksn` 参数后 RedisShake 会在 `SCAN` 之前使用 [Redis keyspace notifications](https://redis.io/docs/manual/keyspace-notifications/)
能力来订阅 Key 的变化。当 Key 发生变化时,RedisShake 会使用 `DUMP``RESTORE` 命令来从源端读取 Key 的内容,并写入目标端。

::: warning
Redis keyspace notifications 不会感知到 `FLUSHALL``FLUSHDB` 命令,因此在使用 `ksn` 参数时,需要确保源端数据库不会执行这两个命令。
:::
9 changes: 7 additions & 2 deletions docs/src/zh/reader/sync_reader.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,10 @@ password = "" # keep empty if no authentication is required
tls = false
```

* 当源端为集群时,配置 `cluster` 为 true,`address` 为集群中的任意一个节点即可。`sync_reader` 会通过 `cluster nodes` 命令获取集群中的所有节点信息,并建立连接获取数据。

* `cluster`:源端是否为集群
* `address`:源端地址, 当源端为集群时,`address` 为集群中的任意一个节点即可
* 鉴权:
* 当源端使用 ACL 账号时,配置 `username``password`
* 当源端使用传统账号时,仅配置 `password`
* 当源端无鉴权时,不配置 `username``password`
* `tls`:源端是否开启 TLS/SSL,不需要配置证书因为 RedisShake 没有校验服务器证书
13 changes: 11 additions & 2 deletions docs/src/zh/writer/redis_writer.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,14 @@ password = "" # keep empty if no authentication is required
tls = false
```

* 当目的端为集群时,配置 cluster 为 true,address 为集群中的任意一个节点即可。`redis_writer` 会通过 `cluster nodes` 命令获取集群中的所有节点,并建立连接。
* 当目的端为集群时,应保证源端发过来的命令满足 [Key 的哈希值属于同一个 slot](https://redis.io/docs/reference/cluster-spec/#implemented-subset)
* `cluster`:源端是否为集群
* `address`:源端地址, 当源端为集群时,`address` 为集群中的任意一个节点即可
* 鉴权:
* 当源端使用 ACL 账号时,配置 `username``password`
* 当源端使用传统账号时,仅配置 `password`
* 当源端无鉴权时,不配置 `username``password`
* `tls`:源端是否开启 TLS/SSL,不需要配置证书因为 RedisShake 没有校验服务器证书

注意事项:
1. 当目的端为集群时,应保证源端发过来的命令满足 [Key 的哈希值属于同一个 slot](https://redis.io/docs/reference/cluster-spec/#implemented-subset)
2. 应尽量保证目的端版本大于等于源端版本,否则可能会出现不支持的命令。如确实需要降低版本,可以设置 `target_redis_proto_max_bulk_len` 为 0,来避免使用 `restore` 命令恢复数据。
4 changes: 3 additions & 1 deletion internal/commands/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

// CalcKeys https://redis.io/docs/reference/key-specs/
func CalcKeys(argv []string) (cmaName string, group string, keys []string) {
func CalcKeys(argv []string) (cmaName string, group string, keys []string, keysIndexes []int) {
argc := len(argv)
group = "unknown"
cmaName = strings.ToUpper(argv[0])
Expand Down Expand Up @@ -64,6 +64,7 @@ func CalcKeys(argv []string) (cmaName string, group string, keys []string) {
keyStep := spec.findKeysRangeKeyStep
for inx := begin; inx <= lastKeyInx && limitCount > 0; inx += keyStep {
keys = append(keys, argv[inx])
keysIndexes = append(keysIndexes, inx+1)
limitCount -= 1
}
case "keynum":
Expand All @@ -79,6 +80,7 @@ func CalcKeys(argv []string) (cmaName string, group string, keys []string) {
step := spec.findKeysKeynumKeyStep
for inx := begin + firstKey; keyCount > 0; inx += step {
keys = append(keys, argv[inx])
keysIndexes = append(keysIndexes, inx+1)
keyCount -= 1
}
default:
Expand Down
8 changes: 4 additions & 4 deletions internal/commands/keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,25 @@ func testEq(a, b []string) bool {

func TestCalcKeys(t *testing.T) {
// SET
cmd, group, keys := CalcKeys([]string{"SET", "key", "value"})
cmd, group, keys, _ := CalcKeys([]string{"SET", "key", "value"})
if cmd != "SET" || group != "STRING" || !testEq(keys, []string{"key"}) {
t.Errorf("CalcKeys(SET key value) failed. cmd=%s, group=%s, keys=%v", cmd, group, keys)
}

// MSET
cmd, group, keys = CalcKeys([]string{"MSET", "key1", "value1", "key2", "value2"})
cmd, group, keys, _ = CalcKeys([]string{"MSET", "key1", "value1", "key2", "value2"})
if cmd != "MSET" || group != "STRING" || !testEq(keys, []string{"key1", "key2"}) {
t.Errorf("CalcKeys(MSET key1 value1 key2 value2) failed. cmd=%s, group=%s, keys=%v", cmd, group, keys)
}

// XADD
cmd, group, keys = CalcKeys([]string{"XADD", "key", "*", "field1", "value1", "field2", "value2"})
cmd, group, keys, _ = CalcKeys([]string{"XADD", "key", "*", "field1", "value1", "field2", "value2"})
if cmd != "XADD" || group != "STREAM" || !testEq(keys, []string{"key"}) {
t.Errorf("CalcKeys(XADD key * field1 value1 field2 value2) failed. cmd=%s, group=%s, keys=%v", cmd, group, keys)
}

// ZUNIONSTORE
cmd, group, keys = CalcKeys([]string{"ZUNIONSTORE", "key", "2", "key1", "key2"})
cmd, group, keys, _ = CalcKeys([]string{"ZUNIONSTORE", "key", "2", "key1", "key2"})
if cmd != "ZUNIONSTORE" || group != "SORTED_SET" || !testEq(keys, []string{"key", "key1", "key2"}) {
t.Errorf("CalcKeys(ZUNIONSTORE key 2 key1 key2) failed. cmd=%s, group=%s, keys=%v", cmd, group, keys)
}
Expand Down
11 changes: 6 additions & 5 deletions internal/entry/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ type Entry struct {
DbId int // required
Argv []string // required

CmdName string
Group string
Keys []string
Slots []int
CmdName string
Group string
Keys []string
KeyIndexes []int
Slots []int

// for stat
SerializedSize int64
Expand Down Expand Up @@ -51,6 +52,6 @@ func (e *Entry) Serialize() []byte {
}

func (e *Entry) Parse() {
e.CmdName, e.Group, e.Keys = commands.CalcKeys(e.Argv)
e.CmdName, e.Group, e.Keys, e.KeyIndexes = commands.CalcKeys(e.Argv)
e.Slots = commands.CalcSlots(e.Keys)
}
6 changes: 6 additions & 0 deletions internal/function/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func Init() {
// GROUP
// CMD
// KEYS
// KEY_INDEXES
// SLOTS
// ARGV

Expand All @@ -49,6 +50,11 @@ func RunFunction(e *entry.Entry) []*entry.Entry {
for _, slot := range e.Slots {
slots.Append(lua.LNumber(slot))
}
keyIndexes := L.NewTable()
for _, keyIndex := range e.KeyIndexes {
keyIndexes.Append(lua.LNumber(keyIndex))
}
L.SetGlobal("KEY_INDEXES", keyIndexes)
L.SetGlobal("SLOTS", slots)
argv := L.NewTable()
for _, arg := range e.Argv {
Expand Down

0 comments on commit 6b3e723

Please sign in to comment.