Skip to content

Commit

Permalink
CXP-2841: [SPIKE] Implement keyset pagination in debezium sql server …
Browse files Browse the repository at this point in the history
…connector
  • Loading branch information
ramanenka committed Nov 28, 2023
1 parent 7c8892c commit 03e626f
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,20 @@ public class SqlServerChangeTablePointer extends ChangeTableResultSet<SqlServerC
private static final int COL_DATA = 5;

private ResultSetMapper<Object[]> resultSetMapper;
private final ResultSet resultSet;
private final int columnDataOffset;
private final SqlServerConnection connection;
private final Lsn fromLsn;
private final Lsn toLsn;
private final int maxRowsPerTablePerIteration;

public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, ResultSet resultSet) {
super(changeTable, resultSet, COL_DATA);
public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, SqlServerConnection connection, Lsn fromLsn, Lsn toLsn, int maxRowsPerTablePerIteration) {
super(changeTable, COL_DATA, maxRowsPerTablePerIteration);
// Store references to these because we can't get them from our superclass
this.resultSet = resultSet;
this.columnDataOffset = COL_DATA;
}

protected ResultSet getResultSet() {
return resultSet;
this.connection = connection;
this.fromLsn = fromLsn;
this.toLsn = toLsn;
this.maxRowsPerTablePerIteration = maxRowsPerTablePerIteration;
}

@Override
Expand All @@ -76,7 +78,10 @@ protected Object getColumnData(ResultSet resultSet, int columnIndex) throws SQLE
@Override
protected TxLogPosition getNextChangePosition(ResultSet resultSet) throws SQLException {
return isCompleted() ? TxLogPosition.NULL
: TxLogPosition.valueOf(Lsn.valueOf(resultSet.getBytes(COL_COMMIT_LSN)), Lsn.valueOf(resultSet.getBytes(COL_ROW_LSN)));
: TxLogPosition.valueOf(
Lsn.valueOf(resultSet.getBytes(COL_COMMIT_LSN)),
Lsn.valueOf(resultSet.getBytes(COL_ROW_LSN)),
resultSet.getInt(COL_OPERATION));
}

/**
Expand All @@ -89,12 +94,23 @@ protected boolean isNewTransaction() throws SQLException {
getChangePosition().getCommitLsn().compareTo(getPreviousChangePosition().getCommitLsn()) > 0;
}

@Override
protected ResultSet nextResultSet(TxLogPosition lastPositionSeen) throws SQLException {
if (lastPositionSeen == null || lastPositionSeen.equals(TxLogPosition.NULL)) {
return connection.getChangesForTable(getChangeTable(), fromLsn, toLsn, maxRowsPerTablePerIteration);
}
else {
return connection.getChangesForTableAfter(getChangeTable(), lastPositionSeen.getCommitLsn(), lastPositionSeen.getInTxLsn(),
lastPositionSeen.getOperation(), toLsn, maxRowsPerTablePerIteration);
}
}

@Override
public Object[] getData() throws SQLException {
if (resultSetMapper == null) {
this.resultSetMapper = createResultSetMapper(getChangeTable().getSourceTable());
}
return resultSetMapper.apply(resultSet);
return resultSetMapper.apply(getResultSet());
}

/**
Expand All @@ -110,7 +126,7 @@ public Object[] getData() throws SQLException {
*/
private ResultSetMapper<Object[]> createResultSetMapper(Table table) throws SQLException {
ColumnUtils.MappedColumns columnMap = ColumnUtils.toMap(table);
final ResultSetMetaData rsmd = resultSet.getMetaData();
final ResultSetMetaData rsmd = getResultSet().getMetaData();
final int columnCount = rsmd.getColumnCount() - columnDataOffset;
final List<String> resultColumns = new ArrayList<>(columnCount);
for (int i = 0; i < columnCount; ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
Expand Down Expand Up @@ -351,6 +352,67 @@ public void getChangesForTables(String databaseName, SqlServerChangeTable[] chan
prepareQuery(queries, preparers, consumer);
}

/**
* Provides all changes recorder by the SQL Server CDC capture process for a set of tables.
*
* @param databaseName - the name of the database to query
* @param changeTables - the requested tables to obtain changes for
* @param intervalFromLsn - closed lower bound of interval of changes to be provided
* @param intervalToLsn - closed upper bound of interval of changes to be provided
* @param consumer - the change processor
* @throws SQLException
*/
public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn, Lsn intervalToLsn, int maxRows) throws SQLException {
final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, changeTable.getSourceTableId().catalog())
.replace(STATEMENTS_PLACEHOLDER, changeTable.getCaptureInstance())
.replace("SELECT", String.format("SELECT TOP %d ", maxRows));
// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
final Lsn fromLsn = getFromLsn(changeTable.getSourceTableId().catalog(), changeTable, intervalFromLsn);
LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn);

PreparedStatement statement = connection().prepareStatement(query);
statement.closeOnCompletion();
if (queryFetchSize > 0) {
statement.setFetchSize(queryFetchSize);
}
statement.setBytes(1, fromLsn.getBinary());
statement.setBytes(2, intervalToLsn.getBinary());

return statement.executeQuery();
}

public ResultSet getChangesForTableAfter(SqlServerChangeTable changeTable, Lsn intervalFromLsn, Lsn seqvalFromLsn,
int operationFrom, Lsn intervalToLsn, int maxRows)
throws SQLException {
final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, changeTable.getSourceTableId().catalog())
.replace(STATEMENTS_PLACEHOLDER, changeTable.getCaptureInstance())
.replace("SELECT", String.format("SELECT TOP %d ", maxRows))
.replace("order by",
"WHERE ([__$start_lsn]=? AND [__$seqval]=? AND [__$operation]>?) OR ([__$start_lsn]=? AND [__$seqval]>?) OR ([__$start_lsn]>?) order by");

// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
final Lsn fromLsn = getFromLsn(changeTable.getSourceTableId().catalog(), changeTable, intervalFromLsn);
LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn);

PreparedStatement statement = connection().prepareStatement(query);
statement.closeOnCompletion();
if (queryFetchSize > 0) {
statement.setFetchSize(queryFetchSize);
}
statement.setBytes(1, fromLsn.getBinary());
statement.setBytes(2, intervalToLsn.getBinary());
statement.setBytes(3, fromLsn.getBinary());
statement.setBytes(4, seqvalFromLsn.getBinary());
statement.setInt(5, operationFrom);
statement.setBytes(6, fromLsn.getBinary());
statement.setBytes(7, seqvalFromLsn.getBinary());
statement.setBytes(8, fromLsn.getBinary());

return statement.executeQuery();
}

private Lsn getFromLsn(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException {
Lsn fromLsn = changeTable.getStartLsn().compareTo(intervalFromLsn) > 0 ? changeTable.getStartLsn() : intervalFromLsn;
return fromLsn.getBinary() != null ? fromLsn : getMinLsn(databaseName, changeTable.getCaptureInstance());
Expand Down
Loading

0 comments on commit 03e626f

Please sign in to comment.