Skip to content

Commit

Permalink
CXP-2956: Make the keyset pagination threshold configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
ramanenka committed Oct 29, 2024
1 parent 4afe7b6 commit cdf3d8c
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,14 @@ public static DataQueryMode parse(String value, String defaultValue) {
+ "The value of '" + DataQueryMode.DIRECT.getValue()
+ "' makes the connector to query the change tables directly.");

public static final Field STREAMING_FETCH_SIZE = Field.create("streaming.fetch.size")
.withDisplayName("Streaming fetch size")
.withDefault(0)
.withType(Type.INT)
.withImportance(Importance.LOW)
.withDescription("Specifies the maximum number of rows that should be read in one go from each table while streaming. "
+ "The connector will read the table contents in multiple batches of this size. Defaults to 0 which means no limit.");

private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.name("SQL Server")
.type(
Expand All @@ -498,7 +506,8 @@ public static DataQueryMode parse(String value, String defaultValue) {
INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES,
QUERY_FETCH_SIZE,
DATA_QUERY_MODE)
DATA_QUERY_MODE,
STREAMING_FETCH_SIZE)
.events(SOURCE_INFO_STRUCT_MAKER)
.excluding(
SCHEMA_INCLUDE_LIST,
Expand All @@ -525,6 +534,7 @@ public static ConfigDef configDef() {
private final boolean optionRecompile;
private final int queryFetchSize;
private final DataQueryMode dataQueryMode;
private final int streamingFetchSize;

public SqlServerConnectorConfig(Configuration config) {
super(
Expand Down Expand Up @@ -568,6 +578,7 @@ public SqlServerConnectorConfig(Configuration config) {

this.dataQueryMode = DataQueryMode.parse(config.getString(DATA_QUERY_MODE), DATA_QUERY_MODE.defaultValueAsString());
this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString());
this.streamingFetchSize = config.getInteger(STREAMING_FETCH_SIZE);
}

public List<String> getDatabaseNames() {
Expand Down Expand Up @@ -717,4 +728,8 @@ private static int validateDatabaseNames(Configuration config, Field field, Fiel

return count;
}

public int getStreamingFetchSize() {
return streamingFetchSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,8 @@ else if (!checkAgent) {
final SqlServerChangeTable[] tables = tablesSlot.get();
changeTables = new SqlServerChangeTablePointer[tables.length];

final int maxRowsPerResultSet = 3;

for (int i = 0; i < tables.length; i++) {
changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn, maxRowsPerResultSet);
changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn, connectorConfig.getStreamingFetchSize());
changeTables[i].next();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import javax.management.InstanceNotFoundException;
Expand Down Expand Up @@ -125,12 +126,39 @@ public void after() throws SQLException {
}

@Test
public void createAndDelete() throws Exception {
public void createAndDeleteWithDataQueryModeFunctionWithoutFetchThreshold() throws Exception {
createAndDelete(builder -> builder
.with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.FUNCTION)
.with(SqlServerConnectorConfig.STREAMING_FETCH_SIZE, 0));
}

@Test
public void createAndDeleteWithDataQueryModeFunctionWithFetchThreshold() throws Exception {
createAndDelete(builder -> builder
.with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.FUNCTION)
.with(SqlServerConnectorConfig.STREAMING_FETCH_SIZE, 3));
}

@Test
public void createAndDeleteWithDataQueryModeDirectWithoutFetchThreshold() throws Exception {
createAndDelete(builder -> builder
.with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.DIRECT)
.with(SqlServerConnectorConfig.STREAMING_FETCH_SIZE, 0));
}

@Test
public void createAndDeleteWithDataQueryModeDirectWithFetchThreshold() throws Exception {
createAndDelete(builder -> builder
.with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.DIRECT)
.with(SqlServerConnectorConfig.STREAMING_FETCH_SIZE, 3));
}

private void createAndDelete(UnaryOperator<Configuration.Builder> configAugmenter) throws Exception {
final int RECORDS_PER_TABLE = 5;
final int TABLES = 2;
final int ID_START = 10;
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
final Configuration config = configAugmenter.apply(TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL))
.build();

start(SqlServerConnector.class, config);
Expand Down Expand Up @@ -208,6 +236,7 @@ public void createAndDeleteInDataQueryDirectMode() throws Exception {
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.DIRECT)
.with(SqlServerConnectorConfig.STREAMING_FETCH_SIZE, 3)
.build();

start(SqlServerConnector.class, config);
Expand Down
6 changes: 6 additions & 0 deletions documentation/modules/ROOT/pages/connectors/sqlserver.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3310,6 +3310,12 @@ After the next failure, the connector stops, and user intervention is required t
|`600000` (10 minutes)
|Specifies the time, in milliseconds, that the connector waits for a query to complete.
Set the value to `0` (zero) to remove the timeout limit.

|[[sqlserver-property-streaming-fetch-size]]<<sqlserver-property-streaming-fetch-size, `streaming.fetch.size`>>
|`0`
|Specifies the maximum number of rows that should be read in one go from each table while streaming.
The connector will read the table contents in multiple batches of this size. Defaults to `0` which means no limit.

|===

[id="debezium-sqlserver-connector-database-history-configuration-properties"]
Expand Down

0 comments on commit cdf3d8c

Please sign in to comment.