diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index 09630f173e9..0b50f36e28a 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -52,7 +52,7 @@ public class SqlServerConnection extends JdbcConnection { public static final String INSTANCE_NAME = "instance"; - private static final String GET_DATABASE_NAME = "SELECT name FROM sys.databases WHERE name = ?"; + private static final String GET_DATABASE_METADATA = "WITH names AS (SELECT * FROM (VALUES #) AS names(name)) SELECT n.name, d.name, d.state, d.state_desc FROM names n LEFT JOIN sys.databases d ON d.name = n.name"; private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerConnection.class); @@ -536,21 +536,55 @@ public String getNameOfChangeTable(String captureName) { } /** - * Retrieve the name of the database in the original case as it's defined on the server. + * Retrieve the names of the existing online databases in the original case as they are defined on the server. * * Although SQL Server supports case-insensitive collations, the connector uses the database name to build the * produced records' source info and, subsequently, the keys of its committed offset messages. This value * must remain the same during the lifetime of the connector regardless of the case used in the connector * configuration. */ - public String retrieveRealDatabaseName(String databaseName) { + public List retrieveRealOnlineDatabaseNames(List databaseNames) { + if (databaseNames.isEmpty()) { + return databaseNames; + } + + String placeholders = databaseNames.stream() + .map(x -> "(?)") + .collect(Collectors.joining(", ")); + + String query = GET_DATABASE_METADATA.replace(STATEMENTS_PLACEHOLDER, placeholders); + try { - return prepareQueryAndMap(GET_DATABASE_NAME, - ps -> ps.setString(1, databaseName), - singleResultMapper(rs -> rs.getString(1), "Could not retrieve exactly one database name")); + return prepareQueryAndMap(query, + ps -> { + int index = 1; + for (String databaseName : databaseNames) { + ps.setString(index++, databaseName); + } + }, + rs -> { + List result = new ArrayList<>(); + while (rs.next()) { + final String name = rs.getString(1); + final String realName = rs.getString(2); + if (realName == null) { + LOGGER.warn("Database {} does not exist", name); + continue; + } + + final int state = rs.getInt(3); + if (state != 0) { + LOGGER.warn("Database {} is not online (state_desc = {})", realName, rs.getString(4)); + continue; + } + + result.add(realName); + } + return result; + }); } catch (SQLException e) { - throw new RuntimeException("Couldn't obtain database name", e); + throw new RuntimeException("Couldn't retrieve real database names", e); } } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java index ab250dea0b2..d0a14af0b24 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java @@ -71,6 +71,10 @@ public List> taskConfigs(int maxTasks) { private List> buildTaskConfigs(SqlServerConnection connection, SqlServerConnectorConfig config, int maxTasks) { List databaseNames = config.getDatabaseNames(); + final List realDatabaseNames = connection.retrieveRealOnlineDatabaseNames(databaseNames); + if (realDatabaseNames.isEmpty()) { + throw new IllegalArgumentException(); + } // Initialize the database list for each task List> databasesByTask = new ArrayList<>(); @@ -80,10 +84,9 @@ private List> buildTaskConfigs(SqlServerConnection connectio } // Add each database to a task list via round-robin. - for (int databaseNameIndex = 0; databaseNameIndex < databaseNames.size(); databaseNameIndex++) { + for (int databaseNameIndex = 0; databaseNameIndex < realDatabaseNames.size(); databaseNameIndex++) { int taskIndex = databaseNameIndex % numTasks; - String realDatabaseName = connection.retrieveRealDatabaseName(databaseNames.get(databaseNameIndex)); - databasesByTask.get(taskIndex).add(realDatabaseName); + databasesByTask.get(taskIndex).add(realDatabaseNames.get(databaseNameIndex)); } // Create a task config for each task, assigning each a list of database names. diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index 3cf3169985b..33469f99413 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -910,6 +910,30 @@ public void testIncludeTable() throws Exception { stopConnector(); } + @Test + @FixFor("DBZ-4346") + public void shouldNotReportConfigurationErrorForUserNotHavingAccessToCDCTableInInitialOnlyMode() throws Exception { + // First create a new user with only db_datareader role + String testUserCreateSql = "IF EXISTS (select 1 from sys.server_principals where name = 'test_user')\n" + + "DROP LOGIN test_user\n" + + "CREATE LOGIN test_user WITH PASSWORD = 'Password!'\n" + + "CREATE USER test_user FOR LOGIN test_user\n" + + "ALTER ROLE db_denydatareader ADD MEMBER test_user"; + + connection.execute(testUserCreateSql); + + final Configuration config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY) + .with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "^dbo.tableb$") + .with(SqlServerConnectorConfig.USER, "test_user") + .build(); + + SqlServerConnector connector = new SqlServerConnector(); + Config validatedConfig = connector.validate(config.asMap()); + + assertNoConfigurationErrors(validatedConfig, SqlServerConnectorConfig.USER); + } + @Test public void testTableIncludeList() throws Exception { final int RECORDS_PER_TABLE = 5;