Skip to content

Latest commit

 

History

History
225 lines (214 loc) · 5.08 KB

5.3_kafka_message_format.md

File metadata and controls

225 lines (214 loc) · 5.08 KB

Kafka 消息格式

dtle Kafka 输出, 消息格式兼容 Debezium

其消息格式具体可参考 https://debezium.io/docs/tutorial/#viewing-the-change-eventsl

此处概要说明

  • 每行数据变更会有一个消息
  • 每个消息分为key和value
    • key是该次变更的主键
    • value是该次变更的整行数据
  • key和value各自又有schema和payload
    • payload是具体的数据
    • schema指明了数据的格式, 即payload的解读方式, 可以理解为“类定义”
      • 注意和SQL schema含义不同
      • 表结构会包含在 Kafka Connect schema 中

Key

以下是一个消息的key. 只是简单的包含了主键.

{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key"
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}

Value

以下是一个消息的value, 其类型为 topic.schema.table.Envelope, 拥有5个字段

  • before, 复杂类型 topic.schema.table.Value, 为该表的表结构.
  • after, 复杂类型, 同上
  • source, 复杂类型, 为该次变更的元数据
  • op: string. 用"c", "d", "u" 分别表达操作类型: 增、删、改
  • ts_ms: int64. dtle 处理该行变更的时间.
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_sec"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "boolean",
            "optional": true,
            "field": "snapshot"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "[email protected]"
    },
    "source": {
      "version": "0.8.3.Final",
      "name": "dbserver1",
      "server_id": 0,
      "ts_sec": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "snapshot": true,
      "thread": null,
      "db": "inventory",
      "table": "customers"
    },
    "op": "c",
    "ts_ms": 1486500577691
  }
}

MySQL数据类型到 “Kafka Connect schema types”的转换

https://debezium.io/docs/connectors/mysql/#data-types