Skip to content

Commit

Permalink
CXP-2956: Query table changes from within SqlServerChangeTablePointer
Browse files Browse the repository at this point in the history
  • Loading branch information
ramanenka committed Jul 15, 2024
1 parent 375dce4 commit f3886e7
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,18 @@ public class SqlServerChangeTablePointer extends ChangeTableResultSet<SqlServerC
private static final int COL_DATA = 5;

private ResultSetMapper<Object[]> resultSetMapper;
private final ResultSet resultSet;
private final int columnDataOffset;
private final SqlServerConnection connection;
private final Lsn fromLsn;
private final Lsn toLsn;

public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, ResultSet resultSet) {
super(changeTable, resultSet, COL_DATA);
public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, SqlServerConnection connection, Lsn fromLsn, Lsn toLsn) {
super(changeTable, COL_DATA);
// Store references to these because we can't get them from our superclass
this.resultSet = resultSet;
this.columnDataOffset = COL_DATA;
}

protected ResultSet getResultSet() {
return resultSet;
this.connection = connection;
this.fromLsn = fromLsn;
this.toLsn = toLsn;
}

@Override
Expand Down Expand Up @@ -89,12 +89,17 @@ protected boolean isNewTransaction() throws SQLException {
getChangePosition().getCommitLsn().compareTo(getPreviousChangePosition().getCommitLsn()) > 0;
}

@Override
protected ResultSet getNextResultSet() throws SQLException {
return connection.getChangesForTable(getChangeTable(), fromLsn, toLsn);
}

@Override
public Object[] getData() throws SQLException {
if (resultSetMapper == null) {
this.resultSetMapper = createResultSetMapper(getChangeTable().getSourceTable());
}
return resultSetMapper.apply(resultSet);
return resultSetMapper.apply(getResultSet());
}

/**
Expand All @@ -110,7 +115,7 @@ public Object[] getData() throws SQLException {
*/
private ResultSetMapper<Object[]> createResultSetMapper(Table table) throws SQLException {
ColumnUtils.MappedColumns columnMap = ColumnUtils.toMap(table);
final ResultSetMetaData rsmd = resultSet.getMetaData();
final ResultSetMetaData rsmd = getResultSet().getMetaData();
final int columnCount = rsmd.getColumnCount() - columnDataOffset;
final List<String> resultColumns = new ArrayList<>(columnCount);
for (int i = 0; i < columnCount; ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,10 @@ public Lsn getMinLsn(String databaseName, String changeTableName) throws SQLExce
* @param intervalToLsn - closed upper bound of interval of changes to be provided
* @throws SQLException
*/
public ResultSet getChangesForTable(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn,
public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn,
Lsn intervalToLsn)
throws SQLException, InterruptedException {
throws SQLException {
String databaseName = changeTable.getSourceTableId().catalog();
String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]")
.collect(Collectors.joining(", "));
String source = changeTable.getCaptureInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,7 @@ else if (!checkAgent) {
changeTables = new SqlServerChangeTablePointer[tables.length];

for (int i = 0; i < tables.length; i++) {
ResultSet resultSet = dataConnection.getChangesForTable(databaseName, tables[i], fromLsn, toLsn);
changeTables[i] = new SqlServerChangeTablePointer(tables[i], resultSet);
changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn);
changeTables[i].next();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ public void verifyOffsets() throws Exception {
final Lsn minLsn = connection.getMinLsn(TestHelper.TEST_DATABASE_1, tableName);
final Lsn maxLsn = connection.getMaxLsn(TestHelper.TEST_DATABASE_1);
final List<Integer> ids = new ArrayList<>();
try (ResultSet rs = connection.getChangesForTable(TestHelper.TEST_DATABASE_1, ct, minLsn, maxLsn)) {
try (ResultSet rs = connection.getChangesForTable(ct, minLsn, maxLsn)) {
while (rs.next()) {
ids.add(rs.getInt("id"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af
final Lsn minLsn = connection.getMinLsn(TestHelper.TEST_DATABASE_1, tableName);
final Lsn maxLsn = connection.getMaxLsn(TestHelper.TEST_DATABASE_1);
final AtomicReference<Boolean> found = new AtomicReference<>(false);
try (ResultSet rs = connection.getChangesForTable(TestHelper.TEST_DATABASE_1, ct, minLsn, maxLsn)) {
try (ResultSet rs = connection.getChangesForTable(ct, minLsn, maxLsn)) {
while (rs.next()) {
if (rs.getInt("id") == -1) {
found.set(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ public static void waitForCdcRecord(SqlServerConnection connection, String table
final Lsn minLsn = connection.getMinLsn(TEST_DATABASE_1, ctTableName);
final Lsn maxLsn = connection.getMaxLsn(TEST_DATABASE_1);
final CdcRecordFoundBlockingResultSetConsumer consumer = new CdcRecordFoundBlockingResultSetConsumer(handler);
try (ResultSet resultSet = connection.getChangesForTable(TEST_DATABASE_1, ct, minLsn, maxLsn)) {
try (ResultSet resultSet = connection.getChangesForTable(ct, minLsn, maxLsn)) {
consumer.accept(resultSet);
}
return consumer.isFound();
Expand Down Expand Up @@ -639,7 +639,7 @@ public static void waitForCdcRecord(SqlServerConnection connection, String table
final Lsn minLsn = connection.getMinLsn(TEST_DATABASE_1, ctTableName);
final Lsn maxLsn = connection.getMaxLsn(TEST_DATABASE_1);
final CdcRecordFoundBlockingResultSetConsumer consumer = new CdcRecordFoundBlockingResultSetConsumer(handler);
try (ResultSet resultSet = connection.getChangesForTable(TEST_DATABASE_1, ct, minLsn, maxLsn)) {
try (ResultSet resultSet = connection.getChangesForTable(ct, minLsn, maxLsn)) {
consumer.accept(resultSet);
}
return consumer.isFound();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@ public abstract class ChangeTableResultSet<C extends ChangeTable, T extends Comp
private final static Logger LOGGER = LoggerFactory.getLogger(ChangeTableResultSet.class);

private final C changeTable;
private final ResultSet resultSet;
private ResultSet resultSet;
private final int columnDataOffset;
private boolean completed = false;
private T currentChangePosition;
private T previousChangePosition;

public ChangeTableResultSet(C changeTable, ResultSet resultSet, int columnDataOffset) {
public ChangeTableResultSet(C changeTable, int columnDataOffset) {
this.changeTable = changeTable;
this.resultSet = resultSet;
this.columnDataOffset = columnDataOffset;
}

Expand All @@ -59,7 +58,16 @@ public boolean isCurrentPositionSmallerThanPreviousPosition() {
return (previousChangePosition != null) && previousChangePosition.compareTo(currentChangePosition) > 0;
}

public ResultSet getResultSet() {
return resultSet;
}

protected abstract ResultSet getNextResultSet() throws SQLException;

public boolean next() throws SQLException {
if (resultSet == null) {
resultSet = getNextResultSet();
}
completed = !resultSet.next();
previousChangePosition = currentChangePosition;
currentChangePosition = getNextChangePosition(resultSet);
Expand Down

0 comments on commit f3886e7

Please sign in to comment.