Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CXP-2956: Implement keyset pagination in debezium sql server connector (2nd iteration) #157

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public class Lsn implements Comparable<Lsn>, Nullable {

public static final Lsn NULL = new Lsn(null);

public static final Lsn ZERO = valueOf(new byte[10]);

private final byte[] binary;
private int[] unsignedBinary;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,20 @@ public class SqlServerChangeTablePointer extends ChangeTableResultSet<SqlServerC
private static final int COL_DATA = 5;

private ResultSetMapper<Object[]> resultSetMapper;
private final ResultSet resultSet;
private final int columnDataOffset;
private final SqlServerConnection connection;
private final Lsn fromLsn;
private final Lsn toLsn;
private final int maxRowsPerResultSet;

public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, ResultSet resultSet) {
super(changeTable, resultSet, COL_DATA);
public SqlServerChangeTablePointer(SqlServerChangeTable changeTable, SqlServerConnection connection, Lsn fromLsn, Lsn toLsn, int maxRowsPerResultSet) {
super(changeTable, COL_DATA, maxRowsPerResultSet);
// Store references to these because we can't get them from our superclass
this.resultSet = resultSet;
this.columnDataOffset = COL_DATA;
}

protected ResultSet getResultSet() {
return resultSet;
this.connection = connection;
this.fromLsn = fromLsn;
this.toLsn = toLsn;
this.maxRowsPerResultSet = maxRowsPerResultSet;
}

@Override
Expand All @@ -76,7 +78,10 @@ protected Object getColumnData(ResultSet resultSet, int columnIndex) throws SQLE
@Override
protected TxLogPosition getNextChangePosition(ResultSet resultSet) throws SQLException {
return isCompleted() ? TxLogPosition.NULL
: TxLogPosition.valueOf(Lsn.valueOf(resultSet.getBytes(COL_COMMIT_LSN)), Lsn.valueOf(resultSet.getBytes(COL_ROW_LSN)));
: TxLogPosition.valueOf(
Lsn.valueOf(resultSet.getBytes(COL_COMMIT_LSN)),
Lsn.valueOf(resultSet.getBytes(COL_ROW_LSN)),
resultSet.getInt(COL_OPERATION));
}

/**
Expand All @@ -89,12 +94,23 @@ protected boolean isNewTransaction() throws SQLException {
getChangePosition().getCommitLsn().compareTo(getPreviousChangePosition().getCommitLsn()) > 0;
}

@Override
protected ResultSet getNextResultSet(TxLogPosition lastPositionSeen) throws SQLException {
if (lastPositionSeen == null || lastPositionSeen.equals(TxLogPosition.NULL)) {
return connection.getChangesForTable(getChangeTable(), fromLsn, toLsn, maxRowsPerResultSet);
}
else {
return connection.getChangesForTable(getChangeTable(), lastPositionSeen.getCommitLsn(), lastPositionSeen.getInTxLsn(), lastPositionSeen.getOperation(),
toLsn, maxRowsPerResultSet);
}
}

@Override
public Object[] getData() throws SQLException {
if (resultSetMapper == null) {
this.resultSetMapper = createResultSetMapper(getChangeTable().getSourceTable());
}
return resultSetMapper.apply(resultSet);
return resultSetMapper.apply(getResultSet());
}

/**
Expand All @@ -110,7 +126,7 @@ public Object[] getData() throws SQLException {
*/
private ResultSetMapper<Object[]> createResultSetMapper(Table table) throws SQLException {
ColumnUtils.MappedColumns columnMap = ColumnUtils.toMap(table);
final ResultSetMetaData rsmd = resultSet.getMetaData();
final ResultSetMetaData rsmd = getResultSet().getMetaData();
final int columnCount = rsmd.getColumnCount() - columnDataOffset;
final List<String> resultColumns = new ArrayList<>(columnCount);
for (int i = 0; i < columnCount; ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,12 @@ private String buildGetAllChangesForTableQuery(SqlServerConnectorConfig.DataQuer
break;
case DIRECT:
result += GET_ALL_CHANGES_FOR_TABLE_FROM_DIRECT + " ";
where.add("[__$start_lsn] >= ?");
where.add("[__$start_lsn] <= ?");
break;
}
where.add("(([__$start_lsn] = ? AND [__$seqval] = ? AND [__$operation] > ?) " +
"OR ([__$start_lsn] = ? AND [__$seqval] > ?) " +
"OR ([__$start_lsn] > ?))");
where.add("[__$start_lsn] <= ?");

if (hasSkippedOperations(skippedOperations)) {
Set<String> skippedOps = new HashSet<>();
Expand Down Expand Up @@ -344,51 +346,74 @@ public Lsn getMinLsn(String databaseName, String changeTableName) throws SQLExce
/**
* Provides all changes recorder by the SQL Server CDC capture process for a set of tables.
*
* @param databaseName - the name of the database to query
* @param changeTables - the requested tables to obtain changes for
* @param changeTable - the requested table to obtain changes for
* @param intervalFromLsn - closed lower bound of interval of changes to be provided
* @param seqvalFromLsn - in-transaction sequence value to start after, pass {@link Lsn#ZERO} to fetch all sequence values
* @param operationFrom - operation number to start after, pass 0 to fetch all operations
* @param intervalToLsn - closed upper bound of interval of changes to be provided
* @param consumer - the change processor
* @param maxRows - the max number of rows to return, pass 0 for no limit
* @throws SQLException
*/
public void getChangesForTables(String databaseName, SqlServerChangeTable[] changeTables, Lsn intervalFromLsn,
Lsn intervalToLsn, BlockingMultiResultSetConsumer consumer)
throws SQLException, InterruptedException {
final String[] queries = new String[changeTables.length];
final StatementPreparer[] preparers = new StatementPreparer[changeTables.length];

int idx = 0;
for (SqlServerChangeTable changeTable : changeTables) {
String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]")
.collect(Collectors.joining(", "));
String source = changeTable.getCaptureInstance();
if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) {
source = changeTable.getChangeTableId().table();
}
final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName)
.replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns))
.replace(TABLE_NAME_PLACEHOLDER, source);
queries[idx] = query;
// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
final Lsn fromLsn = getFromLsn(databaseName, changeTable, intervalFromLsn);
LOGGER.trace("Getting changes for table {} in range[{}, {}]", changeTable, fromLsn, intervalToLsn);
preparers[idx] = statement -> {
if (queryFetchSize > 0) {
statement.setFetchSize(queryFetchSize);
}
statement.setBytes(1, fromLsn.getBinary());
statement.setBytes(2, intervalToLsn.getBinary());
};
public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn, Lsn seqvalFromLsn, int operationFrom,
Lsn intervalToLsn, int maxRows)
throws SQLException {
String databaseName = changeTable.getSourceTableId().catalog();
String capturedColumns = changeTable.getCapturedColumns().stream().map(c -> "[" + c + "]")
.collect(Collectors.joining(", "));

idx++;
String source = changeTable.getCaptureInstance();
if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) {
source = changeTable.getChangeTableId().table();
}
prepareQuery(queries, preparers, consumer);

String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName)
.replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns))
.replace(TABLE_NAME_PLACEHOLDER, source);

if (maxRows > 0) {
query = query.replace("SELECT ", String.format("SELECT TOP %d ", maxRows));
}

// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
final Lsn fromLsn = getFromLsn(changeTable, intervalFromLsn);
LOGGER.trace("Getting {} changes for table {} in range [{}-{}-{}, {}]", maxRows > 0 ? "top " + maxRows : "", changeTable, fromLsn, seqvalFromLsn, operationFrom,
intervalToLsn);

PreparedStatement statement = connection().prepareStatement(query);
statement.closeOnCompletion();

if (queryFetchSize > 0) {
statement.setFetchSize(queryFetchSize);
}

int paramIndex = 1;
if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.FUNCTION) {
statement.setBytes(paramIndex++, fromLsn.getBinary());
statement.setBytes(paramIndex++, intervalToLsn.getBinary());
}
statement.setBytes(paramIndex++, fromLsn.getBinary());
statement.setBytes(paramIndex++, seqvalFromLsn.getBinary());
statement.setInt(paramIndex++, operationFrom);
statement.setBytes(paramIndex++, fromLsn.getBinary());
statement.setBytes(paramIndex++, seqvalFromLsn.getBinary());
statement.setBytes(paramIndex++, fromLsn.getBinary());
statement.setBytes(paramIndex++, intervalToLsn.getBinary());

return statement.executeQuery();
}

public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn, Lsn intervalToLsn, int maxRows) throws SQLException {
return getChangesForTable(changeTable, intervalFromLsn, Lsn.ZERO, 0, intervalToLsn, maxRows);
}

public ResultSet getChangesForTable(SqlServerChangeTable changeTable, Lsn intervalFromLsn, Lsn intervalToLsn) throws SQLException {
return getChangesForTable(changeTable, intervalFromLsn, intervalToLsn, 0);
}

private Lsn getFromLsn(String databaseName, SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException {
private Lsn getFromLsn(SqlServerChangeTable changeTable, Lsn intervalFromLsn) throws SQLException {
Lsn fromLsn = changeTable.getStartLsn().compareTo(intervalFromLsn) > 0 ? changeTable.getStartLsn() : intervalFromLsn;
return fromLsn.getBinary() != null ? fromLsn : getMinLsn(databaseName, changeTable.getCaptureInstance());
return fromLsn.getBinary() != null ? fromLsn : getMinLsn(changeTable.getSourceTableId().catalog(), changeTable.getCaptureInstance());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,14 @@ public static DataQueryMode parse(String value, String defaultValue) {
+ "The value of '" + DataQueryMode.DIRECT.getValue()
+ "' makes the connector to query the change tables directly.");

public static final Field STREAMING_FETCH_SIZE = Field.create("streaming.fetch.size")
.withDisplayName("Streaming fetch size")
.withDefault(0)
.withType(Type.INT)
.withImportance(Importance.LOW)
.withDescription("Specifies the maximum number of rows that should be read in one go from each table while streaming. "
+ "The connector will read the table contents in multiple batches of this size. Defaults to 0 which means no limit.");

private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.name("SQL Server")
.type(
Expand All @@ -497,7 +505,8 @@ public static DataQueryMode parse(String value, String defaultValue) {
INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES,
QUERY_FETCH_SIZE,
DATA_QUERY_MODE)
DATA_QUERY_MODE,
STREAMING_FETCH_SIZE)
.events(SOURCE_INFO_STRUCT_MAKER)
.excluding(
SCHEMA_INCLUDE_LIST,
Expand All @@ -524,6 +533,7 @@ public static ConfigDef configDef() {
private final boolean optionRecompile;
private final int queryFetchSize;
private final DataQueryMode dataQueryMode;
private final int streamingFetchSize;

public SqlServerConnectorConfig(Configuration config) {
super(
Expand Down Expand Up @@ -567,6 +577,7 @@ public SqlServerConnectorConfig(Configuration config) {

this.dataQueryMode = DataQueryMode.parse(config.getString(DATA_QUERY_MODE), DATA_QUERY_MODE.defaultValueAsString());
this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString());
this.streamingFetchSize = config.getInteger(STREAMING_FETCH_SIZE);
}

public List<String> getDatabaseNames() {
Expand Down Expand Up @@ -716,4 +727,8 @@ private static int validateDatabaseNames(Configuration config, Field field, Fiel

return count;
}

public int getStreamingFetchSize() {
return streamingFetchSize;
}
}
Loading
Loading