From 7c37eaedb6b84c7c3ecd87f7c46f9c4f778efb94 Mon Sep 17 00:00:00 2001 From: Vadzim Ramanenka Date: Fri, 5 Jul 2024 15:28:35 +0200 Subject: [PATCH] CXP-2956: Remove the callback when processing table changes --- .../sqlserver/SqlServerConnection.java | 60 +++++++++---------- .../SqlServerStreamingChangeEventSource.java | 20 ++++--- .../source/spi/ChangeTableResultSet.java | 12 +++- 3 files changed, 50 insertions(+), 42 deletions(-) diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index 0c2185bfeb9..8a2cfb831af 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -345,45 +345,39 @@ public Lsn getMinLsn(String databaseName, String changeTableName) throws SQLExce * Provides all changes recorder by the SQL Server CDC capture process for a set of tables. * * @param databaseName - the name of the database to query - * @param changeTables - the requested tables to obtain changes for + * @param changeTable - the requested table to obtain changes for * @param intervalFromLsn - closed lower bound of interval of changes to be provided * @param intervalToLsn - closed upper bound of interval of changes to be provided - * @param consumer - the change processor * @throws SQLException */ - public void getChangesForTables(String databaseName, SqlServerChangeTable[] changeTables, Lsn intervalFromLsn, - Lsn intervalToLsn, BlockingMultiResultSetConsumer consumer) - throws SQLException, InterruptedException { - final String[] queries = new String[changeTables.length]; - final StatementPreparer[] preparers = new StatementPreparer[changeTables.length]; - - int idx = 0; - for (SqlServerChangeTable changeTable : changeTables) { - String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]") - .collect(Collectors.joining(", ")); - String source = changeTable.getCaptureInstance(); - if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) { - source = changeTable.getChangeTableId().table(); + public ResultSet getChangesForTable(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn, + Lsn intervalToLsn) throws SQLException, InterruptedException { + String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]") + .collect(Collectors.joining(", ")); + String source = changeTable.getCaptureInstance(); + if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) { + source = changeTable.getChangeTableId().table(); + } + final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName) + .replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns)) + .replace(TABLE_NAME_PLACEHOLDER, source); + // If the table was added in the middle of queried buffer we need + // to adjust from to the first LSN available + final Lsn fromLsn = getFromLsn(databaseName, changeTable, intervalFromLsn); + LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn); + + PreparedStatement statement = connection().prepareStatement(query); + if (!statement.isCloseOnCompletion()) { + throw new RuntimeException("isCloseOnCompletion is false!!!"); + } + + if (queryFetchSize > 0) { + statement.setFetchSize(queryFetchSize); } - final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName) - .replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns)) - .replace(TABLE_NAME_PLACEHOLDER, source); - queries[idx] = query; - // If the table was added in the middle of queried buffer we need - // to adjust from to the first LSN available - final Lsn fromLsn = getFromLsn(databaseName, changeTable, intervalFromLsn); - LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn); - preparers[idx] = statement -> { - if (queryFetchSize > 0) { - statement.setFetchSize(queryFetchSize); - } - statement.setBytes(1, fromLsn.getBinary()); - statement.setBytes(2, intervalToLsn.getBinary()); - }; + statement.setBytes(1, fromLsn.getBinary()); + statement.setBytes(2, intervalToLsn.getBinary()); - idx++; - } - prepareQuery(queries, preparers, consumer); + return statement.executeQuery(); } private Lsn getFromLsn(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException { diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java index 8db2f857490..df49457dc48 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -229,16 +229,17 @@ else if (!checkAgent) { tablesSlot.set(getChangeTablesToQuery(partition, offsetContext, toLsn)); collectChangeTablesWithKnownStopLsn(partition, tablesSlot.get()); } - try { - dataConnection.getChangesForTables(databaseName, tablesSlot.get(), fromLsn, toLsn, resultSets -> { + SqlServerChangeTablePointer[] changeTables = new SqlServerChangeTablePointer[]{}; + try { + { long eventSerialNoInInitialTx = 1; - final int tableCount = resultSets.length; - final SqlServerChangeTablePointer[] changeTables = new SqlServerChangeTablePointer[tableCount]; final SqlServerChangeTable[] tables = tablesSlot.get(); + changeTables = new SqlServerChangeTablePointer[tables.length]; - for (int i = 0; i < tableCount; i++) { - changeTables[i] = new SqlServerChangeTablePointer(tables[i], resultSets[i]); + for (int i = 0; i < tables.length; i++) { + ResultSet resultSet = dataConnection.getChangesForTable(databaseName, tables[i], fromLsn, toLsn); + changeTables[i] = new SqlServerChangeTablePointer(tables[i], resultSet); changeTables[i].next(); } @@ -342,7 +343,7 @@ else if (!checkAgent) { connectorConfig)); tableWithSmallestLsn.next(); } - }); + } streamingExecutionContext.setLastProcessedPosition(TxLogPosition.valueOf(toLsn)); // Terminate the transaction otherwise CDC could not be disabled for tables dataConnection.rollback(); @@ -350,6 +351,11 @@ else if (!checkAgent) { catch (SQLException e) { tablesSlot.set(processErrorFromChangeTableQuery(databaseName, e, tablesSlot.get())); } + finally { + for (SqlServerChangeTablePointer t : changeTables) { + t.close(); + } + } } } catch (Exception e) { diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java index b57fd67607a..853b2027f40 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java @@ -64,12 +64,20 @@ public boolean next() throws SQLException { previousChangePosition = currentChangePosition; currentChangePosition = getNextChangePosition(resultSet); if (completed) { - LOGGER.trace("Closing result set of change tables for table {}", changeTable); - resultSet.close(); + close(); } return !completed; } + public void close() { + LOGGER.trace("Closing result set of change tables for table {}", changeTable); + try { + resultSet.close(); + } catch (Exception e) { + // ignore + } + } + /** * Get the column data from the source change table's result-set */