Skip to content

Commit

Permalink
CXP-2956: Implement keyset pagination in debezium sql server connector (
Browse files Browse the repository at this point in the history
  • Loading branch information
ramanenka committed Dec 16, 2024
1 parent b926c5c commit fc68fb6
Show file tree
Hide file tree
Showing 11 changed files with 334 additions and 193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public class Lsn implements Comparable<Lsn>, Nullable {

public static final Lsn NULL = new Lsn(null);

/**
 * An LSN whose binary representation is ten zero bytes.
 */
public static final Lsn ZERO = valueOf(new byte[10]);

private final byte[] binary;
private int[] unsignedBinary;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,20 @@ public class SqlServerChangeTablePointer extends ChangeTableResultSet<SqlServerC
private static final int COL_DATA = 5;

private ResultSetMapper<Object[]> resultSetMapper;
private final ResultSet resultSet;
private final int columnDataOffset;
private final SqlServerConnection connection;
private final Lsn fromLsn;
private final Lsn toLsn;
private final int maxRowsPerResultSet;

public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, ResultSet resultSet) {
super(changeTable, resultSet, COL_DATA);
public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, SqlServerConnection connection, Lsn fromLsn, Lsn toLsn, int maxRowsPerResultSet) {
super(changeTable, COL_DATA, maxRowsPerResultSet);
// Store references to these because we can't get them from our superclass
this.resultSet = resultSet;
this.columnDataOffset = COL_DATA;
}

protected ResultSet getResultSet() {
return resultSet;
this.connection = connection;
this.fromLsn = fromLsn;
this.toLsn = toLsn;
this.maxRowsPerResultSet = maxRowsPerResultSet;
}

@Override
Expand All @@ -76,7 +78,10 @@ protected Object getColumnData(ResultSet resultSet, int columnIndex) throws SQLE
@Override
protected TxLogPosition getNextChangePosition(ResultSet resultSet) throws SQLException {
return isCompleted() ? TxLogPosition.NULL
: TxLogPosition.valueOf(Lsn.valueOf(resultSet.getBytes(COL_COMMIT_LSN)), Lsn.valueOf(resultSet.getBytes(COL_ROW_LSN)));
: TxLogPosition.valueOf(
Lsn.valueOf(resultSet.getBytes(COL_COMMIT_LSN)),
Lsn.valueOf(resultSet.getBytes(COL_ROW_LSN)),
resultSet.getInt(COL_OPERATION));
}

/**
Expand All @@ -89,12 +94,23 @@ protected boolean isNewTransaction() throws SQLException {
getChangePosition().getCommitLsn().compareTo(getPreviousChangePosition().getCommitLsn()) > 0;
}

/**
 * Requests the next batch of change rows for this pointer's change table.
 *
 * @param lastPositionSeen position of the last row already consumed; {@code null} or
 *            {@link TxLogPosition#NULL} when nothing has been consumed yet
 * @return a result set starting at {@code fromLsn} when nothing has been consumed yet, otherwise
 *         one continuing after {@code lastPositionSeen}; both are bounded by {@code toLsn} and
 *         {@code maxRowsPerResultSet}
 * @throws SQLException if the underlying query fails
 */
@Override
protected ResultSet getNextResultSet(TxLogPosition lastPositionSeen) throws SQLException {
    final boolean nothingSeenYet = lastPositionSeen == null || lastPositionSeen.equals(TxLogPosition.NULL);
    if (nothingSeenYet) {
        // First batch: start from the lower bound of the requested interval.
        return connection.getChangesForTable(getChangeTable(), fromLsn, toLsn, maxRowsPerResultSet);
    }

    // Follow-up batch: resume after the (commit LSN, in-transaction LSN, operation) last seen.
    return connection.getChangesForTable(
            getChangeTable(),
            lastPositionSeen.getCommitLsn(),
            lastPositionSeen.getInTxLsn(),
            lastPositionSeen.getOperation(),
            toLsn,
            maxRowsPerResultSet);
}

/**
 * Maps the current row of the active result set to an array of column values.
 * The mapper is created on the first call and reused by later calls.
 *
 * @return the mapped values of the current row
 * @throws SQLException if the mapper cannot be created or the row cannot be read
 */
@Override
public Object[] getData() throws SQLException {
    if (resultSetMapper == null) {
        this.resultSetMapper = createResultSetMapper(getChangeTable().getSourceTable());
    }
    // Single return through the accessor; the duplicate return that read the
    // resultSet field directly made this statement unreachable.
    return resultSetMapper.apply(getResultSet());
}

/**
Expand All @@ -110,7 +126,7 @@ public Object[] getData() throws SQLException {
*/
private ResultSetMapper<Object[]> createResultSetMapper(Table table) throws SQLException {
ColumnUtils.MappedColumns columnMap = ColumnUtils.toMap(table);
final ResultSetMetaData rsmd = resultSet.getMetaData();
final ResultSetMetaData rsmd = getResultSet().getMetaData();
final int columnCount = rsmd.getColumnCount() - columnDataOffset;
final List<String> resultColumns = new ArrayList<>(columnCount);
for (int i = 0; i < columnCount; ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,12 @@ private String buildGetAllChangesForTableQuery(SqlServerConnectorConfig.DataQuer
break;
case DIRECT:
result += GET_ALL_CHANGES_FOR_TABLE_FROM_DIRECT + " ";
where.add("[__$start_lsn] >= ?");
where.add("[__$start_lsn] <= ?");
break;
}
where.add("(([__$start_lsn] = ? AND [__$seqval] = ? AND [__$operation] > ?) " +
"OR ([__$start_lsn] = ? AND [__$seqval] > ?) " +
"OR ([__$start_lsn] > ?))");
where.add("[__$start_lsn] <= ?");

if (hasSkippedOperations(skippedOperations)) {
Set<String> skippedOps = new HashSet<>();
Expand Down Expand Up @@ -344,51 +346,74 @@ public Lsn getMinLsn(String databaseName, String changeTableName) throws SQLExce
/**
* Provides all changes recorded by the SQL Server CDC capture process for a set of tables.
*
* @param databaseName - the name of the database to query
* @param changeTables - the requested tables to obtain changes for
* @param changeTable - the requested table to obtain changes for
* @param intervalFromLsn - closed lower bound of interval of changes to be provided
* @param seqvalFromLsn - in-transaction sequence value to start after, pass {@link Lsn#ZERO} to fetch all sequence values
* @param operationFrom - operation number to start after, pass 0 to fetch all operations
* @param intervalToLsn - closed upper bound of interval of changes to be provided
* @param consumer - the change processor
* @param maxRows - the max number of rows to return, pass 0 for no limit
* @throws SQLException
*/
public void getChangesForTables(String databaseName, SqlServerChangeTable[] changeTables, Lsn intervalFromLsn,
Lsn intervalToLsn, BlockingMultiResultSetConsumer consumer)
throws SQLException, InterruptedException {
final String[] queries = new String[changeTables.length];
final StatementPreparer[] preparers = new StatementPreparer[changeTables.length];

int idx = 0;
for (SqlServerChangeTable changeTable : changeTables) {
String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]")
.collect(Collectors.joining(", "));
String source = changeTable.getCaptureInstance();
if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) {
source = changeTable.getChangeTableId().table();
}
final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName)
.replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns))
.replace(TABLE_NAME_PLACEHOLDER, source);
queries[idx] = query;
// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
final Lsn fromLsn = getFromLsn(databaseName, changeTable, intervalFromLsn);
LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn);
preparers[idx] = statement -> {
if (queryFetchSize > 0) {
statement.setFetchSize(queryFetchSize);
}
statement.setBytes(1, fromLsn.getBinary());
statement.setBytes(2, intervalToLsn.getBinary());
};
/**
 * Queries the changes recorded for a single change table and returns the open result set.
 * <p>
 * The statement is marked close-on-completion, so closing the returned result set also
 * releases the statement. If preparing parameters or executing the query fails, the statement
 * is closed here before the exception is propagated.
 *
 * @param changeTable - the requested table to obtain changes for
 * @param intervalFromLsn - lower bound of the interval of changes to be provided; raised to the
 *            table's start LSN (or the minimum available LSN) when necessary
 * @param seqvalFromLsn - in-transaction sequence value to start after, pass {@link Lsn#ZERO} to fetch all sequence values
 * @param operationFrom - operation number to start after, pass 0 to fetch all operations
 * @param intervalToLsn - closed upper bound of interval of changes to be provided
 * @param maxRows - the max number of rows to return, pass 0 for no limit
 * @return the result set produced by the change query; the caller is responsible for closing it
 * @throws SQLException if the statement cannot be prepared, bound or executed
 */
public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn, Lsn seqvalFromLsn, int operationFrom,
                                    Lsn intervalToLsn, int maxRows)
        throws SQLException {
    String databaseName = changeTable.getSourceTableId().catalog();
    String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]")
            .collect(Collectors.joining(", "));

    String source = changeTable.getCaptureInstance();
    if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) {
        source = changeTable.getChangeTableId().table();
    }

    String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName)
            .replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns))
            .replace(TABLE_NAME_PLACEHOLDER, source);

    if (maxRows > 0) {
        // Only the first SELECT keyword is rewritten, so identifiers that happen to contain
        // "SELECT " (e.g. a bracketed column name) are left alone. Plain concatenation is used
        // instead of String.format so the number is never rendered with locale-specific digits.
        query = query.replaceFirst("SELECT ", "SELECT TOP " + maxRows + " ");
    }

    // If the table was added in the middle of queried buffer we need
    // to adjust from to the first LSN available
    final Lsn fromLsn = getFromLsn(changeTable, intervalFromLsn);
    LOGGER.trace("Getting {} changes for table {} in range [{}-{}-{}, {}]", maxRows > 0 ? "top " + maxRows : "", changeTable, fromLsn, seqvalFromLsn, operationFrom,
            intervalToLsn);

    PreparedStatement statement = connection().prepareStatement(query);
    try {
        statement.closeOnCompletion();

        if (queryFetchSize > 0) {
            statement.setFetchSize(queryFetchSize);
        }

        int paramIndex = 1;
        if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.FUNCTION) {
            // FUNCTION mode binds the interval bounds once more ahead of the keyset predicate
            // (presumably the arguments of the CDC function in the FROM clause - confirm
            // against the query template).
            statement.setBytes(paramIndex++, fromLsn.getBinary());
            statement.setBytes(paramIndex++, intervalToLsn.getBinary());
        }
        // start_lsn = ? AND seqval = ? AND operation > ?
        statement.setBytes(paramIndex++, fromLsn.getBinary());
        statement.setBytes(paramIndex++, seqvalFromLsn.getBinary());
        statement.setInt(paramIndex++, operationFrom);
        // start_lsn = ? AND seqval > ?
        statement.setBytes(paramIndex++, fromLsn.getBinary());
        statement.setBytes(paramIndex++, seqvalFromLsn.getBinary());
        // start_lsn > ?
        statement.setBytes(paramIndex++, fromLsn.getBinary());
        // start_lsn <= ?
        statement.setBytes(paramIndex++, intervalToLsn.getBinary());

        return statement.executeQuery();
    }
    catch (SQLException | RuntimeException e) {
        // closeOnCompletion only triggers when a result set is closed; without a result set
        // the statement has to be released explicitly.
        try {
            statement.close();
        }
        catch (SQLException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}

/**
 * Queries changes for a table without a resume position: delegates to the six-argument
 * overload with {@link Lsn#ZERO} as the sequence value and 0 as the operation.
 *
 * @param changeTable - the requested table to obtain changes for
 * @param intervalFromLsn - lower bound of interval of changes to be provided
 * @param intervalToLsn - closed upper bound of interval of changes to be provided
 * @param maxRows - the max number of rows to return, pass 0 for no limit
 * @return the result set produced by the change query
 * @throws SQLException if the query fails
 */
public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn, Lsn intervalToLsn, int maxRows) throws SQLException {
    final Lsn initialSeqval = Lsn.ZERO;
    final int initialOperation = 0;
    return getChangesForTable(changeTable, intervalFromLsn, initialSeqval, initialOperation, intervalToLsn, maxRows);
}

/**
 * Queries changes for a table without a row limit: delegates to the four-argument overload
 * with {@code maxRows} set to 0.
 *
 * @param changeTable - the requested table to obtain changes for
 * @param intervalFromLsn - lower bound of interval of changes to be provided
 * @param intervalToLsn - closed upper bound of interval of changes to be provided
 * @return the result set produced by the change query
 * @throws SQLException if the query fails
 */
public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn, Lsn intervalToLsn) throws SQLException {
    final int noRowLimit = 0;
    return getChangesForTable(changeTable, intervalFromLsn, intervalToLsn, noRowLimit);
}

private Lsn getFromLsn(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException {
private Lsn getFromLsn(SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException {
Lsn fromLsn = changeTable.getStartLsn().compareTo(intervalFromLsn) > 0 ? changeTable.getStartLsn() : intervalFromLsn;
return fromLsn.getBinary() != null ? fromLsn : getMinLsn(databaseName, changeTable.getCaptureInstance());
return fromLsn.getBinary() != null ? fromLsn : getMinLsn(changeTable.getSourceTableId().catalog(), changeTable.getCaptureInstance());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,14 @@ public static DataQueryMode parse(String value, String defaultValue) {
+ "The value of '" + DataQueryMode.DIRECT.getValue()
+ "' makes the connector to query the change tables directly.");

/**
 * Configuration field {@code streaming.fetch.size}: an integer option of low importance with a
 * default of 0. Per its description, it caps the number of rows read in one go from each table
 * while streaming, and 0 means no limit.
 */
public static final Field STREAMING_FETCH_SIZE = Field.create("streaming.fetch.size")
.withDisplayName("Streaming fetch size")
.withDefault(0)
.withType(Type.INT)
.withImportance(Importance.LOW)
.withDescription("Specifies the maximum number of rows that should be read in one go from each table while streaming. "
+ "The connector will read the table contents in multiple batches of this size. Defaults to 0 which means no limit.");

private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.name("SQL Server")
.type(
Expand All @@ -498,7 +506,8 @@ public static DataQueryMode parse(String value, String defaultValue) {
INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES,
QUERY_FETCH_SIZE,
DATA_QUERY_MODE)
DATA_QUERY_MODE,
STREAMING_FETCH_SIZE)
.events(SOURCE_INFO_STRUCT_MAKER)
.excluding(
SCHEMA_INCLUDE_LIST,
Expand All @@ -525,6 +534,7 @@ public static ConfigDef configDef() {
private final boolean optionRecompile;
private final int queryFetchSize;
private final DataQueryMode dataQueryMode;
private final int streamingFetchSize;

public SqlServerConnectorConfig(Configuration config) {
super(
Expand Down Expand Up @@ -568,6 +578,7 @@ public SqlServerConnectorConfig(Configuration config) {

this.dataQueryMode = DataQueryMode.parse(config.getString(DATA_QUERY_MODE), DATA_QUERY_MODE.defaultValueAsString());
this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString());
this.streamingFetchSize = config.getInteger(STREAMING_FETCH_SIZE);
}

public List<String> getDatabaseNames() {
Expand Down Expand Up @@ -717,4 +728,8 @@ private static int validateDatabaseNames(Configuration config, Field field, Fiel

return count;
}

/**
 * Returns the streaming fetch size held by this configuration.
 *
 * @return the value of the {@code streamingFetchSize} field
 */
public int getStreamingFetchSize() {
    return this.streamingFetchSize;
}
}
Loading

0 comments on commit fc68fb6

Please sign in to comment.