diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java index 1b734b10ac9..ec6cfcd8a1a 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java @@ -478,6 +478,14 @@ public static DataQueryMode parse(String value, String defaultValue) { + "The value of '" + DataQueryMode.DIRECT.getValue() + "' makes the connector to query the change tables directly."); + public static final Field STREAMING_FETCH_SIZE = Field.create("streaming.fetch.size") + .withDisplayName("Streaming fetch size") + .withDefault(0) + .withType(Type.INT) + .withImportance(Importance.LOW) + .withDescription("Specifies the maximum number of rows that should be read in one go from each table while streaming. " + + "The connector will read the table contents in multiple batches of this size. Defaults to 0 which means no limit."); + private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit() .name("SQL Server") .type( @@ -498,7 +506,8 @@ public static DataQueryMode parse(String value, String defaultValue) { INCREMENTAL_SNAPSHOT_CHUNK_SIZE, INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES, QUERY_FETCH_SIZE, - DATA_QUERY_MODE) + DATA_QUERY_MODE, + STREAMING_FETCH_SIZE) .events(SOURCE_INFO_STRUCT_MAKER) .excluding( SCHEMA_INCLUDE_LIST, @@ -525,6 +534,7 @@ public static ConfigDef configDef() { private final boolean optionRecompile; private final int queryFetchSize; private final DataQueryMode dataQueryMode; + private final int streamingFetchSize; public SqlServerConnectorConfig(Configuration config) { super( @@ -568,6 +578,7 @@ public SqlServerConnectorConfig(Configuration config) { this.dataQueryMode = DataQueryMode.parse(config.getString(DATA_QUERY_MODE), DATA_QUERY_MODE.defaultValueAsString()); this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString()); + this.streamingFetchSize = config.getInteger(STREAMING_FETCH_SIZE); } public List getDatabaseNames() { @@ -717,4 +728,8 @@ private static int validateDatabaseNames(Configuration config, Field field, Fiel return count; } + + public int getStreamingFetchSize() { + return streamingFetchSize; + } } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java index 9135a7d1053..648d1aaa6e7 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -240,10 +240,8 @@ else if (!checkAgent) { final SqlServerChangeTable[] tables = tablesSlot.get(); changeTables = new SqlServerChangeTablePointer[tables.length]; - final int maxRowsPerResultSet = 3; - for (int i = 0; i < tables.length; i++) { - changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn, maxRowsPerResultSet); + changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn, connectorConfig.getStreamingFetchSize()); changeTables[i].next(); } diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index d521703964a..d64ffe284b3 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -34,6 +34,7 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import java.util.function.UnaryOperator; import java.util.stream.Collectors; import javax.management.InstanceNotFoundException; @@ -125,12 +126,39 @@ public void after() throws SQLException { } @Test - public void createAndDelete() throws Exception { + public void createAndDeleteWithDataQueryModeFunctionWithoutFetchThreshold() throws Exception { + createAndDelete(builder -> builder + .with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.FUNCTION) + .with(SqlServerConnectorConfig.STREAMING_FETCH_SIZE, 0)); + } + + @Test + public void createAndDeleteWithDataQueryModeFunctionWithFetchThreshold() throws Exception { + createAndDelete(builder -> builder + .with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.FUNCTION) + .with(SqlServerConnectorConfig.STREAMING_FETCH_SIZE, 3)); + } + + @Test + public void createAndDeleteWithDataQueryModeDirectWithoutFetchThreshold() throws Exception { + createAndDelete(builder -> builder + .with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.DIRECT) + .with(SqlServerConnectorConfig.STREAMING_FETCH_SIZE, 0)); + } + + @Test + public void createAndDeleteWithDataQueryModeDirectWithFetchThreshold() throws Exception { + createAndDelete(builder -> builder + .with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.DIRECT) + .with(SqlServerConnectorConfig.STREAMING_FETCH_SIZE, 3)); + } + + private void createAndDelete(UnaryOperator configAugmenter) throws Exception { final int RECORDS_PER_TABLE = 5; final int TABLES = 2; final int ID_START = 10; - final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + final Configuration config = configAugmenter.apply(TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)) .build(); start(SqlServerConnector.class, config); @@ -208,6 +236,7 @@ public void createAndDeleteInDataQueryDirectMode() throws Exception { final Configuration config = TestHelper.defaultConfig() .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) .with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.DIRECT) + .with(SqlServerConnectorConfig.STREAMING_FETCH_SIZE, 3) .build(); start(SqlServerConnector.class, config); diff --git a/documentation/modules/ROOT/pages/connectors/sqlserver.adoc b/documentation/modules/ROOT/pages/connectors/sqlserver.adoc index f1603295e6f..5b9f2758fd2 100644 --- a/documentation/modules/ROOT/pages/connectors/sqlserver.adoc +++ b/documentation/modules/ROOT/pages/connectors/sqlserver.adoc @@ -3310,6 +3310,12 @@ After the next failure, the connector stops, and user intervention is required t |`600000` (10 minutes) |Specifies the time, in milliseconds, that the connector waits for a query to complete. Set the value to `0` (zero) to remove the timeout limit. + +|[[sqlserver-property-streaming-fetch-size]]<> +|`0` +|Specifies the maximum number of rows that should be read in one go from each table while streaming. +The connector will read the table contents in multiple batches of this size. Defaults to `0` which means no limit. + |=== [id="debezium-sqlserver-connector-database-history-configuration-properties"]