diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java index ec8d37f06e5..d033e34fb9b 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java @@ -50,14 +50,16 @@ public class SqlServerChangeTablePointer extends ChangeTableResultSet "[" + c + "]") + .collect(Collectors.joining(", ")); + String source = changeTable.getCaptureInstance(); + if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) { + source = changeTable.getChangeTableId().table(); + } + String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName) + .replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns)) + .replace(TABLE_NAME_PLACEHOLDER, source); + if (maxRows > 0) { + query = query.replace("SELECT ", String.format("SELECT TOP %d ", maxRows)); + } + return query; + } + /** * Provides all changes recorder by the SQL Server CDC capture process for a set of tables. * - * @param databaseName - the name of the database to query * @param changeTable - the requested table to obtain changes for * @param intervalFromLsn - closed lower bound of interval of changes to be provided * @param intervalToLsn - closed upper bound of interval of changes to be provided + * @param maxRows - the max number of rows to return, set 0 for no limit * @throws SQLException */ public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn, - Lsn intervalToLsn) + Lsn intervalToLsn, int maxRows) throws SQLException { - String databaseName = changeTable.getSourceTableId().catalog(); - String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]") - .collect(Collectors.joining(", ")); - String source = changeTable.getCaptureInstance(); - if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) { - source = changeTable.getChangeTableId().table(); - } - final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName) - .replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns)) - .replace(TABLE_NAME_PLACEHOLDER, source); + String query = prepareGetChangesForTableQuery(changeTable, maxRows); + // If the table was added in the middle of queried buffer we need // to adjust from to the first LSN available - final Lsn fromLsn = getFromLsn(databaseName, changeTable, intervalFromLsn); - LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn); + final Lsn fromLsn = getFromLsn(changeTable, intervalFromLsn); + LOGGER.trace("Getting changes for table {} in range [{}, {}]", changeTable, fromLsn, intervalToLsn); PreparedStatement statement = connection().prepareStatement(query); statement.closeOnCompletion(); @@ -380,9 +389,41 @@ public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn interv return statement.executeQuery(); } - private Lsn getFromLsn(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException { + public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn, + Lsn intervalToLsn) + throws SQLException { + return getChangesForTable(changeTable, intervalFromLsn, intervalToLsn, 0); + } + + public ResultSet getChangesForTableAfter(SqlServerChangeTable changeTable, Lsn intervalFromLsn, Lsn seqvalFromLsn, + Lsn intervalToLsn, int maxRows) + throws SQLException { + String query = prepareGetChangesForTableQuery(changeTable, maxRows); + query = query.replace("[__$start_lsn] >= ? AND [__$start_lsn] <= ?", + "([__$start_lsn] > ? OR [__$start_lsn] = ? AND [__$seqval] > ?) AND [__$start_lsn] <= ?"); + + // If the table was added in the middle of queried buffer we need + // to adjust from to the first LSN available + final Lsn fromLsn = getFromLsn(changeTable, intervalFromLsn); + LOGGER.trace("Getting changes for table {} in range [{}-{}, {}]", changeTable, fromLsn, seqvalFromLsn, intervalToLsn); + + PreparedStatement statement = connection().prepareStatement(query); + statement.closeOnCompletion(); + + if (queryFetchSize > 0) { + statement.setFetchSize(queryFetchSize); + } + statement.setBytes(1, fromLsn.getBinary()); + statement.setBytes(2, fromLsn.getBinary()); + statement.setBytes(3, seqvalFromLsn.getBinary()); + statement.setBytes(4, intervalToLsn.getBinary()); + + return statement.executeQuery(); + } + + private Lsn getFromLsn(SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException { Lsn fromLsn = changeTable.getStartLsn().compareTo(intervalFromLsn) > 0 ? changeTable.getStartLsn() : intervalFromLsn; - return fromLsn.getBinary() != null ? fromLsn : getMinLsn(databaseName, changeTable.getCaptureInstance()); + return fromLsn.getBinary() != null ? fromLsn : getMinLsn(changeTable.getSourceTableId().catalog(), changeTable.getCaptureInstance()); } /** diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java index 3ab1b261249..a138a06f4fd 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -238,8 +238,10 @@ else if (!checkAgent) { final SqlServerChangeTable[] tables = tablesSlot.get(); changeTables = new SqlServerChangeTablePointer[tables.length]; + final int maxRowsPerResultSet = connectorConfig.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT ? 1000 : 0; + for (int i = 0; i < tables.length; i++) { - changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn); + changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn, maxRowsPerResultSet); changeTables[i].next(); } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java index 0fd4e390c25..4776887f131 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java @@ -29,13 +29,16 @@ public abstract class ChangeTableResultSet 0 && rowsReadPerResultSet > maxRowsPerResultSet) { + throw new RuntimeException("Number of rows read from the result set is greater than the configured max rows per a result set"); + } + + if (maxRowsPerResultSet > 0 && rowsReadPerResultSet == maxRowsPerResultSet) { + close(); + return next(); + } + + completed = true; + } + previousChangePosition = currentChangePosition; currentChangePosition = getNextChangePosition(resultSet); if (completed) { @@ -80,7 +100,10 @@ public boolean next() throws SQLException { public void close() { LOGGER.trace("Closing result set of change tables for table {}", changeTable); try { - resultSet.close(); + if (resultSet != null) { + resultSet.close(); + resultSet = null; + } } catch (Exception e) { // ignore