Skip to content

Commit

Permalink
CXP-2956: Remove the callback when processing table changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ramanenka committed Jul 15, 2024
1 parent 4ba244a commit 375dce4
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -345,45 +345,38 @@ public Lsn getMinLsn(String databaseName, String changeTableName) throws SQLExce
* Provides all changes recorder by the SQL Server CDC capture process for a set of tables.
*
* @param databaseName - the name of the database to query
* @param changeTables - the requested tables to obtain changes for
* @param changeTable - the requested table to obtain changes for
* @param intervalFromLsn - closed lower bound of interval of changes to be provided
* @param intervalToLsn - closed upper bound of interval of changes to be provided
* @param consumer - the change processor
* @throws SQLException
*/
public void getChangesForTables(String databaseName, SqlServerChangeTable[] changeTables, Lsn intervalFromLsn,
Lsn intervalToLsn, BlockingMultiResultSetConsumer consumer)
public ResultSet getChangesForTable(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn,
Lsn intervalToLsn)
throws SQLException, InterruptedException {
final String[] queries = new String[changeTables.length];
final StatementPreparer[] preparers = new StatementPreparer[changeTables.length];

int idx = 0;
for (SqlServerChangeTable changeTable : changeTables) {
String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]")
.collect(Collectors.joining(", "));
String source = changeTable.getCaptureInstance();
if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) {
source = changeTable.getChangeTableId().table();
}
final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName)
.replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns))
.replace(TABLE_NAME_PLACEHOLDER, source);
queries[idx] = query;
// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
final Lsn fromLsn = getFromLsn(databaseName, changeTable, intervalFromLsn);
LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn);
preparers[idx] = statement -> {
if (queryFetchSize > 0) {
statement.setFetchSize(queryFetchSize);
}
statement.setBytes(1, fromLsn.getBinary());
statement.setBytes(2, intervalToLsn.getBinary());
};

idx++;
String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]")
.collect(Collectors.joining(", "));
String source = changeTable.getCaptureInstance();
if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) {
source = changeTable.getChangeTableId().table();
}
final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName)
.replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns))
.replace(TABLE_NAME_PLACEHOLDER, source);
// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
final Lsn fromLsn = getFromLsn(databaseName, changeTable, intervalFromLsn);
LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn);

PreparedStatement statement = connection().prepareStatement(query);
statement.closeOnCompletion();

if (queryFetchSize > 0) {
statement.setFetchSize(queryFetchSize);
}
prepareQuery(queries, preparers, consumer);
statement.setBytes(1, fromLsn.getBinary());
statement.setBytes(2, intervalToLsn.getBinary());

return statement.executeQuery();
}

private Lsn getFromLsn(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException {
Expand Down
Loading

0 comments on commit 375dce4

Please sign in to comment.