From fc68fb608b4f90c10812b7731e332079cf0946fa Mon Sep 17 00:00:00 2001 From: Vadzim Ramanenka Date: Tue, 29 Oct 2024 19:28:29 +0300 Subject: [PATCH] CXP-2956: Implement keyset pagination in debezium sql server connector (#159) --- .../io/debezium/connector/sqlserver/Lsn.java | 2 + .../SqlServerChangeTablePointer.java | 38 +++- .../sqlserver/SqlServerConnection.java | 101 +++++---- .../sqlserver/SqlServerConnectorConfig.java | 17 +- .../SqlServerStreamingChangeEventSource.java | 207 +++++++++--------- .../connector/sqlserver/TxLogPosition.java | 24 +- .../sqlserver/SqlServerConnectorIT.java | 42 +++- .../sqlserver/TransactionMetadataIT.java | 7 +- .../connector/sqlserver/util/TestHelper.java | 31 ++- .../source/spi/ChangeTableResultSet.java | 52 ++++- .../ROOT/pages/connectors/sqlserver.adoc | 6 + 11 files changed, 334 insertions(+), 193 deletions(-) diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Lsn.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Lsn.java index e551b7d048e..aea018ffe9f 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Lsn.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Lsn.java @@ -22,6 +22,8 @@ public class Lsn implements Comparable, Nullable { public static final Lsn NULL = new Lsn(null); + public static final Lsn ZERO = valueOf(new byte[10]); + private final byte[] binary; private int[] unsignedBinary; diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java index b670f4cf713..3183a51a252 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java @@ 
-46,18 +46,20 @@ public class SqlServerChangeTablePointer extends ChangeTableResultSet resultSetMapper; - private final ResultSet resultSet; private final int columnDataOffset; + private final SqlServerConnection connection; + private final Lsn fromLsn; + private final Lsn toLsn; + private final int maxRowsPerResultSet; - public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, ResultSet resultSet) { - super(changeTable, resultSet, COL_DATA); + public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, SqlServerConnection connection, Lsn fromLsn, Lsn toLsn, int maxRowsPerResultSet) { + super(changeTable, COL_DATA, maxRowsPerResultSet); // Store references to these because we can't get them from our superclass - this.resultSet = resultSet; this.columnDataOffset = COL_DATA; - } - - protected ResultSet getResultSet() { - return resultSet; + this.connection = connection; + this.fromLsn = fromLsn; + this.toLsn = toLsn; + this.maxRowsPerResultSet = maxRowsPerResultSet; } @Override @@ -76,7 +78,10 @@ protected Object getColumnData(ResultSet resultSet, int columnIndex) throws SQLE @Override protected TxLogPosition getNextChangePosition(ResultSet resultSet) throws SQLException { return isCompleted() ? 
TxLogPosition.NULL - : TxLogPosition.valueOf(Lsn.valueOf(resultSet.getBytes(COL_COMMIT_LSN)), Lsn.valueOf(resultSet.getBytes(COL_ROW_LSN))); + : TxLogPosition.valueOf( + Lsn.valueOf(resultSet.getBytes(COL_COMMIT_LSN)), + Lsn.valueOf(resultSet.getBytes(COL_ROW_LSN)), + resultSet.getInt(COL_OPERATION)); } /** @@ -89,12 +94,23 @@ protected boolean isNewTransaction() throws SQLException { getChangePosition().getCommitLsn().compareTo(getPreviousChangePosition().getCommitLsn()) > 0; } + @Override + protected ResultSet getNextResultSet(TxLogPosition lastPositionSeen) throws SQLException { + if (lastPositionSeen == null || lastPositionSeen.equals(TxLogPosition.NULL)) { + return connection.getChangesForTable(getChangeTable(), fromLsn, toLsn, maxRowsPerResultSet); + } + else { + return connection.getChangesForTable(getChangeTable(), lastPositionSeen.getCommitLsn(), lastPositionSeen.getInTxLsn(), lastPositionSeen.getOperation(), + toLsn, maxRowsPerResultSet); + } + } + @Override public Object[] getData() throws SQLException { if (resultSetMapper == null) { this.resultSetMapper = createResultSetMapper(getChangeTable().getSourceTable()); } - return resultSetMapper.apply(resultSet); + return resultSetMapper.apply(getResultSet()); } /** @@ -110,7 +126,7 @@ public Object[] getData() throws SQLException { */ private ResultSetMapper createResultSetMapper(Table table) throws SQLException { ColumnUtils.MappedColumns columnMap = ColumnUtils.toMap(table); - final ResultSetMetaData rsmd = resultSet.getMetaData(); + final ResultSetMetaData rsmd = getResultSet().getMetaData(); final int columnCount = rsmd.getColumnCount() - columnDataOffset; final List resultColumns = new ArrayList<>(columnCount); for (int i = 0; i < columnCount; ++i) { diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index 0c2185bfeb9..17145f304ba 100644 --- 
a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -180,10 +180,12 @@ private String buildGetAllChangesForTableQuery(SqlServerConnectorConfig.DataQuer break; case DIRECT: result += GET_ALL_CHANGES_FOR_TABLE_FROM_DIRECT + " "; - where.add("[__$start_lsn] >= ?"); - where.add("[__$start_lsn] <= ?"); break; } + where.add("(([__$start_lsn] = ? AND [__$seqval] = ? AND [__$operation] > ?) " + + "OR ([__$start_lsn] = ? AND [__$seqval] > ?) " + + "OR ([__$start_lsn] > ?))"); + where.add("[__$start_lsn] <= ?"); if (hasSkippedOperations(skippedOperations)) { Set skippedOps = new HashSet<>(); @@ -344,51 +346,74 @@ public Lsn getMinLsn(String databaseName, String changeTableName) throws SQLExce /** * Provides all changes recorder by the SQL Server CDC capture process for a set of tables. * - * @param databaseName - the name of the database to query - * @param changeTables - the requested tables to obtain changes for + * @param changeTable - the requested table to obtain changes for * @param intervalFromLsn - closed lower bound of interval of changes to be provided + * @param seqvalFromLsn - in-transaction sequence value to start after, pass {@link Lsn#ZERO} to fetch all sequence values + * @param operationFrom - operation number to start after, pass 0 to fetch all operations * @param intervalToLsn - closed upper bound of interval of changes to be provided - * @param consumer - the change processor + * @param maxRows - the max number of rows to return, pass 0 for no limit * @throws SQLException */ - public void getChangesForTables(String databaseName, SqlServerChangeTable[] changeTables, Lsn intervalFromLsn, - Lsn intervalToLsn, BlockingMultiResultSetConsumer consumer) - throws SQLException, InterruptedException { - final String[] queries = new String[changeTables.length]; - final StatementPreparer[] preparers = new 
StatementPreparer[changeTables.length]; - - int idx = 0; - for (SqlServerChangeTable changeTable : changeTables) { - String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]") - .collect(Collectors.joining(", ")); - String source = changeTable.getCaptureInstance(); - if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) { - source = changeTable.getChangeTableId().table(); - } - final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName) - .replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns)) - .replace(TABLE_NAME_PLACEHOLDER, source); - queries[idx] = query; - // If the table was added in the middle of queried buffer we need - // to adjust from to the first LSN available - final Lsn fromLsn = getFromLsn(databaseName, changeTable, intervalFromLsn); - LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn); - preparers[idx] = statement -> { - if (queryFetchSize > 0) { - statement.setFetchSize(queryFetchSize); - } - statement.setBytes(1, fromLsn.getBinary()); - statement.setBytes(2, intervalToLsn.getBinary()); - }; + public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn, Lsn seqvalFromLsn, int operationFrom, + Lsn intervalToLsn, int maxRows) + throws SQLException { + String databaseName = changeTable.getSourceTableId().catalog(); + String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]") + .collect(Collectors.joining(", ")); - idx++; + String source = changeTable.getCaptureInstance(); + if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) { + source = changeTable.getChangeTableId().table(); } - prepareQuery(queries, preparers, consumer); + + String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName) + .replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns)) + 
.replace(TABLE_NAME_PLACEHOLDER, source); + + if (maxRows > 0) { + query = query.replace("SELECT ", String.format("SELECT TOP %d ", maxRows)); + } + + // If the table was added in the middle of queried buffer we need + // to adjust from to the first LSN available + final Lsn fromLsn = getFromLsn(changeTable, intervalFromLsn); + LOGGER.trace("Getting {} changes for table {} in range [{}-{}-{}, {}]", maxRows > 0 ? "top " + maxRows : "", changeTable, fromLsn, seqvalFromLsn, operationFrom, + intervalToLsn); + + PreparedStatement statement = connection().prepareStatement(query); + statement.closeOnCompletion(); + + if (queryFetchSize > 0) { + statement.setFetchSize(queryFetchSize); + } + + int paramIndex = 1; + if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.FUNCTION) { + statement.setBytes(paramIndex++, fromLsn.getBinary()); + statement.setBytes(paramIndex++, intervalToLsn.getBinary()); + } + statement.setBytes(paramIndex++, fromLsn.getBinary()); + statement.setBytes(paramIndex++, seqvalFromLsn.getBinary()); + statement.setInt(paramIndex++, operationFrom); + statement.setBytes(paramIndex++, fromLsn.getBinary()); + statement.setBytes(paramIndex++, seqvalFromLsn.getBinary()); + statement.setBytes(paramIndex++, fromLsn.getBinary()); + statement.setBytes(paramIndex++, intervalToLsn.getBinary()); + + return statement.executeQuery(); + } + + public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn, Lsn intervalToLsn, int maxRows) throws SQLException { + return getChangesForTable(changeTable, intervalFromLsn, Lsn.ZERO, 0, intervalToLsn, maxRows); + } + + public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn, Lsn intervalToLsn) throws SQLException { + return getChangesForTable(changeTable, intervalFromLsn, intervalToLsn, 0); } - private Lsn getFromLsn(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException { + private Lsn 
getFromLsn(SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException { Lsn fromLsn = changeTable.getStartLsn().compareTo(intervalFromLsn) > 0 ? changeTable.getStartLsn() : intervalFromLsn; - return fromLsn.getBinary() != null ? fromLsn : getMinLsn(databaseName, changeTable.getCaptureInstance()); + return fromLsn.getBinary() != null ? fromLsn : getMinLsn(changeTable.getSourceTableId().catalog(), changeTable.getCaptureInstance()); } /** diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java index 1b734b10ac9..ec6cfcd8a1a 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java @@ -478,6 +478,14 @@ public static DataQueryMode parse(String value, String defaultValue) { + "The value of '" + DataQueryMode.DIRECT.getValue() + "' makes the connector to query the change tables directly."); + public static final Field STREAMING_FETCH_SIZE = Field.create("streaming.fetch.size") + .withDisplayName("Streaming fetch size") + .withDefault(0) + .withType(Type.INT) + .withImportance(Importance.LOW) + .withDescription("Specifies the maximum number of rows that should be read in one go from each table while streaming. " + + "The connector will read the table contents in multiple batches of this size. 
Defaults to 0 which means no limit."); + private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit() .name("SQL Server") .type( @@ -498,7 +506,8 @@ public static DataQueryMode parse(String value, String defaultValue) { INCREMENTAL_SNAPSHOT_CHUNK_SIZE, INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES, QUERY_FETCH_SIZE, - DATA_QUERY_MODE) + DATA_QUERY_MODE, + STREAMING_FETCH_SIZE) .events(SOURCE_INFO_STRUCT_MAKER) .excluding( SCHEMA_INCLUDE_LIST, @@ -525,6 +534,7 @@ public static ConfigDef configDef() { private final boolean optionRecompile; private final int queryFetchSize; private final DataQueryMode dataQueryMode; + private final int streamingFetchSize; public SqlServerConnectorConfig(Configuration config) { super( @@ -568,6 +578,7 @@ public SqlServerConnectorConfig(Configuration config) { this.dataQueryMode = DataQueryMode.parse(config.getString(DATA_QUERY_MODE), DATA_QUERY_MODE.defaultValueAsString()); this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString()); + this.streamingFetchSize = config.getInteger(STREAMING_FETCH_SIZE); } public List getDatabaseNames() { @@ -717,4 +728,8 @@ private static int validateDatabaseNames(Configuration config, Field field, Fiel return count; } + + public int getStreamingFetchSize() { + return streamingFetchSize; + } } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java index 27657f5dcd3..0d496a94a3c 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -16,6 +16,7 @@ import java.util.HashSet; import 
java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.PriorityQueue; import java.util.Queue; @@ -34,6 +35,7 @@ import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.notification.Notification; import io.debezium.pipeline.notification.NotificationService; +import io.debezium.pipeline.source.spi.ChangeTableResultSet; import io.debezium.pipeline.source.spi.StreamingChangeEventSource; import io.debezium.relational.ChangeTable; import io.debezium.relational.Table; @@ -231,120 +233,118 @@ else if (!checkAgent) { tablesSlot.set(getChangeTablesToQuery(partition, offsetContext, toLsn)); collectChangeTablesWithKnownStopLsn(partition, tablesSlot.get()); } - try { - dataConnection.getChangesForTables(databaseName, tablesSlot.get(), fromLsn, toLsn, resultSets -> { - long eventSerialNoInInitialTx = 1; - final int tableCount = resultSets.length; - final SqlServerChangeTablePointer[] changeTables = new SqlServerChangeTablePointer[tableCount]; - final SqlServerChangeTable[] tables = tablesSlot.get(); + SqlServerChangeTablePointer[] changeTables = new SqlServerChangeTablePointer[]{}; + try { + long eventSerialNoInInitialTx = 1; + final SqlServerChangeTable[] tables = tablesSlot.get(); + changeTables = new SqlServerChangeTablePointer[tables.length]; - for (int i = 0; i < tableCount; i++) { - changeTables[i] = new SqlServerChangeTablePointer(tables[i], resultSets[i]); - changeTables[i].next(); - } + for (int i = 0; i < tables.length; i++) { + changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn, connectorConfig.getStreamingFetchSize()); + changeTables[i].next(); + } - for (;;) { - SqlServerChangeTablePointer tableWithSmallestLsn = null; - for (SqlServerChangeTablePointer changeTable : changeTables) { - if (changeTable.isCompleted()) { - continue; - } - if (tableWithSmallestLsn == null || changeTable.compareTo(tableWithSmallestLsn) < 0) { - tableWithSmallestLsn = changeTable; 
- } + for (;;) { + SqlServerChangeTablePointer tableWithSmallestLsn = null; + for (SqlServerChangeTablePointer changeTable : changeTables) { + if (changeTable.isCompleted()) { + continue; } - if (tableWithSmallestLsn == null) { - // No more LSNs available - break; + if (tableWithSmallestLsn == null || changeTable.compareTo(tableWithSmallestLsn) < 0) { + tableWithSmallestLsn = changeTable; } + } + if (tableWithSmallestLsn == null) { + // No more LSNs available + break; + } - if (!(tableWithSmallestLsn.getChangePosition().isAvailable() && tableWithSmallestLsn.getChangePosition().getInTxLsn().isAvailable())) { - LOGGER.error("Skipping change {} as its LSN is NULL which is not expected", tableWithSmallestLsn); - tableWithSmallestLsn.next(); - continue; - } + if (!(tableWithSmallestLsn.getChangePosition().isAvailable() && tableWithSmallestLsn.getChangePosition().getInTxLsn().isAvailable())) { + LOGGER.error("Skipping change {} as its LSN is NULL which is not expected", tableWithSmallestLsn); + tableWithSmallestLsn.next(); + continue; + } - if (tableWithSmallestLsn.isNewTransaction() && changesStoppedBeingMonotonic.get()) { - LOGGER.info("Resetting changesStoppedBeingMonotonic as transaction changes"); - changesStoppedBeingMonotonic.set(false); - } + if (tableWithSmallestLsn.isNewTransaction() && changesStoppedBeingMonotonic.get()) { + LOGGER.info("Resetting changesStoppedBeingMonotonic as transaction changes"); + changesStoppedBeingMonotonic.set(false); + } - // After restart for changes that are not monotonic to avoid data loss - if (tableWithSmallestLsn.isCurrentPositionSmallerThanPreviousPosition()) { - LOGGER.info("Disabling skipping changes due to not monotonic order of changes"); - changesStoppedBeingMonotonic.set(true); - } + // After restart for changes that are not monotonic to avoid data loss + if (tableWithSmallestLsn.isCurrentPositionSmallerThanPreviousPosition()) { + LOGGER.info("Disabling skipping changes due to not monotonic order of changes"); + 
changesStoppedBeingMonotonic.set(true); + } - // After restart for changes that were executed before the last committed offset - if (!changesStoppedBeingMonotonic.get() && - tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) < 0) { - LOGGER.info("Skipping change {} as its position is smaller than the last recorded position {}", tableWithSmallestLsn, - lastProcessedPositionOnStart); - tableWithSmallestLsn.next(); - continue; - } - // After restart for change that was the last committed and operations in it before the last committed offset - if (!changesStoppedBeingMonotonic.get() && tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) == 0 - && eventSerialNoInInitialTx <= lastProcessedEventSerialNoOnStart) { - LOGGER.info("Skipping change {} as its order in the transaction {} is smaller than or equal to the last recorded operation {}[{}]", - tableWithSmallestLsn, eventSerialNoInInitialTx, lastProcessedPositionOnStart, lastProcessedEventSerialNoOnStart); - eventSerialNoInInitialTx++; - tableWithSmallestLsn.next(); - continue; - } - if (tableWithSmallestLsn.getChangeTable().getStopLsn().isAvailable() && - tableWithSmallestLsn.getChangeTable().getStopLsn().compareTo(tableWithSmallestLsn.getChangePosition().getCommitLsn()) <= 0) { - LOGGER.debug("Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}", tableWithSmallestLsn, - tableWithSmallestLsn.getChangePosition()); - tableWithSmallestLsn.next(); - continue; - } - LOGGER.trace("Processing change {}", tableWithSmallestLsn); - LOGGER.trace("Schema change checkpoints {}", schemaChangeCheckpoints); - if (!schemaChangeCheckpoints.isEmpty()) { - if (tableWithSmallestLsn.getChangePosition().getCommitLsn().compareTo(schemaChangeCheckpoints.peek().getStartLsn()) >= 0) { - migrateTable(partition, schemaChangeCheckpoints, offsetContext); - } + // After restart for changes that were executed before the last committed offset + if 
(!changesStoppedBeingMonotonic.get() && + tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) < 0) { + LOGGER.info("Skipping change {} as its position is smaller than the last recorded position {}", tableWithSmallestLsn, + lastProcessedPositionOnStart); + tableWithSmallestLsn.next(); + continue; + } + // After restart for change that was the last committed and operations in it before the last committed offset + if (!changesStoppedBeingMonotonic.get() && tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) == 0 + && eventSerialNoInInitialTx <= lastProcessedEventSerialNoOnStart) { + LOGGER.info("Skipping change {} as its order in the transaction {} is smaller than or equal to the last recorded operation {}[{}]", + tableWithSmallestLsn, eventSerialNoInInitialTx, lastProcessedPositionOnStart, lastProcessedEventSerialNoOnStart); + eventSerialNoInInitialTx++; + tableWithSmallestLsn.next(); + continue; + } + if (tableWithSmallestLsn.getChangeTable().getStopLsn().isAvailable() && + tableWithSmallestLsn.getChangeTable().getStopLsn().compareTo(tableWithSmallestLsn.getChangePosition().getCommitLsn()) <= 0) { + LOGGER.debug("Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}", tableWithSmallestLsn, + tableWithSmallestLsn.getChangePosition()); + tableWithSmallestLsn.next(); + continue; + } + LOGGER.trace("Processing change {}", tableWithSmallestLsn); + LOGGER.trace("Schema change checkpoints {}", schemaChangeCheckpoints); + if (!schemaChangeCheckpoints.isEmpty()) { + if (tableWithSmallestLsn.getChangePosition().getCommitLsn().compareTo(schemaChangeCheckpoints.peek().getStartLsn()) >= 0) { + migrateTable(partition, schemaChangeCheckpoints, offsetContext); } - final TableId tableId = tableWithSmallestLsn.getChangeTable().getSourceTableId(); - final int operation = tableWithSmallestLsn.getOperation(); - final Object[] data = tableWithSmallestLsn.getData(); - - // UPDATE consists of two 
consecutive events, first event contains - // the row before it was updated and the second the row after - // it was updated - int eventCount = 1; - if (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) { - if (!tableWithSmallestLsn.next() || tableWithSmallestLsn.getOperation() != SqlServerChangeRecordEmitter.OP_UPDATE_AFTER) { - throw new IllegalStateException("The update before event at " + tableWithSmallestLsn.getChangePosition() + " for table " + tableId - + " was not followed by after event.\n Please report this as a bug together with a events around given LSN."); - } - eventCount = 2; + } + final TableId tableId = tableWithSmallestLsn.getChangeTable().getSourceTableId(); + final int operation = tableWithSmallestLsn.getOperation(); + final Object[] data = tableWithSmallestLsn.getData(); + + // UPDATE consists of two consecutive events, first event contains + // the row before it was updated and the second the row after + // it was updated + int eventCount = 1; + if (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) { + if (!tableWithSmallestLsn.next() || tableWithSmallestLsn.getOperation() != SqlServerChangeRecordEmitter.OP_UPDATE_AFTER) { + throw new IllegalStateException("The update before event at " + tableWithSmallestLsn.getChangePosition() + " for table " + tableId + + " was not followed by after event.\n Please report this as a bug together with a events around given LSN."); } - final Object[] dataNext = (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) ? 
tableWithSmallestLsn.getData() : null; - - final ResultSet resultSet = tableWithSmallestLsn.getResultSet(); - offsetContext.setChangePosition(tableWithSmallestLsn.getChangePosition(), eventCount); - offsetContext.event( - tableWithSmallestLsn.getChangeTable().getSourceTableId(), - resultSet.getTimestamp(resultSet.getMetaData().getColumnCount()).toInstant()); - - dispatcher - .dispatchDataChangeEvent( - partition, - tableId, - new SqlServerChangeRecordEmitter( - partition, - offsetContext, - operation, - data, - dataNext, - clock, - connectorConfig)); - tableWithSmallestLsn.next(); + eventCount = 2; } - }); + final Object[] dataNext = (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) ? tableWithSmallestLsn.getData() : null; + + final ResultSet resultSet = tableWithSmallestLsn.getResultSet(); + offsetContext.setChangePosition(tableWithSmallestLsn.getChangePosition(), eventCount); + offsetContext.event( + tableWithSmallestLsn.getChangeTable().getSourceTableId(), + resultSet.getTimestamp(resultSet.getMetaData().getColumnCount()).toInstant()); + + dispatcher + .dispatchDataChangeEvent( + partition, + tableId, + new SqlServerChangeRecordEmitter( + partition, + offsetContext, + operation, + data, + dataNext, + clock, + connectorConfig)); + tableWithSmallestLsn.next(); + } streamingExecutionContext.setLastProcessedPosition(TxLogPosition.valueOf(toLsn)); // Terminate the transaction otherwise CDC could not be disabled for tables dataConnection.rollback(); @@ -352,6 +352,9 @@ else if (!checkAgent) { catch (SQLException e) { tablesSlot.set(processErrorFromChangeTableQuery(databaseName, e, tablesSlot.get())); } + finally { + Arrays.stream(changeTables).filter(Objects::nonNull).forEach(ChangeTableResultSet::close); + } } } catch (Exception e) { diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/TxLogPosition.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/TxLogPosition.java index 
9ff0c366d8e..8497b2db448 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/TxLogPosition.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/TxLogPosition.java @@ -18,13 +18,15 @@ */ public class TxLogPosition implements Nullable, Comparable { - public static final TxLogPosition NULL = new TxLogPosition(null, null); + public static final TxLogPosition NULL = new TxLogPosition(null, null, 0); private final Lsn commitLsn; private final Lsn inTxLsn; + private int operation; - private TxLogPosition(Lsn commitLsn, Lsn inTxLsn) { + private TxLogPosition(Lsn commitLsn, Lsn inTxLsn, int operation) { this.commitLsn = commitLsn; this.inTxLsn = inTxLsn; + this.operation = operation; } public Lsn getCommitLsn() { @@ -35,9 +37,13 @@ public Lsn getInTxLsn() { return inTxLsn; } + public int getOperation() { + return operation; + } + @Override public String toString() { - return this == NULL ? "NULL" : commitLsn + "(" + inTxLsn + ")"; + return this == NULL ? "NULL" : commitLsn + "(" + inTxLsn + "," + operation + ")"; } @Override @@ -46,6 +52,7 @@ public int hashCode() { int result = 1; result = prime * result + ((commitLsn == null) ? 0 : commitLsn.hashCode()); result = prime * result + ((inTxLsn == null) ? 0 : inTxLsn.hashCode()); + result = prime * result + operation; return result; } @@ -86,15 +93,20 @@ public int compareTo(TxLogPosition o) { return comparison == 0 ? inTxLsn.compareTo(o.inTxLsn) : comparison; } - public static TxLogPosition valueOf(Lsn commitLsn, Lsn inTxLsn) { + public static TxLogPosition valueOf(Lsn commitLsn, Lsn inTxLsn, int operation) { return commitLsn == null && inTxLsn == null ? NULL : new TxLogPosition( commitLsn == null ? Lsn.NULL : commitLsn, - inTxLsn == null ? Lsn.NULL : inTxLsn); + inTxLsn == null ? 
Lsn.NULL : inTxLsn, + operation); + } + + public static TxLogPosition valueOf(Lsn commitLsn, Lsn inTxLsn) { + return valueOf(commitLsn, inTxLsn, 0); } public static TxLogPosition valueOf(Lsn commitLsn) { - return valueOf(commitLsn, Lsn.NULL); + return valueOf(commitLsn, Lsn.NULL, 0); } @Override diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index 7a3f1953ec7..c89b9a99d20 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -24,7 +24,6 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -35,6 +34,7 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import java.util.function.UnaryOperator; import java.util.stream.Collectors; import javax.management.InstanceNotFoundException; @@ -127,12 +127,39 @@ public void after() throws SQLException { } @Test - public void createAndDelete() throws Exception { + public void createAndDeleteWithDataQueryModeFunctionWithoutFetchThreshold() throws Exception { + createAndDelete(builder -> builder + .with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.FUNCTION) + .with(SqlServerConnectorConfig.STREAMING_FETCH_SIZE, 0)); + } + + @Test + public void createAndDeleteWithDataQueryModeFunctionWithFetchThreshold() throws Exception { + createAndDelete(builder -> builder + .with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.FUNCTION) + .with(SqlServerConnectorConfig.STREAMING_FETCH_SIZE, 3)); + } + + @Test + public void 
createAndDeleteWithDataQueryModeDirectWithoutFetchThreshold() throws Exception { + createAndDelete(builder -> builder + .with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.DIRECT) + .with(SqlServerConnectorConfig.STREAMING_FETCH_SIZE, 0)); + } + + @Test + public void createAndDeleteWithDataQueryModeDirectWithFetchThreshold() throws Exception { + createAndDelete(builder -> builder + .with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.DIRECT) + .with(SqlServerConnectorConfig.STREAMING_FETCH_SIZE, 3)); + } + + private void createAndDelete(UnaryOperator configAugmenter) throws Exception { final int RECORDS_PER_TABLE = 5; final int TABLES = 2; final int ID_START = 10; - final Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + final Configuration config = configAugmenter.apply(TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)) .build(); start(SqlServerConnector.class, config); @@ -210,6 +237,7 @@ public void createAndDeleteInDataQueryDirectMode() throws Exception { final Configuration config = TestHelper.defaultConfig() .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) .with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.DIRECT) + .with(SqlServerConnectorConfig.STREAMING_FETCH_SIZE, 3) .build(); start(SqlServerConnector.class, config); @@ -886,14 +914,12 @@ public void verifyOffsets() throws Exception { try { final Lsn minLsn = connection.getMinLsn(TestHelper.TEST_DATABASE_1, tableName); final Lsn maxLsn = connection.getMaxLsn(TestHelper.TEST_DATABASE_1); - SqlServerChangeTable[] tables = Collections.singletonList(ct).toArray(new SqlServerChangeTable[]{}); final List ids = new ArrayList<>(); - connection.getChangesForTables(TestHelper.TEST_DATABASE_1, tables, minLsn, maxLsn, resultsets -> { - final ResultSet rs = resultsets[0]; + try 
(ResultSet rs = connection.getChangesForTable(ct, minLsn, maxLsn)) { while (rs.next()) { ids.add(rs.getInt("id")); } - }); + } if (ids.equals(expectedIds)) { resultMap.put(tableName, true); } diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java index 961b9e7eab3..e29d873eafc 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java @@ -11,7 +11,6 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -158,16 +157,14 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af final Lsn minLsn = connection.getMinLsn(TestHelper.TEST_DATABASE_1, tableName); final Lsn maxLsn = connection.getMaxLsn(TestHelper.TEST_DATABASE_1); final AtomicReference found = new AtomicReference<>(false); - SqlServerChangeTable[] tables = Collections.singletonList(ct).toArray(new SqlServerChangeTable[]{}); - connection.getChangesForTables(TestHelper.TEST_DATABASE_1, tables, minLsn, maxLsn, resultsets -> { - final ResultSet rs = resultsets[0]; + try (ResultSet rs = connection.getChangesForTable(ct, minLsn, maxLsn)) { while (rs.next()) { if (rs.getInt("id") == -1) { found.set(true); break; } } - }); + } return found.get(); } catch (Exception e) { diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java index ec8d76c2848..c420f7b6f62 100644 --- 
a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java @@ -582,9 +582,10 @@ public static void waitForCdcRecord(SqlServerConnection connection, String table try { final Lsn minLsn = connection.getMinLsn(TEST_DATABASE_1, ctTableName); final Lsn maxLsn = connection.getMaxLsn(TEST_DATABASE_1); - final CdcRecordFoundBlockingMultiResultSetConsumer consumer = new CdcRecordFoundBlockingMultiResultSetConsumer(handler); - SqlServerChangeTable[] tables = Collections.singletonList(ct).toArray(new SqlServerChangeTable[]{}); - connection.getChangesForTables(TEST_DATABASE_1, tables, minLsn, maxLsn, consumer); + final CdcRecordFoundBlockingResultSetConsumer consumer = new CdcRecordFoundBlockingResultSetConsumer(handler); + try (ResultSet resultSet = connection.getChangesForTable(ct, minLsn, maxLsn)) { + consumer.accept(resultSet); + } return consumer.isFound(); } catch (Exception e) { @@ -637,9 +638,10 @@ public static void waitForCdcRecord(SqlServerConnection connection, String table try { final Lsn minLsn = connection.getMinLsn(TEST_DATABASE_1, ctTableName); final Lsn maxLsn = connection.getMaxLsn(TEST_DATABASE_1); - final CdcRecordFoundBlockingMultiResultSetConsumer consumer = new CdcRecordFoundBlockingMultiResultSetConsumer(handler); - SqlServerChangeTable[] tables = Collections.singletonList(ct).toArray(new SqlServerChangeTable[]{}); - connection.getChangesForTables(TEST_DATABASE_1, tables, minLsn, maxLsn, consumer); + final CdcRecordFoundBlockingResultSetConsumer consumer = new CdcRecordFoundBlockingResultSetConsumer(handler); + try (ResultSet resultSet = connection.getChangesForTable(ct, minLsn, maxLsn)) { + consumer.accept(resultSet); + } return consumer.isFound(); } catch (Exception e) { @@ -687,23 +689,20 @@ public interface CdcRecordHandler { * A multiple result-set consumer used internally by {@link 
#waitForCdcRecord(SqlServerConnection, String, CdcRecordHandler)} * that allows returning whether the provided {@link CdcRecordHandler} detected the expected condition or not. */ - static class CdcRecordFoundBlockingMultiResultSetConsumer implements JdbcConnection.BlockingMultiResultSetConsumer { + static class CdcRecordFoundBlockingResultSetConsumer implements JdbcConnection.BlockingResultSetConsumer { private final CdcRecordHandler handler; private boolean found; - CdcRecordFoundBlockingMultiResultSetConsumer(CdcRecordHandler handler) { + CdcRecordFoundBlockingResultSetConsumer(CdcRecordHandler handler) { this.handler = handler; } @Override - public void accept(ResultSet[] rs) throws SQLException, InterruptedException { - if (rs.length == 1) { - final ResultSet resultSet = rs[0]; - while (resultSet.next()) { - if (handler.apply(resultSet)) { - this.found = true; - break; - } + public void accept(final ResultSet resultSet) throws SQLException, InterruptedException { + while (resultSet.next()) { + if (handler.apply(resultSet)) { + this.found = true; + break; } } } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java index b57fd67607a..4776887f131 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java @@ -27,16 +27,18 @@ public abstract class ChangeTableResultSet 0; } + public ResultSet getResultSet() { + return resultSet; + } + + protected abstract ResultSet getNextResultSet(T lastChangePositionSeen) throws SQLException; + public boolean next() throws SQLException { - completed = !resultSet.next(); + if (resultSet == null) { + resultSet = getNextResultSet(currentChangePosition); + rowsReadPerResultSet = 0; + } + + if (resultSet.next()) { + rowsReadPerResultSet++; + } + else { + if 
(maxRowsPerResultSet > 0 && rowsReadPerResultSet > maxRowsPerResultSet) { + throw new RuntimeException("Number of rows read from the result set is greater than the configured max rows per a result set"); + } + + if (maxRowsPerResultSet > 0 && rowsReadPerResultSet == maxRowsPerResultSet) { + close(); + return next(); + } + + completed = true; + } + previousChangePosition = currentChangePosition; currentChangePosition = getNextChangePosition(resultSet); if (completed) { - LOGGER.trace("Closing result set of change tables for table {}", changeTable); - resultSet.close(); + close(); } return !completed; } + public void close() { + LOGGER.trace("Closing result set of change tables for table {}", changeTable); + try { + if (resultSet != null) { + resultSet.close(); + resultSet = null; + } + } + catch (Exception e) { + // ignore + } + } + /** * Get the column data from the source change table's result-set */ diff --git a/documentation/modules/ROOT/pages/connectors/sqlserver.adoc b/documentation/modules/ROOT/pages/connectors/sqlserver.adoc index bac1500f133..ab1f3e116ca 100644 --- a/documentation/modules/ROOT/pages/connectors/sqlserver.adoc +++ b/documentation/modules/ROOT/pages/connectors/sqlserver.adoc @@ -3310,6 +3310,12 @@ After the next failure, the connector stops, and user intervention is required t |`600000` (10 minutes) |Specifies the time, in milliseconds, that the connector waits for a query to complete. Set the value to `0` (zero) to remove the timeout limit. + +|[[sqlserver-property-streaming-fetch-size]]<<sqlserver-property-streaming-fetch-size, `+streaming.fetch.size+`>> +|`0` +|Specifies the maximum number of rows that should be read in one go from each table while streaming. +The connector will read the table contents in multiple batches of this size. Defaults to `0`, which means no limit. + |=== [id="debezium-sqlserver-connector-database-history-configuration-properties"]