Skip to content

Commit

Permalink
CXP-1453: Filtering out non-online databases in SQL Server connector
Browse files Browse the repository at this point in the history
  • Loading branch information
morozov authored and ramanenka committed Dec 16, 2024
1 parent 540eb33 commit b926c5c
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class SqlServerConnection extends JdbcConnection {

public static final String INSTANCE_NAME = "instance";

private static final String GET_DATABASE_NAME = "SELECT name FROM sys.databases WHERE name = ?";
private static final String GET_DATABASE_METADATA = "WITH names AS (SELECT * FROM (VALUES #) AS names(name)) SELECT n.name, d.name, d.state, d.state_desc FROM names n LEFT JOIN sys.databases d ON d.name = n.name";

private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerConnection.class);

Expand Down Expand Up @@ -573,21 +573,55 @@ public String getNameOfChangeTable(String captureName) {
}

/**
* Retrieve the name of the database in the original case as it's defined on the server.
* Retrieve the names of the existing online databases in the original case as they are defined on the server.
*
* Although SQL Server supports case-insensitive collations, the connector uses the database name to build the
* produced records' source info and, subsequently, the keys of its committed offset messages. This value
* must remain the same during the lifetime of the connector regardless of the case used in the connector
* configuration.
*/
public String retrieveRealDatabaseName(String databaseName) {
public List<String> retrieveRealOnlineDatabaseNames(List<String> databaseNames) {
if (databaseNames.isEmpty()) {
return databaseNames;
}

String placeholders = databaseNames.stream()
.map(x -> "(?)")
.collect(Collectors.joining(", "));

String query = GET_DATABASE_METADATA.replace(STATEMENTS_PLACEHOLDER, placeholders);

try {
return prepareQueryAndMap(GET_DATABASE_NAME,
ps -> ps.setString(1, databaseName),
singleResultMapper(rs -> rs.getString(1), "Could not retrieve exactly one database name"));
return prepareQueryAndMap(query,
ps -> {
int index = 1;
for (String databaseName : databaseNames) {
ps.setString(index++, databaseName);
}
},
rs -> {
List<String> result = new ArrayList<>();
while (rs.next()) {
final String name = rs.getString(1);
final String realName = rs.getString(2);
if (realName == null) {
LOGGER.warn("Database {} does not exist", name);
continue;
}

final int state = rs.getInt(3);
if (state != 0) {
LOGGER.warn("Database {} is not online (state_desc = {})", realName, rs.getString(4));
continue;
}

result.add(realName);
}
return result;
});
}
catch (SQLException e) {
throw new RuntimeException("Couldn't obtain database name", e);
throw new RuntimeException("Couldn't retrieve real database names", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
private List<Map<String, String>> buildTaskConfigs(SqlServerConnection connection, SqlServerConnectorConfig config,
int maxTasks) {
List<String> databaseNames = config.getDatabaseNames();
final List<String> realDatabaseNames = connection.retrieveRealOnlineDatabaseNames(databaseNames);
if (realDatabaseNames.isEmpty()) {
throw new IllegalArgumentException();
}

// Initialize the database list for each task
List<List<String>> databasesByTask = new ArrayList<>();
Expand All @@ -83,10 +87,9 @@ private List<Map<String, String>> buildTaskConfigs(SqlServerConnection connectio
}

// Add each database to a task list via round-robin.
for (int databaseNameIndex = 0; databaseNameIndex < databaseNames.size(); databaseNameIndex++) {
for (int databaseNameIndex = 0; databaseNameIndex < realDatabaseNames.size(); databaseNameIndex++) {
int taskIndex = databaseNameIndex % numTasks;
String realDatabaseName = connection.retrieveRealDatabaseName(databaseNames.get(databaseNameIndex));
databasesByTask.get(taskIndex).add(realDatabaseName);
databasesByTask.get(taskIndex).add(realDatabaseNames.get(databaseNameIndex));
}

// Create a task config for each task, assigning each a list of database names.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,30 @@ public void testIncludeTable() throws Exception {
stopConnector();
}

@Test
@FixFor("DBZ-4346")
public void shouldNotReportConfigurationErrorForUserNotHavingAccessToCDCTableInInitialOnlyMode() throws Exception {
// First create a new user with only db_datareader role
String testUserCreateSql = "IF EXISTS (select 1 from sys.server_principals where name = 'test_user')\n"
+ "DROP LOGIN test_user\n"
+ "CREATE LOGIN test_user WITH PASSWORD = 'Password!'\n"
+ "CREATE USER test_user FOR LOGIN test_user\n"
+ "ALTER ROLE db_denydatareader ADD MEMBER test_user";

connection.execute(testUserCreateSql);

final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY)
.with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "^dbo.tableb$")
.with(SqlServerConnectorConfig.USER, "test_user")
.build();

SqlServerConnector connector = new SqlServerConnector();
Config validatedConfig = connector.validate(config.asMap());

assertNoConfigurationErrors(validatedConfig, SqlServerConnectorConfig.USER);
}

@Test
public void testTableIncludeList() throws Exception {
final int RECORDS_PER_TABLE = 5;
Expand Down

0 comments on commit b926c5c

Please sign in to comment.