From 675eba18c8b8618d7554453dfa91fd6914fee935 Mon Sep 17 00:00:00 2001 From: Vadzim Ramanenka Date: Fri, 5 Jul 2024 15:28:35 +0200 Subject: [PATCH] CXP-2956: Remove the callback when processing table changes --- .../sqlserver/SqlServerConnection.java | 59 +++++++-------- .../SqlServerStreamingChangeEventSource.java | 20 ++++-- .../sqlserver/SqlServerConnectorIT.java | 71 +++++++++---------- .../sqlserver/TransactionMetadataIT.java | 7 +- .../connector/sqlserver/util/TestHelper.java | 31 ++++---- .../source/spi/ChangeTableResultSet.java | 13 +++- 6 files changed, 103 insertions(+), 98 deletions(-) diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index 0c2185bfeb9..ae9c44bcaa3 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -345,45 +345,40 @@ public Lsn getMinLsn(String databaseName, String changeTableName) throws SQLExce * Provides all changes recorder by the SQL Server CDC capture process for a set of tables. * * @param databaseName - the name of the database to query - * @param changeTables - the requested tables to obtain changes for + * @param changeTable - the requested table to obtain changes for * @param intervalFromLsn - closed lower bound of interval of changes to be provided * @param intervalToLsn - closed upper bound of interval of changes to be provided - * @param consumer - the change processor * @throws SQLException */ - public void getChangesForTables(String databaseName, SqlServerChangeTable[] changeTables, Lsn intervalFromLsn, - Lsn intervalToLsn, BlockingMultiResultSetConsumer consumer) + public ResultSet getChangesForTable(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn, + Lsn intervalToLsn) throws SQLException, InterruptedException { - final String[] queries = new String[changeTables.length]; - final StatementPreparer[] preparers = new StatementPreparer[changeTables.length]; - - int idx = 0; - for (SqlServerChangeTable changeTable : changeTables) { - String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]") - .collect(Collectors.joining(", ")); - String source = changeTable.getCaptureInstance(); - if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) { - source = changeTable.getChangeTableId().table(); - } - final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName) - .replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns)) - .replace(TABLE_NAME_PLACEHOLDER, source); - queries[idx] = query; - // If the table was added in the middle of queried buffer we need - // to adjust from to the first LSN available - final Lsn fromLsn = getFromLsn(databaseName, changeTable, intervalFromLsn); - LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn); - preparers[idx] = statement -> { - if (queryFetchSize > 0) { - statement.setFetchSize(queryFetchSize); - } - statement.setBytes(1, fromLsn.getBinary()); - statement.setBytes(2, intervalToLsn.getBinary()); - }; + String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]") + .collect(Collectors.joining(", ")); + String source = changeTable.getCaptureInstance(); + if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) { + source = changeTable.getChangeTableId().table(); + } + final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName) + .replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns)) + .replace(TABLE_NAME_PLACEHOLDER, source); + // If the table was added in the middle of queried buffer we need + // to adjust from to the first LSN available + final Lsn fromLsn = getFromLsn(databaseName, changeTable, intervalFromLsn); + LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn); + + PreparedStatement statement = connection().prepareStatement(query); + if (!statement.isCloseOnCompletion()) { + throw new RuntimeException("isCloseOnCompletion is false!!!"); + } - idx++; + if (queryFetchSize > 0) { + statement.setFetchSize(queryFetchSize); } - prepareQuery(queries, preparers, consumer); + statement.setBytes(1, fromLsn.getBinary()); + statement.setBytes(2, intervalToLsn.getBinary()); + + return statement.executeQuery(); } private Lsn getFromLsn(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException { diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java index 8db2f857490..df49457dc48 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -229,16 +229,17 @@ else if (!checkAgent) { tablesSlot.set(getChangeTablesToQuery(partition, offsetContext, toLsn)); collectChangeTablesWithKnownStopLsn(partition, tablesSlot.get()); } - try { - dataConnection.getChangesForTables(databaseName, tablesSlot.get(), fromLsn, toLsn, resultSets -> { + SqlServerChangeTablePointer[] changeTables = new SqlServerChangeTablePointer[]{}; + try { + { long eventSerialNoInInitialTx = 1; - final int tableCount = resultSets.length; - final SqlServerChangeTablePointer[] changeTables = new SqlServerChangeTablePointer[tableCount]; final SqlServerChangeTable[] tables = tablesSlot.get(); + changeTables = new SqlServerChangeTablePointer[tables.length]; - for (int i = 0; i < tableCount; i++) { - changeTables[i] = new SqlServerChangeTablePointer(tables[i], resultSets[i]); + for (int i = 0; i < tables.length; i++) { + ResultSet resultSet = dataConnection.getChangesForTable(databaseName, tables[i], fromLsn, toLsn); + changeTables[i] = new SqlServerChangeTablePointer(tables[i], resultSet); changeTables[i].next(); } @@ -342,7 +343,7 @@ else if (!checkAgent) { connectorConfig)); tableWithSmallestLsn.next(); } - }); + } streamingExecutionContext.setLastProcessedPosition(TxLogPosition.valueOf(toLsn)); // Terminate the transaction otherwise CDC could not be disabled for tables dataConnection.rollback(); @@ -350,6 +351,11 @@ else if (!checkAgent) { catch (SQLException e) { tablesSlot.set(processErrorFromChangeTableQuery(databaseName, e, tablesSlot.get())); } + finally { + for (SqlServerChangeTablePointer t : changeTables) { + t.close(); + } + } } } catch (Exception e) { diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index 20c79653492..94e1640f05d 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -19,12 +19,12 @@ import static org.junit.Assert.assertNull; import java.io.IOException; +import java.nio.file.Files; import java.sql.ResultSet; import java.sql.SQLException; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -48,6 +48,7 @@ import org.awaitility.Awaitility; import org.jetbrains.annotations.NotNull; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Rule; @@ -113,7 +114,7 @@ public void before() throws SQLException { TestHelper.enableTableCdc(connection, "tableb"); initializeConnectorTestFramework(); - Testing.Files.delete(SCHEMA_HISTORY_PATH); + Files.delete(SCHEMA_HISTORY_PATH); // Testing.Print.enable(); } @@ -299,7 +300,7 @@ public void readOnlyApplicationIntent() throws Exception { TestHelper.waitForSnapshotToBeCompleted(); consumeRecordsByTopic(1); - TestHelper.waitForStreamingStarted(); + waitForStreamingStarted(); for (int i = 0; i < RECORDS_PER_TABLE; i++) { final int id = ID_START + i; connection.execute( @@ -882,14 +883,12 @@ public void verifyOffsets() throws Exception { try { final Lsn minLsn = connection.getMinLsn(TestHelper.TEST_DATABASE_1, tableName); final Lsn maxLsn = connection.getMaxLsn(TestHelper.TEST_DATABASE_1); - SqlServerChangeTable[] tables = Collections.singletonList(ct).toArray(new SqlServerChangeTable[]{}); final List ids = new ArrayList<>(); - connection.getChangesForTables(TestHelper.TEST_DATABASE_1, tables, minLsn, maxLsn, resultsets -> { - final ResultSet rs = resultsets[0]; + try (ResultSet rs = connection.getChangesForTable(TestHelper.TEST_DATABASE_1, ct, minLsn, maxLsn)) { while (rs.next()) { ids.add(rs.getInt("id")); } - }); + } if (ids.equals(expectedIds)) { resultMap.put(tableName, true); } @@ -898,7 +897,7 @@ public void verifyOffsets() throws Exception { } } catch (Exception e) { - org.junit.Assert.fail("Failed to fetch changes for table " + tableName + ": " + e.getMessage()); + Assert.fail("Failed to fetch changes for table " + tableName + ": " + e.getMessage()); } } }); @@ -1503,7 +1502,7 @@ public void whenCaptureInstanceExcludesColumnsAndColumnsRenamedExpectNoErrors() start(SqlServerConnector.class, config); assertConnectorIsRunning(); - TestHelper.waitForStreamingStarted(); + waitForStreamingStarted(); TestHelper.disableTableCdc(connection, "excluded_column_table_a"); connection.execute("EXEC sp_RENAME 'excluded_column_table_a.name', 'first_name', 'COLUMN'"); @@ -1561,7 +1560,7 @@ public void excludeColumnWhenCaptureInstanceExcludesColumns() throws Exception { start(SqlServerConnector.class, config); assertConnectorIsRunning(); - TestHelper.waitForStreamingStarted(); + waitForStreamingStarted(); connection.execute("INSERT INTO excluded_column_table_a VALUES(10, 'some_name', 120)"); final SourceRecords records = consumeRecordsByTopic(1); @@ -1824,7 +1823,7 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af final List expectedRow = Arrays.asList( new SchemaAndValueField("id", Schema.INT32_SCHEMA, -2), new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "-a")); - assertRecord(((Struct) records.allRecordsInOrder().get(0).value()).getStruct(Envelope.FieldName.AFTER), expectedRow); + assertRecord(((Struct) records.allRecordsInOrder().get(0).value()).getStruct(AFTER), expectedRow); } connection.setAutoCommit(false); @@ -2021,7 +2020,7 @@ public void keylessTable() throws Exception { assertThat(update1.key()).isNull(); assertThat(update1.keySchema()).isNull(); assertRecord(((Struct) update1.value()).getStruct(Envelope.FieldName.BEFORE), key2); - assertRecord(((Struct) update1.value()).getStruct(Envelope.FieldName.AFTER), key3); + assertRecord(((Struct) update1.value()).getStruct(AFTER), key3); connection.execute( "DELETE FROM keyless WHERE id=3"); @@ -2154,7 +2153,7 @@ public void shouldDetectPurgedHistory() throws Exception { connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')"); } - Testing.Files.delete(SCHEMA_HISTORY_PATH); + Files.delete(SCHEMA_HISTORY_PATH); final LogInterceptor logInterceptor = new LogInterceptor(SqlServerConnectorIT.class); start(SqlServerConnector.class, config); @@ -2704,14 +2703,14 @@ public void shouldIgnoreNullOffsetsWhenRecoveringHistory() { TestHelper.waitForDatabaseSnapshotToBeCompleted(TestHelper.TEST_DATABASE_1); stopConnector(); - TestHelper.createTestDatabases(TestHelper.TEST_DATABASE_2); + TestHelper.createTestDatabases(TEST_DATABASE_2); final Configuration config2 = TestHelper.defaultConfig( - TestHelper.TEST_DATABASE_1, TestHelper.TEST_DATABASE_2) + TestHelper.TEST_DATABASE_1, TEST_DATABASE_2) .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY) .build(); start(SqlServerConnector.class, config2); assertConnectorIsRunning(); - TestHelper.waitForDatabaseSnapshotToBeCompleted(TestHelper.TEST_DATABASE_2); + TestHelper.waitForDatabaseSnapshotToBeCompleted(TEST_DATABASE_2); stopConnector(); } @@ -2895,15 +2894,15 @@ public void shouldStreamToNewTableAfterRestart() throws Exception { @Test public void shouldIgnoreOfflineAndNonExistingDatabases() throws Exception { - TestHelper.createTestDatabases(TestHelper.TEST_DATABASE_1, TestHelper.TEST_DATABASE_2); + TestHelper.createTestDatabases(TestHelper.TEST_DATABASE_1, TEST_DATABASE_2); final Map props = TestHelper.defaultConnectorConfig() .with(SqlServerConnectorConfig.DATABASE_NAMES.name(), TestHelper.TEST_DATABASE_1 + "," - + TestHelper.TEST_DATABASE_2 + ",non-existing-database") + + TEST_DATABASE_2 + ",non-existing-database") .build() .asMap(); final LogInterceptor logInterceptor = new LogInterceptor(SqlServerConnection.class); - connection.execute("ALTER DATABASE " + TestHelper.TEST_DATABASE_2 + " SET OFFLINE"); + connection.execute("ALTER DATABASE " + TEST_DATABASE_2 + " SET OFFLINE"); try { SqlServerConnector connector = new SqlServerConnector(); @@ -2919,20 +2918,20 @@ public void shouldIgnoreOfflineAndNonExistingDatabases() throws Exception { final String message1 = "Database non-existing-database does not exist"; assertThat(logInterceptor.containsMessage(message1)).isTrue(); - final String message2 = "Database " + TestHelper.TEST_DATABASE_2 + " is not online (state_desc = OFFLINE)"; + final String message2 = "Database " + TEST_DATABASE_2 + " is not online (state_desc = OFFLINE)"; assertThat(logInterceptor.containsMessage(message2)).isTrue(); } finally { // Set the database back online, since otherwise, it will be impossible to create it again // https://docs.microsoft.com/en-us/sql/t-sql/statements/drop-database-transact-sql?view=sql-server-ver15#general-remarks - connection.execute("ALTER DATABASE " + TestHelper.TEST_DATABASE_2 + " SET ONLINE"); + connection.execute("ALTER DATABASE " + TEST_DATABASE_2 + " SET ONLINE"); } } @Test public void shouldStopRetriableRestartsAtConfiguredMaximumDuringSnapshot() throws Exception { shouldStopRetriableRestartsAtConfiguredMaximum(() -> { - connection.execute("ALTER DATABASE " + TestHelper.TEST_DATABASE_2 + " SET OFFLINE WITH ROLLBACK IMMEDIATE"); + connection.execute("ALTER DATABASE " + TEST_DATABASE_2 + " SET OFFLINE WITH ROLLBACK IMMEDIATE"); TestHelper.waitForDatabaseSnapshotToBeCompleted(TestHelper.TEST_DATABASE_1); }); } @@ -2940,8 +2939,8 @@ public void shouldStopRetriableRestartsAtConfiguredMaximumDuringSnapshot() throw @Test public void shouldStopRetriableRestartsAtConfiguredMaximumDuringStreaming() throws Exception { shouldStopRetriableRestartsAtConfiguredMaximum(() -> { - TestHelper.waitForStreamingStarted(); - connection.execute("ALTER DATABASE " + TestHelper.TEST_DATABASE_2 + waitForStreamingStarted(); + connection.execute("ALTER DATABASE " + TEST_DATABASE_2 + " SET OFFLINE WITH ROLLBACK IMMEDIATE"); }); } @@ -2968,7 +2967,7 @@ public void shouldNotUseOffsetWhenSnapshotIsAlways() throws Exception { start(SqlServerConnector.class, config); - TestHelper.waitForStreamingStarted(); + waitForStreamingStarted(); int expectedRecordCount = 2; SourceRecords sourceRecords = consumeRecordsByTopic(expectedRecordCount); @@ -2986,7 +2985,7 @@ public void shouldNotUseOffsetWhenSnapshotIsAlways() throws Exception { connection.execute("INSERT INTO always_snapshot VALUES (3,'Test3');"); start(SqlServerConnector.class, config); - TestHelper.waitForStreamingStarted(); + waitForStreamingStarted(); sourceRecords = consumeRecordsByTopic(expectedRecordCount); // Check we get up-to-date data in the snapshot. @@ -3010,7 +3009,7 @@ public void shouldNotUseOffsetWhenSnapshotIsAlways() throws Exception { public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception { Configuration.Builder builder = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) .with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo.tablea") .with(SqlServerConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class.getName()); @@ -3040,7 +3039,7 @@ public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception { @Test public void shouldProcessPurgedLogsWhenDownAndSnapshotNeeded() throws SQLException, InterruptedException { - Testing.Files.delete(SCHEMA_HISTORY_PATH); + Files.delete(SCHEMA_HISTORY_PATH); purgeDatabaseLogs(); @@ -3114,7 +3113,7 @@ public void shouldAllowForCustomSnapshot() throws InterruptedException, SQLExcep final String pkField = "id"; Configuration config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.CUSTOM.getValue()) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.CUSTOM.getValue()) .with(SqlServerConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()) .with(CommonConnectorConfig.SNAPSHOT_QUERY_MODE, CommonConnectorConfig.SnapshotQueryMode.CUSTOM) .with(CommonConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()) @@ -3156,7 +3155,7 @@ record = s2recs.get(0); stopConnector(); config = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.CUSTOM.getValue()) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.CUSTOM.getValue()) .with(SqlServerConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()) .with(CommonConnectorConfig.SNAPSHOT_QUERY_MODE, CommonConnectorConfig.SnapshotQueryMode.CUSTOM) .with(CommonConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()) @@ -3180,7 +3179,7 @@ record = s2recs.get(0); @FixFor("DBZ-7593") public void shouldCaptureTableSchemaForAllTablesIncludingNonCaptured() throws Exception { Configuration.Builder builder = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) .with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo.tablea") .with(SqlServerConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, "false") .with(SqlServerConnectorConfig.INCLUDE_SCHEMA_CHANGES, "true"); @@ -3219,7 +3218,7 @@ public void shouldCaptureTableSchemaForAllTablesIncludingNonCaptured() throws Ex @FixFor("DBZ-7593") public void shouldOnlyCaptureTableSchemaForIncluded() throws Exception { Configuration.Builder builder = TestHelper.defaultConfig() - .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) .with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo.tablea") .with(SqlServerConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, "true") .with(SqlServerConnectorConfig.INCLUDE_SCHEMA_CHANGES, "true"); @@ -3261,7 +3260,7 @@ private void purgeDatabaseLogs() throws SQLException { } private void shouldStopRetriableRestartsAtConfiguredMaximum(SqlRunnable scenario) throws Exception { - TestHelper.createTestDatabase(TestHelper.TEST_DATABASE_2); + TestHelper.createTestDatabase(TEST_DATABASE_2); connection = TestHelper.testConnection(TEST_DATABASE_2); connection.execute( "CREATE TABLE tablea (id int primary key, cola varchar(30))", @@ -3269,10 +3268,10 @@ private void shouldStopRetriableRestartsAtConfiguredMaximum(SqlRunnable scenario "INSERT INTO tablea VALUES(1, 'a')"); TestHelper.enableTableCdc(connection, "tablea"); TestHelper.enableTableCdc(connection, "tableb"); - Testing.Files.delete(SCHEMA_HISTORY_PATH); + Files.delete(SCHEMA_HISTORY_PATH); final Configuration config1 = TestHelper.defaultConnectorConfig() - .with(SqlServerConnectorConfig.DATABASE_NAMES.name(), TestHelper.TEST_DATABASE_1 + "," + TestHelper.TEST_DATABASE_2) + .with(SqlServerConnectorConfig.DATABASE_NAMES.name(), TestHelper.TEST_DATABASE_1 + "," + TEST_DATABASE_2) .with("errors.max.retries", 1) .build(); final LogInterceptor logInterceptor = new LogInterceptor(ErrorHandler.class); @@ -3295,7 +3294,7 @@ private void shouldStopRetriableRestartsAtConfiguredMaximum(SqlRunnable scenario // Set the database back online, since otherwise, it will be impossible to create it again // https://docs.microsoft.com/en-us/sql/t-sql/statements/drop-database-transact-sql?view=sql-server-ver15#general-remarks try { - connection.execute("ALTER DATABASE " + TestHelper.TEST_DATABASE_2 + " SET ONLINE"); + connection.execute("ALTER DATABASE " + TEST_DATABASE_2 + " SET ONLINE"); } catch (SQLException e) { Testing.print("Exception while setting database online " + e.getMessage()); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java index aa05f47d5cf..3d3a7c0faed 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java @@ -11,7 +11,6 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -159,16 +158,14 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af final Lsn minLsn = connection.getMinLsn(TestHelper.TEST_DATABASE_1, tableName); final Lsn maxLsn = connection.getMaxLsn(TestHelper.TEST_DATABASE_1); final AtomicReference found = new AtomicReference<>(false); - SqlServerChangeTable[] tables = Collections.singletonList(ct).toArray(new SqlServerChangeTable[]{}); - connection.getChangesForTables(TestHelper.TEST_DATABASE_1, tables, minLsn, maxLsn, resultsets -> { - final ResultSet rs = resultsets[0]; + try (ResultSet rs = connection.getChangesForTable(TestHelper.TEST_DATABASE_1, ct, minLsn, maxLsn)) { while (rs.next()) { if (rs.getInt("id") == -1) { found.set(true); break; } } - }); + } return found.get(); } catch (Exception e) { diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java index ec8d76c2848..0a33b1159d5 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java @@ -582,9 +582,10 @@ public static void waitForCdcRecord(SqlServerConnection connection, String table try { final Lsn minLsn = connection.getMinLsn(TEST_DATABASE_1, ctTableName); final Lsn maxLsn = connection.getMaxLsn(TEST_DATABASE_1); - final CdcRecordFoundBlockingMultiResultSetConsumer consumer = new CdcRecordFoundBlockingMultiResultSetConsumer(handler); - SqlServerChangeTable[] tables = Collections.singletonList(ct).toArray(new SqlServerChangeTable[]{}); - connection.getChangesForTables(TEST_DATABASE_1, tables, minLsn, maxLsn, consumer); + final CdcRecordFoundBlockingResultSetConsumer consumer = new CdcRecordFoundBlockingResultSetConsumer(handler); + try (ResultSet resultSet = connection.getChangesForTable(TEST_DATABASE_1, ct, minLsn, maxLsn)) { + consumer.accept(resultSet); + } return consumer.isFound(); } catch (Exception e) { @@ -637,9 +638,10 @@ public static void waitForCdcRecord(SqlServerConnection connection, String table try { final Lsn minLsn = connection.getMinLsn(TEST_DATABASE_1, ctTableName); final Lsn maxLsn = connection.getMaxLsn(TEST_DATABASE_1); - final CdcRecordFoundBlockingMultiResultSetConsumer consumer = new CdcRecordFoundBlockingMultiResultSetConsumer(handler); - SqlServerChangeTable[] tables = Collections.singletonList(ct).toArray(new SqlServerChangeTable[]{}); - connection.getChangesForTables(TEST_DATABASE_1, tables, minLsn, maxLsn, consumer); + final CdcRecordFoundBlockingResultSetConsumer consumer = new CdcRecordFoundBlockingResultSetConsumer(handler); + try (ResultSet resultSet = connection.getChangesForTable(TEST_DATABASE_1, ct, minLsn, maxLsn)) { + consumer.accept(resultSet); + } return consumer.isFound(); } catch (Exception e) { @@ -687,23 +689,20 @@ public interface CdcRecordHandler { * A multiple result-set consumer used internally by {@link #waitForCdcRecord(SqlServerConnection, String, CdcRecordHandler)} * that allows returning whether the provided {@link CdcRecordHandler} detected the expected condition or not. */ - static class CdcRecordFoundBlockingMultiResultSetConsumer implements JdbcConnection.BlockingMultiResultSetConsumer { + static class CdcRecordFoundBlockingResultSetConsumer implements JdbcConnection.BlockingResultSetConsumer { private final CdcRecordHandler handler; private boolean found; - CdcRecordFoundBlockingMultiResultSetConsumer(CdcRecordHandler handler) { + CdcRecordFoundBlockingResultSetConsumer(CdcRecordHandler handler) { this.handler = handler; } @Override - public void accept(ResultSet[] rs) throws SQLException, InterruptedException { - if (rs.length == 1) { - final ResultSet resultSet = rs[0]; - while (resultSet.next()) { - if (handler.apply(resultSet)) { - this.found = true; - break; - } + public void accept(final ResultSet resultSet) throws SQLException, InterruptedException { + while (resultSet.next()) { + if (handler.apply(resultSet)) { + this.found = true; + break; } } } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java index b57fd67607a..effbabfb717 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java @@ -64,12 +64,21 @@ public boolean next() throws SQLException { previousChangePosition = currentChangePosition; currentChangePosition = getNextChangePosition(resultSet); if (completed) { - LOGGER.trace("Closing result set of change tables for table {}", changeTable); - resultSet.close(); + close(); } return !completed; } + public void close() { + LOGGER.trace("Closing result set of change tables for table {}", changeTable); + try { + resultSet.close(); + } + catch (Exception e) { + // ignore + } + } + /** * Get the column data from the source change table's result-set */