Skip to content

Latest commit

 

History

History
152 lines (126 loc) · 14.3 KB

4.3_job_configuration.md

File metadata and controls

152 lines (126 loc) · 14.3 KB

作业(job)配置

作业配置一般采用 json (HTTP API 提交)或 hcl (nomad 命令行工具提交)文件。样例配置在 /usr/share/dtle/scripts/ 中。

nomad job 的完整配置参考 https://www.nomadproject.io/docs/job-specification/

nomad job 有group/task层级,一个group中的tasks会被放在同一个节点执行。dtle要求src和dest task分别放在src 和 dest group. task 中指定 driver = "dtle", 在config段落中填写dtle专有配置。如

  group "src" {
    task "src" {
      driver = "dtle"
      config {
        ...
      }
    }
  }

dtle专有配置有如下字段:

参数名 必填? 类型 源端/目标端 可配 默认值 说明
Gtid String 源端 默认为 全量+增量 任务 MySQL的GTID集合(区间), 可取值:
1. 默认为空, 则为 <全量+增量> 复制任务
2. 已复制的GTID集合(不是点位), 将从未复制的GTID开始增量复制
GtidStart String 源端 增量复制开始的 GTID 点位. (将自动求差集获取上述 GTID 集合.) 需要保持 Gtid 为空
AutoGtid Bool 源端 false 设为 true 后自动从当前 GTID 开始增量任务. 需要保持 Gtid 和 GtidStart 为空.
BinlogRelay Bool 源端 false 是否使用Binlog Relay(中继)机制. 即先将源端mysql binlog读到本地, 避免源端清除binlog导致任务失败. 注意: 如果使用带有BinlogRelay的纯增量复制(指定GTID), 需要同时填写BinlogFile和BinlogPos.
BinlogFile String 源端 增量任务开始的Binlog文件(即源端mysql上 show master status 的结果).
BinlogPos Int 源端 0 增量任务开始的Binlog位置, 和BinlogFile配套使用.
ReplicateDoDb Object数组 源端 - 如为空[], 则复制整个数据库实例. 可填写多元素. 元素内容见下方说明
ConnectionConfig Object 两端 - MySQL源/目标端信息, 见下方 ConnectionConfig 说明。和KafkaConfig二选一填写。
KafkaConfig Object 目标端 - Kafka目标端信息, 见下方 KafkaConfig 说明。和ConnectionConfig二选一填写。
DropTableIfExists Bool 源端 false 全量复制时, 在目标端删除参与复制的表, 之后由dtle自动创建表结构 (相关参数: SkipCreateDbTable). 如果开启此选项, 目标端数据库用户需要有相应表的DROP权限.
SkipCreateDbTable Bool 源端 false 不为目标库创建复制库和复制表. 如果关闭此选项, 目标端数据库用户需要有相应表的CREATE权限.
ParallelWorkers Int 目标端 1 回放端的并发数. 当值大于1, 且源端MySQL支持 MTS 时, 目标端会进行并行回放
ReplChanBufferSize Int 源端 180 复制任务缓存的大小, 单位为事务数
ChunkSize Int 源端 2000 全量复制时, 每次读取-传输-写入的行数
ExpandSyntaxSupport Bool 源端 false 支持复制 用户权限/存储过程DDL/函数DDL
GroupMaxSize int 源端 1 源端发送数据时, 等待数据包达到一定大小(GroupMaxSize字节)后发送该包. 单位为字节. 默认值1表示即刻发送数据
GroupTimeout int 源端 100 源端发送数据时, 等待数据包达到超时时间(GroupTimeout毫秒)发送该包. 单位为毫秒.
SqlFilter String数组 源端 [] 是否跳过一些事件, 如["NoDML", "NoDMLDelete", "NoDMLInsert", "NoDMLUpdate", "NoDDL", "NoDDLAlterTableAddColumn", "NoDDLAlterTableDropColumn", "NoDDLAlterTableModifyColumn", "NoDDLAlterTableChangeColumn", "NoDDLAlterTableAlterColumn"] (以上为所有的filter)

ReplicateDoDb 每个元素有如下字段:

参数名 必填? 类型 源端/目标端 可配 默认值 说明
TableSchema String 源端 - 数据库名
TableSchemaRegex String 源端 - 数据库映射正则表达式,可用于多个数据库重命名
TableSchemaRename String 源端 - 重命名后的数据库名称,当进行多数据库重命名时,支持正则表达式,使用见demo
Tables Object数组 源端 - 可配置多张表, 类型为Table. 若不配置, 则复制指定数据库中的所有表
Table.TableName String 源端 - 表名
Table.Where String 源端 - 只复制满足该条件的数据行. 语法为SQL表达式, 返回值应为布尔值. 可以引用表中的列名.
Table.TableRegex String 源端 - 表名映射匹配正则表达式,用于多个表同时重命名.
Table.TableRename String 源端 - 重命名后的表名,当进行多表重命名时,支持支持正则表达,见demo
Table.ColumnMapFrom String数组 源端 - 列映射(暂不支持正则表达式)。见demo

注:hcl格式中${SOME_TEXT}会被认为是变量引用。正则替换中输入此类文字时,则需使用双$符号:$${SOME_TEXT}

ConnectionConfig 有如下字段:

参数名 必填? 类型 默认值 说明
Host String - 数据源地址
Port String - 数据源端口
User String - 数据源用户名
Password String - 数据源密码
Charset String utf8mb4 数据源的字符集

KafkaConfig 有如下字段:

参数名 必填? 类型 默认值 说明
Topic String - Kafka Topic
Brokers String数组 - Kafka Brokers, 如 ["127.0.0.1:9192", "..."]
Converter String json Kafka Converter。目前仅支持json
TimeZone String UTC 控制时区转换。
MessageGroupMaxSize int 1 目标端向kafka发送消息时, 等待MySQL事务数据包达到一定大小(MessageGroupMaxSize字节)后将该包序列化并发送. 单位为字节. 默认值1表示即刻发送数据
MessageGroupTimeout int 100 目标端向kafka发送消息时, 等待数据包达到超时时间(MessageGroupTimeout毫秒)发送该包. 单位为毫秒.

nomad job 常用通用配置

affinity

job、group 或 task 级配置。配置后该job/group/task会优先在指定的节点上执行

affinity {
  attribute = "${node.unique.name}"
  value = "nomad3"
}

完整参考

resources

task级配置,src/dest task需各自重复。默认值为 cpu=100memory=300。 以默认值建立大量轻量级任务,会导致资源不够而pending,可适当调小。

任务的内存消耗和每行大小、事物大小、队列长度有关。注意真实资源消耗,避免OOM。

task "src" {
  resources {
    cpu    = 100 # MHz
    memory = 300 # MB
  }
}

restart & reschedule

nomad job 默认有如下 restartreschedule 配置

restart { # group or task level
  interval = "30m"
  attempts = 2
  delay    = "15s"
  mode     = "fail" # "fail" or "delay"
                    # "delay" 意味着interval过后继续尝试
                    # "fail" 则不再尝试
}
reschedule { # job or group level
 delay          = "30s"
 delay_function = "exponential"
 max_delay      = "1h"
 unlimited      = true
}
  • 当task报错时,会根据restart配置,30分钟内在同一节点上重启最多两次
    • 即使失败的job被stop -purge再重新添加,也需要根据restart参数重启
  • 2次重启均失败后,会根据reschedule配置,在其他节点上执行

为了避免无限reschedule带来的问题,dtle安装包提供的样例job配置中(<prefix>/usr/share/dtle/scripts/example.job.*),限制reschedule为每半小时1次:

reschedule {
  attempts = 1
  interval = "30m"
  unlimited = false
}
# 或json格式
"Reschedule": {
  "Attempts": 1,
  "Interval": 1800000000000,
  "Unlimited": false
}