diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java index 68145dff0dc..4db674d9413 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java @@ -478,6 +478,14 @@ public static DataQueryMode parse(String value, String defaultValue) { + "The value of '" + DataQueryMode.DIRECT.getValue() + "' makes the connector to query the change tables directly."); + public static final Field STREAMING_FETCH_SIZE = Field.create("streaming.fetch.size") + .withDisplayName("Streaming fetch size") + .withDefault(0) + .withType(Type.INT) + .withImportance(Importance.LOW) + .withDescription("Specifies the maximum number of rows that should be read in one go from each table while streaming. " + + "The connector will read the table contents in multiple batches of this size. Defaults to 0 which means no limit."); + private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit() .name("SQL Server") .type( @@ -497,7 +505,8 @@ public static DataQueryMode parse(String value, String defaultValue) { INCREMENTAL_SNAPSHOT_CHUNK_SIZE, INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES, QUERY_FETCH_SIZE, - DATA_QUERY_MODE) + DATA_QUERY_MODE, + STREAMING_FETCH_SIZE) .events(SOURCE_INFO_STRUCT_MAKER) .excluding( SCHEMA_INCLUDE_LIST, @@ -524,6 +533,7 @@ public static ConfigDef configDef() { private final boolean optionRecompile; private final int queryFetchSize; private final DataQueryMode dataQueryMode; + private final int streamingFetchSize; public SqlServerConnectorConfig(Configuration config) { super( @@ -567,6 +577,7 @@ public SqlServerConnectorConfig(Configuration config) { this.dataQueryMode = DataQueryMode.parse(config.getString(DATA_QUERY_MODE), DATA_QUERY_MODE.defaultValueAsString()); this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString()); + this.streamingFetchSize = config.getInteger(STREAMING_FETCH_SIZE); } public List getDatabaseNames() { @@ -716,4 +727,8 @@ private static int validateDatabaseNames(Configuration config, Field field, Fiel return count; } + + public int getStreamingFetchSize() { + return streamingFetchSize; + } } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java index a156f131870..73da907b463 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -238,10 +238,8 @@ else if (!checkAgent) { final SqlServerChangeTable[] tables = tablesSlot.get(); changeTables = new SqlServerChangeTablePointer[tables.length]; - final int maxRowsPerResultSet = 3; - for (int i = 0; i < tables.length; i++) { - changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn, maxRowsPerResultSet); + changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn, connectorConfig.getStreamingFetchSize()); changeTables[i].next(); } diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index 891170c1006..92902309d62 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -207,6 +207,7 @@ public void createAndDeleteInDataQueryDirectMode() throws Exception { final Configuration config = TestHelper.defaultConfig() .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) .with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.DIRECT) + .with(SqlServerConnectorConfig.STREAMING_FETCH_SIZE, 3) .build(); start(SqlServerConnector.class, config); diff --git a/documentation/modules/ROOT/pages/connectors/sqlserver.adoc b/documentation/modules/ROOT/pages/connectors/sqlserver.adoc index 20fe1c2ebc3..d38d0625370 100644 --- a/documentation/modules/ROOT/pages/connectors/sqlserver.adoc +++ b/documentation/modules/ROOT/pages/connectors/sqlserver.adoc @@ -3269,6 +3269,11 @@ After the next failure, the connector stops, and user intervention is required t * `function`: The data is queried by calling `cdc.[fn_cdc_get_all_changes_#]` function. This is the default mode. * `direct`: Makes the connector to query change tables directly. Switching to `direct` mode and creating an index on `(\\__$start_lsn ASC, __$seqval ASC, __$operation ASC)` columns for each change table significantly speeds up querying CDC data. +|[[sqlserver-property-streaming-fetch-size]]<> +|`0` +|Specifies the maximum number of rows that should be read in one go from each table while streaming. +The connector will read the table contents in multiple batches of this size. Defaults to `0` which means no limit. + |=== [id="debezium-sqlserver-connector-database-history-configuration-properties"]