Skip to content

Commit

Permalink
Merge pull request #156 from ramanenka/CXP-3167
Browse files Browse the repository at this point in the history
CXP-3167: Account for possible `#` in a column name
  • Loading branch information
ramanenka authored May 16, 2024
2 parents b0aef2c + 36e8e41 commit 4ba244a
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class SqlServerConnection extends JdbcConnection {

private static final String STATEMENTS_PLACEHOLDER = "#";
private static final String DATABASE_NAME_PLACEHOLDER = "#db";
private static final String TABLE_NAME_PLACEHOLDER = "#table";
private static final String GET_MAX_LSN = "SELECT [#db].sys.fn_cdc_get_max_lsn()";
private static final String GET_MAX_TRANSACTION_LSN = "SELECT MAX(start_lsn) FROM [#db].cdc.lsn_time_mapping WHERE tran_id <> 0x00";
private static final String GET_NTH_TRANSACTION_LSN_FROM_BEGINNING = "SELECT MAX(start_lsn) FROM (SELECT TOP (?) start_lsn FROM [#db].cdc.lsn_time_mapping WHERE tran_id <> 0x00 ORDER BY start_lsn) as next_lsns";
Expand All @@ -77,8 +78,8 @@ public class SqlServerConnection extends JdbcConnection {
protected static final String LSN_TIMESTAMP_SELECT_STATEMENT = "TODATETIMEOFFSET([#db].sys.fn_cdc_map_lsn_to_time([__$start_lsn]), DATEPART(TZOFFSET, SYSDATETIMEOFFSET()))";
private static final String GET_ALL_CHANGES_FOR_TABLE_SELECT = "SELECT [__$start_lsn], [__$seqval], [__$operation], [__$update_mask], #, "
+ LSN_TIMESTAMP_SELECT_STATEMENT;
private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_FUNCTION = "FROM [#db].cdc.[fn_cdc_get_all_changes_#](?, ?, N'all update old')";
private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_DIRECT = "FROM [#db].cdc.[#]";
private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_FUNCTION = "FROM [#db].cdc.[fn_cdc_get_all_changes_#table](?, ?, N'all update old')";
private static final String GET_ALL_CHANGES_FOR_TABLE_FROM_DIRECT = "FROM [#db].cdc.[#table]";
private static final String GET_ALL_CHANGES_FOR_TABLE_ORDER_BY = "ORDER BY [__$start_lsn] ASC, [__$seqval] ASC, [__$operation] ASC";

/**
Expand Down Expand Up @@ -366,7 +367,7 @@ public void getChangesForTables(String databaseName, SqlServerChangeTable[] chan
}
final String query = replaceDatabaseNamePlaceholder(getAllChangesForTable, databaseName)
.replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(capturedColumns))
.replace(STATEMENTS_PLACEHOLDER, source);
.replace(TABLE_NAME_PLACEHOLDER, source);
queries[idx] = query;
// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ public void shouldParseSpecialChars() throws Exception {
.build();

connection.execute(
"CREATE TABLE [UAT WAG CZ$Fixed Asset] (id int primary key, [my col$a] varchar(30))",
"INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(1, 'a')");
"CREATE TABLE [UAT WAG CZ$Fixed Asset] (id int primary key, [my col$a] varchar(30), [my col#b] varchar(30))",
"INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(1, 'a', 'b')");
TestHelper.enableTableCdc(connection, "UAT WAG CZ$Fixed Asset");
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
Expand All @@ -145,6 +145,7 @@ public void shouldParseSpecialChars() throws Exception {
.name("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset.Value")
.field("id", Schema.INT32_SCHEMA)
.field("my_col_a", Schema.OPTIONAL_STRING_SCHEMA)
.field("my_col_b", Schema.OPTIONAL_STRING_SCHEMA)
.build());
assertSchemaMatchesStruct(
(Struct) record.key(),
Expand All @@ -154,7 +155,7 @@ public void shouldParseSpecialChars() throws Exception {
.build());
assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(1);

connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(2, 'b')");
connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(2, 'b', 'c')");
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1);
record = records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset").get(0);
Expand All @@ -165,6 +166,7 @@ record = records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset").g
.name("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset.Value")
.field("id", Schema.INT32_SCHEMA)
.field("my_col_a", Schema.OPTIONAL_STRING_SCHEMA)
.field("my_col_b", Schema.OPTIONAL_STRING_SCHEMA)
.build());
assertSchemaMatchesStruct(
(Struct) record.key(),
Expand Down Expand Up @@ -228,7 +230,7 @@ record = records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset_Two
start(SqlServerConnector.class, config);
assertConnectorIsRunning();

connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(4, 'b')");
connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(4, 'b', 'c')");
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1);
record = records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset").get(0);
Expand All @@ -239,6 +241,7 @@ record = records.recordsForTopic("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset").g
.name("server1.testDB1.dbo.UAT_WAG_CZ_Fixed_Asset.Value")
.field("id", Schema.INT32_SCHEMA)
.field("my_col_a", Schema.OPTIONAL_STRING_SCHEMA)
.field("my_col_b", Schema.OPTIONAL_STRING_SCHEMA)
.build());
assertSchemaMatchesStruct(
(Struct) record.key(),
Expand Down

0 comments on commit 4ba244a

Please sign in to comment.