Skip to content

Commit

Permalink
CXP-2956: Make the keyset pagination threshold configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
ramanenka committed Aug 12, 2024
1 parent 76d69b6 commit 347d4df
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,14 @@ public static DataQueryMode parse(String value, String defaultValue) {
+ "The value of '" + DataQueryMode.DIRECT.getValue()
+ "' makes the connector to query the change tables directly.");

public static final Field STREAMING_FETCH_SIZE = Field.create("streaming.fetch.size")
.withDisplayName("Streaming fetch size")
.withDefault(0)
.withType(Type.INT)
.withImportance(Importance.LOW)
.withDescription("Specifies the maximum number of rows that should be read in one go from each table while streaming. "
+ "The connector will read the table contents in multiple batches of this size. Defaults to 0 which means no limit.");

private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.name("SQL Server")
.type(
Expand All @@ -497,7 +505,8 @@ public static DataQueryMode parse(String value, String defaultValue) {
INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES,
QUERY_FETCH_SIZE,
DATA_QUERY_MODE)
DATA_QUERY_MODE,
STREAMING_FETCH_SIZE)
.events(SOURCE_INFO_STRUCT_MAKER)
.excluding(
SCHEMA_INCLUDE_LIST,
Expand All @@ -524,6 +533,7 @@ public static ConfigDef configDef() {
private final boolean optionRecompile;
private final int queryFetchSize;
private final DataQueryMode dataQueryMode;
private final int streamingFetchSize;

public SqlServerConnectorConfig(Configuration config) {
super(
Expand Down Expand Up @@ -567,6 +577,7 @@ public SqlServerConnectorConfig(Configuration config) {

this.dataQueryMode = DataQueryMode.parse(config.getString(DATA_QUERY_MODE), DATA_QUERY_MODE.defaultValueAsString());
this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString());
this.streamingFetchSize = config.getInteger(STREAMING_FETCH_SIZE);
}

public List<String> getDatabaseNames() {
Expand Down Expand Up @@ -716,4 +727,8 @@ private static int validateDatabaseNames(Configuration config, Field field, Fiel

return count;
}

public int getStreamingFetchSize() {
return streamingFetchSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,8 @@ else if (!checkAgent) {
final SqlServerChangeTable[] tables = tablesSlot.get();
changeTables = new SqlServerChangeTablePointer[tables.length];

final int maxRowsPerResultSet = 3;

for (int i = 0; i < tables.length; i++) {
changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn, maxRowsPerResultSet);
changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn, connectorConfig.getStreamingFetchSize());
changeTables[i].next();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ public void createAndDeleteInDataQueryDirectMode() throws Exception {
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.DIRECT)
.with(SqlServerConnectorConfig.STREAMING_FETCH_SIZE, 3)
.build();

start(SqlServerConnector.class, config);
Expand Down

0 comments on commit 347d4df

Please sign in to comment.