Skip to content

Commit

Permalink
CXP-2956: Implement keyset pagination of CDC queries
Browse files Browse the repository at this point in the history
  • Loading branch information
ramanenka committed Jul 15, 2024
1 parent 95aaf65 commit a660230
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,16 @@ public class SqlServerChangeTablePointer extends ChangeTableResultSet<SqlServerC
private final SqlServerConnection connection;
private final Lsn fromLsn;
private final Lsn toLsn;
private final int maxRowsPerResultSet;

public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, SqlServerConnection connection, Lsn fromLsn, Lsn toLsn) {
super(changeTable, COL_DATA);
public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, SqlServerConnection connection, Lsn fromLsn, Lsn toLsn, int maxRowsPerResultSet) {
super(changeTable, COL_DATA, maxRowsPerResultSet);
// Store references to these because we can't get them from our superclass
this.columnDataOffset = COL_DATA;
this.connection = connection;
this.fromLsn = fromLsn;
this.toLsn = toLsn;
this.maxRowsPerResultSet = maxRowsPerResultSet;
}

@Override
Expand Down Expand Up @@ -90,8 +92,13 @@ protected boolean isNewTransaction() throws SQLException {
}

@Override
protected ResultSet getNextResultSet() throws SQLException {
return connection.getChangesForTable(getChangeTable(), fromLsn, toLsn);
protected ResultSet getNextResultSet(TxLogPosition lastPositionSeen) throws SQLException {
if (lastPositionSeen == null || lastPositionSeen.equals(TxLogPosition.NULL)) {
return connection.getChangesForTable(getChangeTable(), fromLsn, toLsn, maxRowsPerResultSet);
}
else {
return connection.getChangesForTableAfter(getChangeTable(), lastPositionSeen.getCommitLsn(), lastPositionSeen.getInTxLsn(), toLsn, maxRowsPerResultSet);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,32 +341,41 @@ public Lsn getMinLsn(String databaseName, String changeTableName) throws SQLExce
}, "Minimum LSN query must return exactly one value"));
}

private String prepareGetChangesForTableQuery(SqlServerChangeTable changeTable, int maxRows) {
String databaseName = changeTable.getSourceTableId().catalog();
String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]")
.collect(Collectors.joining(", "));
String source = changeTable.getCaptureInstance();
if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) {
source = changeTable.getChangeTableId().table();
}
String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName)
.replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns))
.replace(TABLE_NAME_PLACEHOLDER, source);
if (maxRows > 0) {
query = query.replace("SELECT ", String.format("SELECT TOP %d ", maxRows));
}
return query;
}

/**
* Provides all changes recorder by the SQL Server CDC capture process for a set of tables.
*
* @param databaseName - the name of the database to query
* @param changeTable - the requested table to obtain changes for
* @param intervalFromLsn - closed lower bound of interval of changes to be provided
* @param intervalToLsn - closed upper bound of interval of changes to be provided
* @param maxRows - the max number of rows to return, set 0 for no limit
* @throws SQLException
*/
public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn,
Lsn intervalToLsn)
Lsn intervalToLsn, int maxRows)
throws SQLException {
String databaseName = changeTable.getSourceTableId().catalog();
String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]")
.collect(Collectors.joining(", "));
String source = changeTable.getCaptureInstance();
if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) {
source = changeTable.getChangeTableId().table();
}
final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName)
.replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns))
.replace(TABLE_NAME_PLACEHOLDER, source);
String query = prepareGetChangesForTableQuery(changeTable, maxRows);

// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
final Lsn fromLsn = getFromLsn(databaseName, changeTable, intervalFromLsn);
LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn);
final Lsn fromLsn = getFromLsn(changeTable, intervalFromLsn);
LOGGER.trace("Getting changes for table {} in range [{}, {}]", changeTable, fromLsn, intervalToLsn);

PreparedStatement statement = connection().prepareStatement(query);
statement.closeOnCompletion();
Expand All @@ -380,9 +389,41 @@ public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn interv
return statement.executeQuery();
}

private Lsn getFromLsn(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException {
public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn,
Lsn intervalToLsn)
throws SQLException {
return getChangesForTable(changeTable, intervalFromLsn, intervalToLsn, 0);
}

public ResultSet getChangesForTableAfter(SqlServerChangeTable changeTable, Lsn intervalFromLsn, Lsn seqvalFromLsn,
Lsn intervalToLsn, int maxRows)
throws SQLException {
String query = prepareGetChangesForTableQuery(changeTable, maxRows);
query = query.replace("[__$start_lsn] >= ? AND [__$start_lsn] <= ?",
"([__$start_lsn] > ? OR [__$start_lsn] = ? AND [__$seqval] > ?) AND [__$start_lsn] <= ?");

// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
final Lsn fromLsn = getFromLsn(changeTable, intervalFromLsn);
LOGGER.trace("Getting changes for table {} in range [{}-{}, {}]", changeTable, fromLsn, seqvalFromLsn, intervalToLsn);

PreparedStatement statement = connection().prepareStatement(query);
statement.closeOnCompletion();

if (queryFetchSize > 0) {
statement.setFetchSize(queryFetchSize);
}
statement.setBytes(1, fromLsn.getBinary());
statement.setBytes(2, fromLsn.getBinary());
statement.setBytes(3, seqvalFromLsn.getBinary());
statement.setBytes(4, intervalToLsn.getBinary());

return statement.executeQuery();
}

private Lsn getFromLsn(SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException {
Lsn fromLsn = changeTable.getStartLsn().compareTo(intervalFromLsn) > 0 ? changeTable.getStartLsn() : intervalFromLsn;
return fromLsn.getBinary() != null ? fromLsn : getMinLsn(databaseName, changeTable.getCaptureInstance());
return fromLsn.getBinary() != null ? fromLsn : getMinLsn(changeTable.getSourceTableId().catalog(), changeTable.getCaptureInstance());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,10 @@ else if (!checkAgent) {
final SqlServerChangeTable[] tables = tablesSlot.get();
changeTables = new SqlServerChangeTablePointer[tables.length];

final int maxRowsPerResultSet = connectorConfig.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT ? 1000 : 0;

for (int i = 0; i < tables.length; i++) {
changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn);
changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn, maxRowsPerResultSet);
changeTables[i].next();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@ public abstract class ChangeTableResultSet<C extends ChangeTable, T extends Comp
private final C changeTable;
private ResultSet resultSet;
private final int columnDataOffset;
private final int maxRowsPerResultSet;
private int rowsReadPerResultSet;
private boolean completed = false;
private T currentChangePosition;
private T previousChangePosition;

public ChangeTableResultSet(C changeTable, int columnDataOffset) {
public ChangeTableResultSet(C changeTable, int columnDataOffset, int maxRowsPerResultSet) {
this.changeTable = changeTable;
this.columnDataOffset = columnDataOffset;
this.maxRowsPerResultSet = maxRowsPerResultSet;
}

public C getChangeTable() {
Expand All @@ -62,13 +65,30 @@ public ResultSet getResultSet() {
return resultSet;
}

protected abstract ResultSet getNextResultSet() throws SQLException;
protected abstract ResultSet getNextResultSet(T lastChangePositionSeen) throws SQLException;

public boolean next() throws SQLException {
if (resultSet == null) {
resultSet = getNextResultSet();
resultSet = getNextResultSet(currentChangePosition);
rowsReadPerResultSet = 0;
}
completed = !resultSet.next();

if (resultSet.next()) {
rowsReadPerResultSet++;
}
else {
if (maxRowsPerResultSet > 0 && rowsReadPerResultSet > maxRowsPerResultSet) {
throw new RuntimeException("Number of rows read from the result set is greater than the configured max rows per a result set");
}

if (maxRowsPerResultSet > 0 && rowsReadPerResultSet == maxRowsPerResultSet) {
close();
return next();
}

completed = true;
}

previousChangePosition = currentChangePosition;
currentChangePosition = getNextChangePosition(resultSet);
if (completed) {
Expand All @@ -80,7 +100,10 @@ public boolean next() throws SQLException {
public void close() {
LOGGER.trace("Closing result set of change tables for table {}", changeTable);
try {
resultSet.close();
if (resultSet != null) {
resultSet.close();
resultSet = null;
}
}
catch (Exception e) {
// ignore
Expand Down

0 comments on commit a660230

Please sign in to comment.