diff --git a/.github/workflows/debezium-workflow.yml b/.github/workflows/debezium-workflow.yml
index 8278b02c870..12d11e7912d 100644
--- a/.github/workflows/debezium-workflow.yml
+++ b/.github/workflows/debezium-workflow.yml
@@ -44,7 +44,7 @@ jobs:
- name: Get modified files (Common)
id: changed-files-common
- uses: tj-actions/changed-files@v41.1.1
+ uses: tj-actions/changed-files@v42.0.0
with:
files: |
support/checkstyle/**
@@ -62,49 +62,49 @@ jobs:
- name: Get modified files (MongoDB)
id: changed-files-mongodb
- uses: tj-actions/changed-files@v41.1.1
+ uses: tj-actions/changed-files@v42.0.0
with:
files: |
debezium-connector-mongodb/**
- name: Get modified files (MySQL)
id: changed-files-mysql
- uses: tj-actions/changed-files@v41.1.1
+ uses: tj-actions/changed-files@v42.0.0
with:
files: |
debezium-connector-mysql/**
- name: Get modified files (PostgreSQL)
id: changed-files-postgresql
- uses: tj-actions/changed-files@v41.1.1
+ uses: tj-actions/changed-files@v42.0.0
with:
files: |
debezium-connector-postgres/**
- name: Get modified files (Oracle)
id: changed-files-oracle
- uses: tj-actions/changed-files@v41.1.1
+ uses: tj-actions/changed-files@v42.0.0
with:
files: |
debezium-connector-oracle/**
- name: Get modified files (SQL Server)
id: changed-files-sqlserver
- uses: tj-actions/changed-files@v41.1.1
+ uses: tj-actions/changed-files@v42.0.0
with:
files: |
debezium-connector-sqlserver/**
- name: Get modified files (Quarkus Outbox)
id: changed-files-outbox
- uses: tj-actions/changed-files@v41.1.1
+ uses: tj-actions/changed-files@v42.0.0
with:
files: |
debezium-quarkus-outbox/**
- name: Get modified files (REST Extension)
id: changed-files-rest-extension
- uses: tj-actions/changed-files@v41.1.1
+ uses: tj-actions/changed-files@v42.0.0
with:
files: |
debezium-connect-rest-extension/**
@@ -116,7 +116,7 @@ jobs:
- name: Get modified files (Schema Generator)
id: changed-files-schema-generator
- uses: tj-actions/changed-files@v41.1.1
+ uses: tj-actions/changed-files@v42.0.0
with:
files: |
debezium-schema-generator/**
@@ -128,14 +128,14 @@ jobs:
- name: Get modified files (Debezium Testing)
id: changed-files-debezium-testing
- uses: tj-actions/changed-files@v41.1.1
+ uses: tj-actions/changed-files@v42.0.0
with:
files: |
debezium-testing/**
- name: Get modified files (MySQL DDL parser)
id: changed-files-mysql-ddl-parser
- uses: tj-actions/changed-files@v41.1.1
+ uses: tj-actions/changed-files@v42.0.0
with:
files: |
debezium-ddl-parser/src/main/antlr4/io/debezium/ddl/parser/mysql/**
@@ -144,7 +144,7 @@ jobs:
- name: Get modified files (Oracle DDL parser)
id: changed-files-oracle-ddl-parser
- uses: tj-actions/changed-files@v41.1.1
+ uses: tj-actions/changed-files@v42.0.0
with:
files: |
debezium-ddl-parser/src/main/antlr4/io/debezium/ddl/parser/oracle/**
@@ -154,14 +154,14 @@ jobs:
- name: Get modified files (Documentation)
id: changed-files-documentation
- uses: tj-actions/changed-files@v41.1.1
+ uses: tj-actions/changed-files@v42.0.0
with:
files: |
documentation/**
- name: Get modified files (Storage)
id: changed-files-storage
- uses: tj-actions/changed-files@v41.1.1
+ uses: tj-actions/changed-files@v42.0.0
with:
files: |
debezium-storage/**
@@ -183,7 +183,7 @@ jobs:
- name: Cache Maven Repository
id: maven-cache-check
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }}
@@ -231,7 +231,7 @@ jobs:
- name: Cache Maven Repository
id: maven-cache-check
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }}
@@ -272,7 +272,7 @@ jobs:
java-version: 17
- name: Cache Maven Repository
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }}
@@ -310,7 +310,7 @@ jobs:
java-version: 17
- name: Cache Maven Repository
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }}
@@ -347,7 +347,7 @@ jobs:
java-version: 17
- name: Cache Maven Repository
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }}
@@ -383,7 +383,7 @@ jobs:
java-version: 17
- name: Cache Maven Repository
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }}
@@ -423,7 +423,7 @@ jobs:
java-version: 17
- name: Cache Maven Repository
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }}
@@ -456,7 +456,7 @@ jobs:
java-version: 17
- name: Cache Maven Repository
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }}
@@ -492,7 +492,7 @@ jobs:
java-version: 17
- name: Cache Maven Repository
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }}
@@ -525,7 +525,7 @@ jobs:
java-version: 17
- name: Cache Maven Repository
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }}
@@ -558,7 +558,7 @@ jobs:
java-version: 17
- name: Cache Maven Repository
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }}
@@ -590,7 +590,7 @@ jobs:
java-version: 17
- name: Cache Maven Repository
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }}
@@ -622,7 +622,7 @@ jobs:
java-version: 17
- name: Cache Maven Repository
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }}
@@ -655,7 +655,7 @@ jobs:
java-version: 17
- name: Cache Maven Repository
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('**/pom.xml') }}
@@ -717,7 +717,7 @@ jobs:
# For this build, we do not care if there are or are not changes in the sibling repository since this
# job will only ever fire if there are changes in the common paths identified in the files_changed job.
- name: Cache Maven Repository
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('core/**/pom.xml') }}
@@ -784,7 +784,7 @@ jobs:
# For this build, we do not care if there are or are not changes in the sibling repository since this
# job will only ever fire if there are changes in the common paths identified in the files_changed job.
- name: Cache Maven Repository
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('core/**/pom.xml') }}
@@ -841,7 +841,7 @@ jobs:
# For this build, we do not care if there are or are not changes in the sibling repository since this
# job will only ever fire if there are changes in the common paths identified in the files_changed job.
- name: Cache Maven Repository
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('core/**/pom.xml') }}
@@ -898,7 +898,7 @@ jobs:
# For this build, we do not care if there are or are not changes in the sibling repository since this
# job will only ever fire if there are changes in the common paths identified in the files_changed job.
- name: Cache Maven Repository
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('core/**/pom.xml') }}
@@ -955,7 +955,7 @@ jobs:
# For this build, we do not care if there are or are not changes in the sibling repository since this
# job will only ever fire if there are changes in the common paths identified in the files_changed job.
- name: Cache Maven Repository
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('core/**/pom.xml') }}
@@ -1013,7 +1013,7 @@ jobs:
# For this build, we do not care if there are or are not changes in the sibling repository since this
# job will only ever fire if there are changes in the common paths identified in the files_changed job.
- name: Cache Maven Repository
- uses: actions/cache@v2
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('core/**/pom.xml') }}
diff --git a/.github/workflows/oracle-workflow-test.yml b/.github/workflows/oracle-workflow-test.yml
index d1a0ea73c69..a3e1c6ad77e 100644
--- a/.github/workflows/oracle-workflow-test.yml
+++ b/.github/workflows/oracle-workflow-test.yml
@@ -82,7 +82,7 @@ jobs:
# This workflow uses its own dependency cache rather than the main debezium workflow cache because
# we explicitly want to trigger this build on pushes separate from the other workflow.
- name: Cache Maven Repository
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
# refresh cache every month to avoid unlimited growth
@@ -140,7 +140,7 @@ jobs:
# This workflow uses its own dependency cache rather than the main debezium workflow cache because
# we explicitly want to trigger this build on pushes separate from the other workflow.
- name: Cache Maven Repository
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.m2/repository
# refresh cache every month to avoid unlimited growth
diff --git a/COPYRIGHT.txt b/COPYRIGHT.txt
index ece3aca6913..225a0b34c9e 100644
--- a/COPYRIGHT.txt
+++ b/COPYRIGHT.txt
@@ -579,3 +579,4 @@ Pavithrananda Prabhu
حمود سمبول
Peter Hamer
Artem Shubovych
+leoloel
\ No newline at end of file
diff --git a/debezium-bom/pom.xml b/debezium-bom/pom.xml
index ae0b5aac4af..b8d0ac29915 100644
--- a/debezium-bom/pom.xml
+++ b/debezium-bom/pom.xml
@@ -59,7 +59,7 @@
3.0.0
4.0.1
1.19.1
- 2.7.0
+ 2.9.0
1.5.0
0.9.12
3.5.0.1
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java
index b3e3241c820..0401de6e2af 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java
@@ -99,7 +99,13 @@ public SnapshottingTask getSnapshottingTask(MySqlPartition partition, MySqlOffse
// found a previous offset and the earlier snapshot has completed
if (previousOffset != null && !previousOffset.isSnapshotRunning()) {
- LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.");
+ if (databaseSchema.isStorageInitializationExecuted()) {
+ LOGGER.info(
+ "A previous offset indicating a completed snapshot has been found, schema will still be snapshotted since we are in schema_only_recovery mode.");
+ }
+ else {
+ LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.");
+ }
return new SnapshottingTask(databaseSchema.isStorageInitializationExecuted(), false, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable, false);
}
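
For reference, a minimal configuration sketch showing the `schema_only_recovery` snapshot mode that the new log message refers to; the property values below are illustrative assumptions, not part of this change.

// Recover the schema history topic: the schema is re-snapshotted even though a
// completed-snapshot offset exists, while table data is not read again.
Configuration config = Configuration.create()
        .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
        .with("snapshot.mode", "schema_only_recovery")
        .with("schema.history.internal.kafka.topic", "schema-changes.inventory")
        .build();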
diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
index 97c40289825..491e718b3d0 100644
--- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
+++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java
@@ -66,9 +66,12 @@ public class SqlServerConnection extends JdbcConnection {
private static final String GET_MIN_LSN = "SELECT [#db].sys.fn_cdc_get_min_lsn('#')";
private static final String LOCK_TABLE = "SELECT * FROM [#] WITH (TABLOCKX)";
private static final String INCREMENT_LSN = "SELECT [#db].sys.fn_cdc_increment_lsn(?)";
- private static final String GET_ALL_CHANGES_FOR_TABLE = "SELECT *# FROM [#db].cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old') order by [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC";
- private final String get_all_changes_for_table;
protected static final String LSN_TIMESTAMP_SELECT_STATEMENT = "TODATETIMEOFFSET([#db].sys.fn_cdc_map_lsn_to_time([__$start_lsn]), DATEPART(TZOFFSET, SYSDATETIMEOFFSET()))";
+ private static final String GET_ALL_CHANGES_FOR_TABLE_SELECT = "SELECT [__$start_lsn], [__$seqval], [__$operation], [__$update_mask], #, "
+ + LSN_TIMESTAMP_SELECT_STATEMENT;
+ private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_FUNCTION = "FROM [#db].cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old')";
+ private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_DIRECT = "FROM [#db].cdc.[#]";
+ private static final String GET_ALL_CHANGES_FOR_TABLE_ORDER_BY = "ORDER BY [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC";
/**
* Queries the list of captured column names and their change table identifiers in the given database.
@@ -133,10 +136,47 @@ public SqlServerConnection(SqlServerConnectorConfig config, SqlServerValueConver
defaultValueConverter = new SqlServerDefaultValueConverter(this::connection, valueConverters);
this.queryFetchSize = config.getQueryFetchSize();
+ getAllChangesForTable = buildGetAllChangesForTableQuery(config.getDataQueryMode(), skippedOperations);
+
+ this.config = config;
+ this.useSingleDatabase = useSingleDatabase;
+
+ this.optionRecompile = false;
+ }
+
+ /**
+ * Creates a new connection using the supplied configuration.
+ *
+ * @param config {@link Configuration} instance, may not be null.
+ * @param valueConverters {@link SqlServerValueConverters} instance
+ * @param skippedOperations a set of {@link Envelope.Operation} to skip in streaming
+ * @param optionRecompile Includes query option RECOMPILE on incremental snapshots
+ */
+ public SqlServerConnection(SqlServerConnectorConfig config, SqlServerValueConverters valueConverters,
+ Set<Envelope.Operation> skippedOperations, boolean useSingleDatabase,
+ boolean optionRecompile) {
+ this(config, valueConverters, skippedOperations, useSingleDatabase);
+
+ this.optionRecompile = optionRecompile;
+ }
+
+ private String buildGetAllChangesForTableQuery(SqlServerConnectorConfig.DataQueryMode dataQueryMode,
+ Set<Envelope.Operation> skippedOperations) {
+ String result = GET_ALL_CHANGES_FOR_TABLE_SELECT + " ";
+ List<String> where = new LinkedList<>();
+ switch (dataQueryMode) {
+ case FUNCTION:
+ result += GET_ALL_CHANGES_FOR_TABLE_FROM_FUNCTION + " ";
+ break;
+ case DIRECT:
+ result += GET_ALL_CHANGES_FOR_TABLE_FROM_DIRECT + " ";
+ where.add("[__$start_lsn] >= ?");
+ where.add("[__$start_lsn] <= ?");
+ break;
+ }
+
if (hasSkippedOperations(skippedOperations)) {
Set<String> skippedOps = new HashSet<>();
- StringBuilder getAllChangesForTableStatement = new StringBuilder(
- "SELECT *# FROM [#db].cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old') WHERE __$operation NOT IN (");
skippedOperations.forEach((Envelope.Operation operation) -> {
// This number are the __$operation number in the SQLServer
// https://docs.microsoft.com/en-us/sql/relational-databases/system-functions/cdc-fn-cdc-get-all-changes-capture-instance-transact-sql?view=sql-server-ver15#table-returned
@@ -153,37 +193,16 @@ public SqlServerConnection(SqlServerConnectorConfig config, SqlServerValueConver
break;
}
});
- getAllChangesForTableStatement.append(String.join(",", skippedOps));
- getAllChangesForTableStatement.append(") order by [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC");
- get_all_changes_for_table = getAllChangesForTableStatement.toString();
+ where.add("[__$operation] NOT IN (" + String.join(",", skippedOps) + ")");
}
- else {
- get_all_changes_for_table = GET_ALL_CHANGES_FOR_TABLE;
+ if (!where.isEmpty()) {
+ result += " WHERE " + String.join(" AND ", where) + " ";
}
- getAllChangesForTable = get_all_changes_for_table.replaceFirst(STATEMENTS_PLACEHOLDER,
- Matcher.quoteReplacement(", " + LSN_TIMESTAMP_SELECT_STATEMENT));
- this.config = config;
- this.useSingleDatabase = useSingleDatabase;
+ result += GET_ALL_CHANGES_FOR_TABLE_ORDER_BY;
- this.optionRecompile = false;
- }
-
- /**
- * Creates a new connection using the supplied configuration.
- *
- * @param config {@link Configuration} instance, may not be null.
- * @param valueConverters {@link SqlServerValueConverters} instance
- * @param skippedOperations a set of {@link Envelope.Operation} to skip in streaming
- * @param optionRecompile Includes query option RECOMPILE on incremental snapshots
- */
- public SqlServerConnection(SqlServerConnectorConfig config, SqlServerValueConverters valueConverters,
- Set<Envelope.Operation> skippedOperations, boolean useSingleDatabase,
- boolean optionRecompile) {
- this(config, valueConverters, skippedOperations, useSingleDatabase);
-
- this.optionRecompile = optionRecompile;
+ return result;
}
private boolean hasSkippedOperations(Set<Envelope.Operation> skippedOperations) {
@@ -330,8 +349,14 @@ public void getChangesForTables(String databaseName, SqlServerChangeTable[] chan
int idx = 0;
for (SqlServerChangeTable changeTable : changeTables) {
+ String capturedColumns = String.join(", ", changeTable.getCapturedColumns());
+ String source = changeTable.getCaptureInstance();
+ if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) {
+ source = changeTable.getChangeTableId().table();
+ }
final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName)
- .replace(STATEMENTS_PLACEHOLDER, changeTable.getCaptureInstance());
+ .replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns))
+ .replace(STATEMENTS_PLACEHOLDER, source);
queries[idx] = query;
// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
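
As a worked illustration of what `buildGetAllChangesForTableQuery` assembles in each mode, the sketch below shows the approximate query text after placeholder substitution. The database `testDB1`, capture instance `dbo_tablea`, change table `dbo_tablea_CT`, and the `id, cola` column list are assumed example values, not output taken from this change.

// Approximate FUNCTION-mode (default) query after substituting #db, the captured
// columns, and the capture instance:
String functionModeQuery = "SELECT [__$start_lsn], [__$seqval], [__$operation], [__$update_mask], id, cola, "
        + "TODATETIMEOFFSET([testDB1].sys.fn_cdc_map_lsn_to_time([__$start_lsn]), DATEPART(TZOFFSET, SYSDATETIMEOFFSET())) "
        + "FROM [testDB1].cdc.[fn_cdc_get_all_changes_dbo_tablea](?, ?, N'all update old') "
        + "ORDER BY [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC";

// Approximate DIRECT-mode query: the change table is read directly, so the LSN window
// moves from the function arguments into the WHERE clause:
String directModeQuery = "SELECT [__$start_lsn], [__$seqval], [__$operation], [__$update_mask], id, cola, "
        + "TODATETIMEOFFSET([testDB1].sys.fn_cdc_map_lsn_to_time([__$start_lsn]), DATEPART(TZOFFSET, SYSDATETIMEOFFSET())) "
        + "FROM [testDB1].cdc.[dbo_tablea_CT] "
        + "WHERE [__$start_lsn] >= ? AND [__$start_lsn] <= ? "
        + "ORDER BY [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC";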
diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java
index 367cea2e9aa..edee0e429b3 100644
--- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java
+++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java
@@ -217,6 +217,67 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) {
}
}
+ /**
+ * The set of predefined data query mode options.
+ */
+ public enum DataQueryMode implements EnumeratedValue {
+
+ /**
+ * In this mode, the CDC data is queried by calling the {@code cdc.[fn_cdc_get_all_changes_#]} function.
+ */
+ FUNCTION("function"),
+
+ /**
+ * In this mode, the CDC data is queried directly from the change tables.
+ */
+ DIRECT("direct");
+
+ private final String value;
+
+ DataQueryMode(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String getValue() {
+ return value;
+ }
+
+ /**
+ * Determine if the supplied value is one of the predefined options.
+ *
+ * @param value the configuration property value; may not be null
+ * @return the matching option, or null if no match is found
+ */
+ public static DataQueryMode parse(String value) {
+ if (value == null) {
+ return null;
+ }
+ value = value.trim();
+ for (DataQueryMode option : DataQueryMode.values()) {
+ if (option.getValue().equalsIgnoreCase(value)) {
+ return option;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Determine if the supplied value is one of the predefined options.
+ *
+ * @param value the configuration property value; may not be null
+ * @param defaultValue the default value; may be null
+ * @return the matching option, or null if no match is found and the non-null default is invalid
+ */
+ public static DataQueryMode parse(String value, String defaultValue) {
+ DataQueryMode mode = parse(value);
+ if (mode == null && defaultValue != null) {
+ mode = parse(defaultValue);
+ }
+ return mode;
+ }
+ }
+
public static final Field USER = RelationalDatabaseConnectorConfig.USER
.optional()
.withNoValidation();
@@ -303,6 +364,17 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) {
public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER
.withDefault(SqlServerSourceInfoStructMaker.class.getName());
+ public static final Field DATA_QUERY_MODE = Field.create("data.query.mode")
+ .withDisplayName("Data query mode")
+ .withEnum(DataQueryMode.class, DataQueryMode.FUNCTION)
+ .withWidth(Width.SHORT)
+ .withImportance(Importance.LOW)
+ .withDescription("Controls how the connector queries CDC data. "
+ + "The default is '" + DataQueryMode.FUNCTION.getValue()
+ + "', which means the data is queried by means of calling cdc.[fn_cdc_get_all_changes_#] function. "
+ + "The value of '" + DataQueryMode.DIRECT.getValue()
+ + "' makes the connector to query the change tables directly.");
+
private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.name("SQL Server")
.type(
@@ -321,7 +393,8 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) {
INCREMENTAL_SNAPSHOT_OPTION_RECOMPILE,
INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES,
- QUERY_FETCH_SIZE)
+ QUERY_FETCH_SIZE,
+ DATA_QUERY_MODE)
.events(SOURCE_INFO_STRUCT_MAKER)
.excluding(
SCHEMA_INCLUDE_LIST,
@@ -346,6 +419,7 @@ public static ConfigDef configDef() {
private final int maxTransactionsPerIteration;
private final boolean optionRecompile;
private final int queryFetchSize;
+ private final DataQueryMode dataQueryMode;
public SqlServerConnectorConfig(Configuration config) {
super(
@@ -386,6 +460,8 @@ public SqlServerConnectorConfig(Configuration config) {
}
this.optionRecompile = config.getBoolean(INCREMENTAL_SNAPSHOT_OPTION_RECOMPILE);
+
+ this.dataQueryMode = DataQueryMode.parse(config.getString(DATA_QUERY_MODE), DATA_QUERY_MODE.defaultValueAsString());
}
public List<String> getDatabaseNames() {
@@ -517,6 +593,10 @@ public Map getSnapshotSelectOverridesByTable() {
return Collections.unmodifiableMap(snapshotSelectOverridesByTable);
}
+ public DataQueryMode getDataQueryMode() {
+ return dataQueryMode;
+ }
+
private static int validateDatabaseNames(Configuration config, Field field, Field.ValidationOutput problems) {
String databaseNames = config.getString(field);
int count = 0;
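
A minimal sketch of how the new option might be set, mirroring the style of the integration test added below; the database name is an assumed example value.

// Select the direct data query mode instead of the default cdc.[fn_cdc_get_all_changes_#] function.
Configuration config = Configuration.create()
        .with("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector")
        .with("database.names", "testDB1")
        .with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.DIRECT)
        .build();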
diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java
index e50338d17ca..1757b08f2ee 100644
--- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java
+++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java
@@ -184,6 +184,83 @@ public void createAndDelete() throws Exception {
stopConnector();
}
+ @Test
+ public void createAndDeleteInDataQueryDirectMode() throws Exception {
+ final int RECORDS_PER_TABLE = 5;
+ final int TABLES = 2;
+ final int ID_START = 10;
+ final Configuration config = TestHelper.defaultConfig()
+ .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
+ .with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.DIRECT)
+ .build();
+
+ start(SqlServerConnector.class, config);
+ assertConnectorIsRunning();
+
+ // Wait for snapshot completion
+ consumeRecordsByTopic(1);
+
+ for (int i = 0; i < RECORDS_PER_TABLE; i++) {
+ final int id = ID_START + i;
+ connection.execute(
+ "INSERT INTO tablea VALUES(" + id + ", 'a')");
+ connection.execute(
+ "INSERT INTO tableb VALUES(" + id + ", 'b')");
+ }
+
+ final SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
+ final List<SourceRecord> tableA = records.recordsForTopic("server1.testDB1.dbo.tablea");
+ final List<SourceRecord> tableB = records.recordsForTopic("server1.testDB1.dbo.tableb");
+ assertThat(tableA).hasSize(RECORDS_PER_TABLE);
+ assertThat(tableB).hasSize(RECORDS_PER_TABLE);
+ for (int i = 0; i < RECORDS_PER_TABLE; i++) {
+ final SourceRecord recordA = tableA.get(i);
+ final SourceRecord recordB = tableB.get(i);
+ final List<SchemaAndValueField> expectedRowA = Arrays.asList(
+ new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
+ new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
+ final List<SchemaAndValueField> expectedRowB = Arrays.asList(
+ new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
+ new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
+
+ final Struct keyA = (Struct) recordA.key();
+ final Struct valueA = (Struct) recordA.value();
+ assertRecord((Struct) valueA.get("after"), expectedRowA);
+ assertNull(valueA.get("before"));
+
+ final Struct keyB = (Struct) recordB.key();
+ final Struct valueB = (Struct) recordB.value();
+ assertRecord((Struct) valueB.get("after"), expectedRowB);
+ assertNull(valueB.get("before"));
+ }
+
+ connection.execute("DELETE FROM tableB");
+ final SourceRecords deleteRecords = consumeRecordsByTopic(2 * RECORDS_PER_TABLE);
+ final List<SourceRecord> deleteTableA = deleteRecords.recordsForTopic("server1.testDB1.dbo.tablea");
+ final List<SourceRecord> deleteTableB = deleteRecords.recordsForTopic("server1.testDB1.dbo.tableb");
+ assertThat(deleteTableA).isNullOrEmpty();
+ assertThat(deleteTableB).hasSize(2 * RECORDS_PER_TABLE);
+
+ for (int i = 0; i < RECORDS_PER_TABLE; i++) {
+ final SourceRecord deleteRecord = deleteTableB.get(i * 2);
+ final SourceRecord tombstoneRecord = deleteTableB.get(i * 2 + 1);
+ final List<SchemaAndValueField> expectedDeleteRow = Arrays.asList(
+ new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
+ new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
+
+ final Struct deleteKey = (Struct) deleteRecord.key();
+ final Struct deleteValue = (Struct) deleteRecord.value();
+ assertRecord((Struct) deleteValue.get("before"), expectedDeleteRow);
+ assertNull(deleteValue.get("after"));
+
+ final Struct tombstoneKey = (Struct) tombstoneRecord.key();
+ final Struct tombstoneValue = (Struct) tombstoneRecord.value();
+ assertNull(tombstoneValue);
+ }
+
+ stopConnector();
+ }
+
@Test
@FixFor("DBZ-1642")
public void readOnlyApplicationIntent() throws Exception {
diff --git a/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java b/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java
index 306726944ad..d9e4fab8d9e 100644
--- a/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java
+++ b/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java
@@ -92,8 +92,11 @@ public class TimezoneConverter> implements Transforma
private String convertedTimezone;
private List<String> includeList;
private List<String> excludeList;
- private static final String SOURCE = "source";
+ private static final String SOURCE = Envelope.FieldName.SOURCE;
private static final String TOPIC = "topic";
+ private static final String FIELD_SOURCE_PREFIX = Envelope.FieldName.SOURCE + ".";
+ private static final String FIELD_BEFORE_PREFIX = Envelope.FieldName.BEFORE + ".";
+ private static final String FIELD_AFTER_PREFIX = Envelope.FieldName.AFTER + ".";
private static final Pattern TIMEZONE_OFFSET_PATTERN = Pattern.compile("^[+-]\\d{2}:\\d{2}(:\\d{2})?$");
private static final Pattern LIST_PATTERN = Pattern.compile("^\\[(source|topic|[\".\\w\\s_]+):([\".\\w\\s_]+(?::[\".\\w\\s_]+)?(?:,|]$))+$");
private final Map<String, Set<String>> topicFieldsMap = new HashMap<>();
@@ -128,8 +131,6 @@ public R apply(R record) {
}
Struct value = (Struct) record.value();
- Schema schema = value.schema();
-
String table = getTableFromSource(value);
String topic = record.topic();
@@ -190,19 +191,25 @@ private void collectTablesAndTopics(List list) {
if (!topicFieldsMap.containsKey(matchName)) {
topicFieldsMap.put(matchName, new HashSet<>());
}
- topicFieldsMap.get(matchName).add(field);
+ if (field != null) {
+ topicFieldsMap.get(matchName).add(field);
+ }
}
else if (Objects.equals(commonPrefix, SOURCE)) {
if (!tableFieldsMap.containsKey(matchName)) {
tableFieldsMap.put(matchName, new HashSet<>());
}
- tableFieldsMap.get(matchName).add(field);
+ if (field != null) {
+ tableFieldsMap.get(matchName).add(field);
+ }
}
else {
if (!noPrefixFieldsMap.containsKey(matchName)) {
noPrefixFieldsMap.put(matchName, new HashSet<>());
}
- noPrefixFieldsMap.get(matchName).add(field);
+ if (field != null) {
+ noPrefixFieldsMap.get(matchName).add(field);
+ }
}
}
}
@@ -307,12 +314,38 @@ private void handleStructs(Struct value, Type type, String matchName, Set<String> fields) {
+ Set<String> beforeFields = new HashSet<>();
+ Set<String> afterFields = new HashSet<>();
+ Set<String> sourceFields = new HashSet<>();
+
+ if (!fields.isEmpty()) {
+ for (String field : fields) {
+ if (field.startsWith(FIELD_SOURCE_PREFIX)) {
+ sourceFields.add(field.substring(FIELD_SOURCE_PREFIX.length()));
+ }
+ else if (field.startsWith(FIELD_BEFORE_PREFIX)) {
+ beforeFields.add(field.substring(FIELD_BEFORE_PREFIX.length()));
+ }
+ else if (field.startsWith(FIELD_AFTER_PREFIX)) {
+ afterFields.add(field.substring(FIELD_AFTER_PREFIX.length()));
+ }
+ else {
+ beforeFields.add(field);
+ afterFields.add(field);
+ }
+ }
+ }
if (before != null) {
- handleValueForFields(before, type, fields);
+ handleValueForFields(before, type, beforeFields);
}
if (after != null) {
- handleValueForFields(after, type, fields);
+ handleValueForFields(after, type, afterFields);
+ }
+ if (source != null && !sourceFields.isEmpty()) {
+ handleValueForFields(source, type, sourceFields);
}
}
@@ -472,7 +505,7 @@ private void handleInclude(Struct value, String table, String topic) {
Set<String> fields = matchFieldsResult.getFields();
if (matchName != null) {
- if (!fields.contains(null)) {
+ if (!fields.isEmpty()) {
handleStructs(value, Type.INCLUDE, matchName, fields);
}
else {
@@ -480,7 +513,7 @@ private void handleInclude(Struct value, String table, String topic) {
}
}
else {
- handleStructs(value, Type.ALL, table, Set.of(""));
+ handleStructs(value, Type.ALL, table, Collections.emptySet());
}
}
@@ -490,16 +523,16 @@ private void handleExclude(Struct value, String table, String topic) {
Set<String> fields = matchFieldsResult.getFields();
if (matchName == null) {
- handleStructs(value, Type.ALL, table != null ? table : topic, Set.of(""));
+ handleStructs(value, Type.ALL, table != null ? table : topic, Collections.emptySet());
}
- else if (!fields.contains(null)) {
+ else if (!fields.isEmpty()) {
handleStructs(value, Type.EXCLUDE, matchName, fields);
}
}
private void handleAllRecords(Struct value, String table, String topic) {
if (!topicFieldsMap.containsKey(topic) && !tableFieldsMap.containsKey(table) && !noPrefixFieldsMap.containsKey(table)) {
- handleStructs(value, Type.ALL, table != null ? table : topic, Set.of(""));
+ handleStructs(value, Type.ALL, table != null ? table : topic, Collections.emptySet());
}
}
}
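
A short sketch of how the new field prefixes are intended to be used when configuring the SMT, following the pattern of the test added below; the table and field names are illustrative.

// Route conversion explicitly: a "source." prefix targets the source info block,
// "before." and "after." target the respective row states, and an unprefixed
// field name still converts both the before and after states.
final Map<String, String> props = new HashMap<>();
props.put("converted.timezone", "Europe/Moscow");
props.put("include.list", "source:customers:source.ts_ms");
converter.configure(props);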
diff --git a/debezium-core/src/test/java/io/debezium/transforms/TimezoneConverterTest.java b/debezium-core/src/test/java/io/debezium/transforms/TimezoneConverterTest.java
index eb6371cce08..77fed74696e 100644
--- a/debezium-core/src/test/java/io/debezium/transforms/TimezoneConverterTest.java
+++ b/debezium-core/src/test/java/io/debezium/transforms/TimezoneConverterTest.java
@@ -201,7 +201,7 @@ public void testKafkaConnectTimestamp() {
public void testIncludeListWithTablePrefix() {
final Map<String, String> props = new HashMap<>();
props.put("converted.timezone", "Atlantic/Azores");
- props.put("include.list", "source:customers:order_date_zoned_timestamp");
+ props.put("include.list", "source:customers:after.order_date_zoned_timestamp");
converter.configure(props);
final Struct before = new Struct(recordSchema);
@@ -810,6 +810,51 @@ public void testUnsupportedLogicalTypes() {
VerifyRecord.isValid(record);
converter.apply(record);
assertThat(logInterceptor.containsMessage("Skipping conversion for unsupported logical type: io.debezium.time.Date for field: order_date")).isTrue();
+ }
+
+ @Test
+ public void testSourceBlockTimestamp() {
+ Map<String, String> props = new HashMap<>();
+ props.put("converted.timezone", "Europe/Moscow");
+ props.put("include.list", "source:customers:source.ts_ms");
+
+ converter.configure(props);
+
+ final Struct before = new Struct(recordSchema);
+ final Struct source = new Struct(sourceSchema);
+
+ before.put("id", (byte) 1);
+ before.put("name", "Amy Rose");
+ before.put("order_date_zoned_time", "11:15:30.123456789+00:00");
+
+ source.put("table", "customers");
+ source.put("lsn", 1);
+ source.put("ts_ms", 123456789);
+
+ final Envelope envelope = Envelope.defineSchema()
+ .withName("dummy.Envelope")
+ .withRecord(recordSchema)
+ .withSource(sourceSchema)
+ .build();
+ final Struct payload = envelope.create(before, source, Instant.now());
+
+ SourceRecord record = new SourceRecord(
+ new HashMap<>(),
+ new HashMap<>(),
+ "db.server1.table1",
+ envelope.schema(),
+ payload);
+
+ VerifyRecord.isValid(record);
+ final SourceRecord transformedRecord = converter.apply(record);
+ VerifyRecord.isValid(transformedRecord);
+
+ final Struct transformedValue = (Struct) transformedRecord.value();
+ final Struct transformedSource = transformedValue.getStruct(Envelope.FieldName.SOURCE);
+ final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER);
+
+ assertThat(transformedSource.get("ts_ms")).isEqualTo(123456789);
+ assertThat(transformedAfter.get("order_date_zoned_time")).isEqualTo("11:15:30.123456789+00:00");
}
}
diff --git a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/JdbcCommonConfig.java b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/JdbcCommonConfig.java
index 36566e1d79c..0a21ba7cebf 100644
--- a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/JdbcCommonConfig.java
+++ b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/JdbcCommonConfig.java
@@ -45,7 +45,7 @@ public class JdbcCommonConfig {
public JdbcCommonConfig(Configuration config, String prefix) {
config = config.subset(prefix, true);
- LOGGER.info("Configuration for '{}' with prefix '{}': {}", getClass().getSimpleName(), prefix, config.asMap());
+ LOGGER.info("Configuration for '{}' with prefix '{}': {}", getClass().getSimpleName(), prefix, config.withMaskedPasswords().asMap());
if (!config.validateAndRecord(getAllConfigurationFields(), error -> LOGGER.error("Validation error for property with prefix '{}': {}", prefix, error))) {
throw new DebeziumException(
String.format("Error configuring an instance of '%s' with prefix '%s'; check the logs for errors", getClass().getSimpleName(), prefix));
diff --git a/debezium-testing/debezium-testing-system/pom.xml b/debezium-testing/debezium-testing-system/pom.xml
index 5cf8fb52f06..8d17e668211 100644
--- a/debezium-testing/debezium-testing-system/pom.xml
+++ b/debezium-testing/debezium-testing-system/pom.xml
@@ -126,7 +126,7 @@
ORCLPDB1
- 2.6.0-SNAPSHOT
+ ${project.version}
http://debezium-artifact-server.${ocp.project.debezium}.svc.cluster.local:8080
diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaConnectController.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaConnectController.java
index bf1a923847f..45399b40a11 100644
--- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaConnectController.java
+++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaConnectController.java
@@ -111,7 +111,7 @@ public HttpUrl getMetricsURL() {
@Override
public boolean undeploy() {
container.stop();
- return container.isRunning();
+ return !container.isRunning();
}
@Override
diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaConnectDeployer.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaConnectDeployer.java
index a15940b33db..ac4d1c51014 100644
--- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaConnectDeployer.java
+++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaConnectDeployer.java
@@ -36,7 +36,7 @@ public Builder(KafkaConnectConainer container) {
}
public Builder withKafka(DockerKafkaController kafka) {
- container.withKafka(kafka.getContainer());
+ container.withKafka(kafka.getKafkaContainer());
return self();
}
diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaController.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaController.java
index ddd8b68b5cb..8a91a03c42b 100644
--- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaController.java
+++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaController.java
@@ -13,6 +13,10 @@
import org.slf4j.LoggerFactory;
import io.debezium.testing.system.tools.kafka.docker.KafkaContainer;
+import io.debezium.testing.system.tools.kafka.docker.ZookeeperContainer;
+
+import lombok.Getter;
+import lombok.Setter;
/**
* This class provides control over Kafka instance deployed as DockerContainer
@@ -23,24 +27,23 @@ public class DockerKafkaController implements KafkaController {
private static final Logger LOGGER = LoggerFactory.getLogger(DockerKafkaController.class);
- private final KafkaContainer container;
+ @Getter
+ private final KafkaContainer kafkaContainer;
+ @Setter
+ private ZookeeperContainer zookeeperContainer;
public DockerKafkaController(KafkaContainer container) {
- this.container = container;
- }
-
- public KafkaContainer getContainer() {
- return container;
+ this.kafkaContainer = container;
}
@Override
public String getPublicBootstrapAddress() {
- return container.getPublicBootstrapAddress();
+ return kafkaContainer.getPublicBootstrapAddress();
}
@Override
public String getBootstrapAddress() {
- return container.getBootstrapAddress();
+ return kafkaContainer.getBootstrapAddress();
}
@Override
@@ -50,14 +53,15 @@ public String getTlsBootstrapAddress() {
@Override
public boolean undeploy() {
- container.stop();
- return container.isRunning();
+ kafkaContainer.stop();
+ zookeeperContainer.stop();
+ return !zookeeperContainer.isRunning() && !kafkaContainer.isRunning();
}
@Override
public void waitForCluster() {
await()
.atMost(scaled(5), MINUTES)
- .until(container::isRunning);
+ .until(kafkaContainer::isRunning);
}
}
diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaDeployer.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaDeployer.java
index d18fa180b57..0581a9b8770 100644
--- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaDeployer.java
+++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaDeployer.java
@@ -34,7 +34,9 @@ public DockerKafkaController deploy() {
container.withZookeeper(zookeeperContainer);
Startables.deepStart(Stream.of(zookeeperContainer, container)).join();
- return getController(container);
+ DockerKafkaController controller = getController(container);
+ controller.setZookeeperContainer(zookeeperContainer);
+ return controller;
}
public static class Builder
diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaConnectConainer.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaConnectConainer.java
index 05cc651b49d..2e27bfceae4 100644
--- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaConnectConainer.java
+++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaConnectConainer.java
@@ -5,10 +5,13 @@
*/
package io.debezium.testing.system.tools.kafka.docker;
+import java.time.Duration;
+
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import io.debezium.testing.system.tools.ConfigProperties;
+import io.debezium.testing.system.tools.WaitConditions;
public class KafkaConnectConainer extends GenericContainer<KafkaConnectConainer> {
@@ -28,7 +31,7 @@ public KafkaConnectConainer() {
}
private void defaultConfig() {
- withReuse(true);
+ withReuse(false);
withExposedPorts(KAFKA_CONNECT_API_PORT, KAFKA_JMX_PORT);
addEnv("CONFIG_STORAGE_TOPIC", "connect_config");
addEnv("OFFSET_STORAGE_TOPIC", "connect_offsets");
@@ -36,6 +39,7 @@ private void defaultConfig() {
addEnv("JMXHOST", KAFKA_JMX_HOST);
addEnv("JMXPORT", String.valueOf(KAFKA_JMX_PORT));
withHttpMetrics();
+ withStartupTimeout(Duration.ofMinutes(WaitConditions.scaled(1)));
withCommand(KAFKA_CONNECT_COMMAND);
}
diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaContainer.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaContainer.java
index 14c512214b8..3d00600957d 100644
--- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaContainer.java
+++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaContainer.java
@@ -5,6 +5,7 @@
*/
package io.debezium.testing.system.tools.kafka.docker;
+import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.testcontainers.containers.GenericContainer;
@@ -13,6 +14,7 @@
import com.github.dockerjava.api.command.InspectContainerResponse;
import io.debezium.testing.system.tools.ConfigProperties;
+import io.debezium.testing.system.tools.WaitConditions;
public class KafkaContainer extends GenericContainer<KafkaContainer> {
@@ -42,6 +44,7 @@ private void defaultConfig() {
withEnv("KAFKA_ADVERTISED_LISTENERS", "PLAINTEXT://" + getPublicBootstrapAddress() + ",BROKER://" + getBootstrapAddress());
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
+ withStartupTimeout(Duration.ofMinutes(WaitConditions.scaled(1)));
}
public KafkaContainer withZookeeper(ZookeeperContainer zookeeper) {
diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/ZookeeperContainer.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/ZookeeperContainer.java
index 564708f01be..b53971311ef 100644
--- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/ZookeeperContainer.java
+++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/ZookeeperContainer.java
@@ -5,9 +5,12 @@
*/
package io.debezium.testing.system.tools.kafka.docker;
+import java.time.Duration;
+
import org.testcontainers.containers.GenericContainer;
import io.debezium.testing.system.tools.ConfigProperties;
+import io.debezium.testing.system.tools.WaitConditions;
public class ZookeeperContainer extends GenericContainer<ZookeeperContainer> {
@@ -30,6 +33,7 @@ public String serverAddress() {
private void defaultConfig() {
withExposedPorts(ZOOKEEPER_PORT_CLIENT);
withCommand(ZOOKEEPER_COMMAND);
+ withStartupTimeout(Duration.ofMinutes(WaitConditions.scaled(1)));
}
}
diff --git a/documentation/modules/ROOT/pages/connectors/sqlserver.adoc b/documentation/modules/ROOT/pages/connectors/sqlserver.adoc
index e5144b65120..f34987d0a24 100644
--- a/documentation/modules/ROOT/pages/connectors/sqlserver.adoc
+++ b/documentation/modules/ROOT/pages/connectors/sqlserver.adoc
@@ -248,7 +248,7 @@ Removing offsets should be performed only by advanced users who have experience
This operation is potentially destructive, and should be performed only as a last resort.
====
4. Apply the following changes to the connector configuration:
-.. (Optional) Set the value of xref:{context}-property-database-history-store-only-captured-tables-ddl[`schema.history.internal.captured.tables.ddl`] to `false`.
+.. (Optional) Set the value of xref:{context}-property-database-history-store-only-captured-tables-ddl[`schema.history.internal.store.only.captured.tables.ddl`] to `false`.
This setting causes the snapshot to capture the schema for all tables, and guarantees that, in the future, the connector can reconstruct the schema history for all tables. +
+
[NOTE]
@@ -258,7 +258,7 @@ Snapshots that capture the schema for all tables require more time to complete.
.. Add the tables that you want the connector to capture to xref:{context}-property-table-include-list[`table.include.list`].
.. Set the xref:{context}-property-snapshot-mode[`snapshot.mode`] to one of the following values:
`initial`:: When you restart the connector, it takes a full snapshot of the database that captures the table data and table structures. +
-If you select this option, consider setting the value of the xref:{context}-property-database-history-store-only-captured-tables-ddl[`schema.history.internal.captured.tables.ddl`] property to `false` to enable the connector to capture the schema of all tables.
+If you select this option, consider setting the value of the xref:{context}-property-database-history-store-only-captured-tables-ddl[`schema.history.internal.store.only.captured.tables.ddl`] property to `false` to enable the connector to capture the schema of all tables.
`schema_only`:: When you restart the connector, it takes a snapshot that captures only the table schema.
Unlike a full data snapshot, this option does not capture any table data.
Use this option if you want to restart the connector more quickly than with a full snapshot.
@@ -916,7 +916,10 @@ a|Name of the schema that defines the structure of the key's payload. This schem
ifdef::community[]
[NOTE]
====
-Although the `column.exclude.list` and `column.include.list` connector configuration properties allow you to capture only a subset of table columns, all columns in a primary or unique key are always included in the event's key.
+When {prodname} emits a change event record, it builds the record's message key from the primary key or unique key columns of the source table.
+{prodname} must be able to read these columns to function properly.
+If you set the xref:sqlserver-property-column-include-list[`column.include.list`] or xref:sqlserver-property-column-exclude-list[`column.exclude.list`] properties in the connector configuration,
+be sure that your settings permit the connector to capture the required primary key or unique key columns.
====
[WARNING]
@@ -2519,9 +2522,13 @@ If you include this property in the configuration, do not also set the `table.in
|[[sqlserver-property-column-include-list]]<>
|_empty string_
|An optional comma-separated list of regular expressions that match the fully-qualified names of columns that should be included in the change event message values.
-Fully-qualified names for columns are of the form _schemaName_._tableName_._columnName_.
-Note that primary key columns are always included in the event's key, even if not included in the value.
-For now, table primary key has to be always explicitly included in the list of captured columns.+
+Fully-qualified names for columns are of the form _schemaName_._tableName_._columnName_. +
+
+[NOTE]
+====
+Each change event record that {prodname} emits for a table includes an event key that contains a field for each column in the table's primary key or unique key.
+To ensure that event keys are generated correctly, if you set this property, be sure to explicitly list the primary key columns of any captured tables.
+====
To match the name of a column, {prodname} applies the regular expression that you specify as an _anchored_ regular expression.
That is, the specified expression is matched against the entire name string of the column; it does not match substrings that might be present in a column name. +
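
To make the preceding note concrete, here is a hedged configuration sketch; the table and column names are invented for illustration.

// "id" is assumed to be the primary key of dbo.orders; listing it explicitly ensures
// the event key can still be built when only a subset of columns is captured.
Configuration config = Configuration.create()
        .with("table.include.list", "dbo.orders")
        .with("column.include.list", "dbo.orders.id,dbo.orders.order_date")
        .build();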
diff --git a/documentation/modules/ROOT/pages/transformations/timezone-converter.adoc b/documentation/modules/ROOT/pages/transformations/timezone-converter.adoc
index 67e39ee3818..44eba748566 100644
--- a/documentation/modules/ROOT/pages/transformations/timezone-converter.adoc
+++ b/documentation/modules/ROOT/pages/transformations/timezone-converter.adoc
@@ -34,6 +34,13 @@ Providing a fixed UTC offset is useful when converting timestamp fields to a spe
The `include.list` and `exclude.list` configuration options are mutually exclusive. You must specify only one of the options.
====
+The SMT can also convert event metadata fields in the source information block, such as `ts_ms`, to the target timezone. To convert these metadata fields, prefix the field name with `source` in the `include.list` or `exclude.list` configuration option.
+
+[NOTE]
+====
+Because the schema for timestamp fields in the source information block, such as `ts_ms`, is currently `INT64` rather than a timestamp type, the SMT cannot yet convert these fields. Future releases aim to support this conversion by introducing compatibility with a timestamp schema.
+====
+
[[timezone-converter-usage]]
[[basic-example-timezone-converter]]
@@ -179,14 +186,14 @@ Specify rules by using one of the following formats:
The SMT converts all time-based fields in the matched table.
`source:<tablename>:<fieldname>`:: Matches {prodname} change events with source information blocks that have the specified table name.
-The SMT converts only fields in the specified table that have the specified field name.
+The SMT converts only fields in the specified table that have the specified field name. `fieldname` can be prefixed with `before`, `after`, or `source` to include the appropriate field in the event record. If no prefix is specified, both `before` and `after` fields are converted.
`topic:<topicname>`:: Matches events from the specified topic name, converting all time-based fields in the event record.
-`topic:<topicname>:<fieldname>`:: Matches events from the specified topic name, and converts values for the specified fields only.
+`topic:<topicname>:<fieldname>`:: Matches events from the specified topic name, and converts values for the specified fields only. `fieldname` can be prefixed with `before`, `after`, or `source` to include the appropriate field in the event record. If no prefix is specified, both `before` and `after` fields are converted.
`<matchname>:<fieldname>`:: Applies a heuristic matching algorithm to match against the table name of the source information block, if present; otherwise, matches against the topic name.
-The SMT converts values for the specified field name only.
+The SMT converts values for the specified field name only. `fieldname` can be prefixed with `before`, `after`, or `source` to include the appropriate field in the event record. If no prefix is specified, both `before` and `after` fields are converted.
|list
|medium
|[[timezone-converter-exclude-list]]<>
@@ -197,14 +204,14 @@ Specify rules by using one of the following formats:
The SMT excludes all time-based fields in the matched table from conversion.
`source:<tablename>:<fieldname>`:: Matches {prodname} change events with source information blocks that have the specified table name.
-The SMT excludes from conversion fields in the specified table that match the specified field name.
+The SMT excludes from conversion fields in the specified table that match the specified field name. `fieldname` can be prefixed with `before`, `after`, or `source` to exclude the appropriate field in the event record. If no prefix is specified, both `before` and `after` fields are excluded from conversion.
`topic:<topicname>`:: Matches events from the specified topic name, and excludes from conversion all time-based fields in the topic.
-`topic:<topicname>:<fieldname>`:: Matches events from the specified topic name, and excludes from conversion any fields in the topic that have the specified name.
+`topic:<topicname>:<fieldname>`:: Matches events from the specified topic name, and excludes from conversion any fields in the topic that have the specified name. `fieldname` can be prefixed with `before`, `after`, or `source` to exclude the appropriate field in the event record. If no prefix is specified, both `before` and `after` fields are excluded from conversion.
`<matchname>:<fieldname>`:: Applies a heuristic matching algorithm to match against the table name of the source information block, if present; otherwise, matches against the topic name.
-The SMT excludes from conversion only fields that have the specified name.
+The SMT excludes from conversion only fields that have the specified name. `fieldname` can be prefixed with `before`, `after`, or `source` to exclude the appropriate field in the event record. If no prefix is specified, both `before` and `after` fields are excluded from conversion.
|list
|medium
|===
diff --git a/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot-nosql.adoc b/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot-nosql.adoc
index 4280db5cf80..231156fc3d3 100644
--- a/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot-nosql.adoc
+++ b/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot-nosql.adoc
@@ -63,7 +63,7 @@ If this component of the `data` field is omitted, the signal stops the entire in
|5
|`incremental`
-|A required component of the `data` field of a signal that specifies the kind of snapshot operation that is to be stopped. +
+|A required component of the `data` field of a signal that specifies the type of snapshot operation that is to be stopped. +
Currently, the only valid option is `incremental`. +
If you do not specify a `type` value, the signal fails to stop the incremental snapshot.
|===
diff --git a/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot-sql.adoc b/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot-sql.adoc
index 8a5f03425eb..859badf5213 100644
--- a/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot-sql.adoc
+++ b/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot-sql.adoc
@@ -62,7 +62,7 @@ If this component of the `data` field is omitted, the signal stops the entire in
|5
|`incremental`
-|A required component of the `data` field of a signal that specifies the kind of snapshot operation that is to be stopped. +
+|A required component of the `data` field of a signal that specifies the type of snapshot operation that is to be stopped. +
Currently, the only valid option is `incremental`. +
If you do not specify a `type` value, the signal fails to stop the incremental snapshot.
|===
diff --git a/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot.adoc b/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot.adoc
index 8139a7b3152..22af5054893 100644
--- a/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot.adoc
+++ b/documentation/modules/ROOT/partials/modules/all-connectors/proc-stopping-an-incremental-snapshot.adoc
@@ -90,7 +90,7 @@ If this component of the `data` field is omitted, the signal stops the entire in
|5
|`incremental`
-|A required component of the `data` field of a signal that specifies the kind of snapshot operation that is to be stopped. +
+|A required component of the `data` field of a signal that specifies the type of snapshot operation that is to be stopped. +
Currently, the only valid option is `incremental`. +
If you do not specify a `type` value, the signal fails to stop the incremental snapshot.
|===
@@ -125,7 +125,7 @@ If this component of the `data` field is omitted, the signal stops the entire in
|5
|`incremental`
-|A required component of the `data` field of a signal that specifies the kind of snapshot operation that is to be stopped. +
+|A required component of the `data` field of a signal that specifies the type of snapshot operation that is to be stopped. +
Currently, the only valid option is `incremental`. +
If you do not specify a `type` value, the signal fails to stop the incremental snapshot.
|===
diff --git a/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-kafka.adoc b/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-kafka.adoc
index 45a02a66d1a..2fe3b7bd027 100644
--- a/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-kafka.adoc
+++ b/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-kafka.adoc
@@ -14,7 +14,7 @@ The signal type is `execute-snapshot`, and the `data` field must have the follow
|`type`
|`incremental`
| The type of the snapshot to be executed.
-Currently {prodname} supports only the `incremental` type. +
+Currently, {prodname} supports the `incremental` and `blocking` types. +
See the next section for more details.
|`data-collections`
diff --git a/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-nosql.adoc b/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-nosql.adoc
index c1de9c15212..6536faf880a 100644
--- a/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-nosql.adoc
+++ b/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-nosql.adoc
@@ -4,8 +4,8 @@ You submit a signal to the signaling {data-collection} by using the MongoDB `ins
After {prodname} detects the change in the signaling {data-collection}, it reads the signal, and runs the requested snapshot operation.
-The query that you submit specifies the {data-collection}s to include in the snapshot, and, optionally, specifies the kind of snapshot operation.
-Currently, the only valid option for snapshots operations is the default value, `incremental`.
+The query that you submit specifies the {data-collection}s to include in the snapshot, and, optionally, specifies the type of snapshot operation.
+Currently, the only valid options for snapshot operations are `incremental` and `blocking`.
To specify the {data-collection}s to include in the snapshot, provide a `data-collections` array that lists the {data-collection}s or an array of regular expressions used to match {data-collection}s, for example, +
`{"data-collections": ["public.Collection1", "public.Collection2"]}` +
@@ -80,8 +80,8 @@ The array lists regular expressions which match {data-collection}s by their full
|5
|`incremental`
-|An optional `type` component of the `data` field of a signal that specifies the kind of snapshot operation to run. +
-Currently, the only valid option is the default value, `incremental`. +
+|An optional `type` component of the `data` field of a signal that specifies the type of snapshot operation to run. +
+Currently, {prodname} supports the `incremental` and `blocking` types. +
If you do not specify a value, the connector runs an incremental snapshot.
|===
@@ -111,7 +111,7 @@ The following example, shows the JSON for an incremental snapshot event that is
|1
|`snapshot`
|Specifies the type of snapshot operation to run. +
-Currently, the only valid option is the default value, `incremental`. +
+Currently, the only valid options are `blocking` and `incremental`. +
Specifying a `type` value in the SQL query that you submit to the signaling {data-collection} is optional. +
If you do not specify a value, the connector runs an incremental snapshot.
diff --git a/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-sql.adoc b/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-sql.adoc
index 9e1c81271bc..d43eb9ed198 100644
--- a/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-sql.adoc
+++ b/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot-sql.adoc
@@ -4,8 +4,8 @@ You submit a signal to the signaling {data-collection} as SQL `INSERT` queries.
After {prodname} detects the change in the signaling {data-collection}, it reads the signal, and runs the requested snapshot operation.
-The query that you submit specifies the {data-collection}s to include in the snapshot, and, optionally, specifies the kind of snapshot operation.
-Currently, the only valid option for snapshots operations is the default value, `incremental`.
+The query that you submit specifies the {data-collection}s to include in the snapshot, and, optionally, specifies the type of snapshot operation.
+Currently, {prodname} supports the `incremental` and `blocking` types.
To specify the {data-collection}s to include in the snapshot, provide a `data-collections` array that lists the {data-collection}s or an array of regular expressions used to match {data-collection}s, for example, +
@@ -79,8 +79,8 @@ The array lists regular expressions which match {data-collection}s by their full
|5
|`incremental`
-|An optional `type` component of the `data` field of a signal that specifies the kind of snapshot operation to run. +
-Currently, the only valid option is the default value, `incremental`. +
+|An optional `type` component of the `data` field of a signal that specifies the type of snapshot operation to run. +
+Currently, {prodname} supports the `incremental` and `blocking` types. +
If you do not specify a value, the connector runs an incremental snapshot.
|6
@@ -164,7 +164,7 @@ The following example, shows the JSON for an incremental snapshot event that is
|1
|`snapshot`
|Specifies the type of snapshot operation to run. +
-Currently, the only valid option is the default value, `incremental`. +
+Currently, the only valid options are `blocking` and `incremental`. +
Specifying a `type` value in the SQL query that you submit to the signaling {data-collection} is optional. +
If you do not specify a value, the connector runs an incremental snapshot.
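
Because this file now documents both `incremental` and `blocking` as valid `type` values, a hedged example of the corresponding `execute-snapshot` signal may help readers. Again, the signaling table name and the table names in `data-collections` are placeholders, not values taken from this change:

[source,sql]
----
-- Sketch only: request an ad hoc snapshot of two tables.
-- "type": "blocking" asks for a blocking snapshot; using "incremental",
-- or omitting "type" entirely, runs the default incremental snapshot.
INSERT INTO debezium_signal (id, type, data)
VALUES (
    'ad-hoc-blocking-1',
    'execute-snapshot',
    '{"data-collections": ["schema1.table1", "schema2.table2"], "type": "blocking"}'
);
----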
diff --git a/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot.adoc b/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot.adoc
index ac3dc1e8726..7d062965b30 100644
--- a/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot.adoc
+++ b/documentation/modules/ROOT/partials/modules/all-connectors/proc-triggering-an-incremental-snapshot.adoc
@@ -8,8 +8,8 @@ end::nosql-based-snapshot[]
After {prodname} detects the change in the signaling {data-collection}, it reads the signal, and runs the requested snapshot operation.
-The query that you submit specifies the {data-collection}s to include in the snapshot, and, optionally, specifies the kind of snapshot operation.
-Currently, the only valid option for snapshots operations is the default value, `incremental`.
+The query that you submit specifies the {data-collection}s to include in the snapshot, and, optionally, specifies the type of snapshot operation.
+Currently, the only valid options for snapshot operations are `incremental` and `blocking`.
To specify the {data-collection}s to include in the snapshot, provide a `data-collections` array that lists the {data-collection}s or an array of regular expressions used to match {data-collection}s, for example, +
tag::sql-based-snapshot[]
@@ -120,8 +120,8 @@ The array lists regular expressions which match {data-collection}s by their full
|5
|`incremental`
-|An optional `type` component of the `data` field of a signal that specifies the kind of snapshot operation to run. +
-Currently, the only valid option is the default value, `incremental`. +
+|An optional `type` component of the `data` field of a signal that specifies the type of snapshot operation to run. +
+Currently, {prodname} supports the `incremental` and `blocking` types. +
If you do not specify a value, the connector runs an incremental snapshot.
|6
@@ -214,8 +214,8 @@ Rather, during the snapshot, {prodname} generates its own `id` string as a water
The array lists regular expressions which match {data-collection}s by their fully-qualified names, using the same format as you use to specify the name of the connector's signaling {data-collection} in the xref:{context}-property-signal-data-collection[`signal.data.collection`] configuration property.
|`incremental`
-|An optional `type` component of the `data` field of a signal that specifies the kind of snapshot operation to run. +
-Currently, the only valid option is the default value, `incremental`. +
+|An optional `type` component of the `data` field of a signal that specifies the type of snapshot operation to run. +
+Currently, {prodname} supports the `incremental` and `blocking` types. +
If you do not specify a value, the connector runs an incremental snapshot.
|===
@@ -247,7 +247,7 @@ The following example, shows the JSON for an incremental snapshot event that is
|1
|`snapshot`
|Specifies the type of snapshot operation to run. +
-Currently, the only valid option is the default value, `incremental`. +
+Currently, the only valid options are `blocking` and `incremental`. +
Specifying a `type` value in the SQL query that you submit to the signaling {data-collection} is optional. +
If you do not specify a value, the connector runs an incremental snapshot.
diff --git a/jenkins-jobs/scripts/config/Aliases.txt b/jenkins-jobs/scripts/config/Aliases.txt
index f1f5f6e4b92..d31fbeb58d3 100644
--- a/jenkins-jobs/scripts/config/Aliases.txt
+++ b/jenkins-jobs/scripts/config/Aliases.txt
@@ -249,3 +249,4 @@ Lourens Naude,Lourens Naudé
overwatcheddude,حمود سمبول
wukachn,Peter Hamer
shybovycha,Artem Shubovych
+Liaoyuxing,leoloel
\ No newline at end of file