From 375dce4c0728b18975cd451e9cb1b3ccd1801f92 Mon Sep 17 00:00:00 2001 From: Vadzim Ramanenka Date: Fri, 5 Jul 2024 15:28:35 +0200 Subject: [PATCH] CXP-2956: Remove the callback when processing table changes --- .../sqlserver/SqlServerConnection.java | 59 +++-- .../SqlServerStreamingChangeEventSource.java | 210 +++++++++--------- .../sqlserver/SqlServerConnectorIT.java | 7 +- .../sqlserver/TransactionMetadataIT.java | 7 +- .../connector/sqlserver/util/TestHelper.java | 31 ++- .../source/spi/ChangeTableResultSet.java | 13 +- 6 files changed, 163 insertions(+), 164 deletions(-) diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index 0c2185bfeb9..b8f647be153 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -345,45 +345,38 @@ public Lsn getMinLsn(String databaseName, String changeTableName) throws SQLExce * Provides all changes recorder by the SQL Server CDC capture process for a set of tables. * * @param databaseName - the name of the database to query - * @param changeTables - the requested tables to obtain changes for + * @param changeTable - the requested table to obtain changes for * @param intervalFromLsn - closed lower bound of interval of changes to be provided * @param intervalToLsn - closed upper bound of interval of changes to be provided - * @param consumer - the change processor * @throws SQLException */ - public void getChangesForTables(String databaseName, SqlServerChangeTable[] changeTables, Lsn intervalFromLsn, - Lsn intervalToLsn, BlockingMultiResultSetConsumer consumer) + public ResultSet getChangesForTable(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn, + Lsn intervalToLsn) throws SQLException, InterruptedException { - final String[] queries = new String[changeTables.length]; - final StatementPreparer[] preparers = new StatementPreparer[changeTables.length]; - - int idx = 0; - for (SqlServerChangeTable changeTable : changeTables) { - String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]") - .collect(Collectors.joining(", ")); - String source = changeTable.getCaptureInstance(); - if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) { - source = changeTable.getChangeTableId().table(); - } - final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName) - .replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns)) - .replace(TABLE_NAME_PLACEHOLDER, source); - queries[idx] = query; - // If the table was added in the middle of queried buffer we need - // to adjust from to the first LSN available - final Lsn fromLsn = getFromLsn(databaseName, changeTable, intervalFromLsn); - LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn); - preparers[idx] = statement -> { - if (queryFetchSize > 0) { - statement.setFetchSize(queryFetchSize); - } - statement.setBytes(1, fromLsn.getBinary()); - statement.setBytes(2, intervalToLsn.getBinary()); - }; - - idx++; + String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]") + .collect(Collectors.joining(", ")); + String source = changeTable.getCaptureInstance(); + if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) { + source = changeTable.getChangeTableId().table(); + } + final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName) + .replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns)) + .replace(TABLE_NAME_PLACEHOLDER, source); + // If the table was added in the middle of queried buffer we need + // to adjust from to the first LSN available + final Lsn fromLsn = getFromLsn(databaseName, changeTable, intervalFromLsn); + LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn); + + PreparedStatement statement = connection().prepareStatement(query); + statement.closeOnCompletion(); + + if (queryFetchSize > 0) { + statement.setFetchSize(queryFetchSize); } - prepareQuery(queries, preparers, consumer); + statement.setBytes(1, fromLsn.getBinary()); + statement.setBytes(2, intervalToLsn.getBinary()); + + return statement.executeQuery(); } private Lsn getFromLsn(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException { diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java index 8db2f857490..d7bf65245d3 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -15,6 +15,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.PriorityQueue; import java.util.Queue; @@ -32,6 +33,7 @@ import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.notification.Notification; import io.debezium.pipeline.notification.NotificationService; +import io.debezium.pipeline.source.spi.ChangeTableResultSet; import io.debezium.pipeline.source.spi.StreamingChangeEventSource; import io.debezium.relational.ChangeTable; import io.debezium.relational.Table; @@ -229,120 +231,119 @@ else if (!checkAgent) { tablesSlot.set(getChangeTablesToQuery(partition, offsetContext, toLsn)); collectChangeTablesWithKnownStopLsn(partition, tablesSlot.get()); } - try { - dataConnection.getChangesForTables(databaseName, tablesSlot.get(), fromLsn, toLsn, resultSets -> { - - long eventSerialNoInInitialTx = 1; - final int tableCount = resultSets.length; - final SqlServerChangeTablePointer[] changeTables = new SqlServerChangeTablePointer[tableCount]; - final SqlServerChangeTable[] tables = tablesSlot.get(); - for (int i = 0; i < tableCount; i++) { - changeTables[i] = new SqlServerChangeTablePointer(tables[i], resultSets[i]); - changeTables[i].next(); - } + SqlServerChangeTablePointer[] changeTables = new SqlServerChangeTablePointer[]{}; + try { + long eventSerialNoInInitialTx = 1; + final SqlServerChangeTable[] tables = tablesSlot.get(); + changeTables = new SqlServerChangeTablePointer[tables.length]; + + for (int i = 0; i < tables.length; i++) { + ResultSet resultSet = dataConnection.getChangesForTable(databaseName, tables[i], fromLsn, toLsn); + changeTables[i] = new SqlServerChangeTablePointer(tables[i], resultSet); + changeTables[i].next(); + } - for (;;) { - SqlServerChangeTablePointer tableWithSmallestLsn = null; - for (SqlServerChangeTablePointer changeTable : changeTables) { - if (changeTable.isCompleted()) { - continue; - } - if (tableWithSmallestLsn == null || changeTable.compareTo(tableWithSmallestLsn) < 0) { - tableWithSmallestLsn = changeTable; - } + for (;;) { + SqlServerChangeTablePointer tableWithSmallestLsn = null; + for (SqlServerChangeTablePointer changeTable : changeTables) { + if (changeTable.isCompleted()) { + continue; } - if (tableWithSmallestLsn == null) { - // No more LSNs available - break; + if (tableWithSmallestLsn == null || changeTable.compareTo(tableWithSmallestLsn) < 0) { + tableWithSmallestLsn = changeTable; } + } + if (tableWithSmallestLsn == null) { + // No more LSNs available + break; + } - if (!(tableWithSmallestLsn.getChangePosition().isAvailable() && tableWithSmallestLsn.getChangePosition().getInTxLsn().isAvailable())) { - LOGGER.error("Skipping change {} as its LSN is NULL which is not expected", tableWithSmallestLsn); - tableWithSmallestLsn.next(); - continue; - } + if (!(tableWithSmallestLsn.getChangePosition().isAvailable() && tableWithSmallestLsn.getChangePosition().getInTxLsn().isAvailable())) { + LOGGER.error("Skipping change {} as its LSN is NULL which is not expected", tableWithSmallestLsn); + tableWithSmallestLsn.next(); + continue; + } - if (tableWithSmallestLsn.isNewTransaction() && changesStoppedBeingMonotonic.get()) { - LOGGER.info("Resetting changesStoppedBeingMonotonic as transaction changes"); - changesStoppedBeingMonotonic.set(false); - } + if (tableWithSmallestLsn.isNewTransaction() && changesStoppedBeingMonotonic.get()) { + LOGGER.info("Resetting changesStoppedBeingMonotonic as transaction changes"); + changesStoppedBeingMonotonic.set(false); + } - // After restart for changes that are not monotonic to avoid data loss - if (tableWithSmallestLsn.isCurrentPositionSmallerThanPreviousPosition()) { - LOGGER.info("Disabling skipping changes due to not monotonic order of changes"); - changesStoppedBeingMonotonic.set(true); - } + // After restart for changes that are not monotonic to avoid data loss + if (tableWithSmallestLsn.isCurrentPositionSmallerThanPreviousPosition()) { + LOGGER.info("Disabling skipping changes due to not monotonic order of changes"); + changesStoppedBeingMonotonic.set(true); + } - // After restart for changes that were executed before the last committed offset - if (!changesStoppedBeingMonotonic.get() && - tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) < 0) { - LOGGER.info("Skipping change {} as its position is smaller than the last recorded position {}", tableWithSmallestLsn, - lastProcessedPositionOnStart); - tableWithSmallestLsn.next(); - continue; - } - // After restart for change that was the last committed and operations in it before the last committed offset - if (!changesStoppedBeingMonotonic.get() && tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) == 0 - && eventSerialNoInInitialTx <= lastProcessedEventSerialNoOnStart) { - LOGGER.info("Skipping change {} as its order in the transaction {} is smaller than or equal to the last recorded operation {}[{}]", - tableWithSmallestLsn, eventSerialNoInInitialTx, lastProcessedPositionOnStart, lastProcessedEventSerialNoOnStart); - eventSerialNoInInitialTx++; - tableWithSmallestLsn.next(); - continue; - } - if (tableWithSmallestLsn.getChangeTable().getStopLsn().isAvailable() && - tableWithSmallestLsn.getChangeTable().getStopLsn().compareTo(tableWithSmallestLsn.getChangePosition().getCommitLsn()) <= 0) { - LOGGER.debug("Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}", tableWithSmallestLsn, - tableWithSmallestLsn.getChangePosition()); - tableWithSmallestLsn.next(); - continue; - } - LOGGER.trace("Processing change {}", tableWithSmallestLsn); - LOGGER.trace("Schema change checkpoints {}", schemaChangeCheckpoints); - if (!schemaChangeCheckpoints.isEmpty()) { - if (tableWithSmallestLsn.getChangePosition().getCommitLsn().compareTo(schemaChangeCheckpoints.peek().getStartLsn()) >= 0) { - migrateTable(partition, schemaChangeCheckpoints, offsetContext); - } + // After restart for changes that were executed before the last committed offset + if (!changesStoppedBeingMonotonic.get() && + tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) < 0) { + LOGGER.info("Skipping change {} as its position is smaller than the last recorded position {}", tableWithSmallestLsn, + lastProcessedPositionOnStart); + tableWithSmallestLsn.next(); + continue; + } + // After restart for change that was the last committed and operations in it before the last committed offset + if (!changesStoppedBeingMonotonic.get() && tableWithSmallestLsn.getChangePosition().compareTo(lastProcessedPositionOnStart) == 0 + && eventSerialNoInInitialTx <= lastProcessedEventSerialNoOnStart) { + LOGGER.info("Skipping change {} as its order in the transaction {} is smaller than or equal to the last recorded operation {}[{}]", + tableWithSmallestLsn, eventSerialNoInInitialTx, lastProcessedPositionOnStart, lastProcessedEventSerialNoOnStart); + eventSerialNoInInitialTx++; + tableWithSmallestLsn.next(); + continue; + } + if (tableWithSmallestLsn.getChangeTable().getStopLsn().isAvailable() && + tableWithSmallestLsn.getChangeTable().getStopLsn().compareTo(tableWithSmallestLsn.getChangePosition().getCommitLsn()) <= 0) { + LOGGER.debug("Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}", tableWithSmallestLsn, + tableWithSmallestLsn.getChangePosition()); + tableWithSmallestLsn.next(); + continue; + } + LOGGER.trace("Processing change {}", tableWithSmallestLsn); + LOGGER.trace("Schema change checkpoints {}", schemaChangeCheckpoints); + if (!schemaChangeCheckpoints.isEmpty()) { + if (tableWithSmallestLsn.getChangePosition().getCommitLsn().compareTo(schemaChangeCheckpoints.peek().getStartLsn()) >= 0) { + migrateTable(partition, schemaChangeCheckpoints, offsetContext); } - final TableId tableId = tableWithSmallestLsn.getChangeTable().getSourceTableId(); - final int operation = tableWithSmallestLsn.getOperation(); - final Object[] data = tableWithSmallestLsn.getData(); - - // UPDATE consists of two consecutive events, first event contains - // the row before it was updated and the second the row after - // it was updated - int eventCount = 1; - if (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) { - if (!tableWithSmallestLsn.next() || tableWithSmallestLsn.getOperation() != SqlServerChangeRecordEmitter.OP_UPDATE_AFTER) { - throw new IllegalStateException("The update before event at " + tableWithSmallestLsn.getChangePosition() + " for table " + tableId - + " was not followed by after event.\n Please report this as a bug together with a events around given LSN."); - } - eventCount = 2; + } + final TableId tableId = tableWithSmallestLsn.getChangeTable().getSourceTableId(); + final int operation = tableWithSmallestLsn.getOperation(); + final Object[] data = tableWithSmallestLsn.getData(); + + // UPDATE consists of two consecutive events, first event contains + // the row before it was updated and the second the row after + // it was updated + int eventCount = 1; + if (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) { + if (!tableWithSmallestLsn.next() || tableWithSmallestLsn.getOperation() != SqlServerChangeRecordEmitter.OP_UPDATE_AFTER) { + throw new IllegalStateException("The update before event at " + tableWithSmallestLsn.getChangePosition() + " for table " + tableId + + " was not followed by after event.\n Please report this as a bug together with a events around given LSN."); } - final Object[] dataNext = (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) ? tableWithSmallestLsn.getData() : null; - - final ResultSet resultSet = tableWithSmallestLsn.getResultSet(); - offsetContext.setChangePosition(tableWithSmallestLsn.getChangePosition(), eventCount); - offsetContext.event( - tableWithSmallestLsn.getChangeTable().getSourceTableId(), - resultSet.getTimestamp(resultSet.getMetaData().getColumnCount()).toInstant()); - - dispatcher - .dispatchDataChangeEvent( - partition, - tableId, - new SqlServerChangeRecordEmitter( - partition, - offsetContext, - operation, - data, - dataNext, - clock, - connectorConfig)); - tableWithSmallestLsn.next(); + eventCount = 2; } - }); + final Object[] dataNext = (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) ? tableWithSmallestLsn.getData() : null; + + final ResultSet resultSet = tableWithSmallestLsn.getResultSet(); + offsetContext.setChangePosition(tableWithSmallestLsn.getChangePosition(), eventCount); + offsetContext.event( + tableWithSmallestLsn.getChangeTable().getSourceTableId(), + resultSet.getTimestamp(resultSet.getMetaData().getColumnCount()).toInstant()); + + dispatcher + .dispatchDataChangeEvent( + partition, + tableId, + new SqlServerChangeRecordEmitter( + partition, + offsetContext, + operation, + data, + dataNext, + clock, + connectorConfig)); + tableWithSmallestLsn.next(); + } streamingExecutionContext.setLastProcessedPosition(TxLogPosition.valueOf(toLsn)); // Terminate the transaction otherwise CDC could not be disabled for tables dataConnection.rollback(); @@ -350,6 +351,9 @@ else if (!checkAgent) { catch (SQLException e) { tablesSlot.set(processErrorFromChangeTableQuery(databaseName, e, tablesSlot.get())); } + finally { + Arrays.stream(changeTables).filter(Objects::nonNull).forEach(ChangeTableResultSet::close); + } } } catch (Exception e) { diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index 20c79653492..86b33637aeb 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -24,7 +24,6 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -882,14 +881,12 @@ public void verifyOffsets() throws Exception { try { final Lsn minLsn = connection.getMinLsn(TestHelper.TEST_DATABASE_1, tableName); final Lsn maxLsn = connection.getMaxLsn(TestHelper.TEST_DATABASE_1); - SqlServerChangeTable[] tables = Collections.singletonList(ct).toArray(new SqlServerChangeTable[]{}); final List ids = new ArrayList<>(); - connection.getChangesForTables(TestHelper.TEST_DATABASE_1, tables, minLsn, maxLsn, resultsets -> { - final ResultSet rs = resultsets[0]; + try (ResultSet rs = connection.getChangesForTable(TestHelper.TEST_DATABASE_1, ct, minLsn, maxLsn)) { while (rs.next()) { ids.add(rs.getInt("id")); } - }); + } if (ids.equals(expectedIds)) { resultMap.put(tableName, true); } diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java index aa05f47d5cf..3d3a7c0faed 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java @@ -11,7 +11,6 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -159,16 +158,14 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af final Lsn minLsn = connection.getMinLsn(TestHelper.TEST_DATABASE_1, tableName); final Lsn maxLsn = connection.getMaxLsn(TestHelper.TEST_DATABASE_1); final AtomicReference found = new AtomicReference<>(false); - SqlServerChangeTable[] tables = Collections.singletonList(ct).toArray(new SqlServerChangeTable[]{}); - connection.getChangesForTables(TestHelper.TEST_DATABASE_1, tables, minLsn, maxLsn, resultsets -> { - final ResultSet rs = resultsets[0]; + try (ResultSet rs = connection.getChangesForTable(TestHelper.TEST_DATABASE_1, ct, minLsn, maxLsn)) { while (rs.next()) { if (rs.getInt("id") == -1) { found.set(true); break; } } - }); + } return found.get(); } catch (Exception e) { diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java index ec8d76c2848..0a33b1159d5 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java @@ -582,9 +582,10 @@ public static void waitForCdcRecord(SqlServerConnection connection, String table try { final Lsn minLsn = connection.getMinLsn(TEST_DATABASE_1, ctTableName); final Lsn maxLsn = connection.getMaxLsn(TEST_DATABASE_1); - final CdcRecordFoundBlockingMultiResultSetConsumer consumer = new CdcRecordFoundBlockingMultiResultSetConsumer(handler); - SqlServerChangeTable[] tables = Collections.singletonList(ct).toArray(new SqlServerChangeTable[]{}); - connection.getChangesForTables(TEST_DATABASE_1, tables, minLsn, maxLsn, consumer); + final CdcRecordFoundBlockingResultSetConsumer consumer = new CdcRecordFoundBlockingResultSetConsumer(handler); + try (ResultSet resultSet = connection.getChangesForTable(TEST_DATABASE_1, ct, minLsn, maxLsn)) { + consumer.accept(resultSet); + } return consumer.isFound(); } catch (Exception e) { @@ -637,9 +638,10 @@ public static void waitForCdcRecord(SqlServerConnection connection, String table try { final Lsn minLsn = connection.getMinLsn(TEST_DATABASE_1, ctTableName); final Lsn maxLsn = connection.getMaxLsn(TEST_DATABASE_1); - final CdcRecordFoundBlockingMultiResultSetConsumer consumer = new CdcRecordFoundBlockingMultiResultSetConsumer(handler); - SqlServerChangeTable[] tables = Collections.singletonList(ct).toArray(new SqlServerChangeTable[]{}); - connection.getChangesForTables(TEST_DATABASE_1, tables, minLsn, maxLsn, consumer); + final CdcRecordFoundBlockingResultSetConsumer consumer = new CdcRecordFoundBlockingResultSetConsumer(handler); + try (ResultSet resultSet = connection.getChangesForTable(TEST_DATABASE_1, ct, minLsn, maxLsn)) { + consumer.accept(resultSet); + } return consumer.isFound(); } catch (Exception e) { @@ -687,23 +689,20 @@ public interface CdcRecordHandler { * A multiple result-set consumer used internally by {@link #waitForCdcRecord(SqlServerConnection, String, CdcRecordHandler)} * that allows returning whether the provided {@link CdcRecordHandler} detected the expected condition or not. */ - static class CdcRecordFoundBlockingMultiResultSetConsumer implements JdbcConnection.BlockingMultiResultSetConsumer { + static class CdcRecordFoundBlockingResultSetConsumer implements JdbcConnection.BlockingResultSetConsumer { private final CdcRecordHandler handler; private boolean found; - CdcRecordFoundBlockingMultiResultSetConsumer(CdcRecordHandler handler) { + CdcRecordFoundBlockingResultSetConsumer(CdcRecordHandler handler) { this.handler = handler; } @Override - public void accept(ResultSet[] rs) throws SQLException, InterruptedException { - if (rs.length == 1) { - final ResultSet resultSet = rs[0]; - while (resultSet.next()) { - if (handler.apply(resultSet)) { - this.found = true; - break; - } + public void accept(final ResultSet resultSet) throws SQLException, InterruptedException { + while (resultSet.next()) { + if (handler.apply(resultSet)) { + this.found = true; + break; } } } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java index b57fd67607a..effbabfb717 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java @@ -64,12 +64,21 @@ public boolean next() throws SQLException { previousChangePosition = currentChangePosition; currentChangePosition = getNextChangePosition(resultSet); if (completed) { - LOGGER.trace("Closing result set of change tables for table {}", changeTable); - resultSet.close(); + close(); } return !completed; } + public void close() { + LOGGER.trace("Closing result set of change tables for table {}", changeTable); + try { + resultSet.close(); + } + catch (Exception e) { + // ignore + } + } + /** * Get the column data from the source change table's result-set */