From 536a43d36a15396d3ceb1c98ab079d11b2eff23d Mon Sep 17 00:00:00 2001 From: Vadzim Ramanenka Date: Wed, 10 Jul 2024 17:29:19 +0200 Subject: [PATCH] CXP-2956: Query table changes from within SqlServerChangeTablePointer --- .../SqlServerChangeTablePointer.java | 25 +++++++++++-------- .../sqlserver/SqlServerConnection.java | 5 ++-- .../SqlServerStreamingChangeEventSource.java | 3 +-- .../source/spi/ChangeTableResultSet.java | 14 ++++++++--- 4 files changed, 30 insertions(+), 17 deletions(-) diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java index 8191677d1e7..ec8d37f06e5 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeTablePointer.java @@ -46,18 +46,18 @@ public class SqlServerChangeTablePointer extends ChangeTableResultSet resultSetMapper; - private final ResultSet resultSet; private final int columnDataOffset; + private final SqlServerConnection connection; + private final Lsn fromLsn; + private final Lsn toLsn; - public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, ResultSet resultSet) { - super(changeTable, resultSet, COL_DATA); + public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, SqlServerConnection connection, Lsn fromLsn, Lsn toLsn) { + super(changeTable, COL_DATA); // Store references to these because we can't get them from our superclass - this.resultSet = resultSet; this.columnDataOffset = COL_DATA; - } - - protected ResultSet getResultSet() { - return resultSet; + this.connection = connection; + this.fromLsn = fromLsn; + this.toLsn = toLsn; } @Override @@ -89,12 +89,17 @@ protected boolean isNewTransaction() throws SQLException { getChangePosition().getCommitLsn().compareTo(getPreviousChangePosition().getCommitLsn()) > 0; } + @Override + protected ResultSet getNextResultSet() throws SQLException { + return connection.getChangesForTable(getChangeTable(), fromLsn, toLsn); + } + @Override public Object[] getData() throws SQLException { if (resultSetMapper == null) { this.resultSetMapper = createResultSetMapper(getChangeTable().getSourceTable()); } - return resultSetMapper.apply(resultSet); + return resultSetMapper.apply(getResultSet()); } /** @@ -110,7 +115,7 @@ public Object[] getData() throws SQLException { */ private ResultSetMapper createResultSetMapper(Table table) throws SQLException { ColumnUtils.MappedColumns columnMap = ColumnUtils.toMap(table); - final ResultSetMetaData rsmd = resultSet.getMetaData(); + final ResultSetMetaData rsmd = getResultSet().getMetaData(); final int columnCount = rsmd.getColumnCount() - columnDataOffset; final List resultColumns = new ArrayList<>(columnCount); for (int i = 0; i < columnCount; ++i) { diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index b8f647be153..44c563930ed 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -350,9 +350,10 @@ public Lsn getMinLsn(String databaseName, String changeTableName) throws SQLExce * @param intervalToLsn - closed upper bound of interval of changes to be provided * @throws SQLException */ - public ResultSet getChangesForTable(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn, + public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn, Lsn intervalToLsn) - throws SQLException, InterruptedException { + throws SQLException { + String databaseName = changeTable.getSourceTableId().catalog(); String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]") .collect(Collectors.joining(", ")); String source = changeTable.getCaptureInstance(); diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java index d7bf65245d3..3ab1b261249 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -239,8 +239,7 @@ else if (!checkAgent) { changeTables = new SqlServerChangeTablePointer[tables.length]; for (int i = 0; i < tables.length; i++) { - ResultSet resultSet = dataConnection.getChangesForTable(databaseName, tables[i], fromLsn, toLsn); - changeTables[i] = new SqlServerChangeTablePointer(tables[i], resultSet); + changeTables[i] = new SqlServerChangeTablePointer(tables[i], dataConnection, fromLsn, toLsn); changeTables[i].next(); } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java index effbabfb717..0fd4e390c25 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeTableResultSet.java @@ -27,15 +27,14 @@ public abstract class ChangeTableResultSet 0; } + public ResultSet getResultSet() { + return resultSet; + } + + protected abstract ResultSet getNextResultSet() throws SQLException; + public boolean next() throws SQLException { + if (resultSet == null) { + resultSet = getNextResultSet(); + } completed = !resultSet.next(); previousChangePosition = currentChangePosition; currentChangePosition = getNextChangePosition(resultSet);