diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java index ec8d37f06e5..0617919aad2 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java @@ -50,14 +50,16 @@ public class SqlServerChangeTablePointer extends ChangeTableResultSet "[" + c + "]") + .collect(Collectors.joining(", ")); + String source = changeTable.getCaptureInstance(); + if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) { + source = changeTable.getChangeTableId().table(); + } + String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName) + .replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns)) + .replace(TABLE_NAME_PLACEHOLDER, source); + if (maxRows > 0) { + query = query.replace("SELECT ", String.format("SELECT TOP %d ", maxRows)); + } + return query; + } + /** * Provides all changes recorder by the SQL Server CDC capture process for a set of tables. * - * @param databaseName - the name of the database to query * @param changeTable - the requested table to obtain changes for * @param intervalFromLsn - closed lower bound of interval of changes to be provided * @param intervalToLsn - closed upper bound of interval of changes to be provided + * @param maxRows - the max number of rows to return, set 0 for no limit * @throws SQLException */ public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn, - Lsn intervalToLsn) + Lsn intervalToLsn, int maxRows) throws SQLException { - String databaseName = changeTable.getSourceTableId().catalog(); - String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]") - .collect(Collectors.joining(", ")); - String source = changeTable.getCaptureInstance(); - if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) { - source = changeTable.getChangeTableId().table(); - } - final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName) - .replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns)) - .replace(TABLE_NAME_PLACEHOLDER, source); + String query = prepareGetChangesForTableQuery(changeTable, maxRows); + // If the table was added in the middle of queried buffer we need // to adjust from to the first LSN available - final Lsn fromLsn = getFromLsn(databaseName, changeTable, intervalFromLsn); - LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn); + final Lsn fromLsn = getFromLsn(changeTable, intervalFromLsn); + LOGGER.trace("Getting changes for table {} in range [{}, {}]", changeTable, fromLsn, intervalToLsn); PreparedStatement statement = connection().prepareStatement(query); statement.closeOnCompletion(); @@ -380,9 +389,46 @@ public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn interv return statement.executeQuery(); } - private Lsn getFromLsn(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException { + public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn, + Lsn intervalToLsn) + throws SQLException { + return getChangesForTable(changeTable, intervalFromLsn, intervalToLsn, 0); + } + + public ResultSet getChangesForTableAfter(SqlServerChangeTable changeTable, Lsn intervalFromLsn, Lsn seqvalFromLsn, int operationFrom, + Lsn intervalToLsn, int maxRows) + throws SQLException { + String query = prepareGetChangesForTableQuery(changeTable, maxRows); + query = query.replace("[__$start_lsn] >= ? AND [__$start_lsn] <= ?", + "(([__$start_lsn] = ? AND [__$seqval] = ? AND [__$operation] > ?) " + + "OR ([__$start_lsn] = ? AND [__$seqval] > ?) " + + "OR ([__$start_lsn] > ?)) AND [__$start_lsn] <= ?"); + + // If the table was added in the middle of queried buffer we need + // to adjust from to the first LSN available + final Lsn fromLsn = getFromLsn(changeTable, intervalFromLsn); + LOGGER.trace("Getting changes for table {} in range [{}-{}, {}]", changeTable, fromLsn, seqvalFromLsn, intervalToLsn); + + PreparedStatement statement = connection().prepareStatement(query); + statement.closeOnCompletion(); + + if (queryFetchSize > 0) { + statement.setFetchSize(queryFetchSize); + } + statement.setBytes(1, fromLsn.getBinary()); + statement.setBytes(2, seqvalFromLsn.getBinary()); + statement.setInt(3, operationFrom); + statement.setBytes(4, fromLsn.getBinary()); + statement.setBytes(5, seqvalFromLsn.getBinary()); + statement.setBytes(6, fromLsn.getBinary()); + statement.setBytes(7, intervalToLsn.getBinary()); + + return statement.executeQuery(); + } + + private Lsn getFromLsn(SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException { Lsn fromLsn = changeTable.getStartLsn().compareTo(intervalFromLsn) > 0 ? changeTable.getStartLsn() : intervalFromLsn; - return fromLsn.getBinary() != null ? fromLsn : getMinLsn(databaseName, changeTable.getCaptureInstance()); + return fromLsn.getBinary() != null ? fromLsn : getMinLsn(changeTable.getSourceTableId().catalog(), changeTable.getCaptureInstance()); } /** diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java index 3ab1b261249..942db5cb66d 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -238,8 +238,10 @@ else if (!checkAgent) { final SqlServerChangeTable[] tables = tablesSlot.get(); changeTables = new SqlServerChangeTablePointer[tables.length]; + final int maxRowsPerResultSet = connectorConfig.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT ? 3 : 0; + for (int i = 0; i < tables.length; i++) { - changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn); + changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn, maxRowsPerResultSet); changeTables[i].next(); } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/TxLogPosition.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/TxLogPosition.java index 9ff0c366d8e..8497b2db448 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/TxLogPosition.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/TxLogPosition.java @@ -18,13 +18,15 @@ */ public class TxLogPosition implements Nullable, Comparable { - public static final TxLogPosition NULL = new TxLogPosition(null, null); + public static final TxLogPosition NULL = new TxLogPosition(null, null, 0); private final Lsn commitLsn; private final Lsn inTxLsn; + private int operation; - private TxLogPosition(Lsn commitLsn, Lsn inTxLsn) { + private TxLogPosition(Lsn commitLsn, Lsn inTxLsn, int operation) { this.commitLsn = commitLsn; this.inTxLsn = inTxLsn; + this.operation = operation; } public Lsn getCommitLsn() { @@ -35,9 +37,13 @@ public Lsn getInTxLsn() { return inTxLsn; } + public int getOperation() { + return operation; + } + @Override public String toString() { - return this == NULL ? "NULL" : commitLsn + "(" + inTxLsn + ")"; + return this == NULL ? "NULL" : commitLsn + "(" + inTxLsn + "," + operation + ")"; } @Override @@ -46,6 +52,7 @@ public int hashCode() { int result = 1; result = prime * result + ((commitLsn == null) ? 0 : commitLsn.hashCode()); result = prime * result + ((inTxLsn == null) ? 0 : inTxLsn.hashCode()); + result = prime * result + operation; return result; } @@ -86,15 +93,20 @@ public int compareTo(TxLogPosition o) { return comparison == 0 ? inTxLsn.compareTo(o.inTxLsn) : comparison; } - public static TxLogPosition valueOf(Lsn commitLsn, Lsn inTxLsn) { + public static TxLogPosition valueOf(Lsn commitLsn, Lsn inTxLsn, int operation) { return commitLsn == null && inTxLsn == null ? NULL : new TxLogPosition( commitLsn == null ? Lsn.NULL : commitLsn, - inTxLsn == null ? Lsn.NULL : inTxLsn); + inTxLsn == null ? Lsn.NULL : inTxLsn, + operation); + } + + public static TxLogPosition valueOf(Lsn commitLsn, Lsn inTxLsn) { + return valueOf(commitLsn, inTxLsn, 0); } public static TxLogPosition valueOf(Lsn commitLsn) { - return valueOf(commitLsn, Lsn.NULL); + return valueOf(commitLsn, Lsn.NULL, 0); } @Override diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java index c420f7b6f62..15b65f29fef 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java @@ -129,7 +129,8 @@ public static Configuration.Builder defaultConnectorConfig() { return builder.with(CommonConnectorConfig.TOPIC_PREFIX, "server1") .with(SqlServerConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class) .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH) - .with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, false); + .with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) + .with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.DIRECT); } /** diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java index 0fd4e390c25..4776887f131 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java @@ -29,13 +29,16 @@ public abstract class ChangeTableResultSet 0 && rowsReadPerResultSet > maxRowsPerResultSet) { + throw new RuntimeException("Number of rows read from the result set is greater than the configured max rows per a result set"); + } + + if (maxRowsPerResultSet > 0 && rowsReadPerResultSet == maxRowsPerResultSet) { + close(); + return next(); + } + + completed = true; + } + previousChangePosition = currentChangePosition; currentChangePosition = getNextChangePosition(resultSet); if (completed) { @@ -80,7 +100,10 @@ public boolean next() throws SQLException { public void close() { LOGGER.trace("Closing result set of change tables for table {}", changeTable); try { - resultSet.close(); + if (resultSet != null) { + resultSet.close(); + resultSet = null; + } } catch (Exception e) { // ignore