Skip to content

Commit

Permalink
DBZ-7273: Introduce mode to query capture tables directly
Browse files Browse the repository at this point in the history
  • Loading branch information
ramanenka authored and jpechane committed Jan 24, 2024
1 parent 187db57 commit 5462605
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,12 @@ public class SqlServerConnection extends JdbcConnection {
private static final String GET_MIN_LSN = "SELECT [#db].sys.fn_cdc_get_min_lsn('#')";
private static final String LOCK_TABLE = "SELECT * FROM [#] WITH (TABLOCKX)";
private static final String INCREMENT_LSN = "SELECT [#db].sys.fn_cdc_increment_lsn(?)";
private static final String GET_ALL_CHANGES_FOR_TABLE = "SELECT *# FROM [#db].cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old') order by [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC";
private final String get_all_changes_for_table;
protected static final String LSN_TIMESTAMP_SELECT_STATEMENT = "TODATETIMEOFFSET([#db].sys.fn_cdc_map_lsn_to_time([__$start_lsn]), DATEPART(TZOFFSET, SYSDATETIMEOFFSET()))";
private static final String GET_ALL_CHANGES_FOR_TABLE_SELECT = "SELECT [__$start_lsn], [__$seqval], [__$operation], [__$update_mask], #, "
+ LSN_TIMESTAMP_SELECT_STATEMENT;
private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_FUNCTION = "FROM [#db].cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old')";
private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_DIRECT = "FROM [#db].cdc.[#]";
private static final String GET_ALL_CHANGES_FOR_TABLE_ORDER_BY = "ORDER BY [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC";

/**
* Queries the list of captured column names and their change table identifiers in the given database.
Expand Down Expand Up @@ -133,10 +136,47 @@ public SqlServerConnection(SqlServerConnectorConfig config, SqlServerValueConver
defaultValueConverter = new SqlServerDefaultValueConverter(this::connection, valueConverters);
this.queryFetchSize = config.getQueryFetchSize();

getAllChangesForTable = buildGetAllChangesForTableQuery(config.getDataQueryMode(), skippedOperations);

this.config = config;
this.useSingleDatabase = useSingleDatabase;

this.optionRecompile = false;
}

/**
* Creates a new connection using the supplied configuration.
*
* @param config {@link Configuration} instance, may not be null.
* @param valueConverters {@link SqlServerValueConverters} instance
* @param skippedOperations a set of {@link Envelope.Operation} to skip in streaming
* @param optionRecompile Includes query option RECOMPILE on incremental snapshots
*/
public SqlServerConnection(SqlServerConnectorConfig config, SqlServerValueConverters valueConverters,
Set<Envelope.Operation> skippedOperations, boolean useSingleDatabase,
boolean optionRecompile) {
this(config, valueConverters, skippedOperations, useSingleDatabase);

this.optionRecompile = optionRecompile;
}

private String buildGetAllChangesForTableQuery(SqlServerConnectorConfig.DataQueryMode dataQueryMode,
Set<Envelope.Operation> skippedOperations) {
String result = GET_ALL_CHANGES_FOR_TABLE_SELECT + " ";
List<String> where = new LinkedList<>();
switch (dataQueryMode) {
case FUNCTION:
result += GET_ALL_CHANGES_FOR_TABLE_FROM_FUNCTION + " ";
break;
case DIRECT:
result += GET_ALL_CHANGES_FOR_TABLE_FROM_DIRECT + " ";
where.add("[__$start_lsn] >= ?");
where.add("[__$start_lsn] <= ?");
break;
}

if (hasSkippedOperations(skippedOperations)) {
Set<String> skippedOps = new HashSet<>();
StringBuilder getAllChangesForTableStatement = new StringBuilder(
"SELECT *# FROM [#db].cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old') WHERE __$operation NOT IN (");
skippedOperations.forEach((Envelope.Operation operation) -> {
// This number are the __$operation number in the SQLServer
// https://docs.microsoft.com/en-us/sql/relational-databases/system-functions/cdc-fn-cdc-get-all-changes-capture-instance-transact-sql?view=sql-server-ver15#table-returned
Expand All @@ -153,37 +193,16 @@ public SqlServerConnection(SqlServerConnectorConfig config, SqlServerValueConver
break;
}
});
getAllChangesForTableStatement.append(String.join(",", skippedOps));
getAllChangesForTableStatement.append(") order by [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC");
get_all_changes_for_table = getAllChangesForTableStatement.toString();
where.add("[__$operation] NOT IN (" + String.join(",", skippedOps) + ")");
}
else {
get_all_changes_for_table = GET_ALL_CHANGES_FOR_TABLE;

if (!where.isEmpty()) {
result += " WHERE " + String.join(" AND ", where) + " ";
}

getAllChangesForTable = get_all_changes_for_table.replaceFirst(STATEMENTS_PLACEHOLDER,
Matcher.quoteReplacement(", " + LSN_TIMESTAMP_SELECT_STATEMENT));
this.config = config;
this.useSingleDatabase = useSingleDatabase;
result += GET_ALL_CHANGES_FOR_TABLE_ORDER_BY;

this.optionRecompile = false;
}

/**
* Creates a new connection using the supplied configuration.
*
* @param config {@link Configuration} instance, may not be null.
* @param valueConverters {@link SqlServerValueConverters} instance
* @param skippedOperations a set of {@link Envelope.Operation} to skip in streaming
* @param optionRecompile Includes query option RECOMPILE on incremental snapshots
*/
public SqlServerConnection(SqlServerConnectorConfig config, SqlServerValueConverters valueConverters,
Set<Envelope.Operation> skippedOperations, boolean useSingleDatabase,
boolean optionRecompile) {
this(config, valueConverters, skippedOperations, useSingleDatabase);

this.optionRecompile = optionRecompile;
return result;
}

private boolean hasSkippedOperations(Set<Envelope.Operation> skippedOperations) {
Expand Down Expand Up @@ -330,8 +349,14 @@ public void getChangesForTables(String databaseName, SqlServerChangeTable[] chan

int idx = 0;
for (SqlServerChangeTable changeTable : changeTables) {
String capturedColumns = String.join(", ", changeTable.getCapturedColumns());
String source = changeTable.getCaptureInstance();
if (config.getDataQueryMode() == SqlServerConnectorConfig.DataQueryMode.DIRECT) {
source = changeTable.getChangeTableId().table();
}
final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName)
.replace(STATEMENTS_PLACEHOLDER, changeTable.getCaptureInstance());
.replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns))
.replace(STATEMENTS_PLACEHOLDER, source);
queries[idx] = query;
// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,67 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) {
}
}

/**
* The set of predefined data query mode options.
*/
public enum DataQueryMode implements EnumeratedValue {

/**
* In this mode the CDC data is queried by means of calling {@code cdc.[fn_cdc_get_all_changes_#]} function.
*/
FUNCTION("function"),

/**
* In this mode the CDC data is queried from change tables directly.
*/
DIRECT("direct");

private final String value;

DataQueryMode(String value) {
this.value = value;
}

@Override
public String getValue() {
return value;
}

/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @return the matching option, or null if no match is found
*/
public static DataQueryMode parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (DataQueryMode option : DataQueryMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) {
return option;
}
}
return null;
}

/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @param defaultValue the default value; may be null
* @return the matching option, or null if no match is found and the non-null default is invalid
*/
public static DataQueryMode parse(String value, String defaultValue) {
DataQueryMode mode = parse(value);
if (mode == null && defaultValue != null) {
mode = parse(defaultValue);
}
return mode;
}
}

public static final Field USER = RelationalDatabaseConnectorConfig.USER
.optional()
.withNoValidation();
Expand Down Expand Up @@ -303,6 +364,17 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) {
public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER
.withDefault(SqlServerSourceInfoStructMaker.class.getName());

public static final Field DATA_QUERY_MODE = Field.create("data.query.mode")
.withDisplayName("Data query mode")
.withEnum(DataQueryMode.class, DataQueryMode.FUNCTION)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("Controls how the connector queries CDC data. "
+ "The default is '" + DataQueryMode.FUNCTION.getValue()
+ "', which means the data is queried by means of calling cdc.[fn_cdc_get_all_changes_#] function. "
+ "The value of '" + DataQueryMode.DIRECT.getValue()
+ "' makes the connector to query the change tables directly.");

private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.name("SQL Server")
.type(
Expand All @@ -321,7 +393,8 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) {
INCREMENTAL_SNAPSHOT_OPTION_RECOMPILE,
INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES,
QUERY_FETCH_SIZE)
QUERY_FETCH_SIZE,
DATA_QUERY_MODE)
.events(SOURCE_INFO_STRUCT_MAKER)
.excluding(
SCHEMA_INCLUDE_LIST,
Expand All @@ -346,6 +419,7 @@ public static ConfigDef configDef() {
private final int maxTransactionsPerIteration;
private final boolean optionRecompile;
private final int queryFetchSize;
private final DataQueryMode dataQueryMode;

public SqlServerConnectorConfig(Configuration config) {
super(
Expand Down Expand Up @@ -386,6 +460,8 @@ public SqlServerConnectorConfig(Configuration config) {
}

this.optionRecompile = config.getBoolean(INCREMENTAL_SNAPSHOT_OPTION_RECOMPILE);

this.dataQueryMode = DataQueryMode.parse(config.getString(DATA_QUERY_MODE), DATA_QUERY_MODE.defaultValueAsString());
}

public List<String> getDatabaseNames() {
Expand Down Expand Up @@ -517,6 +593,10 @@ public Map<DataCollectionId, String> getSnapshotSelectOverridesByTable() {
return Collections.unmodifiableMap(snapshotSelectOverridesByTable);
}

public DataQueryMode getDataQueryMode() {
return dataQueryMode;
}

private static int validateDatabaseNames(Configuration config, Field field, Field.ValidationOutput problems) {
String databaseNames = config.getString(field);
int count = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,83 @@ public void createAndDelete() throws Exception {
stopConnector();
}

@Test
public void createAndDeleteInDataQueryDirectMode() throws Exception {
final int RECORDS_PER_TABLE = 5;
final int TABLES = 2;
final int ID_START = 10;
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SqlServerConnectorConfig.DATA_QUERY_MODE, SqlServerConnectorConfig.DataQueryMode.DIRECT)
.build();

start(SqlServerConnector.class, config);
assertConnectorIsRunning();

// Wait for snapshot completion
consumeRecordsByTopic(1);

for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START + i;
connection.execute(
"INSERT INTO tablea VALUES(" + id + ", 'a')");
connection.execute(
"INSERT INTO tableb VALUES(" + id + ", 'b')");
}

final SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
final List<SourceRecord> tableA = records.recordsForTopic("server1.testDB1.dbo.tablea");
final List<SourceRecord> tableB = records.recordsForTopic("server1.testDB1.dbo.tableb");
assertThat(tableA).hasSize(RECORDS_PER_TABLE);
assertThat(tableB).hasSize(RECORDS_PER_TABLE);
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final SourceRecord recordA = tableA.get(i);
final SourceRecord recordB = tableB.get(i);
final List<SchemaAndValueField> expectedRowA = Arrays.asList(
new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
final List<SchemaAndValueField> expectedRowB = Arrays.asList(
new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));

final Struct keyA = (Struct) recordA.key();
final Struct valueA = (Struct) recordA.value();
assertRecord((Struct) valueA.get("after"), expectedRowA);
assertNull(valueA.get("before"));

final Struct keyB = (Struct) recordB.key();
final Struct valueB = (Struct) recordB.value();
assertRecord((Struct) valueB.get("after"), expectedRowB);
assertNull(valueB.get("before"));
}

connection.execute("DELETE FROM tableB");
final SourceRecords deleteRecords = consumeRecordsByTopic(2 * RECORDS_PER_TABLE);
final List<SourceRecord> deleteTableA = deleteRecords.recordsForTopic("server1.testDB1.dbo.tablea");
final List<SourceRecord> deleteTableB = deleteRecords.recordsForTopic("server1.testDB1.dbo.tableb");
assertThat(deleteTableA).isNullOrEmpty();
assertThat(deleteTableB).hasSize(2 * RECORDS_PER_TABLE);

for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final SourceRecord deleteRecord = deleteTableB.get(i * 2);
final SourceRecord tombstoneRecord = deleteTableB.get(i * 2 + 1);
final List<SchemaAndValueField> expectedDeleteRow = Arrays.asList(
new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));

final Struct deleteKey = (Struct) deleteRecord.key();
final Struct deleteValue = (Struct) deleteRecord.value();
assertRecord((Struct) deleteValue.get("before"), expectedDeleteRow);
assertNull(deleteValue.get("after"));

final Struct tombstoneKey = (Struct) tombstoneRecord.key();
final Struct tombstoneValue = (Struct) tombstoneRecord.value();
assertNull(tombstoneValue);
}

stopConnector();
}

@Test
@FixFor("DBZ-1642")
public void readOnlyApplicationIntent() throws Exception {
Expand Down

0 comments on commit 5462605

Please sign in to comment.