Fixing SQL Server flink CDC setup #1

Open · wants to merge 97 commits into base: master

Commits (97)
59b661a
[WIP][cdc-connector][jdbc-sqlserver]Flink cdc pipeline support sqlser…
ChengJie1053 Jun 30, 2024
06ec7a1
code optimization
ChengJie1053 Jun 30, 2024
26724b1
code optimization
ChengJie1053 Jun 30, 2024
9a4dede
[FLINK-35237][cdc-common] Allow Sink to choose HashFunction in PrePar…
dingxin-tech May 13, 2024
26ff6d2
[FLINK-35237][cdc-common] Improve the interfaces and reorganize the d…
leonardBang Jul 19, 2024
fcb4cd8
[FLINK-35871][doc] Add missed "snapshot" mode to mysql connector star…
lvyanquan Jul 22, 2024
c5f391c
[FLINK-35865][base] Support Byte and Short in ObjectUtils (#3481)
GOODBOY008 Jul 23, 2024
313bace
[minor][doc][cdc-connector][oracle] Update OracleSchema#getTableSchem…
ChengJie1053 Jul 23, 2024
ea71b23
[FLINK-35874][cdc-connector][mysql] Check pureBinlogPhaseTables set b…
qiaozongmi Jul 24, 2024
91fc7bb
[FLINK-35740][cdc-connector][mysql] Allow column as chunk key even it…
SML0127 Jul 24, 2024
768d528
[minor][test] Add Flink CDC 3.1.1 version to migration test version list
yuxiqian Jul 25, 2024
241eb03
[FLINK-35868][cdc-connector][mongodb] Bump dependency version to supp…
yuxiqian Jul 25, 2024
b15a226
[FLINK-35391][cdc-connector][paimon] Bump dependency of Paimon Pipeli…
Jul 29, 2024
707fc04
Merge pull request #1 from apache/master
zakhalex Jul 29, 2024
1a19cca
Merge pull request #2 from zakhalex/master-sqlserver
zakhalex Jul 29, 2024
5917f78
[FLINK-35736][test] Add migration test scripts & CI workflows
yuxiqian Jul 30, 2024
85bfd6a
[docs][minor] Add transform piece for pipeline example
leonardBang Jul 30, 2024
a39959f
[FLINK-35072][doris] Support applying AlterColumnTypeEvent to Doris p…
yuxiqian Jul 30, 2024
2fc7f9e
Changes needed to get sqlserver hook-up to work
Jul 30, 2024
4b4334f
AZ-000 - latest changes to data types
Jul 31, 2024
6f6e0a6
Merge pull request #3 from AZ/master_temp
Jul 31, 2024
83ecc1d
Merge pull request #4 from AZ/feature/AZ-000-fixing-sql-server
Jul 31, 2024
1388cf9
[FLINK-34877][cdc] Support type cast conversion in pipeline transform
aiwenmo Jul 31, 2024
aca6788
DE-145 - fixing Data type errors
Jul 31, 2024
957a276
Update DecimalType.java
Jul 31, 2024
70aab12
Merge pull request #5 from AZ/feature/AZ-000-fixing-sql-server
Jul 31, 2024
8f2939e
[hotfix][ci] Migrate to Docker Compose V2 (#3505)
yuxiqian Aug 2, 2024
47f5660
[FLINK-35344][cdc-base] Move same code from multiple subclasses to Jd…
loserwang1024 Aug 2, 2024
57cdd25
AZ-000 - Update pom.xml
zakhalex Aug 2, 2024
63a1ca2
AZ-000 - formatting adjustments (through spotless)
zakhalex Aug 3, 2024
3018937
Merge pull request #3 from apache/master
zakhalex Aug 3, 2024
0c49959
[FLINK-35524][cdc-base] Clear connections pools when reader exist. (#…
loserwang1024 Aug 6, 2024
986f37b
[FLINK-35674][cdc-connector][mysql]Fix blocking caused by searching f…
Thorne-coder Aug 6, 2024
f6d1d48
[FLINK-35912][cdc-connector] SqlServer CDC doesn't chunk UUID-typed c…
LiPL2017 Aug 6, 2024
3315be3
[hotfix][starrocks] Fix StarRocks FE startup failure due to insuffici…
yuxiqian Aug 6, 2024
44dafe3
[FLINK-35813][cdc-runtime] Do not clear state field in TransformSchem…
yuxiqian Aug 6, 2024
d76beec
[FLINK-35234][minor][cdc-common] Fix potential NPE in ConfigurationUtils
loserwang1024 Aug 6, 2024
7d23dc4
Merge pull request #4 from apache/master
zakhalex Aug 6, 2024
7ca1135
[FLINK-35743][cdc-runtime] Correct the temporal function semantics
aiwenmo Aug 6, 2024
1778ef6
Revert "[hotfix][starrocks] Fix StarRocks FE startup failure due to i…
GOODBOY008 Aug 6, 2024
f06cc1f
[hotfix][ci] Clean up disk space
GOODBOY008 Aug 6, 2024
4561a8a
[FLINK-34638][cdc-common] Support column with default value
lvyanquan Dec 28, 2023
03a2ae3
[FLINK-35893][cdc-runtime] Write CURRENT_VERSION of TableChangeInfo t…
leonardBang Jul 25, 2024
54c6a0b
[hotfix][e2e] Add missing default values field in E2e test
yuxiqian Aug 7, 2024
5ed9e05
[FLINK-35242][cdc-common][cdc-runtime] Support TRY_EVOLVE and LENIENT…
yuxiqian Aug 8, 2024
21fade0
[FLINK-35432][pipeline-connector][mysql] Support catch modify event i…
hk-lrzy Aug 8, 2024
9d6154f
[FLINK-35791][kafka] Add database and table info of Canal / Debezium …
lvyanquan Aug 8, 2024
af02ce1
[build][e2e] Separate Pipeline and Source E2e tests and cover flink 1…
yuxiqian Aug 8, 2024
2dabfc0
[FLINK-35730][cdc-cli] PipelineDefinitionParser supports parsing pip…
aiwenmo Aug 8, 2024
81d916f
[FLINK-35272][cdc-runtime] Transform supports omitting and renaming …
yuxiqian Aug 8, 2024
d6b687b
[FLINK-34853] Submit CDC Job To Flink K8S Native Application Mode (#3…
czy006 Aug 8, 2024
b937db2
[hotfix][cdc-connector][mongodb] Fix unstable test cases for snapshot…
yuxiqian Aug 9, 2024
09f36a4
[FLINK-35143][pipeline-connector][mysql] Expose newly added tables ca…
qg-lin Aug 9, 2024
07446d1
[FLINK-35715][common] Ignore the compact optimize for mysql timestamp…
ruanhang1993 Aug 9, 2024
4bf5a39
[FLINK-34688][cdc-connector][mysql] Make scan newly table trigger con…
loserwang1024 Aug 9, 2024
e0d6d1d
[FLINK-35442][cdc-connect][kafka] add key.format and partition.strate…
lvyanquan Aug 9, 2024
e2bb917
[FLINK-34876][transform] Support UDF functions in transform (#3465)
yuxiqian Aug 9, 2024
b361db5
[FLINK-36007][[cdc-composer] Loading factory and added jar in one se…
loserwang1024 Aug 9, 2024
84ef9d5
[FLINK-35981][cdc-runtime] Transform supports referencing one column …
MOBIN-F Aug 9, 2024
b9c628d
[hotfix][doc] Fix doris document dead links and typo
GOODBOY008 Aug 9, 2024
3bb1f3c
[FLINK-35938][pipeline-connector][paimon] Use filterAndCommit API for…
lvyanquan Aug 12, 2024
5284df5
[FLINK-36023][cdc-composer] Flink CDC K8S Native Application Mode add…
loserwang1024 Aug 12, 2024
17d21a9
[FLINK-35984][cdc-runtime] Fix bug that metadata column name can not …
MOBIN-F Aug 12, 2024
64f996b
[FLINK-35891][pipeline-connector][paimon] Support dynamic bucket in P…
lvyanquan Aug 12, 2024
15ab858
[FLINK-35884][pipeline-connector][mysql] MySQL pipeline connector sup…
beryllw Aug 12, 2024
ac14dba
[FLINK-35894][pipeline-connector][es] Introduce Elasticsearch Sink Co…
proletarians Jul 12, 2024
8137f9d
[FLINK-35894][pipeline-connector][es] Support for ElasticSearch 6, 7 …
proletarians Aug 2, 2024
2885d12
[FLINK-35991][cdc-runtime] Resolve operator conflicts in transform SQ…
yuxiqian Aug 7, 2024
1042095
[FLINK-36034][cdc-runtime] Get rid of Flink table planner dependency …
yuxiqian Aug 12, 2024
ed83229
[release] Update version to 3.3-SNAPSHOT
PatrickRen Aug 13, 2024
874ff4f
[hotfix][cdc-runtime] Invalidate cache correctly to avoid classloader…
yuxiqian Aug 13, 2024
3cb91fb
[FLINK-35805][transform] Add __data_event_type__ metadata column
yuxiqian Aug 13, 2024
d99e718
[FLINK-36054][cdc][build] Fix Flink CDC parent pom and release script…
PatrickRen Aug 14, 2024
c5396fb
[FLINK-36056][cdc][connector/elasticsearch] Change flink.connector.el…
proletarians Aug 15, 2024
cbb33bb
[FLINK-35638][OceanBase][test] Refactor OceanBase test cases and remo…
whhe Aug 18, 2024
92b5483
[minor][docs] Compress images to reduce file size and improve website…
GOODBOY008 Aug 19, 2024
0470fdb
[FLINK-36082][pipeline-connector][kafka] Fix lamda NotSerializableExc…
lvyanquan Aug 19, 2024
7f08c6c
[FLINK-36088][pipeline-connector][paimon] Fix NPE in BucketAssignOper…
lvyanquan Aug 19, 2024
d3473de
[FLINK-36076][minor][cdc-runtime] Set isSchemaChangeApplying as volat…
loserwang1024 Aug 19, 2024
78fda8b
[FLINK-35243][cdc-common] Extends more schema change event types support
yuxiqian Aug 19, 2024
aedccb6
[FLINK-36111][minor][pipeline-connector/paimon] Improve MultiTableCom…
MOBIN-F Aug 20, 2024
77c6338
[FLINK-36081][cdc-connector][mysql] Remove the schemas of outdated ta…
kevinwangcs Aug 21, 2024
6205a5a
[FLINK-36094][cdc-runtime] Improve the Exception that SchemaRegistryR…
loserwang1024 Aug 22, 2024
565032e
[FLINK-36115][pipeline-connector][mysql] Introduce scan.newly-added-…
lvyanquan Aug 22, 2024
ee843e2
[FLINK-36114][cdc-runtime] Make SchemaRegistryRequestHandler thread s…
yuxiqian Aug 22, 2024
3837887
[FLINK-36092][cdc-runtime] Fix schema evolution failure with wildcard…
yuxiqian Aug 22, 2024
52f2019
[FLINK-35056][cdc-connector/sqlserver] Fix scale mapping from SQL Ser…
morozov Aug 22, 2024
060d203
[hotfix][cdc-runtime] Fix schema registry hanging in multiple paralle…
yuxiqian Aug 23, 2024
aea2b6a
[hotfix][cdc-connector][mongodb] Fix LegacyMongoDBSourceExampleTest c…
Jiabao-Sun Aug 25, 2024
2e938a9
[FLINK-36128][cdc-runtime] Fix potential unrecoverable in-flight data…
yuxiqian Aug 26, 2024
0df63e2
[hotfix][cdc-runtime] Keep upstream pending requests in order to avoi…
yuxiqian Aug 26, 2024
3c1517f
[FLINK-36148][pipeline-connector/mysql] Fix that newly added table ca…
lvyanquan Aug 27, 2024
afe9c3c
[FLINK-36150][pipeline-connector/mysql] tables.exclude should work ev…
loserwang1024 Aug 27, 2024
a876af2
[minor][cdc-runtime] Run schema coordinator logic asynchronously to a…
yuxiqian Aug 27, 2024
0e9a176
[hotfix][pipeline-connector/mysql] Fix primary key restraints missing…
yuxiqian Aug 28, 2024
cb1b232
[minor][tests] Fix test testDanglingDroppingTableDuringBinlogMode due…
yuxiqian Aug 28, 2024
d5362c2
Merge pull request #5 from apache/master
zakhalex Aug 28, 2024
110 changes: 98 additions & 12 deletions .github/workflows/flink_cdc.yml
@@ -115,19 +115,8 @@ jobs:
- name: Run license check
run: gem install rubyzip -v 2.3.0 && ./tools/ci/license_check.rb

migration_test:
runs-on: ubuntu-latest
steps:
- name: Check out repository code
uses: actions/checkout@v4
with:
submodules: true
- name: Compile snapshot CDC version
run: mvn --no-snapshot-updates -B install -DskipTests
- name: Run migration tests
run: cd flink-cdc-migration-tests && mvn clean verify

compile_and_test:
needs: license_check
# Only run the CI pipeline for the flink-cdc-connectors repository
# if: github.repository == 'apache/flink-cdc-connectors'
runs-on: ubuntu-latest
@@ -263,3 +252,100 @@ jobs:
done
fi
exit 0


migration_test_ut:
needs: license_check
runs-on: ubuntu-latest
steps:
- name: Check out repository code
uses: actions/checkout@v4
with:
submodules: true
- name: Compile snapshot CDC version
run: mvn --no-snapshot-updates -B install -DskipTests
- name: Run migration tests
run: cd flink-cdc-migration-tests && mvn clean verify

pipeline_migration_test:
needs: migration_test_ut
runs-on: ubuntu-latest
strategy:
matrix:
java-version: [ '8', '11' ]

steps:
- uses: actions/checkout@v4
- name: Set up Ruby
uses: ruby/setup-ruby@v1
with:
ruby-version: 3.0
bundler-cache: true # runs 'bundle install' and caches installed gems automatically
- uses: actions/setup-java@v4
with:
java-version: ${{ matrix.java-version }}
distribution: temurin
cache: maven
- name: Install dependencies
run: gem install terminal-table
- name: Prepare CDC versions
run: CDC_SOURCE_HOME=$PWD ruby tools/mig-test/prepare_libs.rb
- name: Prepare Flink distro
run: wget https://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz && tar -xzvf flink-1.18.1-bin-scala_2.12.tgz
working-directory: ./tools/mig-test
- name: Patch Flink configs
run: FLINK_HOME=./flink-1.18.1/ ruby misc/patch_flink_conf.rb
working-directory: ./tools/mig-test
- name: Start containers
run: cd conf && docker-compose up -d
working-directory: ./tools/mig-test
- name: Run migration tests
run: FLINK_HOME=./flink-1.18.1/ ruby run_migration_test.rb
working-directory: ./tools/mig-test
- name: Stop containers
if: always()
run: cd conf && docker-compose down
working-directory: ./tools/mig-test

data_stream_migration_test:
needs: migration_test_ut
runs-on: ubuntu-latest
strategy:
matrix:
java-version: [ '8', '11' ]

steps:
- uses: actions/checkout@v4
- name: Set up Ruby
uses: ruby/setup-ruby@v1
with:
ruby-version: 3.0
bundler-cache: true # runs 'bundle install' and caches installed gems automatically
- uses: actions/setup-java@v4
with:
java-version: ${{ matrix.java-version }}
distribution: temurin
cache: maven
- name: Install dependencies
run: gem install terminal-table
- name: Prepare CDC versions
run: CDC_SOURCE_HOME=$PWD ruby tools/mig-test/prepare_libs.rb
- name: Prepare Flink distro
run: wget https://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz && tar -xzvf flink-1.18.1-bin-scala_2.12.tgz
working-directory: ./tools/mig-test
- name: Patch Flink configs
run: FLINK_HOME=./flink-1.18.1/ ruby misc/patch_flink_conf.rb
working-directory: ./tools/mig-test
- name: Compile Dummy DataStream Jobs
run: cd datastream && ruby compile_jobs.rb
working-directory: ./tools/mig-test
- name: Start containers
run: cd conf && docker-compose up -d
working-directory: ./tools/mig-test
- name: Run migration tests
run: cd datastream && FLINK_HOME=../flink-1.18.1/ ruby run_migration_test.rb
working-directory: ./tools/mig-test
- name: Stop containers
if: always()
run: cd conf && docker-compose down
working-directory: ./tools/mig-test
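Both new migration jobs hard-code Flink 1.18.1 in their download step. A minimal sketch of how that version could be lifted into the job matrix if more distributions ever need coverage; the `flink-version` matrix key is an assumption for illustration, not part of this PR:

```yaml
# Hypothetical refactoring sketch: parameterize the Flink distro per matrix entry.
strategy:
  matrix:
    java-version: [ '8', '11' ]
    flink-version: [ '1.18.1' ]   # assumption: extend this list as needed

steps:
  - name: Prepare Flink distro
    run: |
      wget https://dlcdn.apache.org/flink/flink-${{ matrix.flink-version }}/flink-${{ matrix.flink-version }}-bin-scala_2.12.tgz
      tar -xzvf flink-${{ matrix.flink-version }}-bin-scala_2.12.tgz
    working-directory: ./tools/mig-test
  - name: Patch Flink configs
    run: FLINK_HOME=./flink-${{ matrix.flink-version }}/ ruby misc/patch_flink_conf.rb
    working-directory: ./tools/mig-test
```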
22 changes: 11 additions & 11 deletions README.md
@@ -53,19 +53,19 @@ full database synchronization, sharding table synchronization, schema evolution
password: pass

transform:
- source-table: adb.web_order01
projection: \*, UPPER(product_name) as product_name
filter: id > 10 AND order_id > 100
description: project fields and filter
- source-table: adb.web_order02
projection: \*, UPPER(product_name) as product_name
filter: id > 20 AND order_id > 200
description: project fields and filter
- source-table: adb.web_order01
projection: \*, UPPER(product_name) as product_name
filter: id > 10 AND order_id > 100
description: project fields and filter
- source-table: adb.web_order02
projection: \*, UPPER(product_name) as product_name
filter: id > 20 AND order_id > 200
description: project fields and filter

route:
- source-table: adb.web_order\.*
sink-table: adb.ods_web_orders
description: sync sharding tables to one destination table
- source-table: adb.web_order\.*
sink-table: adb.ods_web_orders
description: sync sharding tables to one destination table

pipeline:
name: MySQL to Doris Pipeline
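For context, a minimal sketch of how the re-indented `transform` and `route` blocks sit in a complete pipeline definition; the hostname, `tables` pattern, and `parallelism` value are illustrative placeholders, not values from this PR:

```yaml
source:
  type: mysql
  hostname: localhost        # placeholder
  port: 3306
  username: root
  password: pass
  tables: adb.\.*            # placeholder pattern

sink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: ""

transform:
  - source-table: adb.web_order01
    projection: \*, UPPER(product_name) as product_name
    filter: id > 10 AND order_id > 100
    description: project fields and filter

route:
  - source-table: adb.web_order\.*
    sink-table: adb.ods_web_orders
    description: sync sharding tables to one destination table

pipeline:
  name: MySQL to Doris Pipeline
  parallelism: 2             # assumption
```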
12 changes: 8 additions & 4 deletions docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
@@ -261,7 +261,7 @@ Flink SQL> SELECT * FROM orders;
<td style="word-wrap: break-word;">initial</td>
<td>String</td>
<td> Optional startup mode for the MySQL CDC consumer;
valid modes are "initial", "earliest-offset", "latest-offset", "specific-offset" and "timestamp".
valid modes are "initial", "earliest-offset", "latest-offset", "specific-offset", "timestamp" and "snapshot".
See the <a href="#a-name-id-002-a">startup modes</a> section for more details.</td>
</tr>
<tr>
@@ -500,7 +500,7 @@ CREATE TABLE products (
* (3) The source does not need database lock privileges before snapshot reading.

If you want the source to run in parallel, each parallel reader should have a unique server id, so the `server id` range must be like `5400-6400`,
and the range must be larger than the parallelism. During incremental snapshot reading, the MySQL CDC source first splits the table into multiple chunks by the table's primary key,
and the range must be larger than the parallelism. During incremental snapshot reading, the MySQL CDC source first splits the table into chunks by the chunk key column you specify,
and then assigns the chunks to multiple readers to read the table's data in parallel.

#### Concurrent Reading
@@ -550,7 +550,7 @@ After the server you monitor in the MySQL cluster fails, you only need to switch the monitored ser…

When the MySQL CDC source starts, it reads the table's snapshot in parallel, and then reads the table's binlog with a single parallelism.

In the snapshot phase, the snapshot is split into multiple snapshot chunks based on the table's primary key and the size of its rows.
In the snapshot phase, the snapshot is split into multiple snapshot chunks based on the table's chunk key and the size of its rows.
Snapshot chunks are assigned to multiple snapshot readers. Each snapshot reader reads its chunks using the [chunk reading algorithm](#snapshot-chunk-reading) and sends the data downstream.
The source tracks the progress state of each chunk (finished or not), so the source can support chunk-level checkpoints during the snapshot phase.
If a failure occurs, the source can be restored and continue reading chunks from the last finished chunk.
@@ -565,7 +565,9 @@ Flink periodically performs checkpoints for the source; in case of failover, the job…

When performing incremental snapshot reading, the MySQL CDC source needs an algorithm for splitting the table into chunks.
The MySQL CDC source splits the table into multiple chunks using a key column. By default, it detects the table's primary key and uses the first column of the primary key as the chunk key column.
If the table has no primary key, incremental snapshot reading fails; you can disable `scan.incremental.snapshot.enabled` to fall back to the legacy snapshot reading mechanism.
If the table has no primary key, users must specify `scan.incremental.snapshot.chunk.key-column`,
otherwise incremental snapshot reading fails; you can disable `scan.incremental.snapshot.enabled` to fall back to the legacy snapshot reading mechanism.
Note that using a column that is not part of the primary key as the chunk key may slow down queries against the table.

For numeric and auto-increment split columns, the MySQL CDC source splits chunks efficiently with a fixed step size.
For example, if you have a table whose primary key column `id` is an auto-increment BIGINT with a minimum value of `0` and a maximum value of `100`,
@@ -629,6 +631,7 @@ MySQLSource.builder()
.startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4L) // 从指定 binlog 文件名和位置启动
.startupOptions(StartupOptions.specificOffset("24DA167-0C0C-11E8-8442-00059A3C7B00:1-19")) // 从 GTID 集合启动
.startupOptions(StartupOptions.timestamp(1667232000000L) // 从时间戳启动
.startupOptions(StartupOptions.snapshot()) // 仅读取快照
...
.build()
```
@@ -642,6 +645,7 @@ CREATE TABLE mysql_source (...) WITH (
'scan.startup.mode' = 'latest-offset', -- start from the latest offset
'scan.startup.mode' = 'specific-offset', -- start from a specific offset
'scan.startup.mode' = 'timestamp', -- start from a timestamp
'scan.startup.mode' = 'snapshot', -- read the snapshot only
'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- binlog filename under specific-offset startup mode
'scan.startup.specific-offset.pos' = '4', -- binlog position under specific-offset startup mode
'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- GTID set under specific-offset startup mode
2 changes: 1 addition & 1 deletion docs/content.zh/docs/connectors/flink-sources/overview.md
@@ -37,7 +37,7 @@ You can also read [tutorials]({{< ref "docs/connectors/flink-sources/tutorials/b…

| Connector | Database | Driver |
|----------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------|
| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}}) | <li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.4 |
| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}}) | <li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0, 6.0, 6.1, 7.0 | MongoDB Driver: 4.11.2 |
| [mysql-cdc]({{< ref "docs/connectors/flink-sources/mysql-cdc" >}}) | <li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <li> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <li> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <li> [MariaDB](https://mariadb.org): 10.x <li> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.28 |
| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) | <li> [OceanBase CE](https://open.oceanbase.com): 3.1.x, 4.x <li> [OceanBase EE](https://www.oceanbase.com/product/oceanbase): 2.x, 3.x, 4.x | OceanBase Driver: 2.4.x |
| [oracle-cdc]({{< ref "docs/connectors/flink-sources/oracle-cdc" >}}) | <li> [Oracle](https://www.oracle.com/index.html): 11, 12, 19, 21 | Oracle Driver: 19.3.0.0 |
4 changes: 3 additions & 1 deletion docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -172,7 +172,7 @@ pipeline:
<td style="word-wrap: break-word;">initial</td>
<td>String</td>
<td> Optional startup mode for the MySQL CDC consumer;
valid modes are "initial", "earliest-offset", "latest-offset", "specific-offset" and "timestamp".</td>
valid modes are "initial", "earliest-offset", "latest-offset", "specific-offset", "timestamp" and "snapshot".</td>
</tr>
<tr>
<td>scan.startup.specific-offset.file</td>
@@ -280,6 +280,7 @@ pipeline:
- `latest-offset`: Never perform a snapshot of the monitored database tables at first startup; the connector reads only from the end of the binlog, meaning it captures only changes made after it starts.
- `specific-offset`: Skip the snapshot phase and start reading from a specified binlog offset, given either as a binlog filename plus position, or as a GTID set when GTIDs are enabled on the cluster.
- `timestamp`: Skip the snapshot phase and start reading binlog events from a specified timestamp.
- `snapshot`: Run the snapshot phase only, skip the incremental phase, and exit once the snapshot has been read.

For example, the startup mode can be specified in a YAML configuration file like this:

@@ -290,6 +291,7 @@ source:
scan.startup.mode: latest-offset # Start from latest offset
scan.startup.mode: specific-offset # Start from specific offset
scan.startup.mode: timestamp # Start from timestamp
scan.startup.mode: snapshot # Read snapshot only
scan.startup.specific-offset.file: 'mysql-bin.000003' # Binlog filename under specific offset startup mode
scan.startup.specific-offset.pos: 4 # Binlog position under specific offset mode
scan.startup.specific-offset.gtid-set: 24DA167-... # GTID set under specific offset startup mode
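A minimal sketch of a pipeline source that combines the new `snapshot` startup mode with an explicit chunk key column for a table without a primary key; the table name and column are hypothetical, and it is an assumption that `scan.incremental.snapshot.chunk.key-column` is available on the pipeline connector as it is on the Flink SQL source:

```yaml
source:
  type: mysql
  hostname: localhost              # placeholder
  port: 3306
  username: root
  password: pass
  tables: app_db.orders            # hypothetical table without a primary key
  scan.startup.mode: snapshot      # read the snapshot only, then exit
  # assumption: chunk key option is supported here as in the Flink SQL source
  scan.incremental.snapshot.chunk.key-column: order_id
```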
11 changes: 11 additions & 0 deletions docs/content.zh/docs/core-concept/data-pipeline.md
@@ -79,6 +79,17 @@ We could use following yaml file to define a complicated Data Pipeline describin…
fenodes: 127.0.0.1:8030
username: root
password: ""

transform:
- source-table: adb.web_order01
projection: \*, UPPER(product_name) as product_name
filter: id > 10 AND order_id > 100
description: project fields and filter
- source-table: adb.web_order02
projection: \*, UPPER(product_name) as product_name
filter: id > 20 AND order_id > 200
description: project fields and filter

route:
- source-table: app_db.orders
sink-table: ods_db.ods_orders