From 50c9faea1919b54730be919d70b136fc33cc0d31 Mon Sep 17 00:00:00 2001 From: Sergei Morozov Date: Tue, 26 Oct 2021 17:22:47 -0700 Subject: [PATCH] CXP-1453: Filtering out non-online databases in SQL Server connector --- .../sqlserver/SqlServerConnection.java | 48 ++++++++++++++++--- .../sqlserver/SqlServerConnector.java | 9 ++-- .../sqlserver/SqlServerConnectorIT.java | 24 ++++++++++ 3 files changed, 71 insertions(+), 10 deletions(-) diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index 523c147551b..0c2185bfeb9 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -60,7 +60,7 @@ public class SqlServerConnection extends JdbcConnection { public static final String INSTANCE_NAME = "instance"; - private static final String GET_DATABASE_NAME = "SELECT name FROM sys.databases WHERE name = ?"; + private static final String GET_DATABASE_METADATA = "WITH names AS (SELECT * FROM (VALUES #) AS names(name)) SELECT n.name, d.name, d.state, d.state_desc FROM names n LEFT JOIN sys.databases d ON d.name = n.name"; private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerConnection.class); @@ -573,21 +573,55 @@ public String getNameOfChangeTable(String captureName) { } /** - * Retrieve the name of the database in the original case as it's defined on the server. + * Retrieve the names of the existing online databases in the original case as they are defined on the server. * * Although SQL Server supports case-insensitive collations, the connector uses the database name to build the * produced records' source info and, subsequently, the keys of its committed offset messages. This value * must remain the same during the lifetime of the connector regardless of the case used in the connector * configuration. */ - public String retrieveRealDatabaseName(String databaseName) { + public List retrieveRealOnlineDatabaseNames(List databaseNames) { + if (databaseNames.isEmpty()) { + return databaseNames; + } + + String placeholders = databaseNames.stream() + .map(x -> "(?)") + .collect(Collectors.joining(", ")); + + String query = GET_DATABASE_METADATA.replace(STATEMENTS_PLACEHOLDER, placeholders); + try { - return prepareQueryAndMap(GET_DATABASE_NAME, - ps -> ps.setString(1, databaseName), - singleResultMapper(rs -> rs.getString(1), "Could not retrieve exactly one database name")); + return prepareQueryAndMap(query, + ps -> { + int index = 1; + for (String databaseName : databaseNames) { + ps.setString(index++, databaseName); + } + }, + rs -> { + List result = new ArrayList<>(); + while (rs.next()) { + final String name = rs.getString(1); + final String realName = rs.getString(2); + if (realName == null) { + LOGGER.warn("Database {} does not exist", name); + continue; + } + + final int state = rs.getInt(3); + if (state != 0) { + LOGGER.warn("Database {} is not online (state_desc = {})", realName, rs.getString(4)); + continue; + } + + result.add(realName); + } + return result; + }); } catch (SQLException e) { - throw new RuntimeException("Couldn't obtain database name", e); + throw new RuntimeException("Couldn't retrieve real database names", e); } } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java index 802832d8746..b3110b134b9 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java @@ -74,6 +74,10 @@ public List> taskConfigs(int maxTasks) { private List> buildTaskConfigs(SqlServerConnection connection, SqlServerConnectorConfig config, int maxTasks) { List databaseNames = config.getDatabaseNames(); + final List realDatabaseNames = connection.retrieveRealOnlineDatabaseNames(databaseNames); + if (realDatabaseNames.isEmpty()) { + throw new IllegalArgumentException(); + } // Initialize the database list for each task List> databasesByTask = new ArrayList<>(); @@ -83,10 +87,9 @@ private List> buildTaskConfigs(SqlServerConnection connectio } // Add each database to a task list via round-robin. - for (int databaseNameIndex = 0; databaseNameIndex < databaseNames.size(); databaseNameIndex++) { + for (int databaseNameIndex = 0; databaseNameIndex < realDatabaseNames.size(); databaseNameIndex++) { int taskIndex = databaseNameIndex % numTasks; - String realDatabaseName = connection.retrieveRealDatabaseName(databaseNames.get(databaseNameIndex)); - databasesByTask.get(taskIndex).add(realDatabaseName); + databasesByTask.get(taskIndex).add(realDatabaseNames.get(databaseNameIndex)); } // Create a task config for each task, assigning each a list of database names. diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index 9bc7a78ac87..75a6d6077c0 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -1005,6 +1005,30 @@ public void testIncludeTable() throws Exception { stopConnector(); } + @Test + @FixFor("DBZ-4346") + public void shouldNotReportConfigurationErrorForUserNotHavingAccessToCDCTableInInitialOnlyMode() throws Exception { + // First create a new user with only db_datareader role + String testUserCreateSql = "IF EXISTS (select 1 from sys.server_principals where name = 'test_user')\n" + + "DROP LOGIN test_user\n" + + "CREATE LOGIN test_user WITH PASSWORD = 'Password!'\n" + + "CREATE USER test_user FOR LOGIN test_user\n" + + "ALTER ROLE db_denydatareader ADD MEMBER test_user"; + + connection.execute(testUserCreateSql); + + final Configuration config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY) + .with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "^dbo.tableb$") + .with(SqlServerConnectorConfig.USER, "test_user") + .build(); + + SqlServerConnector connector = new SqlServerConnector(); + Config validatedConfig = connector.validate(config.asMap()); + + assertNoConfigurationErrors(validatedConfig, SqlServerConnectorConfig.USER); + } + @Test public void testTableIncludeList() throws Exception { final int RECORDS_PER_TABLE = 5;