Skip to content

Commit

Permalink
CXP-2956: Remove the callback when processing table changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ramanenka committed Jul 6, 2024
1 parent 4ba244a commit 5393a8c
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -345,45 +345,38 @@ public Lsn getMinLsn(String databaseName, String changeTableName) throws SQLExce
* Provides all changes recorder by the SQL Server CDC capture process for a set of tables.
*
* @param databaseName - the name of the database to query
* @param changeTables - the requested tables to obtain changes for
* @param changeTable - the requested table to obtain changes for
* @param intervalFromLsn - closed lower bound of interval of changes to be provided
* @param intervalToLsn - closed upper bound of interval of changes to be provided
* @param consumer - the change processor
* @throws SQLException
*/
public void getChangesForTables(String databaseName, SqlServerChangeTable[] changeTables, Lsn intervalFromLsn,
Lsn intervalToLsn, BlockingMultiResultSetConsumer consumer)
public ResultSet getChangesForTable(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn,
Lsn intervalToLsn)
throws SQLException, InterruptedException {
final String[] queries = new String[changeTables.length];
final StatementPreparer[] preparers = new StatementPreparer[changeTables.length];

int idx = 0;
for (SqlServerChangeTable changeTable : changeTables) {
String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]")
.collect(Collectors.joining(", "));
String source = changeTable.getCaptureInstance();
if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) {
source = changeTable.getChangeTableId().table();
}
final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName)
.replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns))
.replace(TABLE_NAME_PLACEHOLDER, source);
queries[idx] = query;
// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
final Lsn fromLsn = getFromLsn(databaseName, changeTable, intervalFromLsn);
LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn);
preparers[idx] = statement -> {
if (queryFetchSize > 0) {
statement.setFetchSize(queryFetchSize);
}
statement.setBytes(1, fromLsn.getBinary());
statement.setBytes(2, intervalToLsn.getBinary());
};

idx++;
String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]")
.collect(Collectors.joining(", "));
String source = changeTable.getCaptureInstance();
if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) {
source = changeTable.getChangeTableId().table();
}
final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName)
.replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns))
.replace(TABLE_NAME_PLACEHOLDER, source);
// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
final Lsn fromLsn = getFromLsn(databaseName, changeTable, intervalFromLsn);
LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn);

PreparedStatement statement = connection().prepareStatement(query);
statement.closeOnCompletion();

if (queryFetchSize > 0) {
statement.setFetchSize(queryFetchSize);
}
prepareQuery(queries, preparers, consumer);
statement.setBytes(1, fromLsn.getBinary());
statement.setBytes(2, intervalToLsn.getBinary());

return statement.executeQuery();
}

private Lsn getFromLsn(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Queue;
Expand All @@ -32,6 +33,7 @@
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.Notification;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.spi.ChangeTableResultSet;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.ChangeTable;
import io.debezium.relational.Table;
Expand Down Expand Up @@ -229,16 +231,17 @@ else if (!checkAgent) {
tablesSlot.set(getChangeTablesToQuery(partition, offsetContext, toLsn));
collectChangeTablesWithKnownStopLsn(partition, tablesSlot.get());
}
try {
dataConnection.getChangesForTables(databaseName, tablesSlot.get(), fromLsn, toLsn, resultSets -> {

SqlServerChangeTablePointer[] changeTables = new SqlServerChangeTablePointer[]{};
try {
{
long eventSerialNoInInitialTx = 1;
final int tableCount = resultSets.length;
final SqlServerChangeTablePointer[] changeTables = new SqlServerChangeTablePointer[tableCount];
final SqlServerChangeTable[] tables = tablesSlot.get();
changeTables = new SqlServerChangeTablePointer[tables.length];

for (int i = 0; i < tableCount; i++) {
changeTables[i] = new SqlServerChangeTablePointer(tables[i], resultSets[i]);
for (int i = 0; i < tables.length; i++) {
ResultSet resultSet = dataConnection.getChangesForTable(databaseName, tables[i], fromLsn, toLsn);
changeTables[i] = new SqlServerChangeTablePointer(tables[i], resultSet);
changeTables[i].next();
}

Expand Down Expand Up @@ -342,14 +345,17 @@ else if (!checkAgent) {
connectorConfig));
tableWithSmallestLsn.next();
}
});
}
streamingExecutionContext.setLastProcessedPosition(TxLogPosition.valueOf(toLsn));
// Terminate the transaction otherwise CDC could not be disabled for tables
dataConnection.rollback();
}
catch (SQLException e) {
tablesSlot.set(processErrorFromChangeTableQuery(databaseName, e, tablesSlot.get()));
}
finally {
Arrays.stream(changeTables).filter(Objects::nonNull).forEach(ChangeTableResultSet::close);
}
}
}
catch (Exception e) {
Expand Down
Loading

0 comments on commit 5393a8c

Please sign in to comment.