Skip to content

Commit

Permalink
CXP-2956: Make the keyset pagination threshold configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
ramanenka committed Aug 12, 2024
1 parent 76d69b6 commit dd2076c
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,14 @@ public static DataQueryMode parse(String value, String defaultValue) {
+ "The value of '" + DataQueryMode.DIRECT.getValue()
+ "' makes the connector to query the change tables directly.");

public static final Field STREAMING_FETCH_SIZE = Field.create("streaming.fetch.size")
.withDisplayName("Streaming fetch size")
.withDefault(0)
.withType(Type.INT)
.withImportance(Importance.LOW)
.withDescription("Specifies the maximum number of rows that should be read in one go from each table while streaming. "
+ "The connector will read the table contents in multiple batches of this size. Defaults to 0 which means no limit.");

private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.name("SQL Server")
.type(
Expand All @@ -497,7 +505,8 @@ public static DataQueryMode parse(String value, String defaultValue) {
INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES,
QUERY_FETCH_SIZE,
DATA_QUERY_MODE)
DATA_QUERY_MODE,
STREAMING_FETCH_SIZE)
.events(SOURCE_INFO_STRUCT_MAKER)
.excluding(
SCHEMA_INCLUDE_LIST,
Expand All @@ -524,6 +533,7 @@ public static ConfigDef configDef() {
private final boolean optionRecompile;
private final int queryFetchSize;
private final DataQueryMode dataQueryMode;
private final int streamingFetchSize;

public SqlServerConnectorConfig(Configuration config) {
super(
Expand Down Expand Up @@ -567,6 +577,7 @@ public SqlServerConnectorConfig(Configuration config) {

this.dataQueryMode = DataQueryMode.parse(config.getString(DATA_QUERY_MODE), DATA_QUERY_MODE.defaultValueAsString());
this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString());
this.streamingFetchSize = config.getInteger(STREAMING_FETCH_SIZE);
}

public List<String> getDatabaseNames() {
Expand Down Expand Up @@ -716,4 +727,8 @@ private static int validateDatabaseNames(Configuration config, Field field, Fiel

return count;
}

public int getStreamingFetchSize() {
return streamingFetchSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,8 @@ else if (!checkAgent) {
final SqlServerChangeTable[] tables = tablesSlot.get();
changeTables = new SqlServerChangeTablePointer[tables.length];

final int maxRowsPerResultSet = 3;

for (int i = 0; i < tables.length; i++) {
changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn, maxRowsPerResultSet);
changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn, connectorConfig.getStreamingFetchSize());
changeTables[i].next();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ public void createAndDeleteInDataQueryDirectMode() throws Exception {
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.DIRECT)
.with(SqlServerConnectorConfig.STREAMING_FETCH_SIZE, 3)
.build();

start(SqlServerConnector.class, config);
Expand Down
5 changes: 5 additions & 0 deletions documentation/modules/ROOT/pages/connectors/sqlserver.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3269,6 +3269,11 @@ After the next failure, the connector stops, and user intervention is required t
* `function`: The data is queried by calling `cdc.[fn_cdc_get_all_changes_#]` function. This is the default mode.
* `direct`: Makes the connector to query change tables directly. Switching to `direct` mode and creating an index on `(\\__$start_lsn ASC, __$seqval ASC, __$operation ASC)` columns for each change table significantly speeds up querying CDC data.

|[[sqlserver-property-streaming-fetch-size]]<<sqlserver-property-streaming-fetch-size, `streaming.fetch.size`>>
|`0`
|Specifies the maximum number of rows that should be read in one go from each table while streaming.
The connector will read the table contents in multiple batches of this size. Defaults to `0` which means no limit.

|===

[id="debezium-sqlserver-connector-database-history-configuration-properties"]
Expand Down

0 comments on commit dd2076c

Please sign in to comment.