diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java index 8191677d1e7..6ba698c44e2 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java @@ -46,18 +46,20 @@ public class SqlServerChangeTablePointer extends ChangeTableResultSet resultSetMapper; - private final ResultSet resultSet; private final int columnDataOffset; + private final SqlServerConnection connection; + private final Lsn fromLsn; + private final Lsn toLsn; + private final int maxRowsPerTablePerIteration; - public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, ResultSet resultSet) { - super(changeTable, resultSet, COL_DATA); + public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, SqlServerConnection connection, Lsn fromLsn, Lsn toLsn, int maxRowsPerTablePerIteration) { + super(changeTable, COL_DATA, maxRowsPerTablePerIteration); // Store references to these because we can't get them from our superclass - this.resultSet = resultSet; this.columnDataOffset = COL_DATA; - } - - protected ResultSet getResultSet() { - return resultSet; + this.connection = connection; + this.fromLsn = fromLsn; + this.toLsn = toLsn; + this.maxRowsPerTablePerIteration = maxRowsPerTablePerIteration; } @Override @@ -76,7 +78,10 @@ protected Object getColumnData(ResultSet resultSet, int columnIndex) throws SQLE @Override protected TxLogPosition getNextChangePosition(ResultSet resultSet) throws SQLException { return isCompleted() ? TxLogPosition.NULL - : TxLogPosition.valueOf(Lsn.valueOf(resultSet.getBytes(COL_COMMIT_LSN)), Lsn.valueOf(resultSet.getBytes(COL_ROW_LSN))); + : TxLogPosition.valueOf( + Lsn.valueOf(resultSet.getBytes(COL_COMMIT_LSN)), + Lsn.valueOf(resultSet.getBytes(COL_ROW_LSN)), + resultSet.getInt(COL_OPERATION)); } /** @@ -89,12 +94,23 @@ protected boolean isNewTransaction() throws SQLException { getChangePosition().getCommitLsn().compareTo(getPreviousChangePosition().getCommitLsn()) > 0; } + @Override + protected ResultSet nextResultSet(TxLogPosition lastPositionSeen) throws SQLException { + if (lastPositionSeen == null || lastPositionSeen.equals(TxLogPosition.NULL)) { + return connection.getChangesForTable(getChangeTable(), fromLsn, toLsn, maxRowsPerTablePerIteration); + } + else { + return connection.getChangesForTableAfter(getChangeTable(), lastPositionSeen.getCommitLsn(), lastPositionSeen.getInTxLsn(), + lastPositionSeen.getOperation(), toLsn, maxRowsPerTablePerIteration); + } + } + @Override public Object[] getData() throws SQLException { if (resultSetMapper == null) { this.resultSetMapper = createResultSetMapper(getChangeTable().getSourceTable()); } - return resultSetMapper.apply(resultSet); + return resultSetMapper.apply(getResultSet()); } /** @@ -110,7 +126,7 @@ public Object[] getData() throws SQLException { */ private ResultSetMapper createResultSetMapper(Table table) throws SQLException { ColumnUtils.MappedColumns columnMap = ColumnUtils.toMap(table); - final ResultSetMetaData rsmd = resultSet.getMetaData(); + final ResultSetMetaData rsmd = getResultSet().getMetaData(); final int columnCount = rsmd.getColumnCount() - columnDataOffset; final List resultColumns = new ArrayList<>(columnCount); for (int i = 0; i < columnCount; ++i) { diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index 0b50f36e28a..6079aceac7c 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -8,6 +8,7 @@ import java.sql.Connection; import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -351,6 +352,67 @@ public void getChangesForTables(String databaseName, SqlServerChangeTable[] chan prepareQuery(queries, preparers, consumer); } + /** + * Provides all changes recorder by the SQL Server CDC capture process for a set of tables. + * + * @param databaseName - the name of the database to query + * @param changeTables - the requested tables to obtain changes for + * @param intervalFromLsn - closed lower bound of interval of changes to be provided + * @param intervalToLsn - closed upper bound of interval of changes to be provided + * @param consumer - the change processor + * @throws SQLException + */ + public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn, Lsn intervalToLsn, int maxRows) throws SQLException { + final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, changeTable.getSourceTableId().catalog()) + .replace(STATEMENTS_PLACEHOLDER, changeTable.getCaptureInstance()) + .replace("SELECT", String.format("SELECT TOP %d ", maxRows)); + // If the table was added in the middle of queried buffer we need + // to adjust from to the first LSN available + final Lsn fromLsn = getFromLsn(changeTable.getSourceTableId().catalog(), changeTable, intervalFromLsn); + LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn); + + PreparedStatement statement = connection().prepareStatement(query); + statement.closeOnCompletion(); + if (queryFetchSize > 0) { + statement.setFetchSize(queryFetchSize); + } + statement.setBytes(1, fromLsn.getBinary()); + statement.setBytes(2, intervalToLsn.getBinary()); + + return statement.executeQuery(); + } + + public ResultSet getChangesForTableAfter(SqlServerChangeTable changeTable, Lsn intervalFromLsn, Lsn seqvalFromLsn, + int operationFrom, Lsn intervalToLsn, int maxRows) + throws SQLException { + final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, changeTable.getSourceTableId().catalog()) + .replace(STATEMENTS_PLACEHOLDER, changeTable.getCaptureInstance()) + .replace("SELECT", String.format("SELECT TOP %d ", maxRows)) + .replace("order by", + "WHERE ([__$start_lsn]=? AND [__$seqval]=? AND [__$operation]>?) OR ([__$start_lsn]=? AND [__$seqval]>?) OR ([__$start_lsn]>?) order by"); + + // If the table was added in the middle of queried buffer we need + // to adjust from to the first LSN available + final Lsn fromLsn = getFromLsn(changeTable.getSourceTableId().catalog(), changeTable, intervalFromLsn); + LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn); + + PreparedStatement statement = connection().prepareStatement(query); + statement.closeOnCompletion(); + if (queryFetchSize > 0) { + statement.setFetchSize(queryFetchSize); + } + statement.setBytes(1, fromLsn.getBinary()); + statement.setBytes(2, intervalToLsn.getBinary()); + statement.setBytes(3, fromLsn.getBinary()); + statement.setBytes(4, seqvalFromLsn.getBinary()); + statement.setInt(5, operationFrom); + statement.setBytes(6, fromLsn.getBinary()); + statement.setBytes(7, seqvalFromLsn.getBinary()); + statement.setBytes(8, fromLsn.getBinary()); + + return statement.executeQuery(); + } + private Lsn getFromLsn(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException { Lsn fromLsn = changeTable.getStartLsn().compareTo(intervalFromLsn) > 0 ? changeTable.getStartLsn() : intervalFromLsn; return fromLsn.getBinary() != null ? fromLsn : getMinLsn(databaseName, changeTable.getCaptureInstance()); diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java index d343f962ecd..03323e917fc 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -228,120 +228,118 @@ else if (!checkAgent) { .filter(t -> !t.getStopLsn().isAvailable() || t.getStopLsn().compareTo(fromLsn) > 0) .toArray(SqlServerChangeTable[]::new)); - try { - dataConnection.getChangesForTables(databaseName, tablesSlot.get(), fromLsn, toLsn, resultSets -> { - - long eventSerialNoInInitialTx = 1; - final int tableCount = resultSets.length; - final SqlServerChangeTablePointer[] changeTables = new SqlServerChangeTablePointer[tableCount]; - final SqlServerChangeTable[] tables = tablesSlot.get(); - - for (int i = 0; i < tableCount; i++) { - changeTables[i] = new SqlServerChangeTablePointer(tables[i], resultSets[i]); - changeTables[i].next(); - } + long eventSerialNoInInitialTx = 1; + final SqlServerChangeTable[] tables = tablesSlot.get(); + final SqlServerChangeTablePointer[] changeTables = new SqlServerChangeTablePointer[tables.length]; + + final int maxRowsPerTablePerIteration = 10; + for (int i = 0; i < tables.length; i++) { + changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn, maxRowsPerTablePerIteration); + changeTables[i].next(); + } - for (;;) { - SqlServerChangeTablePointer tableWithSmallestLsn = null; - for (SqlServerChangeTablePointer changeTable : changeTables) { - if (changeTable.isCompleted()) { - continue; - } - if (tableWithSmallestLsn == null || changeTable.compareTo(tableWithSmallestLsn) < 0) { - tableWithSmallestLsn = changeTable; - } + try { + for (;;) { + SqlServerChangeTablePointer tableWithSmallestLsn = null; + for (SqlServerChangeTablePointer changeTable : changeTables) { + if (changeTable.isCompleted()) { + continue; } - if (tableWithSmallestLsn == null) { - // No more LSNs available - break; + if (tableWithSmallestLsn == null || changeTable.compareTo(tableWithSmallestLsn) < 0) { + tableWithSmallestLsn = changeTable; } + } + if (tableWithSmallestLsn == null) { + // No more LSNs available + break; + } - if (!(tableWithSmallestLsn.getChangePosition().isAvailable() && tableWithSmallestLsn.getChangePosition().getInTxLsn().isAvailable())) { - LOGGER.error("Skipping change {} as its LSN is NULL which is not expected", tableWithSmallestLsn); - tableWithSmallestLsn.next(); - continue; - } + if (!(tableWithSmallestLsn.getChangePosition().isAvailable() && tableWithSmallestLsn.getChangePosition().getInTxLsn().isAvailable())) { + LOGGER.error("Skipping change {} as its LSN is NULL which is not expected", tableWithSmallestLsn); + tableWithSmallestLsn.next(); + continue; + } - if (tableWithSmallestLsn.isNewTransaction() && changesStoppedBeingMonotonic.get()) { - LOGGER.info("Resetting changesStoppedBeingMonotonic as transaction changes"); - changesStoppedBeingMonotonic.set(false); - } + if (tableWithSmallestLsn.isNewTransaction() && changesStoppedBeingMonotonic.get()) { + LOGGER.info("Resetting changesStoppedBeingMonotonic as transaction changes"); + changesStoppedBeingMonotonic.set(false); + } - // After restart for changes that are not monotonic to avoid data loss - if (tableWithSmallestLsn.isCurrentPositionSmallerThanPreviousPosition()) { - LOGGER.info("Disabling skipping changes due to not monotonic order of changes"); - changesStoppedBeingMonotonic.set(true); - } + // After restart for changes that are not monotonic to avoid data loss + if (tableWithSmallestLsn.isCurrentPositionSmallerThanPreviousPosition()) { + LOGGER.info("Disabling skipping changes due to not monotonic order of changes"); + changesStoppedBeingMonotonic.set(true); + } - // After restart for changes that were executed before the last committed offset - if (!changesStoppedBeingMonotonic.get() && - tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) < 0) { - LOGGER.info("Skipping change {} as its position is smaller than the last recorded position {}", tableWithSmallestLsn, - lastProcessedPositionOnStart); - tableWithSmallestLsn.next(); - continue; - } - // After restart for change that was the last committed and operations in it before the last committed offset - if (!changesStoppedBeingMonotonic.get() && tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) == 0 - && eventSerialNoInInitialTx <= lastProcessedEventSerialNoOnStart) { - LOGGER.info("Skipping change {} as its order in the transaction {} is smaller than or equal to the last recorded operation {}[{}]", - tableWithSmallestLsn, eventSerialNoInInitialTx, lastProcessedPositionOnStart, lastProcessedEventSerialNoOnStart); - eventSerialNoInInitialTx++; - tableWithSmallestLsn.next(); - continue; - } - if (tableWithSmallestLsn.getChangeTable().getStopLsn().isAvailable() && - tableWithSmallestLsn.getChangeTable().getStopLsn().compareTo(tableWithSmallestLsn.getChangePosition().getCommitLsn()) <= 0) { - LOGGER.debug("Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}", tableWithSmallestLsn, - tableWithSmallestLsn.getChangePosition()); - tableWithSmallestLsn.next(); - continue; - } - LOGGER.trace("Processing change {}", tableWithSmallestLsn); - LOGGER.trace("Schema change checkpoints {}", schemaChangeCheckpoints); - if (!schemaChangeCheckpoints.isEmpty()) { - if (tableWithSmallestLsn.getChangePosition().getCommitLsn().compareTo(schemaChangeCheckpoints.peek().getStartLsn()) >= 0) { - migrateTable(partition, schemaChangeCheckpoints, offsetContext); - } + // After restart for changes that were executed before the last committed offset + if (!changesStoppedBeingMonotonic.get() && + tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) < 0) { + LOGGER.info("Skipping change {} as its position is smaller than the last recorded position {}", tableWithSmallestLsn, + lastProcessedPositionOnStart); + tableWithSmallestLsn.next(); + continue; + } + // After restart for change that was the last committed and operations in it before the last committed offset + if (!changesStoppedBeingMonotonic.get() && tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) == 0 + && eventSerialNoInInitialTx <= lastProcessedEventSerialNoOnStart) { + LOGGER.info("Skipping change {} as its order in the transaction {} is smaller than or equal to the last recorded operation {}[{}]", + tableWithSmallestLsn, eventSerialNoInInitialTx, lastProcessedPositionOnStart, lastProcessedEventSerialNoOnStart); + eventSerialNoInInitialTx++; + tableWithSmallestLsn.next(); + continue; + } + if (tableWithSmallestLsn.getChangeTable().getStopLsn().isAvailable() && + tableWithSmallestLsn.getChangeTable().getStopLsn().compareTo(tableWithSmallestLsn.getChangePosition().getCommitLsn()) <= 0) { + LOGGER.debug("Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}", tableWithSmallestLsn, + tableWithSmallestLsn.getChangePosition()); + tableWithSmallestLsn.next(); + continue; + } + LOGGER.trace("Processing change {}", tableWithSmallestLsn); + LOGGER.trace("Schema change checkpoints {}", schemaChangeCheckpoints); + if (!schemaChangeCheckpoints.isEmpty()) { + if (tableWithSmallestLsn.getChangePosition().getCommitLsn().compareTo(schemaChangeCheckpoints.peek().getStartLsn()) >= 0) { + migrateTable(partition, schemaChangeCheckpoints, offsetContext); } - final TableId tableId = tableWithSmallestLsn.getChangeTable().getSourceTableId(); - final int operation = tableWithSmallestLsn.getOperation(); - final Object[] data = tableWithSmallestLsn.getData(); - - // UPDATE consists of two consecutive events, first event contains - // the row before it was updated and the second the row after - // it was updated - int eventCount = 1; - if (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) { - if (!tableWithSmallestLsn.next() || tableWithSmallestLsn.getOperation() != SqlServerChangeRecordEmitter.OP_UPDATE_AFTER) { - throw new IllegalStateException("The update before event at " + tableWithSmallestLsn.getChangePosition() + " for table " + tableId - + " was not followed by after event.\n Please report this as a bug together with a events around given LSN."); - } - eventCount = 2; + } + final TableId tableId = tableWithSmallestLsn.getChangeTable().getSourceTableId(); + final int operation = tableWithSmallestLsn.getOperation(); + final Object[] data = tableWithSmallestLsn.getData(); + + // UPDATE consists of two consecutive events, first event contains + // the row before it was updated and the second the row after + // it was updated + int eventCount = 1; + if (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) { + if (!tableWithSmallestLsn.next() || tableWithSmallestLsn.getOperation() != SqlServerChangeRecordEmitter.OP_UPDATE_AFTER) { + throw new IllegalStateException("The update before event at " + tableWithSmallestLsn.getChangePosition() + " for table " + tableId + + " was not followed by after event.\n Please report this as a bug together with a events around given LSN."); } - final Object[] dataNext = (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) ? tableWithSmallestLsn.getData() : null; - - final ResultSet resultSet = tableWithSmallestLsn.getResultSet(); - offsetContext.setChangePosition(tableWithSmallestLsn.getChangePosition(), eventCount); - offsetContext.event( - tableWithSmallestLsn.getChangeTable().getSourceTableId(), - resultSet.getTimestamp(resultSet.getMetaData().getColumnCount()).toInstant()); - - dispatcher - .dispatchDataChangeEvent( - partition, - tableId, - new SqlServerChangeRecordEmitter( - partition, - offsetContext, - operation, - data, - dataNext, - clock, - connectorConfig)); - tableWithSmallestLsn.next(); + eventCount = 2; } - }); + final Object[] dataNext = (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) ? tableWithSmallestLsn.getData() : null; + + final ResultSet resultSet = tableWithSmallestLsn.getResultSet(); + offsetContext.setChangePosition(tableWithSmallestLsn.getChangePosition(), eventCount); + offsetContext.event( + tableWithSmallestLsn.getChangeTable().getSourceTableId(), + resultSet.getTimestamp(resultSet.getMetaData().getColumnCount()).toInstant()); + + dispatcher + .dispatchDataChangeEvent( + partition, + tableId, + new SqlServerChangeRecordEmitter( + partition, + offsetContext, + operation, + data, + dataNext, + clock, + connectorConfig)); + tableWithSmallestLsn.next(); + } + streamingExecutionContext.setLastProcessedPosition(TxLogPosition.valueOf(toLsn)); // Terminate the transaction otherwise CDC could not be disabled for tables dataConnection.rollback(); diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/TxLogPosition.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/TxLogPosition.java index 9ff0c366d8e..b3f7bbcba53 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/TxLogPosition.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/TxLogPosition.java @@ -18,13 +18,15 @@ */ public class TxLogPosition implements Nullable, Comparable { - public static final TxLogPosition NULL = new TxLogPosition(null, null); + public static final TxLogPosition NULL = new TxLogPosition(null, null, -1); private final Lsn commitLsn; private final Lsn inTxLsn; + private final int operation; - private TxLogPosition(Lsn commitLsn, Lsn inTxLsn) { + private TxLogPosition(Lsn commitLsn, Lsn inTxLsn, int operation) { this.commitLsn = commitLsn; this.inTxLsn = inTxLsn; + this.operation = operation; } public Lsn getCommitLsn() { @@ -35,6 +37,10 @@ public Lsn getInTxLsn() { return inTxLsn; } + public int getOperation() { + return operation; + } + @Override public String toString() { return this == NULL ? "NULL" : commitLsn + "(" + inTxLsn + ")"; @@ -77,7 +83,7 @@ else if (!commitLsn.equals(other.commitLsn)) { else if (!inTxLsn.equals(other.inTxLsn)) { return false; } - return true; + return operation == other.operation; } @Override @@ -86,11 +92,16 @@ public int compareTo(TxLogPosition o) { return comparison == 0 ? inTxLsn.compareTo(o.inTxLsn) : comparison; } - public static TxLogPosition valueOf(Lsn commitLsn, Lsn inTxLsn) { + public static TxLogPosition valueOf(Lsn commitLsn, Lsn inTxLsn, int operation) { return commitLsn == null && inTxLsn == null ? NULL : new TxLogPosition( commitLsn == null ? Lsn.NULL : commitLsn, - inTxLsn == null ? Lsn.NULL : inTxLsn); + inTxLsn == null ? Lsn.NULL : inTxLsn, + operation); + } + + public static TxLogPosition valueOf(Lsn commitLsn, Lsn inTxLsn) { + return valueOf(commitLsn, inTxLsn, -1); } public static TxLogPosition valueOf(Lsn commitLsn) { diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index 33469f99413..fa52a416e2b 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -109,7 +109,7 @@ public void after() throws SQLException { @Test public void createAndDelete() throws Exception { - final int RECORDS_PER_TABLE = 5; + final int RECORDS_PER_TABLE = 50; final int TABLES = 2; final int ID_START = 10; final Configuration config = TestHelper.defaultConfig() diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java index b57fd67607a..860b559d93d 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java @@ -13,6 +13,8 @@ import io.debezium.relational.ChangeTable; +import javax.xml.transform.Result; + /** * A wrapper around a JDBC {@link ResultSet} for a change table for processing rows. * @@ -27,16 +29,18 @@ public abstract class ChangeTableResultSet 0; } + public ResultSet getResultSet() { + return resultSet; + } + + protected abstract ResultSet nextResultSet(T lastChangePositionSeen) throws SQLException; + public boolean next() throws SQLException { - completed = !resultSet.next(); + if (resultSet == null) { + resultSet = nextResultSet(currentChangePosition); + rowsReadPerResultSet = 0; + } + + if (resultSet.next()) { + rowsReadPerResultSet++; + } else { + if (maxRowsPerResultSet > 0 && rowsReadPerResultSet > maxRowsPerResultSet) { + throw new RuntimeException("Number of rows read from the result set is greater than the configured max rows per a result set"); + } + if (maxRowsPerResultSet > 0 && rowsReadPerResultSet == maxRowsPerResultSet) { + resultSet.close(); + resultSet = null; + return next(); + } + + completed = true; + } + previousChangePosition = currentChangePosition; currentChangePosition = getNextChangePosition(resultSet); if (completed) {