From f3886e74f48ef9255cc5e9ced2bcec488f0807bf Mon Sep 17 00:00:00 2001 From: Vadzim Ramanenka Date: Wed, 10 Jul 2024 17:29:19 +0200 Subject: [PATCH] CXP-2956: Query table changes from within SqlServerChangeTablePointer --- .../SqlServerChangeTablePointer.java | 25 +++++++++++-------- .../sqlserver/SqlServerConnection.java | 5 ++-- .../SqlServerStreamingChangeEventSource.java | 3 +-- .../sqlserver/SqlServerConnectorIT.java | 2 +- .../sqlserver/TransactionMetadataIT.java | 2 +- .../connector/sqlserver/util/TestHelper.java | 4 +-- .../source/spi/ChangeTableResultSet.java | 14 ++++++++--- 7 files changed, 34 insertions(+), 21 deletions(-) diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java index 8191677d1e7..ec8d37f06e5 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java @@ -46,18 +46,18 @@ public class SqlServerChangeTablePointer extends ChangeTableResultSet resultSetMapper; - private final ResultSet resultSet; private final int columnDataOffset; + private final SqlServerConnection connection; + private final Lsn fromLsn; + private final Lsn toLsn; - public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, ResultSet resultSet) { - super(changeTable, resultSet, COL_DATA); + public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, SqlServerConnection connection, Lsn fromLsn, Lsn toLsn) { + super(changeTable, COL_DATA); // Store references to these because we can't get them from our superclass - this.resultSet = resultSet; this.columnDataOffset = COL_DATA; - } - - protected ResultSet getResultSet() { - return resultSet; + this.connection = connection; + this.fromLsn = fromLsn; + this.toLsn = toLsn; } @Override @@ -89,12 +89,17 @@ protected boolean isNewTransaction() throws SQLException { getChangePosition().getCommitLsn().compareTo(getPreviousChangePosition().getCommitLsn()) > 0; } + @Override + protected ResultSet getNextResultSet() throws SQLException { + return connection.getChangesForTable(getChangeTable(), fromLsn, toLsn); + } + @Override public Object[] getData() throws SQLException { if (resultSetMapper == null) { this.resultSetMapper = createResultSetMapper(getChangeTable().getSourceTable()); } - return resultSetMapper.apply(resultSet); + return resultSetMapper.apply(getResultSet()); } /** @@ -110,7 +115,7 @@ public Object[] getData() throws SQLException { */ private ResultSetMapper createResultSetMapper(Table table) throws SQLException { ColumnUtils.MappedColumns columnMap = ColumnUtils.toMap(table); - final ResultSetMetaData rsmd = resultSet.getMetaData(); + final ResultSetMetaData rsmd = getResultSet().getMetaData(); final int columnCount = rsmd.getColumnCount() - columnDataOffset; final List resultColumns = new ArrayList<>(columnCount); for (int i = 0; i < columnCount; ++i) { diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index b8f647be153..44c563930ed 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -350,9 +350,10 @@ public Lsn getMinLsn(String databaseName, String changeTableName) throws SQLExce * @param intervalToLsn - closed upper bound of interval of changes to be provided * @throws SQLException */ - public ResultSet getChangesForTable(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn, + public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn, Lsn intervalToLsn) - throws SQLException, InterruptedException { + throws SQLException { + String databaseName = changeTable.getSourceTableId().catalog(); String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]") .collect(Collectors.joining(", ")); String source = changeTable.getCaptureInstance(); diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java index d7bf65245d3..3ab1b261249 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -239,8 +239,7 @@ else if (!checkAgent) { changeTables = new SqlServerChangeTablePointer[tables.length]; for (int i = 0; i < tables.length; i++) { - ResultSet resultSet = dataConnection.getChangesForTable(databaseName, tables[i], fromLsn, toLsn); - changeTables[i] = new SqlServerChangeTablePointer(tables[i], resultSet); + changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn); changeTables[i].next(); } diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index 86b33637aeb..891170c1006 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -882,7 +882,7 @@ public void verifyOffsets() throws Exception { final Lsn minLsn = connection.getMinLsn(TestHelper.TEST_DATABASE_1, tableName); final Lsn maxLsn = connection.getMaxLsn(TestHelper.TEST_DATABASE_1); final List ids = new ArrayList<>(); - try (ResultSet rs = connection.getChangesForTable(TestHelper.TEST_DATABASE_1, ct, minLsn, maxLsn)) { + try (ResultSet rs = connection.getChangesForTable(ct, minLsn, maxLsn)) { while (rs.next()) { ids.add(rs.getInt("id")); } diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java index 3d3a7c0faed..0d20cfab39c 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TransactionMetadataIT.java @@ -158,7 +158,7 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af final Lsn minLsn = connection.getMinLsn(TestHelper.TEST_DATABASE_1, tableName); final Lsn maxLsn = connection.getMaxLsn(TestHelper.TEST_DATABASE_1); final AtomicReference found = new AtomicReference<>(false); - try (ResultSet rs = connection.getChangesForTable(TestHelper.TEST_DATABASE_1, ct, minLsn, maxLsn)) { + try (ResultSet rs = connection.getChangesForTable(ct, minLsn, maxLsn)) { while (rs.next()) { if (rs.getInt("id") == -1) { found.set(true); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java index 0a33b1159d5..c420f7b6f62 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java @@ -583,7 +583,7 @@ public static void waitForCdcRecord(SqlServerConnection connection, String table final Lsn minLsn = connection.getMinLsn(TEST_DATABASE_1, ctTableName); final Lsn maxLsn = connection.getMaxLsn(TEST_DATABASE_1); final CdcRecordFoundBlockingResultSetConsumer consumer = new CdcRecordFoundBlockingResultSetConsumer(handler); - try (ResultSet resultSet = connection.getChangesForTable(TEST_DATABASE_1, ct, minLsn, maxLsn)) { + try (ResultSet resultSet = connection.getChangesForTable(ct, minLsn, maxLsn)) { consumer.accept(resultSet); } return consumer.isFound(); @@ -639,7 +639,7 @@ public static void waitForCdcRecord(SqlServerConnection connection, String table final Lsn minLsn = connection.getMinLsn(TEST_DATABASE_1, ctTableName); final Lsn maxLsn = connection.getMaxLsn(TEST_DATABASE_1); final CdcRecordFoundBlockingResultSetConsumer consumer = new CdcRecordFoundBlockingResultSetConsumer(handler); - try (ResultSet resultSet = connection.getChangesForTable(TEST_DATABASE_1, ct, minLsn, maxLsn)) { + try (ResultSet resultSet = connection.getChangesForTable(ct, minLsn, maxLsn)) { consumer.accept(resultSet); } return consumer.isFound(); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java index effbabfb717..0fd4e390c25 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java @@ -27,15 +27,14 @@ public abstract class ChangeTableResultSet 0; } + public ResultSet getResultSet() { + return resultSet; + } + + protected abstract ResultSet getNextResultSet() throws SQLException; + public boolean next() throws SQLException { + if (resultSet == null) { + resultSet = getNextResultSet(); + } completed = !resultSet.next(); previousChangePosition = currentChangePosition; currentChangePosition = getNextChangePosition(resultSet);