Skip to content

Commit

Permalink
CXP-2956: Implement keyset pagination of CDC queries
Browse files Browse the repository at this point in the history
  • Loading branch information
ramanenka committed Aug 7, 2024
1 parent f3886e7 commit 43afa6f
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,16 @@ public class SqlServerChangeTablePointer extends ChangeTableResultSet<SqlServerC
private final SqlServerConnection connection;
private final Lsn fromLsn;
private final Lsn toLsn;
private final int maxRowsPerResultSet;

public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, SqlServerConnection connection, Lsn fromLsn, Lsn toLsn) {
super(changeTable, COL_DATA);
public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, SqlServerConnection connection, Lsn fromLsn, Lsn toLsn, int maxRowsPerResultSet) {
super(changeTable, COL_DATA, maxRowsPerResultSet);
// Store references to these because we can't get them from our superclass
this.columnDataOffset = COL_DATA;
this.connection = connection;
this.fromLsn = fromLsn;
this.toLsn = toLsn;
this.maxRowsPerResultSet = maxRowsPerResultSet;
}

@Override
Expand All @@ -76,7 +78,10 @@ protected Object getColumnData(ResultSet resultSet, int columnIndex) throws SQLE
@Override
protected TxLogPosition getNextChangePosition(ResultSet resultSet) throws SQLException {
return isCompleted() ? TxLogPosition.NULL
: TxLogPosition.valueOf(Lsn.valueOf(resultSet.getBytes(COL_COMMIT_LSN)), Lsn.valueOf(resultSet.getBytes(COL_ROW_LSN)));
: TxLogPosition.valueOf(
Lsn.valueOf(resultSet.getBytes(COL_COMMIT_LSN)),
Lsn.valueOf(resultSet.getBytes(COL_ROW_LSN)),
resultSet.getInt(COL_OPERATION));
}

/**
Expand All @@ -90,8 +95,13 @@ protected boolean isNewTransaction() throws SQLException {
}

@Override
protected ResultSet getNextResultSet() throws SQLException {
return connection.getChangesForTable(getChangeTable(), fromLsn, toLsn);
protected ResultSet getNextResultSet(TxLogPosition lastPositionSeen) throws SQLException {
if (lastPositionSeen == null || lastPositionSeen.equals(TxLogPosition.NULL)) {
return connection.getChangesForTable(getChangeTable(), fromLsn, toLsn, maxRowsPerResultSet);
}
else {
return connection.getChangesForTableAfter(getChangeTable(), lastPositionSeen.getCommitLsn(), lastPositionSeen.getInTxLsn(), lastPositionSeen.getOperation(), toLsn, maxRowsPerResultSet);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,32 +341,41 @@ public Lsn getMinLsn(String databaseName, String changeTableName) throws SQLExce
}, "Minimum LSN query must return exactly one value"));
}

private String prepareGetChangesForTableQuery(SqlServerChangeTable changeTable, int maxRows) {
String databaseName = changeTable.getSourceTableId().catalog();
String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]")
.collect(Collectors.joining(", "));
String source = changeTable.getCaptureInstance();
if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) {
source = changeTable.getChangeTableId().table();
}
String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName)
.replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns))
.replace(TABLE_NAME_PLACEHOLDER, source);
if (maxRows > 0) {
query = query.replace("SELECT ", String.format("SELECT TOP %d ", maxRows));
}
return query;
}

/**
* Provides all changes recorder by the SQL Server CDC capture process for a set of tables.
*
* @param databaseName - the name of the database to query
* @param changeTable - the requested table to obtain changes for
* @param intervalFromLsn - closed lower bound of interval of changes to be provided
* @param intervalToLsn - closed upper bound of interval of changes to be provided
* @param maxRows - the max number of rows to return, set 0 for no limit
* @throws SQLException
*/
public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn,
Lsn intervalToLsn)
Lsn intervalToLsn, int maxRows)
throws SQLException {
String databaseName = changeTable.getSourceTableId().catalog();
String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]")
.collect(Collectors.joining(", "));
String source = changeTable.getCaptureInstance();
if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) {
source = changeTable.getChangeTableId().table();
}
final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName)
.replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns))
.replace(TABLE_NAME_PLACEHOLDER, source);
String query = prepareGetChangesForTableQuery(changeTable, maxRows);

// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
final Lsn fromLsn = getFromLsn(databaseName, changeTable, intervalFromLsn);
LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn);
final Lsn fromLsn = getFromLsn(changeTable, intervalFromLsn);
LOGGER.trace("Getting changes for table {} in range [{}, {}]", changeTable, fromLsn, intervalToLsn);

PreparedStatement statement = connection().prepareStatement(query);
statement.closeOnCompletion();
Expand All @@ -380,9 +389,46 @@ public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn interv
return statement.executeQuery();
}

private Lsn getFromLsn(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException {
public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn,
Lsn intervalToLsn)
throws SQLException {
return getChangesForTable(changeTable, intervalFromLsn, intervalToLsn, 0);
}

public ResultSet getChangesForTableAfter(SqlServerChangeTable changeTable, Lsn intervalFromLsn, Lsn seqvalFromLsn, int operationFrom,
Lsn intervalToLsn, int maxRows)
throws SQLException {
String query = prepareGetChangesForTableQuery(changeTable, maxRows);
query = query.replace("[__$start_lsn] >= ? AND [__$start_lsn] <= ?",
"(([__$start_lsn] = ? AND [__$seqval] = ? AND [__$operation] > ?) " +
"OR ([__$start_lsn] = ? AND [__$seqval] > ?) " +
"OR ([__$start_lsn] > ?)) AND [__$start_lsn] <= ?");

// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
final Lsn fromLsn = getFromLsn(changeTable, intervalFromLsn);
LOGGER.trace("Getting changes for table {} in range [{}-{}, {}]", changeTable, fromLsn, seqvalFromLsn, intervalToLsn);

PreparedStatement statement = connection().prepareStatement(query);
statement.closeOnCompletion();

if (queryFetchSize > 0) {
statement.setFetchSize(queryFetchSize);
}
statement.setBytes(1, fromLsn.getBinary());
statement.setBytes(2, seqvalFromLsn.getBinary());
statement.setInt(3, operationFrom);
statement.setBytes(4, fromLsn.getBinary());
statement.setBytes(5, seqvalFromLsn.getBinary());
statement.setBytes(6, fromLsn.getBinary());
statement.setBytes(7, intervalToLsn.getBinary());

return statement.executeQuery();
}

private Lsn getFromLsn(SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException {
Lsn fromLsn = changeTable.getStartLsn().compareTo(intervalFromLsn) > 0 ? changeTable.getStartLsn() : intervalFromLsn;
return fromLsn.getBinary() != null ? fromLsn : getMinLsn(databaseName, changeTable.getCaptureInstance());
return fromLsn.getBinary() != null ? fromLsn : getMinLsn(changeTable.getSourceTableId().catalog(), changeTable.getCaptureInstance());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,10 @@ else if (!checkAgent) {
final SqlServerChangeTable[] tables = tablesSlot.get();
changeTables = new SqlServerChangeTablePointer[tables.length];

final int maxRowsPerResultSet = connectorConfig.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT ? 3 : 0;

for (int i = 0; i < tables.length; i++) {
changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn);
changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn, maxRowsPerResultSet);
changeTables[i].next();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
*/
public class TxLogPosition implements Nullable, Comparable<TxLogPosition> {

public static final TxLogPosition NULL = new TxLogPosition(null, null);
public static final TxLogPosition NULL = new TxLogPosition(null, null, 0);
private final Lsn commitLsn;
private final Lsn inTxLsn;
private int operation;

private TxLogPosition(Lsn commitLsn, Lsn inTxLsn) {
private TxLogPosition(Lsn commitLsn, Lsn inTxLsn, int operation) {
this.commitLsn = commitLsn;
this.inTxLsn = inTxLsn;
this.operation = operation;
}

public Lsn getCommitLsn() {
Expand All @@ -35,9 +37,13 @@ public Lsn getInTxLsn() {
return inTxLsn;
}

public int getOperation() {
return operation;
}

@Override
public String toString() {
return this == NULL ? "NULL" : commitLsn + "(" + inTxLsn + ")";
return this == NULL ? "NULL" : commitLsn + "(" + inTxLsn + "," + operation + ")";
}

@Override
Expand All @@ -46,6 +52,7 @@ public int hashCode() {
int result = 1;
result = prime * result + ((commitLsn == null) ? 0 : commitLsn.hashCode());
result = prime * result + ((inTxLsn == null) ? 0 : inTxLsn.hashCode());
result = prime * result + operation;
return result;
}

Expand Down Expand Up @@ -86,15 +93,20 @@ public int compareTo(TxLogPosition o) {
return comparison == 0 ? inTxLsn.compareTo(o.inTxLsn) : comparison;
}

public static TxLogPosition valueOf(Lsn commitLsn, Lsn inTxLsn) {
public static TxLogPosition valueOf(Lsn commitLsn, Lsn inTxLsn, int operation) {
return commitLsn == null && inTxLsn == null ? NULL
: new TxLogPosition(
commitLsn == null ? Lsn.NULL : commitLsn,
inTxLsn == null ? Lsn.NULL : inTxLsn);
inTxLsn == null ? Lsn.NULL : inTxLsn,
operation);
}

public static TxLogPosition valueOf(Lsn commitLsn, Lsn inTxLsn) {
return valueOf(commitLsn, inTxLsn, 0);
}

public static TxLogPosition valueOf(Lsn commitLsn) {
return valueOf(commitLsn, Lsn.NULL);
return valueOf(commitLsn, Lsn.NULL, 0);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public static Configuration.Builder defaultConnectorConfig() {
return builder.with(CommonConnectorConfig.TOPIC_PREFIX, "server1")
.with(SqlServerConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class)
.with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
.with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, false);
.with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.DIRECT);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@ public abstract class ChangeTableResultSet<C extends ChangeTable, T extends Comp
private final C changeTable;
private ResultSet resultSet;
private final int columnDataOffset;
private final int maxRowsPerResultSet;
private int rowsReadPerResultSet;
private boolean completed = false;
private T currentChangePosition;
private T previousChangePosition;

public ChangeTableResultSet(C changeTable, int columnDataOffset) {
public ChangeTableResultSet(C changeTable, int columnDataOffset, int maxRowsPerResultSet) {
this.changeTable = changeTable;
this.columnDataOffset = columnDataOffset;
this.maxRowsPerResultSet = maxRowsPerResultSet;
}

public C getChangeTable() {
Expand All @@ -62,13 +65,30 @@ public ResultSet getResultSet() {
return resultSet;
}

protected abstract ResultSet getNextResultSet() throws SQLException;
protected abstract ResultSet getNextResultSet(T lastChangePositionSeen) throws SQLException;

public boolean next() throws SQLException {
if (resultSet == null) {
resultSet = getNextResultSet();
resultSet = getNextResultSet(currentChangePosition);
rowsReadPerResultSet = 0;
}
completed = !resultSet.next();

if (resultSet.next()) {
rowsReadPerResultSet++;
}
else {
if (maxRowsPerResultSet > 0 && rowsReadPerResultSet > maxRowsPerResultSet) {
throw new RuntimeException("Number of rows read from the result set is greater than the configured max rows per a result set");
}

if (maxRowsPerResultSet > 0 && rowsReadPerResultSet == maxRowsPerResultSet) {
close();
return next();
}

completed = true;
}

previousChangePosition = currentChangePosition;
currentChangePosition = getNextChangePosition(resultSet);
if (completed) {
Expand All @@ -80,7 +100,10 @@ public boolean next() throws SQLException {
public void close() {
LOGGER.trace("Closing result set of change tables for table {}", changeTable);
try {
resultSet.close();
if (resultSet != null) {
resultSet.close();
resultSet = null;
}
}
catch (Exception e) {
// ignore
Expand Down

0 comments on commit 43afa6f

Please sign in to comment.